jadewang-db commented on code in PR #2855:
URL: https://github.com/apache/arrow-adbc/pull/2855#discussion_r2105442755
##########
csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloader.cs:
##########
@@ -341,6 +365,40 @@ private async Task DownloadFileAsync(IDownloadResult
downloadResult, Cancellatio
HttpCompletionOption.ResponseHeadersRead,
cancellationToken).ConfigureAwait(false);
+ // Check if the response indicates an expired URL
(typically 403 or 401)
+ if (response.StatusCode ==
System.Net.HttpStatusCode.Forbidden ||
+ response.StatusCode ==
System.Net.HttpStatusCode.Unauthorized)
+ {
+ // If we've already tried refreshing too many times,
fail
+ if (downloadResult.RefreshAttempts >=
_maxUrlRefreshAttempts)
+ {
+ throw new InvalidOperationException($"Failed to
download file after {downloadResult.RefreshAttempts} URL refresh attempts.");
+ }
+
+ // Try to refresh the URL
+ var refreshedLink = await
_resultFetcher.GetUrlAsync(downloadResult.Link.StartRowOffset,
cancellationToken);
+ if (refreshedLink != null)
+ {
+ // Update the download result with the refreshed
link
+
downloadResult.UpdateWithRefreshedLink(refreshedLink);
+ url = refreshedLink.FileLink;
+ sanitizedUrl = SanitizeUrl(url);
+
+ Trace.TraceInformation($"URL for file at offset
{refreshedLink.StartRowOffset} was refreshed after expired URL response");
+
+ // Also refresh other potentially expired URLs
Review Comment:
This is not needed here: the download path should not also refresh other potentially expired URLs.
##########
csharp/src/Drivers/Databricks/CloudFetch/CloudFetchResultFetcher.cs:
##########
@@ -125,6 +139,111 @@ public async Task StopAsync()
}
}
+ /// <summary>
+ /// Gets a URL for the specified offset, fetching or refreshing as
needed.
+ /// </summary>
+ /// <param name="offset">The row offset for which to get a URL.</param>
+ /// <param name="cancellationToken">The cancellation token.</param>
+ /// <returns>The URL link for the specified offset, or null if not
available.</returns>
+ public async Task<TSparkArrowResultLink?> GetUrlAsync(long offset,
CancellationToken cancellationToken)
+ {
+ // Need to fetch or refresh the URL
+ await _fetchLock.WaitAsync(cancellationToken);
+ try
+ {
+ // Determine if we need to fetch new URLs or refresh existing
ones
+ if (!_urlsByOffset.ContainsKey(offset) && _hasMoreResults)
+ {
+ // This is a new offset we haven't seen before - fetch new
URLs
+ var links = await FetchUrlBatchAsync(offset, 100,
cancellationToken);
+ return links.FirstOrDefault(l => l.StartRowOffset ==
offset);
+ }
+ else
+ {
+ // We have the URL but it's expired - refresh it
+ return await RefreshUrlAsync(offset, cancellationToken);
+ }
+ }
+ finally
+ {
+ _fetchLock.Release();
+ }
+ }
+
+ /// <summary>
+ /// Checks if any URLs are expired or about to expire.
+ /// </summary>
+ /// <returns>True if any URLs are expired or about to expire, false
otherwise.</returns>
+ public bool HasExpiredOrExpiringSoonUrls()
+ {
+ return _urlsByOffset.Values.Any(IsUrlExpiredOrExpiringSoon);
+ }
+
+ /// <summary>
+ /// Proactively refreshes URLs that are expired or about to expire.
+ /// </summary>
+ /// <param name="cancellationToken">The cancellation token.</param>
+ public async Task RefreshExpiredUrlsAsync(CancellationToken
cancellationToken)
Review Comment:
This can be removed; `RefreshExpiredUrlsAsync` is not needed.
##########
csharp/src/Drivers/Databricks/CloudFetch/ICloudFetchInterfaces.cs:
##########
@@ -142,6 +160,26 @@ internal interface ICloudFetchResultFetcher
/// Gets the error encountered by the fetcher, if any.
/// </summary>
Exception? Error { get; }
+
+ /// <summary>
+ /// Gets a URL for the specified offset, fetching or refreshing as
needed.
+ /// </summary>
+ /// <param name="offset">The row offset for which to get a URL.</param>
+ /// <param name="cancellationToken">The cancellation token.</param>
+ /// <returns>The URL link for the specified offset, or null if not
available.</returns>
+ Task<TSparkArrowResultLink?> GetUrlAsync(long offset,
CancellationToken cancellationToken);
+
+ /// <summary>
Review Comment:
This interface member is not needed.
##########
csharp/src/Drivers/Databricks/CloudFetch/CloudFetchResultFetcher.cs:
##########
@@ -125,6 +139,111 @@ public async Task StopAsync()
}
}
+ /// <summary>
+ /// Gets a URL for the specified offset, fetching or refreshing as
needed.
+ /// </summary>
+ /// <param name="offset">The row offset for which to get a URL.</param>
+ /// <param name="cancellationToken">The cancellation token.</param>
+ /// <returns>The URL link for the specified offset, or null if not
available.</returns>
+ public async Task<TSparkArrowResultLink?> GetUrlAsync(long offset,
CancellationToken cancellationToken)
+ {
+ // Need to fetch or refresh the URL
+ await _fetchLock.WaitAsync(cancellationToken);
+ try
+ {
+ // Determine if we need to fetch new URLs or refresh existing
ones
+ if (!_urlsByOffset.ContainsKey(offset) && _hasMoreResults)
Review Comment:
Let's remove this check.
##########
csharp/src/Drivers/Databricks/CloudFetch/CloudFetchResultFetcher.cs:
##########
@@ -125,6 +139,111 @@ public async Task StopAsync()
}
}
+ /// <summary>
+ /// Gets a URL for the specified offset, fetching or refreshing as
needed.
+ /// </summary>
+ /// <param name="offset">The row offset for which to get a URL.</param>
+ /// <param name="cancellationToken">The cancellation token.</param>
+ /// <returns>The URL link for the specified offset, or null if not
available.</returns>
+ public async Task<TSparkArrowResultLink?> GetUrlAsync(long offset,
CancellationToken cancellationToken)
+ {
+ // Need to fetch or refresh the URL
+ await _fetchLock.WaitAsync(cancellationToken);
+ try
+ {
+ // Determine if we need to fetch new URLs or refresh existing
ones
+ if (!_urlsByOffset.ContainsKey(offset) && _hasMoreResults)
+ {
+ // This is a new offset we haven't seen before - fetch new
URLs
+ var links = await FetchUrlBatchAsync(offset, 100,
cancellationToken);
+ return links.FirstOrDefault(l => l.StartRowOffset ==
offset);
+ }
+ else
+ {
+ // We have the URL but it's expired - refresh it
+ return await RefreshUrlAsync(offset, cancellationToken);
Review Comment:
Let's reuse the `FetchResultsAsync` method and add the offset as a parameter.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]