This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch make-write-all
in repository https://gitbox.apache.org/repos/asf/opendal.git

commit 153385eafafe235266a3f80e3aaf79d44c140b1f
Author: Xuanwo <[email protected]>
AuthorDate: Thu Jul 11 16:11:31 2024 +0800

    Remove returning n in write
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/layers/complete.rs                      |  7 +-
 core/src/layers/concurrent_limit.rs              |  4 +-
 core/src/layers/error_context.rs                 | 14 ++--
 core/src/layers/logging.rs                       | 25 +++----
 core/src/layers/retry.rs                         |  4 +-
 core/src/layers/timeout.rs                       |  2 +-
 core/src/raw/adapters/kv/backend.rs              | 10 ++-
 core/src/raw/adapters/typed_kv/backend.rs        | 10 ++-
 core/src/raw/enum_utils.rs                       |  4 +-
 core/src/raw/oio/write/api.rs                    | 30 +++-----
 core/src/raw/oio/write/append_write.rs           |  8 +--
 core/src/raw/oio/write/block_write.rs            | 10 +--
 core/src/raw/oio/write/multipart_write.rs        | 10 +--
 core/src/raw/oio/write/one_shot_write.rs         |  5 +-
 core/src/raw/oio/write/position_write.rs         | 10 +--
 core/src/raw/oio/write/range_write.rs            | 10 +--
 core/src/types/blocking_write/blocking_writer.rs |  8 +--
 core/src/types/blocking_write/std_writer.rs      | 10 ++-
 core/src/types/context/write.rs                  | 89 +++++++++---------------
 core/src/types/write/buffer_sink.rs              | 12 ++--
 core/src/types/write/writer.rs                   | 14 +---
 21 files changed, 114 insertions(+), 182 deletions(-)

diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index c2156a0418..68b0340aa7 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -654,7 +654,7 @@ impl<W> oio::Write for CompleteWriter<W>
 where
     W: oio::Write,
 {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         let w = self.inner.as_mut().ok_or_else(|| {
             Error::new(ErrorKind::Unexpected, "writer has been closed or 
aborted")
         })?;
@@ -689,13 +689,12 @@ impl<W> oio::BlockingWrite for CompleteWriter<W>
 where
     W: oio::BlockingWrite,
 {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, bs: Buffer) -> Result<()> {
         let w = self.inner.as_mut().ok_or_else(|| {
             Error::new(ErrorKind::Unexpected, "writer has been closed or 
aborted")
         })?;
-        let n = w.write(bs)?;
 
-        Ok(n)
+        w.write(bs)
     }
 
     fn close(&mut self) -> Result<()> {
diff --git a/core/src/layers/concurrent_limit.rs 
b/core/src/layers/concurrent_limit.rs
index a1a61ad01d..87ad19b50c 100644
--- a/core/src/layers/concurrent_limit.rs
+++ b/core/src/layers/concurrent_limit.rs
@@ -262,7 +262,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
ConcurrentLimitWrapper<R> {
 }
 
 impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         self.inner.write(bs).await
     }
 
@@ -276,7 +276,7 @@ impl<R: oio::Write> oio::Write for 
ConcurrentLimitWrapper<R> {
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for ConcurrentLimitWrapper<R> {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, bs: Buffer) -> Result<()> {
         self.inner.write(bs)
     }
 
diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs
index 86ae9dba80..cabe84b053 100644
--- a/core/src/layers/error_context.rs
+++ b/core/src/layers/error_context.rs
@@ -385,14 +385,13 @@ impl<T: oio::BlockingRead> oio::BlockingRead for 
ErrorContextWrapper<T> {
 }
 
 impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         let size = bs.len();
         self.inner
             .write(bs)
             .await
-            .map(|n| {
-                self.processed += n as u64;
-                n
+            .map(|_| {
+                self.processed += size as u64;
             })
             .map_err(|err| {
                 err.with_operation(WriteOperation::Write)
@@ -423,13 +422,12 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> 
{
 }
 
 impl<T: oio::BlockingWrite> oio::BlockingWrite for ErrorContextWrapper<T> {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, bs: Buffer) -> Result<()> {
         let size = bs.len();
         self.inner
             .write(bs)
-            .map(|n| {
-                self.processed += n as u64;
-                n
+            .map(|_| {
+                self.processed += size as u64;
             })
             .map_err(|err| {
                 err.with_operation(WriteOperation::BlockingWrite)
diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs
index 1e0d80d264..507745c6d1 100644
--- a/core/src/layers/logging.rs
+++ b/core/src/layers/logging.rs
@@ -1072,21 +1072,20 @@ impl<W> LoggingWriter<W> {
 }
 
 impl<W: oio::Write> oio::Write for LoggingWriter<W> {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
-        match self.inner.write(bs.clone()).await {
-            Ok(n) => {
-                self.written += n as u64;
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
+        let size = bs.len();
+        match self.inner.write(bs).await {
+            Ok(_) => {
                 trace!(
                     target: LOGGING_TARGET,
-                    "service={} operation={} path={} written={}B -> input data 
{}B, write {}B",
+                    "service={} operation={} path={} written={}B -> data write 
{}B",
                     self.ctx.scheme,
                     WriteOperation::Write,
                     self.path,
                     self.written,
-                    bs.len(),
-                    n,
+                    size,
                 );
-                Ok(n)
+                Ok(())
             }
             Err(err) => {
                 if let Some(lvl) = self.ctx.error_level(&err) {
@@ -1170,21 +1169,19 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
 }
 
 impl<W: oio::BlockingWrite> oio::BlockingWrite for LoggingWriter<W> {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, bs: Buffer) -> Result<()> {
         match self.inner.write(bs.clone()) {
-            Ok(n) => {
-                self.written += n as u64;
+            Ok(_) => {
                 trace!(
                     target: LOGGING_TARGET,
-                    "service={} operation={} path={} written={}B -> input data 
{}B, write {}B",
+                    "service={} operation={} path={} written={}B -> data write 
{}B",
                     self.ctx.scheme,
                     WriteOperation::BlockingWrite,
                     self.path,
                     self.written,
                     bs.len(),
-                    n
                 );
-                Ok(n)
+                Ok(())
             }
             Err(err) => {
                 if let Some(lvl) = self.ctx.error_level(&err) {
diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs
index c48307fdbc..5bc37e6145 100644
--- a/core/src/layers/retry.rs
+++ b/core/src/layers/retry.rs
@@ -626,7 +626,7 @@ impl<R: oio::BlockingRead, I: RetryInterceptor> 
oio::BlockingRead for RetryWrapp
 }
 
 impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         use backon::RetryableWithContext;
 
         let inner = self.take_inner()?;
@@ -694,7 +694,7 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for 
RetryWrapper<R, I> {
 }
 
 impl<R: oio::BlockingWrite, I: RetryInterceptor> oio::BlockingWrite for 
RetryWrapper<R, I> {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, bs: Buffer) -> Result<()> {
         { || self.inner.as_mut().unwrap().write(bs.clone()) }
             .retry(&self.builder)
             .when(|e| e.is_temporary())
diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs
index 246049dfbf..1cbc0c5ac1 100644
--- a/core/src/layers/timeout.rs
+++ b/core/src/layers/timeout.rs
@@ -350,7 +350,7 @@ impl<R: oio::Read> oio::Read for TimeoutWrapper<R> {
 }
 
 impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         let fut = self.inner.write(bs);
         Self::io_timeout(self.timeout, WriteOperation::Write.into_static(), 
fut).await
     }
diff --git a/core/src/raw/adapters/kv/backend.rs 
b/core/src/raw/adapters/kv/backend.rs
index bb08e4cb19..625e7ea982 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -242,10 +242,9 @@ impl<S> KvWriter<S> {
 unsafe impl<S: Adapter> Sync for KvWriter<S> {}
 
 impl<S: Adapter> oio::Write for KvWriter<S> {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
-        let ret = bs.len();
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         self.buffer.push(bs);
-        Ok(ret)
+        Ok(())
     }
 
     async fn close(&mut self) -> Result<()> {
@@ -260,10 +259,9 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
 }
 
 impl<S: Adapter> oio::BlockingWrite for KvWriter<S> {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
-        let ret = bs.len();
+    fn write(&mut self, bs: Buffer) -> Result<()> {
         self.buffer.push(bs);
-        Ok(ret)
+        Ok(())
     }
 
     fn close(&mut self) -> Result<()> {
diff --git a/core/src/raw/adapters/typed_kv/backend.rs 
b/core/src/raw/adapters/typed_kv/backend.rs
index ecce2eb879..fd6271691b 100644
--- a/core/src/raw/adapters/typed_kv/backend.rs
+++ b/core/src/raw/adapters/typed_kv/backend.rs
@@ -275,12 +275,11 @@ impl<S> KvWriter<S> {
 }
 
 impl<S: Adapter> oio::Write for KvWriter<S> {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
-        let size = bs.len();
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         let mut buf = self.buf.take().unwrap_or_default();
         buf.push(bs);
         self.buf = Some(buf);
-        Ok(size)
+        Ok(())
     }
 
     async fn close(&mut self) -> Result<()> {
@@ -303,12 +302,11 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
 }
 
 impl<S: Adapter> oio::BlockingWrite for KvWriter<S> {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
-        let size = bs.len();
+    fn write(&mut self, bs: Buffer) -> Result<()> {
         let mut buf = self.buf.take().unwrap_or_default();
         buf.push(bs);
         self.buf = Some(buf);
-        Ok(size)
+        Ok(())
     }
 
     fn close(&mut self) -> Result<()> {
diff --git a/core/src/raw/enum_utils.rs b/core/src/raw/enum_utils.rs
index c22411904d..111da78be0 100644
--- a/core/src/raw/enum_utils.rs
+++ b/core/src/raw/enum_utils.rs
@@ -70,7 +70,7 @@ impl<ONE: oio::BlockingRead, TWO: oio::BlockingRead> 
oio::BlockingRead for TwoWa
 }
 
 impl<ONE: oio::Write, TWO: oio::Write> oio::Write for TwoWays<ONE, TWO> {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         match self {
             Self::One(v) => v.write(bs).await,
             Self::Two(v) => v.write(bs).await,
@@ -129,7 +129,7 @@ impl<ONE: oio::BlockingRead, TWO: oio::BlockingRead, THREE: 
oio::BlockingRead> o
 impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write
     for ThreeWays<ONE, TWO, THREE>
 {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         match self {
             Self::One(v) => v.write(bs).await,
             Self::Two(v) => v.write(bs).await,
diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs
index e6c7c05918..4ec53adab6 100644
--- a/core/src/raw/oio/write/api.rs
+++ b/core/src/raw/oio/write/api.rs
@@ -77,31 +77,19 @@ pub trait Write: Unpin + Send + Sync {
     ///
     /// # Behavior
     ///
-    /// - `Ok(n)` means `n` bytes has been written successfully.
+    /// - `Ok(())` means all bytes has been written successfully.
     /// - `Err(err)` means error happens and no bytes has been written.
-    ///
-    /// It's possible that `n < bs.len()`, caller should pass the remaining 
bytes
-    /// repeatedly until all bytes has been written.
-    #[cfg(not(target_arch = "wasm32"))]
-    fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>> + 
MaybeSend;
-    #[cfg(target_arch = "wasm32")]
-    fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>>;
+    fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> + 
MaybeSend;
 
     /// Close the writer and make sure all data has been flushed.
-    #[cfg(not(target_arch = "wasm32"))]
     fn close(&mut self) -> impl Future<Output = Result<()>> + MaybeSend;
-    #[cfg(target_arch = "wasm32")]
-    fn close(&mut self) -> impl Future<Output = Result<()>>;
 
     /// Abort the pending writer.
-    #[cfg(not(target_arch = "wasm32"))]
     fn abort(&mut self) -> impl Future<Output = Result<()>> + MaybeSend;
-    #[cfg(target_arch = "wasm32")]
-    fn abort(&mut self) -> impl Future<Output = Result<()>>;
 }
 
 impl Write for () {
-    async fn write(&mut self, _: Buffer) -> Result<usize> {
+    async fn write(&mut self, _: Buffer) -> Result<()> {
         unimplemented!("write is required to be implemented for oio::Write")
     }
 
@@ -121,7 +109,7 @@ impl Write for () {
 }
 
 pub trait WriteDyn: Unpin + Send + Sync {
-    fn write_dyn(&mut self, bs: Buffer) -> BoxedFuture<Result<usize>>;
+    fn write_dyn(&mut self, bs: Buffer) -> BoxedFuture<Result<()>>;
 
     fn close_dyn(&mut self) -> BoxedFuture<Result<()>>;
 
@@ -129,7 +117,7 @@ pub trait WriteDyn: Unpin + Send + Sync {
 }
 
 impl<T: Write + ?Sized> WriteDyn for T {
-    fn write_dyn(&mut self, bs: Buffer) -> BoxedFuture<Result<usize>> {
+    fn write_dyn(&mut self, bs: Buffer) -> BoxedFuture<Result<()>> {
         Box::pin(self.write(bs))
     }
 
@@ -143,7 +131,7 @@ impl<T: Write + ?Sized> WriteDyn for T {
 }
 
 impl<T: WriteDyn + ?Sized> Write for Box<T> {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         self.deref_mut().write_dyn(bs).await
     }
 
@@ -170,14 +158,14 @@ pub trait BlockingWrite: Send + Sync + 'static {
     ///
     /// It's possible that `n < bs.len()`, caller should pass the remaining 
bytes
     /// repeatedly until all bytes has been written.
-    fn write(&mut self, bs: Buffer) -> Result<usize>;
+    fn write(&mut self, bs: Buffer) -> Result<()>;
 
     /// Close the writer and make sure all data has been flushed.
     fn close(&mut self) -> Result<()>;
 }
 
 impl BlockingWrite for () {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, bs: Buffer) -> Result<()> {
         let _ = bs;
 
         unimplemented!("write is required to be implemented for 
oio::BlockingWrite")
@@ -195,7 +183,7 @@ impl BlockingWrite for () {
 ///
 /// To make BlockingWriter work as expected, we must add this impl.
 impl<T: BlockingWrite + ?Sized> BlockingWrite for Box<T> {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, bs: Buffer) -> Result<()> {
         (**self).write(bs)
     }
 
diff --git a/core/src/raw/oio/write/append_write.rs 
b/core/src/raw/oio/write/append_write.rs
index 2f48b68307..06c72cc5e2 100644
--- a/core/src/raw/oio/write/append_write.rs
+++ b/core/src/raw/oio/write/append_write.rs
@@ -80,7 +80,7 @@ impl<W> oio::Write for AppendWriter<W>
 where
     W: AppendWrite,
 {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         let offset = match self.offset {
             Some(offset) => offset,
             None => {
@@ -91,12 +91,10 @@ where
         };
 
         let size = bs.len();
-        self.inner
-            .append(offset, size as u64, Buffer::from(bs.to_bytes()))
-            .await?;
+        self.inner.append(offset, size as u64, bs).await?;
         // Update offset after succeed.
         self.offset = Some(offset + size as u64);
-        Ok(size)
+        Ok(())
     }
 
     async fn close(&mut self) -> Result<()> {
diff --git a/core/src/raw/oio/write/block_write.rs 
b/core/src/raw/oio/write/block_write.rs
index 99c76562ca..cd0ec43b45 100644
--- a/core/src/raw/oio/write/block_write.rs
+++ b/core/src/raw/oio/write/block_write.rs
@@ -162,10 +162,10 @@ impl<W> oio::Write for BlockWriter<W>
 where
     W: BlockWrite,
 {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         if !self.started && self.cache.is_none() {
-            let size = self.fill_cache(bs);
-            return Ok(size);
+            self.fill_cache(bs);
+            return Ok(());
         }
 
         // The block upload process has been started.
@@ -181,8 +181,8 @@ where
             })
             .await?;
         self.cache = None;
-        let size = self.fill_cache(bs);
-        Ok(size)
+        self.fill_cache(bs);
+        Ok(())
     }
 
     async fn close(&mut self) -> Result<()> {
diff --git a/core/src/raw/oio/write/multipart_write.rs 
b/core/src/raw/oio/write/multipart_write.rs
index 0d893d7cb3..44a33c7a4b 100644
--- a/core/src/raw/oio/write/multipart_write.rs
+++ b/core/src/raw/oio/write/multipart_write.rs
@@ -203,14 +203,14 @@ impl<W> oio::Write for MultipartWriter<W>
 where
     W: MultipartWrite,
 {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         let upload_id = match self.upload_id.clone() {
             Some(v) => v,
             None => {
                 // Fill cache with the first write.
                 if self.cache.is_none() {
-                    let size = self.fill_cache(bs);
-                    return Ok(size);
+                    self.fill_cache(bs);
+                    return Ok(());
                 }
 
                 let upload_id = self.w.initiate_part().await?;
@@ -234,8 +234,8 @@ where
             .await?;
         self.cache = None;
         self.next_part_number += 1;
-        let size = self.fill_cache(bs);
-        Ok(size)
+        self.fill_cache(bs);
+        Ok(())
     }
 
     async fn close(&mut self) -> Result<()> {
diff --git a/core/src/raw/oio/write/one_shot_write.rs 
b/core/src/raw/oio/write/one_shot_write.rs
index cd056c1461..938973c33a 100644
--- a/core/src/raw/oio/write/one_shot_write.rs
+++ b/core/src/raw/oio/write/one_shot_write.rs
@@ -50,16 +50,15 @@ impl<W: OneShotWrite> OneShotWriter<W> {
 }
 
 impl<W: OneShotWrite> oio::Write for OneShotWriter<W> {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         match &self.buffer {
             Some(_) => Err(Error::new(
                 ErrorKind::Unsupported,
                 "OneShotWriter doesn't support multiple write",
             )),
             None => {
-                let size = bs.len();
                 self.buffer = Some(bs);
-                Ok(size)
+                Ok(())
             }
         }
     }
diff --git a/core/src/raw/oio/write/position_write.rs 
b/core/src/raw/oio/write/position_write.rs
index 3dbf5c93ef..5aa5ff3294 100644
--- a/core/src/raw/oio/write/position_write.rs
+++ b/core/src/raw/oio/write/position_write.rs
@@ -124,10 +124,10 @@ impl<W: PositionWrite> PositionWriter<W> {
 }
 
 impl<W: PositionWrite> oio::Write for PositionWriter<W> {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         if self.cache.is_none() {
-            let size = self.fill_cache(bs);
-            return Ok(size);
+            let _ = self.fill_cache(bs);
+            return Ok(());
         }
 
         let bytes = self.cache.clone().expect("pending write must exist");
@@ -144,8 +144,8 @@ impl<W: PositionWrite> oio::Write for PositionWriter<W> {
             .await?;
         self.cache = None;
         self.next_offset += length;
-        let size = self.fill_cache(bs);
-        Ok(size)
+        let _ = self.fill_cache(bs);
+        Ok(())
     }
 
     async fn close(&mut self) -> Result<()> {
diff --git a/core/src/raw/oio/write/range_write.rs 
b/core/src/raw/oio/write/range_write.rs
index 67ae619dd9..f44f06ad9c 100644
--- a/core/src/raw/oio/write/range_write.rs
+++ b/core/src/raw/oio/write/range_write.rs
@@ -155,14 +155,14 @@ impl<W: RangeWrite> RangeWriter<W> {
 }
 
 impl<W: RangeWrite> oio::Write for RangeWriter<W> {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         let location = match self.location.clone() {
             Some(location) => location,
             None => {
                 // Fill cache with the first write.
                 if self.cache.is_none() {
-                    let size = self.fill_cache(bs);
-                    return Ok(size);
+                    self.fill_cache(bs);
+                    return Ok(());
                 }
 
                 let location = self.w.initiate_range().await?;
@@ -187,8 +187,8 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> {
             .await?;
         self.cache = None;
         self.next_offset += length;
-        let size = self.fill_cache(bs);
-        Ok(size)
+        self.fill_cache(bs);
+        Ok(())
     }
 
     async fn close(&mut self) -> Result<()> {
diff --git a/core/src/types/blocking_write/blocking_writer.rs 
b/core/src/types/blocking_write/blocking_writer.rs
index 489cae502a..d97cabb147 100644
--- a/core/src/types/blocking_write/blocking_writer.rs
+++ b/core/src/types/blocking_write/blocking_writer.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use bytes::Buf;
 use std::sync::Arc;
 
 use crate::raw::*;
@@ -68,12 +67,7 @@ impl BlockingWriter {
     /// }
     /// ```
     pub fn write(&mut self, bs: impl Into<Buffer>) -> Result<()> {
-        let mut bs = bs.into();
-        while !bs.is_empty() {
-            let n = self.inner.write(bs.clone())?;
-            bs.advance(n);
-        }
-        Ok(())
+        self.inner.write(bs.into())
     }
 
     /// Close the writer and make sure all data have been committed.
diff --git a/core/src/types/blocking_write/std_writer.rs 
b/core/src/types/blocking_write/std_writer.rs
index 5b18467e36..fe918b43bf 100644
--- a/core/src/types/blocking_write/std_writer.rs
+++ b/core/src/types/blocking_write/std_writer.rs
@@ -81,10 +81,9 @@ impl Write for StdWriter {
             }
 
             let bs = self.buf.get().expect("frozen buffer must be valid");
-            let n = w
-                .write(Buffer::from(bs))
+            w.write(Buffer::from(bs))
                 .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, 
err))?;
-            self.buf.advance(n);
+            self.buf.clean();
         }
     }
 
@@ -103,10 +102,9 @@ impl Write for StdWriter {
                 return Ok(());
             };
 
-            let n = w
-                .write(Buffer::from(bs))
+            w.write(Buffer::from(bs))
                 .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, 
err))?;
-            self.buf.advance(n);
+            self.buf.clean();
         }
     }
 }
diff --git a/core/src/types/context/write.rs b/core/src/types/context/write.rs
index 557341da3d..92248762cf 100644
--- a/core/src/types/context/write.rs
+++ b/core/src/types/context/write.rs
@@ -18,7 +18,6 @@
 use crate::raw::oio::Write;
 use crate::raw::*;
 use crate::*;
-use bytes::Buf;
 use std::sync::Arc;
 
 /// WriteContext holds the immutable context for give write operation.
@@ -134,15 +133,14 @@ impl WriteGenerator<oio::Writer> {
 
 impl WriteGenerator<oio::Writer> {
     /// Write the entire buffer into writer.
-    pub async fn write(&mut self, mut bs: Buffer) -> Result<usize> {
+    pub async fn write(&mut self, mut bs: Buffer) -> Result<()> {
         let Some(chunk_size) = self.chunk_size else {
             return self.w.write_dyn(bs).await;
         };
 
         if self.buffer.len() + bs.len() < chunk_size {
-            let size = bs.len();
             self.buffer.push(bs);
-            return Ok(size);
+            return Ok(());
         }
 
         // Condition:
@@ -151,13 +149,10 @@ impl WriteGenerator<oio::Writer> {
         // Action:
         // - write buffer + bs directly.
         if !self.exact {
-            let fill_size = bs.len();
             self.buffer.push(bs);
-            let mut buf = self.buffer.take().collect();
-            let written = self.w.write_dyn(buf.clone()).await?;
-            buf.advance(written);
-            self.buffer.push(buf);
-            return Ok(fill_size);
+            let buf = self.buffer.take().collect();
+            self.w.write_dyn(buf).await?;
+            return Ok(());
         }
 
         // Condition:
@@ -167,10 +162,8 @@ impl WriteGenerator<oio::Writer> {
         // Action:
         // - write existing buffer in chunk_size to make more rooms for 
writing data.
         if self.buffer.len() >= chunk_size {
-            let mut buf = self.buffer.take().collect();
-            let written = self.w.write_dyn(buf.clone()).await?;
-            buf.advance(written);
-            self.buffer.push(buf);
+            let buf = self.buffer.take().collect();
+            self.w.write_dyn(buf).await?;
         }
 
         // Condition
@@ -180,9 +173,8 @@ impl WriteGenerator<oio::Writer> {
         // - write bs to buffer with remaining size.
         let remaining = chunk_size - self.buffer.len();
         bs.truncate(remaining);
-        let n = bs.len();
         self.buffer.push(bs);
-        Ok(n)
+        Ok(())
     }
 
     /// Finish the write process.
@@ -192,8 +184,8 @@ impl WriteGenerator<oio::Writer> {
                 break;
             }
 
-            let written = 
self.w.write_dyn(self.buffer.clone().collect()).await?;
-            self.buffer.advance(written);
+            self.w.write_dyn(self.buffer.clone().collect()).await?;
+            self.buffer.clear();
         }
 
         self.w.close().await
@@ -223,15 +215,14 @@ impl WriteGenerator<oio::BlockingWriter> {
 
 impl WriteGenerator<oio::BlockingWriter> {
     /// Write the entire buffer into writer.
-    pub fn write(&mut self, mut bs: Buffer) -> Result<usize> {
+    pub fn write(&mut self, mut bs: Buffer) -> Result<()> {
         let Some(chunk_size) = self.chunk_size else {
             return self.w.write(bs);
         };
 
         if self.buffer.len() + bs.len() < chunk_size {
-            let size = bs.len();
             self.buffer.push(bs);
-            return Ok(size);
+            return Ok(());
         }
 
         // Condition:
@@ -240,13 +231,10 @@ impl WriteGenerator<oio::BlockingWriter> {
         // Action:
         // - write buffer + bs directly.
         if !self.exact {
-            let fill_size = bs.len();
             self.buffer.push(bs);
-            let mut buf = self.buffer.take().collect();
-            let written = self.w.write(buf.clone())?;
-            buf.advance(written);
-            self.buffer.push(buf);
-            return Ok(fill_size);
+            let buf = self.buffer.take().collect();
+            self.w.write(buf)?;
+            return Ok(());
         }
 
         // Condition:
@@ -256,10 +244,8 @@ impl WriteGenerator<oio::BlockingWriter> {
         // Action:
         // - write existing buffer in chunk_size to make more rooms for 
writing data.
         if self.buffer.len() >= chunk_size {
-            let mut buf = self.buffer.take().collect();
-            let written = self.w.write(buf.clone())?;
-            buf.advance(written);
-            self.buffer.push(buf);
+            let buf = self.buffer.take().collect();
+            self.w.write(buf)?;
         }
 
         // Condition
@@ -269,9 +255,8 @@ impl WriteGenerator<oio::BlockingWriter> {
         // - write bs to buffer with remaining size.
         let remaining = chunk_size - self.buffer.len();
         bs.truncate(remaining);
-        let n = bs.len();
         self.buffer.push(bs);
-        Ok(n)
+        Ok(())
     }
 
     /// Finish the write process.
@@ -281,8 +266,8 @@ impl WriteGenerator<oio::BlockingWriter> {
                 break;
             }
 
-            let written = self.w.write(self.buffer.clone().collect())?;
-            self.buffer.advance(written);
+            self.w.write(self.buffer.clone().collect())?;
+            self.buffer.clear();
         }
 
         self.w.close()
@@ -343,10 +328,7 @@ mod tests {
         let mut w = WriteGenerator::new(Box::new(MockWriter { buf: buf.clone() 
}), Some(10), true);
 
         let mut bs = Bytes::from(expected.clone());
-        while !bs.is_empty() {
-            let n = w.write(bs.clone().into()).await?;
-            bs.advance(n);
-        }
+        w.write(bs.clone().into()).await?;
 
         w.close().await?;
 
@@ -375,10 +357,7 @@ mod tests {
         rng.fill_bytes(&mut expected);
 
         let bs = Bytes::from(expected.clone());
-        // The MockWriter always returns the first chunk size.
-        let n = w.write(bs.into()).await?;
-        assert_eq!(expected.len(), n);
-
+        w.write(bs.into()).await?;
         w.close().await?;
 
         let buf = buf.lock().await;
@@ -413,14 +392,13 @@ mod tests {
 
         // content > chunk size.
         let content = new_content(15);
-        assert_eq!(15, w.write(content.into()).await?);
+        w.write(content.into()).await?;
         // content < chunk size.
         let content = new_content(5);
-        assert_eq!(5, w.write(content.into()).await?);
+        w.write(content.into()).await?;
         // content > chunk size, but 5 bytes in queue.
         let content = new_content(15);
-        // The MockWriter can send all 15 bytes together, so we can only 
advance 5 bytes.
-        assert_eq!(15, w.write(content.clone().into()).await?);
+        w.write(content.clone().into()).await?;
 
         w.close().await?;
 
@@ -456,16 +434,16 @@ mod tests {
 
         // content > chunk size.
         let content = new_content(15);
-        assert_eq!(15, w.write(content.into()).await?);
+        w.write(content.into()).await?;
         // content < chunk size.
         let content = new_content(5);
-        assert_eq!(5, w.write(content.into()).await?);
+        w.write(content.into()).await?;
         // content < chunk size.
         let content = new_content(3);
-        assert_eq!(3, w.write(content.into()).await?);
+        w.write(content.into()).await?;
         // content > chunk size, but can send all chunks in the queue.
         let content = new_content(15);
-        assert_eq!(15, w.write(content.clone().into()).await?);
+        w.write(content.clone().into()).await?;
 
         w.close().await?;
 
@@ -539,10 +517,10 @@ mod tests {
 
         // content < chunk size.
         let content = new_content(5);
-        assert_eq!(5, w.write(content.into()).await?);
+        w.write(content.into()).await?;
         // Non-contiguous buffer.
         let content = Buffer::from(vec![new_content(3), new_content(2)]);
-        assert_eq!(5, w.write(content).await?);
+        w.write(content).await?;
 
         w.close().await?;
 
@@ -584,10 +562,7 @@ mod tests {
             expected.extend_from_slice(&content);
 
             let mut bs = Bytes::from(content.clone());
-            while !bs.is_empty() {
-                let n = writer.write(bs.clone().into()).await?;
-                bs.advance(n);
-            }
+            writer.write(bs.clone().into()).await?;
         }
         writer.close().await?;
 
diff --git a/core/src/types/write/buffer_sink.rs 
b/core/src/types/write/buffer_sink.rs
index 46d9112530..cc42e09090 100644
--- a/core/src/types/write/buffer_sink.rs
+++ b/core/src/types/write/buffer_sink.rs
@@ -20,8 +20,6 @@ use std::task::ready;
 use std::task::Context;
 use std::task::Poll;
 
-use bytes::Buf;
-
 use crate::raw::*;
 use crate::*;
 
@@ -35,7 +33,7 @@ pub struct BufferSink {
 
 enum State {
     Idle(Option<WriteGenerator<oio::Writer>>),
-    Writing(BoxedStaticFuture<(WriteGenerator<oio::Writer>, Result<usize>)>),
+    Writing(BoxedStaticFuture<(WriteGenerator<oio::Writer>, Result<()>)>),
     Closing(BoxedStaticFuture<(WriteGenerator<oio::Writer>, Result<()>)>),
 }
 
@@ -92,8 +90,8 @@ impl futures::Sink<Buffer> for BufferSink {
                     let (w, res) = ready!(fut.as_mut().poll(cx));
                     this.state = State::Idle(Some(w));
                     match res {
-                        Ok(n) => {
-                            this.buf.advance(n);
+                        Ok(_) => {
+                            this.buf = Buffer::new();
                         }
                         Err(err) => return Poll::Ready(Err(err)),
                     }
@@ -139,8 +137,8 @@ impl futures::Sink<Buffer> for BufferSink {
                     let (w, res) = ready!(fut.as_mut().poll(cx));
                     this.state = State::Idle(Some(w));
                     match res {
-                        Ok(n) => {
-                            this.buf.advance(n);
+                        Ok(_) => {
+                            this.buf = Buffer::new();
                         }
                         Err(err) => return Poll::Ready(Err(err)),
                     }
diff --git a/core/src/types/write/writer.rs b/core/src/types/write/writer.rs
index dc81d3d716..44755ed37c 100644
--- a/core/src/types/write/writer.rs
+++ b/core/src/types/write/writer.rs
@@ -136,12 +136,7 @@ impl Writer {
     /// }
     /// ```
     pub async fn write(&mut self, bs: impl Into<Buffer>) -> Result<()> {
-        let mut bs = bs.into();
-        while !bs.is_empty() {
-            let n = self.inner.write(bs.clone()).await?;
-            bs.advance(n);
-        }
-        Ok(())
+        self.inner.write(bs.into()).await
     }
 
     /// Write [`bytes::Buf`] into inner writer.
@@ -153,11 +148,8 @@ impl Writer {
     /// Optimize this function to avoid unnecessary copy.
     pub async fn write_from(&mut self, bs: impl Buf) -> Result<()> {
         let mut bs = bs;
-        let mut bs = Buffer::from(bs.copy_to_bytes(bs.remaining()));
-        while !bs.is_empty() {
-            let n = self.inner.write(bs.clone()).await?;
-            bs.advance(n);
-        }
+        let bs = Buffer::from(bs.copy_to_bytes(bs.remaining()));
+        self.inner.write(bs).await?;
         Ok(())
     }
 


Reply via email to