This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git
The following commit(s) were added to refs/heads/main by this push:
new e0721f8 feat!: use `'static` lifetime for `delete_stream` (#524)
e0721f8 is described below
commit e0721f8ab278f596f6008802fbafd3d8bebdf6f1
Author: Heran Lin <[email protected]>
AuthorDate: Sat Nov 1 04:31:20 2025 +0800
feat!: use `'static` lifetime for `delete_stream` (#524)
* feat!: use `'static` lifetime for `delete_stream`
* Address comments
---
src/aws/mod.rs | 28 ++++++----
src/azure/mod.rs | 28 ++++++----
src/chunked.rs | 7 +++
src/gcp/mod.rs | 21 ++++++-
src/http/mod.rs | 18 ++++++
src/lib.rs | 143 +++++++++++++++++++++++++++++++++++++++++-------
src/limit.rs | 8 +--
src/local.rs | 143 +++++++++++++++++++++++++++++-------------------
src/memory.rs | 14 +++++
src/prefix.rs | 43 +++++++++------
src/throttle.rs | 48 ++++++++++++++++
tests/get_range_file.rs | 7 +++
12 files changed, 385 insertions(+), 123 deletions(-)
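This is a breaking change for callers: `delete_stream` now accepts and returns `BoxStream<'static, Result<Path>>`, so the input must be an owned, boxed stream rather than one borrowing from local state. A minimal caller-side sketch, assuming a Tokio runtime and using the in-memory store (which, per the memory.rs change below, treats deleting a missing path as a no-op):

```rust
use futures::stream::StreamExt;
use futures::TryStreamExt;
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::ObjectStore;

#[tokio::main]
async fn main() -> object_store::Result<()> {
    let store = InMemory::new();

    // The input stream must now be 'static: build it from owned `Path`s
    // and box it, rather than borrowing paths from the surrounding scope.
    let paths = futures::stream::iter(vec![Ok(Path::from("foo")), Ok(Path::from("bar"))]).boxed();

    let deleted: Vec<Path> = store.delete_stream(paths).try_collect().await?;
    assert_eq!(deleted, vec![Path::from("foo"), Path::from("bar")]);
    Ok(())
}
```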
diff --git a/src/aws/mod.rs b/src/aws/mod.rs
index 3cd6ca8..586d40b 100644
--- a/src/aws/mod.rs
+++ b/src/aws/mod.rs
@@ -255,20 +255,24 @@ impl ObjectStore for AmazonS3 {
Ok(())
}
- fn delete_stream<'a>(
- &'a self,
- locations: BoxStream<'a, Result<Path>>,
- ) -> BoxStream<'a, Result<Path>> {
+ fn delete_stream(
+ &self,
+ locations: BoxStream<'static, Result<Path>>,
+ ) -> BoxStream<'static, Result<Path>> {
+ let client = Arc::clone(&self.client);
locations
.try_chunks(1_000)
- .map(move |locations| async {
- // Early return the error. We ignore the paths that have already been
- // collected into the chunk.
- let locations = locations.map_err(|e| e.1)?;
- self.client
- .bulk_delete_request(locations)
- .await
- .map(futures::stream::iter)
+ .map(move |locations| {
+ let client = Arc::clone(&client);
+ async move {
+ // Early return the error. We ignore the paths that have already been
+ // collected into the chunk.
+ let locations = locations.map_err(|e| e.1)?;
+ client
+ .bulk_delete_request(locations)
+ .await
+ .map(futures::stream::iter)
+ }
})
.buffered(20)
.try_flatten()
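The same pattern recurs throughout this change: because the returned stream must be `'static`, the closures can no longer capture `&self`, so the `Arc`-wrapped client is cloned once into the adapter and again per chunk so each future owns its own handle. A stripped-down sketch of that shape with a hypothetical `Client` type (not the crate's real S3 client):

```rust
use std::sync::Arc;
use futures::stream::{BoxStream, StreamExt};

struct Client; // hypothetical stand-in for a provider client

impl Client {
    async fn delete_one(&self, id: u32) -> u32 {
        id
    }
}

fn delete_all(client: &Arc<Client>, ids: BoxStream<'static, u32>) -> BoxStream<'static, u32> {
    // Clone the Arc so the `move` closure owns it instead of borrowing `self`...
    let client = Arc::clone(client);
    ids.map(move |id| {
        // ...and clone again per item so each future is independent and 'static.
        let client = Arc::clone(&client);
        async move { client.delete_one(id).await }
    })
    .buffered(8) // bounded concurrency, results yielded in input order
    .boxed()
}
```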
diff --git a/src/azure/mod.rs b/src/azure/mod.rs
index c5344d7..237ef25 100644
--- a/src/azure/mod.rs
+++ b/src/azure/mod.rs
@@ -123,20 +123,24 @@ impl ObjectStore for MicrosoftAzure {
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
self.client.list(prefix)
}
- fn delete_stream<'a>(
- &'a self,
- locations: BoxStream<'a, Result<Path>>,
- ) -> BoxStream<'a, Result<Path>> {
+ fn delete_stream(
+ &self,
+ locations: BoxStream<'static, Result<Path>>,
+ ) -> BoxStream<'static, Result<Path>> {
+ let client = Arc::clone(&self.client);
locations
.try_chunks(256)
- .map(move |locations| async {
- // Early return the error. We ignore the paths that have already been
- // collected into the chunk.
- let locations = locations.map_err(|e| e.1)?;
- self.client
- .bulk_delete_request(locations)
- .await
- .map(futures::stream::iter)
+ .map(move |locations| {
+ let client = Arc::clone(&client);
+ async move {
+ // Early return the error. We ignore the paths that have already been
+ // collected into the chunk.
+ let locations = locations.map_err(|e| e.1)?;
+ client
+ .bulk_delete_request(locations)
+ .await
+ .map(futures::stream::iter)
+ }
})
.buffered(20)
.try_flatten()
diff --git a/src/chunked.rs b/src/chunked.rs
index 1f78415..eacb667 100644
--- a/src/chunked.rs
+++ b/src/chunked.rs
@@ -150,6 +150,13 @@ impl ObjectStore for ChunkedStore {
self.inner.delete(location).await
}
+ fn delete_stream(
+ &self,
+ locations: BoxStream<'static, Result<Path>>,
+ ) -> BoxStream<'static, Result<Path>> {
+ self.inner.delete_stream(locations)
+ }
+
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
self.inner.list(prefix)
}
diff --git a/src/gcp/mod.rs b/src/gcp/mod.rs
index 44f2c2c..0b18108 100644
--- a/src/gcp/mod.rs
+++ b/src/gcp/mod.rs
@@ -47,7 +47,7 @@ use crate::{
};
use async_trait::async_trait;
use client::GoogleCloudStorageClient;
-use futures::stream::BoxStream;
+use futures::stream::{BoxStream, StreamExt};
use http::Method;
use url::Url;
@@ -184,6 +184,24 @@ impl ObjectStore for GoogleCloudStorage {
self.client.delete_request(location).await
}
+ fn delete_stream(
+ &self,
+ locations: BoxStream<'static, Result<Path>>,
+ ) -> BoxStream<'static, Result<Path>> {
+ let client = Arc::clone(&self.client);
+ locations
+ .map(move |location| {
+ let client = Arc::clone(&client);
+ async move {
+ let location = location?;
+ client.delete_request(&location).await?;
+ Ok(location)
+ }
+ })
+ .buffered(10)
+ .boxed()
+ }
+
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
self.client.list(prefix)
}
@@ -282,7 +300,6 @@ impl PaginatedListStore for GoogleCloudStorage {
#[cfg(test)]
mod test {
-
use credential::DEFAULT_GCS_BASE_URL;
use crate::integration::*;
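GCS (like the HTTP store below) issues one delete request per path here rather than a bulk request, under `.buffered(10)`. `buffered` both bounds the number of in-flight requests and yields results in input order, which preserves the documented "same order as the input locations" contract; `buffer_unordered` would instead yield completions as they finish. A small illustration of that ordering guarantee, sketched with plain futures/tokio rather than the real client:

```rust
use std::time::Duration;
use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    // Later items finish sooner, but `buffered` still yields them in input order.
    let ordered: Vec<u32> = stream::iter(1..=5u32)
        .map(|id| async move {
            // stand-in for `client.delete_request(&location).await`
            tokio::time::sleep(Duration::from_millis(5 * (6 - id) as u64)).await;
            id
        })
        .buffered(3)
        .collect()
        .await;

    assert_eq!(ordered, vec![1, 2, 3, 4, 5]);
}
```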
diff --git a/src/http/mod.rs b/src/http/mod.rs
index d573125..cd6311a 100644
--- a/src/http/mod.rs
+++ b/src/http/mod.rs
@@ -136,6 +136,24 @@ impl ObjectStore for HttpStore {
self.client.delete(location).await
}
+ fn delete_stream(
+ &self,
+ locations: BoxStream<'static, Result<Path>>,
+ ) -> BoxStream<'static, Result<Path>> {
+ let client = Arc::clone(&self.client);
+ locations
+ .map(move |location| {
+ let client = Arc::clone(&client);
+ async move {
+ let location = location?;
+ client.delete(&location).await?;
+ Ok(location)
+ }
+ })
+ .buffered(10)
+ .boxed()
+ }
+
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
let prefix_len = prefix.map(|p| p.as_ref().len()).unwrap_or_default();
let prefix = prefix.cloned();
diff --git a/src/lib.rs b/src/lib.rs
index 7bd3c99..2e059fc 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -873,9 +873,8 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
/// Delete all the objects at the specified locations
///
/// When supported, this method will use bulk operations that delete more
- /// than one object per a request. The default implementation will call
- /// the single object delete method for each location, but with up to 10
- /// concurrent requests.
+ /// than one object per a request. Otherwise, the implementation may call
+ /// the single object delete method for each location.
///
/// The returned stream yields the results of the delete operations in the
/// same order as the input locations. However, some errors will be from
@@ -911,19 +910,123 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
/// # let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
/// # rt.block_on(example()).unwrap();
/// ```
- fn delete_stream<'a>(
- &'a self,
- locations: BoxStream<'a, Result<Path>>,
- ) -> BoxStream<'a, Result<Path>> {
- locations
- .map(|location| async {
- let location = location?;
- self.delete(&location).await?;
- Ok(location)
- })
- .buffered(10)
- .boxed()
- }
+ ///
+ /// Note: Before version 0.13, `delete_stream` has a default implementation
+ /// that deletes each object with up to 10 concurrent requests. This default
+ /// behavior has been removed, and each implementation must now provide its
+ /// own `delete_stream` implementation explicitly. The following example
+ /// shows how to implement `delete_stream` to get the previous default
+ /// behavior.
+ ///
+ /// ```
+ /// # use async_trait::async_trait;
+ /// # use futures::stream::{BoxStream, StreamExt};
+ /// # use object_store::path::Path;
+ /// # use object_store::{
+ /// # GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
+ /// # PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
+ /// # };
+ /// # use std::fmt;
+ /// # use std::fmt::Debug;
+ /// # use std::sync::Arc;
+ /// #
+ /// # struct ExampleClient;
+ /// #
+ /// # impl ExampleClient {
+ /// # async fn delete(&self, _path: &Path) -> Result<()> {
+ /// # Ok(())
+ /// # }
+ /// # }
+ /// #
+ /// # struct ExampleStore {
+ /// # client: Arc<ExampleClient>,
+ /// # }
+ /// #
+ /// # impl Debug for ExampleStore {
+ /// # fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ /// # write!(f, "ExampleStore")
+ /// # }
+ /// # }
+ /// #
+ /// # impl fmt::Display for ExampleStore {
+ /// # fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ /// # write!(f, "ExampleStore")
+ /// # }
+ /// # }
+ /// #
+ /// # #[async_trait]
+ /// # impl ObjectStore for ExampleStore {
+ /// # async fn put_opts(&self, _: &Path, _: PutPayload, _: PutOptions) -> Result<PutResult> {
+ /// # todo!()
+ /// # }
+ /// #
+ /// # async fn put_multipart_opts(
+ /// # &self,
+ /// # _: &Path,
+ /// # _: PutMultipartOptions,
+ /// # ) -> Result<Box<dyn MultipartUpload>> {
+ /// # todo!()
+ /// # }
+ /// #
+ /// # async fn get_opts(&self, _: &Path, _: GetOptions) -> Result<GetResult> {
+ /// # todo!()
+ /// # }
+ /// #
+ /// # async fn delete(&self, _: &Path) -> Result<()> {
+ /// # todo!()
+ /// # }
+ /// #
+ /// fn delete_stream(
+ /// &self,
+ /// locations: BoxStream<'static, Result<Path>>,
+ /// ) -> BoxStream<'static, Result<Path>> {
+ /// let client = Arc::clone(&self.client);
+ /// locations
+ /// .map(move |location| {
+ /// let client = Arc::clone(&client);
+ /// async move {
+ /// let location = location?;
+ /// client.delete(&location).await?;
+ /// Ok(location)
+ /// }
+ /// })
+ /// .buffered(10)
+ /// .boxed()
+ /// }
+ /// #
+ /// # fn list(&self, _: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
+ /// # todo!()
+ /// # }
+ /// #
+ /// # async fn list_with_delimiter(&self, _: Option<&Path>) -> Result<ListResult> {
+ /// # todo!()
+ /// # }
+ /// #
+ /// # async fn copy(&self, _: &Path, _: &Path) -> Result<()> {
+ /// # todo!()
+ /// # }
+ /// #
+ /// # async fn copy_if_not_exists(&self, _: &Path, _: &Path) -> Result<()> {
+ /// # todo!()
+ /// # }
+ /// # }
+ /// #
+ /// # async fn example() {
+ /// # let store = ExampleStore { client: Arc::new(ExampleClient) };
+ /// # let paths = futures::stream::iter(vec![Ok(Path::from("foo")), Ok(Path::from("bar"))]).boxed();
+ /// # let results = store.delete_stream(paths).collect::<Vec<_>>().await;
+ /// # assert_eq!(results.len(), 2);
+ /// # assert_eq!(results[0].as_ref().unwrap(), &Path::from("foo"));
+ /// # assert_eq!(results[1].as_ref().unwrap(), &Path::from("bar"));
+ /// # }
+ /// #
+ /// # let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
+ /// # rt.block_on(example());
+ /// ```
+ fn delete_stream(
+ &self,
+ locations: BoxStream<'static, Result<Path>>,
+ ) -> BoxStream<'static, Result<Path>>;
/// List all the objects with the given prefix.
///
@@ -1053,10 +1156,10 @@ macro_rules! as_ref_impl {
self.as_ref().delete(location).await
}
- fn delete_stream<'a>(
- &'a self,
- locations: BoxStream<'a, Result<Path>>,
- ) -> BoxStream<'a, Result<Path>> {
+ fn delete_stream(
+ &self,
+ locations: BoxStream<'static, Result<Path>>,
+ ) -> BoxStream<'static, Result<Path>> {
self.as_ref().delete_stream(locations)
}
diff --git a/src/limit.rs b/src/limit.rs
index ddafa92..7102f06 100644
--- a/src/limit.rs
+++ b/src/limit.rs
@@ -137,10 +137,10 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
self.inner.delete(location).await
}
- fn delete_stream<'a>(
- &'a self,
- locations: BoxStream<'a, Result<Path>>,
- ) -> BoxStream<'a, Result<Path>> {
+ fn delete_stream(
+ &self,
+ locations: BoxStream<'static, Result<Path>>,
+ ) -> BoxStream<'static, Result<Path>> {
self.inner.delete_stream(locations)
}
diff --git a/src/local.rs b/src/local.rs
index 9294424..31db111 100644
--- a/src/local.rs
+++ b/src/local.rs
@@ -247,28 +247,7 @@ impl LocalFileSystem {
/// Return an absolute filesystem path of the given file location
pub fn path_to_filesystem(&self, location: &Path) -> Result<PathBuf> {
- if !is_valid_file_path(location) {
- let path = location.as_ref().into();
- let error = Error::InvalidPath { path };
- return Err(error.into());
- }
-
- let path = self.config.prefix_to_filesystem(location)?;
-
- #[cfg(target_os = "windows")]
- let path = {
- let path = path.to_string_lossy();
-
- // Assume the first char is the drive letter and the next is a colon.
- let mut out = String::new();
- let drive = &path[..2]; // The drive letter and colon (e.g., "C:")
- let filepath = &path[2..].replace(':', "%3A"); // Replace subsequent colons
- out.push_str(drive);
- out.push_str(filepath);
- PathBuf::from(out)
- };
-
- Ok(path)
+ self.config.path_to_filesystem(location)
}
/// Enable automatic cleanup of empty directories when deleting files
@@ -293,6 +272,32 @@ impl Config {
.map_err(|_| Error::InvalidUrl { url }.into())
}
+ /// Return an absolute filesystem path of the given file location
+ fn path_to_filesystem(&self, location: &Path) -> Result<PathBuf> {
+ if !is_valid_file_path(location) {
+ let path = location.as_ref().into();
+ let error = Error::InvalidPath { path };
+ return Err(error.into());
+ }
+
+ let path = self.prefix_to_filesystem(location)?;
+
+ #[cfg(target_os = "windows")]
+ let path = {
+ let path = path.to_string_lossy();
+
+ // Assume the first char is the drive letter and the next is a colon.
+ let mut out = String::new();
+ let drive = &path[..2]; // The drive letter and colon (e.g., "C:")
+ let filepath = &path[2..].replace(':', "%3A"); // Replace subsequent colons
+ out.push_str(drive);
+ out.push_str(filepath);
+ PathBuf::from(out)
+ };
+
+ Ok(path)
+ }
+
/// Resolves the provided absolute filesystem path to a [`Path`] prefix
fn filesystem_to_path(&self, location: &std::path::Path) -> Result<Path> {
Ok(Path::from_absolute_path_with_base(
@@ -449,41 +454,33 @@ impl ObjectStore for LocalFileSystem {
async fn delete(&self, location: &Path) -> Result<()> {
let config = Arc::clone(&self.config);
- let path = self.path_to_filesystem(location)?;
- let automactic_cleanup = self.automatic_cleanup;
- maybe_spawn_blocking(move || {
- if let Err(e) = std::fs::remove_file(&path) {
- Err(match e.kind() {
- ErrorKind::NotFound => Error::NotFound { path, source: e }.into(),
- _ => Error::UnableToDeleteFile { path, source: e }.into(),
- })
- } else if automactic_cleanup {
- let root = &config.root;
- let root = root
- .to_file_path()
- .map_err(|_| Error::InvalidUrl { url: root.clone() })?;
-
- // here we will try to traverse up and delete an empty dir if possible until we reach the root or get an error
- let mut parent = path.parent();
-
- while let Some(loc) = parent {
- if loc != root && std::fs::remove_dir(loc).is_ok() {
- parent = loc.parent();
- } else {
- break;
- }
- }
+ let automatic_cleanup = self.automatic_cleanup;
+ let location = location.clone();
+ maybe_spawn_blocking(move || Self::delete_location(config, automatic_cleanup, &location))
+ .await
+ }
- Ok(())
- } else {
- Ok(())
- }
- })
- .await
+ fn delete_stream(
+ &self,
+ locations: BoxStream<'static, Result<Path>>,
+ ) -> BoxStream<'static, Result<Path>> {
+ let config = Arc::clone(&self.config);
+ let automatic_cleanup = self.automatic_cleanup;
+ locations
+ .map(move |location| {
+ let config = Arc::clone(&config);
+ maybe_spawn_blocking(move || {
+ let location = location?;
+ Self::delete_location(config, automatic_cleanup, &location)?;
+ Ok(location)
+ })
+ })
+ .buffered(10)
+ .boxed()
}
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
- self.list_with_maybe_offset(prefix, None)
+ Self::list_with_maybe_offset(Arc::clone(&self.config), prefix, None)
}
fn list_with_offset(
@@ -491,7 +488,7 @@ impl ObjectStore for LocalFileSystem {
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'static, Result<ObjectMeta>> {
- self.list_with_maybe_offset(prefix, Some(offset))
+ Self::list_with_maybe_offset(Arc::clone(&self.config), prefix, Some(offset))
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
@@ -629,13 +626,45 @@ impl ObjectStore for LocalFileSystem {
}
impl LocalFileSystem {
+ fn delete_location(
+ config: Arc<Config>,
+ automatic_cleanup: bool,
+ location: &Path,
+ ) -> Result<()> {
+ let path = config.path_to_filesystem(location)?;
+ if let Err(e) = std::fs::remove_file(&path) {
+ Err(match e.kind() {
+ ErrorKind::NotFound => Error::NotFound { path, source: e }.into(),
+ _ => Error::UnableToDeleteFile { path, source: e }.into(),
+ })
+ } else if automatic_cleanup {
+ let root = &config.root;
+ let root = root
+ .to_file_path()
+ .map_err(|_| Error::InvalidUrl { url: root.clone() })?;
+
+ // here we will try to traverse up and delete an empty dir if possible until we reach the root or get an error
+ let mut parent = path.parent();
+
+ while let Some(loc) = parent {
+ if loc != root && std::fs::remove_dir(loc).is_ok() {
+ parent = loc.parent();
+ } else {
+ break;
+ }
+ }
+
+ Ok(())
+ } else {
+ Ok(())
+ }
+ }
+
fn list_with_maybe_offset(
- &self,
+ config: Arc<Config>,
prefix: Option<&Path>,
maybe_offset: Option<&Path>,
) -> BoxStream<'static, Result<ObjectMeta>> {
- let config = Arc::clone(&self.config);
-
let root_path = match prefix {
Some(prefix) => match config.prefix_to_filesystem(prefix) {
Ok(path) => path,
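For the local filesystem the delete itself is a blocking syscall, so the new `delete_stream` wraps each `delete_location` call in `maybe_spawn_blocking` and buffers ten of those futures at a time. Roughly the same shape, sketched with plain `tokio::task::spawn_blocking` instead of the crate-internal helper used above:

```rust
use std::path::PathBuf;
use futures::stream::{self, BoxStream, StreamExt};

fn delete_files(paths: Vec<PathBuf>) -> BoxStream<'static, std::io::Result<PathBuf>> {
    stream::iter(paths)
        .map(|path| async move {
            // Run the blocking remove off the async worker threads.
            tokio::task::spawn_blocking(move || -> std::io::Result<PathBuf> {
                std::fs::remove_file(&path)?;
                Ok(path)
            })
            .await
            .expect("blocking delete task panicked")
        })
        .buffered(10)
        .boxed()
}
```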
diff --git a/src/memory.rs b/src/memory.rs
index 517cb1d..55d75fd 100644
--- a/src/memory.rs
+++ b/src/memory.rs
@@ -311,6 +311,20 @@ impl ObjectStore for InMemory {
Ok(())
}
+ fn delete_stream(
+ &self,
+ locations: BoxStream<'static, Result<Path>>,
+ ) -> BoxStream<'static, Result<Path>> {
+ let storage = Arc::clone(&self.storage);
+ locations
+ .map(move |location| {
+ let location = location?;
+ storage.write().map.remove(&location);
+ Ok(location)
+ })
+ .boxed()
+ }
+
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
let root = Path::default();
let prefix = prefix.unwrap_or(&root);
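The in-memory store can delete synchronously, so no `buffered` stage is needed: it clones the `Arc` around the shared map into the closure and removes entries as the paths stream through. The same shape sketched with plain `std::sync` types (the crate's actual storage type differs):

```rust
use std::collections::BTreeMap;
use std::sync::{Arc, RwLock};
use futures::stream::{BoxStream, StreamExt};

type Storage = Arc<RwLock<BTreeMap<String, Vec<u8>>>>;

fn delete_stream(storage: &Storage, keys: BoxStream<'static, String>) -> BoxStream<'static, String> {
    let storage = Arc::clone(storage);
    keys.map(move |key| {
        // Synchronous removal under the lock; nothing to await.
        storage.write().unwrap().remove(&key);
        key
    })
    .boxed()
}
```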
diff --git a/src/prefix.rs b/src/prefix.rs
index 160fbbf..a455826 100644
--- a/src/prefix.rs
+++ b/src/prefix.rs
@@ -50,33 +50,28 @@ impl<T: ObjectStore> PrefixStore<T> {
/// Create the full path from a path relative to prefix
fn full_path(&self, location: &Path) -> Path {
- self.prefix.parts().chain(location.parts()).collect()
+ full_path(&self.prefix, location)
}
/// Strip the constant prefix from a given path
fn strip_prefix(&self, path: Path) -> Path {
- // Note cannot use match because of borrow checker
- if let Some(suffix) = path.prefix_match(&self.prefix) {
- return suffix.collect();
- }
- path
+ strip_prefix(&self.prefix, path)
}
/// Strip the constant prefix from a given ObjectMeta
fn strip_meta(&self, meta: ObjectMeta) -> ObjectMeta {
- ObjectMeta {
- last_modified: meta.last_modified,
- size: meta.size,
- location: self.strip_prefix(meta.location),
- e_tag: meta.e_tag,
- version: None,
- }
+ strip_meta(&self.prefix, meta)
}
}
-// Note: This is a relative hack to move these two functions to pure functions so they don't rely
-// on the `self` lifetime. Expected to be cleaned up before merge.
-//
+// Note: This is a relative hack to move these functions to pure functions so they don't rely
+// on the `self` lifetime.
+
+/// Create the full path from a path relative to prefix
+fn full_path(prefix: &Path, path: &Path) -> Path {
+ prefix.parts().chain(path.parts()).collect()
+}
+
/// Strip the constant prefix from a given path
fn strip_prefix(prefix: &Path, path: Path) -> Path {
// Note cannot use match because of borrow checker
@@ -96,6 +91,7 @@ fn strip_meta(prefix: &Path, meta: ObjectMeta) -> ObjectMeta {
version: None,
}
}
+
#[async_trait::async_trait]
impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
@@ -158,6 +154,21 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
self.inner.delete(&full_path).await
}
+ fn delete_stream(
+ &self,
+ locations: BoxStream<'static, Result<Path>>,
+ ) -> BoxStream<'static, Result<Path>> {
+ let prefix = self.prefix.clone();
+ let locations = locations
+ .map(move |location| location.map(|loc| full_path(&prefix, &loc)))
+ .boxed();
+ let prefix = self.prefix.clone();
+ self.inner
+ .delete_stream(locations)
+ .map(move |location| location.map(|loc| strip_prefix(&prefix, loc)))
+ .boxed()
+ }
+
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
let prefix = self.full_path(prefix.unwrap_or(&Path::default()));
let s = self.inner.list(Some(&prefix));
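`PrefixStore` needed `full_path`/`strip_prefix`/`strip_meta` as free functions because the `'static` stream can no longer borrow `self.prefix`; instead the prefix is cloned into the closures, joined onto each incoming path and stripped from each result. A quick round trip through the two path helpers, with bodies copied from the free functions above:

```rust
use object_store::path::Path;

/// Join a path onto the configured prefix (mirrors `full_path` above).
fn full_path(prefix: &Path, location: &Path) -> Path {
    prefix.parts().chain(location.parts()).collect()
}

/// Strip the prefix back off, falling back to the input on a mismatch
/// (mirrors `strip_prefix` above).
fn strip_prefix(prefix: &Path, path: Path) -> Path {
    if let Some(suffix) = path.prefix_match(prefix) {
        return suffix.collect();
    }
    path
}

fn main() {
    let prefix = Path::from("tenant-a");
    let location = Path::from("data/file.parquet");

    let full = full_path(&prefix, &location);
    assert_eq!(full.as_ref(), "tenant-a/data/file.parquet");
    assert_eq!(strip_prefix(&prefix, full).as_ref(), "data/file.parquet");
}
```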
diff --git a/src/throttle.rs b/src/throttle.rs
index 85a9186..9419ae8 100644
--- a/src/throttle.rs
+++ b/src/throttle.rs
@@ -237,6 +237,19 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
self.inner.delete(location).await
}
+ fn delete_stream(
+ &self,
+ locations: BoxStream<'static, Result<Path>>,
+ ) -> BoxStream<'static, Result<Path>> {
+ // We wait for a certain duration before each delete location.
+ // This may be suboptimal if the inner store implements batch deletes.
+ // But there is no way around unnecessary waits since we do not know
+ // how the inner store implements `delete_stream`.
+ let wait_delete_per_call = self.config().wait_delete_per_call;
+ let locations = throttle_stream(locations, move |_| wait_delete_per_call);
+ self.inner.delete_stream(locations)
+ }
+
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
let stream = self.inner.list(prefix);
let config = Arc::clone(&self.config);
@@ -456,6 +469,21 @@ mod tests {
assert_bounds!(measure_delete(&store, Some(10)).await, 1);
}
+ #[tokio::test]
+ // macos github runner is so slow it can't complete within WAIT_TIME*2
+ #[cfg(target_os = "linux")]
+ async fn delete_stream_test() {
+ let inner = InMemory::new();
+ let store = ThrottledStore::new(inner, ThrottleConfig::default());
+
+ assert_bounds!(measure_delete_stream(&store, 0).await, 0);
+ assert_bounds!(measure_delete_stream(&store, 10).await, 0);
+
+ store.config_mut(|cfg| cfg.wait_delete_per_call = WAIT_TIME);
+ assert_bounds!(measure_delete_stream(&store, 0).await, 0);
+ assert_bounds!(measure_delete_stream(&store, 10).await, 10);
+ }
+
#[tokio::test]
// macos github runner is so slow it can't complete within WAIT_TIME*2
#[cfg(target_os = "linux")]
@@ -598,6 +626,26 @@ mod tests {
t0.elapsed()
}
+ #[allow(dead_code)]
+ async fn measure_delete_stream(store: &ThrottledStore<InMemory>, n_entries: usize) -> Duration {
+ let prefix = place_test_objects(store, n_entries).await;
+
+ // materialize the paths so that the throttle time for listing is not counted
+ let paths = store.list(Some(&prefix)).collect::<Vec<_>>().await;
+ let paths = futures::stream::iter(paths)
+ .map(|x| x.map(|m| m.location))
+ .boxed();
+
+ let t0 = Instant::now();
+ store
+ .delete_stream(paths)
+ .try_collect::<Vec<_>>()
+ .await
+ .unwrap();
+
+ t0.elapsed()
+ }
+
#[allow(dead_code)]
#[cfg(target_os = "linux")]
async fn measure_get(store: &ThrottledStore<InMemory>, n_bytes: Option<usize>) -> Duration {
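The throttled store pre-throttles the input stream (via the crate-internal `throttle_stream`) before delegating, since it cannot know whether the inner store batches deletes. The general per-item throttling idea, sketched with `StreamExt::then` and tokio's timer rather than the crate's helper:

```rust
use std::time::Duration;
use futures::stream::{BoxStream, StreamExt};

fn throttle<T: Send + 'static>(
    input: BoxStream<'static, T>,
    wait: Duration,
) -> BoxStream<'static, T> {
    input
        .then(move |item| async move {
            // Impose a fixed delay before yielding each item downstream.
            tokio::time::sleep(wait).await;
            item
        })
        .boxed()
}
```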
diff --git a/tests/get_range_file.rs b/tests/get_range_file.rs
index 04317af..d39d972 100644
--- a/tests/get_range_file.rs
+++ b/tests/get_range_file.rs
@@ -62,6 +62,13 @@ impl ObjectStore for MyStore {
todo!()
}
+ fn delete_stream(
+ &self,
+ _: BoxStream<'static, Result<Path>>,
+ ) -> BoxStream<'static, Result<Path>> {
+ todo!()
+ }
+
fn list(&self, _: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
todo!()
}