This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new dc07f9454 Add ObjectStore::list_with_offset (#3970) (#3973)
dc07f9454 is described below
commit dc07f9454251b42388c2a7cae8e3d65264d7130b
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Thu Mar 30 18:08:04 2023 +0100
Add ObjectStore::list_with_offset (#3970) (#3973)
* Stub out ObjectStore::list_with_offset (#3970)
* Add tests and add AWS implementation
* Update localstack
* Add further implementations
---
.github/workflows/object_store.yml | 2 +-
object_store/src/aws/client.rs | 27 ++++++++---
object_store/src/aws/mod.rs | 19 +++++++-
object_store/src/chunked.rs | 8 ++++
object_store/src/lib.rs | 91 +++++++++++++++++++++++++++++++++++++-
object_store/src/limit.rs | 10 +++++
object_store/src/throttle.rs | 61 ++++++++++++++-----------
7 files changed, 181 insertions(+), 37 deletions(-)
diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml
index f182d21ee..8e97c4440 100644
--- a/.github/workflows/object_store.yml
+++ b/.github/workflows/object_store.yml
@@ -106,7 +106,7 @@ jobs:
AWS_SECRET_ACCESS_KEY: test
AWS_ENDPOINT: http://localhost:4566
run: |
- docker run -d -p 4566:4566 localstack/localstack:0.14.4
+ docker run -d -p 4566:4566 localstack/localstack:2.0
docker run -d -p 1338:1338 amazon/amazon-ec2-metadata-mock:v1.9.2
--imdsv2
aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket
diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index bd58d0967..7ac4b705b 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -382,6 +382,7 @@ impl S3Client {
prefix: Option<&str>,
delimiter: bool,
token: Option<&str>,
+ offset: Option<&str>,
) -> Result<(ListResult, Option<String>)> {
let credential = self.get_credential().await?;
let url = self.config.bucket_endpoint.clone();
@@ -403,6 +404,10 @@ impl S3Client {
query.push(("prefix", prefix))
}
+ if let Some(offset) = offset {
+ query.push(("start-after", offset))
+ }
+
let response = self
.client
.request(Method::GET, &url)
@@ -433,14 +438,24 @@ impl S3Client {
&self,
prefix: Option<&Path>,
delimiter: bool,
+ offset: Option<&Path>,
) -> BoxStream<'_, Result<ListResult>> {
+ let offset = offset.map(|x| x.to_string());
let prefix = format_prefix(prefix);
- stream_paginated(prefix, move |prefix, token| async move {
- let (r, next_token) = self
- .list_request(prefix.as_deref(), delimiter, token.as_deref())
- .await?;
- Ok((r, prefix, next_token))
- })
+ stream_paginated(
+ (prefix, offset),
+ move |(prefix, offset), token| async move {
+ let (r, next_token) = self
+ .list_request(
+ prefix.as_deref(),
+ delimiter,
+ token.as_deref(),
+ offset.as_deref(),
+ )
+ .await?;
+ Ok((r, (prefix, offset), next_token))
+ },
+ )
.boxed()
}
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index 752fb2e7d..1e302e688 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -273,7 +273,22 @@ impl ObjectStore for AmazonS3 {
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
let stream = self
.client
- .list_paginated(prefix, false)
+ .list_paginated(prefix, false, None)
+ .map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
+ .try_flatten()
+ .boxed();
+
+ Ok(stream)
+ }
+
+ async fn list_with_offset(
+ &self,
+ prefix: Option<&Path>,
+ offset: &Path,
+ ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+ let stream = self
+ .client
+ .list_paginated(prefix, false, Some(offset))
.map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
.try_flatten()
.boxed();
@@ -282,7 +297,7 @@ impl ObjectStore for AmazonS3 {
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) ->
Result<ListResult> {
- let mut stream = self.client.list_paginated(prefix, true);
+ let mut stream = self.client.list_paginated(prefix, true, None);
let mut common_prefixes = BTreeSet::new();
let mut objects = Vec::new();
diff --git a/object_store/src/chunked.rs b/object_store/src/chunked.rs
index 76865ef96..aebefec61 100644
--- a/object_store/src/chunked.rs
+++ b/object_store/src/chunked.rs
@@ -174,6 +174,14 @@ impl ObjectStore for ChunkedStore {
self.inner.list(prefix).await
}
+ async fn list_with_offset(
+ &self,
+ prefix: Option<&Path>,
+ offset: &Path,
+ ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+ self.inner.list_with_offset(prefix, offset).await
+ }
+
async fn list_with_delimiter(&self, prefix: Option<&Path>) ->
Result<ListResult> {
self.inner.list_with_delimiter(prefix).await
}
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 706cc0766..573707128 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -258,7 +258,7 @@ use crate::util::{coalesce_ranges, collect_bytes,
OBJECT_STORE_COALESCE_DEFAULT}
use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
-use futures::{stream::BoxStream, StreamExt};
+use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use snafu::Snafu;
use std::fmt::{Debug, Formatter};
#[cfg(not(target_arch = "wasm32"))]
@@ -371,11 +371,33 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync +
Debug + 'static {
///
/// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of
/// `foo/bar_baz/x`.
+ ///
+ /// Note: the order of returned [`ObjectMeta`] is not guaranteed
async fn list(
&self,
prefix: Option<&Path>,
) -> Result<BoxStream<'_, Result<ObjectMeta>>>;
+ /// List all the objects with the given prefix and a location greater than `offset`
+ ///
+ /// Some stores, such as S3 and GCS, may be able to push `offset` down to reduce
+ /// the number of network requests required
+ ///
+ /// Note: the order of returned [`ObjectMeta`] is not guaranteed
+ async fn list_with_offset(
+ &self,
+ prefix: Option<&Path>,
+ offset: &Path,
+ ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+ let offset = offset.clone();
+ let stream = self
+ .list(prefix)
+ .await?
+ .try_filter(move |f| futures::future::ready(f.location > offset))
+ .boxed();
+ Ok(stream)
+ }
+
/// List objects with the given prefix and an implementation specific
/// delimiter. Returns common prefixes (directories) in addition to object
/// metadata.
@@ -477,6 +499,14 @@ impl ObjectStore for Box<dyn ObjectStore> {
self.as_ref().list(prefix).await
}
+ async fn list_with_offset(
+ &self,
+ prefix: Option<&Path>,
+ offset: &Path,
+ ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+ self.as_ref().list_with_offset(prefix, offset).await
+ }
+
async fn list_with_delimiter(&self, prefix: Option<&Path>) ->
Result<ListResult> {
self.as_ref().list_with_delimiter(prefix).await
}
@@ -926,6 +956,65 @@ mod tests {
let files = flatten_list_stream(storage, None).await.unwrap();
assert!(files.is_empty(), "{files:?}");
+
+ // Test list order
+ let files = vec![
+ Path::from("a a/b.file"),
+ Path::parse("a%2Fa.file").unwrap(),
+ Path::from("a/😀.file"),
+ Path::from("a/a file"),
+ Path::parse("a/a%2F.file").unwrap(),
+ Path::from("a/a.file"),
+ Path::from("a/a/b.file"),
+ Path::from("a/b.file"),
+ Path::from("aa/a.file"),
+ Path::from("ab/a.file"),
+ ];
+
+ for file in &files {
+ storage.put(file, "foo".into()).await.unwrap();
+ }
+
+ let cases = [
+ (None, Path::from("a")),
+ (None, Path::from("a/a file")),
+ (None, Path::from("a/a/b.file")),
+ (None, Path::from("ab/a.file")),
+ (None, Path::from("a%2Fa.file")),
+ (None, Path::from("a/😀.file")),
+ (Some(Path::from("a")), Path::from("")),
+ (Some(Path::from("a")), Path::from("a")),
+ (Some(Path::from("a")), Path::from("a/😀")),
+ (Some(Path::from("a")), Path::from("a/😀.file")),
+ (Some(Path::from("a")), Path::from("a/b")),
+ (Some(Path::from("a")), Path::from("a/a/b.file")),
+ ];
+
+ for (prefix, offset) in cases {
+ let s = storage
+ .list_with_offset(prefix.as_ref(), &offset)
+ .await
+ .unwrap();
+
+ let mut actual: Vec<_> =
+ s.map_ok(|x| x.location).try_collect().await.unwrap();
+
+ actual.sort_unstable();
+
+ let expected: Vec<_> = files
+ .iter()
+ .cloned()
+ .filter(|x| {
+ let prefix_match =
+ prefix.as_ref().map(|p|
x.prefix_matches(p)).unwrap_or(true);
+ prefix_match && x > &offset
+ })
+ .collect();
+
+ assert_eq!(actual, expected, "{prefix:?} - {offset:?}");
+ }
+
+ delete_fixtures(storage).await;
}
fn get_vec_of_bytes(chunk_length: usize, num_chunks: usize) -> Vec<Bytes> {
diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs
index b3e55a918..d0d9f73c5 100644
--- a/object_store/src/limit.rs
+++ b/object_store/src/limit.rs
@@ -147,6 +147,16 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
Ok(PermitWrapper::new(s, permit).boxed())
}
+ async fn list_with_offset(
+ &self,
+ prefix: Option<&Path>,
+ offset: &Path,
+ ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+ let permit =
Arc::clone(&self.semaphore).acquire_owned().await.unwrap();
+ let s = self.inner.list_with_offset(prefix, offset).await?;
+ Ok(PermitWrapper::new(s, permit).boxed())
+ }
+
async fn list_with_delimiter(&self, prefix: Option<&Path>) ->
Result<ListResult> {
let _permit = self.semaphore.acquire().await.unwrap();
self.inner.list_with_delimiter(prefix).await
diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs
index 6dff64aab..e51303114 100644
--- a/object_store/src/throttle.rs
+++ b/object_store/src/throttle.rs
@@ -24,7 +24,7 @@ use crate::MultipartId;
use crate::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore,
Result};
use async_trait::async_trait;
use bytes::Bytes;
-use futures::{stream::BoxStream, StreamExt};
+use futures::{stream::BoxStream, FutureExt, StreamExt};
use std::time::Duration;
use tokio::io::AsyncWrite;
@@ -185,19 +185,10 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
GetResult::File(_, _) => unimplemented!(),
};
- GetResult::Stream(
- s.then(move |bytes_result| async move {
- match bytes_result {
- Ok(bytes) => {
- let bytes_len: u32 =
usize_to_u32_saturate(bytes.len());
- sleep(wait_get_per_byte * bytes_len).await;
- Ok(bytes)
- }
- Err(err) => Err(err),
- }
- })
- .boxed(),
- )
+ GetResult::Stream(throttle_stream(s, move |bytes| {
+ let bytes_len: u32 = usize_to_u32_saturate(bytes.len());
+ wait_get_per_byte * bytes_len
+ }))
})
}
@@ -247,20 +238,21 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
// need to copy to avoid moving / referencing `self`
let wait_list_per_entry = self.config().wait_list_per_entry;
+ let stream = self.inner.list(prefix).await?;
+ Ok(throttle_stream(stream, move |_| wait_list_per_entry))
+ }
- self.inner.list(prefix).await.map(|stream| {
- stream
- .then(move |result| async move {
- match result {
- Ok(entry) => {
- sleep(wait_list_per_entry).await;
- Ok(entry)
- }
- Err(err) => Err(err),
- }
- })
- .boxed()
- })
+ async fn list_with_offset(
+ &self,
+ prefix: Option<&Path>,
+ offset: &Path,
+ ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+ sleep(self.config().wait_list_per_call).await;
+
+ // need to copy to avoid moving / referencing `self`
+ let wait_list_per_entry = self.config().wait_list_per_entry;
+ let stream = self.inner.list_with_offset(prefix, offset).await?;
+ Ok(throttle_stream(stream, move |_| wait_list_per_entry))
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) ->
Result<ListResult> {
@@ -307,6 +299,21 @@ fn usize_to_u32_saturate(x: usize) -> u32 {
x.try_into().unwrap_or(u32::MAX)
}
+fn throttle_stream<T: Send + 'static, E: Send + 'static, F>(
+ stream: BoxStream<'_, Result<T, E>>,
+ delay: F,
+) -> BoxStream<'_, Result<T, E>>
+where
+ F: Fn(&T) -> Duration + Send + Sync + 'static,
+{
+ stream
+ .then(move |result| {
+ let delay = result.as_ref().ok().map(&delay).unwrap_or_default();
+ sleep(delay).then(|_| futures::future::ready(result))
+ })
+ .boxed()
+}
+
#[cfg(test)]
mod tests {
use super::*;