This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch refactor-writer
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 18963fa85d13465f362d3432aee6ff9c7d0de644
Author: Xuanwo <[email protected]>
AuthorDate: Mon Sep 4 18:06:53 2023 +0800

    Remove at least writer
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/benches/oio/main.rs                     |   6 +-
 core/benches/oio/write.rs                    |  31 +-------
 core/src/raw/oio/write/at_least_buf_write.rs | 112 ---------------------------
 core/src/raw/oio/write/mod.rs                |   3 -
 core/src/services/cos/backend.rs             |   5 +-
 core/src/services/obs/backend.rs             |   5 +-
 core/src/services/oss/backend.rs             |   5 +-
 core/src/services/s3/backend.rs              |  17 +---
 8 files changed, 12 insertions(+), 172 deletions(-)

diff --git a/core/benches/oio/main.rs b/core/benches/oio/main.rs
index 982d29dfb..85ca2ebbe 100644
--- a/core/benches/oio/main.rs
+++ b/core/benches/oio/main.rs
@@ -21,9 +21,5 @@ mod write;
 use criterion::criterion_group;
 use criterion::criterion_main;
 
-criterion_group!(
-    benches,
-    write::bench_at_least_buf_write,
-    write::bench_exact_buf_write,
-);
+criterion_group!(benches, write::bench_exact_buf_write,);
 criterion_main!(benches);
diff --git a/core/benches/oio/write.rs b/core/benches/oio/write.rs
index 506a974f0..a227d4901 100644
--- a/core/benches/oio/write.rs
+++ b/core/benches/oio/write.rs
@@ -15,10 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use bytes::{Buf, Bytes};
+use bytes::Buf;
 use criterion::Criterion;
 use once_cell::sync::Lazy;
-use opendal::raw::oio::AtLeastBufWriter;
 use opendal::raw::oio::ExactBufWriter;
 use opendal::raw::oio::Write;
 use rand::thread_rng;
@@ -29,32 +28,6 @@ use super::utils::*;
 pub static TOKIO: Lazy<tokio::runtime::Runtime> =
     Lazy::new(|| tokio::runtime::Runtime::new().expect("build tokio runtime"));
 
