This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new c3b1a924c fix(services/s3): fix batch delete with version (#5684)
c3b1a924c is described below

commit c3b1a924cc38587bdcb58ce09385a33ab74414b7
Author: meteorgan <[email protected]>
AuthorDate: Mon Mar 3 13:37:24 2025 +0800

    fix(services/s3): fix batch delete with version (#5684)
    
    * fix(services/s3): fix batch delete with version
    
    * add unit tests for delete with version
    
    * refine the tests
---
 core/src/raw/accessor.rs            |  2 +-
 core/src/services/s3/core.rs        | 26 ++++++++++++
 core/src/services/s3/delete.rs      | 20 ++++++----
 core/src/types/delete/deleter.rs    | 16 ++++----
 core/src/types/operator/operator.rs | 14 +++----
 core/tests/behavior/async_delete.rs | 79 +++++++++++++++++++++++++++++++++++--
 6 files changed, 130 insertions(+), 27 deletions(-)

diff --git a/core/src/raw/accessor.rs b/core/src/raw/accessor.rs
index fa7e0f24d..7d48cb24d 100644
--- a/core/src/raw/accessor.rs
+++ b/core/src/raw/accessor.rs
@@ -998,7 +998,7 @@ impl AccessorInfo {
         }
     }
 
-    /// Get service's full capabilities.
+    /// Update service's full capabilities.
     ///
     /// # Panic Safety
     ///
diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs
index 173a33d50..5f383ac43 100644
--- a/core/src/services/s3/core.rs
+++ b/core/src/services/s3/core.rs
@@ -1024,6 +1024,7 @@ pub struct DeleteObjectsResult {
 #[serde(rename_all = "PascalCase")]
 pub struct DeleteObjectsResultDeleted {
     pub key: String,
+    pub version_id: Option<String>,
 }
 
 #[derive(Default, Debug, Deserialize)]
@@ -1032,6 +1033,7 @@ pub struct DeleteObjectsResultError {
     pub code: String,
     pub key: String,
     pub message: String,
+    pub version_id: Option<String>,
 }
 
 /// Output of ListBucket/ListObjects.
@@ -1308,6 +1310,30 @@ mod tests {
         assert_eq!(out.error[0].message, "Access Denied");
     }
 
+    #[test]
+    fn test_deserialize_delete_objects_with_version_id() {
+        let bs = Bytes::from(
+            r#"<?xml version="1.0" encoding="UTF-8"?>
+                  <DeleteResult 
xmlns="http://s3.amazonaws.com/doc/2006-03-01/";>
+                    <Deleted>
+                      <Key>SampleDocument.txt</Key>
+                      <VersionId>OYcLXagmS.WaD..oyH4KRguB95_YhLs7</VersionId>
+                    </Deleted>
+                  </DeleteResult>"#,
+        );
+
+        let out: DeleteObjectsResult =
+            quick_xml::de::from_reader(bs.reader()).expect("must success");
+
+        assert_eq!(out.deleted.len(), 1);
+        assert_eq!(out.deleted[0].key, "SampleDocument.txt");
+        assert_eq!(
+            out.deleted[0].version_id,
+            Some("OYcLXagmS.WaD..oyH4KRguB95_YhLs7".to_owned())
+        );
+        assert_eq!(out.error.len(), 0);
+    }
+
     #[test]
     fn test_parse_list_output() {
         let bs = bytes::Bytes::from(
diff --git a/core/src/services/s3/delete.rs b/core/src/services/s3/delete.rs
index b2b49c840..f0912f7f1 100644
--- a/core/src/services/s3/delete.rs
+++ b/core/src/services/s3/delete.rs
@@ -80,17 +80,21 @@ impl oio::BatchDelete for S3Deleter {
         };
         for i in result.deleted {
             let path = build_rel_path(&self.core.root, &i.key);
-            // TODO: fix https://github.com/apache/opendal/issues/5329
-            batched_result.succeeded.push((path, OpDelete::new()));
+            let mut op = OpDelete::new();
+            if let Some(version_id) = i.version_id {
+                op = op.with_version(version_id.as_str());
+            }
+            batched_result.succeeded.push((path, op));
         }
         for i in result.error {
             let path = build_rel_path(&self.core.root, &i.key);
-
-            batched_result.failed.push((
-                path,
-                OpDelete::new(),
-                parse_delete_objects_result_error(i),
-            ));
+            let mut op = OpDelete::new();
+            if let Some(version_id) = &i.version_id {
+                op = op.with_version(version_id.as_str());
+            }
+            batched_result
+                .failed
+                .push((path, op, parse_delete_objects_result_error(i)));
         }
 
         Ok(batched_result)
diff --git a/core/src/types/delete/deleter.rs b/core/src/types/delete/deleter.rs
index 74163809e..9b4b2c865 100644
--- a/core/src/types/delete/deleter.rs
+++ b/core/src/types/delete/deleter.rs
@@ -118,9 +118,9 @@ impl Deleter {
     ///
     /// Also see:
     ///
-    /// - [`Deleter::delete_try_iter`]: delete an fallible iterator of paths.
+    /// - [`Deleter::delete_try_iter`]: delete a fallible iterator of paths.
     /// - [`Deleter::delete_stream`]: delete an infallible stream of paths.
-    /// - [`Deleter::delete_try_stream`]: delete an fallible stream of paths.
+    /// - [`Deleter::delete_try_stream`]: delete a fallible stream of paths.
     pub async fn delete_iter<I, D>(&mut self, iter: I) -> Result<()>
     where
         I: IntoIterator<Item = D>,
@@ -132,13 +132,13 @@ impl Deleter {
         Ok(())
     }
 
-    /// Delete an fallible iterator of paths.
+    /// Delete a fallible iterator of paths.
     ///
     /// Also see:
     ///
     /// - [`Deleter::delete_iter`]: delete an infallible iterator of paths.
     /// - [`Deleter::delete_stream`]: delete an infallible stream of paths.
-    /// - [`Deleter::delete_try_stream`]: delete an fallible stream of paths.
+    /// - [`Deleter::delete_try_stream`]: delete a fallible stream of paths.
     pub async fn delete_try_iter<I, D>(&mut self, try_iter: I) -> Result<()>
     where
         I: IntoIterator<Item = Result<D>>,
@@ -156,8 +156,8 @@ impl Deleter {
     /// Also see:
     ///
     /// - [`Deleter::delete_iter`]: delete an infallible iterator of paths.
-    /// - [`Deleter::delete_try_iter`]: delete an fallible iterator of paths.
-    /// - [`Deleter::delete_try_stream`]: delete an fallible stream of paths.
+    /// - [`Deleter::delete_try_iter`]: delete a fallible iterator of paths.
+    /// - [`Deleter::delete_try_stream`]: delete a fallible stream of paths.
     pub async fn delete_stream<S, D>(&mut self, mut stream: S) -> Result<()>
     where
         S: Stream<Item = D>,
@@ -171,12 +171,12 @@ impl Deleter {
         Ok(())
     }
 
-    /// Delete an fallible stream of paths.
+    /// Delete a fallible stream of paths.
     ///
     /// Also see:
     ///
     /// - [`Deleter::delete_iter`]: delete an infallible iterator of paths.
-    /// - [`Deleter::delete_try_iter`]: delete an fallible iterator of paths.
+    /// - [`Deleter::delete_try_iter`]: delete a fallible iterator of paths.
     /// - [`Deleter::delete_stream`]: delete an infallible stream of paths.
     pub async fn delete_try_stream<S, D>(&mut self, mut try_stream: S) -> 
Result<()>
     where
diff --git a/core/src/types/operator/operator.rs 
b/core/src/types/operator/operator.rs
index c01834656..c79a9f6fa 100644
--- a/core/src/types/operator/operator.rs
+++ b/core/src/types/operator/operator.rs
@@ -1116,9 +1116,9 @@ impl Operator {
     ///
     /// Also see:
     ///
-    /// - [`Operator::delete_try_iter`]: delete an fallible iterator of paths.
+    /// - [`Operator::delete_try_iter`]: delete a fallible iterator of paths.
     /// - [`Operator::delete_stream`]: delete an infallible stream of paths.
-    /// - [`Operator::delete_try_stream`]: delete an fallible stream of paths.
+    /// - [`Operator::delete_try_stream`]: delete a fallible stream of paths.
     pub async fn delete_iter<I, D>(&self, iter: I) -> Result<()>
     where
         I: IntoIterator<Item = D>,
@@ -1136,7 +1136,7 @@ impl Operator {
     ///
     /// - [`Operator::delete_iter`]: delete an infallible iterator of paths.
     /// - [`Operator::delete_stream`]: delete an infallible stream of paths.
-    /// - [`Operator::delete_try_stream`]: delete an fallible stream of paths.
+    /// - [`Operator::delete_try_stream`]: delete a fallible stream of paths.
     pub async fn delete_try_iter<I, D>(&self, try_iter: I) -> Result<()>
     where
         I: IntoIterator<Item = Result<D>>,
@@ -1153,8 +1153,8 @@ impl Operator {
     /// Also see:
     ///
     /// - [`Operator::delete_iter`]: delete an infallible iterator of paths.
-    /// - [`Operator::delete_try_iter`]: delete an fallible iterator of paths.
-    /// - [`Operator::delete_try_stream`]: delete an fallible stream of paths.
+    /// - [`Operator::delete_try_iter`]: delete a fallible iterator of paths.
+    /// - [`Operator::delete_try_stream`]: delete a fallible stream of paths.
     pub async fn delete_stream<S, D>(&self, stream: S) -> Result<()>
     where
         S: Stream<Item = D>,
@@ -1166,12 +1166,12 @@ impl Operator {
         Ok(())
     }
 
-    /// Delete an fallible stream of paths.
+    /// Delete a fallible stream of paths.
     ///
     /// Also see:
     ///
     /// - [`Operator::delete_iter`]: delete an infallible iterator of paths.
-    /// - [`Operator::delete_try_iter`]: delete an fallible iterator of paths.
+    /// - [`Operator::delete_try_iter`]: delete a fallible iterator of paths.
     /// - [`Operator::delete_stream`]: delete an infallible stream of paths.
     pub async fn delete_try_stream<S, D>(&self, try_stream: S) -> Result<()>
     where
diff --git a/core/tests/behavior/async_delete.rs 
b/core/tests/behavior/async_delete.rs
index c860ab638..c88ecd66e 100644
--- a/core/tests/behavior/async_delete.rs
+++ b/core/tests/behavior/async_delete.rs
@@ -15,11 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::*;
 use anyhow::Result;
 use futures::TryStreamExt;
 use log::warn;
-
-use crate::*;
+use opendal::raw::{Access, OpDelete};
 
 pub fn tests(op: &Operator, tests: &mut Vec<Trial>) {
     let cap = op.info().full_capability();
@@ -34,7 +34,9 @@ pub fn tests(op: &Operator, tests: &mut Vec<Trial>) {
             test_delete_stream,
             test_remove_one_file,
             test_delete_with_version,
-            test_delete_with_not_existing_version
+            test_delete_with_not_existing_version,
+            test_batch_delete,
+            test_batch_delete_with_version
         ));
         if cap.list_with_recursive {
             tests.extend(async_trials!(op, test_remove_all_basic));
@@ -277,3 +279,74 @@ pub async fn test_delete_with_not_existing_version(op: 
Operator) -> Result<()> {
 
     Ok(())
 }
+
+pub async fn test_batch_delete(op: Operator) -> Result<()> {
+    let mut cap = op.info().full_capability();
+    if cap.delete_max_size.unwrap_or(1) <= 1 {
+        return Ok(());
+    }
+
+    cap.delete_max_size = Some(2);
+    op.inner().info().update_full_capability(|_| cap);
+
+    let mut files = Vec::new();
+    for _ in 0..5 {
+        let (path, content, _) = TEST_FIXTURE.new_file(op.clone());
+        op.write(path.as_str(), content)
+            .await
+            .expect("write must succeed");
+        files.push(path);
+    }
+
+    op.delete_iter(files.clone())
+        .await
+        .expect("batch delete must succeed");
+
+    for path in files {
+        let stat = op.stat(path.as_str()).await;
+        assert!(stat.is_err());
+        assert_eq!(stat.unwrap_err().kind(), ErrorKind::NotFound);
+    }
+
+    Ok(())
+}
+
+pub async fn test_batch_delete_with_version(op: Operator) -> Result<()> {
+    let mut cap = op.info().full_capability();
+    if !cap.delete_with_version {
+        return Ok(());
+    }
+    if cap.delete_max_size.unwrap_or(1) <= 1 {
+        return Ok(());
+    }
+
+    cap.delete_max_size = Some(2);
+    op.inner().info().update_full_capability(|_| cap);
+
+    let mut files = Vec::new();
+    for _ in 0..5 {
+        let (path, content, _) = TEST_FIXTURE.new_file(op.clone());
+        op.write(path.as_str(), content)
+            .await
+            .expect("write must succeed");
+        let meta = op.stat(path.as_str()).await.expect("stat must succeed");
+        let version = meta.version().expect("must have version");
+        let op_args = OpDelete::new().with_version(version);
+        files.push((path, op_args));
+    }
+
+    op.delete_iter(files.clone())
+        .await
+        .expect("batch delete must succeed");
+
+    for (path, args) in files {
+        let stat = op
+            .stat_with(path.as_str())
+            .version(args.version().unwrap())
+            .await;
+        assert!(stat.is_err());
+        assert_eq!(stat.unwrap_err().kind(), ErrorKind::NotFound);
+    }
+
+    Ok(())
+}

Reply via email to