This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch stream-based-write
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit b0379a2466fa7597745f2d67336badf7fbdbd2dc
Author: Xuanwo <[email protected]>
AuthorDate: Tue Aug 29 17:51:39 2023 +0800

    Save work
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/benches/oio/utils.rs                        |  4 ---
 core/benches/oio/write.rs                        | 16 +++++++++--
 core/src/layers/blocking.rs                      |  5 +++-
 core/src/layers/complete.rs                      | 23 ----------------
 core/src/layers/concurrent_limit.rs              |  8 ++----
 core/src/layers/error_context.rs                 |  8 ------
 core/src/layers/logging.rs                       | 34 ------------------------
 core/src/layers/madsim.rs                        | 16 -----------
 core/src/layers/metrics.rs                       | 12 ---------
 core/src/layers/minitrace.rs                     | 10 -------
 core/src/layers/oteltrace.rs                     |  4 ---
 core/src/layers/prometheus.rs                    | 17 ------------
 core/src/layers/retry.rs                         | 26 ------------------
 core/src/layers/throttle.rs                      | 25 -----------------
 core/src/layers/timeout.rs                       | 13 ---------
 core/src/layers/tracing.rs                       |  8 ------
 core/src/raw/adapters/kv/backend.rs              |  7 -----
 core/src/raw/adapters/typed_kv/backend.rs        |  7 -----
 core/src/raw/oio/write/api.rs                    | 15 +----------
 core/src/raw/oio/write/append_object_write.rs    | 12 ---------
 core/src/raw/oio/write/at_least_buf_write.rs     | 25 -----------------
 core/src/raw/oio/write/compose_write.rs          | 16 -----------
 core/src/raw/oio/write/exact_buf_write.rs        | 29 ++++++++++----------
 core/src/raw/oio/write/multipart_upload_write.rs | 17 ------------
 core/src/raw/oio/write/one_shot_write.rs         |  8 ------
 core/src/services/azblob/writer.rs               | 18 -------------
 core/src/services/azdfs/writer.rs                |  6 ++---
 core/src/services/dropbox/writer.rs              |  6 ++---
 core/src/services/fs/writer.rs                   | 15 -----------
 core/src/services/ftp/writer.rs                  |  6 ++---
 core/src/services/gcs/writer.rs                  |  6 ++---
 core/src/services/gdrive/writer.rs               |  6 ++---
 core/src/services/ghac/writer.rs                 |  6 ++---
 core/src/services/hdfs/writer.rs                 |  6 +++--
 core/src/services/ipmfs/writer.rs                |  6 ++---
 core/src/services/onedrive/writer.rs             |  6 ++---
 core/src/services/sftp/writer.rs                 |  6 ++---
 core/src/services/supabase/writer.rs             |  6 ++---
 core/src/services/vercel_artifacts/writer.rs     |  6 ++---
 core/src/services/wasabi/writer.rs               |  6 ++---
 core/src/services/webdav/writer.rs               |  6 ++---
 core/src/services/webhdfs/writer.rs              |  6 ++---
 core/src/types/operator/operator.rs              |  4 ++-
 core/src/types/writer.rs                         | 10 ++++---
 44 files changed, 91 insertions(+), 411 deletions(-)

