This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch poll-write
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit f6a4188a5482754068950acbe9b230c20b0cf5f2
Author: Xuanwo <[email protected]>
AuthorDate: Fri Sep 8 17:00:56 2023 +0800

    Save
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/layers/complete.rs                      |  22 ++--
 core/src/layers/error_context.rs                 |   6 +-
 core/src/layers/logging.rs                       |  20 ++--
 core/src/layers/madsim.rs                        |  16 +--
 core/src/layers/metrics.rs                       |   9 +-
 core/src/layers/minitrace.rs                     |  30 ++----
 core/src/layers/prometheus.rs                    |   9 +-
 core/src/layers/retry.rs                         | 128 +++++++++++++++++------
 core/src/layers/throttle.rs                      |  14 +--
 core/src/layers/timeout.rs                       |  95 +++++++++++++----
 core/src/layers/tracing.rs                       |   6 +-
 core/src/raw/adapters/kv/backend.rs              |   5 +
 core/src/raw/adapters/typed_kv/backend.rs        |   5 +
 core/src/raw/oio/write/append_object_write.rs    |  20 ++--
 core/src/raw/oio/write/multipart_upload_write.rs |   5 +
 core/src/raw/oio/write/one_shot_write.rs         |   5 +
 core/src/services/azdfs/backend.rs               |   4 +-
 core/src/services/azdfs/writer.rs                |  25 ++---
 core/src/services/cos/writer.rs                  |   7 +-
 core/src/services/dropbox/backend.rs             |   8 +-
 core/src/services/fs/writer.rs                   |   5 +
 core/src/services/ftp/writer.rs                  |   5 +
 core/src/services/gdrive/backend.rs              |   8 +-
 core/src/services/ghac/writer.rs                 |   5 +
 core/src/services/ipmfs/writer.rs                |  10 +-
 core/src/services/obs/writer.rs                  |   7 +-
 core/src/services/onedrive/backend.rs            |   4 +-
 core/src/services/oss/writer.rs                  |   7 +-
 core/src/services/s3/writer.rs                   |   7 +-
 core/src/services/supabase/backend.rs            |   4 +-
 core/src/services/vercel_artifacts/backend.rs    |   8 +-
 core/src/services/wasabi/backend.rs              |   4 +-
 core/src/services/webdav/backend.rs              |   7 +-
 core/src/services/webhdfs/backend.rs             |   4 +-
 34 files changed, 334 insertions(+), 190 deletions(-)

diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index 3e9e2bba9..4b856e8a1 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -19,8 +19,8 @@ use std::fmt::Debug;
 use std::fmt::Formatter;
 use std::io;
 use std::sync::Arc;
-use std::task::Context;
 use std::task::Poll;
+use std::task::{ready, Context};
 
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -715,22 +715,22 @@ where
         let w = self.inner.as_mut().ok_or_else(|| {
             Error::new(ErrorKind::Unexpected, "writer has been closed or 
aborted")
         })?;
-        let n = w.write(bs).await?;
+        let n = ready!(w.poll_write(cx, bs))?;
         self.written += n as u64;
 
         if let Some(size) = self.size {
             if self.written > size {
-                return Err(Error::new(
+                return Poll::Ready(Err(Error::new(
                     ErrorKind::ContentTruncated,
                     &format!(
                         "writer got too much data, expect: {size}, actual: {}",
                         self.written + n as u64
                     ),
-                ));
+                )));
             }
         }
 
-        Ok(n)
+        Poll::Ready(Ok(n))
     }
 
     fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
@@ -738,22 +738,22 @@ where
             Error::new(ErrorKind::Unexpected, "writer has been closed or 
aborted")
         })?;
 
-        w.abort().await?;
+        ready!(w.poll_abort(cx))?;
         self.inner = None;
 
-        Ok(())
+        Poll::Ready(Ok(()))
     }
 
     fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
         if let Some(size) = self.size {
             if self.written < size {
-                return Err(Error::new(
+                return Poll::Ready(Err(Error::new(
                     ErrorKind::ContentIncomplete,
                     &format!(
                         "writer got too less data, expect: {size}, actual: {}",
                         self.written
                     ),
-                ));
+                )));
             }
         }
 
@@ -761,10 +761,10 @@ where
             Error::new(ErrorKind::Unexpected, "writer has been closed or 
aborted")
         })?;
 
-        w.close().await?;
+        ready!(w.poll_close(cx))?;
         self.inner = None;
 
-        Ok(())
+        Poll::Ready(Ok(()))
     }
 }
 
diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs
index 7ce05fc18..1fd5e608c 100644
--- a/core/src/layers/error_context.rs
+++ b/core/src/layers/error_context.rs
@@ -404,7 +404,7 @@ impl<T: oio::BlockingRead> oio::BlockingRead for 
ErrorContextWrapper<T> {
 #[async_trait::async_trait]
 impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
     fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Poll<Result<usize>> {
-        self.inner.write(bs).await.map_err(|err| {
+        self.inner.poll_write(cx, bs).map_err(|err| {
             err.with_operation(WriteOperation::Write)
                 .with_context("service", self.scheme)
                 .with_context("path", &self.path)
@@ -412,7 +412,7 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
     }
 
     fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        self.inner.abort().await.map_err(|err| {
+        self.inner.poll_abort(cx).map_err(|err| {
             err.with_operation(WriteOperation::Abort)
                 .with_context("service", self.scheme)
                 .with_context("path", &self.path)
@@ -420,7 +420,7 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
     }
 
     fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        self.inner.close().await.map_err(|err| {
+        self.inner.poll_close(cx).map_err(|err| {
             err.with_operation(WriteOperation::Close)
                 .with_context("service", self.scheme)
                 .with_context("path", &self.path)
diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs
index 7da2da55b..42084eede 100644
--- a/core/src/layers/logging.rs
+++ b/core/src/layers/logging.rs
@@ -17,8 +17,8 @@
 
 use std::fmt::Debug;
 use std::io;
-use std::task::Context;
 use std::task::Poll;
+use std::task::{ready, Context};
 
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -1253,7 +1253,7 @@ impl<W> LoggingWriter<W> {
 #[async_trait]
 impl<W: oio::Write> oio::Write for LoggingWriter<W> {
     fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Poll<Result<usize>> {
-        match self.inner.write(bs).await {
+        match ready!(self.inner.poll_write(cx, bs)) {
             Ok(n) => {
                 self.written += n as u64;
                 trace!(
@@ -1265,7 +1265,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
                     self.written,
                     n
                 );
-                Ok(n)
+                Poll::Ready(Ok(n))
             }
             Err(err) => {
                 if let Some(lvl) = self.ctx.error_level(&err) {
@@ -1280,13 +1280,13 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
                         self.ctx.error_print(&err),
                     )
                 }
-                Err(err)
+                Poll::Ready(Err(err))
             }
         }
     }
 
     fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        match self.inner.abort().await {
+        match ready!(self.inner.poll_abort(cx)) {
             Ok(_) => {
                 trace!(
                     target: LOGGING_TARGET,
@@ -1296,7 +1296,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
                     self.path,
                     self.written,
                 );
-                Ok(())
+                Poll::Ready(Ok(()))
             }
             Err(err) => {
                 if let Some(lvl) = self.ctx.error_level(&err) {
@@ -1311,13 +1311,13 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
                         self.ctx.error_print(&err),
                     )
                 }
-                Err(err)
+                Poll::Ready(Err(err))
             }
         }
     }
 
     fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        match self.inner.close().await {
+        match ready!(self.inner.poll_close(cx)) {
             Ok(_) => {
                 debug!(
                     target: LOGGING_TARGET,
@@ -1327,7 +1327,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
                     self.path,
                     self.written
                 );
-                Ok(())
+                Poll::Ready(Ok(()))
             }
             Err(err) => {
                 if let Some(lvl) = self.ctx.error_level(&err) {
@@ -1342,7 +1342,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
                         self.ctx.error_print(&err),
                     )
                 }
-                Err(err)
+                Poll::Ready(Err(err))
             }
         }
     }
diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs
index f58eb1997..3c107fd83 100644
--- a/core/src/layers/madsim.rs
+++ b/core/src/layers/madsim.rs
@@ -302,7 +302,11 @@ pub struct MadsimWriter {
 
 #[async_trait]
 impl oio::Write for MadsimWriter {
-    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
crate::Result<usize> {
+    fn poll_write(
+        &mut self,
+        cx: &mut Context<'_>,
+        bs: &dyn oio::WriteBuf,
+    ) -> Poll<crate::Result<usize>> {
         #[cfg(madsim)]
         {
             let req = Request::Write(self.path.to_string(), bs);
@@ -318,15 +322,15 @@ impl oio::Write for MadsimWriter {
         }
     }
 
-    fn poll_abort(&mut self, cx: &mut Context<'_>) -> crate::Result<()> {
-        Err(Error::new(
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
+        Poll::Ready(Err(Error::new(
             ErrorKind::Unsupported,
             "will be supported in the future",
-        ))
+        )))
     }
 
-    fn poll_close(&mut self, cx: &mut Context<'_>) -> crate::Result<()> {
-        Ok(())
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
+        Poll::Ready(Ok(()))
     }
 }
 
diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs
index 3cebf46b1..bcbb72131 100644
--- a/core/src/layers/metrics.rs
+++ b/core/src/layers/metrics.rs
@@ -849,9 +849,8 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
MetricWrapper<R> {
 impl<R: oio::Write> oio::Write for MetricWrapper<R> {
     fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Poll<Result<usize>> {
         self.inner
-            .write(bs)
-            .await
-            .map(|n| {
+            .poll_write(cx, bs)
+            .map_ok(|n| {
                 self.bytes += n as u64;
                 n
             })
@@ -862,14 +861,14 @@ impl<R: oio::Write> oio::Write for MetricWrapper<R> {
     }
 
     fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        self.inner.abort().await.map_err(|err| {
+        self.inner.poll_abort(cx).map_err(|err| {
             self.handle.increment_errors_total(self.op, err.kind());
             err
         })
     }
 
     fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        self.inner.close().await.map_err(|err| {
+        self.inner.poll_close(cx).map_err(|err| {
             self.handle.increment_errors_total(self.op, err.kind());
             err
         })
diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs
index 7c6fad770..00e1239d2 100644
--- a/core/src/layers/minitrace.rs
+++ b/core/src/layers/minitrace.rs
@@ -338,33 +338,21 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
MinitraceWrapper<R> {
 #[async_trait]
 impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
     fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Poll<Result<usize>> {
-        self.inner
-            .write(bs)
-            .in_span(Span::enter_with_parent(
-                WriteOperation::Write.into_static(),
-                &self.span,
-            ))
-            .await
+        let _g = self.span.set_local_parent();
+        let _span = 
LocalSpan::enter_with_local_parent(WriteOperation::Write.into_static());
+        self.inner.poll_write(cx, bs)
     }
 
     fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        self.inner
-            .abort()
-            .in_span(Span::enter_with_parent(
-                WriteOperation::Abort.into_static(),
-                &self.span,
-            ))
-            .await
+        let _g = self.span.set_local_parent();
+        let _span = 
LocalSpan::enter_with_local_parent(WriteOperation::Abort.into_static());
+        self.inner.poll_abort(cx)
     }
 
     fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        self.inner
-            .close()
-            .in_span(Span::enter_with_parent(
-                WriteOperation::Close.into_static(),
-                &self.span,
-            ))
-            .await
+        let _g = self.span.set_local_parent();
+        let _span = 
LocalSpan::enter_with_local_parent(WriteOperation::Close.into_static());
+        self.inner.poll_close(cx)
     }
 }
 
diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs
index b3f704e6d..2558bb50b 100644
--- a/core/src/layers/prometheus.rs
+++ b/core/src/layers/prometheus.rs
@@ -664,9 +664,8 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
PrometheusMetricWrapper<R> {
 impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
     fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Poll<Result<usize>> {
         self.inner
-            .write(bs)
-            .await
-            .map(|n| {
+            .poll_write(cx, bs)
+            .map_ok(|n| {
                 self.stats
                     .bytes_total
                     .with_label_values(&[&self.scheme, 
Operation::Write.into_static()])
@@ -680,14 +679,14 @@ impl<R: oio::Write> oio::Write for 
PrometheusMetricWrapper<R> {
     }
 
     fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        self.inner.abort().await.map_err(|err| {
+        self.inner.poll_abort(cx).map_err(|err| {
             self.stats.increment_errors_total(self.op, err.kind());
             err
         })
     }
 
     fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        self.inner.close().await.map_err(|err| {
+        self.inner.poll_close(cx).map_err(|err| {
             self.stats.increment_errors_total(self.op, err.kind());
             err
         })
diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs
index 6a5dba57c..7b2549dc0 100644
--- a/core/src/layers/retry.rs
+++ b/core/src/layers/retry.rs
@@ -873,79 +873,139 @@ impl<R: oio::BlockingRead, I: RetryInterceptor> 
oio::BlockingRead for RetryWrapp
 #[async_trait]
 impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
     fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Poll<Result<usize>> {
-        let mut backoff = self.builder.build();
+        if let Some(sleep) = self.sleep.as_mut() {
+            ready!(sleep.poll_unpin(cx));
+            self.sleep = None;
+        }
 
-        loop {
-            match self.inner.write(bs).await {
-                Ok(v) => return Ok(v),
-                Err(e) if !e.is_temporary() => return Err(e),
-                Err(e) => match backoff.next() {
-                    None => return Err(e),
+        match ready!(self.inner.poll_write(cx, bs)) {
+            Ok(v) => {
+                self.current_backoff = None;
+                Poll::Ready(Ok(v))
+            }
+            Err(err) if !err.is_temporary() => {
+                self.current_backoff = None;
+                Poll::Ready(Err(err))
+            }
+            Err(err) => {
+                let backoff = match self.current_backoff.as_mut() {
+                    Some(backoff) => backoff,
+                    None => {
+                        self.current_backoff = Some(self.builder.build());
+                        self.current_backoff.as_mut().unwrap()
+                    }
+                };
+
+                match backoff.next() {
+                    None => {
+                        self.current_backoff = None;
+                        Poll::Ready(Err(err))
+                    }
                     Some(dur) => {
                         self.notify.intercept(
-                            &e,
+                            &err,
                             dur,
                             &[
                                 ("operation", 
WriteOperation::Write.into_static()),
                                 ("path", &self.path),
                             ],
                         );
-                        tokio::time::sleep(dur).await;
-                        continue;
+                        self.sleep = Some(Box::pin(tokio::time::sleep(dur)));
+                        self.poll_write(cx, bs)
                     }
-                },
+                }
             }
         }
     }
 
     fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        let mut backoff = self.builder.build();
+        if let Some(sleep) = self.sleep.as_mut() {
+            ready!(sleep.poll_unpin(cx));
+            self.sleep = None;
+        }
 
-        loop {
-            match self.inner.abort().await {
-                Ok(v) => return Ok(v),
-                Err(e) if !e.is_temporary() => return Err(e),
-                Err(e) => match backoff.next() {
-                    None => return Err(e),
+        match ready!(self.inner.poll_abort(cx)) {
+            Ok(v) => {
+                self.current_backoff = None;
+                Poll::Ready(Ok(v))
+            }
+            Err(err) if !err.is_temporary() => {
+                self.current_backoff = None;
+                Poll::Ready(Err(err))
+            }
+            Err(err) => {
+                let backoff = match self.current_backoff.as_mut() {
+                    Some(backoff) => backoff,
+                    None => {
+                        self.current_backoff = Some(self.builder.build());
+                        self.current_backoff.as_mut().unwrap()
+                    }
+                };
+
+                match backoff.next() {
+                    None => {
+                        self.current_backoff = None;
+                        Poll::Ready(Err(err))
+                    }
                     Some(dur) => {
                         self.notify.intercept(
-                            &e,
+                            &err,
                             dur,
                             &[
                                 ("operation", 
WriteOperation::Abort.into_static()),
                                 ("path", &self.path),
                             ],
                         );
-                        tokio::time::sleep(dur).await;
-                        continue;
+                        self.sleep = Some(Box::pin(tokio::time::sleep(dur)));
+                        self.poll_abort(cx)
                     }
-                },
+                }
             }
         }
     }
 
     fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        let mut backoff = self.builder.build();
+        if let Some(sleep) = self.sleep.as_mut() {
+            ready!(sleep.poll_unpin(cx));
+            self.sleep = None;
+        }
 
-        loop {
-            match self.inner.close().await {
-                Ok(v) => return Ok(v),
-                Err(e) if !e.is_temporary() => return Err(e),
-                Err(e) => match backoff.next() {
-                    None => return Err(e),
+        match ready!(self.inner.poll_close(cx)) {
+            Ok(v) => {
+                self.current_backoff = None;
+                Poll::Ready(Ok(v))
+            }
+            Err(err) if !err.is_temporary() => {
+                self.current_backoff = None;
+                Poll::Ready(Err(err))
+            }
+            Err(err) => {
+                let backoff = match self.current_backoff.as_mut() {
+                    Some(backoff) => backoff,
+                    None => {
+                        self.current_backoff = Some(self.builder.build());
+                        self.current_backoff.as_mut().unwrap()
+                    }
+                };
+
+                match backoff.next() {
+                    None => {
+                        self.current_backoff = None;
+                        Poll::Ready(Err(err))
+                    }
                     Some(dur) => {
                         self.notify.intercept(
-                            &e,
+                            &err,
                             dur,
                             &[
-                                ("operation", 
WriteOperation::Close.into_static()),
+                                ("operation", 
WriteOperation::Abort.into_static()),
                                 ("path", &self.path),
                             ],
                         );
-                        tokio::time::sleep(dur).await;
-                        continue;
+                        self.sleep = Some(Box::pin(tokio::time::sleep(dur)));
+                        self.poll_close(cx)
                     }
-                },
+                }
             }
         }
     }
diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs
index f731ab83c..72e63abe0 100644
--- a/core/src/layers/throttle.rs
+++ b/core/src/layers/throttle.rs
@@ -221,20 +221,22 @@ impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
 
         loop {
             match self.limiter.check_n(buf_length) {
-                Ok(_) => return self.inner.write(bs).await,
+                Ok(_) => return self.inner.poll_write(cx, bs),
                 Err(negative) => match negative {
                     // the query is valid but the Decider can not accommodate 
them.
                     NegativeMultiDecision::BatchNonConforming(_, not_until) => 
{
                         let wait_time = 
not_until.wait_time_from(DefaultClock::default().now());
                         // TODO: Should lock the limiter and wait for the 
wait_time, or should let other small requests go first?
-                        tokio::time::sleep(wait_time).await;
+
+                        // FIXME: we should sleep here.
+                        // tokio::time::sleep(wait_time).await;
                     }
                     // the query was invalid as the rate limit parameters can 
"never" accommodate the number of cells queried for.
                     NegativeMultiDecision::InsufficientCapacity(_) => {
-                        return Err(Error::new(
+                        return Poll::Ready(Err(Error::new(
                             ErrorKind::RateLimited,
                             "InsufficientCapacity due to burst size being 
smaller than the request size",
-                        ))
+                        )))
                     }
                 },
             }
@@ -242,11 +244,11 @@ impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
     }
 
     fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        self.inner.abort().await
+        self.inner.poll_abort(cx)
     }
 
     fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        self.inner.close().await
+        self.inner.poll_close(cx)
     }
 }
 
diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs
index 5871818b3..d6cd4d24a 100644
--- a/core/src/layers/timeout.rs
+++ b/core/src/layers/timeout.rs
@@ -323,38 +323,93 @@ impl<R: oio::Read> oio::Read for TimeoutWrapper<R> {
 #[async_trait]
 impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
     fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Poll<Result<usize>> {
-        let timeout = self.io_timeout(bs.remaining() as u64);
+        match self.start {
+            Some(start) => {
+                if start.elapsed() > self.timeout {
+                    // Clean up the start time before return ready.
+                    self.start = None;
 
-        tokio::time::timeout(timeout, self.inner.write(bs))
-            .await
-            .map_err(|_| {
-                Error::new(ErrorKind::Unexpected, "operation timeout")
+                    return Poll::Ready(Err(Error::new(
+                        ErrorKind::Unexpected,
+                        "operation timeout",
+                    )
                     .with_operation(WriteOperation::Write)
-                    .with_context("timeout", timeout.as_secs_f64().to_string())
-                    .set_temporary()
-            })?
+                    .with_context("timeout", 
self.timeout.as_secs_f64().to_string())
+                    .set_temporary()));
+                }
+            }
+            None => {
+                self.start = Some(Instant::now());
+            }
+        }
+
+        match self.inner.poll_write(cx, bs) {
+            Poll::Pending => Poll::Pending,
+            Poll::Ready(v) => {
+                self.start = None;
+                Poll::Ready(v)
+            }
+        }
     }
 
     fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        tokio::time::timeout(self.timeout, self.inner.abort())
-            .await
-            .map_err(|_| {
-                Error::new(ErrorKind::Unexpected, "operation timeout")
+        match self.start {
+            Some(start) => {
+                if start.elapsed() > self.timeout {
+                    // Clean up the start time before return ready.
+                    self.start = None;
+
+                    return Poll::Ready(Err(Error::new(
+                        ErrorKind::Unexpected,
+                        "operation timeout",
+                    )
                     .with_operation(WriteOperation::Abort)
                     .with_context("timeout", 
self.timeout.as_secs_f64().to_string())
-                    .set_temporary()
-            })?
+                    .set_temporary()));
+                }
+            }
+            None => {
+                self.start = Some(Instant::now());
+            }
+        }
+
+        match self.inner.poll_abort(cx) {
+            Poll::Pending => Poll::Pending,
+            Poll::Ready(v) => {
+                self.start = None;
+                Poll::Ready(v)
+            }
+        }
     }
 
     fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        tokio::time::timeout(self.timeout, self.inner.close())
-            .await
-            .map_err(|_| {
-                Error::new(ErrorKind::Unexpected, "operation timeout")
+        match self.start {
+            Some(start) => {
+                if start.elapsed() > self.timeout {
+                    // Clean up the start time before return ready.
+                    self.start = None;
+
+                    return Poll::Ready(Err(Error::new(
+                        ErrorKind::Unexpected,
+                        "operation timeout",
+                    )
                     .with_operation(WriteOperation::Close)
                     .with_context("timeout", 
self.timeout.as_secs_f64().to_string())
-                    .set_temporary()
-            })?
+                    .set_temporary()));
+                }
+            }
+            None => {
+                self.start = Some(Instant::now());
+            }
+        }
+
+        match self.inner.poll_close(cx) {
+            Poll::Pending => Poll::Pending,
+            Poll::Ready(v) => {
+                self.start = None;
+                Poll::Ready(v)
+            }
+        }
     }
 }
 
diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs
index cb71388cb..9bb0b54c2 100644
--- a/core/src/layers/tracing.rs
+++ b/core/src/layers/tracing.rs
@@ -325,7 +325,7 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> {
         level = "trace",
         skip_all)]
     fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Poll<Result<usize>> {
-        self.inner.write(bs).await
+        self.inner.poll_write(cx, bs)
     }
 
     #[tracing::instrument(
@@ -333,7 +333,7 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> {
         level = "trace",
         skip_all)]
     fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        self.inner.abort().await
+        self.inner.poll_abort(cx)
     }
 
     #[tracing::instrument(
@@ -341,7 +341,7 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> {
         level = "trace",
         skip_all)]
     fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        self.inner.close().await
+        self.inner.poll_close(cx)
     }
 }
 
diff --git a/core/src/raw/adapters/kv/backend.rs 
b/core/src/raw/adapters/kv/backend.rs
index a3ead2e7a..48c327b0a 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -390,6 +390,11 @@ impl<S> KvWriter<S> {
     }
 }
 
+/// # Safety
+///
+/// We will only take `&mut Self` reference for KvWriter.
+unsafe impl<S: Adapter> Sync for KvWriter<S> {}
+
 #[async_trait]
 impl<S: Adapter> oio::Write for KvWriter<S> {
     // TODO: we need to support append in the future.
diff --git a/core/src/raw/adapters/typed_kv/backend.rs 
b/core/src/raw/adapters/typed_kv/backend.rs
index 4a3cdf37b..fda56bf28 100644
--- a/core/src/raw/adapters/typed_kv/backend.rs
+++ b/core/src/raw/adapters/typed_kv/backend.rs
@@ -368,6 +368,11 @@ pub struct KvWriter<S> {
     future: Option<BoxFuture<'static, Result<()>>>,
 }
 
+/// # Safety
+///
+/// We will only take `&mut Self` reference for KvWriter.
+unsafe impl<S: Adapter> Sync for KvWriter<S> {}
+
 impl<S> KvWriter<S> {
     fn new(kv: Arc<S>, path: String, op: OpWrite) -> Self {
         KvWriter {
diff --git a/core/src/raw/oio/write/append_object_write.rs 
b/core/src/raw/oio/write/append_object_write.rs
index 2c0bd5555..393254097 100644
--- a/core/src/raw/oio/write/append_object_write.rs
+++ b/core/src/raw/oio/write/append_object_write.rs
@@ -59,6 +59,11 @@ enum State<W> {
     Append(BoxFuture<'static, (W, Result<usize>)>),
 }
 
+/// # Safety
+///
+/// We will only take `&mut Self` reference for State.
+unsafe impl<S: AppendObjectWrite> Sync for State<S> {}
+
 impl<W: AppendObjectWrite> AppendObjectWriter<W> {
     /// Create a new AppendObjectWriter.
     pub fn new(inner: W) -> Self {
@@ -67,17 +72,6 @@ impl<W: AppendObjectWrite> AppendObjectWriter<W> {
             offset: None,
         }
     }
-
-    async fn offset(&mut self) -> Result<u64> {
-        if let Some(offset) = self.offset {
-            return Ok(offset);
-        }
-
-        let offset = self.inner.offset().await?;
-        self.offset = Some(offset);
-
-        Ok(offset)
-    }
 }
 
 #[async_trait]
@@ -103,9 +97,9 @@ where
                         }
                         None => {
                             self.state = State::Offset(Box::pin(async move {
-                                let offset = w.offset().await?;
+                                let offset = w.offset().await;
 
-                                (w, Ok(offset))
+                                (w, offset)
                             }));
                         }
                     }
diff --git a/core/src/raw/oio/write/multipart_upload_write.rs 
b/core/src/raw/oio/write/multipart_upload_write.rs
index 482954dc9..97b2f0220 100644
--- a/core/src/raw/oio/write/multipart_upload_write.rs
+++ b/core/src/raw/oio/write/multipart_upload_write.rs
@@ -96,6 +96,11 @@ enum State<W> {
     Abort(BoxFuture<'static, (W, Result<()>)>),
 }
 
+/// # Safety
+///
+/// We will only take `&mut Self` reference for State.
+unsafe impl<S: MultipartUploadWrite> Sync for State<S> {}
+
 impl<W: MultipartUploadWrite> MultipartUploadWriter<W> {
     /// Create a new MultipartUploadWriter.
     pub fn new(inner: W) -> Self {
diff --git a/core/src/raw/oio/write/one_shot_write.rs 
b/core/src/raw/oio/write/one_shot_write.rs
index a0a743915..6cc083491 100644
--- a/core/src/raw/oio/write/one_shot_write.rs
+++ b/core/src/raw/oio/write/one_shot_write.rs
@@ -47,6 +47,11 @@ enum State<W> {
     Write(BoxFuture<'static, (W, Result<usize>)>),
 }
 
+/// # Safety
+///
+/// We will only take `&mut Self` reference for State.
+unsafe impl<S: OneShotWrite> Sync for State<S> {}
+
 impl<W: OneShotWrite> OneShotWriter<W> {
     /// Create a new one shot writer.
     pub fn new(inner: W) -> Self {
diff --git a/core/src/services/azdfs/backend.rs 
b/core/src/services/azdfs/backend.rs
index 3882f4fe2..1c0dd3a8c 100644
--- a/core/src/services/azdfs/backend.rs
+++ b/core/src/services/azdfs/backend.rs
@@ -230,7 +230,7 @@ pub struct AzdfsBackend {
 impl Accessor for AzdfsBackend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = AzdfsWriter;
+    type Writer = oio::OneShotWriter<AzdfsWriter>;
     type BlockingWriter = ();
     type Pager = AzdfsPager;
     type BlockingPager = ();
@@ -305,7 +305,7 @@ impl Accessor for AzdfsBackend {
 
         Ok((
             RpWrite::default(),
-            AzdfsWriter::new(self.core.clone(), args, path.to_string()),
+            oio::OneShotWriter::new(AzdfsWriter::new(self.core.clone(), args, 
path.to_string())),
         ))
     }
 
diff --git a/core/src/services/azdfs/writer.rs 
b/core/src/services/azdfs/writer.rs
index 774c9ee4b..1da698dd4 100644
--- a/core/src/services/azdfs/writer.rs
+++ b/core/src/services/azdfs/writer.rs
@@ -19,6 +19,7 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use async_trait::async_trait;
+use bytes::Bytes;
 use http::StatusCode;
 
 use super::core::AzdfsCore;
@@ -40,8 +41,8 @@ impl AzdfsWriter {
 }
 
 #[async_trait]
-impl oio::Write for AzdfsWriter {
-    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Poll<Result<usize>> {
+impl oio::OneShotWrite for AzdfsWriter {
+    async fn write_once(&self, bs: Bytes) -> Result<()> {
         let mut req = self.core.azdfs_create_request(
             &self.path,
             "file",
@@ -66,13 +67,11 @@ impl oio::Write for AzdfsWriter {
             }
         }
 
-        let size = bs.remaining();
+        let size = bs.len();
 
-        let mut req = self.core.azdfs_update_request(
-            &self.path,
-            Some(size),
-            AsyncBody::Bytes(bs.copy_to_bytes(size)),
-        )?;
+        let mut req =
+            self.core
+                .azdfs_update_request(&self.path, Some(size), 
AsyncBody::Bytes(bs))?;
 
         self.core.sign(&mut req).await?;
 
@@ -82,19 +81,11 @@ impl oio::Write for AzdfsWriter {
         match status {
             StatusCode::OK | StatusCode::ACCEPTED => {
                 resp.into_body().consume().await?;
-                Ok(size)
+                Ok(())
             }
             _ => Err(parse_error(resp)
                 .await?
                 .with_operation("Backend::azdfs_update_request")),
         }
     }
-
-    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        Poll::Ready(Ok(()))
-    }
-
-    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        Poll::Ready(Ok(()))
-    }
 }
diff --git a/core/src/services/cos/writer.rs b/core/src/services/cos/writer.rs
index e0cc8be73..fba7cea09 100644
--- a/core/src/services/cos/writer.rs
+++ b/core/src/services/cos/writer.rs
@@ -18,6 +18,7 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
+use bytes::Bytes;
 use http::StatusCode;
 
 use super::core::*;
@@ -50,15 +51,15 @@ impl CosWriter {
 
 #[async_trait]
 impl oio::OneShotWrite for CosWriter {
-    async fn write_once(&self, buf: &dyn oio::WriteBuf) -> Result<()> {
-        let size = buf.remaining();
+    async fn write_once(&self, buf: Bytes) -> Result<()> {
+        let size = buf.len();
         let mut req = self.core.cos_put_object_request(
             &self.path,
             Some(size as u64),
             self.op.content_type(),
             self.op.content_disposition(),
             self.op.cache_control(),
-            AsyncBody::Bytes(buf.copy_to_bytes(size)),
+            AsyncBody::Bytes(buf),
         )?;
 
         self.core.sign(&mut req).await?;
diff --git a/core/src/services/dropbox/backend.rs 
b/core/src/services/dropbox/backend.rs
index ebb468728..029f1fe7f 100644
--- a/core/src/services/dropbox/backend.rs
+++ b/core/src/services/dropbox/backend.rs
@@ -49,7 +49,7 @@ pub struct DropboxBackend {
 impl Accessor for DropboxBackend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = DropboxWriter;
+    type Writer = oio::OneShotWriter<DropboxWriter>;
     type BlockingWriter = ();
     type Pager = ();
     type BlockingPager = ();
@@ -114,7 +114,11 @@ impl Accessor for DropboxBackend {
         }
         Ok((
             RpWrite::default(),
-            DropboxWriter::new(self.core.clone(), args, String::from(path)),
+            oio::OneShotWriter::new(DropboxWriter::new(
+                self.core.clone(),
+                args,
+                String::from(path),
+            )),
         ))
     }
 
diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index a71de17d5..1a0aa49eb 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -48,6 +48,11 @@ impl<F> FsWriter<F> {
     }
 }
 
+/// # Safety
+///
+/// We will only take `&mut Self` reference for FsWriter.
+unsafe impl<F> Sync for FsWriter<F> {}
+
 #[async_trait]
 impl oio::Write for FsWriter<tokio::fs::File> {
     fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Poll<Result<usize>> {
diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs
index 34ed85b37..ec3c3f0e2 100644
--- a/core/src/services/ftp/writer.rs
+++ b/core/src/services/ftp/writer.rs
@@ -46,6 +46,11 @@ impl FtpWriter {
     }
 }
 
+/// # Safety
+///
+/// We will only take `&mut Self` reference for FtpWriter.
+unsafe impl Sync for FtpWriter {}
+
 #[async_trait]
 impl oio::Write for FtpWriter {
     fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Poll<Result<usize>> {
diff --git a/core/src/services/gdrive/backend.rs 
b/core/src/services/gdrive/backend.rs
index df8ee7451..f50713ac7 100644
--- a/core/src/services/gdrive/backend.rs
+++ b/core/src/services/gdrive/backend.rs
@@ -40,7 +40,7 @@ pub struct GdriveBackend {
 impl Accessor for GdriveBackend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = GdriveWriter;
+    type Writer = oio::OneShotWriter<GdriveWriter>;
     type BlockingWriter = ();
     type Pager = ();
     type BlockingPager = ();
@@ -194,7 +194,11 @@ impl Accessor for GdriveBackend {
 
         Ok((
             RpWrite::default(),
-            GdriveWriter::new(self.core.clone(), String::from(path), file_id),
+            oio::OneShotWriter::new(GdriveWriter::new(
+                self.core.clone(),
+                String::from(path),
+                file_id,
+            )),
         ))
     }
 
diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs
index 0be63ce22..c780c2710 100644
--- a/core/src/services/ghac/writer.rs
+++ b/core/src/services/ghac/writer.rs
@@ -47,6 +47,11 @@ enum State {
     Commit(BoxFuture<'static, (GhacBackend, Result<()>)>),
 }
 
+/// # Safety
+///
+/// We will only take `&mut Self` reference for State.
+unsafe impl Sync for State {}
+
 #[async_trait]
 impl oio::Write for GhacWriter {
     fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Poll<Result<usize>> {
diff --git a/core/src/services/ipmfs/writer.rs 
b/core/src/services/ipmfs/writer.rs
index 0d1673693..feb5e5630 100644
--- a/core/src/services/ipmfs/writer.rs
+++ b/core/src/services/ipmfs/writer.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use async_trait::async_trait;
+use bytes::Bytes;
 use http::StatusCode;
 use std::task::{Context, Poll};
 
@@ -39,12 +40,9 @@ impl IpmfsWriter {
 
 #[async_trait]
 impl oio::OneShotWrite for IpmfsWriter {
-    async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> {
-        let size = bs.remaining();
-        let resp = self
-            .backend
-            .ipmfs_write(&self.path, bs.copy_to_bytes(size))
-            .await?;
+    async fn write_once(&self, bs: Bytes) -> Result<()> {
+        let size = bs.len();
+        let resp = self.backend.ipmfs_write(&self.path, bs).await?;
 
         let status = resp.status();
 
diff --git a/core/src/services/obs/writer.rs b/core/src/services/obs/writer.rs
index 38dc660db..6b0aed175 100644
--- a/core/src/services/obs/writer.rs
+++ b/core/src/services/obs/writer.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use bytes::Bytes;
 use std::sync::Arc;
 
 use async_trait::async_trait;
@@ -51,14 +52,14 @@ impl ObsWriter {
 
 #[async_trait]
 impl oio::OneShotWrite for ObsWriter {
-    async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> {
-        let size = bs.remaining();
+    async fn write_once(&self, bs: Bytes) -> Result<()> {
+        let size = bs.len();
         let mut req = self.core.obs_put_object_request(
             &self.path,
             Some(size as u64),
             self.op.content_type(),
             self.op.cache_control(),
-            AsyncBody::Bytes(bs.copy_to_bytes(size)),
+            AsyncBody::Bytes(bs),
         )?;
 
         self.core.sign(&mut req).await?;
diff --git a/core/src/services/onedrive/backend.rs 
b/core/src/services/onedrive/backend.rs
index 09ef04765..2b32abbaf 100644
--- a/core/src/services/onedrive/backend.rs
+++ b/core/src/services/onedrive/backend.rs
@@ -64,7 +64,7 @@ impl Debug for OnedriveBackend {
 impl Accessor for OnedriveBackend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = OneDriveWriter;
+    type Writer = oio::OneShotWriter<OneDriveWriter>;
     type BlockingWriter = ();
     type Pager = OnedrivePager;
     type BlockingPager = ();
@@ -114,7 +114,7 @@ impl Accessor for OnedriveBackend {
 
         Ok((
             RpWrite::default(),
-            OneDriveWriter::new(self.clone(), args, path),
+            oio::OneShotWriter::new(OneDriveWriter::new(self.clone(), args, 
path)),
         ))
     }
 
diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs
index 8fd7ac656..56d262f17 100644
--- a/core/src/services/oss/writer.rs
+++ b/core/src/services/oss/writer.rs
@@ -18,6 +18,7 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
+use bytes::Bytes;
 use http::StatusCode;
 
 use super::core::*;
@@ -50,15 +51,15 @@ impl OssWriter {
 
 #[async_trait]
 impl oio::OneShotWrite for OssWriter {
-    async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> {
-        let size = bs.remaining();
+    async fn write_once(&self, bs: Bytes) -> Result<()> {
+        let size = bs.len();
         let mut req = self.core.oss_put_object_request(
             &self.path,
             Some(size as u64),
             self.op.content_type(),
             self.op.content_disposition(),
             self.op.cache_control(),
-            AsyncBody::Bytes(bs.copy_to_bytes(size)),
+            AsyncBody::Bytes(bs),
             false,
         )?;
 
diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs
index 7c5ddf924..a3a1bd5bd 100644
--- a/core/src/services/s3/writer.rs
+++ b/core/src/services/s3/writer.rs
@@ -18,6 +18,7 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
+use bytes::Bytes;
 use http::StatusCode;
 
 use super::core::*;
@@ -47,8 +48,8 @@ impl S3Writer {
 
 #[async_trait]
 impl oio::OneShotWrite for S3Writer {
-    async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> {
-        let size = bs.remaining();
+    async fn write_once(&self, bs: Bytes) -> Result<()> {
+        let size = bs.len();
 
         let mut req = self.core.s3_put_object_request(
             &self.path,
@@ -56,7 +57,7 @@ impl oio::OneShotWrite for S3Writer {
             self.op.content_type(),
             self.op.content_disposition(),
             self.op.cache_control(),
-            AsyncBody::Bytes(bs.copy_to_bytes(size)),
+            AsyncBody::Bytes(bs),
         )?;
 
         self.core.sign(&mut req).await?;
diff --git a/core/src/services/supabase/backend.rs 
b/core/src/services/supabase/backend.rs
index 89996fa8c..402c3abe7 100644
--- a/core/src/services/supabase/backend.rs
+++ b/core/src/services/supabase/backend.rs
@@ -158,7 +158,7 @@ pub struct SupabaseBackend {
 impl Accessor for SupabaseBackend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = SupabaseWriter;
+    type Writer = oio::OneShotWriter<SupabaseWriter>;
     type BlockingWriter = ();
     // todo: implement Pager to support list and scan
     type Pager = ();
@@ -233,7 +233,7 @@ impl Accessor for SupabaseBackend {
 
         Ok((
             RpWrite::default(),
-            SupabaseWriter::new(self.core.clone(), path, args),
+            oio::OneShotWriter::new(SupabaseWriter::new(self.core.clone(), 
path, args)),
         ))
     }
 
diff --git a/core/src/services/vercel_artifacts/backend.rs 
b/core/src/services/vercel_artifacts/backend.rs
index 70cd9048b..4b88be664 100644
--- a/core/src/services/vercel_artifacts/backend.rs
+++ b/core/src/services/vercel_artifacts/backend.rs
@@ -46,7 +46,7 @@ impl Debug for VercelArtifactsBackend {
 impl Accessor for VercelArtifactsBackend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = VercelArtifactsWriter;
+    type Writer = oio::OneShotWriter<VercelArtifactsWriter>;
     type BlockingWriter = ();
     type Pager = ();
     type BlockingPager = ();
@@ -93,7 +93,11 @@ impl Accessor for VercelArtifactsBackend {
 
         Ok((
             RpWrite::default(),
-            VercelArtifactsWriter::new(self.clone(), args, path.to_string()),
+            oio::OneShotWriter::new(VercelArtifactsWriter::new(
+                self.clone(),
+                args,
+                path.to_string(),
+            )),
         ))
     }
 
diff --git a/core/src/services/wasabi/backend.rs 
b/core/src/services/wasabi/backend.rs
index f2aedab2a..835492bad 100644
--- a/core/src/services/wasabi/backend.rs
+++ b/core/src/services/wasabi/backend.rs
@@ -670,7 +670,7 @@ pub struct WasabiBackend {
 impl Accessor for WasabiBackend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = WasabiWriter;
+    type Writer = oio::OneShotWriter<WasabiWriter>;
     type BlockingWriter = ();
     type Pager = WasabiPager;
     type BlockingPager = ();
@@ -759,7 +759,7 @@ impl Accessor for WasabiBackend {
 
         Ok((
             RpWrite::default(),
-            WasabiWriter::new(self.core.clone(), args, path.to_string()),
+            oio::OneShotWriter::new(WasabiWriter::new(self.core.clone(), args, 
path.to_string())),
         ))
     }
 
diff --git a/core/src/services/webdav/backend.rs 
b/core/src/services/webdav/backend.rs
index 509d4c612..cca8728ba 100644
--- a/core/src/services/webdav/backend.rs
+++ b/core/src/services/webdav/backend.rs
@@ -221,7 +221,7 @@ impl Debug for WebdavBackend {
 impl Accessor for WebdavBackend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = WebdavWriter;
+    type Writer = oio::OneShotWriter<WebdavWriter>;
     type BlockingWriter = ();
     type Pager = Option<WebdavPager>;
     type BlockingPager = ();
@@ -286,7 +286,10 @@ impl Accessor for WebdavBackend {
 
         let p = build_abs_path(&self.root, path);
 
-        Ok((RpWrite::default(), WebdavWriter::new(self.clone(), args, p)))
+        Ok((
+            RpWrite::default(),
+            oio::OneShotWriter::new(WebdavWriter::new(self.clone(), args, p)),
+        ))
     }
 
     /// # Notes
diff --git a/core/src/services/webhdfs/backend.rs 
b/core/src/services/webhdfs/backend.rs
index db8e0490a..e8a3a8f04 100644
--- a/core/src/services/webhdfs/backend.rs
+++ b/core/src/services/webhdfs/backend.rs
@@ -399,7 +399,7 @@ impl WebhdfsBackend {
 impl Accessor for WebhdfsBackend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = WebhdfsWriter;
+    type Writer = oio::OneShotWriter<WebhdfsWriter>;
     type BlockingWriter = ();
     type Pager = WebhdfsPager;
     type BlockingPager = ();
@@ -483,7 +483,7 @@ impl Accessor for WebhdfsBackend {
 
         Ok((
             RpWrite::default(),
-            WebhdfsWriter::new(self.clone(), args, path.to_string()),
+            oio::OneShotWriter::new(WebhdfsWriter::new(self.clone(), args, 
path.to_string())),
         ))
     }
 

Reply via email to