This is an automated email from the ASF dual-hosted git repository.

curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 74200ee67 fix(csharp/src/Drivers/BigQuery): improve selective handling 
of cancellation exception (#3615)
74200ee67 is described below

commit 74200ee67aba48c99e45c127f7b4e36ae9d957dc
Author: Bruce Irschick <[email protected]>
AuthorDate: Tue Oct 28 11:52:30 2025 -0700

    fix(csharp/src/Drivers/BigQuery): improve selective handling of 
cancellation exception (#3615)
    
    After adding cancellation functionality to BigQuery statements, there is
    a report that `task was cancelled` messages are now appearing.
    
    This is likely due to the change where the retry code stops processing
    when it gets an `OperationCanceledException`.
    
    With this change, the code only exits early if the cancellation was
    requested on the passed cancellation token.
---
 csharp/src/Drivers/BigQuery/BigQueryStatement.cs | 17 +++++++++++------
 csharp/src/Drivers/BigQuery/RetryManager.cs      |  8 ++++++--
 2 files changed, 17 insertions(+), 8 deletions(-)

diff --git a/csharp/src/Drivers/BigQuery/BigQueryStatement.cs 
b/csharp/src/Drivers/BigQuery/BigQueryStatement.cs
index 1034b0f8e..52bcc8e0e 100644
--- a/csharp/src/Drivers/BigQuery/BigQueryStatement.cs
+++ b/csharp/src/Drivers/BigQuery/BigQueryStatement.cs
@@ -120,7 +120,7 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
                     }).ConfigureAwait(false);
                 };
 
-                BigQueryResults results = await 
ExecuteWithRetriesAsync(getJobResults, activity).ConfigureAwait(false);
+                BigQueryResults results = await 
ExecuteWithRetriesAsync(getJobResults, activity, 
cancellationContext.CancellationToken).ConfigureAwait(false);
 
                 TokenProtectedReadClientManger clientMgr = new 
TokenProtectedReadClientManger(Credential);
                 clientMgr.UpdateToken = () => Task.Run(() =>
@@ -180,7 +180,7 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
                             throw new AdbcException($"Unable to obtain result 
from statement [{statementIndex}]", AdbcStatusCode.InvalidData);
                         };
 
-                    results = await 
ExecuteWithRetriesAsync(getMultiJobResults, activity).ConfigureAwait(false);
+                    results = await 
ExecuteWithRetriesAsync(getMultiJobResults, activity, 
cancellationContext.CancellationToken).ConfigureAwait(false);
                 }
 
                 if (results?.TableReference == null)
@@ -213,7 +213,7 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
                         return await GetArrowReaders(clientMgr, table, 
results.TableReference.ProjectId, maxStreamCount, activity, 
context.CancellationToken).ConfigureAwait(false);
                     }).ConfigureAwait(false);
                 };
-                IEnumerable<IArrowReader> readers = await 
ExecuteWithRetriesAsync(getArrowReadersFunc, activity).ConfigureAwait(false);
+                IEnumerable<IArrowReader> readers = await 
ExecuteWithRetriesAsync(getArrowReadersFunc, activity, 
cancellationContext.CancellationToken).ConfigureAwait(false);
 
                 // Note: MultiArrowReader must dispose the cancellationContext.
                 IArrowArrayStream stream = new MultiArrowReader(this, 
TranslateSchema(results.Schema), readers, new 
CancellationContext(cancellationRegistry));
@@ -287,7 +287,7 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
                         return await 
context.Job.GetQueryResultsAsync(getQueryResultsOptions, 
context.CancellationToken).ConfigureAwait(false);
                     }).ConfigureAwait(false);
                 };
-                BigQueryResults? result = await 
ExecuteWithRetriesAsync(getQueryResultsAsyncFunc, activity);
+                BigQueryResults? result = await 
ExecuteWithRetriesAsync(getQueryResultsAsyncFunc, activity, 
context.CancellationToken);
                 long updatedRows = result?.NumDmlAffectedRows.HasValue == true 
? result.NumDmlAffectedRows.Value : -1L;
 
                 activity?.AddTag(SemanticConventions.Db.Response.ReturnedRows, 
updatedRows);
@@ -562,7 +562,8 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
 
         public bool TokenRequiresUpdate(Exception ex) => 
BigQueryUtils.TokenRequiresUpdate(ex);
 
-        private async Task<T> ExecuteWithRetriesAsync<T>(Func<Task<T>> action, 
Activity? activity) => await RetryManager.ExecuteWithRetriesAsync<T>(this, 
action, activity, MaxRetryAttempts, RetryDelayMs);
+        private async Task<T> ExecuteWithRetriesAsync<T>(Func<Task<T>> action, 
Activity? activity, CancellationToken cancellationToken = default) =>
+            await RetryManager.ExecuteWithRetriesAsync<T>(this, action, 
activity, MaxRetryAttempts, RetryDelayMs, cancellationToken);
 
         private async Task<T> ExecuteCancellableJobAsync<T>(
             JobCancellationContext context,
@@ -573,8 +574,12 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
             {
                 return await func(context).ConfigureAwait(false);
             }
-            catch (Exception ex) when (BigQueryUtils.ContainsException(ex, out 
OperationCanceledException? cancelledEx))
+            catch (Exception ex)
+                when (context.CancellationToken.IsCancellationRequested &&
+                    BigQueryUtils.ContainsException(ex, out 
OperationCanceledException? cancelledEx))
             {
+                // Note: OperationCanceledException could be thrown from the 
call,
+                // but we only want to handle when the cancellation was 
requested from the context.
                 activity?.AddException(cancelledEx!);
                 try
                 {
diff --git a/csharp/src/Drivers/BigQuery/RetryManager.cs 
b/csharp/src/Drivers/BigQuery/RetryManager.cs
index 09e3621c7..7da26504d 100644
--- a/csharp/src/Drivers/BigQuery/RetryManager.cs
+++ b/csharp/src/Drivers/BigQuery/RetryManager.cs
@@ -18,6 +18,7 @@
 
 using System;
 using System.Diagnostics;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace Apache.Arrow.Adbc.Drivers.BigQuery
@@ -32,7 +33,8 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
             Func<Task<T>> action,
             Activity? activity,
             int maxRetries = 5,
-            int initialDelayMilliseconds = 200)
+            int initialDelayMilliseconds = 200,
+            CancellationToken cancellationToken = default)
         {
             if (action == null)
             {
@@ -49,8 +51,10 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
                     T result = await action();
                     return result;
                 }
-                catch (Exception ex) when 
(!BigQueryUtils.ContainsException(ex, out OperationCanceledException? _))
+                catch (Exception ex) when 
(!cancellationToken.IsCancellationRequested)
                 {
+                    // Note: OperationCanceledException could be thrown from 
the call,
+                    // but we only want to break out when the cancellation was 
requested from the caller.
                     activity?.AddBigQueryTag("retry_attempt", retryCount);
                     activity?.AddException(ex);
 

Reply via email to