This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch exact-buf-write
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit e40c5c6d10620dbf00cde1eae143df9e400d98a9
Author: Xuanwo <[email protected]>
AuthorDate: Wed Aug 23 23:13:39 2023 +0800

    Implement
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/raw/oio/write/exact_buf_write.rs | 100 +++++++++++++++++++++++++++---
 1 file changed, 90 insertions(+), 10 deletions(-)

diff --git a/core/src/raw/oio/write/exact_buf_write.rs 
b/core/src/raw/oio/write/exact_buf_write.rs
index f1cba001b..3afffe1d9 100644
--- a/core/src/raw/oio/write/exact_buf_write.rs
+++ b/core/src/raw/oio/write/exact_buf_write.rs
@@ -86,16 +86,21 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
             .await
     }
 
+    /// # TODO
+    ///
+    /// We know every stream size, so we can collect them into a buffer without 
chaining them every time.
     async fn sink(&mut self, size: u64, mut s: Streamer) -> Result<()> {
         // Collect the stream into buffer directly if the buffet is not full.
-        if self.buffer.len() as u64 + size < self.buffer_size as u64 {
-            self.buffer.push(s.collect().await?);
-            return Ok(());
+        if self.buffer_stream.is_none() {
+            if self.buffer.len() as u64 + size <= self.buffer_size as u64 {
+                self.buffer.push(s.collect().await?);
+                return Ok(());
+            }
         }
 
-        if self.buffer.len() > self.buffer_size {
-            let buf = self.buffer.clone();
-            let to_write = self.buffer.split_to(self.buffer_size);
+        if self.buffer.len() >= self.buffer_size {
+            let mut buf = self.buffer.clone();
+            let to_write = buf.split_to(self.buffer_size);
             return self
                 .inner
                 .sink(to_write.len() as u64, Box::new(to_write))
@@ -151,12 +156,14 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
                         self.buffer_stream = None;
                         break;
                     }
-                    Some(bs) => self.buffer.push(bs),
+                    Some(bs) => {
+                        self.buffer.push(bs);
+                    }
                 }
             }
 
-            let mut buf = self.buffer.clone();
-            if buf.len() >= self.buffer_size {
+            if self.buffer.len() >= self.buffer_size {
+                let mut buf = self.buffer.clone();
                 let to_write = buf.split_to(self.buffer_size);
                 self.inner
                     .sink(to_write.len() as u64, Box::new(to_write))
@@ -164,7 +171,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
                     // Replace buffer with remaining if the write is 
successful.
                     .map(|_| {
                         self.buffer = buf;
-                    })?
+                    })?;
             }
         }
 
@@ -182,3 +189,76 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
         self.inner.close().await
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::raw::oio::StreamExt;
+    use crate::raw::oio::Write;
+    use log::debug;
+    use pretty_assertions::assert_eq;
+    use rand::{thread_rng, Rng, RngCore};
+    use sha2::{Digest, Sha256};
+
+    struct MockWriter {
+        buf: Vec<u8>,
+    }
+
+    #[async_trait]
+    impl Write for MockWriter {
+        async fn write(&mut self, bs: Bytes) -> Result<()> {
+            debug!("test_fuzz_exact_buf_writer: flush size: {}", bs.len());
+
+            self.buf.extend_from_slice(&bs);
+            Ok(())
+        }
+
+        async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
+            let bs = s.collect().await?;
+            assert_eq!(bs.len() as u64, size);
+            self.write(bs).await
+        }
+
+        async fn abort(&mut self) -> Result<()> {
+            Ok(())
+        }
+
+        async fn close(&mut self) -> Result<()> {
+            Ok(())
+        }
+    }
+
+    #[tokio::test]
+    async fn test_fuzz_exact_buf_writer() -> Result<()> {
+        let _ = tracing_subscriber::fmt()
+            .pretty()
+            .with_test_writer()
+            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
+            .try_init();
+
+        let mut rng = thread_rng();
+        let mut expected = vec![];
+
+        let buffer_size = rng.gen_range(1..10);
+        let mut writer = ExactBufWriter::new(MockWriter { buf: vec![] }, 
buffer_size);
+        debug!("test_fuzz_exact_buf_writer: buffer size: {buffer_size}");
+
+        for _ in 0..1000 {
+            let size = rng.gen_range(1..20);
+            debug!("test_fuzz_exact_buf_writer: write size: {size}");
+            let mut content = vec![0; size];
+            rng.fill_bytes(&mut content);
+
+            expected.extend_from_slice(&content);
+            writer.write(Bytes::from(content)).await?;
+        }
+        writer.close().await?;
+
+        assert_eq!(writer.inner.buf.len(), expected.len());
+        assert_eq!(
+            format!("{:x}", Sha256::digest(&writer.inner.buf)),
+            format!("{:x}", Sha256::digest(&expected))
+        );
+        Ok(())
+    }
+}

Reply via email to