This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch buffer-refactor
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 8f2ff0f3663e01328c1c73d4ff8aa5bef2e1a0a4
Author: Xuanwo <[email protected]>
AuthorDate: Tue Aug 22 17:08:01 2023 +0800

    refactor
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/raw/oio/write/append_object_write.rs | 79 ++-------------------------
 core/src/raw/oio/write/at_least_buf_write.rs  | 61 +++++++++++++++------
 core/src/raw/ops.rs                           | 21 +++++++
 core/src/services/cos/backend.rs              | 53 +++++++-----------
 core/src/services/cos/core.rs                 |  1 -
 core/src/services/cos/writer.rs               |  6 ++
 core/src/services/obs/backend.rs              | 55 +++++++------------
 core/src/services/obs/core.rs                 |  1 -
 core/src/services/obs/writer.rs               |  6 ++
 core/src/services/oss/backend.rs              | 39 ++++++-------
 core/src/services/oss/core.rs                 |  1 -
 core/src/services/oss/writer.rs               |  6 ++
 core/src/services/s3/backend.rs               | 42 +++++++++-----
 core/src/services/s3/core.rs                  |  1 -
 core/src/services/s3/writer.rs                | 11 ++--
 core/src/types/operator/operator_futures.rs   | 26 +++++++++
 core/tests/behavior/write.rs                  |  4 +-
 17 files changed, 211 insertions(+), 202 deletions(-)

diff --git a/core/src/raw/oio/write/append_object_write.rs b/core/src/raw/oio/write/append_object_write.rs
index 5d6eda9da..07fa546cc 100644
--- a/core/src/raw/oio/write/append_object_write.rs
+++ b/core/src/raw/oio/write/append_object_write.rs
@@ -22,8 +22,6 @@ use crate::raw::oio::Streamer;
 use crate::raw::*;
 use crate::*;
 
-const DEFAULT_WRITE_MIN_SIZE: usize = 8 * 1024 * 1024;
-
 /// AppendObjectWrite is used to implement [`Write`] based on append
 /// object. By implementing AppendObjectWrite, services don't need to
 /// care about the details of buffering and uploading parts.
