This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fda518891 Retry Safe/Read-Only Requests on Timeout (#5278)
8fda518891 is described below

commit 8fda51889171791d8d36ffc96fc0921fba372fd8
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Thu Jan 4 08:39:02 2024 +0000

    Retry Safe/Read-Only Requests on Timeout (#5278)
    
    * Retry safe requests on timeout
    
    * Docs
---
 object_store/src/client/mock_server.rs | 21 +++++++++++---
 object_store/src/client/retry.rs       | 50 +++++++++++++++++++++++++++++-----
 2 files changed, 60 insertions(+), 11 deletions(-)

diff --git a/object_store/src/client/mock_server.rs 
b/object_store/src/client/mock_server.rs
index 36c6b650c0..70b856186d 100644
--- a/object_store/src/client/mock_server.rs
+++ b/object_store/src/client/mock_server.rs
@@ -15,17 +15,20 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use futures::future::BoxFuture;
+use futures::FutureExt;
 use hyper::service::{make_service_fn, service_fn};
 use hyper::{Body, Request, Response, Server};
 use parking_lot::Mutex;
 use std::collections::VecDeque;
 use std::convert::Infallible;
+use std::future::Future;
 use std::net::SocketAddr;
 use std::sync::Arc;
 use tokio::sync::oneshot;
 use tokio::task::JoinHandle;
 
-pub type ResponseFn = Box<dyn FnOnce(Request<Body>) -> Response<Body> + Send>;
+pub type ResponseFn = Box<dyn FnOnce(Request<Body>) -> BoxFuture<'static, 
Response<Body>> + Send>;
 
 /// A mock server
 pub struct MockServer {
@@ -46,9 +49,10 @@ impl MockServer {
             async move {
                 Ok::<_, Infallible>(service_fn(move |req| {
                     let r = Arc::clone(&r);
+                    let next = r.lock().pop_front();
                     async move {
-                        Ok::<_, Infallible>(match r.lock().pop_front() {
-                            Some(r) => r(req),
+                        Ok::<_, Infallible>(match next {
+                            Some(r) => r(req).await,
                             None => Response::new(Body::from("Hello World")),
                         })
                     }
@@ -93,7 +97,16 @@ impl MockServer {
     where
         F: FnOnce(Request<Body>) -> Response<Body> + Send + 'static,
     {
-        self.responses.lock().push_back(Box::new(f))
+        let f = Box::new(|req| async move { f(req) }.boxed());
+        self.responses.lock().push_back(f)
+    }
+
+    pub fn push_async_fn<F, Fut>(&self, f: F)
+    where
+        F: FnOnce(Request<Body>) -> Fut + Send + 'static,
+        Fut: Future<Output = Response<Body>> + Send + 'static,
+    {
+        self.responses.lock().push_back(Box::new(|r| f(r).boxed()))
     }
 
     /// Shutdown the mock server
diff --git a/object_store/src/client/retry.rs b/object_store/src/client/retry.rs
index 08b9a74e17..9d21867d8a 100644
--- a/object_store/src/client/retry.rs
+++ b/object_store/src/client/retry.rs
@@ -119,11 +119,19 @@ impl From<Error> for std::io::Error {
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
 
-/// Contains the configuration for how to respond to server errors
+/// The configuration for how to respond to request errors
 ///
-/// By default they will be retried up to some limit, using exponential
+/// The following categories of error will be retried:
+///
+/// * 5xx server errors
+/// * Connection errors
+/// * Dropped connections
+/// * Timeouts for [safe] / read-only requests
+///
+/// Requests will be retried up to some limit, using exponential
 /// backoff with jitter. See [`BackoffConfig`] for more information
 ///
+/// [safe]: https://datatracker.ietf.org/doc/html/rfc7231#section-4.2.1
 #[derive(Debug, Clone)]
 pub struct RetryConfig {
     /// The backoff configuration
@@ -173,13 +181,16 @@ impl RetryExt for reqwest::RequestBuilder {
         let max_retries = config.max_retries;
         let retry_timeout = config.retry_timeout;
 
+        let (client, req) = self.build_split();
+        let req = req.expect("request must be valid");
+
         async move {
             let mut retries = 0;
             let now = Instant::now();
 
             loop {
-                let s = self.try_clone().expect("request body must be 
cloneable");
-                match s.send().await {
+                let s = req.try_clone().expect("request body must be 
cloneable");
+                match client.execute(s).await {
                     Ok(r) => match r.error_for_status_ref() {
                         Ok(_) if r.status().is_success() => return Ok(r),
                         Ok(r) if r.status() == StatusCode::NOT_MODIFIED => {
@@ -242,7 +253,9 @@ impl RetryExt for reqwest::RequestBuilder {
                     Err(e) =>
                     {
                         let mut do_retry = false;
-                        if let Some(source) = e.source() {
+                        if req.method().is_safe() && e.is_timeout() {
+                            do_retry = true
+                        } else if let Some(source) = e.source() {
                             if let Some(e) = 
source.downcast_ref::<hyper::Error>() {
                                 if e.is_connect() || e.is_closed() || 
e.is_incomplete_message() {
                                     do_retry = true;
@@ -294,7 +307,11 @@ mod tests {
             retry_timeout: Duration::from_secs(1000),
         };
 
-        let client = Client::new();
+        let client = Client::builder()
+            .timeout(Duration::from_millis(100))
+            .build()
+            .unwrap();
+
         let do_request = || client.request(Method::GET, 
mock.url()).send_retry(&retry);
 
         // Simple request should work
@@ -419,7 +436,7 @@ mod tests {
 
         let e = do_request().await.unwrap_err().to_string();
         assert!(
-            e.contains("Error after 2 retries in") && 
+            e.contains("Error after 2 retries in") &&
             e.contains("max_retries:2, retry_timeout:1000s, source:HTTP status 
server error (502 Bad Gateway) for url"),
             "{e}"
         );
@@ -442,6 +459,25 @@ mod tests {
             "{e}"
         );
 
+        // Retries on client timeout
+        mock.push_async_fn(|_| async move {
+            tokio::time::sleep(Duration::from_secs(10)).await;
+            panic!()
+        });
+        do_request().await.unwrap();
+
+        // Does not retry PUT request
+        mock.push_async_fn(|_| async move {
+            tokio::time::sleep(Duration::from_secs(10)).await;
+            panic!()
+        });
+        let res = client.request(Method::PUT, mock.url()).send_retry(&retry);
+        let e = res.await.unwrap_err().to_string();
+        assert!(
+            e.contains("Error after 0 retries in") && e.contains("operation 
timed out"),
+            "{e}"
+        );
+
         // Shutdown
         mock.shutdown().await
     }

Reply via email to