This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch retry-stream in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 44a28d674a91915104f1a369cfe8593c67e0e948 Author: Xuanwo <[email protected]> AuthorDate: Mon Aug 21 23:13:47 2023 +0800 feat: Add retry for Writer::sink operation Signed-off-by: Xuanwo <[email protected]> --- bindings/ocaml/src/operator/mod.rs | 3 +- bindings/ocaml/src/operator/reader.rs | 4 +- core/src/layers/retry.rs | 60 ++++++++++- core/src/raw/oio/stream/api.rs | 118 ++++++++++++++++++++- core/src/raw/oio/stream/into_stream.rs | 7 ++ core/src/raw/oio/stream/into_stream_from_reader.rs | 7 ++ core/src/raw/oio/stream/mod.rs | 1 + 7 files changed, 191 insertions(+), 9 deletions(-) diff --git a/bindings/ocaml/src/operator/mod.rs b/bindings/ocaml/src/operator/mod.rs index 833cf6391..f2a580fa7 100644 --- a/bindings/ocaml/src/operator/mod.rs +++ b/bindings/ocaml/src/operator/mod.rs @@ -19,9 +19,10 @@ mod _type; mod metadata; mod reader; -use super::*; use _type::*; +use super::*; + #[ocaml::func] #[ocaml::sig("string -> (string * string) list -> (operator, string) Result.t ")] pub fn operator( diff --git a/bindings/ocaml/src/operator/reader.rs b/bindings/ocaml/src/operator/reader.rs index bb75f7b38..b3bc7bc58 100644 --- a/bindings/ocaml/src/operator/reader.rs +++ b/bindings/ocaml/src/operator/reader.rs @@ -17,10 +17,10 @@ use std::io; -use super::*; - use opendal::raw::oio::BlockingRead; +use super::*; + #[ocaml::func] #[ocaml::sig("reader -> bytes -> (int, string) Result.t ")] pub fn reader_read(reader: &mut Reader, buf: &mut [u8]) -> Result<usize, String> { diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs index a240e9e62..bd56fa42c 100644 --- a/core/src/layers/retry.rs +++ b/core/src/layers/retry.rs @@ -34,6 +34,7 @@ use backon::Retryable; use bytes::Bytes; use futures::FutureExt; use log::warn; +use tokio::sync::Mutex; use crate::raw::oio::PageOperation; use crate::raw::oio::ReadOperation; @@ -898,9 +899,64 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> { } } - /// Sink will move the input stream, so we can't 
retry it. + /// > Ooooooooooops, are you crazy!? Why do we need to do `Arc<Mutex<S>>` here? Adding a lock has + /// a lot of overhead! + /// + /// Yes, you are right. But we have no choice. This is the only safe way for us to add retry + /// support for stream. + /// + /// And the overhead is acceptable. Based on our benchmark, adding a lock + /// that has no conflicts will only cost 5ns. + /// + /// ```shell + /// stream/without_arc_mutex + /// time: [10.715 ns 10.729 ns 10.744 ns] + /// thrpt: [ 90896 GiB/s 91019 GiB/s 91139 GiB/s] + /// stream/with_arc_mutex time: [14.891 ns 14.905 ns 14.928 ns] + /// thrpt: [ 65418 GiB/s 65517 GiB/s 65581 GiB/s] + /// ``` + /// + /// The overhead is constant, which means the overhead will not increase with the size of + /// stream. For example, if every `next` call costs 1ms, then the overhead will only take 0.0005% + /// which is acceptable. async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { - self.inner.sink(size, s).await + let s = Arc::new(Mutex::new(s)); + + let mut backoff = self.builder.build(); + + loop { + match self.inner.sink(size, Box::new(s.clone())).await { + Ok(_) => return Ok(()), + Err(e) if !e.is_temporary() => return Err(e), + Err(e) => match backoff.next() { + None => return Err(e), + Some(dur) => { + { + use oio::StreamExt; + + let mut stream = s.lock().await; + // Try to reset this stream. + // + // If error happened, we will return the sink error directly and stop retry. 
+ if stream.reset().await.is_err() { + return Err(e); + } + } + + self.notify.intercept( + &e, + dur, + &[ + ("operation", WriteOperation::Sink.into_static()), + ("path", &self.path), + ], + ); + tokio::time::sleep(dur).await; + continue; + } + }, + } + } } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/raw/oio/stream/api.rs b/core/src/raw/oio/stream/api.rs index 6ae6a732c..7345564a2 100644 --- a/core/src/raw/oio/stream/api.rs +++ b/core/src/raw/oio/stream/api.rs @@ -15,10 +15,14 @@ // specific language governing permissions and limitations // under the License. +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; use std::task::Context; use std::task::Poll; use bytes::Bytes; +use pin_project::pin_project; use crate::*; @@ -32,6 +36,9 @@ pub type Streamer = Box<dyn Stream>; pub trait Stream: Unpin + Send + Sync { /// Poll next item `Result<Bytes>` from the stream. fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>>; + + /// Reset this stream to the beginning. + fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>>; } impl Stream for () { @@ -40,6 +47,12 @@ impl Stream for () { unimplemented!("poll_next is required to be implemented for oio::Stream") } + + fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { + let _ = cx; + + unimplemented!("poll_reset is required to be implemented for oio::Stream") + } } /// `Box<dyn Stream>` won't implement `Stream` automatically. 
@@ -48,17 +61,114 @@ impl<T: Stream + ?Sized> Stream for Box<T> { fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> { (**self).poll_next(cx) } + + fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { + (**self).poll_reset(cx) + } +} + +impl<T: Stream + ?Sized> Stream for Arc<std::sync::Mutex<T>> { + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> { + match self.try_lock() { + Ok(mut this) => this.poll_next(cx), + Err(_) => Poll::Ready(Some(Err(Error::new( + ErrorKind::Unexpected, + "the stream is expected to have only one consumer, but it's not", + )))), + } + } + + fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { + match self.try_lock() { + Ok(mut this) => this.poll_reset(cx), + Err(_) => Poll::Ready(Err(Error::new( + ErrorKind::Unexpected, + "the stream is expected to have only one consumer, but it's not", + ))), + } + } +} + +impl<T: Stream + ?Sized> Stream for Arc<tokio::sync::Mutex<T>> { + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> { + match self.try_lock() { + Ok(mut this) => this.poll_next(cx), + Err(_) => Poll::Ready(Some(Err(Error::new( + ErrorKind::Unexpected, + "the stream is expected to have only one consumer, but it's not", + )))), + } + } + + fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { + match self.try_lock() { + Ok(mut this) => this.poll_reset(cx), + Err(_) => Poll::Ready(Err(Error::new( + ErrorKind::Unexpected, + "the stream is expected to have only one consumer, but it's not", + ))), + } + } } impl futures::Stream for dyn Stream { type Item = Result<Bytes>; - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let this: &mut dyn Stream = &mut *self; this.poll_next(cx) } } + +/// Impl StreamExt for all T: Stream +impl<T: Stream> StreamExt for T {} + 
+/// Extension of [`Stream`] to make it easier for use. +pub trait StreamExt: Stream { + /// Build a future for `poll_next`. + fn next(&mut self) -> NextFuture<'_, Self> { + NextFuture { inner: self } + } + + /// Build a future for `poll_reset`. + fn reset(&mut self) -> ResetFuture<'_, Self> { + ResetFuture { inner: self } + } +} + +/// Make this future `!Unpin` for compatibility with async trait methods. +#[pin_project(!Unpin)] +pub struct NextFuture<'a, T: Stream + Unpin + ?Sized> { + inner: &'a mut T, +} + +impl<T> Future for NextFuture<'_, T> +where + T: Stream + Unpin + ?Sized, +{ + type Output = Option<Result<Bytes>>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> { + let this = self.project(); + Pin::new(this.inner).poll_next(cx) + } +} + +/// Make this future `!Unpin` for compatibility with async trait methods. +#[pin_project(!Unpin)] +pub struct ResetFuture<'a, T: Stream + Unpin + ?Sized> { + inner: &'a mut T, +} + +impl<T> Future for ResetFuture<'_, T> +where + T: Stream + Unpin + ?Sized, +{ + type Output = Result<()>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { + let this = self.project(); + Pin::new(this.inner).poll_reset(cx) + } +} diff --git a/core/src/raw/oio/stream/into_stream.rs b/core/src/raw/oio/stream/into_stream.rs index 2b2197437..fd01f3fcc 100644 --- a/core/src/raw/oio/stream/into_stream.rs +++ b/core/src/raw/oio/stream/into_stream.rs @@ -43,4 +43,11 @@ where fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> { self.inner.try_poll_next_unpin(cx) } + + fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> { + Poll::Ready(Err(Error::new( + ErrorKind::Unsupported, + "IntoStream doesn't support reset", + ))) + } } diff --git a/core/src/raw/oio/stream/into_stream_from_reader.rs b/core/src/raw/oio/stream/into_stream_from_reader.rs index cead7d11d..d8b29bff0 100644 --- a/core/src/raw/oio/stream/into_stream_from_reader.rs +++ 
b/core/src/raw/oio/stream/into_stream_from_reader.rs @@ -89,4 +89,11 @@ where .set_source(err)))), } } + + fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> { + Poll::Ready(Err(Error::new( + ErrorKind::Unsupported, + "FromReaderStream doesn't support reset", + ))) + } } diff --git a/core/src/raw/oio/stream/mod.rs b/core/src/raw/oio/stream/mod.rs index 64798480f..c71d243ce 100644 --- a/core/src/raw/oio/stream/mod.rs +++ b/core/src/raw/oio/stream/mod.rs @@ -17,6 +17,7 @@ mod api; pub use api::Stream; +pub use api::StreamExt; pub use api::Streamer; mod into_stream_from_reader;
