This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new de8a6366 feat(services/s3): Add `start-after` support for list (#2096)
de8a6366 is described below
commit de8a6366823b3bdc4c5233fde1ccf16f4cfa8ad7
Author: Lex Cao <[email protected]>
AuthorDate: Tue Apr 25 00:06:27 2023 +0800
feat(services/s3): Add `start-after` support for list (#2096)
Closes #2095
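This PR introduces the following changes:
1. Add the `list_with_start_after` capability to the S3 service (the S3 backend now also advertises `list_with_limit`). `OpList::start_after` is resolved against the service root, percent-encoded, and sent as the `start-after` query parameter of the S3 list request.
2. Add the `test_list_with_start_after` behavior test case.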
---
core/src/services/s3/backend.rs | 13 ++++++++++--
core/src/services/s3/core.rs | 6 ++++++
core/src/services/s3/pager.rs | 18 +++++++++++++++--
core/tests/behavior/list.rs | 44 +++++++++++++++++++++++++++++++++++++++++
4 files changed, 77 insertions(+), 4 deletions(-)
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index 0adf125a..e0f2af72 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -927,6 +927,9 @@ impl Accessor for S3Backend {
write_without_content_length: true,
list: true,
+ list_with_limit: true,
+ list_with_start_after: true,
+
scan: true,
copy: true,
presign: true,
@@ -1042,14 +1045,20 @@ impl Accessor for S3Backend {
async fn list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::Pager)> {
Ok((
RpList::default(),
- S3Pager::new(self.core.clone(), path, "/", args.limit()),
+ S3Pager::new(
+ self.core.clone(),
+ path,
+ "/",
+ args.limit(),
+ args.start_after(),
+ ),
))
}
async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan,
Self::Pager)> {
Ok((
RpScan::default(),
- S3Pager::new(self.core.clone(), path, "", args.limit()),
+ S3Pager::new(self.core.clone(), path, "", args.limit(), None),
))
}
diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs
index 120d6ec3..0081656f 100644
--- a/core/src/services/s3/core.rs
+++ b/core/src/services/s3/core.rs
@@ -454,6 +454,7 @@ impl S3Core {
continuation_token: &str,
delimiter: &str,
limit: Option<usize>,
+ start_after: Option<String>,
) -> Result<Response<IncomingAsyncBody>> {
let p = build_abs_path(&self.root, path);
@@ -468,6 +469,11 @@ impl S3Core {
if let Some(limit) = limit {
write!(url, "&max-keys={limit}").expect("write into string must
succeed");
}
+ if let Some(start_after) = start_after {
+ let start_after = build_abs_path(&self.root, &start_after);
+ write!(url, "&start-after={}", percent_encode_path(&start_after))
+ .expect("write into string must succeed");
+ }
if !continuation_token.is_empty() {
// AWS S3 could return continuation-token that contains `=`
// which could lead `reqsign` parse query wrongly.
diff --git a/core/src/services/s3/pager.rs b/core/src/services/s3/pager.rs
index 250516df..c8e98c16 100644
--- a/core/src/services/s3/pager.rs
+++ b/core/src/services/s3/pager.rs
@@ -35,19 +35,27 @@ pub struct S3Pager {
path: String,
delimiter: String,
limit: Option<usize>,
+ start_after: Option<String>,
token: String,
done: bool,
}
impl S3Pager {
- pub fn new(core: Arc<S3Core>, path: &str, delimiter: &str, limit:
Option<usize>) -> Self {
+ pub fn new(
+ core: Arc<S3Core>,
+ path: &str,
+ delimiter: &str,
+ limit: Option<usize>,
+ start_after: Option<&str>,
+ ) -> Self {
Self {
core,
path: path.to_string(),
delimiter: delimiter.to_string(),
limit,
+ start_after: start_after.map(String::from),
token: "".to_string(),
done: false,
@@ -64,7 +72,13 @@ impl oio::Page for S3Pager {
let resp = self
.core
- .s3_list_objects(&self.path, &self.token, &self.delimiter,
self.limit)
+ .s3_list_objects(
+ &self.path,
+ &self.token,
+ &self.delimiter,
+ self.limit,
+ self.start_after.clone(),
+ )
.await?;
if resp.status() != http::StatusCode::OK {
diff --git a/core/tests/behavior/list.rs b/core/tests/behavior/list.rs
index 648818dd..3324412e 100644
--- a/core/tests/behavior/list.rs
+++ b/core/tests/behavior/list.rs
@@ -23,6 +23,7 @@ use futures::stream::FuturesUnordered;
use futures::StreamExt;
use futures::TryStreamExt;
use log::debug;
+use opendal::ops::OpList;
use opendal::EntryMode;
use opendal::ErrorKind;
use opendal::Operator;
@@ -78,6 +79,7 @@ macro_rules! behavior_list_tests {
test_list_sub_dir,
test_list_nested_dir,
test_list_dir_with_file_path,
+ test_list_with_start_after,
test_scan,
test_scan_root,
test_remove_all,
@@ -278,6 +280,48 @@ pub async fn test_list_dir_with_file_path(op: Operator) ->
Result<()> {
Ok(())
}
+/// List with start after should start listing after the specified key
+pub async fn test_list_with_start_after(op: Operator) -> Result<()> {
+ if !op.info().capability().list_with_start_after {
+ return Ok(());
+ }
+
+ let dir = &format!("{}/", uuid::Uuid::new_v4());
+ op.create_dir(dir).await?;
+
+ let given: Vec<String> = vec!["file-0", "file-1", "file-2", "file-3",
"file-4", "file-5"]
+ .iter()
+ .map(|name| format!("{dir}{name}-{}", uuid::Uuid::new_v4()))
+ .collect();
+
+ given
+ .iter()
+ .map(|name| async {
+ op.write(name, "content")
+ .await
+ .expect("create must succeed");
+ })
+ .collect::<FuturesUnordered<_>>()
+ .collect::<Vec<_>>()
+ .await;
+
+ let option = OpList::new().with_start_after(&given[2]);
+ let mut objects = op.list_with(dir, option).await?;
+ let mut actual = vec![];
+ while let Some(o) = objects.try_next().await? {
+ let path = o.path().to_string();
+ actual.push(path)
+ }
+
+ let expected: Vec<String> = given.into_iter().skip(3).collect();
+
+ assert_eq!(expected, actual);
+
+ op.remove_all(dir).await?;
+
+ Ok(())
+}
+
pub async fn test_scan_root(op: Operator) -> Result<()> {
let w = op.scan("").await?;
let actual = w