This is an automated email from the ASF dual-hosted git repository.
curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new ac4b890e2 fix(csharp/src/Drivers/Databricks): Reduce LZ4 decompression
memory by using Custom Array Pool (#3654)
ac4b890e2 is described below
commit ac4b890e24a51e60609887cc400b1c2ddc43797b
Author: eric-wang-1990 <[email protected]>
AuthorDate: Fri Nov 7 10:26:48 2025 -0800
fix(csharp/src/Drivers/Databricks): Reduce LZ4 decompression memory by
using Custom Array Pool (#3654)
Fix: Reduce Lz4 decompression memory by using Customize ArrayPool.
## Summary
Reduces LZ4 internal buffer memory allocation from ~900MB to ~40MB (96%
reduction) for large Databricks query results by implementing a custom
ArrayPool that supports buffer sizes larger than .NET's default 1MB
limit.
**Important**: This optimization primarily reduces:
- **Total allocations**: 222 × 4MB → reuse of 10 pooled buffers
- **GC pressure**: Fewer LOH allocations → fewer Gen2 collections
But does NOT significantly reduce:
- **Peak concurrent memory**: With `parallelDownloads=1`, peak is still
~8-16MB (1-2 buffers in use)
## Solution
Created a custom ArrayPool by overriding K4os.Compression.LZ4's buffer
allocation methods:
1. **CustomLZ4FrameReader.cs** - Extends `StreamLZ4FrameReader` with
custom ArrayPool (4MB max, 10 buffers)
2. **CustomLZ4DecoderStream.cs** - Stream wrapper using
`CustomLZ4FrameReader`
3. **Updated Lz4Utilities.cs** - Use `CustomLZ4DecoderStream` instead of
default `LZ4Stream.Decode()`
### Key Implementation
```csharp
// CustomLZ4FrameReader.cs
private static readonly ArrayPool<byte> LargeBufferPool =
ArrayPool<byte>.Create(
maxArrayLength: 4 * 1024 * 1024, // 4MB (matches Databricks'
maxBlockSize)
maxArraysPerBucket: 10 // Pool capacity: 10 × 4MB =
40MB
);
protected override byte[] AllocBuffer(int size)
{
return LargeBufferPool.Rent(size);
}
protected override void ReleaseBuffer(byte[] buffer)
{
if (buffer != null)
{
LargeBufferPool.Return(buffer, clearArray: false);
}
}
```
### Performance
- **CPU**: No degradation (pooling reduces allocation overhead)
- **GC**: Significantly reduced Gen2 collections (fewer LOH allocations)
- **Latency**: Slight improvement (buffer reuse faster than fresh
allocation)
## Why This Works
**K4os Library Design**:
- `LZ4FrameReader` has `virtual` methods: `AllocBuffer()` and
`ReleaseBuffer()`
- Default implementation calls `BufferPool.Alloc()` → `DefaultArrayPool`
(1MB limit)
- Overriding allows injection of custom 4MB pool
**Buffer Lifecycle**:
1. Decompression needs 4MB buffer → Rent from pool
2. Decompression completes → Return to pool
3. Next decompression → Reuse buffer from pool
4. With `parallelDownloads=1` (default), only 1-2 buffers active at once
## Concurrency Considerations
| parallel_downloads | Buffers Needed | Pool Sufficient? |
|-------------------|----------------|------------------|
| 1 (default) | 1-2 × 4MB | ✅ Yes |
| 4 | 4-8 × 4MB | ✅ Yes |
| 8 | 8-16 × 4MB | ⚠️ Borderline |
| 16+ | 16-32 × 4MB | ❌ No (exceeds pool capacity) |
**Recommendation**: If using `parallel_downloads > 4`, consider
increasing `maxArraysPerBucket` in future enhancement.
## Files Changed
### New Files
- `src/Drivers/Databricks/CustomLZ4FrameReader.cs` (~80 lines)
- `src/Drivers/Databricks/CustomLZ4DecoderStream.cs` (~118 lines)
### Modified Files
- `src/Drivers/Databricks/Lz4Utilities.cs` - Use
`CustomLZ4DecoderStream`, add telemetry
## Testing & Validation
Before:
<img width="1097" height="235" alt="image"
src="https://github.com/user-attachments/assets/9257d331-383a-4baf-b2de-749fe77eb8d0"
/>
| Method | ReadDelayMs | Mean | Min | Max | Median | Peak Memory (MB) |
Gen0 | Gen1 | Gen2 | Allocated |
|------------------ |------------
|--------:|--------:|--------:|--------:|----------------------------:|------------:|-----------:|-----------:|----------:|
| ExecuteLargeQuery | 5 | 15.95 s | 14.99 s | 16.64 s | 16.21 s | See
previous console output | 364000.0000 | 63000.0000 | 38000.0000 | 2.73
GB |
After:
<img width="975" height="477" alt="image"
src="https://github.com/user-attachments/assets/51168c79-d0b3-4def-a934-d7abb635b7aa"
/>'
| Method | ReadDelayMs | Mean | Median | Min | Max | Peak Memory (MB) |
Gen0 | Gen1 | Gen2 | Allocated |
|------------------ |------------
|--------:|--------:|--------:|--------:|----------------------------:|------------:|-----------:|-----------:|----------:|
| ExecuteLargeQuery | 5 | 25.00 s | 19.71 s | 19.70 s | 35.57 s | See
previous console output | 405000.0000 | 30000.0000 | 24000.0000 | 1.94
GB |
## References
-
[K4os.Compression.LZ4](https://github.com/MiloszKrajewski/K4os.Compression.LZ4)
- [LZ4 Frame Format
Spec](https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md)
- [.NET ArrayPool
Docs](https://learn.microsoft.com/en-us/dotnet/api/system.buffers.arraypool-1)
- [LOH Best
Practices](https://learn.microsoft.com/en-us/dotnet/standard/garbage-collection/large-object-heap)
---------
Co-authored-by: Claude <[email protected]>
---
.../Drivers/Databricks/CustomLZ4DecoderStream.cs | 152 +++++++++++++++++++++
.../src/Drivers/Databricks/CustomLZ4FrameReader.cs | 80 +++++++++++
.../src/Drivers/Databricks/DatabricksConnection.cs | 19 ++-
.../src/Drivers/Databricks/DatabricksDatabase.cs | 11 +-
csharp/src/Drivers/Databricks/Lz4Utilities.cs | 38 +++++-
.../Reader/CloudFetch/CloudFetchDownloader.cs | 9 +-
.../Drivers/Databricks/Reader/DatabricksReader.cs | 4 +-
7 files changed, 300 insertions(+), 13 deletions(-)
diff --git a/csharp/src/Drivers/Databricks/CustomLZ4DecoderStream.cs
b/csharp/src/Drivers/Databricks/CustomLZ4DecoderStream.cs
new file mode 100644
index 000000000..2e70ca10b
--- /dev/null
+++ b/csharp/src/Drivers/Databricks/CustomLZ4DecoderStream.cs
@@ -0,0 +1,152 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+using K4os.Compression.LZ4.Encoders;
+using K4os.Compression.LZ4.Streams;
+
+namespace Apache.Arrow.Adbc.Drivers.Databricks
+{
+ /// <summary>
+ /// Custom LZ4 decoder stream that uses CustomLZ4FrameReader for buffer
pooling.
+ /// This replaces K4os.Compression.LZ4.Streams.LZ4DecoderStream to use our
custom reader
+ /// that pools 4MB+ buffers.
+ ///
+ /// Why not inherit from LZ4DecoderStream or LZ4StreamOnStreamEssentials?
+ /// - LZ4DecoderStream directly instantiates StreamLZ4FrameReader (no
injection point)
+ /// - LZ4StreamOnStreamEssentials has a 'private protected' constructor
(inaccessible from external assemblies)
+ ///
+ /// What features from K4os base classes are intentionally omitted:
+ /// - Timeout support: Not needed since inner stream (MemoryStream)
doesn't support timeouts
+ /// - Write operations: This is a read-only decompression stream
+ /// - DisposeAsync: Optional - base Stream.DisposeAsync() calls our
Dispose(bool) which is sufficient
+ /// </summary>
+ internal sealed class CustomLZ4DecoderStream : Stream
+ {
+ private readonly CustomLZ4FrameReader _reader;
+ private readonly Stream _inner;
+ private readonly bool _leaveOpen;
+ private readonly bool _interactive;
+ private bool _disposed;
+
+ /// <summary>
+ /// Creates a new CustomLZ4DecoderStream instance.
+ /// </summary>
+ /// <param name="inner">The inner stream containing compressed LZ4
data.</param>
+ /// <param name="decoderFactory">Factory function to create the LZ4
decoder.</param>
+ /// <param name="bufferPool">The ArrayPool to use for buffer
allocation (from DatabricksDatabase).</param>
+ /// <param name="leaveOpen">Whether to leave the inner stream open
when disposing.</param>
+ /// <param name="interactive">Interactive mode - provide bytes as soon
as available.</param>
+ public CustomLZ4DecoderStream(
+ Stream inner,
+ Func<ILZ4Descriptor, ILZ4Decoder> decoderFactory,
+ System.Buffers.ArrayPool<byte> bufferPool,
+ bool leaveOpen = false,
+ bool interactive = false)
+ {
+ _inner = inner ?? throw new ArgumentNullException(nameof(inner));
+ _reader = new CustomLZ4FrameReader(inner, true, decoderFactory,
bufferPool);
+ _leaveOpen = leaveOpen;
+ _interactive = interactive;
+ }
+
+ public override bool CanRead => !_disposed && _inner.CanRead;
+ public override bool CanSeek => false;
+ public override bool CanWrite => false;
+
+ // Timeout properties are not implemented since:
+ // - The inner stream (MemoryStream in our use case) doesn't support
timeouts
+ // - LZ4 decompression is CPU-bound, not I/O-bound, so timeouts don't
apply
+ public override bool CanTimeout => false;
+ public override int ReadTimeout
+ {
+ get => throw new InvalidOperationException("LZ4 decoder stream
does not support timeouts");
+ set => throw new InvalidOperationException("LZ4 decoder stream
does not support timeouts");
+ }
+ public override int WriteTimeout
+ {
+ get => throw new InvalidOperationException("LZ4 decoder stream
does not support timeouts");
+ set => throw new InvalidOperationException("LZ4 decoder stream
does not support timeouts");
+ }
+
+ public override long Length => _reader.GetFrameLength() ?? -1;
+ public override long Position
+ {
+ get => _reader.GetBytesRead();
+ set => throw new NotSupportedException("LZ4 stream does not
support setting position");
+ }
+
+ public override long Seek(long offset, SeekOrigin origin) =>
+ throw new NotSupportedException("LZ4 stream does not support
seeking");
+
+ public override void SetLength(long value) =>
+ throw new NotSupportedException("LZ4 stream does not support
SetLength");
+
+ public override void Write(byte[] buffer, int offset, int count) =>
+ throw new NotSupportedException("LZ4 decoder stream does not
support writing");
+
+ public override int ReadByte() => _reader.ReadOneByte();
+
+ public override int Read(byte[] buffer, int offset, int count) =>
+ _reader.ReadManyBytes(buffer.AsSpan(offset, count), _interactive);
+
+ public override Task<int> ReadAsync(byte[] buffer, int offset, int
count, CancellationToken cancellationToken) =>
+ _reader.ReadManyBytesAsync(cancellationToken,
buffer.AsMemory(offset, count), _interactive);
+
+#if NETSTANDARD2_1_OR_GREATER || NET5_0_OR_GREATER
+ public override int Read(Span<byte> buffer) =>
+ _reader.ReadManyBytes(buffer, _interactive);
+
+ public override ValueTask<int> ReadAsync(Memory<byte> buffer,
CancellationToken cancellationToken = default) =>
+ new(_reader.ReadManyBytesAsync(cancellationToken, buffer,
_interactive));
+#endif
+
+ public override void Flush()
+ {
+ // No-op for read-only stream - nothing to flush since we only read
+ }
+
+ public override Task FlushAsync(CancellationToken cancellationToken)
+ {
+ // No-op for read-only stream - nothing to flush since we only read
+ return Task.CompletedTask;
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ // Double-dispose protection: only dispose once
+ if (!_disposed)
+ {
+ if (disposing)
+ {
+ // Dispose managed resources
+ _reader.Dispose(); // Returns 4MB buffer to pool
+ if (!_leaveOpen)
+ {
+ _inner?.Dispose(); // Dispose inner stream if we own
it
+ }
+ }
+ // No unmanaged resources to clean up (no finalizer needed)
+ _disposed = true;
+ }
+ base.Dispose(disposing);
+ }
+ }
+}
diff --git a/csharp/src/Drivers/Databricks/CustomLZ4FrameReader.cs
b/csharp/src/Drivers/Databricks/CustomLZ4FrameReader.cs
new file mode 100644
index 000000000..635be2e6f
--- /dev/null
+++ b/csharp/src/Drivers/Databricks/CustomLZ4FrameReader.cs
@@ -0,0 +1,80 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using System.Buffers;
+using System.IO;
+using K4os.Compression.LZ4.Encoders;
+using K4os.Compression.LZ4.Streams;
+using K4os.Compression.LZ4.Streams.Frames;
+using K4os.Compression.LZ4.Streams.Internal;
+
+namespace Apache.Arrow.Adbc.Drivers.Databricks
+{
+ /// <summary>
+ /// Custom LZ4 frame reader that uses a custom ArrayPool to support
pooling of 4MB+ buffers.
+ /// This solves the issue where Databricks LZ4 frames declare
maxBlockSize=4MB but .NET's
+ /// default ArrayPool only pools buffers up to 1MB, causing 900MB of fresh
LOH allocations.
+ /// </summary>
+ internal sealed class CustomLZ4FrameReader : StreamLZ4FrameReader
+ {
+ private readonly ArrayPool<byte> _bufferPool;
+
+ /// <summary>
+ /// Creates a new CustomLZ4FrameReader instance.
+ /// </summary>
+ /// <param name="stream">The stream to read compressed LZ4 data
from.</param>
+ /// <param name="leaveOpen">Whether to leave the stream open when
disposing.</param>
+ /// <param name="decoderFactory">Factory function to create the LZ4
decoder.</param>
+ /// <param name="bufferPool">The ArrayPool to use for buffer
allocation (from DatabricksDatabase).</param>
+ public CustomLZ4FrameReader(
+ Stream stream,
+ bool leaveOpen,
+ Func<ILZ4Descriptor, ILZ4Decoder> decoderFactory,
+ ArrayPool<byte> bufferPool)
+ : base(stream, leaveOpen, decoderFactory)
+ {
+ _bufferPool = bufferPool;
+ }
+
+ /// <summary>
+ /// Overrides buffer allocation to use our custom ArrayPool that
supports 4MB+ buffers.
+ /// </summary>
+ /// <param name="size">The size of buffer to allocate (typically 4MB
for Databricks).</param>
+ /// <returns>A buffer of at least the requested size, pooled if
possible.</returns>
+ protected override byte[] AllocBuffer(int size)
+ {
+ // Use our custom pool instead of the default BufferPool (which
uses ArrayPool.Shared with 1MB limit)
+ return _bufferPool.Rent(size);
+ }
+
+ /// <summary>
+ /// Overrides buffer release to return buffers to our custom ArrayPool.
+ /// </summary>
+ /// <param name="buffer">The buffer to return to the pool.</param>
+ protected override void ReleaseBuffer(byte[] buffer)
+ {
+ if (buffer != null)
+ {
+ // Clear the buffer to prevent stale data from previous
decompressions
+ // from corrupting subsequent operations. The performance
overhead (~1-2ms
+ // per 4MB buffer) is negligible compared to network I/O and
decompression time.
+ _bufferPool.Return(buffer, clearArray: true);
+ }
+ }
+ }
+}
diff --git a/csharp/src/Drivers/Databricks/DatabricksConnection.cs
b/csharp/src/Drivers/Databricks/DatabricksConnection.cs
index 74e43802b..2d0e687f5 100644
--- a/csharp/src/Drivers/Databricks/DatabricksConnection.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksConnection.cs
@@ -97,8 +97,25 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
private HttpClient? _authHttpClient;
- public DatabricksConnection(IReadOnlyDictionary<string, string>
properties) : base(MergeWithDefaultEnvironmentConfig(properties))
+ /// <summary>
+ /// LZ4 buffer pool for decompression.
+ /// If provided by Database, this is shared across all connections for
optimal pooling.
+ /// If created directly, each connection has its own pool.
+ /// </summary>
+ internal System.Buffers.ArrayPool<byte> Lz4BufferPool { get; }
+
+ public DatabricksConnection(IReadOnlyDictionary<string, string>
properties)
+ : this(properties, null)
+ {
+ }
+
+ internal DatabricksConnection(
+ IReadOnlyDictionary<string, string> properties,
+ System.Buffers.ArrayPool<byte>? lz4BufferPool)
+ : base(MergeWithDefaultEnvironmentConfig(properties))
{
+ // Use provided pool (from Database) or create new instance (for
direct construction)
+ Lz4BufferPool = lz4BufferPool ??
System.Buffers.ArrayPool<byte>.Create(maxArrayLength: 4 * 1024 * 1024,
maxArraysPerBucket: 10);
ValidateProperties();
}
diff --git a/csharp/src/Drivers/Databricks/DatabricksDatabase.cs
b/csharp/src/Drivers/Databricks/DatabricksDatabase.cs
index 933c6d382..8917d1616 100644
--- a/csharp/src/Drivers/Databricks/DatabricksDatabase.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksDatabase.cs
@@ -29,6 +29,14 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
{
readonly IReadOnlyDictionary<string, string> properties;
+ /// <summary>
+ /// LZ4 buffer pool for decompression shared across all connections
from this database.
+ /// Sized for 4MB buffers (Databricks maxBlockSize) with capacity for
10 buffers.
+ /// This pool is instance-based to allow cleanup when the database is
disposed.
+ /// </summary>
+ internal readonly System.Buffers.ArrayPool<byte> Lz4BufferPool =
+ System.Buffers.ArrayPool<byte>.Create(maxArrayLength: 4 * 1024 *
1024, maxArraysPerBucket: 10);
+
public DatabricksDatabase(IReadOnlyDictionary<string, string>
properties)
{
this.properties = properties;
@@ -43,7 +51,8 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
: options
.Concat(properties.Where(x =>
!options.Keys.Contains(x.Key, StringComparer.OrdinalIgnoreCase)))
.ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
- DatabricksConnection connection = new
DatabricksConnection(mergedProperties);
+ // Share the LZ4 buffer pool with this connection via
constructor
+ DatabricksConnection connection = new
DatabricksConnection(mergedProperties, this.Lz4BufferPool);
connection.OpenAsync().Wait();
connection.ApplyServerSidePropertiesAsync().Wait();
return connection;
diff --git a/csharp/src/Drivers/Databricks/Lz4Utilities.cs
b/csharp/src/Drivers/Databricks/Lz4Utilities.cs
index 127a890c3..0b9f13d90 100644
--- a/csharp/src/Drivers/Databricks/Lz4Utilities.cs
+++ b/csharp/src/Drivers/Databricks/Lz4Utilities.cs
@@ -16,15 +16,18 @@
*/
using System;
+using System.Buffers;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
+using K4os.Compression.LZ4.Encoders;
using K4os.Compression.LZ4.Streams;
namespace Apache.Arrow.Adbc.Drivers.Databricks
{
/// <summary>
/// Utility class for LZ4 compression/decompression operations.
+ /// Uses CustomLZ4DecoderStream with custom buffer pooling to handle
Databricks' 4MB LZ4 frames efficiently.
/// </summary>
internal static class Lz4Utilities
{
@@ -37,28 +40,36 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
/// Decompresses LZ4 compressed data into memory.
/// </summary>
/// <param name="compressedData">The compressed data bytes.</param>
+ /// <param name="bufferPool">The ArrayPool to use for buffer
allocation (from DatabricksDatabase).</param>
/// <returns>A ReadOnlyMemory containing the decompressed
data.</returns>
/// <exception cref="AdbcException">Thrown when decompression
fails.</exception>
- public static ReadOnlyMemory<byte> DecompressLz4(byte[] compressedData)
+ public static ReadOnlyMemory<byte> DecompressLz4(byte[]
compressedData, ArrayPool<byte> bufferPool)
{
- return DecompressLz4(compressedData, DefaultBufferSize);
+ return DecompressLz4(compressedData, DefaultBufferSize,
bufferPool);
}
/// <summary>
/// Decompresses LZ4 compressed data into memory with a specified
buffer size.
+ /// Uses CustomLZ4DecoderStream with custom ArrayPool to efficiently
pool 4MB+ buffers.
/// </summary>
/// <param name="compressedData">The compressed data bytes.</param>
/// <param name="bufferSize">The buffer size to use for decompression
operations.</param>
+ /// <param name="bufferPool">The ArrayPool to use for buffer
allocation (from DatabricksDatabase).</param>
/// <returns>A ReadOnlyMemory containing the decompressed
data.</returns>
/// <exception cref="AdbcException">Thrown when decompression
fails.</exception>
- public static ReadOnlyMemory<byte> DecompressLz4(byte[]
compressedData, int bufferSize)
+ public static ReadOnlyMemory<byte> DecompressLz4(byte[]
compressedData, int bufferSize, ArrayPool<byte> bufferPool)
{
try
{
using (var outputStream = new MemoryStream())
{
using (var inputStream = new MemoryStream(compressedData))
- using (var decompressor = LZ4Stream.Decode(inputStream))
+ using (var decompressor = new CustomLZ4DecoderStream(
+ inputStream,
+ descriptor => descriptor.CreateDecoder(),
+ bufferPool,
+ leaveOpen: false,
+ interactive: false))
{
decompressor.CopyTo(outputStream, bufferSize);
}
@@ -66,7 +77,9 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
// Get the underlying buffer and its valid length without
copying
// The buffer remains valid after MemoryStream disposal
since we hold a reference to it
byte[] buffer = outputStream.GetBuffer();
- return new ReadOnlyMemory<byte>(buffer, 0,
(int)outputStream.Length);
+ int length = (int)outputStream.Length;
+
+ return new ReadOnlyMemory<byte>(buffer, 0, length);
}
}
catch (Exception ex)
@@ -80,28 +93,33 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
/// Returns the buffer and length as a tuple for efficient wrapping in
a MemoryStream.
/// </summary>
/// <param name="compressedData">The compressed data bytes.</param>
+ /// <param name="bufferPool">The ArrayPool to use for buffer
allocation (from DatabricksDatabase).</param>
/// <param name="cancellationToken">Cancellation token for the async
operation.</param>
/// <returns>A tuple containing the decompressed buffer and its valid
length.</returns>
/// <exception cref="AdbcException">Thrown when decompression
fails.</exception>
public static Task<(byte[] buffer, int length)> DecompressLz4Async(
byte[] compressedData,
+ ArrayPool<byte> bufferPool,
CancellationToken cancellationToken = default)
{
- return DecompressLz4Async(compressedData, DefaultBufferSize,
cancellationToken);
+ return DecompressLz4Async(compressedData, DefaultBufferSize,
bufferPool, cancellationToken);
}
/// <summary>
/// Asynchronously decompresses LZ4 compressed data into memory with a
specified buffer size.
/// Returns the buffer and length as a tuple for efficient wrapping in
a MemoryStream.
+ /// Uses CustomLZ4DecoderStream with custom ArrayPool to efficiently
pool 4MB+ buffers.
/// </summary>
/// <param name="compressedData">The compressed data bytes.</param>
/// <param name="bufferSize">The buffer size to use for decompression
operations.</param>
+ /// <param name="bufferPool">The ArrayPool to use for buffer
allocation (from DatabricksDatabase).</param>
/// <param name="cancellationToken">Cancellation token for the async
operation.</param>
/// <returns>A tuple containing the decompressed buffer and its valid
length.</returns>
/// <exception cref="AdbcException">Thrown when decompression
fails.</exception>
public static async Task<(byte[] buffer, int length)>
DecompressLz4Async(
byte[] compressedData,
int bufferSize,
+ ArrayPool<byte> bufferPool,
CancellationToken cancellationToken = default)
{
try
@@ -109,7 +127,12 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
using (var outputStream = new MemoryStream())
{
using (var inputStream = new MemoryStream(compressedData))
- using (var decompressor = LZ4Stream.Decode(inputStream))
+ using (var decompressor = new CustomLZ4DecoderStream(
+ inputStream,
+ descriptor => descriptor.CreateDecoder(),
+ bufferPool,
+ leaveOpen: false,
+ interactive: false))
{
await decompressor.CopyToAsync(outputStream,
bufferSize, cancellationToken).ConfigureAwait(false);
}
@@ -118,6 +141,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
// The buffer remains valid after MemoryStream disposal
since we hold a reference to it
byte[] buffer = outputStream.GetBuffer();
int length = (int)outputStream.Length;
+
return (buffer, length);
}
}
diff --git
a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs
b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs
index 8fee61d92..913d177df 100644
--- a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs
+++ b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs
@@ -33,7 +33,7 @@ namespace
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
/// </summary>
internal sealed class CloudFetchDownloader : ICloudFetchDownloader,
IActivityTracer
{
- private readonly ITracingStatement _statement;
+ private readonly IHiveServer2Statement _statement;
private readonly BlockingCollection<IDownloadResult> _downloadQueue;
private readonly BlockingCollection<IDownloadResult> _resultQueue;
private readonly ICloudFetchMemoryBufferManager _memoryManager;
@@ -55,7 +55,7 @@ namespace
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
/// <summary>
/// Initializes a new instance of the <see
cref="CloudFetchDownloader"/> class.
/// </summary>
- /// <param name="statement">The tracing statement for Activity
context.</param>
+ /// <param name="statement">The HiveServer2 statement for Activity
context.</param>
/// <param name="downloadQueue">The queue of downloads to
process.</param>
/// <param name="resultQueue">The queue to add completed downloads
to.</param>
/// <param name="memoryManager">The memory buffer manager.</param>
@@ -68,7 +68,7 @@ namespace
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
/// <param name="maxUrlRefreshAttempts">The maximum number of URL
refresh attempts.</param>
/// <param name="urlExpirationBufferSeconds">Buffer time in seconds
before URL expiration to trigger refresh.</param>
public CloudFetchDownloader(
- ITracingStatement statement,
+ IHiveServer2Statement statement,
BlockingCollection<IDownloadResult> downloadQueue,
BlockingCollection<IDownloadResult> resultQueue,
ICloudFetchMemoryBufferManager memoryManager,
@@ -494,8 +494,11 @@ namespace
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
var decompressStopwatch = Stopwatch.StartNew();
// Use shared Lz4Utilities for decompression
(consolidates logic with non-CloudFetch path)
+ // Pass the connection's buffer pool for efficient LZ4
decompression
+ var connection =
(DatabricksConnection)_statement.Connection;
var (buffer, length) = await
Lz4Utilities.DecompressLz4Async(
fileData,
+ connection.Lz4BufferPool,
cancellationToken).ConfigureAwait(false);
// Create the dataStream from the decompressed buffer
diff --git a/csharp/src/Drivers/Databricks/Reader/DatabricksReader.cs
b/csharp/src/Drivers/Databricks/Reader/DatabricksReader.cs
index dc69f0f7a..2fc6268b0 100644
--- a/csharp/src/Drivers/Databricks/Reader/DatabricksReader.cs
+++ b/csharp/src/Drivers/Databricks/Reader/DatabricksReader.cs
@@ -121,7 +121,9 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader
// If LZ4 compression is enabled, decompress the data
if (isLz4Compressed)
{
- dataToUse = Lz4Utilities.DecompressLz4(batch.Batch);
+ // Pass the connection's buffer pool for efficient LZ4
decompression
+ var connection =
(DatabricksConnection)this.statement.Connection;
+ dataToUse = Lz4Utilities.DecompressLz4(batch.Batch,
connection.Lz4BufferPool);
}
this.reader = new
SingleBatch(ArrowSerializationHelpers.DeserializeRecordBatch(this.schema,
dataToUse));