This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new fa7a61a4b0 Remove Nested async and Fallibility from ObjectStore::list (#4930)
fa7a61a4b0 is described below

commit fa7a61a4b074ca4ec9bf429cc84b6c325057d96e
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Tue Oct 17 22:10:31 2023 +0100

    Remove Nested async and Fallibility from ObjectStore::list (#4930)
    
    * Remove nested async and fallibility from ObjectStore::list
    
    * Clippy
    
    * Update limit test
    
    * Update docs
---
 object_store/src/aws/mod.rs          |  13 +--
 object_store/src/azure/mod.rs        |   7 +-
 object_store/src/chunked.rs          |  13 +--
 object_store/src/client/list.rs      |  32 ++-----
 object_store/src/gcp/mod.rs          |   7 +-
 object_store/src/http/mod.rs         |  24 ++---
 object_store/src/lib.rs              | 178 ++++++++++++++---------------------
 object_store/src/limit.rs            |  44 +++++----
 object_store/src/local.rs            |  82 +++++++---------
 object_store/src/memory.rs           |   7 +-
 object_store/src/prefix.rs           |  17 ++--
 object_store/src/throttle.rs         |  47 ++++-----
 object_store/tests/get_range_file.rs |   5 +-
 13 files changed, 197 insertions(+), 279 deletions(-)

diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index 3ddce08002..d3c50861c1 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -331,19 +331,16 @@ impl ObjectStore for AmazonS3 {
             .boxed()
     }
 
-    async fn list(
-        &self,
-        prefix: Option<&Path>,
-    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
-        self.client.list(prefix).await
+    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
+        self.client.list(prefix)
     }
 
-    async fn list_with_offset(
+    fn list_with_offset(
         &self,
         prefix: Option<&Path>,
         offset: &Path,
-    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
-        self.client.list_with_offset(prefix, offset).await
+    ) -> BoxStream<'_, Result<ObjectMeta>> {
+        self.client.list_with_offset(prefix, offset)
     }
 
     async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs
index 190b73bf94..2a08c67758 100644
--- a/object_store/src/azure/mod.rs
+++ b/object_store/src/azure/mod.rs
@@ -206,11 +206,8 @@ impl ObjectStore for MicrosoftAzure {
         self.client.delete_request(location, &()).await
     }
 
-    async fn list(
-        &self,
-        prefix: Option<&Path>,
-    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
-        self.client.list(prefix).await
+    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
+        self.client.list(prefix)
     }
 
     async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
diff --git a/object_store/src/chunked.rs b/object_store/src/chunked.rs
index 008dec6794..d3e02b4127 100644
--- a/object_store/src/chunked.rs
+++ b/object_store/src/chunked.rs
@@ -147,19 +147,16 @@ impl ObjectStore for ChunkedStore {
         self.inner.delete(location).await
     }
 
-    async fn list(
-        &self,
-        prefix: Option<&Path>,
-    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
-        self.inner.list(prefix).await
+    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
+        self.inner.list(prefix)
     }
 
-    async fn list_with_offset(
+    fn list_with_offset(
         &self,
         prefix: Option<&Path>,
         offset: &Path,
-    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
-        self.inner.list_with_offset(prefix, offset).await
+    ) -> BoxStream<'_, Result<ObjectMeta>> {
+        self.inner.list_with_offset(prefix, offset)
     }
 
     async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
diff --git a/object_store/src/client/list.rs b/object_store/src/client/list.rs
index b2dbee27f1..371894dfeb 100644
--- a/object_store/src/client/list.rs
+++ b/object_store/src/client/list.rs
@@ -46,16 +46,13 @@ pub trait ListClientExt {
         offset: Option<&Path>,
     ) -> BoxStream<'_, Result<ListResult>>;
 
-    async fn list(
-        &self,
-        prefix: Option<&Path>,
-    ) -> Result<BoxStream<'_, Result<ObjectMeta>>>;
+    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>>;
 
-    async fn list_with_offset(
+    fn list_with_offset(
         &self,
         prefix: Option<&Path>,
         offset: &Path,
-    ) -> Result<BoxStream<'_, Result<ObjectMeta>>>;
+    ) -> BoxStream<'_, Result<ObjectMeta>>;
 
     async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult>;
 }
@@ -90,31 +87,22 @@ impl<T: ListClient> ListClientExt for T {
         .boxed()
     }
 
-    async fn list(
-        &self,
-        prefix: Option<&Path>,
-    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
-        let stream = self
-            .list_paginated(prefix, false, None)
+    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
+        self.list_paginated(prefix, false, None)
             .map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
             .try_flatten()
-            .boxed();
-
-        Ok(stream)
+            .boxed()
     }
 
-    async fn list_with_offset(
+    fn list_with_offset(
         &self,
         prefix: Option<&Path>,
         offset: &Path,
-    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
-        let stream = self
-            .list_paginated(prefix, false, Some(offset))
+    ) -> BoxStream<'_, Result<ObjectMeta>> {
+        self.list_paginated(prefix, false, Some(offset))
             .map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
             .try_flatten()
-            .boxed();
-
-        Ok(stream)
+            .boxed()
     }
 
     async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs
index a75527fe7b..513e396cba 100644
--- a/object_store/src/gcp/mod.rs
+++ b/object_store/src/gcp/mod.rs
@@ -601,11 +601,8 @@ impl ObjectStore for GoogleCloudStorage {
         self.client.delete_request(location).await
     }
 
-    async fn list(
-        &self,
-        prefix: Option<&Path>,
-    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
-        self.client.list(prefix).await
+    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
+        self.client.list(prefix)
     }
 
     async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs
index 6ffb623589..2fd7850b6b 100644
--- a/object_store/src/http/mod.rs
+++ b/object_store/src/http/mod.rs
@@ -34,7 +34,7 @@
 use async_trait::async_trait;
 use bytes::Bytes;
 use futures::stream::BoxStream;
-use futures::StreamExt;
+use futures::{StreamExt, TryStreamExt};
 use itertools::Itertools;
 use snafu::{OptionExt, ResultExt, Snafu};
 use tokio::io::AsyncWrite;
@@ -122,14 +122,13 @@ impl ObjectStore for HttpStore {
         self.client.delete(location).await
     }
 
-    async fn list(
-        &self,
-        prefix: Option<&Path>,
-    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
         let prefix_len = prefix.map(|p| p.as_ref().len()).unwrap_or_default();
-        let status = self.client.list(prefix, "infinity").await?;
-        Ok(futures::stream::iter(
-            status
+        let prefix = prefix.cloned();
+        futures::stream::once(async move {
+            let status = self.client.list(prefix.as_ref(), "infinity").await?;
+
+            let iter = status
                 .response
                 .into_iter()
                 .filter(|r| !r.is_dir())
@@ -138,9 +137,12 @@ impl ObjectStore for HttpStore {
                     response.object_meta(self.client.base_url())
                 })
                 // Filter out exact prefix matches
-                .filter_ok(move |r| r.location.as_ref().len() > prefix_len),
-        )
-        .boxed())
+                .filter_ok(move |r| r.location.as_ref().len() > prefix_len);
+
+            Ok::<_, crate::Error>(futures::stream::iter(iter))
+        })
+        .try_flatten()
+        .boxed()
     }
 
     async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index b79042e3cd..9b396444fa 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -95,18 +95,18 @@
 //!
 //! ```
 //! # use object_store::local::LocalFileSystem;
