This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 3e3ab5a chore: decrement flushes_in_progress counter in
await_flush_comp (#147)
3e3ab5a is described below
commit 3e3ab5af94c49cf5429fab0de6d69a20f4e3de45
Author: Anton Borisov <[email protected]>
AuthorDate: Mon Jan 12 07:12:27 2026 +0000
chore: decrement flushes_in_progress counter in await_flush_comp (#147)
---
crates/fluss/src/client/write/accumulator.rs | 47 ++++++++++++++++++++++++++--
1 file changed, 44 insertions(+), 3 deletions(-)
diff --git a/crates/fluss/src/client/write/accumulator.rs
b/crates/fluss/src/client/write/accumulator.rs
index 74aab9f..83f11ab 100644
--- a/crates/fluss/src/client/write/accumulator.rs
+++ b/crates/fluss/src/client/write/accumulator.rs
@@ -441,10 +441,20 @@ impl RecordAccumulator {
// Clone handles before awaiting to avoid holding RwLock read guard
across await points
let handles: Vec<_> =
self.incomplete_batches.read().values().cloned().collect();
- for result_handle in handles {
- result_handle.wait().await?;
+ // Await on all handles
+ let result = async {
+ for result_handle in handles {
+ result_handle.wait().await?;
+ }
+ Ok(())
}
- Ok(())
+ .await;
+
+ // Always decrement flushes_in_progress, even if an error occurred
+ // This mimics the Java finally block behavior
+ self.flushes_in_progress.fetch_sub(1, Ordering::SeqCst);
+
+ result
}
}
@@ -557,4 +567,35 @@ mod tests {
assert_eq!(batch.write_batch.attempts(), 1);
Ok(())
}
+
+ #[tokio::test]
+ async fn flush_counter_decremented_on_error() -> Result<()> {
+ use crate::client::write::broadcast::BroadcastOnce;
+ use std::sync::atomic::Ordering;
+
+ let config = Config::default();
+ let accumulator = RecordAccumulator::new(config);
+
+ accumulator.begin_flush();
+ assert_eq!(accumulator.flushes_in_progress.load(Ordering::SeqCst), 1);
+
+ // Create a failing batch by dropping the BroadcastOnce without
broadcasting
+ {
+ let broadcast = BroadcastOnce::default();
+ let receiver = broadcast.receiver();
+ let handle = ResultHandle::new(receiver);
+ accumulator.incomplete_batches.write().insert(1, handle);
+ // broadcast is dropped here, causing an error
+ }
+
+ // Await flush completion should fail but still decrement counter
+ let result = accumulator.await_flush_completion().await;
+ assert!(result.is_err());
+
+ // Counter should still be decremented (this is the critical fix!)
+ assert_eq!(accumulator.flushes_in_progress.load(Ordering::SeqCst), 0);
+ assert!(!accumulator.flush_in_progress());
+
+ Ok(())
+ }
}