This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch make-write-all
in repository https://gitbox.apache.org/repos/asf/opendal.git

commit 75ff58d477750a421639debfdf8dd0cf637c93a3
Author: Xuanwo <[email protected]>
AuthorDate: Thu Jul 11 17:15:30 2024 +0800

    Fix tests
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/types/blocking_write/blocking_writer.rs |  8 ++-
 core/src/types/context/write.rs                  | 74 +++++++++++++++---------
 core/src/types/write/buffer_sink.rs              | 12 ++--
 core/src/types/write/writer.rs                   |  8 ++-
 4 files changed, 68 insertions(+), 34 deletions(-)

diff --git a/core/src/types/blocking_write/blocking_writer.rs 
b/core/src/types/blocking_write/blocking_writer.rs
index d97cabb147..489cae502a 100644
--- a/core/src/types/blocking_write/blocking_writer.rs
+++ b/core/src/types/blocking_write/blocking_writer.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use bytes::Buf;
 use std::sync::Arc;
 
 use crate::raw::*;
@@ -67,7 +68,12 @@ impl BlockingWriter {
     /// }
     /// ```
     pub fn write(&mut self, bs: impl Into<Buffer>) -> Result<()> {
-        self.inner.write(bs.into())
+        let mut bs = bs.into();
+        while !bs.is_empty() {
+            let n = self.inner.write(bs.clone())?;
+            bs.advance(n);
+        }
+        Ok(())
     }
 
     /// Close the writer and make sure all data have been committed.
diff --git a/core/src/types/context/write.rs b/core/src/types/context/write.rs
index af875c14ed..6e2464cfce 100644
--- a/core/src/types/context/write.rs
+++ b/core/src/types/context/write.rs
@@ -133,14 +133,17 @@ impl WriteGenerator<oio::Writer> {
 
 impl WriteGenerator<oio::Writer> {
     /// Write the entire buffer into writer.
-    pub async fn write(&mut self, mut bs: Buffer) -> Result<()> {
+    pub async fn write(&mut self, mut bs: Buffer) -> Result<usize> {
         let Some(chunk_size) = self.chunk_size else {
-            return self.w.write_dyn(bs).await;
+            let size = bs.len();
+            self.w.write_dyn(bs).await?;
+            return Ok(size);
         };
 
         if self.buffer.len() + bs.len() < chunk_size {
+            let size = bs.len();
             self.buffer.push(bs);
-            return Ok(());
+            return Ok(size);
         }
 
         // Condition:
@@ -149,10 +152,11 @@ impl WriteGenerator<oio::Writer> {
         // Action:
         // - write buffer + bs directly.
         if !self.exact {
+            let fill_size = bs.len();
             self.buffer.push(bs);
             let buf = self.buffer.take().collect();
             self.w.write_dyn(buf).await?;
-            return Ok(());
+            return Ok(fill_size);
         }
 
         // Condition:
@@ -173,8 +177,9 @@ impl WriteGenerator<oio::Writer> {
         // - write bs to buffer with remaining size.
         let remaining = chunk_size - self.buffer.len();
         bs.truncate(remaining);
+        let n = bs.len();
         self.buffer.push(bs);
-        Ok(())
+        Ok(n)
     }
 
     /// Finish the write process.
@@ -184,8 +189,8 @@ impl WriteGenerator<oio::Writer> {
                 break;
             }
 
-            self.w.write_dyn(self.buffer.clone().collect()).await?;
-            self.buffer.clear();
+            let buf = self.buffer.take().collect();
+            self.w.write_dyn(buf).await?;
         }
 
         self.w.close().await
@@ -215,14 +220,17 @@ impl WriteGenerator<oio::BlockingWriter> {
 
 impl WriteGenerator<oio::BlockingWriter> {
     /// Write the entire buffer into writer.
-    pub fn write(&mut self, mut bs: Buffer) -> Result<()> {
+    pub fn write(&mut self, mut bs: Buffer) -> Result<usize> {
         let Some(chunk_size) = self.chunk_size else {
-            return self.w.write(bs);
+            let size = bs.len();
+            self.w.write(bs)?;
+            return Ok(size);
         };
 
         if self.buffer.len() + bs.len() < chunk_size {
+            let size = bs.len();
             self.buffer.push(bs);
-            return Ok(());
+            return Ok(size);
         }
 
         // Condition:
@@ -231,10 +239,11 @@ impl WriteGenerator<oio::BlockingWriter> {
         // Action:
         // - write buffer + bs directly.
         if !self.exact {
+            let fill_size = bs.len();
             self.buffer.push(bs);
             let buf = self.buffer.take().collect();
             self.w.write(buf)?;
-            return Ok(());
+            return Ok(fill_size);
         }
 
         // Condition:
@@ -255,8 +264,9 @@ impl WriteGenerator<oio::BlockingWriter> {
         // - write bs to buffer with remaining size.
         let remaining = chunk_size - self.buffer.len();
         bs.truncate(remaining);
+        let n = bs.len();
         self.buffer.push(bs);
-        Ok(())
+        Ok(n)
     }
 
     /// Finish the write process.
@@ -266,8 +276,8 @@ impl WriteGenerator<oio::BlockingWriter> {
                 break;
             }
 
-            self.w.write(self.buffer.clone().collect())?;
-            self.buffer.clear();
+            let buf = self.buffer.take().collect();
+            self.w.write(buf)?;
         }
 
         self.w.close()
@@ -278,8 +288,8 @@ impl WriteGenerator<oio::BlockingWriter> {
 mod tests {
     use super::*;
     use crate::raw::oio::Write;
-    use bytes::BufMut;
     use bytes::Bytes;
+    use bytes::{Buf, BufMut};
     use log::debug;
     use pretty_assertions::assert_eq;
     use rand::thread_rng;
@@ -326,8 +336,11 @@ mod tests {
         let buf = Arc::new(Mutex::new(vec![]));
         let mut w = WriteGenerator::new(Box::new(MockWriter { buf: buf.clone() 
}), Some(10), true);
 
-        let bs = Bytes::from(expected.clone());
-        w.write(bs.into()).await?;
+        let mut bs = Bytes::from(expected.clone());
+        while !bs.is_empty() {
+            let n = w.write(bs.clone().into()).await?;
+            bs.advance(n);
+        }
 
         w.close().await?;
 
@@ -356,7 +369,10 @@ mod tests {
         rng.fill_bytes(&mut expected);
 
         let bs = Bytes::from(expected.clone());
-        w.write(bs.into()).await?;
+        // The MockWriter always returns the first chunk size.
+        let n = w.write(bs.into()).await?;
+        assert_eq!(expected.len(), n);
+
         w.close().await?;
 
         let buf = buf.lock().await;
@@ -391,13 +407,14 @@ mod tests {
 
         // content > chunk size.
         let content = new_content(15);
-        w.write(content.into()).await?;
+        assert_eq!(15, w.write(content.into()).await?);
         // content < chunk size.
         let content = new_content(5);
-        w.write(content.into()).await?;
+        assert_eq!(5, w.write(content.into()).await?);
         // content > chunk size, but 5 bytes in queue.
         let content = new_content(15);
-        w.write(content.clone().into()).await?;
+        // The MockWriter can send all 15 bytes together, so we can only 
advance 5 bytes.
+        assert_eq!(15, w.write(content.clone().into()).await?);
 
         w.close().await?;
 
@@ -433,16 +450,16 @@ mod tests {
 
         // content > chunk size.
         let content = new_content(15);
-        w.write(content.into()).await?;
+        assert_eq!(15, w.write(content.into()).await?);
         // content < chunk size.
         let content = new_content(5);
-        w.write(content.into()).await?;
+        assert_eq!(5, w.write(content.into()).await?);
         // content < chunk size.
         let content = new_content(3);
-        w.write(content.into()).await?;
+        assert_eq!(3, w.write(content.into()).await?);
         // content > chunk size, but can send all chunks in the queue.
         let content = new_content(15);
-        w.write(content.clone().into()).await?;
+        assert_eq!(15, w.write(content.clone().into()).await?);
 
         w.close().await?;
 
@@ -483,8 +500,11 @@ mod tests {
 
             expected.extend_from_slice(&content);
 
-            let bs = Bytes::from(content.clone());
-            writer.write(bs.into()).await?;
+            let mut bs = Bytes::from(content.clone());
+            while !bs.is_empty() {
+                let n = writer.write(bs.clone().into()).await?;
+                bs.advance(n);
+            }
         }
         writer.close().await?;
 
diff --git a/core/src/types/write/buffer_sink.rs 
b/core/src/types/write/buffer_sink.rs
index cc42e09090..46d9112530 100644
--- a/core/src/types/write/buffer_sink.rs
+++ b/core/src/types/write/buffer_sink.rs
@@ -20,6 +20,8 @@ use std::task::ready;
 use std::task::Context;
 use std::task::Poll;
 
+use bytes::Buf;
+
 use crate::raw::*;
 use crate::*;
 
@@ -33,7 +35,7 @@ pub struct BufferSink {
 
 enum State {
     Idle(Option<WriteGenerator<oio::Writer>>),
-    Writing(BoxedStaticFuture<(WriteGenerator<oio::Writer>, Result<()>)>),
+    Writing(BoxedStaticFuture<(WriteGenerator<oio::Writer>, Result<usize>)>),
     Closing(BoxedStaticFuture<(WriteGenerator<oio::Writer>, Result<()>)>),
 }
 
@@ -90,8 +92,8 @@ impl futures::Sink<Buffer> for BufferSink {
                     let (w, res) = ready!(fut.as_mut().poll(cx));
                     this.state = State::Idle(Some(w));
                     match res {
-                        Ok(_) => {
-                            this.buf = Buffer::new();
+                        Ok(n) => {
+                            this.buf.advance(n);
                         }
                         Err(err) => return Poll::Ready(Err(err)),
                     }
@@ -137,8 +139,8 @@ impl futures::Sink<Buffer> for BufferSink {
                     let (w, res) = ready!(fut.as_mut().poll(cx));
                     this.state = State::Idle(Some(w));
                     match res {
-                        Ok(_) => {
-                            this.buf = Buffer::new();
+                        Ok(n) => {
+                            this.buf.advance(n);
                         }
                         Err(err) => return Poll::Ready(Err(err)),
                     }
diff --git a/core/src/types/write/writer.rs b/core/src/types/write/writer.rs
index 44755ed37c..eaf8b72517 100644
--- a/core/src/types/write/writer.rs
+++ b/core/src/types/write/writer.rs
@@ -136,7 +136,13 @@ impl Writer {
     /// }
     /// ```
     pub async fn write(&mut self, bs: impl Into<Buffer>) -> Result<()> {
-        self.inner.write(bs.into()).await
+        let mut bs = bs.into();
+        while !bs.is_empty() {
+            let n = self.inner.write(bs.clone()).await?;
+            bs.advance(n);
+        }
+
+        Ok(())
     }
 
     /// Write [`bytes::Buf`] into inner writer.

Reply via email to