This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git
The following commit(s) were added to refs/heads/main by this push:
new 1e6c78e Fix not retrying connection errors (#445)
1e6c78e is described below
commit 1e6c78ed08c04a84e4e508fdd0aa94f4936b917d
Author: John Garland <[email protected]>
AuthorDate: Wed Jul 30 04:12:17 2025 +1000
Fix not retrying connection errors (#445)
* Add (failing) test for retrying connection errors
* Fix not retrying connection errors
closes #368
* Fix clippy error
---------
Co-authored-by: John Garland <[email protected]>
---
src/client/http/connection.rs | 36 ++++++++++++------------
src/client/retry.rs | 64 +++++++++++++++++++++++++++++++++++++++++--
2 files changed, 80 insertions(+), 20 deletions(-)
diff --git a/src/client/http/connection.rs b/src/client/http/connection.rs
index 4d833e6..48b7549 100644
--- a/src/client/http/connection.rs
+++ b/src/client/http/connection.rs
@@ -101,27 +101,29 @@ impl HttpError {
// Reqwest error variants aren't great, attempt to refine them
let mut source = e.source();
- while let Some(e) = source {
- if let Some(e) = e.downcast_ref::<hyper::Error>() {
- if e.is_closed() || e.is_incomplete_message() ||
e.is_body_write_aborted() {
- kind = HttpErrorKind::Request;
- } else if e.is_timeout() {
- kind = HttpErrorKind::Timeout;
+ while kind == HttpErrorKind::Unknown {
+ if let Some(e) = source {
+ if let Some(e) = e.downcast_ref::<hyper::Error>() {
+ if e.is_closed() || e.is_incomplete_message() ||
e.is_body_write_aborted() {
+ kind = HttpErrorKind::Request;
+ } else if e.is_timeout() {
+ kind = HttpErrorKind::Timeout;
+ }
}
- break;
- }
- if let Some(e) = e.downcast_ref::<std::io::Error>() {
- match e.kind() {
- std::io::ErrorKind::TimedOut => kind =
HttpErrorKind::Timeout,
- std::io::ErrorKind::ConnectionAborted
- | std::io::ErrorKind::ConnectionReset
- | std::io::ErrorKind::BrokenPipe
- | std::io::ErrorKind::UnexpectedEof => kind =
HttpErrorKind::Interrupted,
- _ => {}
+ if let Some(e) = e.downcast_ref::<std::io::Error>() {
+ match e.kind() {
+ std::io::ErrorKind::TimedOut => kind =
HttpErrorKind::Timeout,
+ std::io::ErrorKind::ConnectionAborted
+ | std::io::ErrorKind::ConnectionReset
+ | std::io::ErrorKind::BrokenPipe
+ | std::io::ErrorKind::UnexpectedEof => kind =
HttpErrorKind::Interrupted,
+ _ => {}
+ }
}
+ source = e.source();
+ } else {
break;
}
- source = e.source();
}
Self {
kind,
diff --git a/src/client/retry.rs b/src/client/retry.rs
index 9eac2b1..0d10e60 100644
--- a/src/client/retry.rs
+++ b/src/client/retry.rs
@@ -506,14 +506,21 @@ impl RetryExt for HttpRequestBuilder {
#[cfg(test)]
mod tests {
use crate::client::mock_server::MockServer;
- use crate::client::retry::{body_contains_error, RequestError, RetryExt};
- use crate::client::HttpClient;
+ use crate::client::retry::{body_contains_error, RequestError,
RetryContext, RetryExt};
+ use crate::client::{HttpClient, HttpResponse};
use crate::RetryConfig;
+ use http::StatusCode;
use hyper::header::LOCATION;
+ use hyper::server::conn::http1;
+ use hyper::service::service_fn;
use hyper::Response;
- use reqwest::{Client, Method, StatusCode};
+ use hyper_util::rt::TokioIo;
+ use reqwest::{Client, Method};
+ use std::convert::Infallible;
use std::error::Error;
use std::time::Duration;
+ use tokio::net::TcpListener;
+ use tokio::time::timeout;
#[test]
fn test_body_contains_error() {
@@ -818,4 +825,55 @@ mod tests {
// Shutdown
mock.shutdown().await
}
+
+ #[tokio::test]
+ async fn test_connection_reset_is_retried() {
+ let retry = RetryConfig {
+ backoff: Default::default(),
+ max_retries: 2,
+ retry_timeout: Duration::from_secs(1),
+ };
+ assert!(retry.max_retries > 0);
+
+ // Setup server which resets a connection and then quits
+ let listener = TcpListener::bind("::1:0").await.unwrap();
+ let url = format!("http://{}", listener.local_addr().unwrap());
+ let handle = tokio::spawn(async move {
+ // Reset the connection on the first n-1 attempts
+ for _ in 0..retry.max_retries {
+ let (stream, _) = listener.accept().await.unwrap();
+ stream.set_linger(Some(Duration::from_secs(0))).unwrap();
+ }
+ // Succeed on the last attempt
+ let (stream, _) = listener.accept().await.unwrap();
+ http1::Builder::new()
+ // we want the connection to end after responding
+ .keep_alive(false)
+ .serve_connection(
+ TokioIo::new(stream),
+ service_fn(move |_req| async {
+ Ok::<_,
Infallible>(HttpResponse::new("Success!".to_string().into()))
+ }),
+ )
+ .await
+ .unwrap();
+ });
+
+ // Perform the request
+ let client = HttpClient::new(reqwest::Client::new());
+ let ctx = &mut RetryContext::new(&retry);
+ let res = client
+ .get(url)
+ .retryable_request()
+ .send(ctx)
+ .await
+ .expect("request should eventually succeed");
+ assert_eq!(res.status(), StatusCode::OK);
+ assert!(ctx.exhausted());
+
+ // Wait for server to shutdown
+ let _ = timeout(Duration::from_secs(1), handle)
+ .await
+ .expect("shutdown shouldn't hang");
+ }
}