This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch buffer-refactor
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit b6c47831bd05b53bc911fcc85a0152a7597c239f
Author: Xuanwo <[email protected]>
AuthorDate: Tue Aug 22 15:21:11 2023 +0800

    feat: Add AtLeastBufWrite
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/raw/oio/cursor.rs                   | 71 +++++++++++++++++++++-
 core/src/raw/oio/mod.rs                      |  1 +
 core/src/raw/oio/stream/api.rs               | 79 ++++++++++++++++++++++++-
 core/src/raw/oio/write/at_least_buf_write.rs | 88 ++++++++++++++++++++++++++++
 core/src/raw/oio/write/mod.rs                |  2 +
 5 files changed, 238 insertions(+), 3 deletions(-)

diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs
index 1940c40cd..1080f78e9 100644
--- a/core/src/raw/oio/cursor.rs
+++ b/core/src/raw/oio/cursor.rs
@@ -148,7 +148,76 @@ impl oio::BlockingRead for Cursor {
     }
 }
 
-/// VectorCursor is the cursor for [`Vec<Bytes>`] that implements [`oio::Read`]
+/// ChunkedCursor is a cursor over a queue of [`Bytes`] chunks that
+/// implements [`oio::Stream`].
+///
+/// Chunks are yielded in the order they were pushed. Every yielded item is a
+/// clone of the stored [`Bytes`], so the cursor keeps all of its data and can
+/// be rewound with [`ChunkedCursor::reset`].
+///
+/// # TODO
+///
+/// We could compact small chunks at runtime, for example by merging 4K of
+/// data into a single [`Bytes`].
+#[derive(Clone, Default)]
+pub struct ChunkedCursor {
+    /// All chunks pushed so far, including the ones already yielded.
+    inner: VecDeque<Bytes>,
+    /// Index of the next chunk to yield.
+    idx: usize,
+}
+
+impl ChunkedCursor {
+    /// Create a new, empty chunked cursor.
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Returns `true` if there is no chunk left to yield.
+    pub fn is_empty(&self) -> bool {
+        // Drained once `idx` has walked past the last stored chunk.
+        self.idx >= self.inner.len()
+    }
+
+    /// Return the number of bytes that have not been yielded yet.
+    pub fn len(&self) -> usize {
+        // Chunks before `idx` were already consumed, so only count the rest.
+        self.inner.iter().skip(self.idx).map(Bytes::len).sum()
+    }
+
+    /// Rewind the cursor to the first chunk; buffered data is kept.
+    pub fn reset(&mut self) {
+        self.idx = 0;
+    }
+
+    /// Drop every chunk and rewind the cursor.
+    pub fn clear(&mut self) {
+        self.idx = 0;
+        self.inner.clear();
+    }
+
+    /// Append a chunk to the end of the cursor.
+    pub fn push(&mut self, bs: Bytes) {
+        self.inner.push_back(bs);
+    }
+}
+
+impl oio::Stream for ChunkedCursor {
+    fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
+        // `get` instead of indexing: reaching the end yields `None`, never a panic.
+        let chunk = self.inner.get(self.idx).cloned();
+        if chunk.is_some() {
+            self.idx += 1;
+        }
+        Poll::Ready(chunk.map(Ok))
+    }
+
+    fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
+        self.reset();
+        Poll::Ready(Ok(()))
+    }
+}
+
+/// VectorCursor is the cursor for [`Vec<Bytes>`] that implements [`oio::Stream`]
 pub struct VectorCursor {
     inner: VecDeque<Bytes>,
     size: usize,
diff --git a/core/src/raw/oio/mod.rs b/core/src/raw/oio/mod.rs
index 5a4729fe8..1b24bec9c 100644
--- a/core/src/raw/oio/mod.rs
+++ b/core/src/raw/oio/mod.rs
@@ -35,6 +35,7 @@ mod page;
 pub use page::*;
 
 mod cursor;
+pub use cursor::ChunkedCursor;
 pub use cursor::Cursor;
 pub use cursor::VectorCursor;
 
diff --git a/core/src/raw/oio/stream/api.rs b/core/src/raw/oio/stream/api.rs
index 7345564a2..495a4fb22 100644
--- a/core/src/raw/oio/stream/api.rs
+++ b/core/src/raw/oio/stream/api.rs
@@ -18,10 +18,10 @@
 use std::future::Future;
 use std::pin::Pin;
 use std::sync::Arc;
-use std::task::Context;
 use std::task::Poll;
+use std::task::{ready, Context};
 
-use bytes::Bytes;
+use bytes::{Bytes, BytesMut};
 use pin_project::pin_project;
 
 use crate::*;
@@ -135,6 +135,29 @@ pub trait StreamExt: Stream {
     fn reset(&mut self) -> ResetFuture<'_, Self> {
         ResetFuture { inner: self }
     }
+
+    /// Chain this stream with another stream.
+    ///
+    /// The returned [`Chain`] yields every item of `self` first and, once
+    /// `self` reports its end, every item of `other`.
+    fn chain<S>(self, other: S) -> Chain<Self, S>
+    where
+        Self: Sized,
+        S: Stream,
+    {
+        let first = Some(self);
+        Chain {
+            first,
+            second: other,
+        }
+    }
+
+    /// Collect all items from this stream into a single [`Bytes`].
+    ///
+    /// The returned [`Collect`] future resolves to the concatenation of every
+    /// chunk, or to the first error the stream yields.
+    fn collect(self) -> Collect<Self>
+    where
+        Self: Sized,
+    {
+        let buf = BytesMut::new();
+        Collect { stream: self, buf }
+    }
 }
 
 /// Make this future `!Unpin` for compatibility with async trait methods.
