This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch fix-timeout-layer
in repository https://gitbox.apache.org/repos/asf/opendal.git

commit 91e31db0a4fc50f26710aa2e1fbba56efc96e282
Author: Xuanwo <[email protected]>
AuthorDate: Mon Jan 22 00:58:41 2024 +0800

    Save work
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/layers/timeout.rs | 156 ++++++++++++++++++++++++++-------------------
 1 file changed, 92 insertions(+), 64 deletions(-)

diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs
index cae6ee9184..6f50177f08 100644
--- a/core/src/layers/timeout.rs
+++ b/core/src/layers/timeout.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::future::Future;
 use std::io::SeekFrom;
 use std::task::Context;
 use std::task::Poll;
@@ -30,76 +31,96 @@ use crate::raw::oio::WriteOperation;
 use crate::raw::*;
 use crate::*;
 
-/// Add timeout for every operations.
+/// Add timeout for every operation to avoid slow or unexpectedly hanging operations.
 ///
-/// # Notes
+/// For example, a dead connection could hang a database SQL query. `TimeoutLayer`
+/// will abort such an operation and return an error so users can handle it by
+/// retrying or reporting it.
 ///
-/// - For IO operations like `read`, `write`, we will set a timeout
-///   for each single IO operation.
-/// - For other operations like `stat`, and `delete`, the timeout is for the 
whole
-///   operation.
+/// # Notes
 ///
-/// Besides, we will also set a slow speed for each IO operation. If the IO
-/// operation's speed is lower than the slow speed, we will return a timeout 
error
-/// instead of kept waiting for it.
+/// `TimeoutLayer` divides all operations into two kinds:
 ///
-/// For examples, if we set timeout to 60 seconds and speed to 1MiB/s, then:
+/// - Non-IO operations like `stat` and `delete` operate on a single file. We control
+///   them by setting `timeout`.
+/// - IO operations like `read`, `Reader::read` and `Writer::write` operate on data
+///   directly. We control them by setting `io_timeout`.
 ///
-/// - If `stat` didn't return in 60 seconds, we will return a timeout error.
-/// - If `Reader::read` didn't return in 60 seconds, we will return a timeout 
error.
-/// - For `Writer::write(vec![1024*1024*1024])`
-///   - didn't return in 60s, it's ok, we will keep waiting.
-///   - didn't return in 1024s (1GiB/1MiB), we will return a timeout error.
+/// A connection may be slow but not dead, so `max_io_timeouts` controls how many
+/// consecutive IO timeouts we can tolerate. If `max_io_timeouts` is not reached,
+/// we will print a warning and keep waiting for this IO operation instead.
 ///
 /// # Default
 ///
 /// - timeout: 60 seconds
-/// - speed: 1024 bytes per second, aka, 1KiB/s.
+/// - io_timeout: 10 seconds
+/// - max_io_timeouts: 3 times
 ///
 /// # Examples
 ///
+/// The following example creates a timeout layer with a 10 second timeout for non-IO
+/// operations and a 3 second timeout for IO operations, allowing 2 consecutive IO timeouts.
+///
 /// ```
 /// use anyhow::Result;
 /// use opendal::layers::TimeoutLayer;
 /// use opendal::services;
 /// use opendal::Operator;
 /// use opendal::Scheme;
+/// use std::time::Duration;
 ///
 /// let _ = Operator::new(services::Memory::default())
 ///     .expect("must init")
-///     .layer(TimeoutLayer::default())
+///     .layer(TimeoutLayer::default().with_timeout(Duration::from_secs(10)).with_io_timeout(Duration::from_secs(3)).with_max_io_timeouts(2))
 ///     .finish();
 /// ```
 #[derive(Clone)]
 pub struct TimeoutLayer {
     timeout: Duration,
-    speed: u64,
+    io_timeout: Duration,
+    max_io_timeouts: usize,
 }
 
 impl Default for TimeoutLayer {
     fn default() -> Self {
         Self {
             timeout: Duration::from_secs(60),
-            speed: 1024,
+            io_timeout: Duration::from_secs(10),
+            max_io_timeouts: 3,
         }
     }
 }
 
 impl TimeoutLayer {
     /// Create a new `TimeoutLayer` with default settings.
-    ///
-    /// - timeout: 60 seconds
-    /// - speed: 1024 bytes per second, aka, 1KiB/s.
     pub fn new() -> Self {
         Self::default()
     }
 
     /// Set timeout for TimeoutLayer with given value.
+    ///
+    /// This timeout is for all non-io operations like `stat`, `delete`.
     pub fn with_timeout(mut self, timeout: Duration) -> Self {
         self.timeout = timeout;
         self
     }
 
+    /// Set io timeout for TimeoutLayer with given value.
+    ///
+    /// This timeout is for all IO operations like `read`, `Reader::read` and `Writer::write`.
+    pub fn with_io_timeout(mut self, timeout: Duration) -> Self {
+        self.io_timeout = timeout;
+        self
+    }
+
+    /// Set max io timeouts for TimeoutLayer with given value.
+    ///
+    /// This value controls how many consecutive IO timeouts we can tolerate.
+    pub fn with_max_io_timeouts(mut self, v: usize) -> Self {
+        self.max_io_timeouts = v;
+        self
+    }
+
     /// Set speed for TimeoutLayer with given value.
     ///
     /// # Notes
@@ -110,10 +131,8 @@ impl TimeoutLayer {
     /// # Panics
     ///
     /// This function will panic if speed is 0.
-    pub fn with_speed(mut self, speed: u64) -> Self {
-        assert_ne!(speed, 0, "TimeoutLayer speed must not be 0");
-
-        self.speed = speed;
+    #[deprecated(note = "with speed is not supported anymore, please use 
with_io_timeout instead")]
+    pub fn with_speed(mut self, _: u64) -> Self {
         self
     }
 }
@@ -126,7 +145,8 @@ impl<A: Accessor> Layer<A> for TimeoutLayer {
             inner,
 
             timeout: self.timeout,
-            speed: self.speed,
+            io_timeout: self.io_timeout,
+            max_io_timeouts: self.max_io_timeouts,
         }
     }
 }
