This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 8fda518891 Retry Safe/Read-Only Requests on Timeout (#5278)
8fda518891 is described below
commit 8fda51889171791d8d36ffc96fc0921fba372fd8
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Thu Jan 4 08:39:02 2024 +0000
Retry Safe/Read-Only Requests on Timeout (#5278)
* Retry safe requests on timeout
* Docs
---
object_store/src/client/mock_server.rs | 21 +++++++++++---
object_store/src/client/retry.rs | 50 +++++++++++++++++++++++++++++-----
2 files changed, 60 insertions(+), 11 deletions(-)
diff --git a/object_store/src/client/mock_server.rs b/object_store/src/client/mock_server.rs
index 36c6b650c0..70b856186d 100644
--- a/object_store/src/client/mock_server.rs
+++ b/object_store/src/client/mock_server.rs
@@ -15,17 +15,20 @@
// specific language governing permissions and limitations
// under the License.
+use futures::future::BoxFuture;
+use futures::FutureExt;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
use parking_lot::Mutex;
use std::collections::VecDeque;
use std::convert::Infallible;
+use std::future::Future;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
-pub type ResponseFn = Box<dyn FnOnce(Request<Body>) -> Response<Body> + Send>;
+pub type ResponseFn = Box<dyn FnOnce(Request<Body>) -> BoxFuture<'static,
Response<Body>> + Send>;
/// A mock server
pub struct MockServer {
@@ -46,9 +49,10 @@ impl MockServer {
async move {
Ok::<_, Infallible>(service_fn(move |req| {
let r = Arc::clone(&r);
+ let next = r.lock().pop_front();
async move {
- Ok::<_, Infallible>(match r.lock().pop_front() {
- Some(r) => r(req),
+ Ok::<_, Infallible>(match next {
+ Some(r) => r(req).await,
None => Response::new(Body::from("Hello World")),
})
}
@@ -93,7 +97,16 @@ impl MockServer {
where
F: FnOnce(Request<Body>) -> Response<Body> + Send + 'static,
{
- self.responses.lock().push_back(Box::new(f))
+ let f = Box::new(|req| async move { f(req) }.boxed());
+ self.responses.lock().push_back(f)
+ }
+
+ pub fn push_async_fn<F, Fut>(&self, f: F)
+ where
+ F: FnOnce(Request<Body>) -> Fut + Send + 'static,
+ Fut: Future<Output = Response<Body>> + Send + 'static,
+ {
+ self.responses.lock().push_back(Box::new(|r| f(r).boxed()))
}
/// Shutdown the mock server
diff --git a/object_store/src/client/retry.rs b/object_store/src/client/retry.rs
index 08b9a74e17..9d21867d8a 100644
--- a/object_store/src/client/retry.rs
+++ b/object_store/src/client/retry.rs
@@ -119,11 +119,19 @@ impl From<Error> for std::io::Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
-/// Contains the configuration for how to respond to server errors
+/// The configuration for how to respond to request errors
///
-/// By default they will be retried up to some limit, using exponential
+/// The following categories of error will be retried:
+///
+/// * 5xx server errors
+/// * Connection errors
+/// * Dropped connections
+/// * Timeouts for [safe] / read-only requests
+///
+/// Requests will be retried up to some limit, using exponential
/// backoff with jitter. See [`BackoffConfig`] for more information
///
+/// [safe]: https://datatracker.ietf.org/doc/html/rfc7231#section-4.2.1
#[derive(Debug, Clone)]
pub struct RetryConfig {
/// The backoff configuration
@@ -173,13 +181,16 @@ impl RetryExt for reqwest::RequestBuilder {
let max_retries = config.max_retries;
let retry_timeout = config.retry_timeout;
+ let (client, req) = self.build_split();
+ let req = req.expect("request must be valid");
+
async move {
let mut retries = 0;
let now = Instant::now();
loop {
- let s = self.try_clone().expect("request body must be
cloneable");
- match s.send().await {
+ let s = req.try_clone().expect("request body must be
cloneable");
+ match client.execute(s).await {
Ok(r) => match r.error_for_status_ref() {
Ok(_) if r.status().is_success() => return Ok(r),
Ok(r) if r.status() == StatusCode::NOT_MODIFIED => {
@@ -242,7 +253,9 @@ impl RetryExt for reqwest::RequestBuilder {
Err(e) =>
{
let mut do_retry = false;
- if let Some(source) = e.source() {
+ if req.method().is_safe() && e.is_timeout() {
+ do_retry = true
+ } else if let Some(source) = e.source() {
if let Some(e) =
source.downcast_ref::<hyper::Error>() {
if e.is_connect() || e.is_closed() ||
e.is_incomplete_message() {
do_retry = true;
@@ -294,7 +307,11 @@ mod tests {
retry_timeout: Duration::from_secs(1000),
};
- let client = Client::new();
+ let client = Client::builder()
+ .timeout(Duration::from_millis(100))
+ .build()
+ .unwrap();
+
let do_request = || client.request(Method::GET,
mock.url()).send_retry(&retry);
// Simple request should work
@@ -419,7 +436,7 @@ mod tests {
let e = do_request().await.unwrap_err().to_string();
assert!(
- e.contains("Error after 2 retries in") &&
+ e.contains("Error after 2 retries in") &&
e.contains("max_retries:2, retry_timeout:1000s, source:HTTP status
server error (502 Bad Gateway) for url"),
"{e}"
);
@@ -442,6 +459,25 @@ mod tests {
"{e}"
);
+ // Retries on client timeout
+ mock.push_async_fn(|_| async move {
+ tokio::time::sleep(Duration::from_secs(10)).await;
+ panic!()
+ });
+ do_request().await.unwrap();
+
+ // Does not retry PUT request
+ mock.push_async_fn(|_| async move {
+ tokio::time::sleep(Duration::from_secs(10)).await;
+ panic!()
+ });
+ let res = client.request(Method::PUT, mock.url()).send_retry(&retry);
+ let e = res.await.unwrap_err().to_string();
+ assert!(
+ e.contains("Error after 0 retries in") && e.contains("operation
timed out"),
+ "{e}"
+ );
+
// Shutdown
mock.shutdown().await
}