This is an automated email from the ASF dual-hosted git repository.

curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 87c49e937 fix(csharp/src/Drivers/Databricks): Implement FIFO memory 
acquisition to prevent starvation in CloudFetch (#3756)
87c49e937 is described below

commit 87c49e9377081a1f6c04196e2adbfead6d21a542
Author: eric-wang-1990 <[email protected]>
AuthorDate: Tue Dec 2 13:04:24 2025 -0800

    fix(csharp/src/Drivers/Databricks): Implement FIFO memory acquisition to 
prevent starvation in CloudFetch (#3756)
    
    Fixes a memory starvation issue in the CloudFetch download pipeline
    where newer downloads could win memory allocation over older waiting
    downloads, causing indefinite delays.
    
    ## Problem
    The CloudFetch downloader was acquiring memory inside parallel download
    tasks with a polling-based wait (10ms intervals). This non-FIFO behavior
    caused memory starvation:
    
    1. Multiple download tasks acquire semaphore slots and start polling for
    memory
    2. All tasks wake up simultaneously every 10ms to check memory
    availability
    3. When memory becomes available, any waiting task could acquire it
    (non-deterministic)
    4. Newer downloads (e.g., file 5, 7, 8) could repeatedly win the race
    over older downloads (e.g., file 4)
    5. File 4 never gets memory despite waiting the longest → **indefinite
    starvation**
    
    Example scenario with 200MB memory, 50MB files, 3 parallel downloads:
    - Files 1, 2, 3 download (150MB used)
    - File 1 completes and releases 50MB
    - Files 4, 5, 6 are all waiting for memory
    - File 5 wins the race and acquires the 50MB
    - File 2 completes, file 7 wins the race
    - File 3 completes, file 8 wins the race
    - File 4 never gets memory because files 5, 7, 8 keep winning
    
    ## Solution
    Move memory acquisition from inside parallel download tasks to the main
    sequential loop. This ensures FIFO ordering:
    
    1. Main loop acquires memory sequentially for each download
    2. Only after memory is acquired does the download task start
    3. Downloads are guaranteed to get memory in the order they were queued
    4. No starvation possible
    
    Additionally, increased default memory buffer from 100MB to 200MB to
    allow more parallel downloads without hitting memory limits.
    
    ## Changes
    - **CloudFetchDownloader.cs**:
      - Moved `AcquireMemoryAsync` from inside `DownloadFileAsync` to the
        main loop (lines 278-280)
      - Ensures FIFO ordering before spawning download tasks
    - **CloudFetchDownloadManager.cs**:
      - Increased `DefaultMemoryBufferSizeMB` from 100 to 200
      - Better performance for large result sets
    
    ## Testing
    - Manually verified on fast VM with Power BI Desktop
    - Existing CloudFetchDownloader tests still pass
    - FIFO ordering prevents the starvation scenario
    
    ## TODO
    The memory buffer design has a flaw: it only accounts for the
    compressed size and does not cover the decompressed size. There are a
    couple of options we can take, but I want to land this first since
    this issue can actually block customers.
    
    ---------
    
    Co-authored-by: Claude <[email protected]>
---
 .../Reader/CloudFetch/CloudFetchDownloadManager.cs |   2 +-
 .../Reader/CloudFetch/CloudFetchDownloader.cs      |   7 +-
 .../E2E/CloudFetch/CloudFetchDownloaderTest.cs     | 153 +++++++++++++++++++++
 3 files changed, 158 insertions(+), 4 deletions(-)

diff --git 
a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloadManager.cs 
b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloadManager.cs
index 934a4af20..98e984baf 100644
--- 
a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloadManager.cs
+++ 
b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloadManager.cs
@@ -34,7 +34,7 @@ namespace 
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
         // Default values
         private const int DefaultParallelDownloads = 3;
         private const int DefaultPrefetchCount = 2;
-        private const int DefaultMemoryBufferSizeMB = 100;
+        private const int DefaultMemoryBufferSizeMB = 200;
         private const bool DefaultPrefetchEnabled = true;
         private const int DefaultTimeoutMinutes = 5;
         private const int DefaultMaxUrlRefreshAttempts = 3;
diff --git 
a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs 
b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs
index 41715947c..f7bc3b201 100644
--- a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs
+++ b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs
@@ -275,6 +275,10 @@ namespace 
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
                         // Acquire a download slot
                         await 
_downloadSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
 
+                        // Acquire memory for this download (FIFO - acquired 
in sequential loop)
+                        long size = downloadResult.Size;
+                        await _memoryManager.AcquireMemoryAsync(size, 
cancellationToken).ConfigureAwait(false);
+
                         // Start the download task
                         Task downloadTask = DownloadFileAsync(downloadResult, 
cancellationToken)
                             .ContinueWith(t =>
@@ -386,9 +390,6 @@ namespace 
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
                     new("expected_size_kb", size / 1024.0)
             ]);
 
