This is an automated email from the ASF dual-hosted git repository.
curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 6fa0db2b2 feat(csharp/src/Drivers/Databricks): Add Activity-based
distributed tracing to CloudFetch pipeline (#3580)
6fa0db2b2 is described below
commit 6fa0db2b2e5b0d263589b419bd4ec0de8ccd7723
Author: eric-wang-1990 <[email protected]>
AuthorDate: Mon Oct 20 08:23:24 2025 -0700
feat(csharp/src/Drivers/Databricks): Add Activity-based distributed tracing
to CloudFetch pipeline (#3580)
## Summary
This PR implements comprehensive Activity-based distributed tracing for
the CloudFetch download pipeline in the Databricks C# driver, enabling
real-time monitoring, structured logging, and improved observability.
### Key Changes:
- Add Activity-based tracing to CloudFetchDownloader with
TraceActivityAsync
- Create child Activities per individual file download for real-time
progress visibility
- Replace all Trace.TraceInformation/Error calls with Activity.AddEvent
for structured logging
- Add Activity tags for searchable metadata (offset, URL, file sizes)
- Implement proper Activity context flow through async/await chains
- Update CloudFetchDownloadManager to pass statement for tracing context
- Fix all tests to pass the new statement parameter to the
CloudFetchDownloader constructor
### Architecture:
The implementation follows a hierarchical Activity structure:
```
Statement Activity (parent)
├─ DownloadFilesAsync Activity (overall batch)
│ ├─ DownloadFile Activity (file 1) - flushes when complete
│ ├─ DownloadFile Activity (file 2) - flushes when complete
│ └─ ...
└─ ReadNextRecordBatchAsync Activity (reader operations)
```
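To make the nesting concrete, here is a rough sketch of the same parent/child shape using only the plain `System.Diagnostics` API. It is illustrative only: the driver itself creates these Activities through its `TraceActivityAsync` helper (visible in the diff below), and the source name, file list, and delay here are placeholders.
```csharp
using System.Diagnostics;
using System.Threading.Tasks;

class CloudFetchTracingSketch
{
    // Placeholder source name; the driver resolves its tracing source via IActivityTracer.
    private static readonly ActivitySource Source = new("Apache.Arrow.Adbc.Drivers.Databricks");

    public static async Task DownloadFilesAsync(string[] fileUrls)
    {
        // Parent Activity covering the whole batch.
        using Activity? batch = Source.StartActivity("DownloadFilesAsync", ActivityKind.Client);

        foreach (string url in fileUrls)
        {
            // Child Activity per file; it ends (and its events can flush) as soon as this file finishes.
            using Activity? file = Source.StartActivity("DownloadFile", ActivityKind.Client);
            file?.SetTag("cloudfetch.sanitized_url", "cloud-storage-url");
            await Task.Delay(10); // stand-in for the actual HTTP download
            file?.AddEvent(new ActivityEvent("cloudfetch.download_complete"));
        }

        batch?.AddEvent(new ActivityEvent("cloudfetch.download_summary"));
    }
}
```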
### Benefits:
- **Real-time progress monitoring**: Events flush immediately as each
file completes (not batched)
- **Better fault tolerance**: Downloads that complete are logged
immediately, so their events survive a later process crash
- **Improved debuggability**: Searchable Activity tags enable filtering
by offset, URL, size
- **Granular metrics**: Per-file download times, throughput, and
compression ratios are visible in logs
- **OpenTelemetry-compatible**: Activities follow the
System.Diagnostics.Activity standard (see the listener sketch after this list)
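Because these are standard `System.Diagnostics` Activities, any in-process `ActivityListener` (or an OpenTelemetry SDK with its usual wiring) can consume them. A minimal console-listener sketch for local verification follows; the catch-all source filter is an assumption, and in practice you would match the driver's `ActivitySource` name:
```csharp
using System;
using System.Diagnostics;

var listener = new ActivityListener
{
    // Listen to every source here; narrow this to the driver's ActivitySource in real code.
    ShouldListenTo = source => true,
    Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllDataAndRecorded,
    ActivityStopped = activity =>
    {
        Console.WriteLine($"{activity.OperationName} took {activity.Duration.TotalMilliseconds:F0} ms");
        foreach (ActivityEvent evt in activity.Events)
            Console.WriteLine($"  event: {evt.Name}");
    }
};
ActivitySource.AddActivityListener(listener);
```
Registering a listener like this before running a CloudFetch-enabled query should surface the `cloudfetch.*` events as each file completes.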
### Events Logged:
- `cloudfetch.download_start` - File download initiated
- `cloudfetch.content_length` - Actual file size from HTTP response
- `cloudfetch.download_retry` - Retry attempt with reason
- `cloudfetch.url_refreshed_before_download` - URL refreshed proactively
- `cloudfetch.url_refreshed_after_auth_error` - URL refreshed after
401/403
- `cloudfetch.decompression_complete` - LZ4 decompression metrics
- `cloudfetch.download_complete` - Download success with throughput
- `cloudfetch.download_failed_all_retries` - Final failure after all
retries
- `cloudfetch.download_summary` - Overall batch statistics
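Each entry above is ultimately an `ActivityEvent` carrying a tag collection. As a rough standard-API illustration of what a `cloudfetch.download_complete` emission amounts to (the tag values are placeholders; the driver emits this through its own `AddEvent` helper shown in the diff):
```csharp
using System.Diagnostics;

Activity? activity = Activity.Current;
activity?.AddEvent(new ActivityEvent(
    "cloudfetch.download_complete",
    tags: new ActivityTagsCollection
    {
        ["offset"] = 0L,                        // StartRowOffset of the result chunk
        ["sanitized_url"] = "cloud-storage-url",
        ["actual_size_kb"] = 512.0,
        ["latency_ms"] = 595L,
        ["throughput_mbps"] = 0.84
    }));
```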
## Test Plan
- ✅ All existing CloudFetchDownloader E2E tests pass (7 test methods)
- ✅ Build succeeds with 0 warnings
- Manual testing: Query with CloudFetch enabled and verify Activity
events in logs
- Verified Activity context flows correctly through async/await chains
- Confirmed child Activities flush independently upon completion
`{"Status":"Ok","HasRemoteParent":false,"Kind":"Client","OperationName":"DownloadFile","Duration":"00:00:00.5952467","StartTimeUtc":"2025-10-16T15:00:48.1657713Z","Id":"00-dc2baa073e36e8feab91170cb360e2f1-b71e0296e60abf8b-01","ParentId":"00-dc2baa073e36e8feab91170cb360e2f1-06801209b222d0e9-01","RootId":"dc2baa073e36e8feab91170cb360e2f1","TraceStateString":null,"SpanId":"b71e0296e60abf8b","TraceId":"dc2baa073e36e8feab91170cb360e2f1","Recorded":true,"IsAllDataRequested":true,"ActivityTr
[...]
🤖 Generated with [Claude Code](https://claude.com/claude-code)
---------
Co-authored-by: Claude <[email protected]>
---
.../Reader/CloudFetch/CloudFetchDownloadManager.cs | 2 +
.../Reader/CloudFetch/CloudFetchDownloader.cs | 548 ++++++++++++---------
.../Reader/CloudFetch/CloudFetchReader.cs | 1 +
.../Reader/CloudFetch/CloudFetchResultFetcher.cs | 8 +-
.../Reader/CloudFetch/ICloudFetchInterfaces.cs | 1 -
.../E2E/CloudFetch/CloudFetchDownloaderTest.cs | 7 +
6 files changed, 333 insertions(+), 234 deletions(-)
diff --git
a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloadManager.cs
b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloadManager.cs
index 97b9fa968..98e984baf 100644
---
a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloadManager.cs
+++
b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloadManager.cs
@@ -17,6 +17,7 @@
using System;
using System.Collections.Concurrent;
+using System.Diagnostics;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
@@ -207,6 +208,7 @@ namespace
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
// Initialize the downloader
_downloader = new CloudFetchDownloader(
+ _statement,
_downloadQueue,
_resultQueue,
_memoryManager,
diff --git
a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs
b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs
index ba65cf3cf..f4f3e5d01 100644
--- a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs
+++ b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs
@@ -22,6 +22,8 @@ using System.IO;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
+using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
+using Apache.Arrow.Adbc.Tracing;
using K4os.Compression.LZ4.Streams;
namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
@@ -29,8 +31,9 @@ namespace
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
/// <summary>
/// Downloads files from URLs.
/// </summary>
- internal sealed class CloudFetchDownloader : ICloudFetchDownloader
+ internal sealed class CloudFetchDownloader : ICloudFetchDownloader,
IActivityTracer
{
+ private readonly ITracingStatement _statement;
private readonly BlockingCollection<IDownloadResult> _downloadQueue;
private readonly BlockingCollection<IDownloadResult> _resultQueue;
private readonly ICloudFetchMemoryBufferManager _memoryManager;
@@ -52,6 +55,7 @@ namespace
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
/// <summary>
/// Initializes a new instance of the <see
cref="CloudFetchDownloader"/> class.
/// </summary>
+ /// <param name="statement">The tracing statement for Activity
context.</param>
/// <param name="downloadQueue">The queue of downloads to
process.</param>
/// <param name="resultQueue">The queue to add completed downloads
to.</param>
/// <param name="memoryManager">The memory buffer manager.</param>
@@ -64,6 +68,7 @@ namespace
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
/// <param name="maxUrlRefreshAttempts">The maximum number of URL
refresh attempts.</param>
/// <param name="urlExpirationBufferSeconds">Buffer time in seconds
before URL expiration to trigger refresh.</param>
public CloudFetchDownloader(
+ ITracingStatement statement,
BlockingCollection<IDownloadResult> downloadQueue,
BlockingCollection<IDownloadResult> resultQueue,
ICloudFetchMemoryBufferManager memoryManager,
@@ -76,6 +81,7 @@ namespace
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
int maxUrlRefreshAttempts = 3,
int urlExpirationBufferSeconds = 60)
{
+ _statement = statement ?? throw new
ArgumentNullException(nameof(statement));
_downloadQueue = downloadQueue ?? throw new
ArgumentNullException(nameof(downloadQueue));
_resultQueue = resultQueue ?? throw new
ArgumentNullException(nameof(resultQueue));
_memoryManager = memoryManager ?? throw new
ArgumentNullException(nameof(memoryManager));
@@ -194,299 +200,370 @@ namespace
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
private async Task DownloadFilesAsync(CancellationToken
cancellationToken)
{
- await Task.Yield();
-
- int totalFiles = 0;
- int successfulDownloads = 0;
- int failedDownloads = 0;
- long totalBytes = 0;
- var overallStopwatch = Stopwatch.StartNew();
-
- try
+ await this.TraceActivityAsync(async activity =>
{
- // Keep track of active download tasks
- var downloadTasks = new ConcurrentDictionary<Task,
IDownloadResult>();
- var downloadTaskCompletionSource = new
TaskCompletionSource<bool>();
+ await Task.Yield();
- // Process items from the download queue until it's completed
- foreach (var downloadResult in
_downloadQueue.GetConsumingEnumerable(cancellationToken))
+ int totalFiles = 0;
+ int successfulDownloads = 0;
+ int failedDownloads = 0;
+ long totalBytes = 0;
+ var overallStopwatch = Stopwatch.StartNew();
+
+ try
{
- totalFiles++;
+ // Keep track of active download tasks
+ var downloadTasks = new ConcurrentDictionary<Task,
IDownloadResult>();
+ var downloadTaskCompletionSource = new
TaskCompletionSource<bool>();
- // Check if there's an error before processing more
downloads
- if (HasError)
+ // Process items from the download queue until it's
completed
+ foreach (var downloadResult in
_downloadQueue.GetConsumingEnumerable(cancellationToken))
{
- // Add the failed download result to the queue to
signal the error
- // This will be caught by GetNextDownloadedFileAsync
- break;
- }
+ // Check if there's an error before processing more
downloads
+ if (HasError)
+ {
+ // Add the failed download result to the queue to
signal the error
+ // This will be caught by
GetNextDownloadedFileAsync
+ break;
+ }
- // Check if this is the end of results guard
- if (downloadResult == EndOfResultsGuard.Instance)
- {
- // Wait for all active downloads to complete
- if (downloadTasks.Count > 0)
+ // Check if this is the end of results guard
+ if (downloadResult == EndOfResultsGuard.Instance)
{
- try
+ // Wait for all active downloads to complete
+ if (downloadTasks.Count > 0)
{
- await
Task.WhenAll(downloadTasks.Keys).ConfigureAwait(false);
+ try
+ {
+ await
Task.WhenAll(downloadTasks.Keys).ConfigureAwait(false);
+ }
+ catch (Exception ex)
+ {
+ activity?.AddException(ex,
[new("error.context", "cloudfetch.wait_for_downloads")]);
+ // Don't set error here, as individual
download tasks will handle their own errors
+ }
}
- catch (Exception ex)
+
+ // Only add the guard if there's no error
+ if (!HasError)
{
- Trace.TraceWarning($"Error waiting for
downloads to complete: {ex.Message}");
- // Don't set error here, as individual
download tasks will handle their own errors
+ // Add the guard to the result queue to signal
the end of results
+ _resultQueue.Add(EndOfResultsGuard.Instance,
cancellationToken);
+ _isCompleted = true;
}
+ break;
}
- // Only add the guard if there's no error
- if (!HasError)
- {
- // Add the guard to the result queue to signal the
end of results
- _resultQueue.Add(EndOfResultsGuard.Instance,
cancellationToken);
- _isCompleted = true;
- }
- break;
- }
-
- // Check if the URL is expired or about to expire
- if
(downloadResult.IsExpiredOrExpiringSoon(_urlExpirationBufferSeconds))
- {
- // Get a refreshed URL before starting the download
- var refreshedLink = await
_resultFetcher.GetUrlAsync(downloadResult.Link.StartRowOffset,
cancellationToken);
- if (refreshedLink != null)
- {
- // Update the download result with the refreshed
link
-
downloadResult.UpdateWithRefreshedLink(refreshedLink);
- Trace.TraceInformation($"Updated URL for file at
offset {refreshedLink.StartRowOffset} before download");
- }
- }
-
- // Acquire a download slot
- await
_downloadSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
+ // This is a real file, count it
+ totalFiles++;
- // Start the download task
- Task downloadTask = DownloadFileAsync(downloadResult,
cancellationToken)
- .ContinueWith(t =>
+ // Check if the URL is expired or about to expire
+ if
(downloadResult.IsExpiredOrExpiringSoon(_urlExpirationBufferSeconds))
{
- // Release the download slot
- _downloadSemaphore.Release();
-
- // Remove the task from the dictionary
- downloadTasks.TryRemove(t, out _);
-
- // Handle any exceptions
- if (t.IsFaulted)
- {
- Exception ex = t.Exception?.InnerException ??
new Exception("Unknown error");
- Trace.TraceError($"Download failed for file
{SanitizeUrl(downloadResult.Link.FileLink)}: {ex.Message}");
-
- // Set the download as failed
- downloadResult.SetFailed(ex);
- failedDownloads++;
-
- // Set the error state to stop the download
process
- SetError(ex);
-
- // Signal that we should stop processing
downloads
-
downloadTaskCompletionSource.TrySetException(ex);
- }
- else if (!t.IsFaulted && !t.IsCanceled)
+ // Get a refreshed URL before starting the download
+ var refreshedLink = await
_resultFetcher.GetUrlAsync(downloadResult.Link.StartRowOffset,
cancellationToken);
+ if (refreshedLink != null)
{
- successfulDownloads++;
- totalBytes += downloadResult.Size;
+ // Update the download result with the
refreshed link
+
downloadResult.UpdateWithRefreshedLink(refreshedLink);
+
activity?.AddEvent("cloudfetch.url_refreshed_before_download", [
+ new("offset", refreshedLink.StartRowOffset)
+ ]);
}
- }, cancellationToken);
-
- // Add the task to the dictionary
- downloadTasks[downloadTask] = downloadResult;
+ }
- // Add the result to the result queue add the result here
to assure the download sequence.
- _resultQueue.Add(downloadResult, cancellationToken);
+ // Acquire a download slot
+ await
_downloadSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
- // If there's an error, stop processing more downloads
- if (HasError)
- {
- break;
+ // Start the download task
+ Task downloadTask = DownloadFileAsync(downloadResult,
cancellationToken)
+ .ContinueWith(t =>
+ {
+ // Release the download slot
+ _downloadSemaphore.Release();
+
+ // Remove the task from the dictionary
+ downloadTasks.TryRemove(t, out _);
+
+ // Handle any exceptions
+ if (t.IsFaulted)
+ {
+ Exception ex = t.Exception?.InnerException
?? new Exception("Unknown error");
+ string sanitizedUrl =
SanitizeUrl(downloadResult.Link.FileLink);
+ activity?.AddException(ex, [
+ new("error.context",
"cloudfetch.download_failed"),
+ new("offset",
downloadResult.Link.StartRowOffset),
+ new("sanitized_url", sanitizedUrl)
+ ]);
+
+ // Set the download as failed
+ downloadResult.SetFailed(ex);
+ failedDownloads++;
+
+ // Set the error state to stop the
download process
+ SetError(ex, activity);
+
+ // Signal that we should stop processing
downloads
+
downloadTaskCompletionSource.TrySetException(ex);
+ }
+ else if (!t.IsFaulted && !t.IsCanceled)
+ {
+ successfulDownloads++;
+ totalBytes += downloadResult.Size;
+ }
+ }, cancellationToken);
+
+ // Add the task to the dictionary
+ downloadTasks[downloadTask] = downloadResult;
+
+ // Add the result to the result queue add the result
here to assure the download sequence.
+ _resultQueue.Add(downloadResult, cancellationToken);
+
+ // If there's an error, stop processing more downloads
+ if (HasError)
+ {
+ break;
+ }
}
}
- }
- catch (OperationCanceledException) when
(cancellationToken.IsCancellationRequested)
- {
- // Expected when cancellation is requested
- Trace.TraceInformation("Download process was cancelled");
- }
- catch (Exception ex)
- {
- Trace.TraceError($"Error in download loop: {ex.Message}");
- SetError(ex);
- }
- finally
- {
- overallStopwatch.Stop();
-
- Trace.TraceInformation(
- $"Download process completed. Total files: {totalFiles},
Successful: {successfulDownloads}, " +
- $"Failed: {failedDownloads}, Total size: {totalBytes /
1024.0 / 1024.0:F2} MB, Total time: {overallStopwatch.ElapsedMilliseconds /
1000.0:F2} sec");
-
- // If there's an error, add the error to the result queue
- if (HasError)
+ catch (OperationCanceledException) when
(cancellationToken.IsCancellationRequested)
{
- CompleteWithError();
+ // Expected when cancellation is requested
+ activity?.AddEvent("cloudfetch.download_cancelled");
}
- }
+ catch (Exception ex)
+ {
+ activity?.AddException(ex, [new("error.context",
"cloudfetch.download_loop")]);
+ SetError(ex, activity);
+ }
+ finally
+ {
+ overallStopwatch.Stop();
+
+ activity?.AddEvent("cloudfetch.download_summary", [
+ new("total_files", totalFiles),
+ new("successful_downloads", successfulDownloads),
+ new("failed_downloads", failedDownloads),
+ new("total_bytes", totalBytes),
+ new("total_mb", totalBytes / 1024.0 / 1024.0),
+ new("total_time_ms",
overallStopwatch.ElapsedMilliseconds),
+ new("total_time_sec",
overallStopwatch.ElapsedMilliseconds / 1000.0)
+ ]);
+
+ // If there's an error, add the error to the result queue
+ if (HasError)
+ {
+ CompleteWithError(activity);
+ }
+ }
+ });
}
private async Task DownloadFileAsync(IDownloadResult downloadResult,
CancellationToken cancellationToken)
{
- string url = downloadResult.Link.FileLink;
- string sanitizedUrl = SanitizeUrl(downloadResult.Link.FileLink);
- byte[]? fileData = null;
-
- // Use the size directly from the download result
- long size = downloadResult.Size;
-
- // Create a stopwatch to track download time
- var stopwatch = Stopwatch.StartNew();
-
- // Log download start
- Trace.TraceInformation($"Starting download of file {sanitizedUrl},
expected size: {size / 1024.0:F2} KB");
-
- // Acquire memory before downloading
- await _memoryManager.AcquireMemoryAsync(size,
cancellationToken).ConfigureAwait(false);
-
- // Retry logic for downloading files
- for (int retry = 0; retry < _maxRetries; retry++)
+ await this.TraceActivityAsync(async activity =>
{
- try
+ string url = downloadResult.Link.FileLink;
+ string sanitizedUrl =
SanitizeUrl(downloadResult.Link.FileLink);
+ byte[]? fileData = null;
+
+ // Use the size directly from the download result
+ long size = downloadResult.Size;
+
+ // Add tags to the Activity for filtering/searching
+ activity?.SetTag("cloudfetch.offset",
downloadResult.Link.StartRowOffset);
+ activity?.SetTag("cloudfetch.sanitized_url", sanitizedUrl);
+ activity?.SetTag("cloudfetch.expected_size_bytes", size);
+
+ // Create a stopwatch to track download time
+ var stopwatch = Stopwatch.StartNew();
+
+ // Log download start
+ activity?.AddEvent("cloudfetch.download_start", [
+ new("offset", downloadResult.Link.StartRowOffset),
+ new("sanitized_url", sanitizedUrl),
+ new("expected_size_bytes", size),
+ new("expected_size_kb", size / 1024.0)
+ ]);
+
+ // Acquire memory before downloading
+ await _memoryManager.AcquireMemoryAsync(size,
cancellationToken).ConfigureAwait(false);
+
+ // Retry logic for downloading files
+ for (int retry = 0; retry < _maxRetries; retry++)
{
- // Download the file directly
- using HttpResponseMessage response = await
_httpClient.GetAsync(
- url,
- HttpCompletionOption.ResponseHeadersRead,
- cancellationToken).ConfigureAwait(false);
-
- // Check if the response indicates an expired URL
(typically 403 or 401)
- if (response.StatusCode ==
System.Net.HttpStatusCode.Forbidden ||
- response.StatusCode ==
System.Net.HttpStatusCode.Unauthorized)
+ try
{
- // If we've already tried refreshing too many times,
fail
- if (downloadResult.RefreshAttempts >=
_maxUrlRefreshAttempts)
+ // Download the file directly
+ using HttpResponseMessage response = await
_httpClient.GetAsync(
+ url,
+ HttpCompletionOption.ResponseHeadersRead,
+ cancellationToken).ConfigureAwait(false);
+
+ // Check if the response indicates an expired URL
(typically 403 or 401)
+ if (response.StatusCode ==
System.Net.HttpStatusCode.Forbidden ||
+ response.StatusCode ==
System.Net.HttpStatusCode.Unauthorized)
{
- throw new InvalidOperationException($"Failed to
download file after {downloadResult.RefreshAttempts} URL refresh attempts.");
- }
+ // If we've already tried refreshing too many
times, fail
+ if (downloadResult.RefreshAttempts >=
_maxUrlRefreshAttempts)
+ {
+ throw new InvalidOperationException($"Failed
to download file after {downloadResult.RefreshAttempts} URL refresh attempts.");
+ }
- // Try to refresh the URL
- var refreshedLink = await
_resultFetcher.GetUrlAsync(downloadResult.Link.StartRowOffset,
cancellationToken);
- if (refreshedLink != null)
- {
- // Update the download result with the refreshed
link
-
downloadResult.UpdateWithRefreshedLink(refreshedLink);
- url = refreshedLink.FileLink;
- sanitizedUrl = SanitizeUrl(url);
+ // Try to refresh the URL
+ var refreshedLink = await
_resultFetcher.GetUrlAsync(downloadResult.Link.StartRowOffset,
cancellationToken);
+ if (refreshedLink != null)
+ {
+ // Update the download result with the
refreshed link
+
downloadResult.UpdateWithRefreshedLink(refreshedLink);
+ url = refreshedLink.FileLink;
+ sanitizedUrl = SanitizeUrl(url);
+
+
activity?.AddEvent("cloudfetch.url_refreshed_after_auth_error", [
+ new("offset",
refreshedLink.StartRowOffset),
+ new("sanitized_url", sanitizedUrl)
+ ]);
+
+ // Continue to the next retry attempt with the
refreshed URL
+ continue;
+ }
+ else
+ {
+ // If refresh failed, throw an exception
+ throw new InvalidOperationException("Failed to
refresh expired URL.");
+ }
+ }
- Trace.TraceInformation($"URL for file at offset
{refreshedLink.StartRowOffset} was refreshed after expired URL response");
+ response.EnsureSuccessStatusCode();
- // Continue to the next retry attempt with the
refreshed URL
- continue;
- }
- else
+ // Log the download size if available from response
headers
+ long? contentLength =
response.Content.Headers.ContentLength;
+ if (contentLength.HasValue && contentLength.Value > 0)
{
- // If refresh failed, throw an exception
- throw new InvalidOperationException("Failed to
refresh expired URL.");
+ activity?.AddEvent("cloudfetch.content_length", [
+ new("offset",
downloadResult.Link.StartRowOffset),
+ new("sanitized_url", sanitizedUrl),
+ new("content_length_bytes",
contentLength.Value),
+ new("content_length_mb", contentLength.Value /
1024.0 / 1024.0)
+ ]);
}
- }
-
- response.EnsureSuccessStatusCode();
- // Log the download size if available from response headers
- long? contentLength =
response.Content.Headers.ContentLength;
- if (contentLength.HasValue && contentLength.Value > 0)
+ // Read the file data
+ fileData = await
response.Content.ReadAsByteArrayAsync().ConfigureAwait(false);
+ break; // Success, exit retry loop
+ }
+ catch (Exception ex) when (retry < _maxRetries - 1 &&
!cancellationToken.IsCancellationRequested)
{
- Trace.TraceInformation($"Actual file size for
{sanitizedUrl}: {contentLength.Value / 1024.0 / 1024.0:F2} MB");
+ // Log the error and retry
+ activity?.AddException(ex, [
+ new("error.context", "cloudfetch.download_retry"),
+ new("offset", downloadResult.Link.StartRowOffset),
+ new("sanitized_url", SanitizeUrl(url)),
+ new("attempt", retry + 1),
+ new("max_retries", _maxRetries)
+ ]);
+
+ await Task.Delay(_retryDelayMs * (retry + 1),
cancellationToken).ConfigureAwait(false);
}
-
- // Read the file data
- fileData = await
response.Content.ReadAsByteArrayAsync().ConfigureAwait(false);
- break; // Success, exit retry loop
}
- catch (Exception ex) when (retry < _maxRetries - 1 &&
!cancellationToken.IsCancellationRequested)
+
+ if (fileData == null)
{
- // Log the error and retry
- Trace.TraceError($"Error downloading file
{SanitizeUrl(url)} (attempt {retry + 1}/{_maxRetries}): {ex.Message}");
+ stopwatch.Stop();
+
activity?.AddEvent("cloudfetch.download_failed_all_retries", [
+ new("offset", downloadResult.Link.StartRowOffset),
+ new("sanitized_url", sanitizedUrl),
+ new("max_retries", _maxRetries),
+ new("elapsed_time_ms", stopwatch.ElapsedMilliseconds)
+ ]);
- await Task.Delay(_retryDelayMs * (retry + 1),
cancellationToken).ConfigureAwait(false);
+ // Release the memory we acquired
+ _memoryManager.ReleaseMemory(size);
+ throw new InvalidOperationException($"Failed to download
file from {url} after {_maxRetries} attempts.");
}
- }
- if (fileData == null)
- {
- stopwatch.Stop();
- Trace.TraceError($"Failed to download file {sanitizedUrl}
after {_maxRetries} attempts. Elapsed time: {stopwatch.ElapsedMilliseconds}
ms");
-
- // Release the memory we acquired
- _memoryManager.ReleaseMemory(size);
- throw new InvalidOperationException($"Failed to download file
from {url} after {_maxRetries} attempts.");
- }
+ // Process the downloaded file data
+ MemoryStream dataStream;
+ long actualSize = fileData.Length;
- // Process the downloaded file data
- MemoryStream dataStream;
- long actualSize = fileData.Length;
-
- // If the data is LZ4 compressed, decompress it
- if (_isLz4Compressed)
- {
- try
+ // If the data is LZ4 compressed, decompress it
+ if (_isLz4Compressed)
{
- var decompressStopwatch = Stopwatch.StartNew();
- dataStream = new MemoryStream();
- using (var inputStream = new MemoryStream(fileData))
- using (var decompressor = LZ4Stream.Decode(inputStream))
+ try
{
- await decompressor.CopyToAsync(dataStream, 81920,
cancellationToken).ConfigureAwait(false);
+ var decompressStopwatch = Stopwatch.StartNew();
+ dataStream = new MemoryStream();
+ using (var inputStream = new MemoryStream(fileData))
+ using (var decompressor =
LZ4Stream.Decode(inputStream))
+ {
+ await decompressor.CopyToAsync(dataStream, 81920,
cancellationToken).ConfigureAwait(false);
+ }
+ dataStream.Position = 0;
+ decompressStopwatch.Stop();
+
+
activity?.AddEvent("cloudfetch.decompression_complete", [
+ new("offset", downloadResult.Link.StartRowOffset),
+ new("sanitized_url", sanitizedUrl),
+ new("decompression_time_ms",
decompressStopwatch.ElapsedMilliseconds),
+ new("compressed_size_bytes", actualSize),
+ new("compressed_size_kb", actualSize / 1024.0),
+ new("decompressed_size_bytes", dataStream.Length),
+ new("decompressed_size_kb", dataStream.Length /
1024.0),
+ new("compression_ratio", (double)dataStream.Length
/ actualSize)
+ ]);
+
+ actualSize = dataStream.Length;
+ }
+ catch (Exception ex)
+ {
+ stopwatch.Stop();
+ activity?.AddException(ex, [
+ new("error.context", "cloudfetch.decompression"),
+ new("offset", downloadResult.Link.StartRowOffset),
+ new("sanitized_url", sanitizedUrl),
+ new("elapsed_time_ms",
stopwatch.ElapsedMilliseconds)
+ ]);
+
+ // Release the memory we acquired
+ _memoryManager.ReleaseMemory(size);
+ throw new InvalidOperationException($"Error
decompressing data: {ex.Message}", ex);
}
- dataStream.Position = 0;
- decompressStopwatch.Stop();
-
- Trace.TraceInformation($"Decompressed file {sanitizedUrl}
in {decompressStopwatch.ElapsedMilliseconds} ms. Compressed size: {actualSize /
1024.0:F2} KB, Decompressed size: {dataStream.Length / 1024.0:F2} KB");
-
- actualSize = dataStream.Length;
}
- catch (Exception ex)
+ else
{
- stopwatch.Stop();
- Trace.TraceError($"Error decompressing data for file
{sanitizedUrl}: {ex.Message}. Elapsed time: {stopwatch.ElapsedMilliseconds}
ms");
-
- // Release the memory we acquired
- _memoryManager.ReleaseMemory(size);
- throw new InvalidOperationException($"Error decompressing
data: {ex.Message}", ex);
+ dataStream = new MemoryStream(fileData);
}
- }
- else
- {
- dataStream = new MemoryStream(fileData);
- }
- // Stop the stopwatch and log download completion
- stopwatch.Stop();
- Trace.TraceInformation($"Completed download of file
{sanitizedUrl}. Size: {actualSize / 1024.0:F2} KB, Latency:
{stopwatch.ElapsedMilliseconds} ms, Throughput: {(actualSize / 1024.0 / 1024.0)
/ (stopwatch.ElapsedMilliseconds / 1000.0):F2} MB/s");
-
- // Set the download as completed with the original size
- downloadResult.SetCompleted(dataStream, size);
+ // Stop the stopwatch and log download completion
+ stopwatch.Stop();
+ double throughputMBps = (actualSize / 1024.0 / 1024.0) /
(stopwatch.ElapsedMilliseconds / 1000.0);
+ activity?.AddEvent("cloudfetch.download_complete", [
+ new("offset", downloadResult.Link.StartRowOffset),
+ new("sanitized_url", sanitizedUrl),
+ new("actual_size_bytes", actualSize),
+ new("actual_size_kb", actualSize / 1024.0),
+ new("latency_ms", stopwatch.ElapsedMilliseconds),
+ new("throughput_mbps", throughputMBps)
+ ]);
+
+ // Set the download as completed with the original size
+ downloadResult.SetCompleted(dataStream, size);
+ }, activityName: "DownloadFile");
}
- private void SetError(Exception ex)
+ private void SetError(Exception ex, Activity? activity = null)
{
lock (_errorLock)
{
if (_error == null)
{
- Trace.TraceError($"Setting error state: {ex.Message}");
+ activity?.AddException(ex, [new("error.context",
"cloudfetch.error_state_set")]);
_error = ex;
}
}
}
- private void CompleteWithError()
+ private void CompleteWithError(Activity? activity = null)
{
try
{
@@ -498,7 +575,7 @@ namespace
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
}
catch (Exception ex)
{
- Trace.TraceError($"Error completing with error: {ex.Message}");
+ activity?.AddException(ex, [new("error.context",
"cloudfetch.complete_with_error_failed")]);
}
}
@@ -516,5 +593,14 @@ namespace
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
return "cloud-storage-url";
}
}
+
+ // IActivityTracer implementation - delegates to statement
+ ActivityTrace IActivityTracer.Trace => _statement.Trace;
+
+ string? IActivityTracer.TraceParent => _statement.TraceParent;
+
+ public string AssemblyVersion => _statement.AssemblyVersion;
+
+ public string AssemblyName => _statement.AssemblyName;
}
}
diff --git
a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchReader.cs
b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchReader.cs
index 8c1730833..99c0b18e5 100644
--- a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchReader.cs
+++ b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchReader.cs
@@ -69,6 +69,7 @@ namespace
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
}
// Initialize the download manager
+ // Activity context will be captured dynamically by CloudFetch
components when events are logged
if (isPrefetchEnabled)
{
downloadManager = new CloudFetchDownloadManager(statement,
schema, response, initialResults, isLz4Compressed, httpClient);
diff --git
a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchResultFetcher.cs
b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchResultFetcher.cs
index 3bf88ca53..152516e42 100644
--- a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchResultFetcher.cs
+++ b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchResultFetcher.cs
@@ -24,6 +24,7 @@ using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow.Adbc.Drivers.Apache;
using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
+using Apache.Arrow.Adbc.Tracing;
using Apache.Hive.Service.Rpc.Thrift;
namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
@@ -180,7 +181,10 @@ namespace
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
var refreshedLink =
response.Results.ResultLinks.FirstOrDefault(l => l.StartRowOffset == offset);
if (refreshedLink != null)
{
- Trace.TraceInformation($"Successfully fetched URL for
offset {offset}");
+ Activity.Current?.AddEvent("cloudfetch.url_fetched", [
+ new("offset", offset),
+ new("url_length", refreshedLink.FileLink?.Length
?? 0)
+ ]);
// Create a download result for the refreshed link
var downloadResult = new DownloadResult(refreshedLink,
_memoryManager);
@@ -190,7 +194,7 @@ namespace
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
}
}
- Trace.TraceWarning($"Failed to fetch URL for offset {offset}");
+ Activity.Current?.AddEvent("cloudfetch.url_fetch_failed",
[new("offset", offset)]);
return null;
}
finally
diff --git
a/csharp/src/Drivers/Databricks/Reader/CloudFetch/ICloudFetchInterfaces.cs
b/csharp/src/Drivers/Databricks/Reader/CloudFetch/ICloudFetchInterfaces.cs
index ee1fdb454..128e3a510 100644
--- a/csharp/src/Drivers/Databricks/Reader/CloudFetch/ICloudFetchInterfaces.cs
+++ b/csharp/src/Drivers/Databricks/Reader/CloudFetch/ICloudFetchInterfaces.cs
@@ -16,7 +16,6 @@
*/
using System;
-using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
diff --git
a/csharp/test/Drivers/Databricks/E2E/CloudFetch/CloudFetchDownloaderTest.cs
b/csharp/test/Drivers/Databricks/E2E/CloudFetch/CloudFetchDownloaderTest.cs
index 42c20c2c2..67c8367dd 100644
--- a/csharp/test/Drivers/Databricks/E2E/CloudFetch/CloudFetchDownloaderTest.cs
+++ b/csharp/test/Drivers/Databricks/E2E/CloudFetch/CloudFetchDownloaderTest.cs
@@ -90,6 +90,7 @@ namespace
Apache.Arrow.Adbc.Tests.Drivers.Databricks.CloudFetch
var mockHttpMessageHandler = new Mock<HttpMessageHandler>();
var httpClient = new HttpClient(mockHttpMessageHandler.Object);
var downloader = new CloudFetchDownloader(
+ _mockStatement.Object,
_downloadQueue,
_resultQueue,
_mockMemoryManager.Object,
@@ -147,6 +148,7 @@ namespace
Apache.Arrow.Adbc.Tests.Drivers.Databricks.CloudFetch
// Create the downloader and add the download to the queue
var downloader = new CloudFetchDownloader(
+ _mockStatement.Object,
_downloadQueue,
_resultQueue,
_mockMemoryManager.Object,
@@ -229,6 +231,7 @@ namespace
Apache.Arrow.Adbc.Tests.Drivers.Databricks.CloudFetch
// Create the downloader and add the download to the queue
var downloader = new CloudFetchDownloader(
+ _mockStatement.Object,
_downloadQueue,
_resultQueue,
_mockMemoryManager.Object,
@@ -302,6 +305,7 @@ namespace
Apache.Arrow.Adbc.Tests.Drivers.Databricks.CloudFetch
// Create the downloader
var downloader = new CloudFetchDownloader(
+ _mockStatement.Object,
_downloadQueue,
_resultQueue,
_mockMemoryManager.Object,
@@ -392,6 +396,7 @@ namespace
Apache.Arrow.Adbc.Tests.Drivers.Databricks.CloudFetch
// Create the downloader and add the download to the queue
var downloader = new CloudFetchDownloader(
+ _mockStatement.Object,
_downloadQueue,
_resultQueue,
_mockMemoryManager.Object,
@@ -486,6 +491,7 @@ namespace
Apache.Arrow.Adbc.Tests.Drivers.Databricks.CloudFetch
// Create the downloader
var downloader = new CloudFetchDownloader(
+ _mockStatement.Object,
_downloadQueue,
_resultQueue,
_mockMemoryManager.Object,
@@ -590,6 +596,7 @@ namespace
Apache.Arrow.Adbc.Tests.Drivers.Databricks.CloudFetch
// Create the downloader and add the download to the queue
var downloader = new CloudFetchDownloader(
+ _mockStatement.Object,
_downloadQueue,
_resultQueue,
_mockMemoryManager.Object,