This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new c8cfb19f0 fix(delete): poll deleter close future on exit (#7316)
c8cfb19f0 is described below

commit c8cfb19f011f6c9692e77a86b398cea7a24641c0
Author: dentiny <[email protected]>
AuthorDate: Thu Mar 26 04:38:52 2026 -0700

    fix(delete): poll deleter close future on exit (#7316)
---
 core/core/src/types/delete/deleter.rs             | 61 +++++++++++++++++++++++
 core/core/src/types/delete/futures_delete_sink.rs |  2 +-
 2 files changed, 62 insertions(+), 1 deletion(-)

diff --git a/core/core/src/types/delete/deleter.rs 
b/core/core/src/types/delete/deleter.rs
index 5311ce5e6..69fb451c0 100644
--- a/core/core/src/types/delete/deleter.rs
+++ b/core/core/src/types/delete/deleter.rs
@@ -194,3 +194,64 @@ impl Deleter {
         FuturesDeleteSink::new(self)
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+    use std::sync::Mutex;
+
+    use futures::SinkExt;
+
+    use super::*;
+    use crate::raw::OpDelete;
+    use crate::raw::oio;
+
+    struct MockBatchDeleter {
+        buffer: Vec<String>,
+        flushed: Arc<Mutex<Vec<String>>>,
+    }
+
+    impl oio::Delete for MockBatchDeleter {
+        async fn delete(&mut self, path: &str, _args: OpDelete) -> Result<()> {
+            self.buffer.push(path.to_string());
+            Ok(())
+        }
+
+        async fn close(&mut self) -> Result<()> {
+            let mut flushed = self.flushed.lock().unwrap();
+            flushed.extend(self.buffer.drain(..));
+            Ok(())
+        }
+    }
+
+    #[tokio::test]
+    async fn test_sink_close_must_flush_buffered_deletes() {
+        let flushed = Arc::new(Mutex::new(Vec::<String>::new()));
+
+        let mock = MockBatchDeleter {
+            buffer: Vec::new(),
+            flushed: flushed.clone(),
+        };
+
+        let deleter = Deleter {
+            deleter: Box::new(mock),
+        };
+        let mut sink = deleter.into_sink::<String>();
+
+        sink.send("file_a".to_string()).await.unwrap();
+        sink.send("file_b".to_string()).await.unwrap();
+        sink.close().await.unwrap();
+
+        let flushed = flushed.lock().unwrap();
+        assert!(
+            flushed.contains(&"file_a".to_string()),
+            "file_a should have been flushed by close, got: {:?}",
+            *flushed
+        );
+        assert!(
+            flushed.contains(&"file_b".to_string()),
+            "file_b should have been flushed by close, got: {:?}",
+            *flushed
+        );
+    }
+}
diff --git a/core/core/src/types/delete/futures_delete_sink.rs 
b/core/core/src/types/delete/futures_delete_sink.rs
index 5600ebafc..12b2e6d68 100644
--- a/core/core/src/types/delete/futures_delete_sink.rs
+++ b/core/core/src/types/delete/futures_delete_sink.rs
@@ -126,7 +126,7 @@ impl<T: IntoDeleteInput> Sink<T> for FuturesDeleteSink<T> {
                         (deleter, res)
                     };
                     self.state = State::Close(Box::pin(fut));
-                    return Poll::Ready(Ok(()));
+                    continue;
                 }
                 State::Delete(fut) => {
                     let (deleter, res) = ready!(fut.as_mut().poll(cx));

Reply via email to