This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch exact-buf in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 95e16acc3f8242219265d5380d870b1b5dd7bbca Author: Xuanwo <[email protected]> AuthorDate: Fri Aug 25 15:51:16 2023 +0800 fix: Enable exact_buf_write for R2 Signed-off-by: Xuanwo <[email protected]> --- .github/workflows/service_test_s3.yml | 69 +++++++++++++++-------------- core/src/services/s3/backend.rs | 51 ++++++++++++--------- core/src/services/s3/compatible_services.md | 1 + core/src/services/s3/core.rs | 1 + 4 files changed, 67 insertions(+), 55 deletions(-) diff --git a/.github/workflows/service_test_s3.yml b/.github/workflows/service_test_s3.yml index e066e375b..2a280c7b9 100644 --- a/.github/workflows/service_test_s3.yml +++ b/.github/workflows/service_test_s3.yml @@ -174,37 +174,38 @@ jobs: OPENDAL_S3_ALLOW_ANONYMOUS: on OPENDAL_S3_REGION: us-east-1 -# Disable this test until we addressed https://github.com/apache/incubator-opendal/issues/2904 -# -# r2: -# runs-on: ubuntu-latest -# if: github.event_name == 'push' || !github.event.pull_request.head.repo.fork -# steps: -# - uses: actions/checkout@v3 -# - name: Setup Rust toolchain -# uses: ./.github/actions/setup -# with: -# need-nextest: true -# -# - name: Load secret -# id: op-load-secret -# uses: 1password/load-secrets-action@v1 -# with: -# export-env: true -# env: -# OP_SERVICE_ACCOUNT_TOKEN: ${{ secrets.OP_SERVICE_ACCOUNT_TOKEN }} -# OPENDAL_S3_TEST: op://services/r2/test -# OPENDAL_S3_BUCKET: op://services/r2/bucket -# OPENDAL_S3_ENDPOINT: op://services/r2/endpoint -# OPENDAL_S3_ACCESS_KEY_ID: op://services/r2/access_key_id -# OPENDAL_S3_SECRET_ACCESS_KEY: op://services/r2/secret_access_key -# -# - name: Test -# shell: bash -# working-directory: core -# run: cargo nextest run s3 -# env: -# OPENDAL_S3_REGION: auto -# # This is the R2's limitation -# # Refer to https://opendal.apache.org/docs/services/s3#compatible-services for more information -# OPENDAL_S3_BATCH_MAX_OPERATIONS: 700 + r2: + runs-on: ubuntu-latest + if: github.event_name == 'push' || !github.event.pull_request.head.repo.fork + 
steps: + - uses: actions/checkout@v3 + - name: Setup Rust toolchain + uses: ./.github/actions/setup + with: + need-nextest: true + + - name: Load secret + id: op-load-secret + uses: 1password/load-secrets-action@v1 + with: + export-env: true + env: + OP_SERVICE_ACCOUNT_TOKEN: ${{ secrets.OP_SERVICE_ACCOUNT_TOKEN }} + OPENDAL_S3_TEST: op://services/r2/test + OPENDAL_S3_BUCKET: op://services/r2/bucket + OPENDAL_S3_ENDPOINT: op://services/r2/endpoint + OPENDAL_S3_ACCESS_KEY_ID: op://services/r2/access_key_id + OPENDAL_S3_SECRET_ACCESS_KEY: op://services/r2/secret_access_key + + - name: Test + shell: bash + working-directory: core + run: cargo nextest run s3 + env: + OPENDAL_S3_REGION: auto + # This is the R2's limitation + # Refer to https://opendal.apache.org/docs/services/s3#compatible-services for more information + OPENDAL_S3_BATCH_MAX_OPERATIONS: 700 + # This is the R2's limitation + # Refer to https://opendal.apache.org/docs/services/s3#compatible-services for more information + OPENDAL_S3_ENABLE_EXACT_BUF_WRITE: true diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs index 700359009..db8c4db00 100644 --- a/core/src/services/s3/backend.rs +++ b/core/src/services/s3/backend.rs @@ -87,7 +87,7 @@ pub struct S3Builder { allow_anonymous: bool, customed_credential_load: Option<Box<dyn AwsCredentialLoad>>, - // S3 features flags + // S3 feature server_side_encryption: Option<String>, server_side_encryption_aws_kms_key_id: Option<String>, server_side_encryption_customer_algorithm: Option<String>, @@ -95,12 +95,8 @@ pub struct S3Builder { server_side_encryption_customer_key_md5: Option<String>, default_storage_class: Option<String>, enable_virtual_host_style: bool, - - /// the part size of s3 multipart upload, which should be 5 MiB to 5 GiB. 
- /// There is no minimum size limit on the last part of your multipart upload - write_min_size: Option<usize>, - /// batch_max_operations batch_max_operations: Option<usize>, + enable_exact_buf_write: bool, http_client: Option<HttpClient>, } @@ -519,13 +515,6 @@ impl S3Builder { endpoint } - /// set the minimum size of unsized write, it should be greater than 5 MB. - /// Reference: [Amazon S3 multipart upload limits](https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html) - pub fn write_min_size(&mut self, write_min_size: usize) -> &mut Self { - self.write_min_size = Some(write_min_size); - - self - } /// Set maximum batch operations of this backend. pub fn batch_max_operations(&mut self, batch_max_operations: usize) -> &mut Self { self.batch_max_operations = Some(batch_max_operations); @@ -533,6 +522,16 @@ impl S3Builder { self } + /// Enable exact buf write so that opendal will write data with exact size. + /// + /// This option is used for services like R2 which requires all parts must be the same size + /// except the last part. + pub fn enable_exact_buf_write(&mut self) -> &mut Self { + self.enable_exact_buf_write = true; + + self + } + /// Detect region of S3 bucket. 
/// /// # Args @@ -685,10 +684,11 @@ impl Builder for S3Builder { .map(|_| builder.allow_anonymous()); map.get("default_storage_class") .map(|v: &String| builder.default_storage_class(v)); - map.get("write_min_size") - .map(|v| builder.write_min_size(v.parse().expect("input must be a number"))); map.get("batch_max_operations") .map(|v| builder.batch_max_operations(v.parse().expect("input must be a number"))); + map.get("enable_exact_buf_write") + .filter(|v| *v == "on" || *v == "true") + .map(|_| builder.enable_exact_buf_write()); builder } @@ -868,6 +868,7 @@ impl Builder for S3Builder { server_side_encryption_customer_key_md5, default_storage_class, allow_anonymous: self.allow_anonymous, + enable_exact_buf_write: self.enable_exact_buf_write, signer, loader, client, @@ -887,7 +888,11 @@ pub struct S3Backend { impl Accessor for S3Backend { type Reader = IncomingAsyncBody; type BlockingReader = (); - type Writer = oio::TwoWaysWriter<S3Writers, oio::AtLeastBufWriter<S3Writers>>; + type Writer = oio::ThreeWaysWriter< + S3Writers, + oio::AtLeastBufWriter<S3Writers>, + oio::ExactBufWriter<S3Writers>, + >; type BlockingWriter = (); type Pager = S3Pager; type BlockingPager = (); @@ -986,12 +991,16 @@ impl Accessor for S3Backend { let w = if let Some(buffer_size) = args.buffer_size() { let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size); - let w = - oio::AtLeastBufWriter::new(w, buffer_size).with_total_size(args.content_length()); - - oio::TwoWaysWriter::Two(w) + if self.core.enable_exact_buf_write { + oio::ThreeWaysWriter::Three(oio::ExactBufWriter::new(w, buffer_size)) + } else { + oio::ThreeWaysWriter::Two( + oio::AtLeastBufWriter::new(w, buffer_size) + .with_total_size(args.content_length()), + ) + } } else { - oio::TwoWaysWriter::One(w) + oio::ThreeWaysWriter::One(w) }; Ok((RpWrite::default(), w)) diff --git a/core/src/services/s3/compatible_services.md b/core/src/services/s3/compatible_services.md index d5036cb3d..8ef722db1 100644 --- 
a/core/src/services/s3/compatible_services.md +++ b/core/src/services/s3/compatible_services.md @@ -111,3 +111,4 @@ To connect to r2, we need to set: - `bucket`: The bucket name of r2. - `region`: When you create a new bucket, the data location is set to Automatic by default. So please use `auto` for region. - `batch_max_operations`: R2's delete objects will return `Internal Error` if the batch is larger than `700`. Please set this value `<= 700` to make sure batch delete work as expected. +- `enable_exact_buf_write`: R2 requires the non-trailing parts size to be exactly the same. Please enable this option to avoid the error `All non-trailing parts must have the same length`. diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs index 89989bcb7..ea77b7108 100644 --- a/core/src/services/s3/core.rs +++ b/core/src/services/s3/core.rs @@ -78,6 +78,7 @@ pub struct S3Core { pub server_side_encryption_customer_key_md5: Option<HeaderValue>, pub default_storage_class: Option<HeaderValue>, pub allow_anonymous: bool, + pub enable_exact_buf_write: bool, pub signer: AwsV4Signer, pub loader: Box<dyn AwsCredentialLoad>,
