This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git


The following commit(s) were added to refs/heads/main by this push:
     new e0721f8  feat!: use `'static` lifetime for `delete_stream` (#524)
e0721f8 is described below

commit e0721f8ab278f596f6008802fbafd3d8bebdf6f1
Author: Heran Lin <[email protected]>
AuthorDate: Sat Nov 1 04:31:20 2025 +0800

    feat!: use `'static` lifetime for `delete_stream` (#524)
    
    * feat!: use `'static` lifetime for `delete_stream`
    
    * Address comments
---
 src/aws/mod.rs          |  28 ++++++----
 src/azure/mod.rs        |  28 ++++++----
 src/chunked.rs          |   7 +++
 src/gcp/mod.rs          |  21 ++++++-
 src/http/mod.rs         |  18 ++++++
 src/lib.rs              | 143 +++++++++++++++++++++++++++++++++++++++++-------
 src/limit.rs            |   8 +--
 src/local.rs            | 143 +++++++++++++++++++++++++++++-------------------
 src/memory.rs           |  14 +++++
 src/prefix.rs           |  43 +++++++++------
 src/throttle.rs         |  48 ++++++++++++++++
 tests/get_range_file.rs |   7 +++
 12 files changed, 385 insertions(+), 123 deletions(-)

diff --git a/src/aws/mod.rs b/src/aws/mod.rs
index 3cd6ca8..586d40b 100644
--- a/src/aws/mod.rs
+++ b/src/aws/mod.rs
@@ -255,20 +255,24 @@ impl ObjectStore for AmazonS3 {
         Ok(())
     }
 
-    fn delete_stream<'a>(
-        &'a self,
-        locations: BoxStream<'a, Result<Path>>,
-    ) -> BoxStream<'a, Result<Path>> {
+    fn delete_stream(
+        &self,
+        locations: BoxStream<'static, Result<Path>>,
+    ) -> BoxStream<'static, Result<Path>> {
+        let client = Arc::clone(&self.client);
         locations
             .try_chunks(1_000)
-            .map(move |locations| async {
-                // Early return the error. We ignore the paths that have already been
-                // collected into the chunk.
-                let locations = locations.map_err(|e| e.1)?;
-                self.client
-                    .bulk_delete_request(locations)
-                    .await
-                    .map(futures::stream::iter)
+            .map(move |locations| {
+                let client = Arc::clone(&client);
+                async move {
+                    // Early return the error. We ignore the paths that have already been
+                    // collected into the chunk.
+                    let locations = locations.map_err(|e| e.1)?;
+                    client
+                        .bulk_delete_request(locations)
+                        .await
+                        .map(futures::stream::iter)
+                }
             })
             .buffered(20)
             .try_flatten()
diff --git a/src/azure/mod.rs b/src/azure/mod.rs
index c5344d7..237ef25 100644
--- a/src/azure/mod.rs
+++ b/src/azure/mod.rs
@@ -123,20 +123,24 @@ impl ObjectStore for MicrosoftAzure {
     fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
         self.client.list(prefix)
     }
-    fn delete_stream<'a>(
-        &'a self,
-        locations: BoxStream<'a, Result<Path>>,
-    ) -> BoxStream<'a, Result<Path>> {
+    fn delete_stream(
+        &self,
+        locations: BoxStream<'static, Result<Path>>,
+    ) -> BoxStream<'static, Result<Path>> {
+        let client = Arc::clone(&self.client);
         locations
             .try_chunks(256)
-            .map(move |locations| async {
-                // Early return the error. We ignore the paths that have already been
-                // collected into the chunk.
-                let locations = locations.map_err(|e| e.1)?;
-                self.client
-                    .bulk_delete_request(locations)
-                    .await
-                    .map(futures::stream::iter)
+            .map(move |locations| {
+                let client = Arc::clone(&client);
+                async move {
+                    // Early return the error. We ignore the paths that have already been
+                    // collected into the chunk.
+                    let locations = locations.map_err(|e| e.1)?;
+                    client
+                        .bulk_delete_request(locations)
+                        .await
+                        .map(futures::stream::iter)
+                }
             })
             .buffered(20)
             .try_flatten()
diff --git a/src/chunked.rs b/src/chunked.rs
index 1f78415..eacb667 100644
--- a/src/chunked.rs
+++ b/src/chunked.rs
@@ -150,6 +150,13 @@ impl ObjectStore for ChunkedStore {
         self.inner.delete(location).await
     }
 
+    fn delete_stream(
+        &self,
+        locations: BoxStream<'static, Result<Path>>,
+    ) -> BoxStream<'static, Result<Path>> {
+        self.inner.delete_stream(locations)
+    }
+
     fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
         self.inner.list(prefix)
     }
diff --git a/src/gcp/mod.rs b/src/gcp/mod.rs
index 44f2c2c..0b18108 100644
--- a/src/gcp/mod.rs
+++ b/src/gcp/mod.rs
@@ -47,7 +47,7 @@ use crate::{
 };
 use async_trait::async_trait;
 use client::GoogleCloudStorageClient;
-use futures::stream::BoxStream;
+use futures::stream::{BoxStream, StreamExt};
 use http::Method;
 use url::Url;
 
@@ -184,6 +184,24 @@ impl ObjectStore for GoogleCloudStorage {
         self.client.delete_request(location).await
     }
 
+    fn delete_stream(
+        &self,
+        locations: BoxStream<'static, Result<Path>>,
+    ) -> BoxStream<'static, Result<Path>> {
+        let client = Arc::clone(&self.client);
+        locations
+            .map(move |location| {
+                let client = Arc::clone(&client);
+                async move {
+                    let location = location?;
+                    client.delete_request(&location).await?;
+                    Ok(location)
+                }
+            })
+            .buffered(10)
+            .boxed()
+    }
+
     fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
         self.client.list(prefix)
     }
