birschick-bq commented on code in PR #3022: URL: https://github.com/apache/arrow-adbc/pull/3022#discussion_r2176139429
########## csharp/src/Drivers/BigQuery/BigQueryStatement.cs: ########## @@ -355,111 +389,212 @@ private QueryOptions ValidateOptions() if (firstProjectId != null) { options.ProjectId = firstProjectId; + activity?.AddBigQueryTag("detected_client_project_id", firstProjectId); + // need to reopen the Client with the projectId specified + this.bigQueryConnection.Open(firstProjectId); } } } if (Options == null || Options.Count == 0) return options; + string largeResultDatasetId = BigQueryConstants.DefaultLargeDatasetId; + foreach (KeyValuePair<string, string> keyValuePair in Options) { - if (keyValuePair.Key == BigQueryParameters.AllowLargeResults) - { - options.AllowLargeResults = true ? keyValuePair.Value.ToLower().Equals("true") : false; - } - if (keyValuePair.Key == BigQueryParameters.LargeResultsDestinationTable) + switch (keyValuePair.Key) { - string destinationTable = keyValuePair.Value; + case BigQueryParameters.AllowLargeResults: + options.AllowLargeResults = true ? keyValuePair.Value.ToLower().Equals("true") : false; + activity?.AddBigQueryParameterTag(BigQueryParameters.AllowLargeResults, options.AllowLargeResults); + break; + case BigQueryParameters.LargeResultsDataset: + largeResultDatasetId = keyValuePair.Value; + activity?.AddBigQueryParameterTag(BigQueryParameters.LargeResultsDataset, largeResultDatasetId); + break; + case BigQueryParameters.LargeResultsDestinationTable: + string destinationTable = keyValuePair.Value; - if (!destinationTable.Contains(".")) - throw new InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable} is invalid"); + if (!destinationTable.Contains(".")) + throw new InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable} is invalid"); - string projectId = string.Empty; - string datasetId = string.Empty; - string tableId = string.Empty; + string projectId = string.Empty; + string datasetId = string.Empty; + string tableId = string.Empty; - string[] segments = destinationTable.Split('.'); + 
string[] segments = destinationTable.Split('.'); - if (segments.Length != 3) - throw new InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable} cannot be parsed"); + if (segments.Length != 3) + throw new InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable} cannot be parsed"); - projectId = segments[0]; - datasetId = segments[1]; - tableId = segments[2]; + projectId = segments[0]; + datasetId = segments[1]; + tableId = segments[2]; - if (string.IsNullOrEmpty(projectId.Trim()) || string.IsNullOrEmpty(datasetId.Trim()) || string.IsNullOrEmpty(tableId.Trim())) - throw new InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable} contains invalid values"); + if (string.IsNullOrEmpty(projectId.Trim()) || string.IsNullOrEmpty(datasetId.Trim()) || string.IsNullOrEmpty(tableId.Trim())) + throw new InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable} contains invalid values"); - options.DestinationTable = new TableReference() + options.DestinationTable = new TableReference() + { + ProjectId = projectId, + DatasetId = datasetId, + TableId = tableId + }; + activity?.AddBigQueryParameterTag(BigQueryParameters.LargeResultsDestinationTable, destinationTable); + break; + case BigQueryParameters.UseLegacySQL: + options.UseLegacySql = true ? keyValuePair.Value.ToLower().Equals("true") : false; + activity?.AddBigQueryParameterTag(BigQueryParameters.UseLegacySQL, options.UseLegacySql); + break; + } + } + + if (options.AllowLargeResults == true && options.DestinationTable == null) + { + options.DestinationTable = TryGetLargeDestinationTableReference(largeResultDatasetId, activity); + } + + return options; + } + + /// <summary> + /// Attempts to retrieve or create the specified dataset. 
+ /// </summary> + /// <param name="datasetId">The name of the dataset.</param> + /// <returns>A <see cref="TableReference"/> to a randomly generated table name in the specified dataset.</returns> + private TableReference TryGetLargeDestinationTableReference(string datasetId, Activity? activity) + { + BigQueryDataset? dataset = null; + + try + { + activity?.AddBigQueryTag("large_results.try_find_dataset", datasetId); + dataset = this.Client.GetDataset(datasetId); + activity?.AddBigQueryTag("large_results.found_dataset", datasetId); + } + catch (GoogleApiException gaEx) + { + if (gaEx.HttpStatusCode != System.Net.HttpStatusCode.NotFound) + { + activity?.AddException(gaEx); + throw new AdbcException($"Failure trying to retrieve dataset {datasetId}", gaEx); + } + } + + if (dataset == null) + { + try + { + activity?.AddBigQueryTag("large_results.try_create_dataset", datasetId); + DatasetReference reference = this.Client.GetDatasetReference(datasetId); + BigQueryDataset bigQueryDataset = new BigQueryDataset(this.Client, new Dataset() { - ProjectId = projectId, - DatasetId = datasetId, - TableId = tableId - }; + DatasetReference = reference, + DefaultTableExpirationMs = (long)TimeSpan.FromDays(1).TotalMilliseconds, + Labels = new Dictionary<string, string>() + { + // lower case, no spaces or periods per https://cloud.google.com/bigquery/docs/labels-intro + { "created_by", this.bigQueryConnection.DriverName.ToLower().Replace(" ","_") + "_v_" + AssemblyVersion.Replace(".","_") } + } + }); + + dataset = this.Client.CreateDataset(datasetId, bigQueryDataset.Resource); + activity?.AddBigQueryTag("large_results.created_dataset", datasetId); } - if (keyValuePair.Key == BigQueryParameters.UseLegacySQL) + catch (Exception ex) { - options.UseLegacySql = true ? 
keyValuePair.Value.ToLower().Equals("true") : false; + activity?.AddException(ex); + throw new AdbcException($"Could not create dataset {datasetId}", ex); } } - return options; + + if (dataset == null) + { + throw new AdbcException($"Could not find dataset {datasetId}", AdbcStatusCode.NotFound); + } + else + { + TableReference reference = new TableReference() + { + ProjectId = this.Client.ProjectId, + DatasetId = datasetId, + TableId = "lg_" + Guid.NewGuid().ToString().Replace("-", "") + }; + + activity?.AddBigQueryTag("large_results.reference", reference.ToString()); + + return reference; + } } public bool TokenRequiresUpdate(Exception ex) => BigQueryUtils.TokenRequiresUpdate(ex); - private async Task<T> ExecuteWithRetriesAsync<T>(Func<Task<T>> action) => await RetryManager.ExecuteWithRetriesAsync<T>(this, action, MaxRetryAttempts, RetryDelayMs); + private async Task<T> ExecuteWithRetriesAsync<T>(Func<Task<T>> action, Activity? activity) => await RetryManager.ExecuteWithRetriesAsync<T>(this, action, activity, MaxRetryAttempts, RetryDelayMs); - private class MultiArrowReader : IArrowArrayStream + private class MultiArrowReader : TracingReader { + private static readonly string s_assemblyName = BigQueryUtils.GetAssemblyName(typeof(BigQueryStatement)); + private static readonly string s_assemblyVersion = BigQueryUtils.GetAssemblyVersion(typeof(BigQueryStatement)); + readonly Schema schema; IEnumerator<IArrowReader>? readers; IArrowReader? 
reader; - public MultiArrowReader(Schema schema, IEnumerable<IArrowReader> readers) + public MultiArrowReader(BigQueryStatement statement, Schema schema, IEnumerable<IArrowReader> readers) : base(statement) { this.schema = schema; this.readers = readers.GetEnumerator(); } - public Schema Schema { get { return this.schema; } } + public override Schema Schema { get { return this.schema; } } - public async ValueTask<RecordBatch?> ReadNextRecordBatchAsync(CancellationToken cancellationToken = default) + public override string AssemblyVersion => s_assemblyVersion; + + public override string AssemblyName => s_assemblyName; + + public override async ValueTask<RecordBatch?> ReadNextRecordBatchAsync(CancellationToken cancellationToken = default) { - if (this.readers == null) + return await this.TraceActivityAsync(async activity => { - return null; - } + if (this.readers == null) + { + return null; + } - while (true) - { - if (this.reader == null) + while (true) { - if (!this.readers.MoveNext()) + if (this.reader == null) { - Dispose(); // TODO: Remove this line - return null; + if (!this.readers.MoveNext()) + { + Dispose(); // TODO: Remove this line + return null; + } + this.reader = this.readers.Current; } - this.reader = this.readers.Current; - } - RecordBatch result = await this.reader.ReadNextRecordBatchAsync(cancellationToken); + RecordBatch result = await this.reader.ReadNextRecordBatchAsync(cancellationToken); - if (result != null) - { - return result; - } + if (result != null) + { + return result; + } - this.reader = null; - } + this.reader = null; + } + }); } - public void Dispose() + protected override void Dispose(bool disposing) { - if (this.readers != null) + if (disposing) { - this.readers.Dispose(); - this.readers = null; + if (this.readers != null) + { + this.readers.Dispose(); + this.readers = null; + } } Review Comment: ```suggestion } base.Dispose(disposing); ``` ########## csharp/src/Drivers/BigQuery/RetryManager.cs: ########## @@ -49,13 +51,17 @@ public
static async Task<T> ExecuteWithRetriesAsync<T>( } catch (Exception ex) { + activity?.AddBigQueryTag("retry_attempt", retryCount); + activity?.AddException(ex); Review Comment: Exactly right. A handled exception should still be traced. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org