This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git


The following commit(s) were added to refs/heads/main by this push:
     new 1e6c78e  Fix not retrying connection errors (#445)
1e6c78e is described below

commit 1e6c78ed08c04a84e4e508fdd0aa94f4936b917d
Author: John Garland <john...@users.noreply.github.com>
AuthorDate: Wed Jul 30 04:12:17 2025 +1000

    Fix not retrying connection errors (#445)
    
    * Add (failing) test for retrying connection errors
    
    * Fix not retrying connection errors
    
    closes #368
    
    * Fix clippy error
    
    ---------
    
    Co-authored-by: John Garland <john.garl...@vivcourt.com>
---
 src/client/http/connection.rs | 36 ++++++++++++------------
 src/client/retry.rs           | 64 +++++++++++++++++++++++++++++++++++++++++--
 2 files changed, 80 insertions(+), 20 deletions(-)

diff --git a/src/client/http/connection.rs b/src/client/http/connection.rs
index 4d833e6..48b7549 100644
--- a/src/client/http/connection.rs
+++ b/src/client/http/connection.rs
@@ -101,27 +101,29 @@ impl HttpError {
 
         // Reqwest error variants aren't great, attempt to refine them
         let mut source = e.source();
-        while let Some(e) = source {
-            if let Some(e) = e.downcast_ref::<hyper::Error>() {
-                if e.is_closed() || e.is_incomplete_message() || e.is_body_write_aborted() {
-                    kind = HttpErrorKind::Request;
-                } else if e.is_timeout() {
-                    kind = HttpErrorKind::Timeout;
+        while kind == HttpErrorKind::Unknown {
+            if let Some(e) = source {
+                if let Some(e) = e.downcast_ref::<hyper::Error>() {
+                    if e.is_closed() || e.is_incomplete_message() || e.is_body_write_aborted() {
+                        kind = HttpErrorKind::Request;
+                    } else if e.is_timeout() {
+                        kind = HttpErrorKind::Timeout;
+                    }
                 }
-                break;
-            }
-            if let Some(e) = e.downcast_ref::<std::io::Error>() {
-                match e.kind() {
-                    std::io::ErrorKind::TimedOut => kind = HttpErrorKind::Timeout,
-                    std::io::ErrorKind::ConnectionAborted
-                    | std::io::ErrorKind::ConnectionReset
-                    | std::io::ErrorKind::BrokenPipe
-                    | std::io::ErrorKind::UnexpectedEof => kind = HttpErrorKind::Interrupted,
-                    _ => {}
+                if let Some(e) = e.downcast_ref::<std::io::Error>() {
+                    match e.kind() {
+                        std::io::ErrorKind::TimedOut => kind = HttpErrorKind::Timeout,
+                        std::io::ErrorKind::ConnectionAborted
+                        | std::io::ErrorKind::ConnectionReset
+                        | std::io::ErrorKind::BrokenPipe
+                        | std::io::ErrorKind::UnexpectedEof => kind = HttpErrorKind::Interrupted,
+                        _ => {}
+                    }
                 }
+                source = e.source();
+            } else {
                 break;
             }
-            source = e.source();
         }
         Self {
             kind,
diff --git a/src/client/retry.rs b/src/client/retry.rs
index 9eac2b1..0d10e60 100644
--- a/src/client/retry.rs
+++ b/src/client/retry.rs
@@ -506,14 +506,21 @@ impl RetryExt for HttpRequestBuilder {
 #[cfg(test)]
 mod tests {
     use crate::client::mock_server::MockServer;
-    use crate::client::retry::{body_contains_error, RequestError, RetryExt};
-    use crate::client::HttpClient;
+    use crate::client::retry::{body_contains_error, RequestError, RetryContext, RetryExt};
+    use crate::client::{HttpClient, HttpResponse};
     use crate::RetryConfig;
+    use http::StatusCode;
     use hyper::header::LOCATION;
+    use hyper::server::conn::http1;
+    use hyper::service::service_fn;
     use hyper::Response;
-    use reqwest::{Client, Method, StatusCode};
+    use hyper_util::rt::TokioIo;
+    use reqwest::{Client, Method};
+    use std::convert::Infallible;
     use std::error::Error;
     use std::time::Duration;
+    use tokio::net::TcpListener;
+    use tokio::time::timeout;
 
     #[test]
     fn test_body_contains_error() {
@@ -818,4 +825,55 @@ mod tests {
         // Shutdown
         mock.shutdown().await
     }
+
+    #[tokio::test]
+    async fn test_connection_reset_is_retried() {
+        let retry = RetryConfig {
+            backoff: Default::default(),
+            max_retries: 2,
+            retry_timeout: Duration::from_secs(1),
+        };
+        assert!(retry.max_retries > 0);
+
+        // Setup server which resets a connection and then quits
+        let listener = TcpListener::bind("::1:0").await.unwrap();
+        let url = format!("http://{}", listener.local_addr().unwrap());
+        let handle = tokio::spawn(async move {
+            // Reset the connection on the first n-1 attempts
+            for _ in 0..retry.max_retries {
+                let (stream, _) = listener.accept().await.unwrap();
+                stream.set_linger(Some(Duration::from_secs(0))).unwrap();
+            }
+            // Succeed on the last attempt
+            let (stream, _) = listener.accept().await.unwrap();
+            http1::Builder::new()
+                // we want the connection to end after responding
+                .keep_alive(false)
+                .serve_connection(
+                    TokioIo::new(stream),
+                    service_fn(move |_req| async {
+                        Ok::<_, Infallible>(HttpResponse::new("Success!".to_string().into()))
+                    }),
+                )
+                .await
+                .unwrap();
+        });
+
+        // Perform the request
+        let client = HttpClient::new(reqwest::Client::new());
+        let ctx = &mut RetryContext::new(&retry);
+        let res = client
+            .get(url)
+            .retryable_request()
+            .send(ctx)
+            .await
+            .expect("request should eventually succeed");
+        assert_eq!(res.status(), StatusCode::OK);
+        assert!(ctx.exhausted());
+
+        // Wait for server to shutdown
+        let _ = timeout(Duration::from_secs(1), handle)
+            .await
+            .expect("shutdown shouldn't hang");
+    }
 }

Reply via email to