-                // Acquire memory before downloading
-                await _memoryManager.AcquireMemoryAsync(size, 
cancellationToken).ConfigureAwait(false);
-
                 // Retry logic for downloading files
                 for (int retry = 0; retry < _maxRetries; retry++)
                 {
diff --git 
a/csharp/test/Drivers/Databricks/E2E/CloudFetch/CloudFetchDownloaderTest.cs 
b/csharp/test/Drivers/Databricks/E2E/CloudFetch/CloudFetchDownloaderTest.cs
index 67c8367dd..7511192e4 100644
--- a/csharp/test/Drivers/Databricks/E2E/CloudFetch/CloudFetchDownloaderTest.cs
+++ b/csharp/test/Drivers/Databricks/E2E/CloudFetch/CloudFetchDownloaderTest.cs
@@ -17,6 +17,7 @@
 
 using System;
 using System.Collections.Concurrent;
+using System.Collections.Generic;
 using System.IO;
 using System.Net;
 using System.Net.Http;
@@ -25,6 +26,7 @@ using System.Threading;
 using System.Threading.Tasks;
 using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
 using Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch;
+using Apache.Arrow.Adbc.Tracing;
 using Apache.Hive.Service.Rpc.Thrift;
 using Moq;
 using Moq.Protected;
@@ -65,6 +67,11 @@ namespace 
Apache.Arrow.Adbc.Tests.Drivers.Databricks.CloudFetch
                         ExpiryTime = 
DateTimeOffset.UtcNow.AddMinutes(30).ToUnixTimeMilliseconds()
                     };
                 });
+
+            // Set up activity tracing - CloudFetchDownloader implements 
IActivityTracer
+            // and delegates to _statement.Trace and _statement.TraceParent 
(CloudFetchDownloader.cs:605-607)
+            _mockStatement.Setup(s => s.Trace).Returns(new ActivityTrace());
+            _mockStatement.Setup(s => s.TraceParent).Returns((string?)null);
         }
 
         [Fact]
@@ -628,6 +635,152 @@ namespace 
Apache.Arrow.Adbc.Tests.Drivers.Databricks.CloudFetch
             await downloader.StopAsync();
         }
 
