This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch retry-stream
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 44a28d674a91915104f1a369cfe8593c67e0e948
Author: Xuanwo <[email protected]>
AuthorDate: Mon Aug 21 23:13:47 2023 +0800

    feat: Add retry for Writer::sink operation
    
    Signed-off-by: Xuanwo <[email protected]>
---
 bindings/ocaml/src/operator/mod.rs                 |   3 +-
 bindings/ocaml/src/operator/reader.rs              |   4 +-
 core/src/layers/retry.rs                           |  60 ++++++++++-
 core/src/raw/oio/stream/api.rs                     | 118 ++++++++++++++++++++-
 core/src/raw/oio/stream/into_stream.rs             |   7 ++
 core/src/raw/oio/stream/into_stream_from_reader.rs |   7 ++
 core/src/raw/oio/stream/mod.rs                     |   1 +
 7 files changed, 191 insertions(+), 9 deletions(-)

diff --git a/bindings/ocaml/src/operator/mod.rs b/bindings/ocaml/src/operator/mod.rs
index 833cf6391..f2a580fa7 100644
--- a/bindings/ocaml/src/operator/mod.rs
+++ b/bindings/ocaml/src/operator/mod.rs
@@ -19,9 +19,10 @@ mod _type;
 mod metadata;
 mod reader;
 
-use super::*;
 use _type::*;
 
+use super::*;
+
 #[ocaml::func]
 #[ocaml::sig("string -> (string * string) list -> (operator, string) Result.t 
")]
 pub fn operator(
diff --git a/bindings/ocaml/src/operator/reader.rs b/bindings/ocaml/src/operator/reader.rs
index bb75f7b38..b3bc7bc58 100644
--- a/bindings/ocaml/src/operator/reader.rs
+++ b/bindings/ocaml/src/operator/reader.rs
@@ -17,10 +17,10 @@
 
 use std::io;
 
-use super::*;
-
 use opendal::raw::oio::BlockingRead;
 
+use super::*;
+
 #[ocaml::func]
 #[ocaml::sig("reader -> bytes -> (int, string) Result.t ")]
 pub fn reader_read(reader: &mut Reader, buf: &mut [u8]) -> Result<usize, 
String> {
diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs
index a240e9e62..bd56fa42c 100644
--- a/core/src/layers/retry.rs
+++ b/core/src/layers/retry.rs
@@ -34,6 +34,7 @@ use backon::Retryable;
 use bytes::Bytes;
 use futures::FutureExt;
 use log::warn;
+use tokio::sync::Mutex;
 
 use crate::raw::oio::PageOperation;
 use crate::raw::oio::ReadOperation;
@@ -898,9 +899,64 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
         }
     }
 
-    /// Sink will move the input stream, so we can't retry it.
+    /// > Ooooooooooops, are you crazy!? Why do we need `Arc<Mutex<S>>` here? Adding a lock has
+    /// a lot of overhead!
+    ///
+    /// Yes, you are right. But we have no choice: this is the only safe way for us to add retry
+    /// support for streams.
+    ///
+    /// And the overhead is acceptable. Based on our benchmark, taking an uncontended lock costs
+    /// only about 4ns per call (rounded up to 5ns below).
+    ///
+    /// ```shell
+    /// stream/without_arc_mutex
+    ///                         time:   [10.715 ns 10.729 ns 10.744 ns]
+    ///                         thrpt:  [ 90896 GiB/s  91019 GiB/s  91139 GiB/s]
+    /// stream/with_arc_mutex   time:   [14.891 ns 14.905 ns 14.928 ns]
+    ///                         thrpt:  [ 65418 GiB/s  65517 GiB/s  65581 GiB/s]
+    /// ```
+    ///
+    /// The overhead is a fixed cost per poll and does not grow with the number of bytes each
+    /// chunk carries. For example, if every `next` call costs 1ms, the extra ~5ns is only
+    /// about 0.0005% of it, which is acceptable.
     async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
