This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch cleanup-sink
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 192d6b548ef7414ef2a722927c9d352cca47edb9
Author: Xuanwo <[email protected]>
AuthorDate: Thu Aug 31 17:11:23 2023 +0800

    refactor: Apply buffer for all services
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/benches/oio/main.rs                     |   6 +-
 core/benches/oio/write.rs                    |  33 +---
 core/src/layers/complete.rs                  |  25 ++-
 core/src/raw/oio/cursor.rs                   |   9 +
 core/src/raw/oio/write/api.rs                |   7 +
 core/src/raw/oio/write/at_least_buf_write.rs | 131 ------------
 core/src/raw/oio/write/bounded_buf_write.rs  | 254 ++++++++++++++++++++++++
 core/src/raw/oio/write/exact_buf_write.rs    | 285 ---------------------------
 core/src/raw/oio/write/mod.rs                |   7 +-
 core/src/services/cos/backend.rs             |  17 +-
 core/src/services/obs/backend.rs             |  17 +-
 core/src/services/oss/backend.rs             |  17 +-
 core/src/services/s3/backend.rs              |  25 +--
 13 files changed, 312 insertions(+), 521 deletions(-)

diff --git a/core/benches/oio/main.rs b/core/benches/oio/main.rs
index 982d29dfb..ab158ebdf 100644
--- a/core/benches/oio/main.rs
+++ b/core/benches/oio/main.rs
@@ -21,9 +21,5 @@ mod write;
 use criterion::criterion_group;
 use criterion::criterion_main;
 
-criterion_group!(
-    benches,
-    write::bench_at_least_buf_write,
-    write::bench_exact_buf_write,
-);
+criterion_group!(benches, write::bench_bounded_buf_write);
 criterion_main!(benches);
diff --git a/core/benches/oio/write.rs b/core/benches/oio/write.rs
index 6e26ce7e0..500876c42 100644
--- a/core/benches/oio/write.rs
+++ b/core/benches/oio/write.rs
@@ -17,8 +17,7 @@
 
 use criterion::Criterion;
 use once_cell::sync::Lazy;
-use opendal::raw::oio::AtLeastBufWriter;
-use opendal::raw::oio::ExactBufWriter;
+use opendal::raw::oio::BoundedBufWriter;
 use opendal::raw::oio::Write;
 use rand::thread_rng;
 use size::Size;
@@ -28,33 +27,7 @@ use super::utils::*;
 pub static TOKIO: Lazy<tokio::runtime::Runtime> =
     Lazy::new(|| tokio::runtime::Runtime::new().expect("build tokio runtime"));
 
