This is an automated email from the ASF dual-hosted git repository.
curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 9051acfbb fix(csharp/src/Drivers/BigQuery): handle dispose of Statement before Stream (#3608)
9051acfbb is described below
commit 9051acfbba59657dafc27fb8e877c88f1660d5fa
Author: Bruce Irschick <[email protected]>
AuthorDate: Wed Oct 22 14:06:58 2025 -0700
fix(csharp/src/Drivers/BigQuery): handle dispose of Statement before Stream (#3608)
This fix handles the possibility that the parent statement is disposed before the stream it created.
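
For context, the scenario this change covers is the one the new test exercises: a caller obtains a QueryResult, disposes the statement that produced it, and only afterwards reads from the stream. Below is a minimal sketch of that usage pattern, not part of the patch; it assumes an already-open BigQuery-backed AdbcConnection, and the helper name and SQL text are placeholders. It uses only the Apache.Arrow.Adbc surface that appears in this diff (CreateStatement, SqlQuery, ExecuteQuery, QueryResult.Stream, ReadNextRecordBatchAsync).

    using System;
    using System.Threading.Tasks;
    using Apache.Arrow;
    using Apache.Arrow.Adbc;
    using Apache.Arrow.Ipc; // IArrowArrayStream

    static class DisposeBeforeStreamSketch
    {
        // Hypothetical helper; `connection` is assumed to be an open AdbcConnection.
        public static async Task ReadAfterStatementDisposedAsync(AdbcConnection connection)
        {
            QueryResult result;
            using (AdbcStatement statement = connection.CreateStatement())
            {
                statement.SqlQuery = "SELECT 1 AS n"; // placeholder query
                result = statement.ExecuteQuery();
            } // the statement is disposed here, before the stream is consumed

            // This read happens after the parent statement is gone; the change below
            // gives the reader its own cancellation context so it no longer depends
            // on state torn down by the statement's Dispose.
            using IArrowArrayStream? stream = result.Stream;
            if (stream == null) return;

            RecordBatch? batch;
            while ((batch = await stream.ReadNextRecordBatchAsync()) != null)
            {
                Console.WriteLine($"Read a batch with {batch.Length} rows");
                batch.Dispose();
            }
        }
    }
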
---
csharp/src/Drivers/BigQuery/BigQueryStatement.cs | 19 ++++++---
csharp/test/Drivers/BigQuery/StatementTests.cs | 54 ++++++++++++++++++++++--
2 files changed, 65 insertions(+), 8 deletions(-)
diff --git a/csharp/src/Drivers/BigQuery/BigQueryStatement.cs b/csharp/src/Drivers/BigQuery/BigQueryStatement.cs
index 1d8baeab1..91693746e 100644
--- a/csharp/src/Drivers/BigQuery/BigQueryStatement.cs
+++ b/csharp/src/Drivers/BigQuery/BigQueryStatement.cs
@@ -106,7 +106,8 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
activity?.AddBigQueryParameterTag(BigQueryParameters.GetQueryResultsOptionsTimeout, seconds);
}
- JobCancellationContext cancellationContext = new JobCancellationContext(cancellationRegistry, job);
+ using JobCancellationContext cancellationContext = new JobCancellationContext(cancellationRegistry, job);
+
// We can't checkJobStatus, Otherwise, the timeout in QueryResultsOptions is meaningless.
// When encountering a long-running job, it should be controlled by the timeout in the Google SDK instead of blocking in a while loop.
Func<Task<BigQueryResults>> getJobResults = async () =>
@@ -215,7 +216,7 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
IEnumerable<IArrowReader> readers = await ExecuteWithRetriesAsync(getArrowReadersFunc, activity).ConfigureAwait(false);
// Note: MultiArrowReader must dispose the cancellationContext.
- IArrowArrayStream stream = new MultiArrowReader(this, TranslateSchema(results.Schema), readers, cancellationContext);
+ IArrowArrayStream stream = new MultiArrowReader(this, TranslateSchema(results.Schema), readers, new CancellationContext(cancellationRegistry));
activity?.AddTag(SemanticConventions.Db.Response.ReturnedRows, totalRows);
return new QueryResult(totalRows, stream);
});
@@ -641,20 +642,27 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
private sealed class CancellationRegistry : IDisposable
{
private readonly ConcurrentDictionary<CancellationContext, byte> contexts = new();
+ private bool disposed;
public CancellationContext Register(CancellationContext context)
{
+ if (disposed) throw new ObjectDisposedException(nameof(CancellationRegistry));
+
contexts.TryAdd(context, 0);
return context;
}
public bool Unregister(CancellationContext context)
{
+ if (disposed) return false;
+
return contexts.TryRemove(context, out _);
}
public void CancelAll()
{
+ if (disposed) throw new ObjectDisposedException(nameof(CancellationRegistry));
+
foreach (CancellationContext context in contexts.Keys)
{
context.Cancel();
@@ -663,11 +671,11 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
public void Dispose()
{
- foreach (CancellationContext context in contexts.Keys)
+ if (!disposed)
{
- context.Dispose();
+ contexts.Clear();
+ disposed = true;
}
- contexts.Clear();
}
}
@@ -706,6 +714,7 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
while (true)
{
+ linkedCts.Token.ThrowIfCancellationRequested();
if (this.reader == null)
{
if (!this.readers.MoveNext())
diff --git a/csharp/test/Drivers/BigQuery/StatementTests.cs b/csharp/test/Drivers/BigQuery/StatementTests.cs
index 8b3a2552e..aa32153ac 100644
--- a/csharp/test/Drivers/BigQuery/StatementTests.cs
+++ b/csharp/test/Drivers/BigQuery/StatementTests.cs
@@ -114,14 +114,62 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.BigQuery
}
[Fact]
- public async Task CanCancelStreamFromStatement()
+ public async Task CanCancelStreamAndDisposeStatement()
{
foreach (BigQueryTestEnvironment environment in _environments)
{
- AdbcConnection adbcConnection = GetAdbcConnection(environment.Name);
+ using AdbcConnection adbcConnection = GetAdbcConnection(environment.Name);
AdbcStatement statement = adbcConnection.CreateStatement();
+ // Execute the query/cancel multiple times to validate consistent behavior
+ const int iterations = 3;
+ QueryResult[] results = new QueryResult[iterations];
+ for (int i = 0; i < iterations; i++)
+ {
+ _outputHelper?.WriteLine($"Iteration {i + 1} of {iterations}");
+ // Generate unique column names so query will not be served from cache
+ string columnName1 = Guid.NewGuid().ToString("N");
+ string columnName2 = Guid.NewGuid().ToString("N");
+ statement.SqlQuery = $"SELECT `{columnName2}` AS `{columnName1}` FROM UNNEST(GENERATE_ARRAY(1, 100)) AS `{columnName2}`";
+ _outputHelper?.WriteLine($"Query: {statement.SqlQuery}");
+
+ // Expect this to take about 10 seconds without cancellation
+ results[i] = statement.ExecuteQuery();
+ }
+ statement.Cancel();
+ statement.Dispose();
+ for (int index = 0; index < iterations; index++)
+ {
+ try
+ {
+ QueryResult queryResult = results[index];
+ using IArrowArrayStream? stream = queryResult.Stream;
+ Assert.NotNull(stream);
+ RecordBatch batch = await stream.ReadNextRecordBatchAsync();
+
+ Assert.Fail("Expecting OperationCanceledException to be thrown.");
+ }
+ catch (Exception ex) when (BigQueryUtils.ContainsException(ex, out OperationCanceledException? _))
+ {
+ _outputHelper?.WriteLine($"Received expected OperationCanceledException: {ex.Message}");
+ }
+ catch (Exception ex) when (ex is not FailException)
+ {
+ Assert.Fail($"Expecting OperationCanceledException to be thrown. Instead, received {ex.GetType().Name}: {ex.Message}");
+ }
+ }
+ }
+ }
+
+ [Fact]
+ public async Task CanCancelStreamFromStatement()
+ {
+ foreach (BigQueryTestEnvironment environment in _environments)
+ {
+ using AdbcConnection adbcConnection = GetAdbcConnection(environment.Name);
+ using AdbcStatement statement = adbcConnection.CreateStatement();
+
// Execute the query/cancel multiple times to validate consistent behavior
const int iterations = 3;
QueryResult[] results = new QueryResult[iterations];
@@ -143,7 +191,7 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.BigQuery
try
{
QueryResult queryResult = results[index];
- IArrowArrayStream? stream = queryResult.Stream;
+ using IArrowArrayStream? stream = queryResult.Stream;
Assert.NotNull(stream);
RecordBatch batch = await stream.ReadNextRecordBatchAsync();