+        [Fact]
+        public async Task Downloader_WithFIFO_ProcessesFilesInOrder()
+        {
+            // This test verifies FIFO (First-In-First-Out) ordering behavior 
when memory is limited:
+            // 1. maxParallelDownloads=3 allows up to 3 concurrent downloads
+            // 2. Memory acquisition blocks until enough memory is available 
(CloudFetchDownloader.cs:280)
+            // 3. When memory is limited, files wait and are processed in FIFO 
order
+            // 4. Results are added to result queue immediately after memory 
acquisition (CloudFetchDownloader.cs:324)
+            //
+            // This test verifies that file 3 does NOT start downloading 
before file 2 by checking
+            // the actual order downloads are initiated and items appear in 
resultQueue.
+
+            // Track download start order (when HTTP request is actually made)
+            var downloadStartOrder = new List<long>();
+            var downloadStartLock = new object();
+
+            byte[] testContent = new byte[10];
+            var mockHttpHandler = new Mock<HttpMessageHandler>();
+            mockHttpHandler.Protected()
+                .Setup<Task<HttpResponseMessage>>(
+                    "SendAsync",
+                    ItExpr.IsAny<HttpRequestMessage>(),
+                    ItExpr.IsAny<CancellationToken>())
+                .ReturnsAsync((HttpRequestMessage request, CancellationToken 
token) =>
+                {
+                    // Track when download actually starts
+                    var url = request.RequestUri?.ToString() ?? "";
+                    long size = url.Contains("test1") ? 100 : 
url.Contains("test2") ? 200 : 300;
+                    lock (downloadStartLock)
+                    {
+                        downloadStartOrder.Add(size);
+                    }
+                    // Simulate download time
+                    Thread.Sleep(20);
+                    return new HttpResponseMessage
+                    {
+                        StatusCode = HttpStatusCode.OK,
+                        Content = new ByteArrayContent(testContent)
+                    };
+                });
+
+            var httpClient = new HttpClient(mockHttpHandler.Object);
+
+            // Track memory acquisition order
+            var memoryAcquisitionOrder = new List<long>();
+            var memoryLock = new SemaphoreSlim(1, 1); // Only allow one memory 
acquisition at a time
+
+            // Mock memory manager to control memory availability
+            var mockMemoryManager = new Mock<ICloudFetchMemoryBufferManager>();
+            mockMemoryManager.Setup(m => 
m.AcquireMemoryAsync(It.IsAny<long>(), It.IsAny<CancellationToken>()))
+                .Returns(async (long size, CancellationToken token) =>
+                {
+                    // Acquire lock to ensure only one file gets memory at a 
time
+                    await memoryLock.WaitAsync(token);
+                    memoryAcquisitionOrder.Add(size);
+                    // Add small delay to simulate memory acquisition
+                    await Task.Delay(10, token);
+                    memoryLock.Release();
+                });
+
+            // Create three download results with different sizes to verify 
ordering
+            var downloadResult1 = CreateMockDownloadResult(100, 
"http://test1.com").Object;
+            var downloadResult2 = CreateMockDownloadResult(200, 
"http://test2.com").Object;
+            var downloadResult3 = CreateMockDownloadResult(300, 
"http://test3.com").Object;
+
+            // Create downloader with maxParallelDownloads=3
+            var downloader = new CloudFetchDownloader(
+                _mockStatement.Object,
+                _downloadQueue,
+                _resultQueue,
+                mockMemoryManager.Object,
+                httpClient,
+                _mockResultFetcher.Object,
+                3, // Allow up to 3 parallel downloads
+                false,
+                1,
+                10);
+
+            await downloader.StartAsync(CancellationToken.None);
+
+            // Add all three downloads to the queue
+            _downloadQueue.Add(downloadResult1);
+            _downloadQueue.Add(downloadResult2);
+            _downloadQueue.Add(downloadResult3);
+            _downloadQueue.Add(EndOfResultsGuard.Instance);
+
+            // Wait for all downloads to complete
+            // Each download takes: 10ms (memory acquisition) + 20ms (HTTP 
download) = 30ms
+            // Sequential: 3 * 30ms = 90ms minimum, add buffer for test 
overhead
+            await Task.Delay(300);
+
+            // Verify items appear in resultQueue in FIFO order
+            // Note: resultQueue is populated at CloudFetchDownloader.cs:324 
immediately after memory acquisition
+            var queueItems = new List<long>();
+            foreach (var item in _resultQueue)
+            {
+                if (item != EndOfResultsGuard.Instance)
+                {
+                    queueItems.Add(item.Size);
+                }
+            }
+
+            // Verify memory was acquired in FIFO order
+            Assert.Equal(3, memoryAcquisitionOrder.Count);
+            Assert.Equal(100, memoryAcquisitionOrder[0]);
+            Assert.Equal(200, memoryAcquisitionOrder[1]);
+            Assert.Equal(300, memoryAcquisitionOrder[2]);
+
+            // Verify downloads started in FIFO order (file 3 did NOT start 
before file 2)
+            Assert.Equal(3, downloadStartOrder.Count);
+            Assert.Equal(100, downloadStartOrder[0]);
+            Assert.Equal(200, downloadStartOrder[1]);
+            Assert.Equal(300, downloadStartOrder[2]);
+
+            // Verify items appeared in resultQueue in FIFO order
+            Assert.Equal(3, queueItems.Count);
+            Assert.Equal(100, queueItems[0]);
+            Assert.Equal(200, queueItems[1]);
+            Assert.Equal(300, queueItems[2]);
+
+            await downloader.StopAsync();
+            Assert.False(downloader.HasError);
+        }
+
+        private Mock<IDownloadResult> CreateMockDownloadResult(long size, 
string url)
+        {
+            var link = new TSparkArrowResultLink
+            {
+                FileLink = url,
+                BytesNum = size,
+                ExpiryTime = 
DateTimeOffset.UtcNow.AddHours(1).ToUnixTimeMilliseconds()
+            };
+
+            var mock = new Mock<IDownloadResult>();
+            mock.Setup(r => r.Link).Returns(link);
+            mock.Setup(r => r.Size).Returns(size);
+            mock.Setup(r => r.RefreshAttempts).Returns(0);
+            mock.Setup(r => 
r.IsExpiredOrExpiringSoon(It.IsAny<int>())).Returns(false);
+            mock.Setup(r => r.SetCompleted(It.IsAny<Stream>(), 
It.IsAny<long>()))
+                .Callback<Stream, long>((stream, size) =>
+                {
+                    // Capture the stream but don't need to do anything with 
it for this test
+                });
+            return mock;
+        }
+
         private static Mock<HttpMessageHandler> CreateMockHttpMessageHandler(
             byte[]? content,
             HttpStatusCode statusCode = HttpStatusCode.OK,

Reply via email to