This is an automated email from the ASF dual-hosted git repository.

curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 9051acfbb fix(csharp/src/Drivers/BigQuery): handle dispose of 
Statement before Stream (#3608)
9051acfbb is described below

commit 9051acfbba59657dafc27fb8e877c88f1660d5fa
Author: Bruce Irschick <[email protected]>
AuthorDate: Wed Oct 22 14:06:58 2025 -0700

    fix(csharp/src/Drivers/BigQuery): handle dispose of Statement before Stream 
(#3608)
    
    This fix handles the possibility that the parent statement disposes
    before the created stream.
---
 csharp/src/Drivers/BigQuery/BigQueryStatement.cs | 19 ++++++---
 csharp/test/Drivers/BigQuery/StatementTests.cs   | 54 ++++++++++++++++++++++--
 2 files changed, 65 insertions(+), 8 deletions(-)

diff --git a/csharp/src/Drivers/BigQuery/BigQueryStatement.cs 
b/csharp/src/Drivers/BigQuery/BigQueryStatement.cs
index 1d8baeab1..91693746e 100644
--- a/csharp/src/Drivers/BigQuery/BigQueryStatement.cs
+++ b/csharp/src/Drivers/BigQuery/BigQueryStatement.cs
@@ -106,7 +106,8 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
                     
activity?.AddBigQueryParameterTag(BigQueryParameters.GetQueryResultsOptionsTimeout,
 seconds);
                 }
 
-                JobCancellationContext cancellationContext = new 
JobCancellationContext(cancellationRegistry, job);
+                using JobCancellationContext cancellationContext = new 
JobCancellationContext(cancellationRegistry, job);
+
                 // We can't checkJobStatus, Otherwise, the timeout in 
QueryResultsOptions is meaningless.
                 // When encountering a long-running job, it should be 
controlled by the timeout in the Google SDK instead of blocking in a while loop.
                 Func<Task<BigQueryResults>> getJobResults = async () =>
@@ -215,7 +216,7 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
                 IEnumerable<IArrowReader> readers = await 
ExecuteWithRetriesAsync(getArrowReadersFunc, activity).ConfigureAwait(false);
 
                 // Note: MultiArrowReader must dispose the cancellationContext.
-                IArrowArrayStream stream = new MultiArrowReader(this, 
TranslateSchema(results.Schema), readers, cancellationContext);
+                IArrowArrayStream stream = new MultiArrowReader(this, 
TranslateSchema(results.Schema), readers, new 
CancellationContext(cancellationRegistry));
                 activity?.AddTag(SemanticConventions.Db.Response.ReturnedRows, 
totalRows);
                 return new QueryResult(totalRows, stream);
             });
@@ -641,20 +642,27 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
         private sealed class CancellationRegistry : IDisposable
         {
             private readonly ConcurrentDictionary<CancellationContext, byte> 
contexts = new();
+            private bool disposed;
 
             public CancellationContext Register(CancellationContext context)
             {
+                if (disposed) throw new 
ObjectDisposedException(nameof(CancellationRegistry));
+
                 contexts.TryAdd(context, 0);
                 return context;
             }
 
             public bool Unregister(CancellationContext context)
             {
+                if (disposed) return false;
+
                 return contexts.TryRemove(context, out _);
             }
 
             public void CancelAll()
             {
+                if (disposed) throw new 
ObjectDisposedException(nameof(CancellationRegistry));
+
                 foreach (CancellationContext context in contexts.Keys)
                 {
                     context.Cancel();
@@ -663,11 +671,11 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
 
             public void Dispose()
             {
-                foreach (CancellationContext context in contexts.Keys)
+                if (!disposed)
                 {
-                    context.Dispose();
+                    contexts.Clear();
+                    disposed = true;
                 }
-                contexts.Clear();
             }
         }
 
@@ -706,6 +714,7 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
 
                     while (true)
                     {
+                        linkedCts.Token.ThrowIfCancellationRequested();
                         if (this.reader == null)
                         {
                             if (!this.readers.MoveNext())
diff --git a/csharp/test/Drivers/BigQuery/StatementTests.cs 
b/csharp/test/Drivers/BigQuery/StatementTests.cs
index 8b3a2552e..aa32153ac 100644
--- a/csharp/test/Drivers/BigQuery/StatementTests.cs
+++ b/csharp/test/Drivers/BigQuery/StatementTests.cs
@@ -114,14 +114,62 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.BigQuery
         }
 
         [Fact]
-        public async Task CanCancelStreamFromStatement()
+        public async Task CanCancelStreamAndDisposeStatement()
         {
             foreach (BigQueryTestEnvironment environment in _environments)
             {
-                AdbcConnection adbcConnection = 
GetAdbcConnection(environment.Name);
+                using AdbcConnection adbcConnection = 
GetAdbcConnection(environment.Name);
 
                 AdbcStatement statement = adbcConnection.CreateStatement();
 
+                // Execute the query/cancel multiple times to validate 
consistent behavior
+                const int iterations = 3;
+                QueryResult[] results = new QueryResult[iterations];
+                for (int i = 0; i < iterations; i++)
+                {
+                    _outputHelper?.WriteLine($"Iteration {i + 1} of 
{iterations}");
+                    // Generate unique column names so query will not be 
served from cache
+                    string columnName1 = Guid.NewGuid().ToString("N");
+                    string columnName2 = Guid.NewGuid().ToString("N");
+                    statement.SqlQuery = $"SELECT `{columnName2}` AS 
`{columnName1}` FROM UNNEST(GENERATE_ARRAY(1, 100)) AS `{columnName2}`";
+                    _outputHelper?.WriteLine($"Query: {statement.SqlQuery}");
+
+                    // Expect this to take about 10 seconds without 
cancellation
+                    results[i] = statement.ExecuteQuery();
+                }
+                statement.Cancel();
+                statement.Dispose();
+                for (int index = 0; index < iterations; index++)
+                {
+                    try
+                    {
+                        QueryResult queryResult = results[index];
+                        using IArrowArrayStream? stream = queryResult.Stream;
+                        Assert.NotNull(stream);
+                        RecordBatch batch = await 
stream.ReadNextRecordBatchAsync();
+
+                        Assert.Fail("Expecting OperationCanceledException to 
be thrown.");
+                    }
+                    catch (Exception ex) when 
(BigQueryUtils.ContainsException(ex, out OperationCanceledException? _))
+                    {
+                        _outputHelper?.WriteLine($"Received expected 
OperationCanceledException: {ex.Message}");
+                    }
+                    catch (Exception ex) when (ex is not FailException)
+                    {
+                        Assert.Fail($"Expecting OperationCanceledException to 
be thrown. Instead, received {ex.GetType().Name}: {ex.Message}");
+                    }
+                }
+            }
+        }
+
+        [Fact]
+        public async Task CanCancelStreamFromStatement()
+        {
+            foreach (BigQueryTestEnvironment environment in _environments)
+            {
+                using AdbcConnection adbcConnection = 
GetAdbcConnection(environment.Name);
+                using AdbcStatement statement = 
adbcConnection.CreateStatement();
+
                 // Execute the query/cancel multiple times to validate 
consistent behavior
                 const int iterations = 3;
                 QueryResult[] results = new QueryResult[iterations];
@@ -143,7 +191,7 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.BigQuery
                     try
                     {
                         QueryResult queryResult = results[index];
-                        IArrowArrayStream? stream = queryResult.Stream;
+                        using IArrowArrayStream? stream = queryResult.Stream;
                         Assert.NotNull(stream);
                         RecordBatch batch = await 
stream.ReadNextRecordBatchAsync();
 

Reply via email to