This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 2b83c710d refactor: Add ChunkedBytes to improve the exact buf write 
(#3035)
2b83c710d is described below

commit 2b83c710d1ecd68d2b5563d3e89da38931b36f2a
Author: Xuanwo <[email protected]>
AuthorDate: Tue Sep 12 17:20:44 2023 +0800

    refactor: Add ChunkedBytes to improve the exact buf write (#3035)
    
    * Remove not used vector cursor
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Rename to write buf
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Rename to buf
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Implement chunked bytes
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Save work
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Polish write
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix typo
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
---
 .typos.toml                                   |   1 +
 core/src/raw/oio/buf/chunked_bytes.rs         | 495 ++++++++++++++++++++++++++
 core/src/raw/oio/{ => buf}/mod.rs             |  32 +-
 core/src/raw/oio/{buf.rs => buf/write_buf.rs} |  25 ++
 core/src/raw/oio/cursor.rs                    | 184 +---------
 core/src/raw/oio/mod.rs                       |   3 +-
 core/src/raw/oio/write/exact_buf_write.rs     |  82 +----
 7 files changed, 542 insertions(+), 280 deletions(-)

diff --git a/.typos.toml b/.typos.toml
index 259a4133e..5b2bca99b 100644
--- a/.typos.toml
+++ b/.typos.toml
@@ -20,5 +20,6 @@
 "Dum" = "Dum"
 "ba" = "ba"
 "Hel" = "Hel"
+"hellow" = "hellow"
 # Showed up in examples.
 "thw" = "thw"
diff --git a/core/src/raw/oio/buf/chunked_bytes.rs 
b/core/src/raw/oio/buf/chunked_bytes.rs
new file mode 100644
index 000000000..82447dc62
--- /dev/null
+++ b/core/src/raw/oio/buf/chunked_bytes.rs
@@ -0,0 +1,495 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::{Bytes, BytesMut};
+use std::cmp::min;
+use std::collections::VecDeque;
+use std::io::IoSlice;
+use std::task::{Context, Poll};
+
+use crate::raw::*;
+use crate::*;
+
+// TODO: 64KiB is picked based on experiences, should be configurable
+const DEFAULT_CHUNK_SIZE: usize = 64 * 1024;
+
+/// ChunkedBytes is used to represent non-contiguous bytes in memory.
+#[derive(Clone)]
+pub struct ChunkedBytes {
+    chunk_size: usize,
+
+    frozen: VecDeque<Bytes>,
+    active: BytesMut,
+    size: usize,
+}
+
+impl Default for ChunkedBytes {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl ChunkedBytes {
+    /// Create a new chunked bytes.
+    pub fn new() -> Self {
+        Self {
+            frozen: VecDeque::new(),
+            active: BytesMut::new(),
+            size: 0,
+
+            chunk_size: DEFAULT_CHUNK_SIZE,
+        }
+    }
+
+    /// Create a new chunked bytes with the given chunk size.
+    pub fn with_chunk_size(chunk_size: usize) -> Self {
+        Self {
+            frozen: VecDeque::new(),
+            active: BytesMut::new(),
+            size: 0,
+
+            chunk_size,
+        }
+    }
+
+    /// Build a chunked bytes from a vector of bytes.
+    ///
+    /// This function is guaranteed to run in O(1) time and to not re-allocate 
the Vec’s buffer
+    /// or allocate any additional memory.
+    ///
+    /// Reference: 
<https://doc.rust-lang.org/stable/std/collections/struct.VecDeque.html#impl-From%3CVec%3CT,+A%3E%3E-for-VecDeque%3CT,+A%3E>
+    pub fn from_vec(bs: Vec<Bytes>) -> Self {
+        Self {
+            frozen: bs.into(),
+            active: BytesMut::new(),
+            size: 0,
+
+            chunk_size: DEFAULT_CHUNK_SIZE,
+        }
+    }
+
+    /// Returns `true` if current chunked bytes is empty.
+    pub fn is_empty(&self) -> bool {
+        self.size == 0
+    }
+
+    /// Return current bytes size of chunked bytes.
+    pub fn len(&self) -> usize {
+        self.size
+    }
+
+    /// Clear the entire chunked bytes.
+    pub fn clear(&mut self) {
+        self.size = 0;
+        self.frozen.clear();
+        self.active.clear();
+    }
+
+    /// Push a new bytes into ChunkedBytes.
+    pub fn push(&mut self, mut bs: Bytes) {
+        self.size += bs.len();
+
+        // Optimization: if active is empty, we can push to frozen directly if 
possible.
+        if self.active.is_empty() {
+            let aligned_size = bs.len() - bs.len() % self.chunk_size;
+            if aligned_size > 0 {
+                self.frozen.push_back(bs.split_to(aligned_size));
+            }
+            if !bs.is_empty() {
+                self.active.extend_from_slice(&bs);
+            }
+            return;
+        }
+
+        // Try to fill bytes into active first.
+        let remaining = self.chunk_size.saturating_sub(self.active.len());
+        if remaining > 0 {
+            let len = min(remaining, bs.len());
+            self.active.extend_from_slice(&bs.split_to(len));
+        }
+
+        // If active is full, freeze it and push it into frozen.
+        if self.active.len() == self.chunk_size {
+            self.frozen.push_back(self.active.split().freeze());
+        }
+
+        // Split remaining bytes into chunks.
+        let aligned_size = bs.len() - bs.len() % self.chunk_size;
+        if aligned_size > 0 {
+            self.frozen.push_back(bs.split_to(aligned_size));
+        }
+
+        // Append to active if there are remaining bytes.
+        if !bs.is_empty() {
+            self.active.extend_from_slice(&bs);
+        }
+    }
+
+    /// Push a new &[u8] into ChunkedBytes.
+    pub fn extend_from_slice(&mut self, bs: &[u8]) {
+        self.size += bs.len();
+
+        let mut remaining = bs;
+
+        while !remaining.is_empty() {
+            let available = self.chunk_size.saturating_sub(self.active.len());
+
+            // available == 0 means self.active.len() >= self.chunk_size
+            if available == 0 {
+                self.frozen.push_back(self.active.split().freeze());
+                self.active.reserve(self.chunk_size);
+                continue;
+            }
+
+            let size = min(remaining.len(), available);
+            self.active.extend_from_slice(&remaining[0..size]);
+
+            remaining = &remaining[size..];
+        }
+    }
+
+    /// Pull data from [`oio::WriteBuf`] into ChunkedBytes.
+    pub fn extend_from_write_buf(&mut self, size: usize, buf: &dyn 
oio::WriteBuf) -> usize {
+        let to_write = min(buf.chunk().len(), size);
+
+        if buf.is_bytes_optimized(to_write) && to_write > self.chunk_size {
+            // If the chunk is optimized, we can just push it directly.
+            self.push(buf.bytes(to_write));
+        } else {
+            // Otherwise, we should copy it into the buffer.
+            self.extend_from_slice(&buf.chunk()[..to_write]);
+        }
+
+        to_write
+    }
+}
+
+impl oio::WriteBuf for ChunkedBytes {
+    fn remaining(&self) -> usize {
+        self.size
+    }
+
+    fn advance(&mut self, mut cnt: usize) {
+        debug_assert!(
+            cnt <= self.size,
+            "cnt size {} is larger than bytes size {}",
+            cnt,
+            self.size
+        );
+
+        self.size -= cnt;
+
+        while cnt > 0 {
+            if let Some(front) = self.frozen.front_mut() {
+                if front.len() <= cnt {
+                    cnt -= front.len();
+                    self.frozen.pop_front(); // Remove the entire chunk.
+                } else {
+                    front.advance(cnt); // Split and keep the remaining part.
+                    break;
+                }
+            } else {
+                // Here, cnt must be <= self.active.len() due to the checks 
above
+                self.active.advance(cnt); // Remove cnt bytes from the active 
buffer.
+                break;
+            }
+        }
+    }
+
+    fn chunk(&self) -> &[u8] {
+        match self.frozen.front() {
+            Some(v) => v,
+            None => &self.active,
+        }
+    }
+
+    fn vectored_chunk(&self) -> Vec<IoSlice> {
+        let it = self.frozen.iter().map(|v| IoSlice::new(v));
+
+        if !self.active.is_empty() {
+            it.chain([IoSlice::new(&self.active)]).collect()
+        } else {
+            it.collect()
+        }
+    }
+
+    fn bytes(&self, size: usize) -> Bytes {
+        debug_assert!(
+            size <= self.size,
+            "input size {} is larger than bytes size {}",
+            size,
+            self.size
+        );
+
+        if size == 0 {
+            return Bytes::new();
+        }
+
+        if let Some(bs) = self.frozen.front() {
+            if size <= bs.len() {
+                return bs.slice(..size);
+            }
+        }
+
+        let mut remaining = size;
+        let mut result = BytesMut::with_capacity(size);
+
+        // First, go through the frozen buffer.
+        for chunk in &self.frozen {
+            let to_copy = min(remaining, chunk.len());
+            result.extend_from_slice(&chunk[0..to_copy]);
+            remaining -= to_copy;
+
+            if remaining == 0 {
+                break;
+            }
+        }
+
+        // Then, get from the active buffer if necessary.
+        if remaining > 0 {
+            result.extend_from_slice(&self.active[0..remaining]);
+        }
+
+        result.freeze()
+    }
+
+    fn is_bytes_optimized(&self, size: usize) -> bool {
+        if let Some(bs) = self.frozen.front() {
+            return size <= bs.len();
+        }
+
+        false
+    }
+
+    fn vectored_bytes(&self, size: usize) -> Vec<Bytes> {
+        debug_assert!(
+            size <= self.size,
+            "input size {} is larger than bytes size {}",
+            size,
+            self.size
+        );
+
+        let mut remaining = size;
+        let mut buf = vec![];
+        for bs in self.frozen.iter() {
+            if remaining == 0 {
+                break;
+            }
+
+            let to_take = min(remaining, bs.len());
+
+            if to_take == bs.len() {
+                buf.push(bs.clone()); // Clone is shallow; no data copy occurs.
+            } else {
+                buf.push(bs.slice(0..to_take));
+            }
+
+            remaining -= to_take;
+        }
+
+        if remaining > 0 {
+            buf.push(Bytes::copy_from_slice(&self.active[0..remaining]));
+        }
+
+        buf
+    }
+}
+
+impl oio::Stream for ChunkedBytes {
+    fn poll_next(&mut self, _: &mut Context<'_>) -> 
Poll<Option<Result<Bytes>>> {
+        match self.frozen.pop_front() {
+            Some(bs) => {
+                self.size -= bs.len();
+                Poll::Ready(Some(Ok(bs)))
+            }
+            None if !self.active.is_empty() => {
+                self.size -= self.active.len();
+                Poll::Ready(Some(Ok(self.active.split().freeze())))
+            }
+            None => Poll::Ready(None),
+        }
+    }
+
+    fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
+        Poll::Ready(Err(Error::new(
+            ErrorKind::Unsupported,
+            "ChunkedBytes does not support reset",
+        )))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use log::debug;
+    use pretty_assertions::assert_eq;
+    use rand::{thread_rng, Rng, RngCore};
+    use sha2::{Digest, Sha256};
+
+    use super::*;
+    use crate::raw::oio::WriteBuf;
+
+    #[test]
+    fn test_chunked_bytes_write_buf() -> Result<()> {
+        let mut c = ChunkedBytes::with_chunk_size(5);
+
+        c.push(Bytes::from("hello"));
+        assert_eq!(c.len(), 5);
+        assert!(!c.is_empty());
+
+        c.push(Bytes::from("world"));
+        assert_eq!(c.len(), 10);
+        assert!(!c.is_empty());
+
+        // Test chunk
+        let bs = c.chunk();
+        assert_eq!(bs, "hello".as_bytes());
+        assert_eq!(c.len(), 10);
+        assert!(!c.is_empty());
+
+        // The second chunk should return the same content.
+        let bs = c.chunk();
+        assert_eq!(bs, "hello".as_bytes());
+        assert_eq!(c.remaining(), 10);
+        assert!(!c.is_empty());
+
+        // Test vectored chunk
+        let bs = c.vectored_chunk();
+        assert_eq!(
+            bs.iter().map(|v| v.as_ref()).collect::<Vec<_>>(),
+            vec!["hello".as_bytes(), "world".as_bytes()]
+        );
+        assert_eq!(c.remaining(), 10);
+        assert!(!c.is_empty());
+
+        // Test bytes
+        let bs = c.bytes(4);
+        assert_eq!(bs, Bytes::from("hell"));
+        assert_eq!(c.remaining(), 10);
+        assert!(!c.is_empty());
+
+        // Test bytes again
+        let bs = c.bytes(6);
+        assert_eq!(bs, Bytes::from("hellow"));
+        assert_eq!(c.remaining(), 10);
+        assert!(!c.is_empty());
+
+        // Test vectored bytes
+        let bs = c.vectored_bytes(4);
+        assert_eq!(bs, vec![Bytes::from("hell")]);
+        assert_eq!(c.remaining(), 10);
+        assert!(!c.is_empty());
+
+        // Test vectored bytes again
+        let bs = c.vectored_bytes(6);
+        assert_eq!(bs, vec![Bytes::from("hello"), Bytes::from("w")]);
+        assert_eq!(c.remaining(), 10);
+        assert!(!c.is_empty());
+
+        // Test Advance.
+        c.advance(4);
+
+        // Test chunk
+        let bs = c.chunk();
+        assert_eq!(bs, "o".as_bytes());
+        assert_eq!(c.len(), 6);
+        assert!(!c.is_empty());
+
+        c.clear();
+        assert_eq!(c.len(), 0);
+        assert!(c.is_empty());
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_fuzz_chunked_bytes_push() -> Result<()> {
+        let _ = tracing_subscriber::fmt()
+            .pretty()
+            .with_test_writer()
+            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
+            .try_init();
+
+        let mut rng = thread_rng();
+
+        let chunk_size = rng.gen_range(1..10);
+        let mut cb = ChunkedBytes::with_chunk_size(chunk_size);
+        debug!("test_fuzz_chunked_bytes_push: chunk size: {chunk_size}");
+
+        let mut expected = BytesMut::new();
+        for _ in 0..1000 {
+            let size = rng.gen_range(1..20);
+            debug!("test_fuzz_chunked_bytes_push: write size: {size}");
+
+            let mut content = vec![0; size];
+            rng.fill_bytes(&mut content);
+
+            expected.extend_from_slice(&content);
+            cb.push(Bytes::from(content.clone()));
+
+            let cnt = rng.gen_range(0..expected.len());
+            expected.advance(cnt);
+            cb.advance(cnt);
+
+            assert_eq!(expected.len(), cb.len());
+            assert_eq!(
+                format!("{:x}", Sha256::digest(&expected)),
+                format!("{:x}", Sha256::digest(&cb.bytes(cb.len())))
+            );
+        }
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_fuzz_chunked_bytes_extend_from_slice() -> Result<()> {
+        let _ = tracing_subscriber::fmt()
+            .pretty()
+            .with_test_writer()
+            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
+            .try_init();
+
+        let mut rng = thread_rng();
+
+        let chunk_size = rng.gen_range(1..10);
+        let mut cb = ChunkedBytes::with_chunk_size(chunk_size);
+        debug!("test_fuzz_chunked_bytes_extend_from_slice: chunk size: 
{chunk_size}");
+
+        let mut expected = BytesMut::new();
+        for _ in 0..1000 {
+            let size = rng.gen_range(1..20);
+            debug!("test_fuzz_chunked_bytes_extend_from_slice: write size: 
{size}");
+
+            let mut content = vec![0; size];
+            rng.fill_bytes(&mut content);
+
+            expected.extend_from_slice(&content);
+            cb.extend_from_slice(&content);
+
+            let cnt = rng.gen_range(0..expected.len());
+            expected.advance(cnt);
+            cb.advance(cnt);
+
+            assert_eq!(expected.len(), cb.len());
+            assert_eq!(
+                format!("{:x}", Sha256::digest(&expected)),
+                format!("{:x}", Sha256::digest(&cb.bytes(cb.len())))
+            );
+        }
+
+        Ok(())
+    }
+}
diff --git a/core/src/raw/oio/mod.rs b/core/src/raw/oio/buf/mod.rs
similarity index 59%
copy from core/src/raw/oio/mod.rs
copy to core/src/raw/oio/buf/mod.rs
index 8c353ca50..dfd3663e5 100644
--- a/core/src/raw/oio/mod.rs
+++ b/core/src/raw/oio/buf/mod.rs
@@ -15,32 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! `oio` provides OpenDAL's raw traits and types that opendal returns as
-//! output.
-//!
-//! Those types should only be used internally and we don't want users to
-//! depend on them. So we should also implement trait like `AsyncRead` for
-//! our `output` traits.
+mod chunked_bytes;
+pub use chunked_bytes::ChunkedBytes;
 
-mod read;
-pub use read::*;
-
-mod write;
-pub use write::*;
-
-mod stream;
-pub use stream::*;
-
-mod page;
-pub use page::*;
-
-mod cursor;
-pub use cursor::ChunkedCursor;
-pub use cursor::Cursor;
-pub use cursor::VectorCursor;
-
-mod entry;
-pub use entry::Entry;
-
-mod buf;
-pub use buf::WriteBuf;
+mod write_buf;
+pub use write_buf::WriteBuf;
diff --git a/core/src/raw/oio/buf.rs b/core/src/raw/oio/buf/write_buf.rs
similarity index 90%
rename from core/src/raw/oio/buf.rs
rename to core/src/raw/oio/buf/write_buf.rs
index 50f3c8911..d70ce640a 100644
--- a/core/src/raw/oio/buf.rs
+++ b/core/src/raw/oio/buf/write_buf.rs
@@ -74,6 +74,22 @@ pub trait WriteBuf: Send + Sync {
     /// This function will panic if size > self.remaining().
     fn bytes(&self, size: usize) -> Bytes;
 
+    /// Returns true if the underlying buffer is optimized for bytes with 
given size.
+    ///
+    /// # Notes
+    ///
+    /// This function is used to avoid copy when possible. Implementors should 
return `true` if
+    /// the given `self.bytes(size)` could be done without cost. For example, 
the underlying
+    /// buffer is `Bytes`.
+    ///
+    /// # Panics
+    ///
+    /// This function will panic if size > self.remaining().
+    fn is_bytes_optimized(&self, size: usize) -> bool {
+        let _ = size;
+        false
+    }
+
     /// Returns a vectored bytes of the underlying buffer at the current 
position and of
     /// length between 0 and Buf::remaining().
     ///
@@ -114,6 +130,10 @@ macro_rules! deref_forward_buf {
             (**self).bytes(size)
         }
 
+        fn is_bytes_optimized(&self, size: usize) -> bool {
+            (**self).is_bytes_optimized(size)
+        }
+
         fn vectored_bytes(&self, size: usize) -> Vec<Bytes> {
             (**self).vectored_bytes(size)
         }
@@ -231,6 +251,11 @@ impl WriteBuf for Bytes {
         self.slice(..size)
     }
 
+    #[inline]
+    fn is_bytes_optimized(&self, _: usize) -> bool {
+        true
+    }
+
     #[inline]
     fn vectored_bytes(&self, size: usize) -> Vec<Bytes> {
         vec![self.slice(..size)]
diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs
index 2c79451c1..ddead52c9 100644
--- a/core/src/raw/oio/cursor.rs
+++ b/core/src/raw/oio/cursor.rs
@@ -22,9 +22,7 @@ use std::io::SeekFrom;
 use std::task::Context;
 use std::task::Poll;
 
-use bytes::Buf;
 use bytes::Bytes;
-use bytes::BytesMut;
 
 use crate::raw::*;
 use crate::*;
@@ -318,7 +316,7 @@ impl ChunkedCursor {
 
     #[cfg(test)]
     fn concat(&self) -> Bytes {
-        let mut bs = BytesMut::new();
+        let mut bs = bytes::BytesMut::new();
         for v in self.inner.iter().skip(self.idx) {
             bs.extend_from_slice(v);
         }
@@ -343,166 +341,6 @@ impl oio::Stream for ChunkedCursor {
     }
 }
 
-/// VectorCursor is the cursor for [`Vec<Bytes>`] that implements 
[`oio::Stream`]
-pub struct VectorCursor {
-    inner: VecDeque<Bytes>,
-    size: usize,
-}
-
-impl Default for VectorCursor {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
-impl VectorCursor {
-    /// Create a new vector cursor.
-    pub fn new() -> Self {
-        Self {
-            inner: VecDeque::new(),
-            size: 0,
-        }
-    }
-
-    /// Returns `true` if current vector is empty.
-    pub fn is_empty(&self) -> bool {
-        self.size == 0
-    }
-
-    /// Return current bytes size of current vector.
-    pub fn len(&self) -> usize {
-        self.size
-    }
-
-    /// Push a new bytes into vector cursor.
-    pub fn push(&mut self, bs: Bytes) {
-        self.size += bs.len();
-        self.inner.push_back(bs);
-    }
-
-    /// Pop a bytes from vector cursor.
-    pub fn pop(&mut self) {
-        let bs = self.inner.pop_back();
-        self.size -= bs.expect("pop bytes must exist").len()
-    }
-
-    /// Clear the entire vector.
-    pub fn clear(&mut self) {
-        self.inner.clear();
-        self.size = 0;
-    }
-
-    /// Peak will read and copy exactly n bytes from current cursor
-    /// without change it's content.
-    ///
-    /// This function is useful if you want to read a fixed size
-    /// content to make sure it aligned.
-    ///
-    /// # Panics
-    ///
-    /// Panics if n is larger than current size.
-    ///
-    /// # TODO
-    ///
-    /// Optimize to avoid data copy.
-    pub fn peak_exact(&self, n: usize) -> Bytes {
-        assert!(n <= self.size, "peak size must smaller than current size");
-
-        // Avoid data copy if n is smaller than first chunk.
-        if self.inner[0].len() >= n {
-            return self.inner[0].slice(..n);
-        }
-
-        let mut bs = BytesMut::with_capacity(n);
-        let mut n = n;
-        for b in &self.inner {
-            if n == 0 {
-                break;
-            }
-            let len = b.len().min(n);
-            bs.extend_from_slice(&b[..len]);
-            n -= len;
-        }
-        bs.freeze()
-    }
-
-    /// peak_at_least will read and copy at least n bytes from current
-    /// cursor without change it's content.
-    ///
-    /// This function is useful if you only want to make sure the
-    /// returning bytes is larger.
-    ///
-    /// # Panics
-    ///
-    /// Panics if n is larger than current size.
-    ///
-    /// # TODO
-    ///
-    /// Optimize to avoid data copy.
-    pub fn peak_at_least(&self, n: usize) -> Bytes {
-        assert!(n <= self.size, "peak size must smaller than current size");
-
-        // Avoid data copy if n is smaller than first chunk.
-        if self.inner[0].len() >= n {
-            return self.inner[0].clone();
-        }
-
-        let mut bs = BytesMut::with_capacity(n);
-        let mut n = n;
-        for b in &self.inner {
-            if n == 0 {
-                break;
-            }
-            let len = b.len().min(n);
-            bs.extend_from_slice(&b[..len]);
-            n -= len;
-        }
-        bs.freeze()
-    }
-
-    /// peak all will read and copy all bytes from current cursor
-    /// without change it's content.
-    ///
-    /// TODO: we need to find a way to avoid copy all content here.
-    pub fn peak_all(&self) -> Bytes {
-        // Avoid data copy if we only have one bytes.
-        if self.inner.len() == 1 {
-            return self.inner[0].clone();
-        }
-
-        let mut bs = BytesMut::with_capacity(self.len());
-        for b in &self.inner {
-            bs.extend_from_slice(b);
-        }
-        bs.freeze()
-    }
-
-    /// Take will consume n bytes from current cursor.
-    ///
-    /// # Panics
-    ///
-    /// Panics if n is larger than current size.
-    pub fn take(&mut self, n: usize) {
-        assert!(n <= self.size, "take size must smamller than current size");
-
-        // Update current size
-        self.size -= n;
-
-        let mut n = n;
-        while n > 0 {
-            assert!(!self.inner.is_empty(), "inner must not be empty");
-
-            if self.inner[0].len() <= n {
-                n -= self.inner[0].len();
-                self.inner.pop_front();
-            } else {
-                self.inner[0].advance(n);
-                n = 0;
-            }
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use pretty_assertions::assert_eq;
@@ -515,26 +353,6 @@ mod tests {
     use super::*;
     use crate::raw::oio::StreamExt;
 
-    #[test]
-    fn test_vector_cursor() {
-        let mut vc = VectorCursor::new();
-
-        vc.push(Bytes::from("hello"));
-        vc.push(Bytes::from("world"));
-
-        assert_eq!(vc.peak_exact(1), Bytes::from("h"));
-        assert_eq!(vc.peak_exact(1), Bytes::from("h"));
-        assert_eq!(vc.peak_exact(4), Bytes::from("hell"));
-        assert_eq!(vc.peak_exact(10), Bytes::from("helloworld"));
-
-        vc.take(1);
-        assert_eq!(vc.peak_exact(1), Bytes::from("e"));
-        vc.take(1);
-        assert_eq!(vc.peak_exact(1), Bytes::from("l"));
-        vc.take(5);
-        assert_eq!(vc.peak_exact(1), Bytes::from("r"));
-    }
-
     #[tokio::test]
     async fn test_chunked_cursor() -> Result<()> {
         let mut c = ChunkedCursor::new();
diff --git a/core/src/raw/oio/mod.rs b/core/src/raw/oio/mod.rs
index 8c353ca50..29cf8e474 100644
--- a/core/src/raw/oio/mod.rs
+++ b/core/src/raw/oio/mod.rs
@@ -37,10 +37,9 @@ pub use page::*;
 mod cursor;
 pub use cursor::ChunkedCursor;
 pub use cursor::Cursor;
-pub use cursor::VectorCursor;
 
 mod entry;
 pub use entry::Entry;
 
 mod buf;
-pub use buf::WriteBuf;
+pub use buf::*;
diff --git a/core/src/raw/oio/write/exact_buf_write.rs 
b/core/src/raw/oio/write/exact_buf_write.rs
index 144d50ff0..02ab9d926 100644
--- a/core/src/raw/oio/write/exact_buf_write.rs
+++ b/core/src/raw/oio/write/exact_buf_write.rs
@@ -15,14 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::cmp::min;
 use std::task::ready;
 use std::task::Context;
 use std::task::Poll;
 
 use async_trait::async_trait;
-use bytes::Bytes;
-use bytes::BytesMut;
 
 use crate::raw::oio::WriteBuf;
 use crate::raw::*;
@@ -44,7 +41,7 @@ pub struct ExactBufWriter<W: oio::Write> {
 
     /// The size for buffer, we will flush the underlying storage at the size 
of this buffer.
     buffer_size: usize,
-    buffer: Buffer,
+    buffer: oio::ChunkedBytes,
 }
 
 impl<W: oio::Write> ExactBufWriter<W> {
@@ -53,83 +50,33 @@ impl<W: oio::Write> ExactBufWriter<W> {
         Self {
             inner,
             buffer_size,
-            buffer: Buffer::Empty,
+            buffer: oio::ChunkedBytes::default(),
         }
     }
 }
 
-enum Buffer {
-    Empty,
-    Filling(BytesMut),
-    Consuming(Bytes),
-}
-
 #[async_trait]
 impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
     fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn WriteBuf) -> 
Poll<Result<usize>> {
-        loop {
-            match &mut self.buffer {
-                Buffer::Empty => {
-                    if bs.remaining() >= self.buffer_size {
-                        self.buffer = 
Buffer::Consuming(bs.bytes(self.buffer_size));
-                        return Poll::Ready(Ok(self.buffer_size));
-                    }
-
-                    let chunk = bs.chunk();
-                    let mut fill = BytesMut::with_capacity(chunk.len());
-                    fill.extend_from_slice(chunk);
-                    self.buffer = Buffer::Filling(fill);
-                    return Poll::Ready(Ok(chunk.len()));
-                }
-                Buffer::Filling(fill) => {
-                    if fill.len() >= self.buffer_size {
-                        self.buffer = Buffer::Consuming(fill.split().freeze());
-                        continue;
-                    }
-
-                    let size = min(self.buffer_size - fill.len(), 
bs.chunk().len());
-                    fill.extend_from_slice(&bs.chunk()[..size]);
-                    return Poll::Ready(Ok(size));
-                }
-                Buffer::Consuming(consume) => {
-                    // Make sure filled buffer has been flushed.
-                    //
-                    // TODO: maybe we can re-fill it after a successful write.
-                    while !consume.is_empty() {
-                        let n = ready!(self.inner.poll_write(cx, consume)?);
-                        consume.advance(n);
-                    }
-                    self.buffer = Buffer::Empty;
-                }
-            }
+        if self.buffer.len() >= self.buffer_size {
+            let written = ready!(self.inner.poll_write(cx, &self.buffer)?);
+            self.buffer.advance(written);
         }
+
+        let remaining = self.buffer_size - self.buffer.len();
+        let written = self.buffer.extend_from_write_buf(remaining, bs);
+        Poll::Ready(Ok(written))
     }
 
     fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        self.buffer = Buffer::Empty;
+        self.buffer.clear();
         self.inner.poll_abort(cx)
     }
 
     fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        loop {
-            match &mut self.buffer {
-                Buffer::Empty => break,
-                Buffer::Filling(fill) => {
-                    self.buffer = Buffer::Consuming(fill.split().freeze());
-                    continue;
-                }
-                Buffer::Consuming(consume) => {
-                    // Make sure filled buffer has been flushed.
-                    //
-                    // TODO: maybe we can re-fill it after a successful write.
-                    while !consume.is_empty() {
-                        let n = ready!(self.inner.poll_write(cx, &consume))?;
-                        consume.advance(n);
-                    }
-                    self.buffer = Buffer::Empty;
-                    break;
-                }
-            }
+        while !self.buffer.is_empty() {
+            let n = ready!(self.inner.poll_write(cx, &self.buffer))?;
+            self.buffer.advance(n);
         }
 
         self.inner.poll_close(cx)
@@ -138,6 +85,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
 
 #[cfg(test)]
 mod tests {
+    use bytes::Bytes;
     use log::debug;
     use pretty_assertions::assert_eq;
     use rand::thread_rng;
@@ -231,7 +179,7 @@ mod tests {
             let mut bs = Bytes::from(content.clone());
             while !bs.is_empty() {
                 let n = writer.write(&bs).await?;
-                bs.advance(n as usize);
+                bs.advance(n);
             }
         }
         writer.close().await?;

Reply via email to