@@ -282,7 +300,6 @@ impl PaginatedListStore for GoogleCloudStorage {
 
 #[cfg(test)]
 mod test {
-
     use credential::DEFAULT_GCS_BASE_URL;
 
     use crate::integration::*;
diff --git a/src/http/mod.rs b/src/http/mod.rs
index d573125..cd6311a 100644
--- a/src/http/mod.rs
+++ b/src/http/mod.rs
@@ -136,6 +136,24 @@ impl ObjectStore for HttpStore {
         self.client.delete(location).await
     }
 
+    fn delete_stream(
+        &self,
+        locations: BoxStream<'static, Result<Path>>,
+    ) -> BoxStream<'static, Result<Path>> {
+        let client = Arc::clone(&self.client);
+        locations
+            .map(move |location| {
+                let client = Arc::clone(&client);
+                async move {
+                    let location = location?;
+                    client.delete(&location).await?;
+                    Ok(location)
+                }
+            })
+            .buffered(10)
+            .boxed()
+    }
+
     fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
         let prefix_len = prefix.map(|p| p.as_ref().len()).unwrap_or_default();
         let prefix = prefix.cloned();
diff --git a/src/lib.rs b/src/lib.rs
index 7bd3c99..2e059fc 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -873,9 +873,8 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
     /// Delete all the objects at the specified locations
     ///
     /// When supported, this method will use bulk operations that delete more
-    /// than one object per a request. The default implementation will call
-    /// the single object delete method for each location, but with up to 10
-    /// concurrent requests.
+    /// than one object per a request. Otherwise, the implementation may call
+    /// the single object delete method for each location.
     ///
     /// The returned stream yields the results of the delete operations in the
     /// same order as the input locations. However, some errors will be from
@@ -911,19 +910,123 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
     /// # let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
     /// # rt.block_on(example()).unwrap();
     /// ```
-    fn delete_stream<'a>(
-        &'a self,
-        locations: BoxStream<'a, Result<Path>>,
-    ) -> BoxStream<'a, Result<Path>> {
-        locations
-            .map(|location| async {
-                let location = location?;
-                self.delete(&location).await?;
-                Ok(location)
-            })
-            .buffered(10)
-            .boxed()
-    }
+    ///
+    /// Note: Before version 0.13, `delete_stream` has a default implementation
+    /// that deletes each object with up to 10 concurrent requests. This default
+    /// behavior has been removed, and each implementation must now provide its
+    /// own `delete_stream` implementation explicitly. The following example
+    /// shows how to implement `delete_stream` to get the previous default
+    /// behavior.
+    ///
+    /// ```
+    /// # use async_trait::async_trait;
+    /// # use futures::stream::{BoxStream, StreamExt};
+    /// # use object_store::path::Path;
+    /// # use object_store::{
+    /// #     GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
+    /// #     PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
+    /// # };
+    /// # use std::fmt;
+    /// # use std::fmt::Debug;
+    /// # use std::sync::Arc;
+    /// #
+    /// # struct ExampleClient;
+    /// #
+    /// # impl ExampleClient {
+    /// #     async fn delete(&self, _path: &Path) -> Result<()> {
+    /// #         Ok(())
+    /// #     }
+    /// # }
+    /// #
+    /// # struct ExampleStore {
+    /// #     client: Arc<ExampleClient>,
+    /// # }
+    /// #
+    /// # impl Debug for ExampleStore {
+    /// #     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+    /// #         write!(f, "ExampleStore")
+    /// #     }
+    /// # }
+    /// #
+    /// # impl fmt::Display for ExampleStore {
+    /// #     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+    /// #         write!(f, "ExampleStore")
+    /// #     }
+    /// # }
+    /// #
+    /// # #[async_trait]
+    /// # impl ObjectStore for ExampleStore {
+    /// #     async fn put_opts(&self, _: &Path, _: PutPayload, _: PutOptions) -> Result<PutResult> {
+    /// #         todo!()
+    /// #     }
+    /// #
+    /// #     async fn put_multipart_opts(
+    /// #         &self,
+    /// #         _: &Path,
+    /// #         _: PutMultipartOptions,
+    /// #     ) -> Result<Box<dyn MultipartUpload>> {
+    /// #         todo!()
+    /// #     }
+    /// #
+    /// #     async fn get_opts(&self, _: &Path, _: GetOptions) -> Result<GetResult> {
+    /// #         todo!()
+    /// #     }
+    /// #
+    /// #     async fn delete(&self, _: &Path) -> Result<()> {
+    /// #         todo!()
+    /// #     }
+    /// #
+    /// fn delete_stream(
+    ///     &self,
+    ///     locations: BoxStream<'static, Result<Path>>,
+    /// ) -> BoxStream<'static, Result<Path>> {
+    ///     let client = Arc::clone(&self.client);
+    ///     locations
+    ///         .map(move |location| {
+    ///             let client = Arc::clone(&client);
+    ///             async move {
+    ///                 let location = location?;
+    ///                 client.delete(&location).await?;
+    ///                 Ok(location)
+    ///             }
+    ///         })
+    ///         .buffered(10)
+    ///         .boxed()
+    /// }
+    /// #
+    /// #     fn list(&self, _: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
+    /// #         todo!()
+    /// #     }
+    /// #
+    /// #     async fn list_with_delimiter(&self, _: Option<&Path>) -> Result<ListResult> {
+    /// #         todo!()
+    /// #     }
+    /// #
+    /// #     async fn copy(&self, _: &Path, _: &Path) -> Result<()> {
+    /// #         todo!()
+    /// #     }
+    /// #
+    /// #     async fn copy_if_not_exists(&self, _: &Path, _: &Path) -> Result<()> {
+    /// #         todo!()
+    /// #     }
+    /// # }
+    /// #
+    /// # async fn example() {
+    /// #     let store = ExampleStore { client: Arc::new(ExampleClient) };
+    /// #     let paths = futures::stream::iter(vec![Ok(Path::from("foo")), Ok(Path::from("bar"))]).boxed();
+    /// #     let results = store.delete_stream(paths).collect::<Vec<_>>().await;
+    /// #     assert_eq!(results.len(), 2);
+    /// #     assert_eq!(results[0].as_ref().unwrap(), &Path::from("foo"));
+    /// #     assert_eq!(results[1].as_ref().unwrap(), &Path::from("bar"));
+    /// # }
+    /// #
+    /// # let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
+    /// # rt.block_on(example());
+    /// ```
+    fn delete_stream(
+        &self,
+        locations: BoxStream<'static, Result<Path>>,
+    ) -> BoxStream<'static, Result<Path>>;
 
     /// List all the objects with the given prefix.
     ///
