This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch refactor-writer in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 18963fa85d13465f362d3432aee6ff9c7d0de644 Author: Xuanwo <[email protected]> AuthorDate: Mon Sep 4 18:06:53 2023 +0800 Remove at least writer Signed-off-by: Xuanwo <[email protected]> --- core/benches/oio/main.rs | 6 +- core/benches/oio/write.rs | 31 +------- core/src/raw/oio/write/at_least_buf_write.rs | 112 --------------------------- core/src/raw/oio/write/mod.rs | 3 - core/src/services/cos/backend.rs | 5 +- core/src/services/obs/backend.rs | 5 +- core/src/services/oss/backend.rs | 5 +- core/src/services/s3/backend.rs | 17 +--- 8 files changed, 12 insertions(+), 172 deletions(-) diff --git a/core/benches/oio/main.rs b/core/benches/oio/main.rs index 982d29dfb..85ca2ebbe 100644 --- a/core/benches/oio/main.rs +++ b/core/benches/oio/main.rs @@ -21,9 +21,5 @@ mod write; use criterion::criterion_group; use criterion::criterion_main; -criterion_group!( - benches, - write::bench_at_least_buf_write, - write::bench_exact_buf_write, -); +criterion_group!(benches, write::bench_exact_buf_write,); criterion_main!(benches); diff --git a/core/benches/oio/write.rs b/core/benches/oio/write.rs index 506a974f0..a227d4901 100644 --- a/core/benches/oio/write.rs +++ b/core/benches/oio/write.rs @@ -15,10 +15,9 @@ // specific language governing permissions and limitations // under the License. -use bytes::{Buf, Bytes}; +use bytes::Buf; use criterion::Criterion; use once_cell::sync::Lazy; -use opendal::raw::oio::AtLeastBufWriter; use opendal::raw::oio::ExactBufWriter; use opendal::raw::oio::Write; use rand::thread_rng; @@ -29,32 +28,6 @@ use super::utils::*; pub static TOKIO: Lazy<tokio::runtime::Runtime> = Lazy::new(|| tokio::runtime::Runtime::new().expect("build tokio runtime")); -pub fn bench_at_least_buf_write(c: &mut Criterion) { - let mut group = c.benchmark_group("at_least_buf_write"); - - let mut rng = thread_rng(); - - for size in [ - Size::from_kibibytes(4), - Size::from_kibibytes(256), - Size::from_mebibytes(4), - Size::from_mebibytes(16), - ] { - let content = gen_bytes(&mut rng, size.bytes() as usize); - - group.throughput(criterion::Throughput::Bytes(size.bytes() as u64)); - group.bench_with_input(size.to_string(), &content, |b, content| { - b.to_async(&*TOKIO).iter(|| async { - let mut w = AtLeastBufWriter::new(BlackHoleWriter, 256 * 1024); - w.write(content.clone()).await.unwrap(); - w.close().await.unwrap(); - }) - }); - } - - group.finish() -} - pub fn bench_exact_buf_write(c: &mut Criterion) { let mut group = c.benchmark_group("exact_buf_write"); @@ -73,7 +46,7 @@ pub fn bench_exact_buf_write(c: &mut Criterion) { b.to_async(&*TOKIO).iter(|| async { let mut w = ExactBufWriter::new(BlackHoleWriter, 256 * 1024); - let mut bs = Bytes::from(content.clone()); + let mut bs = content.clone(); while !bs.is_empty() { let n = w.write(bs.clone()).await.unwrap(); bs.advance(n as usize); diff --git a/core/src/raw/oio/write/at_least_buf_write.rs b/core/src/raw/oio/write/at_least_buf_write.rs deleted file mode 100644 index ebfff8ffd..000000000 --- a/core/src/raw/oio/write/at_least_buf_write.rs +++ /dev/null @@ -1,112 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use async_trait::async_trait; -use bytes::Bytes; - -use crate::raw::*; -use crate::*; - -/// AtLeastBufWriter is used to implement [`oio::Write`] based on at least buffer strategy: flush -/// the underlying storage when the buffered size is larger. -/// -/// AtLeastBufWriter makes sure that the size of the data written to the underlying storage is at -/// least `buffer_size` bytes. It's useful when the underlying storage has a minimum size limit. -/// -/// For example, S3 requires at least 5MiB for multipart uploads. -pub struct AtLeastBufWriter<W: oio::Write> { - inner: W, - - /// The total size of the data. - /// - /// If the total size is known, we will write to underlying storage directly without buffer it - /// when possible. - total_size: Option<u64>, - - /// The size for buffer, we will flush the underlying storage if the buffer is full. - buffer_size: usize, - buffer: oio::ChunkedCursor, -} - -impl<W: oio::Write> AtLeastBufWriter<W> { - /// Create a new at least buf writer. - pub fn new(inner: W, buffer_size: usize) -> Self { - Self { - inner, - total_size: None, - buffer_size, - buffer: oio::ChunkedCursor::new(), - } - } - - /// Configure the total size for writer. - pub fn with_total_size(mut self, total_size: Option<u64>) -> Self { - self.total_size = total_size; - self - } -} - -#[async_trait] -impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> { - async fn write(&mut self, bs: Bytes) -> Result<u64> { - // If total size is known and equals to given bytes, we can write it directly. - if let Some(total_size) = self.total_size { - if total_size == bs.len() as u64 { - return self.inner.write(bs).await; - } - } - - // Push the bytes into the buffer if the buffer is not full. - if self.buffer.len() + bs.len() < self.buffer_size { - let size = bs.len(); - self.buffer.push(bs); - return Ok(size as u64); - } - - let mut buf = self.buffer.clone(); - buf.push(bs); - - self.inner - .pipe(buf.len() as u64, Box::new(buf)) - .await - // Clear buffer if the write is successful. - .map(|v| { - self.buffer.clear(); - v - }) - } - - async fn pipe(&mut self, _: u64, s: oio::Reader) -> Result<u64> { - todo!() - } - - async fn abort(&mut self) -> Result<()> { - self.buffer.clear(); - self.inner.abort().await - } - - async fn close(&mut self) -> Result<()> { - if !self.buffer.is_empty() { - self.inner - .pipe(self.buffer.len() as u64, Box::new(self.buffer.clone())) - .await?; - self.buffer.clear(); - } - - self.inner.close().await - } -} diff --git a/core/src/raw/oio/write/mod.rs b/core/src/raw/oio/write/mod.rs index d06bacb2c..dfaf6c4ee 100644 --- a/core/src/raw/oio/write/mod.rs +++ b/core/src/raw/oio/write/mod.rs @@ -39,8 +39,5 @@ mod one_shot_write; pub use one_shot_write::OneShotWrite; pub use one_shot_write::OneShotWriter; -mod at_least_buf_write; -pub use at_least_buf_write::AtLeastBufWriter; - mod exact_buf_write; pub use exact_buf_write::ExactBufWriter; diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs index 3360d9dd5..ba162fae9 100644 --- a/core/src/services/cos/backend.rs +++ b/core/src/services/cos/backend.rs @@ -249,7 +249,7 @@ pub struct CosBackend { impl Accessor for CosBackend { type Reader = IncomingAsyncBody; type BlockingReader = (); - type Writer = oio::TwoWaysWriter<CosWriters, oio::AtLeastBufWriter<CosWriters>>; + type Writer = oio::TwoWaysWriter<CosWriters, oio::ExactBufWriter<CosWriters>>; type BlockingWriter = (); type Pager = CosPager; type BlockingPager = (); @@ -348,8 +348,7 @@ impl Accessor for CosBackend { let w = if let Some(buffer_size) = args.buffer_size() { let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size); - let w = - oio::AtLeastBufWriter::new(w, buffer_size).with_total_size(args.content_length()); + let w = oio::ExactBufWriter::new(w, buffer_size); oio::TwoWaysWriter::Two(w) } else { diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs index 6600ddd9b..4c3176cce 100644 --- a/core/src/services/obs/backend.rs +++ b/core/src/services/obs/backend.rs @@ -256,7 +256,7 @@ pub struct ObsBackend { impl Accessor for ObsBackend { type Reader = IncomingAsyncBody; type BlockingReader = (); - type Writer = oio::TwoWaysWriter<ObsWriters, oio::AtLeastBufWriter<ObsWriters>>; + type Writer = oio::TwoWaysWriter<ObsWriters, oio::ExactBufWriter<ObsWriters>>; type BlockingWriter = (); type Pager = ObsPager; type BlockingPager = (); @@ -386,8 +386,7 @@ impl Accessor for ObsBackend { let w = if let Some(buffer_size) = args.buffer_size() { let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size); - let w = - oio::AtLeastBufWriter::new(w, buffer_size).with_total_size(args.content_length()); + let w = oio::ExactBufWriter::new(w, buffer_size); oio::TwoWaysWriter::Two(w) } else { diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs index ee1131ee1..d924e4cc9 100644 --- a/core/src/services/oss/backend.rs +++ b/core/src/services/oss/backend.rs @@ -381,7 +381,7 @@ pub struct OssBackend { impl Accessor for OssBackend { type Reader = IncomingAsyncBody; type BlockingReader = (); - type Writer = oio::TwoWaysWriter<OssWriters, oio::AtLeastBufWriter<OssWriters>>; + type Writer = oio::TwoWaysWriter<OssWriters, oio::ExactBufWriter<OssWriters>>; type BlockingWriter = (); type Pager = OssPager; type BlockingPager = (); @@ -484,8 +484,7 @@ impl Accessor for OssBackend { let w = if let Some(buffer_size) = args.buffer_size() { let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size); - let w = - oio::AtLeastBufWriter::new(w, buffer_size).with_total_size(args.content_length()); + let w = oio::ExactBufWriter::new(w, buffer_size); oio::TwoWaysWriter::Two(w) } else { diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs index 600fac3f3..2c82cc03c 100644 --- a/core/src/services/s3/backend.rs +++ b/core/src/services/s3/backend.rs @@ -888,11 +888,7 @@ pub struct S3Backend { impl Accessor for S3Backend { type Reader = IncomingAsyncBody; type BlockingReader = (); - type Writer = oio::ThreeWaysWriter< - S3Writers, - oio::AtLeastBufWriter<S3Writers>, - oio::ExactBufWriter<S3Writers>, - >; + type Writer = oio::TwoWaysWriter<S3Writers, oio::ExactBufWriter<S3Writers>>; type BlockingWriter = (); type Pager = S3Pager; type BlockingPager = (); @@ -991,16 +987,9 @@ impl Accessor for S3Backend { let w = if let Some(buffer_size) = args.buffer_size() { let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size); - if self.core.enable_exact_buf_write { - oio::ThreeWaysWriter::Three(oio::ExactBufWriter::new(w, buffer_size)) - } else { - oio::ThreeWaysWriter::Two( - oio::AtLeastBufWriter::new(w, buffer_size) - .with_total_size(args.content_length()), - ) - } + oio::TwoWaysWriter::Two(oio::ExactBufWriter::new(w, buffer_size)) } else { - oio::ThreeWaysWriter::One(w) + oio::TwoWaysWriter::One(w) }; Ok((RpWrite::default(), w))
