This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch cleanup-sink in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 192d6b548ef7414ef2a722927c9d352cca47edb9 Author: Xuanwo <[email protected]> AuthorDate: Thu Aug 31 17:11:23 2023 +0800 refactor: Apply buffer for all services Signed-off-by: Xuanwo <[email protected]> --- core/benches/oio/main.rs | 6 +- core/benches/oio/write.rs | 33 +--- core/src/layers/complete.rs | 25 ++- core/src/raw/oio/cursor.rs | 9 + core/src/raw/oio/write/api.rs | 7 + core/src/raw/oio/write/at_least_buf_write.rs | 131 ------------ core/src/raw/oio/write/bounded_buf_write.rs | 254 ++++++++++++++++++++++++ core/src/raw/oio/write/exact_buf_write.rs | 285 --------------------------- core/src/raw/oio/write/mod.rs | 7 +- core/src/services/cos/backend.rs | 17 +- core/src/services/obs/backend.rs | 17 +- core/src/services/oss/backend.rs | 17 +- core/src/services/s3/backend.rs | 25 +-- 13 files changed, 312 insertions(+), 521 deletions(-) diff --git a/core/benches/oio/main.rs b/core/benches/oio/main.rs index 982d29dfb..ab158ebdf 100644 --- a/core/benches/oio/main.rs +++ b/core/benches/oio/main.rs @@ -21,9 +21,5 @@ mod write; use criterion::criterion_group; use criterion::criterion_main; -criterion_group!( - benches, - write::bench_at_least_buf_write, - write::bench_exact_buf_write, -); +criterion_group!(benches, write::bench_bounded_buf_write); criterion_main!(benches); diff --git a/core/benches/oio/write.rs b/core/benches/oio/write.rs index 6e26ce7e0..500876c42 100644 --- a/core/benches/oio/write.rs +++ b/core/benches/oio/write.rs @@ -17,8 +17,7 @@ use criterion::Criterion; use once_cell::sync::Lazy; -use opendal::raw::oio::AtLeastBufWriter; -use opendal::raw::oio::ExactBufWriter; +use opendal::raw::oio::BoundedBufWriter; use opendal::raw::oio::Write; use rand::thread_rng; use size::Size; @@ -28,33 +27,7 @@ use super::utils::*; pub static TOKIO: Lazy<tokio::runtime::Runtime> = Lazy::new(|| tokio::runtime::Runtime::new().expect("build tokio runtime")); -pub fn bench_at_least_buf_write(c: &mut Criterion) { - let mut group = 
c.benchmark_group("at_least_buf_write"); - - let mut rng = thread_rng(); - - for size in [ - Size::from_kibibytes(4), - Size::from_kibibytes(256), - Size::from_mebibytes(4), - Size::from_mebibytes(16), - ] { - let content = gen_bytes(&mut rng, size.bytes() as usize); - - group.throughput(criterion::Throughput::Bytes(size.bytes() as u64)); - group.bench_with_input(size.to_string(), &content, |b, content| { - b.to_async(&*TOKIO).iter(|| async { - let mut w = AtLeastBufWriter::new(BlackHoleWriter, 256 * 1024); - w.write(content.clone()).await.unwrap(); - w.close().await.unwrap(); - }) - }); - } - - group.finish() -} - -pub fn bench_exact_buf_write(c: &mut Criterion) { +pub fn bench_bounded_buf_write(c: &mut Criterion) { let mut group = c.benchmark_group("exact_buf_write"); let mut rng = thread_rng(); @@ -70,7 +43,7 @@ pub fn bench_exact_buf_write(c: &mut Criterion) { group.throughput(criterion::Throughput::Bytes(size.bytes() as u64)); group.bench_with_input(size.to_string(), &content, |b, content| { b.to_async(&*TOKIO).iter(|| async { - let mut w = ExactBufWriter::new(BlackHoleWriter, 256 * 1024); + let mut w = BoundedBufWriter::new(BlackHoleWriter, 256 * 1024); w.write(content.clone()).await.unwrap(); w.close().await.unwrap(); }) diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index 4c58b6e30..d8065954a 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -365,7 +365,10 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> { type Inner = A; type Reader = CompleteReader<A, A::Reader>; type BlockingReader = CompleteReader<A, A::BlockingReader>; - type Writer = CompleteWriter<A::Writer>; + type Writer = oio::TwoWaysWriter< + CompleteWriter<A::Writer>, + oio::BoundedBufWriter<CompleteWriter<A::Writer>>, + >; type BlockingWriter = CompleteWriter<A::BlockingWriter>; type Pager = CompletePager<A, A::Pager>; type BlockingPager = CompletePager<A, A::BlockingPager>; @@ -427,10 +430,22 @@ impl<A: Accessor> 
LayeredAccessor for CompleteReaderAccessor<A> { } let size = args.content_length(); - self.inner - .write(path, args) - .await - .map(|(rp, w)| (rp, CompleteWriter::new(w, size))) + let buffer_size = args.buffer_size(); + + let (rp, w) = self.inner.write(path, args).await?; + let w = CompleteWriter::new(w, size); + + // FIXME + // + // we force an exact buffer size here (min_buffer == max_buffer); we should check the capability in the + // future. + let w = if let Some(buffer) = buffer_size { + oio::TwoWaysWriter::Two(oio::BoundedBufWriter::new(w, buffer).with_max_buffer(buffer)) + } else { + oio::TwoWaysWriter::One(w) + }; + + Ok((rp, w)) } fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs index c9b670ead..0d574f9fa 100644 --- a/core/src/raw/oio/cursor.rs +++ b/core/src/raw/oio/cursor.rs @@ -217,6 +217,15 @@ impl ChunkedCursor { self.inner.iter().skip(self.idx).map(|v| v.len()).sum() } + /// Return `Some(Bytes)` if this cursor holds exactly one `Bytes` chunk, otherwise return `None`. + pub fn try_single(&self) -> Option<Bytes> { + if self.inner.len() == 1 { + Some(self.inner[0].clone()) + } else { + None + } + } + /// Clear the entire cursor. pub fn clear(&mut self) { self.idx = 0; diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs index f2bb025af..b2442febc 100644 --- a/core/src/raw/oio/write/api.rs +++ b/core/src/raw/oio/write/api.rs @@ -98,6 +98,13 @@ pub trait Write: Unpin + Send + Sync { async fn write(&mut self, bs: Bytes) -> Result<()>; /// Sink given stream into writer. + /// + /// # Limitations + /// + /// - Sink can't be used with `write` at the same time. Users should always + /// stick to the same write method. + /// - Sink can't be buffered. Underlying storage will always consume the + /// given stream as a whole. async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()>; /// Abort the pending writer.
diff --git a/core/src/raw/oio/write/at_least_buf_write.rs b/core/src/raw/oio/write/at_least_buf_write.rs deleted file mode 100644 index 91adddd30..000000000 --- a/core/src/raw/oio/write/at_least_buf_write.rs +++ /dev/null @@ -1,131 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use async_trait::async_trait; -use bytes::Bytes; - -use crate::raw::oio::StreamExt; -use crate::raw::oio::Streamer; -use crate::raw::*; -use crate::*; - -/// AtLeastBufWriter is used to implement [`oio::Write`] based on at least buffer strategy: flush -/// the underlying storage when the buffered size is larger. -/// -/// AtLeastBufWriter makes sure that the size of the data written to the underlying storage is at -/// least `buffer_size` bytes. It's useful when the underlying storage has a minimum size limit. -/// -/// For example, S3 requires at least 5MiB for multipart uploads. -pub struct AtLeastBufWriter<W: oio::Write> { - inner: W, - - /// The total size of the data. - /// - /// If the total size is known, we will write to underlying storage directly without buffer it - /// when possible. - total_size: Option<u64>, - - /// The size for buffer, we will flush the underlying storage if the buffer is full. 
- buffer_size: usize, - buffer: oio::ChunkedCursor, -} - -impl<W: oio::Write> AtLeastBufWriter<W> { - /// Create a new at least buf writer. - pub fn new(inner: W, buffer_size: usize) -> Self { - Self { - inner, - total_size: None, - buffer_size, - buffer: oio::ChunkedCursor::new(), - } - } - - /// Configure the total size for writer. - pub fn with_total_size(mut self, total_size: Option<u64>) -> Self { - self.total_size = total_size; - self - } -} - -#[async_trait] -impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> { - async fn write(&mut self, bs: Bytes) -> Result<()> { - // If total size is known and equals to given bytes, we can write it directly. - if let Some(total_size) = self.total_size { - if total_size == bs.len() as u64 { - return self.inner.write(bs).await; - } - } - - // Push the bytes into the buffer if the buffer is not full. - if self.buffer.len() + bs.len() < self.buffer_size { - self.buffer.push(bs); - return Ok(()); - } - - let mut buf = self.buffer.clone(); - buf.push(bs); - - self.inner - .sink(buf.len() as u64, Box::new(buf)) - .await - // Clear buffer if the write is successful. - .map(|_| self.buffer.clear()) - } - - async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> { - // If total size is known and equals to given stream, we can write it directly. - if let Some(total_size) = self.total_size { - if total_size == size { - return self.inner.sink(size, s).await; - } - } - - // Push the bytes into the buffer if the buffer is not full. - if self.buffer.len() as u64 + size < self.buffer_size as u64 { - self.buffer.push(s.collect().await?); - return Ok(()); - } - - let buf = self.buffer.clone(); - let buffer_size = buf.len() as u64; - let stream = buf.chain(s); - - self.inner - .sink(buffer_size + size, Box::new(stream)) - .await - // Clear buffer if the write is successful. 
- .map(|_| self.buffer.clear()) - } - - async fn abort(&mut self) -> Result<()> { - self.buffer.clear(); - self.inner.abort().await - } - - async fn close(&mut self) -> Result<()> { - if !self.buffer.is_empty() { - self.inner - .sink(self.buffer.len() as u64, Box::new(self.buffer.clone())) - .await?; - self.buffer.clear(); - } - - self.inner.close().await - } -} diff --git a/core/src/raw/oio/write/bounded_buf_write.rs b/core/src/raw/oio/write/bounded_buf_write.rs new file mode 100644 index 000000000..951f50e88 --- /dev/null +++ b/core/src/raw/oio/write/bounded_buf_write.rs @@ -0,0 +1,254 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::cmp::min; + +use async_trait::async_trait; +use bytes::Bytes; + +use crate::raw::*; +use crate::*; + +/// BoundedBufWriter is used to implement [`oio::Write`] based on a bounded buffer strategy: flush the +/// underlying storage when the buffered size is within the inclusive range `min_buffer..=max_buffer`. +pub struct BoundedBufWriter<W: oio::Write> { + inner: W, + + min_buffer: usize, + max_buffer: usize, + buffer: oio::ChunkedCursor, +} + +impl<W: oio::Write> BoundedBufWriter<W> { + /// Create a new bounded buf writer.
+ pub fn new(inner: W, min_buffer: usize) -> Self { + Self { + inner, + min_buffer, + max_buffer: usize::MAX, + buffer: oio::ChunkedCursor::new(), + } + } + + /// Configure the max buffer size for writer. + /// + /// # Panics + /// + /// Panic if max_buffer is smaller than min_buffer. + pub fn with_max_buffer(mut self, max_buffer: usize) -> Self { + assert!( + max_buffer >= self.min_buffer, + "input max buffer is smaller than min buffer" + ); + + self.max_buffer = max_buffer; + self + } +} + +#[async_trait] +impl<W: oio::Write> oio::Write for BoundedBufWriter<W> { + /// # TODO + /// + /// - Use copy_from_slice if given bytes is smaller than 4KiB. + async fn write(&mut self, bs: Bytes) -> Result<()> { + // Make sure the existing buffer has been flushed. + while self.buffer.len() >= self.max_buffer { + let mut buf = self.buffer.clone(); + let to_write = buf.split_to(self.max_buffer); + + if let Some(bs) = to_write.try_single() { + self.inner.write(bs).await?; + } else { + self.inner + .sink(to_write.len() as u64, Box::new(to_write)) + .await?; + } + // input bytes is not handled yet, go on. + } + + let current_size = self.buffer.len() + bs.len(); + + if current_size >= self.max_buffer { + let mut buf = self.buffer.clone(); + buf.push(bs); + + let to_write = buf.split_to(self.max_buffer); + + if let Some(bs) = to_write.try_single() { + self.inner.write(bs).await?; + } else { + self.inner + .sink(to_write.len() as u64, Box::new(to_write)) + .await?; + } + // Replace buffer since there are bytes not consumed. + self.buffer = buf; + return Ok(()); + } + + if current_size >= self.min_buffer { + let mut buf = self.buffer.clone(); + buf.push(bs); + + if let Some(bs) = buf.try_single() { + self.inner.write(bs).await?; + } else { + self.inner.sink(buf.len() as u64, Box::new(buf)).await?; + } + // Clean buffer, since it has been consumed all. + self.buffer.clear(); + return Ok(()); + } + + // Push the bytes into the buffer since the buffer is not full. 
+ self.buffer.push(bs); + Ok(()) + } + + /// Sink will always bypass the buffer logic. + /// + /// `CompleteLayer` will make sure that users can't mix `write` and `sink` together. + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + self.inner.sink(size, s).await + } + + async fn abort(&mut self) -> Result<()> { + self.buffer.clear(); + self.inner.abort().await + } + + async fn close(&mut self) -> Result<()> { + while !self.buffer.is_empty() { + let mut buf = self.buffer.clone(); + let to_write = buf.split_to(min(self.max_buffer, buf.len())); + + if let Some(bs) = to_write.try_single() { + self.inner.write(bs).await?; + } else { + self.inner + .sink(to_write.len() as u64, Box::new(to_write)) + .await?; + } + self.buffer = buf; + } + + self.inner.close().await + } +} + +#[cfg(test)] +mod tests { + use log::debug; + use pretty_assertions::assert_eq; + use rand::thread_rng; + use rand::Rng; + use rand::RngCore; + use sha2::Digest; + use sha2::Sha256; + + use super::*; + use crate::raw::oio::StreamExt; + use crate::raw::oio::Write; + + struct MockWriter { + buf: Vec<u8>, + } + + #[async_trait] + impl Write for MockWriter { + async fn write(&mut self, bs: Bytes) -> Result<()> { + debug!("test_fuzz_exact_buf_writer: flush size: {}", bs.len()); + + self.buf.extend_from_slice(&bs); + Ok(()) + } + + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + let bs = s.collect().await?; + assert_eq!(bs.len() as u64, size); + self.write(bs).await + } + + async fn abort(&mut self) -> Result<()> { + Ok(()) + } + + async fn close(&mut self) -> Result<()> { + Ok(()) + } + } + + #[tokio::test] + async fn test_exact_buf_writer_short_write() -> Result<()> { + let _ = tracing_subscriber::fmt() + .pretty() + .with_test_writer() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init(); + + let mut rng = thread_rng(); + let mut expected = vec![0; 5]; + rng.fill_bytes(&mut expected); + + let mut w = 
BoundedBufWriter::new(MockWriter { buf: vec![] }, 10); + + w.write(Bytes::from(expected.clone())).await?; + w.close().await?; + + assert_eq!(w.inner.buf.len(), expected.len()); + assert_eq!( + format!("{:x}", Sha256::digest(&w.inner.buf)), + format!("{:x}", Sha256::digest(&expected)) + ); + Ok(()) + } + + #[tokio::test] + async fn test_fuzz_exact_buf_writer() -> Result<()> { + let _ = tracing_subscriber::fmt() + .pretty() + .with_test_writer() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init(); + + let mut rng = thread_rng(); + let mut expected = vec![]; + + let buffer_size = rng.gen_range(1..10); + let mut writer = BoundedBufWriter::new(MockWriter { buf: vec![] }, buffer_size); + debug!("test_fuzz_exact_buf_writer: buffer size: {buffer_size}"); + + for _ in 0..1000 { + let size = rng.gen_range(1..20); + debug!("test_fuzz_exact_buf_writer: write size: {size}"); + let mut content = vec![0; size]; + rng.fill_bytes(&mut content); + + expected.extend_from_slice(&content); + writer.write(Bytes::from(content)).await?; + } + writer.close().await?; + + assert_eq!(writer.inner.buf.len(), expected.len()); + assert_eq!( + format!("{:x}", Sha256::digest(&writer.inner.buf)), + format!("{:x}", Sha256::digest(&expected)) + ); + Ok(()) + } +} diff --git a/core/src/raw/oio/write/exact_buf_write.rs b/core/src/raw/oio/write/exact_buf_write.rs deleted file mode 100644 index 8561c1a4a..000000000 --- a/core/src/raw/oio/write/exact_buf_write.rs +++ /dev/null @@ -1,285 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::cmp::min; - -use async_trait::async_trait; -use bytes::Bytes; - -use crate::raw::oio::StreamExt; -use crate::raw::oio::Streamer; -use crate::raw::*; -use crate::*; - -/// ExactBufWriter is used to implement [`oio::Write`] based on exact buffer strategy: flush the -/// underlying storage when the buffered size is exactly the same as the buffer size. -/// -/// ExactBufWriter makes sure that the size of the data written to the underlying storage is exactly -/// `buffer_size` bytes. It's useful when the underlying storage requires the size to be written. -/// -/// For example, R2 requires all parts must be the same size except the last part. -/// -/// ## Notes -/// -/// ExactBufWriter is not a good choice for most cases, because it will cause more network requests. -pub struct ExactBufWriter<W: oio::Write> { - inner: W, - - /// The size for buffer, we will flush the underlying storage at the size of this buffer. - buffer_size: usize, - buffer: oio::ChunkedCursor, - - buffer_stream: Option<Streamer>, -} - -impl<W: oio::Write> ExactBufWriter<W> { - /// Create a new exact buf writer. - pub fn new(inner: W, buffer_size: usize) -> Self { - Self { - inner, - buffer_size, - buffer: oio::ChunkedCursor::new(), - buffer_stream: None, - } - } - - /// Next bytes is used to fetch bytes from buffer or input streamer. - /// - /// We need this function because we need to make sure our write is reentrant. - /// We can't mutate state unless we are sure that the write is successful. 
- async fn next_bytes(&mut self, s: &mut Streamer) -> Option<Result<Bytes>> { - match self.buffer_stream.as_mut() { - None => s.next().await, - Some(bs) => match bs.next().await { - None => { - self.buffer_stream = None; - s.next().await - } - Some(v) => Some(v), - }, - } - } - - fn chain_stream(&mut self, s: Streamer) { - self.buffer_stream = match self.buffer_stream.take() { - Some(stream) => Some(Box::new(stream.chain(s))), - None => Some(s), - } - } -} - -#[async_trait] -impl<W: oio::Write> oio::Write for ExactBufWriter<W> { - async fn write(&mut self, bs: Bytes) -> Result<()> { - self.sink(bs.len() as u64, Box::new(oio::Cursor::from(bs))) - .await - } - - /// # TODO - /// - /// We know every stream size, we can collect them into a buffer without chain them every time. - async fn sink(&mut self, _: u64, mut s: Streamer) -> Result<()> { - if self.buffer.len() >= self.buffer_size { - let mut buf = self.buffer.clone(); - let to_write = buf.split_to(self.buffer_size); - return self - .inner - .sink(to_write.len() as u64, Box::new(to_write)) - .await - // Replace buffer with remaining if the write is successful. - .map(|_| { - self.buffer = buf; - self.chain_stream(s); - }); - } - - let mut buf = self.buffer.clone(); - while buf.len() < self.buffer_size { - let bs = self.next_bytes(&mut s).await.transpose()?; - match bs { - None => break, - Some(bs) => buf.push(bs), - } - } - - // Return directly if the buffer is not full. - // - // We don't need to chain stream here because it must be consumed. - if buf.len() < self.buffer_size { - self.buffer = buf; - return Ok(()); - } - - let to_write = buf.split_to(self.buffer_size); - self.inner - .sink(to_write.len() as u64, Box::new(to_write)) - .await - // Replace buffer with remaining if the write is successful. 
- .map(|_| { - self.buffer = buf; - self.chain_stream(s); - }) - } - - async fn abort(&mut self) -> Result<()> { - self.buffer.clear(); - self.buffer_stream = None; - - self.inner.abort().await - } - - async fn close(&mut self) -> Result<()> { - while let Some(stream) = self.buffer_stream.as_mut() { - let bs = stream.next().await.transpose()?; - match bs { - None => { - self.buffer_stream = None; - } - Some(bs) => { - self.buffer.push(bs); - } - } - - if self.buffer.len() >= self.buffer_size { - let mut buf = self.buffer.clone(); - let to_write = buf.split_to(self.buffer_size); - self.inner - .sink(to_write.len() as u64, Box::new(to_write)) - .await - // Replace buffer with remaining if the write is successful. - .map(|_| { - self.buffer = buf; - })?; - } - } - - while !self.buffer.is_empty() { - let mut buf = self.buffer.clone(); - let to_write = buf.split_to(min(self.buffer_size, buf.len())); - - self.inner - .sink(to_write.len() as u64, Box::new(to_write)) - .await - // Replace buffer with remaining if the write is successful. 
- .map(|_| self.buffer = buf)?; - } - - self.inner.close().await - } -} - -#[cfg(test)] -mod tests { - use log::debug; - use pretty_assertions::assert_eq; - use rand::thread_rng; - use rand::Rng; - use rand::RngCore; - use sha2::Digest; - use sha2::Sha256; - - use super::*; - use crate::raw::oio::StreamExt; - use crate::raw::oio::Write; - - struct MockWriter { - buf: Vec<u8>, - } - - #[async_trait] - impl Write for MockWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { - debug!("test_fuzz_exact_buf_writer: flush size: {}", bs.len()); - - self.buf.extend_from_slice(&bs); - Ok(()) - } - - async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> { - let bs = s.collect().await?; - assert_eq!(bs.len() as u64, size); - self.write(bs).await - } - - async fn abort(&mut self) -> Result<()> { - Ok(()) - } - - async fn close(&mut self) -> Result<()> { - Ok(()) - } - } - - #[tokio::test] - async fn test_exact_buf_writer_short_write() -> Result<()> { - let _ = tracing_subscriber::fmt() - .pretty() - .with_test_writer() - .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) - .try_init(); - - let mut rng = thread_rng(); - let mut expected = vec![0; 5]; - rng.fill_bytes(&mut expected); - - let mut w = ExactBufWriter::new(MockWriter { buf: vec![] }, 10); - - w.write(Bytes::from(expected.clone())).await?; - w.close().await?; - - assert_eq!(w.inner.buf.len(), expected.len()); - assert_eq!( - format!("{:x}", Sha256::digest(&w.inner.buf)), - format!("{:x}", Sha256::digest(&expected)) - ); - Ok(()) - } - - #[tokio::test] - async fn test_fuzz_exact_buf_writer() -> Result<()> { - let _ = tracing_subscriber::fmt() - .pretty() - .with_test_writer() - .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) - .try_init(); - - let mut rng = thread_rng(); - let mut expected = vec![]; - - let buffer_size = rng.gen_range(1..10); - let mut writer = ExactBufWriter::new(MockWriter { buf: vec![] }, buffer_size); - debug!("test_fuzz_exact_buf_writer: 
buffer size: {buffer_size}"); - - for _ in 0..1000 { - let size = rng.gen_range(1..20); - debug!("test_fuzz_exact_buf_writer: write size: {size}"); - let mut content = vec![0; size]; - rng.fill_bytes(&mut content); - - expected.extend_from_slice(&content); - writer.write(Bytes::from(content)).await?; - } - writer.close().await?; - - assert_eq!(writer.inner.buf.len(), expected.len()); - assert_eq!( - format!("{:x}", Sha256::digest(&writer.inner.buf)), - format!("{:x}", Sha256::digest(&expected)) - ); - Ok(()) - } -} diff --git a/core/src/raw/oio/write/mod.rs b/core/src/raw/oio/write/mod.rs index d06bacb2c..5899b30e3 100644 --- a/core/src/raw/oio/write/mod.rs +++ b/core/src/raw/oio/write/mod.rs @@ -39,8 +39,5 @@ mod one_shot_write; pub use one_shot_write::OneShotWrite; pub use one_shot_write::OneShotWriter; -mod at_least_buf_write; -pub use at_least_buf_write::AtLeastBufWriter; - -mod exact_buf_write; -pub use exact_buf_write::ExactBufWriter; +mod bounded_buf_write; +pub use bounded_buf_write::BoundedBufWriter; diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs index 3360d9dd5..c9b7fd290 100644 --- a/core/src/services/cos/backend.rs +++ b/core/src/services/cos/backend.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::cmp::max; use std::collections::HashMap; use std::fmt::Debug; use std::sync::Arc; @@ -36,6 +35,9 @@ use crate::raw::*; use crate::services::cos::writer::CosWriters; use crate::*; +#[allow(dead_code)] +/// FIXME: we should use this const when capability has been added. +/// /// The minimum multipart size of COS is 1 MiB. 
/// /// ref: <https://www.tencentcloud.com/document/product/436/14112> @@ -249,7 +251,7 @@ pub struct CosBackend { impl Accessor for CosBackend { type Reader = IncomingAsyncBody; type BlockingReader = (); - type Writer = oio::TwoWaysWriter<CosWriters, oio::AtLeastBufWriter<CosWriters>>; + type Writer = CosWriters; type BlockingWriter = (); type Pager = CosPager; type BlockingPager = (); @@ -345,17 +347,6 @@ impl Accessor for CosBackend { CosWriters::Two(oio::MultipartUploadWriter::new(writer)) }; - let w = if let Some(buffer_size) = args.buffer_size() { - let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size); - - let w = - oio::AtLeastBufWriter::new(w, buffer_size).with_total_size(args.content_length()); - - oio::TwoWaysWriter::Two(w) - } else { - oio::TwoWaysWriter::One(w) - }; - Ok((RpWrite::default(), w)) } diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs index 6600ddd9b..f0cbc9754 100644 --- a/core/src/services/obs/backend.rs +++ b/core/src/services/obs/backend.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::cmp::max; use std::collections::HashMap; use std::fmt::Debug; use std::sync::Arc; @@ -36,6 +35,9 @@ use crate::raw::*; use crate::services::obs::writer::ObsWriters; use crate::*; +#[allow(dead_code)] +/// FIXME: we should use this const when capability has been added. +/// /// The minimum multipart size of OBS is 5 MiB. 
/// /// ref: <https://support.huaweicloud.com/intl/en-us/ugobs-obs/obs_41_0021.html> @@ -256,7 +258,7 @@ pub struct ObsBackend { impl Accessor for ObsBackend { type Reader = IncomingAsyncBody; type BlockingReader = (); - type Writer = oio::TwoWaysWriter<ObsWriters, oio::AtLeastBufWriter<ObsWriters>>; + type Writer = ObsWriters; type BlockingWriter = (); type Pager = ObsPager; type BlockingPager = (); @@ -383,17 +385,6 @@ impl Accessor for ObsBackend { ObsWriters::Two(oio::MultipartUploadWriter::new(writer)) }; - let w = if let Some(buffer_size) = args.buffer_size() { - let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size); - - let w = - oio::AtLeastBufWriter::new(w, buffer_size).with_total_size(args.content_length()); - - oio::TwoWaysWriter::Two(w) - } else { - oio::TwoWaysWriter::One(w) - }; - Ok((RpWrite::default(), w)) } diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs index ee1131ee1..752d6de3f 100644 --- a/core/src/services/oss/backend.rs +++ b/core/src/services/oss/backend.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::cmp::max; use std::collections::HashMap; use std::collections::HashSet; use std::fmt::Debug; @@ -39,6 +38,9 @@ use crate::raw::*; use crate::services::oss::writer::OssWriters; use crate::*; +#[allow(dead_code)] +/// FIXME: we should use this const when capability has been added. +/// /// The minimum multipart size of OSS is 100 KiB. 
/// /// ref: <https://www.alibabacloud.com/help/en/oss/user-guide/multipart-upload-12> @@ -381,7 +383,7 @@ pub struct OssBackend { impl Accessor for OssBackend { type Reader = IncomingAsyncBody; type BlockingReader = (); - type Writer = oio::TwoWaysWriter<OssWriters, oio::AtLeastBufWriter<OssWriters>>; + type Writer = OssWriters; type BlockingWriter = (); type Pager = OssPager; type BlockingPager = (); @@ -481,17 +483,6 @@ impl Accessor for OssBackend { OssWriters::Two(oio::MultipartUploadWriter::new(writer)) }; - let w = if let Some(buffer_size) = args.buffer_size() { - let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size); - - let w = - oio::AtLeastBufWriter::new(w, buffer_size).with_total_size(args.content_length()); - - oio::TwoWaysWriter::Two(w) - } else { - oio::TwoWaysWriter::One(w) - }; - Ok((RpWrite::default(), w)) } diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs index 600fac3f3..6137a6de9 100644 --- a/core/src/services/s3/backend.rs +++ b/core/src/services/s3/backend.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::cmp::max; use std::collections::HashMap; use std::fmt::Debug; use std::fmt::Formatter; @@ -58,6 +57,9 @@ static ENDPOINT_TEMPLATES: Lazy<HashMap<&'static str, &'static str>> = Lazy::new m }); +#[allow(dead_code)] +/// FIXME: we should use this const when capability has been added. +/// /// The minimum multipart size of S3 is 5 MiB. 
/// /// ref: <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html> @@ -888,11 +890,7 @@ pub struct S3Backend { impl Accessor for S3Backend { type Reader = IncomingAsyncBody; type BlockingReader = (); - type Writer = oio::ThreeWaysWriter< - S3Writers, - oio::AtLeastBufWriter<S3Writers>, - oio::ExactBufWriter<S3Writers>, - >; + type Writer = S3Writers; type BlockingWriter = (); type Pager = S3Pager; type BlockingPager = (); @@ -988,21 +986,6 @@ impl Accessor for S3Backend { S3Writers::Two(oio::MultipartUploadWriter::new(writer)) }; - let w = if let Some(buffer_size) = args.buffer_size() { - let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size); - - if self.core.enable_exact_buf_write { - oio::ThreeWaysWriter::Three(oio::ExactBufWriter::new(w, buffer_size)) - } else { - oio::ThreeWaysWriter::Two( - oio::AtLeastBufWriter::new(w, buffer_size) - .with_total_size(args.content_length()), - ) - } - } else { - oio::ThreeWaysWriter::One(w) - }; - Ok((RpWrite::default(), w)) }
