This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 08dc139c1 feat(core): Avoid copy if input is larger than buffer_size (#3016)
08dc139c1 is described below
commit 08dc139c148712a062f01737707aeceab4855c6e
Author: Xuanwo <[email protected]>
AuthorDate: Wed Sep 6 19:18:42 2023 +0800
feat(core): Avoid copy if input is larger than buffer_size (#3016)
Signed-off-by: Xuanwo <[email protected]>
---
core/src/raw/oio/write/exact_buf_write.rs | 33 +++++++++++++++++++++++++------
1 file changed, 27 insertions(+), 6 deletions(-)
diff --git a/core/src/raw/oio/write/exact_buf_write.rs b/core/src/raw/oio/write/exact_buf_write.rs
index c0ecac3aa..a944f3365 100644
--- a/core/src/raw/oio/write/exact_buf_write.rs
+++ b/core/src/raw/oio/write/exact_buf_write.rs
@@ -50,12 +50,13 @@ impl<W: oio::Write> ExactBufWriter<W> {
Self {
inner,
buffer_size,
- buffer: Buffer::Filling(BytesMut::new()),
+ buffer: Buffer::Empty,
}
}
}
enum Buffer {
+ Empty,
Filling(BytesMut),
Consuming(Bytes),
}
@@ -65,6 +66,19 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
async fn write(&mut self, mut bs: Bytes) -> Result<u64> {
loop {
match &mut self.buffer {
+ Buffer::Empty => {
+ if bs.len() >= self.buffer_size {
+ bs.truncate(self.buffer_size);
+ self.buffer = Buffer::Consuming(bs);
+ return Ok(self.buffer_size as u64);
+ }
+
+ let size = bs.len() as u64;
+ let mut fill = BytesMut::with_capacity(bs.len());
+ fill.extend_from_slice(&bs);
+ self.buffer = Buffer::Filling(fill);
+ return Ok(size);
+ }
Buffer::Filling(fill) => {
if fill.len() >= self.buffer_size {
self.buffer = Buffer::Consuming(fill.split().freeze());
@@ -84,23 +98,28 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
let n = self.inner.write(consume.clone()).await?;
consume.advance(n as usize);
}
- self.buffer = Buffer::Filling(BytesMut::new());
+ self.buffer = Buffer::Empty;
}
}
}
}
- async fn copy_from(&mut self, _: u64, mut s: oio::Reader) -> Result<u64> {
+ async fn copy_from(&mut self, size: u64, mut s: oio::Reader) -> Result<u64> {
loop {
match &mut self.buffer {
+ Buffer::Empty => {
+ self.buffer = Buffer::Filling(BytesMut::new());
+ }
Buffer::Filling(fill) => {
if fill.len() >= self.buffer_size {
self.buffer = Buffer::Consuming(fill.split().freeze());
continue;
}
- // Reserve to buffer size.
- fill.reserve(self.buffer_size - fill.len());
+ // Reserve to enough size.
+ if size > fill.remaining_mut() as u64 {
+ fill.reserve(self.buffer_size - fill.len());
+ }
let dst = fill.spare_capacity_mut();
let dst_len = dst.len();
let mut buf = ReadBuf::uninit(dst);
@@ -130,13 +149,14 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
}
async fn abort(&mut self) -> Result<()> {
- self.buffer = Buffer::Filling(BytesMut::new());
+ self.buffer = Buffer::Empty;
self.inner.abort().await
}
async fn close(&mut self) -> Result<()> {
loop {
match &mut self.buffer {
+ Buffer::Empty => break,
Buffer::Filling(fill) => {
self.buffer = Buffer::Consuming(fill.split().freeze());
continue;
@@ -149,6 +169,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
let n = self.inner.write(consume.clone()).await?;
consume.advance(n as usize);
}
+ self.buffer = Buffer::Empty;
break;
}
}