This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch poll-write in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 91a30b252d37ba80e7f96209f063a0d6261d2d43 Author: Xuanwo <[email protected]> AuthorDate: Fri Sep 8 11:00:44 2023 +0800 Save work Signed-off-by: Xuanwo <[email protected]> --- core/src/layers/complete.rs | 6 ++-- core/src/layers/concurrent_limit.rs | 6 ++-- core/src/layers/error_context.rs | 6 ++-- core/src/layers/logging.rs | 6 ++-- core/src/layers/metrics.rs | 6 ++-- core/src/layers/minitrace.rs | 6 ++-- core/src/layers/oteltrace.rs | 6 ++-- core/src/layers/prometheus.rs | 6 ++-- core/src/layers/retry.rs | 6 ++-- core/src/layers/throttle.rs | 6 ++-- core/src/layers/timeout.rs | 6 ++-- core/src/layers/tracing.rs | 6 ++-- core/src/raw/oio/write/api.rs | 30 ++++++++--------- core/src/raw/oio/write/append_object_write.rs | 6 ++-- core/src/raw/oio/write/compose_write.rs | 43 ++++++++++++------------ core/src/raw/oio/write/exact_buf_write.rs | 4 +-- core/src/raw/oio/write/multipart_upload_write.rs | 6 ++-- core/src/raw/oio/write/one_shot_write.rs | 6 ++-- core/src/services/azblob/writer.rs | 6 ++-- core/src/services/azdfs/writer.rs | 6 ++-- core/src/services/dropbox/writer.rs | 6 ++-- core/src/services/fs/writer.rs | 6 ++-- core/src/services/ftp/writer.rs | 6 ++-- core/src/services/gcs/writer.rs | 6 ++-- core/src/services/gdrive/writer.rs | 6 ++-- core/src/services/ghac/writer.rs | 6 ++-- core/src/services/hdfs/writer.rs | 6 ++-- core/src/services/ipmfs/writer.rs | 6 ++-- core/src/services/onedrive/writer.rs | 6 ++-- core/src/services/sftp/writer.rs | 6 ++-- core/src/services/supabase/writer.rs | 6 ++-- core/src/services/vercel_artifacts/writer.rs | 6 ++-- core/src/services/wasabi/writer.rs | 6 ++-- core/src/services/webdav/writer.rs | 6 ++-- core/src/services/webhdfs/writer.rs | 6 ++-- core/src/types/writer.rs | 4 +-- 36 files changed, 137 insertions(+), 136 deletions(-) diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index 19429cff8..3e9e2bba9 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -711,7 +711,7 @@ impl<W> oio::Write for CompleteWriter<W> where W: oio::Write, { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { let w = self.inner.as_mut().ok_or_else(|| { Error::new(ErrorKind::Unexpected, "writer has been closed or aborted") })?; @@ -733,7 +733,7 @@ where Ok(n) } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { let w = self.inner.as_mut().ok_or_else(|| { Error::new(ErrorKind::Unexpected, "writer has been closed or aborted") })?; @@ -744,7 +744,7 @@ where Ok(()) } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { if let Some(size) = self.size { if self.written < size { return Err(Error::new( diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs index f726d2207..ef620e037 100644 --- a/core/src/layers/concurrent_limit.rs +++ b/core/src/layers/concurrent_limit.rs @@ -285,15 +285,15 @@ impl<R: oio::BlockingRead> oio::BlockingRead for ConcurrentLimitWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { self.inner.write(bs).await } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { self.inner.abort().await } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { self.inner.close().await } } diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs index c6b3bd2be..7ce05fc18 100644 --- a/core/src/layers/error_context.rs +++ b/core/src/layers/error_context.rs @@ -403,7 +403,7 @@ impl<T: oio::BlockingRead> oio::BlockingRead for ErrorContextWrapper<T> { #[async_trait::async_trait] impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { self.inner.write(bs).await.map_err(|err| { err.with_operation(WriteOperation::Write) .with_context("service", self.scheme) @@ -411,7 +411,7 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> { }) } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { self.inner.abort().await.map_err(|err| { err.with_operation(WriteOperation::Abort) .with_context("service", self.scheme) @@ -419,7 +419,7 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> { }) } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { self.inner.close().await.map_err(|err| { err.with_operation(WriteOperation::Close) .with_context("service", self.scheme) diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs index f55bbd627..7da2da55b 100644 --- a/core/src/layers/logging.rs +++ b/core/src/layers/logging.rs @@ -1252,7 +1252,7 @@ impl<W> LoggingWriter<W> { #[async_trait] impl<W: oio::Write> oio::Write for LoggingWriter<W> { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { match self.inner.write(bs).await { Ok(n) => { self.written += n as u64; @@ -1285,7 +1285,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> { } } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { match self.inner.abort().await { Ok(_) => { trace!( @@ -1316,7 +1316,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> { } } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { match self.inner.close().await { Ok(_) => { debug!( diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs index 7a7b73fc4..3cebf46b1 100644 --- a/core/src/layers/metrics.rs +++ b/core/src/layers/metrics.rs @@ -847,7 +847,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for MetricWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for MetricWrapper<R> { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { self.inner .write(bs) .await @@ -861,14 +861,14 @@ impl<R: oio::Write> oio::Write for MetricWrapper<R> { }) } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { self.inner.abort().await.map_err(|err| { self.handle.increment_errors_total(self.op, err.kind()); err }) } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { self.inner.close().await.map_err(|err| { self.handle.increment_errors_total(self.op, err.kind()); err diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs index 8d8662846..7c6fad770 100644 --- a/core/src/layers/minitrace.rs +++ b/core/src/layers/minitrace.rs @@ -337,7 +337,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for MinitraceWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for MinitraceWrapper<R> { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { self.inner .write(bs) .in_span(Span::enter_with_parent( @@ -347,7 +347,7 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> { .await } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { self.inner .abort() .in_span(Span::enter_with_parent( @@ -357,7 +357,7 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> { .await } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { self.inner .close() .in_span(Span::enter_with_parent( diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs index f0a8ae82b..df9bc2400 100644 --- a/core/src/layers/oteltrace.rs +++ b/core/src/layers/oteltrace.rs @@ -313,15 +313,15 @@ impl<R: oio::BlockingRead> oio::BlockingRead for OtelTraceWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { self.inner.write(bs).await } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { self.inner.abort().await } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { self.inner.close().await } } diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index 98637752b..b3f704e6d 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -662,7 +662,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { self.inner .write(bs) .await @@ -679,14 +679,14 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> { }) } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { self.inner.abort().await.map_err(|err| { self.stats.increment_errors_total(self.op, err.kind()); err }) } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { self.inner.close().await.map_err(|err| { self.stats.increment_errors_total(self.op, err.kind()); err diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs index 3a1c5521f..6a5dba57c 100644 --- a/core/src/layers/retry.rs +++ b/core/src/layers/retry.rs @@ -872,7 +872,7 @@ impl<R: oio::BlockingRead, I: RetryInterceptor> oio::BlockingRead for RetryWrapp #[async_trait] impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { let mut backoff = self.builder.build(); loop { @@ -898,7 +898,7 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> { } } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { let mut backoff = self.builder.build(); loop { @@ -924,7 +924,7 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> { } } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { let mut backoff = self.builder.build(); loop { diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs index 7787c77d8..f731ab83c 100644 --- a/core/src/layers/throttle.rs +++ b/core/src/layers/throttle.rs @@ -216,7 +216,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for ThrottleWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for ThrottleWrapper<R> { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { let buf_length = NonZeroU32::new(bs.remaining() as u32).unwrap(); loop { @@ -241,11 +241,11 @@ impl<R: oio::Write> oio::Write for ThrottleWrapper<R> { } } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { self.inner.abort().await } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { self.inner.close().await } } diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs index 751caa915..5871818b3 100644 --- a/core/src/layers/timeout.rs +++ b/core/src/layers/timeout.rs @@ -322,7 +322,7 @@ impl<R: oio::Read> oio::Read for TimeoutWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for TimeoutWrapper<R> { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { let timeout = self.io_timeout(bs.remaining() as u64); tokio::time::timeout(timeout, self.inner.write(bs)) @@ -335,7 +335,7 @@ impl<R: oio::Write> oio::Write for TimeoutWrapper<R> { })? } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { tokio::time::timeout(self.timeout, self.inner.abort()) .await .map_err(|_| { @@ -346,7 +346,7 @@ impl<R: oio::Write> oio::Write for TimeoutWrapper<R> { })? } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { tokio::time::timeout(self.timeout, self.inner.close()) .await .map_err(|_| { diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs index 35f718cb5..cb71388cb 100644 --- a/core/src/layers/tracing.rs +++ b/core/src/layers/tracing.rs @@ -324,7 +324,7 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> { parent = &self.span, level = "trace", skip_all)] - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { self.inner.write(bs).await } @@ -332,7 +332,7 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> { parent = &self.span, level = "trace", skip_all)] - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { self.inner.abort().await } @@ -340,7 +340,7 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> { parent = &self.span, level = "trace", skip_all)] - fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { self.inner.close().await } } diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs index 2463068e9..76b534888 100644 --- a/core/src/raw/oio/write/api.rs +++ b/core/src/raw/oio/write/api.rs @@ -87,11 +87,11 @@ pub trait Write: Unpin + Send + Sync { /// repeatedly until all bytes has been written. fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>>; - /// Abort the pending writer. - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>>; - /// Close the writer and make sure all data has been flushed. fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>>; + + /// Abort the pending writer. + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>>; } #[async_trait] @@ -100,17 +100,17 @@ impl Write for () { unimplemented!("write is required to be implemented for oio::Write") } - fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> { + fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> { Poll::Ready(Err(Error::new( ErrorKind::Unsupported, - "output writer doesn't support abort", + "output writer doesn't support close", ))) } - fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> { + fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> { Poll::Ready(Err(Error::new( ErrorKind::Unsupported, - "output writer doesn't support close", + "output writer doesn't support abort", ))) } } @@ -124,13 +124,13 @@ impl<T: Write + ?Sized> Write for Box<T> { (**self).poll_write(cx, bs) } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - (**self).poll_abort(cx) - } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { (**self).poll_close(cx) } + + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { + (**self).poll_abort(cx) + } } /// Impl WriteExt for all T: Write @@ -143,13 +143,13 @@ pub trait WriteExt: Write { WriteFuture { writer: self, buf } } - fn abort(&mut self) -> AbortFuture<Self> { - AbortFuture { writer: self } - } - fn close(&mut self) -> CloseFuture<Self> { CloseFuture { writer: self } } + + fn abort(&mut self) -> AbortFuture<Self> { + AbortFuture { writer: self } + } } /// Make this future `!Unpin` for compatibility with async trait methods. diff --git a/core/src/raw/oio/write/append_object_write.rs b/core/src/raw/oio/write/append_object_write.rs index ed2a456d9..ba5642dff 100644 --- a/core/src/raw/oio/write/append_object_write.rs +++ b/core/src/raw/oio/write/append_object_write.rs @@ -77,7 +77,7 @@ impl<W> oio::Write for AppendObjectWriter<W> where W: AppendObjectWrite, { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { let offset = self.offset().await?; let size = bs.remaining(); @@ -94,11 +94,11 @@ where Ok(size) } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { Ok(()) } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { Ok(()) } } diff --git a/core/src/raw/oio/write/compose_write.rs b/core/src/raw/oio/write/compose_write.rs index c6504ee93..2b56e7233 100644 --- a/core/src/raw/oio/write/compose_write.rs +++ b/core/src/raw/oio/write/compose_write.rs @@ -39,6 +39,7 @@ //! type_alias_impl_trait has been stabilized. use async_trait::async_trait; +use std::task::{Context, Poll}; use crate::raw::*; use crate::*; @@ -55,24 +56,24 @@ pub enum TwoWaysWriter<ONE: oio::Write, TWO: oio::Write> { #[async_trait] impl<ONE: oio::Write, TWO: oio::Write> oio::Write for TwoWaysWriter<ONE, TWO> { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { match self { - Self::One(one) => one.write(bs).await, - Self::Two(two) => two.write(bs).await, + Self::One(one) => one.poll_write(cx, bs), + Self::Two(two) => two.poll_write(cx, bs), } } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { match self { - Self::One(one) => one.abort().await, - Self::Two(two) => two.abort().await, + Self::One(one) => one.poll_close(cx), + Self::Two(two) => two.poll_close(cx), } } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { match self { - Self::One(one) => one.close().await, - Self::Two(two) => two.close().await, + Self::One(one) => one.poll_abort(cx), + Self::Two(two) => two.poll_abort(cx), } } } @@ -93,27 +94,27 @@ pub enum ThreeWaysWriter<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> { impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write for ThreeWaysWriter<ONE, TWO, THREE> { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { match self { - Self::One(one) => one.write(bs).await, - Self::Two(two) => two.write(bs).await, - Self::Three(three) => three.write(bs).await, + Self::One(one) => one.poll_write(cx, bs), + Self::Two(two) => two.poll_write(cx, bs), + Self::Three(three) => three.poll_write(cx, bs), } } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { match self { - Self::One(one) => one.abort().await, - Self::Two(two) => two.abort().await, - Self::Three(three) => three.abort().await, + Self::One(one) => one.poll_close(cx), + Self::Two(two) => two.poll_close(cx), + Self::Three(three) => three.poll_close(cx), } } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { match self { - Self::One(one) => one.close().await, - Self::Two(two) => two.close().await, - Self::Three(three) => three.close().await, + Self::One(one) => one.poll_abort(cx), + Self::Two(two) => two.poll_abort(cx), + Self::Three(three) => three.poll_abort(cx), } } } diff --git a/core/src/raw/oio/write/exact_buf_write.rs b/core/src/raw/oio/write/exact_buf_write.rs index 38e00f057..f1615346b 100644 --- a/core/src/raw/oio/write/exact_buf_write.rs +++ b/core/src/raw/oio/write/exact_buf_write.rs @@ -163,11 +163,11 @@ mod tests { Poll::Ready(Ok(bs.chunk().len())) } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { Ok(()) } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { Ok(()) } } diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs index 237b5774e..10353714d 100644 --- a/core/src/raw/oio/write/multipart_upload_write.rs +++ b/core/src/raw/oio/write/multipart_upload_write.rs @@ -119,7 +119,7 @@ impl<W> oio::Write for MultipartUploadWriter<W> where W: MultipartUploadWrite, { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { let upload_id = self.upload_id().await?; let size = bs.remaining(); @@ -137,7 +137,7 @@ where Ok(size) } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { let upload_id = if let Some(upload_id) = &self.upload_id { upload_id } else { @@ -147,7 +147,7 @@ where self.inner.complete_part(upload_id, &self.parts).await } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { let upload_id = if let Some(upload_id) = &self.upload_id { upload_id } else { diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs index 9d387aedb..85ef499d5 100644 --- a/core/src/raw/oio/write/one_shot_write.rs +++ b/core/src/raw/oio/write/one_shot_write.rs @@ -48,17 +48,17 @@ impl<W: OneShotWrite> OneShotWriter<W> { #[async_trait] impl<W: OneShotWrite> oio::Write for OneShotWriter<W> { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { let size = bs.remaining(); self.inner.write_once(bs).await?; Ok(size) } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { Ok(()) } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { Ok(()) } } diff --git a/core/src/services/azblob/writer.rs b/core/src/services/azblob/writer.rs index 680bc8f53..1a228b647 100644 --- a/core/src/services/azblob/writer.rs +++ b/core/src/services/azblob/writer.rs @@ -160,7 +160,7 @@ impl AzblobWriter { #[async_trait] impl oio::Write for AzblobWriter { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { let size = bs.remaining(); if self.op.append() { @@ -181,11 +181,11 @@ impl oio::Write for AzblobWriter { Ok(size) } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { Ok(()) } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { Ok(()) } } diff --git a/core/src/services/azdfs/writer.rs b/core/src/services/azdfs/writer.rs index 0dfaa320b..100065a7e 100644 --- a/core/src/services/azdfs/writer.rs +++ b/core/src/services/azdfs/writer.rs @@ -40,7 +40,7 @@ impl AzdfsWriter { #[async_trait] impl oio::Write for AzdfsWriter { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { let mut req = self.core.azdfs_create_request( &self.path, "file", @@ -89,11 +89,11 @@ impl oio::Write for AzdfsWriter { } } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { Ok(()) } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { Ok(()) } } diff --git a/core/src/services/dropbox/writer.rs b/core/src/services/dropbox/writer.rs index 6331727b1..c7887a7d7 100644 --- a/core/src/services/dropbox/writer.rs +++ b/core/src/services/dropbox/writer.rs @@ -39,7 +39,7 @@ impl DropboxWriter { #[async_trait] impl oio::Write for DropboxWriter { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { let size = bs.remaining(); let resp = self @@ -61,11 +61,11 @@ impl oio::Write for DropboxWriter { } } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { Ok(()) } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { Ok(()) } } diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs index fd4a9535b..3d94a8888 100644 --- a/core/src/services/fs/writer.rs +++ b/core/src/services/fs/writer.rs @@ -43,18 +43,18 @@ impl<F> FsWriter<F> { #[async_trait] impl oio::Write for FsWriter<tokio::fs::File> { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { self.f.write(bs.chunk()).await.map_err(parse_io_error) } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { Err(Error::new( ErrorKind::Unsupported, "output writer doesn't support abort", )) } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { self.f.sync_all().await.map_err(parse_io_error)?; if let Some(tmp_path) = &self.tmp_path { diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs index f0e61d537..9fcfd989d 100644 --- a/core/src/services/ftp/writer.rs +++ b/core/src/services/ftp/writer.rs @@ -40,7 +40,7 @@ impl FtpWriter { #[async_trait] impl oio::Write for FtpWriter { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { let size = bs.remaining(); let bs = bs.copy_to_bytes(size); @@ -55,11 +55,11 @@ impl oio::Write for FtpWriter { Ok(size) } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { Ok(()) } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { Ok(()) } } diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs index 9ae9b2fb3..2436165c4 100644 --- a/core/src/services/gcs/writer.rs +++ b/core/src/services/gcs/writer.rs @@ -118,7 +118,7 @@ impl GcsWriter { #[async_trait] impl oio::Write for GcsWriter { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { let size = bs.remaining(); let location = match &self.location { @@ -161,7 +161,7 @@ impl oio::Write for GcsWriter { } } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { let location = if let Some(location) = &self.location { location } else { @@ -183,7 +183,7 @@ impl oio::Write for GcsWriter { } } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { let location = if let Some(location) = &self.location { location } else { diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs index 2fc5f229a..b733863a8 100644 --- a/core/src/services/gdrive/writer.rs +++ b/core/src/services/gdrive/writer.rs @@ -95,7 +95,7 @@ impl GdriveWriter { #[async_trait] impl oio::Write for GdriveWriter { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { let size = bs.remaining(); if self.file_id.is_none() { self.write_create(size as u64, bs.copy_to_bytes(size)) @@ -108,11 +108,11 @@ impl oio::Write for GdriveWriter { Ok(size) } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { Ok(()) } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { Ok(()) } } diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs index 7314d82fc..886abffb5 100644 --- a/core/src/services/ghac/writer.rs +++ b/core/src/services/ghac/writer.rs @@ -41,7 +41,7 @@ impl GhacWriter { #[async_trait] impl oio::Write for GhacWriter { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { let size = bs.remaining(); let req = self @@ -66,11 +66,11 @@ impl oio::Write for GhacWriter { } } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { Ok(()) } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { let req = self.backend.ghac_commit(self.cache_id, self.size).await?; let resp = self.backend.client.send(req).await?; diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs index e5f140dca..4c69d63dc 100644 --- a/core/src/services/hdfs/writer.rs +++ b/core/src/services/hdfs/writer.rs @@ -36,18 +36,18 @@ impl<F> HdfsWriter<F> { #[async_trait] impl oio::Write for HdfsWriter<hdrs::AsyncFile> { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { self.f.write(bs.chunk()).await.map_err(parse_io_error) } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { Err(Error::new( ErrorKind::Unsupported, "output writer doesn't support abort", )) } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { self.f.close().await.map_err(parse_io_error)?; Ok(()) diff --git a/core/src/services/ipmfs/writer.rs b/core/src/services/ipmfs/writer.rs index 2c82ec0c6..dcecb66d3 100644 --- a/core/src/services/ipmfs/writer.rs +++ b/core/src/services/ipmfs/writer.rs @@ -37,7 +37,7 @@ impl IpmfsWriter { #[async_trait] impl oio::Write for IpmfsWriter { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { let size = bs.remaining(); let resp = self .backend @@ -55,11 +55,11 @@ impl oio::Write for IpmfsWriter { } } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { Ok(()) } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { Ok(()) } } diff --git a/core/src/services/onedrive/writer.rs b/core/src/services/onedrive/writer.rs index 9d0c6a992..46aa50a21 100644 --- a/core/src/services/onedrive/writer.rs +++ b/core/src/services/onedrive/writer.rs @@ -45,7 +45,7 @@ impl OneDriveWriter { #[async_trait] impl oio::Write for OneDriveWriter { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { let size = bs.remaining(); let bs = bs.copy_to_bytes(size); @@ -58,11 +58,11 @@ impl oio::Write for OneDriveWriter { Ok(size) } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { Ok(()) } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { Ok(()) } } diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs index 9abd24f25..cfe51c7f3 100644 --- a/core/src/services/sftp/writer.rs +++ b/core/src/services/sftp/writer.rs @@ -33,20 +33,20 @@ impl SftpWriter { #[async_trait] impl oio::Write for SftpWriter { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { let size = self.file.write(bs.chunk()).await?; Ok(size) } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { Err(Error::new( ErrorKind::Unsupported, "SFTP does not support aborting writes", )) } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { Ok(()) } } diff --git a/core/src/services/supabase/writer.rs b/core/src/services/supabase/writer.rs index 8830ca849..d8ba4cc16 100644 --- a/core/src/services/supabase/writer.rs +++ b/core/src/services/supabase/writer.rs @@ -67,20 +67,20 @@ impl SupabaseWriter { #[async_trait] impl oio::Write for SupabaseWriter { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { let size = bs.remaining(); self.upload(bs.copy_to_bytes(size)).await?; Ok(size) } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { Err(Error::new( ErrorKind::Unsupported, "The abort operation is not yet supported for Supabase backend", )) } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { Ok(()) } } diff --git a/core/src/services/vercel_artifacts/writer.rs b/core/src/services/vercel_artifacts/writer.rs index 52acee034..6579374a7 100644 --- a/core/src/services/vercel_artifacts/writer.rs +++ b/core/src/services/vercel_artifacts/writer.rs @@ -38,7 +38,7 @@ impl VercelArtifactsWriter { #[async_trait] impl oio::Write for VercelArtifactsWriter { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { let size = bs.remaining(); let resp = self @@ -61,11 +61,11 @@ impl oio::Write for VercelArtifactsWriter { } } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { Ok(()) } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { Ok(()) } } diff --git a/core/src/services/wasabi/writer.rs b/core/src/services/wasabi/writer.rs index 4736e348b..3eeaab12d 100644 --- a/core/src/services/wasabi/writer.rs +++ b/core/src/services/wasabi/writer.rs @@ -40,7 +40,7 @@ impl WasabiWriter { #[async_trait] impl oio::Write for WasabiWriter { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { let size = bs.remaining(); let resp = self @@ -64,11 +64,11 @@ impl oio::Write for WasabiWriter { } } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { Ok(()) } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { Ok(()) } } diff --git a/core/src/services/webdav/writer.rs b/core/src/services/webdav/writer.rs index cccf177ed..eaff3d8b4 100644 --- a/core/src/services/webdav/writer.rs +++ b/core/src/services/webdav/writer.rs @@ -61,7 +61,7 @@ impl WebdavWriter { #[async_trait] impl oio::Write for WebdavWriter { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { let size = bs.remaining(); self.write_oneshot(size as u64, AsyncBody::Bytes(bs.copy_to_bytes(size))) @@ -70,11 +70,11 @@ impl oio::Write for WebdavWriter { Ok(size) } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { Ok(()) } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { Ok(()) } } diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs index c3ce0b5be..0c7a9e10b 100644 --- a/core/src/services/webhdfs/writer.rs +++ b/core/src/services/webhdfs/writer.rs @@ -38,7 +38,7 @@ impl WebhdfsWriter { #[async_trait] impl oio::Write for WebhdfsWriter { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { let size = bs.remaining(); let req = self @@ -63,11 +63,11 @@ impl oio::Write for WebhdfsWriter { } } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { Ok(()) } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { Ok(()) } } diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs index da384729f..77b88899e 100644 --- a/core/src/types/writer.rs +++ b/core/src/types/writer.rs @@ -205,7 +205,7 @@ impl Writer { /// /// Abort should only be called when the writer is not closed or /// aborted, otherwise an unexpected error could be returned. - pub fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> { + pub fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { if let State::Idle(Some(w)) = &mut self.state { w.abort().await } else { @@ -222,7 +222,7 @@ impl Writer { /// /// Close should only be called when the writer is not closed or /// aborted, otherwise an unexpected error could be returned. - pub fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> { + pub fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { if let State::Idle(Some(w)) = &mut self.state { w.close().await } else {
