Copilot commented on code in PR #222:
URL: https://github.com/apache/fluss-rust/pull/222#discussion_r2739864252
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -618,12 +667,14 @@ impl LogFetcher {
}
async fn check_and_update_metadata(&self) -> Result<()> {
- let need_update = self
+ // Collect buckets that are missing leader information
+ let buckets_needing_leader: Vec<TableBucket> = self
.fetchable_buckets()
- .iter()
- .any(|bucket| self.get_table_bucket_leader(bucket).is_none());
+ .into_iter()
+ .filter(|bucket| self.get_table_bucket_leader(bucket).is_none())
+ .collect();
Review Comment:
`buckets_needing_leader` is only used to check `is_empty()`, so collecting
into a `Vec` adds an avoidable allocation. Consider reverting to an iterator
`any(...)` boolean, or keep the `Vec` only if you’ll use it later (e.g., to
drive a partition-aware metadata refresh).
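A minimal sketch of the allocation-free check, assuming a simplified stand-in for `TableBucket` and a hypothetical `leaders` map in place of `get_table_bucket_leader` (neither is the crate's actual definition):

```rust
use std::collections::HashMap;

// Hypothetical stand-in for the real `TableBucket`; field types are assumed.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct TableBucket {
    pub table_id: i64,
    pub partition_id: Option<i64>,
    pub bucket: i32,
}

/// Returns `true` as soon as one bucket has no known leader.
/// `leaders` maps a bucket to its leader's server id (an assumed shape).
/// `any` short-circuits and never builds an intermediate `Vec`.
pub fn any_bucket_missing_leader(
    buckets: &[TableBucket],
    leaders: &HashMap<TableBucket, i32>,
) -> bool {
    buckets.iter().any(|bucket| !leaders.contains_key(bucket))
}
```

If the list of leaderless buckets is needed later, for example to refresh only the affected partitions, collecting into a `Vec` is the better choice.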
##########
crates/fluss/src/cluster/cluster.rs:
##########
@@ -213,7 +213,7 @@ impl Cluster {
let mut bucket_for_table = vec![];
for bucket_metadata in table_metadata.bucket_metadata {
let bucket_id = bucket_metadata.bucket_id;
- let bucket = TableBucket::new(table_id, bucket_id);
+ let bucket = TableBucket::new(table_id, None, bucket_id);
Review Comment:
Partitioned table scans will not be able to resolve leaders because
`Cluster::from_metadata_response` only indexes buckets from `table_metadata` with
`partition_id=None`. When scanners create `TableBucket` with
`Some(partition_id)`, `cluster.leader_for()` won’t find a matching key.
Consider also ingesting `metadata_response.partition_metadata` and inserting
`TableBucket::new(table_id, Some(partition_id), bucket_id)` entries (and
corresponding BucketLocations) into
`available_locations_by_bucket`/`available_locations_by_path`, or normalize
leader lookups to ignore `partition_id` when appropriate.
```suggestion
let partition_id = bucket_metadata.partition_id;
let bucket = TableBucket::new(table_id, partition_id, bucket_id);
```
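To illustrate the lookup miss and the proposed fix, here is a rough sketch that indexes both table-level and partition-level buckets. `BucketMeta`, `index_leaders`, and the field types are hypothetical simplifications, not the shapes used by the real metadata response or `Cluster`:

```rust
use std::collections::HashMap;

// Hypothetical stand-in for the real `TableBucket`; field types are assumed.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct TableBucket {
    pub table_id: i64,
    pub partition_id: Option<i64>,
    pub bucket: i32,
}

/// Hypothetical, simplified view of one bucket entry in a metadata response.
pub struct BucketMeta {
    pub bucket_id: i32,
    pub leader: i32,
}

/// Builds a leader index that covers non-partitioned and partitioned buckets.
pub fn index_leaders(
    table_id: i64,
    table_buckets: &[BucketMeta],
    partition_buckets: &[(i64, Vec<BucketMeta>)],
) -> HashMap<TableBucket, i32> {
    let mut leaders = HashMap::new();
    // Buckets of a non-partitioned table are keyed with `partition_id: None`.
    for meta in table_buckets {
        let key = TableBucket { table_id, partition_id: None, bucket: meta.bucket_id };
        leaders.insert(key, meta.leader);
    }
    // Buckets of each partition are keyed with `Some(partition_id)`, matching
    // the keys a partition-aware scanner would look up.
    for (partition_id, buckets) in partition_buckets {
        for meta in buckets {
            let key = TableBucket {
                table_id,
                partition_id: Some(*partition_id),
                bucket: meta.bucket_id,
            };
            leaders.insert(key, meta.leader);
        }
    }
    leaders
}
```

Because `partition_id` is part of the hash key, an index built only from the first loop cannot answer lookups for `Some(partition_id)` keys, even when `table_id` and `bucket` match.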
##########
crates/fluss/src/metadata/table.rs:
##########
@@ -1169,10 +1169,10 @@ pub struct TableBucket {
}
impl TableBucket {
- pub fn new(table_id: TableId, bucket: BucketId) -> Self {
- TableBucket {
+ pub fn new(table_id: TableId, partition_id: Option<PartitionId>, bucket: BucketId) -> Self {
+ Self {
table_id,
Review Comment:
Changing `TableBucket::new` to require `partition_id` is a breaking change
for external callers. If this type is part of the public client API, consider
keeping the old `new(table_id, bucket)` (defaulting `partition_id=None`) and
adding a new constructor like `new_with_partition(...)` to preserve backwards
compatibility.
```suggestion
pub fn new(table_id: TableId, bucket: BucketId) -> Self {
Self {
table_id,
partition_id: None,
bucket,
}
}
pub fn new_with_partition(
table_id: TableId,
partition_id: Option<PartitionId>,
bucket: BucketId,
) -> Self {
Self {
table_id,
```
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -647,7 +698,7 @@ impl LogFetcher {
return Ok(());
}
- // TODO: Handle PartitionNotExist error
+ // Non-partitioned table: standard metadata refresh
self.metadata
.update_tables_metadata(&HashSet::from([&self.table_path]))
.await
Review Comment:
Both the `is_partitioned` branch above and the non-partitioned path here
execute essentially the same `update_tables_metadata(...).await.or_else(...)`
logic. Consider factoring the retrying metadata refresh into a shared helper
(or unifying the branches) to avoid drift and simplify future partition-aware
enhancements.
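One possible shape for such a helper, shown synchronously to keep the sketch self-contained. The diff does not show what the `or_else` closure does, so this assumes it tolerates a transient failure and propagates everything else; `RefreshError` and `refresh_metadata` are hypothetical names, and the real method is `async`:

```rust
/// Hypothetical error type; the real client has its own error enum.
#[derive(Debug, PartialEq)]
pub enum RefreshError {
    /// Assumed to be recoverable: the next poll will try the refresh again.
    Transient(String),
    /// Anything else is propagated to the caller.
    Fatal(String),
}

/// Runs `refresh` and applies one shared error policy, so the partitioned
/// and non-partitioned branches cannot drift apart.
pub fn refresh_metadata<F>(refresh: F) -> Result<(), RefreshError>
where
    F: FnOnce() -> Result<(), RefreshError>,
{
    refresh().or_else(|err| match err {
        // Assumed policy: swallow transient failures and continue.
        RefreshError::Transient(_) => Ok(()),
        fatal => Err(fatal),
    })
}
```

Each branch would then pass only its own refresh call as the closure, leaving the error handling in one place.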
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.