This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch polish_buffer
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 5497f79e26698950ad2a3ea9f20283e562eec06f
Author: Xuanwo <[email protected]>
AuthorDate: Wed Sep 6 19:06:42 2023 +0800

    feat(core): Avoid copy if input is larger than buffer_size

    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/raw/oio/write/exact_buf_write.rs | 33 +++++++++++++++++++++++++------
 1 file changed, 27 insertions(+), 6 deletions(-)

diff --git a/core/src/raw/oio/write/exact_buf_write.rs b/core/src/raw/oio/write/exact_buf_write.rs
index c0ecac3aa..a944f3365 100644
--- a/core/src/raw/oio/write/exact_buf_write.rs
+++ b/core/src/raw/oio/write/exact_buf_write.rs
@@ -50,12 +50,13 @@ impl<W: oio::Write> ExactBufWriter<W> {
         Self {
             inner,
             buffer_size,
-            buffer: Buffer::Filling(BytesMut::new()),
+            buffer: Buffer::Empty,
         }
     }
 }
 
 enum Buffer {
+    Empty,
     Filling(BytesMut),
     Consuming(Bytes),
 }
@@ -65,6 +66,19 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
     async fn write(&mut self, mut bs: Bytes) -> Result<u64> {
         loop {
             match &mut self.buffer {
+                Buffer::Empty => {
+                    if bs.len() >= self.buffer_size {
+                        bs.truncate(self.buffer_size);
+                        self.buffer = Buffer::Consuming(bs);
+                        return Ok(self.buffer_size as u64);
+                    }
+
+                    let size = bs.len() as u64;
+                    let mut fill = BytesMut::with_capacity(bs.len());
+                    fill.extend_from_slice(&bs);
+                    self.buffer = Buffer::Filling(fill);
+                    return Ok(size);
+                }
                 Buffer::Filling(fill) => {
                     if fill.len() >= self.buffer_size {
                         self.buffer = Buffer::Consuming(fill.split().freeze());
@@ -84,23 +98,28 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
                         let n = self.inner.write(consume.clone()).await?;
                         consume.advance(n as usize);
                     }
-                    self.buffer = Buffer::Filling(BytesMut::new());
+                    self.buffer = Buffer::Empty;
                 }
             }
         }
     }
 
-    async fn copy_from(&mut self, _: u64, mut s: oio::Reader) -> Result<u64> {
+    async fn copy_from(&mut self, size: u64, mut s: oio::Reader) -> Result<u64> {
         loop {
             match &mut self.buffer {
+                Buffer::Empty => {
+                    self.buffer = Buffer::Filling(BytesMut::new());
+                }
                 Buffer::Filling(fill) => {
                     if fill.len() >= self.buffer_size {
                         self.buffer = Buffer::Consuming(fill.split().freeze());
                         continue;
                     }
 
-                    // Reserve to buffer size.
-                    fill.reserve(self.buffer_size - fill.len());
+                    // Reserve to enough size.
+                    if size > fill.remaining_mut() as u64 {
+                        fill.reserve(self.buffer_size - fill.len());
+                    }
                     let dst = fill.spare_capacity_mut();
                     let dst_len = dst.len();
                     let mut buf = ReadBuf::uninit(dst);
@@ -130,13 +149,14 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
     }
 
     async fn abort(&mut self) -> Result<()> {
-        self.buffer = Buffer::Filling(BytesMut::new());
+        self.buffer = Buffer::Empty;
         self.inner.abort().await
     }
 
     async fn close(&mut self) -> Result<()> {
         loop {
             match &mut self.buffer {
+                Buffer::Empty => break,
                 Buffer::Filling(fill) => {
                     self.buffer = Buffer::Consuming(fill.split().freeze());
                     continue;
@@ -149,6 +169,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
                         let n = self.inner.write(consume.clone()).await?;
                         consume.advance(n as usize);
                     }
+                    self.buffer = Buffer::Empty;
                     break;
                 }
             }
