birschick-bq commented on code in PR #3022:
URL: https://github.com/apache/arrow-adbc/pull/3022#discussion_r2176139429


##########
csharp/src/Drivers/BigQuery/BigQueryStatement.cs:
##########
@@ -355,111 +389,212 @@ private QueryOptions ValidateOptions()
                     if (firstProjectId != null)
                     {
                         options.ProjectId = firstProjectId;
+                        activity?.AddBigQueryTag("detected_client_project_id", firstProjectId);
+                        // need to reopen the Client with the projectId specified
+                        this.bigQueryConnection.Open(firstProjectId);
                     }
                 }
             }
 
             if (Options == null || Options.Count == 0)
                 return options;
 
+            string largeResultDatasetId = BigQueryConstants.DefaultLargeDatasetId;
+
             foreach (KeyValuePair<string, string> keyValuePair in Options)
             {
-                if (keyValuePair.Key == BigQueryParameters.AllowLargeResults)
-                {
-                    options.AllowLargeResults = true ? keyValuePair.Value.ToLower().Equals("true") : false;
-                }
-                if (keyValuePair.Key == BigQueryParameters.LargeResultsDestinationTable)
+                switch (keyValuePair.Key)
                 {
-                    string destinationTable = keyValuePair.Value;
+                    case BigQueryParameters.AllowLargeResults:
+                        options.AllowLargeResults = true ? keyValuePair.Value.ToLower().Equals("true") : false;
+                        activity?.AddBigQueryParameterTag(BigQueryParameters.AllowLargeResults, options.AllowLargeResults);
+                        break;
+                    case BigQueryParameters.LargeResultsDataset:
+                        largeResultDatasetId = keyValuePair.Value;
+                        activity?.AddBigQueryParameterTag(BigQueryParameters.LargeResultsDataset, largeResultDatasetId);
+                        break;
+                    case BigQueryParameters.LargeResultsDestinationTable:
+                        string destinationTable = keyValuePair.Value;
 
-                    if (!destinationTable.Contains("."))
-                        throw new InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable} is invalid");
+                        if (!destinationTable.Contains("."))
+                            throw new InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable} is invalid");
 
-                    string projectId = string.Empty;
-                    string datasetId = string.Empty;
-                    string tableId = string.Empty;
+                        string projectId = string.Empty;
+                        string datasetId = string.Empty;
+                        string tableId = string.Empty;
 
-                    string[] segments = destinationTable.Split('.');
+                        string[] segments = destinationTable.Split('.');
 
-                    if (segments.Length != 3)
-                        throw new InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable} cannot be parsed");
+                        if (segments.Length != 3)
+                            throw new InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable} cannot be parsed");
 
-                    projectId = segments[0];
-                    datasetId = segments[1];
-                    tableId = segments[2];
+                        projectId = segments[0];
+                        datasetId = segments[1];
+                        tableId = segments[2];
 
-                    if (string.IsNullOrEmpty(projectId.Trim()) || string.IsNullOrEmpty(datasetId.Trim()) || string.IsNullOrEmpty(tableId.Trim()))
-                        throw new InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable} contains invalid values");
+                        if (string.IsNullOrEmpty(projectId.Trim()) || string.IsNullOrEmpty(datasetId.Trim()) || string.IsNullOrEmpty(tableId.Trim()))
+                            throw new InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable} contains invalid values");
 
-                    options.DestinationTable = new TableReference()
+                        options.DestinationTable = new TableReference()
+                        {
+                            ProjectId = projectId,
+                            DatasetId = datasetId,
+                            TableId = tableId
+                        };
+                        activity?.AddBigQueryParameterTag(BigQueryParameters.LargeResultsDestinationTable, destinationTable);
+                        break;
+                    case BigQueryParameters.UseLegacySQL:
+                        options.UseLegacySql = true ? keyValuePair.Value.ToLower().Equals("true") : false;
+                        activity?.AddBigQueryParameterTag(BigQueryParameters.UseLegacySQL, options.UseLegacySql);
+                        break;
+                }
+            }
+
+            if (options.AllowLargeResults == true && options.DestinationTable == null)
+            {
+                options.DestinationTable = TryGetLargeDestinationTableReference(largeResultDatasetId, activity);
+            }
+
+            return options;
+        }
+
+        /// <summary>
+        /// Attempts to retrieve or create the specified dataset.
+        /// </summary>
+        /// <param name="datasetId">The name of the dataset.</param>
+        /// <returns>A <see cref="TableReference"/> to a randomly generated table name in the specified dataset.</returns>
+        private TableReference TryGetLargeDestinationTableReference(string datasetId, Activity? activity)
+        {
+            BigQueryDataset? dataset = null;
+
+            try
+            {
+                activity?.AddBigQueryTag("large_results.try_find_dataset", datasetId);
+                dataset = this.Client.GetDataset(datasetId);
+                activity?.AddBigQueryTag("large_results.found_dataset", datasetId);
+            }
+            catch (GoogleApiException gaEx)
+            {
+                if (gaEx.HttpStatusCode != System.Net.HttpStatusCode.NotFound)
+                {
+                    activity?.AddException(gaEx);
+                    throw new AdbcException($"Failure trying to retrieve dataset {datasetId}", gaEx);
+                }
+            }
+
+            if (dataset == null)
+            {
+                try
+                {
+                    activity?.AddBigQueryTag("large_results.try_create_dataset", datasetId);
+                    DatasetReference reference = this.Client.GetDatasetReference(datasetId);
+                    BigQueryDataset bigQueryDataset = new BigQueryDataset(this.Client, new Dataset()
                     {
-                        ProjectId = projectId,
-                        DatasetId = datasetId,
-                        TableId = tableId
-                    };
+                        DatasetReference = reference,
+                        DefaultTableExpirationMs = (long)TimeSpan.FromDays(1).TotalMilliseconds,
+                        Labels = new Dictionary<string, string>()
+                        {
+                            // lower case, no spaces or periods per https://cloud.google.com/bigquery/docs/labels-intro
+                            { "created_by", this.bigQueryConnection.DriverName.ToLower().Replace(" ","_") + "_v_" + AssemblyVersion.Replace(".","_") }
+                        }
+                    });
+
+                    dataset = this.Client.CreateDataset(datasetId, bigQueryDataset.Resource);
+                    activity?.AddBigQueryTag("large_results.created_dataset", datasetId);
                 }
-                if (keyValuePair.Key == BigQueryParameters.UseLegacySQL)
+                catch (Exception ex)
                 {
-                    options.UseLegacySql = true ? keyValuePair.Value.ToLower().Equals("true") : false;
+                    activity?.AddException(ex);
+                    throw new AdbcException($"Could not create dataset {datasetId}", ex);
                 }
             }