+//! # use std::sync::Arc;
+//! # use object_store::{path::Path, ObjectStore};
+//! # use futures::stream::StreamExt;
 //! # // use LocalFileSystem for example
-//! # fn get_object_store() -> LocalFileSystem {
-//! #   LocalFileSystem::new_with_prefix("/tmp").unwrap()
+//! # fn get_object_store() -> Arc<dyn ObjectStore> {
+//! #   Arc::new(LocalFileSystem::new())
 //! # }
-//!
+//! #
 //! # async fn example() {
-//! use std::sync::Arc;
-//! use object_store::{path::Path, ObjectStore};
-//! use futures::stream::StreamExt;
-//!
+//! #
 //! // create an ObjectStore
-//! let object_store: Arc<dyn ObjectStore> = Arc::new(get_object_store());
+//! let object_store: Arc<dyn ObjectStore> = get_object_store();
 //!
 //! // Recursively list all files below the 'data' path.
 //! // 1. On AWS S3 this would be the 'data/' prefix
@@ -114,21 +114,12 @@
 //! let prefix: Path = "data".try_into().unwrap();
 //!
 //! // Get an `async` stream of Metadata objects:
-//!  let list_stream = object_store
-//!      .list(Some(&prefix))
-//!      .await
-//!      .expect("Error listing files");
+//! let mut list_stream = object_store.list(Some(&prefix));
 //!
