CurtHagenlocher commented on code in PR #2634:
URL: https://github.com/apache/arrow-adbc/pull/2634#discussion_r2021626037


##########
csharp/src/Drivers/Apache/Spark/CloudFetch/SparkCloudFetchReader.cs:
##########
@@ -0,0 +1,269 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Net.Http;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
+using Apache.Arrow.Ipc;
+using Apache.Hive.Service.Rpc.Thrift;
+using K4os.Compression.LZ4.Streams;
+
+namespace Apache.Arrow.Adbc.Drivers.Apache.Spark.CloudFetch
+{
+    /// <summary>
+    /// Reader for CloudFetch results from Databricks Spark Thrift server.
+    /// Handles downloading and processing URL-based result sets.
+    /// </summary>
+    internal sealed class SparkCloudFetchReader : IArrowArrayStream
+    {
+        private const int MaxRetries = 3;
+        private const int RetryDelayMs = 500;
+
+        private HiveServer2Statement? statement;
+        private readonly Schema schema;
+        private List<TSparkArrowResultLink>? resultLinks;
+        private int linkIndex;
+        private ArrowStreamReader? currentReader;
+        private readonly bool isLz4Compressed;
+        private long startOffset;
+
+        // Lazy initialization of HttpClient
+        private readonly Lazy<HttpClient> _httpClient = new 
Lazy<HttpClient>(() =>
+        {
+            var client = new HttpClient();
+            client.Timeout = TimeSpan.FromMinutes(5); // Set a reasonable 
timeout for large downloads
+            return client;
+        });
+
+        /// <summary>
+        /// Initializes a new instance of the <see 
cref="SparkCloudFetchReader"/> class.
+        /// </summary>
+        /// <param name="statement">The HiveServer2 statement.</param>
+        /// <param name="schema">The Arrow schema.</param>
+        /// <param name="isLz4Compressed">Whether the results are LZ4 
compressed.</param>
+        public SparkCloudFetchReader(HiveServer2Statement statement, Schema 
schema, bool isLz4Compressed)
+        {
+            this.statement = statement;
+            this.schema = schema;
+            this.isLz4Compressed = isLz4Compressed;
+        }
+
+        /// <summary>
+        /// Gets the Arrow schema.
+        /// </summary>
+        public Schema Schema { get { return schema; } }
+
+        private HttpClient HttpClient
+        {
+            get { return _httpClient.Value; }
+        }
+
+        /// <summary>
+        /// Reads the next record batch from the result set.
+        /// </summary>
+        /// <param name="cancellationToken">The cancellation token.</param>
+        /// <returns>The next record batch, or null if there are no more 
batches.</returns>
+        public async ValueTask<RecordBatch?> 
ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
+        {
+            while (true)
+            {
+                // If we have a current reader, try to read the next batch
+                if (this.currentReader != null)
+                {
+                    RecordBatch? next = await 
this.currentReader.ReadNextRecordBatchAsync(cancellationToken);
+                    if (next != null)
+                    {
+                        return next;
+                    }
+                    else
+                    {
+                        this.currentReader.Dispose();
+                        this.currentReader = null;
+                    }
+                }
+
+                // If we have more links to process, download and process the 
next one
+                if (this.resultLinks != null && this.linkIndex < 
this.resultLinks.Count)
+                {
+                    var link = this.resultLinks[this.linkIndex++];
+                    byte[]? fileData = null;
+
+                    // Retry logic for downloading files
+                    for (int retry = 0; retry < MaxRetries; retry++)
+                    {
+                        try
+                        {
+                            fileData = await DownloadFileAsync(link.FileLink, 
cancellationToken);
+                            break; // Success, exit retry loop
+                        }
+                        catch (Exception ex) when (retry < MaxRetries - 1)
+                        {
+                            // Log the error and retry
+                            Console.WriteLine($"Error downloading file 
(attempt {retry + 1}/{MaxRetries}): {ex.Message}");
+                            await Task.Delay(RetryDelayMs * (retry + 1), 
cancellationToken);
+                        }
+                    }
+
+                    if (fileData == null)
+                    {
+                        // All retries failed, continue to the next link or 
fetch more links
+                        continue;
+                    }
+
+                    // Process the downloaded file data
+                    MemoryStream dataStream;
+
+                    // If the data is LZ4 compressed, decompress it
+                    if (this.isLz4Compressed)

Review Comment:
   I don't, no. I can try to figure it out later; this doesn't need to be 
blocking.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to