This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 08dc139c1 feat(core): Avoid copy if input is larger than buffer_size (#3016)
08dc139c1 is described below

commit 08dc139c148712a062f01737707aeceab4855c6e
Author: Xuanwo <[email protected]>
AuthorDate: Wed Sep 6 19:18:42 2023 +0800

    feat(core): Avoid copy if input is larger than buffer_size (#3016)
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/raw/oio/write/exact_buf_write.rs | 33 +++++++++++++++++++++++++------
 1 file changed, 27 insertions(+), 6 deletions(-)

diff --git a/core/src/raw/oio/write/exact_buf_write.rs b/core/src/raw/oio/write/exact_buf_write.rs
index c0ecac3aa..a944f3365 100644
--- a/core/src/raw/oio/write/exact_buf_write.rs
+++ b/core/src/raw/oio/write/exact_buf_write.rs
@@ -50,12 +50,13 @@ impl<W: oio::Write> ExactBufWriter<W> {
         Self {
             inner,
             buffer_size,
-            buffer: Buffer::Filling(BytesMut::new()),
+            buffer: Buffer::Empty,
         }
     }
 }
 
 enum Buffer {
+    Empty,
     Filling(BytesMut),
     Consuming(Bytes),
 }
@@ -65,6 +66,19 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
     async fn write(&mut self, mut bs: Bytes) -> Result<u64> {
         loop {
             match &mut self.buffer {
+                Buffer::Empty => {
+                    if bs.len() >= self.buffer_size {
+                        bs.truncate(self.buffer_size);
+                        self.buffer = Buffer::Consuming(bs);
+                        return Ok(self.buffer_size as u64);
+                    }
+
+                    let size = bs.len() as u64;
+                    let mut fill = BytesMut::with_capacity(bs.len());
+                    fill.extend_from_slice(&bs);
+                    self.buffer = Buffer::Filling(fill);
+                    return Ok(size);
+                }
                 Buffer::Filling(fill) => {
                     if fill.len() >= self.buffer_size {
                         self.buffer = Buffer::Consuming(fill.split().freeze());
@@ -84,23 +98,28 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
                         let n = self.inner.write(consume.clone()).await?;
                         consume.advance(n as usize);
                     }
-                    self.buffer = Buffer::Filling(BytesMut::new());
+                    self.buffer = Buffer::Empty;
                 }
             }
         }
     }
 
-    async fn copy_from(&mut self, _: u64, mut s: oio::Reader) -> Result<u64> {
+    async fn copy_from(&mut self, size: u64, mut s: oio::Reader) -> Result<u64> {
         loop {
             match &mut self.buffer {
+                Buffer::Empty => {
+                    self.buffer = Buffer::Filling(BytesMut::new());
+                }
                 Buffer::Filling(fill) => {
                     if fill.len() >= self.buffer_size {
                         self.buffer = Buffer::Consuming(fill.split().freeze());
                         continue;
                     }
 
-                    // Reserve to buffer size.
-                    fill.reserve(self.buffer_size - fill.len());
+                    // Reserve to enough size.
+                    if size > fill.remaining_mut() as u64 {
+                        fill.reserve(self.buffer_size - fill.len());
+                    }
                     let dst = fill.spare_capacity_mut();
                     let dst_len = dst.len();
                     let mut buf = ReadBuf::uninit(dst);
@@ -130,13 +149,14 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
     }
 
     async fn abort(&mut self) -> Result<()> {
-        self.buffer = Buffer::Filling(BytesMut::new());
+        self.buffer = Buffer::Empty;
         self.inner.abort().await
     }
 
     async fn close(&mut self) -> Result<()> {
         loop {
             match &mut self.buffer {
+                Buffer::Empty => break,
                 Buffer::Filling(fill) => {
                     self.buffer = Buffer::Consuming(fill.split().freeze());
                     continue;
@@ -149,6 +169,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
                         let n = self.inner.write(consume.clone()).await?;
                         consume.advance(n as usize);
                     }
+                    self.buffer = Buffer::Empty;
                     break;
                 }
             }

Reply via email to