This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new dc07f9454 Add ObjectStore::list_with_offset (#3970) (#3973)
dc07f9454 is described below

commit dc07f9454251b42388c2a7cae8e3d65264d7130b
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Thu Mar 30 18:08:04 2023 +0100

    Add ObjectStore::list_with_offset (#3970) (#3973)
    
    * Stub out ObjectStore::list_with_offset (#3970)
    
    * Add tests and add AWS implementation
    
    * Update localstack
    
    * Add further implementations
---
 .github/workflows/object_store.yml |  2 +-
 object_store/src/aws/client.rs     | 27 ++++++++---
 object_store/src/aws/mod.rs        | 19 +++++++-
 object_store/src/chunked.rs        |  8 ++++
 object_store/src/lib.rs            | 91 +++++++++++++++++++++++++++++++++++++-
 object_store/src/limit.rs          | 10 +++++
 object_store/src/throttle.rs       | 61 ++++++++++++++-----------
 7 files changed, 181 insertions(+), 37 deletions(-)

diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml
index f182d21ee..8e97c4440 100644
--- a/.github/workflows/object_store.yml
+++ b/.github/workflows/object_store.yml
@@ -106,7 +106,7 @@ jobs:
           AWS_SECRET_ACCESS_KEY: test
           AWS_ENDPOINT: http://localhost:4566
         run: |
-          docker run -d -p 4566:4566 localstack/localstack:0.14.4
+          docker run -d -p 4566:4566 localstack/localstack:2.0
           docker run -d -p 1338:1338 amazon/amazon-ec2-metadata-mock:v1.9.2 --imdsv2
           aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket
 
diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index bd58d0967..7ac4b705b 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -382,6 +382,7 @@ impl S3Client {
         prefix: Option<&str>,
         delimiter: bool,
         token: Option<&str>,
+        offset: Option<&str>,
     ) -> Result<(ListResult, Option<String>)> {
         let credential = self.get_credential().await?;
         let url = self.config.bucket_endpoint.clone();
@@ -403,6 +404,10 @@ impl S3Client {
             query.push(("prefix", prefix))
         }
 
+        if let Some(offset) = offset {
+            query.push(("start-after", offset))
+        }
+
         let response = self
             .client
             .request(Method::GET, &url)
@@ -433,14 +438,24 @@ impl S3Client {
         &self,
         prefix: Option<&Path>,
         delimiter: bool,
+        offset: Option<&Path>,
     ) -> BoxStream<'_, Result<ListResult>> {
+        let offset = offset.map(|x| x.to_string());
         let prefix = format_prefix(prefix);
-        stream_paginated(prefix, move |prefix, token| async move {
-            let (r, next_token) = self
-                .list_request(prefix.as_deref(), delimiter, token.as_deref())
-                .await?;
-            Ok((r, prefix, next_token))
-        })
+        stream_paginated(
+            (prefix, offset),
+            move |(prefix, offset), token| async move {
+                let (r, next_token) = self
+                    .list_request(
+                        prefix.as_deref(),
+                        delimiter,
+                        token.as_deref(),
+                        offset.as_deref(),
+                    )
+                    .await?;
+                Ok((r, (prefix, offset), next_token))
+            },
+        )
         .boxed()
     }
 
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index 752fb2e7d..1e302e688 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -273,7 +273,22 @@ impl ObjectStore for AmazonS3 {
     ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
         let stream = self
             .client
-            .list_paginated(prefix, false)
+            .list_paginated(prefix, false, None)
+            .map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
+            .try_flatten()
+            .boxed();
+
+        Ok(stream)
+    }
+
+    async fn list_with_offset(
+        &self,
+        prefix: Option<&Path>,
+        offset: &Path,
+    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+        let stream = self
+            .client
+            .list_paginated(prefix, false, Some(offset))
             .map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
             .try_flatten()
             .boxed();
@@ -282,7 +297,7 @@ impl ObjectStore for AmazonS3 {
     }
 
     async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
-        let mut stream = self.client.list_paginated(prefix, true);
+        let mut stream = self.client.list_paginated(prefix, true, None);
 
         let mut common_prefixes = BTreeSet::new();
         let mut objects = Vec::new();
diff --git a/object_store/src/chunked.rs b/object_store/src/chunked.rs
index 76865ef96..aebefec61 100644
--- a/object_store/src/chunked.rs
+++ b/object_store/src/chunked.rs
@@ -174,6 +174,14 @@ impl ObjectStore for ChunkedStore {
         self.inner.list(prefix).await
     }
 
+    async fn list_with_offset(
+        &self,
+        prefix: Option<&Path>,
+        offset: &Path,
+    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+        self.inner.list_with_offset(prefix, offset).await
+    }
+
     async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
         self.inner.list_with_delimiter(prefix).await
     }
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 706cc0766..573707128 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -258,7 +258,7 @@ use crate::util::{coalesce_ranges, collect_bytes, OBJECT_STORE_COALESCE_DEFAULT}
 use async_trait::async_trait;
 use bytes::Bytes;
 use chrono::{DateTime, Utc};
-use futures::{stream::BoxStream, StreamExt};
+use futures::{stream::BoxStream, StreamExt, TryStreamExt};
 use snafu::Snafu;
 use std::fmt::{Debug, Formatter};
 #[cfg(not(target_arch = "wasm32"))]
@@ -371,11 +371,33 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
     ///
     /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of
     /// `foo/bar_baz/x`.
+    ///
+    /// Note: the order of returned [`ObjectMeta`] is not guaranteed
     async fn list(
         &self,
         prefix: Option<&Path>,
     ) -> Result<BoxStream<'_, Result<ObjectMeta>>>;
 
+    /// List all the objects with the given prefix and a location greater than `offset`
+    ///
+    /// Some stores, such as S3 and GCS, may be able to push `offset` down to reduce
+    /// the number of network requests required
+    ///
+    /// Note: the order of returned [`ObjectMeta`] is not guaranteed
+    async fn list_with_offset(
+        &self,
+        prefix: Option<&Path>,
+        offset: &Path,
+    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+        let offset = offset.clone();
+        let stream = self
+            .list(prefix)
+            .await?
+            .try_filter(move |f| futures::future::ready(f.location > offset))
+            .boxed();
+        Ok(stream)
+    }
+
     /// List objects with the given prefix and an implementation specific
     /// delimiter. Returns common prefixes (directories) in addition to object
     /// metadata.
@@ -477,6 +499,14 @@ impl ObjectStore for Box<dyn ObjectStore> {
         self.as_ref().list(prefix).await
     }
 
+    async fn list_with_offset(
+        &self,
+        prefix: Option<&Path>,
+        offset: &Path,
+    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+        self.as_ref().list_with_offset(prefix, offset).await
+    }
+
     async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
         self.as_ref().list_with_delimiter(prefix).await
     }
@@ -926,6 +956,65 @@ mod tests {
 
         let files = flatten_list_stream(storage, None).await.unwrap();
         assert!(files.is_empty(), "{files:?}");
+
+        // Test list order
+        let files = vec![
+            Path::from("a a/b.file"),
+            Path::parse("a%2Fa.file").unwrap(),
+            Path::from("a/😀.file"),
+            Path::from("a/a file"),
+            Path::parse("a/a%2F.file").unwrap(),
+            Path::from("a/a.file"),
+            Path::from("a/a/b.file"),
+            Path::from("a/b.file"),
+            Path::from("aa/a.file"),
+            Path::from("ab/a.file"),
+        ];
+
+        for file in &files {
+            storage.put(file, "foo".into()).await.unwrap();
+        }
+
+        let cases = [
+            (None, Path::from("a")),
+            (None, Path::from("a/a file")),
+            (None, Path::from("a/a/b.file")),
+            (None, Path::from("ab/a.file")),
+            (None, Path::from("a%2Fa.file")),
+            (None, Path::from("a/😀.file")),
+            (Some(Path::from("a")), Path::from("")),
+            (Some(Path::from("a")), Path::from("a")),
+            (Some(Path::from("a")), Path::from("a/😀")),
+            (Some(Path::from("a")), Path::from("a/😀.file")),
+            (Some(Path::from("a")), Path::from("a/b")),
+            (Some(Path::from("a")), Path::from("a/a/b.file")),
+        ];
+
+        for (prefix, offset) in cases {
+            let s = storage
+                .list_with_offset(prefix.as_ref(), &offset)
+                .await
+                .unwrap();
+
+            let mut actual: Vec<_> =
+                s.map_ok(|x| x.location).try_collect().await.unwrap();
+
+            actual.sort_unstable();
+
+            let expected: Vec<_> = files
+                .iter()
+                .cloned()
+                .filter(|x| {
+                    let prefix_match =
+                        prefix.as_ref().map(|p| x.prefix_matches(p)).unwrap_or(true);
+                    prefix_match && x > &offset
+                })
+                .collect();
+
+            assert_eq!(actual, expected, "{prefix:?} - {offset:?}");
+        }
+
+        delete_fixtures(storage).await;
     }
 
     fn get_vec_of_bytes(chunk_length: usize, num_chunks: usize) -> Vec<Bytes> {
diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs
index b3e55a918..d0d9f73c5 100644
--- a/object_store/src/limit.rs
+++ b/object_store/src/limit.rs
@@ -147,6 +147,16 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
         Ok(PermitWrapper::new(s, permit).boxed())
     }
 
+    async fn list_with_offset(
+        &self,
+        prefix: Option<&Path>,
+        offset: &Path,
+    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+        let permit = Arc::clone(&self.semaphore).acquire_owned().await.unwrap();
+        let s = self.inner.list_with_offset(prefix, offset).await?;
+        Ok(PermitWrapper::new(s, permit).boxed())
+    }
+
     async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
         let _permit = self.semaphore.acquire().await.unwrap();
         self.inner.list_with_delimiter(prefix).await
diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs
index 6dff64aab..e51303114 100644
--- a/object_store/src/throttle.rs
+++ b/object_store/src/throttle.rs
@@ -24,7 +24,7 @@ use crate::MultipartId;
 use crate::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore, Result};
 use async_trait::async_trait;
 use bytes::Bytes;
-use futures::{stream::BoxStream, StreamExt};
+use futures::{stream::BoxStream, FutureExt, StreamExt};
 use std::time::Duration;
 use tokio::io::AsyncWrite;
 
@@ -185,19 +185,10 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
                 GetResult::File(_, _) => unimplemented!(),
             };
 
