birschick-bq commented on code in PR #3422:
URL: https://github.com/apache/arrow-adbc/pull/3422#discussion_r2418222400
##########
csharp/src/Drivers/BigQuery/BigQueryStatement.cs:
##########
@@ -529,19 +571,148 @@ private TableReference
TryGetLargeDestinationTableReference(string datasetId, Ac
private async Task<T> ExecuteWithRetriesAsync<T>(Func<Task<T>> action,
Activity? activity) => await RetryManager.ExecuteWithRetriesAsync<T>(this,
action, activity, MaxRetryAttempts, RetryDelayMs);
+ private async Task<T> ExecuteCancellableJobAsync<T>(
+ JobCancellationContext context,
+ Activity? activity,
+ Func<JobCancellationContext, Task<T>> func)
+ {
+ try
+ {
+ return await func(context).ConfigureAwait(false);
+ }
+ catch (Exception ex) when (BigQueryUtils.ContainsException(ex, out
OperationCanceledException? cancelledEx))
+ {
+ activity?.AddException(cancelledEx!);
+ try
+ {
+ if (context?.Job != null)
+ {
+ activity?.AddBigQueryTag("job.cancel",
context.Job.Reference.JobId);
+ await context.Job.CancelAsync().ConfigureAwait(false);
+ }
+ }
+ catch (Exception e)
+ {
+ activity?.AddException(e);
+ }
+ throw;
+ }
+ finally
+ {
+ // Job is no longer in context after completion or cancellation
+ context.Job = null;
+ }
+ }
+
+ private class CancellationContext : IDisposable
+ {
+ private readonly CancellationRegistry cancellationRegistry;
+ private readonly CancellationTokenSource cancellationTokenSource;
+
+ private bool _disposedValue;
+
+ public CancellationContext(CancellationRegistry
cancellationRegistry)
+ {
+ cancellationTokenSource = new CancellationTokenSource();
+ this.cancellationRegistry = cancellationRegistry;
+ this.cancellationRegistry.Register(this);
+ }
+
+ public CancellationToken CancellationToken =>
cancellationTokenSource.Token;
+
+ public void Cancel()
+ {
+ cancellationTokenSource.Cancel();
+ }
+
+ protected virtual void Dispose(bool disposing)
+ {
+ if (!_disposedValue)
+ {
+ if (disposing)
+ {
+ cancellationRegistry.Unregister(this);
Review Comment:
Removed the recursive call - the `_disposed` check was the only thing
stopping the infinite recursion. Thanks.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]