This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new e8f695e36 feat: Add buffer support for all services (#3045)
e8f695e36 is described below

commit e8f695e36d022b836f23cb0280c4991399f59ef6
Author: Xuanwo <[email protected]>
AuthorDate: Wed Sep 13 11:43:26 2023 +0800

    feat: Add buffer support for all services (#3045)
    
    * Implement buffer calculation
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Add comments
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * feat: Add buffer support for all services
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Make clippy happy
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/layers/complete.rs                 | 48 +++++++++++++++++++----------
 core/src/services/cos/backend.rs            | 26 ++++++----------
 core/src/services/gcs/backend.rs            | 48 ++++++-----------------------
 core/src/services/gcs/core.rs               |  2 --
 core/src/services/obs/backend.rs            | 26 ++++++----------
 core/src/services/oss/backend.rs            | 25 ++++++---------
 core/src/services/s3/backend.rs             | 25 ++++++---------
 core/src/types/capability.rs                | 12 ++++++++
 core/src/types/operator/operator_futures.rs | 11 +++++--
 9 files changed, 98 insertions(+), 125 deletions(-)

diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index 1abed7bbd..84b587efa 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -17,11 +17,11 @@
 
 use std::fmt::Debug;
 use std::fmt::Formatter;
-use std::io;
 use std::sync::Arc;
 use std::task::ready;
 use std::task::Context;
 use std::task::Poll;
+use std::{cmp, io};
 
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -95,17 +95,6 @@ use crate::*;
 /// If there is a hint that `ReadStreamable`, we will use existing reader
 /// directly. Otherwise, we will use transform this reader as a stream.
 ///
-/// ### Consume instead of Drop
-///
-/// Normally, if reader is seekable, we need to drop current reader and start
-/// a new read call.
-///
-/// We can consume the data if the seek position is close enough. For
-/// example, users try to seek to `Current(1)`, we can just read the data
-/// can consume it.
-///
-/// In this way, we can reduce the extra cost of dropping reader.
-///
 /// ## List Completion
 ///
 /// There are two styles of list, but not all services support both of
@@ -366,7 +355,10 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
     type Inner = A;
     type Reader = CompleteReader<A, A::Reader>;
     type BlockingReader = CompleteReader<A, A::BlockingReader>;
-    type Writer = CompleteWriter<A::Writer>;
+    type Writer = oio::TwoWaysWriter<
+        CompleteWriter<A::Writer>,
+        oio::ExactBufWriter<CompleteWriter<A::Writer>>,
+    >;
     type BlockingWriter = CompleteWriter<A::BlockingWriter>;
     type Pager = CompletePager<A, A::Pager>;
     type BlockingPager = CompletePager<A, A::BlockingPager>;
@@ -433,10 +425,32 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
             ));
         }
 
-        self.inner
-            .write(path, args)
-            .await
-            .map(|(rp, w)| (rp, CompleteWriter::new(w)))
+        // Calculate buffer size.
+        let buffer_size = args.buffer().map(|mut size| {
+            if let Some(v) = capability.write_multi_max_size {
+                size = cmp::min(v, size);
+            }
+            if let Some(v) = capability.write_multi_min_size {
+                size = cmp::max(v, size);
+            }
+            if let Some(v) = capability.write_multi_align_size {
+                // Make sure size >= align size first.
+                size = cmp::max(v, size);
+                size -= size % v;
+            }
+
+            size
+        });
+
+        let (rp, w) = self.inner.write(path, args.clone()).await?;
+        let w = CompleteWriter::new(w);
+
+        let w = match buffer_size {
+            None => oio::TwoWaysWriter::One(w),
+            Some(size) => oio::TwoWaysWriter::Two(oio::ExactBufWriter::new(w, size)),
+        };
+
+        Ok((rp, w))
     }
 
     fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
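
The clamping above boils down to a small piece of arithmetic. As a worked example — a minimal standalone sketch, not code from this patch — the following mirrors how the min/max/align hints interact:

    use std::cmp;

    /// Mirrors the buffer-size calculation in CompleteReaderAccessor::write above.
    fn clamp(mut size: usize, min: Option<usize>, max: Option<usize>, align: Option<usize>) -> usize {
        if let Some(v) = max {
            size = cmp::min(v, size);
        }
        if let Some(v) = min {
            size = cmp::max(v, size);
        }
        if let Some(v) = align {
            // Ensure size >= align before rounding down to a multiple of it,
            // so the result never becomes zero.
            size = cmp::max(v, size);
            size -= size % v;
        }
        size
    }

    fn main() {
        // GCS-style 256 KiB alignment: 1_000_000 bytes rounds down to 786_432 (3 * 256 KiB).
        assert_eq!(clamp(1_000_000, None, None, Some(256 * 1024)), 786_432);
        // S3-style bounds: a 1 KiB request is raised to the 5 MiB minimum.
        assert_eq!(clamp(1024, Some(5 * 1024 * 1024), Some(5 << 30), None), 5 * 1024 * 1024);
    }
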
diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs
index 897c4c10a..d4428ad69 100644
--- a/core/src/services/cos/backend.rs
+++ b/core/src/services/cos/backend.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::cmp::max;
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::sync::Arc;
@@ -36,11 +35,6 @@ use crate::raw::*;
 use crate::services::cos::writer::CosWriters;
 use crate::*;
 
-/// The minimum multipart size of COS is 1 MiB.
-///
-/// ref: <https://www.tencentcloud.com/document/product/436/14112>
-const MINIMUM_MULTIPART_SIZE: usize = 1024 * 1024;
-
 /// Tencent-Cloud COS services support.
 #[doc = include_str!("docs.md")]
 #[derive(Default, Clone)]
@@ -249,7 +243,7 @@ pub struct CosBackend {
 impl Accessor for CosBackend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = oio::TwoWaysWriter<CosWriters, oio::ExactBufWriter<CosWriters>>;
+    type Writer = CosWriters;
     type BlockingWriter = ();
     type Pager = CosPager;
     type BlockingPager = ();
@@ -276,6 +270,14 @@ impl Accessor for CosBackend {
                 write_with_content_type: true,
                 write_with_cache_control: true,
                 write_with_content_disposition: true,
+                // The min multipart size of COS is 1 MiB.
+                //
+                // ref: <https://www.tencentcloud.com/document/product/436/14112>
+                write_multi_min_size: Some(1024 * 1024),
+                // The max multipart size of COS is 5 GiB.
+                //
+                // ref: <https://www.tencentcloud.com/document/product/436/14112>
+                write_multi_max_size: Some(5 * 1024 * 1024 * 1024),
 
                 delete: true,
                 create_dir: true,
@@ -342,16 +344,6 @@ impl Accessor for CosBackend {
             CosWriters::One(oio::MultipartUploadWriter::new(writer))
         };
 
-        let w = if let Some(buffer_size) = args.buffer() {
-            let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
-
-            let w = oio::ExactBufWriter::new(w, buffer_size);
-
-            oio::TwoWaysWriter::Two(w)
-        } else {
-            oio::TwoWaysWriter::One(w)
-        };
-
         Ok((RpWrite::default(), w))
     }
 
diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index 20e7a1f5e..d8ad7b9ce 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::cmp::max;
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::fmt::Formatter;
@@ -41,8 +40,6 @@ use crate::*;
 
 const DEFAULT_GCS_ENDPOINT: &str = "https://storage.googleapis.com";
 const DEFAULT_GCS_SCOPE: &str = "https://www.googleapis.com/auth/devstorage.read_write";
-/// It's recommended that you use at least 8 MiB for the chunk size.
-const DEFAULT_WRITE_FIXED_SIZE: usize = 8 * 1024 * 1024;
 
 /// [Google Cloud Storage](https://cloud.google.com/storage) services support.
 #[doc = include_str!("docs.md")]
@@ -69,9 +66,6 @@ pub struct GcsBuilder {
     customed_token_loader: Option<Box<dyn GoogleTokenLoad>>,
     predefined_acl: Option<String>,
     default_storage_class: Option<String>,
-
-    /// the fixed size writer uses to flush into underlying storage.
-    write_fixed_size: Option<usize>,
 }
 
 impl GcsBuilder {
@@ -189,16 +183,6 @@ impl GcsBuilder {
         };
         self
     }
-
-    /// The buffer size should be a multiple of 256 KiB (256 x 1024 bytes), unless it's the last chunk that completes the upload.
-    /// Larger chunk sizes typically make uploads faster, but note that there's a tradeoff between speed and memory usage.
-    /// It's recommended that you use at least 8 MiB for the chunk size.
-    /// Reference: [Perform resumable uploads](https://cloud.google.com/storage/docs/performing-resumable-uploads)
-    pub fn write_fixed_size(&mut self, fixed_buffer_size: usize) -> &mut Self {
-        self.write_fixed_size = Some(fixed_buffer_size);
-
-        self
-    }
 }
 
 impl Debug for GcsBuilder {
@@ -298,17 +282,6 @@ impl Builder for GcsBuilder {
 
         let signer = GoogleSigner::new("storage");
 
-        let write_fixed_size = self.write_fixed_size.unwrap_or(DEFAULT_WRITE_FIXED_SIZE);
-        // GCS requires write must align with 256 KiB.
-        if write_fixed_size % (256 * 1024) != 0 {
-            return Err(Error::new(
-                ErrorKind::ConfigInvalid,
-                "The write fixed buffer size is misconfigured",
-            )
-            .with_context("service", Scheme::Gcs)
-            .with_context("write_fixed_size", write_fixed_size.to_string()));
-        }
-
         let backend = GcsBackend {
             core: Arc::new(GcsCore {
                 endpoint,
@@ -320,7 +293,6 @@ impl Builder for GcsBuilder {
                 credential_loader: cred_loader,
                 predefined_acl: self.predefined_acl.clone(),
                 default_storage_class: self.default_storage_class.clone(),
-                write_fixed_size,
             }),
         };
 
@@ -338,7 +310,7 @@ pub struct GcsBackend {
 impl Accessor for GcsBackend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = oio::TwoWaysWriter<GcsWriters, oio::ExactBufWriter<GcsWriters>>;
+    type Writer = GcsWriters;
     type BlockingWriter = ();
     type Pager = GcsPager;
     type BlockingPager = ();
@@ -364,6 +336,13 @@ impl Accessor for GcsBackend {
                 write: true,
                 write_can_multi: true,
                 write_with_content_type: true,
+                // The buffer size should be a multiple of 256 KiB (256 x 1024 bytes), unless it's the last chunk that completes the upload.
+                // Larger chunk sizes typically make uploads faster, but note that there's a tradeoff between speed and memory usage.
+                // It's recommended that you use at least 8 MiB for the chunk size.
+                //
+                // Reference: [Perform resumable uploads](https://cloud.google.com/storage/docs/performing-resumable-uploads)
+                write_multi_align_size: Some(256 * 1024),
+
                 delete: true,
                 copy: true,
 
@@ -420,18 +399,9 @@ impl Accessor for GcsBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        let w = GcsWriter::new(self.core.clone(), path, args.clone());
+        let w = GcsWriter::new(self.core.clone(), path, args);
         let w = oio::RangeWriter::new(w);
 
-        let w = if let Some(buffer_size) = args.buffer() {
-            // FIXME: we should align with 256KiB instead.
-            let buffer_size = max(DEFAULT_WRITE_FIXED_SIZE, buffer_size);
-
-            oio::TwoWaysWriter::Two(oio::ExactBufWriter::new(w, buffer_size))
-        } else {
-            oio::TwoWaysWriter::One(w)
-        };
-
         Ok((RpWrite::default(), w))
     }
 
diff --git a/core/src/services/gcs/core.rs b/core/src/services/gcs/core.rs
index 40acf42bc..c9eeb9cb8 100644
--- a/core/src/services/gcs/core.rs
+++ b/core/src/services/gcs/core.rs
@@ -55,8 +55,6 @@ pub struct GcsCore {
 
     pub predefined_acl: Option<String>,
     pub default_storage_class: Option<String>,
-
-    pub write_fixed_size: usize,
 }
 
 impl Debug for GcsCore {
diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs
index 7caa6d749..ab0a6b32e 100644
--- a/core/src/services/obs/backend.rs
+++ b/core/src/services/obs/backend.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::cmp::max;
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::sync::Arc;
@@ -36,11 +35,6 @@ use crate::raw::*;
 use crate::services::obs::writer::ObsWriters;
 use crate::*;
 
-/// The minimum multipart size of OBS is 5 MiB.
-///
-/// ref: <https://support.huaweicloud.com/intl/en-us/ugobs-obs/obs_41_0021.html>
-const MINIMUM_MULTIPART_SIZE: usize = 5 * 1024 * 1024;
-
 /// Huawei-Cloud Object Storage Service (OBS) support
 #[doc = include_str!("docs.md")]
 #[derive(Default, Clone)]
@@ -256,7 +250,7 @@ pub struct ObsBackend {
 impl Accessor for ObsBackend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = oio::TwoWaysWriter<ObsWriters, oio::ExactBufWriter<ObsWriters>>;
+    type Writer = ObsWriters;
     type BlockingWriter = ();
     type Pager = ObsPager;
     type BlockingPager = ();
@@ -282,6 +276,14 @@ impl Accessor for ObsBackend {
                 write_can_multi: true,
                 write_with_content_type: true,
                 write_with_cache_control: true,
+                // The min multipart size of OBS is 5 MiB.
+                //
+                // ref: <https://support.huaweicloud.com/intl/en-us/ugobs-obs/obs_41_0021.html>
+                write_multi_min_size: Some(5 * 1024 * 1024),
+                // The max multipart size of OBS is 5 GiB.
+                //
+                // ref: <https://support.huaweicloud.com/intl/en-us/ugobs-obs/obs_41_0021.html>
+                write_multi_max_size: Some(5 * 1024 * 1024 * 1024),
 
                 delete: true,
                 create_dir: true,
@@ -380,16 +382,6 @@ impl Accessor for ObsBackend {
             ObsWriters::One(oio::MultipartUploadWriter::new(writer))
         };
 
-        let w = if let Some(buffer_size) = args.buffer() {
-            let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
-
-            let w = oio::ExactBufWriter::new(w, buffer_size);
-
-            oio::TwoWaysWriter::Two(w)
-        } else {
-            oio::TwoWaysWriter::One(w)
-        };
-
         Ok((RpWrite::default(), w))
     }
 
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index 9022079bc..b90fe8f2c 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::cmp::max;
 use std::collections::HashMap;
 use std::collections::HashSet;
 use std::fmt::Debug;
@@ -39,10 +38,6 @@ use crate::raw::*;
 use crate::services::oss::writer::OssWriters;
 use crate::*;
 
-/// The minimum multipart size of OSS is 100 KiB.
-///
-/// ref: <https://www.alibabacloud.com/help/en/oss/user-guide/multipart-upload-12>
-const MINIMUM_MULTIPART_SIZE: usize = 100 * 1024;
 const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1000;
 
 /// Aliyun Object Storage Service (OSS) support
@@ -381,7 +376,7 @@ pub struct OssBackend {
 impl Accessor for OssBackend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = oio::TwoWaysWriter<OssWriters, oio::ExactBufWriter<OssWriters>>;
+    type Writer = OssWriters;
     type BlockingWriter = ();
     type Pager = OssPager;
     type BlockingPager = ();
@@ -408,6 +403,14 @@ impl Accessor for OssBackend {
                 write_with_cache_control: true,
                 write_with_content_type: true,
                 write_with_content_disposition: true,
+                // The min multipart size of OSS is 100 KiB.
+                //
+                // ref: <https://www.alibabacloud.com/help/en/oss/user-guide/multipart-upload-12>
+                write_multi_min_size: Some(100 * 1024),
+                // The max multipart size of OSS is 5 GiB.
+                //
+                // ref: <https://www.alibabacloud.com/help/en/oss/user-guide/multipart-upload-12>
+                write_multi_max_size: Some(5 * 1024 * 1024 * 1024),
 
                 delete: true,
                 create_dir: true,
@@ -479,16 +482,6 @@ impl Accessor for OssBackend {
             OssWriters::One(oio::MultipartUploadWriter::new(writer))
         };
 
-        let w = if let Some(buffer_size) = args.buffer() {
-            let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
-
-            let w = oio::ExactBufWriter::new(w, buffer_size);
-
-            oio::TwoWaysWriter::Two(w)
-        } else {
-            oio::TwoWaysWriter::One(w)
-        };
-
         Ok((RpWrite::default(), w))
     }
 
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index 89a8330b8..e0433a831 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::cmp::max;
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::fmt::Formatter;
@@ -58,10 +57,6 @@ static ENDPOINT_TEMPLATES: Lazy<HashMap<&'static str, &'static str>> = Lazy::new
     m
 });
 
-/// The minimum multipart size of S3 is 5 MiB.
-///
-/// ref: <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
-const MINIMUM_MULTIPART_SIZE: usize = 5 * 1024 * 1024;
 const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1000;
 
 /// Aws S3 and compatible services (including minio, digitalocean space, Tencent Cloud Object Storage(COS) and so on) support.
@@ -888,7 +883,7 @@ pub struct S3Backend {
 impl Accessor for S3Backend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = oio::TwoWaysWriter<S3Writers, oio::ExactBufWriter<S3Writers>>;
+    type Writer = S3Writers;
     type BlockingWriter = ();
     type Pager = S3Pager;
     type BlockingPager = ();
@@ -916,6 +911,14 @@ impl Accessor for S3Backend {
                 write_can_multi: true,
                 write_with_cache_control: true,
                 write_with_content_type: true,
+                // The min multipart size of S3 is 5 MiB.
+                //
+                // ref: <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
+                write_multi_min_size: Some(5 * 1024 * 1024),
+                // The max multipart size of S3 is 5 GiB.
+                //
+                // ref: <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
+                write_multi_max_size: Some(5 * 1024 * 1024 * 1024),
 
                 create_dir: true,
                 delete: true,
@@ -976,18 +979,10 @@ impl Accessor for S3Backend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        let writer = S3Writer::new(self.core.clone(), path, args.clone());
+        let writer = S3Writer::new(self.core.clone(), path, args);
 
         let w = oio::MultipartUploadWriter::new(writer);
 
-        let w = if let Some(buffer_size) = args.buffer() {
-            let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
-
-            oio::TwoWaysWriter::Two(oio::ExactBufWriter::new(w, buffer_size))
-        } else {
-            oio::TwoWaysWriter::One(w)
-        };
-
         Ok((RpWrite::default(), w))
     }
 
diff --git a/core/src/types/capability.rs b/core/src/types/capability.rs
index b568b3ee8..abe7eb9c4 100644
--- a/core/src/types/capability.rs
+++ b/core/src/types/capability.rs
@@ -84,6 +84,18 @@ pub struct Capability {
     pub write_with_content_disposition: bool,
     /// If operator supports write with cache control.
     pub write_with_cache_control: bool,
+    /// write_multi_max_size is the max size that services support in write_multi.
+    ///
+    /// For example, AWS S3 supports 5GiB as max in write_multi.
+    pub write_multi_max_size: Option<usize>,
+    /// write_multi_min_size is the min size that services support in write_multi.
+    ///
+    /// For example, AWS S3 requires at least 5MiB in write_multi except the last one.
+    pub write_multi_min_size: Option<usize>,
+    /// write_multi_align_size is the align size that services require in write_multi.
+    ///
+    /// For example, Google GCS requires align size to 256KiB in write_multi.
+    pub write_multi_align_size: Option<usize>,
 
     /// If operator supports create dir.
     pub create_dir: bool,
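
On the service side, these hints are just extra fields in the existing Capability literal. A hypothetical backend with made-up limits — an illustrative sketch, assuming Capability keeps its public fields and Default impl — would declare them like this:

    use opendal::Capability;

    /// Hypothetical service: parts must be between 4 MiB and 4 GiB,
    /// and a multiple of 1 MiB. The limits here are invented for illustration.
    fn hypothetical_capability() -> Capability {
        Capability {
            write: true,
            write_can_multi: true,
            write_multi_min_size: Some(4 * 1024 * 1024),
            write_multi_max_size: Some(4 * 1024 * 1024 * 1024),
            write_multi_align_size: Some(1024 * 1024),
            ..Default::default()
        }
    }
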
diff --git a/core/src/types/operator/operator_futures.rs b/core/src/types/operator/operator_futures.rs
index 63c013f05..879601769 100644
--- a/core/src/types/operator/operator_futures.rs
+++ b/core/src/types/operator/operator_futures.rs
@@ -454,8 +454,15 @@ impl FutureWriter {
     ///
     /// ## NOTE
     ///
-    /// Service could have their own minimum buffer size while perform write operations like
-    /// multipart uploads. So the buffer size may be larger than the given buffer size.
+    /// Services could have their own limitations on buffer size. It's possible that the buffer size
+    /// is not equal to the given buffer size.
+    ///
+    /// For example:
+    ///
+    /// - AWS S3 requires the part size to be in [5MiB, 5GiB].
+    /// - GCS requires the part size to be aligned with 256 KiB.
+    ///
+    /// The services will alter the buffer size to meet their requirements.
     pub fn buffer(mut self, v: usize) -> Self {
         self.0 = self.0.map_args(|args| args.with_buffer(v));
         self
