This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch polish-buffer
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 32d85ead80257f1fca1078bc09d2743fdbc9ae8a
Author: Xuanwo <[email protected]>
AuthorDate: Tue Sep 12 16:47:36 2023 +0800

    Save work
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/benches/oio/utils.rs                 |   2 +
 core/src/raw/oio/buf/chunked_bytes.rs     | 302 ++++++++++++++++++++++++------
 core/src/raw/oio/buf/write_buf.rs         |  25 +++
 core/src/raw/oio/write/exact_buf_write.rs |  82 ++------
 4 files changed, 282 insertions(+), 129 deletions(-)

diff --git a/core/benches/oio/utils.rs b/core/benches/oio/utils.rs
index 338587062..af8c9a205 100644
--- a/core/benches/oio/utils.rs
+++ b/core/benches/oio/utils.rs
@@ -34,6 +34,8 @@ impl oio::Write for BlackHoleWriter {
         _: &mut Context<'_>,
         bs: &dyn oio::WriteBuf,
     ) -> Poll<opendal::Result<usize>> {
+        // Simulate a real write by consuming the buffer.
+        let _ = bs.bytes(bs.remaining());
         Poll::Ready(Ok(bs.remaining()))
     }
 
diff --git a/core/src/raw/oio/buf/chunked_bytes.rs b/core/src/raw/oio/buf/chunked_bytes.rs
index e1a4df3c0..ae8bcb6d9 100644
--- a/core/src/raw/oio/buf/chunked_bytes.rs
+++ b/core/src/raw/oio/buf/chunked_bytes.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use bytes::{Bytes, BytesMut};
+use std::cmp::min;
 use std::collections::VecDeque;
 use std::io::IoSlice;
 use std::task::{Context, Poll};
@@ -23,10 +24,16 @@ use std::task::{Context, Poll};
 use crate::raw::*;
 use crate::*;
 
+// TODO: 64KiB is picked based on experience; it should be configurable.
+const DEFAULT_CHUNK_SIZE: usize = 64 * 1024;
+
 /// ChunkedBytes represents non-contiguous bytes in memory.
 #[derive(Clone)]
 pub struct ChunkedBytes {
-    inner: VecDeque<Bytes>,
+    chunk_size: usize,
+
+    frozen: VecDeque<Bytes>,
+    active: BytesMut,
     size: usize,
 }
 
@@ -37,11 +44,41 @@ impl Default for ChunkedBytes {
     }
 }
 
 impl ChunkedBytes {
-    /// Create a new chunked cursor.
+    /// Create a new chunked bytes.
     pub fn new() -> Self {
         Self {
-            inner: VecDeque::new(),
+            frozen: VecDeque::new(),
+            active: BytesMut::new(),
+            size: 0,
+
+            chunk_size: DEFAULT_CHUNK_SIZE,
+        }
+    }
+
+    /// Create a new chunked bytes with the given chunk size.
+    pub fn with_chunk_size(chunk_size: usize) -> Self {
+        Self {
+            frozen: VecDeque::new(),
+            active: BytesMut::new(),
+            size: 0,
+
+            chunk_size,
+        }
+    }
+
+    /// Build a chunked bytes from a vector of bytes.
+    ///
+    /// This function is guaranteed to run in O(1) time and to not re-allocate the Vec’s buffer
+    /// or allocate any additional memory.
+    ///
+    /// Reference: <https://doc.rust-lang.org/stable/std/collections/struct.VecDeque.html#impl-From%3CVec%3CT,+A%3E%3E-for-VecDeque%3CT,+A%3E>
+    pub fn from_vec(bs: Vec<Bytes>) -> Self {
+        Self {
+            frozen: bs.into(),
+            active: BytesMut::new(),
             size: 0,
+
+            chunk_size: DEFAULT_CHUNK_SIZE,
         }
     }
 
@@ -58,13 +95,73 @@ impl ChunkedBytes {
     /// Clear the entire cursor.
     pub fn clear(&mut self) {
         self.size = 0;
-        self.inner.clear();
+        self.frozen.clear();
+        self.active.clear();
+    }
+
+    /// Push a new bytes into ChunkedBytes.
+    pub fn push(&mut self, mut bs: Bytes) {
+        self.size += bs.len();
+
+        // Try to fill bytes into active first.
+        let remaining = self.chunk_size.saturating_sub(self.active.len());
+        if remaining > 0 {
+            let len = min(remaining, bs.len());
+            self.active.extend_from_slice(&bs.split_to(len));
+        }
+
+        // If active is full, freeze it and push it into frozen.
+        if self.active.len() == self.chunk_size {
+            self.frozen.push_back(self.active.split().freeze());
+        }
+
+        // Split remaining bytes into chunks.
+        while bs.len() >= self.chunk_size {
+            self.frozen.push_back(bs.split_to(self.chunk_size));
+        }
+
+        // Append to active if there are remaining bytes.
+        if !bs.is_empty() {
+            self.active.extend_from_slice(&bs);
+        }
     }
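Not part of the patch, as a reading aid: with a small chunk size, `push`
splits incoming `Bytes` into `chunk_size`-sized frozen chunks and keeps
the tail in the mutable active buffer. A minimal sketch, assuming
`ChunkedBytes` and the `WriteBuf` trait stay reachable under
`opendal::raw::oio` as this diff suggests:

    use bytes::Bytes;
    use opendal::raw::oio::{self, WriteBuf};

    fn main() {
        // With chunk_size = 4, "hello world" becomes the frozen chunks
        // b"hell" and b"o wo", while the 3-byte tail b"rld" stays in
        // the active buffer.
        let mut cb = oio::ChunkedBytes::with_chunk_size(4);
        cb.push(Bytes::from("hello world"));

        assert_eq!(cb.len(), 11);
        // chunk() exposes the front frozen chunk first.
        assert_eq!(cb.chunk(), b"hell".as_slice());
    }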
 
-    /// Push a new bytes into vector cursor.
-    pub fn push(&mut self, bs: Bytes) {
+    /// Push a new &[u8] into ChunkedBytes.
+    pub fn extend_from_slice(&mut self, bs: &[u8]) {
         self.size += bs.len();
-        self.inner.push_back(bs);
+
+        let mut remaining = bs;
+
+        while !remaining.is_empty() {
+            let available = self.chunk_size.saturating_sub(self.active.len());
+
+            // available == 0 means self.active.len() >= self.chunk_size.
+            if available == 0 {
+                self.frozen.push_back(self.active.split().freeze());
+                self.active.reserve(self.chunk_size);
+                continue;
+            }
+
+            let size = min(remaining.len(), available);
+            self.active.extend_from_slice(&remaining[0..size]);
+
+            remaining = &remaining[size..];
+        }
+    }
+
+    /// Pull data from [`oio::WriteBuf`] into ChunkedBytes.
+    pub fn extend_from_write_buf(&mut self, size: usize, buf: &dyn oio::WriteBuf) -> usize {
+        let to_write = min(buf.chunk().len(), size);
+
+        if buf.is_bytes_optimized(to_write) && to_write > self.chunk_size {
+            // The buffer is bytes-optimized and larger than one chunk:
+            // push it directly to avoid a copy.
+            self.push(buf.bytes(to_write));
+        } else {
+            // Otherwise, copy it into the active buffer.
+            self.extend_from_slice(&buf.chunk()[..to_write]);
+        }
+
+        to_write
     }
 }
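Not part of the patch: the zero-copy branch above only fires when the
source is bytes-optimized *and* the chunk is larger than `chunk_size`;
smaller inputs go through the byte-copy fallback. A sketch under the
same path assumptions as the previous example:

    use bytes::Bytes;
    use opendal::raw::oio;

    fn main() {
        let mut cb = oio::ChunkedBytes::with_chunk_size(4);

        // `Bytes` is always bytes-optimized and 8 > chunk_size, so this
        // takes the `push` path rather than the byte-copy fallback.
        let big = Bytes::from("abcdefgh");
        assert_eq!(cb.extend_from_write_buf(8, &big), 8);

        // 2 bytes are optimized too, but not larger than chunk_size,
        // so they are copied into the active buffer instead.
        let small = Bytes::from("xy");
        assert_eq!(cb.extend_from_write_buf(2, &small), 2);

        assert_eq!(cb.len(), 10);
    }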
 
@@ -82,31 +179,39 @@ impl oio::WriteBuf for ChunkedBytes {
         );
         self.size -= cnt;
 
-        loop {
-            if cnt == 0 {
-                break;
-            }
-            let bs = self.inner.front_mut().unwrap();
-            if cnt >= bs.len() {
-                cnt -= bs.len();
-                self.inner.pop_front();
+        while cnt > 0 {
+            if let Some(front) = self.frozen.front_mut() {
+                if front.len() <= cnt {
+                    cnt -= front.len();
+                    self.frozen.pop_front(); // Remove the entire chunk.
+                } else {
+                    front.advance(cnt); // Split and keep the remaining part.
+                    break;
+                }
             } else {
-                bs.advance(cnt);
-                cnt = 0;
+                // Here, cnt must be <= self.active.len() due to the checks above.
+                self.active.advance(cnt); // Remove cnt bytes from the active buffer.
+                break;
             }
         }
     }
 
     fn chunk(&self) -> &[u8] {
-        match self.inner.front() {
+        match self.frozen.front() {
             Some(v) => v,
-            None => &[],
+            None => &self.active,
         }
     }
 
     fn vectored_chunk(&self) -> Vec<IoSlice> {
-        self.inner.iter().map(|v| IoSlice::new(v)).collect()
+        let it = self.frozen.iter().map(|v| IoSlice::new(v));
+
+        if !self.active.is_empty() {
+            it.chain([IoSlice::new(&self.active)]).collect()
+        } else {
+            it.collect()
+        }
     }
 
     fn bytes(&self, size: usize) -> Bytes {
@@ -121,29 +226,40 @@ impl oio::WriteBuf for ChunkedBytes {
             return Bytes::new();
         }
 
-        if let Some(bs) = self.inner.front() {
+        if let Some(bs) = self.frozen.front() {
             if size <= bs.len() {
                 return bs.slice(..size);
             }
         }
 
         let mut remaining = size;
-        let mut buf = BytesMut::with_capacity(size);
 
-        for bs in self.inner.iter() {
+        let mut result = BytesMut::with_capacity(size);
+
+        // First, go through the frozen buffer.
+        for chunk in &self.frozen {
+            let to_copy = min(remaining, chunk.len());
+            result.extend_from_slice(&chunk[0..to_copy]);
+            remaining -= to_copy;
+
             if remaining == 0 {
                 break;
             }
+        }
 
-            if remaining <= bs.len() {
-                buf.extend_from_slice(&bs[..remaining]);
-                break;
-            }
+        // Then, get from the active buffer if necessary.
+        if remaining > 0 {
+            result.extend_from_slice(&self.active[0..remaining]);
+        }
+
+        result.freeze()
+    }
 
-            buf.extend_from_slice(bs);
-            remaining -= bs.len();
+    fn is_bytes_optimized(&self, size: usize) -> bool {
+        if let Some(bs) = self.frozen.front() {
+            return size <= bs.len();
        }
 
-        buf.freeze()
+        false
     }
 
     fn vectored_bytes(&self, size: usize) -> Vec<Bytes> {
@@ -156,18 +272,24 @@ impl oio::WriteBuf for ChunkedBytes {
         let mut remaining = size;
         let mut buf = vec![];
 
-        for bs in self.inner.iter() {
+        for bs in self.frozen.iter() {
             if remaining == 0 {
                 break;
             }
 
-            if remaining <= bs.len() {
-                buf.push(bs.slice(..remaining));
-                break;
+            let to_take = min(remaining, bs.len());
+
+            if to_take == bs.len() {
+                buf.push(bs.clone()); // Clone is shallow; no data copy occurs.
+            } else {
+                buf.push(bs.slice(0..to_take));
             }
 
-            buf.push(bs.clone());
-            remaining -= bs.len();
+            remaining -= to_take;
+        }
+
+        if remaining > 0 {
+            buf.push(Bytes::copy_from_slice(&self.active[0..remaining]));
         }
 
         buf
@@ -176,11 +298,15 @@ impl oio::WriteBuf for ChunkedBytes {
 
 impl oio::Stream for ChunkedBytes {
     fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
-        match self.inner.pop_front() {
+        match self.frozen.pop_front() {
             Some(bs) => {
                 self.size -= bs.len();
                 Poll::Ready(Some(Ok(bs)))
             }
+            None if !self.active.is_empty() => {
+                self.size -= self.active.len();
+                Poll::Ready(Some(Ok(self.active.split().freeze())))
+            }
             None => Poll::Ready(None),
         }
     }
@@ -195,14 +321,17 @@ impl oio::Stream for ChunkedBytes {
 
 #[cfg(test)]
 mod tests {
+    use log::debug;
     use pretty_assertions::assert_eq;
+    use rand::{thread_rng, Rng, RngCore};
+    use sha2::{Digest, Sha256};
 
     use super::*;
-    use crate::raw::oio::{StreamExt, WriteBuf};
+    use crate::raw::oio::WriteBuf;
 
     #[test]
     fn test_chunked_bytes_write_buf() -> Result<()> {
-        let mut c = ChunkedBytes::new();
+        let mut c = ChunkedBytes::with_chunk_size(5);
 
         c.push(Bytes::from("hello"));
         assert_eq!(c.len(), 5);
@@ -273,31 +402,80 @@ mod tests {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn test_chunked_bytes_stream() -> Result<()> {
-        let mut c = ChunkedBytes::new();
-
-        c.push(Bytes::from("hello"));
-        assert_eq!(c.len(), 5);
-        assert!(!c.is_empty());
-
-        c.push(Bytes::from("world"));
-        assert_eq!(c.len(), 10);
-        assert!(!c.is_empty());
-
-        let bs = c.next().await.unwrap().unwrap();
-        assert_eq!(bs, Bytes::from("hello"));
-        assert_eq!(c.len(), 5);
-        assert!(!c.is_empty());
+    #[test]
+    fn test_fuzz_chunked_bytes_push() -> Result<()> {
+        let _ = tracing_subscriber::fmt()
+            .pretty()
+            .with_test_writer()
+            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
+            .try_init();
+
+        let mut rng = thread_rng();
+
+        let chunk_size = rng.gen_range(1..10);
+        let mut cb = ChunkedBytes::with_chunk_size(chunk_size);
+        debug!("test_fuzz_chunked_bytes_push: chunk size: {chunk_size}");
+
+        let mut expected = BytesMut::new();
+        for _ in 0..1000 {
+            let size = rng.gen_range(1..20);
+            debug!("test_fuzz_chunked_bytes_push: write size: {size}");
+
+            let mut content = vec![0; size];
+            rng.fill_bytes(&mut content);
+
+            expected.extend_from_slice(&content);
+            cb.push(Bytes::from(content.clone()));
+
+            let cnt = rng.gen_range(0..expected.len());
+            expected.advance(cnt);
+            cb.advance(cnt);
+
+            assert_eq!(expected.len(), cb.len());
+            assert_eq!(
+                format!("{:x}", Sha256::digest(&expected)),
+                format!("{:x}", Sha256::digest(&cb.bytes(cb.len())))
+            );
+        }
 
-        let bs = c.next().await.unwrap().unwrap();
-        assert_eq!(bs, Bytes::from("world"));
-        assert_eq!(c.len(), 0);
-        assert!(c.is_empty());
+        Ok(())
+    }
 
-        c.clear();
-        assert_eq!(c.len(), 0);
-        assert!(c.is_empty());
+    #[test]
+    fn test_fuzz_chunked_bytes_extend_from_slice() -> Result<()> {
+        let _ = tracing_subscriber::fmt()
+            .pretty()
+            .with_test_writer()
+            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
+            .try_init();
+
+        let mut rng = thread_rng();
+
+        let chunk_size = rng.gen_range(1..10);
+        let mut cb = ChunkedBytes::with_chunk_size(chunk_size);
+        debug!("test_fuzz_chunked_bytes_extend_from_slice: chunk size: {chunk_size}");
+
+        let mut expected = BytesMut::new();
+        for _ in 0..1000 {
+            let size = rng.gen_range(1..20);
+            debug!("test_fuzz_chunked_bytes_extend_from_slice: write size: {size}");
+
+            let mut content = vec![0; size];
+            rng.fill_bytes(&mut content);
+
+            expected.extend_from_slice(&content);
+            cb.extend_from_slice(&content);
+
+            let cnt = rng.gen_range(0..expected.len());
+            expected.advance(cnt);
+            cb.advance(cnt);
+
+            assert_eq!(expected.len(), cb.len());
+            assert_eq!(
+                format!("{:x}", Sha256::digest(&expected)),
+                format!("{:x}", Sha256::digest(&cb.bytes(cb.len())))
+            );
+        }
 
         Ok(())
     }
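Not part of the patch: consumers drive any `WriteBuf`, including the
`ChunkedBytes` above, through the `chunk()`/`advance()` pair that these
tests exercise. A hypothetical helper to illustrate that contract:

    use opendal::raw::oio::WriteBuf;

    /// Copy the remaining content of a WriteBuf into a Vec, consuming
    /// it chunk by chunk (hypothetical; for illustration only).
    fn drain_to_vec(buf: &mut dyn WriteBuf) -> Vec<u8> {
        let mut out = Vec::with_capacity(buf.remaining());
        while buf.remaining() > 0 {
            // For ChunkedBytes, this yields each frozen chunk in
            // order, then the active tail.
            let chunk = buf.chunk();
            out.extend_from_slice(chunk);
            let n = chunk.len();
            buf.advance(n);
        }
        out
    }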
diff --git a/core/src/raw/oio/buf/write_buf.rs b/core/src/raw/oio/buf/write_buf.rs
index 50f3c8911..d70ce640a 100644
--- a/core/src/raw/oio/buf/write_buf.rs
+++ b/core/src/raw/oio/buf/write_buf.rs
@@ -74,6 +74,22 @@ pub trait WriteBuf: Send + Sync {
     /// This function will panic if size > self.remaining().
     fn bytes(&self, size: usize) -> Bytes;
 
+    /// Returns true if the underlying buffer is optimized for `bytes` calls of the given size.
+    ///
+    /// # Notes
+    ///
+    /// This function is used to avoid copies when possible. Implementors should return `true`
+    /// if the given `self.bytes(size)` call can be served without cost, for example, when the
+    /// underlying buffer is `Bytes`.
+    ///
+    /// # Panics
+    ///
+    /// This function will panic if size > self.remaining().
+    fn is_bytes_optimized(&self, size: usize) -> bool {
+        let _ = size;
+        false
+    }
+
     /// Returns a vectored bytes of the underlying buffer at the current position and of
     /// length between 0 and Buf::remaining().
     ///
@@ -114,6 +130,10 @@ macro_rules! deref_forward_buf {
             (**self).bytes(size)
         }
 
+        fn is_bytes_optimized(&self, size: usize) -> bool {
+            (**self).is_bytes_optimized(size)
+        }
+
         fn vectored_bytes(&self, size: usize) -> Vec<Bytes> {
             (**self).vectored_bytes(size)
         }
@@ -231,6 +251,11 @@ impl WriteBuf for Bytes {
         self.slice(..size)
     }
 
+    #[inline]
+    fn is_bytes_optimized(&self, _: usize) -> bool {
+        true
+    }
+
     #[inline]
     fn vectored_bytes(&self, size: usize) -> Vec<Bytes> {
         vec![self.slice(..size)]
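Not part of the patch: what the new default method buys in practice.
For `Bytes`, `bytes(size)` is `slice(..size)`, a reference-count bump
rather than a copy, which is why its impl above reports `true`
unconditionally:

    use bytes::Bytes;
    use opendal::raw::oio::WriteBuf;

    fn main() {
        let bs = Bytes::from("hello world");

        assert!(bs.is_bytes_optimized(5));
        // A zero-copy view of the first 5 bytes.
        assert_eq!(bs.bytes(5), Bytes::from("hello"));
    }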
diff --git a/core/src/raw/oio/write/exact_buf_write.rs b/core/src/raw/oio/write/exact_buf_write.rs
index 144d50ff0..02ab9d926 100644
--- a/core/src/raw/oio/write/exact_buf_write.rs
+++ b/core/src/raw/oio/write/exact_buf_write.rs
@@ -15,14 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::cmp::min;
 use std::task::ready;
 use std::task::Context;
 use std::task::Poll;
 
 use async_trait::async_trait;
-use bytes::Bytes;
-use bytes::BytesMut;
 
 use crate::raw::oio::WriteBuf;
 use crate::raw::*;
@@ -44,7 +41,7 @@ pub struct ExactBufWriter<W: oio::Write> {
     /// The buffer size; we flush to the underlying storage once the buffer reaches this size.
     buffer_size: usize,
 
-    buffer: Buffer,
+    buffer: oio::ChunkedBytes,
 }
 
 impl<W: oio::Write> ExactBufWriter<W> {
@@ -53,83 +50,33 @@ impl<W: oio::Write> ExactBufWriter<W> {
         Self {
             inner,
             buffer_size,
-            buffer: Buffer::Empty,
+            buffer: oio::ChunkedBytes::default(),
         }
     }
 }
 
-enum Buffer {
-    Empty,
-    Filling(BytesMut),
-    Consuming(Bytes),
-}
-
 #[async_trait]
 impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
     fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn WriteBuf) -> Poll<Result<usize>> {
-        loop {
-            match &mut self.buffer {
-                Buffer::Empty => {
-                    if bs.remaining() >= self.buffer_size {
-                        self.buffer = Buffer::Consuming(bs.bytes(self.buffer_size));
-                        return Poll::Ready(Ok(self.buffer_size));
-                    }
-
-                    let chunk = bs.chunk();
-                    let mut fill = BytesMut::with_capacity(chunk.len());
-                    fill.extend_from_slice(chunk);
-                    self.buffer = Buffer::Filling(fill);
-                    return Poll::Ready(Ok(chunk.len()));
-                }
-                Buffer::Filling(fill) => {
-                    if fill.len() >= self.buffer_size {
-                        self.buffer = Buffer::Consuming(fill.split().freeze());
-                        continue;
-                    }
-
-                    let size = min(self.buffer_size - fill.len(), bs.chunk().len());
-                    fill.extend_from_slice(&bs.chunk()[..size]);
-                    return Poll::Ready(Ok(size));
-                }
-                Buffer::Consuming(consume) => {
-                    // Make sure filled buffer has been flushed.
-                    //
-                    // TODO: maybe we can re-fill it after a successful write.
-                    while !consume.is_empty() {
-                        let n = ready!(self.inner.poll_write(cx, consume)?);
-                        consume.advance(n);
-                    }
-                    self.buffer = Buffer::Empty;
-                }
-            }
+        if self.buffer.len() >= self.buffer_size {
+            let written = ready!(self.inner.poll_write(cx, &self.buffer)?);
+            self.buffer.advance(written);
         }
+
+        let remaining = self.buffer_size - self.buffer.len();
+        let written = self.buffer.extend_from_write_buf(remaining, bs);
+        Poll::Ready(Ok(written))
     }
 
     fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        self.buffer = Buffer::Empty;
+        self.buffer.clear();
         self.inner.poll_abort(cx)
     }
 
     fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        loop {
-            match &mut self.buffer {
-                Buffer::Empty => break,
-                Buffer::Filling(fill) => {
-                    self.buffer = Buffer::Consuming(fill.split().freeze());
-                    continue;
-                }
-                Buffer::Consuming(consume) => {
-                    // Make sure filled buffer has been flushed.
-                    //
-                    // TODO: maybe we can re-fill it after a successful write.
-                    while !consume.is_empty() {
-                        let n = ready!(self.inner.poll_write(cx, &consume))?;
-                        consume.advance(n);
-                    }
-                    self.buffer = Buffer::Empty;
-                    break;
-                }
-            }
+        while !self.buffer.is_empty() {
+            let n = ready!(self.inner.poll_write(cx, &self.buffer))?;
+            self.buffer.advance(n);
         }
 
         self.inner.poll_close(cx)
@@ -138,6 +85,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
 
 #[cfg(test)]
 mod tests {
+    use bytes::Bytes;
     use log::debug;
     use pretty_assertions::assert_eq;
     use rand::thread_rng;
@@ -231,7 +179,7 @@ mod tests {
             let mut bs = Bytes::from(content.clone());
             while !bs.is_empty() {
                 let n = writer.write(&bs).await?;
-                bs.advance(n as usize);
+                bs.advance(n);
             }
         }
         writer.close().await?;
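Not part of the patch: an end-to-end sketch of the rewritten
`ExactBufWriter`. The `ShortWriter` sink is hypothetical (modeled on
the `BlackHoleWriter` bench above), and the `write`/`close` helpers are
the async convenience methods the tests use; exact import paths may
differ.

    use std::task::{Context, Poll};

    use async_trait::async_trait;
    use bytes::{Buf, Bytes};
    use opendal::raw::oio::{self, WriteExt};
    use opendal::Result;

    /// A sink that accepts at most 3 bytes per call, to exercise the
    /// short-write loop in poll_close.
    struct ShortWriter;

    #[async_trait]
    impl oio::Write for ShortWriter {
        fn poll_write(
            &mut self,
            _: &mut Context<'_>,
            bs: &dyn oio::WriteBuf,
        ) -> Poll<Result<usize>> {
            Poll::Ready(Ok(bs.remaining().min(3)))
        }

        fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
            Poll::Ready(Ok(()))
        }

        fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
            Poll::Ready(Ok(()))
        }
    }

    async fn demo() -> Result<()> {
        // Buffer up to 5 bytes before flushing to the inner writer.
        let mut writer = oio::ExactBufWriter::new(ShortWriter, 5);

        let mut bs = Bytes::from("hello world");
        while !bs.is_empty() {
            // Each call accepts at most `buffer_size` bytes and reports
            // how many were buffered; honor short writes by advancing.
            let n = writer.write(&bs).await?;
            bs.advance(n);
        }

        // close() drains whatever is still buffered into ShortWriter.
        writer.close().await
    }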
