This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git
The following commit(s) were added to refs/heads/main by this push:
new a9a83f1 feat: add option to disable bulk delete for aws (#734)
a9a83f1 is described below
commit a9a83f18071d9e4c5636d379aaa0513c1e06d0d4
Author: Hengfei Yang <[email protected]>
AuthorDate: Wed Jun 3 04:12:46 2026 +0800
feat: add option to disable bulk delete for aws (#734)
* feat: add option to disable bulk delete for aws
* fix(aws): correct documentation references for delete methods
* test: add more unit tests
---
src/aws/builder.rs | 35 +++++++++++
src/aws/client.rs | 173 +++++++++++++++++++++++++++++++++++++++++++++++++++++
src/aws/mod.rs | 19 ++++++
3 files changed, 227 insertions(+)
diff --git a/src/aws/builder.rs b/src/aws/builder.rs
index 85dbaae..5a6d2b4 100644
--- a/src/aws/builder.rs
+++ b/src/aws/builder.rs
@@ -181,6 +181,8 @@ pub struct AmazonS3Builder {
conditional_put: ConfigValue<S3ConditionalPut>,
/// Ignore tags
disable_tagging: ConfigValue<bool>,
+ /// Disable bulk delete
+ disable_bulk_delete: ConfigValue<bool>,
/// Encryption (See [`S3EncryptionConfigKey`])
encryption_type: Option<ConfigValue<S3EncryptionType>>,
encryption_kms_key_id: Option<String>,
@@ -429,6 +431,19 @@ pub enum AmazonS3ConfigKey {
/// - `disable_tagging`
DisableTagging,
+ /// Disable bulk delete (`DeleteObjects`, `POST /?delete`)
+ ///
+ /// If set to `true`, [`delete`](crate::ObjectStoreExt::delete) and
+ /// [`delete_stream`](crate::ObjectStore::delete_stream) will issue
+ /// single-object `DELETE /key` requests instead of the bulk `DeleteObjects`
+ /// API (`POST /?delete`). Use this for S3-compatible providers that do not
+ /// implement `DeleteObjects` (e.g. Alibaba Cloud OSS).
+ ///
+ /// Supported keys:
+ /// - `aws_disable_bulk_delete`
+ /// - `disable_bulk_delete`
+ DisableBulkDelete,
+
/// Enable Support for S3 Express One Zone
///
/// Supported keys:
@@ -478,6 +493,7 @@ impl AsRef<str> for AmazonS3ConfigKey {
Self::CopyIfNotExists => "aws_copy_if_not_exists",
Self::ConditionalPut => "aws_conditional_put",
Self::DisableTagging => "aws_disable_tagging",
+ Self::DisableBulkDelete => "aws_disable_bulk_delete",
Self::RequestPayer => "aws_request_payer",
Self::Client(opt) => opt.as_ref(),
Self::Encryption(opt) => opt.as_ref(),
@@ -525,6 +541,7 @@ impl FromStr for AmazonS3ConfigKey {
"aws_copy_if_not_exists" | "copy_if_not_exists" =>
Ok(Self::CopyIfNotExists),
"aws_conditional_put" | "conditional_put" =>
Ok(Self::ConditionalPut),
"aws_disable_tagging" | "disable_tagging" =>
Ok(Self::DisableTagging),
+ "aws_disable_bulk_delete" | "disable_bulk_delete" =>
Ok(Self::DisableBulkDelete),
"aws_request_payer" | "request_payer" => Ok(Self::RequestPayer),
// Backwards compatibility
"aws_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)),
@@ -672,6 +689,7 @@ impl AmazonS3Builder {
}
AmazonS3ConfigKey::SkipSignature =>
self.skip_signature.parse(value),
AmazonS3ConfigKey::DisableTagging =>
self.disable_tagging.parse(value),
+ AmazonS3ConfigKey::DisableBulkDelete =>
self.disable_bulk_delete.parse(value),
AmazonS3ConfigKey::CopyIfNotExists => {
self.copy_if_not_exists =
Some(ConfigValue::Deferred(value.into()))
}
@@ -745,6 +763,7 @@ impl AmazonS3Builder {
}
AmazonS3ConfigKey::ConditionalPut =>
Some(self.conditional_put.to_string()),
AmazonS3ConfigKey::DisableTagging =>
Some(self.disable_tagging.to_string()),
+ AmazonS3ConfigKey::DisableBulkDelete =>
Some(self.disable_bulk_delete.to_string()),
AmazonS3ConfigKey::RequestPayer =>
Some(self.request_payer.to_string()),
AmazonS3ConfigKey::Encryption(key) => match key {
S3EncryptionConfigKey::ServerSideEncryption => {
@@ -1018,6 +1037,21 @@ impl AmazonS3Builder {
self
}
+ /// If set to `true`, [`delete`](crate::ObjectStoreExt::delete) and
+ /// [`delete_stream`](crate::ObjectStore::delete_stream) will issue
+ /// single-object `DELETE /key` requests instead of the bulk `DeleteObjects`
+ /// API (`POST /?delete`).
+ ///
+ /// The bulk `DeleteObjects` API is more efficient but is not implemented by
+ /// all S3-compatible providers (e.g. Alibaba Cloud OSS). Setting this to
+ /// `true` restores the single-object delete behaviour that works against
+ /// every S3-compatible provider, at the cost of throughput when deleting
+ /// many objects via [`delete_stream`](crate::ObjectStore::delete_stream).
+ ///
+ /// This sets the same option as the
+ /// [`AmazonS3ConfigKey::DisableBulkDelete`] config key
+ /// (`aws_disable_bulk_delete` / `disable_bulk_delete`).
+ pub fn with_disable_bulk_delete(mut self, disable: bool) -> Self {
+ self.disable_bulk_delete = disable.into();
+ self
+ }
+
/// Use SSE-KMS for server side encryption.
pub fn with_sse_kms_encryption(mut self, kms_key_id: impl Into<String>) ->
Self {
self.encryption_type =
Some(ConfigValue::Parsed(S3EncryptionType::SseKms));
@@ -1241,6 +1275,7 @@ impl AmazonS3Builder {
sign_payload: !self.unsigned_payload.get()?,
skip_signature: self.skip_signature.get()?,
disable_tagging: self.disable_tagging.get()?,
+ disable_bulk_delete: self.disable_bulk_delete.get()?,
checksum,
copy_if_not_exists,
conditional_put: self.conditional_put.get()?,
diff --git a/src/aws/client.rs b/src/aws/client.rs
index f579a89..199f4a8 100644
--- a/src/aws/client.rs
+++ b/src/aws/client.rs
@@ -205,6 +205,7 @@ pub(crate) struct S3Config {
pub sign_payload: bool,
pub skip_signature: bool,
pub disable_tagging: bool,
+ pub disable_bulk_delete: bool,
pub checksum: Option<Checksum>,
pub copy_if_not_exists: Option<S3CopyIfNotExists>,
pub conditional_put: S3ConditionalPut,
@@ -615,6 +616,16 @@ impl S3Client {
Ok(results)
}
+ /// Make a single-object S3 Delete request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html>
+ ///
+ /// Unlike [`bulk_delete_request`](Self::bulk_delete_request), this issues a
+ /// plain `DELETE /key` request, which is part of the core S3 API and is
+ /// supported by every S3-compatible provider.
+ ///
+ /// The successful response is discarded. Any error from sending the
+ /// request is propagated to the caller via `?`.
+ pub(crate) async fn delete_request(&self, path: &Path) -> Result<()> {
+ self.request(Method::DELETE, path).send().await?;
+ Ok(())
+ }
+
/// Make an S3 Copy request
<https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html>
pub(crate) fn copy_request<'a>(&'a self, from: &Path, to: &'a Path) ->
Request<'a> {
let source = format!("{}/{}", self.config.bucket, encode_path(from));
@@ -1008,10 +1019,13 @@ fn encode_path(path: &Path) -> PercentEncode<'_> {
mod tests {
use super::*;
use crate::GetOptions;
+ use crate::ObjectStore;
+ use crate::aws::{AmazonS3, AmazonS3Builder};
use crate::client::HttpClient;
use crate::client::get::GetClient;
use crate::client::mock_server::MockServer;
use crate::client::retry::RetryContext;
+ use futures_util::{StreamExt, TryStreamExt};
use http::Response;
use http::header::{AUTHORIZATION, CONTENT_LENGTH};
use hyper::Request;
@@ -1049,6 +1063,7 @@ mod tests {
retry_config: Default::default(),
sign_payload: false,
disable_tagging: false,
+ disable_bulk_delete: false,
checksum: None,
copy_if_not_exists: None,
conditional_put: Default::default(),
@@ -1104,6 +1119,7 @@ mod tests {
retry_config: Default::default(),
sign_payload: false,
disable_tagging: false,
+ disable_bulk_delete: false,
checksum: None,
copy_if_not_exists: None,
conditional_put: Default::default(),
@@ -1155,6 +1171,163 @@ mod tests {
mock.shutdown().await;
}
+ /// `(method, path, query)` captured for assertion outside the mock closure.
+ ///
+ /// `MockServer` swallows panics raised inside its response handler (the
+ /// connection just resets and the S3 retry logic can still surface an `Ok`
+ /// result), so assertions placed inside the closure are silently ignored.
+ /// We capture into shared state and assert in the test body instead.
+ type CapturedRequest = (Method, String, Option<String>);
+
+ /// Append `req`'s method, URI path and optional query string to `captured`.
+ ///
+ /// Panics if the mutex has been poisoned.
+ fn capture(captured: &Arc<std::sync::Mutex<Vec<CapturedRequest>>>, req: &Request<Incoming>) {
+ captured.lock().unwrap().push((
+ req.method().clone(),
+ req.uri().path().to_string(),
+ req.uri().query().map(|s| s.to_string()),
+ ));
+ }
+
+ /// Build an `AmazonS3` via the public builder so that `bucket_endpoint`
+ /// is computed by the library from the addressing-style option — i.e.
+ /// the option under test actually drives the URL the client emits.
+ ///
+ /// The store targets bucket `test-bucket` at `mock.url()`, allows plain
+ /// HTTP and skips request signing. `virtual_hosted` and
+ /// `disable_bulk_delete` are forwarded to the corresponding builder
+ /// options. Panics if the builder fails.
+ fn make_store(mock: &MockServer, virtual_hosted: bool, disable_bulk_delete: bool) -> AmazonS3 {
+ AmazonS3Builder::new()
+ .with_endpoint(mock.url())
+ .with_bucket_name("test-bucket")
+ .with_region("us-east-1")
+ .with_allow_http(true)
+ .with_skip_signature(true)
+ .with_virtual_hosted_style_request(virtual_hosted)
+ .with_disable_bulk_delete(disable_bulk_delete)
+ .build()
+ .unwrap()
+ }
+
+ #[tokio::test]
+ async fn test_delete_default() {
+ // Default: path-style + bulk delete enabled.
+ // `delete_stream` must issue a single `POST /{bucket}?delete`.
+ let mock = MockServer::new().await;
+ let captured: Arc<std::sync::Mutex<Vec<CapturedRequest>>> = Default::default();
+ let c = Arc::clone(&captured);
+ // Answer the one bulk request with a `DeleteResult` reporting `foo` deleted.
+ mock.push_fn(move |req| {
+ capture(&c, &req);
+ Response::builder()
+ .status(200)
+ .body("<DeleteResult><Deleted><Key>foo</Key></Deleted></DeleteResult>".to_string())
+ .unwrap()
+ });
+
+ let store = make_store(&mock, false, false);
+ let locations = futures_util::stream::iter(vec![Ok(Path::from("foo"))]).boxed();
+ let deleted: Vec<_> = store.delete_stream(locations).try_collect().await.unwrap();
+ assert_eq!(deleted.len(), 1);
+
+ // Assert here rather than inside the mock handler (see `CapturedRequest`).
+ let captured = captured.lock().unwrap().clone();
+ assert_eq!(captured.len(), 1, "expected one bulk delete request");
+ assert_eq!(captured[0].0, Method::POST);
+ assert_eq!(captured[0].1, "/test-bucket");
+ assert_eq!(captured[0].2.as_deref(), Some("delete"));
+
+ mock.shutdown().await;
+ }
+
+ #[tokio::test]
+ async fn test_delete_default_with_disable_bulk() {
+ // Path-style + bulk delete disabled.
+ // `delete_stream` must fan out into `DELETE /{bucket}/{key}` (one per
+ // object, no `?delete` query).
+ let mock = MockServer::new().await;
+ let captured: Arc<std::sync::Mutex<Vec<CapturedRequest>>> = Default::default();
+ // Queue one empty 204 response for each of the two expected DELETEs.
+ for _ in 0..2 {
+ let c = Arc::clone(&captured);
+ mock.push_fn(move |req| {
+ capture(&c, &req);
+ Response::builder().status(204).body(String::new()).unwrap()
+ });
+ }
+
+ let store = make_store(&mock, false, true);
+ let locations =
+ futures_util::stream::iter(vec![Ok(Path::from("foo")), Ok(Path::from("bar"))]).boxed();
+ let deleted: Vec<_> = store.delete_stream(locations).try_collect().await.unwrap();
+ assert_eq!(deleted.len(), 2);
+
+ // The single-object requests may be in flight concurrently, so arrival
+ // order is not asserted: sort by path first.
+ let mut captured = captured.lock().unwrap().clone();
+ captured.sort_by(|a, b| a.1.cmp(&b.1));
+ assert_eq!(captured.len(), 2, "expected one DELETE per object");
+ assert_eq!(
+ captured[0],
+ (Method::DELETE, "/test-bucket/bar".to_string(), None)
+ );
+ assert_eq!(
+ captured[1],
+ (Method::DELETE, "/test-bucket/foo".to_string(), None)
+ );
+
+ mock.shutdown().await;
+ }
+
+ #[tokio::test]
+ async fn test_delete_virtual_hosted() {
+ // Virtual-hosted style + bulk delete enabled.
+ // `delete_stream` must issue a single `POST /?delete` (bucket is in
+ // the host, not the path).
+ let mock = MockServer::new().await;
+ let captured: Arc<std::sync::Mutex<Vec<CapturedRequest>>> = Default::default();
+ let c = Arc::clone(&captured);
+ // Answer the one bulk request with a `DeleteResult` reporting `foo` deleted.
+ mock.push_fn(move |req| {
+ capture(&c, &req);
+ Response::builder()
+ .status(200)
+ .body("<DeleteResult><Deleted><Key>foo</Key></Deleted></DeleteResult>".to_string())
+ .unwrap()
+ });
+
+ let store = make_store(&mock, true, false);
+ let locations = futures_util::stream::iter(vec![Ok(Path::from("foo"))]).boxed();
+ let deleted: Vec<_> = store.delete_stream(locations).try_collect().await.unwrap();
+ assert_eq!(deleted.len(), 1);
+
+ // Assert here rather than inside the mock handler (see `CapturedRequest`).
+ let captured = captured.lock().unwrap().clone();
+ assert_eq!(captured.len(), 1, "expected one bulk delete request");
+ assert_eq!(captured[0].0, Method::POST);
+ assert_eq!(captured[0].1, "/");
+ assert_eq!(captured[0].2.as_deref(), Some("delete"));
+
+ mock.shutdown().await;
+ }
+
+ #[tokio::test]
+ async fn test_delete_virtual_hosted_with_disable_bulk() {
+ // Virtual-hosted style + bulk delete disabled.
+ // `delete_stream` must fan out into `DELETE /{key}` (no bucket in
+ // path, no `?delete` query).
+ let mock = MockServer::new().await;
+ let captured: Arc<std::sync::Mutex<Vec<CapturedRequest>>> = Default::default();
+ // Queue one empty 204 response for each of the two expected DELETEs.
+ for _ in 0..2 {
+ let c = Arc::clone(&captured);
+ mock.push_fn(move |req| {
+ capture(&c, &req);
+ Response::builder().status(204).body(String::new()).unwrap()
+ });
+ }
+
+ let store = make_store(&mock, true, true);
+ let locations =
+ futures_util::stream::iter(vec![Ok(Path::from("foo")), Ok(Path::from("bar"))]).boxed();
+ let deleted: Vec<_> = store.delete_stream(locations).try_collect().await.unwrap();
+ assert_eq!(deleted.len(), 2);
+
+ // The single-object requests may be in flight concurrently, so arrival
+ // order is not asserted: sort by path first.
+ let mut captured = captured.lock().unwrap().clone();
+ captured.sort_by(|a, b| a.1.cmp(&b.1));
+ assert_eq!(captured.len(), 2, "expected one DELETE per object");
+ assert_eq!(captured[0], (Method::DELETE, "/bar".to_string(), None));
+ assert_eq!(captured[1], (Method::DELETE, "/foo".to_string(), None));
+
+ mock.shutdown().await;
+ }
+
#[tokio::test]
async fn test_default_headers_signed_get_request() {
let mock = MockServer::new().await;
diff --git a/src/aws/mod.rs b/src/aws/mod.rs
index 935c653..e1cdb06 100644
--- a/src/aws/mod.rs
+++ b/src/aws/mod.rs
@@ -264,6 +264,25 @@ impl ObjectStore for AmazonS3 {
locations: BoxStream<'static, Result<Path>>,
) -> BoxStream<'static, Result<Path>> {
let client = Arc::clone(&self.client);
+
+ // Some S3-compatible providers do not implement the bulk `DeleteObjects`
+ // API (`POST /?delete`). When bulk delete is disabled, fall back to
+ // concurrent single-object `DELETE /key` requests (at most 20 in flight,
+ // results yielded in input order), which are part of the core S3 API
+ // supported by every provider.
+ if client.config.disable_bulk_delete {
+ return locations
+ .map(move |location| {
+ let client = Arc::clone(&client);
+ async move {
+ let location = location?;
+ client.delete_request(&location).await?;
+ Ok(location)
+ }
+ })
+ .buffered(20)
+ .boxed();
+ }
+
locations
.try_chunks(1_000)
.map(move |locations| {