This is an automated email from the ASF dual-hosted git repository.

curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 0b01c6e3f chore(csharp/src/Drivers/Databricks): Revert previous "memory utilization of cloud downloads" change (#3655)
0b01c6e3f is described below

commit 0b01c6e3f2a35ad8124ee6262ac2a65efb9bc06a
Author: Curt Hagenlocher <[email protected]>
AuthorDate: Thu Oct 30 19:01:55 2025 -0700

    chore(csharp/src/Drivers/Databricks): Revert previous "memory utilization of cloud downloads" change (#3655)
    
    Reverts #3652 as it seems to have caused a regression.
---
 .../Reader/CloudFetch/CloudFetchDownloader.cs      | 20 ++++++++++++--------
 .../Reader/CloudFetch/CloudFetchReader.cs          |  2 +-
 .../Databricks/Reader/CloudFetch/DownloadResult.cs | 20 +++++++++-----------
 .../Reader/CloudFetch/EndOfResultsGuard.cs         |  5 +++--
 .../Reader/CloudFetch/ICloudFetchInterfaces.cs     |  9 +++++----
 .../E2E/CloudFetch/CloudFetchDownloaderTest.cs     | 22 ++++++++++------------
 6 files changed, 40 insertions(+), 38 deletions(-)

diff --git a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs
index 9dae54783..8fee61d92 100644
--- a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs
+++ b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs
@@ -22,7 +22,9 @@ using System.IO;
 using System.Net.Http;
 using System.Threading;
 using System.Threading.Tasks;
+using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
 using Apache.Arrow.Adbc.Tracing;
+using K4os.Compression.LZ4.Streams;
 
 namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
 {
@@ -481,7 +483,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
                 }
 
                 // Process the downloaded file data
-                Memory<byte> data;
+                MemoryStream dataStream;
                 long actualSize = fileData.Length;
 
                 // If the data is LZ4 compressed, decompress it
@@ -496,11 +498,13 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
                             fileData,
                             cancellationToken).ConfigureAwait(false);
 
-                        data = new Memory<byte>(buffer, 0, length);
+                        // Create the dataStream from the decompressed buffer
+                        dataStream = new MemoryStream(buffer, 0, length, writable: false, publiclyVisible: true);
+                        dataStream.Position = 0;
                         decompressStopwatch.Stop();
 
                         // Calculate throughput metrics
-                        double compressionRatio = (double)length / actualSize;
+                        double compressionRatio = (double)dataStream.Length / actualSize;

                         activity?.AddEvent("cloudfetch.decompression_complete", [
                             new("offset", downloadResult.Link.StartRowOffset),
@@ -508,12 +512,12 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
                             new("decompression_time_ms", decompressStopwatch.ElapsedMilliseconds),
                             new("compressed_size_bytes", actualSize),
                             new("compressed_size_kb", actualSize / 1024.0),
-                            new("decompressed_size_bytes", length),
-                            new("decompressed_size_kb", length / 1024.0),
+                            new("decompressed_size_bytes", dataStream.Length),
+                            new("decompressed_size_kb", dataStream.Length / 1024.0),
                             new("compression_ratio", compressionRatio)
                         ]);
 
-                        actualSize = length;
+                        actualSize = dataStream.Length;
                     }
                     catch (Exception ex)
                     {
@@ -532,7 +536,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
                 }
                 else
                 {
-                    data = fileData;
+                    dataStream = new MemoryStream(fileData);
                 }
 
                 // Stop the stopwatch and log download completion
@@ -548,7 +552,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
                 ]);
 
                 // Set the download as completed with the original size
-                downloadResult.SetCompleted(data, size);
+                downloadResult.SetCompleted(dataStream, size);
             }, activityName: "DownloadFile");
         }
 
diff --git a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchReader.cs b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchReader.cs
index 8201e436d..99c0b18e5 100644
--- a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchReader.cs
+++ b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchReader.cs
@@ -145,7 +145,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
                             // Create a new reader for the downloaded file
                             try
                             {
-                                this.currentReader = new ArrowStreamReader(this.currentDownloadResult.Data);
+                                this.currentReader = new ArrowStreamReader(this.currentDownloadResult.DataStream);
                                 continue;
                             }
                             catch (Exception ex)
diff --git a/csharp/src/Drivers/Databricks/Reader/CloudFetch/DownloadResult.cs b/csharp/src/Drivers/Databricks/Reader/CloudFetch/DownloadResult.cs
index bc44a3f66..d035fa369 100644
--- a/csharp/src/Drivers/Databricks/Reader/CloudFetch/DownloadResult.cs
+++ b/csharp/src/Drivers/Databricks/Reader/CloudFetch/DownloadResult.cs
@@ -16,6 +16,7 @@
  */
 
 using System;
+using System.IO;
 using System.Threading.Tasks;
 using Apache.Hive.Service.Rpc.Thrift;
 
@@ -28,7 +29,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
     {
         private readonly TaskCompletionSource<bool> _downloadCompletionSource;
         private readonly ICloudFetchMemoryBufferManager _memoryManager;
-        private ReadOnlyMemory<byte> _data;
+        private Stream? _dataStream;
         private bool _isDisposed;
         private long _size;
 
@@ -49,7 +50,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
         public TSparkArrowResultLink Link { get; private set; }
 
         /// <inheritdoc />
-        public ReadOnlyMemory<byte> Data
+        public Stream DataStream
         {
             get
             {
@@ -58,7 +59,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
                 {
                     throw new InvalidOperationException("Download has not completed yet.");
                 }
-                return _data;
+                return _dataStream!;
             }
         }
 
@@ -102,14 +103,10 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
         }
 
         /// <inheritdoc />
-        public void SetCompleted(ReadOnlyMemory<byte> data, long size)
+        public void SetCompleted(Stream dataStream, long size)
         {
             ThrowIfDisposed();
-            if (data.Length == 0)
-            {
-                throw new ArgumentException("Data cannot be empty.", nameof(data));
-            }
-            _data = data;
+            _dataStream = dataStream ?? throw new ArgumentNullException(nameof(dataStream));
             _downloadCompletionSource.TrySetResult(true);
             _size = size;
         }
@@ -129,9 +126,10 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
                 return;
             }
 
-            if (_data.Length > 0)
+            if (_dataStream != null)
             {
-                _data = default;
+                _dataStream.Dispose();
+                _dataStream = null;
 
                 // Release memory back to the manager
                 if (_size > 0)
diff --git a/csharp/src/Drivers/Databricks/Reader/CloudFetch/EndOfResultsGuard.cs b/csharp/src/Drivers/Databricks/Reader/CloudFetch/EndOfResultsGuard.cs
index 366fa8e05..73c88023a 100644
--- a/csharp/src/Drivers/Databricks/Reader/CloudFetch/EndOfResultsGuard.cs
+++ b/csharp/src/Drivers/Databricks/Reader/CloudFetch/EndOfResultsGuard.cs
@@ -16,6 +16,7 @@
  */
 
 using System;
+using System.IO;
 using System.Threading.Tasks;
 using Apache.Hive.Service.Rpc.Thrift;
 
@@ -42,7 +43,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
         public TSparkArrowResultLink Link => throw new NotSupportedException("EndOfResultsGuard does not have a link.");
 
         /// <inheritdoc />
-        public ReadOnlyMemory<byte> Data => throw new NotSupportedException("EndOfResultsGuard does not have data.");
+        public Stream DataStream => throw new NotSupportedException("EndOfResultsGuard does not have a data stream.");
 
         /// <inheritdoc />
         public long Size => 0;
@@ -57,7 +58,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
         public int RefreshAttempts => 0;
 
         /// <inheritdoc />
-        public void SetCompleted(ReadOnlyMemory<byte> data, long size) => throw new NotSupportedException("EndOfResultsGuard cannot be completed.");
+        public void SetCompleted(Stream dataStream, long size) => throw new NotSupportedException("EndOfResultsGuard cannot be completed.");
 
         /// <inheritdoc />
         public void SetFailed(Exception exception) => throw new NotSupportedException("EndOfResultsGuard cannot fail.");
diff --git a/csharp/src/Drivers/Databricks/Reader/CloudFetch/ICloudFetchInterfaces.cs b/csharp/src/Drivers/Databricks/Reader/CloudFetch/ICloudFetchInterfaces.cs
index 374466116..128e3a510 100644
--- a/csharp/src/Drivers/Databricks/Reader/CloudFetch/ICloudFetchInterfaces.cs
+++ b/csharp/src/Drivers/Databricks/Reader/CloudFetch/ICloudFetchInterfaces.cs
@@ -16,6 +16,7 @@
  */
 
 using System;
+using System.IO;
 using System.Threading;
 using System.Threading.Tasks;
 using Apache.Hive.Service.Rpc.Thrift;
@@ -33,9 +34,9 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
         TSparkArrowResultLink Link { get; }
 
         /// <summary>
-        /// Gets the memory containing the downloaded data.
+        /// Gets the stream containing the downloaded data.
         /// </summary>
-        ReadOnlyMemory<byte> Data { get; }
+        Stream DataStream { get; }
 
         /// <summary>
         /// Gets the size of the downloaded data in bytes.
@@ -60,9 +61,9 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
         /// <summary>
         /// Sets the download as completed with the provided data stream.
         /// </summary>
-        /// <param name="data">The downloaded data.</param>
+        /// <param name="dataStream">The stream containing the downloaded data.</param>
         /// <param name="size">The size of the downloaded data in bytes.</param>
-        void SetCompleted(ReadOnlyMemory<byte> data, long size);
+        void SetCompleted(Stream dataStream, long size);
 
         /// <summary>
         /// Sets the download as failed with the specified exception.
diff --git a/csharp/test/Drivers/Databricks/E2E/CloudFetch/CloudFetchDownloaderTest.cs b/csharp/test/Drivers/Databricks/E2E/CloudFetch/CloudFetchDownloaderTest.cs
index 597314a40..67c8367dd 100644
--- a/csharp/test/Drivers/Databricks/E2E/CloudFetch/CloudFetchDownloaderTest.cs
+++ b/csharp/test/Drivers/Databricks/E2E/CloudFetch/CloudFetchDownloaderTest.cs
@@ -25,7 +25,6 @@ using System.Threading;
 using System.Threading.Tasks;
 using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
 using Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch;
-using Apache.Arrow.Adbc.Tracing;
 using Apache.Hive.Service.Rpc.Thrift;
 using Moq;
 using Moq.Protected;
@@ -47,7 +46,6 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks.CloudFetch
             _resultQueue = new BlockingCollection<IDownloadResult>(new ConcurrentQueue<IDownloadResult>(), 10);
             _mockMemoryManager = new Mock<ICloudFetchMemoryBufferManager>();
             _mockStatement = new Mock<IHiveServer2Statement>();
-            _mockStatement.SetupGet(x => x.Trace).Returns(new ActivityTrace());
             _mockResultFetcher = new Mock<ICloudFetchResultFetcher>();
 
             // Set up memory manager defaults
@@ -138,13 +136,13 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks.CloudFetch
             mockDownloadResult.Setup(r => r.RefreshAttempts).Returns(0);
             mockDownloadResult.Setup(r => r.IsExpiredOrExpiringSoon(It.IsAny<int>())).Returns(false);
 
-            // Capture the date and size passed to SetCompleted
-            ReadOnlyMemory<byte> capturedData = default;
+            // Capture the stream and size passed to SetCompleted
+            Stream? capturedStream = null;
             long capturedSize = 0;
-            mockDownloadResult.Setup(r => r.SetCompleted(It.IsAny<ReadOnlyMemory<byte>>(), It.IsAny<long>()))
-                .Callback<ReadOnlyMemory<byte>, long>((data, size) =>
+            mockDownloadResult.Setup(r => r.SetCompleted(It.IsAny<Stream>(), It.IsAny<long>()))
+                .Callback<Stream, long>((stream, size) =>
                 {
-                    capturedData = data;
+                    capturedStream = stream;
                     capturedSize = size;
                 });
 
@@ -178,11 +176,11 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks.CloudFetch
             Assert.Same(mockDownloadResult.Object, result);
 
             // Verify SetCompleted was called
-            mockDownloadResult.Verify(r => r.SetCompleted(It.IsAny<ReadOnlyMemory<byte>>(), It.IsAny<long>()), Times.Once);
+            mockDownloadResult.Verify(r => r.SetCompleted(It.IsAny<Stream>(), It.IsAny<long>()), Times.Once);
 
             // Verify the content of the stream
-            Assert.NotEqual(0, capturedData.Length);
-            using (var reader = new StreamReader(new MemoryStream(capturedData.ToArray())))
+            Assert.NotNull(capturedStream);
+            using (var reader = new StreamReader(capturedStream))
             {
                 string content = reader.ReadToEnd();
                 Assert.Equal(testContent, content);
@@ -486,8 +484,8 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks.CloudFetch
                 mockDownloadResult.Setup(r => r.Size).Returns(100);
                 mockDownloadResult.Setup(r => r.RefreshAttempts).Returns(0);
                 mockDownloadResult.Setup(r => r.IsExpiredOrExpiringSoon(It.IsAny<int>())).Returns(false);
-                mockDownloadResult.Setup(r => r.SetCompleted(It.IsAny<ReadOnlyMemory<byte>>(), It.IsAny<long>()))
-                    .Callback<ReadOnlyMemory<byte>, long>((_, _) => { });
+                mockDownloadResult.Setup(r => r.SetCompleted(It.IsAny<Stream>(), It.IsAny<long>()))
+                    .Callback<Stream, long>((_, _) => { });
                 downloadResults[i] = mockDownloadResult.Object;
             }
 
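For context, the net effect of this revert is that IDownloadResult exposes the
downloaded bytes as a Stream (the DataStream property) again rather than as
ReadOnlyMemory<byte>, and SetCompleted once more accepts a Stream. A minimal
sketch of the restored consumption pattern follows, assuming the Apache.Arrow
C# package; FakeDownloadResult and the "batch.arrows" input file are
illustrative stand-ins for this note, not driver code:

    using System;
    using System.IO;
    using Apache.Arrow.Ipc;

    // Illustrative stand-in for the driver's IDownloadResult after the revert:
    // downloads surface a Stream (DataStream) again instead of ReadOnlyMemory<byte>.
    class FakeDownloadResult
    {
        private Stream? _dataStream;

        public Stream DataStream =>
            _dataStream ?? throw new InvalidOperationException("Download has not completed yet.");

        // Mirrors the restored SetCompleted(Stream, long) signature.
        public void SetCompleted(Stream dataStream, long size) =>
            _dataStream = dataStream ?? throw new ArgumentNullException(nameof(dataStream));
    }

    class Example
    {
        static void Main()
        {
            var result = new FakeDownloadResult();

            // The downloader wraps the (possibly LZ4-decompressed) bytes in a MemoryStream...
            byte[] downloadedBytes = File.ReadAllBytes("batch.arrows"); // hypothetical Arrow IPC stream file
            result.SetCompleted(new MemoryStream(downloadedBytes), downloadedBytes.Length);

            // ...and the reader consumes it exactly as in the CloudFetchReader hunk above.
            using var reader = new ArrowStreamReader(result.DataStream);
            while (reader.ReadNextRecordBatch() is { } batch)
            {
                Console.WriteLine($"Read a batch with {batch.Length} rows");
                batch.Dispose();
            }
        }
    }

One detail worth noting from the downloader hunk: the MemoryStream over the
decompressed buffer is created with publiclyVisible: true, which in .NET
permits GetBuffer() access to the underlying array (the buffer-based
constructors disallow it by default); the revert simply restores that
behavior.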
