This is an automated email from the ASF dual-hosted git repository.
curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new eae39ddb5 feat(csharp/src/Drivers/Databricks): Improve memory utilization of cloud downloads (#3652)
eae39ddb5 is described below
commit eae39ddb5ad4bae85eea675371a3e408f4b1721d
Author: Curt Hagenlocher <[email protected]>
AuthorDate: Thu Oct 30 06:52:55 2025 -0700
feat(csharp/src/Drivers/Databricks): Improve memory utilization of cloud downloads (#3652)
Improves memory utilization of cloud downloads by reading the downloaded
and/or decompressed cloud data sets into Arrow data directly from memory
rather than deserializing them through an intermediate stream.
NOTE: I have not benchmarked this change.
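In outline, the change swaps a MemoryStream wrapper for the
ReadOnlyMemory<byte> overload of ArrowStreamReader. A minimal standalone
sketch of the pattern ("batch.arrows" is a hypothetical input file; the
other names mirror the diff below):

    using System;
    using System.IO;
    using Apache.Arrow.Ipc;

    // Raw Arrow IPC stream bytes, e.g. a downloaded (or decompressed) result file.
    byte[] fileData = File.ReadAllBytes("batch.arrows");

    // Before: wrap the buffer in a MemoryStream just to feed the reader.
    //   using var reader = new ArrowStreamReader(new MemoryStream(fileData));

    // After: hand the buffer to the reader directly; ArrowStreamReader has a
    // ReadOnlyMemory<byte> constructor, so no intermediate stream is allocated.
    ReadOnlyMemory<byte> data = fileData;
    using var reader = new ArrowStreamReader(data);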
---
.../Reader/CloudFetch/CloudFetchDownloader.cs | 20 ++++++++------------
.../Reader/CloudFetch/CloudFetchReader.cs | 2 +-
.../Databricks/Reader/CloudFetch/DownloadResult.cs | 20 +++++++++++---------
.../Reader/CloudFetch/EndOfResultsGuard.cs | 5 ++---
.../Reader/CloudFetch/ICloudFetchInterfaces.cs | 9 ++++-----
.../E2E/CloudFetch/CloudFetchDownloaderTest.cs | 22 ++++++++++++----------
6 files changed, 38 insertions(+), 40 deletions(-)
diff --git a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs
index 8fee61d92..9dae54783 100644
--- a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs
+++ b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs
@@ -22,9 +22,7 @@ using System.IO;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
-using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
using Apache.Arrow.Adbc.Tracing;
-using K4os.Compression.LZ4.Streams;
namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
{
@@ -483,7 +481,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
}
// Process the downloaded file data
- MemoryStream dataStream;
+ Memory<byte> data;
long actualSize = fileData.Length;
// If the data is LZ4 compressed, decompress it
@@ -498,13 +496,11 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
fileData,
cancellationToken).ConfigureAwait(false);
- // Create the dataStream from the decompressed buffer
- dataStream = new MemoryStream(buffer, 0, length, writable: false, publiclyVisible: true);
- dataStream.Position = 0;
+ data = new Memory<byte>(buffer, 0, length);
decompressStopwatch.Stop();
// Calculate throughput metrics
- double compressionRatio = (double)dataStream.Length / actualSize;
+ double compressionRatio = (double)length / actualSize;
activity?.AddEvent("cloudfetch.decompression_complete", [
new("offset", downloadResult.Link.StartRowOffset),
@@ -512,12 +508,12 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
new("decompression_time_ms", decompressStopwatch.ElapsedMilliseconds),
new("compressed_size_bytes", actualSize),
new("compressed_size_kb", actualSize / 1024.0),
- new("decompressed_size_bytes", dataStream.Length),
- new("decompressed_size_kb", dataStream.Length / 1024.0),
+ new("decompressed_size_bytes", length),
+ new("decompressed_size_kb", length / 1024.0),
new("compression_ratio", compressionRatio)
]);
- actualSize = dataStream.Length;
+ actualSize = length;
}
catch (Exception ex)
{
@@ -536,7 +532,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
}
else
{
- dataStream = new MemoryStream(fileData);
+ data = fileData;
}
// Stop the stopwatch and log download completion
@@ -552,7 +548,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
]);
// Set the download as completed with the original size
- downloadResult.SetCompleted(dataStream, size);
+ downloadResult.SetCompleted(data, size);
}, activityName: "DownloadFile");
}
diff --git a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchReader.cs b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchReader.cs
index 99c0b18e5..8201e436d 100644
--- a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchReader.cs
+++ b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchReader.cs
@@ -145,7 +145,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
// Create a new reader for the downloaded file
try
{
- this.currentReader = new ArrowStreamReader(this.currentDownloadResult.DataStream);
+ this.currentReader = new ArrowStreamReader(this.currentDownloadResult.Data);
continue;
}
catch (Exception ex)
diff --git a/csharp/src/Drivers/Databricks/Reader/CloudFetch/DownloadResult.cs b/csharp/src/Drivers/Databricks/Reader/CloudFetch/DownloadResult.cs
index d035fa369..bc44a3f66 100644
--- a/csharp/src/Drivers/Databricks/Reader/CloudFetch/DownloadResult.cs
+++ b/csharp/src/Drivers/Databricks/Reader/CloudFetch/DownloadResult.cs
@@ -16,7 +16,6 @@
*/
using System;
-using System.IO;
using System.Threading.Tasks;
using Apache.Hive.Service.Rpc.Thrift;
@@ -29,7 +28,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
{
private readonly TaskCompletionSource<bool> _downloadCompletionSource;
private readonly ICloudFetchMemoryBufferManager _memoryManager;
- private Stream? _dataStream;
+ private ReadOnlyMemory<byte> _data;
private bool _isDisposed;
private long _size;
@@ -50,7 +49,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
public TSparkArrowResultLink Link { get; private set; }
/// <inheritdoc />
- public Stream DataStream
+ public ReadOnlyMemory<byte> Data
{
get
{
@@ -59,7 +58,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
{
throw new InvalidOperationException("Download has not completed yet.");
}
- return _dataStream!;
+ return _data;
}
}
@@ -103,10 +102,14 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
}
/// <inheritdoc />
- public void SetCompleted(Stream dataStream, long size)
+ public void SetCompleted(ReadOnlyMemory<byte> data, long size)
{
ThrowIfDisposed();
- _dataStream = dataStream ?? throw new ArgumentNullException(nameof(dataStream));
+ if (data.Length == 0)
+ {
+ throw new ArgumentException("Data cannot be empty.", nameof(data));
+ }
+ _data = data;
_downloadCompletionSource.TrySetResult(true);
_size = size;
}
@@ -126,10 +129,9 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
return;
}
- if (_dataStream != null)
+ if (_data.Length > 0)
{
- _dataStream.Dispose();
- _dataStream = null;
+ _data = default;
// Release memory back to the manager
if (_size > 0)
diff --git a/csharp/src/Drivers/Databricks/Reader/CloudFetch/EndOfResultsGuard.cs b/csharp/src/Drivers/Databricks/Reader/CloudFetch/EndOfResultsGuard.cs
index 73c88023a..366fa8e05 100644
--- a/csharp/src/Drivers/Databricks/Reader/CloudFetch/EndOfResultsGuard.cs
+++ b/csharp/src/Drivers/Databricks/Reader/CloudFetch/EndOfResultsGuard.cs
@@ -16,7 +16,6 @@
*/
using System;
-using System.IO;
using System.Threading.Tasks;
using Apache.Hive.Service.Rpc.Thrift;
@@ -43,7 +42,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
public TSparkArrowResultLink Link => throw new NotSupportedException("EndOfResultsGuard does not have a link.");
/// <inheritdoc />
- public Stream DataStream => throw new NotSupportedException("EndOfResultsGuard does not have a data stream.");
+ public ReadOnlyMemory<byte> Data => throw new NotSupportedException("EndOfResultsGuard does not have data.");
/// <inheritdoc />
public long Size => 0;
@@ -58,7 +57,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
public int RefreshAttempts => 0;
/// <inheritdoc />
- public void SetCompleted(Stream dataStream, long size) => throw new NotSupportedException("EndOfResultsGuard cannot be completed.");
+ public void SetCompleted(ReadOnlyMemory<byte> data, long size) => throw new NotSupportedException("EndOfResultsGuard cannot be completed.");
/// <inheritdoc />
public void SetFailed(Exception exception) => throw new NotSupportedException("EndOfResultsGuard cannot fail.");
diff --git a/csharp/src/Drivers/Databricks/Reader/CloudFetch/ICloudFetchInterfaces.cs b/csharp/src/Drivers/Databricks/Reader/CloudFetch/ICloudFetchInterfaces.cs
index 128e3a510..374466116 100644
--- a/csharp/src/Drivers/Databricks/Reader/CloudFetch/ICloudFetchInterfaces.cs
+++ b/csharp/src/Drivers/Databricks/Reader/CloudFetch/ICloudFetchInterfaces.cs
@@ -16,7 +16,6 @@
*/
using System;
-using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Apache.Hive.Service.Rpc.Thrift;
@@ -34,9 +33,9 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
TSparkArrowResultLink Link { get; }
/// <summary>
- /// Gets the stream containing the downloaded data.
+ /// Gets the memory containing the downloaded data.
/// </summary>
- Stream DataStream { get; }
+ ReadOnlyMemory<byte> Data { get; }
/// <summary>
/// Gets the size of the downloaded data in bytes.
@@ -61,9 +60,9 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
/// <summary>
/// Sets the download as completed with the provided data stream.
/// </summary>
- /// <param name="dataStream">The stream containing the downloaded data.</param>
+ /// <param name="data">The downloaded data.</param>
/// <param name="size">The size of the downloaded data in bytes.</param>
- void SetCompleted(Stream dataStream, long size);
+ void SetCompleted(ReadOnlyMemory<byte> data, long size);
/// <summary>
/// Sets the download as failed with the specified exception.
diff --git a/csharp/test/Drivers/Databricks/E2E/CloudFetch/CloudFetchDownloaderTest.cs b/csharp/test/Drivers/Databricks/E2E/CloudFetch/CloudFetchDownloaderTest.cs
index 67c8367dd..597314a40 100644
--- a/csharp/test/Drivers/Databricks/E2E/CloudFetch/CloudFetchDownloaderTest.cs
+++ b/csharp/test/Drivers/Databricks/E2E/CloudFetch/CloudFetchDownloaderTest.cs
@@ -25,6 +25,7 @@ using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
using Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch;
+using Apache.Arrow.Adbc.Tracing;
using Apache.Hive.Service.Rpc.Thrift;
using Moq;
using Moq.Protected;
@@ -46,6 +47,7 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks.CloudFetch
_resultQueue = new BlockingCollection<IDownloadResult>(new ConcurrentQueue<IDownloadResult>(), 10);
_mockMemoryManager = new Mock<ICloudFetchMemoryBufferManager>();
_mockStatement = new Mock<IHiveServer2Statement>();
+ _mockStatement.SetupGet(x => x.Trace).Returns(new ActivityTrace());
_mockResultFetcher = new Mock<ICloudFetchResultFetcher>();
// Set up memory manager defaults
@@ -136,13 +138,13 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks.CloudFetch
mockDownloadResult.Setup(r => r.RefreshAttempts).Returns(0);
mockDownloadResult.Setup(r => r.IsExpiredOrExpiringSoon(It.IsAny<int>())).Returns(false);
- // Capture the stream and size passed to SetCompleted
- Stream? capturedStream = null;
+ // Capture the data and size passed to SetCompleted
+ ReadOnlyMemory<byte> capturedData = default;
long capturedSize = 0;
- mockDownloadResult.Setup(r => r.SetCompleted(It.IsAny<Stream>(), It.IsAny<long>()))
- .Callback<Stream, long>((stream, size) =>
+ mockDownloadResult.Setup(r => r.SetCompleted(It.IsAny<ReadOnlyMemory<byte>>(), It.IsAny<long>()))
+ .Callback<ReadOnlyMemory<byte>, long>((data, size) =>
{
- capturedStream = stream;
+ capturedData = data;
capturedSize = size;
});
@@ -176,11 +178,11 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks.CloudFetch
Assert.Same(mockDownloadResult.Object, result);
// Verify SetCompleted was called
- mockDownloadResult.Verify(r => r.SetCompleted(It.IsAny<Stream>(), It.IsAny<long>()), Times.Once);
+ mockDownloadResult.Verify(r => r.SetCompleted(It.IsAny<ReadOnlyMemory<byte>>(), It.IsAny<long>()), Times.Once);
// Verify the content of the stream
- Assert.NotNull(capturedStream);
- using (var reader = new StreamReader(capturedStream))
+ Assert.NotEqual(0, capturedData.Length);
+ using (var reader = new StreamReader(new MemoryStream(capturedData.ToArray())))
{
string content = reader.ReadToEnd();
Assert.Equal(testContent, content);
@@ -484,8 +486,8 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks.CloudFetch
mockDownloadResult.Setup(r => r.Size).Returns(100);
mockDownloadResult.Setup(r => r.RefreshAttempts).Returns(0);
mockDownloadResult.Setup(r => r.IsExpiredOrExpiringSoon(It.IsAny<int>())).Returns(false);
- mockDownloadResult.Setup(r => r.SetCompleted(It.IsAny<Stream>(), It.IsAny<long>()))
- .Callback<Stream, long>((_, _) => { });
+ mockDownloadResult.Setup(r => r.SetCompleted(It.IsAny<ReadOnlyMemory<byte>>(), It.IsAny<long>()))
+ .Callback<ReadOnlyMemory<byte>, long>((_, _) => { });
downloadResults[i] = mockDownloadResult.Object;
}
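For reference, the argument-capture idiom used in the test changes above,
reduced to a self-contained Moq sketch (IResultSink is a hypothetical
stand-in for the driver's IDownloadResult interface):

    using System;
    using Moq;

    public interface IResultSink
    {
        void SetCompleted(ReadOnlyMemory<byte> data, long size);
    }

    public static class CaptureDemo
    {
        public static void Main()
        {
            var mock = new Mock<IResultSink>();
            ReadOnlyMemory<byte> capturedData = default;
            long capturedSize = 0;

            // Record the arguments each time SetCompleted is invoked on the mock.
            mock.Setup(s => s.SetCompleted(It.IsAny<ReadOnlyMemory<byte>>(), It.IsAny<long>()))
                .Callback<ReadOnlyMemory<byte>, long>((data, size) =>
                {
                    capturedData = data;
                    capturedSize = size;
                });

            mock.Object.SetCompleted(new byte[] { 1, 2, 3 }, 3);
            Console.WriteLine($"{capturedData.Length} bytes, size={capturedSize}");
        }
    }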