This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 2c94a2142 refactor(raw): Return written bytes in oio::Write (#3005)
2c94a2142 is described below

commit 2c94a2142b998cb9c86ddf9629582594340bb91e
Author: Xuanwo <[email protected]>
AuthorDate: Mon Sep 4 14:51:17 2023 +0800

    refactor(raw): Return written bytes in oio::Write (#3005)
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/benches/oio/utils.rs                        |  8 ++---
 core/src/layers/blocking.rs                      |  2 +-
 core/src/layers/complete.rs                      | 16 ++++-----
 core/src/layers/concurrent_limit.rs              |  6 ++--
 core/src/layers/error_context.rs                 |  6 ++--
 core/src/layers/logging.rs                       | 29 ++++++++-------
 core/src/layers/madsim.rs                        |  4 +--
 core/src/layers/metrics.rs                       | 23 +++++++-----
 core/src/layers/minitrace.rs                     |  6 ++--
 core/src/layers/oteltrace.rs                     |  6 ++--
 core/src/layers/prometheus.rs                    | 23 ++++++------
 core/src/layers/retry.rs                         |  8 ++---
 core/src/layers/throttle.rs                      |  6 ++--
 core/src/layers/timeout.rs                       |  4 +--
 core/src/layers/tracing.rs                       |  6 ++--
 core/src/raw/adapters/kv/backend.rs              | 12 ++++---
 core/src/raw/adapters/typed_kv/backend.rs        | 12 ++++---
 core/src/raw/oio/write/api.rs                    | 46 ++++++++++++------------
 core/src/raw/oio/write/append_object_write.rs    | 12 ++++---
 core/src/raw/oio/write/at_least_buf_write.rs     | 23 ++++++++----
 core/src/raw/oio/write/compose_write.rs          |  8 ++---
 core/src/raw/oio/write/exact_buf_write.rs        | 19 +++++-----
 core/src/raw/oio/write/multipart_upload_write.rs | 12 ++++---
 core/src/raw/oio/write/one_shot_write.rs         | 15 ++++----
 core/src/services/azblob/writer.rs               | 20 ++++++-----
 core/src/services/azdfs/writer.rs                |  8 +++--
 core/src/services/dropbox/writer.rs              | 10 +++---
 core/src/services/fs/writer.rs                   | 16 +++++----
 core/src/services/ftp/writer.rs                  |  8 +++--
 core/src/services/gcs/writer.rs                  | 22 +++++++-----
 core/src/services/gdrive/writer.rs               | 11 +++---
 core/src/services/ghac/writer.rs                 |  6 ++--
 core/src/services/hdfs/writer.rs                 | 14 +++++---
 core/src/services/ipmfs/writer.rs                |  7 ++--
 core/src/services/onedrive/writer.rs             | 10 +++---
 core/src/services/sftp/writer.rs                 |  7 ++--
 core/src/services/supabase/writer.rs             | 10 +++---
 core/src/services/vercel_artifacts/writer.rs     |  8 +++--
 core/src/services/wasabi/writer.rs               | 10 +++---
 core/src/services/webdav/writer.rs               | 15 +++++---
 core/src/services/webhdfs/writer.rs              | 10 +++---
 core/src/types/writer.rs                         | 28 +++++++++++----
 42 files changed, 310 insertions(+), 222 deletions(-)

diff --git a/core/benches/oio/utils.rs b/core/benches/oio/utils.rs
index 0e70bcfc7..9a14442c2 100644
--- a/core/benches/oio/utils.rs
+++ b/core/benches/oio/utils.rs
@@ -27,12 +27,12 @@ pub struct BlackHoleWriter;
 
 #[async_trait]
 impl oio::Write for BlackHoleWriter {
-    async fn write(&mut self, _: Bytes) -> opendal::Result<()> {
-        Ok(())
+    async fn write(&mut self, bs: Bytes) -> opendal::Result<u64> {
+        Ok(bs.len() as u64)
     }
 
-    async fn sink(&mut self, _: u64, _: Streamer) -> opendal::Result<()> {
-        Ok(())
+    async fn sink(&mut self, size: u64, _: Streamer) -> opendal::Result<u64> {
+        Ok(size)
     }
 
     async fn abort(&mut self) -> opendal::Result<()> {
diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs
index 7b80b5956..6e530023c 100644
--- a/core/src/layers/blocking.rs
+++ b/core/src/layers/blocking.rs
@@ -196,7 +196,7 @@ impl<I: oio::Read + 'static> oio::BlockingRead for 
BlockingWrapper<I> {
 }
 
 impl<I: oio::Write + 'static> oio::BlockingWrite for BlockingWrapper<I> {
-    fn write(&mut self, bs: Bytes) -> Result<()> {
+    fn write(&mut self, bs: Bytes) -> Result<u64> {
         self.handle.block_on(self.inner.write(bs))
     }
 
diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index 4c58b6e30..e986d1a47 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -711,7 +711,7 @@ impl<W> oio::Write for CompleteWriter<W>
 where
     W: oio::Write,
 {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
         let n = bs.len();
 
         if let Some(size) = self.size {
@@ -731,10 +731,10 @@ where
         })?;
         w.write(bs).await?;
         self.written += n as u64;
-        Ok(())
+        Ok(n as u64)
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
         if let Some(total_size) = self.size {
             if self.written + size > total_size {
                 return Err(Error::new(
@@ -750,9 +750,9 @@ where
         let w = self.inner.as_mut().ok_or_else(|| {
             Error::new(ErrorKind::Unexpected, "writer has been closed or 
aborted")
         })?;
-        w.sink(size, s).await?;
-        self.written += size;
-        Ok(())
+        let n = w.sink(size, s).await?;
+        self.written += n;
+        Ok(n)
     }
 
     async fn abort(&mut self) -> Result<()> {
@@ -794,7 +794,7 @@ impl<W> oio::BlockingWrite for CompleteWriter<W>
 where
     W: oio::BlockingWrite,
 {
-    fn write(&mut self, bs: Bytes) -> Result<()> {
+    fn write(&mut self, bs: Bytes) -> Result<u64> {
         let n = bs.len();
 
         if let Some(size) = self.size {
@@ -815,7 +815,7 @@ where
 
         w.write(bs)?;
         self.written += n as u64;
-        Ok(())
+        Ok(n as u64)
     }
 
     fn close(&mut self) -> Result<()> {
diff --git a/core/src/layers/concurrent_limit.rs 
b/core/src/layers/concurrent_limit.rs
index 9cef0fb9b..96a682d61 100644
--- a/core/src/layers/concurrent_limit.rs
+++ b/core/src/layers/concurrent_limit.rs
@@ -285,7 +285,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
ConcurrentLimitWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
         self.inner.write(bs).await
     }
 
@@ -293,7 +293,7 @@ impl<R: oio::Write> oio::Write for 
ConcurrentLimitWrapper<R> {
         self.inner.abort().await
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
         self.inner.sink(size, s).await
     }
 
@@ -303,7 +303,7 @@ impl<R: oio::Write> oio::Write for 
ConcurrentLimitWrapper<R> {
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for ConcurrentLimitWrapper<R> {
-    fn write(&mut self, bs: Bytes) -> Result<()> {
+    fn write(&mut self, bs: Bytes) -> Result<u64> {
         self.inner.write(bs)
     }
 
diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs
index bfe9be4df..2acd6dd7d 100644
--- a/core/src/layers/error_context.rs
+++ b/core/src/layers/error_context.rs
@@ -403,7 +403,7 @@ impl<T: oio::BlockingRead> oio::BlockingRead for 
ErrorContextWrapper<T> {
 
 #[async_trait::async_trait]
 impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
         self.inner.write(bs).await.map_err(|err| {
             err.with_operation(WriteOperation::Write)
                 .with_context("service", self.scheme)
@@ -419,7 +419,7 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
         })
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
         self.inner.sink(size, s).await.map_err(|err| {
             err.with_operation(WriteOperation::Sink)
                 .with_context("service", self.scheme)
@@ -437,7 +437,7 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
 }
 
 impl<T: oio::BlockingWrite> oio::BlockingWrite for ErrorContextWrapper<T> {
-    fn write(&mut self, bs: Bytes) -> Result<()> {
+    fn write(&mut self, bs: Bytes) -> Result<u64> {
         self.inner.write(bs).map_err(|err| {
             err.with_operation(WriteOperation::BlockingWrite)
                 .with_context("service", self.scheme)
diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs
index 07323cf8c..6c63f466f 100644
--- a/core/src/layers/logging.rs
+++ b/core/src/layers/logging.rs
@@ -1252,11 +1252,10 @@ impl<W> LoggingWriter<W> {
 
 #[async_trait]
 impl<W: oio::Write> oio::Write for LoggingWriter<W> {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        let size = bs.len();
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
         match self.inner.write(bs).await {
-            Ok(_) => {
-                self.written += size as u64;
+            Ok(n) => {
+                self.written += n;
                 trace!(
                     target: LOGGING_TARGET,
                     "service={} operation={} path={} written={} -> data write 
{}B",
@@ -1264,9 +1263,9 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
                     WriteOperation::Write,
                     self.path,
                     self.written,
-                    size
+                    n
                 );
-                Ok(())
+                Ok(n)
             }
             Err(err) => {
                 if let Some(lvl) = self.ctx.error_level(&err) {
@@ -1286,10 +1285,10 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
         }
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
         match self.inner.sink(size, s).await {
-            Ok(_) => {
-                self.written += size;
+            Ok(n) => {
+                self.written += n;
                 trace!(
                     target: LOGGING_TARGET,
                     "service={} operation={} path={} written={} -> data sink 
{}B",
@@ -1297,9 +1296,9 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
                     WriteOperation::Sink,
                     self.path,
                     self.written,
-                    size
+                    n
                 );
-                Ok(())
+                Ok(n)
             }
             Err(err) => {
                 if let Some(lvl) = self.ctx.error_level(&err) {
@@ -1383,11 +1382,11 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
 }
 
 impl<W: oio::BlockingWrite> oio::BlockingWrite for LoggingWriter<W> {
-    fn write(&mut self, bs: Bytes) -> Result<()> {
+    fn write(&mut self, bs: Bytes) -> Result<u64> {
         let size = bs.len();
         match self.inner.write(bs) {
-            Ok(_) => {
-                self.written += size as u64;
+            Ok(n) => {
+                self.written += n;
                 trace!(
                     target: LOGGING_TARGET,
                     "service={} operation={} path={} written={} -> data write 
{}B",
@@ -1397,7 +1396,7 @@ impl<W: oio::BlockingWrite> oio::BlockingWrite for 
LoggingWriter<W> {
                     self.written,
                     size
                 );
-                Ok(())
+                Ok(n)
             }
             Err(err) => {
                 if let Some(lvl) = self.ctx.error_level(&err) {
diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs
index fdf0ec5de..17835e5ba 100644
--- a/core/src/layers/madsim.rs
+++ b/core/src/layers/madsim.rs
@@ -302,7 +302,7 @@ pub struct MadsimWriter {
 
 #[async_trait]
 impl oio::Write for MadsimWriter {
-    async fn write(&mut self, bs: Bytes) -> crate::Result<()> {
+    async fn write(&mut self, bs: Bytes) -> crate::Result<u64> {
         #[cfg(madsim)]
         {
             let req = Request::Write(self.path.to_string(), bs);
@@ -318,7 +318,7 @@ impl oio::Write for MadsimWriter {
         }
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> crate::Result<()> 
{
+    async fn sink(&mut self, size: u64, s: oio::Streamer) -> 
crate::Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "will be supported in the future",
diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs
index 1eade833b..181ebb3c0 100644
--- a/core/src/layers/metrics.rs
+++ b/core/src/layers/metrics.rs
@@ -847,23 +847,28 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
MetricWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for MetricWrapper<R> {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        let size = bs.len();
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
         self.inner
             .write(bs)
             .await
-            .map(|_| self.bytes += size as u64)
+            .map(|n| {
+                self.bytes += n;
+                n
+            })
             .map_err(|err| {
                 self.handle.increment_errors_total(self.op, err.kind());
                 err
             })
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
         self.inner
             .sink(size, s)
             .await
-            .map(|_| self.bytes += size)
+            .map(|n| {
+                self.bytes += n;
+                n
+            })
             .map_err(|err| {
                 self.handle.increment_errors_total(self.op, err.kind());
                 err
@@ -886,11 +891,13 @@ impl<R: oio::Write> oio::Write for MetricWrapper<R> {
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for MetricWrapper<R> {
-    fn write(&mut self, bs: Bytes) -> Result<()> {
-        let size = bs.len();
+    fn write(&mut self, bs: Bytes) -> Result<u64> {
         self.inner
             .write(bs)
-            .map(|_| self.bytes += size as u64)
+            .map(|n| {
+                self.bytes += n;
+                n
+            })
             .map_err(|err| {
                 self.handle.increment_errors_total(self.op, err.kind());
                 err
diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs
index 1213d692e..75c852c3a 100644
--- a/core/src/layers/minitrace.rs
+++ b/core/src/layers/minitrace.rs
@@ -337,7 +337,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
MinitraceWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
         self.inner
             .write(bs)
             .in_span(Span::enter_with_parent(
@@ -347,7 +347,7 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
             .await
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
         self.inner
             .sink(size, s)
             .in_span(Span::enter_with_parent(
@@ -379,7 +379,7 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for MinitraceWrapper<R> {
-    fn write(&mut self, bs: Bytes) -> Result<()> {
+    fn write(&mut self, bs: Bytes) -> Result<u64> {
         let _g = self.span.set_local_parent();
         let _span = 
LocalSpan::enter_with_local_parent(WriteOperation::BlockingWrite.into_static());
         self.inner.write(bs)
diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs
index 2ae39b05c..fde87e9ba 100644
--- a/core/src/layers/oteltrace.rs
+++ b/core/src/layers/oteltrace.rs
@@ -313,11 +313,11 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
OtelTraceWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
         self.inner.write(bs).await
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
         self.inner.sink(size, s).await
     }
 
@@ -331,7 +331,7 @@ impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for OtelTraceWrapper<R> {
-    fn write(&mut self, bs: Bytes) -> Result<()> {
+    fn write(&mut self, bs: Bytes) -> Result<u64> {
         self.inner.write(bs)
     }
 
diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs
index 005d6aa97..644532bf6 100644
--- a/core/src/layers/prometheus.rs
+++ b/core/src/layers/prometheus.rs
@@ -662,16 +662,16 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
PrometheusMetricWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        let size = bs.len();
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
         self.inner
             .write(bs)
             .await
-            .map(|_| {
+            .map(|n| {
                 self.stats
                     .bytes_total
                     .with_label_values(&[&self.scheme, 
Operation::Write.into_static()])
-                    .observe(size as f64)
+                    .observe(n as f64);
+                n
             })
             .map_err(|err| {
                 self.stats.increment_errors_total(self.op, err.kind());
@@ -679,15 +679,16 @@ impl<R: oio::Write> oio::Write for 
PrometheusMetricWrapper<R> {
             })
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
         self.inner
             .sink(size, s)
             .await
-            .map(|_| {
+            .map(|n| {
                 self.stats
                     .bytes_total
                     .with_label_values(&[&self.scheme, 
Operation::Write.into_static()])
-                    .observe(size as f64)
+                    .observe(n as f64);
+                n
             })
             .map_err(|err| {
                 self.stats.increment_errors_total(self.op, err.kind());
@@ -711,15 +712,15 @@ impl<R: oio::Write> oio::Write for 
PrometheusMetricWrapper<R> {
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
-    fn write(&mut self, bs: Bytes) -> Result<()> {
-        let size = bs.len();
+    fn write(&mut self, bs: Bytes) -> Result<u64> {
         self.inner
             .write(bs)
-            .map(|_| {
+            .map(|n| {
                 self.stats
                     .bytes_total
                     .with_label_values(&[&self.scheme, 
Operation::BlockingWrite.into_static()])
-                    .observe(size as f64)
+                    .observe(n as f64);
+                n
             })
             .map_err(|err| {
                 self.stats.increment_errors_total(self.op, err.kind());
diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs
index 7517c2c21..07b92e24c 100644
--- a/core/src/layers/retry.rs
+++ b/core/src/layers/retry.rs
@@ -873,7 +873,7 @@ impl<R: oio::BlockingRead, I: RetryInterceptor> 
oio::BlockingRead for RetryWrapp
 
 #[async_trait]
 impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
         let mut backoff = self.builder.build();
 
         loop {
@@ -919,14 +919,14 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for 
RetryWrapper<R, I> {
     /// The overhead is constant, which means the overhead will not increase 
with the size of
     /// stream. For example, if every `next` call cost 1ms, then the overhead 
will only take 0.005%
     /// which is acceptable.
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
         let s = Arc::new(Mutex::new(s));
 
         let mut backoff = self.builder.build();
 
         loop {
             match self.inner.sink(size, Box::new(s.clone())).await {
-                Ok(_) => return Ok(()),
+                Ok(n) => return Ok(n),
                 Err(e) if !e.is_temporary() => return Err(e),
                 Err(e) => match backoff.next() {
                     None => return Err(e),
@@ -1013,7 +1013,7 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for 
RetryWrapper<R, I> {
 }
 
 impl<R: oio::BlockingWrite, I: RetryInterceptor> oio::BlockingWrite for 
RetryWrapper<R, I> {
-    fn write(&mut self, bs: Bytes) -> Result<()> {
+    fn write(&mut self, bs: Bytes) -> Result<u64> {
         { || self.inner.write(bs.clone()) }
             .retry(&self.builder)
             .when(|e| e.is_temporary())
diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs
index d929226df..a88d1c701 100644
--- a/core/src/layers/throttle.rs
+++ b/core/src/layers/throttle.rs
@@ -217,7 +217,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
ThrottleWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
         let buf_length = NonZeroU32::new(bs.len() as u32).unwrap();
 
         loop {
@@ -242,7 +242,7 @@ impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
         }
     }
 
-    async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
+    async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> {
         self.inner.sink(size, s).await
     }
 
@@ -256,7 +256,7 @@ impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for ThrottleWrapper<R> {
-    fn write(&mut self, bs: Bytes) -> Result<()> {
+    fn write(&mut self, bs: Bytes) -> Result<u64> {
         let buf_length = NonZeroU32::new(bs.len() as u32).unwrap();
 
         loop {
diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs
index c0fb739aa..be2289d04 100644
--- a/core/src/layers/timeout.rs
+++ b/core/src/layers/timeout.rs
@@ -322,7 +322,7 @@ impl<R: oio::Read> oio::Read for TimeoutWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
         let timeout = self.io_timeout(bs.len() as u64);
 
         tokio::time::timeout(timeout, self.inner.write(bs))
@@ -335,7 +335,7 @@ impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
             })?
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
         let timeout = self.io_timeout(size);
 
         tokio::time::timeout(timeout, self.inner.sink(size, s))
diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs
index 9042e6e35..33dcbdebc 100644
--- a/core/src/layers/tracing.rs
+++ b/core/src/layers/tracing.rs
@@ -324,7 +324,7 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> {
         parent = &self.span,
         level = "trace",
         skip_all)]
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
         self.inner.write(bs).await
     }
 
@@ -332,7 +332,7 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> {
         parent = &self.span,
         level = "trace",
         skip_all)]
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
         self.inner.sink(size, s).await
     }
 
@@ -358,7 +358,7 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for 
TracingWrapper<R> {
         parent = &self.span,
         level = "trace",
         skip_all)]
-    fn write(&mut self, bs: Bytes) -> Result<()> {
+    fn write(&mut self, bs: Bytes) -> Result<u64> {
         self.inner.write(bs)
     }
 
diff --git a/core/src/raw/adapters/kv/backend.rs 
b/core/src/raw/adapters/kv/backend.rs
index d10971104..be4913ff9 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -390,13 +390,14 @@ impl<S> KvWriter<S> {
 #[async_trait]
 impl<S: Adapter> oio::Write for KvWriter<S> {
     // TODO: we need to support append in the future.
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+        let size = bs.len();
         self.buf = Some(bs.into());
 
-        Ok(())
+        Ok(size as u64)
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
@@ -420,10 +421,11 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
 }
 
 impl<S: Adapter> oio::BlockingWrite for KvWriter<S> {
-    fn write(&mut self, bs: Bytes) -> Result<()> {
+    fn write(&mut self, bs: Bytes) -> Result<u64> {
+        let size = bs.len();
         self.buf = Some(bs.into());
 
-        Ok(())
+        Ok(size as u64)
     }
 
     fn close(&mut self) -> Result<()> {
diff --git a/core/src/raw/adapters/typed_kv/backend.rs 
b/core/src/raw/adapters/typed_kv/backend.rs
index 9f6186a38..48232c1fc 100644
--- a/core/src/raw/adapters/typed_kv/backend.rs
+++ b/core/src/raw/adapters/typed_kv/backend.rs
@@ -403,13 +403,14 @@ impl<S> KvWriter<S> {
 #[async_trait]
 impl<S: Adapter> oio::Write for KvWriter<S> {
     // TODO: we need to support append in the future.
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+        let size = bs.len();
         self.buf.push(bs);
 
-        Ok(())
+        Ok(size as u64)
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
@@ -429,10 +430,11 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
 }
 
 impl<S: Adapter> oio::BlockingWrite for KvWriter<S> {
-    fn write(&mut self, bs: Bytes) -> Result<()> {
+    fn write(&mut self, bs: Bytes) -> Result<u64> {
+        let size = bs.len();
         self.buf.push(bs);
 
-        Ok(())
+        Ok(size as u64)
     }
 
     fn close(&mut self) -> Result<()> {
diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs
index f2bb025af..8ced843da 100644
--- a/core/src/raw/oio/write/api.rs
+++ b/core/src/raw/oio/write/api.rs
@@ -74,31 +74,29 @@ impl From<WriteOperation> for &'static str {
 pub type Writer = Box<dyn Write>;
 
 /// Write is the trait that OpenDAL returns to callers.
-///
-/// # Notes
-///
-/// There are two possible two cases:
-///
-/// - Sized: The total size of the object is known in advance.
-/// - Unsized: The total size of the object is unknown in advance.
-///
-/// And it's possible that the given bs length is less than the total
-/// content length. Users will call write multiple times to write
-/// the whole data.
 #[async_trait]
 pub trait Write: Unpin + Send + Sync {
     /// Write given bytes into writer.
     ///
-    /// # Notes
+    /// # Behavior
     ///
-    /// It's possible that the given bs length is less than the total
-    /// content length. And users will call write multiple times.
+    /// - `Ok(n)` means `n` bytes have been written successfully.
+    /// - `Err(err)` means an error happened and no bytes have been written.
     ///
-    /// Please make sure `write` is safe to re-enter.
-    async fn write(&mut self, bs: Bytes) -> Result<()>;
+    /// It's possible that `n < bs.len()`; the caller should pass the remaining
+    /// bytes repeatedly until all bytes have been written.
+    async fn write(&mut self, bs: Bytes) -> Result<u64>;
 
     /// Sink given stream into writer.
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()>;
+    ///
+    /// # Behavior
+    ///
+    /// - `Ok(n)` means `n` bytes have been written successfully.
+    /// - `Err(err)` means an error happened and no bytes have been written.
+    ///
+    /// It's possible that `n < size`; the caller should pass the remaining bytes
+    /// repeatedly until all bytes have been written.
+    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64>;
 
     /// Abort the pending writer.
     async fn abort(&mut self) -> Result<()>;
@@ -109,13 +107,13 @@ pub trait Write: Unpin + Send + Sync {
 
 #[async_trait]
 impl Write for () {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
         let _ = bs;
 
         unimplemented!("write is required to be implemented for oio::Write")
     }
 
-    async fn sink(&mut self, _: u64, _: oio::Streamer) -> Result<()> {
+    async fn sink(&mut self, _: u64, _: oio::Streamer) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "output writer doesn't support sink",
@@ -142,11 +140,11 @@ impl Write for () {
 /// To make Writer work as expected, we must add this impl.
 #[async_trait]
 impl<T: Write + ?Sized> Write for Box<T> {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
         (**self).write(bs).await
     }
 
-    async fn sink(&mut self, n: u64, s: oio::Streamer) -> Result<()> {
+    async fn sink(&mut self, n: u64, s: oio::Streamer) -> Result<u64> {
         (**self).sink(n, s).await
     }
 
@@ -165,14 +163,14 @@ pub type BlockingWriter = Box<dyn BlockingWrite>;
 /// BlockingWrite is the trait that OpenDAL returns to callers.
 pub trait BlockingWrite: Send + Sync + 'static {
     /// Write whole content at once.
-    fn write(&mut self, bs: Bytes) -> Result<()>;
+    fn write(&mut self, bs: Bytes) -> Result<u64>;
 
     /// Close the writer and make sure all data has been flushed.
     fn close(&mut self) -> Result<()>;
 }
 
 impl BlockingWrite for () {
-    fn write(&mut self, bs: Bytes) -> Result<()> {
+    fn write(&mut self, bs: Bytes) -> Result<u64> {
         let _ = bs;
 
         unimplemented!("write is required to be implemented for 
oio::BlockingWrite")
@@ -190,7 +188,7 @@ impl BlockingWrite for () {
 ///
 /// To make BlockingWriter work as expected, we must add this impl.
 impl<T: BlockingWrite + ?Sized> BlockingWrite for Box<T> {
-    fn write(&mut self, bs: Bytes) -> Result<()> {
+    fn write(&mut self, bs: Bytes) -> Result<u64> {
         (**self).write(bs)
     }
 
diff --git a/core/src/raw/oio/write/append_object_write.rs 
b/core/src/raw/oio/write/append_object_write.rs
index 07fa546cc..b047ef43d 100644
--- a/core/src/raw/oio/write/append_object_write.rs
+++ b/core/src/raw/oio/write/append_object_write.rs
@@ -79,7 +79,7 @@ impl<W> oio::Write for AppendObjectWriter<W>
 where
     W: AppendObjectWrite,
 {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
         let offset = self.offset().await?;
 
         let size = bs.len() as u64;
@@ -87,16 +87,20 @@ where
         self.inner
             .append(offset, size, AsyncBody::Bytes(bs))
             .await
-            .map(|_| self.offset = Some(offset + size))
+            .map(|_| self.offset = Some(offset + size))?;
+
+        Ok(size)
     }
 
-    async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
+    async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> {
         let offset = self.offset().await?;
 
         self.inner
             .append(offset, size, AsyncBody::Stream(s))
             .await
-            .map(|_| self.offset = Some(offset + size))
+            .map(|_| self.offset = Some(offset + size))?;
+
+        Ok(size)
     }
 
     async fn close(&mut self) -> Result<()> {
diff --git a/core/src/raw/oio/write/at_least_buf_write.rs b/core/src/raw/oio/write/at_least_buf_write.rs
index 91adddd30..51f5d2645 100644
--- a/core/src/raw/oio/write/at_least_buf_write.rs
+++ b/core/src/raw/oio/write/at_least_buf_write.rs
@@ -64,7 +64,7 @@ impl<W: oio::Write> AtLeastBufWriter<W> {
 
 #[async_trait]
 impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
         // If total size is known and equals to given bytes, we can write it directly.
         if let Some(total_size) = self.total_size {
             if total_size == bs.len() as u64 {
@@ -74,8 +74,9 @@ impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> {
 
         // Push the bytes into the buffer if the buffer is not full.
         if self.buffer.len() + bs.len() < self.buffer_size {
+            let size = bs.len();
             self.buffer.push(bs);
-            return Ok(());
+            return Ok(size as u64);
         }
 
         let mut buf = self.buffer.clone();
@@ -85,10 +86,13 @@ impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> {
             .sink(buf.len() as u64, Box::new(buf))
             .await
             // Clear buffer if the write is successful.
-            .map(|_| self.buffer.clear())
+            .map(|v| {
+                self.buffer.clear();
+                v
+            })
     }
 
-    async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
+    async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> {
         // If total size is known and equals to given stream, we can write it directly.
         if let Some(total_size) = self.total_size {
             if total_size == size {
@@ -98,8 +102,10 @@ impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> {
 
         // Push the bytes into the buffer if the buffer is not full.
         if self.buffer.len() as u64 + size < self.buffer_size as u64 {
-            self.buffer.push(s.collect().await?);
-            return Ok(());
+            let bs = s.collect().await?;
+            let size = bs.len() as u64;
+            self.buffer.push(bs);
+            return Ok(size);
         }
 
         let buf = self.buffer.clone();
@@ -110,7 +116,10 @@ impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> {
             .sink(buffer_size + size, Box::new(stream))
             .await
             // Clear buffer if the write is successful.
-            .map(|_| self.buffer.clear())
+            .map(|v| {
+                self.buffer.clear();
+                v
+            })
     }
 
     async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/raw/oio/write/compose_write.rs b/core/src/raw/oio/write/compose_write.rs
index 043df2978..79ddfc5ed 100644
--- a/core/src/raw/oio/write/compose_write.rs
+++ b/core/src/raw/oio/write/compose_write.rs
@@ -57,14 +57,14 @@ pub enum TwoWaysWriter<ONE: oio::Write, TWO: oio::Write> {
 
 #[async_trait]
 impl<ONE: oio::Write, TWO: oio::Write> oio::Write for TwoWaysWriter<ONE, TWO> {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
         match self {
             Self::One(one) => one.write(bs).await,
             Self::Two(two) => two.write(bs).await,
         }
     }
 
-    async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
+    async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> {
         match self {
             Self::One(one) => one.sink(size, s).await,
             Self::Two(two) => two.sink(size, s).await,
@@ -102,7 +102,7 @@ pub enum ThreeWaysWriter<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> {
 impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write
     for ThreeWaysWriter<ONE, TWO, THREE>
 {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
         match self {
             Self::One(one) => one.write(bs).await,
             Self::Two(two) => two.write(bs).await,
@@ -110,7 +110,7 @@ impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write
         }
     }
 
-    async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
+    async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> {
         match self {
             Self::One(one) => one.sink(size, s).await,
             Self::Two(two) => two.sink(size, s).await,
diff --git a/core/src/raw/oio/write/exact_buf_write.rs b/core/src/raw/oio/write/exact_buf_write.rs
index 8561c1a4a..8e2d8a922 100644
--- a/core/src/raw/oio/write/exact_buf_write.rs
+++ b/core/src/raw/oio/write/exact_buf_write.rs
@@ -84,7 +84,7 @@ impl<W: oio::Write> ExactBufWriter<W> {
 
 #[async_trait]
 impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
         self.sink(bs.len() as u64, Box::new(oio::Cursor::from(bs)))
             .await
     }
@@ -92,7 +92,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
     /// # TODO
     ///
     /// We know every stream size, we can collect them into a buffer without chain them every time.
-    async fn sink(&mut self, _: u64, mut s: Streamer) -> Result<()> {
+    async fn sink(&mut self, _: u64, mut s: Streamer) -> Result<u64> {
         if self.buffer.len() >= self.buffer_size {
             let mut buf = self.buffer.clone();
             let to_write = buf.split_to(self.buffer_size);
@@ -101,9 +101,10 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
                 .sink(to_write.len() as u64, Box::new(to_write))
                 .await
                 // Replace buffer with remaining if the write is successful.
-                .map(|_| {
+                .map(|v| {
                     self.buffer = buf;
                     self.chain_stream(s);
+                    v
                 });
         }
 
@@ -120,8 +121,9 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
         //
         // We don't need to chain stream here because it must be consumed.
         if buf.len() < self.buffer_size {
+            let size = buf.len() as u64;
             self.buffer = buf;
-            return Ok(());
+            return Ok(size);
         }
 
         let to_write = buf.split_to(self.buffer_size);
@@ -129,9 +131,10 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
             .sink(to_write.len() as u64, Box::new(to_write))
             .await
             // Replace buffer with remaining if the write is successful.
-            .map(|_| {
+            .map(|v| {
                 self.buffer = buf;
                 self.chain_stream(s);
+                v
             })
     }
 
@@ -202,14 +205,14 @@ mod tests {
 
     #[async_trait]
     impl Write for MockWriter {
-        async fn write(&mut self, bs: Bytes) -> Result<()> {
+        async fn write(&mut self, bs: Bytes) -> Result<u64> {
             debug!("test_fuzz_exact_buf_writer: flush size: {}", bs.len());
 
             self.buf.extend_from_slice(&bs);
-            Ok(())
+            Ok(bs.len() as u64)
         }
 
-        async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
+        async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> {
             let bs = s.collect().await?;
             assert_eq!(bs.len() as u64, size);
             self.write(bs).await
diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs
index c013b24c3..7bfd0342c 100644
--- a/core/src/raw/oio/write/multipart_upload_write.rs
+++ b/core/src/raw/oio/write/multipart_upload_write.rs
@@ -120,7 +120,7 @@ impl<W> oio::Write for MultipartUploadWriter<W>
 where
     W: MultipartUploadWrite,
 {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
         let upload_id = self.upload_id().await?;
 
         let size = bs.len();
@@ -133,16 +133,20 @@ where
                 AsyncBody::Bytes(bs),
             )
             .await
-            .map(|v| self.parts.push(v))
+            .map(|v| self.parts.push(v))?;
+
+        Ok(size as u64)
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
         let upload_id = self.upload_id().await?;
 
         self.inner
             .write_part(&upload_id, self.parts.len(), size, AsyncBody::Stream(s))
             .await
-            .map(|v| self.parts.push(v))
+            .map(|v| self.parts.push(v))?;
+
+        Ok(size)
     }
 
     async fn close(&mut self) -> Result<()> {
diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs
index 35b1883bf..e6fe47616 100644
--- a/core/src/raw/oio/write/one_shot_write.rs
+++ b/core/src/raw/oio/write/one_shot_write.rs
@@ -49,15 +49,18 @@ impl<W: OneShotWrite> OneShotWriter<W> {
 
 #[async_trait]
 impl<W: OneShotWrite> oio::Write for OneShotWriter<W> {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
         let cursor = oio::Cursor::from(bs);
-        self.inner
-            .write_once(cursor.len() as u64, Box::new(cursor))
-            .await
+
+        let size = cursor.len() as u64;
+        self.inner.write_once(size, Box::new(cursor)).await?;
+
+        Ok(size)
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
-        self.inner.write_once(size, s).await
+    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+        self.inner.write_once(size, s).await?;
+        Ok(size)
     }
 
     async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/services/azblob/writer.rs b/core/src/services/azblob/writer.rs
index 31a56f27a..a3b8abe30 100644
--- a/core/src/services/azblob/writer.rs
+++ b/core/src/services/azblob/writer.rs
@@ -161,10 +161,11 @@ impl AzblobWriter {
 
 #[async_trait]
 impl oio::Write for AzblobWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+        let size = bs.len() as u64;
+
         if self.op.append() {
-            self.append_oneshot(bs.len() as u64, AsyncBody::Bytes(bs))
-                .await
+            self.append_oneshot(size, AsyncBody::Bytes(bs)).await?;
         } else {
             if self.op.content_length().is_none() {
                 return Err(Error::new(
@@ -173,14 +174,15 @@ impl oio::Write for AzblobWriter {
                 ));
             }
 
-            self.write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs))
-                .await
+            self.write_oneshot(size, AsyncBody::Bytes(bs)).await?;
         }
+
+        Ok(size)
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
         if self.op.append() {
-            self.append_oneshot(size, AsyncBody::Stream(s)).await
+            self.append_oneshot(size, AsyncBody::Stream(s)).await?;
         } else {
             if self.op.content_length().is_none() {
                 return Err(Error::new(
@@ -189,8 +191,10 @@ impl oio::Write for AzblobWriter {
                 ));
             }
 
-            self.write_oneshot(size, AsyncBody::Stream(s)).await
+            self.write_oneshot(size, AsyncBody::Stream(s)).await?;
         }
+
+        Ok(size)
     }
 
     async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/services/azdfs/writer.rs b/core/src/services/azdfs/writer.rs
index 3c8db1ac1..ff1125bfa 100644
--- a/core/src/services/azdfs/writer.rs
+++ b/core/src/services/azdfs/writer.rs
@@ -41,7 +41,9 @@ impl AzdfsWriter {
 
 #[async_trait]
 impl oio::Write for AzdfsWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+        let size = bs.len() as u64;
+
         let mut req = self.core.azdfs_create_request(
             &self.path,
             "file",
@@ -78,7 +80,7 @@ impl oio::Write for AzdfsWriter {
         match status {
             StatusCode::OK | StatusCode::ACCEPTED => {
                 resp.into_body().consume().await?;
-                Ok(())
+                Ok(size)
             }
             _ => Err(parse_error(resp)
                 .await?
@@ -86,7 +88,7 @@ impl oio::Write for AzdfsWriter {
         }
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/dropbox/writer.rs b/core/src/services/dropbox/writer.rs
index 2f5e97558..1b5b6b17d 100644
--- a/core/src/services/dropbox/writer.rs
+++ b/core/src/services/dropbox/writer.rs
@@ -40,12 +40,14 @@ impl DropboxWriter {
 
 #[async_trait]
 impl oio::Write for DropboxWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+        let size = bs.len();
+
         let resp = self
             .core
             .dropbox_update(
                 &self.path,
-                Some(bs.len()),
+                Some(size),
                 self.op.content_type(),
                 AsyncBody::Bytes(bs),
             )
@@ -54,13 +56,13 @@ impl oio::Write for DropboxWriter {
         match status {
             StatusCode::OK => {
                 resp.into_body().consume().await?;
-                Ok(())
+                Ok(size as u64)
             }
             _ => Err(parse_error(resp).await?),
         }
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index 4d31444af..9ca571077 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -54,18 +54,20 @@ impl oio::Write for FsWriter<tokio::fs::File> {
     ///
     /// File could be partial written, so we will seek to start to make sure
     /// we write the same content.
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+        let size = bs.len() as u64;
+
         self.f
             .seek(SeekFrom::Start(self.pos))
             .await
             .map_err(parse_io_error)?;
         self.f.write_all(&bs).await.map_err(parse_io_error)?;
-        self.pos += bs.len() as u64;
+        self.pos += size;
 
-        Ok(())
+        Ok(size)
     }
 
-    async fn sink(&mut self, _size: u64, mut s: oio::Streamer) -> Result<()> {
+    async fn sink(&mut self, size: u64, mut s: oio::Streamer) -> Result<u64> {
         while let Some(bs) = s.next().await {
             let bs = bs?;
             self.f
@@ -76,7 +78,7 @@ impl oio::Write for FsWriter<tokio::fs::File> {
             self.pos += bs.len() as u64;
         }
 
-        Ok(())
+        Ok(size)
     }
 
     async fn abort(&mut self) -> Result<()> {
@@ -104,14 +106,14 @@ impl oio::BlockingWrite for FsWriter<std::fs::File> {
     ///
     /// File could be partial written, so we will seek to start to make sure
     /// we write the same content.
-    fn write(&mut self, bs: Bytes) -> Result<()> {
+    fn write(&mut self, bs: Bytes) -> Result<u64> {
         self.f
             .seek(SeekFrom::Start(self.pos))
             .map_err(parse_io_error)?;
         self.f.write_all(&bs).map_err(parse_io_error)?;
         self.pos += bs.len() as u64;
 
-        Ok(())
+        Ok(bs.len() as u64)
     }
 
     fn close(&mut self) -> Result<()> {
diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs
index cd4ba0f6a..18dd6fed9 100644
--- a/core/src/services/ftp/writer.rs
+++ b/core/src/services/ftp/writer.rs
@@ -41,7 +41,9 @@ impl FtpWriter {
 
 #[async_trait]
 impl oio::Write for FtpWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+        let size = bs.len();
+
         let mut ftp_stream = self.backend.ftp_connect(Operation::Write).await?;
         let mut data_stream = ftp_stream.append_with_stream(&self.path).await?;
         data_stream.write_all(&bs).await.map_err(|err| {
@@ -50,10 +52,10 @@ impl oio::Write for FtpWriter {
 
         ftp_stream.finalize_put_stream(data_stream).await?;
 
-        Ok(())
+        Ok(size as u64)
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs
index e6cd8703c..8c4431302 100644
--- a/core/src/services/gcs/writer.rs
+++ b/core/src/services/gcs/writer.rs
@@ -118,16 +118,18 @@ impl GcsWriter {
 
 #[async_trait]
 impl oio::Write for GcsWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+        let size = bs.len() as u64;
+
         let location = match &self.location {
             Some(location) => location,
             None => {
                 if self.op.content_length().unwrap_or_default() == bs.len() as u64
                     && self.written == 0
                 {
-                    return self
-                        .write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs))
-                        .await;
+                    self.write_oneshot(size, AsyncBody::Bytes(bs)).await?;
+
+                    return Ok(size);
                 } else {
                     let location = self.initiate_upload().await?;
                     self.location = Some(location);
@@ -138,22 +140,23 @@ impl oio::Write for GcsWriter {
 
         // Ignore empty bytes
         if bs.is_empty() {
-            return Ok(());
+            return Ok(0);
         }
 
         self.buffer.push(bs);
         // Return directly if the buffer is not full
         if self.buffer.len() <= self.write_fixed_size {
-            return Ok(());
+            return Ok(size);
         }
 
         let bs = self.buffer.peak_exact(self.write_fixed_size);
+        let size = bs.len() as u64;
 
         match self.write_part(location, bs).await {
             Ok(_) => {
                 self.buffer.take(self.write_fixed_size);
                 self.written += self.write_fixed_size as u64;
-                Ok(())
+                Ok(size)
             }
             Err(e) => {
                 // If the upload fails, we should pop the given bs to make sure
@@ -164,8 +167,9 @@ impl oio::Write for GcsWriter {
         }
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
-        self.write_oneshot(size, AsyncBody::Stream(s)).await
+    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+        self.write_oneshot(size, AsyncBody::Stream(s)).await?;
+        Ok(size)
     }
 
     async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs
index 48d88f3ec..b33137137 100644
--- a/core/src/services/gdrive/writer.rs
+++ b/core/src/services/gdrive/writer.rs
@@ -95,15 +95,18 @@ impl GdriveWriter {
 
 #[async_trait]
 impl oio::Write for GdriveWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+        let size = bs.len() as u64;
         if self.file_id.is_none() {
-            self.write_create(bs.len() as u64, bs).await
+            self.write_create(size, bs).await?;
         } else {
-            self.write_overwrite(bs.len() as u64, bs).await
+            self.write_overwrite(size, bs).await?;
         }
+
+        Ok(size)
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
         Err(Error::new(ErrorKind::Unsupported, "sink is not supported"))
     }
 
diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs
index b2f959947..6bd4bf057 100644
--- a/core/src/services/ghac/writer.rs
+++ b/core/src/services/ghac/writer.rs
@@ -42,7 +42,7 @@ impl GhacWriter {
 
 #[async_trait]
 impl oio::Write for GhacWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
         let size = bs.len() as u64;
         let req = self
             .backend
@@ -54,7 +54,7 @@ impl oio::Write for GhacWriter {
         if resp.status().is_success() {
             resp.into_body().consume().await?;
             self.size += size;
-            Ok(())
+            Ok(size)
         } else {
             Err(parse_error(resp)
                 .await
@@ -62,7 +62,7 @@ impl oio::Write for GhacWriter {
         }
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs
index 23c5f1d68..011c8352e 100644
--- a/core/src/services/hdfs/writer.rs
+++ b/core/src/services/hdfs/writer.rs
@@ -41,7 +41,9 @@ impl<F> HdfsWriter<F> {
 
 #[async_trait]
 impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+        let size = bs.len();
+
         while self.pos < bs.len() {
             let n = self
                 .f
@@ -53,10 +55,10 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
         // Reset pos to 0 for next write.
         self.pos = 0;
 
-        Ok(())
+        Ok(size as u64)
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
@@ -78,7 +80,9 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
 }
 
 impl oio::BlockingWrite for HdfsWriter<hdrs::File> {
-    fn write(&mut self, bs: Bytes) -> Result<()> {
+    fn write(&mut self, bs: Bytes) -> Result<u64> {
+        let size = bs.len();
+
         while self.pos < bs.len() {
             let n = self.f.write(&bs[self.pos..]).map_err(parse_io_error)?;
             self.pos += n;
@@ -86,7 +90,7 @@ impl oio::BlockingWrite for HdfsWriter<hdrs::File> {
         // Reset pos to 0 for next write.
         self.pos = 0;
 
-        Ok(())
+        Ok(size as u64)
     }
 
     fn close(&mut self) -> Result<()> {
diff --git a/core/src/services/ipmfs/writer.rs b/core/src/services/ipmfs/writer.rs
index 528478142..43a46e500 100644
--- a/core/src/services/ipmfs/writer.rs
+++ b/core/src/services/ipmfs/writer.rs
@@ -38,7 +38,8 @@ impl IpmfsWriter {
 
 #[async_trait]
 impl oio::Write for IpmfsWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+        let size = bs.len() as u64;
         let resp = self.backend.ipmfs_write(&self.path, bs).await?;
 
         let status = resp.status();
@@ -46,13 +47,13 @@ impl oio::Write for IpmfsWriter {
         match status {
             StatusCode::CREATED | StatusCode::OK => {
                 resp.into_body().consume().await?;
-                Ok(())
+                Ok(size)
             }
             _ => Err(parse_error(resp).await?),
         }
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/onedrive/writer.rs b/core/src/services/onedrive/writer.rs
index 620130847..1086f3fde 100644
--- a/core/src/services/onedrive/writer.rs
+++ b/core/src/services/onedrive/writer.rs
@@ -46,17 +46,19 @@ impl OneDriveWriter {
 
 #[async_trait]
 impl oio::Write for OneDriveWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
         let size = bs.len();
 
         if size <= Self::MAX_SIMPLE_SIZE {
-            self.write_simple(bs).await
+            self.write_simple(bs).await?;
         } else {
-            self.write_chunked(bs).await
+            self.write_chunked(bs).await?;
         }
+
+        Ok(size as u64)
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs
index 76da70da3..71ac41d7c 100644
--- a/core/src/services/sftp/writer.rs
+++ b/core/src/services/sftp/writer.rs
@@ -36,13 +36,14 @@ impl SftpWriter {
 
 #[async_trait]
 impl oio::Write for SftpWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+        let size = bs.len() as u64;
         self.file.write_all(&bs).await?;
 
-        Ok(())
+        Ok(size)
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/supabase/writer.rs b/core/src/services/supabase/writer.rs
index f4c271313..b786896c2 100644
--- a/core/src/services/supabase/writer.rs
+++ b/core/src/services/supabase/writer.rs
@@ -67,15 +67,17 @@ impl SupabaseWriter {
 
 #[async_trait]
 impl oio::Write for SupabaseWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
         if bs.is_empty() {
-            return Ok(());
+            return Ok(0);
         }
 
-        self.upload(bs).await
+        let size = bs.len();
+        self.upload(bs).await?;
+        Ok(size as u64)
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/vercel_artifacts/writer.rs b/core/src/services/vercel_artifacts/writer.rs
index 6f32d67ba..1db2d230f 100644
--- a/core/src/services/vercel_artifacts/writer.rs
+++ b/core/src/services/vercel_artifacts/writer.rs
@@ -39,7 +39,9 @@ impl VercelArtifactsWriter {
 
 #[async_trait]
 impl oio::Write for VercelArtifactsWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+        let size = bs.len();
+
         let resp = self
             .backend
             .vercel_artifacts_put(
@@ -54,13 +56,13 @@ impl oio::Write for VercelArtifactsWriter {
         match status {
             StatusCode::OK | StatusCode::ACCEPTED => {
                 resp.into_body().consume().await?;
-                Ok(())
+                Ok(size as u64)
             }
             _ => Err(parse_error(resp).await?),
         }
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/wasabi/writer.rs b/core/src/services/wasabi/writer.rs
index 689c334dc..130e8e911 100644
--- a/core/src/services/wasabi/writer.rs
+++ b/core/src/services/wasabi/writer.rs
@@ -41,12 +41,14 @@ impl WasabiWriter {
 
 #[async_trait]
 impl oio::Write for WasabiWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+        let size = bs.len();
+
         let resp = self
             .core
             .put_object(
                 &self.path,
-                Some(bs.len()),
+                Some(size),
                 self.op.content_type(),
                 self.op.content_disposition(),
                 self.op.cache_control(),
@@ -57,13 +59,13 @@ impl oio::Write for WasabiWriter {
         match resp.status() {
             StatusCode::CREATED | StatusCode::OK => {
                 resp.into_body().consume().await?;
-                Ok(())
+                Ok(size as u64)
             }
             _ => Err(parse_error(resp).await?),
         }
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/webdav/writer.rs b/core/src/services/webdav/writer.rs
index a3c17bafa..8dc093e65 100644
--- a/core/src/services/webdav/writer.rs
+++ b/core/src/services/webdav/writer.rs
@@ -62,13 +62,18 @@ impl WebdavWriter {
 
 #[async_trait]
 impl oio::Write for WebdavWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        self.write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs))
-            .await
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+        let size = bs.len() as u64;
+
+        self.write_oneshot(size, AsyncBody::Bytes(bs)).await?;
+
+        Ok(size)
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
-        self.write_oneshot(size, AsyncBody::Stream(s)).await
+    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+        self.write_oneshot(size, AsyncBody::Stream(s)).await?;
+
+        Ok(size)
     }
 
     async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs
index 97eef2e3d..1b055f122 100644
--- a/core/src/services/webhdfs/writer.rs
+++ b/core/src/services/webhdfs/writer.rs
@@ -39,12 +39,14 @@ impl WebhdfsWriter {
 
 #[async_trait]
 impl oio::Write for WebhdfsWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+        let size = bs.len();
+
         let req = self
             .backend
             .webhdfs_create_object_request(
                 &self.path,
-                Some(bs.len()),
+                Some(size),
                 self.op.content_type(),
                 AsyncBody::Bytes(bs),
             )
@@ -56,13 +58,13 @@ impl oio::Write for WebhdfsWriter {
         match status {
             StatusCode::CREATED | StatusCode::OK => {
                 resp.into_body().consume().await?;
-                Ok(())
+                Ok(size as u64)
             }
             _ => Err(parse_error(resp).await?),
         }
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs
index 37a9fe72c..f4b93dfa6 100644
--- a/core/src/types/writer.rs
+++ b/core/src/types/writer.rs
@@ -22,7 +22,7 @@ use std::task::ready;
 use std::task::Context;
 use std::task::Poll;
 
-use bytes::Bytes;
+use bytes::{Buf, Bytes};
 use futures::future::BoxFuture;
 use futures::AsyncWrite;
 use futures::FutureExt;
@@ -81,14 +81,23 @@ impl Writer {
 
     /// Write into inner writer.
     pub async fn write(&mut self, bs: impl Into<Bytes>) -> Result<()> {
-        if let State::Idle(Some(w)) = &mut self.state {
-            w.write(bs.into()).await
+        let w = if let State::Idle(Some(w)) = &mut self.state {
+            w
         } else {
             unreachable!(
                 "writer state invalid while write, expect Idle, actual {}",
                 self.state
             );
+        };
+
+        let mut bs = bs.into();
+
+        while !bs.is_empty() {
+            let n = w.write(bs.clone()).await?;
+            bs.advance(n as usize);
         }
+
+        Ok(())
     }
 
     /// Sink into writer.
@@ -123,7 +132,7 @@ impl Writer {
     ///     Ok(())
     /// }
     /// ```
-    pub async fn sink<S, T>(&mut self, size: u64, sink_from: S) -> Result<()>
+    pub async fn sink<S, T>(&mut self, size: u64, sink_from: S) -> Result<u64>
     where
         S: futures::Stream<Item = Result<T>> + Send + Sync + Unpin + 'static,
         T: Into<Bytes>,
@@ -169,7 +178,7 @@ impl Writer {
     ///     Ok(())
     /// }
     /// ```
-    pub async fn copy<R>(&mut self, size: u64, read_from: R) -> Result<()>
+    pub async fn copy<R>(&mut self, size: u64, read_from: R) -> Result<u64>
     where
         R: futures::AsyncRead + Send + Sync + Unpin + 'static,
     {
@@ -390,7 +399,14 @@ impl BlockingWriter {
 
     /// Write into inner writer.
     pub fn write(&mut self, bs: impl Into<Bytes>) -> Result<()> {
-        self.inner.write(bs.into())
+        let mut bs = bs.into();
+
+        while !bs.is_empty() {
+            let n = self.inner.write(bs.clone())?;
+            bs.advance(n as usize);
+        }
+
+        Ok(())
     }
 
     /// Close the writer and make sure all data have been stored.

Reply via email to