This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch poll-write in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit e1d7ce09c2f56a9898d4916c2fae45e10a81b200 Author: Xuanwo <[email protected]> AuthorDate: Fri Sep 8 10:57:10 2023 +0800 Save work Signed-off-by: Xuanwo <[email protected]> --- core/benches/oio/utils.rs | 10 +- core/src/layers/complete.rs | 6 +- core/src/layers/concurrent_limit.rs | 6 +- core/src/layers/error_context.rs | 6 +- core/src/layers/logging.rs | 6 +- core/src/layers/madsim.rs | 6 +- core/src/layers/metrics.rs | 6 +- core/src/layers/minitrace.rs | 6 +- core/src/layers/oteltrace.rs | 6 +- core/src/layers/prometheus.rs | 6 +- core/src/layers/retry.rs | 6 +- core/src/layers/throttle.rs | 6 +- core/src/layers/timeout.rs | 6 +- core/src/layers/tracing.rs | 6 +- core/src/raw/adapters/kv/backend.rs | 59 +++++++++--- core/src/raw/adapters/typed_kv/backend.rs | 54 +++++++++-- core/src/raw/oio/write/api.rs | 112 +++++++++++++++++++---- core/src/raw/oio/write/append_object_write.rs | 6 +- core/src/raw/oio/write/compose_write.rs | 12 +-- core/src/raw/oio/write/exact_buf_write.rs | 30 +++--- core/src/raw/oio/write/mod.rs | 1 + core/src/raw/oio/write/multipart_upload_write.rs | 6 +- core/src/raw/oio/write/one_shot_write.rs | 6 +- core/src/services/azblob/writer.rs | 6 +- core/src/services/azdfs/writer.rs | 6 +- core/src/services/dropbox/writer.rs | 6 +- core/src/services/fs/writer.rs | 6 +- core/src/services/ftp/writer.rs | 6 +- core/src/services/gcs/writer.rs | 6 +- core/src/services/gdrive/writer.rs | 6 +- core/src/services/ghac/writer.rs | 6 +- core/src/services/hdfs/writer.rs | 6 +- core/src/services/ipmfs/writer.rs | 6 +- core/src/services/onedrive/writer.rs | 6 +- core/src/services/sftp/writer.rs | 6 +- core/src/services/supabase/writer.rs | 6 +- core/src/services/vercel_artifacts/writer.rs | 6 +- core/src/services/wasabi/writer.rs | 6 +- core/src/services/webdav/writer.rs | 6 +- core/src/services/webhdfs/writer.rs | 6 +- core/src/types/writer.rs | 4 +- 41 files changed, 316 insertions(+), 164 deletions(-) diff --git a/core/benches/oio/utils.rs b/core/benches/oio/utils.rs index 1671057f0..67b4fd451 100644 --- a/core/benches/oio/utils.rs +++ b/core/benches/oio/utils.rs @@ -26,15 +26,19 @@ pub struct BlackHoleWriter; #[async_trait] impl oio::Write for BlackHoleWriter { - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> opendal::Result<usize> { + fn poll_write( + &mut self, + cx: &mut Context<'_>, + bs: &dyn oio::WriteBuf, + ) -> opendal::Result<usize> { Ok(bs.remaining()) } - async fn abort(&mut self) -> opendal::Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> opendal::Result<()> { Ok(()) } - async fn close(&mut self) -> opendal::Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> opendal::Result<()> { Ok(()) } } diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index 048905bae..19429cff8 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -711,7 +711,7 @@ impl<W> oio::Write for CompleteWriter<W> where W: oio::Write, { - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { let w = self.inner.as_mut().ok_or_else(|| { Error::new(ErrorKind::Unexpected, "writer has been closed or aborted") })?; @@ -733,7 +733,7 @@ where Ok(n) } - async fn abort(&mut self) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { let w = self.inner.as_mut().ok_or_else(|| { Error::new(ErrorKind::Unexpected, "writer has been closed or aborted") })?; @@ -744,7 +744,7 @@ where Ok(()) } - async fn close(&mut self) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { if let Some(size) = self.size { if self.written < size { return Err(Error::new( diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs index bdc473384..f726d2207 100644 --- a/core/src/layers/concurrent_limit.rs +++ b/core/src/layers/concurrent_limit.rs @@ -285,15 +285,15 @@ impl<R: oio::BlockingRead> oio::BlockingRead for ConcurrentLimitWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> { - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { self.inner.write(bs).await } - async fn abort(&mut self) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { self.inner.abort().await } - async fn close(&mut self) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { self.inner.close().await } } diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs index 15d7a0dd0..c6b3bd2be 100644 --- a/core/src/layers/error_context.rs +++ b/core/src/layers/error_context.rs @@ -403,7 +403,7 @@ impl<T: oio::BlockingRead> oio::BlockingRead for ErrorContextWrapper<T> { #[async_trait::async_trait] impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> { - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { self.inner.write(bs).await.map_err(|err| { err.with_operation(WriteOperation::Write) .with_context("service", self.scheme) @@ -411,7 +411,7 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> { }) } - async fn abort(&mut self) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { self.inner.abort().await.map_err(|err| { err.with_operation(WriteOperation::Abort) .with_context("service", self.scheme) @@ -419,7 +419,7 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> { }) } - async fn close(&mut self) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { self.inner.close().await.map_err(|err| { err.with_operation(WriteOperation::Close) .with_context("service", self.scheme) diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs index f0cea32e8..f55bbd627 100644 --- a/core/src/layers/logging.rs +++ b/core/src/layers/logging.rs @@ -1252,7 +1252,7 @@ impl<W> LoggingWriter<W> { #[async_trait] impl<W: oio::Write> oio::Write for LoggingWriter<W> { - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { match self.inner.write(bs).await { Ok(n) => { self.written += n as u64; @@ -1285,7 +1285,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> { } } - async fn abort(&mut self) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { match self.inner.abort().await { Ok(_) => { trace!( @@ -1316,7 +1316,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> { } } - async fn close(&mut self) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { match self.inner.close().await { Ok(_) => { debug!( diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs index b0b5cf179..f58eb1997 100644 --- a/core/src/layers/madsim.rs +++ b/core/src/layers/madsim.rs @@ -302,7 +302,7 @@ pub struct MadsimWriter { #[async_trait] impl oio::Write for MadsimWriter { - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> crate::Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> crate::Result<usize> { #[cfg(madsim)] { let req = Request::Write(self.path.to_string(), bs); @@ -318,14 +318,14 @@ impl oio::Write for MadsimWriter { } } - async fn abort(&mut self) -> crate::Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> crate::Result<()> { Err(Error::new( ErrorKind::Unsupported, "will be supported in the future", )) } - async fn close(&mut self) -> crate::Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> crate::Result<()> { Ok(()) } } diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs index b16f87a40..7a7b73fc4 100644 --- a/core/src/layers/metrics.rs +++ b/core/src/layers/metrics.rs @@ -847,7 +847,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for MetricWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for MetricWrapper<R> { - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { self.inner .write(bs) .await @@ -861,14 +861,14 @@ impl<R: oio::Write> oio::Write for MetricWrapper<R> { }) } - async fn abort(&mut self) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { self.inner.abort().await.map_err(|err| { self.handle.increment_errors_total(self.op, err.kind()); err }) } - async fn close(&mut self) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { self.inner.close().await.map_err(|err| { self.handle.increment_errors_total(self.op, err.kind()); err diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs index f54f40593..8d8662846 100644 --- a/core/src/layers/minitrace.rs +++ b/core/src/layers/minitrace.rs @@ -337,7 +337,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for MinitraceWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for MinitraceWrapper<R> { - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { self.inner .write(bs) .in_span(Span::enter_with_parent( @@ -347,7 +347,7 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> { .await } - async fn abort(&mut self) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { self.inner .abort() .in_span(Span::enter_with_parent( @@ -357,7 +357,7 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> { .await } - async fn close(&mut self) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { self.inner .close() .in_span(Span::enter_with_parent( diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs index cceca3a69..f0a8ae82b 100644 --- a/core/src/layers/oteltrace.rs +++ b/core/src/layers/oteltrace.rs @@ -313,15 +313,15 @@ impl<R: oio::BlockingRead> oio::BlockingRead for OtelTraceWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> { - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { self.inner.write(bs).await } - async fn abort(&mut self) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { self.inner.abort().await } - async fn close(&mut self) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { self.inner.close().await } } diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index 2e294c9e5..98637752b 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -662,7 +662,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> { - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { self.inner .write(bs) .await @@ -679,14 +679,14 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> { }) } - async fn abort(&mut self) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { self.inner.abort().await.map_err(|err| { self.stats.increment_errors_total(self.op, err.kind()); err }) } - async fn close(&mut self) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { self.inner.close().await.map_err(|err| { self.stats.increment_errors_total(self.op, err.kind()); err diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs index 3746008ab..3a1c5521f 100644 --- a/core/src/layers/retry.rs +++ b/core/src/layers/retry.rs @@ -872,7 +872,7 @@ impl<R: oio::BlockingRead, I: RetryInterceptor> oio::BlockingRead for RetryWrapp #[async_trait] impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> { - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { let mut backoff = self.builder.build(); loop { @@ -898,7 +898,7 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> { } } - async fn abort(&mut self) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { let mut backoff = self.builder.build(); loop { @@ -924,7 +924,7 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> { } } - async fn close(&mut self) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { let mut backoff = self.builder.build(); loop { diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs index aea7b6381..7787c77d8 100644 --- a/core/src/layers/throttle.rs +++ b/core/src/layers/throttle.rs @@ -216,7 +216,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for ThrottleWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for ThrottleWrapper<R> { - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { let buf_length = NonZeroU32::new(bs.remaining() as u32).unwrap(); loop { @@ -241,11 +241,11 @@ impl<R: oio::Write> oio::Write for ThrottleWrapper<R> { } } - async fn abort(&mut self) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { self.inner.abort().await } - async fn close(&mut self) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { self.inner.close().await } } diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs index c02a4d58a..751caa915 100644 --- a/core/src/layers/timeout.rs +++ b/core/src/layers/timeout.rs @@ -322,7 +322,7 @@ impl<R: oio::Read> oio::Read for TimeoutWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for TimeoutWrapper<R> { - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { let timeout = self.io_timeout(bs.remaining() as u64); tokio::time::timeout(timeout, self.inner.write(bs)) @@ -335,7 +335,7 @@ impl<R: oio::Write> oio::Write for TimeoutWrapper<R> { })? } - async fn abort(&mut self) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { tokio::time::timeout(self.timeout, self.inner.abort()) .await .map_err(|_| { @@ -346,7 +346,7 @@ impl<R: oio::Write> oio::Write for TimeoutWrapper<R> { })? } - async fn close(&mut self) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { tokio::time::timeout(self.timeout, self.inner.close()) .await .map_err(|_| { diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs index b829f8376..35f718cb5 100644 --- a/core/src/layers/tracing.rs +++ b/core/src/layers/tracing.rs @@ -324,7 +324,7 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> { parent = &self.span, level = "trace", skip_all)] - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { self.inner.write(bs).await } @@ -332,7 +332,7 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> { parent = &self.span, level = "trace", skip_all)] - async fn abort(&mut self) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { self.inner.abort().await } @@ -340,7 +340,7 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> { parent = &self.span, level = "trace", skip_all)] - async fn close(&mut self) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { self.inner.close().await } } diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs index 513bda67e..a3ead2e7a 100644 --- a/core/src/raw/adapters/kv/backend.rs +++ b/core/src/raw/adapters/kv/backend.rs @@ -16,8 +16,10 @@ // under the License. use std::sync::Arc; +use std::task::{ready, Context, Poll}; use async_trait::async_trait; +use futures::future::BoxFuture; use super::Adapter; use crate::raw::*; @@ -374,6 +376,7 @@ pub struct KvWriter<S> { /// TODO: if kv supports append, we can use them directly. buf: Option<Vec<u8>>, + future: Option<BoxFuture<'static, Result<()>>>, } impl<S> KvWriter<S> { @@ -382,6 +385,7 @@ impl<S> KvWriter<S> { kv, path, buf: None, + future: None, } } } @@ -389,29 +393,60 @@ impl<S> KvWriter<S> { #[async_trait] impl<S: Adapter> oio::Write for KvWriter<S> { // TODO: we need to support append in the future. - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, _: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { + if self.future.is_some() { + self.future = None; + return Poll::Ready(Err(Error::new( + ErrorKind::Unexpected, + "there is a future on going, it's maybe a bug to go into this case", + ))); + } + let size = bs.chunk().len(); let mut buf = self.buf.take().unwrap_or_else(|| Vec::with_capacity(size)); buf.extend_from_slice(bs.chunk()); self.buf = Some(buf); - Ok(size) + Poll::Ready(Ok(size)) } - async fn abort(&mut self) -> Result<()> { - Err(Error::new( - ErrorKind::Unsupported, - "output writer doesn't support abort", - )) + fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> { + if self.future.is_some() { + self.future = None; + return Poll::Ready(Err(Error::new( + ErrorKind::Unexpected, + "there is a future on going, it's maybe a bug to go into this case", + ))); + } + + self.buf = None; + Poll::Ready(Ok(())) } - async fn close(&mut self) -> Result<()> { - if let Some(buf) = self.buf.as_deref() { - self.kv.set(&self.path, buf).await?; + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { + loop { + match self.future.as_mut() { + Some(fut) => { + ready!(fut.poll_unpin(cx))?; + self.future = None; + return Poll::Ready(Ok(())); + } + None => { + let kv = self.kv.clone(); + let path = self.path.clone(); + let buf = match self.buf.take() { + Some(buf) => buf, + None => return Poll::Ready(Ok(())), + }; + + let fut = async move { + kv.set(&path, &buf).await?; + }; + self.future = Some(Box::pin(fut)); + } + } } - - Ok(()) } } diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs index 1e0864de7..4a3cdf37b 100644 --- a/core/src/raw/adapters/typed_kv/backend.rs +++ b/core/src/raw/adapters/typed_kv/backend.rs @@ -16,9 +16,11 @@ // under the License. use std::sync::Arc; +use std::task::{ready, Context, Poll}; use async_trait::async_trait; use bytes::Bytes; +use futures::future::BoxFuture; use super::Adapter; use super::Value; @@ -363,6 +365,7 @@ pub struct KvWriter<S> { op: OpWrite, buf: Option<Vec<u8>>, + future: Option<BoxFuture<'static, Result<()>>>, } impl<S> KvWriter<S> { @@ -372,6 +375,7 @@ impl<S> KvWriter<S> { path, op, buf: None, + future: None, } } @@ -401,7 +405,15 @@ impl<S> KvWriter<S> { #[async_trait] impl<S: Adapter> oio::Write for KvWriter<S> { // TODO: we need to support append in the future. - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { + if self.future.is_some() { + self.future = None; + return Poll::Ready(Err(Error::new( + ErrorKind::Unexpected, + "there is a future on going, it's maybe a bug to go into this case", + ))); + } + let size = bs.chunk().len(); let mut buf = self.buf.take().unwrap_or_else(|| Vec::with_capacity(size)); @@ -409,20 +421,42 @@ impl<S: Adapter> oio::Write for KvWriter<S> { self.buf = Some(buf); - Ok(size) + Poll::Ready(Ok(size)) } - async fn abort(&mut self) -> Result<()> { + fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> { + if self.future.is_some() { + self.future = None; + return Poll::Ready(Err(Error::new( + ErrorKind::Unexpected, + "there is a future on going, it's maybe a bug to go into this case", + ))); + } + self.buf = None; - Ok(()) + Poll::Ready(Ok(())) } - async fn close(&mut self) -> Result<()> { - let kv = self.kv.clone(); - let value = self.build(); - - kv.set(&self.path, value).await?; - Ok(()) + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { + loop { + match self.future.as_mut() { + Some(fut) => { + ready!(fut.poll_unpin(cx))?; + self.future = None; + return Poll::Ready(Ok(())); + } + None => { + let kv = self.kv.clone(); + let path = self.path.clone(); + let value = self.build(); + + let fut = async move { + kv.set(&path, value).await?; + }; + self.future = Some(Box::pin(fut)); + } + } + } } } diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs index 622303ff4..2463068e9 100644 --- a/core/src/raw/oio/write/api.rs +++ b/core/src/raw/oio/write/api.rs @@ -17,8 +17,12 @@ use std::fmt::Display; use std::fmt::Formatter; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; use async_trait::async_trait; +use pin_project::pin_project; use crate::raw::*; use crate::*; @@ -81,35 +85,33 @@ pub trait Write: Unpin + Send + Sync { /// /// It's possible that `n < bs.len()`, caller should pass the remaining bytes /// repeatedly until all bytes has been written. - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize>; + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>>; /// Abort the pending writer. - async fn abort(&mut self) -> Result<()>; + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>>; /// Close the writer and make sure all data has been flushed. - async fn close(&mut self) -> Result<()>; + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>>; } #[async_trait] impl Write for () { - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { - let _ = bs; - + fn poll_write(&mut self, _: &mut Context<'_>, _: &dyn oio::WriteBuf) -> Poll<Result<usize>> { unimplemented!("write is required to be implemented for oio::Write") } - async fn abort(&mut self) -> Result<()> { - Err(Error::new( + fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> { + Poll::Ready(Err(Error::new( ErrorKind::Unsupported, "output writer doesn't support abort", - )) + ))) } - async fn close(&mut self) -> Result<()> { - Err(Error::new( + fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> { + Poll::Ready(Err(Error::new( ErrorKind::Unsupported, "output writer doesn't support close", - )) + ))) } } @@ -118,16 +120,90 @@ impl Write for () { /// To make Writer work as expected, we must add this impl. #[async_trait] impl<T: Write + ?Sized> Write for Box<T> { - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { - (**self).write(bs).await + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { + (**self).poll_write(cx, bs) + } + + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { + (**self).poll_abort(cx) + } + + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { + (**self).poll_close(cx) + } +} + +/// Impl WriteExt for all T: Write +impl<T: Write> WriteExt for T {} + +/// Extension of [`Read`] to make it easier for use. +pub trait WriteExt: Write { + /// Build a future for `poll_write`. + fn write<'a>(&'a mut self, buf: &'a dyn oio::WriteBuf) -> WriteFuture<'a, Self> { + WriteFuture { writer: self, buf } + } + + fn abort(&mut self) -> AbortFuture<Self> { + AbortFuture { writer: self } } - async fn abort(&mut self) -> Result<()> { - (**self).abort().await + fn close(&mut self) -> CloseFuture<Self> { + CloseFuture { writer: self } } +} + +/// Make this future `!Unpin` for compatibility with async trait methods. +#[pin_project(!Unpin)] +pub struct WriteFuture<'a, W: Write + Unpin + ?Sized> { + writer: &'a mut W, + buf: &'a dyn oio::WriteBuf, +} + +impl<W> Future for WriteFuture<'_, W> +where + W: Write + Unpin + ?Sized, +{ + type Output = Result<usize>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<usize>> { + let this = self.project(); + Pin::new(this.writer).poll_write(cx, this.buf) + } +} + +/// Make this future `!Unpin` for compatibility with async trait methods. +#[pin_project(!Unpin)] +pub struct AbortFuture<'a, W: Write + Unpin + ?Sized> { + writer: &'a mut W, +} + +impl<W> Future for AbortFuture<'_, W> +where + W: Write + Unpin + ?Sized, +{ + type Output = Result<usize>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { + let this = self.project(); + Pin::new(this.writer).poll_abort(cx) + } +} + +/// Make this future `!Unpin` for compatibility with async trait methods. +#[pin_project(!Unpin)] +pub struct CloseFuture<'a, W: Write + Unpin + ?Sized> { + writer: &'a mut W, +} + +impl<W> Future for CloseFuture<'_, W> +where + W: Write + Unpin + ?Sized, +{ + type Output = Result<usize>; - async fn close(&mut self) -> Result<()> { - (**self).close().await + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { + let this = self.project(); + Pin::new(this.writer).poll_close(cx) } } diff --git a/core/src/raw/oio/write/append_object_write.rs b/core/src/raw/oio/write/append_object_write.rs index 2a5e30609..ed2a456d9 100644 --- a/core/src/raw/oio/write/append_object_write.rs +++ b/core/src/raw/oio/write/append_object_write.rs @@ -77,7 +77,7 @@ impl<W> oio::Write for AppendObjectWriter<W> where W: AppendObjectWrite, { - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { let offset = self.offset().await?; let size = bs.remaining(); @@ -94,11 +94,11 @@ where Ok(size) } - async fn close(&mut self) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { Ok(()) } - async fn abort(&mut self) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { Ok(()) } } diff --git a/core/src/raw/oio/write/compose_write.rs b/core/src/raw/oio/write/compose_write.rs index e2c6638af..c6504ee93 100644 --- a/core/src/raw/oio/write/compose_write.rs +++ b/core/src/raw/oio/write/compose_write.rs @@ -55,21 +55,21 @@ pub enum TwoWaysWriter<ONE: oio::Write, TWO: oio::Write> { #[async_trait] impl<ONE: oio::Write, TWO: oio::Write> oio::Write for TwoWaysWriter<ONE, TWO> { - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { match self { Self::One(one) => one.write(bs).await, Self::Two(two) => two.write(bs).await, } } - async fn abort(&mut self) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { match self { Self::One(one) => one.abort().await, Self::Two(two) => two.abort().await, } } - async fn close(&mut self) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { match self { Self::One(one) => one.close().await, Self::Two(two) => two.close().await, @@ -93,7 +93,7 @@ pub enum ThreeWaysWriter<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> { impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write for ThreeWaysWriter<ONE, TWO, THREE> { - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { match self { Self::One(one) => one.write(bs).await, Self::Two(two) => two.write(bs).await, @@ -101,7 +101,7 @@ impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write } } - async fn abort(&mut self) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { match self { Self::One(one) => one.abort().await, Self::Two(two) => two.abort().await, @@ -109,7 +109,7 @@ impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write } } - async fn close(&mut self) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { match self { Self::One(one) => one.close().await, Self::Two(two) => two.close().await, diff --git a/core/src/raw/oio/write/exact_buf_write.rs b/core/src/raw/oio/write/exact_buf_write.rs index d4a868472..38e00f057 100644 --- a/core/src/raw/oio/write/exact_buf_write.rs +++ b/core/src/raw/oio/write/exact_buf_write.rs @@ -16,11 +16,13 @@ // under the License. use std::cmp::min; +use std::task::{ready, Context, Poll}; use async_trait::async_trait; use bytes::{Bytes, BytesMut}; use crate::raw::oio::WriteBuf; +use crate::raw::oio::WriteExt; use crate::raw::*; use crate::*; @@ -62,20 +64,20 @@ enum Buffer { #[async_trait] impl<W: oio::Write> oio::Write for ExactBufWriter<W> { - async fn write(&mut self, bs: &dyn WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn WriteBuf) -> Poll<Result<usize>> { loop { match &mut self.buffer { Buffer::Empty => { if bs.remaining() >= self.buffer_size { self.buffer = Buffer::Consuming(bs.copy_to_bytes(self.buffer_size)); - return Ok(self.buffer_size); + return Poll::Ready(Ok(self.buffer_size)); } let chunk = bs.chunk(); let mut fill = BytesMut::with_capacity(chunk.len()); fill.extend_from_slice(chunk); self.buffer = Buffer::Filling(fill); - return Ok(chunk.len()); + return Poll::Ready(Ok(chunk.len())); } Buffer::Filling(fill) => { if fill.len() >= self.buffer_size { @@ -85,14 +87,14 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> { let size = min(self.buffer_size - fill.len(), bs.chunk().len()); fill.extend_from_slice(&bs.chunk()[..size]); - return Ok(size); + return Poll::Ready(Ok(size)); } Buffer::Consuming(consume) => { // Make sure filled buffer has been flushed. // // TODO: maybe we can re-fill it after a successful write. while !consume.is_empty() { - let n = self.inner.write(consume).await?; + let n = ready!(self.inner.poll_write(cx, consume)?); consume.advance(n); } self.buffer = Buffer::Empty; @@ -101,12 +103,12 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> { } } - async fn abort(&mut self) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { self.buffer = Buffer::Empty; - self.inner.abort().await + self.inner.poll_abort(cx) } - async fn close(&mut self) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { loop { match &mut self.buffer { Buffer::Empty => break, @@ -119,7 +121,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> { // // TODO: maybe we can re-fill it after a successful write. while !consume.is_empty() { - let n = self.inner.write(&consume).await?; + let n = ready!(self.inner.poll_write(cx, &consume))?; consume.advance(n); } self.buffer = Buffer::Empty; @@ -128,7 +130,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> { } } - self.inner.close().await + self.inner.poll_close(cx) } } @@ -151,21 +153,21 @@ mod tests { #[async_trait] impl Write for MockWriter { - async fn write(&mut self, bs: &dyn WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn WriteBuf) -> Poll<Result<usize>> { debug!( "test_fuzz_exact_buf_writer: flush size: {}", bs.chunk().len() ); self.buf.extend_from_slice(bs.chunk()); - Ok(bs.chunk().len()) + Poll::Ready(Ok(bs.chunk().len())) } - async fn abort(&mut self) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { Ok(()) } - async fn close(&mut self) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { Ok(()) } } diff --git a/core/src/raw/oio/write/mod.rs b/core/src/raw/oio/write/mod.rs index dfaf6c4ee..ff6d8377e 100644 --- a/core/src/raw/oio/write/mod.rs +++ b/core/src/raw/oio/write/mod.rs @@ -19,6 +19,7 @@ mod api; pub use api::BlockingWrite; pub use api::BlockingWriter; pub use api::Write; +pub use api::WriteExt; pub use api::WriteOperation; pub use api::Writer; diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs index 39660a2f7..237b5774e 100644 --- a/core/src/raw/oio/write/multipart_upload_write.rs +++ b/core/src/raw/oio/write/multipart_upload_write.rs @@ -119,7 +119,7 @@ impl<W> oio::Write for MultipartUploadWriter<W> where W: MultipartUploadWrite, { - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { let upload_id = self.upload_id().await?; let size = bs.remaining(); @@ -137,7 +137,7 @@ where Ok(size) } - async fn close(&mut self) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { let upload_id = if let Some(upload_id) = &self.upload_id { upload_id } else { @@ -147,7 +147,7 @@ where self.inner.complete_part(upload_id, &self.parts).await } - async fn abort(&mut self) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { let upload_id = if let Some(upload_id) = &self.upload_id { upload_id } else { diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs index a02c64445..9d387aedb 100644 --- a/core/src/raw/oio/write/one_shot_write.rs +++ b/core/src/raw/oio/write/one_shot_write.rs @@ -48,17 +48,17 @@ impl<W: OneShotWrite> OneShotWriter<W> { #[async_trait] impl<W: OneShotWrite> oio::Write for OneShotWriter<W> { - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { let size = bs.remaining(); self.inner.write_once(bs).await?; Ok(size) } - async fn abort(&mut self) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { Ok(()) } - async fn close(&mut self) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { Ok(()) } } diff --git a/core/src/services/azblob/writer.rs b/core/src/services/azblob/writer.rs index 9745b1387..680bc8f53 100644 --- a/core/src/services/azblob/writer.rs +++ b/core/src/services/azblob/writer.rs @@ -160,7 +160,7 @@ impl AzblobWriter { #[async_trait] impl oio::Write for AzblobWriter { - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { let size = bs.remaining(); if self.op.append() { @@ -181,11 +181,11 @@ impl oio::Write for AzblobWriter { Ok(size) } - async fn abort(&mut self) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { Ok(()) } - async fn close(&mut self) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { Ok(()) } } diff --git a/core/src/services/azdfs/writer.rs b/core/src/services/azdfs/writer.rs index f9bec7346..0dfaa320b 100644 --- a/core/src/services/azdfs/writer.rs +++ b/core/src/services/azdfs/writer.rs @@ -40,7 +40,7 @@ impl AzdfsWriter { #[async_trait] impl oio::Write for AzdfsWriter { - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { let mut req = self.core.azdfs_create_request( &self.path, "file", @@ -89,11 +89,11 @@ impl oio::Write for AzdfsWriter { } } - async fn abort(&mut self) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { Ok(()) } - async fn close(&mut self) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { Ok(()) } } diff --git a/core/src/services/dropbox/writer.rs b/core/src/services/dropbox/writer.rs index c39e95eaf..6331727b1 100644 --- a/core/src/services/dropbox/writer.rs +++ b/core/src/services/dropbox/writer.rs @@ -39,7 +39,7 @@ impl DropboxWriter { #[async_trait] impl oio::Write for DropboxWriter { - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { let size = bs.remaining(); let resp = self @@ -61,11 +61,11 @@ impl oio::Write for DropboxWriter { } } - async fn abort(&mut self) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { Ok(()) } - async fn close(&mut self) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { Ok(()) } } diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs index 13e03ae2c..fd4a9535b 100644 --- a/core/src/services/fs/writer.rs +++ b/core/src/services/fs/writer.rs @@ -43,18 +43,18 @@ impl<F> FsWriter<F> { #[async_trait] impl oio::Write for FsWriter<tokio::fs::File> { - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { self.f.write(bs.chunk()).await.map_err(parse_io_error) } - async fn abort(&mut self) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "output writer doesn't support abort", )) } - async fn close(&mut self) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { self.f.sync_all().await.map_err(parse_io_error)?; if let Some(tmp_path) = &self.tmp_path { diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs index 2b0673d67..f0e61d537 100644 --- a/core/src/services/ftp/writer.rs +++ b/core/src/services/ftp/writer.rs @@ -40,7 +40,7 @@ impl FtpWriter { #[async_trait] impl oio::Write for FtpWriter { - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { let size = bs.remaining(); let bs = bs.copy_to_bytes(size); @@ -55,11 +55,11 @@ impl oio::Write for FtpWriter { Ok(size) } - async fn abort(&mut self) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { Ok(()) } - async fn close(&mut self) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { Ok(()) } } diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs index d8dbe7597..9ae9b2fb3 100644 --- a/core/src/services/gcs/writer.rs +++ b/core/src/services/gcs/writer.rs @@ -118,7 +118,7 @@ impl GcsWriter { #[async_trait] impl oio::Write for GcsWriter { - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { let size = bs.remaining(); let location = match &self.location { @@ -161,7 +161,7 @@ impl oio::Write for GcsWriter { } } - async fn abort(&mut self) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { let location = if let Some(location) = &self.location { location } else { @@ -183,7 +183,7 @@ impl oio::Write for GcsWriter { } } - async fn close(&mut self) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { let location = if let Some(location) = &self.location { location } else { diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs index c77a8989d..2fc5f229a 100644 --- a/core/src/services/gdrive/writer.rs +++ b/core/src/services/gdrive/writer.rs @@ -95,7 +95,7 @@ impl GdriveWriter { #[async_trait] impl oio::Write for GdriveWriter { - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { let size = bs.remaining(); if self.file_id.is_none() { self.write_create(size as u64, bs.copy_to_bytes(size)) @@ -108,11 +108,11 @@ impl oio::Write for GdriveWriter { Ok(size) } - async fn abort(&mut self) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { Ok(()) } - async fn close(&mut self) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { Ok(()) } } diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs index 13b27ccf4..7314d82fc 100644 --- a/core/src/services/ghac/writer.rs +++ b/core/src/services/ghac/writer.rs @@ -41,7 +41,7 @@ impl GhacWriter { #[async_trait] impl oio::Write for GhacWriter { - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { let size = bs.remaining(); let req = self @@ -66,11 +66,11 @@ impl oio::Write for GhacWriter { } } - async fn abort(&mut self) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { Ok(()) } - async fn close(&mut self) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { let req = self.backend.ghac_commit(self.cache_id, self.size).await?; let resp = self.backend.client.send(req).await?; diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs index 47cdb53fe..e5f140dca 100644 --- a/core/src/services/hdfs/writer.rs +++ b/core/src/services/hdfs/writer.rs @@ -36,18 +36,18 @@ impl<F> HdfsWriter<F> { #[async_trait] impl oio::Write for HdfsWriter<hdrs::AsyncFile> { - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { self.f.write(bs.chunk()).await.map_err(parse_io_error) } - async fn abort(&mut self) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "output writer doesn't support abort", )) } - async fn close(&mut self) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { self.f.close().await.map_err(parse_io_error)?; Ok(()) diff --git a/core/src/services/ipmfs/writer.rs b/core/src/services/ipmfs/writer.rs index 6b6ab6a4d..2c82ec0c6 100644 --- a/core/src/services/ipmfs/writer.rs +++ b/core/src/services/ipmfs/writer.rs @@ -37,7 +37,7 @@ impl IpmfsWriter { #[async_trait] impl oio::Write for IpmfsWriter { - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { let size = bs.remaining(); let resp = self .backend @@ -55,11 +55,11 @@ impl oio::Write for IpmfsWriter { } } - async fn abort(&mut self) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { Ok(()) } - async fn close(&mut self) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { Ok(()) } } diff --git a/core/src/services/onedrive/writer.rs b/core/src/services/onedrive/writer.rs index d9eabb8eb..9d0c6a992 100644 --- a/core/src/services/onedrive/writer.rs +++ b/core/src/services/onedrive/writer.rs @@ -45,7 +45,7 @@ impl OneDriveWriter { #[async_trait] impl oio::Write for OneDriveWriter { - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { let size = bs.remaining(); let bs = bs.copy_to_bytes(size); @@ -58,11 +58,11 @@ impl oio::Write for OneDriveWriter { Ok(size) } - async fn abort(&mut self) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { Ok(()) } - async fn close(&mut self) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { Ok(()) } } diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs index 90e605152..9abd24f25 100644 --- a/core/src/services/sftp/writer.rs +++ b/core/src/services/sftp/writer.rs @@ -33,20 +33,20 @@ impl SftpWriter { #[async_trait] impl oio::Write for SftpWriter { - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { let size = self.file.write(bs.chunk()).await?; Ok(size) } - async fn abort(&mut self) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "SFTP does not support aborting writes", )) } - async fn close(&mut self) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { Ok(()) } } diff --git a/core/src/services/supabase/writer.rs b/core/src/services/supabase/writer.rs index 95a2ee92f..8830ca849 100644 --- a/core/src/services/supabase/writer.rs +++ b/core/src/services/supabase/writer.rs @@ -67,20 +67,20 @@ impl SupabaseWriter { #[async_trait] impl oio::Write for SupabaseWriter { - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { let size = bs.remaining(); self.upload(bs.copy_to_bytes(size)).await?; Ok(size) } - async fn abort(&mut self) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "The abort operation is not yet supported for Supabase backend", )) } - async fn close(&mut self) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { Ok(()) } } diff --git a/core/src/services/vercel_artifacts/writer.rs b/core/src/services/vercel_artifacts/writer.rs index 36e62b734..52acee034 100644 --- a/core/src/services/vercel_artifacts/writer.rs +++ b/core/src/services/vercel_artifacts/writer.rs @@ -38,7 +38,7 @@ impl VercelArtifactsWriter { #[async_trait] impl oio::Write for VercelArtifactsWriter { - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { let size = bs.remaining(); let resp = self @@ -61,11 +61,11 @@ impl oio::Write for VercelArtifactsWriter { } } - async fn abort(&mut self) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { Ok(()) } - async fn close(&mut self) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { Ok(()) } } diff --git a/core/src/services/wasabi/writer.rs b/core/src/services/wasabi/writer.rs index d0509ebbc..4736e348b 100644 --- a/core/src/services/wasabi/writer.rs +++ b/core/src/services/wasabi/writer.rs @@ -40,7 +40,7 @@ impl WasabiWriter { #[async_trait] impl oio::Write for WasabiWriter { - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { let size = bs.remaining(); let resp = self @@ -64,11 +64,11 @@ impl oio::Write for WasabiWriter { } } - async fn abort(&mut self) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { Ok(()) } - async fn close(&mut self) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { Ok(()) } } diff --git a/core/src/services/webdav/writer.rs b/core/src/services/webdav/writer.rs index 413fe891a..cccf177ed 100644 --- a/core/src/services/webdav/writer.rs +++ b/core/src/services/webdav/writer.rs @@ -61,7 +61,7 @@ impl WebdavWriter { #[async_trait] impl oio::Write for WebdavWriter { - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { let size = bs.remaining(); self.write_oneshot(size as u64, AsyncBody::Bytes(bs.copy_to_bytes(size))) @@ -70,11 +70,11 @@ impl oio::Write for WebdavWriter { Ok(size) } - async fn abort(&mut self) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { Ok(()) } - async fn close(&mut self) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { Ok(()) } } diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs index 34e6b20fa..c3ce0b5be 100644 --- a/core/src/services/webhdfs/writer.rs +++ b/core/src/services/webhdfs/writer.rs @@ -38,7 +38,7 @@ impl WebhdfsWriter { #[async_trait] impl oio::Write for WebhdfsWriter { - async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { let size = bs.remaining(); let req = self @@ -63,11 +63,11 @@ impl oio::Write for WebhdfsWriter { } } - async fn abort(&mut self) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { Ok(()) } - async fn close(&mut self) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { Ok(()) } } diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs index 0474d6ea4..da384729f 100644 --- a/core/src/types/writer.rs +++ b/core/src/types/writer.rs @@ -205,7 +205,7 @@ impl Writer { /// /// Abort should only be called when the writer is not closed or /// aborted, otherwise an unexpected error could be returned. - pub async fn abort(&mut self) -> Result<()> { + pub fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { if let State::Idle(Some(w)) = &mut self.state { w.abort().await } else { @@ -222,7 +222,7 @@ impl Writer { /// /// Close should only be called when the writer is not closed or /// aborted, otherwise an unexpected error could be returned. - pub async fn close(&mut self) -> Result<()> { + pub fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { if let State::Idle(Some(w)) = &mut self.state { w.close().await } else {
