This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 722993fd6 fix: Enable exact_buf_write for R2 (#2935)
722993fd6 is described below
commit 722993fd6e4498f100a5b6e5099721338aa41e03
Author: Xuanwo <[email protected]>
AuthorDate: Fri Aug 25 16:12:14 2023 +0800
fix: Enable exact_buf_write for R2 (#2935)
Signed-off-by: Xuanwo <[email protected]>
---
.github/workflows/service_test_s3.yml | 69 +++++++++++++++--------------
core/src/services/s3/backend.rs | 51 ++++++++++++---------
core/src/services/s3/compatible_services.md | 1 +
core/src/services/s3/core.rs | 1 +
4 files changed, 67 insertions(+), 55 deletions(-)
diff --git a/.github/workflows/service_test_s3.yml
b/.github/workflows/service_test_s3.yml
index e066e375b..2a280c7b9 100644
--- a/.github/workflows/service_test_s3.yml
+++ b/.github/workflows/service_test_s3.yml
@@ -174,37 +174,38 @@ jobs:
OPENDAL_S3_ALLOW_ANONYMOUS: on
OPENDAL_S3_REGION: us-east-1
-# Disable this test until we addressed
https://github.com/apache/incubator-opendal/issues/2904
-#
-# r2:
-# runs-on: ubuntu-latest
-# if: github.event_name == 'push' ||
!github.event.pull_request.head.repo.fork
-# steps:
-# - uses: actions/checkout@v3
-# - name: Setup Rust toolchain
-# uses: ./.github/actions/setup
-# with:
-# need-nextest: true
-#
-# - name: Load secret
-# id: op-load-secret
-# uses: 1password/load-secrets-action@v1
-# with:
-# export-env: true
-# env:
-# OP_SERVICE_ACCOUNT_TOKEN: ${{ secrets.OP_SERVICE_ACCOUNT_TOKEN }}
-# OPENDAL_S3_TEST: op://services/r2/test
-# OPENDAL_S3_BUCKET: op://services/r2/bucket
-# OPENDAL_S3_ENDPOINT: op://services/r2/endpoint
-# OPENDAL_S3_ACCESS_KEY_ID: op://services/r2/access_key_id
-# OPENDAL_S3_SECRET_ACCESS_KEY: op://services/r2/secret_access_key
-#
-# - name: Test
-# shell: bash
-# working-directory: core
-# run: cargo nextest run s3
-# env:
-# OPENDAL_S3_REGION: auto
-# # This is the R2's limitation
-# # Refer to
https://opendal.apache.org/docs/services/s3#compatible-services for more
information
-# OPENDAL_S3_BATCH_MAX_OPERATIONS: 700
+ r2:
+ runs-on: ubuntu-latest
+ if: github.event_name == 'push' ||
!github.event.pull_request.head.repo.fork
+ steps:
+ - uses: actions/checkout@v3
+ - name: Setup Rust toolchain
+ uses: ./.github/actions/setup
+ with:
+ need-nextest: true
+
+ - name: Load secret
+ id: op-load-secret
+ uses: 1password/load-secrets-action@v1
+ with:
+ export-env: true
+ env:
+ OP_SERVICE_ACCOUNT_TOKEN: ${{ secrets.OP_SERVICE_ACCOUNT_TOKEN }}
+ OPENDAL_S3_TEST: op://services/r2/test
+ OPENDAL_S3_BUCKET: op://services/r2/bucket
+ OPENDAL_S3_ENDPOINT: op://services/r2/endpoint
+ OPENDAL_S3_ACCESS_KEY_ID: op://services/r2/access_key_id
+ OPENDAL_S3_SECRET_ACCESS_KEY: op://services/r2/secret_access_key
+
+ - name: Test
+ shell: bash
+ working-directory: core
+ run: cargo nextest run s3
+ env:
+ OPENDAL_S3_REGION: auto
+ # This is R2's limitation
+ # Refer to
https://opendal.apache.org/docs/services/s3#compatible-services for more
information
+ OPENDAL_S3_BATCH_MAX_OPERATIONS: 700
+ # This is R2's limitation
+ # Refer to
https://opendal.apache.org/docs/services/s3#compatible-services for more
information
+ OPENDAL_S3_ENABLE_EXACT_BUF_WRITE: true
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index 700359009..db8c4db00 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -87,7 +87,7 @@ pub struct S3Builder {
allow_anonymous: bool,
customed_credential_load: Option<Box<dyn AwsCredentialLoad>>,
- // S3 features flags
+ // S3 feature
server_side_encryption: Option<String>,
server_side_encryption_aws_kms_key_id: Option<String>,
server_side_encryption_customer_algorithm: Option<String>,
@@ -95,12 +95,8 @@ pub struct S3Builder {
server_side_encryption_customer_key_md5: Option<String>,
default_storage_class: Option<String>,
enable_virtual_host_style: bool,
-
- /// the part size of s3 multipart upload, which should be 5 MiB to 5 GiB.
- /// There is no minimum size limit on the last part of your multipart
upload
- write_min_size: Option<usize>,
- /// batch_max_operations
batch_max_operations: Option<usize>,
+ enable_exact_buf_write: bool,
http_client: Option<HttpClient>,
}
@@ -519,13 +515,6 @@ impl S3Builder {
endpoint
}
- /// set the minimum size of unsized write, it should be greater than 5 MB.
- /// Reference: [Amazon S3 multipart upload
limits](https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html)
- pub fn write_min_size(&mut self, write_min_size: usize) -> &mut Self {
- self.write_min_size = Some(write_min_size);
-
- self
- }
/// Set maximum batch operations of this backend.
pub fn batch_max_operations(&mut self, batch_max_operations: usize) ->
&mut Self {
self.batch_max_operations = Some(batch_max_operations);
@@ -533,6 +522,16 @@ impl S3Builder {
self
}
+ /// Enable exact buf write so that opendal will write data with exact size.
+ ///
+ /// This option is used for services like R2, which requires all parts to be
+ /// the same size except the last part.
+ pub fn enable_exact_buf_write(&mut self) -> &mut Self {
+ self.enable_exact_buf_write = true;
+
+ self
+ }
+
/// Detect region of S3 bucket.
///
/// # Args
@@ -685,10 +684,11 @@ impl Builder for S3Builder {
.map(|_| builder.allow_anonymous());
map.get("default_storage_class")
.map(|v: &String| builder.default_storage_class(v));
- map.get("write_min_size")
- .map(|v| builder.write_min_size(v.parse().expect("input must be a
number")));
map.get("batch_max_operations")
.map(|v| builder.batch_max_operations(v.parse().expect("input must
be a number")));
+ map.get("enable_exact_buf_write")
+ .filter(|v| *v == "on" || *v == "true")
+ .map(|_| builder.enable_exact_buf_write());
builder
}
@@ -868,6 +868,7 @@ impl Builder for S3Builder {
server_side_encryption_customer_key_md5,
default_storage_class,
allow_anonymous: self.allow_anonymous,
+ enable_exact_buf_write: self.enable_exact_buf_write,
signer,
loader,
client,
@@ -887,7 +888,11 @@ pub struct S3Backend {
impl Accessor for S3Backend {
type Reader = IncomingAsyncBody;
type BlockingReader = ();
- type Writer = oio::TwoWaysWriter<S3Writers,
oio::AtLeastBufWriter<S3Writers>>;
+ type Writer = oio::ThreeWaysWriter<
+ S3Writers,
+ oio::AtLeastBufWriter<S3Writers>,
+ oio::ExactBufWriter<S3Writers>,
+ >;
type BlockingWriter = ();
type Pager = S3Pager;
type BlockingPager = ();
@@ -986,12 +991,16 @@ impl Accessor for S3Backend {
let w = if let Some(buffer_size) = args.buffer_size() {
let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
- let w =
- oio::AtLeastBufWriter::new(w,
buffer_size).with_total_size(args.content_length());
-
- oio::TwoWaysWriter::Two(w)
+ if self.core.enable_exact_buf_write {
+ oio::ThreeWaysWriter::Three(oio::ExactBufWriter::new(w,
buffer_size))
+ } else {
+ oio::ThreeWaysWriter::Two(
+ oio::AtLeastBufWriter::new(w, buffer_size)
+ .with_total_size(args.content_length()),
+ )
+ }
} else {
- oio::TwoWaysWriter::One(w)
+ oio::ThreeWaysWriter::One(w)
};
Ok((RpWrite::default(), w))
diff --git a/core/src/services/s3/compatible_services.md
b/core/src/services/s3/compatible_services.md
index d5036cb3d..8ef722db1 100644
--- a/core/src/services/s3/compatible_services.md
+++ b/core/src/services/s3/compatible_services.md
@@ -111,3 +111,4 @@ To connect to r2, we need to set:
- `bucket`: The bucket name of r2.
- `region`: When you create a new bucket, the data location is set to
Automatic by default. So please use `auto` for region.
- `batch_max_operations`: R2's delete objects will return `Internal Error` if
the batch is larger than `700`. Please set this value `<= 700` to make sure
batch delete work as expected.
+- `enable_exact_buf_write`: R2 requires the non-trailing parts to be exactly
the same size. Please enable this option to avoid the error `All
non-trailing parts must have the same length`.
diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs
index 89989bcb7..ea77b7108 100644
--- a/core/src/services/s3/core.rs
+++ b/core/src/services/s3/core.rs
@@ -78,6 +78,7 @@ pub struct S3Core {
pub server_side_encryption_customer_key_md5: Option<HeaderValue>,
pub default_storage_class: Option<HeaderValue>,
pub allow_anonymous: bool,
+ pub enable_exact_buf_write: bool,
pub signer: AwsV4Signer,
pub loader: Box<dyn AwsCredentialLoad>,