This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 088881f42 refactor: Polish implementation details of WriteBuf and add vector chunks support (#3034)
088881f42 is described below

commit 088881f42b6430f6d311f233bbba33a63cbfe86e
Author: Xuanwo <[email protected]>
AuthorDate: Tue Sep 12 13:41:24 2023 +0800

    refactor: Polish implementation details of WriteBuf and add vector chunks support (#3034)
    
    * Fix build
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Add vectored chunk support
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * format code
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Remove incorrect comments
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/raw/oio/buf.rs                          | 173 ++++++++++++++---------
 core/src/raw/oio/write/append_object_write.rs    |   2 +-
 core/src/raw/oio/write/exact_buf_write.rs        |   2 +-
 core/src/raw/oio/write/multipart_upload_write.rs |   4 +-
 core/src/raw/oio/write/one_shot_write.rs         |   2 +-
 core/src/raw/oio/write/range_write.rs            |   4 +-
 core/src/services/fs/writer.rs                   |   5 +-
 core/src/services/ftp/writer.rs                  |   2 +-
 core/src/services/gdrive/pager.rs                |  21 ++-
 core/src/services/ghac/writer.rs                 |   2 +-
 10 files changed, 132 insertions(+), 85 deletions(-)

diff --git a/core/src/raw/oio/buf.rs b/core/src/raw/oio/buf.rs
index 3e6d42264..50f3c8911 100644
--- a/core/src/raw/oio/buf.rs
+++ b/core/src/raw/oio/buf.rs
@@ -15,16 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::cmp;
-use std::ptr;
+use std::io::IoSlice;
 
 use bytes::Bytes;
 use bytes::BytesMut;
 
-/// WriteBuf is used in [`oio::Write`] to provide a trait similar to 
[`bytes::Buf`].
-///
-/// The biggest difference is that `Buf`'s `copy_to_slice` and `copy_to_bytes` 
only needs `&self`
-/// instead of `&mut self`.
+/// WriteBuf is used in [`oio::Write`] to provide in-memory buffer support.
 pub trait WriteBuf: Send + Sync {
     /// Returns the number of bytes between the current position and the end 
of the buffer.
     ///
@@ -37,6 +33,15 @@ pub trait WriteBuf: Send + Sync {
     /// current position.
     fn remaining(&self) -> usize;
 
+    /// Advance the internal cursor of the Buf
+    ///
+    /// The next call to chunk() will return a slice starting cnt bytes 
further into the underlying buffer.
+    ///
+    /// # Panics
+    ///
+    /// This function may panic if cnt > self.remaining().
+    fn advance(&mut self, cnt: usize);
+
     /// Returns a slice starting at the current position and of length between 
0 and
     /// Buf::remaining(). Note that this can return shorter slice (this allows 
non-continuous
     /// internal representation).
@@ -47,51 +52,44 @@ pub trait WriteBuf: Send + Sync {
     /// Buf::remaining returns 0, calls to chunk() should return an empty 
slice.
     fn chunk(&self) -> &[u8];
 
-    /// Advance the internal cursor of the Buf
+    /// Returns a vectored view of the underlying buffer at the current 
position and of
+    /// length between 0 and Buf::remaining(). Note that this can return 
shorter slice
+    /// (this allows non-continuous internal representation).
     ///
-    /// The next call to chunk() will return a slice starting cnt bytes 
further into the underlying buffer.
+    /// # Notes
     ///
-    /// Panics
-    /// This function may panic if cnt > self.remaining().
-    fn advance(&mut self, cnt: usize);
+    /// This function should never panic.
+    fn vectored_chunk(&self) -> Vec<IoSlice>;
 
-    /// Copies current chunk into dst.
-    ///
-    /// Returns the number of bytes copied.
+    /// Returns a bytes starting at the current position and of length between 
0 and
+    /// Buf::remaining().
     ///
     /// # Notes
     ///
-    /// Users should not assume the returned bytes is the same as the 
Buf::remaining().
-    fn copy_to_slice(&self, dst: &mut [u8]) -> usize {
-        let src = self.chunk();
-        let size = cmp::min(src.len(), dst.len());
-
-        // # Safety
-        //
-        // `src` and `dst` are guaranteed have enough space for `size` bytes.
-        unsafe {
-            ptr::copy_nonoverlapping(src.as_ptr(), dst.as_mut_ptr(), size);
-        }
-
-        size
-    }
+    /// This functions is used to concat a single bytes from underlying chunks.
+    /// Use `vectored_bytes` if you want to avoid copy when possible.
+    ///
+    /// # Panics
+    ///
+    /// This function will panic if size > self.remaining().
+    fn bytes(&self, size: usize) -> Bytes;
 
-    /// Copies current chunk into a bytes.
+    /// Returns a vectored bytes of the underlying buffer at the current 
position and of
+    /// length between 0 and Buf::remaining().
     ///
-    /// This function may be optimized by the underlying type to avoid actual 
copies.
-    /// For example, Bytes implementation will do a shallow copy (ref-count 
increment).
+    /// # Notes for Users
     ///
-    /// # Notes
+    /// This functions is used to return a vectored bytes from underlying 
chunks.
+    /// Use `bytes` if you just want to get a continuous bytes.
     ///
-    /// Users should not assume the returned bytes is the same as the 
Buf::remaining().
-    fn copy_to_bytes(&self, len: usize) -> Bytes {
-        let src = self.chunk();
-        let size = cmp::min(src.len(), len);
-
-        let mut ret = BytesMut::with_capacity(size);
-        ret.extend_from_slice(&src[..size]);
-        ret.freeze()
-    }
+    /// # Notes for implementors
+    ///
+    /// It's better to align the vectored bytes with underlying chunks to 
avoid copy.
+    ///
+    /// # Panics
+    ///
+    /// This function will panic if size > self.remaining().
+    fn vectored_bytes(&self, size: usize) -> Vec<Bytes>;
 }
 
 macro_rules! deref_forward_buf {
@@ -100,20 +98,24 @@ macro_rules! deref_forward_buf {
             (**self).remaining()
         }
 
+        fn advance(&mut self, cnt: usize) {
+            (**self).advance(cnt)
+        }
+
         fn chunk(&self) -> &[u8] {
             (**self).chunk()
         }
 
-        fn advance(&mut self, cnt: usize) {
-            (**self).advance(cnt)
+        fn vectored_chunk(&self) -> Vec<IoSlice> {
+            (**self).vectored_chunk()
         }
 
-        fn copy_to_slice(&self, dst: &mut [u8]) -> usize {
-            (**self).copy_to_slice(dst)
+        fn bytes(&self, size: usize) -> Bytes {
+            (**self).bytes(size)
         }
 
-        fn copy_to_bytes(&self, len: usize) -> Bytes {
-            (**self).copy_to_bytes(len)
+        fn vectored_bytes(&self, size: usize) -> Vec<Bytes> {
+            (**self).vectored_bytes(size)
         }
     };
 }
@@ -132,14 +134,29 @@ impl WriteBuf for &[u8] {
         self.len()
     }
 
+    #[inline]
+    fn advance(&mut self, cnt: usize) {
+        *self = &self[cnt..];
+    }
+
     #[inline]
     fn chunk(&self) -> &[u8] {
         self
     }
 
     #[inline]
-    fn advance(&mut self, cnt: usize) {
-        *self = &self[cnt..];
+    fn vectored_chunk(&self) -> Vec<IoSlice> {
+        vec![IoSlice::new(self)]
+    }
+
+    #[inline]
+    fn bytes(&self, size: usize) -> Bytes {
+        Bytes::copy_from_slice(&self[..size])
+    }
+
+    #[inline]
+    fn vectored_bytes(&self, size: usize) -> Vec<Bytes> {
+        vec![self.bytes(size)]
     }
 }
 
@@ -155,6 +172,15 @@ impl<T: AsRef<[u8]> + Send + Sync> WriteBuf for std::io::Cursor<T> {
         len - pos as usize
     }
 
+    fn advance(&mut self, cnt: usize) {
+        let pos = (self.position() as usize)
+            .checked_add(cnt)
+            .expect("overflow");
+
+        assert!(pos <= self.get_ref().as_ref().len());
+        self.set_position(pos as u64);
+    }
+
     fn chunk(&self) -> &[u8] {
         let len = self.get_ref().as_ref().len();
         let pos = self.position();
@@ -166,13 +192,16 @@ impl<T: AsRef<[u8]> + Send + Sync> WriteBuf for std::io::Cursor<T> {
         &self.get_ref().as_ref()[pos as usize..]
     }
 
-    fn advance(&mut self, cnt: usize) {
-        let pos = (self.position() as usize)
-            .checked_add(cnt)
-            .expect("overflow");
+    fn vectored_chunk(&self) -> Vec<IoSlice> {
+        vec![IoSlice::new(self.chunk())]
+    }
 
-        assert!(pos <= self.get_ref().as_ref().len());
-        self.set_position(pos as u64);
+    fn bytes(&self, size: usize) -> Bytes {
+        Bytes::copy_from_slice(&self.chunk()[..size])
+    }
+
+    fn vectored_bytes(&self, size: usize) -> Vec<Bytes> {
+        vec![self.bytes(size)]
     }
 }
 
@@ -182,21 +211,30 @@ impl WriteBuf for Bytes {
         self.len()
     }
 
+    #[inline]
+    fn advance(&mut self, cnt: usize) {
+        bytes::Buf::advance(self, cnt)
+    }
+
     #[inline]
     fn chunk(&self) -> &[u8] {
         self
     }
 
     #[inline]
-    fn advance(&mut self, cnt: usize) {
-        bytes::Buf::advance(self, cnt)
+    fn vectored_chunk(&self) -> Vec<IoSlice> {
+        vec![IoSlice::new(self)]
     }
 
     #[inline]
-    fn copy_to_bytes(&self, len: usize) -> Bytes {
-        let size = cmp::min(self.len(), len);
+    fn bytes(&self, size: usize) -> Bytes {
         self.slice(..size)
     }
+
+    #[inline]
+    fn vectored_bytes(&self, size: usize) -> Vec<Bytes> {
+        vec![self.slice(..size)]
+    }
 }
 
 impl WriteBuf for BytesMut {
@@ -205,19 +243,28 @@ impl WriteBuf for BytesMut {
         self.len()
     }
 
+    #[inline]
+    fn advance(&mut self, cnt: usize) {
+        bytes::Buf::advance(self, cnt)
+    }
+
     #[inline]
     fn chunk(&self) -> &[u8] {
         self
     }
 
     #[inline]
-    fn advance(&mut self, cnt: usize) {
-        bytes::Buf::advance(self, cnt)
+    fn vectored_chunk(&self) -> Vec<IoSlice> {
+        vec![IoSlice::new(self)]
     }
 
     #[inline]
-    fn copy_to_bytes(&self, len: usize) -> Bytes {
-        let size = cmp::min(self.len(), len);
+    fn bytes(&self, size: usize) -> Bytes {
         Bytes::copy_from_slice(&self[..size])
     }
+
+    #[inline]
+    fn vectored_bytes(&self, size: usize) -> Vec<Bytes> {
+        vec![self.bytes(size)]
+    }
 }
diff --git a/core/src/raw/oio/write/append_object_write.rs b/core/src/raw/oio/write/append_object_write.rs
index b85d08bd9..d5c030f74 100644
--- a/core/src/raw/oio/write/append_object_write.rs
+++ b/core/src/raw/oio/write/append_object_write.rs
@@ -90,7 +90,7 @@ where
                     match self.offset {
                         Some(offset) => {
                             let size = bs.remaining();
-                            let bs = bs.copy_to_bytes(size);
+                            let bs = bs.bytes(size);
 
                             self.state = State::Append(Box::pin(async move {
                                 let res = w.append(offset, size as u64, 
AsyncBody::Bytes(bs)).await;
diff --git a/core/src/raw/oio/write/exact_buf_write.rs b/core/src/raw/oio/write/exact_buf_write.rs
index 1fb404653..144d50ff0 100644
--- a/core/src/raw/oio/write/exact_buf_write.rs
+++ b/core/src/raw/oio/write/exact_buf_write.rs
@@ -71,7 +71,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
             match &mut self.buffer {
                 Buffer::Empty => {
                     if bs.remaining() >= self.buffer_size {
-                        self.buffer = 
Buffer::Consuming(bs.copy_to_bytes(self.buffer_size));
+                        self.buffer = 
Buffer::Consuming(bs.bytes(self.buffer_size));
                         return Poll::Ready(Ok(self.buffer_size));
                     }
 
diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs
index 67f18ecf3..ec42275f0 100644
--- a/core/src/raw/oio/write/multipart_upload_write.rs
+++ b/core/src/raw/oio/write/multipart_upload_write.rs
@@ -174,7 +174,7 @@ where
                             // Fill cache with the first write.
                             if self.cache.is_none() {
                                 let size = bs.remaining();
-                                self.cache = Some(bs.copy_to_bytes(size));
+                                self.cache = Some(bs.bytes(size));
                                 return Poll::Ready(Ok(size));
                             }
 
@@ -197,7 +197,7 @@ where
                     self.parts.push(part?);
                     // Replace the cache when last write succeeded
                     let size = bs.remaining();
-                    self.cache = Some(bs.copy_to_bytes(size));
+                    self.cache = Some(bs.bytes(size));
                     return Poll::Ready(Ok(size));
                 }
                 State::Close(_) => {
diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs
index 9f54de083..c56679e19 100644
--- a/core/src/raw/oio/write/one_shot_write.rs
+++ b/core/src/raw/oio/write/one_shot_write.rs
@@ -73,7 +73,7 @@ impl<W: OneShotWrite> oio::Write for OneShotWriter<W> {
                     let w = w.take().expect("writer must be valid");
 
                     let size = bs.remaining();
-                    let bs = bs.copy_to_bytes(size);
+                    let bs = bs.bytes(size);
                     let fut = async move {
                         let res = w.write_once(bs).await;
 
diff --git a/core/src/raw/oio/write/range_write.rs b/core/src/raw/oio/write/range_write.rs
index 10314424c..9dfe73197 100644
--- a/core/src/raw/oio/write/range_write.rs
+++ b/core/src/raw/oio/write/range_write.rs
@@ -138,7 +138,7 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> {
                             let mut total_size = current_size + remaining;
 
                             if total_size <= self.align_size {
-                                let bs = bs.copy_to_bytes(remaining);
+                                let bs = bs.bytes(remaining);
                                 self.align_buffer.push(bs);
                                 return Poll::Ready(Ok(remaining));
                             }
@@ -150,7 +150,7 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> {
 
                             let consume = total_size - total_size % 
self.align_size - current_size;
                             let mut align_buffer = self.align_buffer.clone();
-                            let bs = bs.copy_to_bytes(consume);
+                            let bs = bs.bytes(consume);
                             align_buffer.push(bs);
 
                             let written = self.written;
diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index 61361bdaf..6087b622c 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -61,7 +61,7 @@ impl oio::Write for FsWriter<tokio::fs::File> {
         let f = self.f.as_mut().expect("FsWriter must be initialized");
 
         Pin::new(f)
-            .poll_write(cx, bs.chunk())
+            .poll_write_vectored(cx, &bs.vectored_chunk())
             .map_err(parse_io_error)
     }
 
@@ -120,7 +120,8 @@ impl oio::BlockingWrite for FsWriter<std::fs::File> {
     fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
         let f = self.f.as_mut().expect("FsWriter must be initialized");
 
-        f.write(bs.chunk()).map_err(parse_io_error)
+        f.write_vectored(&bs.vectored_chunk())
+            .map_err(parse_io_error)
     }
 
     fn close(&mut self) -> Result<()> {
diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs
index de4cbd116..4b2658907 100644
--- a/core/src/services/ftp/writer.rs
+++ b/core/src/services/ftp/writer.rs
@@ -66,7 +66,7 @@ impl oio::Write for FtpWriter {
             }
 
             let size = bs.remaining();
-            let bs = bs.copy_to_bytes(size);
+            let bs = bs.bytes(size);
 
             let path = self.path.clone();
             let backend = self.backend.clone();
diff --git a/core/src/services/gdrive/pager.rs b/core/src/services/gdrive/pager.rs
index ec4ad90d1..a3c836592 100644
--- a/core/src/services/gdrive/pager.rs
+++ b/core/src/services/gdrive/pager.rs
@@ -17,20 +17,19 @@
 
 use std::sync::Arc;
 
-use crate::{
-    raw::{
-        build_rel_path, build_rooted_abs_path, new_json_deserialize_error,
-        oio::{self},
-    },
-    EntryMode, Metadata, Result,
-};
 use async_trait::async_trait;
 use http::StatusCode;
 
-use super::{
-    core::{GdriveCore, GdriveFileList},
-    error::parse_error,
-};
+use super::core::GdriveCore;
+use super::core::GdriveFileList;
+use super::error::parse_error;
+use crate::raw::build_rel_path;
+use crate::raw::build_rooted_abs_path;
+use crate::raw::new_json_deserialize_error;
+use crate::raw::oio::{self};
+use crate::EntryMode;
+use crate::Metadata;
+use crate::Result;
 pub struct GdrivePager {
     path: String,
     core: Arc<GdriveCore>,
diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs
index a48f43283..76254c0bc 100644
--- a/core/src/services/ghac/writer.rs
+++ b/core/src/services/ghac/writer.rs
@@ -65,7 +65,7 @@ impl oio::Write for GhacWriter {
 
                     let cache_id = self.cache_id;
                     let size = bs.remaining();
-                    let bs = bs.copy_to_bytes(size);
+                    let bs = bs.bytes(size);
 
                     let fut = async move {
                         let res = async {

Reply via email to