birschick-bq commented on code in PR #3422:
URL: https://github.com/apache/arrow-adbc/pull/3422#discussion_r2418224649


##########
csharp/src/Drivers/BigQuery/BigQueryStatement.cs:
##########
@@ -529,19 +571,148 @@ private TableReference TryGetLargeDestinationTableReference(string datasetId, Ac
 
         private async Task<T> ExecuteWithRetriesAsync<T>(Func<Task<T>> action, Activity? activity) => await RetryManager.ExecuteWithRetriesAsync<T>(this, action, activity, MaxRetryAttempts, RetryDelayMs);
 
+        private async Task<T> ExecuteCancellableJobAsync<T>(
+            JobCancellationContext context,
+            Activity? activity,
+            Func<JobCancellationContext, Task<T>> func)
+        {
+            try
+            {
+                return await func(context).ConfigureAwait(false);
+            }
+            catch (Exception ex) when (BigQueryUtils.ContainsException(ex, out OperationCanceledException? cancelledEx))
+            {
+                activity?.AddException(cancelledEx!);
+                try
+                {
+                    if (context?.Job != null)
+                    {
+                        activity?.AddBigQueryTag("job.cancel", context.Job.Reference.JobId);
+                        await context.Job.CancelAsync().ConfigureAwait(false);
+                    }
+                }
+                catch (Exception e)
+                {
+                    activity?.AddException(e);
+                }
+                throw;
+            }
+            finally
+            {
+                // Job is no longer in context after completion or cancellation
+                context.Job = null;
+            }
+        }
+
+        private class CancellationContext : IDisposable
+        {
+            private readonly CancellationRegistry cancellationRegistry;
+            private readonly CancellationTokenSource cancellationTokenSource;
+
+            private bool _disposedValue;
+
+            public CancellationContext(CancellationRegistry cancellationRegistry)
+            {
+                cancellationTokenSource = new CancellationTokenSource();
+                this.cancellationRegistry = cancellationRegistry;
+                this.cancellationRegistry.Register(this);
+            }
+
+            public CancellationToken CancellationToken => cancellationTokenSource.Token;
+
+            public void Cancel()
+            {
+                cancellationTokenSource.Cancel();
+            }
+
+            protected virtual void Dispose(bool disposing)
+            {
+                if (!_disposedValue)
+                {
+                    if (disposing)
+                    {
+                        cancellationRegistry.Unregister(this);
+                        cancellationTokenSource.Dispose();
+                    }
+                    _disposedValue = true;
+                }
+            }
+
+            public void Dispose()
+            {
+                // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
+                Dispose(disposing: true);
+                GC.SuppressFinalize(this);
+            }
+        }
+
+        private class JobCancellationContext : CancellationContext
+        {
+            public JobCancellationContext(CancellationRegistry cancellationRegistry, BigQueryJob? job = default)
+                : base(cancellationRegistry)
+            {
+                Job = job;
+            }
+
+            public BigQueryJob? Job { get; set; }

Review Comment:
   Actually, the ability to swap out a Job mid-context is by design. It allows 
multiple calls/steps to share the same context.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to