fresh-borzoni opened a new issue, #146: URL: https://github.com/apache/fluss-rust/issues/146
### Search before asking - [x] I searched in the [issues](https://github.com/apache/fluss-rust/issues) and found nothing similar. ### Please describe the bug 🐞 The `flushes_in_progress` counter is incremented in `begin_flush()` but never decremented in `await_flush_completion()`. This causes batching to be permanently disabled after the first flush. ## Root Cause This is a porting bug from the Java implementation. The Java code correctly decrements in a `finally` block: **Java (correct):** ```java public void awaitFlushCompletion() throws InterruptedException { try { for (WriteBatch.RequestFuture future : incomplete.requestResults()) { future.await(); } } finally { flushesInProgress.decrementAndGet(); // ← Cleanup } } ``` Rust (incorrect): ```rust pub async fn await_flush_completion(&self) -> Result<()> { let handles: Vec<_> = self.incomplete_batches.read().values().cloned().collect(); for result_handle in handles { result_handle.wait().await?; } Ok(()) // ← Missing decrement! } ``` Impact After the first flush() call: - flushes_in_progress = 1 forever - flush_in_progress() returns true forever - In batch_ready(): all batches become immediately sendable - Batching optimization is permanently disabled References - Java source: https://github.com/apache/fluss/blob/main/fluss-client/src/main/java/com/alibaba/fluss/client/write/RecordAccumulator.java - Kafka source: https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ### Solution Add decrement after await loop completes (in all code paths). ### Are you willing to submit a PR? - [x] I'm willing to submit a PR! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
