CurtHagenlocher commented on code in PR #2634:
URL: https://github.com/apache/arrow-adbc/pull/2634#discussion_r2008022011
##########
csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs:
##########
@@ -27,6 +27,11 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
{
internal class HiveServer2Statement : AdbcStatement
{
+ internal TSparkRowSetType currentResultFormat =
TSparkRowSetType.ARROW_BASED_SET;
Review Comment:
Is this the right default when the result format isn't set by the server?
More broadly, if the added properties are only relevant for Spark then it
feels a little inappropriate to put these two added fields into
`HiveServer2Statement`. Perhaps the simplest change would be to add the
`TGetResultSetMetadataResp` to the parameter list for `NewReader` so that
`SparkConnection.NewReader` is able to read the additional metadata properties
directly.
##########
csharp/src/Drivers/Apache/Spark/SparkStatement.cs:
##########
@@ -54,12 +68,97 @@ protected override void
SetStatementProperties(TExecuteStatementReq statement)
};
}
+ public override void SetOption(string key, string value)
+ {
+ switch (key)
+ {
+ case Options.UseCloudFetch:
+ if (bool.TryParse(value, out bool useCloudFetch))
+ {
+ SetUseCloudFetch(useCloudFetch);
Review Comment:
These can just set the fields directly and we don't need to define
additional setters (especially as they aren't part of any public API).
##########
artifacts/Apache.Arrow.Adbc.Tests.Drivers.Apache/Debug/net8.0/.msCoverageSourceRootsMapping_Apache.Arrow.Adbc.Tests.Drivers.Apache:
##########
Review Comment:
Please remove the added file.
##########
csharp/src/Drivers/Apache/Spark/CloudFetch/SparkCloudFetchReader.cs:
##########
@@ -0,0 +1,269 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Net.Http;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
+using Apache.Arrow.Ipc;
+using Apache.Hive.Service.Rpc.Thrift;
+using K4os.Compression.LZ4.Streams;
+
+namespace Apache.Arrow.Adbc.Drivers.Apache.Spark.CloudFetch
+{
+ /// <summary>
+ /// Reader for CloudFetch results from Databricks Spark Thrift server.
+ /// Handles downloading and processing URL-based result sets.
+ /// </summary>
+ internal sealed class SparkCloudFetchReader : IArrowArrayStream
+ {
+ private const int MaxRetries = 3;
+ private const int RetryDelayMs = 500;
+
+ private HiveServer2Statement? statement;
+ private readonly Schema schema;
+ private List<TSparkArrowResultLink>? resultLinks;
+ private int linkIndex;
+ private ArrowStreamReader? currentReader;
+ private readonly bool isLz4Compressed;
+ private long startOffset;
+
+ // Lazy initialization of HttpClient
+ private readonly Lazy<HttpClient> _httpClient = new
Lazy<HttpClient>(() =>
+ {
+ var client = new HttpClient();
+ client.Timeout = TimeSpan.FromMinutes(5); // Set a reasonable
timeout for large downloads
+ return client;
+ });
+
+ /// <summary>
+ /// Initializes a new instance of the <see
cref="SparkCloudFetchReader"/> class.
+ /// </summary>
+ /// <param name="statement">The HiveServer2 statement.</param>
+ /// <param name="schema">The Arrow schema.</param>
+ /// <param name="isLz4Compressed">Whether the results are LZ4
compressed.</param>
+ public SparkCloudFetchReader(HiveServer2Statement statement, Schema
schema, bool isLz4Compressed)
+ {
+ this.statement = statement;
+ this.schema = schema;
+ this.isLz4Compressed = isLz4Compressed;
+ }
+
+ /// <summary>
+ /// Gets the Arrow schema.
+ /// </summary>
+ public Schema Schema { get { return schema; } }
+
+ private HttpClient HttpClient
+ {
+ get { return _httpClient.Value; }
+ }
+
+ /// <summary>
+ /// Reads the next record batch from the result set.
+ /// </summary>
+ /// <param name="cancellationToken">The cancellation token.</param>
+ /// <returns>The next record batch, or null if there are no more
batches.</returns>
+ public async ValueTask<RecordBatch?>
ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
+ {
+ while (true)
+ {
+ // If we have a current reader, try to read the next batch
+ if (this.currentReader != null)
+ {
+ RecordBatch? next = await
this.currentReader.ReadNextRecordBatchAsync(cancellationToken);
+ if (next != null)
+ {
+ return next;
+ }
+ else
+ {
+ this.currentReader.Dispose();
+ this.currentReader = null;
+ }
+ }
+
+ // If we have more links to process, download and process the
next one
+ if (this.resultLinks != null && this.linkIndex <
this.resultLinks.Count)
+ {
+ var link = this.resultLinks[this.linkIndex++];
+ byte[]? fileData = null;
+
+ // Retry logic for downloading files
+ for (int retry = 0; retry < MaxRetries; retry++)
+ {
+ try
+ {
+ fileData = await DownloadFileAsync(link.FileLink,
cancellationToken);
+ break; // Success, exit retry loop
+ }
+ catch (Exception ex) when (retry < MaxRetries - 1)
+ {
+ // Log the error and retry
+ Console.WriteLine($"Error downloading file
(attempt {retry + 1}/{MaxRetries}): {ex.Message}");
+ await Task.Delay(RetryDelayMs * (retry + 1),
cancellationToken);
+ }
+ }
+
+ if (fileData == null)
+ {
+ // All retries failed, continue to the next link or
fetch more links
+ continue;
+ }
+
+ // Process the downloaded file data
+ MemoryStream dataStream;
+
+ // If the data is LZ4 compressed, decompress it
+ if (this.isLz4Compressed)
Review Comment:
Is it possible to leverage the Apache.Arrow.Compression assembly to do
decompression? It works by passing a `CompressionCodecFactory` to the
`ArrowStreamReader` constructor.
##########
csharp/src/Drivers/Apache/Spark/CloudFetch/SparkCloudFetchReader.cs:
##########
@@ -0,0 +1,269 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Net.Http;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
+using Apache.Arrow.Ipc;
+using Apache.Hive.Service.Rpc.Thrift;
+using K4os.Compression.LZ4.Streams;
+
+namespace Apache.Arrow.Adbc.Drivers.Apache.Spark.CloudFetch
+{
+ /// <summary>
+ /// Reader for CloudFetch results from Databricks Spark Thrift server.
+ /// Handles downloading and processing URL-based result sets.
+ /// </summary>
+ internal sealed class SparkCloudFetchReader : IArrowArrayStream
+ {
+ private const int MaxRetries = 3;
+ private const int RetryDelayMs = 500;
+
+ private HiveServer2Statement? statement;
+ private readonly Schema schema;
+ private List<TSparkArrowResultLink>? resultLinks;
+ private int linkIndex;
+ private ArrowStreamReader? currentReader;
+ private readonly bool isLz4Compressed;
+ private long startOffset;
+
+ // Lazy initialization of HttpClient
+ private readonly Lazy<HttpClient> _httpClient = new
Lazy<HttpClient>(() =>
+ {
+ var client = new HttpClient();
+ client.Timeout = TimeSpan.FromMinutes(5); // Set a reasonable
timeout for large downloads
+ return client;
+ });
+
+ /// <summary>
+ /// Initializes a new instance of the <see
cref="SparkCloudFetchReader"/> class.
+ /// </summary>
+ /// <param name="statement">The HiveServer2 statement.</param>
+ /// <param name="schema">The Arrow schema.</param>
+ /// <param name="isLz4Compressed">Whether the results are LZ4
compressed.</param>
+ public SparkCloudFetchReader(HiveServer2Statement statement, Schema
schema, bool isLz4Compressed)
+ {
+ this.statement = statement;
+ this.schema = schema;
+ this.isLz4Compressed = isLz4Compressed;
+ }
+
+ /// <summary>
+ /// Gets the Arrow schema.
+ /// </summary>
+ public Schema Schema { get { return schema; } }
+
+ private HttpClient HttpClient
+ {
+ get { return _httpClient.Value; }
+ }
+
+ /// <summary>
+ /// Reads the next record batch from the result set.
+ /// </summary>
+ /// <param name="cancellationToken">The cancellation token.</param>
+ /// <returns>The next record batch, or null if there are no more
batches.</returns>
+ public async ValueTask<RecordBatch?>
ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
+ {
+ while (true)
+ {
+ // If we have a current reader, try to read the next batch
+ if (this.currentReader != null)
+ {
+ RecordBatch? next = await
this.currentReader.ReadNextRecordBatchAsync(cancellationToken);
+ if (next != null)
+ {
+ return next;
+ }
+ else
+ {
+ this.currentReader.Dispose();
+ this.currentReader = null;
+ }
+ }
+
+ // If we have more links to process, download and process the
next one
+ if (this.resultLinks != null && this.linkIndex <
this.resultLinks.Count)
+ {
+ var link = this.resultLinks[this.linkIndex++];
+ byte[]? fileData = null;
+
+ // Retry logic for downloading files
+ for (int retry = 0; retry < MaxRetries; retry++)
+ {
+ try
+ {
+ fileData = await DownloadFileAsync(link.FileLink,
cancellationToken);
+ break; // Success, exit retry loop
+ }
+ catch (Exception ex) when (retry < MaxRetries - 1)
+ {
+ // Log the error and retry
+ Console.WriteLine($"Error downloading file
(attempt {retry + 1}/{MaxRetries}): {ex.Message}");
+ await Task.Delay(RetryDelayMs * (retry + 1),
cancellationToken);
+ }
+ }
+
+ if (fileData == null)
+ {
+ // All retries failed, continue to the next link or
fetch more links
+ continue;
Review Comment:
I'm a little concerned about the failure mode here and in line 150. Won't
this mean that the user silently gets less data and doesn't even know about it?
##########
csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs:
##########
@@ -27,6 +27,11 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
{
internal class HiveServer2Statement : AdbcStatement
{
+ internal TSparkRowSetType currentResultFormat =
TSparkRowSetType.ARROW_BASED_SET;
+ // Track the result format and compression for the current query
+ internal bool isLz4Compressed = false;
+
+
Review Comment:
nit: remove extra blank line
##########
csharp/src/Drivers/Apache/Spark/SparkDatabricksConnection.cs:
##########
@@ -29,7 +31,18 @@ public SparkDatabricksConnection(IReadOnlyDictionary<string,
string> properties)
{
}
- internal override IArrowArrayStream NewReader<T>(T statement, Schema
schema) => new SparkDatabricksReader(statement, schema);
+ internal override IArrowArrayStream NewReader<T>(T statement, Schema
schema)
+ {
+ // Choose the appropriate reader based on the result format
+ if (statement.currentResultFormat ==
TSparkRowSetType.URL_BASED_SET)
+ {
+ return new SparkCloudFetchReader(statement as
HiveServer2Statement, schema, statement.isLz4Compressed);
Review Comment:
The `as` cast is unnecessary (and potentially confusing) because the
constraint on `T` already ensures that `statement` is a `HiveServer2Statement`.
##########
csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs:
##########
@@ -84,7 +89,19 @@ private async Task<QueryResult>
ExecuteQueryAsyncInternal(CancellationToken canc
// take QueryTimeoutSeconds (but this could be restricting)
await ExecuteStatementAsync(cancellationToken); // --> get
QueryTimeout +
await HiveServer2Connection.PollForResponseAsync(OperationHandle!,
Connection.Client, PollTimeMilliseconds, cancellationToken); // + poll, up to
QueryTimeout
- Schema schema = await GetResultSetSchemaAsync(OperationHandle!,
Connection.Client, cancellationToken); // + get the result, up to QueryTimeout
+ TGetResultSetMetadataResp response = await
HiveServer2Connection.GetResultSetMetadataAsync(OperationHandle!,
Connection.Client, cancellationToken);
Review Comment:
I believe that, after this change, `GetResultSetSchemaAsync` is no longer used
and can be deleted.
##########
csharp/src/Drivers/Apache/Spark/CloudFetch/SparkCloudFetchReader.cs:
##########
@@ -0,0 +1,269 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Net.Http;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
+using Apache.Arrow.Ipc;
+using Apache.Hive.Service.Rpc.Thrift;
+using K4os.Compression.LZ4.Streams;
+
+namespace Apache.Arrow.Adbc.Drivers.Apache.Spark.CloudFetch
+{
+ /// <summary>
+ /// Reader for CloudFetch results from Databricks Spark Thrift server.
+ /// Handles downloading and processing URL-based result sets.
+ /// </summary>
+ internal sealed class SparkCloudFetchReader : IArrowArrayStream
+ {
+ private const int MaxRetries = 3;
+ private const int RetryDelayMs = 500;
+
+ private HiveServer2Statement? statement;
+ private readonly Schema schema;
+ private List<TSparkArrowResultLink>? resultLinks;
+ private int linkIndex;
+ private ArrowStreamReader? currentReader;
+ private readonly bool isLz4Compressed;
+ private long startOffset;
+
+ // Lazy initialization of HttpClient
+ private readonly Lazy<HttpClient> _httpClient = new
Lazy<HttpClient>(() =>
+ {
+ var client = new HttpClient();
+ client.Timeout = TimeSpan.FromMinutes(5); // Set a reasonable
timeout for large downloads
Review Comment:
Should this timeout be configurable via a connection parameter?
##########
csharp/test/Drivers/Apache/Apache.Arrow.Adbc.Tests.Drivers.Apache.csproj:
##########
@@ -7,13 +7,15 @@
<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.12.0" />
+ <PackageReference Include="Moq" Version="4.20.72" />
Review Comment:
Moq doesn't appear to get used in this PR.
##########
csharp/src/Drivers/Apache/Spark/CloudFetch/SparkCloudFetchReader.cs:
##########
@@ -0,0 +1,269 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Net.Http;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
+using Apache.Arrow.Ipc;
+using Apache.Hive.Service.Rpc.Thrift;
+using K4os.Compression.LZ4.Streams;
+
+namespace Apache.Arrow.Adbc.Drivers.Apache.Spark.CloudFetch
+{
+ /// <summary>
+ /// Reader for CloudFetch results from Databricks Spark Thrift server.
+ /// Handles downloading and processing URL-based result sets.
+ /// </summary>
+ internal sealed class SparkCloudFetchReader : IArrowArrayStream
+ {
+ private const int MaxRetries = 3;
+ private const int RetryDelayMs = 500;
+
+ private HiveServer2Statement? statement;
+ private readonly Schema schema;
+ private List<TSparkArrowResultLink>? resultLinks;
+ private int linkIndex;
+ private ArrowStreamReader? currentReader;
+ private readonly bool isLz4Compressed;
+ private long startOffset;
+
+ // Lazy initialization of HttpClient
+ private readonly Lazy<HttpClient> _httpClient = new
Lazy<HttpClient>(() =>
Review Comment:
nit: I know that the C# code in this repository as a whole isn't very
consistent about field names, but it would be nice to at least stay consistent
inside one source file in terms of starting with an underscore vs not.
##########
csharp/src/Drivers/Apache/Spark/CloudFetch/SparkCloudFetchReader.cs:
##########
@@ -0,0 +1,269 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Net.Http;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
+using Apache.Arrow.Ipc;
+using Apache.Hive.Service.Rpc.Thrift;
+using K4os.Compression.LZ4.Streams;
+
+namespace Apache.Arrow.Adbc.Drivers.Apache.Spark.CloudFetch
+{
+ /// <summary>
+ /// Reader for CloudFetch results from Databricks Spark Thrift server.
+ /// Handles downloading and processing URL-based result sets.
+ /// </summary>
+ internal sealed class SparkCloudFetchReader : IArrowArrayStream
+ {
+ private const int MaxRetries = 3;
+ private const int RetryDelayMs = 500;
+
+ private HiveServer2Statement? statement;
+ private readonly Schema schema;
+ private List<TSparkArrowResultLink>? resultLinks;
+ private int linkIndex;
+ private ArrowStreamReader? currentReader;
+ private readonly bool isLz4Compressed;
+ private long startOffset;
+
+ // Lazy initialization of HttpClient
+ private readonly Lazy<HttpClient> _httpClient = new
Lazy<HttpClient>(() =>
+ {
+ var client = new HttpClient();
+ client.Timeout = TimeSpan.FromMinutes(5); // Set a reasonable
timeout for large downloads
+ return client;
+ });
+
+ /// <summary>
+ /// Initializes a new instance of the <see
cref="SparkCloudFetchReader"/> class.
+ /// </summary>
+ /// <param name="statement">The HiveServer2 statement.</param>
+ /// <param name="schema">The Arrow schema.</param>
+ /// <param name="isLz4Compressed">Whether the results are LZ4
compressed.</param>
+ public SparkCloudFetchReader(HiveServer2Statement statement, Schema
schema, bool isLz4Compressed)
+ {
+ this.statement = statement;
+ this.schema = schema;
+ this.isLz4Compressed = isLz4Compressed;
+ }
+
+ /// <summary>
+ /// Gets the Arrow schema.
+ /// </summary>
+ public Schema Schema { get { return schema; } }
+
+ private HttpClient HttpClient
+ {
+ get { return _httpClient.Value; }
+ }
+
+ /// <summary>
+ /// Reads the next record batch from the result set.
+ /// </summary>
+ /// <param name="cancellationToken">The cancellation token.</param>
+ /// <returns>The next record batch, or null if there are no more
batches.</returns>
+ public async ValueTask<RecordBatch?>
ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
+ {
+ while (true)
+ {
+ // If we have a current reader, try to read the next batch
+ if (this.currentReader != null)
+ {
+ RecordBatch? next = await
this.currentReader.ReadNextRecordBatchAsync(cancellationToken);
+ if (next != null)
+ {
+ return next;
+ }
+ else
+ {
+ this.currentReader.Dispose();
+ this.currentReader = null;
+ }
+ }
+
+ // If we have more links to process, download and process the
next one
+ if (this.resultLinks != null && this.linkIndex <
this.resultLinks.Count)
+ {
+ var link = this.resultLinks[this.linkIndex++];
+ byte[]? fileData = null;
+
+ // Retry logic for downloading files
+ for (int retry = 0; retry < MaxRetries; retry++)
+ {
+ try
+ {
+ fileData = await DownloadFileAsync(link.FileLink,
cancellationToken);
+ break; // Success, exit retry loop
+ }
+ catch (Exception ex) when (retry < MaxRetries - 1)
+ {
+ // Log the error and retry
+ Console.WriteLine($"Error downloading file
(attempt {retry + 1}/{MaxRetries}): {ex.Message}");
Review Comment:
The use of `Console.WriteLine` isn't a good idea. We don't currently have a
good way to write these kinds of logs; consider removing it or replacing it
with `Debug.WriteLine`, which is a no-op in release builds and writes to the
debug listener in debug builds.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]