This is an automated email from the ASF dual-hosted git repository.
curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 87c49e937 fix(csharp/src/Drivers/Databricks): Implement FIFO memory acquisition to prevent starvation in CloudFetch (#3756)
87c49e937 is described below
commit 87c49e9377081a1f6c04196e2adbfead6d21a542
Author: eric-wang-1990 <[email protected]>
AuthorDate: Tue Dec 2 13:04:24 2025 -0800
fix(csharp/src/Drivers/Databricks): Implement FIFO memory acquisition to prevent starvation in CloudFetch (#3756)
Fixes a memory starvation issue in the CloudFetch download pipeline where
newer downloads could win memory allocation over older waiting downloads,
causing indefinite delays.
## Problem
The CloudFetch downloader was acquiring memory inside the parallel download
tasks with a polling-based wait (10ms intervals); a sketch of this pattern
follows the example scenario below. This non-FIFO behavior caused memory
starvation:
1. Multiple download tasks acquire semaphore slots and start polling for
memory
2. All tasks wake up simultaneously every 10ms to check memory
availability
3. When memory becomes available, any waiting task could acquire it
(non-deterministic)
4. Newer downloads (e.g., file 5, 7, 8) could repeatedly win the race
over older downloads (e.g., file 4)
5. File 4 never gets memory despite waiting the longest → **indefinite
starvation**
Example scenario with 200MB memory, 50MB files, 3 parallel downloads:
- Files 1, 2, 3 download (150MB used)
- File 1 completes and releases 50MB
- Files 4, 5, 6 are all waiting for memory
- File 5 wins the race and acquires the 50MB
- File 2 completes, file 7 wins the race
- File 3 completes, file 8 wins the race
- File 4 never gets memory because files 5, 7, 8 keep winning
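To make the failure mode concrete, here is a minimal sketch of the old polling pattern (hedged: `TryAcquireMemory` is a hypothetical non-blocking helper and the signature is simplified; the real logic lived inside `DownloadFileAsync`):

```csharp
// Sketch of the pre-fix behavior, not the actual driver code.
// Each parallel task polls for memory on its own, so wake-up order is
// unrelated to queue order and an old task can lose the race forever.
private async Task DownloadFileAsync(IDownloadResult file, CancellationToken token)
{
    // Non-FIFO wait: every blocked task re-checks every 10ms; whichever task
    // happens to run first after a release wins the freed memory.
    while (!_memoryManager.TryAcquireMemory(file.Size))
    {
        await Task.Delay(TimeSpan.FromMilliseconds(10), token).ConfigureAwait(false);
    }

    // ... perform the HTTP download using the reserved memory ...
}
```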
## Solution
Move memory acquisition from inside the parallel download tasks into the main
sequential loop (sketched after this list). This ensures FIFO ordering:
1. Main loop acquires memory sequentially for each download
2. Only after memory is acquired does the download task start
3. Downloads are guaranteed to get memory in the order they were queued
4. No starvation possible
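As a condensed sketch of the new control flow (simplified names, not verbatim driver code; the exact change is in the `CloudFetchDownloader.cs` diff below):

```csharp
// Simplified consumer loop: memory is reserved sequentially, in arrival
// order, before each download task is spawned.
private async Task RunDownloadLoopAsync(CancellationToken token)
{
    foreach (IDownloadResult file in _downloadQueue.GetConsumingEnumerable(token))
    {
        await _downloadSemaphore.WaitAsync(token).ConfigureAwait(false);

        // FIFO guarantee: only this single loop ever waits on the memory
        // manager, so reservations complete in queue order.
        await _memoryManager.AcquireMemoryAsync(file.Size, token).ConfigureAwait(false);

        // The download task starts only after its memory is already reserved.
        _ = DownloadFileAsync(file, token);
    }
}
```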
Additionally, increased the default memory buffer from 100MB to 200MB to
allow more parallel downloads without hitting memory limits.
## Changes
- **CloudFetchDownloader.cs**:
- Moved `AcquireMemoryAsync` from inside `DownloadFileAsync` to the main
loop (lines 278-280)
- Ensures FIFO ordering before spawning download tasks
- **CloudFetchDownloadManager.cs**:
- Increased `DefaultMemoryBufferSizeMB` from 100 to 200
- Better performance for large result sets
## Testing
- Manually verified on a fast VM with Power BI Desktop
- Existing CloudFetchDownloader tests still pass
- FIFO ordering prevents the starvation scenario
## TODO
The memory buffer design has a flaw: it only accounts for the compressed
size of each file, not the decompressed size. There are a couple of options
we can take, but I want to land this fix first since this issue can actually
block customers.
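Purely as an illustration of one such option (the `IsCompressed` flag, helper name, and ratio below are hypothetical, not part of this commit), the reservation could be padded by an estimated decompression factor:

```csharp
// Illustrative sketch only - not part of this commit: reserve memory for an
// estimated decompressed size instead of the compressed transfer size.
private async Task AcquireEstimatedMemoryAsync(IDownloadResult downloadResult, CancellationToken cancellationToken)
{
    const double AssumedDecompressionRatio = 4.0; // made-up placeholder, not a measured value

    long estimatedBytes = downloadResult.IsCompressed // hypothetical flag for this sketch
        ? (long)(downloadResult.Size * AssumedDecompressionRatio)
        : downloadResult.Size;

    await _memoryManager.AcquireMemoryAsync(estimatedBytes, cancellationToken).ConfigureAwait(false);
}
```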
---------
Co-authored-by: Claude <[email protected]>
---
.../Reader/CloudFetch/CloudFetchDownloadManager.cs | 2 +-
.../Reader/CloudFetch/CloudFetchDownloader.cs | 7 +-
.../E2E/CloudFetch/CloudFetchDownloaderTest.cs | 153 +++++++++++++++++++++
3 files changed, 158 insertions(+), 4 deletions(-)
diff --git a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloadManager.cs b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloadManager.cs
index 934a4af20..98e984baf 100644
--- a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloadManager.cs
+++ b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloadManager.cs
@@ -34,7 +34,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
// Default values
private const int DefaultParallelDownloads = 3;
private const int DefaultPrefetchCount = 2;
- private const int DefaultMemoryBufferSizeMB = 100;
+ private const int DefaultMemoryBufferSizeMB = 200;
private const bool DefaultPrefetchEnabled = true;
private const int DefaultTimeoutMinutes = 5;
private const int DefaultMaxUrlRefreshAttempts = 3;
diff --git a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs
index 41715947c..f7bc3b201 100644
--- a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs
+++ b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs
@@ -275,6 +275,10 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
// Acquire a download slot
await _downloadSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
+ // Acquire memory for this download (FIFO - acquired in sequential loop)
+ long size = downloadResult.Size;
+ await _memoryManager.AcquireMemoryAsync(size, cancellationToken).ConfigureAwait(false);
+
// Start the download task
Task downloadTask = DownloadFileAsync(downloadResult, cancellationToken)
    .ContinueWith(t =>
@@ -386,9 +390,6 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
new("expected_size_kb", size / 1024.0)
]);
- // Acquire memory before downloading
- await _memoryManager.AcquireMemoryAsync(size, cancellationToken).ConfigureAwait(false);
-
// Retry logic for downloading files
for (int retry = 0; retry < _maxRetries; retry++)
{
diff --git a/csharp/test/Drivers/Databricks/E2E/CloudFetch/CloudFetchDownloaderTest.cs b/csharp/test/Drivers/Databricks/E2E/CloudFetch/CloudFetchDownloaderTest.cs
index 67c8367dd..7511192e4 100644
--- a/csharp/test/Drivers/Databricks/E2E/CloudFetch/CloudFetchDownloaderTest.cs
+++ b/csharp/test/Drivers/Databricks/E2E/CloudFetch/CloudFetchDownloaderTest.cs
@@ -17,6 +17,7 @@
using System;
using System.Collections.Concurrent;
+using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Http;
@@ -25,6 +26,7 @@ using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
using Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch;
+using Apache.Arrow.Adbc.Tracing;
using Apache.Hive.Service.Rpc.Thrift;
using Moq;
using Moq.Protected;
@@ -65,6 +67,11 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks.CloudFetch
ExpiryTime = DateTimeOffset.UtcNow.AddMinutes(30).ToUnixTimeMilliseconds()
};
});
+
+ // Set up activity tracing - CloudFetchDownloader implements IActivityTracer
+ // and delegates to _statement.Trace and _statement.TraceParent (CloudFetchDownloader.cs:605-607)
+ _mockStatement.Setup(s => s.Trace).Returns(new ActivityTrace());
+ _mockStatement.Setup(s => s.TraceParent).Returns((string?)null);
}
[Fact]
@@ -628,6 +635,152 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks.CloudFetch
await downloader.StopAsync();
}
+ [Fact]
+ public async Task Downloader_WithFIFO_ProcessesFilesInOrder()
+ {
+ // This test verifies FIFO (First-In-First-Out) ordering behavior when memory is limited:
+ // 1. maxParallelDownloads=3 allows up to 3 concurrent downloads
+ // 2. Memory acquisition blocks until enough memory is available (CloudFetchDownloader.cs:280)
+ // 3. When memory is limited, files wait and are processed in FIFO order
+ // 4. Results are added to result queue immediately after memory acquisition (CloudFetchDownloader.cs:324)
+ //
+ // This test verifies that file 3 does NOT start downloading before file 2 by checking
+ // the actual order downloads are initiated and items appear in resultQueue.
+
+ // Track download start order (when HTTP request is actually made)
+ var downloadStartOrder = new List<long>();
+ var downloadStartLock = new object();
+
+ byte[] testContent = new byte[10];
+ var mockHttpHandler = new Mock<HttpMessageHandler>();
+ mockHttpHandler.Protected()
+ .Setup<Task<HttpResponseMessage>>(
+ "SendAsync",
+ ItExpr.IsAny<HttpRequestMessage>(),
+ ItExpr.IsAny<CancellationToken>())
+ .ReturnsAsync((HttpRequestMessage request, CancellationToken token) =>
+ {
+ // Track when download actually starts
+ var url = request.RequestUri?.ToString() ?? "";
+ long size = url.Contains("test1") ? 100 : url.Contains("test2") ? 200 : 300;
+ lock (downloadStartLock)
+ {
+ downloadStartOrder.Add(size);
+ }
+ // Simulate download time
+ Thread.Sleep(20);
+ return new HttpResponseMessage
+ {
+ StatusCode = HttpStatusCode.OK,
+ Content = new ByteArrayContent(testContent)
+ };
+ });
+
+ var httpClient = new HttpClient(mockHttpHandler.Object);
+
+ // Track memory acquisition order
+ var memoryAcquisitionOrder = new List<long>();
+ var memoryLock = new SemaphoreSlim(1, 1); // Only allow one memory acquisition at a time
+
+ // Mock memory manager to control memory availability
+ var mockMemoryManager = new Mock<ICloudFetchMemoryBufferManager>();
+ mockMemoryManager.Setup(m => m.AcquireMemoryAsync(It.IsAny<long>(), It.IsAny<CancellationToken>()))
+ .Returns(async (long size, CancellationToken token) =>
+ {
+ // Acquire lock to ensure only one file gets memory at a time
+ await memoryLock.WaitAsync(token);
+ memoryAcquisitionOrder.Add(size);
+ // Add small delay to simulate memory acquisition
+ await Task.Delay(10, token);
+ memoryLock.Release();
+ });
+
+ // Create three download results with different sizes to verify ordering
+ var downloadResult1 = CreateMockDownloadResult(100, "http://test1.com").Object;
+ var downloadResult2 = CreateMockDownloadResult(200, "http://test2.com").Object;
+ var downloadResult3 = CreateMockDownloadResult(300, "http://test3.com").Object;
+
+ // Create downloader with maxParallelDownloads=3
+ var downloader = new CloudFetchDownloader(
+ _mockStatement.Object,
+ _downloadQueue,
+ _resultQueue,
+ mockMemoryManager.Object,
+ httpClient,
+ _mockResultFetcher.Object,
+ 3, // Allow up to 3 parallel downloads
+ false,
+ 1,
+ 10);
+
+ await downloader.StartAsync(CancellationToken.None);
+
+ // Add all three downloads to the queue
+ _downloadQueue.Add(downloadResult1);
+ _downloadQueue.Add(downloadResult2);
+ _downloadQueue.Add(downloadResult3);
+ _downloadQueue.Add(EndOfResultsGuard.Instance);
+
+ // Wait for all downloads to complete
+ // Each download takes: 10ms (memory acquisition) + 20ms (HTTP download) = 30ms
+ // Sequential: 3 * 30ms = 90ms minimum, add buffer for test overhead
+ await Task.Delay(300);
+
+ // Verify items appear in resultQueue in FIFO order
+ // Note: resultQueue is populated at CloudFetchDownloader.cs:324 immediately after memory acquisition
+ var queueItems = new List<long>();
+ foreach (var item in _resultQueue)
+ {
+ if (item != EndOfResultsGuard.Instance)
+ {
+ queueItems.Add(item.Size);
+ }
+ }
+
+ // Verify memory was acquired in FIFO order
+ Assert.Equal(3, memoryAcquisitionOrder.Count);
+ Assert.Equal(100, memoryAcquisitionOrder[0]);
+ Assert.Equal(200, memoryAcquisitionOrder[1]);
+ Assert.Equal(300, memoryAcquisitionOrder[2]);
+
+ // Verify downloads started in FIFO order (file 3 did NOT start before file 2)
+ Assert.Equal(3, downloadStartOrder.Count);
+ Assert.Equal(100, downloadStartOrder[0]);
+ Assert.Equal(200, downloadStartOrder[1]);
+ Assert.Equal(300, downloadStartOrder[2]);
+
+ // Verify items appeared in resultQueue in FIFO order
+ Assert.Equal(3, queueItems.Count);
+ Assert.Equal(100, queueItems[0]);
+ Assert.Equal(200, queueItems[1]);
+ Assert.Equal(300, queueItems[2]);
+
+ await downloader.StopAsync();
+ Assert.False(downloader.HasError);
+ }
+
+ private Mock<IDownloadResult> CreateMockDownloadResult(long size, string url)
+ {
+ var link = new TSparkArrowResultLink
+ {
+ FileLink = url,
+ BytesNum = size,
+ ExpiryTime = DateTimeOffset.UtcNow.AddHours(1).ToUnixTimeMilliseconds()
+ };
+
+ var mock = new Mock<IDownloadResult>();
+ mock.Setup(r => r.Link).Returns(link);
+ mock.Setup(r => r.Size).Returns(size);
+ mock.Setup(r => r.RefreshAttempts).Returns(0);
+ mock.Setup(r => r.IsExpiredOrExpiringSoon(It.IsAny<int>())).Returns(false);
+ mock.Setup(r => r.SetCompleted(It.IsAny<Stream>(), It.IsAny<long>()))
+     .Callback<Stream, long>((stream, completedSize) =>
+     {
+         // Capture the stream; nothing needs to be done with it for this test
+     });
+ return mock;
+ }
+
private static Mock<HttpMessageHandler> CreateMockHttpMessageHandler(
byte[]? content,
HttpStatusCode statusCode = HttpStatusCode.OK,