This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new bbd5fad chore: fix read deadlock (#165)
bbd5fad is described below
commit bbd5fad18963a6893dbb73ea7a0390bcaf2aff8e
Author: AlexZhao <[email protected]>
AuthorDate: Fri Jan 16 14:22:48 2026 +0800
chore: fix read deadlock (#165)
---
crates/fluss/src/client/table/log_fetch_buffer.rs | 26 +++++++++++++++--------
1 file changed, 17 insertions(+), 9 deletions(-)
diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs b/crates/fluss/src/client/table/log_fetch_buffer.rs
index e9bac53..c55c994 100644
--- a/crates/fluss/src/client/table/log_fetch_buffer.rs
+++ b/crates/fluss/src/client/table/log_fetch_buffer.rs
@@ -211,19 +211,27 @@ impl LogFetchBuffer {
pub fn buffered_buckets(&self) -> Vec<TableBucket> {
let mut buckets = Vec::new();
- let next_in_line_fetch = self.next_in_line_fetch.lock();
- if let Some(complete_fetch) = next_in_line_fetch.as_ref() {
- if !complete_fetch.is_consumed() {
- buckets.push(complete_fetch.table_bucket().clone());
+ // Avoid holding multiple locks at once to prevent lock-order inversion.
+ {
+ let next_in_line_fetch = self.next_in_line_fetch.lock();
+ if let Some(complete_fetch) = next_in_line_fetch.as_ref() {
+ if !complete_fetch.is_consumed() {
+ buckets.push(complete_fetch.table_bucket().clone());
+ }
}
}
- let completed = self.completed_fetches.lock();
- for fetch in completed.iter() {
- buckets.push(fetch.table_bucket().clone());
+ {
+ let completed = self.completed_fetches.lock();
+ for fetch in completed.iter() {
+ buckets.push(fetch.table_bucket().clone());
+ }
+ }
+
+ {
+ let pending = self.pending_fetches.lock();
+ buckets.extend(pending.keys().cloned());
}
- let pending = self.pending_fetches.lock();
- buckets.extend(pending.keys().cloned());
buckets
}
}