birschick-bq commented on code in PR #3022: URL: https://github.com/apache/arrow-adbc/pull/3022#discussion_r2175846049
########## csharp/src/Drivers/BigQuery/BigQueryConnection.cs: ########## @@ -97,64 +104,92 @@ public BigQueryConnection(IReadOnlyDictionary<string, string> properties) internal int RetryDelayMs { get; private set; } = 200; + public override string AssemblyVersion => s_assemblyVersion; + + public override string AssemblyName => s_assemblyName; + /// <summary> /// Initializes the internal BigQuery connection /// </summary> + /// <param name="projectId">A project ID that has been specified by the caller, not a user.</param> /// <exception cref="ArgumentException"></exception> - internal BigQueryClient Open() + internal BigQueryClient Open(string? projectId = null) { - string? projectId = null; - string? billingProjectId = null; - TimeSpan? clientTimeout = null; - - // if the caller doesn't specify a projectId, use the default - if (!this.properties.TryGetValue(BigQueryParameters.ProjectId, out projectId)) - projectId = BigQueryConstants.DetectProjectId; - - // in some situations, the publicProjectId gets passed and causes an error when we try to create a query job: - // Google.GoogleApiException : The service bigquery has thrown an exception. HttpStatusCode is Forbidden. - // Access Denied: Project bigquery-public-data: User does not have bigquery.jobs.create permission in - // project bigquery-public-data. - // so if that is the case, treat it as if we need to detect the projectId - if (projectId.Equals(publicProjectId, StringComparison.OrdinalIgnoreCase)) - projectId = BigQueryConstants.DetectProjectId; - - // the billing project can be null if it's not specified - this.properties.TryGetValue(BigQueryParameters.BillingProjectId, out billingProjectId); - - if (this.properties.TryGetValue(BigQueryParameters.IncludePublicProjectId, out string? 
result)) + return this.TraceActivity(activity => { - if (!string.IsNullOrEmpty(result)) - this.includePublicProjectIds = Convert.ToBoolean(result); - } + activity?.AddTag("Action", "Open"); Review Comment: No need to set the "Action" tag, because it is automatically set via the TraceActivity as the activity name where it takes the name of the method calling it - in this case, `Open`. ########## csharp/src/Drivers/BigQuery/BigQueryConnection.cs: ########## @@ -97,64 +104,92 @@ public BigQueryConnection(IReadOnlyDictionary<string, string> properties) internal int RetryDelayMs { get; private set; } = 200; + public override string AssemblyVersion => s_assemblyVersion; + + public override string AssemblyName => s_assemblyName; + /// <summary> /// Initializes the internal BigQuery connection /// </summary> + /// <param name="projectId">A project ID that has been specified by the caller, not a user.</param> /// <exception cref="ArgumentException"></exception> - internal BigQueryClient Open() + internal BigQueryClient Open(string? projectId = null) { - string? projectId = null; - string? billingProjectId = null; - TimeSpan? clientTimeout = null; - - // if the caller doesn't specify a projectId, use the default - if (!this.properties.TryGetValue(BigQueryParameters.ProjectId, out projectId)) - projectId = BigQueryConstants.DetectProjectId; - - // in some situations, the publicProjectId gets passed and causes an error when we try to create a query job: - // Google.GoogleApiException : The service bigquery has thrown an exception. HttpStatusCode is Forbidden. - // Access Denied: Project bigquery-public-data: User does not have bigquery.jobs.create permission in - // project bigquery-public-data. 
- // so if that is the case, treat it as if we need to detect the projectId - if (projectId.Equals(publicProjectId, StringComparison.OrdinalIgnoreCase)) - projectId = BigQueryConstants.DetectProjectId; - - // the billing project can be null if it's not specified - this.properties.TryGetValue(BigQueryParameters.BillingProjectId, out billingProjectId); - - if (this.properties.TryGetValue(BigQueryParameters.IncludePublicProjectId, out string? result)) + return this.TraceActivity(activity => { - if (!string.IsNullOrEmpty(result)) - this.includePublicProjectIds = Convert.ToBoolean(result); - } + activity?.AddTag("Action", "Open"); - if (this.properties.TryGetValue(BigQueryParameters.ClientTimeout, out string? timeoutSeconds) && - int.TryParse(timeoutSeconds, out int seconds)) - { - clientTimeout = TimeSpan.FromSeconds(seconds); - } + string? billingProjectId = null; + TimeSpan? clientTimeout = null; - SetCredential(); + if (string.IsNullOrEmpty(projectId)) + { + // if the caller doesn't specify a projectId, use the default + if (!this.properties.TryGetValue(BigQueryParameters.ProjectId, out projectId)) + { + projectId = BigQueryConstants.DetectProjectId; + } + else + { + activity?.AddTag(BigQueryParameters.ProjectId, projectId); + } - BigQueryClientBuilder bigQueryClientBuilder = new BigQueryClientBuilder() - { - ProjectId = projectId, - QuotaProject = billingProjectId, - GoogleCredential = Credential - }; + // in some situations, the publicProjectId gets passed and causes an error when we try to create a query job: + // Google.GoogleApiException : The service bigquery has thrown an exception. HttpStatusCode is Forbidden. + // Access Denied: Project bigquery-public-data: User does not have bigquery.jobs.create permission in + // project bigquery-public-data. 
+ // so if that is the case, treat it as if we need to detect the projectId + if (projectId.Equals(BigQueryConstants.PublicProjectId, StringComparison.OrdinalIgnoreCase)) + { + projectId = BigQueryConstants.DetectProjectId; + activity?.AddTag("ChangePublicProjectIdToDetectProjectId", projectId); + } + } - BigQueryClient client = bigQueryClientBuilder.Build(); + // the billing project can be null if it's not specified + if (this.properties.TryGetValue(BigQueryParameters.BillingProjectId, out billingProjectId)) + { + activity?.AddTag(BigQueryParameters.BillingProjectId, billingProjectId); + } - if (clientTimeout.HasValue) - { - client.Service.HttpClient.Timeout = clientTimeout.Value; - } + if (this.properties.TryGetValue(BigQueryParameters.IncludePublicProjectId, out string? result)) + { + if (!string.IsNullOrEmpty(result)) + { + this.includePublicProjectIds = Convert.ToBoolean(result); + activity?.AddTag(BigQueryParameters.IncludePublicProjectId, this.includePublicProjectIds); + } + } + + if (this.properties.TryGetValue(BigQueryParameters.ClientTimeout, out string? timeoutSeconds) && + int.TryParse(timeoutSeconds, out int seconds)) + { + clientTimeout = TimeSpan.FromSeconds(seconds); + activity?.AddTag(BigQueryParameters.ClientTimeout, seconds); + } + + SetCredential(activity); + + BigQueryClientBuilder bigQueryClientBuilder = new BigQueryClientBuilder() + { + ProjectId = projectId, + QuotaProject = billingProjectId, + GoogleCredential = Credential + }; + + BigQueryClient client = bigQueryClientBuilder.Build(); + + if (clientTimeout.HasValue) + { + client.Service.HttpClient.Timeout = clientTimeout.Value; + } - Client = client; - return client; + Client = client; + return client; + }); } - internal void SetCredential() + internal void SetCredential(Activity? activity = default) Review Comment: Passing `activity` to another method is totally valid. 
Just keep in mind that the exception and tag will be associated with the caller (`Open`) and you won't have two entries in the trace for both methods. ########## csharp/src/Drivers/BigQuery/BigQueryStatement.cs: ########## @@ -302,42 +335,47 @@ private IArrowType GetType(TableFieldSchema field, IArrowType type) return type; } - private IArrowReader? ReadChunkWithRetries(TokenProtectedReadClientManger clientMgr, string streamName) + private IArrowReader? ReadChunkWithRetries(TokenProtectedReadClientManger clientMgr, string streamName, Activity? activity) { - Func<Task<IArrowReader?>> func = () => Task.FromResult<IArrowReader?>(ReadChunk(clientMgr, streamName)); - return RetryManager.ExecuteWithRetriesAsync<IArrowReader?>(clientMgr, func, MaxRetryAttempts, RetryDelayMs).GetAwaiter().GetResult(); + Func<Task<IArrowReader?>> func = () => Task.FromResult<IArrowReader?>(ReadChunk(clientMgr, streamName, activity)); + return RetryManager.ExecuteWithRetriesAsync<IArrowReader?>(clientMgr, func, activity, MaxRetryAttempts, RetryDelayMs).GetAwaiter().GetResult(); } - private static IArrowReader? ReadChunk(TokenProtectedReadClientManger clientMgr, string streamName) + private static IArrowReader? ReadChunk(TokenProtectedReadClientManger clientMgr, string streamName, Activity? activity) { - return ReadChunk(clientMgr.ReadClient, streamName); + return ReadChunk(clientMgr.ReadClient, streamName, activity); } - private static IArrowReader? ReadChunk(BigQueryReadClient client, string streamName) + private static IArrowReader? ReadChunk(BigQueryReadClient client, string streamName, Activity? activity) { // Ideally we wouldn't need to indirect through a stream, but the necessary APIs in Arrow // are internal. (TODO: consider changing Arrow). + activity?.AddTag("ReadStream", streamName); Review Comment: You might consider tag names from this namespace ... 
https://opentelemetry.io/docs/specs/semconv/registry/attributes/messaging/ ########## csharp/src/Drivers/BigQuery/BigQueryStatement.cs: ########## @@ -205,21 +223,30 @@ public override UpdateResult ExecuteUpdate() private async Task<UpdateResult> ExecuteUpdateInternalAsync() { - GetQueryResultsOptions getQueryResultsOptions = new GetQueryResultsOptions(); - - if (Options?.TryGetValue(BigQueryParameters.GetQueryResultsOptionsTimeout, out string? timeoutSeconds) == true && - int.TryParse(timeoutSeconds, out int seconds) && - seconds >= 0) + return await this.TraceActivity(async activity => { - getQueryResultsOptions.Timeout = TimeSpan.FromSeconds(seconds); - } + activity?.AddTag("Action", "ExecuteUpdateInternalAsync"); Review Comment: Not necessary. ########## csharp/src/Drivers/BigQuery/RetryManager.cs: ########## @@ -49,13 +51,17 @@ public static async Task<T> ExecuteWithRetriesAsync<T>( } catch (Exception ex) { + activity?.AddTag("RetryAttempt", retryCount); Review Comment: tag name should be "fully qualified" ########## csharp/src/Drivers/BigQuery/BigQueryStatement.cs: ########## @@ -355,112 +393,214 @@ private QueryOptions ValidateOptions() if (firstProjectId != null) { options.ProjectId = firstProjectId; + activity?.AddTag("detected_client_project_id", firstProjectId); + // need to reopen the Client with the projectId specified + this.bigQueryConnection.Open(firstProjectId); } } } if (Options == null || Options.Count == 0) return options; + string largeResultDatasetId = BigQueryConstants.DefaultLargeDatasetId; + foreach (KeyValuePair<string, string> keyValuePair in Options) { - if (keyValuePair.Key == BigQueryParameters.AllowLargeResults) - { - options.AllowLargeResults = true ? 
keyValuePair.Value.ToLower().Equals("true") : false; - } - if (keyValuePair.Key == BigQueryParameters.LargeResultsDestinationTable) + switch (keyValuePair.Key) { - string destinationTable = keyValuePair.Value; + case BigQueryParameters.AllowLargeResults: + options.AllowLargeResults = true ? keyValuePair.Value.ToLower().Equals("true") : false; + activity?.AddTag(BigQueryParameters.AllowLargeResults, options.AllowLargeResults); + break; + case BigQueryParameters.LargeResultsDataset: + largeResultDatasetId = keyValuePair.Value; + activity?.AddTag(BigQueryParameters.LargeResultsDataset, largeResultDatasetId); + break; + case BigQueryParameters.LargeResultsDestinationTable: + string destinationTable = keyValuePair.Value; + + if (!destinationTable.Contains(".")) + throw new InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable} is invalid"); + + string projectId = string.Empty; + string datasetId = string.Empty; + string tableId = string.Empty; + + string[] segments = destinationTable.Split('.'); - if (!destinationTable.Contains(".")) - throw new InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable} is invalid"); + if (segments.Length != 3) + throw new InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable} cannot be parsed"); - string projectId = string.Empty; - string datasetId = string.Empty; - string tableId = string.Empty; + projectId = segments[0]; + datasetId = segments[1]; + tableId = segments[2]; - string[] segments = destinationTable.Split('.'); + if (string.IsNullOrEmpty(projectId.Trim()) || string.IsNullOrEmpty(datasetId.Trim()) || string.IsNullOrEmpty(tableId.Trim())) + throw new InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable} contains invalid values"); + + options.DestinationTable = new TableReference() + { + ProjectId = projectId, + DatasetId = datasetId, + TableId = tableId + }; + activity?.AddTag(BigQueryParameters.LargeResultsDestinationTable, 
destinationTable); + break; + case BigQueryParameters.UseLegacySQL: + options.UseLegacySql = true ? keyValuePair.Value.ToLower().Equals("true") : false; + activity?.AddTag(BigQueryParameters.UseLegacySQL, options.UseLegacySql); + break; + } + } - if (segments.Length != 3) - throw new InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable} cannot be parsed"); + if (options.AllowLargeResults == true && options.DestinationTable == null) + { + options.DestinationTable = TryGetLargeDestinationTableReference(largeResultDatasetId, activity); + } - projectId = segments[0]; - datasetId = segments[1]; - tableId = segments[2]; + return options; + } - if (string.IsNullOrEmpty(projectId.Trim()) || string.IsNullOrEmpty(datasetId.Trim()) || string.IsNullOrEmpty(tableId.Trim())) - throw new InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable} contains invalid values"); + /// <summary> + /// Attempts to retrieve or create the specified dataset. + /// </summary> + /// <param name="datasetId">The name of the dataset.</param> + /// <returns>A <see cref="TableReference"/> to a randomly generated table name in the specified dataset.</returns> + private TableReference TryGetLargeDestinationTableReference(string datasetId, Activity? activity) + { + BigQueryDataset? 
dataset = null; - options.DestinationTable = new TableReference() + try + { + activity?.AddTag("TryFindDataset", datasetId); + dataset = this.Client.GetDataset(datasetId); + activity?.AddTag("FoundDataset", datasetId); + } + catch (GoogleApiException gaEx) + { + if (gaEx.HttpStatusCode != System.Net.HttpStatusCode.NotFound) + { + activity?.AddException(gaEx); + throw new AdbcException($"Failure trying to retrieve dataset {datasetId}", gaEx); + } + } + + if (dataset == null) + { + try + { + activity?.AddTag("TryCreateDataset", datasetId); + DatasetReference reference = this.Client.GetDatasetReference(datasetId); + BigQueryDataset bigQueryDataset = new BigQueryDataset(this.Client, new Dataset() { - ProjectId = projectId, - DatasetId = datasetId, - TableId = tableId - }; + DatasetReference = reference, + DefaultTableExpirationMs = (long)TimeSpan.FromDays(1).TotalMilliseconds, + Labels = new Dictionary<string, string>() + { + // lower case, no spaces or periods per https://cloud.google.com/bigquery/docs/labels-intro + { "created_by", this.bigQueryConnection.DriverName.ToLower().Replace(" ","_") + "_v_" + AssemblyVersion.Replace(".","_") } + } + }); + + dataset = this.Client.CreateDataset(datasetId, bigQueryDataset.Resource); + activity?.AddTag("CreatedDataset", datasetId); } - if (keyValuePair.Key == BigQueryParameters.UseLegacySQL) + catch (Exception ex) { - options.UseLegacySql = true ? 
keyValuePair.Value.ToLower().Equals("true") : false; + activity?.AddException(ex); + throw new AdbcException($"Could not create dataset {datasetId}", ex); } } - return options; + + if (dataset == null) + { + throw new AdbcException($"Could not find dataset {datasetId}", AdbcStatusCode.NotFound); + } + else + { + TableReference reference = new TableReference() + { + ProjectId = this.Client.ProjectId, + DatasetId = datasetId, + TableId = "lg_" + Guid.NewGuid().ToString().Replace("-", "") + }; + + activity?.AddTag("LargeDestinationTableReference", reference.ToString()); + + return reference; + } } public bool TokenRequiresUpdate(Exception ex) => BigQueryUtils.TokenRequiresUpdate(ex); - private async Task<T> ExecuteWithRetriesAsync<T>(Func<Task<T>> action) => await RetryManager.ExecuteWithRetriesAsync<T>(this, action, MaxRetryAttempts, RetryDelayMs); + private async Task<T> ExecuteWithRetriesAsync<T>(Func<Task<T>> action, Activity? activity) => await RetryManager.ExecuteWithRetriesAsync<T>(this, action, activity, MaxRetryAttempts, RetryDelayMs); - private class MultiArrowReader : IArrowArrayStream + private class MultiArrowReader : TracingReader { + private static readonly string s_assemblyName = BigQueryUtils.GetAssemblyName(typeof(BigQueryStatement)); + private static readonly string s_assemblyVersion = BigQueryUtils.GetAssemblyVersion(typeof(BigQueryStatement)); + readonly Schema schema; IEnumerator<IArrowReader>? readers; IArrowReader? 
reader; - public MultiArrowReader(Schema schema, IEnumerable<IArrowReader> readers) + public MultiArrowReader(BigQueryStatement statement, Schema schema, IEnumerable<IArrowReader> readers) : base(statement) { this.schema = schema; this.readers = readers.GetEnumerator(); } - public Schema Schema { get { return this.schema; } } + public override Schema Schema { get { return this.schema; } } - public async ValueTask<RecordBatch?> ReadNextRecordBatchAsync(CancellationToken cancellationToken = default) + public override string AssemblyVersion => s_assemblyVersion; + + public override string AssemblyName => s_assemblyName; + + public override async ValueTask<RecordBatch?> ReadNextRecordBatchAsync(CancellationToken cancellationToken = default) { - if (this.readers == null) + return await this.TraceActivityAsync(async activity => { - return null; - } + activity?.AddTag("Action", "ReadNextRecordBatchAsync"); - while (true) - { - if (this.reader == null) + if (this.readers == null) + { + return null; + } + + while (true) { - if (!this.readers.MoveNext()) + if (this.reader == null) { - Dispose(); // TODO: Remove this line - return null; + if (!this.readers.MoveNext()) + { + Dispose(); // TODO: Remove this line + return null; + } + this.reader = this.readers.Current; } - this.reader = this.readers.Current; - } - RecordBatch result = await this.reader.ReadNextRecordBatchAsync(cancellationToken); + RecordBatch result = await this.reader.ReadNextRecordBatchAsync(cancellationToken); - if (result != null) - { - return result; - } + if (result != null) + { + return result; + } - this.reader = null; - } + this.reader = null; + } + }); } - public void Dispose() + public new void Dispose() Review Comment: Using the Dispose pattern, you should now override the `Dispose(bool disposing)` base method, instead of overriding/hiding the `Dispose()` method. 
########## csharp/src/Drivers/BigQuery/BigQueryStatement.cs: ########## @@ -355,112 +393,214 @@ private QueryOptions ValidateOptions() if (firstProjectId != null) { options.ProjectId = firstProjectId; + activity?.AddTag("detected_client_project_id", firstProjectId); Review Comment: tag name should be "fully qualified" ########## csharp/src/Drivers/BigQuery/BigQueryConnection.cs: ########## @@ -253,11 +294,15 @@ private GoogleCredential ApplyScopes(GoogleCredential credential) public override IArrowArrayStream GetInfo(IReadOnlyList<AdbcInfoCode> codes) { - const int strValTypeID = 0; + return this.TraceActivity(activity => + { + activity?.AddTag("Action", "GetInfo"); Review Comment: Not necessary. ########## csharp/src/Drivers/BigQuery/BigQueryConnection.cs: ########## @@ -375,10 +428,15 @@ public override IArrowArrayStream GetObjects( IReadOnlyList<string>? tableTypes, string? columnNamePattern) { - IArrowArray[] dataArrays = GetCatalogs(depth, catalogPattern, dbSchemaPattern, - tableNamePattern, tableTypes, columnNamePattern); + return this.TraceActivity(activity => + { + activity?.AddTag("Action", "GetObjects"); Review Comment: Not necessary. ########## csharp/src/Drivers/BigQuery/BigQueryStatement.cs: ########## @@ -80,122 +89,131 @@ public override QueryResult ExecuteQuery() private async Task<QueryResult> ExecuteQueryInternalAsync() { - QueryOptions queryOptions = ValidateOptions(); - BigQueryJob job = await Client.CreateQueryJobAsync(SqlQuery, null, queryOptions); - - JobReference jobReference = job.Reference; - GetQueryResultsOptions getQueryResultsOptions = new GetQueryResultsOptions(); - - if (Options?.TryGetValue(BigQueryParameters.GetQueryResultsOptionsTimeout, out string? 
timeoutSeconds) == true && - int.TryParse(timeoutSeconds, out int seconds) && - seconds >= 0) + return await this.TraceActivity(async activity => { - getQueryResultsOptions.Timeout = TimeSpan.FromSeconds(seconds); - } + activity?.AddTag("Action", "ExecuteQueryInternalAsync"); Review Comment: Not necessary. ########## csharp/src/Drivers/BigQuery/BigQueryConnection.cs: ########## @@ -97,64 +104,92 @@ public BigQueryConnection(IReadOnlyDictionary<string, string> properties) internal int RetryDelayMs { get; private set; } = 200; + public override string AssemblyVersion => s_assemblyVersion; + + public override string AssemblyName => s_assemblyName; + /// <summary> /// Initializes the internal BigQuery connection /// </summary> + /// <param name="projectId">A project ID that has been specified by the caller, not a user.</param> /// <exception cref="ArgumentException"></exception> - internal BigQueryClient Open() + internal BigQueryClient Open(string? projectId = null) { - string? projectId = null; - string? billingProjectId = null; - TimeSpan? clientTimeout = null; - - // if the caller doesn't specify a projectId, use the default - if (!this.properties.TryGetValue(BigQueryParameters.ProjectId, out projectId)) - projectId = BigQueryConstants.DetectProjectId; - - // in some situations, the publicProjectId gets passed and causes an error when we try to create a query job: - // Google.GoogleApiException : The service bigquery has thrown an exception. HttpStatusCode is Forbidden. - // Access Denied: Project bigquery-public-data: User does not have bigquery.jobs.create permission in - // project bigquery-public-data. 
- // so if that is the case, treat it as if we need to detect the projectId - if (projectId.Equals(publicProjectId, StringComparison.OrdinalIgnoreCase)) - projectId = BigQueryConstants.DetectProjectId; - - // the billing project can be null if it's not specified - this.properties.TryGetValue(BigQueryParameters.BillingProjectId, out billingProjectId); - - if (this.properties.TryGetValue(BigQueryParameters.IncludePublicProjectId, out string? result)) + return this.TraceActivity(activity => { - if (!string.IsNullOrEmpty(result)) - this.includePublicProjectIds = Convert.ToBoolean(result); - } + activity?.AddTag("Action", "Open"); - if (this.properties.TryGetValue(BigQueryParameters.ClientTimeout, out string? timeoutSeconds) && - int.TryParse(timeoutSeconds, out int seconds)) - { - clientTimeout = TimeSpan.FromSeconds(seconds); - } + string? billingProjectId = null; + TimeSpan? clientTimeout = null; - SetCredential(); + if (string.IsNullOrEmpty(projectId)) + { + // if the caller doesn't specify a projectId, use the default + if (!this.properties.TryGetValue(BigQueryParameters.ProjectId, out projectId)) + { + projectId = BigQueryConstants.DetectProjectId; + } + else + { + activity?.AddTag(BigQueryParameters.ProjectId, projectId); + } - BigQueryClientBuilder bigQueryClientBuilder = new BigQueryClientBuilder() - { - ProjectId = projectId, - QuotaProject = billingProjectId, - GoogleCredential = Credential - }; + // in some situations, the publicProjectId gets passed and causes an error when we try to create a query job: + // Google.GoogleApiException : The service bigquery has thrown an exception. HttpStatusCode is Forbidden. + // Access Denied: Project bigquery-public-data: User does not have bigquery.jobs.create permission in + // project bigquery-public-data. 
+ // so if that is the case, treat it as if we need to detect the projectId + if (projectId.Equals(BigQueryConstants.PublicProjectId, StringComparison.OrdinalIgnoreCase)) + { + projectId = BigQueryConstants.DetectProjectId; + activity?.AddTag("ChangePublicProjectIdToDetectProjectId", projectId); Review Comment: While any string is valid as an activity/tag identifier, there are "semantic conventions" for activity (span) / event / tag (attributes) names. I would say this tag name does not follow those conventions. ? `adbc.bigquery.project_id.detect` ? ########## csharp/src/Drivers/BigQuery/BigQueryStatement.cs: ########## @@ -302,42 +335,47 @@ private IArrowType GetType(TableFieldSchema field, IArrowType type) return type; } - private IArrowReader? ReadChunkWithRetries(TokenProtectedReadClientManger clientMgr, string streamName) + private IArrowReader? ReadChunkWithRetries(TokenProtectedReadClientManger clientMgr, string streamName, Activity? activity) { - Func<Task<IArrowReader?>> func = () => Task.FromResult<IArrowReader?>(ReadChunk(clientMgr, streamName)); - return RetryManager.ExecuteWithRetriesAsync<IArrowReader?>(clientMgr, func, MaxRetryAttempts, RetryDelayMs).GetAwaiter().GetResult(); + Func<Task<IArrowReader?>> func = () => Task.FromResult<IArrowReader?>(ReadChunk(clientMgr, streamName, activity)); + return RetryManager.ExecuteWithRetriesAsync<IArrowReader?>(clientMgr, func, activity, MaxRetryAttempts, RetryDelayMs).GetAwaiter().GetResult(); } - private static IArrowReader? ReadChunk(TokenProtectedReadClientManger clientMgr, string streamName) + private static IArrowReader? ReadChunk(TokenProtectedReadClientManger clientMgr, string streamName, Activity? activity) { - return ReadChunk(clientMgr.ReadClient, streamName); + return ReadChunk(clientMgr.ReadClient, streamName, activity); } - private static IArrowReader? ReadChunk(BigQueryReadClient client, string streamName) + private static IArrowReader? 
ReadChunk(BigQueryReadClient client, string streamName, Activity? activity) { // Ideally we wouldn't need to indirect through a stream, but the necessary APIs in Arrow // are internal. (TODO: consider changing Arrow). + activity?.AddTag("ReadStream", streamName); BigQueryReadClient.ReadRowsStream readRowsStream = client.ReadRows(new ReadRowsRequest { ReadStream = streamName }); IAsyncEnumerator<ReadRowsResponse> enumerator = readRowsStream.GetResponseStream().GetAsyncEnumerator(); ReadRowsStream stream = new ReadRowsStream(enumerator); if (stream.HasRows) { + activity?.AddTag("ReadStream.HasRows", "true"); return new ArrowStreamReader(stream); } else { + activity?.AddTag("ReadStream.HasRows", "false"); return null; } } - private QueryOptions ValidateOptions() + private QueryOptions ValidateOptions(Activity? activity) { QueryOptions options = new QueryOptions(); if (Client.ProjectId == BigQueryConstants.DetectProjectId) { + activity?.AddTag("client_project_id", BigQueryConstants.DetectProjectId); Review Comment: tag name should be "fully qualified" ########## csharp/src/Drivers/BigQuery/BigQueryConnection.cs: ########## @@ -412,10 +470,17 @@ internal void UpdateClientToken() { if (Client == null) { Client = Open(); } - Func<Task<BigQueryResults?>> func = () => Client.ExecuteQueryAsync(sql, parameters ?? Enumerable.Empty<BigQueryParameter>(), queryOptions, resultsOptions); - BigQueryResults? result = ExecuteWithRetriesAsync<BigQueryResults?>(func).GetAwaiter().GetResult(); + return this.TraceActivity(activity => + { + activity?.AddTag("Action", "ExecuteQuery"); Review Comment: Not necessary. 
########## csharp/src/Drivers/BigQuery/BigQueryStatement.cs: ########## @@ -355,112 +393,214 @@ private QueryOptions ValidateOptions() if (firstProjectId != null) { options.ProjectId = firstProjectId; + activity?.AddTag("detected_client_project_id", firstProjectId); + // need to reopen the Client with the projectId specified + this.bigQueryConnection.Open(firstProjectId); } } } if (Options == null || Options.Count == 0) return options; + string largeResultDatasetId = BigQueryConstants.DefaultLargeDatasetId; + foreach (KeyValuePair<string, string> keyValuePair in Options) { - if (keyValuePair.Key == BigQueryParameters.AllowLargeResults) - { - options.AllowLargeResults = true ? keyValuePair.Value.ToLower().Equals("true") : false; - } - if (keyValuePair.Key == BigQueryParameters.LargeResultsDestinationTable) + switch (keyValuePair.Key) { - string destinationTable = keyValuePair.Value; + case BigQueryParameters.AllowLargeResults: + options.AllowLargeResults = true ? keyValuePair.Value.ToLower().Equals("true") : false; + activity?.AddTag(BigQueryParameters.AllowLargeResults, options.AllowLargeResults); + break; + case BigQueryParameters.LargeResultsDataset: + largeResultDatasetId = keyValuePair.Value; + activity?.AddTag(BigQueryParameters.LargeResultsDataset, largeResultDatasetId); + break; + case BigQueryParameters.LargeResultsDestinationTable: + string destinationTable = keyValuePair.Value; + + if (!destinationTable.Contains(".")) + throw new InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable} is invalid"); + + string projectId = string.Empty; + string datasetId = string.Empty; + string tableId = string.Empty; + + string[] segments = destinationTable.Split('.'); - if (!destinationTable.Contains(".")) - throw new InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable} is invalid"); + if (segments.Length != 3) + throw new InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable} cannot be parsed"); - 
string projectId = string.Empty; - string datasetId = string.Empty; - string tableId = string.Empty; + projectId = segments[0]; + datasetId = segments[1]; + tableId = segments[2]; - string[] segments = destinationTable.Split('.'); + if (string.IsNullOrEmpty(projectId.Trim()) || string.IsNullOrEmpty(datasetId.Trim()) || string.IsNullOrEmpty(tableId.Trim())) + throw new InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable} contains invalid values"); + + options.DestinationTable = new TableReference() + { + ProjectId = projectId, + DatasetId = datasetId, + TableId = tableId + }; + activity?.AddTag(BigQueryParameters.LargeResultsDestinationTable, destinationTable); + break; + case BigQueryParameters.UseLegacySQL: + options.UseLegacySql = true ? keyValuePair.Value.ToLower().Equals("true") : false; + activity?.AddTag(BigQueryParameters.UseLegacySQL, options.UseLegacySql); + break; + } + } - if (segments.Length != 3) - throw new InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable} cannot be parsed"); + if (options.AllowLargeResults == true && options.DestinationTable == null) + { + options.DestinationTable = TryGetLargeDestinationTableReference(largeResultDatasetId, activity); + } - projectId = segments[0]; - datasetId = segments[1]; - tableId = segments[2]; + return options; + } - if (string.IsNullOrEmpty(projectId.Trim()) || string.IsNullOrEmpty(datasetId.Trim()) || string.IsNullOrEmpty(tableId.Trim())) - throw new InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable} contains invalid values"); + /// <summary> + /// Attempts to retrieve or create the specified dataset. + /// </summary> + /// <param name="datasetId">The name of the dataset.</param> + /// <returns>A <see cref="TableReference"/> to a randomly generated table name in the specified dataset.</returns> + private TableReference TryGetLargeDestinationTableReference(string datasetId, Activity? activity) + { + BigQueryDataset? 
dataset = null; - options.DestinationTable = new TableReference() + try + { + activity?.AddTag("TryFindDataset", datasetId); + dataset = this.Client.GetDataset(datasetId); + activity?.AddTag("FoundDataset", datasetId); + } + catch (GoogleApiException gaEx) + { + if (gaEx.HttpStatusCode != System.Net.HttpStatusCode.NotFound) + { + activity?.AddException(gaEx); + throw new AdbcException($"Failure trying to retrieve dataset {datasetId}", gaEx); + } + } + + if (dataset == null) + { + try + { + activity?.AddTag("TryCreateDataset", datasetId); + DatasetReference reference = this.Client.GetDatasetReference(datasetId); + BigQueryDataset bigQueryDataset = new BigQueryDataset(this.Client, new Dataset() { - ProjectId = projectId, - DatasetId = datasetId, - TableId = tableId - }; + DatasetReference = reference, + DefaultTableExpirationMs = (long)TimeSpan.FromDays(1).TotalMilliseconds, + Labels = new Dictionary<string, string>() + { + // lower case, no spaces or periods per https://cloud.google.com/bigquery/docs/labels-intro + { "created_by", this.bigQueryConnection.DriverName.ToLower().Replace(" ","_") + "_v_" + AssemblyVersion.Replace(".","_") } + } + }); + + dataset = this.Client.CreateDataset(datasetId, bigQueryDataset.Resource); + activity?.AddTag("CreatedDataset", datasetId); } - if (keyValuePair.Key == BigQueryParameters.UseLegacySQL) + catch (Exception ex) { - options.UseLegacySql = true ? 
keyValuePair.Value.ToLower().Equals("true") : false; + activity?.AddException(ex); + throw new AdbcException($"Could not create dataset {datasetId}", ex); } } - return options; + + if (dataset == null) + { + throw new AdbcException($"Could not find dataset {datasetId}", AdbcStatusCode.NotFound); + } + else + { + TableReference reference = new TableReference() + { + ProjectId = this.Client.ProjectId, + DatasetId = datasetId, + TableId = "lg_" + Guid.NewGuid().ToString().Replace("-", "") + }; + + activity?.AddTag("LargeDestinationTableReference", reference.ToString()); + + return reference; + } } public bool TokenRequiresUpdate(Exception ex) => BigQueryUtils.TokenRequiresUpdate(ex); - private async Task<T> ExecuteWithRetriesAsync<T>(Func<Task<T>> action) => await RetryManager.ExecuteWithRetriesAsync<T>(this, action, MaxRetryAttempts, RetryDelayMs); + private async Task<T> ExecuteWithRetriesAsync<T>(Func<Task<T>> action, Activity? activity) => await RetryManager.ExecuteWithRetriesAsync<T>(this, action, activity, MaxRetryAttempts, RetryDelayMs); - private class MultiArrowReader : IArrowArrayStream + private class MultiArrowReader : TracingReader { + private static readonly string s_assemblyName = BigQueryUtils.GetAssemblyName(typeof(BigQueryStatement)); + private static readonly string s_assemblyVersion = BigQueryUtils.GetAssemblyVersion(typeof(BigQueryStatement)); + readonly Schema schema; IEnumerator<IArrowReader>? readers; IArrowReader? 
reader; - public MultiArrowReader(Schema schema, IEnumerable<IArrowReader> readers) + public MultiArrowReader(BigQueryStatement statement, Schema schema, IEnumerable<IArrowReader> readers) : base(statement) { this.schema = schema; this.readers = readers.GetEnumerator(); } - public Schema Schema { get { return this.schema; } } + public override Schema Schema { get { return this.schema; } } - public async ValueTask<RecordBatch?> ReadNextRecordBatchAsync(CancellationToken cancellationToken = default) + public override string AssemblyVersion => s_assemblyVersion; + + public override string AssemblyName => s_assemblyName; + + public override async ValueTask<RecordBatch?> ReadNextRecordBatchAsync(CancellationToken cancellationToken = default) { - if (this.readers == null) + return await this.TraceActivityAsync(async activity => { - return null; - } + activity?.AddTag("Action", "ReadNextRecordBatchAsync"); Review Comment: Not necessary. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org