CurtHagenlocher commented on code in PR #2669:
URL: https://github.com/apache/arrow-adbc/pull/2669#discussion_r2027111358


##########
csharp/src/Drivers/Apache/Spark/Lz4Utilities.cs:
##########
@@ -0,0 +1,57 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+using K4os.Compression.LZ4.Streams;
+
+namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
+{
+    /// <summary>
+    /// Utility class for LZ4 compression/decompression operations.
+    /// </summary>
+    internal static class Lz4Utilities
+    {
+        /// <summary>
+        /// Decompresses LZ4 compressed data into a memory stream.
+        /// </summary>
+        /// <param name="compressedData">The compressed data bytes.</param>
+        /// <param name="cancellationToken">A cancellation token.</param>
+        /// <returns>A memory stream containing the decompressed data, 
positioned at the beginning of the stream.</returns>
+        /// <exception cref="AdbcException">Thrown when decompression 
fails.</exception>
+        public static async Task<MemoryStream> DecompressLz4Async(byte[] 
compressedData, CancellationToken cancellationToken = default)
+        {
+            try
+            {
+                var outputStream = new MemoryStream();
+                using (var inputStream = new MemoryStream(compressedData))
+                using (var decompressor = LZ4Stream.Decode(inputStream))
+                {
+                    await decompressor.CopyToAsync(outputStream);
+                }
+                outputStream.Position = 0;
+                return outputStream;
+            }
+            catch (Exception ex)
+            {
+                throw new AdbcException($"Failed to decompress LZ4 data: 
{ex.Message}", ex);
+            }
+        }
+    }
+} 

Review Comment:
   There's a single trailing space on this last line which is triggering the 
formatting error.



##########
csharp/src/Drivers/Apache/Spark/SparkDatabricksReader.cs:
##########
@@ -79,6 +91,49 @@ public SparkDatabricksReader(HiveServer2Statement statement, 
Schema schema)
             }
         }
 
+        private async Task ProcessFetchedBatchesAsync(CancellationToken 
cancellationToken)
+        {
+            var batch = this.batches![this.index];
+
+            // Ensure batch data exists
+            if (batch.Batch == null || batch.Batch.Length == 0)
+            {
+                this.index++;
+                return;
+            }
+
+            try
+            {
+                byte[] dataToUse = batch.Batch;
+
+                // If LZ4 compression is enabled, try to decompress the data
+                if (isLz4Compressed)
+                {
+                    try
+                    {
+                        var dataStream = await 
Lz4Utilities.DecompressLz4Async(batch.Batch, cancellationToken);
+                        dataToUse = dataStream.ToArray();

Review Comment:
   This could be optimized to remove one allocation and one buffer copy by 
doing the following:
   1) Make `dataToUse` a `ReadOnlyMemory<byte>` instead of an array.
   2) After computing `dataStream`, do `dataToUse = new 
ReadOnlyMemory<byte>(dataStream.GetBuffer(), 0, (int)dataStream.Length);` 
(`MemoryStream.Length` is a `long`, so the cast to `int` is required).
   3) Change `ChunkStream` to accept a `ReadOnlyMemory<byte>` instead of an 
array as an argument.
   
   If we know the uncompressed length in advance we could do even better. (One 
of the benefits of the Arrow compressed formats is that they report that length 
in the header -- but we obviously can't use that format if the server isn't 
producing it.)



##########
csharp/src/Drivers/Apache/Spark/Lz4Utilities.cs:
##########
@@ -0,0 +1,57 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+using K4os.Compression.LZ4.Streams;
+
+namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
+{
+    /// <summary>
+    /// Utility class for LZ4 compression/decompression operations.
+    /// </summary>
+    internal static class Lz4Utilities
+    {
+        /// <summary>
+        /// Decompresses LZ4 compressed data into a memory stream.
+        /// </summary>
+        /// <param name="compressedData">The compressed data bytes.</param>
+        /// <param name="cancellationToken">A cancellation token.</param>
+        /// <returns>A memory stream containing the decompressed data, 
positioned at the beginning of the stream.</returns>
+        /// <exception cref="AdbcException">Thrown when decompression 
fails.</exception>
+        public static async Task<MemoryStream> DecompressLz4Async(byte[] 
compressedData, CancellationToken cancellationToken = default)

Review Comment:
   FWIW, as this is entirely CPU-bound there's no advantage (and maybe even a 
small perf hit) in making it `async`.



##########
csharp/src/Drivers/Apache/Spark/SparkStatement.cs:
##########
@@ -155,9 +155,10 @@ internal void SetMaxBytesPerFile(long maxBytesPerFile)
         /// </summary>
         public sealed class Options : ApacheParameters

Review Comment:
   Unrelated to this change, it would be nice if these properties were listed 
with the others in csharp/src/Drivers/Apache/Spark/README.md. (That could be 
done as a followup.)



##########
csharp/src/Drivers/Apache/Spark/SparkDatabricksReader.cs:
##########
@@ -79,6 +91,49 @@ public SparkDatabricksReader(HiveServer2Statement statement, 
Schema schema)
             }
         }
 
