This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 088881f42 refactor: Polish implementation details of WriteBuf and add
vector chunks support (#3034)
088881f42 is described below
commit 088881f42b6430f6d311f233bbba33a63cbfe86e
Author: Xuanwo <[email protected]>
AuthorDate: Tue Sep 12 13:41:24 2023 +0800
refactor: Polish implementation details of WriteBuf and add vector chunks
support (#3034)
* Fix build
Signed-off-by: Xuanwo <[email protected]>
* Add vectored chunk support
Signed-off-by: Xuanwo <[email protected]>
* format code
Signed-off-by: Xuanwo <[email protected]>
* Remove incorrect comments
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
core/src/raw/oio/buf.rs | 173 ++++++++++++++---------
core/src/raw/oio/write/append_object_write.rs | 2 +-
core/src/raw/oio/write/exact_buf_write.rs | 2 +-
core/src/raw/oio/write/multipart_upload_write.rs | 4 +-
core/src/raw/oio/write/one_shot_write.rs | 2 +-
core/src/raw/oio/write/range_write.rs | 4 +-
core/src/services/fs/writer.rs | 5 +-
core/src/services/ftp/writer.rs | 2 +-
core/src/services/gdrive/pager.rs | 21 ++-
core/src/services/ghac/writer.rs | 2 +-
10 files changed, 132 insertions(+), 85 deletions(-)
diff --git a/core/src/raw/oio/buf.rs b/core/src/raw/oio/buf.rs
index 3e6d42264..50f3c8911 100644
--- a/core/src/raw/oio/buf.rs
+++ b/core/src/raw/oio/buf.rs
@@ -15,16 +15,12 @@
// specific language governing permissions and limitations
// under the License.
-use std::cmp;
-use std::ptr;
+use std::io::IoSlice;
use bytes::Bytes;
use bytes::BytesMut;
-/// WriteBuf is used in [`oio::Write`] to provide a trait similar to
[`bytes::Buf`].
-///
-/// The biggest difference is that `Buf`'s `copy_to_slice` and `copy_to_bytes`
only needs `&self`
-/// instead of `&mut self`.
+/// WriteBuf is used in [`oio::Write`] to provide in-memory buffer support.
pub trait WriteBuf: Send + Sync {
/// Returns the number of bytes between the current position and the end
of the buffer.
///
@@ -37,6 +33,15 @@ pub trait WriteBuf: Send + Sync {
/// current position.
fn remaining(&self) -> usize;
+ /// Advance the internal cursor of the Buf
+ ///
+ /// The next call to chunk() will return a slice starting cnt bytes
further into the underlying buffer.
+ ///
+ /// # Panics
+ ///
+ /// This function may panic if cnt > self.remaining().
+ fn advance(&mut self, cnt: usize);
+
/// Returns a slice starting at the current position and of length between
0 and
/// Buf::remaining(). Note that this can return shorter slice (this allows
non-continuous
/// internal representation).
@@ -47,51 +52,44 @@ pub trait WriteBuf: Send + Sync {
/// Buf::remaining returns 0, calls to chunk() should return an empty
slice.
fn chunk(&self) -> &[u8];
- /// Advance the internal cursor of the Buf
+ /// Returns a vectored view of the underlying buffer at the current
position and of
+ /// length between 0 and Buf::remaining(). Note that this can return
shorter slice
+ /// (this allows non-continuous internal representation).
///
- /// The next call to chunk() will return a slice starting cnt bytes
further into the underlying buffer.
+ /// # Notes
///
- /// Panics
- /// This function may panic if cnt > self.remaining().
- fn advance(&mut self, cnt: usize);
+ /// This function should never panic.
+ fn vectored_chunk(&self) -> Vec<IoSlice>;
- /// Copies current chunk into dst.
- ///
- /// Returns the number of bytes copied.
+ /// Returns a bytes starting at the current position and of length between
0 and
+ /// Buf::remaining().
///
/// # Notes
///
- /// Users should not assume the returned bytes is the same as the
Buf::remaining().
- fn copy_to_slice(&self, dst: &mut [u8]) -> usize {
- let src = self.chunk();
- let size = cmp::min(src.len(), dst.len());
-
- // # Safety
- //
- // `src` and `dst` are guaranteed have enough space for `size` bytes.
- unsafe {
- ptr::copy_nonoverlapping(src.as_ptr(), dst.as_mut_ptr(), size);
- }
-
- size
- }
+ /// This function is used to concat a single bytes from underlying chunks.
+ /// Use `vectored_bytes` if you want to avoid copy when possible.
+ ///
+ /// # Panics
+ ///
+ /// This function will panic if size > self.remaining().
+ fn bytes(&self, size: usize) -> Bytes;
- /// Copies current chunk into a bytes.
+ /// Returns a vectored bytes of the underlying buffer at the current
position and of
+ /// length between 0 and Buf::remaining().
///
- /// This function may be optimized by the underlying type to avoid actual
copies.
- /// For example, Bytes implementation will do a shallow copy (ref-count
increment).
+ /// # Notes for Users
///
- /// # Notes
+ /// This function is used to return a vectored bytes from underlying
chunks.
+ /// Use `bytes` if you just want to get a continuous bytes.
///
- /// Users should not assume the returned bytes is the same as the
Buf::remaining().
- fn copy_to_bytes(&self, len: usize) -> Bytes {
- let src = self.chunk();
- let size = cmp::min(src.len(), len);
-
- let mut ret = BytesMut::with_capacity(size);
- ret.extend_from_slice(&src[..size]);
- ret.freeze()
- }
+ /// # Notes for implementors
+ ///
+ /// It's better to align the vectored bytes with underlying chunks to
avoid copy.
+ ///
+ /// # Panics
+ ///
+ /// This function will panic if size > self.remaining().
+ fn vectored_bytes(&self, size: usize) -> Vec<Bytes>;
}
macro_rules! deref_forward_buf {
@@ -100,20 +98,24 @@ macro_rules! deref_forward_buf {
(**self).remaining()
}
+ fn advance(&mut self, cnt: usize) {
+ (**self).advance(cnt)
+ }
+
fn chunk(&self) -> &[u8] {
(**self).chunk()
}
- fn advance(&mut self, cnt: usize) {
- (**self).advance(cnt)
+ fn vectored_chunk(&self) -> Vec<IoSlice> {
+ (**self).vectored_chunk()
}
- fn copy_to_slice(&self, dst: &mut [u8]) -> usize {
- (**self).copy_to_slice(dst)
+ fn bytes(&self, size: usize) -> Bytes {
+ (**self).bytes(size)
}
- fn copy_to_bytes(&self, len: usize) -> Bytes {
- (**self).copy_to_bytes(len)
+ fn vectored_bytes(&self, size: usize) -> Vec<Bytes> {
+ (**self).vectored_bytes(size)
}
};
}
@@ -132,14 +134,29 @@ impl WriteBuf for &[u8] {
self.len()
}
+ #[inline]
+ fn advance(&mut self, cnt: usize) {
+ *self = &self[cnt..];
+ }
+
#[inline]
fn chunk(&self) -> &[u8] {
self
}
#[inline]
- fn advance(&mut self, cnt: usize) {
- *self = &self[cnt..];
+ fn vectored_chunk(&self) -> Vec<IoSlice> {
+ vec![IoSlice::new(self)]
+ }
+
+ #[inline]
+ fn bytes(&self, size: usize) -> Bytes {
+ Bytes::copy_from_slice(&self[..size])
+ }
+
+ #[inline]
+ fn vectored_bytes(&self, size: usize) -> Vec<Bytes> {
+ vec![self.bytes(size)]
}
}
@@ -155,6 +172,15 @@ impl<T: AsRef<[u8]> + Send + Sync> WriteBuf for
std::io::Cursor<T> {
len - pos as usize
}
+ fn advance(&mut self, cnt: usize) {
+ let pos = (self.position() as usize)
+ .checked_add(cnt)
+ .expect("overflow");
+
+ assert!(pos <= self.get_ref().as_ref().len());
+ self.set_position(pos as u64);
+ }
+
fn chunk(&self) -> &[u8] {
let len = self.get_ref().as_ref().len();
let pos = self.position();
@@ -166,13 +192,16 @@ impl<T: AsRef<[u8]> + Send + Sync> WriteBuf for
std::io::Cursor<T> {
&self.get_ref().as_ref()[pos as usize..]
}
- fn advance(&mut self, cnt: usize) {
- let pos = (self.position() as usize)
- .checked_add(cnt)
- .expect("overflow");
+ fn vectored_chunk(&self) -> Vec<IoSlice> {
+ vec![IoSlice::new(self.chunk())]
+ }
- assert!(pos <= self.get_ref().as_ref().len());
- self.set_position(pos as u64);
+ fn bytes(&self, size: usize) -> Bytes {
+ Bytes::copy_from_slice(&self.chunk()[..size])
+ }
+
+ fn vectored_bytes(&self, size: usize) -> Vec<Bytes> {
+ vec![self.bytes(size)]
}
}
@@ -182,21 +211,30 @@ impl WriteBuf for Bytes {
self.len()
}
+ #[inline]
+ fn advance(&mut self, cnt: usize) {
+ bytes::Buf::advance(self, cnt)
+ }
+
#[inline]
fn chunk(&self) -> &[u8] {
self
}
#[inline]
- fn advance(&mut self, cnt: usize) {
- bytes::Buf::advance(self, cnt)
+ fn vectored_chunk(&self) -> Vec<IoSlice> {
+ vec![IoSlice::new(self)]
}
#[inline]
- fn copy_to_bytes(&self, len: usize) -> Bytes {
- let size = cmp::min(self.len(), len);
+ fn bytes(&self, size: usize) -> Bytes {
self.slice(..size)
}
+
+ #[inline]
+ fn vectored_bytes(&self, size: usize) -> Vec<Bytes> {
+ vec![self.slice(..size)]
+ }
}
impl WriteBuf for BytesMut {
@@ -205,19 +243,28 @@ impl WriteBuf for BytesMut {
self.len()
}
+ #[inline]
+ fn advance(&mut self, cnt: usize) {
+ bytes::Buf::advance(self, cnt)
+ }
+
#[inline]
fn chunk(&self) -> &[u8] {
self
}
#[inline]
- fn advance(&mut self, cnt: usize) {
- bytes::Buf::advance(self, cnt)
+ fn vectored_chunk(&self) -> Vec<IoSlice> {
+ vec![IoSlice::new(self)]
}
#[inline]
- fn copy_to_bytes(&self, len: usize) -> Bytes {
- let size = cmp::min(self.len(), len);
+ fn bytes(&self, size: usize) -> Bytes {
Bytes::copy_from_slice(&self[..size])
}
+
+ #[inline]
+ fn vectored_bytes(&self, size: usize) -> Vec<Bytes> {
+ vec![self.bytes(size)]
+ }
}
diff --git a/core/src/raw/oio/write/append_object_write.rs
b/core/src/raw/oio/write/append_object_write.rs
index b85d08bd9..d5c030f74 100644
--- a/core/src/raw/oio/write/append_object_write.rs
+++ b/core/src/raw/oio/write/append_object_write.rs
@@ -90,7 +90,7 @@ where
match self.offset {
Some(offset) => {
let size = bs.remaining();
- let bs = bs.copy_to_bytes(size);
+ let bs = bs.bytes(size);
self.state = State::Append(Box::pin(async move {
let res = w.append(offset, size as u64,
AsyncBody::Bytes(bs)).await;
diff --git a/core/src/raw/oio/write/exact_buf_write.rs
b/core/src/raw/oio/write/exact_buf_write.rs
index 1fb404653..144d50ff0 100644
--- a/core/src/raw/oio/write/exact_buf_write.rs
+++ b/core/src/raw/oio/write/exact_buf_write.rs
@@ -71,7 +71,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
match &mut self.buffer {
Buffer::Empty => {
if bs.remaining() >= self.buffer_size {
- self.buffer =
Buffer::Consuming(bs.copy_to_bytes(self.buffer_size));
+ self.buffer =
Buffer::Consuming(bs.bytes(self.buffer_size));
return Poll::Ready(Ok(self.buffer_size));
}
diff --git a/core/src/raw/oio/write/multipart_upload_write.rs
b/core/src/raw/oio/write/multipart_upload_write.rs
index 67f18ecf3..ec42275f0 100644
--- a/core/src/raw/oio/write/multipart_upload_write.rs
+++ b/core/src/raw/oio/write/multipart_upload_write.rs
@@ -174,7 +174,7 @@ where
// Fill cache with the first write.
if self.cache.is_none() {
let size = bs.remaining();
- self.cache = Some(bs.copy_to_bytes(size));
+ self.cache = Some(bs.bytes(size));
return Poll::Ready(Ok(size));
}
@@ -197,7 +197,7 @@ where
self.parts.push(part?);
// Replace the cache when last write succeeded
let size = bs.remaining();
- self.cache = Some(bs.copy_to_bytes(size));
+ self.cache = Some(bs.bytes(size));
return Poll::Ready(Ok(size));
}
State::Close(_) => {
diff --git a/core/src/raw/oio/write/one_shot_write.rs
b/core/src/raw/oio/write/one_shot_write.rs
index 9f54de083..c56679e19 100644
--- a/core/src/raw/oio/write/one_shot_write.rs
+++ b/core/src/raw/oio/write/one_shot_write.rs
@@ -73,7 +73,7 @@ impl<W: OneShotWrite> oio::Write for OneShotWriter<W> {
let w = w.take().expect("writer must be valid");
let size = bs.remaining();
- let bs = bs.copy_to_bytes(size);
+ let bs = bs.bytes(size);
let fut = async move {
let res = w.write_once(bs).await;
diff --git a/core/src/raw/oio/write/range_write.rs
b/core/src/raw/oio/write/range_write.rs
index 10314424c..9dfe73197 100644
--- a/core/src/raw/oio/write/range_write.rs
+++ b/core/src/raw/oio/write/range_write.rs
@@ -138,7 +138,7 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> {
let mut total_size = current_size + remaining;
if total_size <= self.align_size {
- let bs = bs.copy_to_bytes(remaining);
+ let bs = bs.bytes(remaining);
self.align_buffer.push(bs);
return Poll::Ready(Ok(remaining));
}
@@ -150,7 +150,7 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> {
let consume = total_size - total_size %
self.align_size - current_size;
let mut align_buffer = self.align_buffer.clone();
- let bs = bs.copy_to_bytes(consume);
+ let bs = bs.bytes(consume);
align_buffer.push(bs);
let written = self.written;
diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index 61361bdaf..6087b622c 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -61,7 +61,7 @@ impl oio::Write for FsWriter<tokio::fs::File> {
let f = self.f.as_mut().expect("FsWriter must be initialized");
Pin::new(f)
- .poll_write(cx, bs.chunk())
+ .poll_write_vectored(cx, &bs.vectored_chunk())
.map_err(parse_io_error)
}
@@ -120,7 +120,8 @@ impl oio::BlockingWrite for FsWriter<std::fs::File> {
fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
let f = self.f.as_mut().expect("FsWriter must be initialized");
- f.write(bs.chunk()).map_err(parse_io_error)
+ f.write_vectored(&bs.vectored_chunk())
+ .map_err(parse_io_error)
}
fn close(&mut self) -> Result<()> {
diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs
index de4cbd116..4b2658907 100644
--- a/core/src/services/ftp/writer.rs
+++ b/core/src/services/ftp/writer.rs
@@ -66,7 +66,7 @@ impl oio::Write for FtpWriter {
}
let size = bs.remaining();
- let bs = bs.copy_to_bytes(size);
+ let bs = bs.bytes(size);
let path = self.path.clone();
let backend = self.backend.clone();
diff --git a/core/src/services/gdrive/pager.rs
b/core/src/services/gdrive/pager.rs
index ec4ad90d1..a3c836592 100644
--- a/core/src/services/gdrive/pager.rs
+++ b/core/src/services/gdrive/pager.rs
@@ -17,20 +17,19 @@
use std::sync::Arc;
-use crate::{
- raw::{
- build_rel_path, build_rooted_abs_path, new_json_deserialize_error,
- oio::{self},
- },
- EntryMode, Metadata, Result,
-};
use async_trait::async_trait;
use http::StatusCode;
-use super::{
- core::{GdriveCore, GdriveFileList},
- error::parse_error,
-};
+use super::core::GdriveCore;
+use super::core::GdriveFileList;
+use super::error::parse_error;
+use crate::raw::build_rel_path;
+use crate::raw::build_rooted_abs_path;
+use crate::raw::new_json_deserialize_error;
+use crate::raw::oio::{self};
+use crate::EntryMode;
+use crate::Metadata;
+use crate::Result;
pub struct GdrivePager {
path: String,
core: Arc<GdriveCore>,
diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs
index a48f43283..76254c0bc 100644
--- a/core/src/services/ghac/writer.rs
+++ b/core/src/services/ghac/writer.rs
@@ -65,7 +65,7 @@ impl oio::Write for GhacWriter {
let cache_id = self.cache_id;
let size = bs.remaining();
- let bs = bs.copy_to_bytes(size);
+ let bs = bs.bytes(size);
let fut = async move {
let res = async {