This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 864918919 feat: Add retry for Writer::sink operation (#2896)
864918919 is described below
commit 86491891960c075dcf0245f48c893c1aac61fb82
Author: Xuanwo <[email protected]>
AuthorDate: Mon Aug 21 23:19:47 2023 +0800
feat: Add retry for Writer::sink operation (#2896)
Signed-off-by: Xuanwo <[email protected]>
---
bindings/ocaml/src/operator/mod.rs | 3 +-
bindings/ocaml/src/operator/reader.rs | 4 +-
core/src/layers/retry.rs | 60 ++++++++++-
core/src/raw/oio/stream/api.rs | 118 ++++++++++++++++++++-
core/src/raw/oio/stream/into_stream.rs | 7 ++
core/src/raw/oio/stream/into_stream_from_reader.rs | 7 ++
core/src/raw/oio/stream/mod.rs | 1 +
7 files changed, 191 insertions(+), 9 deletions(-)
diff --git a/bindings/ocaml/src/operator/mod.rs
b/bindings/ocaml/src/operator/mod.rs
index 833cf6391..f2a580fa7 100644
--- a/bindings/ocaml/src/operator/mod.rs
+++ b/bindings/ocaml/src/operator/mod.rs
@@ -19,9 +19,10 @@ mod _type;
mod metadata;
mod reader;
-use super::*;
use _type::*;
+use super::*;
+
#[ocaml::func]
#[ocaml::sig("string -> (string * string) list -> (operator, string) Result.t
")]
pub fn operator(
diff --git a/bindings/ocaml/src/operator/reader.rs
b/bindings/ocaml/src/operator/reader.rs
index bb75f7b38..b3bc7bc58 100644
--- a/bindings/ocaml/src/operator/reader.rs
+++ b/bindings/ocaml/src/operator/reader.rs
@@ -17,10 +17,10 @@
use std::io;
-use super::*;
-
use opendal::raw::oio::BlockingRead;
+use super::*;
+
#[ocaml::func]
#[ocaml::sig("reader -> bytes -> (int, string) Result.t ")]
pub fn reader_read(reader: &mut Reader, buf: &mut [u8]) -> Result<usize,
String> {
diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs
index a240e9e62..bd56fa42c 100644
--- a/core/src/layers/retry.rs
+++ b/core/src/layers/retry.rs
@@ -34,6 +34,7 @@ use backon::Retryable;
use bytes::Bytes;
use futures::FutureExt;
use log::warn;
+use tokio::sync::Mutex;
use crate::raw::oio::PageOperation;
use crate::raw::oio::ReadOperation;
@@ -898,9 +899,64 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for
RetryWrapper<R, I> {
}
}
- /// Sink will move the input stream, so we can't retry it.
+ /// > Ooooooooooops, are you crazy!? Why do we need to do `Arc<Mutex<S>>`
here? Adding a lock has
+ /// a lot of overhead!
+ ///
+ /// Yes, you are right. But we have no choice. This is the only safe way
for us to add retry
+ /// support for stream.
+ ///
+ /// And the overhead is acceptable. Based on our benchmark, adding a lock
+ /// that has no conflicts will only cost 5ns.
+ ///
+ /// ```shell
+ /// stream/without_arc_mutex
+ /// time: [10.715 ns 10.729 ns 10.744 ns]
+ /// thrpt: [ 90896 GiB/s 91019 GiB/s 91139
GiB/s]
+ /// stream/with_arc_mutex time: [14.891 ns 14.905 ns 14.928 ns]
+ /// thrpt: [ 65418 GiB/s 65517 GiB/s 65581
GiB/s]
+ /// ```
+ ///
+ /// The overhead is constant, which means it will not increase
with the size of
+ /// the stream. For example, if every `next` call costs 1ms, then the overhead
will only be 0.0005%,
+ /// which is acceptable.
async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
- self.inner.sink(size, s).await
+ let s = Arc::new(Mutex::new(s));
+
+ let mut backoff = self.builder.build();
+
+ loop {
+ match self.inner.sink(size, Box::new(s.clone())).await {
+ Ok(_) => return Ok(()),
+ Err(e) if !e.is_temporary() => return Err(e),
+ Err(e) => match backoff.next() {
+ None => return Err(e),
+ Some(dur) => {
+ {
+ use oio::StreamExt;
+
+ let mut stream = s.lock().await;
+ // Try to reset this stream.
+ //
+ // If an error happens, we return the sink
error directly and stop retrying.
+ if stream.reset().await.is_err() {
+ return Err(e);
+ }
+ }
+
+ self.notify.intercept(
+ &e,
+ dur,
+ &[
+ ("operation",
WriteOperation::Sink.into_static()),
+ ("path", &self.path),
+ ],
+ );
+ tokio::time::sleep(dur).await;
+ continue;
+ }
+ },
+ }
+ }
}
async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/raw/oio/stream/api.rs b/core/src/raw/oio/stream/api.rs
index 6ae6a732c..7345564a2 100644
--- a/core/src/raw/oio/stream/api.rs
+++ b/core/src/raw/oio/stream/api.rs
@@ -15,10 +15,14 @@
// specific language governing permissions and limitations
// under the License.
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use bytes::Bytes;
+use pin_project::pin_project;
use crate::*;
@@ -32,6 +36,9 @@ pub type Streamer = Box<dyn Stream>;
pub trait Stream: Unpin + Send + Sync {
/// Poll next item `Result<Bytes>` from the stream.
fn poll_next(&mut self, cx: &mut Context<'_>) ->
Poll<Option<Result<Bytes>>>;
+
+ /// Reset this stream to the beginning.
+ fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>>;
}
impl Stream for () {
@@ -40,6 +47,12 @@ impl Stream for () {
unimplemented!("poll_next is required to be implemented for
oio::Stream")
}
+
+ fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+ let _ = cx;
+
+ unimplemented!("poll_reset is required to be implemented for
oio::Stream")
+ }
}
/// `Box<dyn Stream>` won't implement `Stream` automatically.
@@ -48,17 +61,114 @@ impl<T: Stream + ?Sized> Stream for Box<T> {
fn poll_next(&mut self, cx: &mut Context<'_>) ->
Poll<Option<Result<Bytes>>> {
(**self).poll_next(cx)
}
+
+ fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+ (**self).poll_reset(cx)
+ }
+}
+
+impl<T: Stream + ?Sized> Stream for Arc<std::sync::Mutex<T>> {
+ fn poll_next(&mut self, cx: &mut Context<'_>) ->
Poll<Option<Result<Bytes>>> {
+ match self.try_lock() {
+ Ok(mut this) => this.poll_next(cx),
+ Err(_) => Poll::Ready(Some(Err(Error::new(
+ ErrorKind::Unexpected,
+ "the stream is expected to have only one consumer, but it's
not",
+ )))),
+ }
+ }
+
+ fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+ match self.try_lock() {
+ Ok(mut this) => this.poll_reset(cx),
+ Err(_) => Poll::Ready(Err(Error::new(
+ ErrorKind::Unexpected,
+ "the stream is expected to have only one consumer, but it's
not",
+ ))),
+ }
+ }
+}
+
+impl<T: Stream + ?Sized> Stream for Arc<tokio::sync::Mutex<T>> {
+ fn poll_next(&mut self, cx: &mut Context<'_>) ->
Poll<Option<Result<Bytes>>> {
+ match self.try_lock() {
+ Ok(mut this) => this.poll_next(cx),
+ Err(_) => Poll::Ready(Some(Err(Error::new(
+ ErrorKind::Unexpected,
+ "the stream is expected to have only one consumer, but it's
not",
+ )))),
+ }
+ }
+
+ fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+ match self.try_lock() {
+ Ok(mut this) => this.poll_reset(cx),
+ Err(_) => Poll::Ready(Err(Error::new(
+ ErrorKind::Unexpected,
+ "the stream is expected to have only one consumer, but it's
not",
+ ))),
+ }
+ }
}
impl futures::Stream for dyn Stream {
type Item = Result<Bytes>;
- fn poll_next(
- mut self: std::pin::Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<Self::Item>> {
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) ->
Poll<Option<Self::Item>> {
let this: &mut dyn Stream = &mut *self;
this.poll_next(cx)
}
}
+
+/// Impl StreamExt for all T: Stream
+impl<T: Stream> StreamExt for T {}
+
+/// Extension of [`Stream`] to make it easier to use.
+pub trait StreamExt: Stream {
+ /// Build a future for `poll_next`.
+ fn next(&mut self) -> NextFuture<'_, Self> {
+ NextFuture { inner: self }
+ }
+
+ /// Build a future for `poll_reset`.
+ fn reset(&mut self) -> ResetFuture<'_, Self> {
+ ResetFuture { inner: self }
+ }
+}
+
+/// Make this future `!Unpin` for compatibility with async trait methods.
+#[pin_project(!Unpin)]
+pub struct NextFuture<'a, T: Stream + Unpin + ?Sized> {
+ inner: &'a mut T,
+}
+
+impl<T> Future for NextFuture<'_, T>
+where
+ T: Stream + Unpin + ?Sized,
+{
+ type Output = Option<Result<Bytes>>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) ->
Poll<Option<Result<Bytes>>> {
+ let this = self.project();
+ Pin::new(this.inner).poll_next(cx)
+ }
+}
+
+/// Make this future `!Unpin` for compatibility with async trait methods.
+#[pin_project(!Unpin)]
+pub struct ResetFuture<'a, T: Stream + Unpin + ?Sized> {
+ inner: &'a mut T,
+}
+
+impl<T> Future for ResetFuture<'_, T>
+where
+ T: Stream + Unpin + ?Sized,
+{
+ type Output = Result<()>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
+ let this = self.project();
+ Pin::new(this.inner).poll_reset(cx)
+ }
+}
diff --git a/core/src/raw/oio/stream/into_stream.rs
b/core/src/raw/oio/stream/into_stream.rs
index 2b2197437..fd01f3fcc 100644
--- a/core/src/raw/oio/stream/into_stream.rs
+++ b/core/src/raw/oio/stream/into_stream.rs
@@ -43,4 +43,11 @@ where
fn poll_next(&mut self, cx: &mut Context<'_>) ->
Poll<Option<Result<Bytes>>> {
self.inner.try_poll_next_unpin(cx)
}
+
+ fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
+ Poll::Ready(Err(Error::new(
+ ErrorKind::Unsupported,
+ "IntoStream doesn't support reset",
+ )))
+ }
}
diff --git a/core/src/raw/oio/stream/into_stream_from_reader.rs
b/core/src/raw/oio/stream/into_stream_from_reader.rs
index cead7d11d..d8b29bff0 100644
--- a/core/src/raw/oio/stream/into_stream_from_reader.rs
+++ b/core/src/raw/oio/stream/into_stream_from_reader.rs
@@ -89,4 +89,11 @@ where
.set_source(err)))),
}
}
+
+ fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
+ Poll::Ready(Err(Error::new(
+ ErrorKind::Unsupported,
+ "FromReaderStream doesn't support reset",
+ )))
+ }
}
diff --git a/core/src/raw/oio/stream/mod.rs b/core/src/raw/oio/stream/mod.rs
index 64798480f..c71d243ce 100644
--- a/core/src/raw/oio/stream/mod.rs
+++ b/core/src/raw/oio/stream/mod.rs
@@ -17,6 +17,7 @@
mod api;
pub use api::Stream;
+pub use api::StreamExt;
pub use api::Streamer;
mod into_stream_from_reader;