This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch refactor-writer in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 5fb041f9825d437641a4e136da996489244db9ed Author: Xuanwo <[email protected]> AuthorDate: Mon Sep 4 18:03:27 2023 +0800 Save code Signed-off-by: Xuanwo <[email protected]> --- core/benches/oio/write.rs | 8 +- core/src/raw/oio/cursor.rs | 14 --- core/src/raw/oio/write/at_least_buf_write.rs | 2 +- core/src/raw/oio/write/exact_buf_write.rs | 173 +++++++++++++++------------ 4 files changed, 107 insertions(+), 90 deletions(-) diff --git a/core/benches/oio/write.rs b/core/benches/oio/write.rs index 6e26ce7e0..506a974f0 100644 --- a/core/benches/oio/write.rs +++ b/core/benches/oio/write.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use bytes::{Buf, Bytes}; use criterion::Criterion; use once_cell::sync::Lazy; use opendal::raw::oio::AtLeastBufWriter; @@ -71,7 +72,12 @@ pub fn bench_exact_buf_write(c: &mut Criterion) { group.bench_with_input(size.to_string(), &content, |b, content| { b.to_async(&*TOKIO).iter(|| async { let mut w = ExactBufWriter::new(BlackHoleWriter, 256 * 1024); - w.write(content.clone()).await.unwrap(); + + let mut bs = Bytes::from(content.clone()); + while !bs.is_empty() { + let n = w.write(bs.clone()).await.unwrap(); + bs.advance(n as usize); + } w.close().await.unwrap(); }) }); diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs index 88a713068..796b2a349 100644 --- a/core/src/raw/oio/cursor.rs +++ b/core/src/raw/oio/cursor.rs @@ -343,20 +343,6 @@ impl oio::Stream for ChunkedCursor { } } -impl oio::Read for ChunkedCursor { - fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> { - todo!() - } - - fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll<Result<u64>> { - todo!() - } - - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> { - todo!() - } -} - /// VectorCursor is the cursor for [`Vec<Bytes>`] that implements [`oio::Stream`] pub struct VectorCursor { inner: VecDeque<Bytes>, diff --git a/core/src/raw/oio/write/at_least_buf_write.rs b/core/src/raw/oio/write/at_least_buf_write.rs index 99c20ee46..ebfff8ffd 100644 --- a/core/src/raw/oio/write/at_least_buf_write.rs +++ b/core/src/raw/oio/write/at_least_buf_write.rs @@ -90,7 +90,7 @@ impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> { }) } - async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> { + async fn pipe(&mut self, _: u64, s: oio::Reader) -> Result<u64> { todo!() } diff --git a/core/src/raw/oio/write/exact_buf_write.rs b/core/src/raw/oio/write/exact_buf_write.rs index e77330e0b..57118fce8 100644 --- a/core/src/raw/oio/write/exact_buf_write.rs +++ b/core/src/raw/oio/write/exact_buf_write.rs @@ -18,10 +18,10 @@ use std::cmp::min; use async_trait::async_trait; -use bytes::Bytes; +use bytes::{Buf, BufMut, Bytes, BytesMut}; +use tokio::io::ReadBuf; -use crate::raw::oio::StreamExt; -use crate::raw::oio::Streamer; +use crate::raw::oio::ReadExt; use crate::raw::*; use crate::*; @@ -41,9 +41,7 @@ pub struct ExactBufWriter<W: oio::Write> { /// The size for buffer, we will flush the underlying storage at the size of this buffer. buffer_size: usize, - buffer: oio::ChunkedCursor, - - buffer_stream: Option<Streamer>, + buffer: Buffer, } impl<W: oio::Write> ExactBufWriter<W> { @@ -52,91 +50,108 @@ impl<W: oio::Write> ExactBufWriter<W> { Self { inner, buffer_size, - buffer: oio::ChunkedCursor::new(), - buffer_stream: None, - } - } - - /// Next bytes is used to fetch bytes from buffer or input streamer. - /// - /// We need this function because we need to make sure our write is reentrant. - /// We can't mutate state unless we are sure that the write is successful. - async fn next_bytes(&mut self, s: &mut Streamer) -> Option<Result<Bytes>> { - match self.buffer_stream.as_mut() { - None => s.next().await, - Some(bs) => match bs.next().await { - None => { - self.buffer_stream = None; - s.next().await - } - Some(v) => Some(v), - }, + buffer: Buffer::Filling(BytesMut::new()), } } +} - fn chain_stream(&mut self, s: Streamer) { - self.buffer_stream = match self.buffer_stream.take() { - Some(stream) => Some(Box::new(stream.chain(s))), - None => Some(s), - } - } +enum Buffer { + Filling(BytesMut), + Consuming(Bytes), } #[async_trait] impl<W: oio::Write> oio::Write for ExactBufWriter<W> { - async fn write(&mut self, bs: Bytes) -> Result<u64> { - self.pipe(bs.len() as u64, Box::new(oio::Cursor::from(bs))) - .await + async fn write(&mut self, mut bs: Bytes) -> Result<u64> { + loop { + match &mut self.buffer { + Buffer::Filling(fill) => { + if fill.len() == self.buffer_size { + self.buffer = Buffer::Consuming(fill.split().freeze()); + continue; + } + + let size = min(self.buffer_size - fill.len(), bs.len()); + fill.extend_from_slice(&bs[..size]); + bs.advance(size); + return Ok(size as u64); + } + Buffer::Consuming(consume) => { + // Make sure filled buffer has been flushed. + // + // TODO: maybe we can re-fill it after a successful write. + while !consume.is_empty() { + let n = self.inner.write(consume.clone()).await?; + consume.advance(n as usize); + } + self.buffer = Buffer::Filling(BytesMut::new()); + } + } + } } - /// # TODO - /// - /// We know every stream size, we can collect them into a buffer without chain them every time. async fn pipe(&mut self, _: u64, mut s: oio::Reader) -> Result<u64> { - todo!() + loop { + match &mut self.buffer { + Buffer::Filling(fill) => { + if fill.len() == self.buffer_size { + self.buffer = Buffer::Consuming(fill.split().freeze()); + continue; + } + + // Reserve to buffer size. + fill.reserve(self.buffer_size - fill.len()); + let dst = fill.spare_capacity_mut(); + let dst_len = dst.len(); + let mut buf = ReadBuf::uninit(dst); + + // Safety: the input buffer is created with_capacity(length). + unsafe { buf.assume_init(dst_len) }; + + let n = s.read(buf.filled_mut()).await?; + + // Safety: read makes sure this buffer has been filled. + unsafe { fill.advance_mut(n) }; + + return Ok(n as u64); + } + Buffer::Consuming(consume) => { + // Make sure filled buffer has been flushed. + // + // TODO: maybe we can re-fill it after a successful write. + while !consume.is_empty() { + let n = self.inner.write(consume.clone()).await?; + consume.advance(n as usize); + } + self.buffer = Buffer::Filling(BytesMut::new()); + } + } + } } async fn abort(&mut self) -> Result<()> { - self.buffer.clear(); - self.buffer_stream = None; - + self.buffer = Buffer::Filling(BytesMut::new()); self.inner.abort().await } async fn close(&mut self) -> Result<()> { - while let Some(stream) = self.buffer_stream.as_mut() { - let bs = stream.next().await.transpose()?; - match bs { - None => { - self.buffer_stream = None; + loop { + match &mut self.buffer { + Buffer::Filling(fill) => { + self.buffer = Buffer::Consuming(fill.split().freeze()); + continue; } - Some(bs) => { - self.buffer.push(bs); + Buffer::Consuming(consume) => { + // Make sure filled buffer has been flushed. + // + // TODO: maybe we can re-fill it after a successful write. + while !consume.is_empty() { + let n = self.inner.write(consume.clone()).await?; + consume.advance(n as usize); + } + break; } } - - if self.buffer.len() >= self.buffer_size { - let mut buf = self.buffer.clone(); - let to_write = buf.split_to(self.buffer_size); - self.inner - .pipe(to_write.len() as u64, Box::new(to_write)) - .await - // Replace buffer with remaining if the write is successful. - .map(|_| { - self.buffer = buf; - })?; - } - } - - while !self.buffer.is_empty() { - let mut buf = self.buffer.clone(); - let to_write = buf.split_to(min(self.buffer_size, buf.len())); - - self.inner - .pipe(to_write.len() as u64, Box::new(to_write)) - .await - // Replace buffer with remaining if the write is successful. - .map(|_| self.buffer = buf)?; } self.inner.close().await @@ -172,7 +187,7 @@ mod tests { async fn pipe(&mut self, size: u64, mut s: oio::Reader) -> Result<u64> { let mut bs = vec![]; - s.read_to_end(&mut bs).await?; + s.read_to_end(&mut bs).await.unwrap(); assert_eq!(bs.len() as u64, size); self.write(bs.into()).await } @@ -200,7 +215,12 @@ mod tests { let mut w = ExactBufWriter::new(MockWriter { buf: vec![] }, 10); - w.write(Bytes::from(expected.clone())).await?; + let mut bs = Bytes::from(expected.clone()); + while !bs.is_empty() { + let n = w.write(bs.clone()).await?; + bs.advance(n as usize); + } + w.close().await?; assert_eq!(w.inner.buf.len(), expected.len()); @@ -233,7 +253,12 @@ mod tests { rng.fill_bytes(&mut content); expected.extend_from_slice(&content); - writer.write(Bytes::from(content)).await?; + + let mut bs = Bytes::from(content.clone()); + while !bs.is_empty() { + let n = writer.write(bs.clone()).await?; + bs.advance(n as usize); + } } writer.close().await?;
