CurtHagenlocher commented on code in PR #3422:
URL: https://github.com/apache/arrow-adbc/pull/3422#discussion_r2411595249
##########
csharp/src/Drivers/BigQuery/BigQueryUtils.cs:
##########
@@ -42,5 +42,34 @@ public static bool TokenRequiresUpdate(Exception ex)
internal static string GetAssemblyName(Type type) =>
type.Assembly.GetName().Name!;
internal static string GetAssemblyVersion(Type type) =>
FileVersionInfo.GetVersionInfo(type.Assembly.Location).ProductVersion ??
string.Empty;
+
+ public static bool ContainsException<T>(Exception exception, out T?
containedException) where T : Exception
+ {
+ if (exception is AggregateException aggregateException)
+ {
+ foreach (Exception? ex in aggregateException.InnerExceptions)
+ {
+ if (ContainsException(ex, out T? inner))
+ {
+ containedException = inner;
+ return true;
+ }
+ }
+ }
+
+ Exception? e = exception;
+ while (e != null)
+ {
+ if (e is T ce)
+ {
+ containedException = ce;
+ return true;
+ }
+ e = e.InnerException;
Review Comment:
In theory, could the `InnerException` itself not be an `AggregateException`?
##########
csharp/src/Drivers/BigQuery/BigQueryStatement.cs:
##########
@@ -592,6 +764,7 @@ protected override void Dispose(bool disposing)
this.readers.Dispose();
this.readers = null;
}
+ this.cancellationContext.Dispose();
Review Comment:
Consider moving inside the `if (this.readers != null)` as the two should
share a lifetime.
##########
csharp/src/Drivers/BigQuery/BigQueryStatement.cs:
##########
@@ -529,19 +571,148 @@ private TableReference
TryGetLargeDestinationTableReference(string datasetId, Ac
private async Task<T> ExecuteWithRetriesAsync<T>(Func<Task<T>> action,
Activity? activity) => await RetryManager.ExecuteWithRetriesAsync<T>(this,
action, activity, MaxRetryAttempts, RetryDelayMs);
+ private async Task<T> ExecuteCancellableJobAsync<T>(
+ JobCancellationContext context,
+ Activity? activity,
+ Func<JobCancellationContext, Task<T>> func)
+ {
+ try
+ {
+ return await func(context).ConfigureAwait(false);
+ }
+ catch (Exception ex) when (BigQueryUtils.ContainsException(ex, out
OperationCanceledException? cancelledEx))
+ {
+ activity?.AddException(cancelledEx!);
+ try
+ {
+ if (context?.Job != null)
+ {
+ activity?.AddBigQueryTag("job.cancel",
context.Job.Reference.JobId);
+ await context.Job.CancelAsync().ConfigureAwait(false);
+ }
+ }
+ catch (Exception e)
+ {
+ activity?.AddException(e);
+ }
+ throw;
+ }
+ finally
+ {
+ // Job is no longer in context after completion or cancellation
+ context.Job = null;
+ }
+ }
+
+ private class CancellationContext : IDisposable
+ {
+ private readonly CancellationRegistry cancellationRegistry;
+ private readonly CancellationTokenSource cancellationTokenSource;
+
+ private bool _disposedValue;
Review Comment:
By usage, this code base has over twice as many `_disposed`s as
`_disposedValue`s (though we could normalize that in a separate PR).
##########
csharp/src/Drivers/BigQuery/BigQueryStatement.cs:
##########
@@ -529,19 +571,148 @@ private TableReference
TryGetLargeDestinationTableReference(string datasetId, Ac
private async Task<T> ExecuteWithRetriesAsync<T>(Func<Task<T>> action,
Activity? activity) => await RetryManager.ExecuteWithRetriesAsync<T>(this,
action, activity, MaxRetryAttempts, RetryDelayMs);
+ private async Task<T> ExecuteCancellableJobAsync<T>(
+ JobCancellationContext context,
+ Activity? activity,
+ Func<JobCancellationContext, Task<T>> func)
+ {
+ try
+ {
+ return await func(context).ConfigureAwait(false);
+ }
+ catch (Exception ex) when (BigQueryUtils.ContainsException(ex, out
OperationCanceledException? cancelledEx))
+ {
+ activity?.AddException(cancelledEx!);
+ try
+ {
+ if (context?.Job != null)
+ {
+ activity?.AddBigQueryTag("job.cancel",
context.Job.Reference.JobId);
+ await context.Job.CancelAsync().ConfigureAwait(false);
+ }
+ }
+ catch (Exception e)
+ {
+ activity?.AddException(e);
+ }
+ throw;
+ }
+ finally
+ {
+ // Job is no longer in context after completion or cancellation
+ context.Job = null;
+ }
+ }
+
+ private class CancellationContext : IDisposable
+ {
+ private readonly CancellationRegistry cancellationRegistry;
+ private readonly CancellationTokenSource cancellationTokenSource;
+
+ private bool _disposedValue;
Review Comment:
(In the same vein, this class has two fields that don't start with an
underscore and one which does. And here too is another inconsistency across the
code base :(.)
##########
csharp/src/Drivers/BigQuery/BigQueryStatement.cs:
##########
@@ -529,19 +571,148 @@ private TableReference
TryGetLargeDestinationTableReference(string datasetId, Ac
private async Task<T> ExecuteWithRetriesAsync<T>(Func<Task<T>> action,
Activity? activity) => await RetryManager.ExecuteWithRetriesAsync<T>(this,
action, activity, MaxRetryAttempts, RetryDelayMs);
+ private async Task<T> ExecuteCancellableJobAsync<T>(
+ JobCancellationContext context,
+ Activity? activity,
+ Func<JobCancellationContext, Task<T>> func)
+ {
+ try
+ {
+ return await func(context).ConfigureAwait(false);
+ }
+ catch (Exception ex) when (BigQueryUtils.ContainsException(ex, out
OperationCanceledException? cancelledEx))
+ {
+ activity?.AddException(cancelledEx!);
+ try
+ {
+ if (context?.Job != null)
+ {
+ activity?.AddBigQueryTag("job.cancel",
context.Job.Reference.JobId);
+ await context.Job.CancelAsync().ConfigureAwait(false);
+ }
+ }
+ catch (Exception e)
+ {
+ activity?.AddException(e);
+ }
+ throw;
+ }
+ finally
+ {
+ // Job is no longer in context after completion or cancellation
+ context.Job = null;
+ }
+ }
+
+ private class CancellationContext : IDisposable
+ {
+ private readonly CancellationRegistry cancellationRegistry;
+ private readonly CancellationTokenSource cancellationTokenSource;
+
+ private bool _disposedValue;
+
+ public CancellationContext(CancellationRegistry
cancellationRegistry)
+ {
+ cancellationTokenSource = new CancellationTokenSource();
+ this.cancellationRegistry = cancellationRegistry;
+ this.cancellationRegistry.Register(this);
+ }
+
+ public CancellationToken CancellationToken =>
cancellationTokenSource.Token;
+
+ public void Cancel()
+ {
+ cancellationTokenSource.Cancel();
+ }
+
+ protected virtual void Dispose(bool disposing)
+ {
+ if (!_disposedValue)
+ {
+ if (disposing)
+ {
+ cancellationRegistry.Unregister(this);
+ cancellationTokenSource.Dispose();
+ }
+ _disposedValue = true;
+ }
+ }
+
+ public void Dispose()
+ {
+ // Do not change this code. Put cleanup code in 'Dispose(bool
disposing)' method
+ Dispose(disposing: true);
+ GC.SuppressFinalize(this);
+ }
+ }
+
+ private class JobCancellationContext : CancellationContext
+ {
+ public JobCancellationContext(CancellationRegistry
cancellationRegistry, BigQueryJob? job = default)
+ : base(cancellationRegistry)
+ {
+ Job = job;
+ }
+
+ public BigQueryJob? Job { get; set; }
Review Comment:
Should the setter throw an exception if the existing value is non-null?
##########
csharp/src/Drivers/BigQuery/BigQueryStatement.cs:
##########
@@ -529,19 +571,148 @@ private TableReference
TryGetLargeDestinationTableReference(string datasetId, Ac
private async Task<T> ExecuteWithRetriesAsync<T>(Func<Task<T>> action,
Activity? activity) => await RetryManager.ExecuteWithRetriesAsync<T>(this,
action, activity, MaxRetryAttempts, RetryDelayMs);
+ private async Task<T> ExecuteCancellableJobAsync<T>(
+ JobCancellationContext context,
+ Activity? activity,
+ Func<JobCancellationContext, Task<T>> func)
+ {
+ try
+ {
+ return await func(context).ConfigureAwait(false);
+ }
+ catch (Exception ex) when (BigQueryUtils.ContainsException(ex, out
OperationCanceledException? cancelledEx))
+ {
+ activity?.AddException(cancelledEx!);
+ try
+ {
+ if (context?.Job != null)
Review Comment:
```suggestion
if (context.Job != null)
```
`context` is declared as not null.
##########
csharp/src/Drivers/BigQuery/BigQueryStatement.cs:
##########
@@ -529,19 +571,148 @@ private TableReference
TryGetLargeDestinationTableReference(string datasetId, Ac
private async Task<T> ExecuteWithRetriesAsync<T>(Func<Task<T>> action,
Activity? activity) => await RetryManager.ExecuteWithRetriesAsync<T>(this,
action, activity, MaxRetryAttempts, RetryDelayMs);
+ private async Task<T> ExecuteCancellableJobAsync<T>(
+ JobCancellationContext context,
+ Activity? activity,
+ Func<JobCancellationContext, Task<T>> func)
+ {
+ try
+ {
+ return await func(context).ConfigureAwait(false);
+ }
+ catch (Exception ex) when (BigQueryUtils.ContainsException(ex, out
OperationCanceledException? cancelledEx))
+ {
+ activity?.AddException(cancelledEx!);
+ try
+ {
+ if (context?.Job != null)
+ {
+ activity?.AddBigQueryTag("job.cancel",
context.Job.Reference.JobId);
+ await context.Job.CancelAsync().ConfigureAwait(false);
+ }
+ }
+ catch (Exception e)
+ {
+ activity?.AddException(e);
+ }
+ throw;
+ }
+ finally
+ {
+ // Job is no longer in context after completion or cancellation
+ context.Job = null;
+ }
+ }
+
+ private class CancellationContext : IDisposable
+ {
+ private readonly CancellationRegistry cancellationRegistry;
+ private readonly CancellationTokenSource cancellationTokenSource;
+
+ private bool _disposedValue;
+
+ public CancellationContext(CancellationRegistry
cancellationRegistry)
+ {
+ cancellationTokenSource = new CancellationTokenSource();
+ this.cancellationRegistry = cancellationRegistry;
+ this.cancellationRegistry.Register(this);
+ }
+
+ public CancellationToken CancellationToken =>
cancellationTokenSource.Token;
+
+ public void Cancel()
+ {
+ cancellationTokenSource.Cancel();
+ }
+
+ protected virtual void Dispose(bool disposing)
+ {
+ if (!_disposedValue)
+ {
+ if (disposing)
+ {
+ cancellationRegistry.Unregister(this);
+ cancellationTokenSource.Dispose();
+ }
+ _disposedValue = true;
+ }
+ }
+
+ public void Dispose()
+ {
+ // Do not change this code. Put cleanup code in 'Dispose(bool
disposing)' method
+ Dispose(disposing: true);
+ GC.SuppressFinalize(this);
Review Comment:
This class doesn't have a finalizer
##########
csharp/src/Drivers/BigQuery/BigQueryStatement.cs:
##########
@@ -529,19 +571,148 @@ private TableReference
TryGetLargeDestinationTableReference(string datasetId, Ac
private async Task<T> ExecuteWithRetriesAsync<T>(Func<Task<T>> action,
Activity? activity) => await RetryManager.ExecuteWithRetriesAsync<T>(this,
action, activity, MaxRetryAttempts, RetryDelayMs);
+ private async Task<T> ExecuteCancellableJobAsync<T>(
+ JobCancellationContext context,
+ Activity? activity,
+ Func<JobCancellationContext, Task<T>> func)
+ {
+ try
+ {
+ return await func(context).ConfigureAwait(false);
+ }
+ catch (Exception ex) when (BigQueryUtils.ContainsException(ex, out
OperationCanceledException? cancelledEx))
+ {
+ activity?.AddException(cancelledEx!);
+ try
+ {
+ if (context?.Job != null)
+ {
+ activity?.AddBigQueryTag("job.cancel",
context.Job.Reference.JobId);
+ await context.Job.CancelAsync().ConfigureAwait(false);
+ }
+ }
+ catch (Exception e)
+ {
+ activity?.AddException(e);
+ }
+ throw;
+ }
+ finally
+ {
+ // Job is no longer in context after completion or cancellation
+ context.Job = null;
+ }
+ }
+
+ private class CancellationContext : IDisposable
+ {
+ private readonly CancellationRegistry cancellationRegistry;
+ private readonly CancellationTokenSource cancellationTokenSource;
+
+ private bool _disposedValue;
+
+ public CancellationContext(CancellationRegistry
cancellationRegistry)
+ {
+ cancellationTokenSource = new CancellationTokenSource();
+ this.cancellationRegistry = cancellationRegistry;
+ this.cancellationRegistry.Register(this);
+ }
+
+ public CancellationToken CancellationToken =>
cancellationTokenSource.Token;
+
+ public void Cancel()
+ {
+ cancellationTokenSource.Cancel();
+ }
+
+ protected virtual void Dispose(bool disposing)
+ {
+ if (!_disposedValue)
+ {
+ if (disposing)
+ {
+ cancellationRegistry.Unregister(this);
Review Comment:
I'm not sure how this doesn't trigger a stack overflow. Won't the call to
`Dispose` on line 679 cause a recursive call back here?
##########
csharp/src/Drivers/BigQuery/BigQueryStatement.cs:
##########
@@ -529,19 +571,148 @@ private TableReference
TryGetLargeDestinationTableReference(string datasetId, Ac
private async Task<T> ExecuteWithRetriesAsync<T>(Func<Task<T>> action,
Activity? activity) => await RetryManager.ExecuteWithRetriesAsync<T>(this,
action, activity, MaxRetryAttempts, RetryDelayMs);
+ private async Task<T> ExecuteCancellableJobAsync<T>(
+ JobCancellationContext context,
+ Activity? activity,
+ Func<JobCancellationContext, Task<T>> func)
+ {
+ try
+ {
+ return await func(context).ConfigureAwait(false);
+ }
+ catch (Exception ex) when (BigQueryUtils.ContainsException(ex, out
OperationCanceledException? cancelledEx))
+ {
+ activity?.AddException(cancelledEx!);
+ try
+ {
+ if (context?.Job != null)
+ {
+ activity?.AddBigQueryTag("job.cancel",
context.Job.Reference.JobId);
+ await context.Job.CancelAsync().ConfigureAwait(false);
+ }
+ }
+ catch (Exception e)
+ {
+ activity?.AddException(e);
+ }
+ throw;
+ }
+ finally
+ {
+ // Job is no longer in context after completion or cancellation
+ context.Job = null;
+ }
+ }
+
+ private class CancellationContext : IDisposable
+ {
+ private readonly CancellationRegistry cancellationRegistry;
+ private readonly CancellationTokenSource cancellationTokenSource;
+
+ private bool _disposedValue;
+
+ public CancellationContext(CancellationRegistry
cancellationRegistry)
+ {
+ cancellationTokenSource = new CancellationTokenSource();
+ this.cancellationRegistry = cancellationRegistry;
+ this.cancellationRegistry.Register(this);
+ }
+
+ public CancellationToken CancellationToken =>
cancellationTokenSource.Token;
+
+ public void Cancel()
+ {
+ cancellationTokenSource.Cancel();
+ }
+
+ protected virtual void Dispose(bool disposing)
+ {
+ if (!_disposedValue)
+ {
+ if (disposing)
+ {
+ cancellationRegistry.Unregister(this);
+ cancellationTokenSource.Dispose();
+ }
+ _disposedValue = true;
+ }
+ }
+
+ public void Dispose()
+ {
+ // Do not change this code. Put cleanup code in 'Dispose(bool
disposing)' method
+ Dispose(disposing: true);
+ GC.SuppressFinalize(this);
+ }
+ }
+
+ private class JobCancellationContext : CancellationContext
+ {
+ public JobCancellationContext(CancellationRegistry
cancellationRegistry, BigQueryJob? job = default)
+ : base(cancellationRegistry)
+ {
+ Job = job;
+ }
+
+ public BigQueryJob? Job { get; set; }
+ }
+
+ private sealed class CancellationRegistry : IDisposable
+ {
+ private readonly ConcurrentDictionary<CancellationContext, byte>
contexts = new();
+
+ public CancellationContext Register(CancellationContext context)
+ {
+ contexts.TryAdd(context, 0);
+ return context;
+ }
+
+ public CancellationContext Register()
Review Comment:
This doesn't seem to be used. (If it were, maybe `Create` would be a better
name for what it does?)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]