This is an automated email from the ASF dual-hosted git repository.

curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new ac4b890e2 fix(csharp/src/Drivers/Databricks): Reduce LZ4 decompression memory by using Custom Array Pool (#3654)
ac4b890e2 is described below

commit ac4b890e24a51e60609887cc400b1c2ddc43797b
Author: eric-wang-1990 <[email protected]>
AuthorDate: Fri Nov 7 10:26:48 2025 -0800

    fix(csharp/src/Drivers/Databricks): Reduce LZ4 decompression memory by using Custom Array Pool (#3654)
    
    Fix: Reduce LZ4 decompression memory by using a custom ArrayPool.
    
    ## Summary
    
    Reduces LZ4 internal buffer memory allocation from ~900MB to ~40MB (96%
    reduction) for large Databricks query results by implementing a custom
    ArrayPool that supports buffer sizes larger than .NET's default 1MB
    limit.
    
    **Important**: This optimization primarily reduces:
    - **Total allocations**: 222 × 4MB → reuse of 10 pooled buffers
    - **GC pressure**: Fewer LOH allocations → fewer Gen2 collections
    
    But does NOT significantly reduce:
    - **Peak concurrent memory**: With `parallelDownloads=1`, peak is still
    ~8-16MB (1-2 buffers in use)
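    
    The figures above can be checked with a few lines of arithmetic. The
    sketch below is illustrative only: the class and member names are
    hypothetical, and the inputs (222 allocations, 4MB buffers, 10 pooled
    buffers) are taken directly from this summary.
    
    ```csharp
    using System;
    
    // Hypothetical helper; not part of the driver.
    public static class Lz4PoolMath
    {
        public const int BufferSizeMb = 4;       // Databricks LZ4 maxBlockSize, per the summary
        public const int FreshAllocations = 222; // 4MB buffers allocated when nothing is pooled
        public const int PooledBuffers = 10;     // maxArraysPerBucket of the custom pool
    
        // Total LOH allocation volume without pooling: 222 x 4MB = 888MB (~900MB).
        public static int UnpooledMb => FreshAllocations * BufferSizeMb;
    
        // Upper bound on memory retained by the pool: 10 x 4MB = 40MB.
        public static int PooledMb => PooledBuffers * BufferSizeMb;
    
        // Relative reduction in allocated buffer memory, in percent.
        public static double ReductionPercent =>
            100.0 * (UnpooledMb - PooledMb) / UnpooledMb;
    
        public static void Main()
        {
            Console.WriteLine($"Unpooled: {UnpooledMb} MB");
            Console.WriteLine($"Pooled:   {PooledMb} MB");
            Console.WriteLine($"Reduction: {ReductionPercent:F1} %");
        }
    }
    ```
    
    With the rounded ~900MB figure the reduction comes out at about 96%;
    with the exact 888MB product it is about 95.5%.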
    
    
    ## Solution
    
    Created a custom ArrayPool by overriding K4os.Compression.LZ4's buffer
    allocation methods:
    
    1. **CustomLZ4FrameReader.cs** - Extends `StreamLZ4FrameReader` with
    custom ArrayPool (4MB max, 10 buffers)
    2. **CustomLZ4DecoderStream.cs** - Stream wrapper using
    `CustomLZ4FrameReader`
    3. **Updated Lz4Utilities.cs** - Use `CustomLZ4DecoderStream` instead of
    default `LZ4Stream.Decode()`
    
    ### Key Implementation
    ```csharp
    // CustomLZ4FrameReader.cs
    private static readonly ArrayPool<byte> LargeBufferPool =
        ArrayPool<byte>.Create(
            maxArrayLength: 4 * 1024 * 1024,    // 4MB (matches Databricks' maxBlockSize)
            maxArraysPerBucket: 10               // Pool capacity: 10 × 4MB = 40MB
        );
    
    protected override byte[] AllocBuffer(int size)
    {
        return LargeBufferPool.Rent(size);
    }
    
    protected override void ReleaseBuffer(byte[] buffer)
    {
        if (buffer != null)
        {
            LargeBufferPool.Return(buffer, clearArray: false);
        }
    }
    ```
    ### Performance
    - **CPU**: No degradation (pooling reduces allocation overhead)
    - **GC**: Significantly reduced Gen2 collections (fewer LOH allocations)
    - **Latency**: Slight improvement (buffer reuse faster than fresh
    allocation)
    
    ## Why This Works
    
    **K4os Library Design**:
    - `LZ4FrameReader` has `virtual` methods: `AllocBuffer()` and
    `ReleaseBuffer()`
    - Default implementation calls `BufferPool.Alloc()` → `DefaultArrayPool`
    (1MB limit)
    - Overriding allows injection of custom 4MB pool
    
    **Buffer Lifecycle**:
    1. Decompression needs 4MB buffer → Rent from pool
    2. Decompression completes → Return to pool
    3. Next decompression → Reuse buffer from pool
    4. With `parallelDownloads=1` (default), only 1-2 buffers active at once
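    
    The lifecycle above can be sketched with `ArrayPool<byte>` alone, without
    any LZ4 code. This is a minimal illustration, not the driver's
    implementation: `BufferLifecycleSketch` and `DemoPool` are hypothetical
    names, and the pool settings simply mirror the ones described in this
    commit. Whether a second `Rent` hands back the very same array is an
    implementation detail of the pool, so the sketch reports it rather than
    relying on it.
    
    ```csharp
    using System;
    using System.Buffers;
    
    // Hypothetical demo class; not part of the driver.
    public static class BufferLifecycleSketch
    {
        private const int FourMb = 4 * 1024 * 1024;
    
        // Same settings as the pool described in this commit.
        private static readonly ArrayPool<byte> DemoPool =
            ArrayPool<byte>.Create(maxArrayLength: FourMb, maxArraysPerBucket: 10);
    
        // Steps 1-2: rent a buffer, use it, and always return it.
        public static int RentUseReturn()
        {
            byte[] buffer = DemoPool.Rent(FourMb); // at least FourMb bytes long
            try
            {
                buffer[0] = 1; // stand-in for decompression work
                return buffer.Length;
            }
            finally
            {
                DemoPool.Return(buffer, clearArray: true);
            }
        }
    
        // Step 3: reports whether the next rent reuses the returned array.
        public static bool SecondRentReusesFirst()
        {
            byte[] first = DemoPool.Rent(FourMb);
            DemoPool.Return(first);
            byte[] second = DemoPool.Rent(FourMb);
            bool reused = ReferenceEquals(first, second);
            DemoPool.Return(second);
            return reused;
        }
    
        public static void Main()
        {
            Console.WriteLine($"Rented length: {RentUseReturn()}");
            Console.WriteLine($"Reused on second rent: {SecondRentReusesFirst()}");
        }
    }
    ```
    
    Returning the buffer in a `finally` block keeps the pool populated even
    when the work in between throws.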
    
    ## Concurrency Considerations
    
    | parallel_downloads | Buffers Needed | Pool Sufficient? |
    |-------------------|----------------|------------------|
    | 1 (default) | 1-2 × 4MB | ✅ Yes |
    | 4 | 4-8 × 4MB | ✅ Yes |
    | 8 | 8-16 × 4MB | ⚠️ Borderline |
    | 16+ | 16-32 × 4MB | ❌ No (exceeds pool capacity) |
    
    **Recommendation**: If using `parallel_downloads > 4`, consider
    increasing `maxArraysPerBucket` in a future enhancement.
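    
    The table above follows a simple rule: each parallel download is assumed
    to hold one to two 4MB buffers, and the pool retains at most 10. A rough
    sketch of that rule is shown below; `PoolFit` and `PoolSizingSketch` are
    hypothetical names, and the 1-2 buffers-per-download estimate is taken
    from the table rather than measured here.
    
    ```csharp
    using System;
    
    // Hypothetical classification mirroring the table's three outcomes.
    public enum PoolFit { Sufficient, Borderline, Insufficient }
    
    public static class PoolSizingSketch
    {
        public const int PoolCapacity = 10; // maxArraysPerBucket in this commit
    
        public static PoolFit Classify(int parallelDownloads, int poolCapacity = PoolCapacity)
        {
            if (parallelDownloads < 1)
                throw new ArgumentOutOfRangeException(nameof(parallelDownloads));
    
            int minBuffers = parallelDownloads;     // one buffer per download
            int maxBuffers = 2 * parallelDownloads; // two buffers per download
    
            if (maxBuffers <= poolCapacity) return PoolFit.Sufficient;
            if (minBuffers <= poolCapacity) return PoolFit.Borderline;
            return PoolFit.Insufficient;
        }
    
        public static void Main()
        {
            foreach (int p in new[] { 1, 4, 8, 16 })
            {
                Console.WriteLine($"{p}: {Classify(p)}");
            }
        }
    }
    ```
    
    An undersized pool does not cause failures: `Rent` falls back to a fresh
    allocation, and arrays returned beyond the pool's capacity are dropped
    for the GC to collect, so the cost is lost reuse rather than an error.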
    
    ## Files Changed
    
    ### New Files
    - `src/Drivers/Databricks/CustomLZ4FrameReader.cs` (~80 lines)
    - `src/Drivers/Databricks/CustomLZ4DecoderStream.cs` (~118 lines)
    
    ### Modified Files
    - `src/Drivers/Databricks/Lz4Utilities.cs` - Use
    `CustomLZ4DecoderStream`, add telemetry
    
    ## Testing & Validation
    Before:
    <img width="1097" height="235" alt="image" src="https://github.com/user-attachments/assets/9257d331-383a-4baf-b2de-749fe77eb8d0" />
    
    | Method | ReadDelayMs | Mean | Min | Max | Median | Peak Memory (MB) | Gen0 | Gen1 | Gen2 | Allocated |
    |------------------ |------------ |--------:|--------:|--------:|--------:|----------------------------:|------------:|-----------:|-----------:|----------:|
    | ExecuteLargeQuery | 5 | 15.95 s | 14.99 s | 16.64 s | 16.21 s | See previous console output | 364000.0000 | 63000.0000 | 38000.0000 | 2.73 GB |
    
    After:
    <img width="975" height="477" alt="image" src="https://github.com/user-attachments/assets/51168c79-d0b3-4def-a934-d7abb635b7aa" />
    
    | Method | ReadDelayMs | Mean | Median | Min | Max | Peak Memory (MB) | Gen0 | Gen1 | Gen2 | Allocated |
    |------------------ |------------ |--------:|--------:|--------:|--------:|----------------------------:|------------:|-----------:|-----------:|----------:|
    | ExecuteLargeQuery | 5 | 25.00 s | 19.71 s | 19.70 s | 35.57 s | See previous console output | 405000.0000 | 30000.0000 | 24000.0000 | 1.94 GB |
    
    
    
    
    ## References
    
    - [K4os.Compression.LZ4](https://github.com/MiloszKrajewski/K4os.Compression.LZ4)
    - [LZ4 Frame Format Spec](https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md)
    - [.NET ArrayPool Docs](https://learn.microsoft.com/en-us/dotnet/api/system.buffers.arraypool-1)
    - [LOH Best Practices](https://learn.microsoft.com/en-us/dotnet/standard/garbage-collection/large-object-heap)
    
    ---------
    
    Co-authored-by: Claude <[email protected]>
---
 .../Drivers/Databricks/CustomLZ4DecoderStream.cs   | 152 +++++++++++++++++++++
 .../src/Drivers/Databricks/CustomLZ4FrameReader.cs |  80 +++++++++++
 .../src/Drivers/Databricks/DatabricksConnection.cs |  19 ++-
 .../src/Drivers/Databricks/DatabricksDatabase.cs   |  11 +-
 csharp/src/Drivers/Databricks/Lz4Utilities.cs      |  38 +++++-
 .../Reader/CloudFetch/CloudFetchDownloader.cs      |   9 +-
 .../Drivers/Databricks/Reader/DatabricksReader.cs  |   4 +-
 7 files changed, 300 insertions(+), 13 deletions(-)

diff --git a/csharp/src/Drivers/Databricks/CustomLZ4DecoderStream.cs b/csharp/src/Drivers/Databricks/CustomLZ4DecoderStream.cs
new file mode 100644
index 000000000..2e70ca10b
--- /dev/null
+++ b/csharp/src/Drivers/Databricks/CustomLZ4DecoderStream.cs
@@ -0,0 +1,152 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+using K4os.Compression.LZ4.Encoders;
+using K4os.Compression.LZ4.Streams;
+
+namespace Apache.Arrow.Adbc.Drivers.Databricks
+{
+    /// <summary>
+    /// Custom LZ4 decoder stream that uses CustomLZ4FrameReader for buffer pooling.
+    /// This replaces K4os.Compression.LZ4.Streams.LZ4DecoderStream to use our custom reader
+    /// that pools 4MB+ buffers.
+    ///
+    /// Why not inherit from LZ4DecoderStream or LZ4StreamOnStreamEssentials?
+    /// - LZ4DecoderStream directly instantiates StreamLZ4FrameReader (no injection point)
+    /// - LZ4StreamOnStreamEssentials has a 'private protected' constructor (inaccessible from external assemblies)
+    ///
+    /// What features from K4os base classes are intentionally omitted:
+    /// - Timeout support: Not needed since inner stream (MemoryStream) doesn't support timeouts
+    /// - Write operations: This is a read-only decompression stream
+    /// - DisposeAsync: Optional - base Stream.DisposeAsync() calls our Dispose(bool) which is sufficient
+    /// </summary>
+    internal sealed class CustomLZ4DecoderStream : Stream
+    {
+        private readonly CustomLZ4FrameReader _reader;
+        private readonly Stream _inner;
+        private readonly bool _leaveOpen;
+        private readonly bool _interactive;
+        private bool _disposed;
+
+        /// <summary>
+        /// Creates a new CustomLZ4DecoderStream instance.
+        /// </summary>
+        /// <param name="inner">The inner stream containing compressed LZ4 data.</param>
+        /// <param name="decoderFactory">Factory function to create the LZ4 decoder.</param>
+        /// <param name="bufferPool">The ArrayPool to use for buffer allocation (from DatabricksDatabase).</param>
+        /// <param name="leaveOpen">Whether to leave the inner stream open when disposing.</param>
+        /// <param name="interactive">Interactive mode - provide bytes as soon as available.</param>
+        public CustomLZ4DecoderStream(
+            Stream inner,
+            Func<ILZ4Descriptor, ILZ4Decoder> decoderFactory,
+            System.Buffers.ArrayPool<byte> bufferPool,
+            bool leaveOpen = false,
+            bool interactive = false)
+        {
+            _inner = inner ?? throw new ArgumentNullException(nameof(inner));
+            _reader = new CustomLZ4FrameReader(inner, true, decoderFactory, bufferPool);
+            _leaveOpen = leaveOpen;
+            _interactive = interactive;
+        }
+
+        public override bool CanRead => !_disposed && _inner.CanRead;
+        public override bool CanSeek => false;
+        public override bool CanWrite => false;
+
+        // Timeout properties are not implemented since:
+        // - The inner stream (MemoryStream in our use case) doesn't support timeouts
+        // - LZ4 decompression is CPU-bound, not I/O-bound, so timeouts don't apply
+        public override bool CanTimeout => false;
+        public override int ReadTimeout
+        {
+            get => throw new InvalidOperationException("LZ4 decoder stream does not support timeouts");
+            set => throw new InvalidOperationException("LZ4 decoder stream does not support timeouts");
+        }
+        public override int WriteTimeout
+        {
+            get => throw new InvalidOperationException("LZ4 decoder stream does not support timeouts");
+            set => throw new InvalidOperationException("LZ4 decoder stream does not support timeouts");
+        }
+
+        public override long Length => _reader.GetFrameLength() ?? -1;
+        public override long Position
+        {
+            get => _reader.GetBytesRead();
+            set => throw new NotSupportedException("LZ4 stream does not support setting position");
+        }
+
+        public override long Seek(long offset, SeekOrigin origin) =>
+            throw new NotSupportedException("LZ4 stream does not support seeking");
+
+        public override void SetLength(long value) =>
+            throw new NotSupportedException("LZ4 stream does not support SetLength");
+
+        public override void Write(byte[] buffer, int offset, int count) =>
+            throw new NotSupportedException("LZ4 decoder stream does not support writing");
+
+        public override int ReadByte() => _reader.ReadOneByte();
+
+        public override int Read(byte[] buffer, int offset, int count) =>
+            _reader.ReadManyBytes(buffer.AsSpan(offset, count), _interactive);
+
+        public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>
+            _reader.ReadManyBytesAsync(cancellationToken, buffer.AsMemory(offset, count), _interactive);
+
+#if NETSTANDARD2_1_OR_GREATER || NET5_0_OR_GREATER
+        public override int Read(Span<byte> buffer) =>
+            _reader.ReadManyBytes(buffer, _interactive);
+
+        public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default) =>
+            new(_reader.ReadManyBytesAsync(cancellationToken, buffer, _interactive));
+#endif
+
+        public override void Flush()
+        {
+            // No-op for read-only stream - nothing to flush since we only read
+        }
+
+        public override Task FlushAsync(CancellationToken cancellationToken)
+        {
+            // No-op for read-only stream - nothing to flush since we only read
+            return Task.CompletedTask;
+        }
+
+        protected override void Dispose(bool disposing)
+        {
+            // Double-dispose protection: only dispose once
+            if (!_disposed)
+            {
+                if (disposing)
+                {
+                    // Dispose managed resources
+                    _reader.Dispose();  // Returns 4MB buffer to pool
+                    if (!_leaveOpen)
+                    {
+                        _inner?.Dispose();  // Dispose inner stream if we own it
+                    }
+                }
+                // No unmanaged resources to clean up (no finalizer needed)
+                _disposed = true;
+            }
+            base.Dispose(disposing);
+        }
+    }
+}
diff --git a/csharp/src/Drivers/Databricks/CustomLZ4FrameReader.cs b/csharp/src/Drivers/Databricks/CustomLZ4FrameReader.cs
new file mode 100644
index 000000000..635be2e6f
--- /dev/null
+++ b/csharp/src/Drivers/Databricks/CustomLZ4FrameReader.cs
@@ -0,0 +1,80 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using System.Buffers;
+using System.IO;
+using K4os.Compression.LZ4.Encoders;
+using K4os.Compression.LZ4.Streams;
+using K4os.Compression.LZ4.Streams.Frames;
+using K4os.Compression.LZ4.Streams.Internal;
+
+namespace Apache.Arrow.Adbc.Drivers.Databricks
+{
+    /// <summary>
+    /// Custom LZ4 frame reader that uses a custom ArrayPool to support pooling of 4MB+ buffers.
+    /// This solves the issue where Databricks LZ4 frames declare maxBlockSize=4MB but .NET's
+    /// default ArrayPool only pools buffers up to 1MB, causing 900MB of fresh LOH allocations.
+    /// </summary>
+    internal sealed class CustomLZ4FrameReader : StreamLZ4FrameReader
+    {
+        private readonly ArrayPool<byte> _bufferPool;
+
+        /// <summary>
+        /// Creates a new CustomLZ4FrameReader instance.
+        /// </summary>
+        /// <param name="stream">The stream to read compressed LZ4 data from.</param>
+        /// <param name="leaveOpen">Whether to leave the stream open when disposing.</param>
+        /// <param name="decoderFactory">Factory function to create the LZ4 decoder.</param>
+        /// <param name="bufferPool">The ArrayPool to use for buffer allocation (from DatabricksDatabase).</param>
+        public CustomLZ4FrameReader(
+            Stream stream,
+            bool leaveOpen,
+            Func<ILZ4Descriptor, ILZ4Decoder> decoderFactory,
+            ArrayPool<byte> bufferPool)
+            : base(stream, leaveOpen, decoderFactory)
+        {
+            _bufferPool = bufferPool;
+        }
+
+        /// <summary>
+        /// Overrides buffer allocation to use our custom ArrayPool that supports 4MB+ buffers.
+        /// </summary>
+        /// <param name="size">The size of buffer to allocate (typically 4MB for Databricks).</param>
+        /// <returns>A buffer of at least the requested size, pooled if possible.</returns>
+        protected override byte[] AllocBuffer(int size)
+        {
+            // Use our custom pool instead of the default BufferPool (which uses ArrayPool.Shared with 1MB limit)
+            return _bufferPool.Rent(size);
+        }
+
+        /// <summary>
+        /// Overrides buffer release to return buffers to our custom ArrayPool.
+        /// </summary>
+        /// <param name="buffer">The buffer to return to the pool.</param>
+        protected override void ReleaseBuffer(byte[] buffer)
+        {
+            if (buffer != null)
+            {
+                // Clear the buffer to prevent stale data from previous decompressions
+                // from corrupting subsequent operations. The performance overhead (~1-2ms
+                // per 4MB buffer) is negligible compared to network I/O and decompression time.
+                _bufferPool.Return(buffer, clearArray: true);
+            }
+        }
+    }
+}
diff --git a/csharp/src/Drivers/Databricks/DatabricksConnection.cs b/csharp/src/Drivers/Databricks/DatabricksConnection.cs
index 74e43802b..2d0e687f5 100644
--- a/csharp/src/Drivers/Databricks/DatabricksConnection.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksConnection.cs
@@ -97,8 +97,25 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
 
         private HttpClient? _authHttpClient;
 
-        public DatabricksConnection(IReadOnlyDictionary<string, string> properties) : base(MergeWithDefaultEnvironmentConfig(properties))
+        /// <summary>
+        /// LZ4 buffer pool for decompression.
+        /// If provided by Database, this is shared across all connections for optimal pooling.
+        /// If created directly, each connection has its own pool.
+        /// </summary>
+        internal System.Buffers.ArrayPool<byte> Lz4BufferPool { get; }
+
+        public DatabricksConnection(IReadOnlyDictionary<string, string> properties)
+            : this(properties, null)
+        {
+        }
+
+        internal DatabricksConnection(
+            IReadOnlyDictionary<string, string> properties,
+            System.Buffers.ArrayPool<byte>? lz4BufferPool)
+            : base(MergeWithDefaultEnvironmentConfig(properties))
         {
+            // Use provided pool (from Database) or create new instance (for direct construction)
+            Lz4BufferPool = lz4BufferPool ?? System.Buffers.ArrayPool<byte>.Create(maxArrayLength: 4 * 1024 * 1024, maxArraysPerBucket: 10);
             ValidateProperties();
         }
 
diff --git a/csharp/src/Drivers/Databricks/DatabricksDatabase.cs b/csharp/src/Drivers/Databricks/DatabricksDatabase.cs
index 933c6d382..8917d1616 100644
--- a/csharp/src/Drivers/Databricks/DatabricksDatabase.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksDatabase.cs
@@ -29,6 +29,14 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
     {
         readonly IReadOnlyDictionary<string, string> properties;
 
+        /// <summary>
+        /// LZ4 buffer pool for decompression shared across all connections from this database.
+        /// Sized for 4MB buffers (Databricks maxBlockSize) with capacity for 10 buffers.
+        /// This pool is instance-based to allow cleanup when the database is disposed.
+        /// </summary>
+        internal readonly System.Buffers.ArrayPool<byte> Lz4BufferPool =
+            System.Buffers.ArrayPool<byte>.Create(maxArrayLength: 4 * 1024 * 1024, maxArraysPerBucket: 10);
+
         public DatabricksDatabase(IReadOnlyDictionary<string, string> properties)
         {
             this.properties = properties;
@@ -43,7 +51,8 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
                     : options
                         .Concat(properties.Where(x => !options.Keys.Contains(x.Key, StringComparer.OrdinalIgnoreCase)))
                         .ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
-                DatabricksConnection connection = new DatabricksConnection(mergedProperties);
+                // Share the LZ4 buffer pool with this connection via constructor
+                DatabricksConnection connection = new DatabricksConnection(mergedProperties, this.Lz4BufferPool);
                 connection.OpenAsync().Wait();
                 connection.ApplyServerSidePropertiesAsync().Wait();
                 return connection;
diff --git a/csharp/src/Drivers/Databricks/Lz4Utilities.cs b/csharp/src/Drivers/Databricks/Lz4Utilities.cs
index 127a890c3..0b9f13d90 100644
--- a/csharp/src/Drivers/Databricks/Lz4Utilities.cs
+++ b/csharp/src/Drivers/Databricks/Lz4Utilities.cs
@@ -16,15 +16,18 @@
 */
 
 using System;
+using System.Buffers;
 using System.IO;
 using System.Threading;
 using System.Threading.Tasks;
+using K4os.Compression.LZ4.Encoders;
 using K4os.Compression.LZ4.Streams;
 
 namespace Apache.Arrow.Adbc.Drivers.Databricks
 {
     /// <summary>
     /// Utility class for LZ4 compression/decompression operations.
+    /// Uses CustomLZ4DecoderStream with custom buffer pooling to handle Databricks' 4MB LZ4 frames efficiently.
     /// </summary>
     internal static class Lz4Utilities
     {
@@ -37,28 +40,36 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
         /// Decompresses LZ4 compressed data into memory.
         /// </summary>
         /// <param name="compressedData">The compressed data bytes.</param>
+        /// <param name="bufferPool">The ArrayPool to use for buffer allocation (from DatabricksDatabase).</param>
         /// <returns>A ReadOnlyMemory containing the decompressed data.</returns>
         /// <exception cref="AdbcException">Thrown when decompression fails.</exception>
-        public static ReadOnlyMemory<byte> DecompressLz4(byte[] compressedData)
+        public static ReadOnlyMemory<byte> DecompressLz4(byte[] compressedData, ArrayPool<byte> bufferPool)
         {
-            return DecompressLz4(compressedData, DefaultBufferSize);
+            return DecompressLz4(compressedData, DefaultBufferSize, bufferPool);
         }
 
         /// <summary>
         /// Decompresses LZ4 compressed data into memory with a specified buffer size.
+        /// Uses CustomLZ4DecoderStream with custom ArrayPool to efficiently pool 4MB+ buffers.
         /// </summary>
         /// <param name="compressedData">The compressed data bytes.</param>
         /// <param name="bufferSize">The buffer size to use for decompression operations.</param>
+        /// <param name="bufferPool">The ArrayPool to use for buffer allocation (from DatabricksDatabase).</param>
         /// <returns>A ReadOnlyMemory containing the decompressed data.</returns>
         /// <exception cref="AdbcException">Thrown when decompression fails.</exception>
-        public static ReadOnlyMemory<byte> DecompressLz4(byte[] compressedData, int bufferSize)
+        public static ReadOnlyMemory<byte> DecompressLz4(byte[] compressedData, int bufferSize, ArrayPool<byte> bufferPool)
         {
             try
             {
                 using (var outputStream = new MemoryStream())
                 {
                     using (var inputStream = new MemoryStream(compressedData))
-                    using (var decompressor = LZ4Stream.Decode(inputStream))
+                    using (var decompressor = new CustomLZ4DecoderStream(
+                        inputStream,
+                        descriptor => descriptor.CreateDecoder(),
+                        bufferPool,
+                        leaveOpen: false,
+                        interactive: false))
                     {
                         decompressor.CopyTo(outputStream, bufferSize);
                     }
@@ -66,7 +77,9 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
                     // Get the underlying buffer and its valid length without copying
                     // The buffer remains valid after MemoryStream disposal since we hold a reference to it
                     byte[] buffer = outputStream.GetBuffer();
-                    return new ReadOnlyMemory<byte>(buffer, 0, (int)outputStream.Length);
+                    int length = (int)outputStream.Length;
+
+                    return new ReadOnlyMemory<byte>(buffer, 0, length);
                 }
             }
             catch (Exception ex)
@@ -80,28 +93,33 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
         /// Returns the buffer and length as a tuple for efficient wrapping in a MemoryStream.
         /// </summary>
         /// <param name="compressedData">The compressed data bytes.</param>
+        /// <param name="bufferPool">The ArrayPool to use for buffer allocation (from DatabricksDatabase).</param>
         /// <param name="cancellationToken">Cancellation token for the async operation.</param>
         /// <returns>A tuple containing the decompressed buffer and its valid length.</returns>
         /// <exception cref="AdbcException">Thrown when decompression fails.</exception>
         public static Task<(byte[] buffer, int length)> DecompressLz4Async(
             byte[] compressedData,
+            ArrayPool<byte> bufferPool,
             CancellationToken cancellationToken = default)
         {
-            return DecompressLz4Async(compressedData, DefaultBufferSize, cancellationToken);
+            return DecompressLz4Async(compressedData, DefaultBufferSize, bufferPool, cancellationToken);
         }
 
         /// <summary>
         /// Asynchronously decompresses LZ4 compressed data into memory with a specified buffer size.
         /// Returns the buffer and length as a tuple for efficient wrapping in a MemoryStream.
+        /// Uses CustomLZ4DecoderStream with custom ArrayPool to efficiently pool 4MB+ buffers.
         /// </summary>
         /// <param name="compressedData">The compressed data bytes.</param>
         /// <param name="bufferSize">The buffer size to use for decompression operations.</param>
+        /// <param name="bufferPool">The ArrayPool to use for buffer allocation (from DatabricksDatabase).</param>
         /// <param name="cancellationToken">Cancellation token for the async operation.</param>
         /// <returns>A tuple containing the decompressed buffer and its valid length.</returns>
         /// <exception cref="AdbcException">Thrown when decompression fails.</exception>
         public static async Task<(byte[] buffer, int length)> DecompressLz4Async(
             byte[] compressedData,
             int bufferSize,
+            ArrayPool<byte> bufferPool,
             CancellationToken cancellationToken = default)
         {
             try
@@ -109,7 +127,12 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
                 using (var outputStream = new MemoryStream())
                 {
                     using (var inputStream = new MemoryStream(compressedData))
-                    using (var decompressor = LZ4Stream.Decode(inputStream))
+                    using (var decompressor = new CustomLZ4DecoderStream(
+                        inputStream,
+                        descriptor => descriptor.CreateDecoder(),
+                        bufferPool,
+                        leaveOpen: false,
+                        interactive: false))
                     {
                         await decompressor.CopyToAsync(outputStream, bufferSize, cancellationToken).ConfigureAwait(false);
                     }
@@ -118,6 +141,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
                     // The buffer remains valid after MemoryStream disposal since we hold a reference to it
                     byte[] buffer = outputStream.GetBuffer();
                     int length = (int)outputStream.Length;
+
                     return (buffer, length);
                 }
             }
diff --git a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs
index 8fee61d92..913d177df 100644
--- a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs
+++ b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs
@@ -33,7 +33,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
     /// </summary>
     internal sealed class CloudFetchDownloader : ICloudFetchDownloader, IActivityTracer
     {
-        private readonly ITracingStatement _statement;
+        private readonly IHiveServer2Statement _statement;
         private readonly BlockingCollection<IDownloadResult> _downloadQueue;
         private readonly BlockingCollection<IDownloadResult> _resultQueue;
         private readonly ICloudFetchMemoryBufferManager _memoryManager;
@@ -55,7 +55,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
         /// <summary>
         /// Initializes a new instance of the <see cref="CloudFetchDownloader"/> class.
         /// </summary>
-        /// <param name="statement">The tracing statement for Activity context.</param>
+        /// <param name="statement">The HiveServer2 statement for Activity context.</param>
         /// <param name="downloadQueue">The queue of downloads to process.</param>
         /// <param name="resultQueue">The queue to add completed downloads to.</param>
         /// <param name="memoryManager">The memory buffer manager.</param>
@@ -68,7 +68,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
         /// <param name="maxUrlRefreshAttempts">The maximum number of URL refresh attempts.</param>
         /// <param name="urlExpirationBufferSeconds">Buffer time in seconds before URL expiration to trigger refresh.</param>
         public CloudFetchDownloader(
-            ITracingStatement statement,
+            IHiveServer2Statement statement,
             BlockingCollection<IDownloadResult> downloadQueue,
             BlockingCollection<IDownloadResult> resultQueue,
             ICloudFetchMemoryBufferManager memoryManager,
@@ -494,8 +494,11 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
                         var decompressStopwatch = Stopwatch.StartNew();
 
                         // Use shared Lz4Utilities for decompression (consolidates logic with non-CloudFetch path)
+                        // Pass the connection's buffer pool for efficient LZ4 decompression
+                        var connection = (DatabricksConnection)_statement.Connection;
                         var (buffer, length) = await Lz4Utilities.DecompressLz4Async(
                             fileData,
+                            connection.Lz4BufferPool,
                             cancellationToken).ConfigureAwait(false);
 
                         // Create the dataStream from the decompressed buffer
diff --git a/csharp/src/Drivers/Databricks/Reader/DatabricksReader.cs b/csharp/src/Drivers/Databricks/Reader/DatabricksReader.cs
index dc69f0f7a..2fc6268b0 100644
--- a/csharp/src/Drivers/Databricks/Reader/DatabricksReader.cs
+++ b/csharp/src/Drivers/Databricks/Reader/DatabricksReader.cs
@@ -121,7 +121,9 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader
                 // If LZ4 compression is enabled, decompress the data
                 if (isLz4Compressed)
                 {
-                    dataToUse = Lz4Utilities.DecompressLz4(batch.Batch);
+                    // Pass the connection's buffer pool for efficient LZ4 decompression
+                    var connection = (DatabricksConnection)this.statement.Connection;
+                    dataToUse = Lz4Utilities.DecompressLz4(batch.Batch, connection.Lz4BufferPool);
                 }
 
                 this.reader = new SingleBatch(ArrowSerializationHelpers.DeserializeRecordBatch(this.schema, dataToUse));
