Copilot commented on code in PR #102:
URL: https://github.com/apache/fluss-rust/pull/102#discussion_r2645572554
##########
crates/fluss/src/client/credentials.rs:
##########
@@ -118,7 +118,9 @@ impl CredentialsCache {
async fn refresh_from_server(&self) -> Result<HashMap<String, String>> {
let cluster = self.metadata.get_cluster();
- let server_node = cluster.get_one_available_server();
+ let server_node = cluster
+ .get_one_available_server()
+ .expect("no tablet server available");
Review Comment:
Using `.expect()` here will cause a panic when no tablet server is
available. This is inconsistent with the error handling pattern established
elsewhere in this PR (e.g., in `metadata.rs` lines 86-94 where the same
situation is handled gracefully by attempting to reinitialize the cluster).
Consider returning a proper error or handling the None case more gracefully.
```suggestion
.ok_or_else(|| Error::JsonSerdeError {
message: "no tablet server available".to_string(),
})?;
```
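Separately from the suggestion above, if the crate's `Error` enum has (or gains) a variant better suited to server availability than `JsonSerdeError`, the same shape applies. A minimal sketch; `Error::ServerUnavailable` is a hypothetical variant name used only for illustration:
```
// Hypothetical variant name; substitute whichever variant the crate's
// Error enum actually provides for this situation.
let server_node = cluster
    .get_one_available_server()
    .ok_or_else(|| Error::ServerUnavailable {
        message: "no tablet server available".to_string(),
    })?;
```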
##########
crates/fluss/src/cluster/cluster.rs:
##########
@@ -64,6 +64,43 @@ impl Cluster {
}
}
+ pub fn invalidate_server(&self, server_id: &i32, table_ids: Vec<i64>) -> Self {
+ let alive_tablet_servers_by_id = self
+ .alive_tablet_servers_by_id
+ .iter()
+ .filter(|&(id, _ts)| id != server_id)
Review Comment:
The closure binds the tablet server as `_ts` but never uses it. Binding it as plain `_`, e.g. `filter(|&(id, _)| id != server_id)`, makes it clearer that only the id is being checked.
```suggestion
.filter(|&(id, _)| id != server_id)
```
##########
crates/fluss/src/cluster/cluster.rs:
##########
@@ -64,6 +64,43 @@ impl Cluster {
}
}
+ pub fn invalidate_server(&self, server_id: &i32, table_ids: Vec<i64>) -> Self {
+ let alive_tablet_servers_by_id = self
+ .alive_tablet_servers_by_id
+ .iter()
+ .filter(|&(id, _ts)| id != server_id)
+ .map(|(id, ts)| (*id, ts.clone()))
+ .collect();
+
+ let table_paths: HashSet<&TablePath> = table_ids
+ .iter()
+ .filter_map(|id| self.table_path_by_id.get(id))
+ .collect();
+
+ let available_locations_by_path = self
+ .available_locations_by_path
+ .iter()
+ .filter(|&(path, _locations)| !table_paths.contains(path))
+ .map(|(path, locations)| (path.clone(), locations.clone()))
+ .collect();
+
+ let available_locations_by_bucket = self
+ .available_locations_by_bucket
+ .iter()
+ .filter(|&(_bucket, location)| !table_paths.contains(&location.table_path))
Review Comment:
The `invalidate_server` method incorrectly removes all available locations
for the affected tables. It should only remove locations where the specific
server is the leader. Currently, if a server fails, all bucket locations for
those tables are removed even if they have different leaders. The filter
condition should check if the location's leader matches the failing server_id,
not just if the table is in the affected tables list.
```suggestion
let available_locations_by_path = self
.available_locations_by_path
.iter()
.map(|(path, locations)| {
let filtered_locations: Vec<BucketLocation> = locations
.iter()
.filter(|location| location.leader.id != *server_id)
.cloned()
.collect();
(path.clone(), filtered_locations)
})
.collect();
let available_locations_by_bucket = self
.available_locations_by_bucket
.iter()
.filter(|&(_bucket, location)| location.leader.id != *server_id)
```
##########
crates/fluss/src/rpc/server_connection.rs:
##########
@@ -66,13 +66,25 @@ impl RpcClient {
server_node: &ServerNode,
) -> Result<ServerConnection, RpcError> {
let server_id = server_node.uid();
- {
+ let connection = {
let connections = self.connections.read();
- if let Some(connection) = connections.get(server_id) {
- return Ok(connection.clone());
+ connections.get(server_id).cloned()
+ };
+
+ if let Some(conn) = connection {
+ if !conn.is_poisoned().await {
+ return Ok(conn.clone());
Review Comment:
The clone operation on `conn` is redundant here. The variable `conn` is
already an `Arc<ServerConnectionInner>` (aliased as `ServerConnection`), and
returning `Ok(conn)` would be more efficient than cloning and then returning
the clone.
```suggestion
return Ok(conn);
```
##########
crates/fluss/src/rpc/server_connection.rs:
##########
@@ -231,6 +243,11 @@ where
}
}
+ async fn is_poisoned(&self) -> bool {
Review Comment:
The method `is_poisoned` is declared as `async fn` but doesn't perform any
asynchronous operations. It only locks a synchronous `parking_lot::Mutex`. This
should be a regular synchronous method, not async. Having it as async adds
unnecessary overhead and could be misleading to callers.
```suggestion
fn is_poisoned(&self) -> bool {
```
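If `is_poisoned` becomes synchronous, the call site in `get_connection` (shown in the earlier hunk) would simply drop the `.await` and, per the comment above, the redundant clone; a minimal sketch of the resulting caller:
```
if let Some(conn) = connection {
    // No .await needed once is_poisoned is a plain method.
    if !conn.is_poisoned() {
        return Ok(conn);
    }
}
```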
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -315,9 +319,52 @@ impl LogFetcher {
}
}
+ async fn check_and_update_metadata(&self) -> Result<()> {
+ if self.is_partitioned {
+ let partition_ids: Vec<i64> = self
+ .fetchable_buckets()
+ .iter()
+ .filter(|b| self.get_table_bucket_leader(b).is_none())
+ .map(|b| b.partition_id().unwrap())
Review Comment:
The `unwrap()` call assumes that all buckets have a partition_id when
`is_partitioned` is true. While this is likely correct, it would be safer to
use a more defensive approach such as `filter_map` to skip buckets without
partition_ids, or add a clear comment explaining why the unwrap is safe.
```suggestion
.filter_map(|b| b.partition_id())
```
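Note that `filter_map` changes behavior: buckets without a `partition_id` are silently skipped rather than causing a panic. If the panic is actually the desired invariant check, the other option mentioned above is to keep it but document it, for example:
```
// The table is partitioned, so every fetchable bucket is expected to
// carry a partition_id; a missing one indicates a bug upstream.
.map(|b| {
    b.partition_id()
        .expect("bucket of a partitioned table must have a partition_id")
})
```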
##########
crates/fluss/src/client/metadata.rs:
##########
@@ -16,45 +16,58 @@
// under the License.
use crate::cluster::{Cluster, ServerNode, ServerType};
+use crate::error::Result;
use crate::metadata::{TableBucket, TablePath};
+use crate::proto::MetadataResponse;
use crate::rpc::message::UpdateMetadataRequest;
use crate::rpc::{RpcClient, ServerConnection};
+use log::info;
use parking_lot::RwLock;
use std::collections::HashSet;
use std::net::SocketAddr;
use std::sync::Arc;
-use crate::error::Result;
-use crate::proto::MetadataResponse;
-
#[derive(Default)]
pub struct Metadata {
cluster: RwLock<Arc<Cluster>>,
connections: Arc<RpcClient>,
+ bootstrap: Arc<str>,
}
impl Metadata {
- pub async fn new(boot_strap: &str, connections: Arc<RpcClient>) -> Result<Self> {
- let custer = Self::init_cluster(boot_strap, connections.clone()).await?;
+ pub async fn new(bootstrap: &str, connections: Arc<RpcClient>) -> Result<Self> {
+ let cluster = Self::init_cluster(bootstrap, connections.clone()).await?;
Ok(Metadata {
- cluster: RwLock::new(Arc::new(custer)),
+ cluster: RwLock::new(Arc::new(cluster)),
connections,
+ bootstrap: bootstrap.into(),
})
}
async fn init_cluster(boot_strap: &str, connections: Arc<RpcClient>) -> Result<Cluster> {
- let socker_addrss = boot_strap.parse::<SocketAddr>().unwrap();
+ let socket_address = boot_strap.parse::<SocketAddr>().unwrap();
let server_node = ServerNode::new(
-1,
- socker_addrss.ip().to_string(),
- socker_addrss.port() as u32,
+ socket_address.ip().to_string(),
+ socket_address.port() as u32,
ServerType::CoordinatorServer,
);
let con = connections.get_connection(&server_node).await?;
let response = con.request(UpdateMetadataRequest::new(&[])).await?;
Cluster::from_metadata_response(response, None)
}
+ async fn reinit_cluster(&self) -> Result<()> {
+ let cluster = Self::init_cluster(&self.bootstrap, self.connections.clone()).await?;
+ *self.cluster.write() = cluster.into();
+ Ok(())
+ }
+
+ pub fn invalidate_server(&self, server_id: &i32, table_ids: Vec<i64>) {
+ let cluster = self.cluster.read().invalidate_server(server_id, table_ids);
+ *self.cluster.write() = cluster.into();
Review Comment:
There's a potential race condition between the read and write operations.
The read lock is released before the write lock is acquired, which means
another thread could modify the cluster in between, causing the invalidation to
be based on stale data or overwriting concurrent updates. Consider holding a
write lock for the entire operation, or using a single atomic update pattern.
```suggestion
// Take a write lock for the entire operation to avoid races between
// reading the current cluster state and writing back the updated one.
let mut cluster_guard = self.cluster.write();
let updated_cluster = cluster_guard.invalidate_server(server_id, table_ids);
*cluster_guard = Arc::new(updated_cluster);
```
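Since the field is a `parking_lot::RwLock<Arc<Cluster>>` (see the `metadata.rs` hunk above), an upgradable read lock is another option: plain readers can proceed while the new cluster is computed, and no other upgradable or write lock can be acquired in between, so the read-modify-write cannot be interleaved. A sketch under that assumption:
```
use parking_lot::RwLockUpgradableReadGuard;

pub fn invalidate_server(&self, server_id: &i32, table_ids: Vec<i64>) {
    // Other readers proceed; a second upgradable/write lock cannot be taken.
    let guard = self.cluster.upgradable_read();
    let updated = guard.invalidate_server(server_id, table_ids);
    // Atomically upgrade to a write lock and publish the new cluster.
    let mut guard = RwLockUpgradableReadGuard::upgrade(guard);
    *guard = Arc::new(updated);
}
```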
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -315,9 +319,52 @@ impl LogFetcher {
}
}
+ async fn check_and_update_metadata(&self) -> Result<()> {
+ if self.is_partitioned {
+ let partition_ids: Vec<i64> = self
+ .fetchable_buckets()
+ .iter()
+ .filter(|b| self.get_table_bucket_leader(b).is_none())
+ .map(|b| b.partition_id().unwrap())
+ .collect();
+
+ if !partition_ids.is_empty() {
+ // TODO: Implement once LogFetcher is partition aware
+ }
Review Comment:
The variable `partition_ids` is computed but never used since the TODO block
is empty. This computation could be wasteful if there are many buckets.
Consider removing the computation until the partition-aware implementation is
added, or at least add a comment explaining why the computation is retained.
```suggestion
// TODO: Implement partition-aware metadata refresh for buckets whose leaders are unknown.
// The implementation will likely need to collect partition IDs for such buckets and
// perform targeted metadata updates. Until then, we avoid computing unused partition_ids.
```
##########
crates/fluss/src/client/metadata.rs:
##########
@@ -65,7 +78,22 @@ impl Metadata {
}
pub async fn update_tables_metadata(&self, table_paths: &HashSet<&TablePath>) -> Result<()> {
- let server = self.cluster.read().get_one_available_server().clone();
+ let maybe_server = {
+ let guard = self.cluster.read();
+ guard.get_one_available_server().cloned()
+ };
+
+ let server = match maybe_server {
+ Some(s) => s,
+ None => {
+ info!(
+ "No available tablet server to update metadata, attempting
to re-initialize cluster using bootstrap server."
+ );
+ self.reinit_cluster().await?;
+ return Ok(());
Review Comment:
When no tablet server is available, the method reinitializes the cluster and
returns Ok(()), but the requested table metadata is not actually fetched. The
`init_cluster` method requests metadata with an empty table paths array, so the
specific tables requested in `table_paths` won't have their metadata updated.
This means the caller thinks the update succeeded, but the table metadata
remains stale. Consider either returning an error, or storing the table_paths
to fetch them after reinit completes, or documenting this behavior clearly if
it's intentional.
```suggestion
// After re-initializing the cluster, try again to find an available tablet server.
let guard = self.cluster.read();
match guard.get_one_available_server().cloned() {
Some(s) => s,
None => {
info!(
"No available tablet server even after
re-initialization; skipping table metadata update."
);
return Ok(());
}
}
```
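With this shape, the inner match yields the server into the outer `server` binding on the retry path, so execution falls through to the normal metadata fetch below and the requested `table_paths` are actually refreshed after re-initialization, rather than the method returning early with stale metadata.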
##########
crates/fluss/src/cluster/cluster.rs:
##########
@@ -64,6 +64,43 @@ impl Cluster {
}
}
+ pub fn invalidate_server(&self, server_id: &i32, table_ids: Vec<i64>) -> Self {
+ let alive_tablet_servers_by_id = self
+ .alive_tablet_servers_by_id
+ .iter()
+ .filter(|&(id, _ts)| id != server_id)
+ .map(|(id, ts)| (*id, ts.clone()))
+ .collect();
+
+ let table_paths: HashSet<&TablePath> = table_ids
+ .iter()
+ .filter_map(|id| self.table_path_by_id.get(id))
+ .collect();
+
+ let available_locations_by_path = self
+ .available_locations_by_path
+ .iter()
+ .filter(|&(path, _locations)| !table_paths.contains(path))
Review Comment:
The closure binds the bucket locations as `_locations` but never uses them. Binding them as plain `_`, e.g. `filter(|&(path, _)| !table_paths.contains(path))`, makes it clearer that only the path is being checked.
```suggestion
.filter(|&(path, _)| !table_paths.contains(path))
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]