@@ -172,3 +195,55 @@ where
         Pin::new(this.inner).poll_reset(cx)
     }
 }
+
+/// Stream for the [`chain`](StreamExt::chain) method.
+///
+/// Yields every item of the first stream, then every item of the second one.
+/// Resetting is not supported.
+#[must_use = "streams do nothing unless polled"]
+pub struct Chain<S1: Stream, S2: Stream> {
+    /// Leading stream; set to `None` once it reports its end.
+    first: Option<S1>,
+    /// Trailing stream, polled only after `first` is gone.
+    second: S2,
+}
+
+impl<S1: Stream, S2: Stream> Stream for Chain<S1, S2> {
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
+        match self.first.as_mut().map(|first| first.poll_next(cx)) {
+            // The leading stream is not ready yet.
+            Some(Poll::Pending) => return Poll::Pending,
+            // Forward any item (data or error) from the leading stream.
+            Some(Poll::Ready(Some(item))) => return Poll::Ready(Some(item)),
+            // The leading stream just finished: release it and fall through.
+            Some(Poll::Ready(None)) => self.first = None,
+            None => {}
+        }
+
+        self.second.poll_next(cx)
+    }
+
+    fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
+        let err = Error::new(
+            ErrorKind::Unsupported,
+            "chained stream doesn't support reset",
+        );
+        Poll::Ready(Err(err))
+    }
+}
+
+/// Future for the [`collect`](StreamExt::collect) method.
+///
+/// Polls the wrapped stream until it ends and concatenates every chunk into a
+/// single [`Bytes`]. The first error yielded by the stream is returned as-is
+/// and collection stops there.
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Collect<S> {
+    /// The stream being drained.
+    stream: S,
+    /// Bytes gathered so far.
+    buf: BytesMut,
+}
+
+impl<S> Future for Collect<S>
+where
+    S: Stream,
+{
+    type Output = Result<Bytes>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        // Plain field access through the pin requires `Collect<S>: Unpin`
+        // (i.e. `S: Unpin`); no pin projection is performed here.
+        let this = self.get_mut();
+        loop {
+            match ready!(this.stream.poll_next(cx)) {
+                // Copy the chunk as one slice; `extend(bs)` would resolve to
+                // `Extend<u8>` and push it byte by byte.
+                Some(Ok(bs)) => this.buf.extend_from_slice(&bs),
+                Some(Err(err)) => return Poll::Ready(Err(err)),
+                None => return Poll::Ready(Ok(this.buf.split().freeze())),
+            }
+        }
+    }
+}
diff --git a/core/src/raw/oio/write/at_least_buf_write.rs b/core/src/raw/oio/write/at_least_buf_write.rs
new file mode 100644
index 000000000..98a237970
--- /dev/null
+++ b/core/src/raw/oio/write/at_least_buf_write.rs
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::raw::oio::{StreamExt, Streamer};
+use crate::raw::*;
+use crate::*;
+use async_trait::async_trait;
+use bytes::Bytes;
+
+/// AtLeastBufWriter implements [`oio::Write`] on top of an "at least" buffer.
+///
+/// Incoming data is held in memory as long as the buffered total stays within
+/// `size` bytes. As soon as a `write` or `sink` would push the total above
+/// `size`, the buffered chunks and the new data are forwarded to the inner
+/// writer together in a single `sink` call. Every such call therefore carries
+/// more than `size` bytes. Whatever is still buffered is flushed on `close`.
+pub struct AtLeastBufWriter<W: oio::Write> {
+    inner: W,
+
+    /// Buffering threshold in bytes.
+    size: usize,
+    buf: oio::ChunkedCursor,
+}
+
+impl<W: oio::Write> AtLeastBufWriter<W> {
+    /// Create a writer that wraps `inner` and buffers up to `size` bytes
+    /// before forwarding data to it.
+    pub fn new(inner: W, size: usize) -> Self {
+        Self {
+            inner,
+            size,
+            buf: oio::ChunkedCursor::new(),
+        }
+    }
+}
+
+#[async_trait]
+impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> {
+    async fn write(&mut self, bs: Bytes) -> Result<()> {
+        // Keep buffering while the total stays within the threshold.
+        if self.buf.len() + bs.len() <= self.size {
+            self.buf.push(bs);
+            return Ok(());
+        }
+
+        // Sink a clone so the buffered data survives if the inner write fails.
+        let mut buf = self.buf.clone();
+        buf.push(bs);
+        self.inner.sink(buf.len() as u64, Box::new(buf)).await?;
+
+        // Only drop the buffered data once the inner writer accepted it.
+        self.buf.clear();
+        Ok(())
+    }
+
+    async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
+        // Small enough: drain the stream into memory and keep buffering.
+        if self.buf.len() as u64 + size <= self.size as u64 {
+            self.buf.push(s.collect().await?);
+            return Ok(());
+        }
+
+        // Send the buffered chunks first, followed by the caller's stream.
+        let buf = self.buf.clone();
+        let buffer_size = buf.len() as u64;
+        self.inner
+            .sink(buffer_size + size, Box::new(buf.chain(s)))
+            .await?;
+
+        // Only drop the buffered data once the inner writer accepted it.
+        self.buf.clear();
+        Ok(())
+    }
+
+    async fn abort(&mut self) -> Result<()> {
+        self.buf.clear();
+        self.inner.abort().await
+    }
+
+    async fn close(&mut self) -> Result<()> {
+        // Flush whatever is still buffered before closing the inner writer.
+        if !self.buf.is_empty() {
+            self.inner
+                .sink(self.buf.len() as u64, Box::new(self.buf.clone()))
+                .await?;
+            self.buf.clear();
+        }
+
+        self.inner.close().await
+    }
+}
diff --git a/core/src/raw/oio/write/mod.rs b/core/src/raw/oio/write/mod.rs
index b13b3e2c7..fa49ea6e2 100644
--- a/core/src/raw/oio/write/mod.rs
+++ b/core/src/raw/oio/write/mod.rs
@@ -33,3 +33,5 @@ pub use multipart_upload_write::MultipartUploadWriter;
 mod append_object_write;
 pub use append_object_write::AppendObjectWrite;
 pub use append_object_write::AppendObjectWriter;
+
+mod at_least_buf_write;
+pub use at_least_buf_write::AtLeastBufWriter;

Reply via email to