This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new f166314  chore: scanner should retry when bucket leader is not available (#102)
f166314 is described below

commit f1663149d89a923631414a84b131ff512c5f6350
Author: Keith Lee <[email protected]>
AuthorDate: Thu Dec 25 14:02:50 2025 +0000

    chore: scanner should retry when bucket leader is not available (#102)
---
 crates/fluss/src/client/credentials.rs    |  4 +-
 crates/fluss/src/client/metadata.rs       | 51 ++++++++++++++++----
 crates/fluss/src/client/table/scanner.rs  | 78 ++++++++++++++++++++++++++-----
 crates/fluss/src/cluster/cluster.rs       | 52 +++++++++++++++++----
 crates/fluss/src/rpc/server_connection.rs | 25 ++++++++--
 5 files changed, 175 insertions(+), 35 deletions(-)

diff --git a/crates/fluss/src/client/credentials.rs b/crates/fluss/src/client/credentials.rs
index 8adfe48..ffb682e 100644
--- a/crates/fluss/src/client/credentials.rs
+++ b/crates/fluss/src/client/credentials.rs
@@ -118,7 +118,9 @@ impl CredentialsCache {
 
     async fn refresh_from_server(&self) -> Result<HashMap<String, String>> {
         let cluster = self.metadata.get_cluster();
-        let server_node = cluster.get_one_available_server();
+        let server_node = cluster
+            .get_one_available_server()
+            .expect("no tablet server available");
         let conn = self.rpc_client.get_connection(server_node).await?;
 
         let request = GetSecurityTokenRequest::new();
diff --git a/crates/fluss/src/client/metadata.rs b/crates/fluss/src/client/metadata.rs
index 3c3ba4b..a514422 100644
--- a/crates/fluss/src/client/metadata.rs
+++ b/crates/fluss/src/client/metadata.rs
@@ -16,38 +16,40 @@
 // under the License.
 
 use crate::cluster::{Cluster, ServerNode, ServerType};
+use crate::error::Result;
 use crate::metadata::{TableBucket, TablePath};
+use crate::proto::MetadataResponse;
 use crate::rpc::message::UpdateMetadataRequest;
 use crate::rpc::{RpcClient, ServerConnection};
+use log::info;
 use parking_lot::RwLock;
 use std::collections::HashSet;
 use std::net::SocketAddr;
 use std::sync::Arc;
 
-use crate::error::Result;
-use crate::proto::MetadataResponse;
-
 #[derive(Default)]
 pub struct Metadata {
     cluster: RwLock<Arc<Cluster>>,
     connections: Arc<RpcClient>,
+    bootstrap: Arc<str>,
 }
 
 impl Metadata {
-    pub async fn new(boot_strap: &str, connections: Arc<RpcClient>) -> Result<Self> {
-        let custer = Self::init_cluster(boot_strap, connections.clone()).await?;
+    pub async fn new(bootstrap: &str, connections: Arc<RpcClient>) -> Result<Self> {
+        let cluster = Self::init_cluster(bootstrap, connections.clone()).await?;
         Ok(Metadata {
-            cluster: RwLock::new(Arc::new(custer)),
+            cluster: RwLock::new(Arc::new(cluster)),
             connections,
+            bootstrap: bootstrap.into(),
         })
     }
 
     async fn init_cluster(boot_strap: &str, connections: Arc<RpcClient>) -> Result<Cluster> {
-        let socker_addrss = boot_strap.parse::<SocketAddr>().unwrap();
+        let socket_address = boot_strap.parse::<SocketAddr>().unwrap();
         let server_node = ServerNode::new(
             -1,
-            socker_addrss.ip().to_string(),
-            socker_addrss.port() as u32,
+            socket_address.ip().to_string(),
+            socket_address.port() as u32,
             ServerType::CoordinatorServer,
         );
         let con = connections.get_connection(&server_node).await?;
@@ -55,6 +57,20 @@ impl Metadata {
         Cluster::from_metadata_response(response, None)
     }
 
+    async fn reinit_cluster(&self) -> Result<()> {
+        let cluster = Self::init_cluster(&self.bootstrap, self.connections.clone()).await?;
+        *self.cluster.write() = cluster.into();
+        Ok(())
+    }
+
+    pub fn invalidate_server(&self, server_id: &i32, table_ids: Vec<i64>) {
+        // Take a write lock for the entire operation to avoid races between
+        // reading the current cluster state and writing back the updated one.
+        let mut cluster_guard = self.cluster.write();
+        let updated_cluster = cluster_guard.invalidate_server(server_id, table_ids);
+        *cluster_guard = Arc::new(updated_cluster);
+    }
+
     pub async fn update(&self, metadata_response: MetadataResponse) -> Result<()> {
         let origin_cluster = self.cluster.read().clone();
         let new_cluster =
@@ -65,7 +81,22 @@ impl Metadata {
     }
 
     pub async fn update_tables_metadata(&self, table_paths: &HashSet<&TablePath>) -> Result<()> {
-        let server = self.cluster.read().get_one_available_server().clone();
+        let maybe_server = {
+            let guard = self.cluster.read();
+            guard.get_one_available_server().cloned()
+        };
+
+        let server = match maybe_server {
+            Some(s) => s,
+            None => {
+                info!(
+                    "No available tablet server to update metadata, attempting to re-initialize cluster using bootstrap server."
+                );
+                self.reinit_cluster().await?;
+                return Ok(());
+            }
+        };
+
         let conn = self.connections.get_connection(&server).await?;
 
         let update_table_paths: Vec<&TablePath> = table_paths.iter().copied().collect();
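
For readers skimming the patch, here is a minimal standalone sketch of the copy-on-write
update that the new invalidate_server performs, holding one write lock across the whole
read-modify-write. The Cluster type below is a simplified, illustrative stand-in for the
real one, and the sketch assumes the parking_lot crate the client already depends on:

    use parking_lot::RwLock;
    use std::collections::HashMap;
    use std::sync::Arc;

    // Illustrative stand-in for the real Cluster: table id -> leader server id.
    #[derive(Default, Clone)]
    struct Cluster {
        leaders_by_table: HashMap<i64, i32>,
    }

    struct Metadata {
        cluster: RwLock<Arc<Cluster>>,
    }

    impl Metadata {
        fn invalidate_server(&self, _server_id: i32, table_ids: &[i64]) {
            // Hold the write lock across the whole read-modify-write so two
            // concurrent invalidations cannot drop each other's updates.
            let mut guard = self.cluster.write();
            let mut next = (**guard).clone();
            for id in table_ids {
                next.leaders_by_table.remove(id);
            }
            *guard = Arc::new(next);
        }
    }

    fn main() {
        let metadata = Metadata {
            cluster: RwLock::new(Arc::new(Cluster {
                leaders_by_table: HashMap::from([(42, 1)]),
            })),
        };
        metadata.invalidate_server(1, &[42]);
        // With no leader recorded for table 42, the scanner's next pass can
        // detect the gap and trigger a metadata refresh.
        assert!(metadata.cluster.read().leaders_by_table.get(&42).is_none());
    }

Swapping the Arc keeps readers cheap: they clone the Arc under a brief read lock, while
the rebuilt cluster is published atomically.
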
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index 2246e2c..11bdfa3 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -24,7 +24,7 @@ use crate::client::table::log_fetch_buffer::{
 use crate::client::table::remote_log::{
     RemoteLogDownloader, RemoteLogFetchInfo, RemotePendingFetch,
 };
-use crate::error::{Error, Result};
+use crate::error::{Error, Result, RpcError};
 use crate::metadata::{TableBucket, TableInfo, TablePath};
 use crate::proto::{FetchLogRequest, PbFetchLogReqForBucket, PbFetchLogReqForTable};
 use crate::record::{LogRecordsBatches, ReadContext, ScanRecord, ScanRecords, to_arrow_schema};
@@ -271,6 +271,8 @@ struct LogFetcher {
     credentials_cache: Arc<CredentialsCache>,
     log_fetch_buffer: Arc<LogFetchBuffer>,
     nodes_with_pending_fetch_requests: Arc<Mutex<HashSet<i32>>>,
+    table_path: TablePath,
+    is_partitioned: bool,
 }
 
 impl LogFetcher {
@@ -299,6 +301,8 @@ impl LogFetcher {
+            credentials_cache: Arc::new(CredentialsCache::new(conns.clone(), metadata.clone())),
             log_fetch_buffer: Arc::new(LogFetchBuffer::new()),
+            nodes_with_pending_fetch_requests: Arc::new(Mutex::new(HashSet::new())),
+            table_path: table_info.table_path.clone(),
+            is_partitioned: table_info.is_partitioned(),
         })
     }
 
@@ -315,9 +319,45 @@ impl LogFetcher {
         }
     }
 
+    async fn check_and_update_metadata(&self) -> Result<()> {
+        if self.is_partitioned {
+            // TODO: Implement partition-aware metadata refresh for buckets whose leaders are unknown.
+            // The implementation will likely need to collect partition IDs for such buckets and
+            // perform targeted metadata updates. Until then, we avoid computing unused partition_ids.
+            return Ok(());
+        }
+
+        let need_update = self
+            .fetchable_buckets()
+            .iter()
+            .any(|bucket| self.get_table_bucket_leader(bucket).is_none());
+
+        if !need_update {
+            return Ok(());
+        }
+
+        // TODO: Handle PartitionNotExist error
+        self.metadata
+            .update_tables_metadata(&HashSet::from([&self.table_path]))
+            .await
+            .or_else(|e| {
+                if let Error::RpcError { source, .. } = &e
+                    && matches!(source, RpcError::ConnectionError(_) | RpcError::Poisoned(_))
+                {
+                    warn!(
+                        "Retrying after encountering error while updating table metadata: {}",
+                        e
+                    );
+                    Ok(())
+                } else {
+                    Err(e)
+                }
+            })
+    }
+
     /// Send fetch requests asynchronously without waiting for responses
     async fn send_fetches(&self) -> Result<()> {
-        // todo: check update metadata like fluss-java in case leader changes
+        self.check_and_update_metadata().await?;
         let fetch_request = self.prepare_fetch_log_requests().await;
 
         for (leader, fetch_request) in fetch_request {
@@ -337,6 +377,7 @@ impl LogFetcher {
+            let remote_log_downloader = Arc::clone(&self.remote_log_downloader);
             let creds_cache = self.credentials_cache.clone();
+            let nodes_with_pending = self.nodes_with_pending_fetch_requests.clone();
+            let metadata = self.metadata.clone();
 
             // Spawn async task to handle the fetch request
             // Note: These tasks are not explicitly tracked or cancelled when LogFetcher is dropped.
@@ -351,27 +392,34 @@ impl LogFetcher {
                     nodes_with_pending.lock().remove(&leader);
                 });
 
-                let server_node = cluster
-                    .get_tablet_server(leader)
-                    .expect("todo: handle leader not exist.");
+                let server_node = match cluster.get_tablet_server(leader) {
+                    Some(node) => node,
+                    None => {
+                        warn!("No server node found for leader {}, retrying", leader);
+                        Self::handle_fetch_failure(metadata, &leader, &fetch_request).await;
+                        return;
+                    }
+                };
 
                 let con = match conns.get_connection(server_node).await {
                     Ok(con) => con,
                     Err(e) => {
-                        // todo: handle failed to get connection
-                        warn!("Failed to get connection to destination node: {e:?}");
+                        warn!("Retrying after error getting connection to destination node: {e:?}");
+                        Self::handle_fetch_failure(metadata, &leader, &fetch_request).await;
                         return;
                     }
                 };
 
                 let fetch_response = match con
-                    .request(message::FetchLogRequest::new(fetch_request))
+                    .request(message::FetchLogRequest::new(fetch_request.clone()))
                     .await
                 {
                     Ok(resp) => resp,
                     Err(e) => {
-                        // todo: handle fetch log from destination node
-                        warn!("Failed to fetch log from destination node {server_node:?}: {e:?}");
+                        warn!(
+                            "Retrying after error fetching log from destination node {server_node:?}: {e:?}"
+                        );
+                        Self::handle_fetch_failure(metadata, &leader, &fetch_request).await;
                         return;
                     }
                 };
@@ -387,7 +435,6 @@ impl LogFetcher {
                 )
                 .await
                 {
-                    // todo: handle fail to handle fetch response
                     error!("Fail to handle fetch response: {e:?}");
                 }
             });
@@ -396,6 +443,15 @@ impl LogFetcher {
         Ok(())
     }
 
+    async fn handle_fetch_failure(
+        metadata: Arc<Metadata>,
+        server_id: &i32,
+        request: &FetchLogRequest,
+    ) {
+        let table_ids = request.tables_req.iter().map(|r| r.table_id).collect();
+        metadata.invalidate_server(server_id, table_ids);
+    }
+
     /// Handle fetch response and add completed fetches to buffer
     async fn handle_fetch_response(
         fetch_response: crate::proto::FetchLogResponse,
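
The or_else in check_and_update_metadata treats transport-level failures as retryable
and surfaces everything else. A minimal sketch of that pattern follows, with simplified,
illustrative error types rather than the crate's real Error/RpcError definitions:

    // Simplified stand-ins for the crate's error types.
    #[allow(dead_code)]
    #[derive(Debug)]
    enum RpcError {
        ConnectionError(String),
        Poisoned(String),
        Other(String),
    }

    fn update_metadata() -> Result<(), RpcError> {
        Err(RpcError::ConnectionError("tablet server unreachable".into()))
    }

    fn check_and_update_metadata() -> Result<(), RpcError> {
        update_metadata().or_else(|e| {
            if matches!(e, RpcError::ConnectionError(_) | RpcError::Poisoned(_)) {
                // Transient transport failures are logged and retried on the
                // next send_fetches() pass instead of failing the scanner.
                eprintln!("retrying after metadata update error: {e:?}");
                Ok(())
            } else {
                Err(e)
            }
        })
    }

    fn main() {
        assert!(check_and_update_metadata().is_ok());
    }
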
diff --git a/crates/fluss/src/cluster/cluster.rs b/crates/fluss/src/cluster/cluster.rs
index a6f20a8..f14d055 100644
--- a/crates/fluss/src/cluster/cluster.rs
+++ b/crates/fluss/src/cluster/cluster.rs
@@ -22,7 +22,7 @@ use crate::metadata::{JsonSerde, TableBucket, TableDescriptor, TableInfo, TableP
 use crate::proto::MetadataResponse;
 use crate::rpc::{from_pb_server_node, from_pb_table_path};
 use rand::random_range;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 
 static EMPTY: Vec<BucketLocation> = Vec::new();
 
@@ -64,6 +64,43 @@ impl Cluster {
         }
     }
 
+    pub fn invalidate_server(&self, server_id: &i32, table_ids: Vec<i64>) -> Self {
+        let alive_tablet_servers_by_id = self
+            .alive_tablet_servers_by_id
+            .iter()
+            .filter(|&(id, _)| id != server_id)
+            .map(|(id, ts)| (*id, ts.clone()))
+            .collect();
+
+        let table_paths: HashSet<&TablePath> = table_ids
+            .iter()
+            .filter_map(|id| self.table_path_by_id.get(id))
+            .collect();
+
+        let available_locations_by_path = self
+            .available_locations_by_path
+            .iter()
+            .filter(|&(path, _)| !table_paths.contains(path))
+            .map(|(path, locations)| (path.clone(), locations.clone()))
+            .collect();
+
+        let available_locations_by_bucket = self
+            .available_locations_by_bucket
+            .iter()
+            .filter(|&(_bucket, location)| !table_paths.contains(&location.table_path))
+            .map(|(bucket, location)| (bucket.clone(), location.clone()))
+            .collect();
+
+        Cluster::new(
+            self.coordinator_server.clone(),
+            alive_tablet_servers_by_id,
+            available_locations_by_path,
+            available_locations_by_bucket,
+            self.table_id_by_path.clone(),
+            self.table_info_by_path.clone(),
+        )
+    }
+
     pub fn update(&mut self, cluster: Cluster) {
         let Cluster {
             coordinator_server,
@@ -214,15 +251,12 @@ impl Cluster {
             .unwrap_or(&EMPTY)
     }
 
-    pub fn get_one_available_server(&self) -> &ServerNode {
-        assert!(
-            !self.alive_tablet_servers.is_empty(),
-            "no alive tablet server in cluster"
-        );
+    pub fn get_one_available_server(&self) -> Option<&ServerNode> {
+        if self.alive_tablet_servers.is_empty() {
+            return None;
+        }
         let offset = random_range(0..self.alive_tablet_servers.len());
-        self.alive_tablet_servers
-            .get(offset)
-            .unwrap_or_else(|| panic!("can't find alive tab server by offset {offset}"))
+        self.alive_tablet_servers.get(offset)
     }
 
     pub fn get_bucket_count(&self, table_path: &TablePath) -> i32 {
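
With get_one_available_server now returning Option instead of panicking, each caller
decides what "no alive server" means. A small sketch of the two caller styles this
commit uses, with an illustrative stand-in type:

    // Illustrative stand-in for ServerNode.
    struct ServerNode {
        id: i32,
    }

    fn get_one_available_server(servers: &[ServerNode]) -> Option<&ServerNode> {
        // The real implementation picks a random offset; first() keeps the
        // sketch deterministic.
        servers.first()
    }

    fn main() {
        let servers: Vec<ServerNode> = Vec::new();
        // credentials.rs keeps the old fail-fast behaviour via
        // .expect("no tablet server available"); metadata.rs instead
        // matches and falls back to re-initializing from bootstrap:
        match get_one_available_server(&servers) {
            Some(node) => println!("using tablet server {}", node.id),
            None => println!("no available tablet server, re-initializing cluster"),
        }
    }
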
diff --git a/crates/fluss/src/rpc/server_connection.rs b/crates/fluss/src/rpc/server_connection.rs
index fdeb56f..441b175 100644
--- a/crates/fluss/src/rpc/server_connection.rs
+++ b/crates/fluss/src/rpc/server_connection.rs
@@ -66,13 +66,25 @@ impl RpcClient {
         server_node: &ServerNode,
     ) -> Result<ServerConnection, RpcError> {
         let server_id = server_node.uid();
-        {
+        let connection = {
             let connections = self.connections.read();
-            if let Some(connection) = connections.get(server_id) {
-                return Ok(connection.clone());
+            connections.get(server_id).cloned()
+        };
+
+        if let Some(conn) = connection {
+            if !conn.is_poisoned() {
+                return Ok(conn);
             }
         }
-        let new_server = self.connect(server_node).await?;
+
+        let new_server = match self.connect(server_node).await {
+            Ok(new_server) => new_server,
+            Err(e) => {
+                self.connections.write().remove(server_id);
+                return Err(e);
+            }
+        };
+
         self.connections
             .write()
             .insert(server_id.clone(), new_server.clone());
@@ -231,6 +243,11 @@ where
     }
 
+    fn is_poisoned(&self) -> bool {
+        let guard = self.state.lock();
+        matches!(*guard, ConnectionState::Poison(_))
+    }
+
     pub async fn request<R>(&self, msg: R) -> Result<R::ResponseBody, Error>
     where
         R: RequestBody + Send + WriteVersionedType<Vec<u8>>,
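
Finally, the connection cache above now reuses an entry only while it is healthy and
evicts it when a reconnect fails. A minimal synchronous sketch of that check-then-reconnect
logic, with simplified stand-in types (the real code is async and keyed by server uid):

    use parking_lot::RwLock;
    use std::collections::HashMap;

    // Illustrative stand-in for the real ServerConnection.
    #[derive(Clone)]
    struct Connection {
        poisoned: bool,
    }

    struct RpcClient {
        connections: RwLock<HashMap<String, Connection>>,
    }

    impl RpcClient {
        fn get_connection(&self, server_id: &str) -> Result<Connection, String> {
            // Hold the read lock only long enough to clone the cached entry.
            let cached = self.connections.read().get(server_id).cloned();
            if let Some(conn) = cached {
                if !conn.poisoned {
                    return Ok(conn);
                }
            }
            // Missing or poisoned: reconnect, evicting the stale entry on
            // failure so later callers start from a clean slate.
            match Self::connect(server_id) {
                Ok(fresh) => {
                    self.connections
                        .write()
                        .insert(server_id.to_string(), fresh.clone());
                    Ok(fresh)
                }
                Err(e) => {
                    self.connections.write().remove(server_id);
                    Err(e)
                }
            }
        }

        fn connect(_server_id: &str) -> Result<Connection, String> {
            Ok(Connection { poisoned: false })
        }
    }

    fn main() {
        let client = RpcClient { connections: RwLock::new(HashMap::new()) };
        assert!(!client.get_connection("ts-1").unwrap().poisoned);
    }
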
