Copilot commented on code in PR #222:
URL: https://github.com/apache/fluss-rust/pull/222#discussion_r2762030300


##########
crates/fluss/src/metadata/table.rs:
##########
@@ -1205,10 +1205,10 @@ pub struct TableBucket {
 }
 
 impl TableBucket {
-    pub fn new(table_id: TableId, bucket: BucketId) -> Self {
-        TableBucket {
+    pub fn new(table_id: TableId, partition_id: Option<PartitionId>, bucket: BucketId) -> Self {
+        Self {
             table_id,
-            partition_id: None,
+            partition_id,
             bucket,
         }
     }

Review Comment:
   `TableBucket::new` now accepts an explicit `partition_id`, but 
`new_with_partition` still exists and effectively duplicates the same 
constructor behavior. To avoid API confusion/duplication, consider removing 
`new_with_partition`, or making it delegate to `new` and taking a non-optional 
`PartitionId` (so the name matches the signature).
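
   The delegation option could look roughly like the sketch below. The three 
type aliases are hypothetical stand-ins so the snippet compiles on its own; the 
crate's real `TableId`, `PartitionId`, and `BucketId` definitions may differ, 
and the derives on the struct are assumed for illustration.

```rust
// Hypothetical stand-ins for the crate's ID types (assumption, not the real definitions).
type TableId = i64;
type PartitionId = i64;
type BucketId = i32;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TableBucket {
    table_id: TableId,
    partition_id: Option<PartitionId>,
    bucket: BucketId,
}

impl TableBucket {
    /// General constructor, matching the signature introduced in this PR.
    pub fn new(table_id: TableId, partition_id: Option<PartitionId>, bucket: BucketId) -> Self {
        Self {
            table_id,
            partition_id,
            bucket,
        }
    }

    /// Convenience constructor for partitioned tables: takes a non-optional
    /// `PartitionId` and delegates to `new`, so there is one construction path.
    pub fn new_with_partition(
        table_id: TableId,
        partition_id: PartitionId,
        bucket: BucketId,
    ) -> Self {
        Self::new(table_id, Some(partition_id), bucket)
    }
}
```

With this shape, `new_with_partition(t, p, b)` is just shorthand for 
`new(t, Some(p), b)`, and the name matches what the signature requires.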



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -43,6 +42,7 @@ use crate::record::{
 };
 use crate::rpc::{RpcClient, RpcError, message};
 use crate::util::FairBucketStatusMap;
+use crate::{PartitionId, TableId};

Review Comment:
   PR description mentions adding `TableScan.filter_partition(partition_id)`, 
but there is no `filter_partition` implementation anywhere in the Rust codebase 
(no matches found). Either implement the API on `TableScan` (and wire it into 
scan/log fetch behavior) or update the PR description / changelog to reflect 
what was actually added (currently only `subscribe_partition` is introduced).



##########
crates/fluss/tests/integration/log_table.rs:
##########
@@ -1098,11 +1098,69 @@ mod table_test {
             "Table partition 'fluss.test_partitioned_log_append(p=NOT Exists)' does not exist."
         ));
 
+        let log_scanner = table
+            .new_scan()
+            .create_log_scanner()
+            .expect("Failed to create log scanner");
+        let partition_info = admin
+            .list_partition_infos(&table_path)
+            .await
+            .expect("Failed to list partition infos");
+        for partition_info in partition_info {
+            log_scanner
+                .subscribe_partition(partition_info.get_partition_id(), 0, 0)
+                .await
+                .expect("Failed to subscribe to partition");
+        }
+
+        let expected_records = vec![
+            (1, "US", 100i64),
+            (2, "US", 200i64),
+            (3, "EU", 300i64),
+            (4, "EU", 400),
+            (5, "US", 500i64),
+            (6, "US", 600i64),
+            (7, "EU", 700i64),
+            (8, "EU", 800i64),
+        ];
+        let expected_records: Vec<(i32, String, i64)> = expected_records
+            .into_iter()
+            .map(|(id, region, val)| (id, region.to_string(), val))
+            .collect();
+
+        let mut collected_records: Vec<(i32, String, i64)> = Vec::new();
+        let start_time = std::time::Instant::now();
+        while collected_records.len() < expected_records.len()
+            && start_time.elapsed() < Duration::from_secs(10)
+        {
+            if let Ok(records) = log_scanner.poll(Duration::from_millis(500)).await {
+                for rec in records {
+                    let row = rec.row();
+                    collected_records.push((
+                        row.get_int(0),
+                        row.get_string(1).to_string(),
+                        row.get_long(2),
+                    ));
+                }

Review Comment:
   This loop silently ignores `poll` errors (`if let Ok(records) = ...`). In an 
integration test this can hide the real failure and surface it only as a 
timeout or a record-count mismatch. Consider failing fast on `Err` (e.g., by 
propagating the error or calling `expect`) so the test reports the underlying 
cause.
   ```suggestion
               let records = log_scanner
                   .poll(Duration::from_millis(500))
                   .await
                   .expect("Failed to poll log scanner");
               for rec in records {
                   let row = rec.row();
                   collected_records.push((
                       row.get_int(0),
                       row.get_string(1).to_string(),
                       row.get_long(2),
                   ));
   ```
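
   If the test body returns a `Result`, propagating the error with `?` is an 
alternative to `expect`. The sketch below is a simplified, synchronous 
illustration of the same fail-fast loop; `collect_until` and its closure-based 
`poll` parameter are hypothetical and stand in for the async 
`log_scanner.poll(...)` call used in the test.

```rust
use std::time::{Duration, Instant};

/// Hypothetical helper: collects items from `poll` until `expected` items have
/// arrived or `timeout` elapses. The first poll error is returned immediately
/// instead of being swallowed.
fn collect_until<T, E>(
    expected: usize,
    timeout: Duration,
    mut poll: impl FnMut() -> Result<Vec<T>, E>,
) -> Result<Vec<T>, E> {
    let start = Instant::now();
    let mut collected = Vec::new();
    while collected.len() < expected && start.elapsed() < timeout {
        // `?` surfaces the underlying error rather than letting the loop
        // keep spinning until the deadline.
        collected.extend(poll()?);
    }
    Ok(collected)
}
```

Either way, a broken `poll` then fails the test with its own error message 
rather than with an assertion about missing records.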



