This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new e9e45a2cc feat(services/s3): Add ListObjectsV1 support for better 
support old storage services  (#5975)
e9e45a2cc is described below

commit e9e45a2cc90bb5a45e1c8ddeb3f258184267ed1c
Author: Xuanwo <[email protected]>
AuthorDate: Mon Apr 7 19:16:41 2025 +0800

    feat(services/s3): Add ListObjectsV1 support for better support old storage 
services  (#5975)
    
    * Rename existing list objects to v2
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Implement list objects v1
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix uri
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
---
 .../s3/aws_s3_with_list_objects_v1/action.yml      |  39 ++++++
 .../s3/minio_s3_with_list_objects_v1/action.yml    |  47 +++++++
 core/src/raw/enum_utils.rs                         |  10 ++
 core/src/raw/http_util/mod.rs                      |   1 +
 core/src/raw/http_util/uri.rs                      |  49 +++++++
 core/src/services/s3/backend.rs                    |  13 +-
 core/src/services/s3/config.rs                     |   5 +
 core/src/services/s3/core.rs                       | 127 +++++++++++++++++-
 core/src/services/s3/lister.rs                     | 143 +++++++++++++++++++--
 9 files changed, 418 insertions(+), 16 deletions(-)

diff --git a/.github/services/s3/aws_s3_with_list_objects_v1/action.yml b/.github/services/s3/aws_s3_with_list_objects_v1/action.yml
new file mode 100644
index 000000000..bce43e1aa
--- /dev/null
+++ b/.github/services/s3/aws_s3_with_list_objects_v1/action.yml
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: aws_s3
+description: "Behavior test for AWS S3. This service is sponsored by @datafuse_labs."
+
+runs:
+  using: "composite"
+  steps:
+    - name: Setup
+      uses: 1password/load-secrets-action@v1
+      with:
+        export-env: true
+      env:
+        OPENDAL_S3_ROOT: op://services/s3/root
+        OPENDAL_S3_BUCKET: op://services/s3/bucket
+        OPENDAL_S3_ENDPOINT: op://services/s3/endpoint
+        OPENDAL_S3_ACCESS_KEY_ID: op://services/s3/access_key_id
+        OPENDAL_S3_SECRET_ACCESS_KEY: op://services/s3/secret_access_key
+        OPENDAL_S3_REGION: op://services/s3/region
+
+    - name: Add extra settings
+      shell: bash
+      run: |
+        echo "OPENDAL_S3_DISABLE_LIST_OBJECTS_V2=true" >> $GITHUB_ENV
diff --git a/.github/services/s3/minio_s3_with_list_objects_v1/action.yml b/.github/services/s3/minio_s3_with_list_objects_v1/action.yml
new file mode 100644
index 000000000..46904d7aa
--- /dev/null
+++ b/.github/services/s3/minio_s3_with_list_objects_v1/action.yml
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: minio_s3_with_versioning
+description: "Behavior test for Minio S3 with bucket versioning enabled"
+
+runs:
+  using: "composite"
+  steps:
+    - name: Setup MinIO Server
+      shell: bash
+      working-directory: fixtures/s3
+      run: docker compose -f docker-compose-minio.yml up -d --wait
+    - name: Setup test bucket
+      shell: bash
+      env:
+        AWS_ACCESS_KEY_ID: "minioadmin"
+        AWS_SECRET_ACCESS_KEY: "minioadmin"
+        AWS_EC2_METADATA_DISABLED: "true"
+      run: |
+        aws --endpoint-url http://127.0.0.1:9000/ s3 mb s3://test
+
+    - name: Setup
+      shell: bash
+      run: |
+        cat << EOF >> $GITHUB_ENV
+        OPENDAL_S3_BUCKET=test
+        OPENDAL_S3_ENDPOINT=http://127.0.0.1:9000
+        OPENDAL_S3_ACCESS_KEY_ID=minioadmin
+        OPENDAL_S3_SECRET_ACCESS_KEY=minioadmin
+        OPENDAL_S3_REGION=us-east-1
+        OPENDAL_S3_DISABLE_LIST_OBJECTS_V2=true
+        EOF
diff --git a/core/src/raw/enum_utils.rs b/core/src/raw/enum_utils.rs
index e95059975..9a539943b 100644
--- a/core/src/raw/enum_utils.rs
+++ b/core/src/raw/enum_utils.rs
@@ -163,6 +163,16 @@ impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write
     }
 }
 
