This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch polish-buffer
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit d7207edaa72dd0597f1dd7a1e25d6fc6104ccf15
Author: Xuanwo <[email protected]>
AuthorDate: Tue Sep 12 14:32:25 2023 +0800

    Implement chunked bytes
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/raw/oio/buf/chunked_bytes.rs | 304 ++++++++++++++++++++++++++++++++++
 core/src/raw/oio/buf/mod.rs           |   3 +
 core/src/raw/oio/mod.rs               |   2 +-
 3 files changed, 308 insertions(+), 1 deletion(-)

diff --git a/core/src/raw/oio/buf/chunked_bytes.rs b/core/src/raw/oio/buf/chunked_bytes.rs
new file mode 100644
index 000000000..e1a4df3c0
--- /dev/null
+++ b/core/src/raw/oio/buf/chunked_bytes.rs
@@ -0,0 +1,304 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::{Bytes, BytesMut};
+use std::collections::VecDeque;
+use std::io::IoSlice;
+use std::task::{Context, Poll};
+
+use crate::raw::*;
+use crate::*;
+
+/// ChunkedBytes represents non-contiguous bytes in memory.
+#[derive(Clone)]
+pub struct ChunkedBytes {
+    inner: VecDeque<Bytes>,
+    size: usize,
+}
+
+impl Default for ChunkedBytes {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl ChunkedBytes {
+    /// Create a new, empty `ChunkedBytes`.
+    pub fn new() -> Self {
+        Self {
+            inner: VecDeque::new(),
+            size: 0,
+        }
+    }
+
+    /// Returns `true` if no bytes are currently held.
+    pub fn is_empty(&self) -> bool {
+        self.size == 0
+    }
+
+    /// Returns the total number of bytes held across all chunks.
+    pub fn len(&self) -> usize {
+        self.size
+    }
+
+    /// Remove all chunks and reset the size to zero.
+    pub fn clear(&mut self) {
+        self.size = 0;
+        self.inner.clear();
+    }
+
+    /// Push a new `Bytes` chunk onto the back of the queue.
+    pub fn push(&mut self, bs: Bytes) {
+        self.size += bs.len();
+        self.inner.push_back(bs);
+    }
+}
+
+impl oio::WriteBuf for ChunkedBytes {
+    fn remaining(&self) -> usize {
+        self.size
+    }
+
+    fn advance(&mut self, mut cnt: usize) {
+        debug_assert!(
+            cnt <= self.size,
+            "cnt size {} is larger than bytes size {}",
+            cnt,
+            self.size
+        );
+
+        self.size -= cnt;
+        loop {
+            if cnt == 0 {
+                break;
+            }
+
+            let bs = self.inner.front_mut().unwrap();
+            if cnt >= bs.len() {
+                cnt -= bs.len();
+                self.inner.pop_front();
+            } else {
+                bs.advance(cnt);
+                cnt = 0;
+            }
+        }
+    }
+
+    fn chunk(&self) -> &[u8] {
+        match self.inner.front() {
+            Some(v) => v,
+            None => &[],
+        }
+    }
+
+    fn vectored_chunk(&self) -> Vec<IoSlice> {
+        self.inner.iter().map(|v| IoSlice::new(v)).collect()
+    }
+
+    fn bytes(&self, size: usize) -> Bytes {
+        debug_assert!(
+            size <= self.size,
+            "input size {} is larger than bytes size {}",
+            size,
+            self.size
+        );
+
+        if size == 0 {
+            return Bytes::new();
+        }
+
+        if let Some(bs) = self.inner.front() {
+            if size <= bs.len() {
+                return bs.slice(..size);
+            }
+        }
+
+        let mut remaining = size;
+        let mut buf = BytesMut::with_capacity(size);
+        for bs in self.inner.iter() {
+            if remaining == 0 {
+                break;
+            }
+
+            if remaining <= bs.len() {
+                buf.extend_from_slice(&bs[..remaining]);
+                break;
+            }
+
+            buf.extend_from_slice(bs);
+            remaining -= bs.len();
+        }
+
+        buf.freeze()
+    }
+
+    fn vectored_bytes(&self, size: usize) -> Vec<Bytes> {
+        debug_assert!(
+            size <= self.size,
+            "input size {} is larger than bytes size {}",
+            size,
+            self.size
+        );
+
+        let mut remaining = size;
+        let mut buf = vec![];
+        for bs in self.inner.iter() {
+            if remaining == 0 {
+                break;
+            }
+
+            if remaining <= bs.len() {
+                buf.push(bs.slice(..remaining));
+                break;
+            }
+
+            buf.push(bs.clone());
+            remaining -= bs.len();
+        }
+
+        buf
+    }
+}
+
+impl oio::Stream for ChunkedBytes {
+    fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
+        match self.inner.pop_front() {
+            Some(bs) => {
+                self.size -= bs.len();
+                Poll::Ready(Some(Ok(bs)))
+            }
+            None => Poll::Ready(None),
+        }
+    }
+
+    fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
+        Poll::Ready(Err(Error::new(
+            ErrorKind::Unsupported,
+            "ChunkedBytes does not support reset",
+        )))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use pretty_assertions::assert_eq;
+
+    use super::*;
+    use crate::raw::oio::{StreamExt, WriteBuf};
+
+    #[test]
+    fn test_chunked_bytes_write_buf() -> Result<()> {
+        let mut c = ChunkedBytes::new();
+
+        c.push(Bytes::from("hello"));
+        assert_eq!(c.len(), 5);
+        assert!(!c.is_empty());
+
+        c.push(Bytes::from("world"));
+        assert_eq!(c.len(), 10);
+        assert!(!c.is_empty());
+
+        // Test chunk
+        let bs = c.chunk();
+        assert_eq!(bs, "hello".as_bytes());
+        assert_eq!(c.len(), 10);
+        assert!(!c.is_empty());
+
+        // The second chunk should return the same content.
+        let bs = c.chunk();
+        assert_eq!(bs, "hello".as_bytes());
+        assert_eq!(c.remaining(), 10);
+        assert!(!c.is_empty());
+
+        // Test vectored chunk
+        let bs = c.vectored_chunk();
+        assert_eq!(
+            bs.iter().map(|v| v.as_ref()).collect::<Vec<_>>(),
+            vec!["hello".as_bytes(), "world".as_bytes()]
+        );
+        assert_eq!(c.remaining(), 10);
+        assert!(!c.is_empty());
+
+        // Test bytes
+        let bs = c.bytes(4);
+        assert_eq!(bs, Bytes::from("hell"));
+        assert_eq!(c.remaining(), 10);
+        assert!(!c.is_empty());
+
+        // Test bytes again
+        let bs = c.bytes(6);
+        assert_eq!(bs, Bytes::from("hellow"));
+        assert_eq!(c.remaining(), 10);
+        assert!(!c.is_empty());
+
+        // Test vectored bytes
+        let bs = c.vectored_bytes(4);
+        assert_eq!(bs, vec![Bytes::from("hell")]);
+        assert_eq!(c.remaining(), 10);
+        assert!(!c.is_empty());
+
+        // Test vectored bytes again
+        let bs = c.vectored_bytes(6);
+        assert_eq!(bs, vec![Bytes::from("hello"), Bytes::from("w")]);
+        assert_eq!(c.remaining(), 10);
+        assert!(!c.is_empty());
+
+        // Test Advance.
+        c.advance(4);
+
+        // Test chunk
+        let bs = c.chunk();
+        assert_eq!(bs, "o".as_bytes());
+        assert_eq!(c.len(), 6);
+        assert!(!c.is_empty());
+
+        c.clear();
+        assert_eq!(c.len(), 0);
+        assert!(c.is_empty());
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_chunked_bytes_stream() -> Result<()> {
+        let mut c = ChunkedBytes::new();
+
+        c.push(Bytes::from("hello"));
+        assert_eq!(c.len(), 5);
+        assert!(!c.is_empty());
+
+        c.push(Bytes::from("world"));
+        assert_eq!(c.len(), 10);
+        assert!(!c.is_empty());
+
+        let bs = c.next().await.unwrap().unwrap();
+        assert_eq!(bs, Bytes::from("hello"));
+        assert_eq!(c.len(), 5);
+        assert!(!c.is_empty());
+
+        let bs = c.next().await.unwrap().unwrap();
+        assert_eq!(bs, Bytes::from("world"));
+        assert_eq!(c.len(), 0);
+        assert!(c.is_empty());
+
+        c.clear();
+        assert_eq!(c.len(), 0);
+        assert!(c.is_empty());
+
+        Ok(())
+    }
+}
diff --git a/core/src/raw/oio/buf/mod.rs b/core/src/raw/oio/buf/mod.rs
index c26894768..dfd3663e5 100644
--- a/core/src/raw/oio/buf/mod.rs
+++ b/core/src/raw/oio/buf/mod.rs
@@ -15,5 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+mod chunked_bytes;
+pub use chunked_bytes::ChunkedBytes;
+
 mod write_buf;
 pub use write_buf::WriteBuf;
diff --git a/core/src/raw/oio/mod.rs b/core/src/raw/oio/mod.rs
index bffc70ca2..29cf8e474 100644
--- a/core/src/raw/oio/mod.rs
+++ b/core/src/raw/oio/mod.rs
@@ -42,4 +42,4 @@ mod entry;
 pub use entry::Entry;
 
 mod buf;
-pub use buf::WriteBuf;
+pub use buf::*;

Reply via email to