+        private async Task ProcessFetchedBatchesAsync(CancellationToken 
cancellationToken)
+        {
+            var batch = this.batches![this.index];
+
+            // Ensure batch data exists
+            if (batch.Batch == null || batch.Batch.Length == 0)
+            {
+                this.index++;
+                return;
+            }
+
+            try
+            {
+                byte[] dataToUse = batch.Batch;
+
+                // If LZ4 compression is enabled, try to decompress the data
+                if (isLz4Compressed)
+                {
+                    try
+                    {
+                        var dataStream = await 
Lz4Utilities.DecompressLz4Async(batch.Batch, cancellationToken);
+                        dataToUse = dataStream.ToArray();
+                        dataStream.Dispose();
+                    }
+                    catch (Exception ex)
+                    {
+                        // If decompression fails, use the original data
+                        System.Diagnostics.Debug.WriteLine($"Failed to 
decompress LZ4 data: {ex.Message}");
+                    }
+                }
+
+                // Always use ChunkStream which ensures proper schema handling
+                this.reader = new ArrowStreamReader(new 
ChunkStream(this.schema, dataToUse));
+            }
+            catch (Exception ex)
+            {
+                // Log any errors and skip this batch
+                System.Diagnostics.Debug.WriteLine($"Error processing batch: 
{ex.Message}");

Review Comment:
   It seems like a bad idea to just suppress the error. Are there circumstances 
under which dropping these rows does not result in silent data loss?



##########
csharp/src/Drivers/Apache/Spark/SparkStatement.cs:
##########
@@ -155,9 +155,10 @@ internal void SetMaxBytesPerFile(long maxBytesPerFile)
         /// </summary>
         public sealed class Options : ApacheParameters
         {
+            // Lz4 compression option
+            public const string CanDecompressLz4 = 
"adbc.spark.lz4Compression.enabled";

Review Comment:
   Option names in ADBC lean heavily toward "snake case" rather than "camel 
case", e.g. `"lz4_compression"` vs. `"lz4Compression"`.



##########
csharp/src/Drivers/Apache/Spark/SparkDatabricksReader.cs:
##########
@@ -79,6 +91,49 @@ public SparkDatabricksReader(HiveServer2Statement statement, 
Schema schema)
             }
         }
 
+        private async Task ProcessFetchedBatchesAsync(CancellationToken 
cancellationToken)
+        {
+            var batch = this.batches![this.index];
+
+            // Ensure batch data exists
+            if (batch.Batch == null || batch.Batch.Length == 0)
+            {
+                this.index++;
+                return;
+            }
+
+            try
+            {
+                byte[] dataToUse = batch.Batch;
+
+                // If LZ4 compression is enabled, try to decompress the data
+                if (isLz4Compressed)
+                {
+                    try
+                    {
+                        var dataStream = await 
Lz4Utilities.DecompressLz4Async(batch.Batch, cancellationToken);

Review Comment:
   ```suggestion
                           using var dataStream = await 
Lz4Utilities.DecompressLz4Async(batch.Batch, cancellationToken);
   ```
   Then the explicit `Dispose()` can be removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to