This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new c3b1a924c fix(services/s3): fix batch delete with version (#5684)
c3b1a924c is described below
commit c3b1a924cc38587bdcb58ce09385a33ab74414b7
Author: meteorgan <[email protected]>
AuthorDate: Mon Mar 3 13:37:24 2025 +0800
fix(services/s3): fix batch delete with version (#5684)
* fix(services/s3): fix batch delete with version
* add unit tests for delete with version
* refine the tests
---
core/src/raw/accessor.rs | 2 +-
core/src/services/s3/core.rs | 26 ++++++++++++
core/src/services/s3/delete.rs | 20 ++++++----
core/src/types/delete/deleter.rs | 16 ++++----
core/src/types/operator/operator.rs | 14 +++----
core/tests/behavior/async_delete.rs | 79 +++++++++++++++++++++++++++++++++++--
6 files changed, 130 insertions(+), 27 deletions(-)
diff --git a/core/src/raw/accessor.rs b/core/src/raw/accessor.rs
index fa7e0f24d..7d48cb24d 100644
--- a/core/src/raw/accessor.rs
+++ b/core/src/raw/accessor.rs
@@ -998,7 +998,7 @@ impl AccessorInfo {
}
}
- /// Get service's full capabilities.
+ /// Update service's full capabilities.
///
/// # Panic Safety
///
diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs
index 173a33d50..5f383ac43 100644
--- a/core/src/services/s3/core.rs
+++ b/core/src/services/s3/core.rs
@@ -1024,6 +1024,7 @@ pub struct DeleteObjectsResult {
#[serde(rename_all = "PascalCase")]
pub struct DeleteObjectsResultDeleted {
pub key: String,
+ pub version_id: Option<String>,
}
#[derive(Default, Debug, Deserialize)]
@@ -1032,6 +1033,7 @@ pub struct DeleteObjectsResultError {
pub code: String,
pub key: String,
pub message: String,
+ pub version_id: Option<String>,
}
/// Output of ListBucket/ListObjects.
@@ -1308,6 +1310,30 @@ mod tests {
assert_eq!(out.error[0].message, "Access Denied");
}
+ #[test]
+ fn test_deserialize_delete_objects_with_version_id() {
+ let bs = Bytes::from(
+ r#"<?xml version="1.0" encoding="UTF-8"?>
+ <DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+ <Deleted>
+ <Key>SampleDocument.txt</Key>
+ <VersionId>OYcLXagmS.WaD..oyH4KRguB95_YhLs7</VersionId>
+ </Deleted>
+ </DeleteResult>"#,
+ );
+
+ let out: DeleteObjectsResult =
+ quick_xml::de::from_reader(bs.reader()).expect("must success");
+
+ assert_eq!(out.deleted.len(), 1);
+ assert_eq!(out.deleted[0].key, "SampleDocument.txt");
+ assert_eq!(
+ out.deleted[0].version_id,
+ Some("OYcLXagmS.WaD..oyH4KRguB95_YhLs7".to_owned())
+ );
+ assert_eq!(out.error.len(), 0);
+ }
+
#[test]
fn test_parse_list_output() {
let bs = bytes::Bytes::from(
diff --git a/core/src/services/s3/delete.rs b/core/src/services/s3/delete.rs
index b2b49c840..f0912f7f1 100644
--- a/core/src/services/s3/delete.rs
+++ b/core/src/services/s3/delete.rs
@@ -80,17 +80,21 @@ impl oio::BatchDelete for S3Deleter {
};
for i in result.deleted {
let path = build_rel_path(&self.core.root, &i.key);
- // TODO: fix https://github.com/apache/opendal/issues/5329
- batched_result.succeeded.push((path, OpDelete::new()));
+ let mut op = OpDelete::new();
+ if let Some(version_id) = i.version_id {
+ op = op.with_version(version_id.as_str());
+ }
+ batched_result.succeeded.push((path, op));
}
for i in result.error {
let path = build_rel_path(&self.core.root, &i.key);
-
- batched_result.failed.push((
- path,
- OpDelete::new(),
- parse_delete_objects_result_error(i),
- ));
+ let mut op = OpDelete::new();
+ if let Some(version_id) = &i.version_id {
+ op = op.with_version(version_id.as_str());
+ }
+ batched_result
+ .failed
+ .push((path, op, parse_delete_objects_result_error(i)));
}
Ok(batched_result)
diff --git a/core/src/types/delete/deleter.rs b/core/src/types/delete/deleter.rs
index 74163809e..9b4b2c865 100644
--- a/core/src/types/delete/deleter.rs
+++ b/core/src/types/delete/deleter.rs
@@ -118,9 +118,9 @@ impl Deleter {
///
/// Also see:
///
- /// - [`Deleter::delete_try_iter`]: delete an fallible iterator of paths.
+ /// - [`Deleter::delete_try_iter`]: delete a fallible iterator of paths.
/// - [`Deleter::delete_stream`]: delete an infallible stream of paths.
- /// - [`Deleter::delete_try_stream`]: delete an fallible stream of paths.
+ /// - [`Deleter::delete_try_stream`]: delete a fallible stream of paths.
pub async fn delete_iter<I, D>(&mut self, iter: I) -> Result<()>
where
I: IntoIterator<Item = D>,
@@ -132,13 +132,13 @@ impl Deleter {
Ok(())
}
- /// Delete an fallible iterator of paths.
+ /// Delete a fallible iterator of paths.
///
/// Also see:
///
/// - [`Deleter::delete_iter`]: delete an infallible iterator of paths.
/// - [`Deleter::delete_stream`]: delete an infallible stream of paths.
- /// - [`Deleter::delete_try_stream`]: delete an fallible stream of paths.
+ /// - [`Deleter::delete_try_stream`]: delete a fallible stream of paths.
pub async fn delete_try_iter<I, D>(&mut self, try_iter: I) -> Result<()>
where
I: IntoIterator<Item = Result<D>>,
@@ -156,8 +156,8 @@ impl Deleter {
/// Also see:
///
/// - [`Deleter::delete_iter`]: delete an infallible iterator of paths.
- /// - [`Deleter::delete_try_iter`]: delete an fallible iterator of paths.
- /// - [`Deleter::delete_try_stream`]: delete an fallible stream of paths.
+ /// - [`Deleter::delete_try_iter`]: delete a fallible iterator of paths.
+ /// - [`Deleter::delete_try_stream`]: delete a fallible stream of paths.
pub async fn delete_stream<S, D>(&mut self, mut stream: S) -> Result<()>
where
S: Stream<Item = D>,
@@ -171,12 +171,12 @@ impl Deleter {
Ok(())
}
- /// Delete an fallible stream of paths.
+ /// Delete a fallible stream of paths.
///
/// Also see:
///
/// - [`Deleter::delete_iter`]: delete an infallible iterator of paths.
- /// - [`Deleter::delete_try_iter`]: delete an fallible iterator of paths.
+ /// - [`Deleter::delete_try_iter`]: delete a fallible iterator of paths.
/// - [`Deleter::delete_stream`]: delete an infallible stream of paths.
pub async fn delete_try_stream<S, D>(&mut self, mut try_stream: S) -> Result<()>
where
diff --git a/core/src/types/operator/operator.rs b/core/src/types/operator/operator.rs
index c01834656..c79a9f6fa 100644
--- a/core/src/types/operator/operator.rs
+++ b/core/src/types/operator/operator.rs
@@ -1116,9 +1116,9 @@ impl Operator {
///
/// Also see:
///
- /// - [`Operator::delete_try_iter`]: delete an fallible iterator of paths.
+ /// - [`Operator::delete_try_iter`]: delete a fallible iterator of paths.
/// - [`Operator::delete_stream`]: delete an infallible stream of paths.
- /// - [`Operator::delete_try_stream`]: delete an fallible stream of paths.
+ /// - [`Operator::delete_try_stream`]: delete a fallible stream of paths.
pub async fn delete_iter<I, D>(&self, iter: I) -> Result<()>
where
I: IntoIterator<Item = D>,
@@ -1136,7 +1136,7 @@ impl Operator {
///
/// - [`Operator::delete_iter`]: delete an infallible iterator of paths.
/// - [`Operator::delete_stream`]: delete an infallible stream of paths.
- /// - [`Operator::delete_try_stream`]: delete an fallible stream of paths.
+ /// - [`Operator::delete_try_stream`]: delete a fallible stream of paths.
pub async fn delete_try_iter<I, D>(&self, try_iter: I) -> Result<()>
where
I: IntoIterator<Item = Result<D>>,
@@ -1153,8 +1153,8 @@ impl Operator {
/// Also see:
///
/// - [`Operator::delete_iter`]: delete an infallible iterator of paths.
- /// - [`Operator::delete_try_iter`]: delete an fallible iterator of paths.
- /// - [`Operator::delete_try_stream`]: delete an fallible stream of paths.
+ /// - [`Operator::delete_try_iter`]: delete a fallible iterator of paths.
+ /// - [`Operator::delete_try_stream`]: delete a fallible stream of paths.
pub async fn delete_stream<S, D>(&self, stream: S) -> Result<()>
where
S: Stream<Item = D>,
@@ -1166,12 +1166,12 @@ impl Operator {
Ok(())
}
- /// Delete an fallible stream of paths.
+ /// Delete a fallible stream of paths.
///
/// Also see:
///
/// - [`Operator::delete_iter`]: delete an infallible iterator of paths.
- /// - [`Operator::delete_try_iter`]: delete an fallible iterator of paths.
+ /// - [`Operator::delete_try_iter`]: delete a fallible iterator of paths.
/// - [`Operator::delete_stream`]: delete an infallible stream of paths.
pub async fn delete_try_stream<S, D>(&self, try_stream: S) -> Result<()>
where
diff --git a/core/tests/behavior/async_delete.rs b/core/tests/behavior/async_delete.rs
index c860ab638..c88ecd66e 100644
--- a/core/tests/behavior/async_delete.rs
+++ b/core/tests/behavior/async_delete.rs
@@ -15,11 +15,11 @@
// specific language governing permissions and limitations
// under the License.
+use crate::*;
use anyhow::Result;
use futures::TryStreamExt;
use log::warn;
-
-use crate::*;
+use opendal::raw::{Access, OpDelete};
pub fn tests(op: &Operator, tests: &mut Vec<Trial>) {
let cap = op.info().full_capability();
@@ -34,7 +34,9 @@ pub fn tests(op: &Operator, tests: &mut Vec<Trial>) {
test_delete_stream,
test_remove_one_file,
test_delete_with_version,
- test_delete_with_not_existing_version
+ test_delete_with_not_existing_version,
+ test_batch_delete,
+ test_batch_delete_with_version
));
if cap.list_with_recursive {
tests.extend(async_trials!(op, test_remove_all_basic));
@@ -277,3 +279,74 @@ pub async fn test_delete_with_not_existing_version(op: Operator) -> Result<()> {
Ok(())
}
+
+pub async fn test_batch_delete(op: Operator) -> Result<()> {
+ let mut cap = op.info().full_capability();
+ if cap.delete_max_size.unwrap_or(1) <= 1 {
+ return Ok(());
+ }
+
+ cap.delete_max_size = Some(2);
+ op.inner().info().update_full_capability(|_| cap);
+
+ let mut files = Vec::new();
+ for _ in 0..5 {
+ let (path, content, _) = TEST_FIXTURE.new_file(op.clone());
+ op.write(path.as_str(), content)
+ .await
+ .expect("write must succeed");
+ files.push(path);
+ }
+
+ op.delete_iter(files.clone())
+ .await
+ .expect("batch delete must succeed");
+
+ for path in files {
+ let stat = op.stat(path.as_str()).await;
+ assert!(stat.is_err());
+ assert_eq!(stat.unwrap_err().kind(), ErrorKind::NotFound);
+ }
+
+ Ok(())
+}
+
+pub async fn test_batch_delete_with_version(op: Operator) -> Result<()> {
+ let mut cap = op.info().full_capability();
+ if !cap.delete_with_version {
+ return Ok(());
+ }
+ if cap.delete_max_size.unwrap_or(1) <= 1 {
+ return Ok(());
+ }
+
+ cap.delete_max_size = Some(2);
+ op.inner().info().update_full_capability(|_| cap);
+
+ let mut files = Vec::new();
+ for _ in 0..5 {
+ let (path, content, _) = TEST_FIXTURE.new_file(op.clone());
+ op.write(path.as_str(), content)
+ .await
+ .expect("write must succeed");
+ let meta = op.stat(path.as_str()).await.expect("stat must succeed");
+ let version = meta.version().expect("must have version");
+ let op_args = OpDelete::new().with_version(version);
+ files.push((path, op_args));
+ }
+
+ op.delete_iter(files.clone())
+ .await
+ .expect("batch delete must succeed");
+
+ for (path, args) in files {
+ let stat = op
+ .stat_with(path.as_str())
+ .version(args.version().unwrap())
+ .await;
+ assert!(stat.is_err());
+ assert_eq!(stat.unwrap_err().kind(), ErrorKind::NotFound);
+ }
+
+ Ok(())
+}