CurtHagenlocher commented on code in PR #2664:
URL: https://github.com/apache/arrow-adbc/pull/2664#discussion_r2027453468


##########
csharp/src/Drivers/Apache/Spark/SparkParameters.cs:
##########
@@ -52,6 +52,18 @@ public static class SparkParameters
         /// Default value is 5 minutes if not specified.
         /// </summary>
         public const string CloudFetchTimeoutMinutes = 
"adbc.spark.cloudfetch.timeout_minutes";
+
+        /// <summary>
+        /// Controls whether to retry requests that receive a 503 response 
with a Retry-After header.
+        /// Default value is 1 (enabled). Set to 0 to disable retry behavior.

Review Comment:
   boolean options in ADBC typically use `true` and `false` instead of `0` and 
`1`.



##########
csharp/src/Drivers/Apache/Spark/RetryHttpHandler.cs:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Net;
+using System.Net.Http;
+using System.Threading;
+using System.Threading.Tasks;
+using System.IO;
+using System.Text;
+using Thrift;
+using Thrift.Protocol;
+using Thrift.Transport;
+using Apache.Hive.Service.Rpc.Thrift;
+
+namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
+{
+    /// <summary>
+    /// HTTP handler that implements retry behavior for 503 responses with 
Retry-After headers.
+    /// </summary>
+    internal class RetryHttpHandler : DelegatingHandler
+    {
+        private readonly bool _retryEnabled;
+        private readonly int _retryTimeoutSeconds;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="RetryHttpHandler"/> 
class.
+        /// </summary>
+        /// <param name="innerHandler">The inner handler to delegate 
to.</param>
+        /// <param name="retryEnabled">Whether retry behavior is 
enabled.</param>
+        /// <param name="retryTimeoutSeconds">Maximum total time in seconds to 
retry before failing.</param>
+        public RetryHttpHandler(HttpMessageHandler innerHandler, bool 
retryEnabled, int retryTimeoutSeconds)
+            : base(innerHandler)
+        {
+            _retryEnabled = retryEnabled;
+            _retryTimeoutSeconds = retryTimeoutSeconds;
+        }
+
+        /// <summary>
+        /// Sends an HTTP request to the inner handler with retry logic for 
503 responses.
+        /// </summary>
+        protected override async Task<HttpResponseMessage> SendAsync(
+            HttpRequestMessage request,
+            CancellationToken cancellationToken)
+        {
+            // If retry is disabled, just pass through to the inner handler
+            if (!_retryEnabled)
+            {
+                return await base.SendAsync(request, cancellationToken);
+            }
+
+            // Clone the request content if it's not null so we can reuse it 
for retries
+            var requestContentClone = request.Content != null
+                ? await CloneHttpContentAsync(request.Content)
+                : null;
+
+            HttpResponseMessage response;
+            string? lastErrorMessage = null;
+            DateTime startTime = DateTime.UtcNow;
+            int totalRetrySeconds = 0;
+
+            do
+            {
+                // Set the content for each attempt (if needed)
+                if (requestContentClone != null && request.Content == null)
+                {
+                    request.Content = await 
CloneHttpContentAsync(requestContentClone);
+                }
+
+                response = await base.SendAsync(request, cancellationToken);
+
+                // If it's not a 503 response, return immediately
+                if (response.StatusCode != HttpStatusCode.ServiceUnavailable)
+                {
+                    return response;
+                }
+
+                // Check for Retry-After header
+                if (!response.Headers.TryGetValues("Retry-After", out var 
retryAfterValues))
+                {
+                    // No Retry-After header, so return the response as is
+                    return response;
+                }
+
+                // Parse the Retry-After value
+                string retryAfterValue = string.Join(",", retryAfterValues);
+                if (!int.TryParse(retryAfterValue, out int retryAfterSeconds) 
|| retryAfterSeconds <= 0)
+                {
+                    // Invalid Retry-After value, return the response as is
+                    return response;
+                }
+
+                // Extract error message from response if possible
+                try
+                {
+                    lastErrorMessage = await 
ExtractErrorMessageAsync(response);
+                }
+                catch
+                {
+                    // If we can't extract the error message, just use a 
generic one
+                    lastErrorMessage = $"Service temporarily unavailable (HTTP 
503). Retry after {retryAfterSeconds} seconds.";
+                }
+
+                // Check if we've exceeded the timeout
+                totalRetrySeconds += retryAfterSeconds;
+                if (_retryTimeoutSeconds > 0 && totalRetrySeconds > 
_retryTimeoutSeconds)
+                {
+                    // We've exceeded the timeout, so break out of the loop
+                    break;
+                }
+
+                // Dispose the response before retrying
+                response.Dispose();
+
+                // Wait for the specified retry time
+                await Task.Delay(TimeSpan.FromSeconds(retryAfterSeconds), 
cancellationToken);
+
+                // Reset the request content for the next attempt
+                request.Content = null;
+
+            } while (!cancellationToken.IsCancellationRequested);
+
+            // If we get here, we've either exceeded the timeout or been 
cancelled
+            if (cancellationToken.IsCancellationRequested)
+            {
+                throw new OperationCanceledException("Request cancelled during 
retry wait", cancellationToken);
+            }
+
+            // Create a custom exception with the SQL state code and last 
error message
+            var exception = new AdbcException(
+                lastErrorMessage ?? "Service temporarily unavailable and retry 
timeout exceeded",
+                AdbcStatusCode.IOError);
+
+            // Add SQL state as part of the message since we can't set it 
directly
+            throw new AdbcException(
+                $"[SQLState: 08001] {exception.Message}",
+                AdbcStatusCode.IOError);
+        }
+
+        /// <summary>
+        /// Clones an HttpContent object so it can be reused for retries.
+        /// </summary>
+        private static async Task<HttpContent> 
CloneHttpContentAsync(HttpContent content)
+        {
+            var ms = new MemoryStream();
+            await content.CopyToAsync(ms);
+            ms.Position = 0;
+
+            var clone = new StreamContent(ms);
+            if (content.Headers != null)
+            {
+                foreach (var header in content.Headers)
+                {
+                    clone.Headers.Add(header.Key, header.Value);
+                }
+            }
+            return clone;
+        }
+
+        /// <summary>
+        /// Attempts to extract the error message from a Thrift 
TApplicationException in the response body.
+        /// </summary>
+        private static async Task<string?> 
ExtractErrorMessageAsync(HttpResponseMessage response)
+        {
+            if (response.Content == null)
+            {
+                return null;
+            }
+
+            // Check if the content type is application/x-thrift
+            if (response.Content.Headers.ContentType?.MediaType != 
"application/x-thrift")
+            {
+                // If it's not Thrift, just return the content as a string
+                return await response.Content.ReadAsStringAsync();
+            }
+
+            try
+            {
+                // For Thrift content, just return a generic message
+                // We can't easily parse the Thrift message without access to 
the specific methods

Review Comment:
   I'm having trouble understanding the goal of this logic. We always want to 
try to read the message as a string, but only swallow exceptions if the content 
type was "Thrift"?



##########
csharp/src/Drivers/Apache/Spark/RetryHttpHandler.cs:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Net;
+using System.Net.Http;
+using System.Threading;
+using System.Threading.Tasks;
+using System.IO;
+using System.Text;
+using Thrift;
+using Thrift.Protocol;
+using Thrift.Transport;
+using Apache.Hive.Service.Rpc.Thrift;
+
+namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
+{
+    /// <summary>
+    /// HTTP handler that implements retry behavior for 503 responses with 
Retry-After headers.
+    /// </summary>
+    internal class RetryHttpHandler : DelegatingHandler
+    {
+        private readonly bool _retryEnabled;
+        private readonly int _retryTimeoutSeconds;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="RetryHttpHandler"/> 
class.
+        /// </summary>
+        /// <param name="innerHandler">The inner handler to delegate 
to.</param>
+        /// <param name="retryEnabled">Whether retry behavior is 
enabled.</param>
+        /// <param name="retryTimeoutSeconds">Maximum total time in seconds to 
retry before failing.</param>
+        public RetryHttpHandler(HttpMessageHandler innerHandler, bool 
retryEnabled, int retryTimeoutSeconds)
+            : base(innerHandler)
+        {
+            _retryEnabled = retryEnabled;
+            _retryTimeoutSeconds = retryTimeoutSeconds;
+        }
+
+        /// <summary>
+        /// Sends an HTTP request to the inner handler with retry logic for 
503 responses.
+        /// </summary>
+        protected override async Task<HttpResponseMessage> SendAsync(
+            HttpRequestMessage request,
+            CancellationToken cancellationToken)
+        {
+            // If retry is disabled, just pass through to the inner handler
+            if (!_retryEnabled)
+            {
+                return await base.SendAsync(request, cancellationToken);
+            }
+
+            // Clone the request content if it's not null so we can reuse it 
for retries
+            var requestContentClone = request.Content != null
+                ? await CloneHttpContentAsync(request.Content)
+                : null;
+
+            HttpResponseMessage response;
+            string? lastErrorMessage = null;
+            DateTime startTime = DateTime.UtcNow;
+            int totalRetrySeconds = 0;
+
+            do
+            {
+                // Set the content for each attempt (if needed)
+                if (requestContentClone != null && request.Content == null)
+                {
+                    request.Content = await 
CloneHttpContentAsync(requestContentClone);
+                }
+
+                response = await base.SendAsync(request, cancellationToken);
+
+                // If it's not a 503 response, return immediately
+                if (response.StatusCode != HttpStatusCode.ServiceUnavailable)
+                {
+                    return response;
+                }
+
+                // Check for Retry-After header
+                if (!response.Headers.TryGetValues("Retry-After", out var 
retryAfterValues))
+                {
+                    // No Retry-After header, so return the response as is
+                    return response;
+                }
+
+                // Parse the Retry-After value
+                string retryAfterValue = string.Join(",", retryAfterValues);
+                if (!int.TryParse(retryAfterValue, out int retryAfterSeconds) 
|| retryAfterSeconds <= 0)
+                {
+                    // Invalid Retry-After value, return the response as is
+                    return response;
+                }
+
+                // Extract error message from response if possible
+                try
+                {
+                    lastErrorMessage = await 
ExtractErrorMessageAsync(response);
+                }
+                catch
+                {
+                    // If we can't extract the error message, just use a 
generic one
+                    lastErrorMessage = $"Service temporarily unavailable (HTTP 
503). Retry after {retryAfterSeconds} seconds.";
+                }
+
+                // Check if we've exceeded the timeout
+                totalRetrySeconds += retryAfterSeconds;
+                if (_retryTimeoutSeconds > 0 && totalRetrySeconds > 
_retryTimeoutSeconds)
+                {
+                    // We've exceeded the timeout, so break out of the loop
+                    break;
+                }
+
+                // Dispose the response before retrying
+                response.Dispose();
+
+                // Wait for the specified retry time
+                await Task.Delay(TimeSpan.FromSeconds(retryAfterSeconds), 
cancellationToken);
+
+                // Reset the request content for the next attempt
+                request.Content = null;
+
+            } while (!cancellationToken.IsCancellationRequested);
+
+            // If we get here, we've either exceeded the timeout or been 
cancelled
+            if (cancellationToken.IsCancellationRequested)
+            {
+                throw new OperationCanceledException("Request cancelled during 
retry wait", cancellationToken);
+            }
+
+            // Create a custom exception with the SQL state code and last 
error message
+            var exception = new AdbcException(
+                lastErrorMessage ?? "Service temporarily unavailable and retry 
timeout exceeded",
+                AdbcStatusCode.IOError);
+
+            // Add SQL state as part of the message since we can't set it 
directly
+            throw new AdbcException(
+                $"[SQLState: 08001] {exception.Message}",
+                AdbcStatusCode.IOError);
+        }
+
+        /// <summary>
+        /// Clones an HttpContent object so it can be reused for retries.
+        /// </summary>
+        private static async Task<HttpContent> 
CloneHttpContentAsync(HttpContent content)
+        {
+            var ms = new MemoryStream();
+            await content.CopyToAsync(ms);
+            ms.Position = 0;
+
+            var clone = new StreamContent(ms);
+            if (content.Headers != null)
+            {
+                foreach (var header in content.Headers)
+                {
+                    clone.Headers.Add(header.Key, header.Value);
+                }
+            }
+            return clone;
+        }
+
+        /// <summary>
+        /// Attempts to extract the error message from a Thrift 
TApplicationException in the response body.
+        /// </summary>
+        private static async Task<string?> 
ExtractErrorMessageAsync(HttpResponseMessage response)
+        {
+            if (response.Content == null)
+            {
+                return null;
+            }
+
+            // Check if the content type is application/x-thrift
+            if (response.Content.Headers.ContentType?.MediaType != 
"application/x-thrift")
+            {
+                // If it's not Thrift, just return the content as a string
+                return await response.Content.ReadAsStringAsync();
+            }
+
+            try
+            {
+                // For Thrift content, just return a generic message
+                // We can't easily parse the Thrift message without access to 
the specific methods
+                return await response.Content.ReadAsStringAsync();
+            }
+            catch
+            {
+                // If we can't read the content, return null
+                return null;
+            }
+        }
+    }
+}

Review Comment:
   Please end the file with a newline.



##########
csharp/src/Drivers/Apache/Spark/RetryHttpHandler.cs:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Net;
+using System.Net.Http;
+using System.Threading;
+using System.Threading.Tasks;
+using System.IO;
+using System.Text;
+using Thrift;
+using Thrift.Protocol;
+using Thrift.Transport;
+using Apache.Hive.Service.Rpc.Thrift;
+
+namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
+{
+    /// <summary>
+    /// HTTP handler that implements retry behavior for 503 responses with 
Retry-After headers.
+    /// </summary>
+    internal class RetryHttpHandler : DelegatingHandler
+    {
+        private readonly bool _retryEnabled;
+        private readonly int _retryTimeoutSeconds;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="RetryHttpHandler"/> 
class.
+        /// </summary>
+        /// <param name="innerHandler">The inner handler to delegate 
to.</param>
+        /// <param name="retryEnabled">Whether retry behavior is 
enabled.</param>
+        /// <param name="retryTimeoutSeconds">Maximum total time in seconds to 
retry before failing.</param>
+        public RetryHttpHandler(HttpMessageHandler innerHandler, bool 
retryEnabled, int retryTimeoutSeconds)
+            : base(innerHandler)
+        {
+            _retryEnabled = retryEnabled;
+            _retryTimeoutSeconds = retryTimeoutSeconds;
+        }
+
+        /// <summary>
+        /// Sends an HTTP request to the inner handler with retry logic for 
503 responses.
+        /// </summary>
+        protected override async Task<HttpResponseMessage> SendAsync(
+            HttpRequestMessage request,
+            CancellationToken cancellationToken)
+        {
+            // If retry is disabled, just pass through to the inner handler
+            if (!_retryEnabled)
+            {
+                return await base.SendAsync(request, cancellationToken);
+            }
+
+            // Clone the request content if it's not null so we can reuse it 
for retries
+            var requestContentClone = request.Content != null
+                ? await CloneHttpContentAsync(request.Content)
+                : null;
+
+            HttpResponseMessage response;
+            string? lastErrorMessage = null;
+            DateTime startTime = DateTime.UtcNow;
+            int totalRetrySeconds = 0;
+
+            do
+            {
+                // Set the content for each attempt (if needed)
+                if (requestContentClone != null && request.Content == null)
+                {
+                    request.Content = await 
CloneHttpContentAsync(requestContentClone);
+                }
+
+                response = await base.SendAsync(request, cancellationToken);
+
+                // If it's not a 503 response, return immediately
+                if (response.StatusCode != HttpStatusCode.ServiceUnavailable)
+                {
+                    return response;
+                }
+
+                // Check for Retry-After header
+                if (!response.Headers.TryGetValues("Retry-After", out var 
retryAfterValues))
+                {
+                    // No Retry-After header, so return the response as is
+                    return response;
+                }
+
+                // Parse the Retry-After value
+                string retryAfterValue = string.Join(",", retryAfterValues);
+                if (!int.TryParse(retryAfterValue, out int retryAfterSeconds) 
|| retryAfterSeconds <= 0)
+                {
+                    // Invalid Retry-After value, return the response as is
+                    return response;
+                }
+
+                // Extract error message from response if possible
+                try
+                {
+                    lastErrorMessage = await 
ExtractErrorMessageAsync(response);
+                }
+                catch
+                {
+                    // If we can't extract the error message, just use a 
generic one
+                    lastErrorMessage = $"Service temporarily unavailable (HTTP 
503). Retry after {retryAfterSeconds} seconds.";
+                }
+
+                // Check if we've exceeded the timeout
+                totalRetrySeconds += retryAfterSeconds;
+                if (_retryTimeoutSeconds > 0 && totalRetrySeconds > 
_retryTimeoutSeconds)
+                {
+                    // We've exceeded the timeout, so break out of the loop
+                    break;
+                }
+
+                // Dispose the response before retrying
+                response.Dispose();
+
+                // Wait for the specified retry time
+                await Task.Delay(TimeSpan.FromSeconds(retryAfterSeconds), 
cancellationToken);
+
+                // Reset the request content for the next attempt
+                request.Content = null;
+
+            } while (!cancellationToken.IsCancellationRequested);
+
+            // If we get here, we've either exceeded the timeout or been 
cancelled
+            if (cancellationToken.IsCancellationRequested)
+            {
+                throw new OperationCanceledException("Request cancelled during 
retry wait", cancellationToken);
+            }
+
+            // Create a custom exception with the SQL state code and last 
error message
+            var exception = new AdbcException(
+                lastErrorMessage ?? "Service temporarily unavailable and retry 
timeout exceeded",
+                AdbcStatusCode.IOError);
+
+            // Add SQL state as part of the message since we can't set it 
directly

Review Comment:
   The original intent was that someone would derive a class from 
`AdbcException` in order to supply a SQLState. In hindsight, maybe that isn't 
the best idea and we should let the state be set on `AdbcException`. But this 
could also throw a `HiveServer2Exception` and that would allow setting the 
SQLState.



##########
csharp/src/Drivers/Apache/Spark/SparkHttpConnection.cs:
##########
@@ -162,7 +199,14 @@ protected override TTransport CreateTransport()
             AuthenticationHeaderValue? authenticationHeaderValue = 
GetAuthenticationHeaderValue(authTypeValue, token, username, password, 
access_token);
 
             HttpClientHandler httpClientHandler = 
HiveServer2TlsImpl.NewHttpClientHandler(TlsOptions);
-            HttpClient httpClient = new(httpClientHandler);
+
+            // Create a RetryHttpHandler that wraps the HttpClientHandler to 
handle 503 responses
+            var retryHandler = new RetryHttpHandler(
+                httpClientHandler,
+                TemporarilyUnavailableRetry,
+                TemporarilyUnavailableRetryTimeout);
+

Review Comment:
   Consider
   ```
   HttpMessageHandler httpClientHandler = 
HiveServer2TlsImpl.NewHttpClientHandler(TlsOptions);
   if (TemporarilyUnavailableRetry)
   {
       httpClientHandler = new RetryHttpHandler(httpClientHandler, 
TemporarilyUnavailableRetryTimeout);
   }
   ```



##########
csharp/src/Drivers/Apache/Spark/RetryHttpHandler.cs:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Net;
+using System.Net.Http;
+using System.Threading;
+using System.Threading.Tasks;
+using System.IO;
+using System.Text;
+using Thrift;
+using Thrift.Protocol;
+using Thrift.Transport;
+using Apache.Hive.Service.Rpc.Thrift;
+
+namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
+{
+    /// <summary>
+    /// HTTP handler that implements retry behavior for 503 responses with 
Retry-After headers.
+    /// </summary>
+    internal class RetryHttpHandler : DelegatingHandler
+    {
+        private readonly bool _retryEnabled;
+        private readonly int _retryTimeoutSeconds;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="RetryHttpHandler"/> 
class.
+        /// </summary>
+        /// <param name="innerHandler">The inner handler to delegate 
to.</param>
+        /// <param name="retryEnabled">Whether retry behavior is 
enabled.</param>
+        /// <param name="retryTimeoutSeconds">Maximum total time in seconds to 
retry before failing.</param>
+        public RetryHttpHandler(HttpMessageHandler innerHandler, bool 
retryEnabled, int retryTimeoutSeconds)
+            : base(innerHandler)
+        {
+            _retryEnabled = retryEnabled;
+            _retryTimeoutSeconds = retryTimeoutSeconds;
+        }
+
+        /// <summary>
+        /// Sends an HTTP request to the inner handler with retry logic for 
503 responses.
+        /// </summary>
+        protected override async Task<HttpResponseMessage> SendAsync(
+            HttpRequestMessage request,
+            CancellationToken cancellationToken)
+        {
+            // If retry is disabled, just pass through to the inner handler
+            if (!_retryEnabled)
+            {
+                return await base.SendAsync(request, cancellationToken);
+            }
+
+            // Clone the request content if it's not null so we can reuse it 
for retries
+            var requestContentClone = request.Content != null
+                ? await CloneHttpContentAsync(request.Content)
+                : null;
+
+            HttpResponseMessage response;
+            string? lastErrorMessage = null;
+            DateTime startTime = DateTime.UtcNow;
+            int totalRetrySeconds = 0;
+
+            do
+            {
+                // Set the content for each attempt (if needed)
+                if (requestContentClone != null && request.Content == null)
+                {
+                    request.Content = await 
CloneHttpContentAsync(requestContentClone);
+                }
+
+                response = await base.SendAsync(request, cancellationToken);
+
+                // If it's not a 503 response, return immediately
+                if (response.StatusCode != HttpStatusCode.ServiceUnavailable)
+                {
+                    return response;
+                }
+
+                // Check for Retry-After header
+                if (!response.Headers.TryGetValues("Retry-After", out var 
retryAfterValues))
+                {
+                    // No Retry-After header, so return the response as is
+                    return response;
+                }
+
+                // Parse the Retry-After value
+                string retryAfterValue = string.Join(",", retryAfterValues);
+                if (!int.TryParse(retryAfterValue, out int retryAfterSeconds) 
|| retryAfterSeconds <= 0)
+                {
+                    // Invalid Retry-After value, return the response as is
+                    return response;
+                }
+
+                // Extract error message from response if possible
+                try
+                {
+                    lastErrorMessage = await 
ExtractErrorMessageAsync(response);
+                }
+                catch
+                {
+                    // If we can't extract the error message, just use a 
generic one
+                    lastErrorMessage = $"Service temporarily unavailable (HTTP 
503). Retry after {retryAfterSeconds} seconds.";
+                }
+
+                // Check if we've exceeded the timeout
+                totalRetrySeconds += retryAfterSeconds;
+                if (_retryTimeoutSeconds > 0 && totalRetrySeconds > 
_retryTimeoutSeconds)
+                {
+                    // We've exceeded the timeout, so break out of the loop
+                    break;

Review Comment:
   The response doesn't get disposed on this code path.



##########
csharp/src/Drivers/Apache/Spark/SparkHttpConnection.cs:
##########
@@ -136,6 +146,33 @@ protected override void ValidateOptions()
                     ? connectTimeoutMsValue
                     : throw new 
ArgumentOutOfRangeException(SparkParameters.ConnectTimeoutMilliseconds, 
connectTimeoutMs, $"must be a value of 0 (infinite) or between 1 .. 
{int.MaxValue}. default is 30000 milliseconds.");
             }
+
+            // Parse retry configuration parameters
+            
Properties.TryGetValue(SparkParameters.TemporarilyUnavailableRetry, out string? 
tempUnavailableRetryStr);

Review Comment:
   Best practice to check the return value of `TryGetValue` and not assume that 
the output is set to a particular value when it returns false.



##########
csharp/src/Drivers/Apache/Spark/RetryHttpHandler.cs:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Net;
+using System.Net.Http;
+using System.Threading;
+using System.Threading.Tasks;
+using System.IO;
+using System.Text;
+using Thrift;
+using Thrift.Protocol;
+using Thrift.Transport;
+using Apache.Hive.Service.Rpc.Thrift;
+
+namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
+{
+    /// <summary>
+    /// HTTP handler that implements retry behavior for 503 responses with 
Retry-After headers.
+    /// </summary>
+    internal class RetryHttpHandler : DelegatingHandler
+    {
+        private readonly bool _retryEnabled;
+        private readonly int _retryTimeoutSeconds;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="RetryHttpHandler"/> 
class.
+        /// </summary>
+        /// <param name="innerHandler">The inner handler to delegate 
to.</param>
+        /// <param name="retryEnabled">Whether retry behavior is 
enabled.</param>
+        /// <param name="retryTimeoutSeconds">Maximum total time in seconds to 
retry before failing.</param>
+        public RetryHttpHandler(HttpMessageHandler innerHandler, bool 
retryEnabled, int retryTimeoutSeconds)
+            : base(innerHandler)
+        {
+            _retryEnabled = retryEnabled;
+            _retryTimeoutSeconds = retryTimeoutSeconds;
+        }
+
+        /// <summary>
+        /// Sends an HTTP request to the inner handler with retry logic for 
503 responses.
+        /// </summary>
+        protected override async Task<HttpResponseMessage> SendAsync(
+            HttpRequestMessage request,
+            CancellationToken cancellationToken)
+        {
+            // If retry is disabled, just pass through to the inner handler
+            if (!_retryEnabled)
+            {
+                return await base.SendAsync(request, cancellationToken);
+            }
+
+            // Clone the request content if it's not null so we can reuse it 
for retries
+            var requestContentClone = request.Content != null
+                ? await CloneHttpContentAsync(request.Content)
+                : null;
+
+            HttpResponseMessage response;
+            string? lastErrorMessage = null;
+            DateTime startTime = DateTime.UtcNow;
+            int totalRetrySeconds = 0;
+
+            do
+            {
+                // Set the content for each attempt (if needed)
+                if (requestContentClone != null && request.Content == null)
+                {
+                    request.Content = await 
CloneHttpContentAsync(requestContentClone);
+                }
+
+                response = await base.SendAsync(request, cancellationToken);
+
+                // If it's not a 503 response, return immediately
+                if (response.StatusCode != HttpStatusCode.ServiceUnavailable)
+                {
+                    return response;
+                }
+
+                // Check for Retry-After header
+                if (!response.Headers.TryGetValues("Retry-After", out var 
retryAfterValues))
+                {
+                    // No Retry-After header, so return the response as is
+                    return response;
+                }
+
+                // Parse the Retry-After value
+                string retryAfterValue = string.Join(",", retryAfterValues);
+                if (!int.TryParse(retryAfterValue, out int retryAfterSeconds) 
|| retryAfterSeconds <= 0)
+                {
+                    // Invalid Retry-After value, return the response as is
+                    return response;
+                }
+
+                // Extract error message from response if possible
+                try
+                {
+                    lastErrorMessage = await 
ExtractErrorMessageAsync(response);
+                }
+                catch
+                {
+                    // If we can't extract the error message, just use a 
generic one
+                    lastErrorMessage = $"Service temporarily unavailable (HTTP 
503). Retry after {retryAfterSeconds} seconds.";
+                }
+
+                // Check if we've exceeded the timeout
+                totalRetrySeconds += retryAfterSeconds;
+                if (_retryTimeoutSeconds > 0 && totalRetrySeconds > 
_retryTimeoutSeconds)
+                {
+                    // We've exceeded the timeout, so break out of the loop
+                    break;
+                }
+
+                // Dispose the response before retrying
+                response.Dispose();
+
+                // Wait for the specified retry time
+                await Task.Delay(TimeSpan.FromSeconds(retryAfterSeconds), 
cancellationToken);
+
+                // Reset the request content for the next attempt
+                request.Content = null;

Review Comment:
   Out of curiosity, why is the content being reset each time? Is it in case 
the content was set to a stream that's now at its end?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to