This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new fa7a61a4b0 Remove Nested async and Fallibility from ObjectStore::list (#4930)
fa7a61a4b0 is described below
commit fa7a61a4b074ca4ec9bf429cc84b6c325057d96e
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Tue Oct 17 22:10:31 2023 +0100
Remove Nested async and Fallibility from ObjectStore::list (#4930)
* Remove nested async and fallibility from ObjectStore::list
* Clippy
* Update limit test
* Update docs
---
object_store/src/aws/mod.rs | 13 +--
object_store/src/azure/mod.rs | 7 +-
object_store/src/chunked.rs | 13 +--
object_store/src/client/list.rs | 32 ++-----
object_store/src/gcp/mod.rs | 7 +-
object_store/src/http/mod.rs | 24 ++---
object_store/src/lib.rs | 178 ++++++++++++++---------------------
object_store/src/limit.rs | 44 +++++----
object_store/src/local.rs | 82 +++++++---------
object_store/src/memory.rs | 7 +-
object_store/src/prefix.rs | 17 ++--
object_store/src/throttle.rs | 47 ++++-----
object_store/tests/get_range_file.rs | 5 +-
13 files changed, 197 insertions(+), 279 deletions(-)
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index 3ddce08002..d3c50861c1 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -331,19 +331,16 @@ impl ObjectStore for AmazonS3 {
.boxed()
}
- async fn list(
- &self,
- prefix: Option<&Path>,
- ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
- self.client.list(prefix).await
+    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
+ self.client.list(prefix)
}
- async fn list_with_offset(
+ fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
- ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
- self.client.list_with_offset(prefix, offset).await
+ ) -> BoxStream<'_, Result<ObjectMeta>> {
+ self.client.list_with_offset(prefix, offset)
}
    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs
index 190b73bf94..2a08c67758 100644
--- a/object_store/src/azure/mod.rs
+++ b/object_store/src/azure/mod.rs
@@ -206,11 +206,8 @@ impl ObjectStore for MicrosoftAzure {
self.client.delete_request(location, &()).await
}
- async fn list(
- &self,
- prefix: Option<&Path>,
- ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
- self.client.list(prefix).await
+ fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>>
{
+ self.client.list(prefix)
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) ->
Result<ListResult> {
diff --git a/object_store/src/chunked.rs b/object_store/src/chunked.rs
index 008dec6794..d3e02b4127 100644
--- a/object_store/src/chunked.rs
+++ b/object_store/src/chunked.rs
@@ -147,19 +147,16 @@ impl ObjectStore for ChunkedStore {
self.inner.delete(location).await
}
- async fn list(
- &self,
- prefix: Option<&Path>,
- ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
- self.inner.list(prefix).await
+ fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>>
{
+ self.inner.list(prefix)
}
- async fn list_with_offset(
+ fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
- ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
- self.inner.list_with_offset(prefix, offset).await
+ ) -> BoxStream<'_, Result<ObjectMeta>> {
+ self.inner.list_with_offset(prefix, offset)
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) ->
Result<ListResult> {
diff --git a/object_store/src/client/list.rs b/object_store/src/client/list.rs
index b2dbee27f1..371894dfeb 100644
--- a/object_store/src/client/list.rs
+++ b/object_store/src/client/list.rs
@@ -46,16 +46,13 @@ pub trait ListClientExt {
offset: Option<&Path>,
) -> BoxStream<'_, Result<ListResult>>;
- async fn list(
- &self,
- prefix: Option<&Path>,
- ) -> Result<BoxStream<'_, Result<ObjectMeta>>>;
+ fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>>;
- async fn list_with_offset(
+ fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
- ) -> Result<BoxStream<'_, Result<ObjectMeta>>>;
+ ) -> BoxStream<'_, Result<ObjectMeta>>;
async fn list_with_delimiter(&self, prefix: Option<&Path>) ->
Result<ListResult>;
}
@@ -90,31 +87,22 @@ impl<T: ListClient> ListClientExt for T {
.boxed()
}
- async fn list(
- &self,
- prefix: Option<&Path>,
- ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
- let stream = self
- .list_paginated(prefix, false, None)
+ fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>>
{
+ self.list_paginated(prefix, false, None)
.map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
.try_flatten()
- .boxed();
-
- Ok(stream)
+ .boxed()
}
- async fn list_with_offset(
+ fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
- ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
- let stream = self
- .list_paginated(prefix, false, Some(offset))
+ ) -> BoxStream<'_, Result<ObjectMeta>> {
+ self.list_paginated(prefix, false, Some(offset))
.map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
.try_flatten()
- .boxed();
-
- Ok(stream)
+ .boxed()
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) ->
Result<ListResult> {
diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs
index a75527fe7b..513e396cba 100644
--- a/object_store/src/gcp/mod.rs
+++ b/object_store/src/gcp/mod.rs
@@ -601,11 +601,8 @@ impl ObjectStore for GoogleCloudStorage {
self.client.delete_request(location).await
}
- async fn list(
- &self,
- prefix: Option<&Path>,
- ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
- self.client.list(prefix).await
+ fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>>
{
+ self.client.list(prefix)
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) ->
Result<ListResult> {
diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs
index 6ffb623589..2fd7850b6b 100644
--- a/object_store/src/http/mod.rs
+++ b/object_store/src/http/mod.rs
@@ -34,7 +34,7 @@
use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
-use futures::StreamExt;
+use futures::{StreamExt, TryStreamExt};
use itertools::Itertools;
use snafu::{OptionExt, ResultExt, Snafu};
use tokio::io::AsyncWrite;
@@ -122,14 +122,13 @@ impl ObjectStore for HttpStore {
self.client.delete(location).await
}
- async fn list(
- &self,
- prefix: Option<&Path>,
- ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+ fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>>
{
let prefix_len = prefix.map(|p| p.as_ref().len()).unwrap_or_default();
- let status = self.client.list(prefix, "infinity").await?;
- Ok(futures::stream::iter(
- status
+ let prefix = prefix.cloned();
+ futures::stream::once(async move {
+ let status = self.client.list(prefix.as_ref(), "infinity").await?;
+
+ let iter = status
.response
.into_iter()
.filter(|r| !r.is_dir())
@@ -138,9 +137,12 @@ impl ObjectStore for HttpStore {
response.object_meta(self.client.base_url())
})
// Filter out exact prefix matches
- .filter_ok(move |r| r.location.as_ref().len() > prefix_len),
- )
- .boxed())
+ .filter_ok(move |r| r.location.as_ref().len() > prefix_len);
+
+ Ok::<_, crate::Error>(futures::stream::iter(iter))
+ })
+ .try_flatten()
+ .boxed()
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) ->
Result<ListResult> {
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index b79042e3cd..9b396444fa 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -95,18 +95,18 @@
//!
//! ```
//! # use object_store::local::LocalFileSystem;
+//! # use std::sync::Arc;
+//! # use object_store::{path::Path, ObjectStore};
+//! # use futures::stream::StreamExt;
//! # // use LocalFileSystem for example
-//! # fn get_object_store() -> LocalFileSystem {
-//! # LocalFileSystem::new_with_prefix("/tmp").unwrap()
+//! # fn get_object_store() -> Arc<dyn ObjectStore> {
+//! # Arc::new(LocalFileSystem::new())
//! # }
-//!
+//! #
//! # async fn example() {
-//! use std::sync::Arc;
-//! use object_store::{path::Path, ObjectStore};
-//! use futures::stream::StreamExt;
-//!
+//! #
//! // create an ObjectStore
-//! let object_store: Arc<dyn ObjectStore> = Arc::new(get_object_store());
+//! let object_store: Arc<dyn ObjectStore> = get_object_store();
//!
//! // Recursively list all files below the 'data' path.
//! // 1. On AWS S3 this would be the 'data/' prefix
@@ -114,21 +114,12 @@
//! let prefix: Path = "data".try_into().unwrap();
//!
//! // Get an `async` stream of Metadata objects:
-//! let list_stream = object_store
-//! .list(Some(&prefix))
-//! .await
-//! .expect("Error listing files");
+//! let mut list_stream = object_store.list(Some(&prefix));
//!
-//! // Print a line about each object based on its metadata
-//! // using for_each from `StreamExt` trait.
-//! list_stream
-//! .for_each(move |meta| {
-//! async {
-//! let meta = meta.expect("Error listing");
-//! println!("Name: {}, size: {}", meta.location, meta.size);
-//! }
-//! })
-//! .await;
+//! // Print a line about each object
+//! while let Some(meta) = list_stream.next().await.transpose().unwrap() {
+//! println!("Name: {}, size: {}", meta.location, meta.size);
+//! }
//! # }
//! ```
//!
@@ -147,19 +138,18 @@
//! from remote storage or files in the local filesystem as a stream.
//!
//! ```
+//! # use futures::TryStreamExt;
//! # use object_store::local::LocalFileSystem;
-//! # // use LocalFileSystem for example
-//! # fn get_object_store() -> LocalFileSystem {
-//! # LocalFileSystem::new_with_prefix("/tmp").unwrap()
+//! # use std::sync::Arc;
+//! # use object_store::{path::Path, ObjectStore};
+//! # fn get_object_store() -> Arc<dyn ObjectStore> {
+//! # Arc::new(LocalFileSystem::new())
//! # }
-//!
+//! #
//! # async fn example() {
-//! use std::sync::Arc;
-//! use object_store::{path::Path, ObjectStore};
-//! use futures::stream::StreamExt;
-//!
+//! #
//! // create an ObjectStore
-//! let object_store: Arc<dyn ObjectStore> = Arc::new(get_object_store());
+//! let object_store: Arc<dyn ObjectStore> = get_object_store();
//!
//! // Retrieve a specific file
//! let path: Path = "data/file01.parquet".try_into().unwrap();
@@ -171,16 +161,11 @@
//! .unwrap()
//! .into_stream();
//!
-//! // Count the '0's using `map` from `StreamExt` trait
+//! // Count the '0's using `try_fold` from `TryStreamExt` trait
//! let num_zeros = stream
-//! .map(|bytes| {
-//! let bytes = bytes.unwrap();
-//! bytes.iter().filter(|b| **b == 0).count()
-//! })
-//! .collect::<Vec<usize>>()
-//! .await
-//! .into_iter()
-//! .sum::<usize>();
+//! .try_fold(0, |acc, bytes| async move {
+//! Ok(acc + bytes.iter().filter(|b| **b == 0).count())
+//! }).await.unwrap();
//!
//! println!("Num zeros in {} is {}", path, num_zeros);
//! # }
@@ -196,22 +181,19 @@
//!
//! ```
//! # use object_store::local::LocalFileSystem;
-//! # fn get_object_store() -> LocalFileSystem {
-//! # LocalFileSystem::new_with_prefix("/tmp").unwrap()
+//! # use object_store::ObjectStore;
+//! # use std::sync::Arc;
+//! # use bytes::Bytes;
+//! # use object_store::path::Path;
+//! # fn get_object_store() -> Arc<dyn ObjectStore> {
+//! # Arc::new(LocalFileSystem::new())
//! # }
//! # async fn put() {
-//! use object_store::ObjectStore;
-//! use std::sync::Arc;
-//! use bytes::Bytes;
-//! use object_store::path::Path;
-//!
-//! let object_store: Arc<dyn ObjectStore> = Arc::new(get_object_store());
+//! #
+//! let object_store: Arc<dyn ObjectStore> = get_object_store();
//! let path: Path = "data/file1".try_into().unwrap();
//! let bytes = Bytes::from_static(b"hello");
-//! object_store
-//! .put(&path, bytes)
-//! .await
-//! .unwrap();
+//! object_store.put(&path, bytes).await.unwrap();
//! # }
//! ```
//!
@@ -220,22 +202,20 @@
//!
//! ```
//! # use object_store::local::LocalFileSystem;
-//! # fn get_object_store() -> LocalFileSystem {
-//! # LocalFileSystem::new_with_prefix("/tmp").unwrap()
+//! # use object_store::ObjectStore;
+//! # use std::sync::Arc;
+//! # use bytes::Bytes;
+//! # use tokio::io::AsyncWriteExt;
+//! # use object_store::path::Path;
+//! # fn get_object_store() -> Arc<dyn ObjectStore> {
+//! # Arc::new(LocalFileSystem::new())
//! # }
//! # async fn multi_upload() {
-//! use object_store::ObjectStore;
-//! use std::sync::Arc;
-//! use bytes::Bytes;
-//! use tokio::io::AsyncWriteExt;
-//! use object_store::path::Path;
-//!
-//! let object_store: Arc<dyn ObjectStore> = Arc::new(get_object_store());
+//! #
+//! let object_store: Arc<dyn ObjectStore> = get_object_store();
//! let path: Path = "data/large_file".try_into().unwrap();
-//! let (_id, mut writer) = object_store
-//! .put_multipart(&path)
-//! .await
-//! .unwrap();
+//! let (_id, mut writer) = object_store.put_multipart(&path).await.unwrap();
+//!
//! let bytes = Bytes::from_static(b"hello");
//! writer.write_all(&bytes).await.unwrap();
//! writer.flush().await.unwrap();
@@ -439,23 +419,22 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
/// return Ok. If it is an error, it will be [`Error::NotFound`].
///
/// ```
+ /// # use futures::{StreamExt, TryStreamExt};
/// # use object_store::local::LocalFileSystem;
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// # let root = tempfile::TempDir::new().unwrap();
/// # let store = LocalFileSystem::new_with_prefix(root.path()).unwrap();
- /// use object_store::{ObjectStore, ObjectMeta};
- /// use object_store::path::Path;
- /// use futures::{StreamExt, TryStreamExt};
- /// use bytes::Bytes;
- ///
+ /// # use object_store::{ObjectStore, ObjectMeta};
+ /// # use object_store::path::Path;
+ /// # use futures::{StreamExt, TryStreamExt};
+ /// # use bytes::Bytes;
+ /// #
/// // Create two objects
/// store.put(&Path::from("foo"), Bytes::from("foo")).await?;
/// store.put(&Path::from("bar"), Bytes::from("bar")).await?;
///
/// // List object
- /// let locations = store.list(None).await?
- /// .map(|meta: Result<ObjectMeta, _>| meta.map(|m| m.location))
- /// .boxed();
+ /// let locations = store.list(None).map_ok(|m| m.location).boxed();
///
/// // Delete them
/// store.delete_stream(locations).try_collect::<Vec<Path>>().await?;
@@ -484,10 +463,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync +
Debug + 'static {
/// `foo/bar_baz/x`.
///
/// Note: the order of returned [`ObjectMeta`] is not guaranteed
- async fn list(
- &self,
- prefix: Option<&Path>,
- ) -> Result<BoxStream<'_, Result<ObjectMeta>>>;
+ fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>>;
    /// List all the objects with the given prefix and a location greater than `offset`
///
@@ -495,18 +471,15 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync +
Debug + 'static {
/// the number of network requests required
///
/// Note: the order of returned [`ObjectMeta`] is not guaranteed
- async fn list_with_offset(
+ fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
- ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+ ) -> BoxStream<'_, Result<ObjectMeta>> {
let offset = offset.clone();
- let stream = self
- .list(prefix)
- .await?
+ self.list(prefix)
.try_filter(move |f| futures::future::ready(f.location > offset))
- .boxed();
- Ok(stream)
+ .boxed()
}
/// List objects with the given prefix and an implementation specific
@@ -624,19 +597,16 @@ macro_rules! as_ref_impl {
self.as_ref().delete_stream(locations)
}
- async fn list(
- &self,
- prefix: Option<&Path>,
- ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
- self.as_ref().list(prefix).await
+ fn list(&self, prefix: Option<&Path>) -> BoxStream<'_,
Result<ObjectMeta>> {
+ self.as_ref().list(prefix)
}
- async fn list_with_offset(
+ fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
- ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
- self.as_ref().list_with_offset(prefix, offset).await
+ ) -> BoxStream<'_, Result<ObjectMeta>> {
+ self.as_ref().list_with_offset(prefix, offset)
}
async fn list_with_delimiter(
@@ -973,7 +943,6 @@ mod test_util {
) -> Result<Vec<Path>> {
storage
.list(prefix)
- .await?
.map_ok(|meta| meta.location)
.try_collect::<Vec<Path>>()
.await
@@ -1264,11 +1233,7 @@ mod tests {
];
for (prefix, offset) in cases {
- let s = storage
- .list_with_offset(prefix.as_ref(), &offset)
- .await
- .unwrap();
-
+ let s = storage.list_with_offset(prefix.as_ref(), &offset);
let mut actual: Vec<_> =
s.map_ok(|x| x.location).try_collect().await.unwrap();
@@ -1700,12 +1665,7 @@ mod tests {
}
async fn delete_fixtures(storage: &DynObjectStore) {
- let paths = storage
- .list(None)
- .await
- .unwrap()
- .map_ok(|meta| meta.location)
- .boxed();
+ let paths = storage.list(None).map_ok(|meta| meta.location).boxed();
storage
.delete_stream(paths)
.try_collect::<Vec<_>>()
@@ -1714,18 +1674,18 @@ mod tests {
}
/// Test that the returned stream does not borrow the lifetime of Path
- async fn list_store<'a, 'b>(
+ fn list_store<'a>(
store: &'a dyn ObjectStore,
- path_str: &'b str,
- ) -> super::Result<BoxStream<'a, super::Result<ObjectMeta>>> {
+ path_str: &str,
+ ) -> BoxStream<'a, Result<ObjectMeta>> {
let path = Path::from(path_str);
- store.list(Some(&path)).await
+ store.list(Some(&path))
}
#[tokio::test]
async fn test_list_lifetimes() {
let store = memory::InMemory::new();
- let mut stream = list_store(&store, "path").await.unwrap();
+ let mut stream = list_store(&store, "path");
assert!(stream.next().await.is_none());
}
diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs
index a9b8c4b050..00cbce023c 100644
--- a/object_store/src/limit.rs
+++ b/object_store/src/limit.rs
@@ -23,7 +23,7 @@ use crate::{
};
use async_trait::async_trait;
use bytes::Bytes;
-use futures::Stream;
+use futures::{FutureExt, Stream};
use std::io::{Error, IoSlice};
use std::ops::Range;
use std::pin::Pin;
@@ -147,23 +147,31 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
self.inner.delete_stream(locations)
}
- async fn list(
- &self,
- prefix: Option<&Path>,
- ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
-        let permit = Arc::clone(&self.semaphore).acquire_owned().await.unwrap();
- let s = self.inner.list(prefix).await?;
- Ok(PermitWrapper::new(s, permit).boxed())
+ fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>>
{
+ let prefix = prefix.cloned();
+ let fut = Arc::clone(&self.semaphore)
+ .acquire_owned()
+ .map(move |permit| {
+ let s = self.inner.list(prefix.as_ref());
+ PermitWrapper::new(s, permit.unwrap())
+ });
+ fut.into_stream().flatten().boxed()
}
- async fn list_with_offset(
+ fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
- ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
-        let permit = Arc::clone(&self.semaphore).acquire_owned().await.unwrap();
- let s = self.inner.list_with_offset(prefix, offset).await?;
- Ok(PermitWrapper::new(s, permit).boxed())
+ ) -> BoxStream<'_, Result<ObjectMeta>> {
+ let prefix = prefix.cloned();
+ let offset = offset.clone();
+ let fut = Arc::clone(&self.semaphore)
+ .acquire_owned()
+ .map(move |permit| {
+ let s = self.inner.list_with_offset(prefix.as_ref(), &offset);
+ PermitWrapper::new(s, permit.unwrap())
+ });
+ fut.into_stream().flatten().boxed()
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) ->
Result<ListResult> {
@@ -272,6 +280,8 @@ mod tests {
use crate::memory::InMemory;
use crate::tests::*;
use crate::ObjectStore;
+ use futures::stream::StreamExt;
+ use std::pin::Pin;
use std::time::Duration;
use tokio::time::timeout;
@@ -290,19 +300,21 @@ mod tests {
let mut streams = Vec::with_capacity(max_requests);
for _ in 0..max_requests {
- let stream = integration.list(None).await.unwrap();
+ let mut stream = integration.list(None).peekable();
+ Pin::new(&mut stream).peek().await; // Ensure semaphore is acquired
streams.push(stream);
}
let t = Duration::from_millis(20);
// Expect to not be able to make another request
- assert!(timeout(t, integration.list(None)).await.is_err());
+ let fut = integration.list(None).collect::<Vec<_>>();
+ assert!(timeout(t, fut).await.is_err());
// Drop one of the streams
streams.pop();
// Can now make another request
- integration.list(None).await.unwrap();
+ integration.list(None).collect::<Vec<_>>().await;
}
}
diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index 3d4a02a1e9..38467c3a9e 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -420,14 +420,14 @@ impl ObjectStore for LocalFileSystem {
.await
}
- async fn list(
- &self,
- prefix: Option<&Path>,
- ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+ fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>>
{
let config = Arc::clone(&self.config);
let root_path = match prefix {
- Some(prefix) => config.path_to_filesystem(prefix)?,
+ Some(prefix) => match config.path_to_filesystem(prefix) {
+ Ok(path) => path,
+                Err(e) => return futures::future::ready(Err(e)).into_stream().boxed(),
+ },
None => self.config.root.to_file_path().unwrap(),
};
@@ -457,36 +457,34 @@ impl ObjectStore for LocalFileSystem {
// If no tokio context, return iterator directly as no
// need to perform chunked spawn_blocking reads
if tokio::runtime::Handle::try_current().is_err() {
- return Ok(futures::stream::iter(s).boxed());
+ return futures::stream::iter(s).boxed();
}
// Otherwise list in batches of CHUNK_SIZE
const CHUNK_SIZE: usize = 1024;
let buffer = VecDeque::with_capacity(CHUNK_SIZE);
- let stream =
-            futures::stream::try_unfold((s, buffer), |(mut s, mut buffer)| async move {
- if buffer.is_empty() {
- (s, buffer) = tokio::task::spawn_blocking(move || {
- for _ in 0..CHUNK_SIZE {
- match s.next() {
- Some(r) => buffer.push_back(r),
- None => break,
- }
+        futures::stream::try_unfold((s, buffer), |(mut s, mut buffer)| async move {
+ if buffer.is_empty() {
+ (s, buffer) = tokio::task::spawn_blocking(move || {
+ for _ in 0..CHUNK_SIZE {
+ match s.next() {
+ Some(r) => buffer.push_back(r),
+ None => break,
}
- (s, buffer)
- })
- .await?;
- }
-
- match buffer.pop_front() {
- Some(Err(e)) => Err(e),
- Some(Ok(meta)) => Ok(Some((meta, (s, buffer)))),
- None => Ok(None),
- }
- });
+ }
+ (s, buffer)
+ })
+ .await?;
+ }
- Ok(stream.boxed())
+ match buffer.pop_front() {
+ Some(Err(e)) => Err(e),
+ Some(Ok(meta)) => Ok(Some((meta, (s, buffer)))),
+ None => Ok(None),
+ }
+ })
+ .boxed()
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) ->
Result<ListResult> {
@@ -1138,21 +1136,14 @@ mod tests {
let store = LocalFileSystem::new_with_prefix(root.path()).unwrap();
- // `list` must fail
- match store.list(None).await {
- Err(_) => {
- // ok, error found
- }
- Ok(mut stream) => {
- let mut any_err = false;
- while let Some(res) = stream.next().await {
- if res.is_err() {
- any_err = true;
- }
- }
- assert!(any_err);
+ let mut stream = store.list(None);
+ let mut any_err = false;
+ while let Some(res) = stream.next().await {
+ if res.is_err() {
+ any_err = true;
}
}
+ assert!(any_err);
// `list_with_delimiter
assert!(store.list_with_delimiter(None).await.is_err());
@@ -1226,13 +1217,7 @@ mod tests {
prefix: Option<&Path>,
expected: &[&str],
) {
- let result: Vec<_> = integration
- .list(prefix)
- .await
- .unwrap()
- .try_collect()
- .await
- .unwrap();
+        let result: Vec<_> = integration.list(prefix).try_collect().await.unwrap();
let mut strings: Vec<_> = result.iter().map(|x|
x.location.as_ref()).collect();
strings.sort_unstable();
@@ -1428,8 +1413,7 @@ mod tests {
std::fs::write(temp_dir.path().join(filename), "foo").unwrap();
- let list_stream = integration.list(None).await.unwrap();
- let res: Vec<_> = list_stream.try_collect().await.unwrap();
+ let res: Vec<_> = integration.list(None).try_collect().await.unwrap();
assert_eq!(res.len(), 1);
assert_eq!(res[0].location.as_ref(), filename);
diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs
index f638ed6d7a..00b330b5eb 100644
--- a/object_store/src/memory.rs
+++ b/object_store/src/memory.rs
@@ -228,10 +228,7 @@ impl ObjectStore for InMemory {
Ok(())
}
- async fn list(
- &self,
- prefix: Option<&Path>,
- ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+ fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>>
{
let root = Path::default();
let prefix = prefix.unwrap_or(&root);
@@ -256,7 +253,7 @@ impl ObjectStore for InMemory {
})
.collect();
- Ok(futures::stream::iter(values).boxed())
+ futures::stream::iter(values).boxed()
}
/// The memory implementation returns all results, as opposed to the cloud
diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs
index 39585f73b6..3776dec2e8 100644
--- a/object_store/src/prefix.rs
+++ b/object_store/src/prefix.rs
@@ -144,24 +144,21 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
self.inner.delete(&full_path).await
}
- async fn list(
- &self,
- prefix: Option<&Path>,
- ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+ fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>>
{
let prefix = self.full_path(prefix.unwrap_or(&Path::default()));
- let s = self.inner.list(Some(&prefix)).await?;
- Ok(s.map_ok(|meta| self.strip_meta(meta)).boxed())
+ let s = self.inner.list(Some(&prefix));
+ s.map_ok(|meta| self.strip_meta(meta)).boxed()
}
- async fn list_with_offset(
+ fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
- ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+ ) -> BoxStream<'_, Result<ObjectMeta>> {
let offset = self.full_path(offset);
let prefix = self.full_path(prefix.unwrap_or(&Path::default()));
- let s = self.inner.list_with_offset(Some(&prefix), &offset).await?;
- Ok(s.map_ok(|meta| self.strip_meta(meta)).boxed())
+ let s = self.inner.list_with_offset(Some(&prefix), &offset);
+ s.map_ok(|meta| self.strip_meta(meta)).boxed()
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) ->
Result<ListResult> {
diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs
index 58c476ab45..f716a11f8a 100644
--- a/object_store/src/throttle.rs
+++ b/object_store/src/throttle.rs
@@ -233,29 +233,30 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
self.inner.delete(location).await
}
- async fn list(
- &self,
- prefix: Option<&Path>,
- ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
- sleep(self.config().wait_list_per_call).await;
-
- // need to copy to avoid moving / referencing `self`
- let wait_list_per_entry = self.config().wait_list_per_entry;
- let stream = self.inner.list(prefix).await?;
- Ok(throttle_stream(stream, move |_| wait_list_per_entry))
+ fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>>
{
+ let stream = self.inner.list(prefix);
+ futures::stream::once(async move {
+ let wait_list_per_entry = self.config().wait_list_per_entry;
+ sleep(self.config().wait_list_per_call).await;
+ throttle_stream(stream, move |_| wait_list_per_entry)
+ })
+ .flatten()
+ .boxed()
}
- async fn list_with_offset(
+ fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
- ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
- sleep(self.config().wait_list_per_call).await;
-
- // need to copy to avoid moving / referencing `self`
- let wait_list_per_entry = self.config().wait_list_per_entry;
- let stream = self.inner.list_with_offset(prefix, offset).await?;
- Ok(throttle_stream(stream, move |_| wait_list_per_entry))
+ ) -> BoxStream<'_, Result<ObjectMeta>> {
+ let stream = self.inner.list_with_offset(prefix, offset);
+ futures::stream::once(async move {
+ let wait_list_per_entry = self.config().wait_list_per_entry;
+ sleep(self.config().wait_list_per_call).await;
+ throttle_stream(stream, move |_| wait_list_per_entry)
+ })
+ .flatten()
+ .boxed()
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) ->
Result<ListResult> {
@@ -511,13 +512,7 @@ mod tests {
let prefix = Path::from("foo");
// clean up store
- let entries: Vec<_> = store
- .list(Some(&prefix))
- .await
- .unwrap()
- .try_collect()
- .await
- .unwrap();
+        let entries: Vec<_> = store.list(Some(&prefix)).try_collect().await.unwrap();
for entry in entries {
store.delete(&entry.location).await.unwrap();
@@ -583,8 +578,6 @@ mod tests {
let t0 = Instant::now();
store
.list(Some(&prefix))
- .await
- .unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
diff --git a/object_store/tests/get_range_file.rs b/object_store/tests/get_range_file.rs
index f926e3b07f..25c4692606 100644
--- a/object_store/tests/get_range_file.rs
+++ b/object_store/tests/get_range_file.rs
@@ -75,10 +75,7 @@ impl ObjectStore for MyStore {
todo!()
}
- async fn list(
- &self,
- _: Option<&Path>,
- ) -> object_store::Result<BoxStream<'_, object_store::Result<ObjectMeta>>>
{
+    fn list(&self, _: Option<&Path>) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
todo!()
}