This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 27c6a77665 feat(core): sets default chunk_size and sends buffer >
chunk_size directly (#4710)
27c6a77665 is described below
commit 27c6a776653b7671da8171dea8c36283713eeb45
Author: Yingwen <[email protected]>
AuthorDate: Thu Jun 13 00:56:56 2024 +0800
feat(core): sets default chunk_size and sends buffer > chunk_size directly
(#4710)
---
core/benches/oio/write.rs | 2 +-
core/src/layers/complete.rs | 37 +++--
core/src/raw/oio/write/chunked_write.rs | 260 ++++++++++++++++++++++++++++++--
core/src/types/operator/operator.rs | 24 +--
4 files changed, 287 insertions(+), 36 deletions(-)
diff --git a/core/benches/oio/write.rs b/core/benches/oio/write.rs
index 13f31560d7..d9d8dd8b93 100644
--- a/core/benches/oio/write.rs
+++ b/core/benches/oio/write.rs
@@ -44,7 +44,7 @@ pub fn bench_exact_buf_write(c: &mut Criterion) {
group.throughput(criterion::Throughput::Bytes(size.bytes() as u64));
group.bench_with_input(size.to_string(), &content, |b, content| {
b.to_async(&*TOKIO).iter(|| async {
- let mut w = ChunkedWriter::new(BlackHoleWriter, 256 * 1024);
+ let mut w = ChunkedWriter::new(BlackHoleWriter, 256 * 1024,
true);
let mut bs = content.clone();
while !bs.is_empty() {
diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index 97483f10b1..1d38fbda6b 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -423,28 +423,35 @@ impl<A: Access> LayeredAccess for CompleteAccessor<A> {
}
// Calculate buffer size.
- let chunk_size = args.chunk().map(|mut size| {
- if let Some(v) = capability.write_multi_max_size {
- size = size.min(v);
- }
- if let Some(v) = capability.write_multi_min_size {
- size = size.max(v);
- }
- if let Some(v) = capability.write_multi_align_size {
- // Make sure size >= size first.
- size = size.max(v);
- size -= size % v;
- }
+ // If `chunk` is not set, we use `write_multi_min_size` (or, if that is unset,
+ // `write_multi_align_size`) as the default size.
+ let chunk_size = args
+ .chunk()
+ .or(capability.write_multi_min_size)
+ .or(capability.write_multi_align_size)
+ .map(|mut size| {
+ if let Some(v) = capability.write_multi_max_size {
+ size = size.min(v);
+ }
+ if let Some(v) = capability.write_multi_min_size {
+ size = size.max(v);
+ }
+ if let Some(v) = capability.write_multi_align_size {
+ // Make sure size >= v first, so aligning down below never yields 0.
+ size = size.max(v);
+ size -= size % v;
+ }
- size
- });
+ size
+ });
+ let exact = args.chunk().is_some() ||
capability.write_multi_align_size.is_some();
let (rp, w) = self.inner.write(path, args.clone()).await?;
let w = CompleteWriter::new(w);
let w = match chunk_size {
None => TwoWays::One(w),
- Some(size) => TwoWays::Two(oio::ChunkedWriter::new(w, size)),
+ Some(size) => TwoWays::Two(oio::ChunkedWriter::new(w, size,
exact)),
};
Ok((rp, w))
diff --git a/core/src/raw/oio/write/chunked_write.rs
b/core/src/raw/oio/write/chunked_write.rs
index 8eadaa3dd2..c98ad41c97 100644
--- a/core/src/raw/oio/write/chunked_write.rs
+++ b/core/src/raw/oio/write/chunked_write.rs
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+use bytes::Buf;
+
use crate::raw::*;
use crate::*;
@@ -22,21 +24,27 @@ use crate::*;
/// flush the underlying storage at the `chunk`` size.
///
/// ChunkedWriter makes sure that the size of the data written to the
-/// underlying storage is exactly `chunk` bytes.
+/// underlying storage is
+/// - exactly `chunk` bytes if `exact` is true
+/// - at least `chunk` bytes if `exact` is false
pub struct ChunkedWriter<W: oio::Write> {
inner: W,
/// The size for buffer, we will flush the underlying storage at the size
of this buffer.
chunk_size: usize,
+ /// If `exact` is true, the size of the data written to the underlying storage is
+ /// exactly `chunk_size` bytes; otherwise it is at least `chunk_size` bytes.
+ exact: bool,
buffer: oio::QueueBuf,
}
impl<W: oio::Write> ChunkedWriter<W> {
/// Create a new exact buf writer.
- pub fn new(inner: W, chunk_size: usize) -> Self {
+ pub fn new(inner: W, chunk_size: usize, exact: bool) -> Self {
Self {
inner,
chunk_size,
+ exact,
buffer: oio::QueueBuf::new(),
}
}
@@ -44,14 +52,48 @@ impl<W: oio::Write> ChunkedWriter<W> {
impl<W: oio::Write> oio::Write for ChunkedWriter<W> {
async fn write(&mut self, mut bs: Buffer) -> Result<usize> {
- if self.buffer.len() >= self.chunk_size {
- let written =
self.inner.write(self.buffer.clone().collect()).await?;
- self.buffer.advance(written);
+ if self.exact {
+ if self.buffer.len() >= self.chunk_size {
+ let written =
self.inner.write(self.buffer.clone().collect()).await?;
+ self.buffer.advance(written);
+ }
+
+ let remaining = self.chunk_size - self.buffer.len();
+ bs.truncate(remaining);
+ let n = bs.len();
+ self.buffer.push(bs);
+ return Ok(n);
+ }
+ // We are in inexact mode.
+
+ if self.buffer.len() + bs.len() < self.chunk_size {
+ // We haven't buffered enough data.
+ let n = bs.len();
+ self.buffer.push(bs);
+ return Ok(n);
+ }
+ // We have enough data to send.
+
+ if self.buffer.is_empty() {
+ // Fast path: send `bs` directly if the buffer queue is empty.
+ return self.inner.write(bs).await;
}
- let remaining = self.chunk_size - self.buffer.len();
- bs.truncate(remaining);
- let n = bs.len();
+ // If we always pushed `bs` into the buffer queue, the queue could grow without bound when
+ // `inner` doesn't fully consume it. So we clone the queue and send it together with `bs` first.
+ let mut buffer = self.buffer.clone();
+ buffer.push(bs.clone());
+ let written = self.inner.write(buffer.collect()).await?;
+ // The number of bytes from `self.buffer` that were already written.
+ let queue_written = written.min(self.buffer.len());
+ self.buffer.advance(queue_written);
+ // The number of bytes in `bs` that already written.
+ let bs_written = written - queue_written;
+ // Skip the bytes of `bs` that were already written.
+ bs.advance(bs_written);
+ // We already sent `written` bytes, so we queue at most `written` more bytes of `bs`.
+ bs.truncate(written);
+ let n = bs_written + bs.len();
self.buffer.push(bs);
Ok(n)
}
@@ -124,7 +166,7 @@ mod tests {
let mut expected = vec![0; 5];
rng.fill_bytes(&mut expected);
- let mut w = ChunkedWriter::new(MockWriter { buf: vec![] }, 10);
+ let mut w = ChunkedWriter::new(MockWriter { buf: vec![] }, 10, true);
let mut bs = Bytes::from(expected.clone());
while !bs.is_empty() {
@@ -142,6 +184,204 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn test_inexact_buf_writer_large_write() -> Result<()> {
+ let _ = tracing_subscriber::fmt()
+ .pretty()
+ .with_test_writer()
+ .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
+ .try_init();
+
+ let mut w = ChunkedWriter::new(MockWriter { buf: vec![] }, 10, false);
+
+ let mut rng = thread_rng();
+ let mut expected = vec![0; 15];
+ rng.fill_bytes(&mut expected);
+
+ let bs = Bytes::from(expected.clone());
+ // The MockWriter always returns the first chunk size.
+ let n = w.write(bs.into()).await?;
+ assert_eq!(expected.len(), n);
+
+ w.close().await?;
+
+ assert_eq!(w.inner.buf.len(), expected.len());
+ assert_eq!(
+ format!("{:x}", Sha256::digest(&w.inner.buf)),
+ format!("{:x}", Sha256::digest(&expected))
+ );
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_inexact_buf_writer_combine_small() -> Result<()> {
+ let _ = tracing_subscriber::fmt()
+ .pretty()
+ .with_test_writer()
+ .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
+ .try_init();
+
+ let mut w = ChunkedWriter::new(MockWriter { buf: vec![] }, 10, false);
+
+ let mut rng = thread_rng();
+ let mut expected = vec![];
+
+ let mut new_content = |size| {
+ let mut content = vec![0; size];
+ rng.fill_bytes(&mut content);
+ expected.extend_from_slice(&content);
+ Bytes::from(content)
+ };
+
+ // content > chunk size.
+ let content = new_content(15);
+ assert_eq!(15, w.write(content.into()).await?);
+ // content < chunk size.
+ let content = new_content(5);
+ assert_eq!(5, w.write(content.into()).await?);
+ // content > chunk size, but 5 bytes in queue.
+ let mut content = new_content(15);
+ // The MockWriter can only send 5 bytes each time, so we can only advance 5 bytes.
+ assert_eq!(5, w.write(content.clone().into()).await?);
+ content.advance(5);
+ assert_eq!(5, w.write(content.clone().into()).await?);
+ content.advance(5);
+ assert_eq!(5, w.write(content.clone().into()).await?);
+
+ w.close().await?;
+
+ assert_eq!(w.inner.buf.len(), expected.len());
+ assert_eq!(
+ format!("{:x}", Sha256::digest(&w.inner.buf)),
+ format!("{:x}", Sha256::digest(&expected))
+ );
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_inexact_buf_writer_queue_remaining() -> Result<()> {
+ let _ = tracing_subscriber::fmt()
+ .pretty()
+ .with_test_writer()
+ .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
+ .try_init();
+
+ let mut w = ChunkedWriter::new(MockWriter { buf: vec![] }, 10, false);
+
+ let mut rng = thread_rng();
+ let mut expected = vec![];
+
+ let mut new_content = |size| {
+ let mut content = vec![0; size];
+ rng.fill_bytes(&mut content);
+ expected.extend_from_slice(&content);
+ Bytes::from(content)
+ };
+
+ // content > chunk size.
+ let content = new_content(15);
+ assert_eq!(15, w.write(content.into()).await?);
+ // content < chunk size.
+ let content = new_content(5);
+ assert_eq!(5, w.write(content.into()).await?);
+ // content < chunk size.
+ let content = new_content(3);
+ assert_eq!(3, w.write(content.into()).await?);
+ // content > chunk size, but only sends the first chunk in the queue.
+ let mut content = new_content(15);
+ assert_eq!(5, w.write(content.clone().into()).await?);
+ // queue: 3, 5, bs: 10
+ content.advance(5);
+ assert_eq!(3, w.write(content.clone().into()).await?);
+ // queue: 5, 3, bs: 7
+ content.advance(3);
+ assert_eq!(5, w.write(content.clone().into()).await?);
+ // queue: 3, 5, bs: 2
+ content.advance(5);
+ assert_eq!(2, w.write(content.clone().into()).await?);
+ // queue: 5, 2, bs: empty.
+ content.advance(2);
+ assert!(content.is_empty());
+
+ w.close().await?;
+
+ assert_eq!(w.inner.buf.len(), expected.len());
+ assert_eq!(
+ format!("{:x}", Sha256::digest(&w.inner.buf)),
+ format!("{:x}", Sha256::digest(&expected))
+ );
+ Ok(())
+ }
+
+ struct PartialWriter {
+ buf: Vec<u8>,
+ }
+
+ impl Write for PartialWriter {
+ async fn write(&mut self, mut bs: Buffer) -> Result<usize> {
+ if Buffer::count(&bs) > 1 {
+ // Always leaves last buffer for non-contiguous buffer.
+ let mut written = 0;
+ while Buffer::count(&bs) > 1 {
+ let chunk = bs.chunk();
+ self.buf.extend_from_slice(chunk);
+ written += chunk.len();
+ bs.advance(chunk.len());
+ }
+ Ok(written)
+ } else {
+ let chunk = bs.chunk();
+ self.buf.extend_from_slice(chunk);
+ Ok(chunk.len())
+ }
+ }
+
+ async fn close(&mut self) -> Result<()> {
+ Ok(())
+ }
+
+ async fn abort(&mut self) -> Result<()> {
+ Ok(())
+ }
+ }
+
+ #[tokio::test]
+ async fn test_inexact_buf_writer_partial_send() -> Result<()> {
+ let _ = tracing_subscriber::fmt()
+ .pretty()
+ .with_test_writer()
+ .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
+ .try_init();
+
+ let mut w = ChunkedWriter::new(PartialWriter { buf: vec![] }, 10,
false);
+
+ let mut rng = thread_rng();
+ let mut expected = vec![];
+
+ let mut new_content = |size| {
+ let mut content = vec![0; size];
+ rng.fill_bytes(&mut content);
+ expected.extend_from_slice(&content);
+ Bytes::from(content)
+ };
+
+ // content < chunk size.
+ let content = new_content(5);
+ assert_eq!(5, w.write(content.into()).await?);
+ // Non-contiguous buffer.
+ let content = Buffer::from(vec![new_content(3), new_content(2)]);
+ assert_eq!(5, w.write(content).await?);
+
+ w.close().await?;
+
+ assert_eq!(w.inner.buf.len(), expected.len());
+ assert_eq!(
+ format!("{:x}", Sha256::digest(&w.inner.buf)),
+ format!("{:x}", Sha256::digest(&expected))
+ );
+ Ok(())
+ }
+
#[tokio::test]
async fn test_fuzz_exact_buf_writer() -> Result<()> {
let _ = tracing_subscriber::fmt()
@@ -154,7 +394,7 @@ mod tests {
let mut expected = vec![];
let buffer_size = rng.gen_range(1..10);
- let mut writer = ChunkedWriter::new(MockWriter { buf: vec![] },
buffer_size);
+ let mut writer = ChunkedWriter::new(MockWriter { buf: vec![] },
buffer_size, true);
debug!("test_fuzz_exact_buf_writer: buffer size: {buffer_size}");
for _ in 0..1000 {
diff --git a/core/src/types/operator/operator.rs
b/core/src/types/operator/operator.rs
index fd42fe49ff..b06687292c 100644
--- a/core/src/types/operator/operator.rs
+++ b/core/src/types/operator/operator.rs
@@ -858,12 +858,14 @@ impl Operator {
///
/// ## Chunk
///
- /// OpenDAL is designed to write files directly without chunking by
default, giving users
- /// control over the exact size of their writes and helping avoid
unnecessary costs.
+ /// Some storage services have a minimum chunk size requirement. For example, `s3` could return
+ /// hard errors like `EntityTooSmall` if the chunk size is too small. Some services like `gcs`
+ /// also return errors if the chunk size is not aligned. Besides, writing data in small chunks
+ /// costs more money on cloud storage services.
///
- /// This is not efficient for cases when users write small chunks of data.
Some storage services
- /// like `s3` could even return hard errors like `EntityTooSmall`.
Besides, cloud storage services
- /// will cost more money if we write data in small chunks.
+ /// OpenDAL sets the chunk size automatically based on the [Capability](crate::types::Capability)
+ /// of the service if users don't set it. Users can set `chunk` to control the exact size to send
+ /// to the storage service.
///
/// Users can use [`Operator::writer_with`] to set a good chunk size might
improve the performance,
///
@@ -919,12 +921,14 @@ impl Operator {
///
/// Set `chunk` for the writer.
///
- /// OpenDAL is designed to write files directly without chunking by
default, giving users
- /// control over the exact size of their writes and helping avoid
unnecessary costs.
+ /// Some storage services have a minimum chunk size requirement. For example, `s3` could return
+ /// hard errors like `EntityTooSmall` if the chunk size is too small. Some services like `gcs`
+ /// also return errors if the chunk size is not aligned. Besides, writing data in small chunks
+ /// costs more money on cloud storage services.
///
- /// This is not efficient for cases when users write small chunks of data.
Some storage services
- /// like `s3` could even return hard errors like `EntityTooSmall`.
Besides, cloud storage services
- /// will cost more money if we write data in small chunks.
+ /// OpenDAL sets the chunk size automatically based on the [Capability](crate::types::Capability)
+ /// of the service if users don't set it. Users can set `chunk` to control the exact size to send
+ /// to the storage service.
///
/// Set a good chunk size might improve the performance, reduce the API
calls and save money.
///