This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch poll-write in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit f6a4188a5482754068950acbe9b230c20b0cf5f2 Author: Xuanwo <[email protected]> AuthorDate: Fri Sep 8 17:00:56 2023 +0800 Save Signed-off-by: Xuanwo <[email protected]> --- core/src/layers/complete.rs | 22 ++-- core/src/layers/error_context.rs | 6 +- core/src/layers/logging.rs | 20 ++-- core/src/layers/madsim.rs | 16 +-- core/src/layers/metrics.rs | 9 +- core/src/layers/minitrace.rs | 30 ++---- core/src/layers/prometheus.rs | 9 +- core/src/layers/retry.rs | 128 +++++++++++++++++------ core/src/layers/throttle.rs | 14 +-- core/src/layers/timeout.rs | 95 +++++++++++++---- core/src/layers/tracing.rs | 6 +- core/src/raw/adapters/kv/backend.rs | 5 + core/src/raw/adapters/typed_kv/backend.rs | 5 + core/src/raw/oio/write/append_object_write.rs | 20 ++-- core/src/raw/oio/write/multipart_upload_write.rs | 5 + core/src/raw/oio/write/one_shot_write.rs | 5 + core/src/services/azdfs/backend.rs | 4 +- core/src/services/azdfs/writer.rs | 25 ++--- core/src/services/cos/writer.rs | 7 +- core/src/services/dropbox/backend.rs | 8 +- core/src/services/fs/writer.rs | 5 + core/src/services/ftp/writer.rs | 5 + core/src/services/gdrive/backend.rs | 8 +- core/src/services/ghac/writer.rs | 5 + core/src/services/ipmfs/writer.rs | 10 +- core/src/services/obs/writer.rs | 7 +- core/src/services/onedrive/backend.rs | 4 +- core/src/services/oss/writer.rs | 7 +- core/src/services/s3/writer.rs | 7 +- core/src/services/supabase/backend.rs | 4 +- core/src/services/vercel_artifacts/backend.rs | 8 +- core/src/services/wasabi/backend.rs | 4 +- core/src/services/webdav/backend.rs | 7 +- core/src/services/webhdfs/backend.rs | 4 +- 34 files changed, 334 insertions(+), 190 deletions(-) diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index 3e9e2bba9..4b856e8a1 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -19,8 +19,8 @@ use std::fmt::Debug; use std::fmt::Formatter; use std::io; use std::sync::Arc; -use std::task::Context; use std::task::Poll; +use std::task::{ready, Context}; use async_trait::async_trait; use bytes::Bytes; @@ -715,22 +715,22 @@ where let w = self.inner.as_mut().ok_or_else(|| { Error::new(ErrorKind::Unexpected, "writer has been closed or aborted") })?; - let n = w.write(bs).await?; + let n = ready!(w.poll_write(cx, bs))?; self.written += n as u64; if let Some(size) = self.size { if self.written > size { - return Err(Error::new( + return Poll::Ready(Err(Error::new( ErrorKind::ContentTruncated, &format!( "writer got too much data, expect: {size}, actual: {}", self.written + n as u64 ), - )); + ))); } } - Ok(n) + Poll::Ready(Ok(n)) } fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { @@ -738,22 +738,22 @@ where Error::new(ErrorKind::Unexpected, "writer has been closed or aborted") })?; - w.abort().await?; + ready!(w.poll_abort(cx))?; self.inner = None; - Ok(()) + Poll::Ready(Ok(())) } fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { if let Some(size) = self.size { if self.written < size { - return Err(Error::new( + return Poll::Ready(Err(Error::new( ErrorKind::ContentIncomplete, &format!( "writer got too less data, expect: {size}, actual: {}", self.written ), - )); + ))); } } @@ -761,10 +761,10 @@ where Error::new(ErrorKind::Unexpected, "writer has been closed or aborted") })?; - w.close().await?; + ready!(w.poll_close(cx))?; self.inner = None; - Ok(()) + Poll::Ready(Ok(())) } } diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs index 7ce05fc18..1fd5e608c 100644 --- a/core/src/layers/error_context.rs +++ b/core/src/layers/error_context.rs @@ -404,7 +404,7 @@ impl<T: oio::BlockingRead> oio::BlockingRead for ErrorContextWrapper<T> { #[async_trait::async_trait] impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { - self.inner.write(bs).await.map_err(|err| { + self.inner.poll_write(cx, bs).map_err(|err| { err.with_operation(WriteOperation::Write) .with_context("service", self.scheme) .with_context("path", &self.path) @@ -412,7 +412,7 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> { } fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - self.inner.abort().await.map_err(|err| { + self.inner.poll_abort(cx).map_err(|err| { err.with_operation(WriteOperation::Abort) .with_context("service", self.scheme) .with_context("path", &self.path) @@ -420,7 +420,7 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> { } fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - self.inner.close().await.map_err(|err| { + self.inner.poll_close(cx).map_err(|err| { err.with_operation(WriteOperation::Close) .with_context("service", self.scheme) .with_context("path", &self.path) diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs index 7da2da55b..42084eede 100644 --- a/core/src/layers/logging.rs +++ b/core/src/layers/logging.rs @@ -17,8 +17,8 @@ use std::fmt::Debug; use std::io; -use std::task::Context; use std::task::Poll; +use std::task::{ready, Context}; use async_trait::async_trait; use bytes::Bytes; @@ -1253,7 +1253,7 @@ impl<W> LoggingWriter<W> { #[async_trait] impl<W: oio::Write> oio::Write for LoggingWriter<W> { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { - match self.inner.write(bs).await { + match ready!(self.inner.poll_write(cx, bs)) { Ok(n) => { self.written += n as u64; trace!( @@ -1265,7 +1265,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> { self.written, n ); - Ok(n) + Poll::Ready(Ok(n)) } Err(err) => { if let Some(lvl) = self.ctx.error_level(&err) { @@ -1280,13 +1280,13 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> { self.ctx.error_print(&err), ) } - Err(err) + Poll::Ready(Err(err)) } } } fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - match self.inner.abort().await { + match ready!(self.inner.poll_abort(cx)) { Ok(_) => { trace!( target: LOGGING_TARGET, @@ -1296,7 +1296,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> { self.path, self.written, ); - Ok(()) + Poll::Ready(Ok(())) } Err(err) => { if let Some(lvl) = self.ctx.error_level(&err) { @@ -1311,13 +1311,13 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> { self.ctx.error_print(&err), ) } - Err(err) + Poll::Ready(Err(err)) } } } fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - match self.inner.close().await { + match ready!(self.inner.poll_close(cx)) { Ok(_) => { debug!( target: LOGGING_TARGET, @@ -1327,7 +1327,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> { self.path, self.written ); - Ok(()) + Poll::Ready(Ok(())) } Err(err) => { if let Some(lvl) = self.ctx.error_level(&err) { @@ -1342,7 +1342,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> { self.ctx.error_print(&err), ) } - Err(err) + Poll::Ready(Err(err)) } } } diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs index f58eb1997..3c107fd83 100644 --- a/core/src/layers/madsim.rs +++ b/core/src/layers/madsim.rs @@ -302,7 +302,11 @@ pub struct MadsimWriter { #[async_trait] impl oio::Write for MadsimWriter { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> crate::Result<usize> { + fn poll_write( + &mut self, + cx: &mut Context<'_>, + bs: &dyn oio::WriteBuf, + ) -> Poll<crate::Result<usize>> { #[cfg(madsim)] { let req = Request::Write(self.path.to_string(), bs); @@ -318,15 +322,15 @@ impl oio::Write for MadsimWriter { } } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> crate::Result<()> { - Err(Error::new( + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> { + Poll::Ready(Err(Error::new( ErrorKind::Unsupported, "will be supported in the future", - )) + ))) } - fn poll_close(&mut self, cx: &mut Context<'_>) -> crate::Result<()> { - Ok(()) + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> { + Poll::Ready(Ok(())) } } diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs index 3cebf46b1..bcbb72131 100644 --- a/core/src/layers/metrics.rs +++ b/core/src/layers/metrics.rs @@ -849,9 +849,8 @@ impl<R: oio::BlockingRead> oio::BlockingRead for MetricWrapper<R> { impl<R: oio::Write> oio::Write for MetricWrapper<R> { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { self.inner - .write(bs) - .await - .map(|n| { + .poll_write(cx, bs) + .map_ok(|n| { self.bytes += n as u64; n }) @@ -862,14 +861,14 @@ impl<R: oio::Write> oio::Write for MetricWrapper<R> { } fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - self.inner.abort().await.map_err(|err| { + self.inner.poll_abort(cx).map_err(|err| { self.handle.increment_errors_total(self.op, err.kind()); err }) } fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - self.inner.close().await.map_err(|err| { + self.inner.poll_close(cx).map_err(|err| { self.handle.increment_errors_total(self.op, err.kind()); err }) diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs index 7c6fad770..00e1239d2 100644 --- a/core/src/layers/minitrace.rs +++ b/core/src/layers/minitrace.rs @@ -338,33 +338,21 @@ impl<R: oio::BlockingRead> oio::BlockingRead for MinitraceWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for MinitraceWrapper<R> { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { - self.inner - .write(bs) - .in_span(Span::enter_with_parent( - WriteOperation::Write.into_static(), - &self.span, - )) - .await + let _g = self.span.set_local_parent(); + let _span = LocalSpan::enter_with_local_parent(WriteOperation::Write.into_static()); + self.inner.poll_write(cx, bs) } fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - self.inner - .abort() - .in_span(Span::enter_with_parent( - WriteOperation::Abort.into_static(), - &self.span, - )) - .await + let _g = self.span.set_local_parent(); + let _span = LocalSpan::enter_with_local_parent(WriteOperation::Abort.into_static()); + self.inner.poll_abort(cx) } fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - self.inner - .close() - .in_span(Span::enter_with_parent( - WriteOperation::Close.into_static(), - &self.span, - )) - .await + let _g = self.span.set_local_parent(); + let _span = LocalSpan::enter_with_local_parent(WriteOperation::Close.into_static()); + self.inner.poll_close(cx) } } diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index b3f704e6d..2558bb50b 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -664,9 +664,8 @@ impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> { impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { self.inner - .write(bs) - .await - .map(|n| { + .poll_write(cx, bs) + .map_ok(|n| { self.stats .bytes_total .with_label_values(&[&self.scheme, Operation::Write.into_static()]) @@ -680,14 +679,14 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> { } fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - self.inner.abort().await.map_err(|err| { + self.inner.poll_abort(cx).map_err(|err| { self.stats.increment_errors_total(self.op, err.kind()); err }) } fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - self.inner.close().await.map_err(|err| { + self.inner.poll_close(cx).map_err(|err| { self.stats.increment_errors_total(self.op, err.kind()); err }) diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs index 6a5dba57c..7b2549dc0 100644 --- a/core/src/layers/retry.rs +++ b/core/src/layers/retry.rs @@ -873,79 +873,139 @@ impl<R: oio::BlockingRead, I: RetryInterceptor> oio::BlockingRead for RetryWrapp #[async_trait] impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { - let mut backoff = self.builder.build(); + if let Some(sleep) = self.sleep.as_mut() { + ready!(sleep.poll_unpin(cx)); + self.sleep = None; + } - loop { - match self.inner.write(bs).await { - Ok(v) => return Ok(v), - Err(e) if !e.is_temporary() => return Err(e), - Err(e) => match backoff.next() { - None => return Err(e), + match ready!(self.inner.poll_write(cx, bs)) { + Ok(v) => { + self.current_backoff = None; + Poll::Ready(Ok(v)) + } + Err(err) if !err.is_temporary() => { + self.current_backoff = None; + Poll::Ready(Err(err)) + } + Err(err) => { + let backoff = match self.current_backoff.as_mut() { + Some(backoff) => backoff, + None => { + self.current_backoff = Some(self.builder.build()); + self.current_backoff.as_mut().unwrap() + } + }; + + match backoff.next() { + None => { + self.current_backoff = None; + Poll::Ready(Err(err)) + } Some(dur) => { self.notify.intercept( - &e, + &err, dur, &[ ("operation", WriteOperation::Write.into_static()), ("path", &self.path), ], ); - tokio::time::sleep(dur).await; - continue; + self.sleep = Some(Box::pin(tokio::time::sleep(dur))); + self.poll_write(cx, bs) } - }, + } } } } fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - let mut backoff = self.builder.build(); + if let Some(sleep) = self.sleep.as_mut() { + ready!(sleep.poll_unpin(cx)); + self.sleep = None; + } - loop { - match self.inner.abort().await { - Ok(v) => return Ok(v), - Err(e) if !e.is_temporary() => return Err(e), - Err(e) => match backoff.next() { - None => return Err(e), + match ready!(self.inner.poll_abort(cx)) { + Ok(v) => { + self.current_backoff = None; + Poll::Ready(Ok(v)) + } + Err(err) if !err.is_temporary() => { + self.current_backoff = None; + Poll::Ready(Err(err)) + } + Err(err) => { + let backoff = match self.current_backoff.as_mut() { + Some(backoff) => backoff, + None => { + self.current_backoff = Some(self.builder.build()); + self.current_backoff.as_mut().unwrap() + } + }; + + match backoff.next() { + None => { + self.current_backoff = None; + Poll::Ready(Err(err)) + } Some(dur) => { self.notify.intercept( - &e, + &err, dur, &[ ("operation", WriteOperation::Abort.into_static()), ("path", &self.path), ], ); - tokio::time::sleep(dur).await; - continue; + self.sleep = Some(Box::pin(tokio::time::sleep(dur))); + self.poll_abort(cx) } - }, + } } } } fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - let mut backoff = self.builder.build(); + if let Some(sleep) = self.sleep.as_mut() { + ready!(sleep.poll_unpin(cx)); + self.sleep = None; + } - loop { - match self.inner.close().await { - Ok(v) => return Ok(v), - Err(e) if !e.is_temporary() => return Err(e), - Err(e) => match backoff.next() { - None => return Err(e), + match ready!(self.inner.poll_close(cx)) { + Ok(v) => { + self.current_backoff = None; + Poll::Ready(Ok(v)) + } + Err(err) if !err.is_temporary() => { + self.current_backoff = None; + Poll::Ready(Err(err)) + } + Err(err) => { + let backoff = match self.current_backoff.as_mut() { + Some(backoff) => backoff, + None => { + self.current_backoff = Some(self.builder.build()); + self.current_backoff.as_mut().unwrap() + } + }; + + match backoff.next() { + None => { + self.current_backoff = None; + Poll::Ready(Err(err)) + } Some(dur) => { self.notify.intercept( - &e, + &err, dur, &[ - ("operation", WriteOperation::Close.into_static()), + ("operation", WriteOperation::Abort.into_static()), ("path", &self.path), ], ); - tokio::time::sleep(dur).await; - continue; + self.sleep = Some(Box::pin(tokio::time::sleep(dur))); + self.poll_close(cx) } - }, + } } } } diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs index f731ab83c..72e63abe0 100644 --- a/core/src/layers/throttle.rs +++ b/core/src/layers/throttle.rs @@ -221,20 +221,22 @@ impl<R: oio::Write> oio::Write for ThrottleWrapper<R> { loop { match self.limiter.check_n(buf_length) { - Ok(_) => return self.inner.write(bs).await, + Ok(_) => return self.inner.poll_write(cx, bs), Err(negative) => match negative { // the query is valid but the Decider can not accommodate them. NegativeMultiDecision::BatchNonConforming(_, not_until) => { let wait_time = not_until.wait_time_from(DefaultClock::default().now()); // TODO: Should lock the limiter and wait for the wait_time, or should let other small requests go first? - tokio::time::sleep(wait_time).await; + + // FIXME: we should sleep here. + // tokio::time::sleep(wait_time).await; } // the query was invalid as the rate limit parameters can "never" accommodate the number of cells queried for. NegativeMultiDecision::InsufficientCapacity(_) => { - return Err(Error::new( + return Poll::Ready(Err(Error::new( ErrorKind::RateLimited, "InsufficientCapacity due to burst size being smaller than the request size", - )) + ))) } }, } @@ -242,11 +244,11 @@ impl<R: oio::Write> oio::Write for ThrottleWrapper<R> { } fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - self.inner.abort().await + self.inner.poll_abort(cx) } fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - self.inner.close().await + self.inner.poll_close(cx) } } diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs index 5871818b3..d6cd4d24a 100644 --- a/core/src/layers/timeout.rs +++ b/core/src/layers/timeout.rs @@ -323,38 +323,93 @@ impl<R: oio::Read> oio::Read for TimeoutWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for TimeoutWrapper<R> { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { - let timeout = self.io_timeout(bs.remaining() as u64); + match self.start { + Some(start) => { + if start.elapsed() > self.timeout { + // Clean up the start time before return ready. + self.start = None; - tokio::time::timeout(timeout, self.inner.write(bs)) - .await - .map_err(|_| { - Error::new(ErrorKind::Unexpected, "operation timeout") + return Poll::Ready(Err(Error::new( + ErrorKind::Unexpected, + "operation timeout", + ) .with_operation(WriteOperation::Write) - .with_context("timeout", timeout.as_secs_f64().to_string()) - .set_temporary() - })? + .with_context("timeout", self.timeout.as_secs_f64().to_string()) + .set_temporary())); + } + } + None => { + self.start = Some(Instant::now()); + } + } + + match self.inner.poll_write(cx, bs) { + Poll::Pending => Poll::Pending, + Poll::Ready(v) => { + self.start = None; + Poll::Ready(v) + } + } } fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - tokio::time::timeout(self.timeout, self.inner.abort()) - .await - .map_err(|_| { - Error::new(ErrorKind::Unexpected, "operation timeout") + match self.start { + Some(start) => { + if start.elapsed() > self.timeout { + // Clean up the start time before return ready. + self.start = None; + + return Poll::Ready(Err(Error::new( + ErrorKind::Unexpected, + "operation timeout", + ) .with_operation(WriteOperation::Abort) .with_context("timeout", self.timeout.as_secs_f64().to_string()) - .set_temporary() - })? + .set_temporary())); + } + } + None => { + self.start = Some(Instant::now()); + } + } + + match self.inner.poll_abort(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(v) => { + self.start = None; + Poll::Ready(v) + } + } } fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - tokio::time::timeout(self.timeout, self.inner.close()) - .await - .map_err(|_| { - Error::new(ErrorKind::Unexpected, "operation timeout") + match self.start { + Some(start) => { + if start.elapsed() > self.timeout { + // Clean up the start time before return ready. + self.start = None; + + return Poll::Ready(Err(Error::new( + ErrorKind::Unexpected, + "operation timeout", + ) .with_operation(WriteOperation::Close) .with_context("timeout", self.timeout.as_secs_f64().to_string()) - .set_temporary() - })? + .set_temporary())); + } + } + None => { + self.start = Some(Instant::now()); + } + } + + match self.inner.poll_close(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(v) => { + self.start = None; + Poll::Ready(v) + } + } } } diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs index cb71388cb..9bb0b54c2 100644 --- a/core/src/layers/tracing.rs +++ b/core/src/layers/tracing.rs @@ -325,7 +325,7 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> { level = "trace", skip_all)] fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { - self.inner.write(bs).await + self.inner.poll_write(cx, bs) } #[tracing::instrument( @@ -333,7 +333,7 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> { level = "trace", skip_all)] fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - self.inner.abort().await + self.inner.poll_abort(cx) } #[tracing::instrument( @@ -341,7 +341,7 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> { level = "trace", skip_all)] fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - self.inner.close().await + self.inner.poll_close(cx) } } diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs index a3ead2e7a..48c327b0a 100644 --- a/core/src/raw/adapters/kv/backend.rs +++ b/core/src/raw/adapters/kv/backend.rs @@ -390,6 +390,11 @@ impl<S> KvWriter<S> { } } +/// # Safety +/// +/// We will only take `&mut Self` reference for KvWriter. +unsafe impl<S: Adapter> Sync for KvWriter<S> {} + #[async_trait] impl<S: Adapter> oio::Write for KvWriter<S> { // TODO: we need to support append in the future. diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs index 4a3cdf37b..fda56bf28 100644 --- a/core/src/raw/adapters/typed_kv/backend.rs +++ b/core/src/raw/adapters/typed_kv/backend.rs @@ -368,6 +368,11 @@ pub struct KvWriter<S> { future: Option<BoxFuture<'static, Result<()>>>, } +/// # Safety +/// +/// We will only take `&mut Self` reference for KvWriter. +unsafe impl<S: Adapter> Sync for KvWriter<S> {} + impl<S> KvWriter<S> { fn new(kv: Arc<S>, path: String, op: OpWrite) -> Self { KvWriter { diff --git a/core/src/raw/oio/write/append_object_write.rs b/core/src/raw/oio/write/append_object_write.rs index 2c0bd5555..393254097 100644 --- a/core/src/raw/oio/write/append_object_write.rs +++ b/core/src/raw/oio/write/append_object_write.rs @@ -59,6 +59,11 @@ enum State<W> { Append(BoxFuture<'static, (W, Result<usize>)>), } +/// # Safety +/// +/// We will only take `&mut Self` reference for State. +unsafe impl<S: AppendObjectWrite> Sync for State<S> {} + impl<W: AppendObjectWrite> AppendObjectWriter<W> { /// Create a new AppendObjectWriter. pub fn new(inner: W) -> Self { @@ -67,17 +72,6 @@ impl<W: AppendObjectWrite> AppendObjectWriter<W> { offset: None, } } - - async fn offset(&mut self) -> Result<u64> { - if let Some(offset) = self.offset { - return Ok(offset); - } - - let offset = self.inner.offset().await?; - self.offset = Some(offset); - - Ok(offset) - } } #[async_trait] @@ -103,9 +97,9 @@ where } None => { self.state = State::Offset(Box::pin(async move { - let offset = w.offset().await?; + let offset = w.offset().await; - (w, Ok(offset)) + (w, offset) })); } } diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs index 482954dc9..97b2f0220 100644 --- a/core/src/raw/oio/write/multipart_upload_write.rs +++ b/core/src/raw/oio/write/multipart_upload_write.rs @@ -96,6 +96,11 @@ enum State<W> { Abort(BoxFuture<'static, (W, Result<()>)>), } +/// # Safety +/// +/// We will only take `&mut Self` reference for State. +unsafe impl<S: MultipartUploadWrite> Sync for State<S> {} + impl<W: MultipartUploadWrite> MultipartUploadWriter<W> { /// Create a new MultipartUploadWriter. pub fn new(inner: W) -> Self { diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs index a0a743915..6cc083491 100644 --- a/core/src/raw/oio/write/one_shot_write.rs +++ b/core/src/raw/oio/write/one_shot_write.rs @@ -47,6 +47,11 @@ enum State<W> { Write(BoxFuture<'static, (W, Result<usize>)>), } +/// # Safety +/// +/// We will only take `&mut Self` reference for State. +unsafe impl<S: OneShotWrite> Sync for State<S> {} + impl<W: OneShotWrite> OneShotWriter<W> { /// Create a new one shot writer. pub fn new(inner: W) -> Self { diff --git a/core/src/services/azdfs/backend.rs b/core/src/services/azdfs/backend.rs index 3882f4fe2..1c0dd3a8c 100644 --- a/core/src/services/azdfs/backend.rs +++ b/core/src/services/azdfs/backend.rs @@ -230,7 +230,7 @@ pub struct AzdfsBackend { impl Accessor for AzdfsBackend { type Reader = IncomingAsyncBody; type BlockingReader = (); - type Writer = AzdfsWriter; + type Writer = oio::OneShotWriter<AzdfsWriter>; type BlockingWriter = (); type Pager = AzdfsPager; type BlockingPager = (); @@ -305,7 +305,7 @@ impl Accessor for AzdfsBackend { Ok(( RpWrite::default(), - AzdfsWriter::new(self.core.clone(), args, path.to_string()), + oio::OneShotWriter::new(AzdfsWriter::new(self.core.clone(), args, path.to_string())), )) } diff --git a/core/src/services/azdfs/writer.rs b/core/src/services/azdfs/writer.rs index 774c9ee4b..1da698dd4 100644 --- a/core/src/services/azdfs/writer.rs +++ b/core/src/services/azdfs/writer.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use async_trait::async_trait; +use bytes::Bytes; use http::StatusCode; use super::core::AzdfsCore; @@ -40,8 +41,8 @@ impl AzdfsWriter { } #[async_trait] -impl oio::Write for AzdfsWriter { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { +impl oio::OneShotWrite for AzdfsWriter { + async fn write_once(&self, bs: Bytes) -> Result<()> { let mut req = self.core.azdfs_create_request( &self.path, "file", @@ -66,13 +67,11 @@ impl oio::Write for AzdfsWriter { } } - let size = bs.remaining(); + let size = bs.len(); - let mut req = self.core.azdfs_update_request( - &self.path, - Some(size), - AsyncBody::Bytes(bs.copy_to_bytes(size)), - )?; + let mut req = + self.core + .azdfs_update_request(&self.path, Some(size), AsyncBody::Bytes(bs))?; self.core.sign(&mut req).await?; @@ -82,19 +81,11 @@ impl oio::Write for AzdfsWriter { match status { StatusCode::OK | StatusCode::ACCEPTED => { resp.into_body().consume().await?; - Ok(size) + Ok(()) } _ => Err(parse_error(resp) .await? .with_operation("Backend::azdfs_update_request")), } } - - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - Poll::Ready(Ok(())) - } - - fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - Poll::Ready(Ok(())) - } } diff --git a/core/src/services/cos/writer.rs b/core/src/services/cos/writer.rs index e0cc8be73..fba7cea09 100644 --- a/core/src/services/cos/writer.rs +++ b/core/src/services/cos/writer.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use async_trait::async_trait; +use bytes::Bytes; use http::StatusCode; use super::core::*; @@ -50,15 +51,15 @@ impl CosWriter { #[async_trait] impl oio::OneShotWrite for CosWriter { - async fn write_once(&self, buf: &dyn oio::WriteBuf) -> Result<()> { - let size = buf.remaining(); + async fn write_once(&self, buf: Bytes) -> Result<()> { + let size = buf.len(); let mut req = self.core.cos_put_object_request( &self.path, Some(size as u64), self.op.content_type(), self.op.content_disposition(), self.op.cache_control(), - AsyncBody::Bytes(buf.copy_to_bytes(size)), + AsyncBody::Bytes(buf), )?; self.core.sign(&mut req).await?; diff --git a/core/src/services/dropbox/backend.rs b/core/src/services/dropbox/backend.rs index ebb468728..029f1fe7f 100644 --- a/core/src/services/dropbox/backend.rs +++ b/core/src/services/dropbox/backend.rs @@ -49,7 +49,7 @@ pub struct DropboxBackend { impl Accessor for DropboxBackend { type Reader = IncomingAsyncBody; type BlockingReader = (); - type Writer = DropboxWriter; + type Writer = oio::OneShotWriter<DropboxWriter>; type BlockingWriter = (); type Pager = (); type BlockingPager = (); @@ -114,7 +114,11 @@ impl Accessor for DropboxBackend { } Ok(( RpWrite::default(), - DropboxWriter::new(self.core.clone(), args, String::from(path)), + oio::OneShotWriter::new(DropboxWriter::new( + self.core.clone(), + args, + String::from(path), + )), )) } diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs index a71de17d5..1a0aa49eb 100644 --- a/core/src/services/fs/writer.rs +++ b/core/src/services/fs/writer.rs @@ -48,6 +48,11 @@ impl<F> FsWriter<F> { } } +/// # Safety +/// +/// We will only take `&mut Self` reference for FsWriter. +unsafe impl<F> Sync for FsWriter<F> {} + #[async_trait] impl oio::Write for FsWriter<tokio::fs::File> { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs index 34ed85b37..ec3c3f0e2 100644 --- a/core/src/services/ftp/writer.rs +++ b/core/src/services/ftp/writer.rs @@ -46,6 +46,11 @@ impl FtpWriter { } } +/// # Safety +/// +/// We will only take `&mut Self` reference for FtpWriter. +unsafe impl Sync for FtpWriter {} + #[async_trait] impl oio::Write for FtpWriter { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { diff --git a/core/src/services/gdrive/backend.rs b/core/src/services/gdrive/backend.rs index df8ee7451..f50713ac7 100644 --- a/core/src/services/gdrive/backend.rs +++ b/core/src/services/gdrive/backend.rs @@ -40,7 +40,7 @@ pub struct GdriveBackend { impl Accessor for GdriveBackend { type Reader = IncomingAsyncBody; type BlockingReader = (); - type Writer = GdriveWriter; + type Writer = oio::OneShotWriter<GdriveWriter>; type BlockingWriter = (); type Pager = (); type BlockingPager = (); @@ -194,7 +194,11 @@ impl Accessor for GdriveBackend { Ok(( RpWrite::default(), - GdriveWriter::new(self.core.clone(), String::from(path), file_id), + oio::OneShotWriter::new(GdriveWriter::new( + self.core.clone(), + String::from(path), + file_id, + )), )) } diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs index 0be63ce22..c780c2710 100644 --- a/core/src/services/ghac/writer.rs +++ b/core/src/services/ghac/writer.rs @@ -47,6 +47,11 @@ enum State { Commit(BoxFuture<'static, (GhacBackend, Result<()>)>), } +/// # Safety +/// +/// We will only take `&mut Self` reference for State. +unsafe impl Sync for State {} + #[async_trait] impl oio::Write for GhacWriter { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { diff --git a/core/src/services/ipmfs/writer.rs b/core/src/services/ipmfs/writer.rs index 0d1673693..feb5e5630 100644 --- a/core/src/services/ipmfs/writer.rs +++ b/core/src/services/ipmfs/writer.rs @@ -16,6 +16,7 @@ // under the License. use async_trait::async_trait; +use bytes::Bytes; use http::StatusCode; use std::task::{Context, Poll}; @@ -39,12 +40,9 @@ impl IpmfsWriter { #[async_trait] impl oio::OneShotWrite for IpmfsWriter { - async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> { - let size = bs.remaining(); - let resp = self - .backend - .ipmfs_write(&self.path, bs.copy_to_bytes(size)) - .await?; + async fn write_once(&self, bs: Bytes) -> Result<()> { + let size = bs.len(); + let resp = self.backend.ipmfs_write(&self.path, bs).await?; let status = resp.status(); diff --git a/core/src/services/obs/writer.rs b/core/src/services/obs/writer.rs index 38dc660db..6b0aed175 100644 --- a/core/src/services/obs/writer.rs +++ b/core/src/services/obs/writer.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use bytes::Bytes; use std::sync::Arc; use async_trait::async_trait; @@ -51,14 +52,14 @@ impl ObsWriter { #[async_trait] impl oio::OneShotWrite for ObsWriter { - async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> { - let size = bs.remaining(); + async fn write_once(&self, bs: Bytes) -> Result<()> { + let size = bs.len(); let mut req = self.core.obs_put_object_request( &self.path, Some(size as u64), self.op.content_type(), self.op.cache_control(), - AsyncBody::Bytes(bs.copy_to_bytes(size)), + AsyncBody::Bytes(bs), )?; self.core.sign(&mut req).await?; diff --git a/core/src/services/onedrive/backend.rs b/core/src/services/onedrive/backend.rs index 09ef04765..2b32abbaf 100644 --- a/core/src/services/onedrive/backend.rs +++ b/core/src/services/onedrive/backend.rs @@ -64,7 +64,7 @@ impl Debug for OnedriveBackend { impl Accessor for OnedriveBackend { type Reader = IncomingAsyncBody; type BlockingReader = (); - type Writer = OneDriveWriter; + type Writer = oio::OneShotWriter<OneDriveWriter>; type BlockingWriter = (); type Pager = OnedrivePager; type BlockingPager = (); @@ -114,7 +114,7 @@ impl Accessor for OnedriveBackend { Ok(( RpWrite::default(), - OneDriveWriter::new(self.clone(), args, path), + oio::OneShotWriter::new(OneDriveWriter::new(self.clone(), args, path)), )) } diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs index 8fd7ac656..56d262f17 100644 --- a/core/src/services/oss/writer.rs +++ b/core/src/services/oss/writer.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use async_trait::async_trait; +use bytes::Bytes; use http::StatusCode; use super::core::*; @@ -50,15 +51,15 @@ impl OssWriter { #[async_trait] impl oio::OneShotWrite for OssWriter { - async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> { - let size = bs.remaining(); + async fn write_once(&self, bs: Bytes) -> Result<()> { + let size = bs.len(); let mut req = self.core.oss_put_object_request( &self.path, Some(size as u64), self.op.content_type(), self.op.content_disposition(), self.op.cache_control(), - AsyncBody::Bytes(bs.copy_to_bytes(size)), + AsyncBody::Bytes(bs), false, )?; diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs index 7c5ddf924..a3a1bd5bd 100644 --- a/core/src/services/s3/writer.rs +++ b/core/src/services/s3/writer.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use async_trait::async_trait; +use bytes::Bytes; use http::StatusCode; use super::core::*; @@ -47,8 +48,8 @@ impl S3Writer { #[async_trait] impl oio::OneShotWrite for S3Writer { - async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> { - let size = bs.remaining(); + async fn write_once(&self, bs: Bytes) -> Result<()> { + let size = bs.len(); let mut req = self.core.s3_put_object_request( &self.path, @@ -56,7 +57,7 @@ impl oio::OneShotWrite for S3Writer { self.op.content_type(), self.op.content_disposition(), self.op.cache_control(), - AsyncBody::Bytes(bs.copy_to_bytes(size)), + AsyncBody::Bytes(bs), )?; self.core.sign(&mut req).await?; diff --git a/core/src/services/supabase/backend.rs b/core/src/services/supabase/backend.rs index 89996fa8c..402c3abe7 100644 --- a/core/src/services/supabase/backend.rs +++ b/core/src/services/supabase/backend.rs @@ -158,7 +158,7 @@ pub struct SupabaseBackend { impl Accessor for SupabaseBackend { type Reader = IncomingAsyncBody; type BlockingReader = (); - type Writer = SupabaseWriter; + type Writer = oio::OneShotWriter<SupabaseWriter>; type BlockingWriter = (); // todo: implement Pager to support list and scan type Pager = (); @@ -233,7 +233,7 @@ impl Accessor for SupabaseBackend { Ok(( RpWrite::default(), - SupabaseWriter::new(self.core.clone(), path, args), + oio::OneShotWriter::new(SupabaseWriter::new(self.core.clone(), path, args)), )) } diff --git a/core/src/services/vercel_artifacts/backend.rs b/core/src/services/vercel_artifacts/backend.rs index 70cd9048b..4b88be664 100644 --- a/core/src/services/vercel_artifacts/backend.rs +++ b/core/src/services/vercel_artifacts/backend.rs @@ -46,7 +46,7 @@ impl Debug for VercelArtifactsBackend { impl Accessor for VercelArtifactsBackend { type Reader = IncomingAsyncBody; type BlockingReader = (); - type Writer = VercelArtifactsWriter; + type Writer = oio::OneShotWriter<VercelArtifactsWriter>; type BlockingWriter = (); type Pager = (); type BlockingPager = (); @@ -93,7 +93,11 @@ impl Accessor for VercelArtifactsBackend { Ok(( RpWrite::default(), - VercelArtifactsWriter::new(self.clone(), args, path.to_string()), + oio::OneShotWriter::new(VercelArtifactsWriter::new( + self.clone(), + args, + path.to_string(), + )), )) } diff --git a/core/src/services/wasabi/backend.rs b/core/src/services/wasabi/backend.rs index f2aedab2a..835492bad 100644 --- a/core/src/services/wasabi/backend.rs +++ b/core/src/services/wasabi/backend.rs @@ -670,7 +670,7 @@ pub struct WasabiBackend { impl Accessor for WasabiBackend { type Reader = IncomingAsyncBody; type BlockingReader = (); - type Writer = WasabiWriter; + type Writer = oio::OneShotWriter<WasabiWriter>; type BlockingWriter = (); type Pager = WasabiPager; type BlockingPager = (); @@ -759,7 +759,7 @@ impl Accessor for WasabiBackend { Ok(( RpWrite::default(), - WasabiWriter::new(self.core.clone(), args, path.to_string()), + oio::OneShotWriter::new(WasabiWriter::new(self.core.clone(), args, path.to_string())), )) } diff --git a/core/src/services/webdav/backend.rs b/core/src/services/webdav/backend.rs index 509d4c612..cca8728ba 100644 --- a/core/src/services/webdav/backend.rs +++ b/core/src/services/webdav/backend.rs @@ -221,7 +221,7 @@ impl Debug for WebdavBackend { impl Accessor for WebdavBackend { type Reader = IncomingAsyncBody; type BlockingReader = (); - type Writer = WebdavWriter; + type Writer = oio::OneShotWriter<WebdavWriter>; type BlockingWriter = (); type Pager = Option<WebdavPager>; type BlockingPager = (); @@ -286,7 +286,10 @@ impl Accessor for WebdavBackend { let p = build_abs_path(&self.root, path); - Ok((RpWrite::default(), WebdavWriter::new(self.clone(), args, p))) + Ok(( + RpWrite::default(), + oio::OneShotWriter::new(WebdavWriter::new(self.clone(), args, p)), + )) } /// # Notes diff --git a/core/src/services/webhdfs/backend.rs b/core/src/services/webhdfs/backend.rs index db8e0490a..e8a3a8f04 100644 --- a/core/src/services/webhdfs/backend.rs +++ b/core/src/services/webhdfs/backend.rs @@ -399,7 +399,7 @@ impl WebhdfsBackend { impl Accessor for WebhdfsBackend { type Reader = IncomingAsyncBody; type BlockingReader = (); - type Writer = WebhdfsWriter; + type Writer = oio::OneShotWriter<WebhdfsWriter>; type BlockingWriter = (); type Pager = WebhdfsPager; type BlockingPager = (); @@ -483,7 +483,7 @@ impl Accessor for WebhdfsBackend { Ok(( RpWrite::default(), - WebhdfsWriter::new(self.clone(), args, path.to_string()), + oio::OneShotWriter::new(WebhdfsWriter::new(self.clone(), args, path.to_string())), )) }
