This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new de8a6366 feat(services/s3): Add `start-after` support for list (#2096)
de8a6366 is described below

commit de8a6366823b3bdc4c5233fde1ccf16f4cfa8ad7
Author: Lex Cao <[email protected]>
AuthorDate: Tue Apr 25 00:06:27 2023 +0800

    feat(services/s3): Add `start-after` support for list (#2096)
    
    Closes #2095
    This PR introduces new features:
    1. add `list_with_start_after` capability
    2. add `test_list_with_start_after` test case
---
 core/src/services/s3/backend.rs | 13 ++++++++++--
 core/src/services/s3/core.rs    |  6 ++++++
 core/src/services/s3/pager.rs   | 18 +++++++++++++++--
 core/tests/behavior/list.rs     | 44 +++++++++++++++++++++++++++++++++++++++++
 4 files changed, 77 insertions(+), 4 deletions(-)

diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index 0adf125a..e0f2af72 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -927,6 +927,9 @@ impl Accessor for S3Backend {
                 write_without_content_length: true,
 
                 list: true,
+                list_with_limit: true,
+                list_with_start_after: true,
+
                 scan: true,
                 copy: true,
                 presign: true,
@@ -1042,14 +1045,20 @@ impl Accessor for S3Backend {
     async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> {
         Ok((
             RpList::default(),
-            S3Pager::new(self.core.clone(), path, "/", args.limit()),
+            S3Pager::new(
+                self.core.clone(),
+                path,
+                "/",
+                args.limit(),
+                args.start_after(),
+            ),
         ))
     }
 
     async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> {
         Ok((
             RpScan::default(),
-            S3Pager::new(self.core.clone(), path, "", args.limit()),
+            S3Pager::new(self.core.clone(), path, "", args.limit(), None),
         ))
     }
 
diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs
index 120d6ec3..0081656f 100644
--- a/core/src/services/s3/core.rs
+++ b/core/src/services/s3/core.rs
@@ -454,6 +454,7 @@ impl S3Core {
         continuation_token: &str,
         delimiter: &str,
         limit: Option<usize>,
+        start_after: Option<String>,
     ) -> Result<Response<IncomingAsyncBody>> {
         let p = build_abs_path(&self.root, path);
 
@@ -468,6 +469,11 @@ impl S3Core {
         if let Some(limit) = limit {
             write!(url, "&max-keys={limit}").expect("write into string must succeed");
         }
+        if let Some(start_after) = start_after {
+            let start_after = build_abs_path(&self.root, &start_after);
+            write!(url, "&start-after={}", percent_encode_path(&start_after))
+                .expect("write into string must succeed");
+        }
         if !continuation_token.is_empty() {
             // AWS S3 could return continuation-token that contains `=`
             // which could lead `reqsign` parse query wrongly.
diff --git a/core/src/services/s3/pager.rs b/core/src/services/s3/pager.rs
index 250516df..c8e98c16 100644
--- a/core/src/services/s3/pager.rs
+++ b/core/src/services/s3/pager.rs
@@ -35,19 +35,27 @@ pub struct S3Pager {
     path: String,
     delimiter: String,
     limit: Option<usize>,
+    start_after: Option<String>,
 
     token: String,
     done: bool,
 }
 
 impl S3Pager {
-    pub fn new(core: Arc<S3Core>, path: &str, delimiter: &str, limit: Option<usize>) -> Self {
+    pub fn new(
+        core: Arc<S3Core>,
+        path: &str,
+        delimiter: &str,
+        limit: Option<usize>,
+        start_after: Option<&str>,
+    ) -> Self {
         Self {
             core,
 
             path: path.to_string(),
             delimiter: delimiter.to_string(),
             limit,
+            start_after: start_after.map(String::from),
 
             token: "".to_string(),
             done: false,
@@ -64,7 +72,13 @@ impl oio::Page for S3Pager {
 
         let resp = self
             .core
-            .s3_list_objects(&self.path, &self.token, &self.delimiter, self.limit)
+            .s3_list_objects(
+                &self.path,
+                &self.token,
+                &self.delimiter,
+                self.limit,
+                self.start_after.clone(),
+            )
             .await?;
 
         if resp.status() != http::StatusCode::OK {
diff --git a/core/tests/behavior/list.rs b/core/tests/behavior/list.rs
index 648818dd..3324412e 100644
--- a/core/tests/behavior/list.rs
+++ b/core/tests/behavior/list.rs
@@ -23,6 +23,7 @@ use futures::stream::FuturesUnordered;
 use futures::StreamExt;
 use futures::TryStreamExt;
 use log::debug;
+use opendal::ops::OpList;
 use opendal::EntryMode;
 use opendal::ErrorKind;
 use opendal::Operator;
@@ -78,6 +79,7 @@ macro_rules! behavior_list_tests {
                 test_list_sub_dir,
                 test_list_nested_dir,
                 test_list_dir_with_file_path,
+                test_list_with_start_after,
                 test_scan,
                 test_scan_root,
                 test_remove_all,
@@ -278,6 +280,48 @@ pub async fn test_list_dir_with_file_path(op: Operator) -> Result<()> {
     Ok(())
 }
 
+/// List with start after should start listing after the specified key
+pub async fn test_list_with_start_after(op: Operator) -> Result<()> {
+    if !op.info().capability().list_with_start_after {
+        return Ok(());
+    }
+
+    let dir = &format!("{}/", uuid::Uuid::new_v4());
+    op.create_dir(dir).await?;
+
+    let given: Vec<String> = vec!["file-0", "file-1", "file-2", "file-3", "file-4", "file-5"]
+        .iter()
+        .map(|name| format!("{dir}{name}-{}", uuid::Uuid::new_v4()))
+        .collect();
+
+    given
+        .iter()
+        .map(|name| async {
+            op.write(name, "content")
+                .await
+                .expect("create must succeed");
+        })
+        .collect::<FuturesUnordered<_>>()
+        .collect::<Vec<_>>()
+        .await;
+
+    let option = OpList::new().with_start_after(&given[2]);
+    let mut objects = op.list_with(dir, option).await?;
+    let mut actual = vec![];
+    while let Some(o) = objects.try_next().await? {
+        let path = o.path().to_string();
+        actual.push(path)
+    }
+
+    let expected: Vec<String> = given.into_iter().skip(3).collect();
+
+    assert_eq!(expected, actual);
+
+    op.remove_all(dir).await?;
+
+    Ok(())
+}
+
 pub async fn test_scan_root(op: Operator) -> Result<()> {
     let w = op.scan("").await?;
     let actual = w

Reply via email to