Copilot commented on code in PR #102:
URL: https://github.com/apache/fluss-rust/pull/102#discussion_r2645572554


##########
crates/fluss/src/client/credentials.rs:
##########
@@ -118,7 +118,9 @@ impl CredentialsCache {
 
     async fn refresh_from_server(&self) -> Result<HashMap<String, String>> {
         let cluster = self.metadata.get_cluster();
-        let server_node = cluster.get_one_available_server();
+        let server_node = cluster
+            .get_one_available_server()
+            .expect("no tablet server available");

Review Comment:
   Using `.expect()` here will cause a panic when no tablet server is 
available. This is inconsistent with the error handling pattern established 
elsewhere in this PR (e.g., in `metadata.rs` lines 86-94 where the same 
situation is handled gracefully by attempting to reinitialize the cluster). 
Consider returning a proper error or handling the None case more gracefully.
   ```suggestion
               .ok_or_else(|| Error::JsonSerdeError {
                   message: "no tablet server available".to_string(),
               })?;
   ```
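
   For illustration, a minimal standalone sketch of the `ok_or_else` conversion; the `FlussError::NoAvailableServer` variant and the `Cluster`/`ServerNode` types below are hypothetical placeholders (the crate's actual `Error` enum may offer a more fitting variant than the `JsonSerdeError` reused in the suggestion above):
   ```rust
   // Hypothetical types for illustration only; the real crate's Error enum,
   // Cluster, and ServerNode differ.
   #[derive(Debug)]
   enum FlussError {
       NoAvailableServer { message: String },
   }

   struct ServerNode;

   struct Cluster {
       servers: Vec<ServerNode>,
   }

   impl Cluster {
       fn get_one_available_server(&self) -> Option<&ServerNode> {
           self.servers.first()
       }
   }

   // Converting the Option into a Result keeps the caller in control of the
   // failure instead of panicking inside the client.
   fn pick_server(cluster: &Cluster) -> Result<&ServerNode, FlussError> {
       cluster
           .get_one_available_server()
           .ok_or_else(|| FlussError::NoAvailableServer {
               message: "no tablet server available".to_string(),
           })
   }
   ```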



##########
crates/fluss/src/cluster/cluster.rs:
##########
@@ -64,6 +64,43 @@ impl Cluster {
         }
     }
 
+    pub fn invalidate_server(&self, server_id: &i32, table_ids: Vec<i64>) -> Self {
+        let alive_tablet_servers_by_id = self
+            .alive_tablet_servers_by_id
+            .iter()
+            .filter(|&(id, _ts)| id != server_id)

Review Comment:
   The closure binds the tablet server as `_ts` but never uses it. A bare `_`, as in `filter(|&(id, _)| id != server_id)`, makes it clearer that only the id is being checked.
   ```suggestion
               .filter(|&(id, _)| id != server_id)
   ```



##########
crates/fluss/src/cluster/cluster.rs:
##########
@@ -64,6 +64,43 @@ impl Cluster {
         }
     }
 
+    pub fn invalidate_server(&self, server_id: &i32, table_ids: Vec<i64>) -> Self {
+        let alive_tablet_servers_by_id = self
+            .alive_tablet_servers_by_id
+            .iter()
+            .filter(|&(id, _ts)| id != server_id)
+            .map(|(id, ts)| (*id, ts.clone()))
+            .collect();
+
+        let table_paths: HashSet<&TablePath> = table_ids
+            .iter()
+            .filter_map(|id| self.table_path_by_id.get(id))
+            .collect();
+
+        let available_locations_by_path = self
+            .available_locations_by_path
+            .iter()
+            .filter(|&(path, _locations)| !table_paths.contains(path))
+            .map(|(path, locations)| (path.clone(), locations.clone()))
+            .collect();
+
+        let available_locations_by_bucket = self
+            .available_locations_by_bucket
+            .iter()
+            .filter(|&(_bucket, location)| !table_paths.contains(&location.table_path))

Review Comment:
   The `invalidate_server` method incorrectly removes all available locations 
for the affected tables. It should only remove locations where the specific 
server is the leader. Currently, if a server fails, all bucket locations for 
those tables are removed even if they have different leaders. The filter 
condition should check if the location's leader matches the failing server_id, 
not just if the table is in the affected tables list.
   ```suggestion
           let available_locations_by_path = self
               .available_locations_by_path
               .iter()
               .map(|(path, locations)| {
                   let filtered_locations: Vec<BucketLocation> = locations
                       .iter()
                       .filter(|location| location.leader.id != *server_id)
                       .cloned()
                       .collect();
                   (path.clone(), filtered_locations)
               })
               .collect();
   
           let available_locations_by_bucket = self
               .available_locations_by_bucket
               .iter()
               .filter(|&(_bucket, location)| location.leader.id != *server_id)
   ```
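
   As a standalone illustration of the leader-based filtering described above, a minimal sketch; `BucketLocation`, `ServerNode`, and their fields (`leader`, `table_path`, `id`) are simplified assumptions, not the crate's actual definitions:
   ```rust
   use std::collections::HashMap;

   // Simplified, hypothetical shapes; the crate's BucketLocation and ServerNode
   // carry richer fields.
   #[derive(Clone)]
   struct ServerNode {
       id: i32,
   }

   #[derive(Clone)]
   struct BucketLocation {
       table_path: String,
       leader: ServerNode,
   }

   // Keep every location whose leader is NOT the failing server, instead of
   // dropping all locations of the affected tables.
   fn drop_locations_led_by(
       locations_by_path: &HashMap<String, Vec<BucketLocation>>,
       failing_server_id: i32,
   ) -> HashMap<String, Vec<BucketLocation>> {
       locations_by_path
           .iter()
           .map(|(path, locations)| {
               let kept: Vec<BucketLocation> = locations
                   .iter()
                   .filter(|loc| loc.leader.id != failing_server_id)
                   .cloned()
                   .collect();
               (path.clone(), kept)
           })
           .collect()
   }
   ```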



##########
crates/fluss/src/rpc/server_connection.rs:
##########
@@ -66,13 +66,25 @@ impl RpcClient {
         server_node: &ServerNode,
     ) -> Result<ServerConnection, RpcError> {
         let server_id = server_node.uid();
-        {
+        let connection = {
             let connections = self.connections.read();
-            if let Some(connection) = connections.get(server_id) {
-                return Ok(connection.clone());
+            connections.get(server_id).cloned()
+        };
+
+        if let Some(conn) = connection {
+            if !conn.is_poisoned().await {
+                return Ok(conn.clone());

Review Comment:
   The clone operation on `conn` is redundant here. The variable `conn` is 
already an `Arc<ServerConnectionInner>` (aliased as `ServerConnection`), and 
returning `Ok(conn)` would be more efficient than cloning and then returning 
the clone.
   ```suggestion
                   return Ok(conn);
   ```
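
   For context, a tiny sketch of why the extra clone is unnecessary; `Connection` below is an illustrative stand-in for the `ServerConnection` Arc alias mentioned above:
   ```rust
   use std::sync::Arc;

   // Illustrative stand-in: the review notes that ServerConnection is an alias
   // for Arc<ServerConnectionInner>.
   type Connection = Arc<String>;

   fn reuse(existing: Option<Connection>) -> Option<Connection> {
       if let Some(conn) = existing {
           // `conn` is already owned here, so returning it moves the Arc;
           // `conn.clone()` would only bump the reference count needlessly.
           return Some(conn);
       }
       None
   }
   ```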



##########
crates/fluss/src/rpc/server_connection.rs:
##########
@@ -231,6 +243,11 @@ where
         }
     }
 
+    async fn is_poisoned(&self) -> bool {

Review Comment:
   The method `is_poisoned` is declared as `async fn` but doesn't perform any 
asynchronous operations. It only locks a synchronous `parking_lot::Mutex`. This 
should be a regular synchronous method, not async. Having it as async adds 
unnecessary overhead and could be misleading to callers.
   ```suggestion
       fn is_poisoned(&self) -> bool {
   ```
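
   A minimal sketch of the synchronous version, assuming the flag sits behind a `parking_lot::Mutex` as the comment states; `ConnectionState` and its `poisoned` field are placeholders for the real connection internals:
   ```rust
   use parking_lot::Mutex;

   // Placeholder connection state; the real struct in the PR differs.
   struct ConnectionState {
       poisoned: bool,
   }

   struct Connection {
       state: Mutex<ConnectionState>,
   }

   impl Connection {
       // parking_lot::Mutex::lock() is a plain synchronous call that returns a
       // guard directly, so the method needs no `async`/`await`.
       fn is_poisoned(&self) -> bool {
           self.state.lock().poisoned
       }
   }
   ```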



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -315,9 +319,52 @@ impl LogFetcher {
         }
     }
 
+    async fn check_and_update_metadata(&self) -> Result<()> {
+        if self.is_partitioned {
+            let partition_ids: Vec<i64> = self
+                .fetchable_buckets()
+                .iter()
+                .filter(|b| self.get_table_bucket_leader(b).is_none())
+                .map(|b| b.partition_id().unwrap())

Review Comment:
   The `unwrap()` call assumes that all buckets have a partition_id when 
`is_partitioned` is true. While this is likely correct, it would be safer to 
use a more defensive approach such as `filter_map` to skip buckets without 
partition_ids, or add a clear comment explaining why the unwrap is safe.
   ```suggestion
                   .filter_map(|b| b.partition_id())
   ```
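
   A small standalone sketch of the `filter_map` alternative; `Bucket` is a hypothetical stand-in for `TableBucket`:
   ```rust
   // Hypothetical stand-in for TableBucket; only the partition_id() accessor
   // matters for this example.
   struct Bucket {
       partition_id: Option<i64>,
   }

   impl Bucket {
       fn partition_id(&self) -> Option<i64> {
           self.partition_id
       }
   }

   // filter_map skips buckets without a partition id instead of panicking
   // via unwrap().
   fn partition_ids(buckets: &[Bucket]) -> Vec<i64> {
       buckets.iter().filter_map(|b| b.partition_id()).collect()
   }
   ```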



##########
crates/fluss/src/client/metadata.rs:
##########
@@ -16,45 +16,58 @@
 // under the License.
 
 use crate::cluster::{Cluster, ServerNode, ServerType};
+use crate::error::Result;
 use crate::metadata::{TableBucket, TablePath};
+use crate::proto::MetadataResponse;
 use crate::rpc::message::UpdateMetadataRequest;
 use crate::rpc::{RpcClient, ServerConnection};
+use log::info;
 use parking_lot::RwLock;
 use std::collections::HashSet;
 use std::net::SocketAddr;
 use std::sync::Arc;
 
-use crate::error::Result;
-use crate::proto::MetadataResponse;
-
 #[derive(Default)]
 pub struct Metadata {
     cluster: RwLock<Arc<Cluster>>,
     connections: Arc<RpcClient>,
+    bootstrap: Arc<str>,
 }
 
 impl Metadata {
-    pub async fn new(boot_strap: &str, connections: Arc<RpcClient>) -> Result<Self> {
-        let custer = Self::init_cluster(boot_strap, connections.clone()).await?;
+    pub async fn new(bootstrap: &str, connections: Arc<RpcClient>) -> Result<Self> {
+        let cluster = Self::init_cluster(bootstrap, connections.clone()).await?;
         Ok(Metadata {
-            cluster: RwLock::new(Arc::new(custer)),
+            cluster: RwLock::new(Arc::new(cluster)),
             connections,
+            bootstrap: bootstrap.into(),
         })
     }
 
     async fn init_cluster(boot_strap: &str, connections: Arc<RpcClient>) -> Result<Cluster> {
-        let socker_addrss = boot_strap.parse::<SocketAddr>().unwrap();
+        let socket_address = boot_strap.parse::<SocketAddr>().unwrap();
         let server_node = ServerNode::new(
             -1,
-            socker_addrss.ip().to_string(),
-            socker_addrss.port() as u32,
+            socket_address.ip().to_string(),
+            socket_address.port() as u32,
             ServerType::CoordinatorServer,
         );
         let con = connections.get_connection(&server_node).await?;
         let response = con.request(UpdateMetadataRequest::new(&[])).await?;
         Cluster::from_metadata_response(response, None)
     }
 
+    async fn reinit_cluster(&self) -> Result<()> {
+        let cluster = Self::init_cluster(&self.bootstrap, self.connections.clone()).await?;
+        *self.cluster.write() = cluster.into();
+        Ok(())
+    }
+
+    pub fn invalidate_server(&self, server_id: &i32, table_ids: Vec<i64>) {
+        let cluster = self.cluster.read().invalidate_server(server_id, table_ids);
+        *self.cluster.write() = cluster.into();

Review Comment:
   There's a potential race condition between the read and write operations. 
The read lock is released before the write lock is acquired, which means 
another thread could modify the cluster in between, causing the invalidation to 
be based on stale data or overwriting concurrent updates. Consider holding a 
write lock for the entire operation, or using a single atomic update pattern.
   ```suggestion
           // Take a write lock for the entire operation to avoid races between
        // reading the current cluster state and writing back the updated one.
        let mut cluster_guard = self.cluster.write();
        let updated_cluster = cluster_guard.invalidate_server(server_id, table_ids);
           *cluster_guard = Arc::new(updated_cluster);
   ```
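
   For illustration, a minimal standalone sketch of the single-write-lock pattern over a `parking_lot::RwLock<Arc<T>>`; the `Cluster` and `Metadata` types here are trivial stand-ins, not the crate's actual definitions:
   ```rust
   use parking_lot::RwLock;
   use std::sync::Arc;

   // Trivial stand-in for the real Cluster, which builds a new copy on
   // invalidation rather than mutating in place.
   #[derive(Default)]
   struct Cluster {
       generation: u64,
   }

   impl Cluster {
       fn invalidate_server(&self, _server_id: i32) -> Cluster {
           Cluster {
               generation: self.generation + 1,
           }
       }
   }

   struct Metadata {
       cluster: RwLock<Arc<Cluster>>,
   }

   impl Metadata {
       fn invalidate_server(&self, server_id: i32) {
           // Holding the write guard across both the read of the current state
           // and the store of the rebuilt state makes the swap atomic with
           // respect to other writers.
           let mut guard = self.cluster.write();
           let updated = guard.invalidate_server(server_id);
           *guard = Arc::new(updated);
       }
   }
   ```
   parking_lot also offers `upgradable_read` for this read-then-write shape, though a plain write lock is simpler here since the rebuild is cheap.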



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -315,9 +319,52 @@ impl LogFetcher {
         }
     }
 
+    async fn check_and_update_metadata(&self) -> Result<()> {
+        if self.is_partitioned {
+            let partition_ids: Vec<i64> = self
+                .fetchable_buckets()
+                .iter()
+                .filter(|b| self.get_table_bucket_leader(b).is_none())
+                .map(|b| b.partition_id().unwrap())
+                .collect();
+
+            if !partition_ids.is_empty() {
+                // TODO: Implement once LogFetcher is partition aware
+            }

Review Comment:
   The variable `partition_ids` is computed but never used since the TODO block 
is empty. This computation could be wasteful if there are many buckets. 
Consider removing the computation until the partition-aware implementation is 
added, or at least add a comment explaining why the computation is retained.
   ```suggestion
                // TODO: Implement partition-aware metadata refresh for buckets whose leaders are unknown.
                // The implementation will likely need to collect partition IDs for such buckets and
                // perform targeted metadata updates. Until then, we avoid computing unused partition_ids.
   ```



##########
crates/fluss/src/client/metadata.rs:
##########
@@ -65,7 +78,22 @@ impl Metadata {
     }
 
     pub async fn update_tables_metadata(&self, table_paths: &HashSet<&TablePath>) -> Result<()> {
-        let server = self.cluster.read().get_one_available_server().clone();
+        let maybe_server = {
+            let guard = self.cluster.read();
+            guard.get_one_available_server().cloned()
+        };
+
+        let server = match maybe_server {
+            Some(s) => s,
+            None => {
+                info!(
+                    "No available tablet server to update metadata, attempting 
to re-initialize cluster using bootstrap server."
+                );
+                self.reinit_cluster().await?;
+                return Ok(());

Review Comment:
   When no tablet server is available, the method reinitializes the cluster and 
returns Ok(()), but the requested table metadata is not actually fetched. The 
`init_cluster` method requests metadata with an empty table paths array, so the 
specific tables requested in `table_paths` won't have their metadata updated. 
This means the caller thinks the update succeeded, but the table metadata 
remains stale. Consider either returning an error, or storing the table_paths 
to fetch them after reinit completes, or documenting this behavior clearly if 
it's intentional.
   ```suggestion
   
                // After re-initializing the cluster, try again to find an available tablet server.
                   let guard = self.cluster.read();
                   match guard.get_one_available_server().cloned() {
                       Some(s) => s,
                       None => {
                           info!(
                               "No available tablet server even after 
re-initialization; skipping table metadata update."
                           );
                           return Ok(());
                       }
                   }
   ```



##########
crates/fluss/src/cluster/cluster.rs:
##########
@@ -64,6 +64,43 @@ impl Cluster {
         }
     }
 
+    pub fn invalidate_server(&self, server_id: &i32, table_ids: Vec<i64>) -> Self {
+        let alive_tablet_servers_by_id = self
+            .alive_tablet_servers_by_id
+            .iter()
+            .filter(|&(id, _ts)| id != server_id)
+            .map(|(id, ts)| (*id, ts.clone()))
+            .collect();
+
+        let table_paths: HashSet<&TablePath> = table_ids
+            .iter()
+            .filter_map(|id| self.table_path_by_id.get(id))
+            .collect();
+
+        let available_locations_by_path = self
+            .available_locations_by_path
+            .iter()
+            .filter(|&(path, _locations)| !table_paths.contains(path))

Review Comment:
   The closure binds the bucket locations as `_locations` but never uses them. A bare `_`, as in `filter(|&(path, _)| !table_paths.contains(path))`, makes it clearer that only the path is being checked.
   ```suggestion
               .filter(|&(path, _)| !table_paths.contains(path))
   ```


