birschick-bq commented on code in PR #3422:
URL: https://github.com/apache/arrow-adbc/pull/3422#discussion_r2418224649
##########
csharp/src/Drivers/BigQuery/BigQueryStatement.cs:
##########
@@ -529,19 +571,148 @@ private TableReference TryGetLargeDestinationTableReference(string datasetId, Ac
private async Task<T> ExecuteWithRetriesAsync<T>(Func<Task<T>> action, Activity? activity) => await RetryManager.ExecuteWithRetriesAsync<T>(this, action, activity, MaxRetryAttempts, RetryDelayMs);
+ private async Task<T> ExecuteCancellableJobAsync<T>(
+ JobCancellationContext context,
+ Activity? activity,
+ Func<JobCancellationContext, Task<T>> func)
+ {
+ try
+ {
+ return await func(context).ConfigureAwait(false);
+ }
+ catch (Exception ex) when (BigQueryUtils.ContainsException(ex, out OperationCanceledException? cancelledEx))
+ {
+ activity?.AddException(cancelledEx!);
+ try
+ {
+ if (context?.Job != null)
+ {
+ activity?.AddBigQueryTag("job.cancel", context.Job.Reference.JobId);
+ await context.Job.CancelAsync().ConfigureAwait(false);
+ }
+ }
+ catch (Exception e)
+ {
+ activity?.AddException(e);
+ }
+ throw;
+ }
+ finally
+ {
+ // Job is no longer in context after completion or cancellation
+ context.Job = null;
+ }
+ }
+
+ private class CancellationContext : IDisposable
+ {
+ private readonly CancellationRegistry cancellationRegistry;
+ private readonly CancellationTokenSource cancellationTokenSource;
+
+ private bool _disposedValue;
+
+ public CancellationContext(CancellationRegistry cancellationRegistry)
+ {
+ cancellationTokenSource = new CancellationTokenSource();
+ this.cancellationRegistry = cancellationRegistry;
+ this.cancellationRegistry.Register(this);
+ }
+
+ public CancellationToken CancellationToken => cancellationTokenSource.Token;
+
+ public void Cancel()
+ {
+ cancellationTokenSource.Cancel();
+ }
+
+ protected virtual void Dispose(bool disposing)
+ {
+ if (!_disposedValue)
+ {
+ if (disposing)
+ {
+ cancellationRegistry.Unregister(this);
+ cancellationTokenSource.Dispose();
+ }
+ _disposedValue = true;
+ }
+ }
+
+ public void Dispose()
+ {
+ // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
+ Dispose(disposing: true);
+ GC.SuppressFinalize(this);
+ }
+ }
+
+ private class JobCancellationContext : CancellationContext
+ {
+ public JobCancellationContext(CancellationRegistry cancellationRegistry, BigQueryJob? job = default)
+ : base(cancellationRegistry)
+ {
+ Job = job;
+ }
+
+ public BigQueryJob? Job { get; set; }
Review Comment:
Actually, the ability to swap out a Job mid-context is by design. It allows
multiple calls/steps to share the same context.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]