This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new dd6183f chore: fix potential deadlock with holding lock during await (#145)
dd6183f is described below
commit dd6183f75f5c98cb569731981fa419898e73a915
Author: Anton Borisov <[email protected]>
AuthorDate: Mon Jan 12 02:18:27 2026 +0000
chore: fix potential deadlock with holding lock during await (#145)
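The core of the fix, shown as a minimal standalone sketch: scope the map
access so the guard is dropped before any await, letting only a cloned Arc
escape the block. The names below are illustrative stand-ins, assuming a
DashMap-backed batch map like the one RecordAccumulator appears to use; this
is not the real fluss code.

use std::collections::VecDeque;
use std::sync::Arc;

use dashmap::DashMap;
use tokio::sync::Mutex;

struct Accumulator {
    // Arc-wrapping each per-bucket deque lets callers clone it out and
    // release the DashMap shard lock before awaiting on the tokio Mutex.
    batches: DashMap<i32, Arc<Mutex<VecDeque<String>>>>,
}

impl Accumulator {
    async fn append(&self, bucket_id: i32, record: String) {
        // Old shape (deadlock-prone): keeping the DashMap entry guard alive
        // while calling `.await` holds the shard's sync lock, so any other
        // task touching that shard can block an executor thread forever.
        //
        // let entry = self.batches.entry(bucket_id).or_insert_with(...);
        // let mut dq = entry.lock().await; // guard held across await!

        // New shape: the entry guard is a temporary inside this statement
        // and is dropped before the await below; only the Arc survives.
        let dq = {
            self.batches
                .entry(bucket_id)
                .or_insert_with(|| Arc::new(Mutex::new(VecDeque::new())))
                .clone()
        };

        let mut dq_guard = dq.lock().await; // safe: no map guard is alive
        dq_guard.push_back(record);
    }
}

#[tokio::main]
async fn main() {
    let acc = Accumulator { batches: DashMap::new() };
    acc.append(0, "hello".to_string()).await;
    acc.append(0, "world".to_string()).await;

    // Same discipline when reading back: clone the Arc, drop the map guard.
    let dq = Arc::clone(acc.batches.get(&0).expect("bucket 0 exists").value());
    println!("bucket 0 holds {} records", dq.lock().await.len());
}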
---
crates/fluss/src/client/write/accumulator.rs | 195 +++++++++++++++++----------
1 file changed, 121 insertions(+), 74 deletions(-)
diff --git a/crates/fluss/src/client/write/accumulator.rs b/crates/fluss/src/client/write/accumulator.rs
index 001d0aa..74aab9f 100644
--- a/crates/fluss/src/client/write/accumulator.rs
+++ b/crates/fluss/src/client/write/accumulator.rs
@@ -31,6 +31,9 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicI32, AtomicI64, Ordering};
use tokio::sync::Mutex;
+// Type alias to simplify complex nested types
+type BucketBatches = Vec<(BucketId, Arc<Mutex<VecDeque<WriteBatch>>>)>;
+
#[allow(dead_code)]
pub struct RecordAccumulator {
config: Config,
@@ -138,20 +141,25 @@ impl RecordAccumulator {
abort_if_batch_full: bool,
) -> Result<RecordAppendResult> {
let table_path = &record.table_path;
- let mut binding = self
- .write_batches
- .entry(table_path.as_ref().clone())
- .or_insert_with(|| BucketAndWriteBatches {
- table_id: 0,
- is_partitioned_table: false,
- partition_id: None,
- batches: Default::default(),
- });
- let bucket_and_batches = binding.value_mut();
- let dq = bucket_and_batches
- .batches
- .entry(bucket_id)
- .or_insert_with(|| Mutex::new(VecDeque::new()));
+
+ let dq = {
+ let mut binding = self
+ .write_batches
+ .entry(table_path.as_ref().clone())
+ .or_insert_with(|| BucketAndWriteBatches {
+ table_id: 0,
+ is_partitioned_table: false,
+ partition_id: None,
+ batches: Default::default(),
+ });
+ let bucket_and_batches = binding.value_mut();
+ bucket_and_batches
+ .batches
+ .entry(bucket_id)
+ .or_insert_with(|| Arc::new(Mutex::new(VecDeque::new())))
+ .clone()
+ };
+
let mut dq_guard = dq.lock().await;
if let Some(append_result) = self.try_append(record, &mut dq_guard)? {
return Ok(append_result);
@@ -166,16 +174,31 @@ impl RecordAccumulator {
}
pub async fn ready(&self, cluster: &Arc<Cluster>) -> ReadyCheckResult {
+ // Snapshot just the Arcs we need, avoiding cloning the entire BucketAndWriteBatches struct
+ let entries: Vec<(TablePath, BucketBatches)> = self
+ .write_batches
+ .iter()
+ .map(|entry| {
+ let table_path = entry.key().clone();
+ let bucket_batches: Vec<_> = entry
+ .value()
+ .batches
+ .iter()
+ .map(|(bucket_id, batch_arc)| (*bucket_id, batch_arc.clone()))
+ .collect();
+ (table_path, bucket_batches)
+ })
+ .collect();
+
let mut ready_nodes = HashSet::new();
let mut next_ready_check_delay_ms = self.batch_timeout_ms;
let mut unknown_leader_tables = HashSet::new();
- for entry in self.write_batches.iter() {
- let table_path = entry.key();
- let batches = entry.value();
+
+ for (table_path, bucket_batches) in entries {
next_ready_check_delay_ms = self
.bucket_ready(
- table_path,
- batches,
+ &table_path,
+ bucket_batches,
&mut ready_nodes,
&mut unknown_leader_tables,
cluster,
@@ -194,7 +217,7 @@ impl RecordAccumulator {
async fn bucket_ready(
&self,
table_path: &TablePath,
- batches: &BucketAndWriteBatches,
+ bucket_batches: BucketBatches,
ready_nodes: &mut HashSet<ServerNode>,
unknown_leader_tables: &mut HashSet<TablePath>,
cluster: &Cluster,
@@ -202,7 +225,7 @@ impl RecordAccumulator {
) -> i64 {
let mut next_delay = next_ready_check_delay_ms;
- for (bucket_id, batch) in batches.batches.iter() {
+ for (bucket_id, batch) in bucket_batches {
let batch_guard = batch.lock().await;
if batch_guard.is_empty() {
continue;
@@ -212,7 +235,7 @@ impl RecordAccumulator {
let waited_time_ms = batch.waited_time_ms(current_time_ms());
let deque_size = batch_guard.len();
let full = deque_size > 1 || batch.is_closed();
- let table_bucket = cluster.get_table_bucket(table_path, *bucket_id);
+ let table_bucket = cluster.get_table_bucket(table_path, bucket_id);
if let Some(leader) = cluster.leader_for(&table_bucket) {
next_delay =
self.batch_ready(leader, waited_time_ms, full, ready_nodes, next_delay);
@@ -281,60 +304,77 @@ impl RecordAccumulator {
return Ok(ready);
}
- let mut nodes_drain_index_guard = self.nodes_drain_index.lock().await;
- let drain_index = nodes_drain_index_guard.entry(node.id()).or_insert(0);
- let start = *drain_index % buckets.len();
+ // Get the start index without holding the lock across awaits
+ let start = {
+ let mut nodes_drain_index_guard = self.nodes_drain_index.lock().await;
+ let drain_index = nodes_drain_index_guard.entry(node.id()).or_insert(0);
+ *drain_index % buckets.len()
+ };
+
let mut current_index = start;
+ // Assigned at the start of each loop iteration (line 323), used after loop (line 376)
+ let mut last_processed_index;
loop {
let bucket = &buckets[current_index];
let table_path = bucket.table_path.clone();
let table_bucket = bucket.table_bucket.clone();
- nodes_drain_index_guard.insert(node.id(), current_index);
+ last_processed_index = current_index;
current_index = (current_index + 1) % buckets.len();
- let bucket_and_write_batches = self.write_batches.get(&table_path);
- if let Some(bucket_and_write_batches) = bucket_and_write_batches {
- if let Some(deque) = bucket_and_write_batches
- .batches
- .get(&table_bucket.bucket_id())
+ let deque = self
+ .write_batches
+ .get(&table_path)
+ .and_then(|bucket_and_write_batches| {
+ bucket_and_write_batches
+ .batches
+ .get(&table_bucket.bucket_id())
+ .cloned()
+ });
+
+ if let Some(deque) = deque {
+ let mut maybe_batch = None;
{
- let mut maybe_batch = None;
- {
- let mut batch_lock = deque.lock().await;
- if !batch_lock.is_empty() {
- let first_batch = batch_lock.front().unwrap();
-
- if size + first_batch.estimated_size_in_bytes() > max_size as i64
- && !ready.is_empty()
- {
- // there is a rare case that a single batch size is larger than the request size
- // due to compression; in this case we will still eventually send this batch in
- // a single request.
- break;
- }
-
- maybe_batch = Some(batch_lock.pop_front().unwrap());
+ let mut batch_lock = deque.lock().await;
+ if !batch_lock.is_empty() {
+ let first_batch = batch_lock.front().unwrap();
+
+ if size + first_batch.estimated_size_in_bytes() > max_size as i64
+ && !ready.is_empty()
+ {
+ // there is a rare case that a single batch size is larger than the request size
+ // due to compression; in this case we will still eventually send this batch in
+ // a single request.
+ break;
}
+
+ maybe_batch = Some(batch_lock.pop_front().unwrap());
}
+ }
- if let Some(mut batch) = maybe_batch {
- let current_batch_size = batch.estimated_size_in_bytes();
- size += current_batch_size;
+ if let Some(mut batch) = maybe_batch {
+ let current_batch_size = batch.estimated_size_in_bytes();
+ size += current_batch_size;
- // mark the batch as drained.
- batch.drained(current_time_ms());
- ready.push(ReadyWriteBatch {
- table_bucket,
- write_batch: batch,
- });
- }
+ // mark the batch as drained.
+ batch.drained(current_time_ms());
+ ready.push(ReadyWriteBatch {
+ table_bucket,
+ write_batch: batch,
+ });
}
}
if current_index == start {
break;
}
}
+
+ // Store the last processed index to maintain round-robin fairness
+ {
+ let mut nodes_drain_index_guard = self.nodes_drain_index.lock().await;
+ nodes_drain_index_guard.insert(node.id(), last_processed_index);
+ }
+
Ok(ready)
}
@@ -347,20 +387,25 @@ impl RecordAccumulator {
let table_path = ready_write_batch.write_batch.table_path().clone();
let bucket_id = ready_write_batch.table_bucket.bucket_id();
let table_id = u64::try_from(ready_write_batch.table_bucket.table_id()).unwrap_or(0);
- let mut binding =
- self.write_batches
- .entry(table_path)
- .or_insert_with(|| BucketAndWriteBatches {
- table_id,
- is_partitioned_table: false,
- partition_id: None,
- batches: Default::default(),
- });
- let bucket_and_batches = binding.value_mut();
- let dq = bucket_and_batches
- .batches
- .entry(bucket_id)
- .or_insert_with(|| Mutex::new(VecDeque::new()));
+
+ let dq = {
+ let mut binding =
+ self.write_batches
+ .entry(table_path)
+ .or_insert_with(|| BucketAndWriteBatches {
+ table_id,
+ is_partitioned_table: false,
+ partition_id: None,
+ batches: Default::default(),
+ });
+ let bucket_and_batches = binding.value_mut();
+ bucket_and_batches
+ .batches
+ .entry(bucket_id)
+ .or_insert_with(|| Arc::new(Mutex::new(VecDeque::new())))
+ .clone()
+ };
+
let mut dq_guard = dq.lock().await;
dq_guard.push_front(ready_write_batch.write_batch);
}
@@ -392,9 +437,11 @@ impl RecordAccumulator {
}
#[allow(unused_must_use)]
- #[allow(clippy::await_holding_lock)]
pub async fn await_flush_completion(&self) -> Result<()> {
- for result_handle in self.incomplete_batches.read().values() {
+ // Clone handles before awaiting to avoid holding RwLock read guard across await points
+ let handles: Vec<_> = self.incomplete_batches.read().values().cloned().collect();
+
+ for result_handle in handles {
result_handle.wait().await?;
}
Ok(())
@@ -411,7 +458,7 @@ struct BucketAndWriteBatches {
table_id: TableId,
is_partitioned_table: bool,
partition_id: Option<PartitionId>,
- batches: HashMap<BucketId, Mutex<VecDeque<WriteBatch>>>,
+ batches: HashMap<BucketId, Arc<Mutex<VecDeque<WriteBatch>>>>,
}
pub struct RecordAppendResult {
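The await_flush_completion change applies the same discipline to a
synchronous lock: the bare `.read()` (no `.await`) and the removed
`clippy::await_holding_lock` allow suggest a parking_lot-style RwLock, whose
guard must not live across an await. A sketch with illustrative stand-in
types, not the real fluss handles:

use std::collections::HashMap;

use parking_lot::RwLock;
use tokio::sync::watch;

#[derive(Clone)]
struct ResultHandle {
    rx: watch::Receiver<bool>, // flips to true once the batch completes
}

impl ResultHandle {
    async fn wait(&self) {
        let mut rx = self.rx.clone();
        // Wait until the flag becomes true or the sender goes away.
        while !*rx.borrow() {
            if rx.changed().await.is_err() {
                return;
            }
        }
    }
}

struct Accumulator {
    incomplete_batches: RwLock<HashMap<i64, ResultHandle>>,
}

impl Accumulator {
    async fn await_flush_completion(&self) {
        // Clone the handles in one statement so the read guard is dropped
        // before the first await; a sync guard held across `.await` can
        // park an executor thread while it still owns the lock.
        let handles: Vec<_> = self.incomplete_batches.read().values().cloned().collect();
        for handle in handles {
            handle.wait().await;
        }
    }
}

#[tokio::main]
async fn main() {
    let (_tx, rx) = watch::channel(true); // already complete
    let acc = Accumulator {
        incomplete_batches: RwLock::new(HashMap::from([(1, ResultHandle { rx })])),
    };
    acc.await_flush_completion().await;
}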