+impl<ONE: oio::List, TWO: oio::List, THREE: oio::List> oio::List for ThreeWays<ONE, TWO, THREE> {
+    async fn next(&mut self) -> Result<Option<oio::Entry>> {
+        match self {
+            Self::One(v) => v.next().await,
+            Self::Two(v) => v.next().await,
+            Self::Three(v) => v.next().await,
+        }
+    }
+}
+
 /// FourWays is used to implement traits that based on four ways.
 ///
 /// Users can wrap four different trait types together.
diff --git a/core/src/raw/http_util/mod.rs b/core/src/raw/http_util/mod.rs
index 8aaab3f00..01264f7f4 100644
--- a/core/src/raw/http_util/mod.rs
+++ b/core/src/raw/http_util/mod.rs
@@ -57,6 +57,7 @@ mod uri;
 pub use uri::new_http_uri_invalid_error;
 pub use uri::percent_decode_path;
 pub use uri::percent_encode_path;
+pub use uri::QueryPairsWriter;
 
 mod error;
 pub use error::new_request_build_error;
diff --git a/core/src/raw/http_util/uri.rs b/core/src/raw/http_util/uri.rs
index aeaaee5e9..1f4b73b46 100644
--- a/core/src/raw/http_util/uri.rs
+++ b/core/src/raw/http_util/uri.rs
@@ -64,6 +64,55 @@ pub fn percent_decode_path(path: &str) -> String {
     }
 }
 
