crepererum commented on code in PR #2243: URL: https://github.com/apache/arrow-rs/pull/2243#discussion_r934308485
########## object_store/src/client/backoff.rs: ########## @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use rand::prelude::*; +use std::time::Duration; + +/// Exponential backoff with jitter +/// +/// See <https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/> +#[allow(missing_copy_implementations)] +#[derive(Debug, Clone)] +pub struct BackoffConfig { + /// The initial backoff duration + pub init_backoff: Duration, + /// The maximum backoff duration + pub max_backoff: Duration, + /// The base of the exponential to use + pub base: f64, +} + +impl Default for BackoffConfig { + fn default() -> Self { + Self { + init_backoff: Duration::from_millis(100), + max_backoff: Duration::from_secs(15), + base: 2., + } + } +} + +/// [`Backoff`] can be created from a [`BackoffConfig`] +/// +/// Consecutive calls to [`Backoff::next`] will return the next backoff interval +/// +pub struct Backoff { + init_backoff: f64, + next_backoff_secs: f64, + max_backoff_secs: f64, + base: f64, + rng: Option<Box<dyn RngCore + Sync + Send>>, +} + +impl std::fmt::Debug for Backoff { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Backoff") + .field("init_backoff", 
&self.init_backoff) + .field("next_backoff_secs", &self.next_backoff_secs) + .field("max_backoff_secs", &self.max_backoff_secs) + .field("base", &self.base) + .finish() + } +} + +impl Backoff { + /// Create a new [`Backoff`] from the provided [`BackoffConfig`] + pub fn new(config: &BackoffConfig) -> Self { + Self::new_with_rng(config, None) + } + + /// Creates a new `Backoff` with the optional `rng` + /// + /// Uses [`rand::thread_rng()`] if no rng is provided + pub fn new_with_rng( + config: &BackoffConfig, + rng: Option<Box<dyn RngCore + Sync + Send>>, + ) -> Self { + let init_backoff = config.init_backoff.as_secs_f64(); + Self { + init_backoff, + next_backoff_secs: init_backoff, + max_backoff_secs: config.max_backoff.as_secs_f64(), + base: config.base, + rng, + } + } + + /// Returns the next backoff duration to wait for + pub fn next(&mut self) -> Duration { + let range = self.init_backoff..(self.next_backoff_secs * self.base); + + let rand_backoff = match self.rng.as_mut() { + Some(rng) => rng.gen_range(range), + None => thread_rng().gen_range(range), + }; + + let next_backoff = self.max_backoff_secs.min(rand_backoff); + Duration::from_secs_f64(std::mem::replace( + &mut self.next_backoff_secs, + next_backoff, + )) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use rand::rngs::mock::StepRng; Review Comment: TIL that there's `rand::rngs::mock` :+1: ########## object_store/src/client/retry.rs: ########## @@ -0,0 +1,103 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! A shared HTTP client implementation incorporating retries + +use crate::client::backoff::{Backoff, BackoffConfig}; +use futures::future::BoxFuture; +use futures::FutureExt; +use reqwest::{Response, Result}; +use std::time::{Duration, Instant}; + +/// Contains the configuration for how to respond to server errors +/// +/// By default they will be retried up to some limit, using exponential +/// backoff with jitter. See [`BackoffConfig`] for more information +/// +#[derive(Debug, Clone)] +pub struct RetryConfig { + /// The backoff configuration + pub backoff: BackoffConfig, + + /// The maximum number of times to retry a request + /// + /// Set to 0 to disable retries + pub max_retries: usize, + + /// The maximum length of time from the initial request + /// after which no further retries will be attempted + /// + /// This not only bounds the length of time before a server + /// error will be surfaced to the application, but also bounds + /// the length of time a request's credentials must remain valid. 
+ /// + /// As requests are retried without renewing credentials or + /// regenerating request payloads, this number should be kept + /// below 5 minutes to avoid errors due to expired credentials + /// and/or request payloads + pub retry_timeout: Duration, +} + +impl Default for RetryConfig { + fn default() -> Self { + Self { + backoff: Default::default(), + max_retries: 10, + retry_timeout: Duration::from_secs(3 * 60), + } + } +} + +pub trait RetryExt { + /// Dispatch a request with the given retry configuration + /// + /// # Panic + /// + /// This will panic if the request body is a stream + fn send_retry(self, config: &RetryConfig) -> BoxFuture<'static, Result<Response>>; +} + +impl RetryExt for reqwest::RequestBuilder { + fn send_retry(self, config: &RetryConfig) -> BoxFuture<'static, Result<Response>> { Review Comment: Some logging would be nice here for: - retries - giving up ########## object_store/Cargo.toml: ########## @@ -48,6 +48,7 @@ quick-xml = { version = "0.23.0", features = ["serialize"], optional = true } rustls-pemfile = { version = "1.0", default-features = false, optional = true } ring = { version = "0.16", default-features = false, features = ["std"] } base64 = { version = "0.13", default-features = false, optional = true } +rand = { version = "0.8", optional = true, features = ["std", "std_rng"] } Review Comment: The features listed here (`std` and `std_rng`) are already `rand`'s default features, so they are enabled anyway and listing them has no effect. Either remove the explicit list, or add `default-features = false` so that a future change to `rand`'s defaults cannot silently enable additional features. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
