This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new e9e45a2cc feat(services/s3): Add ListObjectsV1 support to better
support old storage services (#5975)
e9e45a2cc is described below
commit e9e45a2cc90bb5a45e1c8ddeb3f258184267ed1c
Author: Xuanwo <[email protected]>
AuthorDate: Mon Apr 7 19:16:41 2025 +0800
feat(services/s3): Add ListObjectsV1 support to better support old storage
services (#5975)
* Rename existing list objects to v2
Signed-off-by: Xuanwo <[email protected]>
* Implement list objects v1
Signed-off-by: Xuanwo <[email protected]>
* Fix uri
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
.../s3/aws_s3_with_list_objects_v1/action.yml | 39 ++++++
.../s3/minio_s3_with_list_objects_v1/action.yml | 47 +++++++
core/src/raw/enum_utils.rs | 10 ++
core/src/raw/http_util/mod.rs | 1 +
core/src/raw/http_util/uri.rs | 49 +++++++
core/src/services/s3/backend.rs | 13 +-
core/src/services/s3/config.rs | 5 +
core/src/services/s3/core.rs | 127 +++++++++++++++++-
core/src/services/s3/lister.rs | 143 +++++++++++++++++++--
9 files changed, 418 insertions(+), 16 deletions(-)
diff --git a/.github/services/s3/aws_s3_with_list_objects_v1/action.yml
b/.github/services/s3/aws_s3_with_list_objects_v1/action.yml
new file mode 100644
index 000000000..bce43e1aa
--- /dev/null
+++ b/.github/services/s3/aws_s3_with_list_objects_v1/action.yml
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: aws_s3_with_list_objects_v1
+description: "Behavior test for AWS S3 with ListObjectsV2 disabled, so listing uses ListObjectsV1. This service is sponsored by @datafuse_labs."
+
+runs:
+ using: "composite"
+ steps:
+ - name: Setup
+ uses: 1password/load-secrets-action@v1
+ with:
+ export-env: true
+ env:
+ OPENDAL_S3_ROOT: op://services/s3/root
+ OPENDAL_S3_BUCKET: op://services/s3/bucket
+ OPENDAL_S3_ENDPOINT: op://services/s3/endpoint
+ OPENDAL_S3_ACCESS_KEY_ID: op://services/s3/access_key_id
+ OPENDAL_S3_SECRET_ACCESS_KEY: op://services/s3/secret_access_key
+ OPENDAL_S3_REGION: op://services/s3/region
+
+ - name: Add extra settings
+ shell: bash
+ run: |
+ echo "OPENDAL_S3_DISABLE_LIST_OBJECTS_V2=true" >> $GITHUB_ENV
diff --git a/.github/services/s3/minio_s3_with_list_objects_v1/action.yml
b/.github/services/s3/minio_s3_with_list_objects_v1/action.yml
new file mode 100644
index 000000000..46904d7aa
--- /dev/null
+++ b/.github/services/s3/minio_s3_with_list_objects_v1/action.yml
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: minio_s3_with_list_objects_v1
+description: "Behavior test for Minio S3 with ListObjectsV2 disabled, so listing uses ListObjectsV1"
+
+runs:
+ using: "composite"
+ steps:
+ - name: Setup MinIO Server
+ shell: bash
+ working-directory: fixtures/s3
+ run: docker compose -f docker-compose-minio.yml up -d --wait
+ - name: Setup test bucket
+ shell: bash
+ env:
+ AWS_ACCESS_KEY_ID: "minioadmin"
+ AWS_SECRET_ACCESS_KEY: "minioadmin"
+ AWS_EC2_METADATA_DISABLED: "true"
+ run: |
+ aws --endpoint-url http://127.0.0.1:9000/ s3 mb s3://test
+
+ - name: Setup
+ shell: bash
+ run: |
+ cat << EOF >> $GITHUB_ENV
+ OPENDAL_S3_BUCKET=test
+ OPENDAL_S3_ENDPOINT=http://127.0.0.1:9000
+ OPENDAL_S3_ACCESS_KEY_ID=minioadmin
+ OPENDAL_S3_SECRET_ACCESS_KEY=minioadmin
+ OPENDAL_S3_REGION=us-east-1
+ OPENDAL_S3_DISABLE_LIST_OBJECTS_V2=true
+ EOF
diff --git a/core/src/raw/enum_utils.rs b/core/src/raw/enum_utils.rs
index e95059975..9a539943b 100644
--- a/core/src/raw/enum_utils.rs
+++ b/core/src/raw/enum_utils.rs
@@ -163,6 +163,16 @@ impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write>
oio::Write
}
}
+/// Forward `oio::List::next` to whichever of the three wrapped listers is active.
+impl<ONE: oio::List, TWO: oio::List, THREE: oio::List> oio::List for ThreeWays<ONE, TWO, THREE> {
+ async fn next(&mut self) -> Result<Option<oio::Entry>> {
+ match self {
+ Self::One(v) => v.next().await,
+ Self::Two(v) => v.next().await,
+ Self::Three(v) => v.next().await,
+ }
+ }
+}
+
/// FourWays is used to implement traits that based on four ways.
///
/// Users can wrap four different trait types together.
diff --git a/core/src/raw/http_util/mod.rs b/core/src/raw/http_util/mod.rs
index 8aaab3f00..01264f7f4 100644
--- a/core/src/raw/http_util/mod.rs
+++ b/core/src/raw/http_util/mod.rs
@@ -57,6 +57,7 @@ mod uri;
pub use uri::new_http_uri_invalid_error;
pub use uri::percent_decode_path;
pub use uri::percent_encode_path;
+pub use uri::QueryPairsWriter;
mod error;
pub use error::new_request_build_error;
diff --git a/core/src/raw/http_util/uri.rs b/core/src/raw/http_util/uri.rs
index aeaaee5e9..1f4b73b46 100644
--- a/core/src/raw/http_util/uri.rs
+++ b/core/src/raw/http_util/uri.rs
@@ -64,6 +64,55 @@ pub fn percent_decode_path(path: &str) -> String {
}
}
+/// QueryPairsWriter is used to write query pairs to a url.
+///
+/// Keys and values are appended verbatim, so callers must percent-encode
+/// them before pushing.
+pub struct QueryPairsWriter {
+    base: String,
+    /// Whether `base` already carries a query string, i.e. whether the next
+    /// pair must be joined with `&` instead of `?`.
+    has_query: bool,
+}
+
+impl QueryPairsWriter {
+    /// Create a new QueryPairsWriter with the given base.
+    ///
+    /// If `s` already contains a query string (a `?`), the pushed pairs are
+    /// joined with `&` so the url stays well-formed.
+    pub fn new(s: &str) -> Self {
+        // 256 is the average size we observed of a url
+        // in production.
+        //
+        // We eagerly allocate the string to avoid multiple
+        // allocations.
+        let mut base = String::with_capacity(256);
+        base.push_str(s);
+
+        Self {
+            base,
+            has_query: s.contains('?'),
+        }
+    }
+
+    /// Push a new pair of key and value to the url.
+    ///
+    /// The input key and value must already be percent
+    /// encoded correctly.
+    #[must_use]
+    pub fn push(mut self, key: &str, value: &str) -> Self {
+        // The first pair opens the query string; every later pair extends it.
+        let separator = if self.has_query { '&' } else { '?' };
+        self.base.push(separator);
+        self.has_query = true;
+
+        // Append the key and value to the base string
+        self.base.push_str(key);
+        self.base.push('=');
+        self.base.push_str(value);
+
+        self
+    }
+
+    /// Finish the url and return it.
+    pub fn finish(self) -> String {
+        self.base
+    }
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index 9ce15829a..0a46ebf03 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -43,7 +43,7 @@ use std::sync::LazyLock;
use super::core::*;
use super::delete::S3Deleter;
use super::error::parse_error;
-use super::lister::{S3Lister, S3Listers, S3ObjectVersionsLister};
+use super::lister::{S3ListerV1, S3ListerV2, S3Listers, S3ObjectVersionsLister};
use super::writer::S3Writer;
use super::writer::S3Writers;
use crate::raw::oio::PageLister;
@@ -994,6 +994,7 @@ impl Builder for S3Builder {
server_side_encryption_customer_key_md5,
default_storage_class,
allow_anonymous: self.config.allow_anonymous,
+ disable_list_objects_v2: self.config.disable_list_objects_v2,
signer,
loader,
credential_loaded: AtomicBool::new(false),
@@ -1089,13 +1090,19 @@ impl Access for S3Backend {
async fn list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::Lister)> {
let l = if args.versions() || args.deleted() {
- TwoWays::Two(PageLister::new(S3ObjectVersionsLister::new(
+ ThreeWays::Three(PageLister::new(S3ObjectVersionsLister::new(
+ self.core.clone(),
+ path,
+ args,
+ )))
+ } else if self.core.disable_list_objects_v2 {
+ ThreeWays::One(PageLister::new(S3ListerV1::new(
self.core.clone(),
path,
args,
)))
} else {
- TwoWays::One(PageLister::new(S3Lister::new(
+ ThreeWays::Two(PageLister::new(S3ListerV2::new(
self.core.clone(),
path,
args,
diff --git a/core/src/services/s3/config.rs b/core/src/services/s3/config.rs
index 74b78a03c..de2edc94c 100644
--- a/core/src/services/s3/config.rs
+++ b/core/src/services/s3/config.rs
@@ -188,6 +188,11 @@ pub struct S3Config {
/// Enable write with append so that opendal will send write request with
append headers.
pub enable_write_with_append: bool,
+
+ /// OpenDAL uses List Objects V2 by default to list objects.
+ /// However, some legacy services do not yet support V2.
+ /// This option allows users to switch back to the older List Objects V1.
+ pub disable_list_objects_v2: bool,
}
impl Debug for S3Config {
diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs
index ba7d530f8..b3bba25e9 100644
--- a/core/src/services/s3/core.rs
+++ b/core/src/services/s3/core.rs
@@ -97,6 +97,7 @@ pub struct S3Core {
pub server_side_encryption_customer_key_md5: Option<HeaderValue>,
pub default_storage_class: Option<HeaderValue>,
pub allow_anonymous: bool,
+ pub disable_list_objects_v2: bool,
pub signer: AwsV4Signer,
pub loader: Box<dyn AwsCredentialLoad>,
@@ -676,7 +677,42 @@ impl S3Core {
self.send(req).await
}
- pub async fn s3_list_objects(
+ /// Build, sign and send a ListObjects (v1) request for keys under `path`.
+ ///
+ /// - `path` is resolved against `self.root` and sent as `prefix`.
+ /// - `marker`: only keys after it are listed; omitted when empty.
+ /// - `delimiter`: omitted when empty.
+ /// - `limit`: sent as `max-keys` when set.
+ ///
+ /// `prefix` and `marker` are percent-encoded here; `delimiter` is sent as-is.
+ /// The raw response is returned without checking its status code.
+ pub async fn s3_list_objects_v1(
+ &self,
+ path: &str,
+ marker: &str,
+ delimiter: &str,
+ limit: Option<usize>,
+ ) -> Result<Response<Buffer>> {
+ let p = build_abs_path(&self.root, path);
+
+ let mut url = QueryPairsWriter::new(&self.endpoint);
+
+ // Each optional parameter is only sent when it carries a value.
+ if !p.is_empty() {
+ url = url.push("prefix", &percent_encode_path(&p));
+ }
+ if !delimiter.is_empty() {
+ url = url.push("delimiter", delimiter);
+ }
+ if let Some(limit) = limit {
+ url = url.push("max-keys", &limit.to_string());
+ }
+ if !marker.is_empty() {
+ url = url.push("marker", &percent_encode_path(marker));
+ }
+
+ let mut req = Request::get(url.finish())
+ // Inject operation to the request.
+ .extension(Operation::List)
+ .body(Buffer::new())
+ .map_err(new_request_build_error)?;
+
+ self.sign(&mut req).await?;
+
+ self.send(req).await
+ }
+
+ pub async fn s3_list_objects_v2(
&self,
path: &str,
continuation_token: &str,
@@ -1070,7 +1106,26 @@ pub struct DeleteObjectsResultError {
pub version_id: Option<String>,
}
-/// Output of ListBucket/ListObjects.
+/// Output of ListBucket/ListObjects (a.k.a. ListObjectsV1).
+#[derive(Default, Debug, Deserialize)]
+#[serde(default, rename_all = "PascalCase")]
+pub struct ListObjectsOutputV1 {
+ /// `IsTruncated`: whether more results remain after this page.
+ pub is_truncated: Option<bool>,
+ /// ## Notes
+ ///
+ /// `next_marker` is returned only if we have the delimiter request parameter
+ /// specified. If the response does not include the NextMarker element and it
+ /// is truncated, we should use the value of the last Key element in the
+ /// response as the marker parameter in the subsequent request to get the
+ /// next set of object keys.
+ ///
+ /// If there are no `contents`, the last `common_prefixes` entry can serve as
+ /// the marker instead.
+ pub next_marker: Option<String>,
+ /// `CommonPrefixes` entries of this page.
+ pub common_prefixes: Vec<OutputCommonPrefix>,
+ /// `Contents` (object) entries of this page.
+ pub contents: Vec<ListObjectsOutputContent>,
+}
+
+/// Output of ListBucketV2/ListObjectsV2.
///
/// ## Note
///
@@ -1081,7 +1136,7 @@ pub struct DeleteObjectsResultError {
/// is not exist.
#[derive(Default, Debug, Deserialize)]
#[serde(default, rename_all = "PascalCase")]
-pub struct ListObjectsOutput {
+pub struct ListObjectsOutputV2 {
pub is_truncated: Option<bool>,
pub next_continuation_token: Option<String>,
pub common_prefixes: Vec<OutputCommonPrefix>,
@@ -1368,8 +1423,69 @@ mod tests {
assert_eq!(out.error.len(), 0);
}
+ /// Parse the ListObjects (v1) example response from
+ /// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjects.html#API_ListObjects_Examples>
+ /// and check that both objects decode while no marker or prefixes are reported.
+ #[test]
+ fn test_parse_list_output_v1() {
+ let bs = bytes::Bytes::from(
+ r#"<?xml version="1.0" encoding="UTF-8"?>
+ <ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+ <Name>bucket</Name>
+ <Prefix/>
+ <Marker/>
+ <MaxKeys>1000</MaxKeys>
+ <IsTruncated>false</IsTruncated>
+ <Contents>
+ <Key>my-image.jpg</Key>
+ <LastModified>2009-10-12T17:50:30.000Z</LastModified>
+ <ETag>"fba9dede5f27731c9771645a39863328"</ETag>
+ <Size>434234</Size>
+ <StorageClass>STANDARD</StorageClass>
+ <Owner>
+
<ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>
+ <DisplayName>[email protected]</DisplayName>
+ </Owner>
+ </Contents>
+ <Contents>
+ <Key>my-third-image.jpg</Key>
+ <LastModified>2009-10-12T17:50:30.000Z</LastModified>
+ <ETag>"1b2cf535f27731c974343645a3985328"</ETag>
+ <Size>64994</Size>
+ <StorageClass>STANDARD_IA</StorageClass>
+ <Owner>
+
<ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>
+ <DisplayName>[email protected]</DisplayName>
+ </Owner>
+ </Contents>
+ </ListBucketResult>"#,
+ );
+
+ let out: ListObjectsOutputV1 =
+ quick_xml::de::from_reader(bs.reader()).expect("must success");
+
+ // A non-truncated page without a delimiter carries no NextMarker and no prefixes.
+ assert!(!out.is_truncated.unwrap());
+ assert!(out.next_marker.is_none());
+ assert!(out.common_prefixes.is_empty());
+ assert_eq!(
+ out.contents,
+ vec![
+ ListObjectsOutputContent {
+ key: "my-image.jpg".to_string(),
+ size: 434234,
+ etag: Some("\"fba9dede5f27731c9771645a39863328\"".to_string()),
+ last_modified: "2009-10-12T17:50:30.000Z".to_string(),
+ },
+ ListObjectsOutputContent {
+ key: "my-third-image.jpg".to_string(),
+ size: 64994,
+ last_modified: "2009-10-12T17:50:30.000Z".to_string(),
+ etag: Some("\"1b2cf535f27731c974343645a3985328\"".to_string()),
+ },
+ ]
+ )
+ }
+
#[test]
- fn test_parse_list_output() {
+ fn test_parse_list_output_v2() {
let bs = bytes::Bytes::from(
r#"<ListBucketResult
xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<Name>example-bucket</Name>
@@ -1407,7 +1523,8 @@ mod tests {
</ListBucketResult>"#,
);
- let out: ListObjectsOutput =
quick_xml::de::from_reader(bs.reader()).expect("must success");
+ let out: ListObjectsOutputV2 =
+ quick_xml::de::from_reader(bs.reader()).expect("must success");
assert!(!out.is_truncated.unwrap());
assert!(out.next_continuation_token.is_none());
diff --git a/core/src/services/s3/lister.rs b/core/src/services/s3/lister.rs
index f280e10e4..b5d3703f4 100644
--- a/core/src/services/s3/lister.rs
+++ b/core/src/services/s3/lister.rs
@@ -17,8 +17,7 @@
use std::sync::Arc;
-use super::core::S3Core;
-use super::core::{ListObjectVersionsOutput, ListObjectsOutput};
+use super::core::*;
use super::error::parse_error;
use crate::raw::oio::PageContext;
use crate::raw::*;
@@ -29,9 +28,137 @@ use crate::Result;
use bytes::Buf;
use quick_xml::de;
-pub type S3Listers = TwoWays<oio::PageLister<S3Lister>,
oio::PageLister<S3ObjectVersionsLister>>;
+/// Lister type returned by the s3 backend: ListObjectsV1, ListObjectsV2, or
+/// ListObjectVersions, chosen from the list options and the service config.
+pub type S3Listers = ThreeWays<
+ oio::PageLister<S3ListerV1>,
+ oio::PageLister<S3ListerV2>,
+ oio::PageLister<S3ObjectVersionsLister>,
+>;
-pub struct S3Lister {
+/// S3ListerV1 lists objects with ListObjects (a.k.a. ListObjectsV1).
+///
+/// The s3 backend uses it instead of [`S3ListerV2`] when
+/// `disable_list_objects_v2` is enabled, for legacy services that do not
+/// support ListObjectsV2.
+pub struct S3ListerV1 {
+    core: Arc<S3Core>,
+
+    path: String,
+    args: OpList,
+
+    /// `""` for recursive listing, `"/"` otherwise.
+    delimiter: &'static str,
+    /// ListObjectsV1 has no `start-after` parameter, but `marker` has the
+    /// same meaning: listing begins after the given key. The absolute
+    /// `start_after` path is sent as the marker for the first page only;
+    /// later pages use the marker stored in `PageContext::token`.
+    first_marker: String,
+}
+
+impl S3ListerV1 {
+    /// Create a lister for `path` honoring `args.recursive()`,
+    /// `args.start_after()` and `args.limit()`.
+    pub fn new(core: Arc<S3Core>, path: &str, args: OpList) -> Self {
+        let delimiter = if args.recursive() { "" } else { "/" };
+        let first_marker = args
+            .start_after()
+            .map(|start_after| build_abs_path(&core.root, start_after))
+            .unwrap_or_default();
+
+        Self {
+            core,
+
+            path: path.to_string(),
+            args,
+            delimiter,
+            first_marker,
+        }
+    }
+}
+
+impl oio::PageList for S3ListerV1 {
+    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
+        // `marker` doubles as `start-after`: the first page starts from the
+        // caller's start point, later pages from the previous page's marker.
+        let marker = if ctx.token.is_empty() {
+            &self.first_marker
+        } else {
+            &ctx.token
+        };
+        let resp = self
+            .core
+            .s3_list_objects_v1(&self.path, marker, self.delimiter, self.args.limit())
+            .await?;
+
+        if resp.status() != http::StatusCode::OK {
+            return Err(parse_error(resp));
+        }
+        let bs = resp.into_body();
+
+        let output: ListObjectsOutputV1 = de::from_reader(bs.reader())
+            .map_err(new_xml_deserialize_error)
+            // Allow S3 list to retry on XML deserialization errors.
+            //
+            // This is because the S3 list API may return incomplete XML data under high load.
+            // We are confident that our XML decoding logic is correct. When this error occurs,
+            // we allow retries to obtain the correct data.
+            .map_err(Error::set_temporary)?;
+
+        // Try our best to check whether this list is done.
+        //
+        // - Check `is_truncated`
+        // - Check the length of `common_prefixes` and `contents` (very rare case)
+        ctx.done = if let Some(is_truncated) = output.is_truncated {
+            !is_truncated
+        } else {
+            output.common_prefixes.is_empty() && output.contents.is_empty()
+        };
+
+        // Try our best to find the next marker.
+        //
+        // - Use `NextMarker` when the service returned a non-empty one.
+        // - Otherwise use the greatest name seen in this page. List results are
+        //   returned in UTF-8 binary order with object keys and common prefixes
+        //   interleaved, so the last item of the page is the larger of the last
+        //   key and the last prefix. Using only the last key would list common
+        //   prefixes that sort after it a second time.
+        let last_key = output.contents.last().map(|content| content.key.as_str());
+        let last_prefix = output
+            .common_prefixes
+            .last()
+            .map(|prefix| prefix.prefix.as_str());
+        ctx.token = match output.next_marker.as_deref() {
+            Some(next_marker) if !next_marker.is_empty() => next_marker.to_string(),
+            // `Option<&str>` orders `None` below every `Some`, so `max` keeps
+            // the larger of whichever values are present.
+            _ => last_key.max(last_prefix).unwrap_or_default().to_string(),
+        };
+
+        // Without a marker the next request would restart from `first_marker`
+        // and fetch the same page forever; this page had no entries to derive
+        // one from, so finish the listing instead.
+        if ctx.token.is_empty() {
+            ctx.done = true;
+        }
+
+        for prefix in output.common_prefixes {
+            let de = oio::Entry::new(
+                &build_rel_path(&self.core.root, &prefix.prefix),
+                Metadata::new(EntryMode::DIR),
+            );
+
+            ctx.entries.push_back(de);
+        }
+
+        for object in output.contents {
+            let mut path = build_rel_path(&self.core.root, &object.key);
+            if path.is_empty() {
+                path = "/".to_string();
+            }
+
+            let mut meta = Metadata::new(EntryMode::from_path(&path));
+            meta.set_is_current(true);
+            if let Some(etag) = &object.etag {
+                meta.set_etag(etag);
+                meta.set_content_md5(etag.trim_matches('"'));
+            }
+            meta.set_content_length(object.size);
+
+            // `LastModified` is parsed as an RFC 3339 timestamp.
+            meta.set_last_modified(parse_datetime_from_rfc3339(object.last_modified.as_str())?);
+
+            let de = oio::Entry::with(path, meta);
+            ctx.entries.push_back(de);
+        }
+
+        Ok(())
+    }
+}
+
+/// S3ListerV2 implements ListObjectV2 for s3 backend.
+pub struct S3ListerV2 {
core: Arc<S3Core>,
path: String,
@@ -41,7 +168,7 @@ pub struct S3Lister {
abs_start_after: Option<String>,
}
-impl S3Lister {
+impl S3ListerV2 {
pub fn new(core: Arc<S3Core>, path: &str, args: OpList) -> Self {
let delimiter = if args.recursive() { "" } else { "/" };
let abs_start_after = args
@@ -59,11 +186,11 @@ impl S3Lister {
}
}
-impl oio::PageList for S3Lister {
+impl oio::PageList for S3ListerV2 {
async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
let resp = self
.core
- .s3_list_objects(
+ .s3_list_objects_v2(
&self.path,
&ctx.token,
self.delimiter,
@@ -82,7 +209,7 @@ impl oio::PageList for S3Lister {
}
let bs = resp.into_body();
- let output: ListObjectsOutput = de::from_reader(bs.reader())
+ let output: ListObjectsOutputV2 = de::from_reader(bs.reader())
.map_err(new_xml_deserialize_error)
// Allow S3 list to retry on XML deserialization errors.
//