This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 722993fd6 fix: Enable exact_buf_write for R2 (#2935)
722993fd6 is described below

commit 722993fd6e4498f100a5b6e5099721338aa41e03
Author: Xuanwo <[email protected]>
AuthorDate: Fri Aug 25 16:12:14 2023 +0800

    fix: Enable exact_buf_write for R2 (#2935)
    
    Signed-off-by: Xuanwo <[email protected]>
---
 .github/workflows/service_test_s3.yml       | 69 +++++++++++++++--------------
 core/src/services/s3/backend.rs             | 51 ++++++++++++---------
 core/src/services/s3/compatible_services.md |  1 +
 core/src/services/s3/core.rs                |  1 +
 4 files changed, 67 insertions(+), 55 deletions(-)

diff --git a/.github/workflows/service_test_s3.yml 
b/.github/workflows/service_test_s3.yml
index e066e375b..2a280c7b9 100644
--- a/.github/workflows/service_test_s3.yml
+++ b/.github/workflows/service_test_s3.yml
@@ -174,37 +174,38 @@ jobs:
           OPENDAL_S3_ALLOW_ANONYMOUS: on
           OPENDAL_S3_REGION: us-east-1
 
-# Disable this test until we addressed 
https://github.com/apache/incubator-opendal/issues/2904
-#
-#  r2:
-#    runs-on: ubuntu-latest
-#    if: github.event_name == 'push' || 
!github.event.pull_request.head.repo.fork
-#    steps:
-#      - uses: actions/checkout@v3
-#      - name: Setup Rust toolchain
-#        uses: ./.github/actions/setup
-#        with:
-#          need-nextest: true
-#
-#      - name: Load secret
-#        id: op-load-secret
-#        uses: 1password/load-secrets-action@v1
-#        with:
-#          export-env: true
-#        env:
-#          OP_SERVICE_ACCOUNT_TOKEN: ${{ secrets.OP_SERVICE_ACCOUNT_TOKEN }}
-#          OPENDAL_S3_TEST: op://services/r2/test
-#          OPENDAL_S3_BUCKET: op://services/r2/bucket
-#          OPENDAL_S3_ENDPOINT: op://services/r2/endpoint
-#          OPENDAL_S3_ACCESS_KEY_ID: op://services/r2/access_key_id
-#          OPENDAL_S3_SECRET_ACCESS_KEY: op://services/r2/secret_access_key
-#
-#      - name: Test
-#        shell: bash
-#        working-directory: core
-#        run: cargo nextest run s3
-#        env:
-#          OPENDAL_S3_REGION: auto
-#          # This is the R2's limitation
-#          # Refer to 
https://opendal.apache.org/docs/services/s3#compatible-services for more 
information
-#          OPENDAL_S3_BATCH_MAX_OPERATIONS: 700
+  r2:
+    runs-on: ubuntu-latest
+    if: github.event_name == 'push' || 
!github.event.pull_request.head.repo.fork
+    steps:
+      - uses: actions/checkout@v3
+      - name: Setup Rust toolchain
+        uses: ./.github/actions/setup
+        with:
+          need-nextest: true
+
+      - name: Load secret
+        id: op-load-secret
+        uses: 1password/load-secrets-action@v1
+        with:
+          export-env: true
+        env:
+          OP_SERVICE_ACCOUNT_TOKEN: ${{ secrets.OP_SERVICE_ACCOUNT_TOKEN }}
+          OPENDAL_S3_TEST: op://services/r2/test
+          OPENDAL_S3_BUCKET: op://services/r2/bucket
+          OPENDAL_S3_ENDPOINT: op://services/r2/endpoint
+          OPENDAL_S3_ACCESS_KEY_ID: op://services/r2/access_key_id
+          OPENDAL_S3_SECRET_ACCESS_KEY: op://services/r2/secret_access_key
+
+      - name: Test
+        shell: bash
+        working-directory: core
+        run: cargo nextest run s3
+        env:
+          OPENDAL_S3_REGION: auto
+          # This is the R2's limitation
+          # Refer to 
https://opendal.apache.org/docs/services/s3#compatible-services for more 
information
+          OPENDAL_S3_BATCH_MAX_OPERATIONS: 700
+          # This is the R2's limitation
+          # Refer to 
https://opendal.apache.org/docs/services/s3#compatible-services for more 
information
+          OPENDAL_S3_ENABLE_EXACT_BUF_WRITE: true
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index 700359009..db8c4db00 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -87,7 +87,7 @@ pub struct S3Builder {
     allow_anonymous: bool,
     customed_credential_load: Option<Box<dyn AwsCredentialLoad>>,
 
-    // S3 features flags
+    // S3 feature
     server_side_encryption: Option<String>,
     server_side_encryption_aws_kms_key_id: Option<String>,
     server_side_encryption_customer_algorithm: Option<String>,
@@ -95,12 +95,8 @@ pub struct S3Builder {
     server_side_encryption_customer_key_md5: Option<String>,
     default_storage_class: Option<String>,
     enable_virtual_host_style: bool,
-
-    /// the part size of s3 multipart upload, which should be 5 MiB to 5 GiB.
-    /// There is no minimum size limit on the last part of your multipart 
upload
-    write_min_size: Option<usize>,
-    /// batch_max_operations
     batch_max_operations: Option<usize>,
+    enable_exact_buf_write: bool,
 
     http_client: Option<HttpClient>,
 }
@@ -519,13 +515,6 @@ impl S3Builder {
         endpoint
     }
 
-    /// set the minimum size of unsized write, it should be greater than 5 MB.
-    /// Reference: [Amazon S3 multipart upload 
limits](https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html)
-    pub fn write_min_size(&mut self, write_min_size: usize) -> &mut Self {
-        self.write_min_size = Some(write_min_size);
-
-        self
-    }
     /// Set maximum batch operations of this backend.
     pub fn batch_max_operations(&mut self, batch_max_operations: usize) -> 
&mut Self {
         self.batch_max_operations = Some(batch_max_operations);
@@ -533,6 +522,16 @@ impl S3Builder {
         self
     }
 
+    /// Enable exact buf write so that opendal will write data with exact size.
+    ///
+    /// This option is used for services like R2, which requires that all parts 
+    /// be the same size except the last part.
+    pub fn enable_exact_buf_write(&mut self) -> &mut Self {
+        self.enable_exact_buf_write = true;
+
+        self
+    }
+
     /// Detect region of S3 bucket.
     ///
     /// # Args
@@ -685,10 +684,11 @@ impl Builder for S3Builder {
             .map(|_| builder.allow_anonymous());
         map.get("default_storage_class")
             .map(|v: &String| builder.default_storage_class(v));
-        map.get("write_min_size")
-            .map(|v| builder.write_min_size(v.parse().expect("input must be a 
number")));
         map.get("batch_max_operations")
             .map(|v| builder.batch_max_operations(v.parse().expect("input must 
be a number")));
+        map.get("enable_exact_buf_write")
+            .filter(|v| *v == "on" || *v == "true")
+            .map(|_| builder.enable_exact_buf_write());
 
         builder
     }
@@ -868,6 +868,7 @@ impl Builder for S3Builder {
                 server_side_encryption_customer_key_md5,
                 default_storage_class,
                 allow_anonymous: self.allow_anonymous,
+                enable_exact_buf_write: self.enable_exact_buf_write,
                 signer,
                 loader,
                 client,
@@ -887,7 +888,11 @@ pub struct S3Backend {
 impl Accessor for S3Backend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = oio::TwoWaysWriter<S3Writers, 
oio::AtLeastBufWriter<S3Writers>>;
+    type Writer = oio::ThreeWaysWriter<
+        S3Writers,
+        oio::AtLeastBufWriter<S3Writers>,
+        oio::ExactBufWriter<S3Writers>,
+    >;
     type BlockingWriter = ();
     type Pager = S3Pager;
     type BlockingPager = ();
@@ -986,12 +991,16 @@ impl Accessor for S3Backend {
         let w = if let Some(buffer_size) = args.buffer_size() {
             let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
 
-            let w =
-                oio::AtLeastBufWriter::new(w, 
buffer_size).with_total_size(args.content_length());
-
-            oio::TwoWaysWriter::Two(w)
+            if self.core.enable_exact_buf_write {
+                oio::ThreeWaysWriter::Three(oio::ExactBufWriter::new(w, 
buffer_size))
+            } else {
+                oio::ThreeWaysWriter::Two(
+                    oio::AtLeastBufWriter::new(w, buffer_size)
+                        .with_total_size(args.content_length()),
+                )
+            }
         } else {
-            oio::TwoWaysWriter::One(w)
+            oio::ThreeWaysWriter::One(w)
         };
 
         Ok((RpWrite::default(), w))
diff --git a/core/src/services/s3/compatible_services.md 
b/core/src/services/s3/compatible_services.md
index d5036cb3d..8ef722db1 100644
--- a/core/src/services/s3/compatible_services.md
+++ b/core/src/services/s3/compatible_services.md
@@ -111,3 +111,4 @@ To connect to r2, we need to set:
 - `bucket`: The bucket name of r2.
 - `region`: When you create a new bucket, the data location is set to 
Automatic by default. So please use `auto` for region.
 - `batch_max_operations`: R2's delete objects will return `Internal Error` if 
the batch is larger than `700`. Please set this value `<= 700` to make sure 
batch delete work as expected.
+- `enable_exact_buf_write`: R2 requires the non-trailing part sizes to be 
exactly the same. Please enable this option to avoid the error `All 
non-trailing parts must have the same length`.
diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs
index 89989bcb7..ea77b7108 100644
--- a/core/src/services/s3/core.rs
+++ b/core/src/services/s3/core.rs
@@ -78,6 +78,7 @@ pub struct S3Core {
     pub server_side_encryption_customer_key_md5: Option<HeaderValue>,
     pub default_storage_class: Option<HeaderValue>,
     pub allow_anonymous: bool,
+    pub enable_exact_buf_write: bool,
 
     pub signer: AwsV4Signer,
     pub loader: Box<dyn AwsCredentialLoad>,

Reply via email to