+/// QueryPairsWriter is used to write query pairs to a url.
+pub struct QueryPairsWriter {
+    base: String,
+    has_query: bool,
+}
+
+impl QueryPairsWriter {
+    /// Create a new QueryPairsWriter with the given base.
+    pub fn new(s: &str) -> Self {
+        // 256 is the average size we observed of a url
+        // in production.
+        //
+        // We eagerly allocate the string to avoid multiple
+        // allocations.
+        let mut base = String::with_capacity(256);
+        base.push_str(s);
+
+        Self {
+            base,
+            has_query: false,
+        }
+    }
+
+    /// Push a new pair of key and value to the url.
+    ///
+    /// The input key and value must already be percent
+    /// encoded correctly.
+    pub fn push(mut self, key: &str, value: &str) -> Self {
+        if self.has_query {
+            self.base.push('&');
+        } else {
+            self.base.push('?');
+            self.has_query = true;
+        }
+
+        // Append the key and value to the base string
+        self.base.push_str(key);
+        self.base.push('=');
+        self.base.push_str(value);
+
+        self
+    }
+
+    /// Finish the url and return it.
+    pub fn finish(self) -> String {
+        self.base
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index 9ce15829a..0a46ebf03 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -43,7 +43,7 @@ use std::sync::LazyLock;
 use super::core::*;
 use super::delete::S3Deleter;
 use super::error::parse_error;
-use super::lister::{S3Lister, S3Listers, S3ObjectVersionsLister};
+use super::lister::{S3ListerV1, S3ListerV2, S3Listers, S3ObjectVersionsLister};
 use super::writer::S3Writer;
 use super::writer::S3Writers;
 use crate::raw::oio::PageLister;
@@ -994,6 +994,7 @@ impl Builder for S3Builder {
                 server_side_encryption_customer_key_md5,
                 default_storage_class,
                 allow_anonymous: self.config.allow_anonymous,
+                disable_list_objects_v2: self.config.disable_list_objects_v2,
                 signer,
                 loader,
                 credential_loaded: AtomicBool::new(false),
@@ -1089,13 +1090,19 @@ impl Access for S3Backend {
 
     async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
         let l = if args.versions() || args.deleted() {
-            TwoWays::Two(PageLister::new(S3ObjectVersionsLister::new(
+            ThreeWays::Three(PageLister::new(S3ObjectVersionsLister::new(
+                self.core.clone(),
+                path,
+                args,
+            )))
+        } else if self.core.disable_list_objects_v2 {
+            ThreeWays::One(PageLister::new(S3ListerV1::new(
                 self.core.clone(),
                 path,
                 args,
             )))
         } else {
-            TwoWays::One(PageLister::new(S3Lister::new(
+            ThreeWays::Two(PageLister::new(S3ListerV2::new(
                 self.core.clone(),
                 path,
                 args,
diff --git a/core/src/services/s3/config.rs b/core/src/services/s3/config.rs
index 74b78a03c..de2edc94c 100644
--- a/core/src/services/s3/config.rs
+++ b/core/src/services/s3/config.rs
@@ -188,6 +188,11 @@ pub struct S3Config {
 
     /// Enable write with append so that opendal will send write request with append headers.
     pub enable_write_with_append: bool,
+
+    /// OpenDAL uses List Objects V2 by default to list objects.
+    /// However, some legacy services do not yet support V2.
+    /// This option allows users to switch back to the older List Objects V1.
+    pub disable_list_objects_v2: bool,
 }
 
 impl Debug for S3Config {
diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs
index ba7d530f8..b3bba25e9 100644
--- a/core/src/services/s3/core.rs
+++ b/core/src/services/s3/core.rs
@@ -97,6 +97,7 @@ pub struct S3Core {
     pub server_side_encryption_customer_key_md5: Option<HeaderValue>,
     pub default_storage_class: Option<HeaderValue>,
     pub allow_anonymous: bool,
+    pub disable_list_objects_v2: bool,
 
     pub signer: AwsV4Signer,
     pub loader: Box<dyn AwsCredentialLoad>,
@@ -676,7 +677,42 @@ impl S3Core {
         self.send(req).await
     }
 
-    pub async fn s3_list_objects(
+    pub async fn s3_list_objects_v1(
+        &self,
+        path: &str,
+        marker: &str,
+        delimiter: &str,
+        limit: Option<usize>,
+    ) -> Result<Response<Buffer>> {
+        let p = build_abs_path(&self.root, path);
+
+        let mut url = QueryPairsWriter::new(&self.endpoint);
+
+        if !p.is_empty() {
+            url = url.push("prefix", &percent_encode_path(&p));
+        }
+        if !delimiter.is_empty() {
+            url = url.push("delimiter", delimiter);
+        }
+        if let Some(limit) = limit {
+            url = url.push("max-keys", &limit.to_string());
+        }
+        if !marker.is_empty() {
+            url = url.push("marker", &percent_encode_path(marker));
+        }
+
+        let mut req = Request::get(url.finish())
+            // Inject operation to the request.
+            .extension(Operation::List)
+            .body(Buffer::new())
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+
+        self.send(req).await
+    }
+
+    pub async fn s3_list_objects_v2(
         &self,
         path: &str,
         continuation_token: &str,
@@ -1070,7 +1106,26 @@ pub struct DeleteObjectsResultError {
     pub version_id: Option<String>,
 }
 
-/// Output of ListBucket/ListObjects.
+/// Output of ListBucket/ListObjects (a.k.a ListObjectsV1).
+#[derive(Default, Debug, Deserialize)]
+#[serde(default, rename_all = "PascalCase")]
+pub struct ListObjectsOutputV1 {
+    pub is_truncated: Option<bool>,
+    /// ## Notes
+    ///
+    /// `next_marker` is returned only if we have the delimiter request parameter
+    /// specified. If the response does not include the NextMarker element and it
+    /// is truncated, we should use the value of the last Key element in the
+    /// response as the marker parameter in the subsequent request to get the
+    /// next set of object keys.
+    ///
+    /// If the contents is empty, we should find common_prefixes instead.
+    pub next_marker: Option<String>,
+    pub common_prefixes: Vec<OutputCommonPrefix>,
+    pub contents: Vec<ListObjectsOutputContent>,
+}
+
+/// Output of ListBucketV2/ListObjectsV2.
 ///
 /// ## Note
 ///
@@ -1081,7 +1136,7 @@ pub struct DeleteObjectsResultError {
 /// is not exist.
 #[derive(Default, Debug, Deserialize)]
 #[serde(default, rename_all = "PascalCase")]
-pub struct ListObjectsOutput {
+pub struct ListObjectsOutputV2 {
     pub is_truncated: Option<bool>,
     pub next_continuation_token: Option<String>,
     pub common_prefixes: Vec<OutputCommonPrefix>,
@@ -1368,8 +1423,69 @@ mod tests {
         assert_eq!(out.error.len(), 0);
     }
 
+    /// This example is from https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjects.html#API_ListObjects_Examples
+    #[test]
+    fn test_parse_list_output_v1() {
+        let bs = bytes::Bytes::from(
+            r#"<?xml version="1.0" encoding="UTF-8"?>
+            <ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+                <Name>bucket</Name>
+                <Prefix/>
+                <Marker/>
+                <MaxKeys>1000</MaxKeys>
+                <IsTruncated>false</IsTruncated>
+                <Contents>
+                    <Key>my-image.jpg</Key>
+                    <LastModified>2009-10-12T17:50:30.000Z</LastModified>
+                    <ETag>"fba9dede5f27731c9771645a39863328"</ETag>
+                    <Size>434234</Size>
+                    <StorageClass>STANDARD</StorageClass>
+                    <Owner>
+                        <ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>
+                        <DisplayName>[email protected]</DisplayName>
+                    </Owner>
+                </Contents>
+                <Contents>
+                   <Key>my-third-image.jpg</Key>
+                     <LastModified>2009-10-12T17:50:30.000Z</LastModified>
+                     <ETag>"1b2cf535f27731c974343645a3985328"</ETag>
+                     <Size>64994</Size>
+                     <StorageClass>STANDARD_IA</StorageClass>
+                     <Owner>
+                        <ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>
+                        <DisplayName>[email protected]</DisplayName>
+                    </Owner>
+                </Contents>
+            </ListBucketResult>"#,
+        );
+
+        let out: ListObjectsOutputV1 =
+            quick_xml::de::from_reader(bs.reader()).expect("must success");
+
+        assert!(!out.is_truncated.unwrap());
+        assert!(out.next_marker.is_none());
+        assert!(out.common_prefixes.is_empty());
+        assert_eq!(
+            out.contents,
+            vec![
+                ListObjectsOutputContent {
+                    key: "my-image.jpg".to_string(),
+                    size: 434234,
+                    etag: Some("\"fba9dede5f27731c9771645a39863328\"".to_string()),
+                    last_modified: "2009-10-12T17:50:30.000Z".to_string(),
+                },
+                ListObjectsOutputContent {
+                    key: "my-third-image.jpg".to_string(),
+                    size: 64994,
+                    last_modified: "2009-10-12T17:50:30.000Z".to_string(),
+                    etag: Some("\"1b2cf535f27731c974343645a3985328\"".to_string()),
+                },
+            ]
+        )
+    }
+
     #[test]
-    fn test_parse_list_output() {
+    fn test_parse_list_output_v2() {
         let bs = bytes::Bytes::from(
             r#"<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
   <Name>example-bucket</Name>
@@ -1407,7 +1523,8 @@ mod tests {
 </ListBucketResult>"#,
         );
 
-        let out: ListObjectsOutput = quick_xml::de::from_reader(bs.reader()).expect("must success");
+        let out: ListObjectsOutputV2 =
+            quick_xml::de::from_reader(bs.reader()).expect("must success");
 
         assert!(!out.is_truncated.unwrap());
         assert!(out.next_continuation_token.is_none());
diff --git a/core/src/services/s3/lister.rs b/core/src/services/s3/lister.rs
index f280e10e4..b5d3703f4 100644
--- a/core/src/services/s3/lister.rs
+++ b/core/src/services/s3/lister.rs
@@ -17,8 +17,7 @@
 
 use std::sync::Arc;
 
-use super::core::S3Core;
-use super::core::{ListObjectVersionsOutput, ListObjectsOutput};
+use super::core::*;
 use super::error::parse_error;
 use crate::raw::oio::PageContext;
 use crate::raw::*;
@@ -29,9 +28,137 @@ use crate::Result;
 use bytes::Buf;
 use quick_xml::de;
 
-pub type S3Listers = TwoWays<oio::PageLister<S3Lister>, oio::PageLister<S3ObjectVersionsLister>>;
+pub type S3Listers = ThreeWays<
+    oio::PageLister<S3ListerV1>,
+    oio::PageLister<S3ListerV2>,
+    oio::PageLister<S3ObjectVersionsLister>,
+>;
 
-pub struct S3Lister {
+/// S3ListerV1 implements ListObjectV1 for s3 backend.
+pub struct S3ListerV1 {
+    core: Arc<S3Core>,
+
+    path: String,
+    args: OpList,
+
+    delimiter: &'static str,
+    /// marker can also be used as `start-after` for list objects v1.
+    /// We will use it as `start-after` for the first page and then ignore
+    /// it in the following pages.
+    first_marker: String,
+}
+
+impl S3ListerV1 {
+    pub fn new(core: Arc<S3Core>, path: &str, args: OpList) -> Self {
+        let delimiter = if args.recursive() { "" } else { "/" };
+        let first_marker = args
+            .start_after()
+            .map(|start_after| build_abs_path(&core.root, start_after))
+            .unwrap_or_default();
+
+        Self {
+            core,
+
+            path: path.to_string(),
+            args,
+            delimiter,
+            first_marker,
+        }
+    }
+}
+
+impl oio::PageList for S3ListerV1 {
+    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
+        let resp = self
+            .core
+            .s3_list_objects_v1(
+                &self.path,
+                // `marker` is used as `start-after` for the first page.
+                if !ctx.token.is_empty() {
+                    &ctx.token
+                } else {
+                    &self.first_marker
+                },
+                self.delimiter,
+                self.args.limit(),
+            )
+            .await?;
+
+        if resp.status() != http::StatusCode::OK {
+            return Err(parse_error(resp));
+        }
+        let bs = resp.into_body();
+
+        let output: ListObjectsOutputV1 = de::from_reader(bs.reader())
+            .map_err(new_xml_deserialize_error)
+            // Allow S3 list to retry on XML deserialization errors.
+            //
+            // This is because the S3 list API may return incomplete XML data under high load.
+            // We are confident that our XML decoding logic is correct. When this error occurs,
+            // we allow retries to obtain the correct data.
+            .map_err(Error::set_temporary)?;
+
+        // Try our best to check whether this list is done.
+        //
+        // - Check `is_truncated`
+        // - Check the length of `common_prefixes` and `contents` (very rare case)
+        ctx.done = if let Some(is_truncated) = output.is_truncated {
+            !is_truncated
+        } else {
+            output.common_prefixes.is_empty() && output.contents.is_empty()
+        };
+        // Try our best to find the next marker.
+        //
+        // - Check `next-marker`
+        // - Check the last object key
+        // - Check the last common prefix
+        ctx.token = if let Some(next_marker) = &output.next_marker {
+            next_marker.clone()
+        } else if let Some(content) = output.contents.last() {
+            content.key.clone()
+        } else if let Some(prefix) = output.common_prefixes.last() {
+            prefix.prefix.clone()
+        } else {
+            "".to_string()
+        };
+
+        for prefix in output.common_prefixes {
+            let de = oio::Entry::new(
+                &build_rel_path(&self.core.root, &prefix.prefix),
+                Metadata::new(EntryMode::DIR),
+            );
+
+            ctx.entries.push_back(de);
+        }
+
+        for object in output.contents {
+            let mut path = build_rel_path(&self.core.root, &object.key);
+            if path.is_empty() {
+                path = "/".to_string();
+            }
+
+            let mut meta = Metadata::new(EntryMode::from_path(&path));
+            meta.set_is_current(true);
+            if let Some(etag) = &object.etag {
+                meta.set_etag(etag);
+                meta.set_content_md5(etag.trim_matches('"'));
+            }
+            meta.set_content_length(object.size);
+
+            // object.last_modified provides more precise time that contains
+            // nanosecond, let's trim them.
+            meta.set_last_modified(parse_datetime_from_rfc3339(object.last_modified.as_str())?);
+
+            let de = oio::Entry::with(path, meta);
+            ctx.entries.push_back(de);
+        }
+
+        Ok(())
+    }
+}
+
+/// S3ListerV2 implements ListObjectV2 for s3 backend.
+pub struct S3ListerV2 {
     core: Arc<S3Core>,
 
     path: String,
@@ -41,7 +168,7 @@ pub struct S3Lister {
     abs_start_after: Option<String>,
 }
 
-impl S3Lister {
+impl S3ListerV2 {
     pub fn new(core: Arc<S3Core>, path: &str, args: OpList) -> Self {
         let delimiter = if args.recursive() { "" } else { "/" };
         let abs_start_after = args
@@ -59,11 +186,11 @@ impl S3Lister {
     }
 }
 
-impl oio::PageList for S3Lister {
+impl oio::PageList for S3ListerV2 {
     async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
         let resp = self
             .core
-            .s3_list_objects(
+            .s3_list_objects_v2(
                 &self.path,
                 &ctx.token,
                 self.delimiter,
@@ -82,7 +209,7 @@ impl oio::PageList for S3Lister {
         }
         let bs = resp.into_body();
 
-        let output: ListObjectsOutput = de::from_reader(bs.reader())
+        let output: ListObjectsOutputV2 = de::from_reader(bs.reader())
             .map_err(new_xml_deserialize_error)
             // Allow S3 list to retry on XML deserialization errors.
             //

Reply via email to