This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new bbd5fad  chore: fix read deadlock (#165)
bbd5fad is described below

commit bbd5fad18963a6893dbb73ea7a0390bcaf2aff8e
Author: AlexZhao <[email protected]>
AuthorDate: Fri Jan 16 14:22:48 2026 +0800

    chore: fix read deadlock (#165)
---
 crates/fluss/src/client/table/log_fetch_buffer.rs | 26 +++++++++++++++--------
 1 file changed, 17 insertions(+), 9 deletions(-)

diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs 
b/crates/fluss/src/client/table/log_fetch_buffer.rs
index e9bac53..c55c994 100644
--- a/crates/fluss/src/client/table/log_fetch_buffer.rs
+++ b/crates/fluss/src/client/table/log_fetch_buffer.rs
@@ -211,19 +211,27 @@ impl LogFetchBuffer {
     pub fn buffered_buckets(&self) -> Vec<TableBucket> {
         let mut buckets = Vec::new();
 
-        let next_in_line_fetch = self.next_in_line_fetch.lock();
-        if let Some(complete_fetch) = next_in_line_fetch.as_ref() {
-            if !complete_fetch.is_consumed() {
-                buckets.push(complete_fetch.table_bucket().clone());
+        // Avoid holding multiple locks at once to prevent lock-order 
inversion.
+        {
+            let next_in_line_fetch = self.next_in_line_fetch.lock();
+            if let Some(complete_fetch) = next_in_line_fetch.as_ref() {
+                if !complete_fetch.is_consumed() {
+                    buckets.push(complete_fetch.table_bucket().clone());
+                }
             }
         }
 
-        let completed = self.completed_fetches.lock();
-        for fetch in completed.iter() {
-            buckets.push(fetch.table_bucket().clone());
+        {
+            let completed = self.completed_fetches.lock();
+            for fetch in completed.iter() {
+                buckets.push(fetch.table_bucket().clone());
+            }
+        }
+
+        {
+            let pending = self.pending_fetches.lock();
+            buckets.extend(pending.keys().cloned());
         }
-        let pending = self.pending_fetches.lock();
-        buckets.extend(pending.keys().cloned());
         buckets
     }
 }

Reply via email to