This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 9873184 chore: fix append hang issue (#126)
9873184 is described below
commit 98731846d6e05f5736eb4ef9a8e9da62d51a0663
Author: AlexZhao <[email protected]>
AuthorDate: Wed Jan 7 13:50:27 2026 +0800
chore: fix append hang issue (#126)
---------
Co-authored-by: 赵海源 <[email protected]>
---
crates/fluss/src/client/write/accumulator.rs | 50 +++++++++++++++-------------
1 file changed, 26 insertions(+), 24 deletions(-)
diff --git a/crates/fluss/src/client/write/accumulator.rs
b/crates/fluss/src/client/write/accumulator.rs
index 215adbe..beae0ca 100644
--- a/crates/fluss/src/client/write/accumulator.rs
+++ b/crates/fluss/src/client/write/accumulator.rs
@@ -299,34 +299,36 @@ impl RecordAccumulator {
.batches
.get(&table_bucket.bucket_id())
{
- let mut batch = {
+ let mut maybe_batch = None;
+ {
let mut batch_lock = deque.lock().await;
- if batch_lock.is_empty() {
- continue;
+ if !batch_lock.is_empty() {
+ let first_batch = batch_lock.front().unwrap();
+
+ if size + first_batch.estimated_size_in_bytes() >
max_size as i64
+ && !ready.is_empty()
+ {
+ // there is a rare case that a single batch
size is larger than the request size
+ // due to compression; in this case we will
still eventually send this batch in
+ // a single request.
+ break;
+ }
+
+ maybe_batch =
Some(batch_lock.pop_front().unwrap());
}
- let first_batch = batch_lock.front().unwrap();
-
- if size + first_batch.estimated_size_in_bytes() >
max_size as i64
- && !ready.is_empty()
- {
- // there is a rare case that a single batch size
is larger than the request size
- // due to compression; in this case we will still
eventually send this batch in
- // a single request.
- break;
- }
-
- batch_lock.pop_front().unwrap()
- };
+ }
- let current_batch_size = batch.estimated_size_in_bytes();
- size += current_batch_size;
+ if let Some(mut batch) = maybe_batch {
+ let current_batch_size =
batch.estimated_size_in_bytes();
+ size += current_batch_size;
- // mark the batch as drained.
- batch.drained(current_time_ms());
- ready.push(Arc::new(ReadyWriteBatch {
- table_bucket,
- write_batch: batch,
- }));
+ // mark the batch as drained.
+ batch.drained(current_time_ms());
+ ready.push(Arc::new(ReadyWriteBatch {
+ table_bucket,
+ write_batch: batch,
+ }));
+ }
}
}
if current_index == start {