This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch buffer-refactor in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit b6c47831bd05b53bc911fcc85a0152a7597c239f Author: Xuanwo <[email protected]> AuthorDate: Tue Aug 22 15:21:11 2023 +0800 feat: Add AtLeastBufWrite Signed-off-by: Xuanwo <[email protected]> --- core/src/raw/oio/cursor.rs | 71 +++++++++++++++++++++- core/src/raw/oio/mod.rs | 1 + core/src/raw/oio/stream/api.rs | 79 ++++++++++++++++++++++++- core/src/raw/oio/write/at_least_buf_write.rs | 99 ++++++++++++++++++++++++++++ core/src/raw/oio/write/mod.rs | 3 + 5 files changed, 250 insertions(+), 3 deletions(-) diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs index 1940c40cd..1080f78e9 100644 --- a/core/src/raw/oio/cursor.rs +++ b/core/src/raw/oio/cursor.rs @@ -148,7 +148,76 @@ impl oio::BlockingRead for Cursor { } } -/// VectorCursor is the cursor for [`Vec<Bytes>`] that implements [`oio::Read`] +/// ChunkedCursor is the cursor for a queue of [`Bytes`] chunks that implements [`oio::Stream`]. +/// +/// TODO: we can do some compaction during runtime. For example, merge 4K data +/// into the same bytes instead. +#[derive(Clone)] +pub struct ChunkedCursor { + inner: VecDeque<Bytes>, + idx: usize, +} + +impl Default for ChunkedCursor { + fn default() -> Self { + Self::new() + } +} + +impl ChunkedCursor { + /// Create a new chunked cursor. + pub fn new() -> Self { + Self { + inner: VecDeque::new(), + idx: 0, + } + } + + /// Returns `true` if every chunk has been consumed (or nothing was pushed). + pub fn is_empty(&self) -> bool { + self.idx >= self.inner.len() + } + + /// Return the total size in bytes of the chunks not consumed yet. + pub fn len(&self) -> usize { + self.inner.iter().skip(self.idx).map(|v| v.len()).sum() + } + + /// Reset current cursor to start. + pub fn reset(&mut self) { + self.idx = 0; + } + + /// Clear the entire cursor. + pub fn clear(&mut self) { + self.idx = 0; + self.inner.clear(); + } + + /// Push a new bytes into chunked cursor.
+ pub fn push(&mut self, bs: Bytes) { + self.inner.push_back(bs); + } +} + +impl oio::Stream for ChunkedCursor { + fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> { + if self.is_empty() { + return Poll::Ready(None); + } + + let bs = self.inner[self.idx].clone(); + self.idx += 1; + Poll::Ready(Some(Ok(bs))) + } + + fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> { + self.reset(); + Poll::Ready(Ok(())) + } +} + +/// VectorCursor is the cursor for [`Vec<Bytes>`] that implements [`oio::Stream`] pub struct VectorCursor { inner: VecDeque<Bytes>, size: usize, diff --git a/core/src/raw/oio/mod.rs b/core/src/raw/oio/mod.rs index 5a4729fe8..1b24bec9c 100644 --- a/core/src/raw/oio/mod.rs +++ b/core/src/raw/oio/mod.rs @@ -35,6 +35,7 @@ mod page; pub use page::*; mod cursor; +pub use cursor::ChunkedCursor; pub use cursor::Cursor; pub use cursor::VectorCursor; diff --git a/core/src/raw/oio/stream/api.rs b/core/src/raw/oio/stream/api.rs index 7345564a2..495a4fb22 100644 --- a/core/src/raw/oio/stream/api.rs +++ b/core/src/raw/oio/stream/api.rs @@ -18,10 +18,10 @@ use std::future::Future; use std::pin::Pin; use std::sync::Arc; -use std::task::Context; use std::task::Poll; +use std::task::{ready, Context}; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use pin_project::pin_project; use crate::*; @@ -135,6 +135,29 @@ pub trait StreamExt: Stream { fn reset(&mut self) -> ResetFuture<'_, Self> { ResetFuture { inner: self } } + + /// Chain this stream with another stream. + fn chain<S>(self, other: S) -> Chain<Self, S> + where + Self: Sized, + S: Stream, + { + Chain { + first: Some(self), + second: other, + } + } + + /// Collect all items from this stream into a single bytes. + fn collect(self) -> Collect<Self> + where + Self: Sized, + { + Collect { + stream: self, + buf: BytesMut::new(), + } + } } /// Make this future `!Unpin` for compatibility with async trait methods.
@@ -172,3 +195,55 @@ where Pin::new(this.inner).poll_reset(cx) } } + +/// Stream for the [`chain`](StreamExt::chain) method. +#[must_use = "streams do nothing unless polled"] +pub struct Chain<S1: Stream, S2: Stream> { + first: Option<S1>, + second: S2, +} + +impl<S1: Stream, S2: Stream> Stream for Chain<S1, S2> { + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> { + if let Some(first) = self.first.as_mut() { + if let Some(item) = ready!(first.poll_next(cx)) { + return Poll::Ready(Some(item)); + } + + self.first = None; + } + self.second.poll_next(cx) + } + + fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> { + Poll::Ready(Err(Error::new( + ErrorKind::Unsupported, + "chained stream doesn't support reset", + ))) + } +} + +/// Future for the [`collect`](StreamExt::collect) method. +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Collect<S> { + stream: S, + buf: BytesMut, +} + +impl<S> Future for Collect<S> +where + S: Stream + Unpin, +{ + type Output = Result<Bytes>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let this = self.get_mut(); + loop { + match ready!(this.stream.poll_next(cx)) { + Some(Ok(bs)) => this.buf.extend_from_slice(&bs), + Some(Err(err)) => return Poll::Ready(Err(err)), + None => return Poll::Ready(Ok(this.buf.split().freeze())), + } + } + } +} diff --git a/core/src/raw/oio/write/at_least_buf_write.rs b/core/src/raw/oio/write/at_least_buf_write.rs new file mode 100644 index 000000000..98a237970 --- /dev/null +++ b/core/src/raw/oio/write/at_least_buf_write.rs @@ -0,0 +1,99 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::raw::oio::{StreamExt, Streamer}; +use crate::raw::*; +use crate::*; +use async_trait::async_trait; +use bytes::Bytes; + +/// AtLeastBufWriter is used to implement [`oio::Write`] based on at least buffer. +/// +/// Writes are buffered until more than `size` bytes are pending; then the buffer and the incoming data are flushed to the inner writer in a single `sink` call. +pub struct AtLeastBufWriter<W: oio::Write> { + inner: W, + + size: usize, + buf: oio::ChunkedCursor, +} + +impl<W: oio::Write> AtLeastBufWriter<W> { + /// Create a new AtLeastBufWriter wrapping `inner` with a buffer threshold of `size` bytes. + pub fn new(inner: W, size: usize) -> Self { + Self { + inner, + size, + buf: oio::ChunkedCursor::new(), + } + } +} + +#[async_trait] +impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> { + async fn write(&mut self, bs: Bytes) -> Result<()> { + // Push the bytes into the buffer if the buffer is not full. + if self.buf.len() + bs.len() <= self.size { + self.buf.push(bs); + return Ok(()); + } + + let mut buf = self.buf.clone(); + buf.push(bs); + + self.inner + .sink(buf.len() as u64, Box::new(buf)) + .await + // Clear buffer if the write is successful. + .map(|_| self.buf.clear()) + } + + async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> { + // Push the bytes into the buffer if the buffer is not full. + if self.buf.len() as u64 + size <= self.size as u64 { + self.buf.push(s.collect().await?); + return Ok(()); + } + + let buf = self.buf.clone(); + let buffer_size = buf.len() as u64; + let stream = buf.chain(s); + + self.inner + .sink(buffer_size + size, Box::new(stream)) + .await + // Clear buffer if the write is successful.
+ .map(|_| self.buf.clear()) + } + + async fn abort(&mut self) -> Result<()> { + self.buf.clear(); + self.inner.abort().await + } + + async fn close(&mut self) -> Result<()> { + if !self.buf.is_empty() { + self.inner + .sink(self.buf.len() as u64, Box::new(self.buf.clone())) + .await?; + self.buf.clear(); + } + + self.inner.close().await?; + + Ok(()) + } +} diff --git a/core/src/raw/oio/write/mod.rs b/core/src/raw/oio/write/mod.rs index b13b3e2c7..fa49ea6e2 100644 --- a/core/src/raw/oio/write/mod.rs +++ b/core/src/raw/oio/write/mod.rs @@ -33,3 +33,6 @@ pub use multipart_upload_write::MultipartUploadWriter; mod append_object_write; pub use append_object_write::AppendObjectWrite; pub use append_object_write::AppendObjectWriter; + +mod at_least_buf_write; +pub use at_least_buf_write::AtLeastBufWriter;
