This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 9873184  chore: fix append hang issue (#126)
9873184 is described below

commit 98731846d6e05f5736eb4ef9a8e9da62d51a0663
Author: AlexZhao <[email protected]>
AuthorDate: Wed Jan 7 13:50:27 2026 +0800

    chore: fix append hang issue (#126)
    
    ---------
    
    Co-authored-by: 赵海源 <[email protected]>
---
 crates/fluss/src/client/write/accumulator.rs | 50 +++++++++++++++-------------
 1 file changed, 26 insertions(+), 24 deletions(-)

diff --git a/crates/fluss/src/client/write/accumulator.rs 
b/crates/fluss/src/client/write/accumulator.rs
index 215adbe..beae0ca 100644
--- a/crates/fluss/src/client/write/accumulator.rs
+++ b/crates/fluss/src/client/write/accumulator.rs
@@ -299,34 +299,36 @@ impl RecordAccumulator {
                     .batches
                     .get(&table_bucket.bucket_id())
                 {
-                    let mut batch = {
+                    let mut maybe_batch = None;
+                    {
                         let mut batch_lock = deque.lock().await;
-                        if batch_lock.is_empty() {
-                            continue;
+                        if !batch_lock.is_empty() {
+                            let first_batch = batch_lock.front().unwrap();
+
+                            if size + first_batch.estimated_size_in_bytes() > 
max_size as i64
+                                && !ready.is_empty()
+                            {
+                                // there is a rare case that a single batch 
size is larger than the request size
+                                // due to compression; in this case we will 
still eventually send this batch in
+                                // a single request.
+                                break;
+                            }
+
+                            maybe_batch = 
Some(batch_lock.pop_front().unwrap());
                         }
-                        let first_batch = batch_lock.front().unwrap();
-
-                        if size + first_batch.estimated_size_in_bytes() > 
max_size as i64
-                            && !ready.is_empty()
-                        {
-                            // there is a rare case that a single batch size 
is larger than the request size
-                            // due to compression; in this case we will still 
eventually send this batch in
-                            // a single request.
-                            break;
-                        }
-
-                        batch_lock.pop_front().unwrap()
-                    };
+                    }
 
-                    let current_batch_size = batch.estimated_size_in_bytes();
-                    size += current_batch_size;
+                    if let Some(mut batch) = maybe_batch {
+                        let current_batch_size = 
batch.estimated_size_in_bytes();
+                        size += current_batch_size;
 
-                    // mark the batch as drained.
-                    batch.drained(current_time_ms());
-                    ready.push(Arc::new(ReadyWriteBatch {
-                        table_bucket,
-                        write_batch: batch,
-                    }));
+                        // mark the batch as drained.
+                        batch.drained(current_time_ms());
+                        ready.push(Arc::new(ReadyWriteBatch {
+                            table_bucket,
+                            write_batch: batch,
+                        }));
+                    }
                 }
             }
             if current_index == start {

Reply via email to