This is an automated email from the ASF dual-hosted git repository.
curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 028d22f9d feat(csharp/src/Drivers/Apache/Spark): Add Lz4 compression
support to arrow batch reader (#2669)
028d22f9d is described below
commit 028d22f9d2d8b2fd582f5aadbae07fdfd8a8e036
Author: eric-wang-1990 <[email protected]>
AuthorDate: Fri Apr 4 20:20:58 2025 +0000
feat(csharp/src/Drivers/Apache/Spark): Add Lz4 compression support to arrow
batch reader (#2669)
1. Create a new file
[Lz4Utilities.cs](https://github.com/apache/arrow-adbc/compare/main...eric-wang-1990:arrow-adbc:add_lz4_to_arrowbatch#diff-7275463b6b9fcc3cf3b954580e3ded8b0b7237a90e0f4aea33eb11e613f3db39)
to hold the common LZ4 decompression utility shared by the CloudFetch
and Arrow batch readers.
2. Add support for decompressing Arrow batches with LZ4.
3. Rename adbc.spark.cloudfetch.lz4.enabled to
adbc.spark.lz4_compression.enabled, since the option is not specific to CloudFetch.
4. Add a test in StatementTests that covers both the CloudFetch and Arrow batch paths.
---
.../Spark/CloudFetch/SparkCloudFetchReader.cs | 77 ++++++++++------------
csharp/src/Drivers/Apache/Spark/Lz4Utilities.cs | 57 ++++++++++++++++
.../Apache/Spark/SparkDatabricksConnection.cs | 2 +-
.../Drivers/Apache/Spark/SparkDatabricksReader.cs | 51 +++++++++++++-
csharp/src/Drivers/Apache/Spark/SparkStatement.cs | 3 +-
csharp/src/Drivers/Apache/Thrift/ChunkStream.cs | 10 ++-
csharp/test/Drivers/Apache/Spark/StatementTests.cs | 45 +++++++++++++
7 files changed, 198 insertions(+), 47 deletions(-)
diff --git
a/csharp/src/Drivers/Apache/Spark/CloudFetch/SparkCloudFetchReader.cs
b/csharp/src/Drivers/Apache/Spark/CloudFetch/SparkCloudFetchReader.cs
index 343bb5a0d..a96d6647b 100644
--- a/csharp/src/Drivers/Apache/Spark/CloudFetch/SparkCloudFetchReader.cs
+++ b/csharp/src/Drivers/Apache/Spark/CloudFetch/SparkCloudFetchReader.cs
@@ -22,7 +22,9 @@ using System.IO;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
+using Apache.Arrow.Adbc.Drivers.Apache;
using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
+using Apache.Arrow.Adbc.Drivers.Apache.Spark;
using Apache.Arrow.Ipc;
using Apache.Hive.Service.Rpc.Thrift;
using K4os.Compression.LZ4.Streams;
@@ -161,59 +163,52 @@ namespace
Apache.Arrow.Adbc.Drivers.Apache.Spark.CloudFetch
var link = this.resultLinks[this.linkIndex++];
byte[]? fileData = null;
- // Retry logic for downloading files
- for (int retry = 0; retry < this.maxRetries; retry++)
+ try
{
- try
+ // Try to download with retry logic
+ for (int retry = 0; retry < this.maxRetries; retry++)
{
- fileData = await DownloadFileAsync(link.FileLink,
cancellationToken);
- break; // Success, exit retry loop
+ try
+ {
+ fileData = await
DownloadFileAsync(link.FileLink, cancellationToken);
+ break; // Success, exit retry loop
+ }
+ catch (Exception) when (retry < this.maxRetries -
1)
+ {
+ // Only delay and retry if we haven't reached
max retries
+ await Task.Delay(this.retryDelayMs * (retry +
1), cancellationToken);
+ }
}
- catch (Exception ex) when (retry < this.maxRetries - 1)
+
+ // If download still failed after all retries
+ if (fileData == null)
{
- // Log the error and retry
- Debug.WriteLine($"Error downloading file (attempt
{retry + 1}/{this.maxRetries}): {ex.Message}");
- await Task.Delay(this.retryDelayMs * (retry + 1),
cancellationToken);
+ throw new AdbcException($"Failed to download
CloudFetch data from {link.FileLink} after {this.maxRetries} attempts");
}
- }
- // Process the downloaded file data
- MemoryStream dataStream;
+ ReadOnlyMemory<byte> dataToUse = new
ReadOnlyMemory<byte>(fileData);
- // If the data is LZ4 compressed, decompress it
- if (this.isLz4Compressed)
- {
- try
- {
- dataStream = new MemoryStream();
- using (var inputStream = new
MemoryStream(fileData!))
- using (var decompressor =
LZ4Stream.Decode(inputStream))
- {
- await decompressor.CopyToAsync(dataStream);
- }
- dataStream.Position = 0;
- }
- catch (Exception ex)
+ // If the data is LZ4 compressed, decompress it
+ if (this.isLz4Compressed)
{
- Debug.WriteLine($"Error decompressing data:
{ex.Message}");
- continue; // Skip this link and try the next one
+ dataToUse = Lz4Utilities.DecompressLz4(fileData);
}
- }
- else
- {
- dataStream = new MemoryStream(fileData!);
- }
- try
- {
- this.currentReader = new ArrowStreamReader(dataStream);
+ // Use ChunkStream which supports ReadOnlyMemory<byte>
directly
+ this.currentReader = new ArrowStreamReader(new
ChunkStream(this.schema, dataToUse));
continue;
}
catch (Exception ex)
{
- Debug.WriteLine($"Error creating Arrow reader:
{ex.Message}");
- dataStream.Dispose();
- continue; // Skip this link and try the next one
+ // Create concise error message based on exception type
+ string errorPrefix = $"CloudFetch link
{this.linkIndex-1}:";
+ string errorMessage = ex switch
+ {
+ _ when ex.GetType().Name.Contains("LZ4") =>
$"{errorPrefix} LZ4 decompression failed - Data may be corrupted",
+ HttpRequestException or TaskCanceledException =>
$"{errorPrefix} Download failed - {ex.Message}",
+ _ => $"{errorPrefix} Processing failed -
{ex.Message}" // Default case for any other exception
+ };
+ throw new AdbcException(errorMessage, ex);
}
}
@@ -242,9 +237,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark.CloudFetch
}
catch (Exception ex)
{
- Debug.WriteLine($"Error fetching results from server:
{ex.Message}");
- this.statement = null; // Mark as done due to error
- return null;
+ throw new AdbcException($"Server request failed -
{ex.Message}", ex);
}
// Check if we have URL-based results
diff --git a/csharp/src/Drivers/Apache/Spark/Lz4Utilities.cs
b/csharp/src/Drivers/Apache/Spark/Lz4Utilities.cs
new file mode 100644
index 000000000..42cf21878
--- /dev/null
+++ b/csharp/src/Drivers/Apache/Spark/Lz4Utilities.cs
@@ -0,0 +1,57 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using System.Buffers;
+using System.IO;
+using K4os.Compression.LZ4.Streams;
+
+namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
+{
+ /// <summary>
+ /// Utility class for LZ4 compression/decompression operations.
+ /// </summary>
+ internal static class Lz4Utilities
+ {
+ /// <summary>
+ /// Decompresses LZ4 compressed data into memory.
+ /// </summary>
+ /// <param name="compressedData">The compressed data bytes.</param>
+ /// <returns>A ReadOnlyMemory containing the decompressed
data.</returns>
+ /// <exception cref="AdbcException">Thrown when decompression
fails.</exception>
+ public static ReadOnlyMemory<byte> DecompressLz4(byte[] compressedData)
+ {
+ try
+ {
+ var outputStream = new MemoryStream();
+ using (var inputStream = new MemoryStream(compressedData))
+ using (var decompressor = LZ4Stream.Decode(inputStream))
+ {
+ decompressor.CopyTo(outputStream);
+ }
+ // Get the underlying buffer and its valid length without
copying
+ return new ReadOnlyMemory<byte>(outputStream.GetBuffer(), 0,
(int)outputStream.Length);
+ // Note: We're not disposing the outputStream here because
we're returning its buffer.
+ // The memory will be reclaimed when the ReadOnlyMemory is no
longer referenced.
+ }
+ catch (Exception ex)
+ {
+ throw new AdbcException($"Failed to decompress LZ4 data:
{ex.Message}", ex);
+ }
+ }
+ }
+}
diff --git a/csharp/src/Drivers/Apache/Spark/SparkDatabricksConnection.cs
b/csharp/src/Drivers/Apache/Spark/SparkDatabricksConnection.cs
index 75705189b..2e8e52e72 100644
--- a/csharp/src/Drivers/Apache/Spark/SparkDatabricksConnection.cs
+++ b/csharp/src/Drivers/Apache/Spark/SparkDatabricksConnection.cs
@@ -56,7 +56,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
}
else
{
- return new SparkDatabricksReader(statement, schema);
+ return new SparkDatabricksReader(statement, schema,
isLz4Compressed);
}
}
diff --git a/csharp/src/Drivers/Apache/Spark/SparkDatabricksReader.cs
b/csharp/src/Drivers/Apache/Spark/SparkDatabricksReader.cs
index 0e0166926..fd8731f73 100644
--- a/csharp/src/Drivers/Apache/Spark/SparkDatabricksReader.cs
+++ b/csharp/src/Drivers/Apache/Spark/SparkDatabricksReader.cs
@@ -15,12 +15,15 @@
* limitations under the License.
*/
+using System;
using System.Collections.Generic;
+using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
using Apache.Arrow.Ipc;
using Apache.Hive.Service.Rpc.Thrift;
+using K4os.Compression.LZ4.Streams;
namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
{
@@ -31,11 +34,18 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
List<TSparkArrowBatch>? batches;
int index;
IArrowReader? reader;
+ bool isLz4Compressed;
public SparkDatabricksReader(HiveServer2Statement statement, Schema
schema)
+ : this(statement, schema, false)
+ {
+ }
+
+ public SparkDatabricksReader(HiveServer2Statement statement, Schema
schema, bool isLz4Compressed)
{
this.statement = statement;
this.schema = schema;
+ this.isLz4Compressed = isLz4Compressed;
}
public Schema Schema { get { return schema; } }
@@ -56,7 +66,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
if (this.batches != null && this.index < this.batches.Count)
{
- this.reader = new ArrowStreamReader(new
ChunkStream(this.schema, this.batches[this.index++].Batch));
+ ProcessFetchedBatches();
continue;
}
@@ -70,6 +80,8 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
TFetchResultsReq request = new
TFetchResultsReq(this.statement.OperationHandle!, TFetchOrientation.FETCH_NEXT,
this.statement.BatchSize);
TFetchResultsResp response = await
this.statement.Connection.Client!.FetchResults(request, cancellationToken);
+
+ // Make sure we get the arrowBatches
this.batches = response.Results.ArrowBatches;
if (!response.HasMoreRows)
@@ -79,6 +91,43 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
}
}
+ private void ProcessFetchedBatches()
+ {
+ var batch = this.batches![this.index];
+
+ // Ensure batch data exists
+ if (batch.Batch == null || batch.Batch.Length == 0)
+ {
+ this.index++;
+ return;
+ }
+
+ try
+ {
+ ReadOnlyMemory<byte> dataToUse = new
ReadOnlyMemory<byte>(batch.Batch);
+
+ // If LZ4 compression is enabled, decompress the data
+ if (isLz4Compressed)
+ {
+ dataToUse = Lz4Utilities.DecompressLz4(batch.Batch);
+ }
+
+ // Always use ChunkStream which ensures proper schema handling
+ this.reader = new ArrowStreamReader(new
ChunkStream(this.schema, dataToUse));
+ }
+ catch (Exception ex)
+ {
+ // Create concise error message based on exception type
+ string errorMessage = ex switch
+ {
+ _ when ex.GetType().Name.Contains("LZ4") => $"Batch
{this.index}: LZ4 decompression failed - Data may be corrupted",
+ _ => $"Batch {this.index}: Processing failed -
{ex.Message}" // Default case for any other exception
+ };
+ throw new AdbcException(errorMessage, ex);
+ }
+ this.index++;
+ }
+
public void Dispose()
{
}
diff --git a/csharp/src/Drivers/Apache/Spark/SparkStatement.cs
b/csharp/src/Drivers/Apache/Spark/SparkStatement.cs
index 4c4e61562..eeb8ba60b 100644
--- a/csharp/src/Drivers/Apache/Spark/SparkStatement.cs
+++ b/csharp/src/Drivers/Apache/Spark/SparkStatement.cs
@@ -155,9 +155,10 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
/// </summary>
public sealed class Options : ApacheParameters
{
+ // Lz4 compression option
+ public const string CanDecompressLz4 =
"adbc.spark.lz4_compression.enabled";
// CloudFetch options
public const string UseCloudFetch =
"adbc.spark.cloudfetch.enabled";
- public const string CanDecompressLz4 =
"adbc.spark.cloudfetch.lz4.enabled";
public const string MaxBytesPerFile =
"adbc.spark.cloudfetch.max_bytes_per_file";
}
}
diff --git a/csharp/src/Drivers/Apache/Thrift/ChunkStream.cs
b/csharp/src/Drivers/Apache/Thrift/ChunkStream.cs
index 4db0e8076..499e4930c 100644
--- a/csharp/src/Drivers/Apache/Thrift/ChunkStream.cs
+++ b/csharp/src/Drivers/Apache/Thrift/ChunkStream.cs
@@ -26,11 +26,17 @@ namespace Apache.Arrow.Adbc.Drivers.Apache
internal class ChunkStream : Stream
{
ReadOnlyMemory<byte> currentBuffer;
- byte[] data;
+ ReadOnlyMemory<byte> data;
bool first;
int position;
public ChunkStream(Schema schema, byte[] data)
+ : this(schema, new ReadOnlyMemory<byte>(data))
+ {
+ // Call the other constructor to avoid duplication
+ }
+
+ public ChunkStream(Schema schema, ReadOnlyMemory<byte> data)
{
MemoryStream buffer = new MemoryStream();
ArrowStreamWriter writer = new ArrowStreamWriter(buffer, schema,
leaveOpen: true);
@@ -70,7 +76,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache
{
return 0;
}
- this.currentBuffer = new ReadOnlyMemory<byte>(this.data);
+ this.currentBuffer = this.data;
this.position = 0;
remaining = this.currentBuffer.Length - this.position;
}
diff --git a/csharp/test/Drivers/Apache/Spark/StatementTests.cs
b/csharp/test/Drivers/Apache/Spark/StatementTests.cs
index da170a66d..b313d1d62 100644
--- a/csharp/test/Drivers/Apache/Spark/StatementTests.cs
+++ b/csharp/test/Drivers/Apache/Spark/StatementTests.cs
@@ -17,6 +17,7 @@
using System;
using System.Collections.Generic;
+using System.Linq;
using System.Threading.Tasks;
using Apache.Arrow.Adbc.Drivers.Apache.Spark;
using Apache.Arrow.Adbc.Tests.Drivers.Apache.Common;
@@ -39,6 +40,50 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Apache.Spark
base.StatementTimeoutTest(statementWithExceptions);
}
+ [SkippableTheory]
+ [InlineData(true, "CloudFetch enabled")]
+ [InlineData(false, "CloudFetch disabled")]
+ public async Task LZ4DecompressionCapabilityTest(bool useCloudFetch,
string configName)
+ {
+ OutputHelper?.WriteLine($"Testing with LZ4 decompression
capability enabled ({configName})");
+
+ // Create a connection using the test configuration
+ using AdbcConnection connection = NewConnection();
+ using var statement = connection.CreateStatement();
+
+ // Set options for LZ4 decompression (enabled by default) and
CloudFetch as specified
+ statement.SetOption(SparkStatement.Options.UseCloudFetch,
useCloudFetch.ToString().ToLower());
+ OutputHelper?.WriteLine($"CloudFetch is {(useCloudFetch ?
"enabled" : "disabled")}");
+ OutputHelper?.WriteLine("LZ4 decompression capability is enabled
by default");
+
+ // Execute a query that should return data
+ statement.SqlQuery = "SELECT id, CAST(id AS STRING) as id_string,
id * 2 as id_doubled FROM RANGE(100)";
+ QueryResult result = statement.ExecuteQuery();
+
+ // Verify we have a valid stream
+ Assert.NotNull(result.Stream);
+
+ // Read all batches
+ int totalRows = 0;
+ int batchCount = 0;
+
+ while (result.Stream != null)
+ {
+ using var batch = await
result.Stream.ReadNextRecordBatchAsync();
+ if (batch == null)
+ break;
+
+ batchCount++;
+ totalRows += batch.Length;
+ OutputHelper?.WriteLine($"Batch {batchCount}: Read
{batch.Length} rows");
+ }
+
+ // Verify we got all rows
+ Assert.Equal(100, totalRows);
+ OutputHelper?.WriteLine($"Successfully read {totalRows} rows in
{batchCount} batches with {configName}");
+ OutputHelper?.WriteLine("NOTE: Whether actual LZ4 compression was
used is determined by the server");
+ }
+
internal class LongRunningStatementTimeoutTestData :
ShortRunningStatementTimeoutTestData
{
public LongRunningStatementTimeoutTestData()