This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch poll-write
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit e1d7ce09c2f56a9898d4916c2fae45e10a81b200
Author: Xuanwo <[email protected]>
AuthorDate: Fri Sep 8 10:57:10 2023 +0800

    Save work
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/benches/oio/utils.rs                        |  10 +-
 core/src/layers/complete.rs                      |   6 +-
 core/src/layers/concurrent_limit.rs              |   6 +-
 core/src/layers/error_context.rs                 |   6 +-
 core/src/layers/logging.rs                       |   6 +-
 core/src/layers/madsim.rs                        |   6 +-
 core/src/layers/metrics.rs                       |   6 +-
 core/src/layers/minitrace.rs                     |   6 +-
 core/src/layers/oteltrace.rs                     |   6 +-
 core/src/layers/prometheus.rs                    |   6 +-
 core/src/layers/retry.rs                         |   6 +-
 core/src/layers/throttle.rs                      |   6 +-
 core/src/layers/timeout.rs                       |   6 +-
 core/src/layers/tracing.rs                       |   6 +-
 core/src/raw/adapters/kv/backend.rs              |  59 +++++++++---
 core/src/raw/adapters/typed_kv/backend.rs        |  54 +++++++++--
 core/src/raw/oio/write/api.rs                    | 112 +++++++++++++++++++----
 core/src/raw/oio/write/append_object_write.rs    |   6 +-
 core/src/raw/oio/write/compose_write.rs          |  12 +--
 core/src/raw/oio/write/exact_buf_write.rs        |  30 +++---
 core/src/raw/oio/write/mod.rs                    |   1 +
 core/src/raw/oio/write/multipart_upload_write.rs |   6 +-
 core/src/raw/oio/write/one_shot_write.rs         |   6 +-
 core/src/services/azblob/writer.rs               |   6 +-
 core/src/services/azdfs/writer.rs                |   6 +-
 core/src/services/dropbox/writer.rs              |   6 +-
 core/src/services/fs/writer.rs                   |   6 +-
 core/src/services/ftp/writer.rs                  |   6 +-
 core/src/services/gcs/writer.rs                  |   6 +-
 core/src/services/gdrive/writer.rs               |   6 +-
 core/src/services/ghac/writer.rs                 |   6 +-
 core/src/services/hdfs/writer.rs                 |   6 +-
 core/src/services/ipmfs/writer.rs                |   6 +-
 core/src/services/onedrive/writer.rs             |   6 +-
 core/src/services/sftp/writer.rs                 |   6 +-
 core/src/services/supabase/writer.rs             |   6 +-
 core/src/services/vercel_artifacts/writer.rs     |   6 +-
 core/src/services/wasabi/writer.rs               |   6 +-
 core/src/services/webdav/writer.rs               |   6 +-
 core/src/services/webhdfs/writer.rs              |   6 +-
 core/src/types/writer.rs                         |   4 +-
 41 files changed, 316 insertions(+), 164 deletions(-)

