This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new d740edbbe refactor(core): Split buffer logic from underlying storage operations (#2903)
d740edbbe is described below

commit d740edbbed29aa199946ed07ef94ee0d78ae32a2
Author: Xuanwo <[email protected]>
AuthorDate: Wed Aug 23 12:52:36 2023 +0800

    refactor(core): Split buffer logic from underlying storage operations (#2903)
    
    * feat: Add AtLeastBufWrite
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Refactor
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Add compose
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * refactor
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix typo
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix test
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Avoid exceeding the buffer size
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix unit test
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Disable fuzz test for now
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * use aws cli to create bucket
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Disable r2 instead
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix fuzz test
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
---
 .github/workflows/service_test_s3.yml            |  79 ++++++-------
 core/fuzz/fuzz_writer.rs                         |   4 +-
 core/src/raw/oio/cursor.rs                       |  93 ++++++++++++++-
 core/src/raw/oio/mod.rs                          |   1 +
 core/src/raw/oio/stream/api.rs                   |  79 ++++++++++++-
 core/src/raw/oio/write/append_object_write.rs    |  79 +------------
 core/src/raw/oio/write/at_least_buf_write.rs     | 125 ++++++++++++++++++++
 core/src/raw/oio/write/compose_write.rs          | 136 ++++++++++++++++++++++
 core/src/raw/oio/write/mod.rs                    |  12 +-
 core/src/raw/oio/write/multipart_upload_write.rs | 139 ++++-------------------
 core/src/raw/oio/write/one_shot_write.rs         |  69 +++++++++++
 core/src/raw/oio/write/two_ways_write.rs         |  64 -----------
 core/src/raw/ops.rs                              |  21 ++++
 core/src/services/cos/backend.rs                 |  58 ++++------
 core/src/services/cos/core.rs                    |   1 -
 core/src/services/cos/writer.rs                  |  16 ++-
 core/src/services/obs/backend.rs                 |  60 ++++------
 core/src/services/obs/core.rs                    |   1 -
 core/src/services/obs/writer.rs                  |  18 ++-
 core/src/services/oss/backend.rs                 |  44 +++----
 core/src/services/oss/core.rs                    |   1 -
 core/src/services/oss/writer.rs                  |  16 ++-
 core/src/services/s3/backend.rs                  |  42 ++++---
 core/src/services/s3/core.rs                     |   1 -
 core/src/services/s3/writer.rs                   |  24 ++--
 core/src/types/operator/operator_futures.rs      |  26 +++++
 core/tests/behavior/write.rs                     |   4 +-
 27 files changed, 775 insertions(+), 438 deletions(-)
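
In short, this refactor pulls the ad-hoc per-service buffering (`write_min_size`) out of the append/multipart writers and replaces it with a user-controlled `buffer_size` option plus a reusable `oio::AtLeastBufWriter`. A minimal sketch of the resulting public API, based on the updated fuzz test in this diff (the path and the 8 MiB size are only illustrative):

```rust
use bytes::Bytes;
use opendal::{Operator, Result};

async fn buffered_write(op: Operator) -> Result<()> {
    // Request an ~8 MiB internal buffer; services may raise this to their
    // own minimum multipart part size (1 MiB for COS, 5 MiB for OBS, ...).
    let mut w = op
        .writer_with("path/to/object")
        .buffer_size(8 * 1024 * 1024)
        .await?;

    w.write(Bytes::from_static(b"hello world")).await?;
    w.close().await?;
    Ok(())
}
```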

diff --git a/.github/workflows/service_test_s3.yml b/.github/workflows/service_test_s3.yml
index 80ed0ee4e..e066e375b 100644
--- a/.github/workflows/service_test_s3.yml
+++ b/.github/workflows/service_test_s3.yml
@@ -147,17 +147,18 @@ jobs:
         run: |
           docker-compose -f docker-compose-minio.yml up -d
       - name: Setup test bucket
+        env:
+          AWS_ACCESS_KEY_ID: "minioadmin"
+          AWS_SECRET_ACCESS_KEY: "minioadmin"
+          AWS_EC2_METADATA_DISABLED: "true"
         run: |
+          aws --endpoint-url http://127.0.0.1:9000/ s3 mb s3://test
+
           curl -O https://dl.min.io/client/mc/release/linux-amd64/mc
           chmod +x mc
           ./mc alias set local http://127.0.0.1:9000/ minioadmin minioadmin
-          ./mc mb local/test
           ./mc anonymous set public local/test
-          while :; do
-            echo "waiting minio to be ready"
-            [[ "$(./mc ping --count 1 local | awk '{print $6}' | tr -d '\n')" == "errors=1" ]] || break
-            sleep 1
-          done
+
       - name: Setup Rust toolchain
         uses: ./.github/actions/setup
         with:
@@ -173,35 +174,37 @@ jobs:
           OPENDAL_S3_ALLOW_ANONYMOUS: on
           OPENDAL_S3_REGION: us-east-1
 
-  r2:
-    runs-on: ubuntu-latest
-    if: github.event_name == 'push' || !github.event.pull_request.head.repo.fork
-    steps:
-      - uses: actions/checkout@v3
-      - name: Setup Rust toolchain
-        uses: ./.github/actions/setup
-        with:
-          need-nextest: true
-
-      - name: Load secret
-        id: op-load-secret
-        uses: 1password/load-secrets-action@v1
-        with:
-          export-env: true
-        env:
-          OP_SERVICE_ACCOUNT_TOKEN: ${{ secrets.OP_SERVICE_ACCOUNT_TOKEN }}
-          OPENDAL_S3_TEST: op://services/r2/test
-          OPENDAL_S3_BUCKET: op://services/r2/bucket
-          OPENDAL_S3_ENDPOINT: op://services/r2/endpoint
-          OPENDAL_S3_ACCESS_KEY_ID: op://services/r2/access_key_id
-          OPENDAL_S3_SECRET_ACCESS_KEY: op://services/r2/secret_access_key
-
-      - name: Test
-        shell: bash
-        working-directory: core
-        run: cargo nextest run s3
-        env:
-          OPENDAL_S3_REGION: auto
-          # This is the R2's limitation
-          # Refer to https://opendal.apache.org/docs/services/s3#compatible-services for more information
-          OPENDAL_S3_BATCH_MAX_OPERATIONS: 700
+# Disable this test until we addressed https://github.com/apache/incubator-opendal/issues/2904
+#
+#  r2:
+#    runs-on: ubuntu-latest
+#    if: github.event_name == 'push' || !github.event.pull_request.head.repo.fork
+#    steps:
+#      - uses: actions/checkout@v3
+#      - name: Setup Rust toolchain
+#        uses: ./.github/actions/setup
+#        with:
+#          need-nextest: true
+#
+#      - name: Load secret
+#        id: op-load-secret
+#        uses: 1password/load-secrets-action@v1
+#        with:
+#          export-env: true
+#        env:
+#          OP_SERVICE_ACCOUNT_TOKEN: ${{ secrets.OP_SERVICE_ACCOUNT_TOKEN }}
+#          OPENDAL_S3_TEST: op://services/r2/test
+#          OPENDAL_S3_BUCKET: op://services/r2/bucket
+#          OPENDAL_S3_ENDPOINT: op://services/r2/endpoint
+#          OPENDAL_S3_ACCESS_KEY_ID: op://services/r2/access_key_id
+#          OPENDAL_S3_SECRET_ACCESS_KEY: op://services/r2/secret_access_key
+#
+#      - name: Test
+#        shell: bash
+#        working-directory: core
+#        run: cargo nextest run s3
+#        env:
+#          OPENDAL_S3_REGION: auto
+#          # This is the R2's limitation
+#          # Refer to https://opendal.apache.org/docs/services/s3#compatible-services for more information
+#          OPENDAL_S3_BATCH_MAX_OPERATIONS: 700
diff --git a/core/fuzz/fuzz_writer.rs b/core/fuzz/fuzz_writer.rs
index 4c2151035..2f992b919 100644
--- a/core/fuzz/fuzz_writer.rs
+++ b/core/fuzz/fuzz_writer.rs
@@ -107,7 +107,7 @@ async fn fuzz_writer(op: Operator, input: FuzzInput) -> Result<()> {
 
     let checker = WriteChecker::new(total_size);
 
-    let mut writer = op.writer(&path).await?;
+    let mut writer = op.writer_with(&path).buffer_size(8 * 1024 * 1024).await?;
 
     for chunk in &checker.chunks {
         writer.write(chunk.clone()).await?;
@@ -132,7 +132,7 @@ fuzz_target!(|input: FuzzInput| {
         runtime.block_on(async {
             fuzz_writer(op, input.clone())
                 .await
-                .unwrap_or_else(|_| panic!("fuzz reader must succeed"));
+                .unwrap_or_else(|_| panic!("fuzz writer must succeed"));
         })
     }
 });
diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs
index 1940c40cd..848bc1786 100644
--- a/core/src/raw/oio/cursor.rs
+++ b/core/src/raw/oio/cursor.rs
@@ -45,6 +45,11 @@ impl Cursor {
         let len = self.pos.min(self.inner.len() as u64) as usize;
         &self.inner.as_ref()[len..]
     }
+
+    /// Return the length of remaining slice.
+    pub fn len(&self) -> usize {
+        self.inner.len() - self.pos as usize
+    }
 }
 
 impl From<Bytes> for Cursor {
@@ -148,7 +153,93 @@ impl oio::BlockingRead for Cursor {
     }
 }
 
-/// VectorCursor is the cursor for [`Vec<Bytes>`] that implements [`oio::Read`]
+impl oio::Stream for Cursor {
+    fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
+        if self.is_empty() {
+            return Poll::Ready(None);
+        }
+
+        let bs = self.inner.clone();
+        self.pos += bs.len() as u64;
+        Poll::Ready(Some(Ok(bs)))
+    }
+
+    fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
+        self.pos = 0;
+        Poll::Ready(Ok(()))
+    }
+}
+
+/// # TODO
+///
+/// we can do some compaction during runtime. For example, merge 4K data
+/// into the same bytes instead.
+#[derive(Clone)]
+pub struct ChunkedCursor {
+    inner: VecDeque<Bytes>,
+    idx: usize,
+}
+
+impl Default for ChunkedCursor {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl ChunkedCursor {
+    /// Create a new chunked cursor.
+    pub fn new() -> Self {
+        Self {
+            inner: VecDeque::new(),
+            idx: 0,
+        }
+    }
+
+    /// Returns `true` if current cursor is empty.
+    pub fn is_empty(&self) -> bool {
+        self.inner.len() <= self.idx
+    }
+
+    /// Return current bytes size of cursor.
+    pub fn len(&self) -> usize {
+        self.inner.iter().skip(self.idx).map(|v| v.len()).sum()
+    }
+
+    /// Reset current cursor to start.
+    pub fn reset(&mut self) {
+        self.idx = 0;
+    }
+
+    /// Clear the entire cursor.
+    pub fn clear(&mut self) {
+        self.idx = 0;
+        self.inner.clear();
+    }
+
+    /// Push a new bytes into vector cursor.
+    pub fn push(&mut self, bs: Bytes) {
+        self.inner.push_back(bs);
+    }
+}
+
+impl oio::Stream for ChunkedCursor {
+    fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
+        if self.is_empty() {
+            return Poll::Ready(None);
+        }
+
+        let bs = self.inner[self.idx].clone();
+        self.idx += 1;
+        Poll::Ready(Some(Ok(bs)))
+    }
+
+    fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
+        self.reset();
+        Poll::Ready(Ok(()))
+    }
+}
+
+/// VectorCursor is the cursor for [`Vec<Bytes>`] that implements [`oio::Stream`]
 pub struct VectorCursor {
     inner: VecDeque<Bytes>,
     size: usize,
diff --git a/core/src/raw/oio/mod.rs b/core/src/raw/oio/mod.rs
index 5a4729fe8..1b24bec9c 100644
--- a/core/src/raw/oio/mod.rs
+++ b/core/src/raw/oio/mod.rs
@@ -35,6 +35,7 @@ mod page;
 pub use page::*;
 
 mod cursor;
+pub use cursor::ChunkedCursor;
 pub use cursor::Cursor;
 pub use cursor::VectorCursor;
 
diff --git a/core/src/raw/oio/stream/api.rs b/core/src/raw/oio/stream/api.rs
index 7345564a2..495a4fb22 100644
--- a/core/src/raw/oio/stream/api.rs
+++ b/core/src/raw/oio/stream/api.rs
@@ -18,10 +18,10 @@
 use std::future::Future;
 use std::pin::Pin;
 use std::sync::Arc;
-use std::task::Context;
 use std::task::Poll;
+use std::task::{ready, Context};
 
-use bytes::Bytes;
+use bytes::{Bytes, BytesMut};
 use pin_project::pin_project;
 
 use crate::*;
@@ -135,6 +135,29 @@ pub trait StreamExt: Stream {
     fn reset(&mut self) -> ResetFuture<'_, Self> {
         ResetFuture { inner: self }
     }
+
+    /// Chain this stream with another stream.
+    fn chain<S>(self, other: S) -> Chain<Self, S>
+    where
+        Self: Sized,
+        S: Stream,
+    {
+        Chain {
+            first: Some(self),
+            second: other,
+        }
+    }
+
+    /// Collect all items from this stream into a single bytes.
+    fn collect(self) -> Collect<Self>
+    where
+        Self: Sized,
+    {
+        Collect {
+            stream: self,
+            buf: BytesMut::new(),
+        }
+    }
 }
 
 /// Make this future `!Unpin` for compatibility with async trait methods.
@@ -172,3 +195,55 @@ where
         Pin::new(this.inner).poll_reset(cx)
     }
 }
+
+/// Stream for the [`chain`](StreamExt::chain) method.
+#[must_use = "streams do nothing unless polled"]
+pub struct Chain<S1: Stream, S2: Stream> {
+    first: Option<S1>,
+    second: S2,
+}
+
+impl<S1: Stream, S2: Stream> Stream for Chain<S1, S2> {
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
+        if let Some(first) = self.first.as_mut() {
+            if let Some(item) = ready!(first.poll_next(cx)) {
+                return Poll::Ready(Some(item));
+            }
+
+            self.first = None;
+        }
+        self.second.poll_next(cx)
+    }
+
+    fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
+        Poll::Ready(Err(Error::new(
+            ErrorKind::Unsupported,
+            "chained stream doesn't support reset",
+        )))
+    }
+}
+
+/// Stream for the [`collect`](StreamExt::collect) method.
+#[must_use = "streams do nothing unless polled"]
+pub struct Collect<S> {
+    stream: S,
+    buf: BytesMut,
+}
+
+impl<S> Future for Collect<S>
+where
+    S: Stream,
+{
+    type Output = Result<Bytes>;
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let mut this = self.as_mut();
+        loop {
+            match ready!(this.stream.poll_next(cx)) {
+                Some(Ok(bs)) => this.buf.extend(bs),
+                Some(Err(err)) => return Poll::Ready(Err(err)),
+                None => return Poll::Ready(Ok(self.buf.split().freeze())),
+            }
+        }
+    }
+}
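
The `chain` and `collect` combinators added here are what the new `AtLeastBufWriter` (below) uses to prepend buffered bytes to an incoming stream and to gather a stream into memory. A rough sketch of how they compose, assuming the `oio::Cursor` stream impl added earlier in this diff and the usual blanket `StreamExt` impl:

```rust
use bytes::Bytes;
use opendal::raw::oio::{self, StreamExt};
use opendal::Result;

async fn merge() -> Result<()> {
    // Each cursor yields its bytes once, then ends.
    let buffered = oio::Cursor::from(Bytes::from_static(b"buffered "));
    let incoming = oio::Cursor::from(Bytes::from_static(b"incoming"));

    // `chain` drains the first stream before polling the second;
    // `collect` concatenates every yielded chunk into a single Bytes.
    let merged = buffered.chain(incoming).collect().await?;
    assert_eq!(merged, Bytes::from_static(b"buffered incoming"));
    Ok(())
}
```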
diff --git a/core/src/raw/oio/write/append_object_write.rs b/core/src/raw/oio/write/append_object_write.rs
index 5d6eda9da..07fa546cc 100644
--- a/core/src/raw/oio/write/append_object_write.rs
+++ b/core/src/raw/oio/write/append_object_write.rs
@@ -22,8 +22,6 @@ use crate::raw::oio::Streamer;
 use crate::raw::*;
 use crate::*;
 
-const DEFAULT_WRITE_MIN_SIZE: usize = 8 * 1024 * 1024;
-
 /// AppendObjectWrite is used to implement [`Write`] based on append
 /// object. By implementing AppendObjectWrite, services don't need to
 /// care about the details of buffering and uploading parts.
@@ -53,8 +51,6 @@ pub struct AppendObjectWriter<W: AppendObjectWrite> {
     inner: W,
 
     offset: Option<u64>,
-    buffer: oio::VectorCursor,
-    buffer_size: usize,
 }
 
 impl<W: AppendObjectWrite> AppendObjectWriter<W> {
@@ -63,24 +59,9 @@ impl<W: AppendObjectWrite> AppendObjectWriter<W> {
         Self {
             inner,
             offset: None,
-            buffer: oio::VectorCursor::new(),
-            buffer_size: DEFAULT_WRITE_MIN_SIZE,
         }
     }
 
-    /// Configure the write_min_size.
-    ///
-    /// write_min_size is used to control the size of internal buffer.
-    ///
-    /// AppendObjectWriter will flush the buffer to storage when
-    /// the size of buffer is larger than write_min_size.
-    ///
-    /// This value is default to 8 MiB.
-    pub fn with_write_min_size(mut self, v: usize) -> Self {
-        self.buffer_size = v;
-        self
-    }
-
     async fn offset(&mut self) -> Result<u64> {
         if let Some(offset) = self.offset {
             return Ok(offset);
@@ -101,72 +82,24 @@ where
     async fn write(&mut self, bs: Bytes) -> Result<()> {
         let offset = self.offset().await?;
 
-        // Ignore empty bytes
-        if bs.is_empty() {
-            return Ok(());
-        }
-
-        self.buffer.push(bs);
-        // Return directly if the buffer is not full
-        if self.buffer.len() <= self.buffer_size {
-            return Ok(());
-        }
-
-        let bs = self.buffer.peak_all();
-        let size = bs.len();
+        let size = bs.len() as u64;
 
-        match self
-            .inner
-            .append(offset, size as u64, AsyncBody::Bytes(bs))
+        self.inner
+            .append(offset, size, AsyncBody::Bytes(bs))
             .await
-        {
-            Ok(_) => {
-                self.buffer.take(size);
-                self.offset = Some(offset + size as u64);
-                Ok(())
-            }
-            Err(e) => {
-                // If the upload fails, we should pop the given bs to make sure
-                // write is re-enter safe.
-                self.buffer.pop();
-                Err(e)
-            }
-        }
+            .map(|_| self.offset = Some(offset + size))
     }
 
     async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
-        if !self.buffer.is_empty() {
-            return Err(Error::new(
-                ErrorKind::InvalidInput,
-                "Writer::sink should not be used mixed with existing buffer",
-            ));
-        }
-
         let offset = self.offset().await?;
 
         self.inner
             .append(offset, size, AsyncBody::Stream(s))
-            .await?;
-        self.offset = Some(offset + size);
-
-        Ok(())
+            .await
+            .map(|_| self.offset = Some(offset + size))
     }
 
     async fn close(&mut self) -> Result<()> {
-        // Make sure internal buffer has been flushed.
-        if !self.buffer.is_empty() {
-            let bs = self.buffer.peak_exact(self.buffer.len());
-
-            let offset = self.offset().await?;
-            let size = bs.len() as u64;
-            self.inner
-                .append(offset, size, AsyncBody::Bytes(bs))
-                .await?;
-
-            self.buffer.clear();
-            self.offset = Some(offset + size);
-        }
-
         Ok(())
     }
 
diff --git a/core/src/raw/oio/write/at_least_buf_write.rs b/core/src/raw/oio/write/at_least_buf_write.rs
new file mode 100644
index 000000000..d484127d7
--- /dev/null
+++ b/core/src/raw/oio/write/at_least_buf_write.rs
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::raw::oio::{StreamExt, Streamer};
+use crate::raw::*;
+use crate::*;
+use async_trait::async_trait;
+use bytes::Bytes;
+
+/// AtLeastBufWrite is used to implement [`Write`] based on at least buffer.
+///
+/// Users can wrap a writer and a buffer together.
+pub struct AtLeastBufWriter<W: oio::Write> {
+    inner: W,
+
+    /// The total size of the data.
+    ///
+    /// If the total size is known, we will write to underlying storage directly without buffer it
+    /// when possible.
+    total_size: Option<u64>,
+
+    /// The size for buffer, we will flush the underlying storage if the buffer is full.
+    buffer_size: usize,
+    buffer: oio::ChunkedCursor,
+}
+
+impl<W: oio::Write> AtLeastBufWriter<W> {
+    /// Create a new at least buf writer.
+    pub fn new(inner: W, buffer_size: usize) -> Self {
+        Self {
+            inner,
+            total_size: None,
+            buffer_size,
+            buffer: oio::ChunkedCursor::new(),
+        }
+    }
+
+    /// Configure the total size for writer.
+    pub fn with_total_size(mut self, total_size: Option<u64>) -> Self {
+        self.total_size = total_size;
+        self
+    }
+}
+
+#[async_trait]
+impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> {
+    async fn write(&mut self, bs: Bytes) -> Result<()> {
+        // If total size is known and equals to given bytes, we can write it directly.
+        if let Some(total_size) = self.total_size {
+            if total_size == bs.len() as u64 {
+                return self.inner.write(bs).await;
+            }
+        }
+
+        // Push the bytes into the buffer if the buffer is not full.
+        if self.buffer.len() + bs.len() < self.buffer_size {
+            self.buffer.push(bs);
+            return Ok(());
+        }
+
+        let mut buf = self.buffer.clone();
+        buf.push(bs);
+
+        self.inner
+            .sink(buf.len() as u64, Box::new(buf))
+            .await
+            // Clear buffer if the write is successful.
+            .map(|_| self.buffer.clear())
+    }
+
+    async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
+        // If total size is known and equals to given stream, we can write it directly.
+        if let Some(total_size) = self.total_size {
+            if total_size == size {
+                return self.inner.sink(size, s).await;
+            }
+        }
+
+        // Push the bytes into the buffer if the buffer is not full.
+        if self.buffer.len() as u64 + size < self.buffer_size as u64 {
+            self.buffer.push(s.collect().await?);
+            return Ok(());
+        }
+
+        let buf = self.buffer.clone();
+        let buffer_size = buf.len() as u64;
+        let stream = buf.chain(s);
+
+        self.inner
+            .sink(buffer_size + size, Box::new(stream))
+            .await
+            // Clear buffer if the write is successful.
+            .map(|_| self.buffer.clear())
+    }
+
+    async fn abort(&mut self) -> Result<()> {
+        self.buffer.clear();
+        self.inner.abort().await
+    }
+
+    async fn close(&mut self) -> Result<()> {
+        if !self.buffer.is_empty() {
+            self.inner
+                .sink(self.buffer.len() as u64, Box::new(self.buffer.clone()))
+                .await?;
+            self.buffer.clear();
+        }
+
+        self.inner.close().await
+    }
+}
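
The backends below all follow the same pattern when wiring this writer in: clamp the requested buffer size to the service's minimum part size, then wrap the composed writer only if buffering was requested. A condensed sketch of that wiring (the helper name is made up; the real code lives in each service's `write` impl further down):

```rust
use opendal::raw::{oio, OpWrite};

/// Hypothetical helper mirroring the cos/obs/oss backend changes below.
fn maybe_buffer<W: oio::Write>(
    inner: W,
    args: &OpWrite,
    minimum_part_size: usize,
) -> oio::TwoWaysWriter<W, oio::AtLeastBufWriter<W>> {
    match args.buffer_size() {
        Some(size) => {
            // Never buffer less than the service's minimum multipart size.
            let size = size.max(minimum_part_size);
            let w = oio::AtLeastBufWriter::new(inner, size)
                .with_total_size(args.content_length());
            oio::TwoWaysWriter::Two(w)
        }
        None => oio::TwoWaysWriter::One(inner),
    }
}
```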
diff --git a/core/src/raw/oio/write/compose_write.rs b/core/src/raw/oio/write/compose_write.rs
new file mode 100644
index 000000000..043df2978
--- /dev/null
+++ b/core/src/raw/oio/write/compose_write.rs
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`type_alias_impl_trait`](https://github.com/rust-lang/rust/issues/63063) is not stable yet.
+//! So we can't write the following code:
+//!
+//! ```txt
+//! impl Accessor for S3Backend {
+//!     type Writer = impl oio::Write;
+//! }
+//! ```
+//!
+//! Which means we have to write the type directly like:
+//!
+//! ```txt
+//! impl Accessor for OssBackend {
+//!     type Writer = oio::TwoWaysWriter<
+//!         oio::MultipartUploadWriter<OssWriter>,
+//!         oio::AppendObjectWriter<OssWriter>,
+//!     >;
+//! }
+//! ```
+//!
+//! This module is used to provide some enums for the above code. We should remove this module once
+//! type_alias_impl_trait has been stabilized.
+
+use async_trait::async_trait;
+use bytes::Bytes;
+
+use crate::raw::oio::Streamer;
+use crate::raw::*;
+use crate::*;
+
+/// TwoWaysWrite is used to implement [`Write`] based on two ways.
+///
+/// Users can wrap two different writers together.
+pub enum TwoWaysWriter<ONE: oio::Write, TWO: oio::Write> {
+    /// The first type for the [`TwoWaysWriter`].
+    One(ONE),
+    /// The second type for the [`TwoWaysWriter`].
+    Two(TWO),
+}
+
+#[async_trait]
+impl<ONE: oio::Write, TWO: oio::Write> oio::Write for TwoWaysWriter<ONE, TWO> {
+    async fn write(&mut self, bs: Bytes) -> Result<()> {
+        match self {
+            Self::One(one) => one.write(bs).await,
+            Self::Two(two) => two.write(bs).await,
+        }
+    }
+
+    async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
+        match self {
+            Self::One(one) => one.sink(size, s).await,
+            Self::Two(two) => two.sink(size, s).await,
+        }
+    }
+
+    async fn abort(&mut self) -> Result<()> {
+        match self {
+            Self::One(one) => one.abort().await,
+            Self::Two(two) => two.abort().await,
+        }
+    }
+
+    async fn close(&mut self) -> Result<()> {
+        match self {
+            Self::One(one) => one.close().await,
+            Self::Two(two) => two.close().await,
+        }
+    }
+}
+
+/// ThreeWaysWriter is used to implement [`Write`] based on three ways.
+///
+/// Users can wrap three different writers together.
+pub enum ThreeWaysWriter<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> {
+    /// The first type for the [`ThreeWaysWriter`].
+    One(ONE),
+    /// The second type for the [`ThreeWaysWriter`].
+    Two(TWO),
+    /// The third type for the [`ThreeWaysWriter`].
+    Three(THREE),
+}
+
+#[async_trait]
+impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write
+    for ThreeWaysWriter<ONE, TWO, THREE>
+{
+    async fn write(&mut self, bs: Bytes) -> Result<()> {
+        match self {
+            Self::One(one) => one.write(bs).await,
+            Self::Two(two) => two.write(bs).await,
+            Self::Three(three) => three.write(bs).await,
+        }
+    }
+
+    async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
+        match self {
+            Self::One(one) => one.sink(size, s).await,
+            Self::Two(two) => two.sink(size, s).await,
+            Self::Three(three) => three.sink(size, s).await,
+        }
+    }
+
+    async fn abort(&mut self) -> Result<()> {
+        match self {
+            Self::One(one) => one.abort().await,
+            Self::Two(two) => two.abort().await,
+            Self::Three(three) => three.abort().await,
+        }
+    }
+
+    async fn close(&mut self) -> Result<()> {
+        match self {
+            Self::One(one) => one.close().await,
+            Self::Two(two) => two.close().await,
+            Self::Three(three) => three.close().await,
+        }
+    }
+}
diff --git a/core/src/raw/oio/write/mod.rs b/core/src/raw/oio/write/mod.rs
index b13b3e2c7..49dd94b2b 100644
--- a/core/src/raw/oio/write/mod.rs
+++ b/core/src/raw/oio/write/mod.rs
@@ -22,8 +22,9 @@ pub use api::Write;
 pub use api::WriteOperation;
 pub use api::Writer;
 
-mod two_ways_write;
-pub use two_ways_write::TwoWaysWriter;
+mod compose_write;
+pub use compose_write::ThreeWaysWriter;
+pub use compose_write::TwoWaysWriter;
 
 mod multipart_upload_write;
 pub use multipart_upload_write::MultipartUploadPart;
@@ -33,3 +34,10 @@ pub use multipart_upload_write::MultipartUploadWriter;
 mod append_object_write;
 pub use append_object_write::AppendObjectWrite;
 pub use append_object_write::AppendObjectWriter;
+
+mod at_least_buf_write;
+pub use at_least_buf_write::AtLeastBufWriter;
+
+mod one_shot_write;
+pub use one_shot_write::OneShotWrite;
+pub use one_shot_write::OneShotWriter;
diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs
index 8ca10c462..c013b24c3 100644
--- a/core/src/raw/oio/write/multipart_upload_write.rs
+++ b/core/src/raw/oio/write/multipart_upload_write.rs
@@ -21,8 +21,6 @@ use bytes::Bytes;
 use crate::raw::*;
 use crate::*;
 
-const DEFAULT_WRITE_MIN_SIZE: usize = 8 * 1024 * 1024;
-
 /// MultipartUploadWrite is used to implement [`Write`] based on multipart
 /// uploads. By implementing MultipartUploadWrite, services don't need to
 /// care about the details of buffering and uploading parts.
@@ -34,12 +32,6 @@ const DEFAULT_WRITE_MIN_SIZE: usize = 8 * 1024 * 1024;
 /// - Expose `MultipartUploadWriter` as `Accessor::Writer`
 #[async_trait]
 pub trait MultipartUploadWrite: Send + Sync + Unpin {
-    /// write_once write all data at once.
-    ///
-    /// MultipartUploadWriter will call this API when the size of data is
-    /// already known.
-    async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()>;
-
-    /// initiate_part will call start a multipart upload and return the upload id.
     ///
     /// MultipartUploadWriter will call this when:
@@ -94,39 +86,32 @@ pub struct MultipartUploadPart {
 /// - Allow users to switch to un-buffered mode if users write 16MiB every time.
 pub struct MultipartUploadWriter<W: MultipartUploadWrite> {
     inner: W,
-    total_size: Option<u64>,
 
     upload_id: Option<String>,
     parts: Vec<MultipartUploadPart>,
-    buffer: oio::VectorCursor,
-    buffer_size: usize,
 }
 
 impl<W: MultipartUploadWrite> MultipartUploadWriter<W> {
     /// Create a new MultipartUploadWriter.
-    pub fn new(inner: W, total_size: Option<u64>) -> Self {
+    pub fn new(inner: W) -> Self {
         Self {
             inner,
-            total_size,
 
             upload_id: None,
             parts: Vec::new(),
-            buffer: oio::VectorCursor::new(),
-            buffer_size: DEFAULT_WRITE_MIN_SIZE,
         }
     }
 
-    /// Configure the write_min_size.
-    ///
-    /// write_min_size is used to control the size of internal buffer.
-    ///
-    /// MultipartUploadWriter will flush the buffer to upload a part when
-    /// the size of buffer is larger than write_min_size.
-    ///
-    /// This value is default to 8 MiB (as recommended by AWS).
-    pub fn with_write_min_size(mut self, v: usize) -> Self {
-        self.buffer_size = v;
-        self
+    /// Get the upload id. Initiate a new multipart upload if the upload id is empty.
+    pub async fn upload_id(&mut self) -> Result<String> {
+        match &self.upload_id {
+            Some(upload_id) => Ok(upload_id.to_string()),
+            None => {
+                let upload_id = self.inner.initiate_part().await?;
+                self.upload_id = Some(upload_id.clone());
+                Ok(upload_id)
+            }
+        }
     }
 }
 
@@ -136,88 +121,28 @@ where
     W: MultipartUploadWrite,
 {
     async fn write(&mut self, bs: Bytes) -> Result<()> {
-        let upload_id = match &self.upload_id {
-            Some(upload_id) => upload_id,
-            None => {
-                if self.total_size.unwrap_or_default() == bs.len() as u64 {
-                    return self
-                        .inner
-                        .write_once(bs.len() as u64, AsyncBody::Bytes(bs))
-                        .await;
-                }
-
-                let upload_id = self.inner.initiate_part().await?;
-                self.upload_id = Some(upload_id);
-                self.upload_id.as_deref().unwrap()
-            }
-        };
+        let upload_id = self.upload_id().await?;
 
-        // Ignore empty bytes
-        if bs.is_empty() {
-            return Ok(());
-        }
-
-        self.buffer.push(bs);
-        // Return directly if the buffer is not full
-        if self.buffer.len() <= self.buffer_size {
-            return Ok(());
-        }
-
-        let bs = self.buffer.peak_at_least(self.buffer_size);
         let size = bs.len();
 
-        match self
-            .inner
+        self.inner
             .write_part(
-                upload_id,
+                &upload_id,
                 self.parts.len(),
                 size as u64,
                 AsyncBody::Bytes(bs),
             )
             .await
-        {
-            Ok(part) => {
-                self.buffer.take(size);
-                self.parts.push(part);
-                Ok(())
-            }
-            Err(e) => {
-                // If the upload fails, we should pop the given bs to make sure
-                // write is re-enter safe.
-                self.buffer.pop();
-                Err(e)
-            }
-        }
+            .map(|v| self.parts.push(v))
     }
 
     async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
-        if !self.buffer.is_empty() {
-            return Err(Error::new(
-                ErrorKind::InvalidInput,
-                "Writer::sink should not be used mixed with existing buffer",
-            ));
-        }
-
-        let upload_id = match &self.upload_id {
-            Some(upload_id) => upload_id,
-            None => {
-                if self.total_size.unwrap_or_default() == size {
-                    return self.inner.write_once(size, AsyncBody::Stream(s)).await;
-                }
-
-                let upload_id = self.inner.initiate_part().await?;
-                self.upload_id = Some(upload_id);
-                self.upload_id.as_deref().unwrap()
-            }
-        };
+        let upload_id = self.upload_id().await?;
 
-        let part = self
-            .inner
-            .write_part(upload_id, self.parts.len(), size, AsyncBody::Stream(s))
-            .await?;
-        self.parts.push(part);
-
-        Ok(())
+        self.inner
+            .write_part(&upload_id, self.parts.len(), size, AsyncBody::Stream(s))
+            .await
+            .map(|v| self.parts.push(v))
     }
 
     async fn close(&mut self) -> Result<()> {
@@ -227,30 +152,6 @@ where
             return Ok(());
         };
 
-        // Make sure internal buffer has been flushed.
-        if !self.buffer.is_empty() {
-            let bs = self.buffer.peak_exact(self.buffer.len());
-
-            match self
-                .inner
-                .write_part(
-                    upload_id,
-                    self.parts.len(),
-                    bs.len() as u64,
-                    AsyncBody::Bytes(bs),
-                )
-                .await
-            {
-                Ok(part) => {
-                    self.buffer.clear();
-                    self.parts.push(part);
-                }
-                Err(e) => {
-                    return Err(e);
-                }
-            }
-        }
-
         self.inner.complete_part(upload_id, &self.parts).await
     }
 
diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs
new file mode 100644
index 000000000..c2ad25c83
--- /dev/null
+++ b/core/src/raw/oio/write/one_shot_write.rs
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::raw::*;
+use crate::*;
+use async_trait::async_trait;
+use bytes::Bytes;
+
+/// OneShotWrite is used to implement [`Write`] based on one shot operation.
+/// By implementing OneShotWrite, services don't need to care about the details.
+///
+/// For example, S3 `PUT Object` and fs `write_all`.
+///
+/// The layout after adopting [`OneShotWrite`]:
+#[async_trait]
+pub trait OneShotWrite: Send + Sync + Unpin {
+    /// write_once write all data at once.
+    ///
+    /// Implementations should make sure that the data is written correctly at once.
+    async fn write_once(&self, size: u64, stream: oio::Streamer) -> Result<()>;
+}
+
+/// OneShotWrite is used to implement [`Write`] based on one shot.
+pub struct OneShotWriter<W: OneShotWrite> {
+    inner: W,
+}
+
+impl<W: OneShotWrite> OneShotWriter<W> {
+    /// Create a new one shot writer.
+    pub fn new(inner: W) -> Self {
+        Self { inner }
+    }
+}
+
+#[async_trait]
+impl<W: OneShotWrite> oio::Write for OneShotWriter<W> {
+    async fn write(&mut self, bs: Bytes) -> Result<()> {
+        let cursor = oio::Cursor::from(bs);
+        self.inner
+            .write_once(cursor.len() as u64, Box::new(cursor))
+            .await
+    }
+
+    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+        self.inner.write_once(size, s).await
+    }
+
+    async fn abort(&mut self) -> Result<()> {
+        Ok(())
+    }
+
+    async fn close(&mut self) -> Result<()> {
+        Ok(())
+    }
+}
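
For a service, implementing the trait above is the whole job; `OneShotWriter` supplies the `Write` plumbing. A toy implementor, only to illustrate the shape (real services stream the body into an HTTP request, as the COS/OBS/OSS writers below do):

```rust
use async_trait::async_trait;
use bytes::Bytes;
use opendal::raw::oio::{self, StreamExt};
use opendal::Result;

/// Toy one-shot target that just collects the stream into memory.
struct InMemoryOneShot {
    dest: std::sync::Mutex<Bytes>,
}

#[async_trait]
impl oio::OneShotWrite for InMemoryOneShot {
    async fn write_once(&self, _size: u64, stream: oio::Streamer) -> Result<()> {
        // `collect` comes from the StreamExt combinator added in this patch.
        let bs = stream.collect().await?;
        *self.dest.lock().unwrap() = bs;
        Ok(())
    }
}
```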
diff --git a/core/src/raw/oio/write/two_ways_write.rs b/core/src/raw/oio/write/two_ways_write.rs
deleted file mode 100644
index 470655783..000000000
--- a/core/src/raw/oio/write/two_ways_write.rs
+++ /dev/null
@@ -1,64 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use async_trait::async_trait;
-use bytes::Bytes;
-
-use crate::raw::oio::Streamer;
-use crate::raw::*;
-use crate::*;
-
-/// TwoWaysWrite is used to implement [`Write`] based on two ways.
-///
-/// Users can wrap two different writers together.
-pub enum TwoWaysWriter<L: oio::Write, R: oio::Write> {
-    /// The left side for the [`TwoWaysWriter`].
-    Left(L),
-    /// The right side for the [`TwoWaysWriter`].
-    Right(R),
-}
-
-#[async_trait]
-impl<L: oio::Write, R: oio::Write> oio::Write for TwoWaysWriter<L, R> {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        match self {
-            Self::Left(l) => l.write(bs).await,
-            Self::Right(r) => r.write(bs).await,
-        }
-    }
-
-    async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
-        match self {
-            Self::Left(l) => l.sink(size, s).await,
-            Self::Right(r) => r.sink(size, s).await,
-        }
-    }
-
-    async fn abort(&mut self) -> Result<()> {
-        match self {
-            Self::Left(l) => l.abort().await,
-            Self::Right(r) => r.abort().await,
-        }
-    }
-
-    async fn close(&mut self) -> Result<()> {
-        match self {
-            Self::Left(l) => l.close().await,
-            Self::Right(r) => r.close().await,
-        }
-    }
-}
diff --git a/core/src/raw/ops.rs b/core/src/raw/ops.rs
index 2617768da..0404ed468 100644
--- a/core/src/raw/ops.rs
+++ b/core/src/raw/ops.rs
@@ -406,6 +406,7 @@ impl OpStat {
 pub struct OpWrite {
     append: bool,
 
+    buffer_size: Option<usize>,
     content_length: Option<u64>,
     content_type: Option<String>,
     content_disposition: Option<String>,
@@ -439,6 +440,26 @@ impl OpWrite {
         self
     }
 
+    /// Get the buffer size from op.
+    ///
+    /// The buffer size is used by service to decide the buffer size of the underlying writer.
+    pub fn buffer_size(&self) -> Option<usize> {
+        self.buffer_size
+    }
+
+    /// Set the buffer size of op.
+    ///
+    /// If buffer size is set, the data will be buffered by the underlying writer.
+    ///
+    /// ## NOTE
+    ///
+    /// Service could have their own minimum buffer size while perform write operations like
+    /// multipart uploads. So the buffer size may be larger than the given buffer size.
+    pub fn with_buffer_size(mut self, buffer_size: usize) -> Self {
+        self.buffer_size = Some(buffer_size);
+        self
+    }
+
     /// Get the content length from op.
     ///
     /// The content length is the total length of the data to be written.
diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs
index 9dc4b1b66..66aa65449 100644
--- a/core/src/services/cos/backend.rs
+++ b/core/src/services/cos/backend.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::cmp::max;
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::sync::Arc;
@@ -32,9 +33,13 @@ use super::error::parse_error;
 use super::pager::CosPager;
 use super::writer::CosWriter;
 use crate::raw::*;
+use crate::services::cos::writer::CosWriters;
 use crate::*;
 
-const DEFAULT_WRITE_MIN_SIZE: usize = 1024 * 1024;
+/// The minimum multipart size of COS is 1 MiB.
+///
+/// ref: <https://www.tencentcloud.com/document/product/436/14112>
+const MINIMUM_MULTIPART_SIZE: usize = 1024 * 1024;
 
 /// Tencent-Cloud COS services support.
 #[doc = include_str!("docs.md")]
@@ -47,10 +52,6 @@ pub struct CosBuilder {
     bucket: Option<String>,
     http_client: Option<HttpClient>,
 
-    /// the part size of cos multipart upload, which should be 1 MB to 5 GB.
-    /// There is no minimum size limit on the last part of your multipart upload
-    write_min_size: Option<usize>,
-
     disable_config_load: bool,
 }
 
@@ -125,14 +126,6 @@ impl CosBuilder {
         self
     }
 
-    /// set the minimum size of unsized write, it should be greater than 1 MB.
-    /// Reference: [Upload Part | Tencent Cloud](https://www.tencentcloud.com/document/product/436/7750)
-    pub fn write_min_size(&mut self, write_min_size: usize) -> &mut Self {
-        self.write_min_size = Some(write_min_size);
-
-        self
-    }
-
     /// Disable config load so that opendal will not load config from
     /// environment.
     ///
@@ -168,8 +161,6 @@ impl Builder for CosBuilder {
         map.get("endpoint").map(|v| builder.endpoint(v));
         map.get("secret_id").map(|v| builder.secret_id(v));
         map.get("secret_key").map(|v| builder.secret_key(v));
-        map.get("write_min_size")
-            .map(|v| builder.write_min_size(v.parse().expect("input must be a number")));
 
         builder
     }
@@ -233,14 +224,6 @@ impl Builder for CosBuilder {
         let cred_loader = TencentCosCredentialLoader::new(client.client(), cfg);
 
         let signer = TencentCosSigner::new();
-        let write_min_size = self.write_min_size.unwrap_or(DEFAULT_WRITE_MIN_SIZE);
-        if write_min_size < 1024 * 1024 {
-            return Err(Error::new(
-                ErrorKind::ConfigInvalid,
-                "The write minimum buffer size is misconfigured",
-            )
-            .with_context("service", Scheme::Cos));
-        }
 
         debug!("backend build finished");
         Ok(CosBackend {
@@ -251,7 +234,6 @@ impl Builder for CosBuilder {
                 signer,
                 loader: cred_loader,
                 client,
-                write_min_size,
             }),
         })
     }
@@ -267,10 +249,7 @@ pub struct CosBackend {
 impl Accessor for CosBackend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = oio::TwoWaysWriter<
-        oio::MultipartUploadWriter<CosWriter>,
-        oio::AppendObjectWriter<CosWriter>,
-    >;
+    type Writer = oio::TwoWaysWriter<CosWriters, oio::AtLeastBufWriter<CosWriters>>;
     type BlockingWriter = ();
     type Pager = CosPager;
     type BlockingPager = ();
@@ -358,19 +337,26 @@ impl Accessor for CosBackend {
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
         let writer = CosWriter::new(self.core.clone(), path, args.clone());
 
-        let tw = if args.append() {
+        let w = if args.append() {
+            CosWriters::Three(oio::AppendObjectWriter::new(writer))
+        } else if args.content_length().is_some() {
+            CosWriters::One(oio::OneShotWriter::new(writer))
+        } else {
+            CosWriters::Two(oio::MultipartUploadWriter::new(writer))
+        };
+
+        let w = if let Some(buffer_size) = args.buffer_size() {
+            let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
+
             let w =
-                oio::AppendObjectWriter::new(writer).with_write_min_size(self.core.write_min_size);
+                oio::AtLeastBufWriter::new(w, buffer_size).with_total_size(args.content_length());
 
-            oio::TwoWaysWriter::Right(w)
+            oio::TwoWaysWriter::Two(w)
         } else {
-            let w = oio::MultipartUploadWriter::new(writer, args.content_length())
-                .with_write_min_size(self.core.write_min_size);
-
-            oio::TwoWaysWriter::Left(w)
+            oio::TwoWaysWriter::One(w)
         };
 
-        return Ok((RpWrite::default(), tw));
+        Ok((RpWrite::default(), w))
     }
 
     async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
diff --git a/core/src/services/cos/core.rs b/core/src/services/cos/core.rs
index e288a447c..490dada6e 100644
--- a/core/src/services/cos/core.rs
+++ b/core/src/services/cos/core.rs
@@ -45,7 +45,6 @@ pub struct CosCore {
     pub signer: TencentCosSigner,
     pub loader: TencentCosCredentialLoader,
     pub client: HttpClient,
-    pub write_min_size: usize,
 }
 
 impl Debug for CosCore {
diff --git a/core/src/services/cos/writer.rs b/core/src/services/cos/writer.rs
index e64bc23a1..af2a6ebd0 100644
--- a/core/src/services/cos/writer.rs
+++ b/core/src/services/cos/writer.rs
@@ -23,9 +23,16 @@ use http::StatusCode;
 
 use super::core::*;
 use super::error::parse_error;
+use crate::raw::oio::Streamer;
 use crate::raw::*;
 use crate::*;
 
+pub type CosWriters = oio::ThreeWaysWriter<
+    oio::OneShotWriter<CosWriter>,
+    oio::MultipartUploadWriter<CosWriter>,
+    oio::AppendObjectWriter<CosWriter>,
+>;
+
 pub struct CosWriter {
     core: Arc<CosCore>,
 
@@ -44,15 +51,15 @@ impl CosWriter {
 }
 
 #[async_trait]
-impl oio::MultipartUploadWrite for CosWriter {
-    async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
+impl oio::OneShotWrite for CosWriter {
+    async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> {
         let mut req = self.core.cos_put_object_request(
             &self.path,
             Some(size),
             self.op.content_type(),
             self.op.content_disposition(),
             self.op.cache_control(),
-            body,
+            AsyncBody::Stream(stream),
         )?;
 
         self.core.sign(&mut req).await?;
@@ -69,7 +76,10 @@ impl oio::MultipartUploadWrite for CosWriter {
             _ => Err(parse_error(resp).await?),
         }
     }
+}
 
+#[async_trait]
+impl oio::MultipartUploadWrite for CosWriter {
     async fn initiate_part(&self) -> Result<String> {
         let resp = self
             .core
diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs
index e90308eda..78ecf0976 100644
--- a/core/src/services/obs/backend.rs
+++ b/core/src/services/obs/backend.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::cmp::max;
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::sync::Arc;
@@ -32,8 +33,14 @@ use super::error::parse_error;
 use super::pager::ObsPager;
 use super::writer::ObsWriter;
 use crate::raw::*;
+use crate::services::obs::writer::ObsWriters;
 use crate::*;
 
+/// The minimum multipart size of OBS is 5 MiB.
+///
+/// ref: <https://support.huaweicloud.com/intl/en-us/ugobs-obs/obs_41_0021.html>
+const MINIMUM_MULTIPART_SIZE: usize = 5 * 1024 * 1024;
+
 /// Huawei Cloud OBS services support.
 ///
 /// # Capabilities
@@ -91,9 +98,6 @@ use crate::*;
 ///     Ok(())
 /// }
 /// ```
-
-const DEFAULT_WRITE_MIN_SIZE: usize = 100 * 1024;
-
 /// Huawei-Cloud Object Storage Service (OBS) support
 #[derive(Default, Clone)]
 pub struct ObsBuilder {
@@ -103,9 +107,6 @@ pub struct ObsBuilder {
     secret_access_key: Option<String>,
     bucket: Option<String>,
     http_client: Option<HttpClient>,
-    /// the part size of obs multipart upload, which should be 100 KiB to 5 GiB.
-    /// There is no minimum size limit on the last part of your multipart upload
-    write_min_size: Option<usize>,
 }
 
 impl Debug for ObsBuilder {
@@ -190,14 +191,6 @@ impl ObsBuilder {
         self.http_client = Some(client);
         self
     }
-
-    /// set the minimum size of unsized write, it should be greater than 100 KB.
-    /// Reference: [Huawei Obs multipart upload limits](https://support.huaweicloud.com/intl/en-us/api-obs/obs_04_0099.html)
-    pub fn write_min_size(&mut self, write_min_size: usize) -> &mut Self {
-        self.write_min_size = Some(write_min_size);
-
-        self
-    }
 }
 
 impl Builder for ObsBuilder {
@@ -213,8 +206,6 @@ impl Builder for ObsBuilder {
         map.get("access_key_id").map(|v| builder.access_key_id(v));
         map.get("secret_access_key")
             .map(|v| builder.secret_access_key(v));
-        map.get("write_min_size")
-            .map(|v| builder.write_min_size(v.parse().expect("input must be a number")));
 
         builder
     }
@@ -296,14 +287,6 @@ impl Builder for ObsBuilder {
                 &endpoint
             }
         });
-        let write_min_size = self.write_min_size.unwrap_or(DEFAULT_WRITE_MIN_SIZE);
-        if write_min_size < 100 * 1024 {
-            return Err(Error::new(
-                ErrorKind::ConfigInvalid,
-                "The write minimum buffer size is misconfigured",
-            )
-            .with_context("service", Scheme::Obs));
-        }
 
         debug!("backend build finished");
         Ok(ObsBackend {
@@ -314,7 +297,6 @@ impl Builder for ObsBuilder {
                 signer,
                 loader,
                 client,
-                write_min_size,
             }),
         })
     }
@@ -330,10 +312,7 @@ pub struct ObsBackend {
 impl Accessor for ObsBackend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = oio::TwoWaysWriter<
-        oio::MultipartUploadWriter<ObsWriter>,
-        oio::AppendObjectWriter<ObsWriter>,
-    >;
+    type Writer = oio::TwoWaysWriter<ObsWriters, oio::AtLeastBufWriter<ObsWriters>>;
     type BlockingWriter = ();
     type Pager = ObsPager;
     type BlockingPager = ();
@@ -452,19 +431,26 @@ impl Accessor for ObsBackend {
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
         let writer = ObsWriter::new(self.core.clone(), path, args.clone());
 
-        let tw = if args.append() {
+        let w = if args.append() {
+            ObsWriters::Three(oio::AppendObjectWriter::new(writer))
+        } else if args.content_length().is_some() {
+            ObsWriters::One(oio::OneShotWriter::new(writer))
+        } else {
+            ObsWriters::Two(oio::MultipartUploadWriter::new(writer))
+        };
+
+        let w = if let Some(buffer_size) = args.buffer_size() {
+            let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
+
             let w =
-                oio::AppendObjectWriter::new(writer).with_write_min_size(self.core.write_min_size);
+                oio::AtLeastBufWriter::new(w, buffer_size).with_total_size(args.content_length());
 
-            oio::TwoWaysWriter::Right(w)
+            oio::TwoWaysWriter::Two(w)
         } else {
-            let w = oio::MultipartUploadWriter::new(writer, args.content_length())
-                .with_write_min_size(self.core.write_min_size);
-
-            oio::TwoWaysWriter::Left(w)
+            oio::TwoWaysWriter::One(w)
         };
 
-        return Ok((RpWrite::default(), tw));
+        Ok((RpWrite::default(), w))
     }
 
     async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
diff --git a/core/src/services/obs/core.rs b/core/src/services/obs/core.rs
index 52604fd50..3d8ba1daf 100644
--- a/core/src/services/obs/core.rs
+++ b/core/src/services/obs/core.rs
@@ -45,7 +45,6 @@ pub struct ObsCore {
     pub signer: HuaweicloudObsSigner,
     pub loader: HuaweicloudObsCredentialLoader,
     pub client: HttpClient,
-    pub write_min_size: usize,
 }
 
 impl Debug for ObsCore {
diff --git a/core/src/services/obs/writer.rs b/core/src/services/obs/writer.rs
index efcfd3ab6..f8078a955 100644
--- a/core/src/services/obs/writer.rs
+++ b/core/src/services/obs/writer.rs
@@ -23,10 +23,16 @@ use http::StatusCode;
 
 use super::core::*;
 use super::error::parse_error;
-use crate::raw::oio::MultipartUploadPart;
+use crate::raw::oio::{MultipartUploadPart, Streamer};
 use crate::raw::*;
 use crate::*;
 
+pub type ObsWriters = oio::ThreeWaysWriter<
+    oio::OneShotWriter<ObsWriter>,
+    oio::MultipartUploadWriter<ObsWriter>,
+    oio::AppendObjectWriter<ObsWriter>,
+>;
+
 pub struct ObsWriter {
     core: Arc<ObsCore>,
 
@@ -43,15 +49,16 @@ impl ObsWriter {
         }
     }
 }
+
 #[async_trait]
-impl oio::MultipartUploadWrite for ObsWriter {
-    async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
+impl oio::OneShotWrite for ObsWriter {
+    async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> {
         let mut req = self.core.obs_put_object_request(
             &self.path,
             Some(size),
             self.op.content_type(),
             self.op.cache_control(),
-            body,
+            AsyncBody::Stream(stream),
         )?;
 
         self.core.sign(&mut req).await?;
@@ -68,7 +75,10 @@ impl oio::MultipartUploadWrite for ObsWriter {
             _ => Err(parse_error(resp).await?),
         }
     }
+}
 
+#[async_trait]
+impl oio::MultipartUploadWrite for ObsWriter {
     async fn initiate_part(&self) -> Result<String> {
         let resp = self
             .core
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index 6704d158b..1aef92b77 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::cmp::max;
 use std::collections::HashMap;
 use std::collections::HashSet;
 use std::fmt::Debug;
@@ -35,9 +36,13 @@ use super::error::parse_error;
 use super::pager::OssPager;
 use super::writer::OssWriter;
 use crate::raw::*;
+use crate::services::oss::writer::OssWriters;
 use crate::*;
 
-const DEFAULT_WRITE_MIN_SIZE: usize = 8 * 1024 * 1024;
+/// The minimum multipart size of OSS is 100 KiB.
+///
+/// ref: <https://www.alibabacloud.com/help/en/oss/user-guide/multipart-upload-12>
+const MINIMUM_MULTIPART_SIZE: usize = 100 * 1024;
 const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1000;
 
 /// Aliyun Object Storage Service (OSS) support
@@ -343,14 +348,6 @@ impl Builder for OssBuilder {
 
         let signer = AliyunOssSigner::new(bucket);
 
-        let write_min_size = self.write_min_size.unwrap_or(DEFAULT_WRITE_MIN_SIZE);
-        if write_min_size < 5 * 1024 * 1024 {
-            return Err(Error::new(
-                ErrorKind::ConfigInvalid,
-                "The write minimum buffer size is misconfigured",
-            )
-            .with_context("service", Scheme::Oss));
-        }
         let batch_max_operations = self
             .batch_max_operations
             .unwrap_or(DEFAULT_BATCH_MAX_OPERATIONS);
@@ -368,7 +365,6 @@ impl Builder for OssBuilder {
                 client,
                 server_side_encryption,
                 server_side_encryption_key_id,
-                write_min_size,
                 batch_max_operations,
             }),
         })
@@ -385,10 +381,7 @@ pub struct OssBackend {
 impl Accessor for OssBackend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = oio::TwoWaysWriter<
-        oio::MultipartUploadWriter<OssWriter>,
-        oio::AppendObjectWriter<OssWriter>,
-    >;
+    type Writer = oio::TwoWaysWriter<OssWriters, oio::AtLeastBufWriter<OssWriters>>;
     type BlockingWriter = ();
     type Pager = OssPager;
     type BlockingPager = ();
@@ -480,19 +473,26 @@ impl Accessor for OssBackend {
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
         let writer = OssWriter::new(self.core.clone(), path, args.clone());
 
-        let tw = if args.append() {
+        let w = if args.append() {
+            OssWriters::Three(oio::AppendObjectWriter::new(writer))
+        } else if args.content_length().is_some() {
+            OssWriters::One(oio::OneShotWriter::new(writer))
+        } else {
+            OssWriters::Two(oio::MultipartUploadWriter::new(writer))
+        };
+
+        let w = if let Some(buffer_size) = args.buffer_size() {
+            let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
+
             let w =
-                oio::AppendObjectWriter::new(writer).with_write_min_size(self.core.write_min_size);
+                oio::AtLeastBufWriter::new(w, buffer_size).with_total_size(args.content_length());
 
-            oio::TwoWaysWriter::Right(w)
+            oio::TwoWaysWriter::Two(w)
         } else {
-            let w = oio::MultipartUploadWriter::new(writer, args.content_length())
-                .with_write_min_size(self.core.write_min_size);
-
-            oio::TwoWaysWriter::Left(w)
+            oio::TwoWaysWriter::One(w)
         };
 
-        return Ok((RpWrite::default(), tw));
+        Ok((RpWrite::default(), w))
     }
 
     async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
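
In the OSS write path above, the backend first picks one of three inner writers (append when
args.append() is set, one-shot when the content length is known, multipart otherwise) and then
optionally wraps the result in AtLeastBufWriter when a buffer size is requested. The following
sketch illustrates the enum-dispatch idea behind TwoWaysWriter/ThreeWaysWriter using a made-up
SimpleWrite trait; it is not OpenDAL's actual oio::Write interface.

    // Hypothetical minimal trait standing in for oio::Write.
    trait SimpleWrite {
        fn write(&mut self, bs: &[u8]);
        fn close(&mut self);
    }

    // Static dispatch over three concrete writer types, mirroring ThreeWaysWriter<A, B, C>:
    // whichever variant was constructed receives every call.
    enum ThreeWays<A, B, C> {
        One(A),
        Two(B),
        Three(C),
    }

    impl<A: SimpleWrite, B: SimpleWrite, C: SimpleWrite> SimpleWrite for ThreeWays<A, B, C> {
        fn write(&mut self, bs: &[u8]) {
            match self {
                ThreeWays::One(w) => w.write(bs),
                ThreeWays::Two(w) => w.write(bs),
                ThreeWays::Three(w) => w.write(bs),
            }
        }

        fn close(&mut self) {
            match self {
                ThreeWays::One(w) => w.close(),
                ThreeWays::Two(w) => w.close(),
                ThreeWays::Three(w) => w.close(),
            }
        }
    }

Compared with a boxed trait object, this keeps the concrete writer types visible in the
Accessor::Writer associated type, as seen in the OssBackend and S3Backend changes.
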
diff --git a/core/src/services/oss/core.rs b/core/src/services/oss/core.rs
index 6c570fb17..a81db97fa 100644
--- a/core/src/services/oss/core.rs
+++ b/core/src/services/oss/core.rs
@@ -64,7 +64,6 @@ pub struct OssCore {
     pub client: HttpClient,
     pub loader: AliyunLoader,
     pub signer: AliyunOssSigner,
-    pub write_min_size: usize,
     pub batch_max_operations: usize,
 }
 
diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs
index 9faad249f..27aa09011 100644
--- a/core/src/services/oss/writer.rs
+++ b/core/src/services/oss/writer.rs
@@ -23,9 +23,16 @@ use http::StatusCode;
 
 use super::core::*;
 use super::error::parse_error;
+use crate::raw::oio::Streamer;
 use crate::raw::*;
 use crate::*;
 
+pub type OssWriters = oio::ThreeWaysWriter<
+    oio::OneShotWriter<OssWriter>,
+    oio::MultipartUploadWriter<OssWriter>,
+    oio::AppendObjectWriter<OssWriter>,
+>;
+
 pub struct OssWriter {
     core: Arc<OssCore>,
 
@@ -44,15 +51,15 @@ impl OssWriter {
 }
 
 #[async_trait]
-impl oio::MultipartUploadWrite for OssWriter {
-    async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
+impl oio::OneShotWrite for OssWriter {
+    async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> {
         let mut req = self.core.oss_put_object_request(
             &self.path,
             Some(size),
             self.op.content_type(),
             self.op.content_disposition(),
             self.op.cache_control(),
-            body,
+            AsyncBody::Stream(stream),
             false,
         )?;
 
@@ -70,7 +77,10 @@ impl oio::MultipartUploadWrite for OssWriter {
             _ => Err(parse_error(resp).await?),
         }
     }
+}
 
+#[async_trait]
+impl oio::MultipartUploadWrite for OssWriter {
     async fn initiate_part(&self) -> Result<String> {
         let resp = self
             .core
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index 83ff8a7d0..700359009 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::cmp::max;
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::fmt::Formatter;
@@ -43,6 +44,7 @@ use super::error::parse_s3_error_code;
 use super::pager::S3Pager;
 use super::writer::S3Writer;
 use crate::raw::*;
+use crate::services::s3::writer::S3Writers;
 use crate::*;
 
 /// Allow constructing correct region endpoint if user gives a global endpoint.
@@ -56,7 +58,10 @@ static ENDPOINT_TEMPLATES: Lazy<HashMap<&'static str, &'static str>> = Lazy::new
     m
 });
 
-const DEFAULT_WRITE_MIN_SIZE: usize = 8 * 1024 * 1024;
+/// The minimum multipart size of S3 is 5 MiB.
+///
+/// ref: <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
+const MINIMUM_MULTIPART_SIZE: usize = 5 * 1024 * 1024;
 const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1000;
 
 /// Aws S3 and compatible services (including minio, digitalocean space, Tencent Cloud Object Storage(COS) and so on) support.
@@ -847,14 +852,6 @@ impl Builder for S3Builder {
 
         let signer = AwsV4Signer::new("s3", &region);
 
-        let write_min_size = self.write_min_size.unwrap_or(DEFAULT_WRITE_MIN_SIZE);
-        if write_min_size < 5 * 1024 * 1024 {
-            return Err(Error::new(
-                ErrorKind::ConfigInvalid,
-                "The write minimum buffer size is misconfigured",
-            )
-            .with_context("service", Scheme::S3));
-        }
         let batch_max_operations = self
             .batch_max_operations
             .unwrap_or(DEFAULT_BATCH_MAX_OPERATIONS);
@@ -874,7 +871,6 @@ impl Builder for S3Builder {
                 signer,
                 loader,
                 client,
-                write_min_size,
                 batch_max_operations,
             }),
         })
@@ -891,7 +887,7 @@ pub struct S3Backend {
 impl Accessor for S3Backend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = oio::MultipartUploadWriter<S3Writer>;
+    type Writer = oio::TwoWaysWriter<S3Writers, oio::AtLeastBufWriter<S3Writers>>;
     type BlockingWriter = ();
     type Pager = S3Pager;
     type BlockingPager = ();
@@ -979,10 +975,26 @@ impl Accessor for S3Backend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        Ok((
-            RpWrite::default(),
-            S3Writer::new(self.core.clone(), path, args),
-        ))
+        let writer = S3Writer::new(self.core.clone(), path, args.clone());
+
+        let w = if args.content_length().is_some() {
+            S3Writers::One(oio::OneShotWriter::new(writer))
+        } else {
+            S3Writers::Two(oio::MultipartUploadWriter::new(writer))
+        };
+
+        let w = if let Some(buffer_size) = args.buffer_size() {
+            let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
+
+            let w =
+                oio::AtLeastBufWriter::new(w, buffer_size).with_total_size(args.content_length());
+
+            oio::TwoWaysWriter::Two(w)
+        } else {
+            oio::TwoWaysWriter::One(w)
+        };
+
+        Ok((RpWrite::default(), w))
     }
 
     async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
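
AtLeastBufWriter, added in this patch, accumulates incoming chunks until at least buffer_size
bytes are available before forwarding them, which keeps multipart parts above the service
minimum (5 MiB for S3, 100 KiB for OSS). A simplified, synchronous illustration of that
buffering rule follows; the names and the closure-based sink are invented for the example,
while the real writer works over the async oio::Write interface and also tracks the total size.

    // Illustrative only: buffer writes until at least `min_size` bytes are available,
    // then hand them to the inner sink in one chunk; flush the remainder on close.
    struct AtLeastBuf<F: FnMut(&[u8])> {
        min_size: usize,
        buf: Vec<u8>,
        sink: F, // stands in for "upload one part"
    }

    impl<F: FnMut(&[u8])> AtLeastBuf<F> {
        fn new(min_size: usize, sink: F) -> Self {
            Self { min_size, buf: Vec::new(), sink }
        }

        fn write(&mut self, bs: &[u8]) {
            self.buf.extend_from_slice(bs);
            // Only flush once the buffered data reaches the minimum size.
            if self.buf.len() >= self.min_size {
                (self.sink)(&self.buf);
                self.buf.clear();
            }
        }

        fn close(&mut self) {
            // The last chunk may be smaller than `min_size`; flush whatever is left.
            if !self.buf.is_empty() {
                (self.sink)(&self.buf);
                self.buf.clear();
            }
        }
    }

    fn main() {
        let mut w = AtLeastBuf::new(8 * 1024 * 1024, |chunk: &[u8]| {
            println!("flushing {} bytes to the inner writer", chunk.len());
        });
        for _ in 0..3 {
            w.write(&vec![0u8; 3 * 1024 * 1024]); // three 3 MiB writes, flushed as one 9 MiB chunk
        }
        w.close();
    }
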
diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs
index 36321280f..89989bcb7 100644
--- a/core/src/services/s3/core.rs
+++ b/core/src/services/s3/core.rs
@@ -82,7 +82,6 @@ pub struct S3Core {
     pub signer: AwsV4Signer,
     pub loader: Box<dyn AwsCredentialLoad>,
     pub client: HttpClient,
-    pub write_min_size: usize,
     pub batch_max_operations: usize,
 }
 
diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs
index 508716d14..a27341d71 100644
--- a/core/src/services/s3/writer.rs
+++ b/core/src/services/s3/writer.rs
@@ -23,9 +23,13 @@ use http::StatusCode;
 
 use super::core::*;
 use super::error::parse_error;
+use crate::raw::oio::Streamer;
 use crate::raw::*;
 use crate::*;
 
+pub type S3Writers =
+    oio::TwoWaysWriter<oio::OneShotWriter<S3Writer>, oio::MultipartUploadWriter<S3Writer>>;
+
 pub struct S3Writer {
     core: Arc<S3Core>,
 
@@ -34,30 +38,25 @@ pub struct S3Writer {
 }
 
 impl S3Writer {
-    pub fn new(core: Arc<S3Core>, path: &str, op: OpWrite) -> oio::MultipartUploadWriter<Self> {
-        let write_min_size = core.write_min_size;
-
-        let total_size = op.content_length();
-        let s3_writer = S3Writer {
+    pub fn new(core: Arc<S3Core>, path: &str, op: OpWrite) -> Self {
+        S3Writer {
             core,
             path: path.to_string(),
             op,
-        };
-
-        oio::MultipartUploadWriter::new(s3_writer, total_size).with_write_min_size(write_min_size)
+        }
     }
 }
 
 #[async_trait]
-impl oio::MultipartUploadWrite for S3Writer {
-    async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
+impl oio::OneShotWrite for S3Writer {
+    async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> {
         let mut req = self.core.s3_put_object_request(
             &self.path,
             Some(size),
             self.op.content_type(),
             self.op.content_disposition(),
             self.op.cache_control(),
-            body,
+            AsyncBody::Stream(stream),
         )?;
 
         self.core.sign(&mut req).await?;
@@ -74,7 +73,10 @@ impl oio::MultipartUploadWrite for S3Writer {
             _ => Err(parse_error(resp).await?),
         }
     }
+}
 
+#[async_trait]
+impl oio::MultipartUploadWrite for S3Writer {
     async fn initiate_part(&self) -> Result<String> {
         let resp = self
             .core
diff --git a/core/src/types/operator/operator_futures.rs b/core/src/types/operator/operator_futures.rs
index d3676f1ce..10a9c061c 100644
--- a/core/src/types/operator/operator_futures.rs
+++ b/core/src/types/operator/operator_futures.rs
@@ -395,6 +395,19 @@ impl FutureWrite {
         self
     }
 
+    /// Set the buffer size of op.
+    ///
+    /// If buffer size is set, the data will be buffered by the underlying writer.
+    ///
+    /// ## NOTE
+    ///
+    /// Services may enforce their own minimum buffer size for write operations such as
+    /// multipart uploads, so the effective buffer size may be larger than the value given.
+    pub fn buffer_size(mut self, v: usize) -> Self {
+        self.0 = self.0.map_args(|(args, bs)| (args.with_buffer_size(v), bs));
+        self
+    }
+
     /// Set the content length of op.
     ///
     /// If the content length is not set, the content length will be
@@ -457,6 +470,19 @@ impl FutureWriter {
         self
     }
 
+    /// Set the buffer size of op.
+    ///
+    /// If buffer size is set, the data will be buffered by the underlying writer.
+    ///
+    /// ## NOTE
+    ///
+    /// Services may enforce their own minimum buffer size for write operations such as
+    /// multipart uploads, so the effective buffer size may be larger than the value given.
+    pub fn buffer_size(mut self, v: usize) -> Self {
+        self.0 = self.0.map_args(|args| args.with_buffer_size(v));
+        self
+    }
+
     /// Set the content length of op.
     ///
     /// If the content length is not set, the content length will be
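
From the user's side, the new option is reached through writer_with(...).buffer_size(...), as
the updated behavior tests below also do. A hedged usage sketch follows: the path and data are
placeholders, and the Writer's write/close method names are recalled from the public API of
this period, so they may differ in detail.

    use opendal::{Operator, Result};

    // Assumed context: `op` is an already-configured Operator.
    async fn upload(op: &Operator, data: Vec<u8>) -> Result<()> {
        let mut w = op
            .writer_with("path/to/file")
            .buffer_size(8 * 1024 * 1024) // request 8 MiB buffering; a service may raise this to its minimum
            .await?;
        w.write(data).await?; // buffered and flushed in at-least-buffer-size chunks
        w.close().await?;     // flush the tail and finish the write
        Ok(())
    }
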
diff --git a/core/tests/behavior/write.rs b/core/tests/behavior/write.rs
index a1effab43..ae8825984 100644
--- a/core/tests/behavior/write.rs
+++ b/core/tests/behavior/write.rs
@@ -1233,7 +1233,7 @@ pub async fn test_writer_futures_copy(op: Operator) -> Result<()> {
     let (content, size): (Vec<u8>, usize) =
         gen_bytes_with_range(10 * 1024 * 1024..20 * 1024 * 1024);
 
-    let mut w = op.writer(&path).await?;
+    let mut w = op.writer_with(&path).buffer_size(8 * 1024 * 1024).await?;
 
     // Wrap a buf reader here to make sure content is read in 1MiB chunks.
     let mut cursor = BufReader::with_capacity(1024 * 1024, Cursor::new(content.clone()));
@@ -1266,7 +1266,7 @@ pub async fn test_fuzz_unsized_writer(op: Operator) -> Result<()> {
 
     let mut fuzzer = ObjectWriterFuzzer::new(&path, None);
 
-    let mut w = op.writer(&path).await?;
+    let mut w = op.writer_with(&path).buffer_size(8 * 1024 * 1024).await?;
 
     for _ in 0..100 {
         match fuzzer.fuzz() {
