This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch fix-timeout-layer in repository https://gitbox.apache.org/repos/asf/opendal.git
commit 91e31db0a4fc50f26710aa2e1fbba56efc96e282 Author: Xuanwo <[email protected]> AuthorDate: Mon Jan 22 00:58:41 2024 +0800 Save work Signed-off-by: Xuanwo <[email protected]> --- core/src/layers/timeout.rs | 156 ++++++++++++++++++++++++++------------------- 1 file changed, 92 insertions(+), 64 deletions(-) diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs index cae6ee9184..6f50177f08 100644 --- a/core/src/layers/timeout.rs +++ b/core/src/layers/timeout.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::future::Future; use std::io::SeekFrom; use std::task::Context; use std::task::Poll; @@ -30,76 +31,96 @@ use crate::raw::oio::WriteOperation; use crate::raw::*; use crate::*; -/// Add timeout for every operations. +/// Add a timeout to every operation to avoid slow or unexpectedly hanging operations. /// -/// # Notes +/// For example, a dead connection could hang a database's SQL query. `TimeoutLayer` +/// will break this connection and return an error so users can handle it by +/// retrying or reporting it. /// -/// - For IO operations like `read`, `write`, we will set a timeout -/// for each single IO operation. -/// - For other operations like `stat`, and `delete`, the timeout is for the whole -/// operation. +/// # Notes /// -/// Besides, we will also set a slow speed for each IO operation. If the IO -/// operation's speed is lower than the slow speed, we will return a timeout error -/// instead of kept waiting for it. +/// `TimeoutLayer` divides all operations into two kinds: /// -/// For examples, if we set timeout to 60 seconds and speed to 1MiB/s, then: +/// - Non-IO operations like `stat` and `delete`, which operate on a single file. We control +/// them by setting `timeout`. +/// - IO operations like `read`, `Reader::read` and `Writer::write`, which operate on data directly. We +/// control them by setting `io_timeout`. 
/// -/// - If `stat` didn't return in 60 seconds, we will return a timeout error. -/// - If `Reader::read` didn't return in 60 seconds, we will return a timeout error. -/// - For `Writer::write(vec![1024*1024*1024])` -/// - didn't return in 60s, it's ok, we will keep waiting. -/// - didn't return in 1024s (1GiB/1MiB), we will return a timeout error. +/// A connection can be slow without being dead, so we have a `max_io_timeouts` setting to +/// control how many consecutive IO timeouts we can tolerate. If `max_io_timeouts` is not reached, +/// we will print a warning and keep waiting for this io operation instead. /// /// # Default /// /// - timeout: 60 seconds -/// - speed: 1024 bytes per second, aka, 1KiB/s. +/// - io_timeout: 10 seconds +/// - max_io_timeouts: 3 times /// /// # Examples /// +/// The following example creates a timeout layer with a 10 second timeout for all non-io +/// operations, a 3 second timeout for all io operations, and at most 2 consecutive io timeouts allowed. +/// /// ``` /// use anyhow::Result; /// use opendal::layers::TimeoutLayer; /// use opendal::services; /// use opendal::Operator; /// use opendal::Scheme; +/// use std::time::Duration; /// /// let _ = Operator::new(services::Memory::default()) /// .expect("must init") -/// .layer(TimeoutLayer::default()) +/// .layer(TimeoutLayer::default().with_timeout(Duration::from_secs(10)).with_io_timeout(Duration::from_secs(3)).with_max_io_timeouts(2)) /// .finish(); /// ``` #[derive(Clone)] pub struct TimeoutLayer { timeout: Duration, - speed: u64, + io_timeout: Duration, + max_io_timeouts: usize, } impl Default for TimeoutLayer { fn default() -> Self { Self { timeout: Duration::from_secs(60), - speed: 1024, + io_timeout: Duration::from_secs(10), + max_io_timeouts: 3, } } } impl TimeoutLayer { /// Create a new `TimeoutLayer` with default settings. - /// - /// - timeout: 60 seconds - /// - speed: 1024 bytes per second, aka, 1KiB/s. 
pub fn new() -> Self { Self::default() } /// Set timeout for TimeoutLayer with given value. + /// + /// This timeout is for all non-io operations like `stat`, `delete`. pub fn with_timeout(mut self, timeout: Duration) -> Self { self.timeout = timeout; self } + /// Set io timeout for TimeoutLayer with given value. + /// + /// This timeout is for all io operations like `read`, `Reader::read` and `Writer::write`. + pub fn with_io_timeout(mut self, timeout: Duration) -> Self { + self.io_timeout = timeout; + self + } + + /// Set max io timeouts for TimeoutLayer with given value. + /// + /// This value is used to control how many consecutive io timeouts we can tolerate. + pub fn with_max_io_timeouts(mut self, v: usize) -> Self { + self.max_io_timeouts = v; + self + } + /// Set speed for TimeoutLayer with given value. /// /// # Notes @@ -110,10 +131,8 @@ impl TimeoutLayer { /// # Panics /// /// This function will panic if speed is 0. - pub fn with_speed(mut self, speed: u64) -> Self { - assert_ne!(speed, 0, "TimeoutLayer speed must not be 0"); - - self.speed = speed; + #[deprecated(note = "with speed is not supported anymore, please use with_io_timeout instead")] + pub fn with_speed(mut self, _: u64) -> Self { self } } @@ -126,7 +145,8 @@ impl<A: Accessor> Layer<A> for TimeoutLayer { inner, timeout: self.timeout, - speed: self.speed, + io_timeout: self.io_timeout, + max_io_timeouts: self.max_io_timeouts, } } } @@ -136,7 +156,25 @@ pub struct TimeoutAccessor<A: Accessor> { inner: A, timeout: Duration, - speed: u64, + io_timeout: Duration, + max_io_timeouts: usize, +} + +impl<A: Accessor> TimeoutAccessor<A> { + async fn io_timeout<F: Future<Output = Result<T>>, T>( + &self, + op: Operation, + fut: F, + ) -> Result<T> { + tokio::time::timeout(self.io_timeout, fut) + .await + .map_err(|_| { + Error::new(ErrorKind::Unexpected, "io operation timeout reached") + .with_operation(op) + .with_context("io_timeout", self.io_timeout.as_secs_f64().to_string()) + .set_temporary() + })? 
+ } } #[cfg_attr(not(target_arch = "wasm32"), async_trait)] @@ -155,39 +193,36 @@ impl<A: Accessor> LayeredAccessor for TimeoutAccessor<A> { } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - tokio::time::timeout(self.timeout, self.inner.read(path, args)) + self.io_timeout(Operation::Read, self.inner.read(path, args)) .await - .map_err(|_| { - Error::new(ErrorKind::Unexpected, "operation timeout") - .with_operation(Operation::Read) - .with_context("timeout", self.timeout.as_secs_f64().to_string()) - .set_temporary() - })? - .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.timeout, self.speed))) + .map(|(rp, r)| { + ( + rp, + TimeoutWrapper::new(r, self.io_timeout, self.max_io_timeouts), + ) + }) } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - tokio::time::timeout(self.timeout, self.inner.write(path, args)) + self.io_timeout(Operation::Write, self.inner.write(path, args)) .await - .map_err(|_| { - Error::new(ErrorKind::Unexpected, "operation timeout") - .with_operation(Operation::Write) - .with_context("timeout", self.timeout.as_secs_f64().to_string()) - .set_temporary() - })? - .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.timeout, self.speed))) + .map(|(rp, r)| { + ( + rp, + TimeoutWrapper::new(r, self.io_timeout, self.max_io_timeouts), + ) + }) } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { - tokio::time::timeout(self.timeout, self.inner.list(path, args)) + self.io_timeout(Operation::List, self.inner.list(path, args)) .await - .map_err(|_| { - Error::new(ErrorKind::Unexpected, "operation timeout") - .with_operation(Operation::List) - .with_context("timeout", self.timeout.as_secs_f64().to_string()) - .set_temporary() - })? 
- .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.timeout, self.speed))) + .map(|(rp, r)| { + ( + rp, + TimeoutWrapper::new(r, self.io_timeout, self.max_io_timeouts), + ) + }) } fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { @@ -207,28 +242,21 @@ pub struct TimeoutWrapper<R> { inner: R, timeout: Duration, - #[allow(dead_code)] - speed: u64, + max_timeouts: usize, - start: Option<Instant>, + current_timeouts: usize, + futures: Option<(BoxedFuture)>, } impl<R> TimeoutWrapper<R> { - fn new(inner: R, timeout: Duration, speed: u64) -> Self { + fn new(inner: R, timeout: Duration, max_timeouts: usize) -> Self { Self { inner, timeout, - speed, + max_timeouts, start: None, } } - - #[allow(dead_code)] - fn io_timeout(&self, size: u64) -> Duration { - let timeout = Duration::from_millis(size * 1000 / self.speed + 1); - - timeout.max(self.timeout) - } } impl<R: oio::Read> oio::Read for TimeoutWrapper<R> {
