jadewang-db commented on code in PR #2855:
URL: https://github.com/apache/arrow-adbc/pull/2855#discussion_r2105146012


##########
csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloader.cs:
##########
@@ -237,6 +248,25 @@ private async Task DownloadFilesAsync(CancellationToken 
cancellationToken)
                         break;
                     }
 
+                    // Check if there are any expired URLs that need refreshing

Review Comment:
   Do we really need this?



##########
csharp/src/Drivers/Databricks/DatabricksConnection.cs:
##########
@@ -468,6 +471,27 @@ protected internal override Task<TRowSet> 
GetRowSetAsync(TGetPrimaryKeysResp res
             return base.GetAuthenticationHeaderValue(authType);
         }
 
+        /// <summary>
+        /// Gets a fresh Thrift client for fetching results.
+        /// This helps avoid "stream already consumed" errors when a Thrift 
transport has already been used.
+        /// </summary>
+        /// <returns>A fresh Thrift client instance.</returns>
+        internal TCLIService.IAsync GetFreshClient()

Review Comment:
   This should be removed.



##########
csharp/src/Drivers/Databricks/CloudFetch/CloudFetchUrlManager.cs:
##########
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Hive.Service.Rpc.Thrift;
+
+namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
+{
+    /// <summary>
+    /// Abstraction for time operations to enable testing with controlled time.
+    /// </summary>
+    public interface IClock
+    {
+        /// <summary>
+        /// Gets the current UTC time.
+        /// </summary>
+        DateTime UtcNow { get; }
+    }
+
+    /// <summary>
+    /// Default implementation that uses system time.
+    /// </summary>
+    internal class SystemClock : IClock
+    {
+        public DateTime UtcNow => DateTime.UtcNow;
+    }
+
+    /// <summary>
+    /// Test implementation that allows controlling time for testing scenarios.
+    /// </summary>
+    public class ControllableClock : IClock
+    {
+        private DateTime _currentTime;
+
+        public ControllableClock(DateTime? initialTime = null)
+        {
+            _currentTime = initialTime ?? DateTime.UtcNow;
+        }
+
+        public DateTime UtcNow => _currentTime;
+
+        /// <summary>
+        /// Advances the clock by the specified time span.
+        /// </summary>
+        /// <param name="timeSpan">The amount of time to advance.</param>
+        public void AdvanceTime(TimeSpan timeSpan)
+        {
+            _currentTime = _currentTime.Add(timeSpan);
+        }
+
+        /// <summary>
+        /// Sets the clock to a specific time.
+        /// </summary>
+        /// <param name="time">The time to set.</param>
+        public void SetTime(DateTime time)
+        {
+            _currentTime = time;
+        }
+
+        /// <summary>
+        /// Resets the clock to the current system time.
+        /// </summary>
+        public void Reset()
+        {
+            _currentTime = DateTime.UtcNow;
+        }
+    }
+
+    /// <summary>
+    /// Manages CloudFetch URLs, handling both initial fetching and refreshing 
of expired URLs.
+    /// </summary>
+    internal class CloudFetchUrlManager
+    {
+        private readonly IHiveServer2Statement _statement;
+        private readonly SemaphoreSlim _fetchLock = new SemaphoreSlim(1, 1);
+        private readonly ConcurrentDictionary<long, TSparkArrowResultLink> 
_urlsByOffset = new ConcurrentDictionary<long, TSparkArrowResultLink>();
+        private readonly int _expirationBufferSeconds;
+        private readonly IClock _clock;
+        private long _lastFetchedOffset = 0;
+        private bool _hasMoreResults = true;
+
+        /// <summary>
+        /// Initializes a new instance of the <see 
cref="CloudFetchUrlManager"/> class.
+        /// </summary>
+        /// <param name="statement">The HiveServer2 statement to use for 
fetching URLs.</param>
+        /// <param name="expirationBufferSeconds">Buffer time in seconds 
before URL expiration to trigger refresh.</param>
+        /// <param name="clock">Clock implementation for time operations. If 
null, uses system clock.</param>
+        public CloudFetchUrlManager(IHiveServer2Statement statement, int 
expirationBufferSeconds = 60, IClock? clock = null)
+        {
+            _statement = statement ?? throw new 
ArgumentNullException(nameof(statement));
+            _expirationBufferSeconds = expirationBufferSeconds;
+            _clock = clock ?? new SystemClock();
+        }
+
+        /// <summary>
+        /// Gets a URL for the specified offset, fetching or refreshing as 
needed.
+        /// </summary>
+        /// <param name="offset">The row offset for which to get a URL.</param>
+        /// <param name="cancellationToken">The cancellation token.</param>
+        /// <returns>The URL link for the specified offset, or null if not 
available.</returns>
+        public async Task<TSparkArrowResultLink?> GetUrlAsync(long offset, 
CancellationToken cancellationToken)
+        {
+            // Check if we already have a valid URL for this offset
+            if (_urlsByOffset.TryGetValue(offset, out var link) && 
!IsUrlExpiredOrExpiringSoon(link))
+            {
+                return link;
+            }
+
+            // Need to fetch or refresh the URL
+            await _fetchLock.WaitAsync(cancellationToken);
+            try
+            {
+                // Check again in case another thread already 
fetched/refreshed while we were waiting
+                if (_urlsByOffset.TryGetValue(offset, out link) && 
!IsUrlExpiredOrExpiringSoon(link))
+                {
+                    return link;
+                }
+
+                // Determine if we need to fetch new URLs or refresh existing 
ones
+                if (!_urlsByOffset.ContainsKey(offset) && _hasMoreResults)
+                {
+                    // This is a new offset we haven't seen before - fetch new 
URLs
+                    return await FetchNewUrlsAsync(offset, cancellationToken);
+                }
+                else
+                {
+                    // We have the URL but it's expired - refresh it
+                    return await RefreshUrlAsync(offset, cancellationToken);
+                }
+            }
+            finally
+            {
+                _fetchLock.Release();
+            }
+        }
+
+        /// <summary>
+        /// Gets URLs for a range of offsets, fetching or refreshing as needed.
+        /// </summary>
+        /// <param name="startOffset">The starting row offset.</param>
+        /// <param name="count">The number of URLs to get.</param>
+        /// <param name="cancellationToken">The cancellation token.</param>
+        /// <returns>A dictionary mapping offsets to their URL links.</returns>
+        public async Task<Dictionary<long, TSparkArrowResultLink>> 
GetUrlRangeAsync(

Review Comment:
   Can we move this logic into the CloudFetch result fetcher class and do some 
refactoring there? Since Thrift only allows one fetch at a time, we can use 
this to reset the fetch position and let subsequent fetches continue from there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to