@@ -53,8 +51,6 @@ pub struct AppendObjectWriter<W: AppendObjectWrite> {
     inner: W,
 
     offset: Option<u64>,
-    buffer: oio::VectorCursor,
-    buffer_size: usize,
 }
 
 impl<W: AppendObjectWrite> AppendObjectWriter<W> {
@@ -63,24 +59,9 @@ impl<W: AppendObjectWrite> AppendObjectWriter<W> {
         Self {
             inner,
             offset: None,
-            buffer: oio::VectorCursor::new(),
-            buffer_size: DEFAULT_WRITE_MIN_SIZE,
         }
     }
 
-    /// Configure the write_min_size.
-    ///
-    /// write_min_size is used to control the size of internal buffer.
-    ///
-    /// AppendObjectWriter will flush the buffer to storage when
-    /// the size of buffer is larger than write_min_size.
-    ///
-    /// This value is default to 8 MiB.
-    pub fn with_write_min_size(mut self, v: usize) -> Self {
-        self.buffer_size = v;
-        self
-    }
-
     async fn offset(&mut self) -> Result<u64> {
         if let Some(offset) = self.offset {
             return Ok(offset);
@@ -101,72 +82,24 @@ where
     async fn write(&mut self, bs: Bytes) -> Result<()> {
         let offset = self.offset().await?;
 
-        // Ignore empty bytes
-        if bs.is_empty() {
-            return Ok(());
-        }
-
-        self.buffer.push(bs);
-        // Return directly if the buffer is not full
-        if self.buffer.len() <= self.buffer_size {
-            return Ok(());
-        }
-
-        let bs = self.buffer.peak_all();
-        let size = bs.len();
+        let size = bs.len() as u64;
 
-        match self
-            .inner
-            .append(offset, size as u64, AsyncBody::Bytes(bs))
+        self.inner
+            .append(offset, size, AsyncBody::Bytes(bs))
             .await
-        {
-            Ok(_) => {
-                self.buffer.take(size);
-                self.offset = Some(offset + size as u64);
-                Ok(())
-            }
-            Err(e) => {
-                // If the upload fails, we should pop the given bs to make sure
-                // write is re-enter safe.
-                self.buffer.pop();
-                Err(e)
-            }
-        }
+            .map(|_| self.offset = Some(offset + size))
     }
 
     async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
-        if !self.buffer.is_empty() {
-            return Err(Error::new(
-                ErrorKind::InvalidInput,
-                "Writer::sink should not be used mixed with existing buffer",
-            ));
-        }
-
         let offset = self.offset().await?;
 
         self.inner
             .append(offset, size, AsyncBody::Stream(s))
-            .await?;
-        self.offset = Some(offset + size);
-
-        Ok(())
+            .await
+            .map(|_| self.offset = Some(offset + size))
     }
 
     async fn close(&mut self) -> Result<()> {
-        // Make sure internal buffer has been flushed.
-        if !self.buffer.is_empty() {
-            let bs = self.buffer.peak_exact(self.buffer.len());
-
-            let offset = self.offset().await?;
-            let size = bs.len() as u64;
-            self.inner
-                .append(offset, size, AsyncBody::Bytes(bs))
-                .await?;
-
-            self.buffer.clear();
-            self.offset = Some(offset + size);
-        }
-
         Ok(())
     }
 
diff --git a/core/src/raw/oio/write/at_least_buf_write.rs b/core/src/raw/oio/write/at_least_buf_write.rs
index 7a295c313..a86fb2c05 100644
--- a/core/src/raw/oio/write/at_least_buf_write.rs
+++ b/core/src/raw/oio/write/at_least_buf_write.rs
@@ -27,49 +27,76 @@ use bytes::Bytes;
 pub struct AtLeastBufWriter<W: oio::Write> {
     inner: W,
 
+    /// The total size of the data.
+    ///
+    /// If the total size is known, we will write to underlying storage directly without buffer it
+    /// when possible.
+    total_size: Option<u64>,
+
     /// The size for buffer, we will flush the underlying storage if the buffer is full.
-    size: usize,
-    buf: oio::ChunkedCursor,
+    buffer_size: usize,
+    buffer: oio::ChunkedCursor,
 }
 
 impl<W: oio::Write> AtLeastBufWriter<W> {
     /// Create a new at least buf writer.
-    pub fn new(inner: W, size: usize) -> Self {
+    pub fn new(inner: W, buffer_size: usize) -> Self {
         Self {
             inner,
-            size,
-            buf: oio::ChunkedCursor::new(),
+            total_size: None,
+            buffer_size,
+            buffer: oio::ChunkedCursor::new(),
         }
     }
+
+    /// Configure the total size for writer.
+    pub fn with_total_size(mut self, total_size: Option<u64>) -> Self {
+        self.total_size = total_size;
+        self
+    }
 }
 
 #[async_trait]
 impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> {
     async fn write(&mut self, bs: Bytes) -> Result<()> {
+        // If total size is known and equals to given bytes, we can write it directly.
+        if let Some(total_size) = self.total_size {
+            if total_size == bs.len() as u64 {
+                return self.inner.write(bs).await;
+            }
+        }
+
         // Push the bytes into the buffer if the buffer is not full.
-        if self.buf.len() + bs.len() <= self.size {
-            self.buf.push(bs);
+        if self.buffer.len() + bs.len() <= self.buffer_size {
+            self.buffer.push(bs);
             return Ok(());
         }
 
-        let mut buf = self.buf.clone();
+        let mut buf = self.buffer.clone();
         buf.push(bs);
 
         self.inner
             .sink(buf.len() as u64, Box::new(buf))
             .await
             // Clear buffer if the write is successful.
-            .map(|_| self.buf.clear())
+            .map(|_| self.buffer.clear())
     }
 
     async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
+        // If total size is known and equals to given stream, we can write it directly.
+        if let Some(total_size) = self.total_size {
+            if total_size == size {
+                return self.inner.sink(size, s).await;
+            }
+        }
+
         // Push the bytes into the buffer if the buffer is not full.
-        if self.buf.len() as u64 + size <= self.size as u64 {
-            self.buf.push(s.collect().await?);
+        if self.buffer.len() as u64 + size <= self.buffer_size as u64 {
+            self.buffer.push(s.collect().await?);
             return Ok(());
         }
 
-        let buf = self.buf.clone();
+        let buf = self.buffer.clone();
         let buffer_size = buf.len() as u64;
         let stream = buf.chain(s);
 
@@ -77,20 +104,20 @@ impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> {
             .sink(buffer_size + size, Box::new(stream))
             .await
             // Clear buffer if the write is successful.
-            .map(|_| self.buf.clear())
+            .map(|_| self.buffer.clear())
     }
 
     async fn abort(&mut self) -> Result<()> {
-        self.buf.clear();
+        self.buffer.clear();
         self.inner.abort().await
     }
 
     async fn close(&mut self) -> Result<()> {
-        if !self.buf.is_empty() {
+        if !self.buffer.is_empty() {
             self.inner
-                .sink(self.buf.len() as u64, Box::new(self.buf.clone()))
+                .sink(self.buffer.len() as u64, Box::new(self.buffer.clone()))
                 .await?;
-            self.buf.clear();
+            self.buffer.clear();
         }
 
         self.inner.close().await?;
diff --git a/core/src/raw/ops.rs b/core/src/raw/ops.rs
index 2617768da..0404ed468 100644
--- a/core/src/raw/ops.rs
+++ b/core/src/raw/ops.rs
@@ -406,6 +406,7 @@ impl OpStat {
 pub struct OpWrite {
     append: bool,
 
+    buffer_size: Option<usize>,
     content_length: Option<u64>,
     content_type: Option<String>,
     content_disposition: Option<String>,
@@ -439,6 +440,26 @@ impl OpWrite {
         self
     }
 
+    /// Get the buffer size from op.
+    ///
+    /// The buffer size is used by service to decide the buffer size of the underlying writer.
+    pub fn buffer_size(&self) -> Option<usize> {
+        self.buffer_size
+    }
+
+    /// Set the buffer size of op.
+    ///
+    /// If buffer size is set, the data will be buffered by the underlying writer.
+    ///
+    /// ## NOTE
+    ///
+    /// Service could have their own minimum buffer size while perform write operations like
+    /// multipart uploads. So the buffer size may be larger than the given buffer size.
+    pub fn with_buffer_size(mut self, buffer_size: usize) -> Self {
+        self.buffer_size = Some(buffer_size);
+        self
+    }
+
     /// Get the content length from op.
     ///
     /// The content length is the total length of the data to be written.
diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs
index 851e79e53..66aa65449 100644
--- a/core/src/services/cos/backend.rs
+++ b/core/src/services/cos/backend.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::cmp::max;
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::sync::Arc;
@@ -32,9 +33,13 @@ use super::error::parse_error;
 use super::pager::CosPager;
 use super::writer::CosWriter;
 use crate::raw::*;
+use crate::services::cos::writer::CosWriters;
 use crate::*;
 
-const DEFAULT_WRITE_MIN_SIZE: usize = 1024 * 1024;
+/// The minimum multipart size of COS is 1 MiB.
+///
+/// ref: <https://www.tencentcloud.com/document/product/436/14112>
+const MINIMUM_MULTIPART_SIZE: usize = 1024 * 1024;
 
 /// Tencent-Cloud COS services support.
 #[doc = include_str!("docs.md")]
@@ -47,10 +52,6 @@ pub struct CosBuilder {
     bucket: Option<String>,
     http_client: Option<HttpClient>,
 
-    /// the part size of cos multipart upload, which should be 1 MB to 5 GB.
-    /// There is no minimum size limit on the last part of your multipart 
upload
-    write_min_size: Option<usize>,
-
     disable_config_load: bool,
 }
 
@@ -125,14 +126,6 @@ impl CosBuilder {
         self
     }
 
-    /// set the minimum size of unsized write, it should be greater than 1 MB.
-    /// Reference: [Upload Part | Tencent Cloud](https://www.tencentcloud.com/document/product/436/7750)
-    pub fn write_min_size(&mut self, write_min_size: usize) -> &mut Self {
-        self.write_min_size = Some(write_min_size);
-
-        self
-    }
-
     /// Disable config load so that opendal will not load config from
     /// environment.
     ///
@@ -168,8 +161,6 @@ impl Builder for CosBuilder {
         map.get("endpoint").map(|v| builder.endpoint(v));
         map.get("secret_id").map(|v| builder.secret_id(v));
         map.get("secret_key").map(|v| builder.secret_key(v));
-        map.get("write_min_size")
-            .map(|v| builder.write_min_size(v.parse().expect("input must be a number")));
 
         builder
     }
@@ -233,14 +224,6 @@ impl Builder for CosBuilder {
         let cred_loader = TencentCosCredentialLoader::new(client.client(), cfg);
 
         let signer = TencentCosSigner::new();
-        let write_min_size = self.write_min_size.unwrap_or(DEFAULT_WRITE_MIN_SIZE);
-        if write_min_size < 1024 * 1024 {
-            return Err(Error::new(
-                ErrorKind::ConfigInvalid,
-                "The write minimum buffer size is misconfigured",
-            )
-            .with_context("service", Scheme::Cos));
-        }
 
         debug!("backend build finished");
         Ok(CosBackend {
@@ -251,7 +234,6 @@ impl Builder for CosBuilder {
                 signer,
                 loader: cred_loader,
                 client,
-                write_min_size,
             }),
         })
     }
@@ -267,10 +249,7 @@ pub struct CosBackend {
 impl Accessor for CosBackend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = oio::TwoWaysWriter<
-        oio::MultipartUploadWriter<CosWriter>,
-        oio::AppendObjectWriter<CosWriter>,
-    >;
+    type Writer = oio::TwoWaysWriter<CosWriters, oio::AtLeastBufWriter<CosWriters>>;
     type BlockingWriter = ();
     type Pager = CosPager;
     type BlockingPager = ();
@@ -358,18 +337,26 @@ impl Accessor for CosBackend {
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
         let writer = CosWriter::new(self.core.clone(), path, args.clone());
 
-        let tw = if args.append() {
+        let w = if args.append() {
+            CosWriters::Three(oio::AppendObjectWriter::new(writer))
+        } else if args.content_length().is_some() {
+            CosWriters::One(oio::OneShotWriter::new(writer))
+        } else {
+            CosWriters::Two(oio::MultipartUploadWriter::new(writer))
+        };
+
+        let w = if let Some(buffer_size) = args.buffer_size() {
+            let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
+
             let w =
-                oio::AppendObjectWriter::new(writer).with_write_min_size(self.core.write_min_size);
+                oio::AtLeastBufWriter::new(w, buffer_size).with_total_size(args.content_length());
 
             oio::TwoWaysWriter::Two(w)
         } else {
-            let w = oio::MultipartUploadWriter::new(writer);
-
             oio::TwoWaysWriter::One(w)
         };
 
-        return Ok((RpWrite::default(), tw));
+        Ok((RpWrite::default(), w))
     }
 
     async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
diff --git a/core/src/services/cos/core.rs b/core/src/services/cos/core.rs
index e288a447c..490dada6e 100644
--- a/core/src/services/cos/core.rs
+++ b/core/src/services/cos/core.rs
@@ -45,7 +45,6 @@ pub struct CosCore {
     pub signer: TencentCosSigner,
     pub loader: TencentCosCredentialLoader,
     pub client: HttpClient,
-    pub write_min_size: usize,
 }
 
 impl Debug for CosCore {
diff --git a/core/src/services/cos/writer.rs b/core/src/services/cos/writer.rs
index cdcbb9b18..af2a6ebd0 100644
--- a/core/src/services/cos/writer.rs
+++ b/core/src/services/cos/writer.rs
@@ -27,6 +27,12 @@ use crate::raw::oio::Streamer;
 use crate::raw::*;
 use crate::*;
 
+pub type CosWriters = oio::ThreeWaysWriter<
+    oio::OneShotWriter<CosWriter>,
+    oio::MultipartUploadWriter<CosWriter>,
+    oio::AppendObjectWriter<CosWriter>,
+>;
+
 pub struct CosWriter {
     core: Arc<CosCore>,
 
diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs
index dc7676d4a..78ecf0976 100644
--- a/core/src/services/obs/backend.rs
+++ b/core/src/services/obs/backend.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::cmp::max;
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::sync::Arc;
@@ -32,8 +33,14 @@ use super::error::parse_error;
 use super::pager::ObsPager;
 use super::writer::ObsWriter;
 use crate::raw::*;
+use crate::services::obs::writer::ObsWriters;
 use crate::*;
 
+/// The minimum multipart size of OBS is 5 MiB.
+///
+/// ref: <https://support.huaweicloud.com/intl/en-us/ugobs-obs/obs_41_0021.html>
+const MINIMUM_MULTIPART_SIZE: usize = 5 * 1024 * 1024;
+
 /// Huawei Cloud OBS services support.
 ///
 /// # Capabilities
@@ -91,9 +98,6 @@ use crate::*;
 ///     Ok(())
 /// }
 /// ```
-
-const DEFAULT_WRITE_MIN_SIZE: usize = 100 * 1024;
-
 /// Huawei-Cloud Object Storage Service (OBS) support
 #[derive(Default, Clone)]
 pub struct ObsBuilder {
@@ -103,9 +107,6 @@ pub struct ObsBuilder {
     secret_access_key: Option<String>,
     bucket: Option<String>,
     http_client: Option<HttpClient>,
-    /// the part size of obs multipart upload, which should be 100 KiB to 5 GiB.
-    /// There is no minimum size limit on the last part of your multipart upload
-    write_min_size: Option<usize>,
 }
 
 impl Debug for ObsBuilder {
@@ -190,14 +191,6 @@ impl ObsBuilder {
         self.http_client = Some(client);
         self
     }
-
-    /// set the minimum size of unsized write, it should be greater than 100 KB.
-    /// Reference: [Huawei Obs multipart upload limits](https://support.huaweicloud.com/intl/en-us/api-obs/obs_04_0099.html)
-    pub fn write_min_size(&mut self, write_min_size: usize) -> &mut Self {
-        self.write_min_size = Some(write_min_size);
-
-        self
-    }
 }
 
 impl Builder for ObsBuilder {
@@ -213,8 +206,6 @@ impl Builder for ObsBuilder {
         map.get("access_key_id").map(|v| builder.access_key_id(v));
         map.get("secret_access_key")
             .map(|v| builder.secret_access_key(v));
-        map.get("write_min_size")
-            .map(|v| builder.write_min_size(v.parse().expect("input must be a number")));
 
         builder
     }
@@ -296,14 +287,6 @@ impl Builder for ObsBuilder {
                 &endpoint
             }
         });
-        let write_min_size = self.write_min_size.unwrap_or(DEFAULT_WRITE_MIN_SIZE);
-        if write_min_size < 100 * 1024 {
-            return Err(Error::new(
-                ErrorKind::ConfigInvalid,
-                "The write minimum buffer size is misconfigured",
-            )
-            .with_context("service", Scheme::Obs));
-        }
 
         debug!("backend build finished");
         Ok(ObsBackend {
@@ -314,7 +297,6 @@ impl Builder for ObsBuilder {
                 signer,
                 loader,
                 client,
-                write_min_size,
             }),
         })
     }
@@ -330,10 +312,7 @@ pub struct ObsBackend {
 impl Accessor for ObsBackend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = oio::TwoWaysWriter<
-        oio::MultipartUploadWriter<ObsWriter>,
-        oio::AppendObjectWriter<ObsWriter>,
-    >;
+    type Writer = oio::TwoWaysWriter<ObsWriters, oio::AtLeastBufWriter<ObsWriters>>;
     type BlockingWriter = ();
     type Pager = ObsPager;
     type BlockingPager = ();
@@ -452,18 +431,26 @@ impl Accessor for ObsBackend {
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
         let writer = ObsWriter::new(self.core.clone(), path, args.clone());
 
-        let tw = if args.append() {
+        let w = if args.append() {
+            ObsWriters::Three(oio::AppendObjectWriter::new(writer))
+        } else if args.content_length().is_some() {
+            ObsWriters::One(oio::OneShotWriter::new(writer))
+        } else {
+            ObsWriters::Two(oio::MultipartUploadWriter::new(writer))
+        };
+
+        let w = if let Some(buffer_size) = args.buffer_size() {
+            let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
+
             let w =
-                oio::AppendObjectWriter::new(writer).with_write_min_size(self.core.write_min_size);
+                oio::AtLeastBufWriter::new(w, buffer_size).with_total_size(args.content_length());
 
             oio::TwoWaysWriter::Two(w)
         } else {
-            let w = oio::MultipartUploadWriter::new(writer);
-
             oio::TwoWaysWriter::One(w)
         };
 
-        return Ok((RpWrite::default(), tw));
+        Ok((RpWrite::default(), w))
     }
 
     async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
diff --git a/core/src/services/obs/core.rs b/core/src/services/obs/core.rs
index 52604fd50..3d8ba1daf 100644
--- a/core/src/services/obs/core.rs
+++ b/core/src/services/obs/core.rs
@@ -45,7 +45,6 @@ pub struct ObsCore {
     pub signer: HuaweicloudObsSigner,
     pub loader: HuaweicloudObsCredentialLoader,
     pub client: HttpClient,
-    pub write_min_size: usize,
 }
 
 impl Debug for ObsCore {
diff --git a/core/src/services/obs/writer.rs b/core/src/services/obs/writer.rs
index 893a098d7..f8078a955 100644
--- a/core/src/services/obs/writer.rs
+++ b/core/src/services/obs/writer.rs
@@ -27,6 +27,12 @@ use crate::raw::oio::{MultipartUploadPart, Streamer};
 use crate::raw::*;
 use crate::*;
 
+pub type ObsWriters = oio::ThreeWaysWriter<
+    oio::OneShotWriter<ObsWriter>,
+    oio::MultipartUploadWriter<ObsWriter>,
+    oio::AppendObjectWriter<ObsWriter>,
+>;
+
 pub struct ObsWriter {
     core: Arc<ObsCore>,
 
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index 029b3b52b..1aef92b77 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::cmp::max;
 use std::collections::HashMap;
 use std::collections::HashSet;
 use std::fmt::Debug;
@@ -35,9 +36,13 @@ use super::error::parse_error;
 use super::pager::OssPager;
 use super::writer::OssWriter;
 use crate::raw::*;
+use crate::services::oss::writer::OssWriters;
 use crate::*;
 
-const DEFAULT_WRITE_MIN_SIZE: usize = 8 * 1024 * 1024;
+/// The minimum multipart size of OSS is 100 KiB.
+///
+/// ref: <https://www.alibabacloud.com/help/en/oss/user-guide/multipart-upload-12>
+const MINIMUM_MULTIPART_SIZE: usize = 100 * 1024;
 const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1000;
 
 /// Aliyun Object Storage Service (OSS) support
@@ -343,14 +348,6 @@ impl Builder for OssBuilder {
 
         let signer = AliyunOssSigner::new(bucket);
 
-        let write_min_size = self.write_min_size.unwrap_or(DEFAULT_WRITE_MIN_SIZE);
-        if write_min_size < 5 * 1024 * 1024 {
-            return Err(Error::new(
-                ErrorKind::ConfigInvalid,
-                "The write minimum buffer size is misconfigured",
-            )
-            .with_context("service", Scheme::Oss));
-        }
         let batch_max_operations = self
             .batch_max_operations
             .unwrap_or(DEFAULT_BATCH_MAX_OPERATIONS);
@@ -368,7 +365,6 @@ impl Builder for OssBuilder {
                 client,
                 server_side_encryption,
                 server_side_encryption_key_id,
-                write_min_size,
                 batch_max_operations,
             }),
         })
@@ -385,10 +381,7 @@ pub struct OssBackend {
 impl Accessor for OssBackend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = oio::TwoWaysWriter<
-        oio::MultipartUploadWriter<OssWriter>,
-        oio::AppendObjectWriter<OssWriter>,
-    >;
+    type Writer = oio::TwoWaysWriter<OssWriters, oio::AtLeastBufWriter<OssWriters>>;
     type BlockingWriter = ();
     type Pager = OssPager;
     type BlockingPager = ();
@@ -480,18 +473,26 @@ impl Accessor for OssBackend {
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
         let writer = OssWriter::new(self.core.clone(), path, args.clone());
 
-        let tw = if args.append() {
+        let w = if args.append() {
+            OssWriters::Three(oio::AppendObjectWriter::new(writer))
+        } else if args.content_length().is_some() {
+            OssWriters::One(oio::OneShotWriter::new(writer))
+        } else {
+            OssWriters::Two(oio::MultipartUploadWriter::new(writer))
+        };
+
+        let w = if let Some(buffer_size) = args.buffer_size() {
+            let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
+
             let w =
-                oio::AppendObjectWriter::new(writer).with_write_min_size(self.core.write_min_size);
+                oio::AtLeastBufWriter::new(w, buffer_size).with_total_size(args.content_length());
 
             oio::TwoWaysWriter::Two(w)
         } else {
-            let w = oio::MultipartUploadWriter::new(writer);
-
             oio::TwoWaysWriter::One(w)
         };
 
-        return Ok((RpWrite::default(), tw));
+        Ok((RpWrite::default(), w))
     }
 
     async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
diff --git a/core/src/services/oss/core.rs b/core/src/services/oss/core.rs
index 6c570fb17..a81db97fa 100644
--- a/core/src/services/oss/core.rs
+++ b/core/src/services/oss/core.rs
@@ -64,7 +64,6 @@ pub struct OssCore {
     pub client: HttpClient,
     pub loader: AliyunLoader,
     pub signer: AliyunOssSigner,
-    pub write_min_size: usize,
     pub batch_max_operations: usize,
 }
 
diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs
index ff9c8f9d2..27aa09011 100644
--- a/core/src/services/oss/writer.rs
+++ b/core/src/services/oss/writer.rs
@@ -27,6 +27,12 @@ use crate::raw::oio::Streamer;
 use crate::raw::*;
 use crate::*;
 
+pub type OssWriters = oio::ThreeWaysWriter<
+    oio::OneShotWriter<OssWriter>,
+    oio::MultipartUploadWriter<OssWriter>,
+    oio::AppendObjectWriter<OssWriter>,
+>;
+
 pub struct OssWriter {
     core: Arc<OssCore>,
 
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index 83ff8a7d0..700359009 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::cmp::max;
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::fmt::Formatter;
@@ -43,6 +44,7 @@ use super::error::parse_s3_error_code;
 use super::pager::S3Pager;
 use super::writer::S3Writer;
 use crate::raw::*;
+use crate::services::s3::writer::S3Writers;
 use crate::*;
 
 /// Allow constructing correct region endpoint if user gives a global endpoint.
@@ -56,7 +58,10 @@ static ENDPOINT_TEMPLATES: Lazy<HashMap<&'static str, &'static str>> = Lazy::new
     m
 });
 
-const DEFAULT_WRITE_MIN_SIZE: usize = 8 * 1024 * 1024;
+/// The minimum multipart size of S3 is 5 MiB.
+///
+/// ref: <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
+const MINIMUM_MULTIPART_SIZE: usize = 5 * 1024 * 1024;
 const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1000;
 
 /// Aws S3 and compatible services (including minio, digitalocean space, Tencent Cloud Object Storage(COS) and so on) support.
@@ -847,14 +852,6 @@ impl Builder for S3Builder {
 
         let signer = AwsV4Signer::new("s3", &region);
 
-        let write_min_size = self.write_min_size.unwrap_or(DEFAULT_WRITE_MIN_SIZE);
-        if write_min_size < 5 * 1024 * 1024 {
-            return Err(Error::new(
-                ErrorKind::ConfigInvalid,
-                "The write minimum buffer size is misconfigured",
-            )
-            .with_context("service", Scheme::S3));
-        }
         let batch_max_operations = self
             .batch_max_operations
             .unwrap_or(DEFAULT_BATCH_MAX_OPERATIONS);
@@ -874,7 +871,6 @@ impl Builder for S3Builder {
                 signer,
                 loader,
                 client,
-                write_min_size,
                 batch_max_operations,
             }),
         })
@@ -891,7 +887,7 @@ pub struct S3Backend {
 impl Accessor for S3Backend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = oio::MultipartUploadWriter<S3Writer>;
+    type Writer = oio::TwoWaysWriter<S3Writers, oio::AtLeastBufWriter<S3Writers>>;
     type BlockingWriter = ();
     type Pager = S3Pager;
     type BlockingPager = ();
@@ -979,10 +975,26 @@ impl Accessor for S3Backend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        Ok((
-            RpWrite::default(),
-            S3Writer::new(self.core.clone(), path, args),
-        ))
+        let writer = S3Writer::new(self.core.clone(), path, args.clone());
+
+        let w = if args.content_length().is_some() {
+            S3Writers::One(oio::OneShotWriter::new(writer))
+        } else {
+            S3Writers::Two(oio::MultipartUploadWriter::new(writer))
+        };
+
+        let w = if let Some(buffer_size) = args.buffer_size() {
+            let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
+
+            let w =
+                oio::AtLeastBufWriter::new(w, buffer_size).with_total_size(args.content_length());
+
+            oio::TwoWaysWriter::Two(w)
+        } else {
+            oio::TwoWaysWriter::One(w)
+        };
+
+        Ok((RpWrite::default(), w))
     }
 
     async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs
index 36321280f..89989bcb7 100644
--- a/core/src/services/s3/core.rs
+++ b/core/src/services/s3/core.rs
@@ -82,7 +82,6 @@ pub struct S3Core {
     pub signer: AwsV4Signer,
     pub loader: Box<dyn AwsCredentialLoad>,
     pub client: HttpClient,
-    pub write_min_size: usize,
     pub batch_max_operations: usize,
 }
 
diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs
index c323dd526..a27341d71 100644
--- a/core/src/services/s3/writer.rs
+++ b/core/src/services/s3/writer.rs
@@ -27,6 +27,9 @@ use crate::raw::oio::Streamer;
 use crate::raw::*;
 use crate::*;
 
+pub type S3Writers =
+    oio::TwoWaysWriter<oio::OneShotWriter<S3Writer>, oio::MultipartUploadWriter<S3Writer>>;
+
 pub struct S3Writer {
     core: Arc<S3Core>,
 
@@ -35,14 +38,12 @@ pub struct S3Writer {
 }
 
 impl S3Writer {
-    pub fn new(core: Arc<S3Core>, path: &str, op: OpWrite) -> oio::MultipartUploadWriter<Self> {
-        let s3_writer = S3Writer {
+    pub fn new(core: Arc<S3Core>, path: &str, op: OpWrite) -> Self {
+        S3Writer {
             core,
             path: path.to_string(),
             op,
-        };
-
-        oio::MultipartUploadWriter::new(s3_writer)
+        }
     }
 }
 
diff --git a/core/src/types/operator/operator_futures.rs b/core/src/types/operator/operator_futures.rs
index d3676f1ce..10a9c061c 100644
--- a/core/src/types/operator/operator_futures.rs
+++ b/core/src/types/operator/operator_futures.rs
@@ -395,6 +395,19 @@ impl FutureWrite {
         self
     }
 
+    /// Set the buffer size of op.
+    ///
+    /// If buffer size is set, the data will be buffered by the underlying writer.
+    ///
+    /// ## NOTE
+    ///
+    /// Service could have their own minimum buffer size while perform write operations like
+    /// multipart uploads. So the buffer size may be larger than the given buffer size.
+    pub fn buffer_size(mut self, v: usize) -> Self {
+        self.0 = self.0.map_args(|(args, bs)| (args.with_buffer_size(v), bs));
+        self
+    }
+
     /// Set the content length of op.
     ///
     /// If the content length is not set, the content length will be
@@ -457,6 +470,19 @@ impl FutureWriter {
         self
     }
 
+    /// Set the buffer size of op.
+    ///
+    /// If buffer size is set, the data will be buffered by the underlying writer.
+    ///
+    /// ## NOTE
+    ///
+    /// Service could have their own minimum buffer size while perform write operations like
+    /// multipart uploads. So the buffer size may be larger than the given buffer size.
+    pub fn buffer_size(mut self, v: usize) -> Self {
+        self.0 = self.0.map_args(|args| args.with_buffer_size(v));
+        self
+    }
+
     /// Set the content length of op.
     ///
     /// If the content length is not set, the content length will be
diff --git a/core/tests/behavior/write.rs b/core/tests/behavior/write.rs
index a1effab43..ae8825984 100644
--- a/core/tests/behavior/write.rs
+++ b/core/tests/behavior/write.rs
@@ -1233,7 +1233,7 @@ pub async fn test_writer_futures_copy(op: Operator) -> Result<()> {
     let (content, size): (Vec<u8>, usize) =
         gen_bytes_with_range(10 * 1024 * 1024..20 * 1024 * 1024);
 
-    let mut w = op.writer(&path).await?;
+    let mut w = op.writer_with(&path).buffer_size(8 * 1024 * 1024).await?;
 
     // Wrap a buf reader here to make sure content is read in 1MiB chunks.
     let mut cursor = BufReader::with_capacity(1024 * 1024, Cursor::new(content.clone()));
@@ -1266,7 +1266,7 @@ pub async fn test_fuzz_unsized_writer(op: Operator) -> Result<()> {
 
     let mut fuzzer = ObjectWriterFuzzer::new(&path, None);
 
-    let mut w = op.writer(&path).await?;
+    let mut w = op.writer_with(&path).buffer_size(8 * 1024 * 1024).await?;
 
     for _ in 0..100 {
         match fuzzer.fuzz() {

Reply via email to