-//!  // Print a line about each object based on its metadata
-//!  // using for_each from `StreamExt` trait.
-//!  list_stream
-//!      .for_each(move |meta|  {
-//!          async {
-//!              let meta = meta.expect("Error listing");
-//!              println!("Name: {}, size: {}", meta.location, meta.size);
-//!          }
-//!      })
-//!      .await;
+//! // Print a line about each object
+//! while let Some(meta) = list_stream.next().await.transpose().unwrap() {
+//!     println!("Name: {}, size: {}", meta.location, meta.size);
+//! }
 //! # }
 //! ```
 //!
@@ -147,19 +138,18 @@
 //! from remote storage or files in the local filesystem as a stream.
 //!
 //! ```
+//! # use futures::TryStreamExt;
 //! # use object_store::local::LocalFileSystem;
-//! # // use LocalFileSystem for example
-//! # fn get_object_store() -> LocalFileSystem {
-//! #   LocalFileSystem::new_with_prefix("/tmp").unwrap()
+//! # use std::sync::Arc;
+//! # use object_store::{path::Path, ObjectStore};
+//! # fn get_object_store() -> Arc<dyn ObjectStore> {
+//! #   Arc::new(LocalFileSystem::new())
 //! # }
-//!
+//! #
 //! # async fn example() {
-//! use std::sync::Arc;
-//! use object_store::{path::Path, ObjectStore};
-//! use futures::stream::StreamExt;
-//!
+//! #
 //! // create an ObjectStore
-//! let object_store: Arc<dyn ObjectStore> = Arc::new(get_object_store());
+//! let object_store: Arc<dyn ObjectStore> = get_object_store();
 //!
 //! // Retrieve a specific file
 //! let path: Path = "data/file01.parquet".try_into().unwrap();
@@ -171,16 +161,11 @@
 //!     .unwrap()
 //!     .into_stream();
 //!
-//! // Count the '0's using `map` from `StreamExt` trait
+//! // Count the '0's using `try_fold` from `TryStreamExt` trait
 //! let num_zeros = stream
-//!     .map(|bytes| {
-//!         let bytes = bytes.unwrap();
-//!        bytes.iter().filter(|b| **b == 0).count()
-//!     })
-//!     .collect::<Vec<usize>>()
-//!     .await
-//!     .into_iter()
-//!     .sum::<usize>();
+//!     .try_fold(0, |acc, bytes| async move {
+//!         Ok(acc + bytes.iter().filter(|b| **b == 0).count())
+//!     }).await.unwrap();
 //!
 //! println!("Num zeros in {} is {}", path, num_zeros);
 //! # }
@@ -196,22 +181,19 @@
 //!
 //! ```
 //! # use object_store::local::LocalFileSystem;
-//! # fn get_object_store() -> LocalFileSystem {
-//! #     LocalFileSystem::new_with_prefix("/tmp").unwrap()
+//! # use object_store::ObjectStore;
+//! # use std::sync::Arc;
+//! # use bytes::Bytes;
+//! # use object_store::path::Path;
+//! # fn get_object_store() -> Arc<dyn ObjectStore> {
+//! #   Arc::new(LocalFileSystem::new())
 //! # }
 //! # async fn put() {