-pub fn bench_at_least_buf_write(c: &mut Criterion) {
-    let mut group = c.benchmark_group("at_least_buf_write");
-
-    let mut rng = thread_rng();
-
-    for size in [
-        Size::from_kibibytes(4),
-        Size::from_kibibytes(256),
-        Size::from_mebibytes(4),
-        Size::from_mebibytes(16),
-    ] {
-        let content = gen_bytes(&mut rng, size.bytes() as usize);
-
-        group.throughput(criterion::Throughput::Bytes(size.bytes() as u64));
-        group.bench_with_input(size.to_string(), &content, |b, content| {
-            b.to_async(&*TOKIO).iter(|| async {
-                let mut w = AtLeastBufWriter::new(BlackHoleWriter, 256 * 1024);
-                w.write(content.clone()).await.unwrap();
-                w.close().await.unwrap();
-            })
-        });
-    }
-
-    group.finish()
-}
-
-pub fn bench_exact_buf_write(c: &mut Criterion) {
+pub fn bench_bounded_buf_write(c: &mut Criterion) {
     let mut group = c.benchmark_group("exact_buf_write");
 
     let mut rng = thread_rng();
@@ -70,7 +43,7 @@ pub fn bench_exact_buf_write(c: &mut Criterion) {
         group.throughput(criterion::Throughput::Bytes(size.bytes() as u64));
         group.bench_with_input(size.to_string(), &content, |b, content| {
             b.to_async(&*TOKIO).iter(|| async {
-                let mut w = ExactBufWriter::new(BlackHoleWriter, 256 * 1024);
+                let mut w = BoundedBufWriter::new(BlackHoleWriter, 256 * 1024);
                 w.write(content.clone()).await.unwrap();
                 w.close().await.unwrap();
             })
diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index 4c58b6e30..d8065954a 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -365,7 +365,10 @@ impl<A: Accessor> LayeredAccessor for 
CompleteReaderAccessor<A> {
     type Inner = A;
     type Reader = CompleteReader<A, A::Reader>;
     type BlockingReader = CompleteReader<A, A::BlockingReader>;
-    type Writer = CompleteWriter<A::Writer>;
+    type Writer = oio::TwoWaysWriter<
+        CompleteWriter<A::Writer>,
+        oio::BoundedBufWriter<CompleteWriter<A::Writer>>,
+    >;
     type BlockingWriter = CompleteWriter<A::BlockingWriter>;
     type Pager = CompletePager<A, A::Pager>;
     type BlockingPager = CompletePager<A, A::BlockingPager>;
@@ -427,10 +430,22 @@ impl<A: Accessor> LayeredAccessor for 
CompleteReaderAccessor<A> {
         }
 
         let size = args.content_length();
-        self.inner
-            .write(path, args)
-            .await
-            .map(|(rp, w)| (rp, CompleteWriter::new(w, size)))
+        let buffer_size = args.buffer_size();
+
+        let (rp, w) = self.inner.write(path, args).await?;
+        let w = CompleteWriter::new(w, size);
+
+        // FIXME
+        //
+        // We enforce the use of a bounded buffer here; we should check the
+        // capability in the future.
+        let w = if let Some(buffer) = buffer_size {
+            oio::TwoWaysWriter::Two(oio::BoundedBufWriter::new(w, 
buffer).with_max_buffer(buffer))
+        } else {
+            oio::TwoWaysWriter::One(w)
+        };
+
+        Ok((rp, w))
     }
 
     fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::BlockingWriter)> {
diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs
index c9b670ead..0d574f9fa 100644
--- a/core/src/raw/oio/cursor.rs
+++ b/core/src/raw/oio/cursor.rs
@@ -217,6 +217,15 @@ impl ChunkedCursor {
         self.inner.iter().skip(self.idx).map(|v| v.len()).sum()
     }
 
+    /// Return `Some(Bytes)` if this cursor holds exactly one `Bytes`, otherwise return `None`.
+    pub fn try_single(&self) -> Option<Bytes> {
+        if self.inner.len() == 1 {
+            Some(self.inner[0].clone())
+        } else {
+            None
+        }
+    }
+
     /// Clear the entire cursor.
     pub fn clear(&mut self) {
         self.idx = 0;
diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs
index f2bb025af..b2442febc 100644
--- a/core/src/raw/oio/write/api.rs
+++ b/core/src/raw/oio/write/api.rs
@@ -98,6 +98,13 @@ pub trait Write: Unpin + Send + Sync {
     async fn write(&mut self, bs: Bytes) -> Result<()>;
 
     /// Sink given stream into writer.
+    ///
+    /// # Limitations
+    ///
+    /// - Sink can't be used with `write` at the same time. Users should always
+    /// stick to the same write method.
+    /// - Sink can't be buffered. Underlying storage will always consume the
+    /// given stream as a whole.
     async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()>;
 
     /// Abort the pending writer.
diff --git a/core/src/raw/oio/write/at_least_buf_write.rs 
b/core/src/raw/oio/write/at_least_buf_write.rs
deleted file mode 100644
index 91adddd30..000000000
--- a/core/src/raw/oio/write/at_least_buf_write.rs
+++ /dev/null
@@ -1,131 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use async_trait::async_trait;
-use bytes::Bytes;
-
-use crate::raw::oio::StreamExt;
-use crate::raw::oio::Streamer;
-use crate::raw::*;
-use crate::*;
-
-/// AtLeastBufWriter is used to implement [`oio::Write`] based on at least 
buffer strategy: flush
-/// the underlying storage when the buffered size is larger.
-///
-/// AtLeastBufWriter makes sure that the size of the data written to the 
underlying storage is at
-/// least `buffer_size` bytes. It's useful when the underlying storage has a 
minimum size limit.
-///
-/// For example, S3 requires at least 5MiB for multipart uploads.
-pub struct AtLeastBufWriter<W: oio::Write> {
-    inner: W,
-
-    /// The total size of the data.
-    ///
-    /// If the total size is known, we will write to underlying storage 
directly without buffer it
-    /// when possible.
-    total_size: Option<u64>,
-
-    /// The size for buffer, we will flush the underlying storage if the 
buffer is full.
-    buffer_size: usize,
-    buffer: oio::ChunkedCursor,
-}
-
-impl<W: oio::Write> AtLeastBufWriter<W> {
-    /// Create a new at least buf writer.
-    pub fn new(inner: W, buffer_size: usize) -> Self {
-        Self {
-            inner,
-            total_size: None,
-            buffer_size,
-            buffer: oio::ChunkedCursor::new(),
-        }
-    }
-
-    /// Configure the total size for writer.
-    pub fn with_total_size(mut self, total_size: Option<u64>) -> Self {
-        self.total_size = total_size;
-        self
-    }
-}
-
-#[async_trait]
-impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        // If total size is known and equals to given bytes, we can write it 
directly.
-        if let Some(total_size) = self.total_size {
-            if total_size == bs.len() as u64 {
-                return self.inner.write(bs).await;
-            }
-        }
-
-        // Push the bytes into the buffer if the buffer is not full.
-        if self.buffer.len() + bs.len() < self.buffer_size {
-            self.buffer.push(bs);
-            return Ok(());
-        }
-
-        let mut buf = self.buffer.clone();
-        buf.push(bs);
-
-        self.inner
-            .sink(buf.len() as u64, Box::new(buf))
-            .await
-            // Clear buffer if the write is successful.
-            .map(|_| self.buffer.clear())
-    }
-
-    async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
-        // If total size is known and equals to given stream, we can write it 
directly.
-        if let Some(total_size) = self.total_size {
-            if total_size == size {
-                return self.inner.sink(size, s).await;
-            }
-        }
-
-        // Push the bytes into the buffer if the buffer is not full.
-        if self.buffer.len() as u64 + size < self.buffer_size as u64 {
-            self.buffer.push(s.collect().await?);
-            return Ok(());
-        }
-
-        let buf = self.buffer.clone();
-        let buffer_size = buf.len() as u64;
-        let stream = buf.chain(s);
-
-        self.inner
-            .sink(buffer_size + size, Box::new(stream))
-            .await
-            // Clear buffer if the write is successful.
-            .map(|_| self.buffer.clear())
-    }
-
-    async fn abort(&mut self) -> Result<()> {
-        self.buffer.clear();
-        self.inner.abort().await
-    }
-
-    async fn close(&mut self) -> Result<()> {
-        if !self.buffer.is_empty() {
-            self.inner
-                .sink(self.buffer.len() as u64, Box::new(self.buffer.clone()))
-                .await?;
-            self.buffer.clear();
-        }
-
-        self.inner.close().await
-    }
-}
diff --git a/core/src/raw/oio/write/bounded_buf_write.rs 
b/core/src/raw/oio/write/bounded_buf_write.rs
new file mode 100644
index 000000000..951f50e88
--- /dev/null
+++ b/core/src/raw/oio/write/bounded_buf_write.rs
@@ -0,0 +1,254 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::cmp::min;
+
+use async_trait::async_trait;
+use bytes::Bytes;
+
+use crate::raw::*;
+use crate::*;
+
+/// BoundedBufWriter is used to implement [`oio::Write`] based on a bounded buffer strategy:
+/// flush the underlying storage when the buffered size falls within [min_buffer..max_buffer].
+pub struct BoundedBufWriter<W: oio::Write> {
+    inner: W,
+
+    min_buffer: usize,
+    max_buffer: usize,
+    buffer: oio::ChunkedCursor,
+}
+
+impl<W: oio::Write> BoundedBufWriter<W> {
+    /// Create a new bounded buf writer.
+    pub fn new(inner: W, min_buffer: usize) -> Self {
+        Self {
+            inner,
+            min_buffer,
+            max_buffer: usize::MAX,
+            buffer: oio::ChunkedCursor::new(),
+        }
+    }
+
+    /// Configure the max buffer size for writer.
+    ///
+    /// # Panics
+    ///
+    /// Panic if max_buffer is smaller than min_buffer.
+    pub fn with_max_buffer(mut self, max_buffer: usize) -> Self {
+        assert!(
+            max_buffer >= self.min_buffer,
+            "input max buffer is smaller than min buffer"
+        );
+
+        self.max_buffer = max_buffer;
+        self
+    }
+}
+
+#[async_trait]
+impl<W: oio::Write> oio::Write for BoundedBufWriter<W> {
+    /// # TODO
+    ///
+    /// - Use copy_from_slice if given bytes is smaller than 4KiB.
+    async fn write(&mut self, bs: Bytes) -> Result<()> {
+        // Make sure the existing buffer has been flushed.
+        while self.buffer.len() >= self.max_buffer {
+            let mut buf = self.buffer.clone();
+            let to_write = buf.split_to(self.max_buffer);
+
+            if let Some(bs) = to_write.try_single() {
+                self.inner.write(bs).await?;
+            } else {
+                self.inner
+                    .sink(to_write.len() as u64, Box::new(to_write))
+                    .await?;
+            }
+            // input bytes is not handled yet, go on.
+        }
+
+        let current_size = self.buffer.len() + bs.len();
+
+        if current_size >= self.max_buffer {
+            let mut buf = self.buffer.clone();
+            buf.push(bs);
+
+            let to_write = buf.split_to(self.max_buffer);
+
+            if let Some(bs) = to_write.try_single() {
+                self.inner.write(bs).await?;
+            } else {
+                self.inner
+                    .sink(to_write.len() as u64, Box::new(to_write))
+                    .await?;
+            }
+            // Replace buffer since there are bytes not consumed.
+            self.buffer = buf;
+            return Ok(());
+        }
+
+        if current_size >= self.min_buffer {
+            let mut buf = self.buffer.clone();
+            buf.push(bs);
+
+            if let Some(bs) = buf.try_single() {
+                self.inner.write(bs).await?;
+            } else {
+                self.inner.sink(buf.len() as u64, Box::new(buf)).await?;
+            }
+            // Clean buffer, since it has been consumed all.
+            self.buffer.clear();
+            return Ok(());
+        }
+
+        // Push the bytes into the buffer since the buffer is not full.
+        self.buffer.push(bs);
+        Ok(())
+    }
+
+    /// Sink will always bypass the buffer logic.
+    ///
+    /// `CompleteLayer` will make sure that users can't mix `write` and `sink` 
together.
+    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+        self.inner.sink(size, s).await
+    }
+
+    async fn abort(&mut self) -> Result<()> {
+        self.buffer.clear();
+        self.inner.abort().await
+    }
+
+    async fn close(&mut self) -> Result<()> {
+        while !self.buffer.is_empty() {
+            let mut buf = self.buffer.clone();
+            let to_write = buf.split_to(min(self.max_buffer, buf.len()));
+
+            if let Some(bs) = to_write.try_single() {
+                self.inner.write(bs).await?;
+            } else {
+                self.inner
+                    .sink(to_write.len() as u64, Box::new(to_write))
+                    .await?;
+            }
+            self.buffer = buf;
+        }
+
+        self.inner.close().await
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use log::debug;
+    use pretty_assertions::assert_eq;
+    use rand::thread_rng;
+    use rand::Rng;
+    use rand::RngCore;
+    use sha2::Digest;
+    use sha2::Sha256;
+
+    use super::*;
+    use crate::raw::oio::StreamExt;
+    use crate::raw::oio::Write;
+
+    struct MockWriter {
+        buf: Vec<u8>,
+    }
+
+    #[async_trait]
+    impl Write for MockWriter {
+        async fn write(&mut self, bs: Bytes) -> Result<()> {
+            debug!("test_fuzz_exact_buf_writer: flush size: {}", bs.len());
+
+            self.buf.extend_from_slice(&bs);
+            Ok(())
+        }
+
+        async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+            let bs = s.collect().await?;
+            assert_eq!(bs.len() as u64, size);
+            self.write(bs).await
+        }
+
+        async fn abort(&mut self) -> Result<()> {
+            Ok(())
+        }
+
+        async fn close(&mut self) -> Result<()> {
+            Ok(())
+        }
+    }
+
+    #[tokio::test]
+    async fn test_exact_buf_writer_short_write() -> Result<()> {
+        let _ = tracing_subscriber::fmt()
+            .pretty()
+            .with_test_writer()
+            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
+            .try_init();
+
+        let mut rng = thread_rng();
+        let mut expected = vec![0; 5];
+        rng.fill_bytes(&mut expected);
+
+        let mut w = BoundedBufWriter::new(MockWriter { buf: vec![] }, 10);
+
+        w.write(Bytes::from(expected.clone())).await?;
+        w.close().await?;
+
+        assert_eq!(w.inner.buf.len(), expected.len());
+        assert_eq!(
+            format!("{:x}", Sha256::digest(&w.inner.buf)),
+            format!("{:x}", Sha256::digest(&expected))
+        );
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_fuzz_exact_buf_writer() -> Result<()> {
+        let _ = tracing_subscriber::fmt()
+            .pretty()
+            .with_test_writer()
+            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
+            .try_init();
+
+        let mut rng = thread_rng();
+        let mut expected = vec![];
+
+        let buffer_size = rng.gen_range(1..10);
+        let mut writer = BoundedBufWriter::new(MockWriter { buf: vec![] }, 
buffer_size);
+        debug!("test_fuzz_exact_buf_writer: buffer size: {buffer_size}");
+
+        for _ in 0..1000 {
+            let size = rng.gen_range(1..20);
+            debug!("test_fuzz_exact_buf_writer: write size: {size}");
+            let mut content = vec![0; size];
+            rng.fill_bytes(&mut content);
+
+            expected.extend_from_slice(&content);
+            writer.write(Bytes::from(content)).await?;
+        }
+        writer.close().await?;
+
+        assert_eq!(writer.inner.buf.len(), expected.len());
+        assert_eq!(
+            format!("{:x}", Sha256::digest(&writer.inner.buf)),
+            format!("{:x}", Sha256::digest(&expected))
+        );
+        Ok(())
+    }
+}
diff --git a/core/src/raw/oio/write/exact_buf_write.rs 
b/core/src/raw/oio/write/exact_buf_write.rs
deleted file mode 100644
index 8561c1a4a..000000000
--- a/core/src/raw/oio/write/exact_buf_write.rs
+++ /dev/null
@@ -1,285 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::cmp::min;
-
-use async_trait::async_trait;
-use bytes::Bytes;
-
-use crate::raw::oio::StreamExt;
-use crate::raw::oio::Streamer;
-use crate::raw::*;
-use crate::*;
-
-/// ExactBufWriter is used to implement [`oio::Write`] based on exact buffer 
strategy: flush the
-/// underlying storage when the buffered size is exactly the same as the 
buffer size.
-///
-/// ExactBufWriter makes sure that the size of the data written to the 
underlying storage is exactly
-/// `buffer_size` bytes. It's useful when the underlying storage requires the 
size to be written.
-///
-/// For example, R2 requires all parts must be the same size except the last 
part.
-///
-/// ## Notes
-///
-/// ExactBufWriter is not a good choice for most cases, because it will cause 
more network requests.
-pub struct ExactBufWriter<W: oio::Write> {
-    inner: W,
-
-    /// The size for buffer, we will flush the underlying storage at the size 
of this buffer.
-    buffer_size: usize,
-    buffer: oio::ChunkedCursor,
-
-    buffer_stream: Option<Streamer>,
-}
-
-impl<W: oio::Write> ExactBufWriter<W> {
-    /// Create a new exact buf writer.
-    pub fn new(inner: W, buffer_size: usize) -> Self {
-        Self {
-            inner,
-            buffer_size,
-            buffer: oio::ChunkedCursor::new(),
-            buffer_stream: None,
-        }
-    }
-
-    /// Next bytes is used to fetch bytes from buffer or input streamer.
-    ///
-    /// We need this function because we need to make sure our write is 
reentrant.
-    /// We can't mutate state unless we are sure that the write is successful.
-    async fn next_bytes(&mut self, s: &mut Streamer) -> Option<Result<Bytes>> {
-        match self.buffer_stream.as_mut() {
-            None => s.next().await,
-            Some(bs) => match bs.next().await {
-                None => {
-                    self.buffer_stream = None;
-                    s.next().await
-                }
-                Some(v) => Some(v),
-            },
-        }
-    }
-
-    fn chain_stream(&mut self, s: Streamer) {
-        self.buffer_stream = match self.buffer_stream.take() {
-            Some(stream) => Some(Box::new(stream.chain(s))),
-            None => Some(s),
-        }
-    }
-}
-
-#[async_trait]
-impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        self.sink(bs.len() as u64, Box::new(oio::Cursor::from(bs)))
-            .await
-    }
-
-    /// # TODO
-    ///
-    /// We know every stream size, we can collect them into a buffer without 
chain them every time.
-    async fn sink(&mut self, _: u64, mut s: Streamer) -> Result<()> {
-        if self.buffer.len() >= self.buffer_size {
-            let mut buf = self.buffer.clone();
-            let to_write = buf.split_to(self.buffer_size);
-            return self
-                .inner
-                .sink(to_write.len() as u64, Box::new(to_write))
-                .await
-                // Replace buffer with remaining if the write is successful.
-                .map(|_| {
-                    self.buffer = buf;
-                    self.chain_stream(s);
-                });
-        }
-
-        let mut buf = self.buffer.clone();
-        while buf.len() < self.buffer_size {
-            let bs = self.next_bytes(&mut s).await.transpose()?;
-            match bs {
-                None => break,
-                Some(bs) => buf.push(bs),
-            }
-        }
-
-        // Return directly if the buffer is not full.
-        //
-        // We don't need to chain stream here because it must be consumed.
-        if buf.len() < self.buffer_size {
-            self.buffer = buf;
-            return Ok(());
-        }
-
-        let to_write = buf.split_to(self.buffer_size);
-        self.inner
-            .sink(to_write.len() as u64, Box::new(to_write))
-            .await
-            // Replace buffer with remaining if the write is successful.
-            .map(|_| {
-                self.buffer = buf;
-                self.chain_stream(s);
-            })
-    }
-
-    async fn abort(&mut self) -> Result<()> {
-        self.buffer.clear();
-        self.buffer_stream = None;
-
-        self.inner.abort().await
-    }
-
-    async fn close(&mut self) -> Result<()> {
-        while let Some(stream) = self.buffer_stream.as_mut() {
-            let bs = stream.next().await.transpose()?;
-            match bs {
-                None => {
-                    self.buffer_stream = None;
-                }
-                Some(bs) => {
-                    self.buffer.push(bs);
-                }
-            }
-
-            if self.buffer.len() >= self.buffer_size {
-                let mut buf = self.buffer.clone();
-                let to_write = buf.split_to(self.buffer_size);
-                self.inner
-                    .sink(to_write.len() as u64, Box::new(to_write))
-                    .await
-                    // Replace buffer with remaining if the write is 
successful.
-                    .map(|_| {
-                        self.buffer = buf;
-                    })?;
-            }
-        }
-
-        while !self.buffer.is_empty() {
-            let mut buf = self.buffer.clone();
-            let to_write = buf.split_to(min(self.buffer_size, buf.len()));
-
-            self.inner
-                .sink(to_write.len() as u64, Box::new(to_write))
-                .await
-                // Replace buffer with remaining if the write is successful.
-                .map(|_| self.buffer = buf)?;
-        }
-
-        self.inner.close().await
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use log::debug;
-    use pretty_assertions::assert_eq;
-    use rand::thread_rng;
-    use rand::Rng;
-    use rand::RngCore;
-    use sha2::Digest;
-    use sha2::Sha256;
-
-    use super::*;
-    use crate::raw::oio::StreamExt;
-    use crate::raw::oio::Write;
-
-    struct MockWriter {
-        buf: Vec<u8>,
-    }
-
-    #[async_trait]
-    impl Write for MockWriter {
-        async fn write(&mut self, bs: Bytes) -> Result<()> {
-            debug!("test_fuzz_exact_buf_writer: flush size: {}", bs.len());
-
-            self.buf.extend_from_slice(&bs);
-            Ok(())
-        }
-
-        async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
-            let bs = s.collect().await?;
-            assert_eq!(bs.len() as u64, size);
-            self.write(bs).await
-        }
-
-        async fn abort(&mut self) -> Result<()> {
-            Ok(())
-        }
-
-        async fn close(&mut self) -> Result<()> {
-            Ok(())
-        }
-    }
-
-    #[tokio::test]
-    async fn test_exact_buf_writer_short_write() -> Result<()> {
-        let _ = tracing_subscriber::fmt()
-            .pretty()
-            .with_test_writer()
-            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
-            .try_init();
-
-        let mut rng = thread_rng();
-        let mut expected = vec![0; 5];
-        rng.fill_bytes(&mut expected);
-
-        let mut w = ExactBufWriter::new(MockWriter { buf: vec![] }, 10);
-
-        w.write(Bytes::from(expected.clone())).await?;
-        w.close().await?;
-
-        assert_eq!(w.inner.buf.len(), expected.len());
-        assert_eq!(
-            format!("{:x}", Sha256::digest(&w.inner.buf)),
-            format!("{:x}", Sha256::digest(&expected))
-        );
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_fuzz_exact_buf_writer() -> Result<()> {
-        let _ = tracing_subscriber::fmt()
-            .pretty()
-            .with_test_writer()
-            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
-            .try_init();
-
-        let mut rng = thread_rng();
-        let mut expected = vec![];
-
-        let buffer_size = rng.gen_range(1..10);
-        let mut writer = ExactBufWriter::new(MockWriter { buf: vec![] }, 
buffer_size);
-        debug!("test_fuzz_exact_buf_writer: buffer size: {buffer_size}");
-
-        for _ in 0..1000 {
-            let size = rng.gen_range(1..20);
-            debug!("test_fuzz_exact_buf_writer: write size: {size}");
-            let mut content = vec![0; size];
-            rng.fill_bytes(&mut content);
-
-            expected.extend_from_slice(&content);
-            writer.write(Bytes::from(content)).await?;
-        }
-        writer.close().await?;
-
-        assert_eq!(writer.inner.buf.len(), expected.len());
-        assert_eq!(
-            format!("{:x}", Sha256::digest(&writer.inner.buf)),
-            format!("{:x}", Sha256::digest(&expected))
-        );
-        Ok(())
-    }
-}
diff --git a/core/src/raw/oio/write/mod.rs b/core/src/raw/oio/write/mod.rs
index d06bacb2c..5899b30e3 100644
--- a/core/src/raw/oio/write/mod.rs
+++ b/core/src/raw/oio/write/mod.rs
@@ -39,8 +39,5 @@ mod one_shot_write;
 pub use one_shot_write::OneShotWrite;
 pub use one_shot_write::OneShotWriter;
 
-mod at_least_buf_write;
-pub use at_least_buf_write::AtLeastBufWriter;
-
-mod exact_buf_write;
-pub use exact_buf_write::ExactBufWriter;
+mod bounded_buf_write;
+pub use bounded_buf_write::BoundedBufWriter;
diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs
index 3360d9dd5..c9b7fd290 100644
--- a/core/src/services/cos/backend.rs
+++ b/core/src/services/cos/backend.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::cmp::max;
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::sync::Arc;
@@ -36,6 +35,9 @@ use crate::raw::*;
 use crate::services::cos::writer::CosWriters;
 use crate::*;
 
+#[allow(dead_code)]
+/// FIXME: we should use this const when capability has been added.
+///
 /// The minimum multipart size of COS is 1 MiB.
 ///
 /// ref: <https://www.tencentcloud.com/document/product/436/14112>
@@ -249,7 +251,7 @@ pub struct CosBackend {
 impl Accessor for CosBackend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = oio::TwoWaysWriter<CosWriters, 
oio::AtLeastBufWriter<CosWriters>>;
+    type Writer = CosWriters;
     type BlockingWriter = ();
     type Pager = CosPager;
     type BlockingPager = ();
@@ -345,17 +347,6 @@ impl Accessor for CosBackend {
             CosWriters::Two(oio::MultipartUploadWriter::new(writer))
         };
 
-        let w = if let Some(buffer_size) = args.buffer_size() {
-            let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
-
-            let w =
-                oio::AtLeastBufWriter::new(w, 
buffer_size).with_total_size(args.content_length());
-
-            oio::TwoWaysWriter::Two(w)
-        } else {
-            oio::TwoWaysWriter::One(w)
-        };
-
         Ok((RpWrite::default(), w))
     }
 
diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs
index 6600ddd9b..f0cbc9754 100644
--- a/core/src/services/obs/backend.rs
+++ b/core/src/services/obs/backend.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::cmp::max;
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::sync::Arc;
@@ -36,6 +35,9 @@ use crate::raw::*;
 use crate::services::obs::writer::ObsWriters;
 use crate::*;
 
+#[allow(dead_code)]
+/// FIXME: we should use this const when capability has been added.
+///
 /// The minimum multipart size of OBS is 5 MiB.
 ///
 /// ref: 
<https://support.huaweicloud.com/intl/en-us/ugobs-obs/obs_41_0021.html>
@@ -256,7 +258,7 @@ pub struct ObsBackend {
 impl Accessor for ObsBackend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = oio::TwoWaysWriter<ObsWriters, 
oio::AtLeastBufWriter<ObsWriters>>;
+    type Writer = ObsWriters;
     type BlockingWriter = ();
     type Pager = ObsPager;
     type BlockingPager = ();
@@ -383,17 +385,6 @@ impl Accessor for ObsBackend {
             ObsWriters::Two(oio::MultipartUploadWriter::new(writer))
         };
 
-        let w = if let Some(buffer_size) = args.buffer_size() {
-            let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
-
-            let w =
-                oio::AtLeastBufWriter::new(w, 
buffer_size).with_total_size(args.content_length());
-
-            oio::TwoWaysWriter::Two(w)
-        } else {
-            oio::TwoWaysWriter::One(w)
-        };
-
         Ok((RpWrite::default(), w))
     }
 
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index ee1131ee1..752d6de3f 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::cmp::max;
 use std::collections::HashMap;
 use std::collections::HashSet;
 use std::fmt::Debug;
@@ -39,6 +38,9 @@ use crate::raw::*;
 use crate::services::oss::writer::OssWriters;
 use crate::*;
 
+#[allow(dead_code)]
+/// FIXME: we should use this const when capability has been added.
+///
 /// The minimum multipart size of OSS is 100 KiB.
 ///
 /// ref: 
<https://www.alibabacloud.com/help/en/oss/user-guide/multipart-upload-12>
@@ -381,7 +383,7 @@ pub struct OssBackend {
 impl Accessor for OssBackend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = oio::TwoWaysWriter<OssWriters, 
oio::AtLeastBufWriter<OssWriters>>;
+    type Writer = OssWriters;
     type BlockingWriter = ();
     type Pager = OssPager;
     type BlockingPager = ();
@@ -481,17 +483,6 @@ impl Accessor for OssBackend {
             OssWriters::Two(oio::MultipartUploadWriter::new(writer))
         };
 
-        let w = if let Some(buffer_size) = args.buffer_size() {
-            let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
-
-            let w =
-                oio::AtLeastBufWriter::new(w, 
buffer_size).with_total_size(args.content_length());
-
-            oio::TwoWaysWriter::Two(w)
-        } else {
-            oio::TwoWaysWriter::One(w)
-        };
-
         Ok((RpWrite::default(), w))
     }
 
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index 600fac3f3..6137a6de9 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::cmp::max;
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::fmt::Formatter;
@@ -58,6 +57,9 @@ static ENDPOINT_TEMPLATES: Lazy<HashMap<&'static str, 
&'static str>> = Lazy::new
     m
 });
 
+#[allow(dead_code)]
+/// FIXME: we should use this const when capability has been added.
+///
 /// The minimum multipart size of S3 is 5 MiB.
 ///
 /// ref: <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
@@ -888,11 +890,7 @@ pub struct S3Backend {
 impl Accessor for S3Backend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = oio::ThreeWaysWriter<
-        S3Writers,
-        oio::AtLeastBufWriter<S3Writers>,
-        oio::ExactBufWriter<S3Writers>,
-    >;
+    type Writer = S3Writers;
     type BlockingWriter = ();
     type Pager = S3Pager;
     type BlockingPager = ();
@@ -988,21 +986,6 @@ impl Accessor for S3Backend {
             S3Writers::Two(oio::MultipartUploadWriter::new(writer))
         };
 
-        let w = if let Some(buffer_size) = args.buffer_size() {
-            let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
-
-            if self.core.enable_exact_buf_write {
-                oio::ThreeWaysWriter::Three(oio::ExactBufWriter::new(w, 
buffer_size))
-            } else {
-                oio::ThreeWaysWriter::Two(
-                    oio::AtLeastBufWriter::new(w, buffer_size)
-                        .with_total_size(args.content_length()),
-                )
-            }
-        } else {
-            oio::ThreeWaysWriter::One(w)
-        };
-
         Ok((RpWrite::default(), w))
     }
 


Reply via email to