This is an automated email from the ASF dual-hosted git repository.
curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 0b01c6e3f chore(csharp/src/Drivers/Databricks): Revert previous
"memory utilization of cloud downloads" change (#3655)
0b01c6e3f is described below
commit 0b01c6e3f2a35ad8124ee6262ac2a65efb9bc06a
Author: Curt Hagenlocher <[email protected]>
AuthorDate: Thu Oct 30 19:01:55 2025 -0700
chore(csharp/src/Drivers/Databricks): Revert previous "memory utilization
of cloud downloads" change (#3655)
Reverts #3652 as it seems to have caused a regression.
---
.../Reader/CloudFetch/CloudFetchDownloader.cs | 20 ++++++++++++--------
.../Reader/CloudFetch/CloudFetchReader.cs | 2 +-
.../Databricks/Reader/CloudFetch/DownloadResult.cs | 20 +++++++++-----------
.../Reader/CloudFetch/EndOfResultsGuard.cs | 5 +++--
.../Reader/CloudFetch/ICloudFetchInterfaces.cs | 9 +++++----
.../E2E/CloudFetch/CloudFetchDownloaderTest.cs | 22 ++++++++++------------
6 files changed, 40 insertions(+), 38 deletions(-)
diff --git a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs
index 9dae54783..8fee61d92 100644
--- a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs
+++ b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs
@@ -22,7 +22,9 @@ using System.IO;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
+using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
using Apache.Arrow.Adbc.Tracing;
+using K4os.Compression.LZ4.Streams;
namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
{
@@ -481,7 +483,7 @@ namespace
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
}
// Process the downloaded file data
- Memory<byte> data;
+ MemoryStream dataStream;
long actualSize = fileData.Length;
// If the data is LZ4 compressed, decompress it
@@ -496,11 +498,13 @@ namespace
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
fileData,
cancellationToken).ConfigureAwait(false);
- data = new Memory<byte>(buffer, 0, length);
+ // Create the dataStream from the decompressed buffer
+ dataStream = new MemoryStream(buffer, 0, length,
writable: false, publiclyVisible: true);
+ dataStream.Position = 0;
decompressStopwatch.Stop();
// Calculate throughput metrics
- double compressionRatio = (double)length / actualSize;
+ double compressionRatio = (double)dataStream.Length /
actualSize;
activity?.AddEvent("cloudfetch.decompression_complete", [
new("offset", downloadResult.Link.StartRowOffset),
@@ -508,12 +512,12 @@ namespace
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
new("decompression_time_ms",
decompressStopwatch.ElapsedMilliseconds),
new("compressed_size_bytes", actualSize),
new("compressed_size_kb", actualSize / 1024.0),
- new("decompressed_size_bytes", length),
- new("decompressed_size_kb", length / 1024.0),
+ new("decompressed_size_bytes", dataStream.Length),
+ new("decompressed_size_kb", dataStream.Length /
1024.0),
new("compression_ratio", compressionRatio)
]);
- actualSize = length;
+ actualSize = dataStream.Length;
}
catch (Exception ex)
{
@@ -532,7 +536,7 @@ namespace
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
}
else
{
- data = fileData;
+ dataStream = new MemoryStream(fileData);
}
// Stop the stopwatch and log download completion
@@ -548,7 +552,7 @@ namespace
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
]);
// Set the download as completed with the original size
- downloadResult.SetCompleted(data, size);
+ downloadResult.SetCompleted(dataStream, size);
}, activityName: "DownloadFile");
}
diff --git a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchReader.cs b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchReader.cs
index 8201e436d..99c0b18e5 100644
--- a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchReader.cs
+++ b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchReader.cs
@@ -145,7 +145,7 @@ namespace
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
// Create a new reader for the downloaded file
try
{
- this.currentReader = new
ArrowStreamReader(this.currentDownloadResult.Data);
+ this.currentReader = new
ArrowStreamReader(this.currentDownloadResult.DataStream);
continue;
}
catch (Exception ex)
diff --git a/csharp/src/Drivers/Databricks/Reader/CloudFetch/DownloadResult.cs b/csharp/src/Drivers/Databricks/Reader/CloudFetch/DownloadResult.cs
index bc44a3f66..d035fa369 100644
--- a/csharp/src/Drivers/Databricks/Reader/CloudFetch/DownloadResult.cs
+++ b/csharp/src/Drivers/Databricks/Reader/CloudFetch/DownloadResult.cs
@@ -16,6 +16,7 @@
*/
using System;
+using System.IO;
using System.Threading.Tasks;
using Apache.Hive.Service.Rpc.Thrift;
@@ -28,7 +29,7 @@ namespace
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
{
private readonly TaskCompletionSource<bool> _downloadCompletionSource;
private readonly ICloudFetchMemoryBufferManager _memoryManager;
- private ReadOnlyMemory<byte> _data;
+ private Stream? _dataStream;
private bool _isDisposed;
private long _size;
@@ -49,7 +50,7 @@ namespace
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
public TSparkArrowResultLink Link { get; private set; }
/// <inheritdoc />
- public ReadOnlyMemory<byte> Data
+ public Stream DataStream
{
get
{
@@ -58,7 +59,7 @@ namespace
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
{
throw new InvalidOperationException("Download has not
completed yet.");
}
- return _data;
+ return _dataStream!;
}
}
@@ -102,14 +103,10 @@ namespace
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
}
/// <inheritdoc />
- public void SetCompleted(ReadOnlyMemory<byte> data, long size)
+ public void SetCompleted(Stream dataStream, long size)
{
ThrowIfDisposed();
- if (data.Length == 0)
- {
- throw new ArgumentException("Data cannot be empty.",
nameof(data));
- }
- _data = data;
+ _dataStream = dataStream ?? throw new
ArgumentNullException(nameof(dataStream));
_downloadCompletionSource.TrySetResult(true);
_size = size;
}
@@ -129,9 +126,10 @@ namespace
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
return;
}
- if (_data.Length > 0)
+ if (_dataStream != null)
{
- _data = default;
+ _dataStream.Dispose();
+ _dataStream = null;
// Release memory back to the manager
if (_size > 0)
diff --git a/csharp/src/Drivers/Databricks/Reader/CloudFetch/EndOfResultsGuard.cs b/csharp/src/Drivers/Databricks/Reader/CloudFetch/EndOfResultsGuard.cs
index 366fa8e05..73c88023a 100644
--- a/csharp/src/Drivers/Databricks/Reader/CloudFetch/EndOfResultsGuard.cs
+++ b/csharp/src/Drivers/Databricks/Reader/CloudFetch/EndOfResultsGuard.cs
@@ -16,6 +16,7 @@
*/
using System;
+using System.IO;
using System.Threading.Tasks;
using Apache.Hive.Service.Rpc.Thrift;
@@ -42,7 +43,7 @@ namespace
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
public TSparkArrowResultLink Link => throw new
NotSupportedException("EndOfResultsGuard does not have a link.");
/// <inheritdoc />
- public ReadOnlyMemory<byte> Data => throw new
NotSupportedException("EndOfResultsGuard does not have data.");
+ public Stream DataStream => throw new
NotSupportedException("EndOfResultsGuard does not have a data stream.");
/// <inheritdoc />
public long Size => 0;
@@ -57,7 +58,7 @@ namespace
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
public int RefreshAttempts => 0;
/// <inheritdoc />
- public void SetCompleted(ReadOnlyMemory<byte> data, long size) =>
throw new NotSupportedException("EndOfResultsGuard cannot be completed.");
+ public void SetCompleted(Stream dataStream, long size) => throw new
NotSupportedException("EndOfResultsGuard cannot be completed.");
/// <inheritdoc />
public void SetFailed(Exception exception) => throw new
NotSupportedException("EndOfResultsGuard cannot fail.");
diff --git a/csharp/src/Drivers/Databricks/Reader/CloudFetch/ICloudFetchInterfaces.cs b/csharp/src/Drivers/Databricks/Reader/CloudFetch/ICloudFetchInterfaces.cs
index 374466116..128e3a510 100644
--- a/csharp/src/Drivers/Databricks/Reader/CloudFetch/ICloudFetchInterfaces.cs
+++ b/csharp/src/Drivers/Databricks/Reader/CloudFetch/ICloudFetchInterfaces.cs
@@ -16,6 +16,7 @@
*/
using System;
+using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Apache.Hive.Service.Rpc.Thrift;
@@ -33,9 +34,9 @@ namespace
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
TSparkArrowResultLink Link { get; }
/// <summary>
- /// Gets the memory containing the downloaded data.
+ /// Gets the stream containing the downloaded data.
/// </summary>
- ReadOnlyMemory<byte> Data { get; }
+ Stream DataStream { get; }
/// <summary>
/// Gets the size of the downloaded data in bytes.
@@ -60,9 +61,9 @@ namespace
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
/// <summary>
/// Sets the download as completed with the provided data stream.
/// </summary>
- /// <param name="data">The downloaded data.</param>
+ /// <param name="dataStream">The stream containing the downloaded
data.</param>
/// <param name="size">The size of the downloaded data in
bytes.</param>
- void SetCompleted(ReadOnlyMemory<byte> data, long size);
+ void SetCompleted(Stream dataStream, long size);
/// <summary>
/// Sets the download as failed with the specified exception.
diff --git a/csharp/test/Drivers/Databricks/E2E/CloudFetch/CloudFetchDownloaderTest.cs b/csharp/test/Drivers/Databricks/E2E/CloudFetch/CloudFetchDownloaderTest.cs
index 597314a40..67c8367dd 100644
--- a/csharp/test/Drivers/Databricks/E2E/CloudFetch/CloudFetchDownloaderTest.cs
+++ b/csharp/test/Drivers/Databricks/E2E/CloudFetch/CloudFetchDownloaderTest.cs
@@ -25,7 +25,6 @@ using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
using Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch;
-using Apache.Arrow.Adbc.Tracing;
using Apache.Hive.Service.Rpc.Thrift;
using Moq;
using Moq.Protected;
@@ -47,7 +46,6 @@ namespace
Apache.Arrow.Adbc.Tests.Drivers.Databricks.CloudFetch
_resultQueue = new BlockingCollection<IDownloadResult>(new
ConcurrentQueue<IDownloadResult>(), 10);
_mockMemoryManager = new Mock<ICloudFetchMemoryBufferManager>();
_mockStatement = new Mock<IHiveServer2Statement>();
- _mockStatement.SetupGet(x => x.Trace).Returns(new ActivityTrace());
_mockResultFetcher = new Mock<ICloudFetchResultFetcher>();
// Set up memory manager defaults
@@ -138,13 +136,13 @@ namespace
Apache.Arrow.Adbc.Tests.Drivers.Databricks.CloudFetch
mockDownloadResult.Setup(r => r.RefreshAttempts).Returns(0);
mockDownloadResult.Setup(r =>
r.IsExpiredOrExpiringSoon(It.IsAny<int>())).Returns(false);
- // Capture the date and size passed to SetCompleted
- ReadOnlyMemory<byte> capturedData = default;
+ // Capture the stream and size passed to SetCompleted
+ Stream? capturedStream = null;
long capturedSize = 0;
- mockDownloadResult.Setup(r =>
r.SetCompleted(It.IsAny<ReadOnlyMemory<byte>>(), It.IsAny<long>()))
- .Callback<ReadOnlyMemory<byte>, long>((data, size) =>
+ mockDownloadResult.Setup(r => r.SetCompleted(It.IsAny<Stream>(),
It.IsAny<long>()))
+ .Callback<Stream, long>((stream, size) =>
{
- capturedData = data;
+ capturedStream = stream;
capturedSize = size;
});
@@ -178,11 +176,11 @@ namespace
Apache.Arrow.Adbc.Tests.Drivers.Databricks.CloudFetch
Assert.Same(mockDownloadResult.Object, result);
// Verify SetCompleted was called
- mockDownloadResult.Verify(r =>
r.SetCompleted(It.IsAny<ReadOnlyMemory<byte>>(), It.IsAny<long>()), Times.Once);
+ mockDownloadResult.Verify(r => r.SetCompleted(It.IsAny<Stream>(),
It.IsAny<long>()), Times.Once);
// Verify the content of the stream
- Assert.NotEqual(0, capturedData.Length);
- using (var reader = new StreamReader(new
MemoryStream(capturedData.ToArray())))
+ Assert.NotNull(capturedStream);
+ using (var reader = new StreamReader(capturedStream))
{
string content = reader.ReadToEnd();
Assert.Equal(testContent, content);
@@ -486,8 +484,8 @@ namespace
Apache.Arrow.Adbc.Tests.Drivers.Databricks.CloudFetch
mockDownloadResult.Setup(r => r.Size).Returns(100);
mockDownloadResult.Setup(r => r.RefreshAttempts).Returns(0);
mockDownloadResult.Setup(r =>
r.IsExpiredOrExpiringSoon(It.IsAny<int>())).Returns(false);
- mockDownloadResult.Setup(r =>
r.SetCompleted(It.IsAny<ReadOnlyMemory<byte>>(), It.IsAny<long>()))
- .Callback<ReadOnlyMemory<byte>, long>((_, _) => { });
+ mockDownloadResult.Setup(r =>
r.SetCompleted(It.IsAny<Stream>(), It.IsAny<long>()))
+ .Callback<Stream, long>((_, _) => { });
downloadResults[i] = mockDownloadResult.Object;
}