This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch polish-buffer
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 32d85ead80257f1fca1078bc09d2743fdbc9ae8a
Author: Xuanwo <[email protected]>
AuthorDate: Tue Sep 12 16:47:36 2023 +0800

    Save work
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/benches/oio/utils.rs                 |   2 +
 core/src/raw/oio/buf/chunked_bytes.rs     | 302 ++++++++++++++++++++++++------
 core/src/raw/oio/buf/write_buf.rs         |  25 +++
 core/src/raw/oio/write/exact_buf_write.rs |  82 ++------
 4 files changed, 282 insertions(+), 129 deletions(-)

diff --git a/core/benches/oio/utils.rs b/core/benches/oio/utils.rs
index 338587062..af8c9a205 100644
--- a/core/benches/oio/utils.rs
+++ b/core/benches/oio/utils.rs
@@ -34,6 +34,8 @@ impl oio::Write for BlackHoleWriter {
         _: &mut Context<'_>,
         bs: &dyn oio::WriteBuf,
     ) -> Poll<opendal::Result<usize>> {
+        // Simulate a real writer: materialize the data so the cost of `bytes()` is measured.
+        let _ = bs.bytes(bs.remaining());
         Poll::Ready(Ok(bs.remaining()))
     }

diff --git a/core/src/raw/oio/buf/chunked_bytes.rs b/core/src/raw/oio/buf/chunked_bytes.rs
index e1a4df3c0..ae8bcb6d9 100644
--- a/core/src/raw/oio/buf/chunked_bytes.rs
+++ b/core/src/raw/oio/buf/chunked_bytes.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use bytes::{Bytes, BytesMut};
+use std::cmp::min;
 use std::collections::VecDeque;
 use std::io::IoSlice;
 use std::task::{Context, Poll};
@@ -23,10 +24,16 @@ use std::task::{Context, Poll};
 use crate::raw::*;
 use crate::*;
 
+// TODO: 64KiB is picked based on experience; callers can override it via `with_chunk_size`.
+const DEFAULT_CHUNK_SIZE: usize = 64 * 1024;
+
 /// ChunkedBytes is used represents a non-contiguous bytes in memory.
 #[derive(Clone)]
 pub struct ChunkedBytes {
-    inner: VecDeque<Bytes>,
+    chunk_size: usize, // Target size of chunks built by `push` / `extend_from_slice`.
+    /// Sealed chunks in write order; reads and `advance` consume from the front.
+    frozen: VecDeque<Bytes>,
+    active: BytesMut, // Chunk still being filled; its data logically follows `frozen`.
     size: usize,
 }
 
