This is an automated email from the ASF dual-hosted git repository.

mneumann pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git


The following commit(s) were added to refs/heads/main by this push:
     new 92b1378  refactor!: move `delete` to `ObjectStoreExt` (#549)
92b1378 is described below

commit 92b13782bda33805c6c0cc0e8d95656f8aa61cd0
Author: Marco Neumann <[email protected]>
AuthorDate: Fri Dec 12 15:05:01 2025 +0100

    refactor!: move `delete` to `ObjectStoreExt` (#549)
    
    * refactor!: move `delete` to `ObjectStoreExt`
    
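    A single-object `delete` is really just a bulk delete (`delete_stream`)
    with a single entry, so it is now provided by `ObjectStoreExt` on top of
    `delete_stream` instead of being a required `ObjectStore` method.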
    
    Part of #385.
    
    * refactor: improve error messages
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 src/aws/client.rs       |  7 ++++++-
 src/aws/mod.rs          |  5 -----
 src/azure/client.rs     | 42 +++---------------------------------------
 src/azure/credential.rs |  1 -
 src/azure/mod.rs        |  5 +----
 src/chunked.rs          |  4 ----
 src/gcp/mod.rs          |  4 ----
 src/http/mod.rs         |  4 ----
 src/lib.rs              | 32 +++++++++++++++++++++-----------
 src/limit.rs            | 14 ++++++++------
 src/local.rs            | 10 +---------
 src/memory.rs           |  5 -----
 src/prefix.rs           |  5 -----
 src/throttle.rs         | 11 ++++-------
 tests/get_range_file.rs |  4 ----
 15 files changed, 44 insertions(+), 109 deletions(-)

diff --git a/src/aws/client.rs b/src/aws/client.rs
index 150a47c..bd9618e 100644
--- a/src/aws/client.rs
+++ b/src/aws/client.rs
@@ -69,6 +69,7 @@ pub(crate) enum Error {
     #[error("Error performing DeleteObjects request: {}", source)]
     DeleteObjectsRequest {
         source: crate::client::retry::RetryError,
+        paths: Vec<String>,
     },
 
     #[error(
@@ -127,6 +128,7 @@ impl From<Error> for crate::Error {
     fn from(err: Error) -> Self {
         match err {
             Error::CompleteMultipartRequest { source, path } => source.error(STORE, path),
+            Error::DeleteObjectsRequest { source, paths } => source.error(STORE, paths.join(",")),
             _ => Self::Generic {
                 store: STORE,
                 source: Box::new(err),
@@ -551,7 +553,10 @@ impl S3Client {
             .with_aws_sigv4(credential.authorizer(), Some(digest.as_ref()))
             .send_retry(&self.config.retry_config)
             .await
-            .map_err(|source| Error::DeleteObjectsRequest { source })?
+            .map_err(|source| Error::DeleteObjectsRequest {
+                source,
+                paths: paths.iter().map(|p| p.to_string()).collect(),
+            })?
             .into_body()
             .bytes()
             .await
diff --git a/src/aws/mod.rs b/src/aws/mod.rs
index cb66f9d..dd2cf6f 100644
--- a/src/aws/mod.rs
+++ b/src/aws/mod.rs
@@ -250,11 +250,6 @@ impl ObjectStore for AmazonS3 {
         self.client.get_opts(location, options).await
     }
 
-    async fn delete(&self, location: &Path) -> Result<()> {
-        self.client.request(Method::DELETE, location).send().await?;
-        Ok(())
-    }
-
     fn delete_stream(
         &self,
         locations: BoxStream<'static, Result<Path>>,
diff --git a/src/azure/client.rs b/src/azure/client.rs
index bce2c24..54ab307 100644
--- a/src/azure/client.rs
+++ b/src/azure/client.rs
@@ -74,12 +74,6 @@ pub(crate) enum Error {
         path: String,
     },
 
-    #[error("Error performing delete request {}: {}", path, source)]
-    DeleteRequest {
-        source: crate::client::retry::RetryError,
-        path: String,
-    },
-
     #[error("Error performing bulk delete request: {}", source)]
     BulkDeleteRequest {
         source: crate::client::retry::RetryError,
@@ -150,9 +144,9 @@ pub(crate) enum Error {
 impl From<Error> for crate::Error {
     fn from(err: Error) -> Self {
         match err {
-            Error::GetRequest { source, path }
-            | Error::DeleteRequest { source, path }
-            | Error::PutRequest { source, path } => source.error(STORE, path),
+            Error::GetRequest { source, path } | Error::PutRequest { source, path } => {
+                source.error(STORE, path)
+            }
             _ => Self::Generic {
                 store: STORE,
                 source: Box::new(err),
@@ -627,36 +621,6 @@ impl AzureClient {
             .map_err(|source| Error::Metadata { source })?)
     }
 
-    /// Make an Azure Delete request <https://docs.microsoft.com/en-us/rest/api/storageservices/delete-blob>
-    pub(crate) async fn delete_request<T: Serialize + ?Sized + Sync>(
-        &self,
-        path: &Path,
-        query: &T,
-    ) -> Result<()> {
-        let credential = self.get_credential().await?;
-        let url = self.config.path_url(path);
-
-        let sensitive = credential
-            .as_deref()
-            .map(|c| c.sensitive_request())
-            .unwrap_or_default();
-        self.client
-            .delete(url.as_str())
-            .query(query)
-            .header(&DELETE_SNAPSHOTS, "include")
-            .with_azure_authorization(&credential, &self.config.account)
-            .retryable(&self.config.retry_config)
-            .sensitive(sensitive)
-            .send()
-            .await
-            .map_err(|source| {
-                let path = path.as_ref().into();
-                Error::DeleteRequest { source, path }
-            })?;
-
-        Ok(())
-    }
-
     fn build_bulk_delete_body(
         &self,
         boundary: &str,
diff --git a/src/azure/credential.rs b/src/azure/credential.rs
index ae630a6..dcc6cdd 100644
--- a/src/azure/credential.rs
+++ b/src/azure/credential.rs
@@ -47,7 +47,6 @@ use url::Url;
 static AZURE_VERSION: HeaderValue = HeaderValue::from_static("2023-11-03");
 static VERSION: HeaderName = HeaderName::from_static("x-ms-version");
 pub(crate) static BLOB_TYPE: HeaderName = HeaderName::from_static("x-ms-blob-type");
-pub(crate) static DELETE_SNAPSHOTS: HeaderName = HeaderName::from_static("x-ms-delete-snapshots");
 pub(crate) static COPY_SOURCE: HeaderName = HeaderName::from_static("x-ms-copy-source");
 static CONTENT_MD5: HeaderName = HeaderName::from_static("content-md5");
 static PARTNER_TOKEN: HeaderName = HeaderName::from_static("x-ms-partner-token");
diff --git a/src/azure/mod.rs b/src/azure/mod.rs
index d22ffcf..04c8f31 100644
--- a/src/azure/mod.rs
+++ b/src/azure/mod.rs
@@ -117,13 +117,10 @@ impl ObjectStore for MicrosoftAzure {
         self.client.get_opts(location, options).await
     }
 
-    async fn delete(&self, location: &Path) -> Result<()> {
-        self.client.delete_request(location, &()).await
-    }
-
     fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
         self.client.list(prefix)
     }
+
     fn delete_stream(
         &self,
         locations: BoxStream<'static, Result<Path>>,
diff --git a/src/chunked.rs b/src/chunked.rs
index 49632ed..b362366 100644
--- a/src/chunked.rs
+++ b/src/chunked.rs
@@ -139,10 +139,6 @@ impl ObjectStore for ChunkedStore {
         self.inner.get_ranges(location, ranges).await
     }
 
-    async fn delete(&self, location: &Path) -> Result<()> {
-        self.inner.delete(location).await
-    }
-
     fn delete_stream(
         &self,
         locations: BoxStream<'static, Result<Path>>,
diff --git a/src/gcp/mod.rs b/src/gcp/mod.rs
index 270b89a..2fb74b4 100644
--- a/src/gcp/mod.rs
+++ b/src/gcp/mod.rs
@@ -181,10 +181,6 @@ impl ObjectStore for GoogleCloudStorage {
         self.client.get_opts(location, options).await
     }
 
-    async fn delete(&self, location: &Path) -> Result<()> {
-        self.client.delete_request(location).await
-    }
-
     fn delete_stream(
         &self,
         locations: BoxStream<'static, Result<Path>>,
diff --git a/src/http/mod.rs b/src/http/mod.rs
index 673419c..e241002 100644
--- a/src/http/mod.rs
+++ b/src/http/mod.rs
@@ -132,10 +132,6 @@ impl ObjectStore for HttpStore {
         self.client.get_opts(location, options).await
     }
 
-    async fn delete(&self, location: &Path) -> Result<()> {
-        self.client.delete(location).await
-    }
-
     fn delete_stream(
         &self,
         locations: BoxStream<'static, Result<Path>>,
diff --git a/src/lib.rs b/src/lib.rs
index b0cf542..6ffdf6d 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -853,9 +853,6 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
         .await
     }
 
-    /// Delete the object at the specified location.
-    async fn delete(&self, location: &Path) -> Result<()>;
-
     /// Delete all the objects at the specified locations
     ///
     /// When supported, this method will use bulk operations that delete more
@@ -958,10 +955,6 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
     /// #         todo!()
     /// #     }
     /// #
-    /// #     async fn delete(&self, _: &Path) -> Result<()> {
-    /// #         todo!()
-    /// #     }
-    /// #
     /// fn delete_stream(
     ///     &self,
     ///     locations: BoxStream<'static, Result<Path>>,
@@ -1107,10 +1100,6 @@ macro_rules! as_ref_impl {
                 self.as_ref().get_ranges(location, ranges).await
             }
 
-            async fn delete(&self, location: &Path) -> Result<()> {
-                self.as_ref().delete(location).await
-            }
-
             fn delete_stream(
                 &self,
                 locations: BoxStream<'static, Result<Path>>,
@@ -1250,6 +1239,9 @@ pub trait ObjectStoreExt: ObjectStore {
     /// Return the metadata for the specified location
     fn head(&self, location: &Path) -> impl Future<Output = Result<ObjectMeta>>;
 
+    /// Delete the object at the specified location.
+    fn delete(&self, location: &Path) -> impl Future<Output = Result<()>>;
+
     /// Copy an object from one path to another in the same object store.
     ///
     /// If there exists an object at the destination, it will be overwritten.
@@ -1306,6 +1298,24 @@ where
         Ok(self.get_opts(location, options).await?.meta)
     }
 
+    async fn delete(&self, location: &Path) -> Result<()> {
+        let location = location.clone();
+        let mut stream =
+            self.delete_stream(futures::stream::once(async move { Ok(location) }).boxed());
+        let _path = stream.try_next().await?.ok_or_else(|| Error::Generic {
+            store: "ext",
+            source: "`delete_stream` with one location should yield once but didn't".into(),
+        })?;
+        if stream.next().await.is_some() {
+            Err(Error::Generic {
+                store: "ext",
+                source: "`delete_stream` with one location expected to yield exactly once, but yielded more than once".into(),
+            })
+        } else {
+            Ok(())
+        }
+    }
+
     async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
         let options = CopyOptions::new().with_mode(CopyMode::Overwrite);
         self.copy_opts(from, to, options).await
diff --git a/src/limit.rs b/src/limit.rs
index 7fddd63..30fe2b6 100644
--- a/src/limit.rs
+++ b/src/limit.rs
@@ -105,16 +105,18 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
         self.inner.get_ranges(location, ranges).await
     }
 
-    async fn delete(&self, location: &Path) -> Result<()> {
-        let _permit = self.semaphore.acquire().await.unwrap();
-        self.inner.delete(location).await
-    }
-
     fn delete_stream(
         &self,
         locations: BoxStream<'static, Result<Path>>,
     ) -> BoxStream<'static, Result<Path>> {
-        self.inner.delete_stream(locations)
+        let inner = Arc::clone(&self.inner);
+        let fut = Arc::clone(&self.semaphore)
+            .acquire_owned()
+            .map(move |permit| {
+                let s = inner.delete_stream(locations);
+                PermitWrapper::new(s, permit.unwrap())
+            });
+        fut.into_stream().flatten().boxed()
     }
 
     fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
diff --git a/src/local.rs b/src/local.rs
index 5b46a7d..003e7d7 100644
--- a/src/local.rs
+++ b/src/local.rs
@@ -40,7 +40,7 @@ use crate::{
     path::{Path, absolute_path_to_url},
     util::InvalidGetRange,
 };
-use crate::{CopyMode, CopyOptions, RenameOptions, RenameTargetMode};
+use crate::{CopyMode, CopyOptions, ObjectStoreExt, RenameOptions, RenameTargetMode};
 
 /// A specialized `Error` for filesystem object store-related errors
 #[derive(Debug, thiserror::Error)]
@@ -444,14 +444,6 @@ impl ObjectStore for LocalFileSystem {
         .await
     }
 
-    async fn delete(&self, location: &Path) -> Result<()> {
-        let config = Arc::clone(&self.config);
-        let automatic_cleanup = self.automatic_cleanup;
-        let location = location.clone();
-        maybe_spawn_blocking(move || Self::delete_location(config, automatic_cleanup, &location))
-            .await
-    }
-
     fn delete_stream(
         &self,
         locations: BoxStream<'static, Result<Path>>,
diff --git a/src/memory.rs b/src/memory.rs
index 08e41c2..f026907 100644
--- a/src/memory.rs
+++ b/src/memory.rs
@@ -294,11 +294,6 @@ impl ObjectStore for InMemory {
             .collect()
     }
 
-    async fn delete(&self, location: &Path) -> Result<()> {
-        self.storage.write().map.remove(location);
-        Ok(())
-    }
-
     fn delete_stream(
         &self,
         locations: BoxStream<'static, Result<Path>>,
diff --git a/src/prefix.rs b/src/prefix.rs
index 52173dd..cecf03f 100644
--- a/src/prefix.rs
+++ b/src/prefix.rs
@@ -124,11 +124,6 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
         self.inner.get_ranges(&full_path, ranges).await
     }
 
-    async fn delete(&self, location: &Path) -> Result<()> {
-        let full_path = self.full_path(location);
-        self.inner.delete(&full_path).await
-    }
-
     fn delete_stream(
         &self,
         locations: BoxStream<'static, Result<Path>>,
diff --git a/src/throttle.rs b/src/throttle.rs
index 3820608..1fc90d7 100644
--- a/src/throttle.rs
+++ b/src/throttle.rs
@@ -34,10 +34,13 @@ use std::time::Duration;
 /// Configuration settings for throttled store
 #[derive(Debug, Default, Clone, Copy)]
 pub struct ThrottleConfig {
-    /// Sleep duration for every call to [`delete`](ThrottledStore::delete).
+    /// Sleep duration for every call to [`delete`], or every element in [`delete_stream`].
     ///
     /// Sleeping is done before the underlying store is called and independently of the success of
     /// the operation.
+    ///
+    /// [`delete`]: crate::ObjectStoreExt::delete
+    /// [`delete_stream`]: ThrottledStore::delete_stream
     pub wait_delete_per_call: Duration,
 
     /// Sleep duration for every byte received during [`get_opts`](ThrottledStore::get_opts).
@@ -193,12 +196,6 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
         self.inner.get_ranges(location, ranges).await
     }
 
-    async fn delete(&self, location: &Path) -> Result<()> {
-        sleep(self.config().wait_delete_per_call).await;
-
-        self.inner.delete(location).await
-    }
-
     fn delete_stream(
         &self,
         locations: BoxStream<'static, Result<Path>>,
diff --git a/tests/get_range_file.rs b/tests/get_range_file.rs
index df0a414..95027eb 100644
--- a/tests/get_range_file.rs
+++ b/tests/get_range_file.rs
@@ -58,10 +58,6 @@ impl ObjectStore for MyStore {
         self.0.get_opts(location, options).await
     }
 
-    async fn delete(&self, _: &Path) -> Result<()> {
-        todo!()
-    }
-
     fn delete_stream(
         &self,
         _: BoxStream<'static, Result<Path>>,

Reply via email to