This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 27c6a77665 feat(core): sets default chunk_size and sends buffer > 
chunk_size directly (#4710)
27c6a77665 is described below

commit 27c6a776653b7671da8171dea8c36283713eeb45
Author: Yingwen <[email protected]>
AuthorDate: Thu Jun 13 00:56:56 2024 +0800

    feat(core): sets default chunk_size and sends buffer > chunk_size directly 
(#4710)
---
 core/benches/oio/write.rs               |   2 +-
 core/src/layers/complete.rs             |  37 +++--
 core/src/raw/oio/write/chunked_write.rs | 260 ++++++++++++++++++++++++++++++--
 core/src/types/operator/operator.rs     |  24 +--
 4 files changed, 287 insertions(+), 36 deletions(-)

diff --git a/core/benches/oio/write.rs b/core/benches/oio/write.rs
index 13f31560d7..d9d8dd8b93 100644
--- a/core/benches/oio/write.rs
+++ b/core/benches/oio/write.rs
@@ -44,7 +44,7 @@ pub fn bench_exact_buf_write(c: &mut Criterion) {
         group.throughput(criterion::Throughput::Bytes(size.bytes() as u64));
         group.bench_with_input(size.to_string(), &content, |b, content| {
             b.to_async(&*TOKIO).iter(|| async {
-                let mut w = ChunkedWriter::new(BlackHoleWriter, 256 * 1024);
+                let mut w = ChunkedWriter::new(BlackHoleWriter, 256 * 1024, 
true);
 
                 let mut bs = content.clone();
                 while !bs.is_empty() {
diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index 97483f10b1..1d38fbda6b 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -423,28 +423,35 @@ impl<A: Access> LayeredAccess for CompleteAccessor<A> {
         }
 
         // Calculate buffer size.
-        let chunk_size = args.chunk().map(|mut size| {
-            if let Some(v) = capability.write_multi_max_size {
-                size = size.min(v);
-            }
-            if let Some(v) = capability.write_multi_min_size {
-                size = size.max(v);
-            }
-            if let Some(v) = capability.write_multi_align_size {
-                // Make sure size >= size first.
-                size = size.max(v);
-                size -= size % v;
-            }
+        // If `chunk` is not set, we use `write_multi_min_size` or 
`write_multi_align_size`
+        // as the default size.
+        let chunk_size = args
+            .chunk()
+            .or(capability.write_multi_min_size)
+            .or(capability.write_multi_align_size)
+            .map(|mut size| {
+                if let Some(v) = capability.write_multi_max_size {
+                    size = size.min(v);
+                }
+                if let Some(v) = capability.write_multi_min_size {
+                    size = size.max(v);
+                }
+                if let Some(v) = capability.write_multi_align_size {
+                    // Make sure size >= v (the alignment size) first.
+                    size = size.max(v);
+                    size -= size % v;
+                }
 
-            size
-        });
+                size
+            });
+        let exact = args.chunk().is_some() || 
capability.write_multi_align_size.is_some();
 
         let (rp, w) = self.inner.write(path, args.clone()).await?;
         let w = CompleteWriter::new(w);
 
         let w = match chunk_size {
             None => TwoWays::One(w),
-            Some(size) => TwoWays::Two(oio::ChunkedWriter::new(w, size)),
+            Some(size) => TwoWays::Two(oio::ChunkedWriter::new(w, size, 
exact)),
         };
 
         Ok((rp, w))
diff --git a/core/src/raw/oio/write/chunked_write.rs 
b/core/src/raw/oio/write/chunked_write.rs
index 8eadaa3dd2..c98ad41c97 100644
--- a/core/src/raw/oio/write/chunked_write.rs
+++ b/core/src/raw/oio/write/chunked_write.rs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use bytes::Buf;
+
 use crate::raw::*;
 use crate::*;
 
@@ -22,21 +24,27 @@ use crate::*;
 /// flush the underlying storage at the `chunk`` size.
 ///
 /// ChunkedWriter makes sure that the size of the data written to the
-/// underlying storage is exactly `chunk` bytes.
+/// underlying storage is
+/// - exactly `chunk` bytes if `exact` is true
+/// - at least `chunk` bytes if `exact` is false
 pub struct ChunkedWriter<W: oio::Write> {
     inner: W,
 
     /// The size for buffer, we will flush the underlying storage at the size 
of this buffer.
     chunk_size: usize,
+    /// If `exact` is true, the size of the data written to the underlying 
storage is
+    /// exactly `chunk_size` bytes.
+    exact: bool,
     buffer: oio::QueueBuf,
 }
 
 impl<W: oio::Write> ChunkedWriter<W> {
     /// Create a new exact buf writer.
-    pub fn new(inner: W, chunk_size: usize) -> Self {
+    pub fn new(inner: W, chunk_size: usize, exact: bool) -> Self {
         Self {
             inner,
             chunk_size,
+            exact,
             buffer: oio::QueueBuf::new(),
         }
     }
@@ -44,14 +52,48 @@ impl<W: oio::Write> ChunkedWriter<W> {
 
 impl<W: oio::Write> oio::Write for ChunkedWriter<W> {
     async fn write(&mut self, mut bs: Buffer) -> Result<usize> {
-        if self.buffer.len() >= self.chunk_size {
-            let written = 
self.inner.write(self.buffer.clone().collect()).await?;
-            self.buffer.advance(written);
+        if self.exact {
+            if self.buffer.len() >= self.chunk_size {
+                let written = 
self.inner.write(self.buffer.clone().collect()).await?;
+                self.buffer.advance(written);
+            }
+
+            let remaining = self.chunk_size - self.buffer.len();
+            bs.truncate(remaining);
+            let n = bs.len();
+            self.buffer.push(bs);
+            return Ok(n);
+        }
+        // We are in inexact mode.
+
+        if self.buffer.len() + bs.len() < self.chunk_size {
+            // We haven't buffered enough data.
+            let n = bs.len();
+            self.buffer.push(bs);
+            return Ok(n);
+        }
+        // We have enough data to send.
+
+        if self.buffer.is_empty() {
+            // Fast path: Sends the buffer directly if the buffer queue is 
empty.
+            return self.inner.write(bs).await;
         }
 
-        let remaining = self.chunk_size - self.buffer.len();
-        bs.truncate(remaining);
-        let n = bs.len();
+        // If we always push `bs` to the buffer queue, the buffer queue may 
grow infinitely if inner
+        // doesn't fully consume the queue. So we clone the buffer queue and 
send it with `bs` first.
+        let mut buffer = self.buffer.clone();
+        buffer.push(bs.clone());
+        let written = self.inner.write(buffer.collect()).await?;
+        // The number of bytes in `self.buffer` that were already written.
+        let queue_written = written.min(self.buffer.len());
+        self.buffer.advance(queue_written);
+        // The number of bytes in `bs` that were already written.
+        let bs_written = written - queue_written;
+        // Skip the bytes that were already written.
+        bs.advance(bs_written);
+        // We already sent `written` bytes, so we put at most `written` more 
bytes of `bs` into the buffer queue.
+        bs.truncate(written);
+        let n = bs_written + bs.len();
         self.buffer.push(bs);
         Ok(n)
     }
@@ -124,7 +166,7 @@ mod tests {
         let mut expected = vec![0; 5];
         rng.fill_bytes(&mut expected);
 
-        let mut w = ChunkedWriter::new(MockWriter { buf: vec![] }, 10);
+        let mut w = ChunkedWriter::new(MockWriter { buf: vec![] }, 10, true);
 
         let mut bs = Bytes::from(expected.clone());
         while !bs.is_empty() {
@@ -142,6 +184,204 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_inexact_buf_writer_large_write() -> Result<()> {
+        let _ = tracing_subscriber::fmt()
+            .pretty()
+            .with_test_writer()
+            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
+            .try_init();
+
+        let mut w = ChunkedWriter::new(MockWriter { buf: vec![] }, 10, false);
+
+        let mut rng = thread_rng();
+        let mut expected = vec![0; 15];
+        rng.fill_bytes(&mut expected);
+
+        let bs = Bytes::from(expected.clone());
+        // The MockWriter always returns the first chunk size.
+        let n = w.write(bs.into()).await?;
+        assert_eq!(expected.len(), n);
+
+        w.close().await?;
+
+        assert_eq!(w.inner.buf.len(), expected.len());
+        assert_eq!(
+            format!("{:x}", Sha256::digest(&w.inner.buf)),
+            format!("{:x}", Sha256::digest(&expected))
+        );
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_inexact_buf_writer_combine_small() -> Result<()> {
+        let _ = tracing_subscriber::fmt()
+            .pretty()
+            .with_test_writer()
+            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
+            .try_init();
+
+        let mut w = ChunkedWriter::new(MockWriter { buf: vec![] }, 10, false);
+
+        let mut rng = thread_rng();
+        let mut expected = vec![];
+
+        let mut new_content = |size| {
+            let mut content = vec![0; size];
+            rng.fill_bytes(&mut content);
+            expected.extend_from_slice(&content);
+            Bytes::from(content)
+        };
+
+        // content > chunk size.
+        let content = new_content(15);
+        assert_eq!(15, w.write(content.into()).await?);
+        // content < chunk size.
+        let content = new_content(5);
+        assert_eq!(5, w.write(content.into()).await?);
+        // content > chunk size, but 5 bytes in queue.
+        let mut content = new_content(15);
+        // The MockWriter can only send 5 bytes each time, so we can only 
advance 5 bytes.
+        assert_eq!(5, w.write(content.clone().into()).await?);
+        content.advance(5);
+        assert_eq!(5, w.write(content.clone().into()).await?);
+        content.advance(5);
+        assert_eq!(5, w.write(content.clone().into()).await?);
+
+        w.close().await?;
+
+        assert_eq!(w.inner.buf.len(), expected.len());
+        assert_eq!(
+            format!("{:x}", Sha256::digest(&w.inner.buf)),
+            format!("{:x}", Sha256::digest(&expected))
+        );
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_inexact_buf_writer_queue_remaining() -> Result<()> {
+        let _ = tracing_subscriber::fmt()
+            .pretty()
+            .with_test_writer()
+            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
+            .try_init();
+
+        let mut w = ChunkedWriter::new(MockWriter { buf: vec![] }, 10, false);
+
+        let mut rng = thread_rng();
+        let mut expected = vec![];
+
+        let mut new_content = |size| {
+            let mut content = vec![0; size];
+            rng.fill_bytes(&mut content);
+            expected.extend_from_slice(&content);
+            Bytes::from(content)
+        };
+
+        // content > chunk size.
+        let content = new_content(15);
+        assert_eq!(15, w.write(content.into()).await?);
+        // content < chunk size.
+        let content = new_content(5);
+        assert_eq!(5, w.write(content.into()).await?);
+        // content < chunk size.
+        let content = new_content(3);
+        assert_eq!(3, w.write(content.into()).await?);
+        // content > chunk size, but only sends the first chunk in the queue.
+        let mut content = new_content(15);
+        assert_eq!(5, w.write(content.clone().into()).await?);
+        // queue: 3, 5, bs: 10
+        content.advance(5);
+        assert_eq!(3, w.write(content.clone().into()).await?);
+        // queue: 5, 3, bs: 7
+        content.advance(3);
+        assert_eq!(5, w.write(content.clone().into()).await?);
+        // queue: 3, 5, bs: 2
+        content.advance(5);
+        assert_eq!(2, w.write(content.clone().into()).await?);
+        // queue: 5, 2, bs: empty.
+        content.advance(2);
+        assert!(content.is_empty());
+
+        w.close().await?;
+
+        assert_eq!(w.inner.buf.len(), expected.len());
+        assert_eq!(
+            format!("{:x}", Sha256::digest(&w.inner.buf)),
+            format!("{:x}", Sha256::digest(&expected))
+        );
+        Ok(())
+    }
+
+    struct PartialWriter {
+        buf: Vec<u8>,
+    }
+
+    impl Write for PartialWriter {
+        async fn write(&mut self, mut bs: Buffer) -> Result<usize> {
+            if Buffer::count(&bs) > 1 {
+                // Always leaves last buffer for non-contiguous buffer.
+                let mut written = 0;
+                while Buffer::count(&bs) > 1 {
+                    let chunk = bs.chunk();
+                    self.buf.extend_from_slice(chunk);
+                    written += chunk.len();
+                    bs.advance(chunk.len());
+                }
+                Ok(written)
+            } else {
+                let chunk = bs.chunk();
+                self.buf.extend_from_slice(chunk);
+                Ok(chunk.len())
+            }
+        }
+
+        async fn close(&mut self) -> Result<()> {
+            Ok(())
+        }
+
+        async fn abort(&mut self) -> Result<()> {
+            Ok(())
+        }
+    }
+
+    #[tokio::test]
+    async fn test_inexact_buf_writer_partial_send() -> Result<()> {
+        let _ = tracing_subscriber::fmt()
+            .pretty()
+            .with_test_writer()
+            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
+            .try_init();
+
+        let mut w = ChunkedWriter::new(PartialWriter { buf: vec![] }, 10, 
false);
+
+        let mut rng = thread_rng();
+        let mut expected = vec![];
+
+        let mut new_content = |size| {
+            let mut content = vec![0; size];
+            rng.fill_bytes(&mut content);
+            expected.extend_from_slice(&content);
+            Bytes::from(content)
+        };
+
+        // content < chunk size.
+        let content = new_content(5);
+        assert_eq!(5, w.write(content.into()).await?);
+        // Non-contiguous buffer.
+        let content = Buffer::from(vec![new_content(3), new_content(2)]);
+        assert_eq!(5, w.write(content).await?);
+
+        w.close().await?;
+
+        assert_eq!(w.inner.buf.len(), expected.len());
+        assert_eq!(
+            format!("{:x}", Sha256::digest(&w.inner.buf)),
+            format!("{:x}", Sha256::digest(&expected))
+        );
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_fuzz_exact_buf_writer() -> Result<()> {
         let _ = tracing_subscriber::fmt()
@@ -154,7 +394,7 @@ mod tests {
         let mut expected = vec![];
 
         let buffer_size = rng.gen_range(1..10);
-        let mut writer = ChunkedWriter::new(MockWriter { buf: vec![] }, 
buffer_size);
+        let mut writer = ChunkedWriter::new(MockWriter { buf: vec![] }, 
buffer_size, true);
         debug!("test_fuzz_exact_buf_writer: buffer size: {buffer_size}");
 
         for _ in 0..1000 {
diff --git a/core/src/types/operator/operator.rs 
b/core/src/types/operator/operator.rs
index fd42fe49ff..b06687292c 100644
--- a/core/src/types/operator/operator.rs
+++ b/core/src/types/operator/operator.rs
@@ -858,12 +858,14 @@ impl Operator {
     ///
     /// ## Chunk
     ///
-    /// OpenDAL is designed to write files directly without chunking by 
default, giving users
-    /// control over the exact size of their writes and helping avoid 
unnecessary costs.
+    /// Some storage services have a minimum chunk size requirement. For 
example, `s3` could return
+    /// hard errors like `EntityTooSmall` if the chunk size is too small. Some 
services like `gcs`
+    /// also return errors if the chunk size is not aligned. Besides, cloud 
storage services will cost
+    /// more money if we write data in small chunks.
     ///
-    /// This is not efficient for cases when users write small chunks of data. 
Some storage services
-    /// like `s3` could even return hard errors like `EntityTooSmall`. 
Besides, cloud storage services
-    /// will cost more money if we write data in small chunks.
+    /// OpenDAL sets the chunk size automatically based on the 
[Capability](crate::types::Capability)
+    /// of the service if users don't set it. Users can set `chunk` to control 
the exact size to send
+    /// to the storage service.
     ///
     /// Users can use [`Operator::writer_with`] to set a good chunk size might 
improve the performance,
     ///
@@ -919,12 +921,14 @@ impl Operator {
     ///
     /// Set `chunk` for the writer.
     ///
-    /// OpenDAL is designed to write files directly without chunking by 
default, giving users
-    /// control over the exact size of their writes and helping avoid 
unnecessary costs.
+    /// Some storage services have a minimum chunk size requirement. For 
example, `s3` could return
+    /// hard errors like `EntityTooSmall` if the chunk size is too small. Some 
services like `gcs`
+    /// also return errors if the chunk size is not aligned. Besides, cloud 
storage services will cost
+    /// more money if we write data in small chunks.
     ///
-    /// This is not efficient for cases when users write small chunks of data. 
Some storage services
-    /// like `s3` could even return hard errors like `EntityTooSmall`. 
Besides, cloud storage services
-    /// will cost more money if we write data in small chunks.
+    /// OpenDAL sets the chunk size automatically based on the 
[Capability](crate::types::Capability)
+    /// of the service if users don't set it. Users can set `chunk` to control 
the exact size to send
+    /// to the storage service.
     ///
     /// Set a good chunk size might improve the performance, reduce the API 
calls and save money.
     ///

Reply via email to