@@ -37,11 +44,41 @@ impl Default for ChunkedBytes {
 }
 
 impl ChunkedBytes {
-    /// Create a new chunked cursor.
+    /// Create a new chunked bytes.
     pub fn new() -> Self {
         Self {
-            inner: VecDeque::new(),
+            frozen: VecDeque::new(),
+            active: BytesMut::new(),
+            size: 0,
+
+            chunk_size: DEFAULT_CHUNK_SIZE,
+        }
+    }
+
+    /// Create a new chunked cursor with given chunk size.
+    pub fn with_chunk_size(chunk_size: usize) -> Self {
+        Self {
+            frozen: VecDeque::new(),
+            active: BytesMut::new(),
+            size: 0,
+
+            chunk_size,
+        }
+    }
+
+    /// Build a chunked bytes from a vector of bytes.
+    ///
+    /// This function is guaranteed to run in O(1) time and to not re-allocate 
the Vec’s buffer
+    /// or allocate any additional memory.
+    ///
+    /// Reference: 
<https://doc.rust-lang.org/stable/std/collections/struct.VecDeque.html#impl-From%3CVec%3CT,+A%3E%3E-for-VecDeque%3CT,+A%3E>
+    pub fn from_vec(bs: Vec<Bytes>) -> Self {
+        Self {
+            frozen: bs.into(),
+            active: BytesMut::new(),
             size: 0,
+
+            chunk_size: DEFAULT_CHUNK_SIZE,
         }
     }
 
@@ -58,13 +95,87 @@ impl ChunkedBytes {
     /// Clear the entire cursor.
     pub fn clear(&mut self) {
         self.size = 0;
-        self.inner.clear();
+        self.frozen.clear();
+        self.active.clear();
+    }
+
+    /// Push a new bytes into ChunkedBytes.
+    ///
+    /// Whole `chunk_size` slices of `bs` are stored without copying. Only the bytes needed
+    /// to top up a partially filled active chunk, plus a trailing remainder shorter than
+    /// `chunk_size`, are copied into the active buffer.
+    pub fn push(&mut self, mut bs: Bytes) {
+        self.size += bs.len();
+
+        // Top up a partially filled active chunk first so data order is preserved. When
+        // active is empty there is nothing to top up: skip the copy and let the loop below
+        // slice `bs` directly.
+        if !self.active.is_empty() {
+            let remaining = self.chunk_size.saturating_sub(self.active.len());
+            let len = min(remaining, bs.len());
+            self.active.extend_from_slice(&bs.split_to(len));
+
+            // If active is full, freeze it and push it into frozen.
+            if self.active.len() >= self.chunk_size {
+                self.frozen.push_back(self.active.split().freeze());
+            }
+        }
+
+        // Split remaining bytes into chunks; `split_to` shares the buffer, no copy.
+        while bs.len() >= self.chunk_size {
+            self.frozen.push_back(bs.split_to(self.chunk_size));
+        }
+
+        // Append to active if there are remaining bytes.
+        if !bs.is_empty() {
+            self.active.extend_from_slice(&bs);
+        }
     }

-    /// Push a new bytes into vector cursor.
-    pub fn push(&mut self, bs: Bytes) {
+    /// Push a new &[u8] into ChunkedBytes.
+    ///
+    /// The data is always copied. A full active chunk is frozen before more data is
+    /// appended, so `active` never holds more than `chunk_size` bytes.
+    pub fn extend_from_slice(&mut self, bs: &[u8]) {
         self.size += bs.len();
-        self.inner.push_back(bs);
+
+        let mut remaining = bs;
+
+        while !remaining.is_empty() {
+            let available = self.chunk_size.saturating_sub(self.active.len());
+
+            // available == 0 means self.active.len() >= self.chunk_size: the active chunk
+            // is full, so freeze it and start a new one.
+            if available == 0 {
+                self.frozen.push_back(self.active.split().freeze());
+                self.active.reserve(self.chunk_size);
+                continue;
+            }
+
+            let size = min(remaining.len(), available);
+            self.active.extend_from_slice(&remaining[0..size]);
+
+            remaining = &remaining[size..];
+        }
+    }
+
+    /// Pull data from [`oio::WriteBuf`] into ChunkedBytes.
+    ///
+    /// At most `size` bytes are taken, and only from `buf.chunk()`, so the returned number
+    /// of consumed bytes may be smaller than `size`. `buf` itself is not advanced.
+    pub fn extend_from_write_buf(&mut self, size: usize, buf: &dyn oio::WriteBuf) -> usize {
+        let to_write = min(buf.chunk().len(), size);
+
+        if buf.is_bytes_optimized(to_write) && to_write > self.chunk_size {
+            // `buf.bytes()` is cheap here and the data spans more than one chunk, so
+            // `push` can keep most of it without copying.
+            self.push(buf.bytes(to_write));
+        } else {
+            // Otherwise, we should copy it into the buffer.
+            self.extend_from_slice(&buf.chunk()[..to_write]);
+        }
+
+        to_write
     }
 }

@@ -82,31 +179,39 @@ impl oio::WriteBuf for ChunkedBytes {
         );

         self.size -= cnt;
-        loop {
-            if cnt == 0 {
-                break;
-            }

-            let bs = self.inner.front_mut().unwrap();
-            if cnt >= bs.len() {
-                cnt -= bs.len();
-                self.inner.pop_front();
+        while cnt > 0 {
+            if let Some(front) = self.frozen.front_mut() {
+                if front.len() <= cnt {
+                    cnt -= front.len();
+                    self.frozen.pop_front(); // Remove the entire chunk.
+                } else {
+                    front.advance(cnt); // Drop the first `cnt` bytes, keep the rest.
+                    break;
+                }
             } else {
-                bs.advance(cnt);
-                cnt = 0;
+                // Frozen is drained, so cnt <= self.active.len() given the checks above.
+                self.active.advance(cnt); // Remove cnt bytes from the active buffer.
+                break;
             }
         }
     }

     fn chunk(&self) -> &[u8] {
-        match self.inner.front() {
+        match self.frozen.front() {
             Some(v) => v,
-            None => &[],
+            None => &self.active, // No frozen chunk left: expose active (may be empty).
         }
     }

     fn vectored_chunk(&self) -> Vec<IoSlice> {
-        self.inner.iter().map(|v| IoSlice::new(v)).collect()
+        let it = self.frozen.iter().map(|v| IoSlice::new(v));
+        // Frozen chunks first, then the active chunk, preserving write order.
+        if !self.active.is_empty() {
+            it.chain([IoSlice::new(&self.active)]).collect()
+        } else {
+            it.collect()
+        }
     }

     fn bytes(&self, size: usize) -> Bytes {
@@ -121,29 +226,40 @@ impl oio::WriteBuf for ChunkedBytes {
             return Bytes::new();
         }

-        if let Some(bs) = self.inner.front() {
+        if let Some(bs) = self.frozen.front() {
             if size <= bs.len() {
                 return bs.slice(..size);
             }
         }

         let mut remaining = size;
-        let mut buf = BytesMut::with_capacity(size);
-        for bs in self.inner.iter() {
+        let mut result = BytesMut::with_capacity(size); // Slow path: copy the data.
+
+        // First, copy from the frozen chunks in order.
+        for chunk in &self.frozen {
+            let to_copy = min(remaining, chunk.len());
+            result.extend_from_slice(&chunk[0..to_copy]);
+            remaining -= to_copy;
+
             if remaining == 0 {
                 break;
             }
+        }

-            if remaining <= bs.len() {
-                buf.extend_from_slice(&bs[..remaining]);
-                break;
-            }
+        // Then take the rest from active; assumes `size <= self.size` was checked above.
+        if remaining > 0 {
+            result.extend_from_slice(&self.active[0..remaining]);
+        }
+
+        result.freeze()
+    }

-            buf.extend_from_slice(bs);
-            remaining -= bs.len();
+    fn is_bytes_optimized(&self, size: usize) -> bool {
+        if let Some(bs) = self.frozen.front() {
+            return size <= bs.len(); // `bytes()` can return `bs.slice(..size)`.
         }

-        buf.freeze()
+        false // Otherwise `bytes()` copies (across chunks or out of `active`).
     }

     fn vectored_bytes(&self, size: usize) -> Vec<Bytes> {
@@ -156,18 +272,24 @@ impl oio::WriteBuf for ChunkedBytes {

         let mut remaining = size;
         let mut buf = vec![];
-        for bs in self.inner.iter() {
+        for bs in self.frozen.iter() {
             if remaining == 0 {
                 break;
             }

-            if remaining <= bs.len() {
-                buf.push(bs.slice(..remaining));
-                break;
+            let to_take = min(remaining, bs.len());
+            // Take the whole chunk when possible, otherwise a sub-slice of it.
+            if to_take == bs.len() {
+                buf.push(bs.clone()); // Clone is shallow; no data copy occurs.
+            } else {
+                buf.push(bs.slice(0..to_take));
             }

-            buf.push(bs.clone());
-            remaining -= bs.len();
+            remaining -= to_take;
+        }
+        // Anything left lives in `active` (a BytesMut), so it has to be copied.
+        if remaining > 0 {
+            buf.push(Bytes::copy_from_slice(&self.active[0..remaining]));
         }

         buf
@@ -176,11 +298,15 @@ impl oio::WriteBuf for ChunkedBytes {

 impl oio::Stream for ChunkedBytes {
     fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
-        match self.inner.pop_front() {
+        match self.frozen.pop_front() {
             Some(bs) => {
                 self.size -= bs.len();
                 Poll::Ready(Some(Ok(bs)))
             }
+            None if !self.active.is_empty() => { // Frozen drained: yield the active tail.
+                self.size -= self.active.len();
+                Poll::Ready(Some(Ok(self.active.split().freeze())))
+            }
             None => Poll::Ready(None),
         }
     }
@@ -195,14 +321,17 @@ impl oio::Stream for ChunkedBytes {

 #[cfg(test)]
 mod tests {
+    use log::debug;
     use pretty_assertions::assert_eq;
+    use rand::{thread_rng, Rng, RngCore};
+    use sha2::{Digest, Sha256};

     use super::*;
-    use crate::raw::oio::{StreamExt, WriteBuf};
+    use crate::raw::oio::WriteBuf;

     #[test]
     fn test_chunked_bytes_write_buf() -> Result<()> {
-        let mut c = ChunkedBytes::new();
+        let mut c = ChunkedBytes::with_chunk_size(5); // The 5-byte "hello" fills one chunk.

         c.push(Bytes::from("hello"));
         assert_eq!(c.len(), 5);
@@ -273,31 +402,105 @@ mod tests {
         Ok(())
     }

-    #[tokio::test]
-    async fn test_chunked_bytes_stream() -> Result<()> {
-        let mut c = ChunkedBytes::new();
-
-        c.push(Bytes::from("hello"));
-        assert_eq!(c.len(), 5);
-        assert!(!c.is_empty());
-
-        c.push(Bytes::from("world"));
-        assert_eq!(c.len(), 10);
-        assert!(!c.is_empty());
-
-        let bs = c.next().await.unwrap().unwrap();
-        assert_eq!(bs, Bytes::from("hello"));
-        assert_eq!(c.len(), 5);
-        assert!(!c.is_empty());
+    #[test]
+    fn test_fuzz_chunked_bytes_push() -> Result<()> {
+        let _ = tracing_subscriber::fmt()
+            .pretty()
+            .with_test_writer()
+            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
+            .try_init();
+
+        let mut rng = thread_rng();
+
+        let chunk_size = rng.gen_range(1..10);
+        let mut cb = ChunkedBytes::with_chunk_size(chunk_size);
+        debug!("test_fuzz_chunked_bytes_push: chunk size: {chunk_size}");
+
+        let mut expected = BytesMut::new();
+        for _ in 0..1000 {
+            let size = rng.gen_range(1..20);
+            debug!("test_fuzz_chunked_bytes_push: write size: {size}");
+
+            let mut content = vec![0; size];
+            rng.fill_bytes(&mut content);
+
+            expected.extend_from_slice(&content);
+            cb.push(Bytes::from(content.clone()));
+
+            // Inclusive range so that draining everything (frozen and active) is covered.
+            let cnt = rng.gen_range(0..=expected.len());
+            expected.advance(cnt);
+            cb.advance(cnt);
+
+            assert_eq!(expected.len(), cb.len());
+            assert_eq!(
+                format!("{:x}", Sha256::digest(&expected)),
+                format!("{:x}", Sha256::digest(&cb.bytes(cb.len())))
+            );
+        }

-        let bs = c.next().await.unwrap().unwrap();
-        assert_eq!(bs, Bytes::from("world"));
-        assert_eq!(c.len(), 0);
-        assert!(c.is_empty());
+        Ok(())
+    }

-        c.clear();
-        assert_eq!(c.len(), 0);
-        assert!(c.is_empty());
+    /// `poll_next` yields frozen chunks first, then the partially filled active chunk.
+    #[tokio::test]
+    async fn test_chunked_bytes_stream() -> Result<()> {
+        use crate::raw::oio::StreamExt;
+
+        let mut c = ChunkedBytes::with_chunk_size(5);
+
+        c.push(Bytes::from("hello"));
+        c.push(Bytes::from("wor"));
+        assert_eq!(c.len(), 8);
+
+        let bs = c.next().await.unwrap().unwrap();
+        assert_eq!(bs, Bytes::from("hello"));
+        assert_eq!(c.len(), 3);
+
+        let bs = c.next().await.unwrap().unwrap();
+        assert_eq!(bs, Bytes::from("wor"));
+        assert!(c.is_empty());
+
+        assert!(c.next().await.is_none());
+        Ok(())
+    }
+
+    #[test]
+    fn test_fuzz_chunked_bytes_extend_from_slice() -> Result<()> {
+        let _ = tracing_subscriber::fmt()
+            .pretty()
+            .with_test_writer()
+            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
+            .try_init();
+
+        let mut rng = thread_rng();
+
+        let chunk_size = rng.gen_range(1..10);
+        let mut cb = ChunkedBytes::with_chunk_size(chunk_size);
+        debug!("test_fuzz_chunked_bytes_extend_from_slice: chunk size: {chunk_size}");
+
+        let mut expected = BytesMut::new();
+        for _ in 0..1000 {
+            let size = rng.gen_range(1..20);
+            debug!("test_fuzz_chunked_bytes_extend_from_slice: write size: {size}");
+
+            let mut content = vec![0; size];
+            rng.fill_bytes(&mut content);
+
+            expected.extend_from_slice(&content);
+            cb.extend_from_slice(&content);
+
+            // Inclusive range so that draining everything (frozen and active) is covered.
+            let cnt = rng.gen_range(0..=expected.len());
+            expected.advance(cnt);
+            cb.advance(cnt);
+
+            assert_eq!(expected.len(), cb.len());
+            assert_eq!(
+                format!("{:x}", Sha256::digest(&expected)),
+                format!("{:x}", Sha256::digest(&cb.bytes(cb.len())))
+            );
+        }

         Ok(())
     }
diff --git a/core/src/raw/oio/buf/write_buf.rs b/core/src/raw/oio/buf/write_buf.rs
index 50f3c8911..d70ce640a 100644
--- a/core/src/raw/oio/buf/write_buf.rs
+++ b/core/src/raw/oio/buf/write_buf.rs
@@ -74,6 +74,22 @@ pub trait WriteBuf: Send + Sync {
     /// This function will panic if size > self.remaining().
     fn bytes(&self, size: usize) -> Bytes;

+    /// Returns true if `self.bytes(size)` can be produced without copying.
+    ///
+    /// # Notes
+    ///
+    /// This function is used to avoid copies when possible. Implementors should return
+    /// `true` if the given `self.bytes(size)` could be done without cost, for example
+    /// when the underlying buffer is `Bytes`. The default implementation returns `false`.
+    ///
+    /// # Panics
+    ///
+    /// Implementations may panic if size > self.remaining(); the default one never does.
+    fn is_bytes_optimized(&self, size: usize) -> bool {
+        let _ = size;
+        false
+    }
+
     /// Returns a vectored bytes of the underlying buffer at the current position and of
     /// length between 0 and Buf::remaining().
     ///
@@ -114,6 +130,10 @@ macro_rules! deref_forward_buf {
             (**self).bytes(size)
         }

+        fn is_bytes_optimized(&self, size: usize) -> bool {
+            (**self).is_bytes_optimized(size) // Keep the wrapped buffer's zero-copy hint.
+        }
+
         fn vectored_bytes(&self, size: usize) -> Vec<Bytes> {
             (**self).vectored_bytes(size)
         }
@@ -231,6 +251,11 @@ impl WriteBuf for Bytes {
         self.slice(..size)
     }

+    #[inline]
+    fn is_bytes_optimized(&self, _: usize) -> bool {
+        true // `bytes()` above is `self.slice(..size)`, which does not copy data.
+    }
+
     #[inline]
     fn vectored_bytes(&self, size: usize) -> Vec<Bytes> {
         vec![self.slice(..size)]
diff --git a/core/src/raw/oio/write/exact_buf_write.rs b/core/src/raw/oio/write/exact_buf_write.rs
index 144d50ff0..02ab9d926 100644
--- a/core/src/raw/oio/write/exact_buf_write.rs
+++ b/core/src/raw/oio/write/exact_buf_write.rs
@@ -15,14 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::cmp::min;
 use std::task::ready;
 use std::task::Context;
 use std::task::Poll;
 
 use async_trait::async_trait;
-use bytes::Bytes;
-use bytes::BytesMut;
 
 use crate::raw::oio::WriteBuf;
 use crate::raw::*;
@@ -44,7 +41,7 @@ pub struct ExactBufWriter<W: oio::Write> {

     /// The size for buffer, we will flush the underlying storage at the size of this buffer.
     buffer_size: usize,
-    buffer: Buffer,
+    buffer: oio::ChunkedBytes, // Holds at most `buffer_size` bytes between writes.
 }

 impl<W: oio::Write> ExactBufWriter<W> {
@@ -53,83 +50,33 @@ impl<W: oio::Write> ExactBufWriter<W> {
         Self {
             inner,
             buffer_size,
-            buffer: Buffer::Empty,
+            buffer: oio::ChunkedBytes::default(),
         }
     }
 }

-enum Buffer {
-    Empty,
-    Filling(BytesMut),
-    Consuming(Bytes),
-}
-
 #[async_trait]
 impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
     fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn WriteBuf) -> Poll<Result<usize>> {
-        loop {
-            match &mut self.buffer {
-                Buffer::Empty => {
-                    if bs.remaining() >= self.buffer_size {
-                        self.buffer = Buffer::Consuming(bs.bytes(self.buffer_size));
-                        return Poll::Ready(Ok(self.buffer_size));
-                    }
-
-                    let chunk = bs.chunk();
-                    let mut fill = BytesMut::with_capacity(chunk.len());
-                    fill.extend_from_slice(chunk);
-                    self.buffer = Buffer::Filling(fill);
-                    return Poll::Ready(Ok(chunk.len()));
-                }
-                Buffer::Filling(fill) => {
-                    if fill.len() >= self.buffer_size {
-                        self.buffer = Buffer::Consuming(fill.split().freeze());
-                        continue;
-                    }
-
-                    let size = min(self.buffer_size - fill.len(), bs.chunk().len());
-                    fill.extend_from_slice(&bs.chunk()[..size]);
-                    return Poll::Ready(Ok(size));
-                }
-                Buffer::Consuming(consume) => {
-                    // Make sure filled buffer has been flushed.
-                    //
-                    // TODO: maybe we can re-fill it after a successful write.
-                    while !consume.is_empty() {
-                        let n = ready!(self.inner.poll_write(cx, consume)?);
-                        consume.advance(n);
-                    }
-                    self.buffer = Buffer::Empty;
-                }
-            }
+        if self.buffer.len() >= self.buffer_size {
+            let written = ready!(self.inner.poll_write(cx, &self.buffer)?); // May be partial.
+            self.buffer.advance(written);
         }
+        // Top up to at most `buffer_size` bytes; report only what was taken from `bs`.
+        let remaining = self.buffer_size - self.buffer.len();
+        let written = self.buffer.extend_from_write_buf(remaining, bs);
+        Poll::Ready(Ok(written))
     }

     fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        self.buffer = Buffer::Empty;
+        self.buffer.clear(); // Discard buffered data without writing it.
         self.inner.poll_abort(cx)
     }

     fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        loop {
-            match &mut self.buffer {
-                Buffer::Empty => break,
-                Buffer::Filling(fill) => {
-                    self.buffer = Buffer::Consuming(fill.split().freeze());
-                    continue;
-                }
-                Buffer::Consuming(consume) => {
-                    // Make sure filled buffer has been flushed.
-                    //
-                    // TODO: maybe we can re-fill it after a successful write.
-                    while !consume.is_empty() {
-                        let n = ready!(self.inner.poll_write(cx, &consume))?;
-                        consume.advance(n);
-                    }
-                    self.buffer = Buffer::Empty;
-                    break;
-                }
-            }
+        while !self.buffer.is_empty() {
+            let n = ready!(self.inner.poll_write(cx, &self.buffer))?; // Flush the tail.
+            self.buffer.advance(n);
         }

         self.inner.poll_close(cx)
@@ -138,6 +85,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {

 #[cfg(test)]
 mod tests {
+    use bytes::Bytes; // No longer imported at module level; only the tests need it.
     use log::debug;
     use pretty_assertions::assert_eq;
     use rand::thread_rng;
@@ -231,7 +179,7 @@ mod tests {
             let mut bs = Bytes::from(content.clone());
             while !bs.is_empty() {
                 let n = writer.write(&bs).await?;
-                bs.advance(n as usize);
+                bs.advance(n); // `n` is already a usize; skip the bytes the writer accepted.
             }
         }
         writer.close().await?;

Reply via email to