luoyuxia commented on code in PR #103:
URL: https://github.com/apache/fluss-rust/pull/103#discussion_r2637507210


##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -350,25 +643,44 @@ impl LogFetcher {
             let offset = match 
self.log_scanner_status.get_bucket_offset(&bucket) {
                 Some(offset) => offset,
                 None => {
-                    // todo: debug
+                    debug!(
+                        "Skipping fetch request for bucket {bucket} because 
the bucket has been unsubscribed."
+                    );
                     continue;
                 }
             };
 
-            if let Some(leader) = self.get_table_bucket_leader(&bucket) {
-                let fetch_log_req_for_bucket = PbFetchLogReqForBucket {
-                    partition_id: None,
-                    bucket_id: bucket.bucket_id(),
-                    fetch_offset: offset,
-                    // 1M
-                    max_fetch_bytes: 1024 * 1024,
-                };
-
-                fetch_log_req_for_buckets
-                    .entry(leader)
-                    .or_insert_with(Vec::new)
-                    .push(fetch_log_req_for_bucket);
-                ready_for_fetch_count += 1;
+            match self.get_table_bucket_leader(&bucket) {
+                None => {
+                    log::trace!(
+                        "Skipping fetch request for bucket {bucket} because 
leader is not available."
+                    )
+                }
+                Some(leader) => {
+                    if self
+                        .nodes_with_pending_fetch_requests
+                        .lock()
+                        .contains(&leader)
+                    {
+                        log::trace!(
+                            "Skipping fetch request for bucket {bucket} 
because previous request to server {leader} has not been processed."
+                        )
+                    } else {
+                        let fetch_log_req_for_bucket = PbFetchLogReqForBucket {
+                            partition_id: None,
+                            bucket_id: bucket.bucket_id(),
+                            fetch_offset: offset,
+                            // 1M
+                            max_fetch_bytes: 1024 * 1024,
+                        };
+
+                        fetch_log_req_for_buckets
+                            .entry(leader)
+                            .or_insert_with(Vec::new)
+                            .push(fetch_log_req_for_bucket);
+                        ready_for_fetch_count += 1;
+                    }
+                }

Review Comment:
   I agree. But for now, let's follow the Java-side logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to