@@ -1053,10 +1156,10 @@ macro_rules! as_ref_impl {
                 self.as_ref().delete(location).await
             }
 
-            fn delete_stream<'a>(
-                &'a self,
-                locations: BoxStream<'a, Result<Path>>,
-            ) -> BoxStream<'a, Result<Path>> {
+            fn delete_stream(
+                &self,
+                locations: BoxStream<'static, Result<Path>>,
+            ) -> BoxStream<'static, Result<Path>> {
                 self.as_ref().delete_stream(locations)
             }
 
diff --git a/src/limit.rs b/src/limit.rs
index ddafa92..7102f06 100644
--- a/src/limit.rs
+++ b/src/limit.rs
@@ -137,10 +137,10 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
         self.inner.delete(location).await
     }
 
-    fn delete_stream<'a>(
-        &'a self,
-        locations: BoxStream<'a, Result<Path>>,
-    ) -> BoxStream<'a, Result<Path>> {
+    fn delete_stream(
+        &self,
+        locations: BoxStream<'static, Result<Path>>,
+    ) -> BoxStream<'static, Result<Path>> {
         self.inner.delete_stream(locations)
     }
 
diff --git a/src/local.rs b/src/local.rs
index 9294424..31db111 100644
--- a/src/local.rs
+++ b/src/local.rs
@@ -247,28 +247,7 @@ impl LocalFileSystem {
 
     /// Return an absolute filesystem path of the given file location
     pub fn path_to_filesystem(&self, location: &Path) -> Result<PathBuf> {
-        if !is_valid_file_path(location) {
-            let path = location.as_ref().into();
-            let error = Error::InvalidPath { path };
-            return Err(error.into());
-        }
-
-        let path = self.config.prefix_to_filesystem(location)?;
-
-        #[cfg(target_os = "windows")]
-        let path = {
-            let path = path.to_string_lossy();
-
-            // Assume the first char is the drive letter and the next is a colon.
-            let mut out = String::new();
-            let drive = &path[..2]; // The drive letter and colon (e.g., "C:")
-            let filepath = &path[2..].replace(':', "%3A"); // Replace subsequent colons
-            out.push_str(drive);
-            out.push_str(filepath);
-            PathBuf::from(out)
-        };
-
-        Ok(path)
+        self.config.path_to_filesystem(location)
     }
 
     /// Enable automatic cleanup of empty directories when deleting files
@@ -293,6 +272,32 @@ impl Config {
             .map_err(|_| Error::InvalidUrl { url }.into())
     }
 
+    /// Return an absolute filesystem path of the given file location
+    fn path_to_filesystem(&self, location: &Path) -> Result<PathBuf> {
+        if !is_valid_file_path(location) {
+            let path = location.as_ref().into();
+            let error = Error::InvalidPath { path };
+            return Err(error.into());
+        }
+
+        let path = self.prefix_to_filesystem(location)?;
+
+        #[cfg(target_os = "windows")]
+        let path = {
+            let path = path.to_string_lossy();
+
+            // Assume the first char is the drive letter and the next is a colon.
+            let mut out = String::new();
+            let drive = &path[..2]; // The drive letter and colon (e.g., "C:")
+            let filepath = &path[2..].replace(':', "%3A"); // Replace subsequent colons
+            out.push_str(drive);
+            out.push_str(filepath);
+            PathBuf::from(out)
+        };
+
+        Ok(path)
+    }
+
     /// Resolves the provided absolute filesystem path to a [`Path`] prefix
     fn filesystem_to_path(&self, location: &std::path::Path) -> Result<Path> {
         Ok(Path::from_absolute_path_with_base(
@@ -449,41 +454,33 @@ impl ObjectStore for LocalFileSystem {
 
     async fn delete(&self, location: &Path) -> Result<()> {
         let config = Arc::clone(&self.config);
-        let path = self.path_to_filesystem(location)?;
-        let automactic_cleanup = self.automatic_cleanup;
-        maybe_spawn_blocking(move || {
-            if let Err(e) = std::fs::remove_file(&path) {
-                Err(match e.kind() {
-                    ErrorKind::NotFound => Error::NotFound { path, source: e }.into(),
-                    _ => Error::UnableToDeleteFile { path, source: e }.into(),
-                })
-            } else if automactic_cleanup {
-                let root = &config.root;
-                let root = root
-                    .to_file_path()
-                    .map_err(|_| Error::InvalidUrl { url: root.clone() })?;
-
-                // here we will try to traverse up and delete an empty dir if possible until we reach the root or get an error
-                let mut parent = path.parent();
-
-                while let Some(loc) = parent {
-                    if loc != root && std::fs::remove_dir(loc).is_ok() {
-                        parent = loc.parent();
-                    } else {
-                        break;
-                    }
-                }
+        let automatic_cleanup = self.automatic_cleanup;
+        let location = location.clone();
+        maybe_spawn_blocking(move || Self::delete_location(config, automatic_cleanup, &location))
+            .await
+    }
 
-                Ok(())
-            } else {
-                Ok(())
-            }
-        })
-        .await
+    fn delete_stream(
+        &self,
+        locations: BoxStream<'static, Result<Path>>,
+    ) -> BoxStream<'static, Result<Path>> {
+        let config = Arc::clone(&self.config);
+        let automatic_cleanup = self.automatic_cleanup;
+        locations
+            .map(move |location| {
+                let config = Arc::clone(&config);
+                maybe_spawn_blocking(move || {
+                    let location = location?;
+                    Self::delete_location(config, automatic_cleanup, &location)?;
+                    Ok(location)
+                })
+            })
+            .buffered(10)
+            .boxed()
     }
 
     fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
-        self.list_with_maybe_offset(prefix, None)
+        Self::list_with_maybe_offset(Arc::clone(&self.config), prefix, None)
     }
 
     fn list_with_offset(
@@ -491,7 +488,7 @@ impl ObjectStore for LocalFileSystem {
         prefix: Option<&Path>,
         offset: &Path,
     ) -> BoxStream<'static, Result<ObjectMeta>> {
-        self.list_with_maybe_offset(prefix, Some(offset))
+        Self::list_with_maybe_offset(Arc::clone(&self.config), prefix, Some(offset))
     }
 
     async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
@@ -629,13 +626,45 @@ impl ObjectStore for LocalFileSystem {
 }
 
 impl LocalFileSystem {
+    fn delete_location(
+        config: Arc<Config>,
+        automatic_cleanup: bool,
+        location: &Path,
+    ) -> Result<()> {
+        let path = config.path_to_filesystem(location)?;
+        if let Err(e) = std::fs::remove_file(&path) {
+            Err(match e.kind() {
+                ErrorKind::NotFound => Error::NotFound { path, source: e }.into(),
+                _ => Error::UnableToDeleteFile { path, source: e }.into(),
+            })
+        } else if automatic_cleanup {
+            let root = &config.root;
+            let root = root
+                .to_file_path()
+                .map_err(|_| Error::InvalidUrl { url: root.clone() })?;
+
+            // here we will try to traverse up and delete an empty dir if possible until we reach the root or get an error
+            let mut parent = path.parent();
+
+            while let Some(loc) = parent {
+                if loc != root && std::fs::remove_dir(loc).is_ok() {
+                    parent = loc.parent();
+                } else {
+                    break;
+                }
+            }
+
+            Ok(())
+        } else {
+            Ok(())
+        }
+    }
+
     fn list_with_maybe_offset(
-        &self,
+        config: Arc<Config>,
         prefix: Option<&Path>,
         maybe_offset: Option<&Path>,
     ) -> BoxStream<'static, Result<ObjectMeta>> {
-        let config = Arc::clone(&self.config);
-
         let root_path = match prefix {
             Some(prefix) => match config.prefix_to_filesystem(prefix) {
                 Ok(path) => path,
diff --git a/src/memory.rs b/src/memory.rs
index 517cb1d..55d75fd 100644
--- a/src/memory.rs
+++ b/src/memory.rs
@@ -311,6 +311,20 @@ impl ObjectStore for InMemory {
         Ok(())
     }
 
+    fn delete_stream(
+        &self,
+        locations: BoxStream<'static, Result<Path>>,
+    ) -> BoxStream<'static, Result<Path>> {
+        let storage = Arc::clone(&self.storage);
+        locations
+            .map(move |location| {
+                let location = location?;
+                storage.write().map.remove(&location);
+                Ok(location)
+            })
+            .boxed()
+    }
+
     fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
         let root = Path::default();
         let prefix = prefix.unwrap_or(&root);
diff --git a/src/prefix.rs b/src/prefix.rs
index 160fbbf..a455826 100644
--- a/src/prefix.rs
+++ b/src/prefix.rs
@@ -50,33 +50,28 @@ impl<T: ObjectStore> PrefixStore<T> {
 
     /// Create the full path from a path relative to prefix
     fn full_path(&self, location: &Path) -> Path {
-        self.prefix.parts().chain(location.parts()).collect()
+        full_path(&self.prefix, location)
     }
 
     /// Strip the constant prefix from a given path
     fn strip_prefix(&self, path: Path) -> Path {
-        // Note cannot use match because of borrow checker
-        if let Some(suffix) = path.prefix_match(&self.prefix) {
-            return suffix.collect();
-        }
-        path
+        strip_prefix(&self.prefix, path)
     }
 
     /// Strip the constant prefix from a given ObjectMeta
     fn strip_meta(&self, meta: ObjectMeta) -> ObjectMeta {
-        ObjectMeta {
-            last_modified: meta.last_modified,
-            size: meta.size,
-            location: self.strip_prefix(meta.location),
-            e_tag: meta.e_tag,
-            version: None,
-        }
+        strip_meta(&self.prefix, meta)
     }
 }
 
-// Note: This is a relative hack to move these two functions to pure functions so they don't rely
-// on the `self` lifetime. Expected to be cleaned up before merge.
-//
+// Note: This is a relative hack to move these functions to pure functions so they don't rely
+// on the `self` lifetime.
+
+/// Create the full path from a path relative to prefix
+fn full_path(prefix: &Path, path: &Path) -> Path {
+    prefix.parts().chain(path.parts()).collect()
+}
+
 /// Strip the constant prefix from a given path
 fn strip_prefix(prefix: &Path, path: Path) -> Path {
     // Note cannot use match because of borrow checker
@@ -96,6 +91,7 @@ fn strip_meta(prefix: &Path, meta: ObjectMeta) -> ObjectMeta {
         version: None,
     }
 }
+
 #[async_trait::async_trait]
 impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
     async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
@@ -158,6 +154,21 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
         self.inner.delete(&full_path).await
     }
 
+    fn delete_stream(
+        &self,
+        locations: BoxStream<'static, Result<Path>>,
+    ) -> BoxStream<'static, Result<Path>> {
+        let prefix = self.prefix.clone();
+        let locations = locations
+            .map(move |location| location.map(|loc| full_path(&prefix, &loc)))
+            .boxed();
+        let prefix = self.prefix.clone();
+        self.inner
+            .delete_stream(locations)
+            .map(move |location| location.map(|loc| strip_prefix(&prefix, loc)))
+            .boxed()
+    }
+
     fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
         let prefix = self.full_path(prefix.unwrap_or(&Path::default()));
         let s = self.inner.list(Some(&prefix));
diff --git a/src/throttle.rs b/src/throttle.rs
index 85a9186..9419ae8 100644
--- a/src/throttle.rs
+++ b/src/throttle.rs
@@ -237,6 +237,19 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
         self.inner.delete(location).await
     }
 
+    fn delete_stream(
+        &self,
+        locations: BoxStream<'static, Result<Path>>,
+    ) -> BoxStream<'static, Result<Path>> {
+        // We wait for a certain duration before each delete location.
+        // This may be suboptimal if the inner store implements batch deletes.
+        // But there is no way around unnecessary waits since we do not know
+        // how the inner store implements `delete_stream`.
+        let wait_delete_per_call = self.config().wait_delete_per_call;
+        let locations = throttle_stream(locations, move |_| wait_delete_per_call);
+        self.inner.delete_stream(locations)
+    }
+
     fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
         let stream = self.inner.list(prefix);
         let config = Arc::clone(&self.config);
@@ -456,6 +469,21 @@ mod tests {
         assert_bounds!(measure_delete(&store, Some(10)).await, 1);
     }
 
+    #[tokio::test]
+    // macos github runner is so slow it can't complete within WAIT_TIME*2
+    #[cfg(target_os = "linux")]
+    async fn delete_stream_test() {
+        let inner = InMemory::new();
+        let store = ThrottledStore::new(inner, ThrottleConfig::default());
+
+        assert_bounds!(measure_delete_stream(&store, 0).await, 0);
+        assert_bounds!(measure_delete_stream(&store, 10).await, 0);
+
+        store.config_mut(|cfg| cfg.wait_delete_per_call = WAIT_TIME);
+        assert_bounds!(measure_delete_stream(&store, 0).await, 0);
+        assert_bounds!(measure_delete_stream(&store, 10).await, 10);
+    }
+
     #[tokio::test]
     // macos github runner is so slow it can't complete within WAIT_TIME*2
     #[cfg(target_os = "linux")]
@@ -598,6 +626,26 @@ mod tests {
         t0.elapsed()
     }
 
+    #[allow(dead_code)]
+    async fn measure_delete_stream(store: &ThrottledStore<InMemory>, n_entries: usize) -> Duration {
+        let prefix = place_test_objects(store, n_entries).await;
+
+        // materialize the paths so that the throttle time for listing is not counted
+        let paths = store.list(Some(&prefix)).collect::<Vec<_>>().await;
+        let paths = futures::stream::iter(paths)
+            .map(|x| x.map(|m| m.location))
+            .boxed();
+
+        let t0 = Instant::now();
+        store
+            .delete_stream(paths)
+            .try_collect::<Vec<_>>()
+            .await
+            .unwrap();
+
+        t0.elapsed()
+    }
+
     #[allow(dead_code)]
     #[cfg(target_os = "linux")]
     async fn measure_get(store: &ThrottledStore<InMemory>, n_bytes: Option<usize>) -> Duration {
diff --git a/tests/get_range_file.rs b/tests/get_range_file.rs
index 04317af..d39d972 100644
--- a/tests/get_range_file.rs
+++ b/tests/get_range_file.rs
@@ -62,6 +62,13 @@ impl ObjectStore for MyStore {
         todo!()
     }
 
+    fn delete_stream(
+        &self,
+        _: BoxStream<'static, Result<Path>>,
+    ) -> BoxStream<'static, Result<Path>> {
+        todo!()
+    }
+
     fn list(&self, _: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
         todo!()
     }

Reply via email to