-        self.inner.sink(size, s).await
+        let s = Arc::new(Mutex::new(s));
+
+        let mut backoff = self.builder.build();
+
+        loop {
+            match self.inner.sink(size, Box::new(s.clone())).await {
+                Ok(_) => return Ok(()),
+                Err(e) if !e.is_temporary() => return Err(e),
+                Err(e) => match backoff.next() {
+                    None => return Err(e),
+                    Some(dur) => {
+                        {
+                            use oio::StreamExt;
+
+                            let mut stream = s.lock().await;
+                            // Try to reset this stream.
+                            //
+                            // If the reset fails, return the sink error and stop retrying.
+                            if stream.reset().await.is_err() {
+                                return Err(e);
+                            }
+                        }
+
+                        self.notify.intercept(
+                            &e,
+                            dur,
+                            &[
+                                ("operation", 
WriteOperation::Sink.into_static()),
+                                ("path", &self.path),
+                            ],
+                        );
+                        tokio::time::sleep(dur).await;
+                        continue;
+                    }
+                },
+            }
+        }
     }
 
     async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/raw/oio/stream/api.rs b/core/src/raw/oio/stream/api.rs
index 6ae6a732c..7345564a2 100644
--- a/core/src/raw/oio/stream/api.rs
+++ b/core/src/raw/oio/stream/api.rs
@@ -15,10 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::Arc;
 use std::task::Context;
 use std::task::Poll;
 
 use bytes::Bytes;
+use pin_project::pin_project;
 
 use crate::*;
 