-            return options;
+
+            if (dataset == null)
+            {
+                throw new AdbcException($"Could not find dataset {datasetId}", AdbcStatusCode.NotFound);
+            }
+            else
+            {
+                TableReference reference = new TableReference()
+                {
+                    ProjectId = this.Client.ProjectId,
+                    DatasetId = datasetId,
+                    TableId = "lg_" + Guid.NewGuid().ToString().Replace("-", "")
+                };
+
+                activity?.AddBigQueryTag("large_results.reference", reference.ToString());
+
+                return reference;
+            }
         }
 
         public bool TokenRequiresUpdate(Exception ex) => BigQueryUtils.TokenRequiresUpdate(ex);
 
-        private async Task<T> ExecuteWithRetriesAsync<T>(Func<Task<T>> action) => await RetryManager.ExecuteWithRetriesAsync<T>(this, action, MaxRetryAttempts, RetryDelayMs);
+        private async Task<T> ExecuteWithRetriesAsync<T>(Func<Task<T>> action, Activity? activity) => await RetryManager.ExecuteWithRetriesAsync<T>(this, action, activity, MaxRetryAttempts, RetryDelayMs);
 
-        private class MultiArrowReader : IArrowArrayStream
+        private class MultiArrowReader : TracingReader
         {
+            private static readonly string s_assemblyName = BigQueryUtils.GetAssemblyName(typeof(BigQueryStatement));
+            private static readonly string s_assemblyVersion = BigQueryUtils.GetAssemblyVersion(typeof(BigQueryStatement));
+
             readonly Schema schema;
             IEnumerator<IArrowReader>? readers;
             IArrowReader? reader;
 
-            public MultiArrowReader(Schema schema, IEnumerable<IArrowReader> readers)
+            public MultiArrowReader(BigQueryStatement statement, Schema schema, IEnumerable<IArrowReader> readers) : base(statement)
             {
                 this.schema = schema;
                 this.readers = readers.GetEnumerator();
             }
 
-            public Schema Schema { get { return this.schema; } }
+            public override Schema Schema { get { return this.schema; } }
 
-            public async ValueTask<RecordBatch?> ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
+            public override string AssemblyVersion => s_assemblyVersion;
+
+            public override string AssemblyName => s_assemblyName;
+
+            public override async ValueTask<RecordBatch?> ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
             {
-                if (this.readers == null)
+                return await this.TraceActivityAsync(async activity =>
                 {
-                    return null;
-                }
+                    if (this.readers == null)
+                    {
+                        return null;
+                    }
 
-                while (true)
-                {
-                    if (this.reader == null)
+                    while (true)
                     {
-                        if (!this.readers.MoveNext())
+                        if (this.reader == null)
                         {
-                            Dispose(); // TODO: Remove this line
-                            return null;
+                            if (!this.readers.MoveNext())
+                            {
+                                Dispose(); // TODO: Remove this line
+                                return null;
+                            }
+                            this.reader = this.readers.Current;
                         }
-                        this.reader = this.readers.Current;
-                    }
 
-                    RecordBatch result = await this.reader.ReadNextRecordBatchAsync(cancellationToken);
+                        RecordBatch result = await this.reader.ReadNextRecordBatchAsync(cancellationToken);
 
-                    if (result != null)
-                    {
-                        return result;
-                    }
+                        if (result != null)
+                        {
+                            return result;
+                        }
 
-                    this.reader = null;
-                }
+                        this.reader = null;
+                    }
+                });
             }
 
-            public void Dispose()
+            protected override void Dispose(bool disposing)
             {
-                if (this.readers != null)
+                if (disposing)
                 {
-                    this.readers.Dispose();
-                    this.readers = null;
+                    if (this.readers != null)
+                    {
+                        this.readers.Dispose();
+                        this.readers = null;
+                    }
                 }

Review Comment:
   ```suggestion
                   }
                   base.Dispose(disposing);
   ```



##########
csharp/src/Drivers/BigQuery/RetryManager.cs:
##########
@@ -49,13 +51,17 @@ public static async Task<T> ExecuteWithRetriesAsync<T>(
                 }
                 catch (Exception ex)
                 {
+                    activity?.AddBigQueryTag("retry_attempt", retryCount);
+                    activity?.AddException(ex);

Review Comment:
   Exactly right. A handled exception should still be traced.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to