Copilot commented on code in PR #222:
URL: https://github.com/apache/fluss-rust/pull/222#discussion_r2739864252


##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -618,12 +667,14 @@ impl LogFetcher {
     }
 
     async fn check_and_update_metadata(&self) -> Result<()> {
-        let need_update = self
+        // Collect buckets that are missing leader information
+        let buckets_needing_leader: Vec<TableBucket> = self
             .fetchable_buckets()
-            .iter()
-            .any(|bucket| self.get_table_bucket_leader(bucket).is_none());
+            .into_iter()
+            .filter(|bucket| self.get_table_bucket_leader(bucket).is_none())
+            .collect();

Review Comment:
   `buckets_needing_leader` is only used to check `is_empty()`, so collecting 
into a `Vec` adds an avoidable allocation. Consider reverting to the 
short-circuiting `.iter().any(...)` check, or keep the `Vec` only if you will 
use it later (e.g., to drive a partition-aware metadata refresh).
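
   A minimal sketch of the two options, assuming a simplified stand-in 
`TableBucket` and a plain `HashMap` as the leader lookup (both are 
hypothetical and not the crate's real types). `any` stops at the first bucket 
without a leader and allocates nothing; the collecting variant only pays off 
if the caller consumes the list.

   ```rust
   use std::collections::HashMap;

   // Hypothetical stand-in for the crate's `TableBucket`, only here so the sketch compiles.
   #[derive(Clone, Debug, PartialEq, Eq, Hash)]
   pub struct TableBucket {
       pub table_id: i64,
       pub bucket: i32,
   }

   // Boolean check: short-circuits on the first bucket with no known leader, no allocation.
   pub fn any_missing_leader(
       buckets: &[TableBucket],
       leaders: &HashMap<TableBucket, i32>,
   ) -> bool {
       buckets.iter().any(|b| !leaders.contains_key(b))
   }

   // Collecting variant: worthwhile only if the caller needs the buckets themselves.
   pub fn buckets_missing_leader(
       buckets: &[TableBucket],
       leaders: &HashMap<TableBucket, i32>,
   ) -> Vec<TableBucket> {
       buckets
           .iter()
           .filter(|b| !leaders.contains_key(*b))
           .cloned()
           .collect()
   }
   ```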



##########
crates/fluss/src/cluster/cluster.rs:
##########
@@ -213,7 +213,7 @@ impl Cluster {
             let mut bucket_for_table = vec![];
             for bucket_metadata in table_metadata.bucket_metadata {
                 let bucket_id = bucket_metadata.bucket_id;
-                let bucket = TableBucket::new(table_id, bucket_id);
+                let bucket = TableBucket::new(table_id, None, bucket_id);

Review Comment:
   Partitioned table scans will not be able to resolve leaders because 
`Cluster::from_metadata_response` only indexes buckets from `table_metadata`, 
always with `partition_id = None`. When scanners create a `TableBucket` with 
`Some(partition_id)`, `cluster.leader_for()` won’t find a matching key. 
Consider also ingesting `metadata_response.partition_metadata` and inserting 
`TableBucket::new(table_id, Some(partition_id), bucket_id)` entries (and the 
corresponding `BucketLocation`s) into 
`available_locations_by_bucket`/`available_locations_by_path`, or normalize 
leader lookups to ignore `partition_id` when appropriate.
   ```suggestion
                   let partition_id = bucket_metadata.partition_id;
                   let bucket = TableBucket::new(table_id, partition_id, bucket_id);
   ```
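
   To illustrate the lookup miss, here is a small sketch under stated 
assumptions: `TableBucket` is a simplified mirror of the three-field struct 
from this PR, and `index_leaders` with its tuple input is a hypothetical 
stand-in for the indexing done in `Cluster::from_metadata_response`. Because 
the derived `Hash`/`Eq` include `partition_id`, a key built with `None` never 
matches a lookup built with `Some(..)`.

   ```rust
   use std::collections::HashMap;

   // Hypothetical, simplified mirror of the three-field `TableBucket` from this PR.
   #[derive(Clone, Debug, PartialEq, Eq, Hash)]
   pub struct TableBucket {
       pub table_id: i64,
       pub partition_id: Option<i64>,
       pub bucket: i32,
   }

   // Build a bucket -> leader index. Each entry is (partition_id, bucket_id, leader_id);
   // this tuple shape is an assumption made for the sketch, not the real metadata type.
   pub fn index_leaders(
       table_id: i64,
       entries: &[(Option<i64>, i32, i32)],
   ) -> HashMap<TableBucket, i32> {
       entries
           .iter()
           .map(|&(partition_id, bucket, leader)| {
               (TableBucket { table_id, partition_id, bucket }, leader)
           })
           .collect()
   }
   ```

   Indexing an entry as `(None, 0, 5)` and then looking up 
`TableBucket { table_id, partition_id: Some(10), bucket: 0 }` returns `None`; 
indexing it as `(Some(10), 0, 5)` makes the same lookup succeed.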



##########
crates/fluss/src/metadata/table.rs:
##########
@@ -1169,10 +1169,10 @@ pub struct TableBucket {
 }
 
 impl TableBucket {
-    pub fn new(table_id: TableId, bucket: BucketId) -> Self {
-        TableBucket {
+    pub fn new(table_id: TableId, partition_id: Option<PartitionId>, bucket: BucketId) -> Self {
+        Self {
             table_id,

Review Comment:
   Changing `TableBucket::new` to require `partition_id` is a breaking change 
for external callers. If this type is part of the public client API, consider 
keeping the old `new(table_id, bucket)` (defaulting `partition_id=None`) and 
adding a new constructor like `new_with_partition(...)` to preserve backwards 
compatibility.
   ```suggestion
       pub fn new(table_id: TableId, bucket: BucketId) -> Self {
           Self {
               table_id,
               partition_id: None,
               bucket,
           }
       }
   
       pub fn new_with_partition(
           table_id: TableId,
           partition_id: Option<PartitionId>,
           bucket: BucketId,
       ) -> Self {
           Self {
               table_id,
   ```
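
   A self-contained sketch of how the two constructors could sit side by 
side. The `TableId`, `PartitionId`, and `BucketId` aliases below are 
assumptions chosen so the example compiles on its own; the crate's actual 
definitions are not shown in this diff.

   ```rust
   // Assumed aliases for illustration only.
   pub type TableId = i64;
   pub type PartitionId = i64;
   pub type BucketId = i32;

   #[derive(Clone, Debug, PartialEq, Eq, Hash)]
   pub struct TableBucket {
       table_id: TableId,
       partition_id: Option<PartitionId>,
       bucket: BucketId,
   }

   impl TableBucket {
       // Existing two-argument signature stays source-compatible for current callers.
       pub fn new(table_id: TableId, bucket: BucketId) -> Self {
           Self::new_with_partition(table_id, None, bucket)
       }

       // New constructor for callers that know the partition.
       pub fn new_with_partition(
           table_id: TableId,
           partition_id: Option<PartitionId>,
           bucket: BucketId,
       ) -> Self {
           Self { table_id, partition_id, bucket }
       }

       pub fn partition_id(&self) -> Option<PartitionId> {
           self.partition_id
       }
   }
   ```

   Delegating `new` to `new_with_partition` keeps a single place where the 
struct is built, so adding a field later touches one constructor.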



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -647,7 +698,7 @@ impl LogFetcher {
             return Ok(());
         }
 
-        // TODO: Handle PartitionNotExist error
+        // Non-partitioned table: standard metadata refresh
         self.metadata
             .update_tables_metadata(&HashSet::from([&self.table_path]))
             .await

Review Comment:
   Both the `is_partitioned` branch above and the non-partitioned path here 
execute essentially the same `update_tables_metadata(...).await.or_else(...)` 
logic. Consider factoring the retrying metadata refresh into a shared helper 
(or unifying the branches) to avoid drift and simplify future partition-aware 
enhancements.
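
   One possible shape for such a helper, sketched synchronously to stay 
self-contained (the real method is `async`). `RefreshError` and 
`refresh_tolerating_retriable` are hypothetical names, and the policy shown 
(swallow retriable failures so a later poll can retry, propagate everything 
else) is an assumption about what the existing `or_else` does, not a 
description of it.

   ```rust
   // Hypothetical error type; the crate's real error enum is not shown in this diff.
   #[derive(Debug, PartialEq)]
   pub enum RefreshError {
       Retriable(String),
       Fatal(String),
   }

   // Shared helper: run the refresh closure and decide in one place how failures are handled.
   // Assumed policy: retriable failures are tolerated, all other errors propagate.
   pub fn refresh_tolerating_retriable<F>(refresh: F) -> Result<(), RefreshError>
   where
       F: FnOnce() -> Result<(), RefreshError>,
   {
       refresh().or_else(|e| match e {
           RefreshError::Retriable(_) => Ok(()),
           fatal => Err(fatal),
       })
   }
   ```

   Both the partitioned and non-partitioned branches would then pass their 
own refresh call into the helper, so the error policy cannot drift between 
them.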



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

