birschick-bq commented on code in PR #3022:
URL: https://github.com/apache/arrow-adbc/pull/3022#discussion_r2176139429
##########
csharp/src/Drivers/BigQuery/BigQueryStatement.cs:
##########
@@ -355,111 +389,212 @@ private QueryOptions ValidateOptions()
if (firstProjectId != null)
{
options.ProjectId = firstProjectId;
+ activity?.AddBigQueryTag("detected_client_project_id",
firstProjectId);
+ // need to reopen the Client with the projectId
specified
+ this.bigQueryConnection.Open(firstProjectId);
}
}
}
if (Options == null || Options.Count == 0)
return options;
+ string largeResultDatasetId =
BigQueryConstants.DefaultLargeDatasetId;
+
foreach (KeyValuePair<string, string> keyValuePair in Options)
{
- if (keyValuePair.Key == BigQueryParameters.AllowLargeResults)
- {
- options.AllowLargeResults = true ?
keyValuePair.Value.ToLower().Equals("true") : false;
- }
- if (keyValuePair.Key ==
BigQueryParameters.LargeResultsDestinationTable)
+ switch (keyValuePair.Key)
{
- string destinationTable = keyValuePair.Value;
+ case BigQueryParameters.AllowLargeResults:
+ options.AllowLargeResults = true ?
keyValuePair.Value.ToLower().Equals("true") : false;
+
activity?.AddBigQueryParameterTag(BigQueryParameters.AllowLargeResults,
options.AllowLargeResults);
+ break;
+ case BigQueryParameters.LargeResultsDataset:
+ largeResultDatasetId = keyValuePair.Value;
+
activity?.AddBigQueryParameterTag(BigQueryParameters.LargeResultsDataset,
largeResultDatasetId);
+ break;
+ case BigQueryParameters.LargeResultsDestinationTable:
+ string destinationTable = keyValuePair.Value;
- if (!destinationTable.Contains("."))
- throw new
InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable}
is invalid");
+ if (!destinationTable.Contains("."))
+ throw new
InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable}
is invalid");
- string projectId = string.Empty;
- string datasetId = string.Empty;
- string tableId = string.Empty;
+ string projectId = string.Empty;
+ string datasetId = string.Empty;
+ string tableId = string.Empty;
- string[] segments = destinationTable.Split('.');
+ string[] segments = destinationTable.Split('.');
- if (segments.Length != 3)
- throw new
InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable}
cannot be parsed");
+ if (segments.Length != 3)
+ throw new
InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable}
cannot be parsed");
- projectId = segments[0];
- datasetId = segments[1];
- tableId = segments[2];
+ projectId = segments[0];
+ datasetId = segments[1];
+ tableId = segments[2];
- if (string.IsNullOrEmpty(projectId.Trim()) ||
string.IsNullOrEmpty(datasetId.Trim()) || string.IsNullOrEmpty(tableId.Trim()))
- throw new
InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable}
contains invalid values");
+ if (string.IsNullOrEmpty(projectId.Trim()) ||
string.IsNullOrEmpty(datasetId.Trim()) || string.IsNullOrEmpty(tableId.Trim()))
+ throw new
InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable}
contains invalid values");
- options.DestinationTable = new TableReference()
+ options.DestinationTable = new TableReference()
+ {
+ ProjectId = projectId,
+ DatasetId = datasetId,
+ TableId = tableId
+ };
+
activity?.AddBigQueryParameterTag(BigQueryParameters.LargeResultsDestinationTable,
destinationTable);
+ break;
+ case BigQueryParameters.UseLegacySQL:
+ options.UseLegacySql = true ?
keyValuePair.Value.ToLower().Equals("true") : false;
+
activity?.AddBigQueryParameterTag(BigQueryParameters.UseLegacySQL,
options.UseLegacySql);
+ break;
+ }
+ }
+
+ if (options.AllowLargeResults == true && options.DestinationTable
== null)
+ {
+ options.DestinationTable =
TryGetLargeDestinationTableReference(largeResultDatasetId, activity);
+ }
+
+ return options;
+ }
+
+ /// <summary>
+ /// Attempts to retrieve or create the specified dataset.
+ /// </summary>
+ /// <param name="datasetId">The name of the dataset.</param>
+ /// <returns>A <see cref="TableReference"/> to a randomly generated
table name in the specified dataset.</returns>
+ private TableReference TryGetLargeDestinationTableReference(string
datasetId, Activity? activity)
+ {
+ BigQueryDataset? dataset = null;
+
+ try
+ {
+ activity?.AddBigQueryTag("large_results.try_find_dataset",
datasetId);
+ dataset = this.Client.GetDataset(datasetId);
+ activity?.AddBigQueryTag("large_results.found_dataset",
datasetId);
+ }
+ catch (GoogleApiException gaEx)
+ {
+ if (gaEx.HttpStatusCode != System.Net.HttpStatusCode.NotFound)
+ {
+ activity?.AddException(gaEx);
+ throw new AdbcException($"Failure trying to retrieve
dataset {datasetId}", gaEx);
+ }
+ }
+
+ if (dataset == null)
+ {
+ try
+ {
+
activity?.AddBigQueryTag("large_results.try_create_dataset", datasetId);
+ DatasetReference reference =
this.Client.GetDatasetReference(datasetId);
+ BigQueryDataset bigQueryDataset = new
BigQueryDataset(this.Client, new Dataset()
{
- ProjectId = projectId,
- DatasetId = datasetId,
- TableId = tableId
- };
+ DatasetReference = reference,
+ DefaultTableExpirationMs =
(long)TimeSpan.FromDays(1).TotalMilliseconds,
+ Labels = new Dictionary<string, string>()
+ {
+ // lower case, no spaces or periods per
https://cloud.google.com/bigquery/docs/labels-intro
+ { "created_by",
this.bigQueryConnection.DriverName.ToLower().Replace(" ","_") + "_v_" +
AssemblyVersion.Replace(".","_") }
+ }
+ });
+
+ dataset = this.Client.CreateDataset(datasetId,
bigQueryDataset.Resource);
+ activity?.AddBigQueryTag("large_results.created_dataset",
datasetId);
}
- if (keyValuePair.Key == BigQueryParameters.UseLegacySQL)
+ catch (Exception ex)
{
- options.UseLegacySql = true ?
keyValuePair.Value.ToLower().Equals("true") : false;
+ activity?.AddException(ex);
+ throw new AdbcException($"Could not create dataset
{datasetId}", ex);
}
}
- return options;
+
+ if (dataset == null)
+ {
+ throw new AdbcException($"Could not find dataset {datasetId}",
AdbcStatusCode.NotFound);
+ }
+ else
+ {
+ TableReference reference = new TableReference()
+ {
+ ProjectId = this.Client.ProjectId,
+ DatasetId = datasetId,
+ TableId = "lg_" + Guid.NewGuid().ToString().Replace("-",
"")
+ };
+
+ activity?.AddBigQueryTag("large_results.reference",
reference.ToString());
+
+ return reference;
+ }
}
public bool TokenRequiresUpdate(Exception ex) =>
BigQueryUtils.TokenRequiresUpdate(ex);
- private async Task<T> ExecuteWithRetriesAsync<T>(Func<Task<T>> action)
=> await RetryManager.ExecuteWithRetriesAsync<T>(this, action,
MaxRetryAttempts, RetryDelayMs);
+ private async Task<T> ExecuteWithRetriesAsync<T>(Func<Task<T>> action,
Activity? activity) => await RetryManager.ExecuteWithRetriesAsync<T>(this,
action, activity, MaxRetryAttempts, RetryDelayMs);
- private class MultiArrowReader : IArrowArrayStream
+ private class MultiArrowReader : TracingReader
{
+ private static readonly string s_assemblyName =
BigQueryUtils.GetAssemblyName(typeof(BigQueryStatement));
+ private static readonly string s_assemblyVersion =
BigQueryUtils.GetAssemblyVersion(typeof(BigQueryStatement));
+
readonly Schema schema;
IEnumerator<IArrowReader>? readers;
IArrowReader? reader;
- public MultiArrowReader(Schema schema, IEnumerable<IArrowReader>
readers)
+ public MultiArrowReader(BigQueryStatement statement, Schema
schema, IEnumerable<IArrowReader> readers) : base(statement)
{
this.schema = schema;
this.readers = readers.GetEnumerator();
}
- public Schema Schema { get { return this.schema; } }
+ public override Schema Schema { get { return this.schema; } }
- public async ValueTask<RecordBatch?>
ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
+ public override string AssemblyVersion => s_assemblyVersion;
+
+ public override string AssemblyName => s_assemblyName;
+
+ public override async ValueTask<RecordBatch?>
ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
{
- if (this.readers == null)
+ return await this.TraceActivityAsync(async activity =>
{
- return null;
- }
+ if (this.readers == null)
+ {
+ return null;
+ }
- while (true)
- {
- if (this.reader == null)
+ while (true)
{
- if (!this.readers.MoveNext())
+ if (this.reader == null)
{
- Dispose(); // TODO: Remove this line
- return null;
+ if (!this.readers.MoveNext())
+ {
+ Dispose(); // TODO: Remove this line
+ return null;
+ }
+ this.reader = this.readers.Current;
}
- this.reader = this.readers.Current;
- }
- RecordBatch result = await
this.reader.ReadNextRecordBatchAsync(cancellationToken);
+ RecordBatch result = await
this.reader.ReadNextRecordBatchAsync(cancellationToken);
- if (result != null)
- {
- return result;
- }
+ if (result != null)
+ {
+ return result;
+ }
- this.reader = null;
- }
+ this.reader = null;
+ }
+ });
}
- public void Dispose()
+ protected override void Dispose(bool disposing)
{
- if (this.readers != null)
+ if (disposing)
{
- this.readers.Dispose();
- this.readers = null;
+ if (this.readers != null)
+ {
+ this.readers.Dispose();
+ this.readers = null;
+ }
}
Review Comment:
```suggestion
}
base.Dispose(disposing);
```
##########
csharp/src/Drivers/BigQuery/RetryManager.cs:
##########
@@ -49,13 +51,17 @@ public static async Task<T> ExecuteWithRetriesAsync<T>(
}
catch (Exception ex)
{
+ activity?.AddBigQueryTag("retry_attempt", retryCount);
+ activity?.AddException(ex);
Review Comment:
Exactly right. A handled exception should still be traced.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]