diff --git a/core/benches/oio/utils.rs b/core/benches/oio/utils.rs
index 0e70bcfc7..e704f8d52 100644
--- a/core/benches/oio/utils.rs
+++ b/core/benches/oio/utils.rs
@@ -27,10 +27,6 @@ pub struct BlackHoleWriter;
 
 #[async_trait]
 impl oio::Write for BlackHoleWriter {
-    async fn write(&mut self, _: Bytes) -> opendal::Result<()> {
-        Ok(())
-    }
-
     async fn sink(&mut self, _: u64, _: Streamer) -> opendal::Result<()> {
         Ok(())
     }
diff --git a/core/benches/oio/write.rs b/core/benches/oio/write.rs
index 6e26ce7e0..befc6c587 100644
--- a/core/benches/oio/write.rs
+++ b/core/benches/oio/write.rs
@@ -17,6 +17,7 @@
 
 use criterion::Criterion;
 use once_cell::sync::Lazy;
+use opendal::raw::oio;
 use opendal::raw::oio::AtLeastBufWriter;
 use opendal::raw::oio::ExactBufWriter;
 use opendal::raw::oio::Write;
@@ -45,7 +46,13 @@ pub fn bench_at_least_buf_write(c: &mut Criterion) {
         group.bench_with_input(size.to_string(), &content, |b, content| {
             b.to_async(&*TOKIO).iter(|| async {
                 let mut w = AtLeastBufWriter::new(BlackHoleWriter, 256 * 1024);
-                w.write(content.clone()).await.unwrap();
+
+                w.sink(
+                    content.len() as u64,
+                    Box::new(oio::Cursor::from(content.clone())),
+                )
+                .await
+                .unwrap();
                 w.close().await.unwrap();
             })
         });
@@ -71,7 +78,12 @@ pub fn bench_exact_buf_write(c: &mut Criterion) {
         group.bench_with_input(size.to_string(), &content, |b, content| {
             b.to_async(&*TOKIO).iter(|| async {
                 let mut w = ExactBufWriter::new(BlackHoleWriter, 256 * 1024);
-                w.write(content.clone()).await.unwrap();
+                w.sink(
+                    content.len() as u64,
+                    Box::new(oio::Cursor::from(content.clone())),
+                )
+                .await
+                .unwrap();
                 w.close().await.unwrap();
             })
         });
diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs
index 7b80b5956..2dab3ea45 100644
--- a/core/src/layers/blocking.rs
+++ b/core/src/layers/blocking.rs
@@ -197,7 +197,10 @@ impl<I: oio::Read + 'static> oio::BlockingRead for 
BlockingWrapper<I> {
 
 impl<I: oio::Write + 'static> oio::BlockingWrite for BlockingWrapper<I> {
     fn write(&mut self, bs: Bytes) -> Result<()> {
-        self.handle.block_on(self.inner.write(bs))
+        self.handle.block_on(
+            self.inner
+                .sink(bs.len() as u64, Box::new(oio::Cursor::from(bs))),
+        )
     }
 
     fn close(&mut self) -> Result<()> {
diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index 4c58b6e30..13e009746 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -711,29 +711,6 @@ impl<W> oio::Write for CompleteWriter<W>
 where
     W: oio::Write,
 {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        let n = bs.len();
-
-        if let Some(size) = self.size {
-            if self.written + n as u64 > size {
-                return Err(Error::new(
-                    ErrorKind::ContentTruncated,
-                    &format!(
-                        "writer got too much data, expect: {size}, actual: {}",
-                        self.written + n as u64
-                    ),
-                ));
-            }
-        }
-
-        let w = self.inner.as_mut().ok_or_else(|| {
-            Error::new(ErrorKind::Unexpected, "writer has been closed or 
aborted")
-        })?;
-        w.write(bs).await?;
-        self.written += n as u64;
-        Ok(())
-    }
-
     async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
         if let Some(total_size) = self.size {
             if self.written + size > total_size {
diff --git a/core/src/layers/concurrent_limit.rs 
b/core/src/layers/concurrent_limit.rs
index 9cef0fb9b..781fb7610 100644
--- a/core/src/layers/concurrent_limit.rs
+++ b/core/src/layers/concurrent_limit.rs
@@ -285,18 +285,14 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
ConcurrentLimitWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        self.inner.write(bs).await
+    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+        self.inner.sink(size, s).await
     }
 
     async fn abort(&mut self) -> Result<()> {
         self.inner.abort().await
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
-        self.inner.sink(size, s).await
-    }
-
     async fn close(&mut self) -> Result<()> {
         self.inner.close().await
     }
diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs
index bfe9be4df..a0da6a0a2 100644
--- a/core/src/layers/error_context.rs
+++ b/core/src/layers/error_context.rs
@@ -403,14 +403,6 @@ impl<T: oio::BlockingRead> oio::BlockingRead for 
ErrorContextWrapper<T> {
 
 #[async_trait::async_trait]
 impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        self.inner.write(bs).await.map_err(|err| {
-            err.with_operation(WriteOperation::Write)
-                .with_context("service", self.scheme)
-                .with_context("path", &self.path)
-        })
-    }
-
     async fn abort(&mut self) -> Result<()> {
         self.inner.abort().await.map_err(|err| {
             err.with_operation(WriteOperation::Abort)
diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs
index 07323cf8c..5b2f39e63 100644
--- a/core/src/layers/logging.rs
+++ b/core/src/layers/logging.rs
@@ -1252,40 +1252,6 @@ impl<W> LoggingWriter<W> {
 
 #[async_trait]
 impl<W: oio::Write> oio::Write for LoggingWriter<W> {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        let size = bs.len();
-        match self.inner.write(bs).await {
-            Ok(_) => {
-                self.written += size as u64;
-                trace!(
-                    target: LOGGING_TARGET,
-                    "service={} operation={} path={} written={} -> data write 
{}B",
-                    self.ctx.scheme,
-                    WriteOperation::Write,
-                    self.path,
-                    self.written,
-                    size
-                );
-                Ok(())
-            }
-            Err(err) => {
-                if let Some(lvl) = self.ctx.error_level(&err) {
-                    log!(
-                        target: LOGGING_TARGET,
-                        lvl,
-                        "service={} operation={} path={} written={} -> data 
write failed: {}",
-                        self.ctx.scheme,
-                        WriteOperation::Write,
-                        self.path,
-                        self.written,
-                        self.ctx.error_print(&err),
-                    )
-                }
-                Err(err)
-            }
-        }
-    }
-
     async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
         match self.inner.sink(size, s).await {
             Ok(_) => {
diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs
index fdf0ec5de..e4b490c87 100644
--- a/core/src/layers/madsim.rs
+++ b/core/src/layers/madsim.rs
@@ -302,22 +302,6 @@ pub struct MadsimWriter {
 
 #[async_trait]
 impl oio::Write for MadsimWriter {
-    async fn write(&mut self, bs: Bytes) -> crate::Result<()> {
-        #[cfg(madsim)]
-        {
-            let req = Request::Write(self.path.to_string(), bs);
-            let ep = Endpoint::bind(self.addr).await?;
-            let (tx, mut rx) = ep.connect1(self.addr).await?;
-            tx.send(Box::new(req)).await?;
-            rx.recv().await?;
-            Ok(())
-        }
-        #[cfg(not(madsim))]
-        {
-            unreachable!("madsim is not enabled")
-        }
-    }
-
     async fn sink(&mut self, size: u64, s: oio::Streamer) -> crate::Result<()> 
{
         Err(Error::new(
             ErrorKind::Unsupported,
diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs
index 1eade833b..40ad53693 100644
--- a/core/src/layers/metrics.rs
+++ b/core/src/layers/metrics.rs
@@ -847,18 +847,6 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
MetricWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for MetricWrapper<R> {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        let size = bs.len();
-        self.inner
-            .write(bs)
-            .await
-            .map(|_| self.bytes += size as u64)
-            .map_err(|err| {
-                self.handle.increment_errors_total(self.op, err.kind());
-                err
-            })
-    }
-
     async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
         self.inner
             .sink(size, s)
diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs
index 1213d692e..da216cd61 100644
--- a/core/src/layers/minitrace.rs
+++ b/core/src/layers/minitrace.rs
@@ -337,16 +337,6 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
MinitraceWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        self.inner
-            .write(bs)
-            .in_span(Span::enter_with_parent(
-                WriteOperation::Write.into_static(),
-                &self.span,
-            ))
-            .await
-    }
-
     async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
         self.inner
             .sink(size, s)
diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs
index 2ae39b05c..39455cd54 100644
--- a/core/src/layers/oteltrace.rs
+++ b/core/src/layers/oteltrace.rs
@@ -313,10 +313,6 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
OtelTraceWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        self.inner.write(bs).await
-    }
-
     async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
         self.inner.sink(size, s).await
     }
diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs
index 005d6aa97..7f6b5aecb 100644
--- a/core/src/layers/prometheus.rs
+++ b/core/src/layers/prometheus.rs
@@ -662,23 +662,6 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
PrometheusMetricWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        let size = bs.len();
-        self.inner
-            .write(bs)
-            .await
-            .map(|_| {
-                self.stats
-                    .bytes_total
-                    .with_label_values(&[&self.scheme, 
Operation::Write.into_static()])
-                    .observe(size as f64)
-            })
-            .map_err(|err| {
-                self.stats.increment_errors_total(self.op, err.kind());
-                err
-            })
-    }
-
     async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
         self.inner
             .sink(size, s)
diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs
index 7517c2c21..5127ca6e3 100644
--- a/core/src/layers/retry.rs
+++ b/core/src/layers/retry.rs
@@ -873,32 +873,6 @@ impl<R: oio::BlockingRead, I: RetryInterceptor> 
oio::BlockingRead for RetryWrapp
 
 #[async_trait]
 impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        let mut backoff = self.builder.build();
-
-        loop {
-            match self.inner.write(bs.clone()).await {
-                Ok(v) => return Ok(v),
-                Err(e) if !e.is_temporary() => return Err(e),
-                Err(e) => match backoff.next() {
-                    None => return Err(e),
-                    Some(dur) => {
-                        self.notify.intercept(
-                            &e,
-                            dur,
-                            &[
-                                ("operation", 
WriteOperation::Write.into_static()),
-                                ("path", &self.path),
-                            ],
-                        );
-                        tokio::time::sleep(dur).await;
-                        continue;
-                    }
-                },
-            }
-        }
-    }
-
     /// > Ooooooooooops, are you crazy!? Why we need to do `Arc<Mutex<S>>` 
here? Adding a lock has
     /// a lot overhead!
     ///
diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs
index d929226df..89fabe71d 100644
--- a/core/src/layers/throttle.rs
+++ b/core/src/layers/throttle.rs
@@ -217,31 +217,6 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
ThrottleWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        let buf_length = NonZeroU32::new(bs.len() as u32).unwrap();
-
-        loop {
-            match self.limiter.check_n(buf_length) {
-                Ok(_) => return self.inner.write(bs).await,
-                Err(negative) => match negative {
-                    // the query is valid but the Decider can not accommodate 
them.
-                    NegativeMultiDecision::BatchNonConforming(_, not_until) => 
{
-                        let wait_time = 
not_until.wait_time_from(DefaultClock::default().now());
-                        // TODO: Should lock the limiter and wait for the 
wait_time, or should let other small requests go first?
-                        tokio::time::sleep(wait_time).await;
-                    }
-                    // the query was invalid as the rate limit parameters can 
"never" accommodate the number of cells queried for.
-                    NegativeMultiDecision::InsufficientCapacity(_) => {
-                        return Err(Error::new(
-                            ErrorKind::RateLimited,
-                            "InsufficientCapacity due to burst size being 
smaller than the request size",
-                        ))
-                    }
-                },
-            }
-        }
-    }
-
     async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
         self.inner.sink(size, s).await
     }
diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs
index c0fb739aa..8a894d17b 100644
--- a/core/src/layers/timeout.rs
+++ b/core/src/layers/timeout.rs
@@ -322,19 +322,6 @@ impl<R: oio::Read> oio::Read for TimeoutWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        let timeout = self.io_timeout(bs.len() as u64);
-
-        tokio::time::timeout(timeout, self.inner.write(bs))
-            .await
-            .map_err(|_| {
-                Error::new(ErrorKind::Unexpected, "operation timeout")
-                    .with_operation(WriteOperation::Write)
-                    .with_context("timeout", timeout.as_secs_f64().to_string())
-                    .set_temporary()
-            })?
-    }
-
     async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
         let timeout = self.io_timeout(size);
 
diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs
index 9042e6e35..e3812fd48 100644
--- a/core/src/layers/tracing.rs
+++ b/core/src/layers/tracing.rs
@@ -320,14 +320,6 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
TracingWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for TracingWrapper<R> {
-    #[tracing::instrument(
-        parent = &self.span,
-        level = "trace",
-        skip_all)]
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        self.inner.write(bs).await
-    }
-
     #[tracing::instrument(
         parent = &self.span,
         level = "trace",
diff --git a/core/src/raw/adapters/kv/backend.rs 
b/core/src/raw/adapters/kv/backend.rs
index 41c4c6a7d..c1314b2de 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -389,13 +389,6 @@ impl<S> KvWriter<S> {
 
 #[async_trait]
 impl<S: Adapter> oio::Write for KvWriter<S> {
-    // TODO: we need to support append in the future.
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        self.buf = Some(bs.into());
-
-        Ok(())
-    }
-
     async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
diff --git a/core/src/raw/adapters/typed_kv/backend.rs 
b/core/src/raw/adapters/typed_kv/backend.rs
index 9f6186a38..76aee64f9 100644
--- a/core/src/raw/adapters/typed_kv/backend.rs
+++ b/core/src/raw/adapters/typed_kv/backend.rs
@@ -402,13 +402,6 @@ impl<S> KvWriter<S> {
 
 #[async_trait]
 impl<S: Adapter> oio::Write for KvWriter<S> {
-    // TODO: we need to support append in the future.
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        self.buf.push(bs);
-
-        Ok(())
-    }
-
     async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs
index f2bb025af..14bff1ec7 100644
--- a/core/src/raw/oio/write/api.rs
+++ b/core/src/raw/oio/write/api.rs
@@ -87,7 +87,7 @@ pub type Writer = Box<dyn Write>;
 /// the whole data.
 #[async_trait]
 pub trait Write: Unpin + Send + Sync {
-    /// Write given bytes into writer.
+    /// Sink given stream into writer.
     ///
     /// # Notes
     ///
@@ -95,9 +95,6 @@ pub trait Write: Unpin + Send + Sync {
     /// content length. And users will call write multiple times.
     ///
     /// Please make sure `write` is safe to re-enter.
-    async fn write(&mut self, bs: Bytes) -> Result<()>;
-
-    /// Sink given stream into writer.
     async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()>;
 
     /// Abort the pending writer.
@@ -109,12 +106,6 @@ pub trait Write: Unpin + Send + Sync {
 
 #[async_trait]
 impl Write for () {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        let _ = bs;
-
-        unimplemented!("write is required to be implemented for oio::Write")
-    }
-
     async fn sink(&mut self, _: u64, _: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
@@ -142,10 +133,6 @@ impl Write for () {
 /// To make Writer work as expected, we must add this impl.
 #[async_trait]
 impl<T: Write + ?Sized> Write for Box<T> {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        (**self).write(bs).await
-    }
-
     async fn sink(&mut self, n: u64, s: oio::Streamer) -> Result<()> {
         (**self).sink(n, s).await
     }
diff --git a/core/src/raw/oio/write/append_object_write.rs 
b/core/src/raw/oio/write/append_object_write.rs
index 07fa546cc..7e02fc0ea 100644
--- a/core/src/raw/oio/write/append_object_write.rs
+++ b/core/src/raw/oio/write/append_object_write.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use async_trait::async_trait;
-use bytes::Bytes;
 
 use crate::raw::oio::Streamer;
 use crate::raw::*;
@@ -79,17 +78,6 @@ impl<W> oio::Write for AppendObjectWriter<W>
 where
     W: AppendObjectWrite,
 {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        let offset = self.offset().await?;
-
-        let size = bs.len() as u64;
-
-        self.inner
-            .append(offset, size, AsyncBody::Bytes(bs))
-            .await
-            .map(|_| self.offset = Some(offset + size))
-    }
-
     async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
         let offset = self.offset().await?;
 
diff --git a/core/src/raw/oio/write/at_least_buf_write.rs 
b/core/src/raw/oio/write/at_least_buf_write.rs
index 91adddd30..94be240f4 100644
--- a/core/src/raw/oio/write/at_least_buf_write.rs
+++ b/core/src/raw/oio/write/at_least_buf_write.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use async_trait::async_trait;
-use bytes::Bytes;
 
 use crate::raw::oio::StreamExt;
 use crate::raw::oio::Streamer;
@@ -64,30 +63,6 @@ impl<W: oio::Write> AtLeastBufWriter<W> {
 
 #[async_trait]
 impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        // If total size is known and equals to given bytes, we can write it 
directly.
-        if let Some(total_size) = self.total_size {
-            if total_size == bs.len() as u64 {
-                return self.inner.write(bs).await;
-            }
-        }
-
-        // Push the bytes into the buffer if the buffer is not full.
-        if self.buffer.len() + bs.len() < self.buffer_size {
-            self.buffer.push(bs);
-            return Ok(());
-        }
-
-        let mut buf = self.buffer.clone();
-        buf.push(bs);
-
-        self.inner
-            .sink(buf.len() as u64, Box::new(buf))
-            .await
-            // Clear buffer if the write is successful.
-            .map(|_| self.buffer.clear())
-    }
-
     async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
         // If total size is known and equals to given stream, we can write it 
directly.
         if let Some(total_size) = self.total_size {
diff --git a/core/src/raw/oio/write/compose_write.rs 
b/core/src/raw/oio/write/compose_write.rs
index 043df2978..ea5f3bf25 100644
--- a/core/src/raw/oio/write/compose_write.rs
+++ b/core/src/raw/oio/write/compose_write.rs
@@ -39,7 +39,6 @@
 //! type_alias_impl_trait has been stabilized.
 
 use async_trait::async_trait;
-use bytes::Bytes;
 
 use crate::raw::oio::Streamer;
 use crate::raw::*;
@@ -57,13 +56,6 @@ pub enum TwoWaysWriter<ONE: oio::Write, TWO: oio::Write> {
 
 #[async_trait]
 impl<ONE: oio::Write, TWO: oio::Write> oio::Write for TwoWaysWriter<ONE, TWO> {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        match self {
-            Self::One(one) => one.write(bs).await,
-            Self::Two(two) => two.write(bs).await,
-        }
-    }
-
     async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
         match self {
             Self::One(one) => one.sink(size, s).await,
@@ -102,14 +94,6 @@ pub enum ThreeWaysWriter<ONE: oio::Write, TWO: oio::Write, 
THREE: oio::Write> {
 impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write
     for ThreeWaysWriter<ONE, TWO, THREE>
 {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        match self {
-            Self::One(one) => one.write(bs).await,
-            Self::Two(two) => two.write(bs).await,
-            Self::Three(three) => three.write(bs).await,
-        }
-    }
-
     async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
         match self {
             Self::One(one) => one.sink(size, s).await,
diff --git a/core/src/raw/oio/write/exact_buf_write.rs 
b/core/src/raw/oio/write/exact_buf_write.rs
index 8561c1a4a..59533db6a 100644
--- a/core/src/raw/oio/write/exact_buf_write.rs
+++ b/core/src/raw/oio/write/exact_buf_write.rs
@@ -84,11 +84,6 @@ impl<W: oio::Write> ExactBufWriter<W> {
 
 #[async_trait]
 impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        self.sink(bs.len() as u64, Box::new(oio::Cursor::from(bs)))
-            .await
-    }
-
     /// # TODO
     ///
     /// We know every stream size, we can collect them into a buffer without 
chain them every time.
@@ -202,17 +197,12 @@ mod tests {
 
     #[async_trait]
     impl Write for MockWriter {
-        async fn write(&mut self, bs: Bytes) -> Result<()> {
-            debug!("test_fuzz_exact_buf_writer: flush size: {}", bs.len());
-
-            self.buf.extend_from_slice(&bs);
-            Ok(())
-        }
-
         async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
             let bs = s.collect().await?;
             assert_eq!(bs.len() as u64, size);
-            self.write(bs).await
+            self.buf.extend_from_slice(&bs);
+
+            Ok(())
         }
 
         async fn abort(&mut self) -> Result<()> {
@@ -238,7 +228,11 @@ mod tests {
 
         let mut w = ExactBufWriter::new(MockWriter { buf: vec![] }, 10);
 
-        w.write(Bytes::from(expected.clone())).await?;
+        w.sink(
+            expected.len() as u64,
+            Box::new(oio::Cursor::from(Bytes::from(expected.clone()))),
+        )
+        .await?;
         w.close().await?;
 
         assert_eq!(w.inner.buf.len(), expected.len());
@@ -271,7 +265,12 @@ mod tests {
             rng.fill_bytes(&mut content);
 
             expected.extend_from_slice(&content);
-            writer.write(Bytes::from(content)).await?;
+            writer
+                .sink(
+                    expected.len() as u64,
+                    Box::new(oio::Cursor::from(Bytes::from(expected.clone()))),
+                )
+                .await?;
         }
         writer.close().await?;
 
diff --git a/core/src/raw/oio/write/multipart_upload_write.rs 
b/core/src/raw/oio/write/multipart_upload_write.rs
index c013b24c3..0a0d49680 100644
--- a/core/src/raw/oio/write/multipart_upload_write.rs
+++ b/core/src/raw/oio/write/multipart_upload_write.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use async_trait::async_trait;
-use bytes::Bytes;
 
 use crate::raw::*;
 use crate::*;
@@ -120,22 +119,6 @@ impl<W> oio::Write for MultipartUploadWriter<W>
 where
     W: MultipartUploadWrite,
 {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        let upload_id = self.upload_id().await?;
-
-        let size = bs.len();
-
-        self.inner
-            .write_part(
-                &upload_id,
-                self.parts.len(),
-                size as u64,
-                AsyncBody::Bytes(bs),
-            )
-            .await
-            .map(|v| self.parts.push(v))
-    }
-
     async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
         let upload_id = self.upload_id().await?;
 
diff --git a/core/src/raw/oio/write/one_shot_write.rs 
b/core/src/raw/oio/write/one_shot_write.rs
index 35b1883bf..a3fcd1b2c 100644
--- a/core/src/raw/oio/write/one_shot_write.rs
+++ b/core/src/raw/oio/write/one_shot_write.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use async_trait::async_trait;
-use bytes::Bytes;
 
 use crate::raw::*;
 use crate::*;
@@ -49,13 +48,6 @@ impl<W: OneShotWrite> OneShotWriter<W> {
 
 #[async_trait]
 impl<W: OneShotWrite> oio::Write for OneShotWriter<W> {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        let cursor = oio::Cursor::from(bs);
-        self.inner
-            .write_once(cursor.len() as u64, Box::new(cursor))
-            .await
-    }
-
     async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
         self.inner.write_once(size, s).await
     }
diff --git a/core/src/services/azblob/writer.rs 
b/core/src/services/azblob/writer.rs
index 31a56f27a..2db716b11 100644
--- a/core/src/services/azblob/writer.rs
+++ b/core/src/services/azblob/writer.rs
@@ -18,7 +18,6 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use http::StatusCode;
 
 use super::core::AzblobCore;
@@ -161,23 +160,6 @@ impl AzblobWriter {
 
 #[async_trait]
 impl oio::Write for AzblobWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        if self.op.append() {
-            self.append_oneshot(bs.len() as u64, AsyncBody::Bytes(bs))
-                .await
-        } else {
-            if self.op.content_length().is_none() {
-                return Err(Error::new(
-                    ErrorKind::Unsupported,
-                    "write without content length is not supported",
-                ));
-            }
-
-            self.write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs))
-                .await
-        }
-    }
-
     async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
         if self.op.append() {
             self.append_oneshot(size, AsyncBody::Stream(s)).await
diff --git a/core/src/services/azdfs/writer.rs 
b/core/src/services/azdfs/writer.rs
index 3c8db1ac1..2eda93e40 100644
--- a/core/src/services/azdfs/writer.rs
+++ b/core/src/services/azdfs/writer.rs
@@ -37,10 +37,7 @@ impl AzdfsWriter {
     pub fn new(core: Arc<AzdfsCore>, op: OpWrite, path: String) -> Self {
         AzdfsWriter { core, op, path }
     }
-}
 
-#[async_trait]
-impl oio::Write for AzdfsWriter {
     async fn write(&mut self, bs: Bytes) -> Result<()> {
         let mut req = self.core.azdfs_create_request(
             &self.path,
@@ -85,7 +82,10 @@ impl oio::Write for AzdfsWriter {
                 .with_operation("Backend::azdfs_update_request")),
         }
     }
+}
 
+#[async_trait]
+impl oio::Write for AzdfsWriter {
     async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
diff --git a/core/src/services/dropbox/writer.rs 
b/core/src/services/dropbox/writer.rs
index 2f5e97558..f4a4e225b 100644
--- a/core/src/services/dropbox/writer.rs
+++ b/core/src/services/dropbox/writer.rs
@@ -36,10 +36,7 @@ impl DropboxWriter {
     pub fn new(core: Arc<DropboxCore>, op: OpWrite, path: String) -> Self {
         DropboxWriter { core, op, path }
     }
-}
 
-#[async_trait]
-impl oio::Write for DropboxWriter {
     async fn write(&mut self, bs: Bytes) -> Result<()> {
         let resp = self
             .core
@@ -59,7 +56,10 @@ impl oio::Write for DropboxWriter {
             _ => Err(parse_error(resp).await?),
         }
     }
+}
 
+#[async_trait]
+impl oio::Write for DropboxWriter {
     async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index 4d31444af..6b518b436 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -50,21 +50,6 @@ impl<F> FsWriter<F> {
 
 #[async_trait]
 impl oio::Write for FsWriter<tokio::fs::File> {
-    /// # Notes
-    ///
-    /// File could be partial written, so we will seek to start to make sure
-    /// we write the same content.
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        self.f
-            .seek(SeekFrom::Start(self.pos))
-            .await
-            .map_err(parse_io_error)?;
-        self.f.write_all(&bs).await.map_err(parse_io_error)?;
-        self.pos += bs.len() as u64;
-
-        Ok(())
-    }
-
     async fn sink(&mut self, _size: u64, mut s: oio::Streamer) -> Result<()> {
         while let Some(bs) = s.next().await {
             let bs = bs?;
diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs
index cd4ba0f6a..7816277c1 100644
--- a/core/src/services/ftp/writer.rs
+++ b/core/src/services/ftp/writer.rs
@@ -37,10 +37,7 @@ impl FtpWriter {
     pub fn new(backend: FtpBackend, path: String) -> Self {
         FtpWriter { backend, path }
     }
-}
 
-#[async_trait]
-impl oio::Write for FtpWriter {
     async fn write(&mut self, bs: Bytes) -> Result<()> {
         let mut ftp_stream = self.backend.ftp_connect(Operation::Write).await?;
         let mut data_stream = ftp_stream.append_with_stream(&self.path).await?;
@@ -52,7 +49,10 @@ impl oio::Write for FtpWriter {
 
         Ok(())
     }
+}
 
+#[async_trait]
+impl oio::Write for FtpWriter {
     async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs
index 236016294..70d79798a 100644
--- a/core/src/services/gcs/writer.rs
+++ b/core/src/services/gcs/writer.rs
@@ -114,10 +114,7 @@ impl GcsWriter {
             _ => Err(parse_error(resp).await?),
         }
     }
-}
 
-#[async_trait]
-impl oio::Write for GcsWriter {
     async fn write(&mut self, bs: Bytes) -> Result<()> {
         let location = match &self.location {
             Some(location) => location,
@@ -163,7 +160,10 @@ impl oio::Write for GcsWriter {
             }
         }
     }
+}
 
+#[async_trait]
+impl oio::Write for GcsWriter {
     async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
         self.write_oneshot(size, AsyncBody::Stream(s)).await
     }
diff --git a/core/src/services/gdrive/writer.rs 
b/core/src/services/gdrive/writer.rs
index 48d88f3ec..718307dc8 100644
--- a/core/src/services/gdrive/writer.rs
+++ b/core/src/services/gdrive/writer.rs
@@ -91,10 +91,7 @@ impl GdriveWriter {
             _ => Err(parse_error(resp).await?),
         }
     }
-}
 
-#[async_trait]
-impl oio::Write for GdriveWriter {
     async fn write(&mut self, bs: Bytes) -> Result<()> {
         if self.file_id.is_none() {
             self.write_create(bs.len() as u64, bs).await
@@ -102,7 +99,10 @@ impl oio::Write for GdriveWriter {
             self.write_overwrite(bs.len() as u64, bs).await
         }
     }
+}
 
+#[async_trait]
+impl oio::Write for GdriveWriter {
     async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(ErrorKind::Unsupported, "sink is not supported"))
     }
diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs
index b2f959947..9a088a0be 100644
--- a/core/src/services/ghac/writer.rs
+++ b/core/src/services/ghac/writer.rs
@@ -38,10 +38,7 @@ impl GhacWriter {
             size: 0,
         }
     }
-}
 
-#[async_trait]
-impl oio::Write for GhacWriter {
     async fn write(&mut self, bs: Bytes) -> Result<()> {
         let size = bs.len() as u64;
         let req = self
@@ -61,7 +58,10 @@ impl oio::Write for GhacWriter {
                 .map(|err| err.with_operation("Backend::ghac_upload"))?)
         }
     }
+}
 
+#[async_trait]
+impl oio::Write for GhacWriter {
     async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs
index 23c5f1d68..8800ab241 100644
--- a/core/src/services/hdfs/writer.rs
+++ b/core/src/services/hdfs/writer.rs
@@ -39,8 +39,7 @@ impl<F> HdfsWriter<F> {
     }
 }
 
-#[async_trait]
-impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
+impl HdfsWriter<hdrs::AsyncFile> {
     async fn write(&mut self, bs: Bytes) -> Result<()> {
         while self.pos < bs.len() {
             let n = self
@@ -55,7 +54,10 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
 
         Ok(())
     }
+}
 
+#[async_trait]
+impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
     async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
diff --git a/core/src/services/ipmfs/writer.rs 
b/core/src/services/ipmfs/writer.rs
index 528478142..011624c23 100644
--- a/core/src/services/ipmfs/writer.rs
+++ b/core/src/services/ipmfs/writer.rs
@@ -34,10 +34,7 @@ impl IpmfsWriter {
     pub fn new(backend: IpmfsBackend, path: String) -> Self {
         IpmfsWriter { backend, path }
     }
-}
 
-#[async_trait]
-impl oio::Write for IpmfsWriter {
     async fn write(&mut self, bs: Bytes) -> Result<()> {
         let resp = self.backend.ipmfs_write(&self.path, bs).await?;
 
@@ -51,7 +48,10 @@ impl oio::Write for IpmfsWriter {
             _ => Err(parse_error(resp).await?),
         }
     }
+}
 
+#[async_trait]
+impl oio::Write for IpmfsWriter {
     async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
diff --git a/core/src/services/onedrive/writer.rs 
b/core/src/services/onedrive/writer.rs
index 620130847..8b38e30ee 100644
--- a/core/src/services/onedrive/writer.rs
+++ b/core/src/services/onedrive/writer.rs
@@ -42,10 +42,7 @@ impl OneDriveWriter {
     pub fn new(backend: OnedriveBackend, op: OpWrite, path: String) -> Self {
         OneDriveWriter { backend, op, path }
     }
-}
 
-#[async_trait]
-impl oio::Write for OneDriveWriter {
     async fn write(&mut self, bs: Bytes) -> Result<()> {
         let size = bs.len();
 
@@ -55,7 +52,10 @@ impl oio::Write for OneDriveWriter {
             self.write_chunked(bs).await
         }
     }
+}
 
+#[async_trait]
+impl oio::Write for OneDriveWriter {
     async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs
index 76da70da3..cad5c2cca 100644
--- a/core/src/services/sftp/writer.rs
+++ b/core/src/services/sftp/writer.rs
@@ -32,16 +32,16 @@ impl SftpWriter {
     pub fn new(file: File) -> Self {
         SftpWriter { file }
     }
-}
 
-#[async_trait]
-impl oio::Write for SftpWriter {
     async fn write(&mut self, bs: Bytes) -> Result<()> {
         self.file.write_all(&bs).await?;
 
         Ok(())
     }
+}
 
+#[async_trait]
+impl oio::Write for SftpWriter {
     async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
diff --git a/core/src/services/supabase/writer.rs 
b/core/src/services/supabase/writer.rs
index f4c271313..9eb661c10 100644
--- a/core/src/services/supabase/writer.rs
+++ b/core/src/services/supabase/writer.rs
@@ -63,10 +63,7 @@ impl SupabaseWriter {
             _ => Err(parse_error(resp).await?),
         }
     }
-}
 
-#[async_trait]
-impl oio::Write for SupabaseWriter {
     async fn write(&mut self, bs: Bytes) -> Result<()> {
         if bs.is_empty() {
             return Ok(());
@@ -74,7 +71,10 @@ impl oio::Write for SupabaseWriter {
 
         self.upload(bs).await
     }
+}
 
+#[async_trait]
+impl oio::Write for SupabaseWriter {
     async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
diff --git a/core/src/services/vercel_artifacts/writer.rs 
b/core/src/services/vercel_artifacts/writer.rs
index 6f32d67ba..0d55ca277 100644
--- a/core/src/services/vercel_artifacts/writer.rs
+++ b/core/src/services/vercel_artifacts/writer.rs
@@ -35,10 +35,7 @@ impl VercelArtifactsWriter {
     pub fn new(backend: VercelArtifactsBackend, op: OpWrite, path: String) -> 
Self {
         VercelArtifactsWriter { backend, op, path }
     }
-}
 
-#[async_trait]
-impl oio::Write for VercelArtifactsWriter {
     async fn write(&mut self, bs: Bytes) -> Result<()> {
         let resp = self
             .backend
@@ -59,7 +56,10 @@ impl oio::Write for VercelArtifactsWriter {
             _ => Err(parse_error(resp).await?),
         }
     }
+}
 
+#[async_trait]
+impl oio::Write for VercelArtifactsWriter {
     async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
diff --git a/core/src/services/wasabi/writer.rs 
b/core/src/services/wasabi/writer.rs
index 689c334dc..4f3f1697a 100644
--- a/core/src/services/wasabi/writer.rs
+++ b/core/src/services/wasabi/writer.rs
@@ -37,10 +37,7 @@ impl WasabiWriter {
     pub fn new(core: Arc<WasabiCore>, op: OpWrite, path: String) -> Self {
         WasabiWriter { core, op, path }
     }
-}
 
-#[async_trait]
-impl oio::Write for WasabiWriter {
     async fn write(&mut self, bs: Bytes) -> Result<()> {
         let resp = self
             .core
@@ -62,7 +59,10 @@ impl oio::Write for WasabiWriter {
             _ => Err(parse_error(resp).await?),
         }
     }
+}
 
+#[async_trait]
+impl oio::Write for WasabiWriter {
     async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
diff --git a/core/src/services/webdav/writer.rs 
b/core/src/services/webdav/writer.rs
index a3c17bafa..aeaf0d781 100644
--- a/core/src/services/webdav/writer.rs
+++ b/core/src/services/webdav/writer.rs
@@ -58,15 +58,15 @@ impl WebdavWriter {
             _ => Err(parse_error(resp).await?),
         }
     }
-}
 
-#[async_trait]
-impl oio::Write for WebdavWriter {
     async fn write(&mut self, bs: Bytes) -> Result<()> {
         self.write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs))
             .await
     }
+}
 
+#[async_trait]
+impl oio::Write for WebdavWriter {
     async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
         self.write_oneshot(size, AsyncBody::Stream(s)).await
     }
diff --git a/core/src/services/webhdfs/writer.rs 
b/core/src/services/webhdfs/writer.rs
index 97eef2e3d..85e6834f4 100644
--- a/core/src/services/webhdfs/writer.rs
+++ b/core/src/services/webhdfs/writer.rs
@@ -35,10 +35,7 @@ impl WebhdfsWriter {
     pub fn new(backend: WebhdfsBackend, op: OpWrite, path: String) -> Self {
         WebhdfsWriter { backend, op, path }
     }
-}
 
-#[async_trait]
-impl oio::Write for WebhdfsWriter {
     async fn write(&mut self, bs: Bytes) -> Result<()> {
         let req = self
             .backend
@@ -61,7 +58,10 @@ impl oio::Write for WebhdfsWriter {
             _ => Err(parse_error(resp).await?),
         }
     }
+}
 
+#[async_trait]
+impl oio::Write for WebhdfsWriter {
     async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
diff --git a/core/src/types/operator/operator.rs 
b/core/src/types/operator/operator.rs
index ab97a2df2..ab6bca5ac 100644
--- a/core/src/types/operator/operator.rs
+++ b/core/src/types/operator/operator.rs
@@ -729,7 +729,9 @@ impl Operator {
                     }
 
                     let (_, mut w) = inner.write(&path, args).await?;
-                    w.write(bs).await?;
+                    // FIXME: we should bench here to measure the perf.
+                    w.sink(bs.len() as u64, Box::new(oio::Cursor::from(bs)))
+                        .await?;
                     w.close().await?;
 
                     Ok(())
diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs
index 37a9fe72c..8517a2f04 100644
--- a/core/src/types/writer.rs
+++ b/core/src/types/writer.rs
@@ -82,7 +82,9 @@ impl Writer {
     /// Write into inner writer.
     pub async fn write(&mut self, bs: impl Into<Bytes>) -> Result<()> {
         if let State::Idle(Some(w)) = &mut self.state {
-            w.write(bs.into()).await
+            let bs = bs.into();
+            w.sink(bs.len() as u64, Box::new(oio::Cursor::from(bs)))
+                .await
         } else {
             unreachable!(
                 "writer state invalid while write, expect Idle, actual {}",
@@ -250,7 +252,8 @@ impl AsyncWrite for Writer {
                     let bs = Bytes::from(buf.to_vec());
                     let size = bs.len();
                     let fut = async move {
-                        w.write(bs).await?;
+                        // FIXME: we should bench here to measure the perf.
+                        w.sink(size as u64, 
Box::new(oio::Cursor::from(bs))).await?;
                         Ok((size, w))
                     };
                     self.state = State::Write(Box::pin(fut));
@@ -317,7 +320,8 @@ impl tokio::io::AsyncWrite for Writer {
                     let bs = Bytes::from(buf.to_vec());
                     let size = bs.len();
                     let fut = async move {
-                        w.write(bs).await?;
+                        // FIXME: we should bench here to measure the perf.
+                        w.sink(size as u64, 
Box::new(oio::Cursor::from(bs))).await?;
                         Ok((size, w))
                     };
                     self.state = State::Write(Box::pin(fut));


Reply via email to