This is an automated email from the ASF dual-hosted git repository.

curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 6fa0db2b2 feat(csharp/src/Drivers/Databricks): Add Activity-based 
distributed tracing to CloudFetch pipeline (#3580)
6fa0db2b2 is described below

commit 6fa0db2b2e5b0d263589b419bd4ec0de8ccd7723
Author: eric-wang-1990 <[email protected]>
AuthorDate: Mon Oct 20 08:23:24 2025 -0700

    feat(csharp/src/Drivers/Databricks): Add Activity-based distributed tracing 
to CloudFetch pipeline (#3580)
    
    ## Summary
    
    This PR implements comprehensive Activity-based distributed tracing for
    the CloudFetch download pipeline in the Databricks C# driver, enabling
    real-time monitoring, structured logging, and improved observability.
    
    ### Key Changes:
    - Add Activity-based tracing to CloudFetchDownloader with
    TraceActivityAsync
    - Create child Activities per individual file download for real-time
    progress visibility
    - Replace all Trace.TraceInformation/TraceWarning/TraceError calls with
    Activity.AddEvent/AddException for structured logging
    - Add Activity tags for searchable metadata (offset, URL, file sizes)
    - Implement proper Activity context flow through async/await chains
    - Update CloudFetchDownloadManager to pass statement for tracing context
    - Fix all tests to include statement parameter in CloudFetchDownloader
    constructor
    
    ### Architecture:
    The implementation follows a hierarchical Activity structure:
    ```
    Statement Activity (parent)
      ├─ DownloadFilesAsync Activity (overall batch)
      │   ├─ DownloadFile Activity (file 1) - flushes when complete
      │   ├─ DownloadFile Activity (file 2) - flushes when complete
      │   └─ ...
      └─ ReadNextRecordBatchAsync Activity (reader operations)
    ```
    
    ### Benefits:
    - **Real-time progress monitoring**: Events flush immediately as each
    file completes (not batched)
    - **Better fault tolerance**: Events for completed downloads are already
    logged if the process later crashes
    - **Improved debuggability**: Searchable Activity tags enable filtering
    by offset, URL, size
    - **Granular metrics**: Per-file download times, throughput, compression
    ratios visible in logs
    - **OpenTelemetry-compatible**: Activities follow
    System.Diagnostics.Activity standard
    
    ### Events Logged:
    - `cloudfetch.download_start` - File download initiated
    - `cloudfetch.content_length` - Actual file size from HTTP response
    - `cloudfetch.download_retry` - Retry attempt with reason (recorded via
    `AddException` with `error.context` set to this name)
    - `cloudfetch.url_refreshed_before_download` - URL refreshed proactively
    - `cloudfetch.url_refreshed_after_auth_error` - URL refreshed after
    401/403
    - `cloudfetch.decompression_complete` - LZ4 decompression metrics
    - `cloudfetch.download_complete` - Download success with throughput
    - `cloudfetch.download_failed_all_retries` - Final failure after all
    retries
    - `cloudfetch.download_summary` - Overall batch statistics
    
    ## Test Plan
    
    - ✅ All existing CloudFetchDownloader E2E tests pass (7 test methods)
    - ✅ Build succeeds with 0 warnings
    - Manual testing: Query with CloudFetch enabled and verify Activity
    events in logs
    - Verified Activity context flows correctly through async/await chains
    - Confirmed child Activities flush independently upon completion
    
    
    
`{"Status":"Ok","HasRemoteParent":false,"Kind":"Client","OperationName":"DownloadFile","Duration":"00:00:00.5952467","StartTimeUtc":"2025-10-16T15:00:48.1657713Z","Id":"00-dc2baa073e36e8feab91170cb360e2f1-b71e0296e60abf8b-01","ParentId":"00-dc2baa073e36e8feab91170cb360e2f1-06801209b222d0e9-01","RootId":"dc2baa073e36e8feab91170cb360e2f1","TraceStateString":null,"SpanId":"b71e0296e60abf8b","TraceId":"dc2baa073e36e8feab91170cb360e2f1","Recorded":true,"IsAllDataRequested":true,"ActivityTr
 [...]
    
    🤖 Generated with [Claude Code](https://claude.com/claude-code)
    
    ---------
    
    Co-authored-by: Claude <[email protected]>
---
 .../Reader/CloudFetch/CloudFetchDownloadManager.cs |   2 +
 .../Reader/CloudFetch/CloudFetchDownloader.cs      | 548 ++++++++++++---------
 .../Reader/CloudFetch/CloudFetchReader.cs          |   1 +
 .../Reader/CloudFetch/CloudFetchResultFetcher.cs   |   8 +-
 .../Reader/CloudFetch/ICloudFetchInterfaces.cs     |   1 -
 .../E2E/CloudFetch/CloudFetchDownloaderTest.cs     |   7 +
 6 files changed, 333 insertions(+), 234 deletions(-)

diff --git 
a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloadManager.cs 
b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloadManager.cs
index 97b9fa968..98e984baf 100644
--- 
a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloadManager.cs
+++ 
b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloadManager.cs
@@ -17,6 +17,7 @@
 
 using System;
 using System.Collections.Concurrent;
+using System.Diagnostics;
 using System.Net.Http;
 using System.Threading;
 using System.Threading.Tasks;
@@ -207,6 +208,7 @@ namespace 
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
 
             // Initialize the downloader
             _downloader = new CloudFetchDownloader(
+                _statement,
                 _downloadQueue,
                 _resultQueue,
                 _memoryManager,
diff --git 
a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs 
b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs
index ba65cf3cf..f4f3e5d01 100644
--- a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs
+++ b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs
@@ -22,6 +22,8 @@ using System.IO;
 using System.Net.Http;
 using System.Threading;
 using System.Threading.Tasks;
+using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
+using Apache.Arrow.Adbc.Tracing;
 using K4os.Compression.LZ4.Streams;
 
 namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
@@ -29,8 +31,9 @@ namespace 
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
     /// <summary>
     /// Downloads files from URLs.
     /// </summary>
-    internal sealed class CloudFetchDownloader : ICloudFetchDownloader
+    internal sealed class CloudFetchDownloader : ICloudFetchDownloader, 
IActivityTracer
     {
+        private readonly ITracingStatement _statement;
         private readonly BlockingCollection<IDownloadResult> _downloadQueue;
         private readonly BlockingCollection<IDownloadResult> _resultQueue;
         private readonly ICloudFetchMemoryBufferManager _memoryManager;
@@ -52,6 +55,7 @@ namespace 
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
         /// <summary>
         /// Initializes a new instance of the <see 
cref="CloudFetchDownloader"/> class.
         /// </summary>
+        /// <param name="statement">The tracing statement for Activity 
context.</param>
         /// <param name="downloadQueue">The queue of downloads to 
process.</param>
         /// <param name="resultQueue">The queue to add completed downloads 
to.</param>
         /// <param name="memoryManager">The memory buffer manager.</param>
@@ -64,6 +68,7 @@ namespace 
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
         /// <param name="maxUrlRefreshAttempts">The maximum number of URL 
refresh attempts.</param>
         /// <param name="urlExpirationBufferSeconds">Buffer time in seconds 
before URL expiration to trigger refresh.</param>
         public CloudFetchDownloader(
+            ITracingStatement statement,
             BlockingCollection<IDownloadResult> downloadQueue,
             BlockingCollection<IDownloadResult> resultQueue,
             ICloudFetchMemoryBufferManager memoryManager,
@@ -76,6 +81,7 @@ namespace 
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
             int maxUrlRefreshAttempts = 3,
             int urlExpirationBufferSeconds = 60)
         {
+            _statement = statement ?? throw new 
ArgumentNullException(nameof(statement));
             _downloadQueue = downloadQueue ?? throw new 
ArgumentNullException(nameof(downloadQueue));
             _resultQueue = resultQueue ?? throw new 
ArgumentNullException(nameof(resultQueue));
             _memoryManager = memoryManager ?? throw new 
ArgumentNullException(nameof(memoryManager));
@@ -194,299 +200,370 @@ namespace 
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
 
         private async Task DownloadFilesAsync(CancellationToken 
cancellationToken)
         {
-            await Task.Yield();
-
-            int totalFiles = 0;
-            int successfulDownloads = 0;
-            int failedDownloads = 0;
-            long totalBytes = 0;
-            var overallStopwatch = Stopwatch.StartNew();
-
-            try
+            await this.TraceActivityAsync(async activity =>
             {
-                // Keep track of active download tasks
-                var downloadTasks = new ConcurrentDictionary<Task, 
IDownloadResult>();
-                var downloadTaskCompletionSource = new 
TaskCompletionSource<bool>();
+                await Task.Yield();
 
-                // Process items from the download queue until it's completed
-                foreach (var downloadResult in 
_downloadQueue.GetConsumingEnumerable(cancellationToken))
+                int totalFiles = 0;
+                int successfulDownloads = 0;
+                int failedDownloads = 0;
+                long totalBytes = 0;
+                var overallStopwatch = Stopwatch.StartNew();
+
+                try
                 {
-                    totalFiles++;
+                    // Keep track of active download tasks
+                    var downloadTasks = new ConcurrentDictionary<Task, 
IDownloadResult>();
+                    var downloadTaskCompletionSource = new 
TaskCompletionSource<bool>();
 
-                    // Check if there's an error before processing more 
downloads
-                    if (HasError)
+                    // Process items from the download queue until it's 
completed
+                    foreach (var downloadResult in 
_downloadQueue.GetConsumingEnumerable(cancellationToken))
                     {
-                        // Add the failed download result to the queue to 
signal the error
-                        // This will be caught by GetNextDownloadedFileAsync
-                        break;
-                    }
+                        // Check if there's an error before processing more 
downloads
+                        if (HasError)
+                        {
+                            // Add the failed download result to the queue to 
signal the error
+                            // This will be caught by 
GetNextDownloadedFileAsync
+                            break;
+                        }
 
-                    // Check if this is the end of results guard
-                    if (downloadResult == EndOfResultsGuard.Instance)
-                    {
-                        // Wait for all active downloads to complete
-                        if (downloadTasks.Count > 0)
+                        // Check if this is the end of results guard
+                        if (downloadResult == EndOfResultsGuard.Instance)
                         {
-                            try
+                            // Wait for all active downloads to complete
+                            if (downloadTasks.Count > 0)
                             {
-                                await 
Task.WhenAll(downloadTasks.Keys).ConfigureAwait(false);
+                                try
+                                {
+                                    await 
Task.WhenAll(downloadTasks.Keys).ConfigureAwait(false);
+                                }
+                                catch (Exception ex)
+                                {
+                                    activity?.AddException(ex, 
[new("error.context", "cloudfetch.wait_for_downloads")]);
+                                    // Don't set error here, as individual 
download tasks will handle their own errors
+                                }
                             }
-                            catch (Exception ex)
+
+                            // Only add the guard if there's no error
+                            if (!HasError)
                             {
-                                Trace.TraceWarning($"Error waiting for 
downloads to complete: {ex.Message}");
-                                // Don't set error here, as individual 
download tasks will handle their own errors
+                                // Add the guard to the result queue to signal 
the end of results
+                                _resultQueue.Add(EndOfResultsGuard.Instance, 
cancellationToken);
+                                _isCompleted = true;
                             }
+                            break;
                         }
 
-                        // Only add the guard if there's no error
-                        if (!HasError)
-                        {
-                            // Add the guard to the result queue to signal the 
end of results
-                            _resultQueue.Add(EndOfResultsGuard.Instance, 
cancellationToken);
-                            _isCompleted = true;
-                        }
-                        break;
-                    }
-
-                    // Check if the URL is expired or about to expire
-                    if 
(downloadResult.IsExpiredOrExpiringSoon(_urlExpirationBufferSeconds))
-                    {
-                        // Get a refreshed URL before starting the download
-                        var refreshedLink = await 
_resultFetcher.GetUrlAsync(downloadResult.Link.StartRowOffset, 
cancellationToken);
-                        if (refreshedLink != null)
-                        {
-                            // Update the download result with the refreshed 
link
-                            
downloadResult.UpdateWithRefreshedLink(refreshedLink);
-                            Trace.TraceInformation($"Updated URL for file at 
offset {refreshedLink.StartRowOffset} before download");
-                        }
-                    }
-
-                    // Acquire a download slot
-                    await 
_downloadSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
+                        // This is a real file, count it
+                        totalFiles++;
 
-                    // Start the download task
-                    Task downloadTask = DownloadFileAsync(downloadResult, 
cancellationToken)
-                        .ContinueWith(t =>
+                        // Check if the URL is expired or about to expire
+                        if 
(downloadResult.IsExpiredOrExpiringSoon(_urlExpirationBufferSeconds))
                         {
-                            // Release the download slot
-                            _downloadSemaphore.Release();
-
-                            // Remove the task from the dictionary
-                            downloadTasks.TryRemove(t, out _);
-
-                            // Handle any exceptions
-                            if (t.IsFaulted)
-                            {
-                                Exception ex = t.Exception?.InnerException ?? 
new Exception("Unknown error");
-                                Trace.TraceError($"Download failed for file 
{SanitizeUrl(downloadResult.Link.FileLink)}: {ex.Message}");
-
-                                // Set the download as failed
-                                downloadResult.SetFailed(ex);
-                                failedDownloads++;
-
-                                // Set the error state to stop the download 
process
-                                SetError(ex);
-
-                                // Signal that we should stop processing 
downloads
-                                
downloadTaskCompletionSource.TrySetException(ex);
-                            }
-                            else if (!t.IsFaulted && !t.IsCanceled)
+                            // Get a refreshed URL before starting the download
+                            var refreshedLink = await 
_resultFetcher.GetUrlAsync(downloadResult.Link.StartRowOffset, 
cancellationToken);
+                            if (refreshedLink != null)
                             {
-                                successfulDownloads++;
-                                totalBytes += downloadResult.Size;
+                                // Update the download result with the 
refreshed link
+                                
downloadResult.UpdateWithRefreshedLink(refreshedLink);
+                                
activity?.AddEvent("cloudfetch.url_refreshed_before_download", [
+                                    new("offset", refreshedLink.StartRowOffset)
+                                ]);
                             }
-                        }, cancellationToken);
-
-                    // Add the task to the dictionary
-                    downloadTasks[downloadTask] = downloadResult;
+                        }
 
-                    // Add the result to the result queue add the result here 
to assure the download sequence.
-                    _resultQueue.Add(downloadResult, cancellationToken);
+                        // Acquire a download slot
+                        await 
_downloadSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
 
-                    // If there's an error, stop processing more downloads
-                    if (HasError)
-                    {
-                        break;
+                        // Start the download task
+                        Task downloadTask = DownloadFileAsync(downloadResult, 
cancellationToken)
+                            .ContinueWith(t =>
+                            {
+                                // Release the download slot
+                                _downloadSemaphore.Release();
+
+                                // Remove the task from the dictionary
+                                downloadTasks.TryRemove(t, out _);
+
+                                // Handle any exceptions
+                                if (t.IsFaulted)
+                                {
+                                    Exception ex = t.Exception?.InnerException 
?? new Exception("Unknown error");
+                                    string sanitizedUrl = 
SanitizeUrl(downloadResult.Link.FileLink);
+                                    activity?.AddException(ex, [
+                                        new("error.context", 
"cloudfetch.download_failed"),
+                                        new("offset", 
downloadResult.Link.StartRowOffset),
+                                        new("sanitized_url", sanitizedUrl)
+                                    ]);
+
+                                    // Set the download as failed
+                                    downloadResult.SetFailed(ex);
+                                    failedDownloads++;
+
+                                    // Set the error state to stop the 
download process
+                                    SetError(ex, activity);
+
+                                    // Signal that we should stop processing 
downloads
+                                    
downloadTaskCompletionSource.TrySetException(ex);
+                                }
+                                else if (!t.IsFaulted && !t.IsCanceled)
+                                {
+                                    successfulDownloads++;
+                                    totalBytes += downloadResult.Size;
+                                }
+                            }, cancellationToken);
+
+                        // Add the task to the dictionary
+                        downloadTasks[downloadTask] = downloadResult;
+
+                        // Add the result to the result queue add the result 
here to assure the download sequence.
+                        _resultQueue.Add(downloadResult, cancellationToken);
+
+                        // If there's an error, stop processing more downloads
+                        if (HasError)
+                        {
+                            break;
+                        }
                     }
                 }
-            }
-            catch (OperationCanceledException) when 
(cancellationToken.IsCancellationRequested)
-            {
-                // Expected when cancellation is requested
-                Trace.TraceInformation("Download process was cancelled");
-            }
-            catch (Exception ex)
-            {
-                Trace.TraceError($"Error in download loop: {ex.Message}");
-                SetError(ex);
-            }
-            finally
-            {
-                overallStopwatch.Stop();
-
-                Trace.TraceInformation(
-                    $"Download process completed. Total files: {totalFiles}, 
Successful: {successfulDownloads}, " +
-                    $"Failed: {failedDownloads}, Total size: {totalBytes / 
1024.0 / 1024.0:F2} MB, Total time: {overallStopwatch.ElapsedMilliseconds / 
1000.0:F2} sec");
-
-                // If there's an error, add the error to the result queue
-                if (HasError)
+                catch (OperationCanceledException) when 
(cancellationToken.IsCancellationRequested)
                 {
-                    CompleteWithError();
+                    // Expected when cancellation is requested
+                    activity?.AddEvent("cloudfetch.download_cancelled");
                 }
-            }
+                catch (Exception ex)
+                {
+                    activity?.AddException(ex, [new("error.context", 
"cloudfetch.download_loop")]);
+                    SetError(ex, activity);
+                }
+                finally
+                {
+                    overallStopwatch.Stop();
+
+                    activity?.AddEvent("cloudfetch.download_summary", [
+                        new("total_files", totalFiles),
+                        new("successful_downloads", successfulDownloads),
+                        new("failed_downloads", failedDownloads),
+                        new("total_bytes", totalBytes),
+                        new("total_mb", totalBytes / 1024.0 / 1024.0),
+                        new("total_time_ms", 
overallStopwatch.ElapsedMilliseconds),
+                        new("total_time_sec", 
overallStopwatch.ElapsedMilliseconds / 1000.0)
+                    ]);
+
+                    // If there's an error, add the error to the result queue
+                    if (HasError)
+                    {
+                        CompleteWithError(activity);
+                    }
+                }
+            });
         }
 
         private async Task DownloadFileAsync(IDownloadResult downloadResult, 
CancellationToken cancellationToken)
         {
-            string url = downloadResult.Link.FileLink;
-            string sanitizedUrl = SanitizeUrl(downloadResult.Link.FileLink);
-            byte[]? fileData = null;
-
-            // Use the size directly from the download result
-            long size = downloadResult.Size;
-
-            // Create a stopwatch to track download time
-            var stopwatch = Stopwatch.StartNew();
-
-            // Log download start
-            Trace.TraceInformation($"Starting download of file {sanitizedUrl}, 
expected size: {size / 1024.0:F2} KB");
-
-            // Acquire memory before downloading
-            await _memoryManager.AcquireMemoryAsync(size, 
cancellationToken).ConfigureAwait(false);
-
-            // Retry logic for downloading files
-            for (int retry = 0; retry < _maxRetries; retry++)
+            await this.TraceActivityAsync(async activity =>
             {
-                try
+                string url = downloadResult.Link.FileLink;
+                string sanitizedUrl = 
SanitizeUrl(downloadResult.Link.FileLink);
+                byte[]? fileData = null;
+
+                // Use the size directly from the download result
+                long size = downloadResult.Size;
+
+                // Add tags to the Activity for filtering/searching
+                activity?.SetTag("cloudfetch.offset", 
downloadResult.Link.StartRowOffset);
+                activity?.SetTag("cloudfetch.sanitized_url", sanitizedUrl);
+                activity?.SetTag("cloudfetch.expected_size_bytes", size);
+
+                // Create a stopwatch to track download time
+                var stopwatch = Stopwatch.StartNew();
+
+                // Log download start
+                activity?.AddEvent("cloudfetch.download_start", [
+                new("offset", downloadResult.Link.StartRowOffset),
+                    new("sanitized_url", sanitizedUrl),
+                    new("expected_size_bytes", size),
+                    new("expected_size_kb", size / 1024.0)
+            ]);
+
+                // Acquire memory before downloading
+                await _memoryManager.AcquireMemoryAsync(size, 
cancellationToken).ConfigureAwait(false);
+
+                // Retry logic for downloading files
+                for (int retry = 0; retry < _maxRetries; retry++)
                 {
-                    // Download the file directly
-                    using HttpResponseMessage response = await 
_httpClient.GetAsync(
-                        url,
-                        HttpCompletionOption.ResponseHeadersRead,
-                        cancellationToken).ConfigureAwait(false);
-
-                    // Check if the response indicates an expired URL 
(typically 403 or 401)
-                    if (response.StatusCode == 
System.Net.HttpStatusCode.Forbidden ||
-                        response.StatusCode == 
System.Net.HttpStatusCode.Unauthorized)
+                    try
                     {
-                        // If we've already tried refreshing too many times, 
fail
-                        if (downloadResult.RefreshAttempts >= 
_maxUrlRefreshAttempts)
+                        // Download the file directly
+                        using HttpResponseMessage response = await 
_httpClient.GetAsync(
+                            url,
+                            HttpCompletionOption.ResponseHeadersRead,
+                            cancellationToken).ConfigureAwait(false);
+
+                        // Check if the response indicates an expired URL 
(typically 403 or 401)
+                        if (response.StatusCode == 
System.Net.HttpStatusCode.Forbidden ||
+                            response.StatusCode == 
System.Net.HttpStatusCode.Unauthorized)
                         {
-                            throw new InvalidOperationException($"Failed to 
download file after {downloadResult.RefreshAttempts} URL refresh attempts.");
-                        }
+                            // If we've already tried refreshing too many 
times, fail
+                            if (downloadResult.RefreshAttempts >= 
_maxUrlRefreshAttempts)
+                            {
+                                throw new InvalidOperationException($"Failed 
to download file after {downloadResult.RefreshAttempts} URL refresh attempts.");
+                            }
 
-                        // Try to refresh the URL
-                        var refreshedLink = await 
_resultFetcher.GetUrlAsync(downloadResult.Link.StartRowOffset, 
cancellationToken);
-                        if (refreshedLink != null)
-                        {
-                            // Update the download result with the refreshed 
link
-                            
downloadResult.UpdateWithRefreshedLink(refreshedLink);
-                            url = refreshedLink.FileLink;
-                            sanitizedUrl = SanitizeUrl(url);
+                            // Try to refresh the URL
+                            var refreshedLink = await 
_resultFetcher.GetUrlAsync(downloadResult.Link.StartRowOffset, 
cancellationToken);
+                            if (refreshedLink != null)
+                            {
+                                // Update the download result with the 
refreshed link
+                                
downloadResult.UpdateWithRefreshedLink(refreshedLink);
+                                url = refreshedLink.FileLink;
+                                sanitizedUrl = SanitizeUrl(url);
+
+                                
activity?.AddEvent("cloudfetch.url_refreshed_after_auth_error", [
+                                    new("offset", 
refreshedLink.StartRowOffset),
+                                    new("sanitized_url", sanitizedUrl)
+                                ]);
+
+                                // Continue to the next retry attempt with the 
refreshed URL
+                                continue;
+                            }
+                            else
+                            {
+                                // If refresh failed, throw an exception
+                                throw new InvalidOperationException("Failed to 
refresh expired URL.");
+                            }
+                        }
 
-                            Trace.TraceInformation($"URL for file at offset 
{refreshedLink.StartRowOffset} was refreshed after expired URL response");
+                        response.EnsureSuccessStatusCode();
 
-                            // Continue to the next retry attempt with the 
refreshed URL
-                            continue;
-                        }
-                        else
+                        // Log the download size if available from response 
headers
+                        long? contentLength = 
response.Content.Headers.ContentLength;
+                        if (contentLength.HasValue && contentLength.Value > 0)
                         {
-                            // If refresh failed, throw an exception
-                            throw new InvalidOperationException("Failed to 
refresh expired URL.");
+                            activity?.AddEvent("cloudfetch.content_length", [
+                                new("offset", 
downloadResult.Link.StartRowOffset),
+                                new("sanitized_url", sanitizedUrl),
+                                new("content_length_bytes", 
contentLength.Value),
+                                new("content_length_mb", contentLength.Value / 
1024.0 / 1024.0)
+                            ]);
                         }
-                    }
-
-                    response.EnsureSuccessStatusCode();
 
-                    // Log the download size if available from response headers
-                    long? contentLength = 
response.Content.Headers.ContentLength;
-                    if (contentLength.HasValue && contentLength.Value > 0)
+                        // Read the file data
+                        fileData = await 
response.Content.ReadAsByteArrayAsync().ConfigureAwait(false);
+                        break; // Success, exit retry loop
+                    }
+                    catch (Exception ex) when (retry < _maxRetries - 1 && 
!cancellationToken.IsCancellationRequested)
                     {
-                        Trace.TraceInformation($"Actual file size for 
{sanitizedUrl}: {contentLength.Value / 1024.0 / 1024.0:F2} MB");
+                        // Log the error and retry
+                        activity?.AddException(ex, [
+                            new("error.context", "cloudfetch.download_retry"),
+                            new("offset", downloadResult.Link.StartRowOffset),
+                            new("sanitized_url", SanitizeUrl(url)),
+                            new("attempt", retry + 1),
+                            new("max_retries", _maxRetries)
+                        ]);
+
+                        await Task.Delay(_retryDelayMs * (retry + 1), 
cancellationToken).ConfigureAwait(false);
                     }
-
-                    // Read the file data
-                    fileData = await 
response.Content.ReadAsByteArrayAsync().ConfigureAwait(false);
-                    break; // Success, exit retry loop
                 }
-                catch (Exception ex) when (retry < _maxRetries - 1 && 
!cancellationToken.IsCancellationRequested)
+
+                if (fileData == null)
                 {
-                    // Log the error and retry
-                    Trace.TraceError($"Error downloading file 
{SanitizeUrl(url)} (attempt {retry + 1}/{_maxRetries}): {ex.Message}");
+                    stopwatch.Stop();
+                    
activity?.AddEvent("cloudfetch.download_failed_all_retries", [
+                        new("offset", downloadResult.Link.StartRowOffset),
+                        new("sanitized_url", sanitizedUrl),
+                        new("max_retries", _maxRetries),
+                        new("elapsed_time_ms", stopwatch.ElapsedMilliseconds)
+                    ]);
 
-                    await Task.Delay(_retryDelayMs * (retry + 1), 
cancellationToken).ConfigureAwait(false);
+                    // Release the memory we acquired
+                    _memoryManager.ReleaseMemory(size);
+                    throw new InvalidOperationException($"Failed to download 
file from {url} after {_maxRetries} attempts.");
                 }
-            }
 
-            if (fileData == null)
-            {
-                stopwatch.Stop();
-                Trace.TraceError($"Failed to download file {sanitizedUrl} 
after {_maxRetries} attempts. Elapsed time: {stopwatch.ElapsedMilliseconds} 
ms");
-
-                // Release the memory we acquired
-                _memoryManager.ReleaseMemory(size);
-                throw new InvalidOperationException($"Failed to download file 
from {url} after {_maxRetries} attempts.");
-            }
+                // Process the downloaded file data
+                MemoryStream dataStream;
+                long actualSize = fileData.Length;
 
-            // Process the downloaded file data
-            MemoryStream dataStream;
-            long actualSize = fileData.Length;
-
-            // If the data is LZ4 compressed, decompress it
-            if (_isLz4Compressed)
-            {
-                try
+                // If the data is LZ4 compressed, decompress it
+                if (_isLz4Compressed)
                 {
-                    var decompressStopwatch = Stopwatch.StartNew();
-                    dataStream = new MemoryStream();
-                    using (var inputStream = new MemoryStream(fileData))
-                    using (var decompressor = LZ4Stream.Decode(inputStream))
+                    try
                     {
-                        await decompressor.CopyToAsync(dataStream, 81920, 
cancellationToken).ConfigureAwait(false);
+                        var decompressStopwatch = Stopwatch.StartNew();
+                        dataStream = new MemoryStream();
+                        using (var inputStream = new MemoryStream(fileData))
+                        using (var decompressor = 
LZ4Stream.Decode(inputStream))
+                        {
+                            await decompressor.CopyToAsync(dataStream, 81920, 
cancellationToken).ConfigureAwait(false);
+                        }
+                        dataStream.Position = 0;
+                        decompressStopwatch.Stop();
+
+                        
activity?.AddEvent("cloudfetch.decompression_complete", [
+                            new("offset", downloadResult.Link.StartRowOffset),
+                            new("sanitized_url", sanitizedUrl),
+                            new("decompression_time_ms", 
decompressStopwatch.ElapsedMilliseconds),
+                            new("compressed_size_bytes", actualSize),
+                            new("compressed_size_kb", actualSize / 1024.0),
+                            new("decompressed_size_bytes", dataStream.Length),
+                            new("decompressed_size_kb", dataStream.Length / 
1024.0),
+                            new("compression_ratio", (double)dataStream.Length 
/ actualSize)
+                        ]);
+
+                        actualSize = dataStream.Length;
+                    }
+                    catch (Exception ex)
+                    {
+                        stopwatch.Stop();
+                        activity?.AddException(ex, [
+                            new("error.context", "cloudfetch.decompression"),
+                            new("offset", downloadResult.Link.StartRowOffset),
+                            new("sanitized_url", sanitizedUrl),
+                            new("elapsed_time_ms", 
stopwatch.ElapsedMilliseconds)
+                        ]);
+
+                        // Release the memory we acquired
+                        _memoryManager.ReleaseMemory(size);
+                        throw new InvalidOperationException($"Error 
decompressing data: {ex.Message}", ex);
                     }
-                    dataStream.Position = 0;
-                    decompressStopwatch.Stop();
-
-                    Trace.TraceInformation($"Decompressed file {sanitizedUrl} 
in {decompressStopwatch.ElapsedMilliseconds} ms. Compressed size: {actualSize / 
1024.0:F2} KB, Decompressed size: {dataStream.Length / 1024.0:F2} KB");
-
-                    actualSize = dataStream.Length;
                 }
-                catch (Exception ex)
+                else
                 {
-                    stopwatch.Stop();
-                    Trace.TraceError($"Error decompressing data for file 
{sanitizedUrl}: {ex.Message}. Elapsed time: {stopwatch.ElapsedMilliseconds} 
ms");
-
-                    // Release the memory we acquired
-                    _memoryManager.ReleaseMemory(size);
-                    throw new InvalidOperationException($"Error decompressing 
data: {ex.Message}", ex);
+                    dataStream = new MemoryStream(fileData);
                 }
-            }
-            else
-            {
-                dataStream = new MemoryStream(fileData);
-            }
 
-            // Stop the stopwatch and log download completion
-            stopwatch.Stop();
-            Trace.TraceInformation($"Completed download of file 
{sanitizedUrl}. Size: {actualSize / 1024.0:F2} KB, Latency: 
{stopwatch.ElapsedMilliseconds} ms, Throughput: {(actualSize / 1024.0 / 1024.0) 
/ (stopwatch.ElapsedMilliseconds / 1000.0):F2} MB/s");
-
-            // Set the download as completed with the original size
-            downloadResult.SetCompleted(dataStream, size);
+                // Stop the stopwatch and log download completion
+                stopwatch.Stop();
+                double throughputMBps = (actualSize / 1024.0 / 1024.0) / 
(stopwatch.ElapsedMilliseconds / 1000.0);
+                activity?.AddEvent("cloudfetch.download_complete", [
+                    new("offset", downloadResult.Link.StartRowOffset),
+                    new("sanitized_url", sanitizedUrl),
+                    new("actual_size_bytes", actualSize),
+                    new("actual_size_kb", actualSize / 1024.0),
+                    new("latency_ms", stopwatch.ElapsedMilliseconds),
+                    new("throughput_mbps", throughputMBps)
+                ]);
+
+                // Set the download as completed with the original size
+                downloadResult.SetCompleted(dataStream, size);
+            }, activityName: "DownloadFile");
         }
 
-        private void SetError(Exception ex)
+        private void SetError(Exception ex, Activity? activity = null)
         {
             lock (_errorLock)
             {
                 if (_error == null)
                 {
-                    Trace.TraceError($"Setting error state: {ex.Message}");
+                    activity?.AddException(ex, [new("error.context", 
"cloudfetch.error_state_set")]);
                     _error = ex;
                 }
             }
         }
 
-        private void CompleteWithError()
+        private void CompleteWithError(Activity? activity = null)
         {
             try
             {
@@ -498,7 +575,7 @@ namespace 
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
             }
             catch (Exception ex)
             {
-                Trace.TraceError($"Error completing with error: {ex.Message}");
+                activity?.AddException(ex, [new("error.context", 
"cloudfetch.complete_with_error_failed")]);
             }
         }
 
@@ -516,5 +593,14 @@ namespace 
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
                 return "cloud-storage-url";
             }
         }
+
+        // IActivityTracer implementation - delegates to statement
+        ActivityTrace IActivityTracer.Trace => _statement.Trace;
+
+        string? IActivityTracer.TraceParent => _statement.TraceParent;
+
+        public string AssemblyVersion => _statement.AssemblyVersion;
+
+        public string AssemblyName => _statement.AssemblyName;
     }
 }
diff --git 
a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchReader.cs 
b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchReader.cs
index 8c1730833..99c0b18e5 100644
--- a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchReader.cs
+++ b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchReader.cs
@@ -69,6 +69,7 @@ namespace 
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
             }
 
             // Initialize the download manager
+            // Activity context will be captured dynamically by CloudFetch 
components when events are logged
             if (isPrefetchEnabled)
             {
                 downloadManager = new CloudFetchDownloadManager(statement, 
schema, response, initialResults, isLz4Compressed, httpClient);
diff --git 
a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchResultFetcher.cs 
b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchResultFetcher.cs
index 3bf88ca53..152516e42 100644
--- a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchResultFetcher.cs
+++ b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchResultFetcher.cs
@@ -24,6 +24,7 @@ using System.Threading;
 using System.Threading.Tasks;
 using Apache.Arrow.Adbc.Drivers.Apache;
 using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
+using Apache.Arrow.Adbc.Tracing;
 using Apache.Hive.Service.Rpc.Thrift;
 
 namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
@@ -180,7 +181,10 @@ namespace 
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
                     var refreshedLink = 
response.Results.ResultLinks.FirstOrDefault(l => l.StartRowOffset == offset);
                     if (refreshedLink != null)
                     {
-                        Trace.TraceInformation($"Successfully fetched URL for 
offset {offset}");
+                        Activity.Current?.AddEvent("cloudfetch.url_fetched", [
+                            new("offset", offset),
+                            new("url_length", refreshedLink.FileLink?.Length 
?? 0)
+                        ]);
 
                         // Create a download result for the refreshed link
                         var downloadResult = new DownloadResult(refreshedLink, 
_memoryManager);
@@ -190,7 +194,7 @@ namespace 
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
                     }
                 }
 
-                Trace.TraceWarning($"Failed to fetch URL for offset {offset}");
+                Activity.Current?.AddEvent("cloudfetch.url_fetch_failed", 
[new("offset", offset)]);
                 return null;
             }
             finally
diff --git 
a/csharp/src/Drivers/Databricks/Reader/CloudFetch/ICloudFetchInterfaces.cs 
b/csharp/src/Drivers/Databricks/Reader/CloudFetch/ICloudFetchInterfaces.cs
index ee1fdb454..128e3a510 100644
--- a/csharp/src/Drivers/Databricks/Reader/CloudFetch/ICloudFetchInterfaces.cs
+++ b/csharp/src/Drivers/Databricks/Reader/CloudFetch/ICloudFetchInterfaces.cs
@@ -16,7 +16,6 @@
  */
 
 using System;
-using System.Collections.Generic;
 using System.IO;
 using System.Threading;
 using System.Threading.Tasks;
diff --git 
a/csharp/test/Drivers/Databricks/E2E/CloudFetch/CloudFetchDownloaderTest.cs 
b/csharp/test/Drivers/Databricks/E2E/CloudFetch/CloudFetchDownloaderTest.cs
index 42c20c2c2..67c8367dd 100644
--- a/csharp/test/Drivers/Databricks/E2E/CloudFetch/CloudFetchDownloaderTest.cs
+++ b/csharp/test/Drivers/Databricks/E2E/CloudFetch/CloudFetchDownloaderTest.cs
@@ -90,6 +90,7 @@ namespace 
Apache.Arrow.Adbc.Tests.Drivers.Databricks.CloudFetch
             var mockHttpMessageHandler = new Mock<HttpMessageHandler>();
             var httpClient = new HttpClient(mockHttpMessageHandler.Object);
             var downloader = new CloudFetchDownloader(
+                _mockStatement.Object,
                 _downloadQueue,
                 _resultQueue,
                 _mockMemoryManager.Object,
@@ -147,6 +148,7 @@ namespace 
Apache.Arrow.Adbc.Tests.Drivers.Databricks.CloudFetch
 
             // Create the downloader and add the download to the queue
             var downloader = new CloudFetchDownloader(
+                _mockStatement.Object,
                 _downloadQueue,
                 _resultQueue,
                 _mockMemoryManager.Object,
@@ -229,6 +231,7 @@ namespace 
Apache.Arrow.Adbc.Tests.Drivers.Databricks.CloudFetch
 
             // Create the downloader and add the download to the queue
             var downloader = new CloudFetchDownloader(
+                _mockStatement.Object,
                 _downloadQueue,
                 _resultQueue,
                 _mockMemoryManager.Object,
@@ -302,6 +305,7 @@ namespace 
Apache.Arrow.Adbc.Tests.Drivers.Databricks.CloudFetch
 
             // Create the downloader
             var downloader = new CloudFetchDownloader(
+                _mockStatement.Object,
                 _downloadQueue,
                 _resultQueue,
                 _mockMemoryManager.Object,
@@ -392,6 +396,7 @@ namespace 
Apache.Arrow.Adbc.Tests.Drivers.Databricks.CloudFetch
 
             // Create the downloader and add the download to the queue
             var downloader = new CloudFetchDownloader(
+                _mockStatement.Object,
                 _downloadQueue,
                 _resultQueue,
                 _mockMemoryManager.Object,
@@ -486,6 +491,7 @@ namespace 
Apache.Arrow.Adbc.Tests.Drivers.Databricks.CloudFetch
 
             // Create the downloader
             var downloader = new CloudFetchDownloader(
+                _mockStatement.Object,
                 _downloadQueue,
                 _resultQueue,
                 _mockMemoryManager.Object,
@@ -590,6 +596,7 @@ namespace 
Apache.Arrow.Adbc.Tests.Drivers.Databricks.CloudFetch
 
             // Create the downloader and add the download to the queue
             var downloader = new CloudFetchDownloader(
+                _mockStatement.Object,
                 _downloadQueue,
                 _resultQueue,
                 _mockMemoryManager.Object,


Reply via email to