This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 3e3ab5a  chore: decrement flushes_in_progress counter in await_flush_comp (#147)
3e3ab5a is described below

commit 3e3ab5af94c49cf5429fab0de6d69a20f4e3de45
Author: Anton Borisov <[email protected]>
AuthorDate: Mon Jan 12 07:12:27 2026 +0000

    chore: decrement flushes_in_progress counter in await_flush_comp (#147)
---
 crates/fluss/src/client/write/accumulator.rs | 47 ++++++++++++++++++++++++++--
 1 file changed, 44 insertions(+), 3 deletions(-)

diff --git a/crates/fluss/src/client/write/accumulator.rs b/crates/fluss/src/client/write/accumulator.rs
index 74aab9f..83f11ab 100644
--- a/crates/fluss/src/client/write/accumulator.rs
+++ b/crates/fluss/src/client/write/accumulator.rs
@@ -441,10 +441,20 @@ impl RecordAccumulator {
         // Clone handles before awaiting to avoid holding RwLock read guard across await points
         let handles: Vec<_> = self.incomplete_batches.read().values().cloned().collect();
 
-        for result_handle in handles {
-            result_handle.wait().await?;
+        // Await on all handles
+        let result = async {
+            for result_handle in handles {
+                result_handle.wait().await?;
+            }
+            Ok(())
         }
-        Ok(())
+        .await;
+
+        // Always decrement flushes_in_progress, even if an error occurred
+        // This mimics the Java finally block behavior
+        self.flushes_in_progress.fetch_sub(1, Ordering::SeqCst);
+
+        result
     }
 }
 
@@ -557,4 +567,35 @@ mod tests {
         assert_eq!(batch.write_batch.attempts(), 1);
         Ok(())
     }
+
+    #[tokio::test]
+    async fn flush_counter_decremented_on_error() -> Result<()> {
+        use crate::client::write::broadcast::BroadcastOnce;
+        use std::sync::atomic::Ordering;
+
+        let config = Config::default();
+        let accumulator = RecordAccumulator::new(config);
+
+        accumulator.begin_flush();
+        assert_eq!(accumulator.flushes_in_progress.load(Ordering::SeqCst), 1);
+
+        // Create a failing batch by dropping the BroadcastOnce without broadcasting
+        {
+            let broadcast = BroadcastOnce::default();
+            let receiver = broadcast.receiver();
+            let handle = ResultHandle::new(receiver);
+            accumulator.incomplete_batches.write().insert(1, handle);
+            // broadcast is dropped here, causing an error
+        }
+
+        // Await flush completion should fail but still decrement counter
+        let result = accumulator.await_flush_completion().await;
+        assert!(result.is_err());
+
+        // Counter should still be decremented (this is the critical fix!)
+        assert_eq!(accumulator.flushes_in_progress.load(Ordering::SeqCst), 0);
+        assert!(!accumulator.flush_in_progress());
+
+        Ok(())
+    }
 }

Reply via email to