This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 7c5af55 chore: invalidate leader info on stale metadata error when handling fetch response (#189)
7c5af55 is described below
commit 7c5af553e8482f9809a380a8934608a5a931ac7d
Author: AlexZhao <[email protected]>
AuthorDate: Fri Jan 23 14:44:55 2026 +0800
chore: invalidate leader info on stale metadata error when handling fetch response (#189)
---
crates/fluss/src/client/metadata.rs | 12 ++-
crates/fluss/src/client/table/scanner.rs | 149 +++++++++++++++++++++++++------
crates/fluss/src/cluster/cluster.rs | 64 ++++++++++---
crates/fluss/src/metadata/table.rs | 2 +-
4 files changed, 185 insertions(+), 42 deletions(-)
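At a high level, this commit makes the log fetcher treat leader-related fetch errors as a sign that the cached cluster metadata is stale, dropping the affected table's leader info so the next metadata check re-resolves it. A minimal, self-contained sketch of that flow follows; the types, field names, and error labels are simplified stand-ins for illustration, not the actual fluss-rust structs or error codes.

    use std::collections::{HashMap, HashSet};

    #[derive(Clone, PartialEq, Eq, Hash)]
    struct PhysicalTablePath(String);

    struct MetadataCache {
        // Leader info keyed by physical table path (illustrative only).
        leaders: HashMap<PhysicalTablePath, Vec<u32>>,
    }

    impl MetadataCache {
        // Forget leaders for the given tables; they get re-fetched later.
        fn invalidate(&mut self, stale: &HashSet<PhysicalTablePath>) {
            self.leaders.retain(|path, _| !stale.contains(path));
        }
    }

    // Hypothetical classification mirroring the idea of should_invalidate_table_meta.
    fn is_stale_metadata_error(code: &str) -> bool {
        matches!(
            code,
            "NOT_LEADER_OR_FOLLOWER" | "LEADER_NOT_AVAILABLE" | "FENCED_LEADER_EPOCH"
        )
    }

    fn handle_bucket_error(cache: &mut MetadataCache, table: PhysicalTablePath, code: &str) {
        if is_stale_metadata_error(code) {
            cache.invalidate(&HashSet::from([table]));
        }
    }

    fn main() {
        let path = PhysicalTablePath("db.tbl".to_string());
        let mut cache = MetadataCache {
            leaders: HashMap::from([(path.clone(), vec![0])]),
        };
        handle_bucket_error(&mut cache, path.clone(), "NOT_LEADER_OR_FOLLOWER");
        assert!(!cache.leaders.contains_key(&path));
    }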
diff --git a/crates/fluss/src/client/metadata.rs b/crates/fluss/src/client/metadata.rs
index 0e6f965..3c6730b 100644
--- a/crates/fluss/src/client/metadata.rs
+++ b/crates/fluss/src/client/metadata.rs
@@ -17,7 +17,7 @@
use crate::cluster::{Cluster, ServerNode, ServerType};
use crate::error::Result;
-use crate::metadata::{TableBucket, TablePath};
+use crate::metadata::{PhysicalTablePath, TableBucket, TablePath};
use crate::proto::MetadataResponse;
use crate::rpc::message::UpdateMetadataRequest;
use crate::rpc::{RpcClient, ServerConnection};
@@ -71,6 +71,16 @@ impl Metadata {
*cluster_guard = Arc::new(updated_cluster);
}
+ pub fn invalidate_physical_table_meta(
+ &self,
+ physical_tables_to_invalid: &HashSet<PhysicalTablePath>,
+ ) {
+ let mut cluster_guard = self.cluster.write();
+ let updated_cluster =
+ cluster_guard.invalidate_physical_table_meta(physical_tables_to_invalid);
+ *cluster_guard = Arc::new(updated_cluster);
+ }
+
pub async fn update(&self, metadata_response: MetadataResponse) -> Result<()> {
let origin_cluster = self.cluster.read().clone();
let new_cluster =
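The hunk above follows the client's copy-on-write style for the cluster snapshot: build a filtered Cluster and swap the Arc under the write lock, so in-flight readers keep the old snapshot. A rough standalone sketch of that shape, using a std::sync::RwLock and toy types (the real code differs in types and locking details):

    use std::sync::{Arc, RwLock};

    #[derive(Clone, Default)]
    struct Cluster {
        tables: Vec<String>,
    }

    impl Cluster {
        // Build a new snapshot without the invalidated table; the old one stays valid.
        fn invalidate_table(&self, stale: &str) -> Cluster {
            Cluster {
                tables: self
                    .tables
                    .iter()
                    .filter(|t| t.as_str() != stale)
                    .cloned()
                    .collect(),
            }
        }
    }

    struct ClusterCache {
        cluster: RwLock<Arc<Cluster>>,
    }

    impl ClusterCache {
        fn invalidate(&self, stale: &str) {
            let mut guard = self.cluster.write().unwrap();
            let updated = guard.invalidate_table(stale);
            // Readers that already cloned the old Arc keep a consistent snapshot.
            *guard = Arc::new(updated);
        }
    }

    fn main() {
        let cache = ClusterCache {
            cluster: RwLock::new(Arc::new(Cluster {
                tables: vec!["db.tbl".to_string()],
            })),
        };
        cache.invalidate("db.tbl");
        assert!(cache.cluster.read().unwrap().tables.is_empty());
    }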
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index cf0b257..afa44f3 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -36,7 +36,7 @@ use crate::client::table::remote_log::{
RemoteLogDownloader, RemoteLogFetchInfo, RemotePendingFetch,
};
use crate::error::{ApiError, Error, FlussError, Result};
-use crate::metadata::{TableBucket, TableInfo, TablePath};
+use crate::metadata::{PhysicalTablePath, TableBucket, TableInfo, TablePath};
use crate::proto::{ErrorResponse, FetchLogRequest, PbFetchLogReqForBucket, PbFetchLogReqForTable};
use crate::record::{LogRecordsBatches, ReadContext, ScanRecord, ScanRecords, to_arrow_schema};
use crate::rpc::{RpcClient, RpcError, message};
@@ -462,6 +462,16 @@ struct LogFetcher {
nodes_with_pending_fetch_requests: Arc<Mutex<HashSet<i32>>>,
}
+struct FetchResponseContext {
+ metadata: Arc<Metadata>,
+ log_fetch_buffer: Arc<LogFetchBuffer>,
+ log_scanner_status: Arc<LogScannerStatus>,
+ read_context: ReadContext,
+ remote_read_context: ReadContext,
+ remote_log_downloader: Arc<RemoteLogDownloader>,
+ credentials_cache: Arc<CredentialsCache>,
+}
+
impl LogFetcher {
pub fn new(
table_info: TableInfo,
@@ -518,7 +528,8 @@ impl LogFetcher {
| FlussError::LogStorageException
| FlussError::KvStorageException
| FlussError::StorageException
- | FlussError::FencedLeaderEpochException => FetchErrorContext {
+ | FlussError::FencedLeaderEpochException
+ | FlussError::LeaderNotAvailableException => FetchErrorContext {
action: FetchErrorAction::Ignore,
log_level: FetchErrorLogLevel::Debug,
log_message: format!(
@@ -570,6 +581,17 @@ impl LogFetcher {
}
}
+ fn should_invalidate_table_meta(error: FlussError) -> bool {
+ matches!(
+ error,
+ FlussError::NotLeaderOrFollower
+ | FlussError::LeaderNotAvailableException
+ | FlussError::FencedLeaderEpochException
+ | FlussError::UnknownTableOrBucketException
+ | FlussError::InvalidCoordinatorException
+ )
+ }
+
async fn check_and_update_metadata(&self) -> Result<()> {
let need_update = self
.fetchable_buckets()
@@ -639,6 +661,15 @@ impl LogFetcher {
let creds_cache = self.credentials_cache.clone();
let nodes_with_pending = self.nodes_with_pending_fetch_requests.clone();
let metadata = self.metadata.clone();
+ let response_context = FetchResponseContext {
+ metadata: metadata.clone(),
+ log_fetch_buffer,
+ log_scanner_status,
+ read_context,
+ remote_read_context,
+ remote_log_downloader,
+ credentials_cache: creds_cache,
+ };
// Spawn async task to handle the fetch request
// Note: These tasks are not explicitly tracked or cancelled when LogFetcher is dropped.
// This is acceptable because:
@@ -684,16 +715,7 @@ impl LogFetcher {
}
};
- Self::handle_fetch_response(
- fetch_response,
- &log_fetch_buffer,
- &log_scanner_status,
- &read_context,
- &remote_read_context,
- &remote_log_downloader,
- &creds_cache,
- )
- .await;
+ Self::handle_fetch_response(fetch_response, response_context).await;
});
}
@@ -712,13 +734,18 @@ impl LogFetcher {
/// Handle fetch response and add completed fetches to buffer
async fn handle_fetch_response(
fetch_response: crate::proto::FetchLogResponse,
- log_fetch_buffer: &Arc<LogFetchBuffer>,
- log_scanner_status: &Arc<LogScannerStatus>,
- read_context: &ReadContext,
- remote_read_context: &ReadContext,
- remote_log_downloader: &Arc<RemoteLogDownloader>,
- credentials_cache: &Arc<CredentialsCache>,
+ context: FetchResponseContext,
) {
+ let FetchResponseContext {
+ metadata,
+ log_fetch_buffer,
+ log_scanner_status,
+ read_context,
+ remote_read_context,
+ remote_log_downloader,
+ credentials_cache,
+ } = context;
+
for pb_fetch_log_resp in fetch_response.tables_resp {
let table_id = pb_fetch_log_resp.table_id;
let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp;
@@ -745,6 +772,20 @@ impl LogFetcher {
.into();
let error = FlussError::for_code(error_code);
+ if Self::should_invalidate_table_meta(error) {
+ // TODO: Consider triggering table meta invalidation from sender/lookup paths.
+ let table_id = table_bucket.table_id();
+ let cluster = metadata.get_cluster();
+ if let Some(table_path) = cluster.get_table_path_by_id(table_id) {
+ let physical_tables =
+ HashSet::from([PhysicalTablePath::of(table_path.clone())]);
+ metadata.invalidate_physical_table_meta(&physical_tables);
+ } else {
+ warn!(
+ "Table id {table_id} is missing from table_path_by_id while invalidating table metadata"
+ );
+ }
+ }
let error_context = Self::describe_fetch_error(
error,
&table_bucket,
@@ -1577,20 +1618,72 @@ mod tests {
}],
};
- LogFetcher::handle_fetch_response(
- response,
- &fetcher.log_fetch_buffer,
- &fetcher.log_scanner_status,
- &fetcher.read_context,
- &fetcher.remote_read_context,
- &fetcher.remote_log_downloader,
- &fetcher.credentials_cache,
- )
- .await;
+ let response_context = FetchResponseContext {
+ metadata: metadata.clone(),
+ log_fetch_buffer: fetcher.log_fetch_buffer.clone(),
+ log_scanner_status: fetcher.log_scanner_status.clone(),
+ read_context: fetcher.read_context.clone(),
+ remote_read_context: fetcher.remote_read_context.clone(),
+ remote_log_downloader: fetcher.remote_log_downloader.clone(),
+ credentials_cache: fetcher.credentials_cache.clone(),
+ };
+
+ LogFetcher::handle_fetch_response(response, response_context).await;
let completed = fetcher.log_fetch_buffer.poll().expect("completed fetch");
let api_error = completed.api_error().expect("api error");
assert_eq!(api_error.code, FlussError::AuthorizationException.code());
Ok(())
}
+
+ #[tokio::test]
+ async fn handle_fetch_response_invalidates_table_meta() -> Result<()> {
+ let table_path = TablePath::new("db".to_string(), "tbl".to_string());
+ let table_info = build_table_info(table_path.clone(), 1, 1);
+ let cluster = build_cluster_arc(&table_path, 1, 1);
+ let metadata = Arc::new(Metadata::new_for_test(cluster.clone()));
+ let status = Arc::new(LogScannerStatus::new());
+ status.assign_scan_bucket(TableBucket::new(1, 0), 5);
+ let fetcher = LogFetcher::new(
+ table_info.clone(),
+ Arc::new(RpcClient::new()),
+ metadata.clone(),
+ status.clone(),
+ None,
+ )?;
+
+ let bucket = TableBucket::new(1, 0);
+ assert!(metadata.leader_for(&bucket).is_some());
+
+ let response = crate::proto::FetchLogResponse {
+ tables_resp: vec![crate::proto::PbFetchLogRespForTable {
+ table_id: 1,
+ buckets_resp: vec![crate::proto::PbFetchLogRespForBucket {
+ partition_id: None,
+ bucket_id: 0,
+ error_code: Some(FlussError::NotLeaderOrFollower.code()),
+ error_message: Some("not leader".to_string()),
+ high_watermark: None,
+ log_start_offset: None,
+ remote_log_fetch_info: None,
+ records: None,
+ }],
+ }],
+ };
+
+ let response_context = FetchResponseContext {
+ metadata: metadata.clone(),
+ log_fetch_buffer: fetcher.log_fetch_buffer.clone(),
+ log_scanner_status: fetcher.log_scanner_status.clone(),
+ read_context: fetcher.read_context.clone(),
+ remote_read_context: fetcher.remote_read_context.clone(),
+ remote_log_downloader: fetcher.remote_log_downloader.clone(),
+ credentials_cache: fetcher.credentials_cache.clone(),
+ };
+
+ LogFetcher::handle_fetch_response(response, response_context).await;
+
+ assert!(metadata.leader_for(&bucket).is_none());
+ Ok(())
+ }
}
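Besides the invalidation itself, the scanner.rs changes above bundle the handler's former half-dozen Arc parameters into a single FetchResponseContext that is moved into the spawned task. A generic sketch of that refactor pattern, assuming the tokio runtime the crate's tests already use and made-up field types (not the real context struct):

    use std::sync::Arc;

    // Hypothetical shared state standing in for buffers, scanner status, caches, etc.
    struct Buffer;
    struct Status;

    // One owned context instead of a long list of Arc parameters.
    struct ResponseContext {
        buffer: Arc<Buffer>,
        status: Arc<Status>,
    }

    async fn handle_response(response: String, ctx: ResponseContext) {
        // Destructure once; each handler step then borrows what it needs.
        let ResponseContext { buffer: _buffer, status: _status } = ctx;
        let _ = response;
    }

    #[tokio::main]
    async fn main() {
        let ctx = ResponseContext {
            buffer: Arc::new(Buffer),
            status: Arc::new(Status),
        };
        // The whole context moves into the spawned task as a single capture.
        let task = tokio::spawn(handle_response("ok".to_string(), ctx));
        task.await.unwrap();
    }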
diff --git a/crates/fluss/src/cluster/cluster.rs b/crates/fluss/src/cluster/cluster.rs
index f14d055..2484026 100644
--- a/crates/fluss/src/cluster/cluster.rs
+++ b/crates/fluss/src/cluster/cluster.rs
@@ -18,7 +18,9 @@
use crate::BucketId;
use crate::cluster::{BucketLocation, ServerNode, ServerType};
use crate::error::Result;
-use crate::metadata::{JsonSerde, TableBucket, TableDescriptor, TableInfo, TablePath};
+use crate::metadata::{
+ JsonSerde, PhysicalTablePath, TableBucket, TableDescriptor, TableInfo, TablePath,
+};
use crate::proto::MetadataResponse;
use crate::rpc::{from_pb_server_node, from_pb_table_path};
use rand::random_range;
@@ -77,23 +79,33 @@ impl Cluster {
.filter_map(|id| self.table_path_by_id.get(id))
.collect();
- let available_locations_by_path = self
- .available_locations_by_path
- .iter()
- .filter(|&(path, _)| !table_paths.contains(path))
- .map(|(path, locations)| (path.clone(), locations.clone()))
- .collect();
+ let (available_locations_by_path, available_locations_by_bucket) =
+ self.filter_bucket_locations_by_path(&table_paths);
- let available_locations_by_bucket = self
- .available_locations_by_bucket
+ Cluster::new(
+ self.coordinator_server.clone(),
+ alive_tablet_servers_by_id,
+ available_locations_by_path,
+ available_locations_by_bucket,
+ self.table_id_by_path.clone(),
+ self.table_info_by_path.clone(),
+ )
+ }
+
+ pub fn invalidate_physical_table_meta(
+ &self,
+ physical_tables_to_invalid: &HashSet<PhysicalTablePath>,
+ ) -> Self {
+ let table_paths: HashSet<&TablePath> = physical_tables_to_invalid
.iter()
- .filter(|&(_bucket, location)| !table_paths.contains(&location.table_path))
- .map(|(bucket, location)| (bucket.clone(), location.clone()))
+ .map(|path| path.get_table_path())
.collect();
+ let (available_locations_by_path, available_locations_by_bucket) =
+ self.filter_bucket_locations_by_path(&table_paths);
Cluster::new(
self.coordinator_server.clone(),
- alive_tablet_servers_by_id,
+ self.alive_tablet_servers_by_id.clone(),
available_locations_by_path,
available_locations_by_bucket,
self.table_id_by_path.clone(),
@@ -122,6 +134,30 @@ impl Cluster {
self.table_info_by_path = table_info_by_path;
}
+ fn filter_bucket_locations_by_path(
+ &self,
+ table_paths: &HashSet<&TablePath>,
+ ) -> (
+ HashMap<TablePath, Vec<BucketLocation>>,
+ HashMap<TableBucket, BucketLocation>,
+ ) {
+ let available_locations_by_path = self
+ .available_locations_by_path
+ .iter()
+ .filter(|&(path, _)| !table_paths.contains(path))
+ .map(|(path, locations)| (path.clone(), locations.clone()))
+ .collect();
+
+ let available_locations_by_bucket = self
+ .available_locations_by_bucket
+ .iter()
+ .filter(|&(_bucket, location)| !table_paths.contains(&location.table_path))
+ .map(|(bucket, location)| (bucket.clone(), location.clone()))
+ .collect();
+
+ (available_locations_by_path, available_locations_by_bucket)
+ }
+
pub fn from_metadata_response(
metadata_response: MetadataResponse,
origin_cluster: Option<&Cluster>,
@@ -242,6 +278,10 @@ impl Cluster {
&self.table_id_by_path
}
+ pub fn get_table_path_by_id(&self, table_id: i64) -> Option<&TablePath> {
+ self.table_path_by_id.get(&table_id)
+ }
+
pub fn get_available_buckets_for_table_path(
&self,
table_path: &TablePath,
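The final hunk below only adds the derives PhysicalTablePath needs to serve as a HashSet element in the new invalidation call, since HashSet requires Eq and Hash. A tiny illustration with a stand-in struct (the real type has different fields):

    use std::collections::HashSet;

    #[derive(Debug, Clone, PartialEq, Eq, Hash)]
    struct PhysicalTablePath {
        table_path: String,
        partition_name: Option<String>,
    }

    fn main() {
        let path = PhysicalTablePath {
            table_path: "db.tbl".to_string(),
            partition_name: None,
        };
        // Without Eq + Hash, HashSet::from / contains would not compile.
        let stale: HashSet<PhysicalTablePath> = HashSet::from([path.clone()]);
        assert!(stale.contains(&path));
    }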
diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs
index 8204e7c..f4cf972 100644
--- a/crates/fluss/src/metadata/table.rs
+++ b/crates/fluss/src/metadata/table.rs
@@ -697,7 +697,7 @@ impl TablePath {
}
}
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PhysicalTablePath {
table_path: TablePath,
#[allow(dead_code)]