@@ -136,7 +156,25 @@ pub struct TimeoutAccessor<A: Accessor> {
     inner: A,
 
     timeout: Duration,
-    speed: u64,
+    io_timeout: Duration,
+    max_io_timeouts: usize,
+}
+
+impl<A: Accessor> TimeoutAccessor<A> {
+    async fn io_timeout<F: Future<Output = Result<T>>, T>(
+        &self,
+        op: Operation,
+        fut: F,
+    ) -> Result<T> {
+        tokio::time::timeout(self.io_timeout, fut)
+            .await
+            .map_err(|_| {
+                Error::new(ErrorKind::Unexpected, "io operation timeout 
reached")
+                    .with_operation(op)
+                    .with_context("io_timeout", 
self.io_timeout.as_secs_f64().to_string())
+                    .set_temporary()
+            })?
+    }
 }
 
 #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
@@ -155,39 +193,36 @@ impl<A: Accessor> LayeredAccessor for TimeoutAccessor<A> {
     }
 
     async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
-        tokio::time::timeout(self.timeout, self.inner.read(path, args))
+        self.io_timeout(Operation::Read, self.inner.read(path, args))
             .await
-            .map_err(|_| {
-                Error::new(ErrorKind::Unexpected, "operation timeout")
-                    .with_operation(Operation::Read)
-                    .with_context("timeout", 
self.timeout.as_secs_f64().to_string())
-                    .set_temporary()
-            })?
-            .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.timeout, 
self.speed)))
+            .map(|(rp, r)| {
+                (
+                    rp,
+                    TimeoutWrapper::new(r, self.io_timeout, 
self.max_io_timeouts),
+                )
+            })
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
-        tokio::time::timeout(self.timeout, self.inner.write(path, args))
+        self.io_timeout(Operation::Write, self.inner.write(path, args))
             .await
-            .map_err(|_| {
-                Error::new(ErrorKind::Unexpected, "operation timeout")
-                    .with_operation(Operation::Write)
-                    .with_context("timeout", 
self.timeout.as_secs_f64().to_string())
-                    .set_temporary()
-            })?
-            .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.timeout, 
self.speed)))
+            .map(|(rp, r)| {
+                (
+                    rp,
+                    TimeoutWrapper::new(r, self.io_timeout, 
self.max_io_timeouts),
+                )
+            })
     }
 
     async fn list(&self, path: &str, args: OpList) -> Result<(RpList, 
Self::Lister)> {
-        tokio::time::timeout(self.timeout, self.inner.list(path, args))
+        self.io_timeout(Operation::List, self.inner.list(path, args))
             .await
-            .map_err(|_| {
-                Error::new(ErrorKind::Unexpected, "operation timeout")
-                    .with_operation(Operation::List)
-                    .with_context("timeout", 
self.timeout.as_secs_f64().to_string())
-                    .set_temporary()
-            })?
-            .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.timeout, 
self.speed)))
+            .map(|(rp, r)| {
+                (
+                    rp,
+                    TimeoutWrapper::new(r, self.io_timeout, 
self.max_io_timeouts),
+                )
+            })
     }
 
     fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::BlockingReader)> {
@@ -207,28 +242,21 @@ pub struct TimeoutWrapper<R> {
     inner: R,
 
     timeout: Duration,
-    #[allow(dead_code)]
-    speed: u64,
+    max_timeouts: usize,
 
-    start: Option<Instant>,
+    current_timeouts: usize,
+    futures: Option<(BoxedFuture)>,
 }
 
 impl<R> TimeoutWrapper<R> {
-    fn new(inner: R, timeout: Duration, speed: u64) -> Self {
+    fn new(inner: R, timeout: Duration, max_timeouts: usize) -> Self {
         Self {
             inner,
             timeout,
-            speed,
+            max_timeouts,
             start: None,
         }
     }
-
-    #[allow(dead_code)]
-    fn io_timeout(&self, size: u64) -> Duration {
-        let timeout = Duration::from_millis(size * 1000 / self.speed + 1);
-
-        timeout.max(self.timeout)
-    }
 }
 
 impl<R: oio::Read> oio::Read for TimeoutWrapper<R> {

Reply via email to