This is an automated email from the ASF dual-hosted git repository.

curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 332e145d9 feat(csharp): fix powerbi hang when reading cloudfetch result in Databricks driver (#2747)
332e145d9 is described below

commit 332e145d9aaf63b18a94f0c6d24a83ff4bf873b8
Author: Jade Wang <[email protected]>
AuthorDate: Tue Apr 29 05:08:15 2025 -0700

    feat(csharp): fix powerbi hang when reading cloudfetch result in Databricks driver (#2747)
    
    ### Summary
    This PR fixes an issue where PowerBI would hang when reading CloudFetch
    results and significantly improves the logging capabilities in the
    CloudFetch downloader component.
    
    ### Problem
    1. The CloudFetchReader was not properly disposing of the download
    manager after completing downloads, causing resource leaks that led to
    PowerBI hanging.
    2. The CloudFetchDownloader was using Debug.WriteLine for logging. Those
    calls are compiled out of release builds, so they emit nothing in
    production, and the messages themselves carried too little diagnostic
    information.
    
    ### Solution
    - Fixed resource management in CloudFetchReader by properly disposing
    the download manager after all files are processed
    - Replaced Debug.WriteLine calls with more comprehensive Trace logging
    - Added detailed performance metrics and diagnostics:
      - Download start/completion timestamps
      - File sizes and throughput calculations
      - Decompression metrics
      - Overall download statistics (total files, success/failure counts)
    - Added URL sanitization for secure logging
    - Added proper error tracking and reporting
    
    ### Testing
    - Enhanced CloudFetchE2ETest to verify that the reader properly
    completes after all data is consumed
    - Verified that PowerBI no longer hangs when reading CloudFetch results
---
 .../Databricks/CloudFetch/CloudFetchDownloader.cs  | 75 ++++++++++++++++++++--
 .../Databricks/CloudFetch/CloudFetchReader.cs      |  4 +-
 .../test/Drivers/Databricks/CloudFetchE2ETest.cs   |  2 +
 3 files changed, 74 insertions(+), 7 deletions(-)

diff --git a/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloader.cs b/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloader.cs
index e3cec4a2f..8aadf58f2 100644
--- a/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloader.cs
+++ b/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloader.cs
@@ -55,6 +55,7 @@ namespace 
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
         /// <param name="httpClient">The HTTP client to use for 
downloads.</param>
         /// <param name="maxParallelDownloads">The maximum number of parallel 
downloads.</param>
         /// <param name="isLz4Compressed">Whether the results are LZ4 
compressed.</param>
+        /// <param name="logger">The logger instance.</param>
         /// <param name="maxRetries">The maximum number of retry 
attempts.</param>
         /// <param name="retryDelayMs">The delay between retry attempts in 
milliseconds.</param>
         public CloudFetchDownloader(
@@ -184,6 +185,12 @@ namespace 
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
         {
             await Task.Yield();
 
+            int totalFiles = 0;
+            int successfulDownloads = 0;
+            int failedDownloads = 0;
+            long totalBytes = 0;
+            var overallStopwatch = Stopwatch.StartNew();
+
             try
             {
                 // Keep track of active download tasks
@@ -193,6 +200,8 @@ namespace 
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
                 // Process items from the download queue until it's completed
                 foreach (var downloadResult in 
_downloadQueue.GetConsumingEnumerable(cancellationToken))
                 {
+                    totalFiles++;
+
                     // Check if there's an error before processing more 
downloads
                     if (HasError)
                     {
@@ -213,7 +222,7 @@ namespace 
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
                             }
                             catch (Exception ex)
                             {
-                                Debug.WriteLine($"Error waiting for downloads 
to complete: {ex.Message}");
+                                Trace.TraceWarning($"Error waiting for 
downloads to complete: {ex.Message}");
                                 // Don't set error here, as individual 
download tasks will handle their own errors
                             }
                         }
@@ -245,10 +254,11 @@ namespace 
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
                             if (t.IsFaulted)
                             {
                                 Exception ex = t.Exception?.InnerException ?? 
new Exception("Unknown error");
-                                Debug.WriteLine($"Download failed: 
{ex.Message}");
+                                Trace.TraceError($"Download failed for file 
{SanitizeUrl(downloadResult.Link.FileLink)}: {ex.Message}");
 
                                 // Set the download as failed
                                 downloadResult.SetFailed(ex);
+                                failedDownloads++;
 
                                 // Set the error state to stop the download 
process
                                 SetError(ex);
@@ -256,6 +266,11 @@ namespace 
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
                                 // Signal that we should stop processing 
downloads
                                 
downloadTaskCompletionSource.TrySetException(ex);
                             }
+                            else if (!t.IsFaulted && !t.IsCanceled)
+                            {
+                                successfulDownloads++;
+                                totalBytes += downloadResult.Size;
+                            }
                         }, cancellationToken);
 
                     // Add the task to the dictionary
@@ -274,14 +289,21 @@ namespace 
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
             catch (OperationCanceledException) when 
(cancellationToken.IsCancellationRequested)
             {
                 // Expected when cancellation is requested
+                Trace.TraceInformation("Download process was cancelled");
             }
             catch (Exception ex)
             {
-                Debug.WriteLine($"Error in download loop: {ex.Message}");
+                Trace.TraceError($"Error in download loop: {ex.Message}");
                 SetError(ex);
             }
             finally
             {
+                overallStopwatch.Stop();
+
+                Trace.TraceInformation(
+                    $"Download process completed. Total files: {totalFiles}, 
Successful: {successfulDownloads}, " +
+                    $"Failed: {failedDownloads}, Total size: {totalBytes / 
1024.0 / 1024.0:F2} MB, Total time: {overallStopwatch.ElapsedMilliseconds / 
1000.0:F2} sec");
+
                 // If there's an error, add the error to the result queue
                 if (HasError)
                 {
@@ -293,11 +315,18 @@ namespace 
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
         private async Task DownloadFileAsync(IDownloadResult downloadResult, 
CancellationToken cancellationToken)
         {
             string url = downloadResult.Link.FileLink;
+            string sanitizedUrl = SanitizeUrl(downloadResult.Link.FileLink);
             byte[]? fileData = null;
 
             // Use the size directly from the download result
             long size = downloadResult.Size;
 
+            // Create a stopwatch to track download time
+            var stopwatch = Stopwatch.StartNew();
+
+            // Log download start
+            Trace.TraceInformation($"Starting download of file {sanitizedUrl}, 
expected size: {size / 1024.0:F2} KB");
+
             // Acquire memory before downloading
             await _memoryManager.AcquireMemoryAsync(size, 
cancellationToken).ConfigureAwait(false);
 
@@ -318,7 +347,7 @@ namespace 
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
                     long? contentLength = 
response.Content.Headers.ContentLength;
                     if (contentLength.HasValue && contentLength.Value > 0)
                     {
-                        Debug.WriteLine($"Downloading file of size: 
{contentLength.Value / 1024.0 / 1024.0:F2} MB");
+                        Trace.TraceInformation($"Actual file size for 
{sanitizedUrl}: {contentLength.Value / 1024.0 / 1024.0:F2} MB");
                     }
 
                     // Read the file data
@@ -328,13 +357,17 @@ namespace 
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
                 catch (Exception ex) when (retry < _maxRetries - 1 && 
!cancellationToken.IsCancellationRequested)
                 {
                     // Log the error and retry
-                    Debug.WriteLine($"Error downloading file (attempt {retry + 
1}/{_maxRetries}): {ex.Message}");
+                    Trace.TraceError($"Error downloading file 
{SanitizeUrl(url)} (attempt {retry + 1}/{_maxRetries}): {ex.Message}");
+
                     await Task.Delay(_retryDelayMs * (retry + 1), 
cancellationToken).ConfigureAwait(false);
                 }
             }
 
             if (fileData == null)
             {
+                stopwatch.Stop();
+                Trace.TraceError($"Failed to download file {sanitizedUrl} 
after {_maxRetries} attempts. Elapsed time: {stopwatch.ElapsedMilliseconds} 
ms");
+
                 // Release the memory we acquired
                 _memoryManager.ReleaseMemory(size);
                 throw new InvalidOperationException($"Failed to download file 
from {url} after {_maxRetries} attempts.");
@@ -342,12 +375,14 @@ namespace 
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
 
             // Process the downloaded file data
             MemoryStream dataStream;
+            long actualSize = fileData.Length;
 
             // If the data is LZ4 compressed, decompress it
             if (_isLz4Compressed)
             {
                 try
                 {
+                    var decompressStopwatch = Stopwatch.StartNew();
                     dataStream = new MemoryStream();
                     using (var inputStream = new MemoryStream(fileData))
                     using (var decompressor = LZ4Stream.Decode(inputStream))
@@ -355,9 +390,17 @@ namespace 
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
                         await decompressor.CopyToAsync(dataStream, 81920, 
cancellationToken).ConfigureAwait(false);
                     }
                     dataStream.Position = 0;
+                    decompressStopwatch.Stop();
+
+                    Trace.TraceInformation($"Decompressed file {sanitizedUrl} 
in {decompressStopwatch.ElapsedMilliseconds} ms. Compressed size: {actualSize / 
1024.0:F2} KB, Decompressed size: {dataStream.Length / 1024.0:F2} KB");
+
+                    actualSize = dataStream.Length;
                 }
                 catch (Exception ex)
                 {
+                    stopwatch.Stop();
+                    Trace.TraceError($"Error decompressing data for file 
{sanitizedUrl}: {ex.Message}. Elapsed time: {stopwatch.ElapsedMilliseconds} 
ms");
+
                     // Release the memory we acquired
                     _memoryManager.ReleaseMemory(size);
                     throw new InvalidOperationException($"Error decompressing 
data: {ex.Message}", ex);
@@ -368,6 +411,10 @@ namespace 
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
                 dataStream = new MemoryStream(fileData);
             }
 
+            // Stop the stopwatch and log download completion
+            stopwatch.Stop();
+            Trace.TraceInformation($"Completed download of file 
{sanitizedUrl}. Size: {actualSize / 1024.0:F2} KB, Latency: 
{stopwatch.ElapsedMilliseconds} ms, Throughput: {(actualSize / 1024.0 / 1024.0) 
/ (stopwatch.ElapsedMilliseconds / 1000.0):F2} MB/s");
+
             // Set the download as completed with the original size
             downloadResult.SetCompleted(dataStream, size);
         }
@@ -378,6 +425,7 @@ namespace 
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
             {
                 if (_error == null)
                 {
+                    Trace.TraceError($"Setting error state: {ex.Message}");
                     _error = ex;
                 }
             }
@@ -395,7 +443,22 @@ namespace 
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
             }
             catch (Exception ex)
             {
-                Debug.WriteLine($"Error completing with error: {ex.Message}");
+                Trace.TraceError($"Error completing with error: {ex.Message}");
+            }
+        }
+
+        // Helper method to sanitize URLs for logging (to avoid exposing 
sensitive information)
+        private string SanitizeUrl(string url)
+        {
+            try
+            {
+                var uri = new Uri(url);
+                return 
$"{uri.Scheme}://{uri.Host}/{Path.GetFileName(uri.LocalPath)}";
+            }
+            catch
+            {
+                // If URL parsing fails, return a generic identifier
+                return "cloud-storage-url";
             }
         }
     }
diff --git a/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchReader.cs b/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchReader.cs
index 1e3861833..6a8d3d24b 100644
--- a/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchReader.cs
+++ b/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchReader.cs
@@ -36,7 +36,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
     {
         private readonly Schema schema;
         private readonly bool isLz4Compressed;
-        private readonly ICloudFetchDownloadManager downloadManager;
+        private ICloudFetchDownloadManager? downloadManager;
         private ArrowStreamReader? currentReader;
         private IDownloadResult? currentDownloadResult;
         private bool isPrefetchEnabled;
@@ -136,6 +136,8 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
                         this.currentDownloadResult = await 
this.downloadManager.GetNextDownloadedFileAsync(cancellationToken);
                         if (this.currentDownloadResult == null)
                         {
+                            this.downloadManager.Dispose();
+                            this.downloadManager = null;
                             // No more files
                             return null;
                         }
diff --git a/csharp/test/Drivers/Databricks/CloudFetchE2ETest.cs b/csharp/test/Drivers/Databricks/CloudFetchE2ETest.cs
index 96b040274..a96b88cbf 100644
--- a/csharp/test/Drivers/Databricks/CloudFetchE2ETest.cs
+++ b/csharp/test/Drivers/Databricks/CloudFetchE2ETest.cs
@@ -90,6 +90,8 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks
 
             Assert.True(totalRows >= rowCount);
 
+            Assert.Null(await result.Stream.ReadNextRecordBatchAsync());
+
             // Also log to the test output helper if available
             OutputHelper?.WriteLine($"Read {totalRows} rows from range 
function");
         }

Reply via email to