This is an automated email from the ASF dual-hosted git repository. tustvold pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git
The following commit(s) were added to refs/heads/main by this push: new 1e6c78e Fix not retrying connection errors (#445) 1e6c78e is described below commit 1e6c78ed08c04a84e4e508fdd0aa94f4936b917d Author: John Garland <john...@users.noreply.github.com> AuthorDate: Wed Jul 30 04:12:17 2025 +1000 Fix not retrying connection errors (#445) * Add (failing) test for retrying connection errors * Fix not retrying connection errors closes #368 * Fix clippy error --------- Co-authored-by: John Garland <john.garl...@vivcourt.com> --- src/client/http/connection.rs | 36 ++++++++++++------------ src/client/retry.rs | 64 +++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 80 insertions(+), 20 deletions(-) diff --git a/src/client/http/connection.rs b/src/client/http/connection.rs index 4d833e6..48b7549 100644 --- a/src/client/http/connection.rs +++ b/src/client/http/connection.rs @@ -101,27 +101,29 @@ impl HttpError { // Reqwest error variants aren't great, attempt to refine them let mut source = e.source(); - while let Some(e) = source { - if let Some(e) = e.downcast_ref::<hyper::Error>() { - if e.is_closed() || e.is_incomplete_message() || e.is_body_write_aborted() { - kind = HttpErrorKind::Request; - } else if e.is_timeout() { - kind = HttpErrorKind::Timeout; + while kind == HttpErrorKind::Unknown { + if let Some(e) = source { + if let Some(e) = e.downcast_ref::<hyper::Error>() { + if e.is_closed() || e.is_incomplete_message() || e.is_body_write_aborted() { + kind = HttpErrorKind::Request; + } else if e.is_timeout() { + kind = HttpErrorKind::Timeout; + } } - break; - } - if let Some(e) = e.downcast_ref::<std::io::Error>() { - match e.kind() { - std::io::ErrorKind::TimedOut => kind = HttpErrorKind::Timeout, - std::io::ErrorKind::ConnectionAborted - | std::io::ErrorKind::ConnectionReset - | std::io::ErrorKind::BrokenPipe - | std::io::ErrorKind::UnexpectedEof => kind = HttpErrorKind::Interrupted, - _ => {} + if let Some(e) = e.downcast_ref::<std::io::Error>() { + match e.kind() { + std::io::ErrorKind::TimedOut => kind = HttpErrorKind::Timeout, + std::io::ErrorKind::ConnectionAborted + | std::io::ErrorKind::ConnectionReset + | std::io::ErrorKind::BrokenPipe + | std::io::ErrorKind::UnexpectedEof => kind = HttpErrorKind::Interrupted, + _ => {} + } } + source = e.source(); + } else { break; } - source = e.source(); } Self { kind, diff --git a/src/client/retry.rs b/src/client/retry.rs index 9eac2b1..0d10e60 100644 --- a/src/client/retry.rs +++ b/src/client/retry.rs @@ -506,14 +506,21 @@ impl RetryExt for HttpRequestBuilder { #[cfg(test)] mod tests { use crate::client::mock_server::MockServer; - use crate::client::retry::{body_contains_error, RequestError, RetryExt}; - use crate::client::HttpClient; + use crate::client::retry::{body_contains_error, RequestError, RetryContext, RetryExt}; + use crate::client::{HttpClient, HttpResponse}; use crate::RetryConfig; + use http::StatusCode; use hyper::header::LOCATION; + use hyper::server::conn::http1; + use hyper::service::service_fn; use hyper::Response; - use reqwest::{Client, Method, StatusCode}; + use hyper_util::rt::TokioIo; + use reqwest::{Client, Method}; + use std::convert::Infallible; use std::error::Error; use std::time::Duration; + use tokio::net::TcpListener; + use tokio::time::timeout; #[test] fn test_body_contains_error() { @@ -818,4 +825,55 @@ mod tests { // Shutdown mock.shutdown().await } + + #[tokio::test] + async fn test_connection_reset_is_retried() { + let retry = RetryConfig { + backoff: Default::default(), + max_retries: 2, + retry_timeout: Duration::from_secs(1), + }; + assert!(retry.max_retries > 0); + + // Setup server which resets a connection and then quits + let listener = TcpListener::bind("::1:0").await.unwrap(); + let url = format!("http://{}", listener.local_addr().unwrap()); + let handle = tokio::spawn(async move { + // Reset the connection on the first n-1 attempts + for _ in 0..retry.max_retries { + let (stream, _) = listener.accept().await.unwrap(); + stream.set_linger(Some(Duration::from_secs(0))).unwrap(); + } + // Succeed on the last attempt + let (stream, _) = listener.accept().await.unwrap(); + http1::Builder::new() + // we want the connection to end after responding + .keep_alive(false) + .serve_connection( + TokioIo::new(stream), + service_fn(move |_req| async { + Ok::<_, Infallible>(HttpResponse::new("Success!".to_string().into())) + }), + ) + .await + .unwrap(); + }); + + // Perform the request + let client = HttpClient::new(reqwest::Client::new()); + let ctx = &mut RetryContext::new(&retry); + let res = client + .get(url) + .retryable_request() + .send(ctx) + .await + .expect("request should eventually succeed"); + assert_eq!(res.status(), StatusCode::OK); + assert!(ctx.exhausted()); + + // Wait for server to shutdown + let _ = timeout(Duration::from_secs(1), handle) + .await + .expect("shutdown shouldn't hang"); + } }