This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 7c5af55  chore: invalidate leader info on stale metadata error when handling fetch response (#189)
7c5af55 is described below

commit 7c5af553e8482f9809a380a8934608a5a931ac7d
Author: AlexZhao <[email protected]>
AuthorDate: Fri Jan 23 14:44:55 2026 +0800

    chore: invalidate leader info on stale metadata error when handling fetch response (#189)
---
 crates/fluss/src/client/metadata.rs      |  12 ++-
 crates/fluss/src/client/table/scanner.rs | 149 +++++++++++++++++++++++++------
 crates/fluss/src/cluster/cluster.rs      |  64 ++++++++++---
 crates/fluss/src/metadata/table.rs       |   2 +-
 4 files changed, 185 insertions(+), 42 deletions(-)

diff --git a/crates/fluss/src/client/metadata.rs 
b/crates/fluss/src/client/metadata.rs
index 0e6f965..3c6730b 100644
--- a/crates/fluss/src/client/metadata.rs
+++ b/crates/fluss/src/client/metadata.rs
@@ -17,7 +17,7 @@
 
 use crate::cluster::{Cluster, ServerNode, ServerType};
 use crate::error::Result;
-use crate::metadata::{TableBucket, TablePath};
+use crate::metadata::{PhysicalTablePath, TableBucket, TablePath};
 use crate::proto::MetadataResponse;
 use crate::rpc::message::UpdateMetadataRequest;
 use crate::rpc::{RpcClient, ServerConnection};
@@ -71,6 +71,16 @@ impl Metadata {
         *cluster_guard = Arc::new(updated_cluster);
     }
 
+    pub fn invalidate_physical_table_meta(
+        &self,
+        physical_tables_to_invalid: &HashSet<PhysicalTablePath>,
+    ) {
+        let mut cluster_guard = self.cluster.write();
+        let updated_cluster =
+            
cluster_guard.invalidate_physical_table_meta(physical_tables_to_invalid);
+        *cluster_guard = Arc::new(updated_cluster);
+    }
+
     pub async fn update(&self, metadata_response: MetadataResponse) -> 
Result<()> {
         let origin_cluster = self.cluster.read().clone();
         let new_cluster =
diff --git a/crates/fluss/src/client/table/scanner.rs 
b/crates/fluss/src/client/table/scanner.rs
index cf0b257..afa44f3 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -36,7 +36,7 @@ use crate::client::table::remote_log::{
     RemoteLogDownloader, RemoteLogFetchInfo, RemotePendingFetch,
 };
 use crate::error::{ApiError, Error, FlussError, Result};
-use crate::metadata::{TableBucket, TableInfo, TablePath};
+use crate::metadata::{PhysicalTablePath, TableBucket, TableInfo, TablePath};
 use crate::proto::{ErrorResponse, FetchLogRequest, PbFetchLogReqForBucket, 
PbFetchLogReqForTable};
 use crate::record::{LogRecordsBatches, ReadContext, ScanRecord, ScanRecords, 
to_arrow_schema};
 use crate::rpc::{RpcClient, RpcError, message};
@@ -462,6 +462,16 @@ struct LogFetcher {
     nodes_with_pending_fetch_requests: Arc<Mutex<HashSet<i32>>>,
 }
 
+struct FetchResponseContext {
+    metadata: Arc<Metadata>,
+    log_fetch_buffer: Arc<LogFetchBuffer>,
+    log_scanner_status: Arc<LogScannerStatus>,
+    read_context: ReadContext,
+    remote_read_context: ReadContext,
+    remote_log_downloader: Arc<RemoteLogDownloader>,
+    credentials_cache: Arc<CredentialsCache>,
+}
+
 impl LogFetcher {
     pub fn new(
         table_info: TableInfo,
@@ -518,7 +528,8 @@ impl LogFetcher {
             | FlussError::LogStorageException
             | FlussError::KvStorageException
             | FlussError::StorageException
-            | FlussError::FencedLeaderEpochException => FetchErrorContext {
+            | FlussError::FencedLeaderEpochException
+            | FlussError::LeaderNotAvailableException => FetchErrorContext {
                 action: FetchErrorAction::Ignore,
                 log_level: FetchErrorLogLevel::Debug,
                 log_message: format!(
@@ -570,6 +581,17 @@ impl LogFetcher {
         }
     }
 
+    fn should_invalidate_table_meta(error: FlussError) -> bool {
+        matches!(
+            error,
+            FlussError::NotLeaderOrFollower
+                | FlussError::LeaderNotAvailableException
+                | FlussError::FencedLeaderEpochException
+                | FlussError::UnknownTableOrBucketException
+                | FlussError::InvalidCoordinatorException
+        )
+    }
+
     async fn check_and_update_metadata(&self) -> Result<()> {
         let need_update = self
             .fetchable_buckets()
@@ -639,6 +661,15 @@ impl LogFetcher {
             let creds_cache = self.credentials_cache.clone();
             let nodes_with_pending = 
self.nodes_with_pending_fetch_requests.clone();
             let metadata = self.metadata.clone();
+            let response_context = FetchResponseContext {
+                metadata: metadata.clone(),
+                log_fetch_buffer,
+                log_scanner_status,
+                read_context,
+                remote_read_context,
+                remote_log_downloader,
+                credentials_cache: creds_cache,
+            };
             // Spawn async task to handle the fetch request
             // Note: These tasks are not explicitly tracked or cancelled when 
LogFetcher is dropped.
             // This is acceptable because:
@@ -684,16 +715,7 @@ impl LogFetcher {
                     }
                 };
 
-                Self::handle_fetch_response(
-                    fetch_response,
-                    &log_fetch_buffer,
-                    &log_scanner_status,
-                    &read_context,
-                    &remote_read_context,
-                    &remote_log_downloader,
-                    &creds_cache,
-                )
-                .await;
+                Self::handle_fetch_response(fetch_response, 
response_context).await;
             });
         }
 
@@ -712,13 +734,18 @@ impl LogFetcher {
     /// Handle fetch response and add completed fetches to buffer
     async fn handle_fetch_response(
         fetch_response: crate::proto::FetchLogResponse,
-        log_fetch_buffer: &Arc<LogFetchBuffer>,
-        log_scanner_status: &Arc<LogScannerStatus>,
-        read_context: &ReadContext,
-        remote_read_context: &ReadContext,
-        remote_log_downloader: &Arc<RemoteLogDownloader>,
-        credentials_cache: &Arc<CredentialsCache>,
+        context: FetchResponseContext,
     ) {
+        let FetchResponseContext {
+            metadata,
+            log_fetch_buffer,
+            log_scanner_status,
+            read_context,
+            remote_read_context,
+            remote_log_downloader,
+            credentials_cache,
+        } = context;
+
         for pb_fetch_log_resp in fetch_response.tables_resp {
             let table_id = pb_fetch_log_resp.table_id;
             let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp;
@@ -745,6 +772,20 @@ impl LogFetcher {
                     .into();
 
                     let error = FlussError::for_code(error_code);
+                    if Self::should_invalidate_table_meta(error) {
+                        // TODO: Consider triggering table meta invalidation 
from sender/lookup paths.
+                        let table_id = table_bucket.table_id();
+                        let cluster = metadata.get_cluster();
+                        if let Some(table_path) = 
cluster.get_table_path_by_id(table_id) {
+                            let physical_tables =
+                                
HashSet::from([PhysicalTablePath::of(table_path.clone())]);
+                            
metadata.invalidate_physical_table_meta(&physical_tables);
+                        } else {
+                            warn!(
+                                "Table id {table_id} is missing from 
table_path_by_id while invalidating table metadata"
+                            );
+                        }
+                    }
                     let error_context = Self::describe_fetch_error(
                         error,
                         &table_bucket,
@@ -1577,20 +1618,72 @@ mod tests {
             }],
         };
 
-        LogFetcher::handle_fetch_response(
-            response,
-            &fetcher.log_fetch_buffer,
-            &fetcher.log_scanner_status,
-            &fetcher.read_context,
-            &fetcher.remote_read_context,
-            &fetcher.remote_log_downloader,
-            &fetcher.credentials_cache,
-        )
-        .await;
+        let response_context = FetchResponseContext {
+            metadata: metadata.clone(),
+            log_fetch_buffer: fetcher.log_fetch_buffer.clone(),
+            log_scanner_status: fetcher.log_scanner_status.clone(),
+            read_context: fetcher.read_context.clone(),
+            remote_read_context: fetcher.remote_read_context.clone(),
+            remote_log_downloader: fetcher.remote_log_downloader.clone(),
+            credentials_cache: fetcher.credentials_cache.clone(),
+        };
+
+        LogFetcher::handle_fetch_response(response, response_context).await;
 
         let completed = fetcher.log_fetch_buffer.poll().expect("completed 
fetch");
         let api_error = completed.api_error().expect("api error");
         assert_eq!(api_error.code, FlussError::AuthorizationException.code());
         Ok(())
     }
+
+    #[tokio::test]
+    async fn handle_fetch_response_invalidates_table_meta() -> Result<()> {
+        let table_path = TablePath::new("db".to_string(), "tbl".to_string());
+        let table_info = build_table_info(table_path.clone(), 1, 1);
+        let cluster = build_cluster_arc(&table_path, 1, 1);
+        let metadata = Arc::new(Metadata::new_for_test(cluster.clone()));
+        let status = Arc::new(LogScannerStatus::new());
+        status.assign_scan_bucket(TableBucket::new(1, 0), 5);
+        let fetcher = LogFetcher::new(
+            table_info.clone(),
+            Arc::new(RpcClient::new()),
+            metadata.clone(),
+            status.clone(),
+            None,
+        )?;
+
+        let bucket = TableBucket::new(1, 0);
+        assert!(metadata.leader_for(&bucket).is_some());
+
+        let response = crate::proto::FetchLogResponse {
+            tables_resp: vec![crate::proto::PbFetchLogRespForTable {
+                table_id: 1,
+                buckets_resp: vec![crate::proto::PbFetchLogRespForBucket {
+                    partition_id: None,
+                    bucket_id: 0,
+                    error_code: Some(FlussError::NotLeaderOrFollower.code()),
+                    error_message: Some("not leader".to_string()),
+                    high_watermark: None,
+                    log_start_offset: None,
+                    remote_log_fetch_info: None,
+                    records: None,
+                }],
+            }],
+        };
+
+        let response_context = FetchResponseContext {
+            metadata: metadata.clone(),
+            log_fetch_buffer: fetcher.log_fetch_buffer.clone(),
+            log_scanner_status: fetcher.log_scanner_status.clone(),
+            read_context: fetcher.read_context.clone(),
+            remote_read_context: fetcher.remote_read_context.clone(),
+            remote_log_downloader: fetcher.remote_log_downloader.clone(),
+            credentials_cache: fetcher.credentials_cache.clone(),
+        };
+
+        LogFetcher::handle_fetch_response(response, response_context).await;
+
+        assert!(metadata.leader_for(&bucket).is_none());
+        Ok(())
+    }
 }
diff --git a/crates/fluss/src/cluster/cluster.rs 
b/crates/fluss/src/cluster/cluster.rs
index f14d055..2484026 100644
--- a/crates/fluss/src/cluster/cluster.rs
+++ b/crates/fluss/src/cluster/cluster.rs
@@ -18,7 +18,9 @@
 use crate::BucketId;
 use crate::cluster::{BucketLocation, ServerNode, ServerType};
 use crate::error::Result;
-use crate::metadata::{JsonSerde, TableBucket, TableDescriptor, TableInfo, 
TablePath};
+use crate::metadata::{
+    JsonSerde, PhysicalTablePath, TableBucket, TableDescriptor, TableInfo, 
TablePath,
+};
 use crate::proto::MetadataResponse;
 use crate::rpc::{from_pb_server_node, from_pb_table_path};
 use rand::random_range;
@@ -77,23 +79,33 @@ impl Cluster {
             .filter_map(|id| self.table_path_by_id.get(id))
             .collect();
 
-        let available_locations_by_path = self
-            .available_locations_by_path
-            .iter()
-            .filter(|&(path, _)| !table_paths.contains(path))
-            .map(|(path, locations)| (path.clone(), locations.clone()))
-            .collect();
+        let (available_locations_by_path, available_locations_by_bucket) =
+            self.filter_bucket_locations_by_path(&table_paths);
 
-        let available_locations_by_bucket = self
-            .available_locations_by_bucket
+        Cluster::new(
+            self.coordinator_server.clone(),
+            alive_tablet_servers_by_id,
+            available_locations_by_path,
+            available_locations_by_bucket,
+            self.table_id_by_path.clone(),
+            self.table_info_by_path.clone(),
+        )
+    }
+
+    pub fn invalidate_physical_table_meta(
+        &self,
+        physical_tables_to_invalid: &HashSet<PhysicalTablePath>,
+    ) -> Self {
+        let table_paths: HashSet<&TablePath> = physical_tables_to_invalid
             .iter()
-            .filter(|&(_bucket, location)| 
!table_paths.contains(&location.table_path))
-            .map(|(bucket, location)| (bucket.clone(), location.clone()))
+            .map(|path| path.get_table_path())
             .collect();
+        let (available_locations_by_path, available_locations_by_bucket) =
+            self.filter_bucket_locations_by_path(&table_paths);
 
         Cluster::new(
             self.coordinator_server.clone(),
-            alive_tablet_servers_by_id,
+            self.alive_tablet_servers_by_id.clone(),
             available_locations_by_path,
             available_locations_by_bucket,
             self.table_id_by_path.clone(),
@@ -122,6 +134,30 @@ impl Cluster {
         self.table_info_by_path = table_info_by_path;
     }
 
+    fn filter_bucket_locations_by_path(
+        &self,
+        table_paths: &HashSet<&TablePath>,
+    ) -> (
+        HashMap<TablePath, Vec<BucketLocation>>,
+        HashMap<TableBucket, BucketLocation>,
+    ) {
+        let available_locations_by_path = self
+            .available_locations_by_path
+            .iter()
+            .filter(|&(path, _)| !table_paths.contains(path))
+            .map(|(path, locations)| (path.clone(), locations.clone()))
+            .collect();
+
+        let available_locations_by_bucket = self
+            .available_locations_by_bucket
+            .iter()
+            .filter(|&(_bucket, location)| 
!table_paths.contains(&location.table_path))
+            .map(|(bucket, location)| (bucket.clone(), location.clone()))
+            .collect();
+
+        (available_locations_by_path, available_locations_by_bucket)
+    }
+
     pub fn from_metadata_response(
         metadata_response: MetadataResponse,
         origin_cluster: Option<&Cluster>,
@@ -242,6 +278,10 @@ impl Cluster {
         &self.table_id_by_path
     }
 
+    pub fn get_table_path_by_id(&self, table_id: i64) -> Option<&TablePath> {
+        self.table_path_by_id.get(&table_id)
+    }
+
     pub fn get_available_buckets_for_table_path(
         &self,
         table_path: &TablePath,
diff --git a/crates/fluss/src/metadata/table.rs 
b/crates/fluss/src/metadata/table.rs
index 8204e7c..f4cf972 100644
--- a/crates/fluss/src/metadata/table.rs
+++ b/crates/fluss/src/metadata/table.rs
@@ -697,7 +697,7 @@ impl TablePath {
     }
 }
 
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct PhysicalTablePath {
     table_path: TablePath,
     #[allow(dead_code)]

Reply via email to