-            GetResult::Stream(
-                s.then(move |bytes_result| async move {
-                    match bytes_result {
-                        Ok(bytes) => {
-                            let bytes_len: u32 = usize_to_u32_saturate(bytes.len());
-                            sleep(wait_get_per_byte * bytes_len).await;
-                            Ok(bytes)
-                        }
-                        Err(err) => Err(err),
-                    }
-                })
-                .boxed(),
-            )
+            GetResult::Stream(throttle_stream(s, move |bytes| {
+                let bytes_len: u32 = usize_to_u32_saturate(bytes.len());
+                wait_get_per_byte * bytes_len
+            }))
         })
     }
 
@@ -247,20 +238,21 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
 
         // need to copy to avoid moving / referencing `self`
         let wait_list_per_entry = self.config().wait_list_per_entry;
+        let stream = self.inner.list(prefix).await?;
+        Ok(throttle_stream(stream, move |_| wait_list_per_entry))
+    }
 
-        self.inner.list(prefix).await.map(|stream| {
-            stream
-                .then(move |result| async move {
-                    match result {
-                        Ok(entry) => {
-                            sleep(wait_list_per_entry).await;
-                            Ok(entry)
-                        }
-                        Err(err) => Err(err),
-                    }
-                })
-                .boxed()
-        })
+    async fn list_with_offset(
+        &self,
+        prefix: Option<&Path>,
+        offset: &Path,
+    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+        sleep(self.config().wait_list_per_call).await;
+
+        // need to copy to avoid moving / referencing `self`
+        let wait_list_per_entry = self.config().wait_list_per_entry;
+        let stream = self.inner.list_with_offset(prefix, offset).await?;
+        Ok(throttle_stream(stream, move |_| wait_list_per_entry))
     }
 
     async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
@@ -307,6 +299,21 @@ fn usize_to_u32_saturate(x: usize) -> u32 {
     x.try_into().unwrap_or(u32::MAX)
 }
 
+fn throttle_stream<T: Send + 'static, E: Send + 'static, F>(
+    stream: BoxStream<'_, Result<T, E>>,
+    delay: F,
+) -> BoxStream<'_, Result<T, E>>
+where
+    F: Fn(&T) -> Duration + Send + Sync + 'static,
+{
+    stream
+        .then(move |result| {
+            let delay = result.as_ref().ok().map(&delay).unwrap_or_default();
+            sleep(delay).then(|_| futures::future::ready(result))
+        })
+        .boxed()
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;

Reply via email to