This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new dd6183f  chore: fix potential deadlock with holding lock during await 
(#145)
dd6183f is described below

commit dd6183f75f5c98cb569731981fa419898e73a915
Author: Anton Borisov <[email protected]>
AuthorDate: Mon Jan 12 02:18:27 2026 +0000

    chore: fix potential deadlock with holding lock during await (#145)
---
 crates/fluss/src/client/write/accumulator.rs | 195 +++++++++++++++++----------
 1 file changed, 121 insertions(+), 74 deletions(-)

diff --git a/crates/fluss/src/client/write/accumulator.rs 
b/crates/fluss/src/client/write/accumulator.rs
index 001d0aa..74aab9f 100644
--- a/crates/fluss/src/client/write/accumulator.rs
+++ b/crates/fluss/src/client/write/accumulator.rs
@@ -31,6 +31,9 @@ use std::sync::Arc;
 use std::sync::atomic::{AtomicI32, AtomicI64, Ordering};
 use tokio::sync::Mutex;
 
+// Type alias to simplify complex nested types
+type BucketBatches = Vec<(BucketId, Arc<Mutex<VecDeque<WriteBatch>>>)>;
+
 #[allow(dead_code)]
 pub struct RecordAccumulator {
     config: Config,
@@ -138,20 +141,25 @@ impl RecordAccumulator {
         abort_if_batch_full: bool,
     ) -> Result<RecordAppendResult> {
         let table_path = &record.table_path;
-        let mut binding = self
-            .write_batches
-            .entry(table_path.as_ref().clone())
-            .or_insert_with(|| BucketAndWriteBatches {
-                table_id: 0,
-                is_partitioned_table: false,
-                partition_id: None,
-                batches: Default::default(),
-            });
-        let bucket_and_batches = binding.value_mut();
-        let dq = bucket_and_batches
-            .batches
-            .entry(bucket_id)
-            .or_insert_with(|| Mutex::new(VecDeque::new()));
+
+        let dq = {
+            let mut binding = self
+                .write_batches
+                .entry(table_path.as_ref().clone())
+                .or_insert_with(|| BucketAndWriteBatches {
+                    table_id: 0,
+                    is_partitioned_table: false,
+                    partition_id: None,
+                    batches: Default::default(),
+                });
+            let bucket_and_batches = binding.value_mut();
+            bucket_and_batches
+                .batches
+                .entry(bucket_id)
+                .or_insert_with(|| Arc::new(Mutex::new(VecDeque::new())))
+                .clone()
+        };
+
         let mut dq_guard = dq.lock().await;
         if let Some(append_result) = self.try_append(record, &mut dq_guard)? {
             return Ok(append_result);
@@ -166,16 +174,31 @@ impl RecordAccumulator {
     }
 
     pub async fn ready(&self, cluster: &Arc<Cluster>) -> ReadyCheckResult {
+        // Snapshot just the Arcs we need, avoiding cloning the entire 
BucketAndWriteBatches struct
+        let entries: Vec<(TablePath, BucketBatches)> = self
+            .write_batches
+            .iter()
+            .map(|entry| {
+                let table_path = entry.key().clone();
+                let bucket_batches: Vec<_> = entry
+                    .value()
+                    .batches
+                    .iter()
+                    .map(|(bucket_id, batch_arc)| (*bucket_id, 
batch_arc.clone()))
+                    .collect();
+                (table_path, bucket_batches)
+            })
+            .collect();
+
         let mut ready_nodes = HashSet::new();
         let mut next_ready_check_delay_ms = self.batch_timeout_ms;
         let mut unknown_leader_tables = HashSet::new();
-        for entry in self.write_batches.iter() {
-            let table_path = entry.key();
-            let batches = entry.value();
+
+        for (table_path, bucket_batches) in entries {
             next_ready_check_delay_ms = self
                 .bucket_ready(
-                    table_path,
-                    batches,
+                    &table_path,
+                    bucket_batches,
                     &mut ready_nodes,
                     &mut unknown_leader_tables,
                     cluster,
@@ -194,7 +217,7 @@ impl RecordAccumulator {
     async fn bucket_ready(
         &self,
         table_path: &TablePath,
-        batches: &BucketAndWriteBatches,
+        bucket_batches: BucketBatches,
         ready_nodes: &mut HashSet<ServerNode>,
         unknown_leader_tables: &mut HashSet<TablePath>,
         cluster: &Cluster,
@@ -202,7 +225,7 @@ impl RecordAccumulator {
     ) -> i64 {
         let mut next_delay = next_ready_check_delay_ms;
 
-        for (bucket_id, batch) in batches.batches.iter() {
+        for (bucket_id, batch) in bucket_batches {
             let batch_guard = batch.lock().await;
             if batch_guard.is_empty() {
                 continue;
@@ -212,7 +235,7 @@ impl RecordAccumulator {
             let waited_time_ms = batch.waited_time_ms(current_time_ms());
             let deque_size = batch_guard.len();
             let full = deque_size > 1 || batch.is_closed();
-            let table_bucket = cluster.get_table_bucket(table_path, 
*bucket_id);
+            let table_bucket = cluster.get_table_bucket(table_path, bucket_id);
             if let Some(leader) = cluster.leader_for(&table_bucket) {
                 next_delay =
                     self.batch_ready(leader, waited_time_ms, full, 
ready_nodes, next_delay);
@@ -281,60 +304,77 @@ impl RecordAccumulator {
             return Ok(ready);
         }
 
-        let mut nodes_drain_index_guard = self.nodes_drain_index.lock().await;
-        let drain_index = 
nodes_drain_index_guard.entry(node.id()).or_insert(0);
-        let start = *drain_index % buckets.len();
+        // Get the start index without holding the lock across awaits
+        let start = {
+            let mut nodes_drain_index_guard = 
self.nodes_drain_index.lock().await;
+            let drain_index = 
nodes_drain_index_guard.entry(node.id()).or_insert(0);
+            *drain_index % buckets.len()
+        };
+
         let mut current_index = start;
+        // Assigned at the start of each loop iteration (line 323), used after 
loop (line 376)
+        let mut last_processed_index;
 
         loop {
             let bucket = &buckets[current_index];
             let table_path = bucket.table_path.clone();
             let table_bucket = bucket.table_bucket.clone();
-            nodes_drain_index_guard.insert(node.id(), current_index);
+            last_processed_index = current_index;
             current_index = (current_index + 1) % buckets.len();
 
-            let bucket_and_write_batches = self.write_batches.get(&table_path);
-            if let Some(bucket_and_write_batches) = bucket_and_write_batches {
-                if let Some(deque) = bucket_and_write_batches
-                    .batches
-                    .get(&table_bucket.bucket_id())
+            let deque = self
+                .write_batches
+                .get(&table_path)
+                .and_then(|bucket_and_write_batches| {
+                    bucket_and_write_batches
+                        .batches
+                        .get(&table_bucket.bucket_id())
+                        .cloned()
+                });
+
+            if let Some(deque) = deque {
+                let mut maybe_batch = None;
                 {
-                    let mut maybe_batch = None;
-                    {
-                        let mut batch_lock = deque.lock().await;
-                        if !batch_lock.is_empty() {
-                            let first_batch = batch_lock.front().unwrap();
-
-                            if size + first_batch.estimated_size_in_bytes() > 
max_size as i64
-                                && !ready.is_empty()
-                            {
-                                // there is a rare case that a single batch 
size is larger than the request size
-                                // due to compression; in this case we will 
still eventually send this batch in
-                                // a single request.
-                                break;
-                            }
-
-                            maybe_batch = 
Some(batch_lock.pop_front().unwrap());
+                    let mut batch_lock = deque.lock().await;
+                    if !batch_lock.is_empty() {
+                        let first_batch = batch_lock.front().unwrap();
+
+                        if size + first_batch.estimated_size_in_bytes() > 
max_size as i64
+                            && !ready.is_empty()
+                        {
+                            // there is a rare case that a single batch size 
is larger than the request size
+                            // due to compression; in this case we will still 
eventually send this batch in
+                            // a single request.
+                            break;
                         }
+
+                        maybe_batch = Some(batch_lock.pop_front().unwrap());
                     }
+                }
 
-                    if let Some(mut batch) = maybe_batch {
-                        let current_batch_size = 
batch.estimated_size_in_bytes();
-                        size += current_batch_size;
+                if let Some(mut batch) = maybe_batch {
+                    let current_batch_size = batch.estimated_size_in_bytes();
+                    size += current_batch_size;
 
-                        // mark the batch as drained.
-                        batch.drained(current_time_ms());
-                        ready.push(ReadyWriteBatch {
-                            table_bucket,
-                            write_batch: batch,
-                        });
-                    }
+                    // mark the batch as drained.
+                    batch.drained(current_time_ms());
+                    ready.push(ReadyWriteBatch {
+                        table_bucket,
+                        write_batch: batch,
+                    });
                 }
             }
             if current_index == start {
                 break;
             }
         }
+
+        // Store the last processed index to maintain round-robin fairness
+        {
+            let mut nodes_drain_index_guard = 
self.nodes_drain_index.lock().await;
+            nodes_drain_index_guard.insert(node.id(), last_processed_index);
+        }
+
         Ok(ready)
     }
 
@@ -347,20 +387,25 @@ impl RecordAccumulator {
         let table_path = ready_write_batch.write_batch.table_path().clone();
         let bucket_id = ready_write_batch.table_bucket.bucket_id();
         let table_id = 
u64::try_from(ready_write_batch.table_bucket.table_id()).unwrap_or(0);
-        let mut binding =
-            self.write_batches
-                .entry(table_path)
-                .or_insert_with(|| BucketAndWriteBatches {
-                    table_id,
-                    is_partitioned_table: false,
-                    partition_id: None,
-                    batches: Default::default(),
-                });
-        let bucket_and_batches = binding.value_mut();
-        let dq = bucket_and_batches
-            .batches
-            .entry(bucket_id)
-            .or_insert_with(|| Mutex::new(VecDeque::new()));
+
+        let dq = {
+            let mut binding =
+                self.write_batches
+                    .entry(table_path)
+                    .or_insert_with(|| BucketAndWriteBatches {
+                        table_id,
+                        is_partitioned_table: false,
+                        partition_id: None,
+                        batches: Default::default(),
+                    });
+            let bucket_and_batches = binding.value_mut();
+            bucket_and_batches
+                .batches
+                .entry(bucket_id)
+                .or_insert_with(|| Arc::new(Mutex::new(VecDeque::new())))
+                .clone()
+        };
+
         let mut dq_guard = dq.lock().await;
         dq_guard.push_front(ready_write_batch.write_batch);
     }
@@ -392,9 +437,11 @@ impl RecordAccumulator {
     }
 
     #[allow(unused_must_use)]
-    #[allow(clippy::await_holding_lock)]
     pub async fn await_flush_completion(&self) -> Result<()> {
-        for result_handle in self.incomplete_batches.read().values() {
+        // Clone handles before awaiting to avoid holding RwLock read guard 
across await points
+        let handles: Vec<_> = 
self.incomplete_batches.read().values().cloned().collect();
+
+        for result_handle in handles {
             result_handle.wait().await?;
         }
         Ok(())
@@ -411,7 +458,7 @@ struct BucketAndWriteBatches {
     table_id: TableId,
     is_partitioned_table: bool,
     partition_id: Option<PartitionId>,
-    batches: HashMap<BucketId, Mutex<VecDeque<WriteBatch>>>,
+    batches: HashMap<BucketId, Arc<Mutex<VecDeque<WriteBatch>>>>,
 }
 
 pub struct RecordAppendResult {

Reply via email to