-pub fn bench_at_least_buf_write(c: &mut Criterion) {
-    let mut group = c.benchmark_group("at_least_buf_write");
-
-    let mut rng = thread_rng();
-
-    for size in [
-        Size::from_kibibytes(4),
-        Size::from_kibibytes(256),
-        Size::from_mebibytes(4),
-        Size::from_mebibytes(16),
-    ] {
-        let content = gen_bytes(&mut rng, size.bytes() as usize);
-
-        group.throughput(criterion::Throughput::Bytes(size.bytes() as u64));
-        group.bench_with_input(size.to_string(), &content, |b, content| {
-            b.to_async(&*TOKIO).iter(|| async {
-                let mut w = AtLeastBufWriter::new(BlackHoleWriter, 256 * 1024);
-                w.write(content.clone()).await.unwrap();
-                w.close().await.unwrap();
-            })
-        });
-    }
-
-    group.finish()
-}
-
 pub fn bench_exact_buf_write(c: &mut Criterion) {
     let mut group = c.benchmark_group("exact_buf_write");
 
@@ -73,7 +46,7 @@ pub fn bench_exact_buf_write(c: &mut Criterion) {
             b.to_async(&*TOKIO).iter(|| async {
                 let mut w = ExactBufWriter::new(BlackHoleWriter, 256 * 1024);
 
-                let mut bs = Bytes::from(content.clone());
+                let mut bs = content.clone();
                 while !bs.is_empty() {
                     let n = w.write(bs.clone()).await.unwrap();
                     bs.advance(n as usize);
diff --git a/core/src/raw/oio/write/at_least_buf_write.rs 
b/core/src/raw/oio/write/at_least_buf_write.rs
deleted file mode 100644
index ebfff8ffd..000000000
--- a/core/src/raw/oio/write/at_least_buf_write.rs
+++ /dev/null
@@ -1,112 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use async_trait::async_trait;
-use bytes::Bytes;
-
-use crate::raw::*;
-use crate::*;
-
-/// AtLeastBufWriter is used to implement [`oio::Write`] based on at least 
buffer strategy: flush
-/// the underlying storage when the buffered size is larger.
-///
-/// AtLeastBufWriter makes sure that the size of the data written to the 
underlying storage is at
-/// least `buffer_size` bytes. It's useful when the underlying storage has a 
minimum size limit.
-///
-/// For example, S3 requires at least 5MiB for multipart uploads.
-pub struct AtLeastBufWriter<W: oio::Write> {
-    inner: W,
-
-    /// The total size of the data.
-    ///
-    /// If the total size is known, we will write to underlying storage 
directly without buffer it
-    /// when possible.
-    total_size: Option<u64>,
-
-    /// The size for buffer, we will flush the underlying storage if the 
buffer is full.
-    buffer_size: usize,
-    buffer: oio::ChunkedCursor,
-}
-
-impl<W: oio::Write> AtLeastBufWriter<W> {
-    /// Create a new at least buf writer.
-    pub fn new(inner: W, buffer_size: usize) -> Self {
-        Self {
-            inner,
-            total_size: None,
-            buffer_size,
-            buffer: oio::ChunkedCursor::new(),
-        }
-    }
-
-    /// Configure the total size for writer.
-    pub fn with_total_size(mut self, total_size: Option<u64>) -> Self {
-        self.total_size = total_size;
-        self
-    }
-}
-
-#[async_trait]
-impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        // If total size is known and equals to given bytes, we can write it 
directly.
-        if let Some(total_size) = self.total_size {
-            if total_size == bs.len() as u64 {
-                return self.inner.write(bs).await;
-            }
-        }
-
-        // Push the bytes into the buffer if the buffer is not full.
-        if self.buffer.len() + bs.len() < self.buffer_size {
-            let size = bs.len();
-            self.buffer.push(bs);
-            return Ok(size as u64);
-        }
-
-        let mut buf = self.buffer.clone();
-        buf.push(bs);
-
-        self.inner
-            .pipe(buf.len() as u64, Box::new(buf))
-            .await
-            // Clear buffer if the write is successful.
-            .map(|v| {
-                self.buffer.clear();
-                v
-            })
-    }
-
-    async fn pipe(&mut self, _: u64, s: oio::Reader) -> Result<u64> {
-        todo!()
-    }
-
-    async fn abort(&mut self) -> Result<()> {
-        self.buffer.clear();
-        self.inner.abort().await
-    }
-
-    async fn close(&mut self) -> Result<()> {
-        if !self.buffer.is_empty() {
-            self.inner
-                .pipe(self.buffer.len() as u64, Box::new(self.buffer.clone()))
-                .await?;
-            self.buffer.clear();
-        }
-
-        self.inner.close().await
-    }
-}
diff --git a/core/src/raw/oio/write/mod.rs b/core/src/raw/oio/write/mod.rs
index d06bacb2c..dfaf6c4ee 100644
--- a/core/src/raw/oio/write/mod.rs
+++ b/core/src/raw/oio/write/mod.rs
@@ -39,8 +39,5 @@ mod one_shot_write;
 pub use one_shot_write::OneShotWrite;
 pub use one_shot_write::OneShotWriter;
 
-mod at_least_buf_write;
-pub use at_least_buf_write::AtLeastBufWriter;
-
 mod exact_buf_write;
 pub use exact_buf_write::ExactBufWriter;
diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs
index 3360d9dd5..ba162fae9 100644
--- a/core/src/services/cos/backend.rs
+++ b/core/src/services/cos/backend.rs
@@ -249,7 +249,7 @@ pub struct CosBackend {
 impl Accessor for CosBackend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = oio::TwoWaysWriter<CosWriters, 
oio::AtLeastBufWriter<CosWriters>>;
+    type Writer = oio::TwoWaysWriter<CosWriters, 
oio::ExactBufWriter<CosWriters>>;
     type BlockingWriter = ();
     type Pager = CosPager;
     type BlockingPager = ();
@@ -348,8 +348,7 @@ impl Accessor for CosBackend {
         let w = if let Some(buffer_size) = args.buffer_size() {
             let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
 
-            let w =
-                oio::AtLeastBufWriter::new(w, 
buffer_size).with_total_size(args.content_length());
+            let w = oio::ExactBufWriter::new(w, buffer_size);
 
             oio::TwoWaysWriter::Two(w)
         } else {
diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs
index 6600ddd9b..4c3176cce 100644
--- a/core/src/services/obs/backend.rs
+++ b/core/src/services/obs/backend.rs
@@ -256,7 +256,7 @@ pub struct ObsBackend {
 impl Accessor for ObsBackend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = oio::TwoWaysWriter<ObsWriters, 
oio::AtLeastBufWriter<ObsWriters>>;
+    type Writer = oio::TwoWaysWriter<ObsWriters, 
oio::ExactBufWriter<ObsWriters>>;
     type BlockingWriter = ();
     type Pager = ObsPager;
     type BlockingPager = ();
@@ -386,8 +386,7 @@ impl Accessor for ObsBackend {
         let w = if let Some(buffer_size) = args.buffer_size() {
             let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
 
-            let w =
-                oio::AtLeastBufWriter::new(w, 
buffer_size).with_total_size(args.content_length());
+            let w = oio::ExactBufWriter::new(w, buffer_size);
 
             oio::TwoWaysWriter::Two(w)
         } else {
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index ee1131ee1..d924e4cc9 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -381,7 +381,7 @@ pub struct OssBackend {
 impl Accessor for OssBackend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = oio::TwoWaysWriter<OssWriters, 
oio::AtLeastBufWriter<OssWriters>>;
+    type Writer = oio::TwoWaysWriter<OssWriters, 
oio::ExactBufWriter<OssWriters>>;
     type BlockingWriter = ();
     type Pager = OssPager;
     type BlockingPager = ();
@@ -484,8 +484,7 @@ impl Accessor for OssBackend {
         let w = if let Some(buffer_size) = args.buffer_size() {
             let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
 
-            let w =
-                oio::AtLeastBufWriter::new(w, 
buffer_size).with_total_size(args.content_length());
+            let w = oio::ExactBufWriter::new(w, buffer_size);
 
             oio::TwoWaysWriter::Two(w)
         } else {
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index 600fac3f3..2c82cc03c 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -888,11 +888,7 @@ pub struct S3Backend {
 impl Accessor for S3Backend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = oio::ThreeWaysWriter<
-        S3Writers,
-        oio::AtLeastBufWriter<S3Writers>,
-        oio::ExactBufWriter<S3Writers>,
-    >;
+    type Writer = oio::TwoWaysWriter<S3Writers, 
oio::ExactBufWriter<S3Writers>>;
     type BlockingWriter = ();
     type Pager = S3Pager;
     type BlockingPager = ();
@@ -991,16 +987,9 @@ impl Accessor for S3Backend {
         let w = if let Some(buffer_size) = args.buffer_size() {
             let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
 
-            if self.core.enable_exact_buf_write {
-                oio::ThreeWaysWriter::Three(oio::ExactBufWriter::new(w, 
buffer_size))
-            } else {
-                oio::ThreeWaysWriter::Two(
-                    oio::AtLeastBufWriter::new(w, buffer_size)
-                        .with_total_size(args.content_length()),
-                )
-            }
+            oio::TwoWaysWriter::Two(oio::ExactBufWriter::new(w, buffer_size))
         } else {
-            oio::ThreeWaysWriter::One(w)
+            oio::TwoWaysWriter::One(w)
         };
 
         Ok((RpWrite::default(), w))

Reply via email to