This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 6e171fddee refactor(core)!: Make oio::Write always write all given 
buffer (#4880)
6e171fddee is described below

commit 6e171fddeef675abc65b135747ee17d093781035
Author: Xuanwo <[email protected]>
AuthorDate: Fri Jul 12 13:48:48 2024 +0800

    refactor(core)!: Make oio::Write always write all given buffer (#4880)
    
    * Remove returning n in write
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix build
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix tests
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix write
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/layers/async_backtrace.rs          |   4 +-
 core/src/layers/await_tree.rs               |   4 +-
 core/src/layers/blocking.rs                 |   2 +-
 core/src/layers/complete.rs                 |   7 +-
 core/src/layers/concurrent_limit.rs         |   4 +-
 core/src/layers/dtrace.rs                   |  14 ++-
 core/src/layers/error_context.rs            |  14 ++-
 core/src/layers/logging.rs                  |  25 +++---
 core/src/layers/metrics.rs                  |  17 ++--
 core/src/layers/minitrace.rs                |   4 +-
 core/src/layers/oteltrace.rs                |   4 +-
 core/src/layers/prometheus.rs               |  20 +++--
 core/src/layers/prometheus_client.rs        |  16 ++--
 core/src/layers/retry.rs                    |   8 +-
 core/src/layers/throttle.rs                 |   4 +-
 core/src/layers/timeout.rs                  |   2 +-
 core/src/layers/tracing.rs                  |   4 +-
 core/src/raw/adapters/kv/backend.rs         |  10 +--
 core/src/raw/adapters/typed_kv/backend.rs   |  10 +--
 core/src/raw/enum_utils.rs                  |   4 +-
 core/src/raw/oio/write/api.rs               |  30 ++-----
 core/src/raw/oio/write/append_write.rs      |   8 +-
 core/src/raw/oio/write/block_write.rs       |  10 +--
 core/src/raw/oio/write/multipart_write.rs   |  10 +--
 core/src/raw/oio/write/one_shot_write.rs    |   5 +-
 core/src/raw/oio/write/position_write.rs    |  10 +--
 core/src/raw/oio/write/range_write.rs       |  10 +--
 core/src/services/aliyun_drive/writer.rs    |   6 +-
 core/src/services/alluxio/writer.rs         |   7 +-
 core/src/services/compfs/writer.rs          |  24 +++---
 core/src/services/fs/writer.rs              |  19 +++--
 core/src/services/ftp/writer.rs             |  25 +++---
 core/src/services/ghac/writer.rs            |   4 +-
 core/src/services/hdfs/writer.rs            |  18 +++-
 core/src/services/hdfs_native/writer.rs     |   2 +-
 core/src/services/sftp/writer.rs            |  13 ++-
 core/src/types/blocking_write/std_writer.rs |   5 +-
 core/src/types/context/write.rs             | 127 +++++-----------------------
 core/src/types/write/writer.rs              |   9 +-
 39 files changed, 221 insertions(+), 298 deletions(-)

diff --git a/core/src/layers/async_backtrace.rs 
b/core/src/layers/async_backtrace.rs
index 6d77591ca0..290171c29c 100644
--- a/core/src/layers/async_backtrace.rs
+++ b/core/src/layers/async_backtrace.rs
@@ -169,7 +169,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
AsyncBacktraceWrapper<R> {
 
 impl<R: oio::Write> oio::Write for AsyncBacktraceWrapper<R> {
     #[async_backtrace::framed]
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         self.inner.write(bs).await
     }
 
@@ -185,7 +185,7 @@ impl<R: oio::Write> oio::Write for AsyncBacktraceWrapper<R> 
{
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for AsyncBacktraceWrapper<R> {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, bs: Buffer) -> Result<()> {
         self.inner.write(bs)
     }
 
diff --git a/core/src/layers/await_tree.rs b/core/src/layers/await_tree.rs
index 7bb20d42fb..58fd73e8fe 100644
--- a/core/src/layers/await_tree.rs
+++ b/core/src/layers/await_tree.rs
@@ -191,7 +191,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
AwaitTreeWrapper<R> {
 }
 
 impl<R: oio::Write> oio::Write for AwaitTreeWrapper<R> {
-    fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>> + 
MaybeSend {
+    fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> + 
MaybeSend {
         self.inner
             .write(bs)
             .instrument_await(format!("opendal::{}", 
WriteOperation::Write.into_static()))
@@ -211,7 +211,7 @@ impl<R: oio::Write> oio::Write for AwaitTreeWrapper<R> {
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for AwaitTreeWrapper<R> {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, bs: Buffer) -> Result<()> {
         self.inner.write(bs)
     }
 
diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs
index 70830981c8..7293ced965 100644
--- a/core/src/layers/blocking.rs
+++ b/core/src/layers/blocking.rs
@@ -288,7 +288,7 @@ impl<I: oio::Read + 'static> oio::BlockingRead for 
BlockingWrapper<I> {
 }
 
 impl<I: oio::Write + 'static> oio::BlockingWrite for BlockingWrapper<I> {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, bs: Buffer) -> Result<()> {
         self.handle.block_on(self.inner.write(bs))
     }
 
diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index c2156a0418..68b0340aa7 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -654,7 +654,7 @@ impl<W> oio::Write for CompleteWriter<W>
 where
     W: oio::Write,
 {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         let w = self.inner.as_mut().ok_or_else(|| {
             Error::new(ErrorKind::Unexpected, "writer has been closed or 
aborted")
         })?;
@@ -689,13 +689,12 @@ impl<W> oio::BlockingWrite for CompleteWriter<W>
 where
     W: oio::BlockingWrite,
 {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, bs: Buffer) -> Result<()> {
         let w = self.inner.as_mut().ok_or_else(|| {
             Error::new(ErrorKind::Unexpected, "writer has been closed or 
aborted")
         })?;
-        let n = w.write(bs)?;
 
-        Ok(n)
+        w.write(bs)
     }
 
     fn close(&mut self) -> Result<()> {
diff --git a/core/src/layers/concurrent_limit.rs 
b/core/src/layers/concurrent_limit.rs
index a1a61ad01d..87ad19b50c 100644
--- a/core/src/layers/concurrent_limit.rs
+++ b/core/src/layers/concurrent_limit.rs
@@ -262,7 +262,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
ConcurrentLimitWrapper<R> {
 }
 
 impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         self.inner.write(bs).await
     }
 
@@ -276,7 +276,7 @@ impl<R: oio::Write> oio::Write for 
ConcurrentLimitWrapper<R> {
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for ConcurrentLimitWrapper<R> {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, bs: Buffer) -> Result<()> {
         self.inner.write(bs)
     }
 
diff --git a/core/src/layers/dtrace.rs b/core/src/layers/dtrace.rs
index c71277ed9a..51b001313b 100644
--- a/core/src/layers/dtrace.rs
+++ b/core/src/layers/dtrace.rs
@@ -379,15 +379,14 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
DtraceLayerWrapper<R> {
 }
 
 impl<R: oio::Write> oio::Write for DtraceLayerWrapper<R> {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         let c_path = CString::new(self.path.clone()).unwrap();
         probe_lazy!(opendal, writer_write_start, c_path.as_ptr());
         self.inner
             .write(bs)
             .await
-            .map(|n| {
-                probe_lazy!(opendal, writer_write_ok, c_path.as_ptr(), n);
-                n
+            .map(|_| {
+                probe_lazy!(opendal, writer_write_ok, c_path.as_ptr());
             })
             .map_err(|err| {
                 probe_lazy!(opendal, writer_write_error, c_path.as_ptr());
@@ -427,14 +426,13 @@ impl<R: oio::Write> oio::Write for DtraceLayerWrapper<R> {
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for DtraceLayerWrapper<R> {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, bs: Buffer) -> Result<()> {
         let c_path = CString::new(self.path.clone()).unwrap();
         probe_lazy!(opendal, blocking_writer_write_start, c_path.as_ptr());
         self.inner
             .write(bs)
-            .map(|n| {
-                probe_lazy!(opendal, blocking_writer_write_ok, 
c_path.as_ptr(), n);
-                n
+            .map(|_| {
+                probe_lazy!(opendal, blocking_writer_write_ok, 
c_path.as_ptr());
             })
             .map_err(|err| {
                 probe_lazy!(opendal, blocking_writer_write_error, 
c_path.as_ptr());
diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs
index 86ae9dba80..cabe84b053 100644
--- a/core/src/layers/error_context.rs
+++ b/core/src/layers/error_context.rs
@@ -385,14 +385,13 @@ impl<T: oio::BlockingRead> oio::BlockingRead for 
ErrorContextWrapper<T> {
 }
 
 impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         let size = bs.len();
         self.inner
             .write(bs)
             .await
-            .map(|n| {
-                self.processed += n as u64;
-                n
+            .map(|_| {
+                self.processed += size as u64;
             })
             .map_err(|err| {
                 err.with_operation(WriteOperation::Write)
@@ -423,13 +422,12 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> 
{
 }
 
 impl<T: oio::BlockingWrite> oio::BlockingWrite for ErrorContextWrapper<T> {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, bs: Buffer) -> Result<()> {
         let size = bs.len();
         self.inner
             .write(bs)
-            .map(|n| {
-                self.processed += n as u64;
-                n
+            .map(|_| {
+                self.processed += size as u64;
             })
             .map_err(|err| {
                 err.with_operation(WriteOperation::BlockingWrite)
diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs
index 1e0d80d264..507745c6d1 100644
--- a/core/src/layers/logging.rs
+++ b/core/src/layers/logging.rs
@@ -1072,21 +1072,20 @@ impl<W> LoggingWriter<W> {
 }
 
 impl<W: oio::Write> oio::Write for LoggingWriter<W> {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
-        match self.inner.write(bs.clone()).await {
-            Ok(n) => {
-                self.written += n as u64;
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
+        let size = bs.len();
+        match self.inner.write(bs).await {
+            Ok(_) => {
                 trace!(
                     target: LOGGING_TARGET,
-                    "service={} operation={} path={} written={}B -> input data 
{}B, write {}B",
+                    "service={} operation={} path={} written={}B -> data write 
{}B",
                     self.ctx.scheme,
                     WriteOperation::Write,
                     self.path,
                     self.written,
-                    bs.len(),
-                    n,
+                    size,
                 );
-                Ok(n)
+                Ok(())
             }
             Err(err) => {
                 if let Some(lvl) = self.ctx.error_level(&err) {
@@ -1170,21 +1169,19 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
 }
 
 impl<W: oio::BlockingWrite> oio::BlockingWrite for LoggingWriter<W> {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, bs: Buffer) -> Result<()> {
         match self.inner.write(bs.clone()) {
-            Ok(n) => {
-                self.written += n as u64;
+            Ok(_) => {
                 trace!(
                     target: LOGGING_TARGET,
-                    "service={} operation={} path={} written={}B -> input data 
{}B, write {}B",
+                    "service={} operation={} path={} written={}B -> data write 
{}B",
                     self.ctx.scheme,
                     WriteOperation::BlockingWrite,
                     self.path,
                     self.written,
                     bs.len(),
-                    n
                 );
-                Ok(n)
+                Ok(())
             }
             Err(err) => {
                 if let Some(lvl) = self.ctx.error_level(&err) {
diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs
index 47d7a8a936..decbad81a8 100644
--- a/core/src/layers/metrics.rs
+++ b/core/src/layers/metrics.rs
@@ -785,17 +785,17 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
MetricWrapper<R> {
 }
 
 impl<R: oio::Write> oio::Write for MetricWrapper<R> {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         let start = Instant::now();
+        let size = bs.len();
 
         self.inner
             .write(bs)
             .await
-            .map(|n| {
-                self.bytes_counter.increment(n as u64);
+            .map(|_| {
+                self.bytes_counter.increment(size as u64);
                 self.requests_duration_seconds
                     .record(start.elapsed().as_secs_f64());
-                n
             })
             .map_err(|err| {
                 self.handle.increment_errors_total(self.op, err.kind());
@@ -819,12 +819,13 @@ impl<R: oio::Write> oio::Write for MetricWrapper<R> {
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for MetricWrapper<R> {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, bs: Buffer) -> Result<()> {
+        let size = bs.len();
+
         self.inner
             .write(bs)
-            .map(|n| {
-                self.bytes_counter.increment(n as u64);
-                n
+            .map(|_| {
+                self.bytes_counter.increment(size as u64);
             })
             .map_err(|err| {
                 self.handle.increment_errors_total(self.op, err.kind());
diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs
index 983d18b6b8..bca4a2e584 100644
--- a/core/src/layers/minitrace.rs
+++ b/core/src/layers/minitrace.rs
@@ -306,7 +306,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
MinitraceWrapper<R> {
 }
 
 impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
-    fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>> + 
MaybeSend {
+    fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> + 
MaybeSend {
         let _g = self.span.set_local_parent();
         let _span = 
LocalSpan::enter_with_local_parent(WriteOperation::Write.into_static());
         self.inner.write(bs)
@@ -326,7 +326,7 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for MinitraceWrapper<R> {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, bs: Buffer) -> Result<()> {
         let _g = self.span.set_local_parent();
         let _span = 
LocalSpan::enter_with_local_parent(WriteOperation::BlockingWrite.into_static());
         self.inner.write(bs)
diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs
index 6fb5d582ba..b04ad22fdd 100644
--- a/core/src/layers/oteltrace.rs
+++ b/core/src/layers/oteltrace.rs
@@ -284,7 +284,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
OtelTraceWrapper<R> {
 }
 
 impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
-    fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>> + 
MaybeSend {
+    fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> + 
MaybeSend {
         self.inner.write(bs)
     }
 
@@ -298,7 +298,7 @@ impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for OtelTraceWrapper<R> {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, bs: Buffer) -> Result<()> {
         self.inner.write(bs)
     }
 
diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs
index ee740d55c4..a4359ead72 100644
--- a/core/src/layers/prometheus.rs
+++ b/core/src/layers/prometheus.rs
@@ -742,7 +742,9 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
PrometheusMetricWrapper<R> {
 }
 
 impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
+        let size = bs.len();
+
         let labels = self.stats.generate_metric_label(
             self.scheme.into_static(),
             WriteOperation::Write.into_static(),
@@ -758,12 +760,12 @@ impl<R: oio::Write> oio::Write for 
PrometheusMetricWrapper<R> {
         timer.observe_duration();
 
         match res {
-            Ok(n) => {
+            Ok(_) => {
                 self.stats
                     .bytes_total
                     .with_label_values(&labels)
-                    .observe(n as f64);
-                Ok(n)
+                    .observe(size as f64);
+                Ok(())
             }
             Err(err) => {
                 self.stats.increment_errors_total(self.op, err.kind());
@@ -822,7 +824,9 @@ impl<R: oio::Write> oio::Write for 
PrometheusMetricWrapper<R> {
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, bs: Buffer) -> Result<()> {
+        let size = bs.len();
+
         let labels = self.stats.generate_metric_label(
             self.scheme.into_static(),
             Operation::BlockingWrite.into_static(),
@@ -838,12 +842,12 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for 
PrometheusMetricWrapper<R> {
         timer.observe_duration();
 
         match res {
-            Ok(n) => {
+            Ok(_) => {
                 self.stats
                     .bytes_total
                     .with_label_values(&labels)
-                    .observe(n as f64);
-                Ok(n)
+                    .observe(size as f64);
+                Ok(())
             }
             Err(err) => {
                 self.stats.increment_errors_total(self.op, err.kind());
diff --git a/core/src/layers/prometheus_client.rs 
b/core/src/layers/prometheus_client.rs
index 463e409275..58fef7e894 100644
--- a/core/src/layers/prometheus_client.rs
+++ b/core/src/layers/prometheus_client.rs
@@ -627,24 +627,24 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
PrometheusMetricWrapper<R> {
 }
 
 impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         let start = Instant::now();
+        let size = bs.len();
 
         self.inner
             .write(bs)
             .await
-            .map(|n| {
+            .map(|_| {
                 self.metrics.observe_bytes_total(
                     self.scheme,
                     WriteOperation::Write.into_static(),
-                    n,
+                    size,
                 );
                 self.metrics.observe_request_duration(
                     self.scheme,
                     WriteOperation::Write.into_static(),
                     start.elapsed(),
                 );
-                n
             })
             .map_err(|err| {
                 self.metrics.increment_errors_total(
@@ -704,23 +704,23 @@ impl<R: oio::Write> oio::Write for 
PrometheusMetricWrapper<R> {
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, bs: Buffer) -> Result<()> {
         let start = Instant::now();
+        let size = bs.len();
 
         self.inner
             .write(bs)
-            .map(|n| {
+            .map(|_| {
                 self.metrics.observe_bytes_total(
                     self.scheme,
                     WriteOperation::BlockingWrite.into_static(),
-                    n,
+                    size,
                 );
                 self.metrics.observe_request_duration(
                     self.scheme,
                     WriteOperation::BlockingWrite.into_static(),
                     start.elapsed(),
                 );
-                n
             })
             .map_err(|err| {
                 self.metrics.increment_errors_total(
diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs
index c48307fdbc..4d42098394 100644
--- a/core/src/layers/retry.rs
+++ b/core/src/layers/retry.rs
@@ -626,7 +626,7 @@ impl<R: oio::BlockingRead, I: RetryInterceptor> 
oio::BlockingRead for RetryWrapp
 }
 
 impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         use backon::RetryableWithContext;
 
         let inner = self.take_inner()?;
@@ -694,7 +694,7 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for 
RetryWrapper<R, I> {
 }
 
 impl<R: oio::BlockingWrite, I: RetryInterceptor> oio::BlockingWrite for 
RetryWrapper<R, I> {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, bs: Buffer) -> Result<()> {
         { || self.inner.as_mut().unwrap().write(bs.clone()) }
             .retry(&self.builder)
             .when(|e| e.is_temporary())
@@ -938,8 +938,8 @@ mod tests {
     struct MockWriter {}
 
     impl oio::Write for MockWriter {
-        async fn write(&mut self, bs: Buffer) -> Result<usize> {
-            Ok(bs.len())
+        async fn write(&mut self, _: Buffer) -> Result<()> {
+            Ok(())
         }
 
         async fn close(&mut self) -> Result<()> {
diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs
index 3b66e327f3..f73f33d3ca 100644
--- a/core/src/layers/throttle.rs
+++ b/core/src/layers/throttle.rs
@@ -191,7 +191,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
ThrottleWrapper<R> {
 }
 
 impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         let buf_length = NonZeroU32::new(bs.len() as u32).unwrap();
 
         loop {
@@ -226,7 +226,7 @@ impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for ThrottleWrapper<R> {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, bs: Buffer) -> Result<()> {
         let buf_length = NonZeroU32::new(bs.len() as u32).unwrap();
 
         loop {
diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs
index 246049dfbf..1cbc0c5ac1 100644
--- a/core/src/layers/timeout.rs
+++ b/core/src/layers/timeout.rs
@@ -350,7 +350,7 @@ impl<R: oio::Read> oio::Read for TimeoutWrapper<R> {
 }
 
 impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         let fut = self.inner.write(bs);
         Self::io_timeout(self.timeout, WriteOperation::Write.into_static(), 
fut).await
     }
diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs
index 4ba829a6ec..4a2dc4bc00 100644
--- a/core/src/layers/tracing.rs
+++ b/core/src/layers/tracing.rs
@@ -286,7 +286,7 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> {
         parent = &self.span,
         level = "trace",
         skip_all)]
-    fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>> + 
MaybeSend {
+    fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> + 
MaybeSend {
         self.inner.write(bs)
     }
 
@@ -312,7 +312,7 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for 
TracingWrapper<R> {
         parent = &self.span,
         level = "trace",
         skip_all)]
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, bs: Buffer) -> Result<()> {
         self.inner.write(bs)
     }
 
diff --git a/core/src/raw/adapters/kv/backend.rs 
b/core/src/raw/adapters/kv/backend.rs
index bb08e4cb19..625e7ea982 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -242,10 +242,9 @@ impl<S> KvWriter<S> {
 unsafe impl<S: Adapter> Sync for KvWriter<S> {}
 
 impl<S: Adapter> oio::Write for KvWriter<S> {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
-        let ret = bs.len();
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         self.buffer.push(bs);
-        Ok(ret)
+        Ok(())
     }
 
     async fn close(&mut self) -> Result<()> {
@@ -260,10 +259,9 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
 }
 
 impl<S: Adapter> oio::BlockingWrite for KvWriter<S> {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
-        let ret = bs.len();
+    fn write(&mut self, bs: Buffer) -> Result<()> {
         self.buffer.push(bs);
-        Ok(ret)
+        Ok(())
     }
 
     fn close(&mut self) -> Result<()> {
diff --git a/core/src/raw/adapters/typed_kv/backend.rs 
b/core/src/raw/adapters/typed_kv/backend.rs
index ecce2eb879..fd6271691b 100644
--- a/core/src/raw/adapters/typed_kv/backend.rs
+++ b/core/src/raw/adapters/typed_kv/backend.rs
@@ -275,12 +275,11 @@ impl<S> KvWriter<S> {
 }
 
 impl<S: Adapter> oio::Write for KvWriter<S> {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
-        let size = bs.len();
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         let mut buf = self.buf.take().unwrap_or_default();
         buf.push(bs);
         self.buf = Some(buf);
-        Ok(size)
+        Ok(())
     }
 
     async fn close(&mut self) -> Result<()> {
@@ -303,12 +302,11 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
 }
 
 impl<S: Adapter> oio::BlockingWrite for KvWriter<S> {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
-        let size = bs.len();
+    fn write(&mut self, bs: Buffer) -> Result<()> {
         let mut buf = self.buf.take().unwrap_or_default();
         buf.push(bs);
         self.buf = Some(buf);
-        Ok(size)
+        Ok(())
     }
 
     fn close(&mut self) -> Result<()> {
diff --git a/core/src/raw/enum_utils.rs b/core/src/raw/enum_utils.rs
index c22411904d..111da78be0 100644
--- a/core/src/raw/enum_utils.rs
+++ b/core/src/raw/enum_utils.rs
@@ -70,7 +70,7 @@ impl<ONE: oio::BlockingRead, TWO: oio::BlockingRead> 
oio::BlockingRead for TwoWa
 }
 
 impl<ONE: oio::Write, TWO: oio::Write> oio::Write for TwoWays<ONE, TWO> {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         match self {
             Self::One(v) => v.write(bs).await,
             Self::Two(v) => v.write(bs).await,
@@ -129,7 +129,7 @@ impl<ONE: oio::BlockingRead, TWO: oio::BlockingRead, THREE: 
oio::BlockingRead> o
 impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write
     for ThreeWays<ONE, TWO, THREE>
 {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         match self {
             Self::One(v) => v.write(bs).await,
             Self::Two(v) => v.write(bs).await,
diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs
index e6c7c05918..4ec53adab6 100644
--- a/core/src/raw/oio/write/api.rs
+++ b/core/src/raw/oio/write/api.rs
@@ -77,31 +77,19 @@ pub trait Write: Unpin + Send + Sync {
     ///
     /// # Behavior
     ///
-    /// - `Ok(n)` means `n` bytes has been written successfully.
+    /// - `Ok(())` means all bytes has been written successfully.
     /// - `Err(err)` means error happens and no bytes has been written.
-    ///
-    /// It's possible that `n < bs.len()`, caller should pass the remaining 
bytes
-    /// repeatedly until all bytes has been written.
-    #[cfg(not(target_arch = "wasm32"))]
-    fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>> + 
MaybeSend;
-    #[cfg(target_arch = "wasm32")]
-    fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>>;
+    fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> + 
MaybeSend;
 
     /// Close the writer and make sure all data has been flushed.
-    #[cfg(not(target_arch = "wasm32"))]
     fn close(&mut self) -> impl Future<Output = Result<()>> + MaybeSend;
-    #[cfg(target_arch = "wasm32")]
-    fn close(&mut self) -> impl Future<Output = Result<()>>;
 
     /// Abort the pending writer.
-    #[cfg(not(target_arch = "wasm32"))]
     fn abort(&mut self) -> impl Future<Output = Result<()>> + MaybeSend;
-    #[cfg(target_arch = "wasm32")]
-    fn abort(&mut self) -> impl Future<Output = Result<()>>;
 }
 
 impl Write for () {
-    async fn write(&mut self, _: Buffer) -> Result<usize> {
+    async fn write(&mut self, _: Buffer) -> Result<()> {
         unimplemented!("write is required to be implemented for oio::Write")
     }
 
@@ -121,7 +109,7 @@ impl Write for () {
 }
 
 pub trait WriteDyn: Unpin + Send + Sync {
-    fn write_dyn(&mut self, bs: Buffer) -> BoxedFuture<Result<usize>>;
+    fn write_dyn(&mut self, bs: Buffer) -> BoxedFuture<Result<()>>;
 
     fn close_dyn(&mut self) -> BoxedFuture<Result<()>>;
 
@@ -129,7 +117,7 @@ pub trait WriteDyn: Unpin + Send + Sync {
 }
 
 impl<T: Write + ?Sized> WriteDyn for T {
-    fn write_dyn(&mut self, bs: Buffer) -> BoxedFuture<Result<usize>> {
+    fn write_dyn(&mut self, bs: Buffer) -> BoxedFuture<Result<()>> {
         Box::pin(self.write(bs))
     }
 
@@ -143,7 +131,7 @@ impl<T: Write + ?Sized> WriteDyn for T {
 }
 
 impl<T: WriteDyn + ?Sized> Write for Box<T> {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         self.deref_mut().write_dyn(bs).await
     }
 
@@ -170,14 +158,14 @@ pub trait BlockingWrite: Send + Sync + 'static {
     ///
     /// It's possible that `n < bs.len()`, caller should pass the remaining 
bytes
     /// repeatedly until all bytes has been written.
-    fn write(&mut self, bs: Buffer) -> Result<usize>;
+    fn write(&mut self, bs: Buffer) -> Result<()>;
 
     /// Close the writer and make sure all data has been flushed.
     fn close(&mut self) -> Result<()>;
 }
 
 impl BlockingWrite for () {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, bs: Buffer) -> Result<()> {
         let _ = bs;
 
         unimplemented!("write is required to be implemented for 
oio::BlockingWrite")
@@ -195,7 +183,7 @@ impl BlockingWrite for () {
 ///
 /// To make BlockingWriter work as expected, we must add this impl.
 impl<T: BlockingWrite + ?Sized> BlockingWrite for Box<T> {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, bs: Buffer) -> Result<()> {
         (**self).write(bs)
     }
 
diff --git a/core/src/raw/oio/write/append_write.rs 
b/core/src/raw/oio/write/append_write.rs
index 2f48b68307..06c72cc5e2 100644
--- a/core/src/raw/oio/write/append_write.rs
+++ b/core/src/raw/oio/write/append_write.rs
@@ -80,7 +80,7 @@ impl<W> oio::Write for AppendWriter<W>
 where
     W: AppendWrite,
 {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         let offset = match self.offset {
             Some(offset) => offset,
             None => {
@@ -91,12 +91,10 @@ where
         };
 
         let size = bs.len();
-        self.inner
-            .append(offset, size as u64, Buffer::from(bs.to_bytes()))
-            .await?;
+        self.inner.append(offset, size as u64, bs).await?;
         // Update offset after succeed.
         self.offset = Some(offset + size as u64);
-        Ok(size)
+        Ok(())
     }
 
     async fn close(&mut self) -> Result<()> {
diff --git a/core/src/raw/oio/write/block_write.rs b/core/src/raw/oio/write/block_write.rs
index 99c76562ca..cd0ec43b45 100644
--- a/core/src/raw/oio/write/block_write.rs
+++ b/core/src/raw/oio/write/block_write.rs
@@ -162,10 +162,10 @@ impl<W> oio::Write for BlockWriter<W>
 where
     W: BlockWrite,
 {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         if !self.started && self.cache.is_none() {
-            let size = self.fill_cache(bs);
-            return Ok(size);
+            self.fill_cache(bs);
+            return Ok(());
         }
 
         // The block upload process has been started.
@@ -181,8 +181,8 @@ where
             })
             .await?;
         self.cache = None;
-        let size = self.fill_cache(bs);
-        Ok(size)
+        self.fill_cache(bs);
+        Ok(())
     }
 
     async fn close(&mut self) -> Result<()> {
diff --git a/core/src/raw/oio/write/multipart_write.rs b/core/src/raw/oio/write/multipart_write.rs
index 0d893d7cb3..44a33c7a4b 100644
--- a/core/src/raw/oio/write/multipart_write.rs
+++ b/core/src/raw/oio/write/multipart_write.rs
@@ -203,14 +203,14 @@ impl<W> oio::Write for MultipartWriter<W>
 where
     W: MultipartWrite,
 {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         let upload_id = match self.upload_id.clone() {
             Some(v) => v,
             None => {
                 // Fill cache with the first write.
                 if self.cache.is_none() {
-                    let size = self.fill_cache(bs);
-                    return Ok(size);
+                    self.fill_cache(bs);
+                    return Ok(());
                 }
 
                 let upload_id = self.w.initiate_part().await?;
@@ -234,8 +234,8 @@ where
             .await?;
         self.cache = None;
         self.next_part_number += 1;
-        let size = self.fill_cache(bs);
-        Ok(size)
+        self.fill_cache(bs);
+        Ok(())
     }
 
     async fn close(&mut self) -> Result<()> {
diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs
index cd056c1461..938973c33a 100644
--- a/core/src/raw/oio/write/one_shot_write.rs
+++ b/core/src/raw/oio/write/one_shot_write.rs
@@ -50,16 +50,15 @@ impl<W: OneShotWrite> OneShotWriter<W> {
 }
 
 impl<W: OneShotWrite> oio::Write for OneShotWriter<W> {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         match &self.buffer {
             Some(_) => Err(Error::new(
                 ErrorKind::Unsupported,
                 "OneShotWriter doesn't support multiple write",
             )),
             None => {
-                let size = bs.len();
                 self.buffer = Some(bs);
-                Ok(size)
+                Ok(())
             }
         }
     }
diff --git a/core/src/raw/oio/write/position_write.rs b/core/src/raw/oio/write/position_write.rs
index 3dbf5c93ef..5aa5ff3294 100644
--- a/core/src/raw/oio/write/position_write.rs
+++ b/core/src/raw/oio/write/position_write.rs
@@ -124,10 +124,10 @@ impl<W: PositionWrite> PositionWriter<W> {
 }
 
 impl<W: PositionWrite> oio::Write for PositionWriter<W> {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         if self.cache.is_none() {
-            let size = self.fill_cache(bs);
-            return Ok(size);
+            let _ = self.fill_cache(bs);
+            return Ok(());
         }
 
         let bytes = self.cache.clone().expect("pending write must exist");
@@ -144,8 +144,8 @@ impl<W: PositionWrite> oio::Write for PositionWriter<W> {
             .await?;
         self.cache = None;
         self.next_offset += length;
-        let size = self.fill_cache(bs);
-        Ok(size)
+        let _ = self.fill_cache(bs);
+        Ok(())
     }
 
     async fn close(&mut self) -> Result<()> {
diff --git a/core/src/raw/oio/write/range_write.rs b/core/src/raw/oio/write/range_write.rs
index 67ae619dd9..f44f06ad9c 100644
--- a/core/src/raw/oio/write/range_write.rs
+++ b/core/src/raw/oio/write/range_write.rs
@@ -155,14 +155,14 @@ impl<W: RangeWrite> RangeWriter<W> {
 }
 
 impl<W: RangeWrite> oio::Write for RangeWriter<W> {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         let location = match self.location.clone() {
             Some(location) => location,
             None => {
                 // Fill cache with the first write.
                 if self.cache.is_none() {
-                    let size = self.fill_cache(bs);
-                    return Ok(size);
+                    self.fill_cache(bs);
+                    return Ok(());
                 }
 
                 let location = self.w.initiate_range().await?;
@@ -187,8 +187,8 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> {
             .await?;
         self.cache = None;
         self.next_offset += length;
-        let size = self.fill_cache(bs);
-        Ok(size)
+        self.fill_cache(bs);
+        Ok(())
     }
 
     async fn close(&mut self) -> Result<()> {
diff --git a/core/src/services/aliyun_drive/writer.rs b/core/src/services/aliyun_drive/writer.rs
index 30ca2ef94e..461764541f 100644
--- a/core/src/services/aliyun_drive/writer.rs
+++ b/core/src/services/aliyun_drive/writer.rs
@@ -51,7 +51,7 @@ impl AliyunDriveWriter {
 }
 
 impl oio::Write for AliyunDriveWriter {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         let (upload_id, file_id) = match (self.upload_id.as_ref(), self.file_id.as_ref()) {
             (Some(upload_id), Some(file_id)) => (upload_id, file_id),
             _ => {
@@ -94,8 +94,6 @@ impl oio::Write for AliyunDriveWriter {
             return Err(Error::new(ErrorKind::Unexpected, "cannot find upload_url"));
         };
 
-        let size = bs.len();
-
         if let Err(err) = self.core.upload(upload_url, bs).await {
             if err.kind() != ErrorKind::AlreadyExists {
                 return Err(err);
@@ -104,7 +102,7 @@ impl oio::Write for AliyunDriveWriter {
 
         self.part_number += 1;
 
-        Ok(size)
+        Ok(())
     }
 
     async fn close(&mut self) -> Result<()> {
diff --git a/core/src/services/alluxio/writer.rs b/core/src/services/alluxio/writer.rs
index e5ca807b60..f452b6b10b 100644
--- a/core/src/services/alluxio/writer.rs
+++ b/core/src/services/alluxio/writer.rs
@@ -43,7 +43,7 @@ impl AlluxioWriter {
 }
 
 impl oio::Write for AlluxioWriter {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         let stream_id = match self.stream_id {
             Some(stream_id) => stream_id,
             None => {
@@ -52,9 +52,8 @@ impl oio::Write for AlluxioWriter {
                 stream_id
             }
         };
-        self.core
-            .write(stream_id, Buffer::from(bs.to_bytes()))
-            .await
+        self.core.write(stream_id, bs).await?;
+        Ok(())
     }
 
     async fn close(&mut self) -> Result<()> {
diff --git a/core/src/services/compfs/writer.rs b/core/src/services/compfs/writer.rs
index 2e12ea2dc7..749cab10ff 100644
--- a/core/src/services/compfs/writer.rs
+++ b/core/src/services/compfs/writer.rs
@@ -15,13 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{io::Cursor, sync::Arc};
-
-use compio::{buf::buf_try, fs::File, io::AsyncWrite};
-
 use super::core::CompfsCore;
 use crate::raw::*;
 use crate::*;
+use compio::io::AsyncWriteExt;
+use compio::{buf::buf_try, fs::File};
+use std::{io::Cursor, sync::Arc};
 
 #[derive(Debug)]
 pub struct CompfsWriter {
@@ -36,17 +35,22 @@ impl CompfsWriter {
 }
 
 impl oio::Write for CompfsWriter {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    /// FIXME
+    ///
+    /// the write_all doesn't work correctly if `bs` is non-contiguous.
+    ///
+    /// The IoBuf::buf_len() only returns the length of the current buffer.
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         let mut file = self.file.clone();
 
-        let n = self
-            .core
+        self.core
             .exec(move || async move {
-                let (n, _) = buf_try!(@try file.write(bs).await);
-                Ok(n)
+                buf_try!(@try file.write_all(bs).await);
+                Ok(())
             })
             .await?;
-        Ok(n)
+
+        Ok(())
     }
 
     async fn close(&mut self) -> Result<()> {
diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index b9e84d736c..d0d1bc6674 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -53,11 +53,15 @@ impl<F> FsWriter<F> {
 unsafe impl<F> Sync for FsWriter<F> {}
 
 impl oio::Write for FsWriter<tokio::fs::File> {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, mut bs: Buffer) -> Result<()> {
         let f = self.f.as_mut().expect("FsWriter must be initialized");
 
-        // TODO: use write_vectored instead.
-        f.write(bs.chunk()).await.map_err(new_std_io_error)
+        while bs.has_remaining() {
+            let n = f.write(bs.chunk()).await.map_err(new_std_io_error)?;
+            bs.advance(n);
+        }
+
+        Ok(())
     }
 
     async fn close(&mut self) -> Result<()> {
@@ -88,10 +92,15 @@ impl oio::Write for FsWriter<tokio::fs::File> {
 }
 
 impl oio::BlockingWrite for FsWriter<std::fs::File> {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, mut bs: Buffer) -> Result<()> {
         let f = self.f.as_mut().expect("FsWriter must be initialized");
 
-        f.write(bs.chunk()).map_err(new_std_io_error)
+        while bs.has_remaining() {
+            let n = f.write(bs.chunk()).map_err(new_std_io_error)?;
+            bs.advance(n);
+        }
+
+        Ok(())
     }
 
     fn close(&mut self) -> Result<()> {
diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs
index ec21609c18..f7f6864dd9 100644
--- a/core/src/services/ftp/writer.rs
+++ b/core/src/services/ftp/writer.rs
@@ -53,7 +53,7 @@ impl FtpWriter {
 }
 
 impl oio::Write for FtpWriter {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, mut bs: Buffer) -> Result<()> {
         let path = if let Some(tmp_path) = &self.tmp_path {
             tmp_path
         } else {
@@ -69,17 +69,20 @@ impl oio::Write for FtpWriter {
             ));
         }
 
-        let size = self
-            .data_stream
-            .as_mut()
-            .unwrap()
-            .write(bs.chunk())
-            .await
-            .map_err(|err| {
-                Error::new(ErrorKind::Unexpected, "copy from ftp stream").set_source(err)
-            })?;
+        while bs.has_remaining() {
+            let n = self
+                .data_stream
+                .as_mut()
+                .unwrap()
+                .write(bs.chunk())
+                .await
+                .map_err(|err| {
+                    Error::new(ErrorKind::Unexpected, "copy from ftp stream").set_source(err)
+                })?;
+            bs.advance(n);
+        }
 
-        Ok(size)
+        Ok(())
     }
 
     async fn close(&mut self) -> Result<()> {
diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs
index 00bce7fac3..3b58dd0fc2 100644
--- a/core/src/services/ghac/writer.rs
+++ b/core/src/services/ghac/writer.rs
@@ -38,7 +38,7 @@ impl GhacWriter {
 }
 
 impl oio::Write for GhacWriter {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         let size = bs.len();
         let offset = self.size;
 
@@ -61,7 +61,7 @@ impl oio::Write for GhacWriter {
         }
 
         self.size += size as u64;
-        Ok(size)
+        Ok(())
     }
 
     async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs
index 96708c1ab1..0f60014f41 100644
--- a/core/src/services/hdfs/writer.rs
+++ b/core/src/services/hdfs/writer.rs
@@ -53,10 +53,15 @@ impl<F> HdfsWriter<F> {
 }
 
 impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, mut bs: Buffer) -> Result<()> {
         let f = self.f.as_mut().expect("HdfsWriter must be initialized");
 
-        f.write(bs.chunk()).await.map_err(new_std_io_error)
+        while bs.has_remaining() {
+            let n = f.write(bs.chunk()).await.map_err(new_std_io_error)?;
+            bs.advance(n);
+        }
+
+        Ok(())
     }
 
     async fn close(&mut self) -> Result<()> {
@@ -82,9 +87,14 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
 }
 
 impl oio::BlockingWrite for HdfsWriter<hdrs::File> {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, mut bs: Buffer) -> Result<()> {
         let f = self.f.as_mut().expect("HdfsWriter must be initialized");
-        f.write(bs.chunk()).map_err(new_std_io_error)
+        while bs.has_remaining() {
+            let n = f.write(bs.chunk()).map_err(new_std_io_error)?;
+            bs.advance(n);
+        }
+
+        Ok(())
     }
 
     fn close(&mut self) -> Result<()> {
diff --git a/core/src/services/hdfs_native/writer.rs b/core/src/services/hdfs_native/writer.rs
index e6fb0205e4..4cab45b3be 100644
--- a/core/src/services/hdfs_native/writer.rs
+++ b/core/src/services/hdfs_native/writer.rs
@@ -31,7 +31,7 @@ impl HdfsNativeWriter {
 }
 
 impl oio::Write for HdfsNativeWriter {
-    async fn write(&mut self, _bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, _bs: Buffer) -> Result<()> {
         todo!()
     }
 
diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs
index 69bff86245..9b86c78957 100644
--- a/core/src/services/sftp/writer.rs
+++ b/core/src/services/sftp/writer.rs
@@ -39,8 +39,17 @@ impl SftpWriter {
 }
 
 impl oio::Write for SftpWriter {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
-        self.file.write(bs.chunk()).await.map_err(new_std_io_error)
+    async fn write(&mut self, mut bs: Buffer) -> Result<()> {
+        while bs.has_remaining() {
+            let n = self
+                .file
+                .write(bs.chunk())
+                .await
+                .map_err(new_std_io_error)?;
+            bs.advance(n);
+        }
+
+        Ok(())
     }
 
     async fn close(&mut self) -> Result<()> {
diff --git a/core/src/types/blocking_write/std_writer.rs b/core/src/types/blocking_write/std_writer.rs
index 5b18467e36..91accbde53 100644
--- a/core/src/types/blocking_write/std_writer.rs
+++ b/core/src/types/blocking_write/std_writer.rs
@@ -103,10 +103,9 @@ impl Write for StdWriter {
                 return Ok(());
             };
 
-            let n = w
-                .write(Buffer::from(bs))
+            w.write(Buffer::from(bs))
                 .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?;
-            self.buf.advance(n);
+            self.buf.clean();
         }
     }
 }
diff --git a/core/src/types/context/write.rs b/core/src/types/context/write.rs
index 557341da3d..6e2464cfce 100644
--- a/core/src/types/context/write.rs
+++ b/core/src/types/context/write.rs
@@ -18,7 +18,6 @@
 use crate::raw::oio::Write;
 use crate::raw::*;
 use crate::*;
-use bytes::Buf;
 use std::sync::Arc;
 
 /// WriteContext holds the immutable context for give write operation.
@@ -136,7 +135,9 @@ impl WriteGenerator<oio::Writer> {
     /// Write the entire buffer into writer.
     pub async fn write(&mut self, mut bs: Buffer) -> Result<usize> {
         let Some(chunk_size) = self.chunk_size else {
-            return self.w.write_dyn(bs).await;
+            let size = bs.len();
+            self.w.write_dyn(bs).await?;
+            return Ok(size);
         };
 
         if self.buffer.len() + bs.len() < chunk_size {
@@ -153,10 +154,8 @@ impl WriteGenerator<oio::Writer> {
         if !self.exact {
             let fill_size = bs.len();
             self.buffer.push(bs);
-            let mut buf = self.buffer.take().collect();
-            let written = self.w.write_dyn(buf.clone()).await?;
-            buf.advance(written);
-            self.buffer.push(buf);
+            let buf = self.buffer.take().collect();
+            self.w.write_dyn(buf).await?;
             return Ok(fill_size);
         }
 
@@ -167,10 +166,8 @@ impl WriteGenerator<oio::Writer> {
         // Action:
         // - write existing buffer in chunk_size to make more rooms for writing data.
         if self.buffer.len() >= chunk_size {
-            let mut buf = self.buffer.take().collect();
-            let written = self.w.write_dyn(buf.clone()).await?;
-            buf.advance(written);
-            self.buffer.push(buf);
+            let buf = self.buffer.take().collect();
+            self.w.write_dyn(buf).await?;
         }
 
         // Condition
@@ -192,8 +189,8 @@ impl WriteGenerator<oio::Writer> {
                 break;
             }
 
-            let written = self.w.write_dyn(self.buffer.clone().collect()).await?;
-            self.buffer.advance(written);
+            let buf = self.buffer.take().collect();
+            self.w.write_dyn(buf).await?;
         }
 
         self.w.close().await
@@ -225,7 +222,9 @@ impl WriteGenerator<oio::BlockingWriter> {
     /// Write the entire buffer into writer.
     pub fn write(&mut self, mut bs: Buffer) -> Result<usize> {
         let Some(chunk_size) = self.chunk_size else {
-            return self.w.write(bs);
+            let size = bs.len();
+            self.w.write(bs)?;
+            return Ok(size);
         };
 
         if self.buffer.len() + bs.len() < chunk_size {
@@ -242,10 +241,8 @@ impl WriteGenerator<oio::BlockingWriter> {
         if !self.exact {
             let fill_size = bs.len();
             self.buffer.push(bs);
-            let mut buf = self.buffer.take().collect();
-            let written = self.w.write(buf.clone())?;
-            buf.advance(written);
-            self.buffer.push(buf);
+            let buf = self.buffer.take().collect();
+            self.w.write(buf)?;
             return Ok(fill_size);
         }
 
@@ -256,10 +253,8 @@ impl WriteGenerator<oio::BlockingWriter> {
         // Action:
         // - write existing buffer in chunk_size to make more rooms for writing data.
         if self.buffer.len() >= chunk_size {
-            let mut buf = self.buffer.take().collect();
-            let written = self.w.write(buf.clone())?;
-            buf.advance(written);
-            self.buffer.push(buf);
+            let buf = self.buffer.take().collect();
+            self.w.write(buf)?;
         }
 
         // Condition
@@ -281,8 +276,8 @@ impl WriteGenerator<oio::BlockingWriter> {
                 break;
             }
 
-            let written = self.w.write(self.buffer.clone().collect())?;
-            self.buffer.advance(written);
+            let buf = self.buffer.take().collect();
+            self.w.write(buf)?;
         }
 
         self.w.close()
@@ -293,8 +288,8 @@ impl WriteGenerator<oio::BlockingWriter> {
 mod tests {
     use super::*;
     use crate::raw::oio::Write;
-    use bytes::Buf;
     use bytes::Bytes;
+    use bytes::{Buf, BufMut};
     use log::debug;
     use pretty_assertions::assert_eq;
     use rand::thread_rng;
@@ -309,13 +304,12 @@ mod tests {
     }
 
     impl Write for MockWriter {
-        async fn write(&mut self, bs: Buffer) -> Result<usize> {
+        async fn write(&mut self, bs: Buffer) -> Result<()> {
             debug!("test_fuzz_exact_buf_writer: flush size: {}", &bs.len());
 
-            let chunk = bs.chunk();
             let mut buf = self.buf.lock().await;
-            buf.extend_from_slice(chunk);
-            Ok(chunk.len())
+            buf.put(bs);
+            Ok(())
         }
 
         async fn close(&mut self) -> Result<()> {
@@ -478,83 +472,6 @@ mod tests {
         Ok(())
     }
 
-    struct PartialWriter {
-        buf: Arc<Mutex<Vec<u8>>>,
-    }
-
-    impl Write for PartialWriter {
-        async fn write(&mut self, mut bs: Buffer) -> Result<usize> {
-            let mut buf = self.buf.lock().await;
-
-            if Buffer::count(&bs) > 1 {
-                // Always leaves last buffer for non-contiguous buffer.
-                let mut written = 0;
-                while Buffer::count(&bs) > 1 {
-                    let chunk = bs.chunk();
-                    buf.extend_from_slice(chunk);
-                    written += chunk.len();
-                    bs.advance(chunk.len());
-                }
-                Ok(written)
-            } else {
-                let chunk = bs.chunk();
-                buf.extend_from_slice(chunk);
-                Ok(chunk.len())
-            }
-        }
-
-        async fn close(&mut self) -> Result<()> {
-            Ok(())
-        }
-
-        async fn abort(&mut self) -> Result<()> {
-            Ok(())
-        }
-    }
-
-    #[tokio::test]
-    async fn test_inexact_buf_writer_partial_send() -> Result<()> {
-        let _ = tracing_subscriber::fmt()
-            .pretty()
-            .with_test_writer()
-            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
-            .try_init();
-
-        let buf = Arc::new(Mutex::new(vec![]));
-        let mut w = WriteGenerator::new(
-            Box::new(PartialWriter { buf: buf.clone() }),
-            Some(10),
-            false,
-        );
-
-        let mut rng = thread_rng();
-        let mut expected = vec![];
-
-        let mut new_content = |size| {
-            let mut content = vec![0; size];
-            rng.fill_bytes(&mut content);
-            expected.extend_from_slice(&content);
-            Bytes::from(content)
-        };
-
-        // content < chunk size.
-        let content = new_content(5);
-        assert_eq!(5, w.write(content.into()).await?);
-        // Non-contiguous buffer.
-        let content = Buffer::from(vec![new_content(3), new_content(2)]);
-        assert_eq!(5, w.write(content).await?);
-
-        w.close().await?;
-
-        let buf = buf.lock().await;
-        assert_eq!(buf.len(), expected.len());
-        assert_eq!(
-            format!("{:x}", Sha256::digest(&*buf)),
-            format!("{:x}", Sha256::digest(&expected))
-        );
-        Ok(())
-    }
-
     #[tokio::test]
     async fn test_fuzz_exact_buf_writer() -> Result<()> {
         let _ = tracing_subscriber::fmt()
diff --git a/core/src/types/write/writer.rs b/core/src/types/write/writer.rs
index dc81d3d716..114ec886aa 100644
--- a/core/src/types/write/writer.rs
+++ b/core/src/types/write/writer.rs
@@ -141,6 +141,7 @@ impl Writer {
             let n = self.inner.write(bs.clone()).await?;
             bs.advance(n);
         }
+
         Ok(())
     }
 
@@ -153,12 +154,8 @@ impl Writer {
     /// Optimize this function to avoid unnecessary copy.
     pub async fn write_from(&mut self, bs: impl Buf) -> Result<()> {
         let mut bs = bs;
-        let mut bs = Buffer::from(bs.copy_to_bytes(bs.remaining()));
-        while !bs.is_empty() {
-            let n = self.inner.write(bs.clone()).await?;
-            bs.advance(n);
-        }
-        Ok(())
+        let bs = Buffer::from(bs.copy_to_bytes(bs.remaining()));
+        self.write(bs).await
     }
 
     /// Abort the writer and clean up all written data.

Reply via email to