diff --git a/core/benches/oio/utils.rs b/core/benches/oio/utils.rs
index 1671057f0..67b4fd451 100644
--- a/core/benches/oio/utils.rs
+++ b/core/benches/oio/utils.rs
@@ -26,15 +26,19 @@ pub struct BlackHoleWriter;
 
 #[async_trait]
 impl oio::Write for BlackHoleWriter {
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> 
opendal::Result<usize> {
+    fn poll_write(
+        &mut self,
+        cx: &mut Context<'_>,
+        bs: &dyn oio::WriteBuf,
+    ) -> opendal::Result<usize> {
         Ok(bs.remaining())
     }
 
-    async fn abort(&mut self) -> opendal::Result<()> {
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> opendal::Result<()> {
         Ok(())
     }
 
-    async fn close(&mut self) -> opendal::Result<()> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> opendal::Result<()> {
         Ok(())
     }
 }
diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index 048905bae..19429cff8 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -711,7 +711,7 @@ impl<W> oio::Write for CompleteWriter<W>
 where
     W: oio::Write,
 {
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Result<usize> {
         let w = self.inner.as_mut().ok_or_else(|| {
             Error::new(ErrorKind::Unexpected, "writer has been closed or 
aborted")
         })?;
@@ -733,7 +733,7 @@ where
         Ok(n)
     }
 
-    async fn abort(&mut self) -> Result<()> {
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> {
         let w = self.inner.as_mut().ok_or_else(|| {
             Error::new(ErrorKind::Unexpected, "writer has been closed or 
aborted")
         })?;
@@ -744,7 +744,7 @@ where
         Ok(())
     }
 
-    async fn close(&mut self) -> Result<()> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> {
         if let Some(size) = self.size {
             if self.written < size {
                 return Err(Error::new(
diff --git a/core/src/layers/concurrent_limit.rs 
b/core/src/layers/concurrent_limit.rs
index bdc473384..f726d2207 100644
--- a/core/src/layers/concurrent_limit.rs
+++ b/core/src/layers/concurrent_limit.rs
@@ -285,15 +285,15 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
ConcurrentLimitWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Result<usize> {
         self.inner.write(bs).await
     }
 
-    async fn abort(&mut self) -> Result<()> {
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> {
         self.inner.abort().await
     }
 
-    async fn close(&mut self) -> Result<()> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> {
         self.inner.close().await
     }
 }
diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs
index 15d7a0dd0..c6b3bd2be 100644
--- a/core/src/layers/error_context.rs
+++ b/core/src/layers/error_context.rs
@@ -403,7 +403,7 @@ impl<T: oio::BlockingRead> oio::BlockingRead for 
ErrorContextWrapper<T> {
 
 #[async_trait::async_trait]
 impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Result<usize> {
         self.inner.write(bs).await.map_err(|err| {
             err.with_operation(WriteOperation::Write)
                 .with_context("service", self.scheme)
@@ -411,7 +411,7 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
         })
     }
 
-    async fn abort(&mut self) -> Result<()> {
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> {
         self.inner.abort().await.map_err(|err| {
             err.with_operation(WriteOperation::Abort)
                 .with_context("service", self.scheme)
@@ -419,7 +419,7 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
         })
     }
 
-    async fn close(&mut self) -> Result<()> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> {
         self.inner.close().await.map_err(|err| {
             err.with_operation(WriteOperation::Close)
                 .with_context("service", self.scheme)
diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs
index f0cea32e8..f55bbd627 100644
--- a/core/src/layers/logging.rs
+++ b/core/src/layers/logging.rs
@@ -1252,7 +1252,7 @@ impl<W> LoggingWriter<W> {
 
 #[async_trait]
 impl<W: oio::Write> oio::Write for LoggingWriter<W> {
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Result<usize> {
         match self.inner.write(bs).await {
             Ok(n) => {
                 self.written += n as u64;
@@ -1285,7 +1285,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
         }
     }
 
-    async fn abort(&mut self) -> Result<()> {
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> {
         match self.inner.abort().await {
             Ok(_) => {
                 trace!(
@@ -1316,7 +1316,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
         }
     }
 
-    async fn close(&mut self) -> Result<()> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> {
         match self.inner.close().await {
             Ok(_) => {
                 debug!(
diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs
index b0b5cf179..f58eb1997 100644
--- a/core/src/layers/madsim.rs
+++ b/core/src/layers/madsim.rs
@@ -302,7 +302,7 @@ pub struct MadsimWriter {
 
 #[async_trait]
 impl oio::Write for MadsimWriter {
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> crate::Result<usize> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
crate::Result<usize> {
         #[cfg(madsim)]
         {
             let req = Request::Write(self.path.to_string(), bs);
@@ -318,14 +318,14 @@ impl oio::Write for MadsimWriter {
         }
     }
 
-    async fn abort(&mut self) -> crate::Result<()> {
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> crate::Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "will be supported in the future",
         ))
     }
 
-    async fn close(&mut self) -> crate::Result<()> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> crate::Result<()> {
         Ok(())
     }
 }
diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs
index b16f87a40..7a7b73fc4 100644
--- a/core/src/layers/metrics.rs
+++ b/core/src/layers/metrics.rs
@@ -847,7 +847,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
MetricWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for MetricWrapper<R> {
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Result<usize> {
         self.inner
             .write(bs)
             .await
@@ -861,14 +861,14 @@ impl<R: oio::Write> oio::Write for MetricWrapper<R> {
             })
     }
 
-    async fn abort(&mut self) -> Result<()> {
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> {
         self.inner.abort().await.map_err(|err| {
             self.handle.increment_errors_total(self.op, err.kind());
             err
         })
     }
 
-    async fn close(&mut self) -> Result<()> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> {
         self.inner.close().await.map_err(|err| {
             self.handle.increment_errors_total(self.op, err.kind());
             err
diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs
index f54f40593..8d8662846 100644
--- a/core/src/layers/minitrace.rs
+++ b/core/src/layers/minitrace.rs
@@ -337,7 +337,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
MinitraceWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Result<usize> {
         self.inner
             .write(bs)
             .in_span(Span::enter_with_parent(
@@ -347,7 +347,7 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
             .await
     }
 
-    async fn abort(&mut self) -> Result<()> {
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> {
         self.inner
             .abort()
             .in_span(Span::enter_with_parent(
@@ -357,7 +357,7 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
             .await
     }
 
-    async fn close(&mut self) -> Result<()> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> {
         self.inner
             .close()
             .in_span(Span::enter_with_parent(
diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs
index cceca3a69..f0a8ae82b 100644
--- a/core/src/layers/oteltrace.rs
+++ b/core/src/layers/oteltrace.rs
@@ -313,15 +313,15 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
OtelTraceWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Result<usize> {
         self.inner.write(bs).await
     }
 
-    async fn abort(&mut self) -> Result<()> {
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> {
         self.inner.abort().await
     }
 
-    async fn close(&mut self) -> Result<()> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> {
         self.inner.close().await
     }
 }
diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs
index 2e294c9e5..98637752b 100644
--- a/core/src/layers/prometheus.rs
+++ b/core/src/layers/prometheus.rs
@@ -662,7 +662,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
PrometheusMetricWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Result<usize> {
         self.inner
             .write(bs)
             .await
@@ -679,14 +679,14 @@ impl<R: oio::Write> oio::Write for 
PrometheusMetricWrapper<R> {
             })
     }
 
-    async fn abort(&mut self) -> Result<()> {
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> {
         self.inner.abort().await.map_err(|err| {
             self.stats.increment_errors_total(self.op, err.kind());
             err
         })
     }
 
-    async fn close(&mut self) -> Result<()> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> {
         self.inner.close().await.map_err(|err| {
             self.stats.increment_errors_total(self.op, err.kind());
             err
diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs
index 3746008ab..3a1c5521f 100644
--- a/core/src/layers/retry.rs
+++ b/core/src/layers/retry.rs
@@ -872,7 +872,7 @@ impl<R: oio::BlockingRead, I: RetryInterceptor> 
oio::BlockingRead for RetryWrapp
 
 #[async_trait]
 impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Result<usize> {
         let mut backoff = self.builder.build();
 
         loop {
@@ -898,7 +898,7 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for 
RetryWrapper<R, I> {
         }
     }
 
-    async fn abort(&mut self) -> Result<()> {
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> {
         let mut backoff = self.builder.build();
 
         loop {
@@ -924,7 +924,7 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for 
RetryWrapper<R, I> {
         }
     }
 
-    async fn close(&mut self) -> Result<()> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> {
         let mut backoff = self.builder.build();
 
         loop {
diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs
index aea7b6381..7787c77d8 100644
--- a/core/src/layers/throttle.rs
+++ b/core/src/layers/throttle.rs
@@ -216,7 +216,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
ThrottleWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Result<usize> {
         let buf_length = NonZeroU32::new(bs.remaining() as u32).unwrap();
 
         loop {
@@ -241,11 +241,11 @@ impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
         }
     }
 
-    async fn abort(&mut self) -> Result<()> {
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> {
         self.inner.abort().await
     }
 
-    async fn close(&mut self) -> Result<()> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> {
         self.inner.close().await
     }
 }
diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs
index c02a4d58a..751caa915 100644
--- a/core/src/layers/timeout.rs
+++ b/core/src/layers/timeout.rs
@@ -322,7 +322,7 @@ impl<R: oio::Read> oio::Read for TimeoutWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Result<usize> {
         let timeout = self.io_timeout(bs.remaining() as u64);
 
         tokio::time::timeout(timeout, self.inner.write(bs))
@@ -335,7 +335,7 @@ impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
             })?
     }
 
-    async fn abort(&mut self) -> Result<()> {
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> {
         tokio::time::timeout(self.timeout, self.inner.abort())
             .await
             .map_err(|_| {
@@ -346,7 +346,7 @@ impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
             })?
     }
 
-    async fn close(&mut self) -> Result<()> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> {
         tokio::time::timeout(self.timeout, self.inner.close())
             .await
             .map_err(|_| {
diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs
index b829f8376..35f718cb5 100644
--- a/core/src/layers/tracing.rs
+++ b/core/src/layers/tracing.rs
@@ -324,7 +324,7 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> {
         parent = &self.span,
         level = "trace",
         skip_all)]
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Result<usize> {
         self.inner.write(bs).await
     }
 
@@ -332,7 +332,7 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> {
         parent = &self.span,
         level = "trace",
         skip_all)]
-    async fn abort(&mut self) -> Result<()> {
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> {
         self.inner.abort().await
     }
 
@@ -340,7 +340,7 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> {
         parent = &self.span,
         level = "trace",
         skip_all)]
-    async fn close(&mut self) -> Result<()> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> {
         self.inner.close().await
     }
 }
diff --git a/core/src/raw/adapters/kv/backend.rs 
b/core/src/raw/adapters/kv/backend.rs
index 513bda67e..a3ead2e7a 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -16,8 +16,10 @@
 // under the License.
 
 use std::sync::Arc;
+use std::task::{ready, Context, Poll};
 
 use async_trait::async_trait;
+use futures::future::BoxFuture;
 
 use super::Adapter;
 use crate::raw::*;
@@ -374,6 +376,7 @@ pub struct KvWriter<S> {
 
     /// TODO: if kv supports append, we can use them directly.
     buf: Option<Vec<u8>>,
+    future: Option<BoxFuture<'static, Result<()>>>,
 }
 
 impl<S> KvWriter<S> {
@@ -382,6 +385,7 @@ impl<S> KvWriter<S> {
             kv,
             path,
             buf: None,
+            future: None,
         }
     }
 }
@@ -389,29 +393,60 @@ impl<S> KvWriter<S> {
 #[async_trait]
 impl<S: Adapter> oio::Write for KvWriter<S> {
     // TODO: we need to support append in the future.
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+    fn poll_write(&mut self, _: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Poll<Result<usize>> {
+        if self.future.is_some() {
+            self.future = None;
+            return Poll::Ready(Err(Error::new(
+                ErrorKind::Unexpected,
+                "there is a future on going, it's maybe a bug to go into this 
case",
+            )));
+        }
+
         let size = bs.chunk().len();
 
         let mut buf = self.buf.take().unwrap_or_else(|| 
Vec::with_capacity(size));
         buf.extend_from_slice(bs.chunk());
         self.buf = Some(buf);
 
-        Ok(size)
+        Poll::Ready(Ok(size))
     }
 
-    async fn abort(&mut self) -> Result<()> {
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "output writer doesn't support abort",
-        ))
+    fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
+        if self.future.is_some() {
+            self.future = None;
+            return Poll::Ready(Err(Error::new(
+                ErrorKind::Unexpected,
+                "there is a future on going, it's maybe a bug to go into this 
case",
+            )));
+        }
+
+        self.buf = None;
+        Poll::Ready(Ok(()))
     }
 
-    async fn close(&mut self) -> Result<()> {
-        if let Some(buf) = self.buf.as_deref() {
-            self.kv.set(&self.path, buf).await?;
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+        loop {
+            match self.future.as_mut() {
+                Some(fut) => {
+                    ready!(fut.poll_unpin(cx))?;
+                    self.future = None;
+                    return Poll::Ready(Ok(()));
+                }
+                None => {
+                    let kv = self.kv.clone();
+                    let path = self.path.clone();
+                    let buf = match self.buf.take() {
+                        Some(buf) => buf,
+                        None => return Poll::Ready(Ok(())),
+                    };
+
+                    let fut = async move {
+                        kv.set(&path, &buf).await?;
+                    };
+                    self.future = Some(Box::pin(fut));
+                }
+            }
         }
-
-        Ok(())
     }
 }
 
diff --git a/core/src/raw/adapters/typed_kv/backend.rs 
b/core/src/raw/adapters/typed_kv/backend.rs
index 1e0864de7..4a3cdf37b 100644
--- a/core/src/raw/adapters/typed_kv/backend.rs
+++ b/core/src/raw/adapters/typed_kv/backend.rs
@@ -16,9 +16,11 @@
 // under the License.
 
 use std::sync::Arc;
+use std::task::{ready, Context, Poll};
 
 use async_trait::async_trait;
 use bytes::Bytes;
+use futures::future::BoxFuture;
 
 use super::Adapter;
 use super::Value;
@@ -363,6 +365,7 @@ pub struct KvWriter<S> {
 
     op: OpWrite,
     buf: Option<Vec<u8>>,
+    future: Option<BoxFuture<'static, Result<()>>>,
 }
 
 impl<S> KvWriter<S> {
@@ -372,6 +375,7 @@ impl<S> KvWriter<S> {
             path,
             op,
             buf: None,
+            future: None,
         }
     }
 
@@ -401,7 +405,15 @@ impl<S> KvWriter<S> {
 #[async_trait]
 impl<S: Adapter> oio::Write for KvWriter<S> {
     // TODO: we need to support append in the future.
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Poll<Result<usize>> {
+        if self.future.is_some() {
+            self.future = None;
+            return Poll::Ready(Err(Error::new(
+                ErrorKind::Unexpected,
+                "there is a future on going, it's maybe a bug to go into this 
case",
+            )));
+        }
+
         let size = bs.chunk().len();
 
         let mut buf = self.buf.take().unwrap_or_else(|| 
Vec::with_capacity(size));
@@ -409,20 +421,42 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
 
         self.buf = Some(buf);
 
-        Ok(size)
+        Poll::Ready(Ok(size))
     }
 
-    async fn abort(&mut self) -> Result<()> {
+    fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
+        if self.future.is_some() {
+            self.future = None;
+            return Poll::Ready(Err(Error::new(
+                ErrorKind::Unexpected,
+                "there is a future on going, it's maybe a bug to go into this 
case",
+            )));
+        }
+
         self.buf = None;
-        Ok(())
+        Poll::Ready(Ok(()))
     }
 
-    async fn close(&mut self) -> Result<()> {
-        let kv = self.kv.clone();
-        let value = self.build();
-
-        kv.set(&self.path, value).await?;
-        Ok(())
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+        loop {
+            match self.future.as_mut() {
+                Some(fut) => {
+                    ready!(fut.poll_unpin(cx))?;
+                    self.future = None;
+                    return Poll::Ready(Ok(()));
+                }
+                None => {
+                    let kv = self.kv.clone();
+                    let path = self.path.clone();
+                    let value = self.build();
+
+                    let fut = async move {
+                        kv.set(&path, value).await?;
+                    };
+                    self.future = Some(Box::pin(fut));
+                }
+            }
+        }
     }
 }
 
diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs
index 622303ff4..2463068e9 100644
--- a/core/src/raw/oio/write/api.rs
+++ b/core/src/raw/oio/write/api.rs
@@ -17,8 +17,12 @@
 
 use std::fmt::Display;
 use std::fmt::Formatter;
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
 
 use async_trait::async_trait;
+use pin_project::pin_project;
 
 use crate::raw::*;
 use crate::*;
@@ -81,35 +85,33 @@ pub trait Write: Unpin + Send + Sync {
     ///
     /// It's possible that `n < bs.len()`, caller should pass the remaining 
bytes
     /// repeatedly until all bytes has been written.
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize>;
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Poll<Result<usize>>;
 
     /// Abort the pending writer.
-    async fn abort(&mut self) -> Result<()>;
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>>;
 
     /// Close the writer and make sure all data has been flushed.
-    async fn close(&mut self) -> Result<()>;
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>>;
 }
 
 #[async_trait]
 impl Write for () {
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
-        let _ = bs;
-
+    fn poll_write(&mut self, _: &mut Context<'_>, _: &dyn oio::WriteBuf) -> 
Poll<Result<usize>> {
         unimplemented!("write is required to be implemented for oio::Write")
     }
 
-    async fn abort(&mut self) -> Result<()> {
-        Err(Error::new(
+    fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
+        Poll::Ready(Err(Error::new(
             ErrorKind::Unsupported,
             "output writer doesn't support abort",
-        ))
+        )))
     }
 
-    async fn close(&mut self) -> Result<()> {
-        Err(Error::new(
+    fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
+        Poll::Ready(Err(Error::new(
             ErrorKind::Unsupported,
             "output writer doesn't support close",
-        ))
+        )))
     }
 }
 
@@ -118,16 +120,90 @@ impl Write for () {
 /// To make Writer work as expected, we must add this impl.
 #[async_trait]
 impl<T: Write + ?Sized> Write for Box<T> {
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
-        (**self).write(bs).await
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Poll<Result<usize>> {
+        (**self).poll_write(cx, bs)
+    }
+
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+        (**self).poll_abort(cx)
+    }
+
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+        (**self).poll_close(cx)
+    }
+}
+
+/// Impl WriteExt for all T: Write
+impl<T: Write> WriteExt for T {}
+
+/// Extension of [`Write`] to make it easier to use.
+pub trait WriteExt: Write {
+    /// Build a future for `poll_write`.
+    fn write<'a>(&'a mut self, buf: &'a dyn oio::WriteBuf) -> WriteFuture<'a, 
Self> {
+        WriteFuture { writer: self, buf }
+    }
+
+    fn abort(&mut self) -> AbortFuture<Self> {
+        AbortFuture { writer: self }
     }
 
-    async fn abort(&mut self) -> Result<()> {
-        (**self).abort().await
+    fn close(&mut self) -> CloseFuture<Self> {
+        CloseFuture { writer: self }
     }
+}
+
+/// Make this future `!Unpin` for compatibility with async trait methods.
+#[pin_project(!Unpin)]
+pub struct WriteFuture<'a, W: Write + Unpin + ?Sized> {
+    writer: &'a mut W,
+    buf: &'a dyn oio::WriteBuf,
+}
+
+impl<W> Future for WriteFuture<'_, W>
+where
+    W: Write + Unpin + ?Sized,
+{
+    type Output = Result<usize>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<usize>> 
{
+        let this = self.project();
+        Pin::new(this.writer).poll_write(cx, this.buf)
+    }
+}
+
+/// Make this future `!Unpin` for compatibility with async trait methods.
+#[pin_project(!Unpin)]
+pub struct AbortFuture<'a, W: Write + Unpin + ?Sized> {
+    writer: &'a mut W,
+}
+
+impl<W> Future for AbortFuture<'_, W>
+where
+    W: Write + Unpin + ?Sized,
+{
+    type Output = Result<usize>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
+        let this = self.project();
+        Pin::new(this.writer).poll_abort(cx)
+    }
+}
+
+/// Make this future `!Unpin` for compatibility with async trait methods.
+#[pin_project(!Unpin)]
+pub struct CloseFuture<'a, W: Write + Unpin + ?Sized> {
+    writer: &'a mut W,
+}
+
+impl<W> Future for CloseFuture<'_, W>
+where
+    W: Write + Unpin + ?Sized,
+{
+    type Output = Result<usize>;
 
-    async fn close(&mut self) -> Result<()> {
-        (**self).close().await
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
+        let this = self.project();
+        Pin::new(this.writer).poll_close(cx)
     }
 }
 
diff --git a/core/src/raw/oio/write/append_object_write.rs 
b/core/src/raw/oio/write/append_object_write.rs
index 2a5e30609..ed2a456d9 100644
--- a/core/src/raw/oio/write/append_object_write.rs
+++ b/core/src/raw/oio/write/append_object_write.rs
@@ -77,7 +77,7 @@ impl<W> oio::Write for AppendObjectWriter<W>
 where
     W: AppendObjectWrite,
 {
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Result<usize> {
         let offset = self.offset().await?;
 
         let size = bs.remaining();
@@ -94,11 +94,11 @@ where
         Ok(size)
     }
 
-    async fn close(&mut self) -> Result<()> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> {
         Ok(())
     }
 
-    async fn abort(&mut self) -> Result<()> {
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> {
         Ok(())
     }
 }
diff --git a/core/src/raw/oio/write/compose_write.rs 
b/core/src/raw/oio/write/compose_write.rs
index e2c6638af..c6504ee93 100644
--- a/core/src/raw/oio/write/compose_write.rs
+++ b/core/src/raw/oio/write/compose_write.rs
@@ -55,21 +55,21 @@ pub enum TwoWaysWriter<ONE: oio::Write, TWO: oio::Write> {
 
 #[async_trait]
 impl<ONE: oio::Write, TWO: oio::Write> oio::Write for TwoWaysWriter<ONE, TWO> {
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Result<usize> {
         match self {
             Self::One(one) => one.write(bs).await,
             Self::Two(two) => two.write(bs).await,
         }
     }
 
-    async fn abort(&mut self) -> Result<()> {
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> {
         match self {
             Self::One(one) => one.abort().await,
             Self::Two(two) => two.abort().await,
         }
     }
 
-    async fn close(&mut self) -> Result<()> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> {
         match self {
             Self::One(one) => one.close().await,
             Self::Two(two) => two.close().await,
@@ -93,7 +93,7 @@ pub enum ThreeWaysWriter<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> {
 impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write
     for ThreeWaysWriter<ONE, TWO, THREE>
 {
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> {
         match self {
             Self::One(one) => one.write(bs).await,
             Self::Two(two) => two.write(bs).await,
@@ -101,7 +101,7 @@ impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write
         }
     }
 
-    async fn abort(&mut self) -> Result<()> {
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> {
         match self {
             Self::One(one) => one.abort().await,
             Self::Two(two) => two.abort().await,
@@ -109,7 +109,7 @@ impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write
         }
     }
 
-    async fn close(&mut self) -> Result<()> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> {
         match self {
             Self::One(one) => one.close().await,
             Self::Two(two) => two.close().await,
diff --git a/core/src/raw/oio/write/exact_buf_write.rs b/core/src/raw/oio/write/exact_buf_write.rs
index d4a868472..38e00f057 100644
--- a/core/src/raw/oio/write/exact_buf_write.rs
+++ b/core/src/raw/oio/write/exact_buf_write.rs
@@ -16,11 +16,13 @@
 // under the License.
 
 use std::cmp::min;
+use std::task::{ready, Context, Poll};
 
 use async_trait::async_trait;
 use bytes::{Bytes, BytesMut};
 
 use crate::raw::oio::WriteBuf;
+use crate::raw::oio::WriteExt;
 use crate::raw::*;
 use crate::*;
 
@@ -62,20 +64,20 @@ enum Buffer {
 
 #[async_trait]
 impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
-    async fn write(&mut self, bs: &dyn WriteBuf) -> Result<usize> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn WriteBuf) -> Poll<Result<usize>> {
         loop {
             match &mut self.buffer {
                 Buffer::Empty => {
                     if bs.remaining() >= self.buffer_size {
                         self.buffer = Buffer::Consuming(bs.copy_to_bytes(self.buffer_size));
-                        return Ok(self.buffer_size);
+                        return Poll::Ready(Ok(self.buffer_size));
                     }
 
                     let chunk = bs.chunk();
                     let mut fill = BytesMut::with_capacity(chunk.len());
                     fill.extend_from_slice(chunk);
                     self.buffer = Buffer::Filling(fill);
-                    return Ok(chunk.len());
+                    return Poll::Ready(Ok(chunk.len()));
                 }
                 Buffer::Filling(fill) => {
                     if fill.len() >= self.buffer_size {
@@ -85,14 +87,14 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
 
                     let size = min(self.buffer_size - fill.len(), bs.chunk().len());
                     fill.extend_from_slice(&bs.chunk()[..size]);
-                    return Ok(size);
+                    return Poll::Ready(Ok(size));
                 }
                 Buffer::Consuming(consume) => {
                     // Make sure filled buffer has been flushed.
                     //
                     // TODO: maybe we can re-fill it after a successful write.
                     while !consume.is_empty() {
-                        let n = self.inner.write(consume).await?;
+                        let n = ready!(self.inner.poll_write(cx, consume)?);
                         consume.advance(n);
                     }
                     self.buffer = Buffer::Empty;
@@ -101,12 +103,12 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
         }
     }
 
-    async fn abort(&mut self) -> Result<()> {
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
         self.buffer = Buffer::Empty;
-        self.inner.abort().await
+        self.inner.poll_abort(cx)
     }
 
-    async fn close(&mut self) -> Result<()> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
         loop {
             match &mut self.buffer {
                 Buffer::Empty => break,
@@ -119,7 +121,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
                     //
                     // TODO: maybe we can re-fill it after a successful write.
                     while !consume.is_empty() {
-                        let n = self.inner.write(&consume).await?;
+                        let n = ready!(self.inner.poll_write(cx, &consume))?;
                         consume.advance(n);
                     }
                     self.buffer = Buffer::Empty;
@@ -128,7 +130,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
             }
         }
 
-        self.inner.close().await
+        self.inner.poll_close(cx)
     }
 }
 
@@ -151,21 +153,21 @@ mod tests {
 
     #[async_trait]
     impl Write for MockWriter {
-        async fn write(&mut self, bs: &dyn WriteBuf) -> Result<usize> {
+        fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn WriteBuf) -> Poll<Result<usize>> {
             debug!(
                 "test_fuzz_exact_buf_writer: flush size: {}",
                 bs.chunk().len()
             );
 
             self.buf.extend_from_slice(bs.chunk());
-            Ok(bs.chunk().len())
+            Poll::Ready(Ok(bs.chunk().len()))
         }
 
-        async fn abort(&mut self) -> Result<()> {
+        fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> {
             Ok(())
         }
 
-        async fn close(&mut self) -> Result<()> {
+        fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> {
             Ok(())
         }
     }
diff --git a/core/src/raw/oio/write/mod.rs b/core/src/raw/oio/write/mod.rs
index dfaf6c4ee..ff6d8377e 100644
--- a/core/src/raw/oio/write/mod.rs
+++ b/core/src/raw/oio/write/mod.rs
@@ -19,6 +19,7 @@ mod api;
 pub use api::BlockingWrite;
 pub use api::BlockingWriter;
 pub use api::Write;
+pub use api::WriteExt;
 pub use api::WriteOperation;
 pub use api::Writer;
 
diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs
index 39660a2f7..237b5774e 100644
--- a/core/src/raw/oio/write/multipart_upload_write.rs
+++ b/core/src/raw/oio/write/multipart_upload_write.rs
@@ -119,7 +119,7 @@ impl<W> oio::Write for MultipartUploadWriter<W>
 where
     W: MultipartUploadWrite,
 {
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> {
         let upload_id = self.upload_id().await?;
 
         let size = bs.remaining();
@@ -137,7 +137,7 @@ where
         Ok(size)
     }
 
-    async fn close(&mut self) -> Result<()> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> {
         let upload_id = if let Some(upload_id) = &self.upload_id {
             upload_id
         } else {
@@ -147,7 +147,7 @@ where
         self.inner.complete_part(upload_id, &self.parts).await
     }
 
-    async fn abort(&mut self) -> Result<()> {
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> {
         let upload_id = if let Some(upload_id) = &self.upload_id {
             upload_id
         } else {
diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs
index a02c64445..9d387aedb 100644
--- a/core/src/raw/oio/write/one_shot_write.rs
+++ b/core/src/raw/oio/write/one_shot_write.rs
@@ -48,17 +48,17 @@ impl<W: OneShotWrite> OneShotWriter<W> {
 
 #[async_trait]
 impl<W: OneShotWrite> oio::Write for OneShotWriter<W> {
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> {
         let size = bs.remaining();
         self.inner.write_once(bs).await?;
         Ok(size)
     }
 
-    async fn abort(&mut self) -> Result<()> {
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> {
         Ok(())
     }
 
-    async fn close(&mut self) -> Result<()> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> {
         Ok(())
     }
 }
diff --git a/core/src/services/azblob/writer.rs b/core/src/services/azblob/writer.rs
index 9745b1387..680bc8f53 100644
--- a/core/src/services/azblob/writer.rs
+++ b/core/src/services/azblob/writer.rs
@@ -160,7 +160,7 @@ impl AzblobWriter {
 
 #[async_trait]
 impl oio::Write for AzblobWriter {
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> {
         let size = bs.remaining();
 
         if self.op.append() {
@@ -181,11 +181,11 @@ impl oio::Write for AzblobWriter {
         Ok(size)
     }
 
-    async fn abort(&mut self) -> Result<()> {
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> {
         Ok(())
     }
 
-    async fn close(&mut self) -> Result<()> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> {
         Ok(())
     }
 }
diff --git a/core/src/services/azdfs/writer.rs b/core/src/services/azdfs/writer.rs
index f9bec7346..0dfaa320b 100644
--- a/core/src/services/azdfs/writer.rs
+++ b/core/src/services/azdfs/writer.rs
@@ -40,7 +40,7 @@ impl AzdfsWriter {
 
 #[async_trait]
 impl oio::Write for AzdfsWriter {
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> {
         let mut req = self.core.azdfs_create_request(
             &self.path,
             "file",
@@ -89,11 +89,11 @@ impl oio::Write for AzdfsWriter {
         }
     }
 
-    async fn abort(&mut self) -> Result<()> {
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> {
         Ok(())
     }
 
-    async fn close(&mut self) -> Result<()> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> {
         Ok(())
     }
 }
diff --git a/core/src/services/dropbox/writer.rs b/core/src/services/dropbox/writer.rs
index c39e95eaf..6331727b1 100644
--- a/core/src/services/dropbox/writer.rs
+++ b/core/src/services/dropbox/writer.rs
@@ -39,7 +39,7 @@ impl DropboxWriter {
 
 #[async_trait]
 impl oio::Write for DropboxWriter {
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> {
         let size = bs.remaining();
 
         let resp = self
@@ -61,11 +61,11 @@ impl oio::Write for DropboxWriter {
         }
     }
 
-    async fn abort(&mut self) -> Result<()> {
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> {
         Ok(())
     }
 
-    async fn close(&mut self) -> Result<()> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> {
         Ok(())
     }
 }
diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index 13e03ae2c..fd4a9535b 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -43,18 +43,18 @@ impl<F> FsWriter<F> {
 
 #[async_trait]
 impl oio::Write for FsWriter<tokio::fs::File> {
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> {
         self.f.write(bs.chunk()).await.map_err(parse_io_error)
     }
 
-    async fn abort(&mut self) -> Result<()> {
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "output writer doesn't support abort",
         ))
     }
 
-    async fn close(&mut self) -> Result<()> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> {
         self.f.sync_all().await.map_err(parse_io_error)?;
 
         if let Some(tmp_path) = &self.tmp_path {
diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs
index 2b0673d67..f0e61d537 100644
--- a/core/src/services/ftp/writer.rs
+++ b/core/src/services/ftp/writer.rs
@@ -40,7 +40,7 @@ impl FtpWriter {
 
 #[async_trait]
 impl oio::Write for FtpWriter {
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> {
         let size = bs.remaining();
         let bs = bs.copy_to_bytes(size);
 
@@ -55,11 +55,11 @@ impl oio::Write for FtpWriter {
         Ok(size)
     }
 
-    async fn abort(&mut self) -> Result<()> {
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> {
         Ok(())
     }
 
-    async fn close(&mut self) -> Result<()> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> {
         Ok(())
     }
 }
diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs
index d8dbe7597..9ae9b2fb3 100644
--- a/core/src/services/gcs/writer.rs
+++ b/core/src/services/gcs/writer.rs
@@ -118,7 +118,7 @@ impl GcsWriter {
 
 #[async_trait]
 impl oio::Write for GcsWriter {
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> {
         let size = bs.remaining();
 
         let location = match &self.location {
@@ -161,7 +161,7 @@ impl oio::Write for GcsWriter {
         }
     }
 
-    async fn abort(&mut self) -> Result<()> {
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> {
         let location = if let Some(location) = &self.location {
             location
         } else {
@@ -183,7 +183,7 @@ impl oio::Write for GcsWriter {
         }
     }
 
-    async fn close(&mut self) -> Result<()> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> {
         let location = if let Some(location) = &self.location {
             location
         } else {
diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs
index c77a8989d..2fc5f229a 100644
--- a/core/src/services/gdrive/writer.rs
+++ b/core/src/services/gdrive/writer.rs
@@ -95,7 +95,7 @@ impl GdriveWriter {
 
 #[async_trait]
 impl oio::Write for GdriveWriter {
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> {
         let size = bs.remaining();
         if self.file_id.is_none() {
             self.write_create(size as u64, bs.copy_to_bytes(size))
@@ -108,11 +108,11 @@ impl oio::Write for GdriveWriter {
         Ok(size)
     }
 
-    async fn abort(&mut self) -> Result<()> {
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> {
         Ok(())
     }
 
-    async fn close(&mut self) -> Result<()> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> {
         Ok(())
     }
 }
diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs
index 13b27ccf4..7314d82fc 100644
--- a/core/src/services/ghac/writer.rs
+++ b/core/src/services/ghac/writer.rs
@@ -41,7 +41,7 @@ impl GhacWriter {
 
 #[async_trait]
 impl oio::Write for GhacWriter {
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> {
         let size = bs.remaining();
 
         let req = self
@@ -66,11 +66,11 @@ impl oio::Write for GhacWriter {
         }
     }
 
-    async fn abort(&mut self) -> Result<()> {
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> {
         Ok(())
     }
 
-    async fn close(&mut self) -> Result<()> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> {
         let req = self.backend.ghac_commit(self.cache_id, self.size).await?;
         let resp = self.backend.client.send(req).await?;
 
diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs
index 47cdb53fe..e5f140dca 100644
--- a/core/src/services/hdfs/writer.rs
+++ b/core/src/services/hdfs/writer.rs
@@ -36,18 +36,18 @@ impl<F> HdfsWriter<F> {
 
 #[async_trait]
 impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> {
         self.f.write(bs.chunk()).await.map_err(parse_io_error)
     }
 
-    async fn abort(&mut self) -> Result<()> {
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "output writer doesn't support abort",
         ))
     }
 
-    async fn close(&mut self) -> Result<()> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> {
         self.f.close().await.map_err(parse_io_error)?;
 
         Ok(())
diff --git a/core/src/services/ipmfs/writer.rs b/core/src/services/ipmfs/writer.rs
index 6b6ab6a4d..2c82ec0c6 100644
--- a/core/src/services/ipmfs/writer.rs
+++ b/core/src/services/ipmfs/writer.rs
@@ -37,7 +37,7 @@ impl IpmfsWriter {
 
 #[async_trait]
 impl oio::Write for IpmfsWriter {
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> {
         let size = bs.remaining();
         let resp = self
             .backend
@@ -55,11 +55,11 @@ impl oio::Write for IpmfsWriter {
         }
     }
 
-    async fn abort(&mut self) -> Result<()> {
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> {
         Ok(())
     }
 
-    async fn close(&mut self) -> Result<()> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> {
         Ok(())
     }
 }
diff --git a/core/src/services/onedrive/writer.rs b/core/src/services/onedrive/writer.rs
index d9eabb8eb..9d0c6a992 100644
--- a/core/src/services/onedrive/writer.rs
+++ b/core/src/services/onedrive/writer.rs
@@ -45,7 +45,7 @@ impl OneDriveWriter {
 
 #[async_trait]
 impl oio::Write for OneDriveWriter {
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> {
         let size = bs.remaining();
         let bs = bs.copy_to_bytes(size);
 
@@ -58,11 +58,11 @@ impl oio::Write for OneDriveWriter {
         Ok(size)
     }
 
-    async fn abort(&mut self) -> Result<()> {
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> {
         Ok(())
     }
 
-    async fn close(&mut self) -> Result<()> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> {
         Ok(())
     }
 }
diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs
index 90e605152..9abd24f25 100644
--- a/core/src/services/sftp/writer.rs
+++ b/core/src/services/sftp/writer.rs
@@ -33,20 +33,20 @@ impl SftpWriter {
 
 #[async_trait]
 impl oio::Write for SftpWriter {
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> {
         let size = self.file.write(bs.chunk()).await?;
 
         Ok(size)
     }
 
-    async fn abort(&mut self) -> Result<()> {
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "SFTP does not support aborting writes",
         ))
     }
 
-    async fn close(&mut self) -> Result<()> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> {
         Ok(())
     }
 }
diff --git a/core/src/services/supabase/writer.rs b/core/src/services/supabase/writer.rs
index 95a2ee92f..8830ca849 100644
--- a/core/src/services/supabase/writer.rs
+++ b/core/src/services/supabase/writer.rs
@@ -67,20 +67,20 @@ impl SupabaseWriter {
 
 #[async_trait]
 impl oio::Write for SupabaseWriter {
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> {
         let size = bs.remaining();
         self.upload(bs.copy_to_bytes(size)).await?;
         Ok(size)
     }
 
-    async fn abort(&mut self) -> Result<()> {
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "The abort operation is not yet supported for Supabase backend",
         ))
     }
 
-    async fn close(&mut self) -> Result<()> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> {
         Ok(())
     }
 }
diff --git a/core/src/services/vercel_artifacts/writer.rs b/core/src/services/vercel_artifacts/writer.rs
index 36e62b734..52acee034 100644
--- a/core/src/services/vercel_artifacts/writer.rs
+++ b/core/src/services/vercel_artifacts/writer.rs
@@ -38,7 +38,7 @@ impl VercelArtifactsWriter {
 
 #[async_trait]
 impl oio::Write for VercelArtifactsWriter {
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> {
         let size = bs.remaining();
 
         let resp = self
@@ -61,11 +61,11 @@ impl oio::Write for VercelArtifactsWriter {
         }
     }
 
-    async fn abort(&mut self) -> Result<()> {
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> {
         Ok(())
     }
 
-    async fn close(&mut self) -> Result<()> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> {
         Ok(())
     }
 }
diff --git a/core/src/services/wasabi/writer.rs b/core/src/services/wasabi/writer.rs
index d0509ebbc..4736e348b 100644
--- a/core/src/services/wasabi/writer.rs
+++ b/core/src/services/wasabi/writer.rs
@@ -40,7 +40,7 @@ impl WasabiWriter {
 
 #[async_trait]
 impl oio::Write for WasabiWriter {
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> {
         let size = bs.remaining();
 
         let resp = self
@@ -64,11 +64,11 @@ impl oio::Write for WasabiWriter {
         }
     }
 
-    async fn abort(&mut self) -> Result<()> {
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> {
         Ok(())
     }
 
-    async fn close(&mut self) -> Result<()> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> {
         Ok(())
     }
 }
diff --git a/core/src/services/webdav/writer.rs b/core/src/services/webdav/writer.rs
index 413fe891a..cccf177ed 100644
--- a/core/src/services/webdav/writer.rs
+++ b/core/src/services/webdav/writer.rs
@@ -61,7 +61,7 @@ impl WebdavWriter {
 
 #[async_trait]
 impl oio::Write for WebdavWriter {
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> {
         let size = bs.remaining();
 
         self.write_oneshot(size as u64, AsyncBody::Bytes(bs.copy_to_bytes(size)))
@@ -70,11 +70,11 @@ impl oio::Write for WebdavWriter {
         Ok(size)
     }
 
-    async fn abort(&mut self) -> Result<()> {
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> {
         Ok(())
     }
 
-    async fn close(&mut self) -> Result<()> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> {
         Ok(())
     }
 }
diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs
index 34e6b20fa..c3ce0b5be 100644
--- a/core/src/services/webhdfs/writer.rs
+++ b/core/src/services/webhdfs/writer.rs
@@ -38,7 +38,7 @@ impl WebhdfsWriter {
 
 #[async_trait]
 impl oio::Write for WebhdfsWriter {
-    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Result<usize> {
         let size = bs.remaining();
 
         let req = self
@@ -63,11 +63,11 @@ impl oio::Write for WebhdfsWriter {
         }
     }
 
-    async fn abort(&mut self) -> Result<()> {
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> {
         Ok(())
     }
 
-    async fn close(&mut self) -> Result<()> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> {
         Ok(())
     }
 }
diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs
index 0474d6ea4..da384729f 100644
--- a/core/src/types/writer.rs
+++ b/core/src/types/writer.rs
@@ -205,7 +205,7 @@ impl Writer {
     ///
     /// Abort should only be called when the writer is not closed or
     /// aborted, otherwise an unexpected error could be returned.
-    pub async fn abort(&mut self) -> Result<()> {
+    pub fn poll_abort(&mut self, cx: &mut Context<'_>) -> Result<()> {
         if let State::Idle(Some(w)) = &mut self.state {
             w.abort().await
         } else {
@@ -222,7 +222,7 @@ impl Writer {
     ///
     /// Close should only be called when the writer is not closed or
     /// aborted, otherwise an unexpected error could be returned.
-    pub async fn close(&mut self) -> Result<()> {
+    pub fn poll_close(&mut self, cx: &mut Context<'_>) -> Result<()> {
         if let State::Idle(Some(w)) = &mut self.state {
             w.close().await
         } else {

Reply via email to