This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new c8cfb19f0 fix(delete): poll deleter close future on exit (#7316)
c8cfb19f0 is described below
commit c8cfb19f011f6c9692e77a86b398cea7a24641c0
Author: dentiny <[email protected]>
AuthorDate: Thu Mar 26 04:38:52 2026 -0700
fix(delete): poll deleter close future on exit (#7316)
---
core/core/src/types/delete/deleter.rs | 61 +++++++++++++++++++++++
core/core/src/types/delete/futures_delete_sink.rs | 2 +-
2 files changed, 62 insertions(+), 1 deletion(-)
diff --git a/core/core/src/types/delete/deleter.rs b/core/core/src/types/delete/deleter.rs
index 5311ce5e6..69fb451c0 100644
--- a/core/core/src/types/delete/deleter.rs
+++ b/core/core/src/types/delete/deleter.rs
@@ -194,3 +194,64 @@ impl Deleter {
FuturesDeleteSink::new(self)
}
}
+
+#[cfg(test)]
+mod tests {
+ use std::sync::Arc;
+ use std::sync::Mutex;
+
+ use futures::SinkExt;
+
+ use super::*;
+ use crate::raw::OpDelete;
+ use crate::raw::oio;
+
+ struct MockBatchDeleter { // test double: delete() only buffers paths, close() publishes them
+ buffer: Vec<String>, // paths received by delete() that have not been flushed yet
+ flushed: Arc<Mutex<Vec<String>>>, // shared with the test so it can inspect what close() flushed
+ }
+
+ impl oio::Delete for MockBatchDeleter {
+ async fn delete(&mut self, path: &str, _args: OpDelete) -> Result<()> { // only buffers the path; `flushed` is untouched here
+ self.buffer.push(path.to_string());
+ Ok(())
+ }
+
+ async fn close(&mut self) -> Result<()> { // publishes everything buffered so far
+ let mut flushed = self.flushed.lock().unwrap(); // unwrap panics if the mutex is poisoned
+ flushed.extend(self.buffer.drain(..)); // drain(..) empties `buffer` while moving its paths into `flushed`
+ Ok(())
+ }
+ }
+
+ #[tokio::test]
+ async fn test_sink_close_must_flush_buffered_deletes() { // regression test: closing the sink must actually run the deleter's close()
+ let flushed = Arc::new(Mutex::new(Vec::<String>::new())); // the mock writes here on close(); the asserts below read it
+
+ let mock = MockBatchDeleter {
+ buffer: Vec::new(),
+ flushed: flushed.clone(), // clone of the Arc handle, so mock and test see the same Vec
+ };
+
+ let deleter = Deleter {
+ deleter: Box::new(mock),
+ };
+ let mut sink = deleter.into_sink::<String>(); // drive the deleter through its Sink adapter with String items
+
+ sink.send("file_a".to_string()).await.unwrap();
+ sink.send("file_b".to_string()).await.unwrap();
+ sink.close().await.unwrap(); // the mock fills `flushed` only in close(), so the asserts fail if close() never ran
+
+ let flushed = flushed.lock().unwrap(); // shadows the Arc with the locked guard for inspection
+ assert!(
+ flushed.contains(&"file_a".to_string()),
+ "file_a should have been flushed by close, got: {:?}",
+ *flushed
+ );
+ assert!(
+ flushed.contains(&"file_b".to_string()),
+ "file_b should have been flushed by close, got: {:?}",
+ *flushed
+ );
+ }
+}
diff --git a/core/core/src/types/delete/futures_delete_sink.rs b/core/core/src/types/delete/futures_delete_sink.rs
index 5600ebafc..12b2e6d68 100644
--- a/core/core/src/types/delete/futures_delete_sink.rs
+++ b/core/core/src/types/delete/futures_delete_sink.rs
@@ -126,7 +126,7 @@ impl<T: IntoDeleteInput> Sink<T> for FuturesDeleteSink<T> {
(deleter, res)
};
self.state = State::Close(Box::pin(fut));
- return Poll::Ready(Ok(()));
+ continue;
}
State::Delete(fut) => {
let (deleter, res) = ready!(fut.as_mut().poll(cx));