This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new e8f695e36 feat: Add buffer support for all services (#3045)
e8f695e36 is described below
commit e8f695e36d022b836f23cb0280c4991399f59ef6
Author: Xuanwo <[email protected]>
AuthorDate: Wed Sep 13 11:43:26 2023 +0800
feat: Add buffer support for all services (#3045)
* Implement buffer calculation
Signed-off-by: Xuanwo <[email protected]>
* Add comments
Signed-off-by: Xuanwo <[email protected]>
* feat: Add buffer support for all services
Signed-off-by: Xuanwo <[email protected]>
* Make clippy happy
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
core/src/layers/complete.rs | 48 +++++++++++++++++++----------
core/src/services/cos/backend.rs | 26 ++++++----------
core/src/services/gcs/backend.rs | 48 ++++++-----------------------
core/src/services/gcs/core.rs | 2 --
core/src/services/obs/backend.rs | 26 ++++++----------
core/src/services/oss/backend.rs | 25 ++++++---------
core/src/services/s3/backend.rs | 25 ++++++---------
core/src/types/capability.rs | 12 ++++++++
core/src/types/operator/operator_futures.rs | 11 +++++--
9 files changed, 98 insertions(+), 125 deletions(-)
diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index 1abed7bbd..84b587efa 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -17,11 +17,11 @@
use std::fmt::Debug;
use std::fmt::Formatter;
-use std::io;
use std::sync::Arc;
use std::task::ready;
use std::task::Context;
use std::task::Poll;
+use std::{cmp, io};
use async_trait::async_trait;
use bytes::Bytes;
@@ -95,17 +95,6 @@ use crate::*;
/// If there is a hint that `ReadStreamable`, we will use existing reader
/// directly. Otherwise, we will transform this reader into a stream.
///
-/// ### Consume instead of Drop
-///
-/// Normally, if reader is seekable, we need to drop current reader and start
-/// a new read call.
-///
-/// We can consume the data if the seek position is close enough. For
-/// example, users try to seek to `Current(1)`, we can just read the data
-/// can consume it.
-///
-/// In this way, we can reduce the extra cost of dropping reader.
-///
/// ## List Completion
///
/// There are two styles of list, but not all services support both of
@@ -366,7 +355,10 @@ impl<A: Accessor> LayeredAccessor for
CompleteReaderAccessor<A> {
type Inner = A;
type Reader = CompleteReader<A, A::Reader>;
type BlockingReader = CompleteReader<A, A::BlockingReader>;
- type Writer = CompleteWriter<A::Writer>;
+ type Writer = oio::TwoWaysWriter<
+ CompleteWriter<A::Writer>,
+ oio::ExactBufWriter<CompleteWriter<A::Writer>>,
+ >;
type BlockingWriter = CompleteWriter<A::BlockingWriter>;
type Pager = CompletePager<A, A::Pager>;
type BlockingPager = CompletePager<A, A::BlockingPager>;
@@ -433,10 +425,32 @@ impl<A: Accessor> LayeredAccessor for
CompleteReaderAccessor<A> {
));
}
- self.inner
- .write(path, args)
- .await
- .map(|(rp, w)| (rp, CompleteWriter::new(w)))
+ // Calculate buffer size.
+ let buffer_size = args.buffer().map(|mut size| {
+ if let Some(v) = capability.write_multi_max_size {
+ size = cmp::min(v, size);
+ }
+ if let Some(v) = capability.write_multi_min_size {
+ size = cmp::max(v, size);
+ }
+ if let Some(v) = capability.write_multi_align_size {
+                // Make sure size >= v (the align size) before rounding down.
+ size = cmp::max(v, size);
+ size -= size % v;
+ }
+
+ size
+ });
+
+ let (rp, w) = self.inner.write(path, args.clone()).await?;
+ let w = CompleteWriter::new(w);
+
+ let w = match buffer_size {
+ None => oio::TwoWaysWriter::One(w),
+ Some(size) => oio::TwoWaysWriter::Two(oio::ExactBufWriter::new(w,
size)),
+ };
+
+ Ok((rp, w))
}
fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::BlockingWriter)> {
diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs
index 897c4c10a..d4428ad69 100644
--- a/core/src/services/cos/backend.rs
+++ b/core/src/services/cos/backend.rs
@@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-use std::cmp::max;
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
@@ -36,11 +35,6 @@ use crate::raw::*;
use crate::services::cos::writer::CosWriters;
use crate::*;
-/// The minimum multipart size of COS is 1 MiB.
-///
-/// ref: <https://www.tencentcloud.com/document/product/436/14112>
-const MINIMUM_MULTIPART_SIZE: usize = 1024 * 1024;
-
/// Tencent-Cloud COS services support.
#[doc = include_str!("docs.md")]
#[derive(Default, Clone)]
@@ -249,7 +243,7 @@ pub struct CosBackend {
impl Accessor for CosBackend {
type Reader = IncomingAsyncBody;
type BlockingReader = ();
- type Writer = oio::TwoWaysWriter<CosWriters,
oio::ExactBufWriter<CosWriters>>;
+ type Writer = CosWriters;
type BlockingWriter = ();
type Pager = CosPager;
type BlockingPager = ();
@@ -276,6 +270,14 @@ impl Accessor for CosBackend {
write_with_content_type: true,
write_with_cache_control: true,
write_with_content_disposition: true,
+ // The min multipart size of COS is 1 MiB.
+ //
+ // ref:
<https://www.tencentcloud.com/document/product/436/14112>
+ write_multi_min_size: Some(1024 * 1024),
+ // The max multipart size of COS is 5 GiB.
+ //
+ // ref:
<https://www.tencentcloud.com/document/product/436/14112>
+ write_multi_max_size: Some(5 * 1024 * 1024 * 1024),
delete: true,
create_dir: true,
@@ -342,16 +344,6 @@ impl Accessor for CosBackend {
CosWriters::One(oio::MultipartUploadWriter::new(writer))
};
- let w = if let Some(buffer_size) = args.buffer() {
- let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
-
- let w = oio::ExactBufWriter::new(w, buffer_size);
-
- oio::TwoWaysWriter::Two(w)
- } else {
- oio::TwoWaysWriter::One(w)
- };
-
Ok((RpWrite::default(), w))
}
diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index 20e7a1f5e..d8ad7b9ce 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-use std::cmp::max;
use std::collections::HashMap;
use std::fmt::Debug;
use std::fmt::Formatter;
@@ -41,8 +40,6 @@ use crate::*;
const DEFAULT_GCS_ENDPOINT: &str = "https://storage.googleapis.com";
const DEFAULT_GCS_SCOPE: &str =
"https://www.googleapis.com/auth/devstorage.read_write";
-/// It's recommended that you use at least 8 MiB for the chunk size.
-const DEFAULT_WRITE_FIXED_SIZE: usize = 8 * 1024 * 1024;
/// [Google Cloud Storage](https://cloud.google.com/storage) services support.
#[doc = include_str!("docs.md")]
@@ -69,9 +66,6 @@ pub struct GcsBuilder {
customed_token_loader: Option<Box<dyn GoogleTokenLoad>>,
predefined_acl: Option<String>,
default_storage_class: Option<String>,
-
- /// the fixed size writer uses to flush into underlying storage.
- write_fixed_size: Option<usize>,
}
impl GcsBuilder {
@@ -189,16 +183,6 @@ impl GcsBuilder {
};
self
}
-
- /// The buffer size should be a multiple of 256 KiB (256 x 1024 bytes),
unless it's the last chunk that completes the upload.
- /// Larger chunk sizes typically make uploads faster, but note that
there's a tradeoff between speed and memory usage.
- /// It's recommended that you use at least 8 MiB for the chunk size.
- /// Reference: [Perform resumable
uploads](https://cloud.google.com/storage/docs/performing-resumable-uploads)
- pub fn write_fixed_size(&mut self, fixed_buffer_size: usize) -> &mut Self {
- self.write_fixed_size = Some(fixed_buffer_size);
-
- self
- }
}
impl Debug for GcsBuilder {
@@ -298,17 +282,6 @@ impl Builder for GcsBuilder {
let signer = GoogleSigner::new("storage");
- let write_fixed_size =
self.write_fixed_size.unwrap_or(DEFAULT_WRITE_FIXED_SIZE);
- // GCS requires write must align with 256 KiB.
- if write_fixed_size % (256 * 1024) != 0 {
- return Err(Error::new(
- ErrorKind::ConfigInvalid,
- "The write fixed buffer size is misconfigured",
- )
- .with_context("service", Scheme::Gcs)
- .with_context("write_fixed_size", write_fixed_size.to_string()));
- }
-
let backend = GcsBackend {
core: Arc::new(GcsCore {
endpoint,
@@ -320,7 +293,6 @@ impl Builder for GcsBuilder {
credential_loader: cred_loader,
predefined_acl: self.predefined_acl.clone(),
default_storage_class: self.default_storage_class.clone(),
- write_fixed_size,
}),
};
@@ -338,7 +310,7 @@ pub struct GcsBackend {
impl Accessor for GcsBackend {
type Reader = IncomingAsyncBody;
type BlockingReader = ();
- type Writer = oio::TwoWaysWriter<GcsWriters,
oio::ExactBufWriter<GcsWriters>>;
+ type Writer = GcsWriters;
type BlockingWriter = ();
type Pager = GcsPager;
type BlockingPager = ();
@@ -364,6 +336,13 @@ impl Accessor for GcsBackend {
write: true,
write_can_multi: true,
write_with_content_type: true,
+ // The buffer size should be a multiple of 256 KiB (256 x 1024
bytes), unless it's the last chunk that completes the upload.
+ // Larger chunk sizes typically make uploads faster, but note
that there's a tradeoff between speed and memory usage.
+ // It's recommended that you use at least 8 MiB for the chunk
size.
+ //
+ // Reference: [Perform resumable
uploads](https://cloud.google.com/storage/docs/performing-resumable-uploads)
+                write_multi_align_size: Some(256 * 1024),
+
delete: true,
copy: true,
@@ -420,18 +399,9 @@ impl Accessor for GcsBackend {
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
- let w = GcsWriter::new(self.core.clone(), path, args.clone());
+ let w = GcsWriter::new(self.core.clone(), path, args);
let w = oio::RangeWriter::new(w);
- let w = if let Some(buffer_size) = args.buffer() {
- // FIXME: we should align with 256KiB instead.
- let buffer_size = max(DEFAULT_WRITE_FIXED_SIZE, buffer_size);
-
- oio::TwoWaysWriter::Two(oio::ExactBufWriter::new(w, buffer_size))
- } else {
- oio::TwoWaysWriter::One(w)
- };
-
Ok((RpWrite::default(), w))
}
diff --git a/core/src/services/gcs/core.rs b/core/src/services/gcs/core.rs
index 40acf42bc..c9eeb9cb8 100644
--- a/core/src/services/gcs/core.rs
+++ b/core/src/services/gcs/core.rs
@@ -55,8 +55,6 @@ pub struct GcsCore {
pub predefined_acl: Option<String>,
pub default_storage_class: Option<String>,
-
- pub write_fixed_size: usize,
}
impl Debug for GcsCore {
diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs
index 7caa6d749..ab0a6b32e 100644
--- a/core/src/services/obs/backend.rs
+++ b/core/src/services/obs/backend.rs
@@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-use std::cmp::max;
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
@@ -36,11 +35,6 @@ use crate::raw::*;
use crate::services::obs::writer::ObsWriters;
use crate::*;
-/// The minimum multipart size of OBS is 5 MiB.
-///
-/// ref:
<https://support.huaweicloud.com/intl/en-us/ugobs-obs/obs_41_0021.html>
-const MINIMUM_MULTIPART_SIZE: usize = 5 * 1024 * 1024;
-
/// Huawei-Cloud Object Storage Service (OBS) support
#[doc = include_str!("docs.md")]
#[derive(Default, Clone)]
@@ -256,7 +250,7 @@ pub struct ObsBackend {
impl Accessor for ObsBackend {
type Reader = IncomingAsyncBody;
type BlockingReader = ();
- type Writer = oio::TwoWaysWriter<ObsWriters,
oio::ExactBufWriter<ObsWriters>>;
+ type Writer = ObsWriters;
type BlockingWriter = ();
type Pager = ObsPager;
type BlockingPager = ();
@@ -282,6 +276,14 @@ impl Accessor for ObsBackend {
write_can_multi: true,
write_with_content_type: true,
write_with_cache_control: true,
+ // The min multipart size of OBS is 5 MiB.
+ //
+ // ref:
<https://support.huaweicloud.com/intl/en-us/ugobs-obs/obs_41_0021.html>
+ write_multi_min_size: Some(5 * 1024 * 1024),
+ // The max multipart size of OBS is 5 GiB.
+ //
+ // ref:
<https://support.huaweicloud.com/intl/en-us/ugobs-obs/obs_41_0021.html>
+ write_multi_max_size: Some(5 * 1024 * 1024 * 1024),
delete: true,
create_dir: true,
@@ -380,16 +382,6 @@ impl Accessor for ObsBackend {
ObsWriters::One(oio::MultipartUploadWriter::new(writer))
};
- let w = if let Some(buffer_size) = args.buffer() {
- let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
-
- let w = oio::ExactBufWriter::new(w, buffer_size);
-
- oio::TwoWaysWriter::Two(w)
- } else {
- oio::TwoWaysWriter::One(w)
- };
-
Ok((RpWrite::default(), w))
}
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index 9022079bc..b90fe8f2c 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-use std::cmp::max;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt::Debug;
@@ -39,10 +38,6 @@ use crate::raw::*;
use crate::services::oss::writer::OssWriters;
use crate::*;
-/// The minimum multipart size of OSS is 100 KiB.
-///
-/// ref:
<https://www.alibabacloud.com/help/en/oss/user-guide/multipart-upload-12>
-const MINIMUM_MULTIPART_SIZE: usize = 100 * 1024;
const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1000;
/// Aliyun Object Storage Service (OSS) support
@@ -381,7 +376,7 @@ pub struct OssBackend {
impl Accessor for OssBackend {
type Reader = IncomingAsyncBody;
type BlockingReader = ();
- type Writer = oio::TwoWaysWriter<OssWriters,
oio::ExactBufWriter<OssWriters>>;
+ type Writer = OssWriters;
type BlockingWriter = ();
type Pager = OssPager;
type BlockingPager = ();
@@ -408,6 +403,14 @@ impl Accessor for OssBackend {
write_with_cache_control: true,
write_with_content_type: true,
write_with_content_disposition: true,
+ // The min multipart size of OSS is 100 KiB.
+ //
+ // ref:
<https://www.alibabacloud.com/help/en/oss/user-guide/multipart-upload-12>
+ write_multi_min_size: Some(100 * 1024),
+ // The max multipart size of OSS is 5 GiB.
+ //
+ // ref:
<https://www.alibabacloud.com/help/en/oss/user-guide/multipart-upload-12>
+ write_multi_max_size: Some(5 * 1024 * 1024 * 1024),
delete: true,
create_dir: true,
@@ -479,16 +482,6 @@ impl Accessor for OssBackend {
OssWriters::One(oio::MultipartUploadWriter::new(writer))
};
- let w = if let Some(buffer_size) = args.buffer() {
- let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
-
- let w = oio::ExactBufWriter::new(w, buffer_size);
-
- oio::TwoWaysWriter::Two(w)
- } else {
- oio::TwoWaysWriter::One(w)
- };
-
Ok((RpWrite::default(), w))
}
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index 89a8330b8..e0433a831 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-use std::cmp::max;
use std::collections::HashMap;
use std::fmt::Debug;
use std::fmt::Formatter;
@@ -58,10 +57,6 @@ static ENDPOINT_TEMPLATES: Lazy<HashMap<&'static str,
&'static str>> = Lazy::new
m
});
-/// The minimum multipart size of S3 is 5 MiB.
-///
-/// ref: <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
-const MINIMUM_MULTIPART_SIZE: usize = 5 * 1024 * 1024;
const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1000;
/// Aws S3 and compatible services (including minio, digitalocean space,
Tencent Cloud Object Storage(COS) and so on) support.
@@ -888,7 +883,7 @@ pub struct S3Backend {
impl Accessor for S3Backend {
type Reader = IncomingAsyncBody;
type BlockingReader = ();
- type Writer = oio::TwoWaysWriter<S3Writers,
oio::ExactBufWriter<S3Writers>>;
+ type Writer = S3Writers;
type BlockingWriter = ();
type Pager = S3Pager;
type BlockingPager = ();
@@ -916,6 +911,14 @@ impl Accessor for S3Backend {
write_can_multi: true,
write_with_cache_control: true,
write_with_content_type: true,
+ // The min multipart size of S3 is 5 MiB.
+ //
+ // ref:
<https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
+ write_multi_min_size: Some(5 * 1024 * 1024),
+ // The max multipart size of S3 is 5 GiB.
+ //
+ // ref:
<https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
+ write_multi_max_size: Some(5 * 1024 * 1024 * 1024),
create_dir: true,
delete: true,
@@ -976,18 +979,10 @@ impl Accessor for S3Backend {
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
- let writer = S3Writer::new(self.core.clone(), path, args.clone());
+ let writer = S3Writer::new(self.core.clone(), path, args);
let w = oio::MultipartUploadWriter::new(writer);
- let w = if let Some(buffer_size) = args.buffer() {
- let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
-
- oio::TwoWaysWriter::Two(oio::ExactBufWriter::new(w, buffer_size))
- } else {
- oio::TwoWaysWriter::One(w)
- };
-
Ok((RpWrite::default(), w))
}
diff --git a/core/src/types/capability.rs b/core/src/types/capability.rs
index b568b3ee8..abe7eb9c4 100644
--- a/core/src/types/capability.rs
+++ b/core/src/types/capability.rs
@@ -84,6 +84,18 @@ pub struct Capability {
pub write_with_content_disposition: bool,
/// If operator supports write with cache control.
pub write_with_cache_control: bool,
+ /// write_multi_max_size is the max size that services support in
write_multi.
+ ///
+ /// For example, AWS S3 supports 5GiB as max in write_multi.
+ pub write_multi_max_size: Option<usize>,
+ /// write_multi_min_size is the min size that services support in
write_multi.
+ ///
+ /// For example, AWS S3 requires at least 5MiB in write_multi expect the
last one.
+ pub write_multi_min_size: Option<usize>,
+ /// write_multi_align_size is the align size that services required in
write_multi.
+ ///
+ /// For example, Google GCS requires align size to 256KiB in write_multi.
+ pub write_multi_align_size: Option<usize>,
/// If operator supports create dir.
pub create_dir: bool,
diff --git a/core/src/types/operator/operator_futures.rs
b/core/src/types/operator/operator_futures.rs
index 63c013f05..879601769 100644
--- a/core/src/types/operator/operator_futures.rs
+++ b/core/src/types/operator/operator_futures.rs
@@ -454,8 +454,15 @@ impl FutureWriter {
///
/// ## NOTE
///
- /// Service could have their own minimum buffer size while perform write
operations like
- /// multipart uploads. So the buffer size may be larger than the given
buffer size.
+ /// Service could have their own limitation for buffer size. It's possible
that buffer size
+ /// is not equal to the given buffer size.
+ ///
+ /// For example:
+ ///
+ /// - AWS S3 requires the part size to be in [5MiB, 5GiB].
+ /// - GCS requires the part size to be aligned with 256 KiB.
+ ///
+ /// The services will alter the buffer size to meet their requirements.
pub fn buffer(mut self, v: usize) -> Self {
self.0 = self.0.map_args(|args| args.with_buffer(v));
self