@@ -32,6 +36,9 @@ pub type Streamer = Box<dyn Stream>;
 pub trait Stream: Unpin + Send + Sync {
     /// Poll next item `Result<Bytes>` from the stream.
     fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Option<Result<Bytes>>>;
+
+    /// Reset this stream to the beginning.
+    fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>>;
 }
 
 impl Stream for () {
@@ -40,6 +47,12 @@ impl Stream for () {
 
         unimplemented!("poll_next is required to be implemented for 
oio::Stream")
     }
+
+    fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+        let _ = cx;
+
+        unimplemented!("poll_reset is required to be implemented for 
oio::Stream")
+    }
 }
 
 /// `Box<dyn Stream>` won't implement `Stream` automatically.
@@ -48,17 +61,114 @@ impl<T: Stream + ?Sized> Stream for Box<T> {
     fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Option<Result<Bytes>>> {
         (**self).poll_next(cx)
     }
+
+    fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+        (**self).poll_reset(cx)
+    }
+}
+
+impl<T: Stream + ?Sized> Stream for Arc<std::sync::Mutex<T>> {
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Option<Result<Bytes>>> {
+        match self.try_lock() {
+            Ok(mut this) => this.poll_next(cx),
+            Err(_) => Poll::Ready(Some(Err(Error::new(
+                ErrorKind::Unexpected,
+                "the stream is expected to have only one consumer, but it's 
not",
+            )))),
+        }
+    }
+
+    fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+        match self.try_lock() {
+            Ok(mut this) => this.poll_reset(cx),
+            Err(_) => Poll::Ready(Err(Error::new(
+                ErrorKind::Unexpected,
+                "the stream is expected to have only one consumer, but it's 
not",
+            ))),
+        }
+    }
+}
+
+impl<T: Stream + ?Sized> Stream for Arc<tokio::sync::Mutex<T>> {
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Option<Result<Bytes>>> {
+        match self.try_lock() {
+            Ok(mut this) => this.poll_next(cx),
+            Err(_) => Poll::Ready(Some(Err(Error::new(
+                ErrorKind::Unexpected,
+                "the stream is expected to have only one consumer, but it's 
not",
+            )))),
+        }
+    }
+
+    fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+        match self.try_lock() {
+            Ok(mut this) => this.poll_reset(cx),
+            Err(_) => Poll::Ready(Err(Error::new(
+                ErrorKind::Unexpected,
+                "the stream is expected to have only one consumer, but it's 
not",
+            ))),
+        }
+    }
 }
 
 impl futures::Stream for dyn Stream {
     type Item = Result<Bytes>;
 
-    fn poll_next(
-        mut self: std::pin::Pin<&mut Self>,
-        cx: &mut Context<'_>,
-    ) -> Poll<Option<Self::Item>> {
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> 
Poll<Option<Self::Item>> {
         let this: &mut dyn Stream = &mut *self;
 
         this.poll_next(cx)
     }
 }
+
+/// Implement [`StreamExt`] for every `T: Stream`.
+impl<T: Stream> StreamExt for T {}
+
+/// Extension of [`Stream`] to make it easier for use.
+pub trait StreamExt: Stream {
+    /// Build a future for `poll_next`.
+    fn next(&mut self) -> NextFuture<'_, Self> {
+        NextFuture { inner: self }
+    }
+
+    /// Build a future for `poll_reset`.
+    fn reset(&mut self) -> ResetFuture<'_, Self> {
+        ResetFuture { inner: self }
+    }
+}
+
+/// Make this future `!Unpin` for compatibility with async trait methods.
+#[pin_project(!Unpin)]
+pub struct NextFuture<'a, T: Stream + Unpin + ?Sized> {
+    inner: &'a mut T,
+}
+
+impl<T> Future for NextFuture<'_, T>
+where
+    T: Stream + Unpin + ?Sized,
+{
+    type Output = Option<Result<Bytes>>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> 
Poll<Option<Result<Bytes>>> {
+        let this = self.project();
+        Pin::new(this.inner).poll_next(cx)
+    }
+}
+
+/// Make this future `!Unpin` for compatibility with async trait methods.
+#[pin_project(!Unpin)]
+pub struct ResetFuture<'a, T: Stream + Unpin + ?Sized> {
+    inner: &'a mut T,
+}
+
+impl<T> Future for ResetFuture<'_, T>
+where
+    T: Stream + Unpin + ?Sized,
+{
+    type Output = Result<()>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
+        let this = self.project();
+        Pin::new(this.inner).poll_reset(cx)
+    }
+}
diff --git a/core/src/raw/oio/stream/into_stream.rs b/core/src/raw/oio/stream/into_stream.rs
index 2b2197437..fd01f3fcc 100644
--- a/core/src/raw/oio/stream/into_stream.rs
+++ b/core/src/raw/oio/stream/into_stream.rs
@@ -43,4 +43,11 @@ where
     fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Option<Result<Bytes>>> {
         self.inner.try_poll_next_unpin(cx)
     }
+
+    fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
+        Poll::Ready(Err(Error::new(
+            ErrorKind::Unsupported,
+            "IntoStream doesn't support reset",
+        )))
+    }
 }
diff --git a/core/src/raw/oio/stream/into_stream_from_reader.rs b/core/src/raw/oio/stream/into_stream_from_reader.rs
index cead7d11d..d8b29bff0 100644
--- a/core/src/raw/oio/stream/into_stream_from_reader.rs
+++ b/core/src/raw/oio/stream/into_stream_from_reader.rs
@@ -89,4 +89,11 @@ where
             .set_source(err)))),
         }
     }
+
+    fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
+        Poll::Ready(Err(Error::new(
+            ErrorKind::Unsupported,
+            "FromReaderStream doesn't support reset",
+        )))
+    }
 }
diff --git a/core/src/raw/oio/stream/mod.rs b/core/src/raw/oio/stream/mod.rs
index 64798480f..c71d243ce 100644
--- a/core/src/raw/oio/stream/mod.rs
+++ b/core/src/raw/oio/stream/mod.rs
@@ -17,6 +17,7 @@
 
 mod api;
 pub use api::Stream;
+pub use api::StreamExt;
 pub use api::Streamer;
 
 mod into_stream_from_reader;

Reply via email to