-//!  use object_store::ObjectStore;
-//!  use std::sync::Arc;
-//!  use bytes::Bytes;
-//!  use object_store::path::Path;
-//!
-//!  let object_store: Arc<dyn ObjectStore> = Arc::new(get_object_store());
+//! #
+//!  let object_store: Arc<dyn ObjectStore> = get_object_store();
 //!  let path: Path = "data/file1".try_into().unwrap();
 //!  let bytes = Bytes::from_static(b"hello");
-//!  object_store
-//!      .put(&path, bytes)
-//!      .await
-//!      .unwrap();
+//!  object_store.put(&path, bytes).await.unwrap();
 //! # }
 //! ```
 //!
@@ -220,22 +202,20 @@
 //!
 //! ```
 //! # use object_store::local::LocalFileSystem;
-//! # fn get_object_store() -> LocalFileSystem {
-//! #     LocalFileSystem::new_with_prefix("/tmp").unwrap()
+//! # use object_store::ObjectStore;
+//! # use std::sync::Arc;
+//! # use bytes::Bytes;
+//! # use tokio::io::AsyncWriteExt;
+//! # use object_store::path::Path;
+//! # fn get_object_store() -> Arc<dyn ObjectStore> {
+//! #   Arc::new(LocalFileSystem::new())
 //! # }
 //! # async fn multi_upload() {
-//!  use object_store::ObjectStore;
-//!  use std::sync::Arc;
-//!  use bytes::Bytes;
-//!  use tokio::io::AsyncWriteExt;
-//!  use object_store::path::Path;
-//!
-//!  let object_store: Arc<dyn ObjectStore> = Arc::new(get_object_store());
+//! #
+//!  let object_store: Arc<dyn ObjectStore> = get_object_store();
 //!  let path: Path = "data/large_file".try_into().unwrap();
-//!  let (_id, mut writer) =  object_store
-//!      .put_multipart(&path)
-//!      .await
-//!      .unwrap();
+//!  let (_id, mut writer) =  object_store.put_multipart(&path).await.unwrap();
+//!
 //!  let bytes = Bytes::from_static(b"hello");
 //!  writer.write_all(&bytes).await.unwrap();
 //!  writer.flush().await.unwrap();
@@ -439,23 +419,22 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
     /// return Ok. If it is an error, it will be [`Error::NotFound`].
     ///
     /// ```
+    /// # use futures::{StreamExt, TryStreamExt};
     /// # use object_store::local::LocalFileSystem;
     /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
     /// # let root = tempfile::TempDir::new().unwrap();
     /// # let store = LocalFileSystem::new_with_prefix(root.path()).unwrap();
-    /// use object_store::{ObjectStore, ObjectMeta};
-    /// use object_store::path::Path;
-    /// use futures::{StreamExt, TryStreamExt};
-    /// use bytes::Bytes;
-    ///
+    /// # use object_store::{ObjectStore, ObjectMeta};
+    /// # use object_store::path::Path;
+    /// # use futures::{StreamExt, TryStreamExt};
+    /// # use bytes::Bytes;
+    /// #
     /// // Create two objects
     /// store.put(&Path::from("foo"), Bytes::from("foo")).await?;
     /// store.put(&Path::from("bar"), Bytes::from("bar")).await?;
     ///
     /// // List object
-    /// let locations = store.list(None).await?
-    ///   .map(|meta: Result<ObjectMeta, _>| meta.map(|m| m.location))
-    ///   .boxed();
+    /// let locations = store.list(None).map_ok(|m| m.location).boxed();
     ///
     /// // Delete them
     /// store.delete_stream(locations).try_collect::<Vec<Path>>().await?;
@@ -484,10 +463,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
     /// `foo/bar_baz/x`.
     ///
     /// Note: the order of returned [`ObjectMeta`] is not guaranteed
-    async fn list(
-        &self,
-        prefix: Option<&Path>,
-    ) -> Result<BoxStream<'_, Result<ObjectMeta>>>;
+    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>>;
 
     /// List all the objects with the given prefix and a location greater than `offset`
     ///
