This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch polish-buf in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 700c73307461329fa02b73f95b6058461b8895cd Author: Xuanwo <[email protected]> AuthorDate: Tue Sep 12 13:20:20 2023 +0800 Fix build Signed-off-by: Xuanwo <[email protected]> --- core/src/raw/oio/buf.rs | 175 ++++++++++++++--------- core/src/raw/oio/write/append_object_write.rs | 2 +- core/src/raw/oio/write/exact_buf_write.rs | 2 +- core/src/raw/oio/write/multipart_upload_write.rs | 4 +- core/src/raw/oio/write/one_shot_write.rs | 2 +- core/src/raw/oio/write/range_write.rs | 4 +- core/src/services/ftp/writer.rs | 2 +- core/src/services/ghac/writer.rs | 2 +- 8 files changed, 120 insertions(+), 73 deletions(-) diff --git a/core/src/raw/oio/buf.rs b/core/src/raw/oio/buf.rs index 3e6d42264..b92894e88 100644 --- a/core/src/raw/oio/buf.rs +++ b/core/src/raw/oio/buf.rs @@ -15,16 +15,11 @@ // specific language governing permissions and limitations // under the License. -use std::cmp; -use std::ptr; - use bytes::Bytes; use bytes::BytesMut; +use std::io::IoSlice; -/// WriteBuf is used in [`oio::Write`] to provide a trait similar to [`bytes::Buf`]. -/// -/// The biggest difference is that `Buf`'s `copy_to_slice` and `copy_to_bytes` only needs `&self` -/// instead of `&mut self`. +/// WriteBuf is used in [`oio::Write`] to provide in-memory buffer support. pub trait WriteBuf: Send + Sync { /// Returns the number of bytes between the current position and the end of the buffer. /// @@ -37,6 +32,15 @@ pub trait WriteBuf: Send + Sync { /// current position. fn remaining(&self) -> usize; + /// Advance the internal cursor of the Buf + /// + /// The next call to chunk() will return a slice starting cnt bytes further into the underlying buffer. + /// + /// # Panics + /// + /// This function may panic if cnt > self.remaining(). + fn advance(&mut self, cnt: usize); + /// Returns a slice starting at the current position and of length between 0 and /// Buf::remaining(). Note that this can return shorter slice (this allows non-continuous /// internal representation). 
@@ -47,51 +51,45 @@ pub trait WriteBuf: Send + Sync { /// Buf::remaining returns 0, calls to chunk() should return an empty slice. fn chunk(&self) -> &[u8]; - /// Advance the internal cursor of the Buf + /// Returns a vectored view of the underlying buffer at the current position and of + /// length between 0 and Buf::remaining(). Note that this can return shorter slice + /// (this allows non-continuous internal representation). /// - /// The next call to chunk() will return a slice starting cnt bytes further into the underlying buffer. + /// # Notes /// - /// Panics - /// This function may panic if cnt > self.remaining(). - fn advance(&mut self, cnt: usize); + /// This function should never panic. Once the end of the buffer is reached, i.e., + /// Buf::remaining returns 0, calls to vectored_chunk() should return an empty `vec![]`. + fn vectored_chunk(&self) -> Vec<IoSlice>; - /// Copies current chunk into dst. - /// - /// Returns the number of bytes copied. + /// Returns a bytes starting at the current position and of length between 0 and + /// Buf::remaining(). /// /// # Notes /// - /// Users should not assume the returned bytes is the same as the Buf::remaining(). - fn copy_to_slice(&self, dst: &mut [u8]) -> usize { - let src = self.chunk(); - let size = cmp::min(src.len(), dst.len()); - - // # Safety - // - // `src` and `dst` are guaranteed have enough space for `size` bytes. - unsafe { - ptr::copy_nonoverlapping(src.as_ptr(), dst.as_mut_ptr(), size); - } - - size - } + /// This function is used to concat a single bytes from underlying chunks. + /// Use `vectored_bytes` if you want to avoid copy when possible. + /// + /// # Panics + /// + /// This function will panic if size > self.remaining(). + fn bytes(&self, size: usize) -> Bytes; - /// Copies current chunk into a bytes. + /// Returns a vectored bytes of the underlying buffer at the current position and of + /// length between 0 and Buf::remaining(). 
/// - /// This function may be optimized by the underlying type to avoid actual copies. - /// For example, Bytes implementation will do a shallow copy (ref-count increment). + /// # Notes for Users /// - /// # Notes + /// This function is used to return a vectored bytes from underlying chunks. + /// Use `bytes` if you just want to get a continuous bytes. /// - /// Users should not assume the returned bytes is the same as the Buf::remaining(). - fn copy_to_bytes(&self, len: usize) -> Bytes { - let src = self.chunk(); - let size = cmp::min(src.len(), len); - - let mut ret = BytesMut::with_capacity(size); - ret.extend_from_slice(&src[..size]); - ret.freeze() - } + /// # Notes for implementors + /// + /// It's better to align the vectored bytes with underlying chunks to avoid copy. + /// + /// # Panics + /// + /// This function will panic if size > self.remaining(). + fn vectored_bytes(&self, size: usize) -> Vec<Bytes>; } macro_rules! deref_forward_buf { @@ -100,20 +98,24 @@ macro_rules! deref_forward_buf { (**self).remaining() } + fn advance(&mut self, cnt: usize) { + (**self).advance(cnt) + } + fn chunk(&self) -> &[u8] { (**self).chunk() } - fn advance(&mut self, cnt: usize) { - (**self).advance(cnt) + fn vectored_chunk(&self) -> Vec<IoSlice> { + (**self).vectored_chunk() } - fn copy_to_slice(&self, dst: &mut [u8]) -> usize { - (**self).copy_to_slice(dst) + fn bytes(&self, size: usize) -> Bytes { + (**self).bytes(size) } - fn copy_to_bytes(&self, len: usize) -> Bytes { - (**self).copy_to_bytes(len) + fn vectored_bytes(&self, size: usize) -> Vec<Bytes> { + (**self).vectored_bytes(size) } }; } @@ -132,14 +134,29 @@ impl WriteBuf for &[u8] { self.len() } + #[inline] + fn advance(&mut self, cnt: usize) { + *self = &self[cnt..]; + } + #[inline] fn chunk(&self) -> &[u8] { self } #[inline] - fn advance(&mut self, cnt: usize) { - *self = &self[cnt..]; + fn vectored_chunk(&self) -> Vec<IoSlice> { + vec![IoSlice::new(self)] + } + + #[inline] + fn bytes(&self, size: usize) -> 
Bytes { + Bytes::copy_from_slice(&self[..size]) + } + + #[inline] + fn vectored_bytes(&self, size: usize) -> Vec<Bytes> { + vec![self.bytes(size)] } } @@ -155,6 +172,15 @@ impl<T: AsRef<[u8]> + Send + Sync> WriteBuf for std::io::Cursor<T> { len - pos as usize } + fn advance(&mut self, cnt: usize) { + let pos = (self.position() as usize) + .checked_add(cnt) + .expect("overflow"); + + assert!(pos <= self.get_ref().as_ref().len()); + self.set_position(pos as u64); + } + fn chunk(&self) -> &[u8] { let len = self.get_ref().as_ref().len(); let pos = self.position(); @@ -166,13 +192,16 @@ impl<T: AsRef<[u8]> + Send + Sync> WriteBuf for std::io::Cursor<T> { &self.get_ref().as_ref()[pos as usize..] } - fn advance(&mut self, cnt: usize) { - let pos = (self.position() as usize) - .checked_add(cnt) - .expect("overflow"); + fn vectored_chunk(&self) -> Vec<IoSlice> { + vec![IoSlice::new(self.chunk())] + } - assert!(pos <= self.get_ref().as_ref().len()); - self.set_position(pos as u64); + fn bytes(&self, size: usize) -> Bytes { + Bytes::copy_from_slice(&self.chunk()[..size]) + } + + fn vectored_bytes(&self, size: usize) -> Vec<Bytes> { + vec![self.bytes(size)] } } @@ -182,21 +211,30 @@ impl WriteBuf for Bytes { self.len() } + #[inline] + fn advance(&mut self, cnt: usize) { + bytes::Buf::advance(self, cnt) + } + #[inline] fn chunk(&self) -> &[u8] { self } #[inline] - fn advance(&mut self, cnt: usize) { - bytes::Buf::advance(self, cnt) + fn vectored_chunk(&self) -> Vec<IoSlice> { + vec![IoSlice::new(self)] } #[inline] - fn copy_to_bytes(&self, len: usize) -> Bytes { - let size = cmp::min(self.len(), len); + fn bytes(&self, size: usize) -> Bytes { self.slice(..size) } + + #[inline] + fn vectored_bytes(&self, size: usize) -> Vec<Bytes> { + vec![self.slice(..size)] + } } impl WriteBuf for BytesMut { @@ -205,19 +243,28 @@ impl WriteBuf for BytesMut { self.len() } + #[inline] + fn advance(&mut self, cnt: usize) { + bytes::Buf::advance(self, cnt) + } + #[inline] fn chunk(&self) -> &[u8] 
{ self } #[inline] - fn advance(&mut self, cnt: usize) { - bytes::Buf::advance(self, cnt) + fn vectored_chunk(&self) -> Vec<IoSlice> { + vec![IoSlice::new(self)] } #[inline] - fn copy_to_bytes(&self, len: usize) -> Bytes { - let size = cmp::min(self.len(), len); + fn bytes(&self, size: usize) -> Bytes { Bytes::copy_from_slice(&self[..size]) } + + #[inline] + fn vectored_bytes(&self, size: usize) -> Vec<Bytes> { + vec![self.bytes(size)] + } } diff --git a/core/src/raw/oio/write/append_object_write.rs b/core/src/raw/oio/write/append_object_write.rs index b85d08bd9..d5c030f74 100644 --- a/core/src/raw/oio/write/append_object_write.rs +++ b/core/src/raw/oio/write/append_object_write.rs @@ -90,7 +90,7 @@ where match self.offset { Some(offset) => { let size = bs.remaining(); - let bs = bs.copy_to_bytes(size); + let bs = bs.bytes(size); self.state = State::Append(Box::pin(async move { let res = w.append(offset, size as u64, AsyncBody::Bytes(bs)).await; diff --git a/core/src/raw/oio/write/exact_buf_write.rs b/core/src/raw/oio/write/exact_buf_write.rs index 1fb404653..144d50ff0 100644 --- a/core/src/raw/oio/write/exact_buf_write.rs +++ b/core/src/raw/oio/write/exact_buf_write.rs @@ -71,7 +71,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> { match &mut self.buffer { Buffer::Empty => { if bs.remaining() >= self.buffer_size { - self.buffer = Buffer::Consuming(bs.copy_to_bytes(self.buffer_size)); + self.buffer = Buffer::Consuming(bs.bytes(self.buffer_size)); return Poll::Ready(Ok(self.buffer_size)); } diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs index 67f18ecf3..ec42275f0 100644 --- a/core/src/raw/oio/write/multipart_upload_write.rs +++ b/core/src/raw/oio/write/multipart_upload_write.rs @@ -174,7 +174,7 @@ where // Fill cache with the first write. 
if self.cache.is_none() { let size = bs.remaining(); - self.cache = Some(bs.copy_to_bytes(size)); + self.cache = Some(bs.bytes(size)); return Poll::Ready(Ok(size)); } @@ -197,7 +197,7 @@ where self.parts.push(part?); // Replace the cache when last write succeeded let size = bs.remaining(); - self.cache = Some(bs.copy_to_bytes(size)); + self.cache = Some(bs.bytes(size)); return Poll::Ready(Ok(size)); } State::Close(_) => { diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs index 9f54de083..c56679e19 100644 --- a/core/src/raw/oio/write/one_shot_write.rs +++ b/core/src/raw/oio/write/one_shot_write.rs @@ -73,7 +73,7 @@ impl<W: OneShotWrite> oio::Write for OneShotWriter<W> { let w = w.take().expect("writer must be valid"); let size = bs.remaining(); - let bs = bs.copy_to_bytes(size); + let bs = bs.bytes(size); let fut = async move { let res = w.write_once(bs).await; diff --git a/core/src/raw/oio/write/range_write.rs b/core/src/raw/oio/write/range_write.rs index 10314424c..9dfe73197 100644 --- a/core/src/raw/oio/write/range_write.rs +++ b/core/src/raw/oio/write/range_write.rs @@ -138,7 +138,7 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> { let mut total_size = current_size + remaining; if total_size <= self.align_size { - let bs = bs.copy_to_bytes(remaining); + let bs = bs.bytes(remaining); self.align_buffer.push(bs); return Poll::Ready(Ok(remaining)); } @@ -150,7 +150,7 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> { let consume = total_size - total_size % self.align_size - current_size; let mut align_buffer = self.align_buffer.clone(); - let bs = bs.copy_to_bytes(consume); + let bs = bs.bytes(consume); align_buffer.push(bs); let written = self.written; diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs index de4cbd116..4b2658907 100644 --- a/core/src/services/ftp/writer.rs +++ b/core/src/services/ftp/writer.rs @@ -66,7 +66,7 @@ impl oio::Write for FtpWriter { } let size = 
bs.remaining(); - let bs = bs.copy_to_bytes(size); + let bs = bs.bytes(size); let path = self.path.clone(); let backend = self.backend.clone(); diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs index a48f43283..76254c0bc 100644 --- a/core/src/services/ghac/writer.rs +++ b/core/src/services/ghac/writer.rs @@ -65,7 +65,7 @@ impl oio::Write for GhacWriter { let cache_id = self.cache_id; let size = bs.remaining(); - let bs = bs.copy_to_bytes(size); + let bs = bs.bytes(size); let fut = async move { let res = async {
