This is an automated email from the ASF dual-hosted git repository.
curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 74200ee67 fix(csharp/src/Drivers/BigQuery): improve selective handling
of cancellation exception (#3615)
74200ee67 is described below
commit 74200ee67aba48c99e45c127f7b4e36ae9d957dc
Author: Bruce Irschick <[email protected]>
AuthorDate: Tue Oct 28 11:52:30 2025 -0700
fix(csharp/src/Drivers/BigQuery): improve selective handling of
cancellation exception (#3615)
After adding cancellation functionality to BigQuery statements, there is
a report that `task was cancelled` messages are now appearing.
This is likely due to the change where the retry code stops processing
when it gets a `OperationCancelledExecption`.
This change now only exits early if the cancellation was requested on
the passed cancellation token.
---
csharp/src/Drivers/BigQuery/BigQueryStatement.cs | 17 +++++++++++------
csharp/src/Drivers/BigQuery/RetryManager.cs | 8 ++++++--
2 files changed, 17 insertions(+), 8 deletions(-)
diff --git a/csharp/src/Drivers/BigQuery/BigQueryStatement.cs
b/csharp/src/Drivers/BigQuery/BigQueryStatement.cs
index 1034b0f8e..52bcc8e0e 100644
--- a/csharp/src/Drivers/BigQuery/BigQueryStatement.cs
+++ b/csharp/src/Drivers/BigQuery/BigQueryStatement.cs
@@ -120,7 +120,7 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
}).ConfigureAwait(false);
};
- BigQueryResults results = await
ExecuteWithRetriesAsync(getJobResults, activity).ConfigureAwait(false);
+ BigQueryResults results = await
ExecuteWithRetriesAsync(getJobResults, activity,
cancellationContext.CancellationToken).ConfigureAwait(false);
TokenProtectedReadClientManger clientMgr = new
TokenProtectedReadClientManger(Credential);
clientMgr.UpdateToken = () => Task.Run(() =>
@@ -180,7 +180,7 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
throw new AdbcException($"Unable to obtain result
from statement [{statementIndex}]", AdbcStatusCode.InvalidData);
};
- results = await
ExecuteWithRetriesAsync(getMultiJobResults, activity).ConfigureAwait(false);
+ results = await
ExecuteWithRetriesAsync(getMultiJobResults, activity,
cancellationContext.CancellationToken).ConfigureAwait(false);
}
if (results?.TableReference == null)
@@ -213,7 +213,7 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
return await GetArrowReaders(clientMgr, table,
results.TableReference.ProjectId, maxStreamCount, activity,
context.CancellationToken).ConfigureAwait(false);
}).ConfigureAwait(false);
};
- IEnumerable<IArrowReader> readers = await
ExecuteWithRetriesAsync(getArrowReadersFunc, activity).ConfigureAwait(false);
+ IEnumerable<IArrowReader> readers = await
ExecuteWithRetriesAsync(getArrowReadersFunc, activity,
cancellationContext.CancellationToken).ConfigureAwait(false);
// Note: MultiArrowReader must dispose the cancellationContext.
IArrowArrayStream stream = new MultiArrowReader(this,
TranslateSchema(results.Schema), readers, new
CancellationContext(cancellationRegistry));
@@ -287,7 +287,7 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
return await
context.Job.GetQueryResultsAsync(getQueryResultsOptions,
context.CancellationToken).ConfigureAwait(false);
}).ConfigureAwait(false);
};
- BigQueryResults? result = await
ExecuteWithRetriesAsync(getQueryResultsAsyncFunc, activity);
+ BigQueryResults? result = await
ExecuteWithRetriesAsync(getQueryResultsAsyncFunc, activity,
context.CancellationToken);
long updatedRows = result?.NumDmlAffectedRows.HasValue == true
? result.NumDmlAffectedRows.Value : -1L;
activity?.AddTag(SemanticConventions.Db.Response.ReturnedRows,
updatedRows);
@@ -562,7 +562,8 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
public bool TokenRequiresUpdate(Exception ex) =>
BigQueryUtils.TokenRequiresUpdate(ex);
- private async Task<T> ExecuteWithRetriesAsync<T>(Func<Task<T>> action,
Activity? activity) => await RetryManager.ExecuteWithRetriesAsync<T>(this,
action, activity, MaxRetryAttempts, RetryDelayMs);
+ private async Task<T> ExecuteWithRetriesAsync<T>(Func<Task<T>> action,
Activity? activity, CancellationToken cancellationToken = default) =>
+ await RetryManager.ExecuteWithRetriesAsync<T>(this, action,
activity, MaxRetryAttempts, RetryDelayMs, cancellationToken);
private async Task<T> ExecuteCancellableJobAsync<T>(
JobCancellationContext context,
@@ -573,8 +574,12 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
{
return await func(context).ConfigureAwait(false);
}
- catch (Exception ex) when (BigQueryUtils.ContainsException(ex, out
OperationCanceledException? cancelledEx))
+ catch (Exception ex)
+ when (context.CancellationToken.IsCancellationRequested &&
+ BigQueryUtils.ContainsException(ex, out
OperationCanceledException? cancelledEx))
{
+ // Note: OperationCanceledException could be thrown from the
call,
+ // but we only want to handle when the cancellation was
requested from the context.
activity?.AddException(cancelledEx!);
try
{
diff --git a/csharp/src/Drivers/BigQuery/RetryManager.cs
b/csharp/src/Drivers/BigQuery/RetryManager.cs
index 09e3621c7..7da26504d 100644
--- a/csharp/src/Drivers/BigQuery/RetryManager.cs
+++ b/csharp/src/Drivers/BigQuery/RetryManager.cs
@@ -18,6 +18,7 @@
using System;
using System.Diagnostics;
+using System.Threading;
using System.Threading.Tasks;
namespace Apache.Arrow.Adbc.Drivers.BigQuery
@@ -32,7 +33,8 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
Func<Task<T>> action,
Activity? activity,
int maxRetries = 5,
- int initialDelayMilliseconds = 200)
+ int initialDelayMilliseconds = 200,
+ CancellationToken cancellationToken = default)
{
if (action == null)
{
@@ -49,8 +51,10 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
T result = await action();
return result;
}
- catch (Exception ex) when
(!BigQueryUtils.ContainsException(ex, out OperationCanceledException? _))
+ catch (Exception ex) when
(!cancellationToken.IsCancellationRequested)
{
+ // Note: OperationCanceledException could be thrown from
the call,
+ // but we only want to break out when the cancellation was
requested from the caller.
activity?.AddBigQueryTag("retry_attempt", retryCount);
activity?.AddException(ex);