@@ -495,18 +471,15 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
     /// the number of network requests required
     ///
     /// Note: the order of returned [`ObjectMeta`] is not guaranteed
-    async fn list_with_offset(
+    fn list_with_offset(
         &self,
         prefix: Option<&Path>,
         offset: &Path,
-    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+    ) -> BoxStream<'_, Result<ObjectMeta>> {
         let offset = offset.clone();
-        let stream = self
-            .list(prefix)
-            .await?
+        self.list(prefix)
             .try_filter(move |f| futures::future::ready(f.location > offset))
-            .boxed();
-        Ok(stream)
+            .boxed()
     }
 
     /// List objects with the given prefix and an implementation specific
@@ -624,19 +597,16 @@ macro_rules! as_ref_impl {
                 self.as_ref().delete_stream(locations)
             }
 
-            async fn list(
-                &self,
-                prefix: Option<&Path>,
-            ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
-                self.as_ref().list(prefix).await
+            fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
+                self.as_ref().list(prefix)
             }
 
-            async fn list_with_offset(
+            fn list_with_offset(
                 &self,
                 prefix: Option<&Path>,
                 offset: &Path,
-            ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
-                self.as_ref().list_with_offset(prefix, offset).await
+            ) -> BoxStream<'_, Result<ObjectMeta>> {
+                self.as_ref().list_with_offset(prefix, offset)
             }
 
             async fn list_with_delimiter(
@@ -973,7 +943,6 @@ mod test_util {
     ) -> Result<Vec<Path>> {
         storage
             .list(prefix)
-            .await?
             .map_ok(|meta| meta.location)
             .try_collect::<Vec<Path>>()
             .await
@@ -1264,11 +1233,7 @@ mod tests {
         ];
 
         for (prefix, offset) in cases {
-            let s = storage
-                .list_with_offset(prefix.as_ref(), &offset)
-                .await
-                .unwrap();
-
+            let s = storage.list_with_offset(prefix.as_ref(), &offset);
             let mut actual: Vec<_> =
                 s.map_ok(|x| x.location).try_collect().await.unwrap();
 
@@ -1700,12 +1665,7 @@ mod tests {
     }
 
     async fn delete_fixtures(storage: &DynObjectStore) {
-        let paths = storage
-            .list(None)
-            .await
-            .unwrap()
-            .map_ok(|meta| meta.location)
-            .boxed();
+        let paths = storage.list(None).map_ok(|meta| meta.location).boxed();
         storage
             .delete_stream(paths)
             .try_collect::<Vec<_>>()
@@ -1714,18 +1674,18 @@ mod tests {
     }
 
     /// Test that the returned stream does not borrow the lifetime of Path
-    async fn list_store<'a, 'b>(
+    fn list_store<'a>(
         store: &'a dyn ObjectStore,
-        path_str: &'b str,
-    ) -> super::Result<BoxStream<'a, super::Result<ObjectMeta>>> {
+        path_str: &str,
+    ) -> BoxStream<'a, Result<ObjectMeta>> {
         let path = Path::from(path_str);
-        store.list(Some(&path)).await
+        store.list(Some(&path))
     }
 
     #[tokio::test]
     async fn test_list_lifetimes() {
         let store = memory::InMemory::new();
-        let mut stream = list_store(&store, "path").await.unwrap();
+        let mut stream = list_store(&store, "path");
         assert!(stream.next().await.is_none());
     }
 
diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs
index a9b8c4b050..00cbce023c 100644
--- a/object_store/src/limit.rs
+++ b/object_store/src/limit.rs
@@ -23,7 +23,7 @@ use crate::{
 };
 use async_trait::async_trait;
 use bytes::Bytes;
-use futures::Stream;
+use futures::{FutureExt, Stream};
 use std::io::{Error, IoSlice};
 use std::ops::Range;
 use std::pin::Pin;
@@ -147,23 +147,31 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
         self.inner.delete_stream(locations)
     }
 
-    async fn list(
-        &self,
-        prefix: Option<&Path>,
-    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
-        let permit = Arc::clone(&self.semaphore).acquire_owned().await.unwrap();
-        let s = self.inner.list(prefix).await?;
-        Ok(PermitWrapper::new(s, permit).boxed())
+    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
+        let prefix = prefix.cloned();
+        let fut = Arc::clone(&self.semaphore)
+            .acquire_owned()
+            .map(move |permit| {
+                let s = self.inner.list(prefix.as_ref());
+                PermitWrapper::new(s, permit.unwrap())
+            });
+        fut.into_stream().flatten().boxed()
     }
 
-    async fn list_with_offset(
+    fn list_with_offset(
         &self,
         prefix: Option<&Path>,
         offset: &Path,
-    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
-        let permit = Arc::clone(&self.semaphore).acquire_owned().await.unwrap();
-        let s = self.inner.list_with_offset(prefix, offset).await?;
-        Ok(PermitWrapper::new(s, permit).boxed())
+    ) -> BoxStream<'_, Result<ObjectMeta>> {
+        let prefix = prefix.cloned();
+        let offset = offset.clone();
+        let fut = Arc::clone(&self.semaphore)
+            .acquire_owned()
+            .map(move |permit| {
+                let s = self.inner.list_with_offset(prefix.as_ref(), &offset);
+                PermitWrapper::new(s, permit.unwrap())
+            });
+        fut.into_stream().flatten().boxed()
     }
 
     async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
@@ -272,6 +280,8 @@ mod tests {
     use crate::memory::InMemory;
     use crate::tests::*;
     use crate::ObjectStore;
+    use futures::stream::StreamExt;
+    use std::pin::Pin;
     use std::time::Duration;
     use tokio::time::timeout;
 
@@ -290,19 +300,21 @@ mod tests {
 
         let mut streams = Vec::with_capacity(max_requests);
         for _ in 0..max_requests {
-            let stream = integration.list(None).await.unwrap();
+            let mut stream = integration.list(None).peekable();
+            Pin::new(&mut stream).peek().await; // Ensure semaphore is acquired
             streams.push(stream);
         }
 
         let t = Duration::from_millis(20);
 
         // Expect to not be able to make another request
-        assert!(timeout(t, integration.list(None)).await.is_err());
+        let fut = integration.list(None).collect::<Vec<_>>();
+        assert!(timeout(t, fut).await.is_err());
 
         // Drop one of the streams
         streams.pop();
 
         // Can now make another request
-        integration.list(None).await.unwrap();
+        integration.list(None).collect::<Vec<_>>().await;
     }
 }
diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index 3d4a02a1e9..38467c3a9e 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -420,14 +420,14 @@ impl ObjectStore for LocalFileSystem {
         .await
     }
 
-    async fn list(
-        &self,
-        prefix: Option<&Path>,
-    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
         let config = Arc::clone(&self.config);
 
         let root_path = match prefix {
-            Some(prefix) => config.path_to_filesystem(prefix)?,
+            Some(prefix) => match config.path_to_filesystem(prefix) {
+                Ok(path) => path,
+                Err(e) => return futures::future::ready(Err(e)).into_stream().boxed(),
+            },
             None => self.config.root.to_file_path().unwrap(),
         };
 
@@ -457,36 +457,34 @@ impl ObjectStore for LocalFileSystem {
         // If no tokio context, return iterator directly as no
         // need to perform chunked spawn_blocking reads
         if tokio::runtime::Handle::try_current().is_err() {
-            return Ok(futures::stream::iter(s).boxed());
+            return futures::stream::iter(s).boxed();
         }
 
         // Otherwise list in batches of CHUNK_SIZE
         const CHUNK_SIZE: usize = 1024;
 
         let buffer = VecDeque::with_capacity(CHUNK_SIZE);
-        let stream =
-            futures::stream::try_unfold((s, buffer), |(mut s, mut buffer)| async move {
-                if buffer.is_empty() {
-                    (s, buffer) = tokio::task::spawn_blocking(move || {
-                        for _ in 0..CHUNK_SIZE {
-                            match s.next() {
-                                Some(r) => buffer.push_back(r),
-                                None => break,
-                            }
+        futures::stream::try_unfold((s, buffer), |(mut s, mut buffer)| async move {
+            if buffer.is_empty() {
+                (s, buffer) = tokio::task::spawn_blocking(move || {
+                    for _ in 0..CHUNK_SIZE {
+                        match s.next() {
+                            Some(r) => buffer.push_back(r),
+                            None => break,
                         }
-                        (s, buffer)
-                    })
-                    .await?;
-                }
-
-                match buffer.pop_front() {
-                    Some(Err(e)) => Err(e),
-                    Some(Ok(meta)) => Ok(Some((meta, (s, buffer)))),
-                    None => Ok(None),
-                }
-            });
+                    }
+                    (s, buffer)
+                })
+                .await?;
+            }
 
-        Ok(stream.boxed())
+            match buffer.pop_front() {
+                Some(Err(e)) => Err(e),
+                Some(Ok(meta)) => Ok(Some((meta, (s, buffer)))),
+                None => Ok(None),
+            }
+        })
+        .boxed()
     }
 
     async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
@@ -1138,21 +1136,14 @@ mod tests {
 
         let store = LocalFileSystem::new_with_prefix(root.path()).unwrap();
 
-        // `list` must fail
-        match store.list(None).await {
-            Err(_) => {
-                // ok, error found
-            }
-            Ok(mut stream) => {
-                let mut any_err = false;
-                while let Some(res) = stream.next().await {
-                    if res.is_err() {
-                        any_err = true;
-                    }
-                }
-                assert!(any_err);
+        let mut stream = store.list(None);
+        let mut any_err = false;
+        while let Some(res) = stream.next().await {
+            if res.is_err() {
+                any_err = true;
             }
         }
+        assert!(any_err);
 
         // `list_with_delimiter
         assert!(store.list_with_delimiter(None).await.is_err());
@@ -1226,13 +1217,7 @@ mod tests {
         prefix: Option<&Path>,
         expected: &[&str],
     ) {
-        let result: Vec<_> = integration
-            .list(prefix)
-            .await
-            .unwrap()
-            .try_collect()
-            .await
-            .unwrap();
+        let result: Vec<_> = integration.list(prefix).try_collect().await.unwrap();
 
         let mut strings: Vec<_> = result.iter().map(|x| x.location.as_ref()).collect();
         strings.sort_unstable();
@@ -1428,8 +1413,7 @@ mod tests {
 
         std::fs::write(temp_dir.path().join(filename), "foo").unwrap();
 
-        let list_stream = integration.list(None).await.unwrap();
-        let res: Vec<_> = list_stream.try_collect().await.unwrap();
+        let res: Vec<_> = integration.list(None).try_collect().await.unwrap();
         assert_eq!(res.len(), 1);
         assert_eq!(res[0].location.as_ref(), filename);
 
diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs
index f638ed6d7a..00b330b5eb 100644
--- a/object_store/src/memory.rs
+++ b/object_store/src/memory.rs
@@ -228,10 +228,7 @@ impl ObjectStore for InMemory {
         Ok(())
     }
 
-    async fn list(
-        &self,
-        prefix: Option<&Path>,
-    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
         let root = Path::default();
         let prefix = prefix.unwrap_or(&root);
 
@@ -256,7 +253,7 @@ impl ObjectStore for InMemory {
             })
             .collect();
 
-        Ok(futures::stream::iter(values).boxed())
+        futures::stream::iter(values).boxed()
     }
 
     /// The memory implementation returns all results, as opposed to the cloud
diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs
index 39585f73b6..3776dec2e8 100644
--- a/object_store/src/prefix.rs
+++ b/object_store/src/prefix.rs
@@ -144,24 +144,21 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
         self.inner.delete(&full_path).await
     }
 
-    async fn list(
-        &self,
-        prefix: Option<&Path>,
-    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
         let prefix = self.full_path(prefix.unwrap_or(&Path::default()));
-        let s = self.inner.list(Some(&prefix)).await?;
-        Ok(s.map_ok(|meta| self.strip_meta(meta)).boxed())
+        let s = self.inner.list(Some(&prefix));
+        s.map_ok(|meta| self.strip_meta(meta)).boxed()
     }
 
-    async fn list_with_offset(
+    fn list_with_offset(
         &self,
         prefix: Option<&Path>,
         offset: &Path,
-    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+    ) -> BoxStream<'_, Result<ObjectMeta>> {
         let offset = self.full_path(offset);
         let prefix = self.full_path(prefix.unwrap_or(&Path::default()));
-        let s = self.inner.list_with_offset(Some(&prefix), &offset).await?;
-        Ok(s.map_ok(|meta| self.strip_meta(meta)).boxed())
+        let s = self.inner.list_with_offset(Some(&prefix), &offset);
+        s.map_ok(|meta| self.strip_meta(meta)).boxed()
     }
 
     async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs
index 58c476ab45..f716a11f8a 100644
--- a/object_store/src/throttle.rs
+++ b/object_store/src/throttle.rs
@@ -233,29 +233,30 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
         self.inner.delete(location).await
     }
 
-    async fn list(
-        &self,
-        prefix: Option<&Path>,
-    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
-        sleep(self.config().wait_list_per_call).await;
-
-        // need to copy to avoid moving / referencing `self`
-        let wait_list_per_entry = self.config().wait_list_per_entry;
-        let stream = self.inner.list(prefix).await?;
-        Ok(throttle_stream(stream, move |_| wait_list_per_entry))
+    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
+        let stream = self.inner.list(prefix);
+        futures::stream::once(async move {
+            let wait_list_per_entry = self.config().wait_list_per_entry;
+            sleep(self.config().wait_list_per_call).await;
+            throttle_stream(stream, move |_| wait_list_per_entry)
+        })
+        .flatten()
+        .boxed()
     }
 
-    async fn list_with_offset(
+    fn list_with_offset(
         &self,
         prefix: Option<&Path>,
         offset: &Path,
-    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
-        sleep(self.config().wait_list_per_call).await;
-
-        // need to copy to avoid moving / referencing `self`
-        let wait_list_per_entry = self.config().wait_list_per_entry;
-        let stream = self.inner.list_with_offset(prefix, offset).await?;
-        Ok(throttle_stream(stream, move |_| wait_list_per_entry))
+    ) -> BoxStream<'_, Result<ObjectMeta>> {
+        let stream = self.inner.list_with_offset(prefix, offset);
+        futures::stream::once(async move {
+            let wait_list_per_entry = self.config().wait_list_per_entry;
+            sleep(self.config().wait_list_per_call).await;
+            throttle_stream(stream, move |_| wait_list_per_entry)
+        })
+        .flatten()
+        .boxed()
     }
 
     async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
@@ -511,13 +512,7 @@ mod tests {
         let prefix = Path::from("foo");
 
         // clean up store
-        let entries: Vec<_> = store
-            .list(Some(&prefix))
-            .await
-            .unwrap()
-            .try_collect()
-            .await
-            .unwrap();
+        let entries: Vec<_> = store.list(Some(&prefix)).try_collect().await.unwrap();
 
         for entry in entries {
             store.delete(&entry.location).await.unwrap();
@@ -583,8 +578,6 @@ mod tests {
         let t0 = Instant::now();
         store
             .list(Some(&prefix))
-            .await
-            .unwrap()
             .try_collect::<Vec<_>>()
             .await
             .unwrap();
diff --git a/object_store/tests/get_range_file.rs b/object_store/tests/get_range_file.rs
index f926e3b07f..25c4692606 100644
--- a/object_store/tests/get_range_file.rs
+++ b/object_store/tests/get_range_file.rs
@@ -75,10 +75,7 @@ impl ObjectStore for MyStore {
         todo!()
     }
 
-    async fn list(
-        &self,
-        _: Option<&Path>,
-    ) -> object_store::Result<BoxStream<'_, object_store::Result<ObjectMeta>>> {
+    fn list(&self, _: Option<&Path>) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
         todo!()
     }
 

Reply via email to