This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new f166314 chore: scanner should retry when bucket lead is not available (#102)
f166314 is described below
commit f1663149d89a923631414a84b131ff512c5f6350
Author: Keith Lee <[email protected]>
AuthorDate: Thu Dec 25 14:02:50 2025 +0000
chore: scanner should retry when bucket lead is not available (#102)
---
crates/fluss/src/client/credentials.rs | 4 +-
crates/fluss/src/client/metadata.rs | 51 ++++++++++++++++----
crates/fluss/src/client/table/scanner.rs | 78 ++++++++++++++++++++++++++-----
crates/fluss/src/cluster/cluster.rs | 52 +++++++++++++++++----
crates/fluss/src/rpc/server_connection.rs | 25 ++++++++--
5 files changed, 175 insertions(+), 35 deletions(-)
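In short, the scanner now treats a missing or unreachable bucket leader as a retryable condition: the failed tablet server is dropped from the cached cluster metadata, and the next fetch pass refreshes metadata instead of panicking. Below is a minimal, self-contained sketch of that invalidate-and-retry pattern; the types and signatures here (ServerNode, Cluster, fetch_from) are simplified stand-ins for illustration only and do not match the crate's real Metadata/Cluster API.

// Illustrative sketch only: simplified stand-ins, not fluss-rust's real types.
use std::collections::HashMap;

struct ServerNode { id: i32 }

#[derive(Default)]
struct Cluster {
    alive_tablet_servers: HashMap<i32, ServerNode>,
    leader_by_bucket: HashMap<i64, i32>,
}

impl Cluster {
    // Mirrors the new Option-returning accessor: None when no server is alive.
    fn get_one_available_server(&self) -> Option<&ServerNode> {
        self.alive_tablet_servers.values().next()
    }

    // Drop a failed server and forget the bucket leaders that pointed at it,
    // so the next pass notices the missing leader and refreshes metadata.
    fn invalidate_server(&mut self, server_id: i32) {
        self.alive_tablet_servers.remove(&server_id);
        self.leader_by_bucket.retain(|_, leader| *leader != server_id);
    }
}

fn fetch_from(_node: &ServerNode) -> Result<(), &'static str> {
    Err("connection refused") // simulate a failed fetch
}

fn main() {
    let mut cluster = Cluster::default();
    cluster.alive_tablet_servers.insert(1, ServerNode { id: 1 });
    cluster.leader_by_bucket.insert(0, 1);

    // On a failed fetch, remember the failed leader instead of panicking...
    let failed = match cluster.get_one_available_server() {
        Some(node) if fetch_from(node).is_err() => Some(node.id),
        _ => None,
    };

    // ...then invalidate it so the next scan pass triggers a metadata update.
    if let Some(server_id) = failed {
        cluster.invalidate_server(server_id);
    }
    assert!(cluster.get_one_available_server().is_none());
}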
diff --git a/crates/fluss/src/client/credentials.rs b/crates/fluss/src/client/credentials.rs
index 8adfe48..ffb682e 100644
--- a/crates/fluss/src/client/credentials.rs
+++ b/crates/fluss/src/client/credentials.rs
@@ -118,7 +118,9 @@ impl CredentialsCache {
async fn refresh_from_server(&self) -> Result<HashMap<String, String>> {
let cluster = self.metadata.get_cluster();
- let server_node = cluster.get_one_available_server();
+ let server_node = cluster
+ .get_one_available_server()
+ .expect("no tablet server available");
let conn = self.rpc_client.get_connection(server_node).await?;
let request = GetSecurityTokenRequest::new();
diff --git a/crates/fluss/src/client/metadata.rs b/crates/fluss/src/client/metadata.rs
index 3c3ba4b..a514422 100644
--- a/crates/fluss/src/client/metadata.rs
+++ b/crates/fluss/src/client/metadata.rs
@@ -16,38 +16,40 @@
// under the License.
use crate::cluster::{Cluster, ServerNode, ServerType};
+use crate::error::Result;
use crate::metadata::{TableBucket, TablePath};
+use crate::proto::MetadataResponse;
use crate::rpc::message::UpdateMetadataRequest;
use crate::rpc::{RpcClient, ServerConnection};
+use log::info;
use parking_lot::RwLock;
use std::collections::HashSet;
use std::net::SocketAddr;
use std::sync::Arc;
-use crate::error::Result;
-use crate::proto::MetadataResponse;
-
#[derive(Default)]
pub struct Metadata {
cluster: RwLock<Arc<Cluster>>,
connections: Arc<RpcClient>,
+ bootstrap: Arc<str>,
}
impl Metadata {
- pub async fn new(boot_strap: &str, connections: Arc<RpcClient>) -> Result<Self> {
- let custer = Self::init_cluster(boot_strap, connections.clone()).await?;
+ pub async fn new(bootstrap: &str, connections: Arc<RpcClient>) -> Result<Self> {
+ let cluster = Self::init_cluster(bootstrap, connections.clone()).await?;
Ok(Metadata {
- cluster: RwLock::new(Arc::new(custer)),
+ cluster: RwLock::new(Arc::new(cluster)),
connections,
+ bootstrap: bootstrap.into(),
})
}
async fn init_cluster(boot_strap: &str, connections: Arc<RpcClient>) -> Result<Cluster> {
- let socker_addrss = boot_strap.parse::<SocketAddr>().unwrap();
+ let socket_address = boot_strap.parse::<SocketAddr>().unwrap();
let server_node = ServerNode::new(
-1,
- socker_addrss.ip().to_string(),
- socker_addrss.port() as u32,
+ socket_address.ip().to_string(),
+ socket_address.port() as u32,
ServerType::CoordinatorServer,
);
let con = connections.get_connection(&server_node).await?;
@@ -55,6 +57,20 @@ impl Metadata {
Cluster::from_metadata_response(response, None)
}
+ async fn reinit_cluster(&self) -> Result<()> {
+ let cluster = Self::init_cluster(&self.bootstrap, self.connections.clone()).await?;
+ *self.cluster.write() = cluster.into();
+ Ok(())
+ }
+
+ pub fn invalidate_server(&self, server_id: &i32, table_ids: Vec<i64>) {
+ // Take a write lock for the entire operation to avoid races between
+ // reading the current cluster state and writing back the updated one.
+ let mut cluster_guard = self.cluster.write();
+ let updated_cluster = cluster_guard.invalidate_server(server_id, table_ids);
+ *cluster_guard = Arc::new(updated_cluster);
+ }
+
pub async fn update(&self, metadata_response: MetadataResponse) -> Result<()> {
let origin_cluster = self.cluster.read().clone();
let new_cluster =
@@ -65,7 +81,22 @@ impl Metadata {
}
pub async fn update_tables_metadata(&self, table_paths: &HashSet<&TablePath>) -> Result<()> {
- let server = self.cluster.read().get_one_available_server().clone();
+ let maybe_server = {
+ let guard = self.cluster.read();
+ guard.get_one_available_server().cloned()
+ };
+
+ let server = match maybe_server {
+ Some(s) => s,
+ None => {
+ info!(
+ "No available tablet server to update metadata, attempting
to re-initialize cluster using bootstrap server."
+ );
+ self.reinit_cluster().await?;
+ return Ok(());
+ }
+ };
+
let conn = self.connections.get_connection(&server).await?;
let update_table_paths: Vec<&TablePath> = table_paths.iter().copied().collect();
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index 2246e2c..11bdfa3 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -24,7 +24,7 @@ use crate::client::table::log_fetch_buffer::{
use crate::client::table::remote_log::{
RemoteLogDownloader, RemoteLogFetchInfo, RemotePendingFetch,
};
-use crate::error::{Error, Result};
+use crate::error::{Error, Result, RpcError};
use crate::metadata::{TableBucket, TableInfo, TablePath};
use crate::proto::{FetchLogRequest, PbFetchLogReqForBucket, PbFetchLogReqForTable};
use crate::record::{LogRecordsBatches, ReadContext, ScanRecord, ScanRecords, to_arrow_schema};
@@ -271,6 +271,8 @@ struct LogFetcher {
credentials_cache: Arc<CredentialsCache>,
log_fetch_buffer: Arc<LogFetchBuffer>,
nodes_with_pending_fetch_requests: Arc<Mutex<HashSet<i32>>>,
+ table_path: TablePath,
+ is_partitioned: bool,
}
impl LogFetcher {
@@ -299,6 +301,8 @@ impl LogFetcher {
credentials_cache: Arc::new(CredentialsCache::new(conns.clone(), metadata.clone())),
log_fetch_buffer: Arc::new(LogFetchBuffer::new()),
nodes_with_pending_fetch_requests: Arc::new(Mutex::new(HashSet::new())),
+ table_path: table_info.table_path.clone(),
+ is_partitioned: table_info.is_partitioned(),
})
}
@@ -315,9 +319,45 @@ impl LogFetcher {
}
}
+ async fn check_and_update_metadata(&self) -> Result<()> {
+ if self.is_partitioned {
+ // TODO: Implement partition-aware metadata refresh for buckets whose leaders are unknown.
+ // The implementation will likely need to collect partition IDs for such buckets and
+ // perform targeted metadata updates. Until then, we avoid computing unused partition_ids.
+ return Ok(());
+ }
+
+ let need_update = self
+ .fetchable_buckets()
+ .iter()
+ .any(|bucket| self.get_table_bucket_leader(bucket).is_none());
+
+ if !need_update {
+ return Ok(());
+ }
+
+ // TODO: Handle PartitionNotExist error
+ self.metadata
+ .update_tables_metadata(&HashSet::from([&self.table_path]))
+ .await
+ .or_else(|e| {
+ if let Error::RpcError { source, .. } = &e
+ && matches!(source, RpcError::ConnectionError(_) | RpcError::Poisoned(_))
+ {
+ warn!(
+ "Retrying after encountering error while updating
table metadata: {}",
+ e
+ );
+ Ok(())
+ } else {
+ Err(e)
+ }
+ })
+ }
+
/// Send fetch requests asynchronously without waiting for responses
async fn send_fetches(&self) -> Result<()> {
- // todo: check update metadata like fluss-java in case leader changes
+ self.check_and_update_metadata().await?;
let fetch_request = self.prepare_fetch_log_requests().await;
for (leader, fetch_request) in fetch_request {
@@ -337,6 +377,7 @@ impl LogFetcher {
let remote_log_downloader = Arc::clone(&self.remote_log_downloader);
let creds_cache = self.credentials_cache.clone();
let nodes_with_pending = self.nodes_with_pending_fetch_requests.clone();
+ let metadata = self.metadata.clone();
// Spawn async task to handle the fetch request
// Note: These tasks are not explicitly tracked or cancelled when LogFetcher is dropped.
@@ -351,27 +392,34 @@ impl LogFetcher {
nodes_with_pending.lock().remove(&leader);
});
- let server_node = cluster
- .get_tablet_server(leader)
- .expect("todo: handle leader not exist.");
+ let server_node = match cluster.get_tablet_server(leader) {
+ Some(node) => node,
+ None => {
+ warn!("No server node found for leader {}, retrying",
leader);
+ Self::handle_fetch_failure(metadata, &leader,
&fetch_request).await;
+ return;
+ }
+ };
let con = match conns.get_connection(server_node).await {
Ok(con) => con,
Err(e) => {
- // todo: handle failed to get connection
- warn!("Failed to get connection to destination node:
{e:?}");
+ warn!("Retrying after error getting connection to
destination node: {e:?}");
+ Self::handle_fetch_failure(metadata, &leader,
&fetch_request).await;
return;
}
};
let fetch_response = match con
- .request(message::FetchLogRequest::new(fetch_request))
+ .request(message::FetchLogRequest::new(fetch_request.clone()))
.await
{
Ok(resp) => resp,
Err(e) => {
- // todo: handle fetch log from destination node
- warn!("Failed to fetch log from destination node
{server_node:?}: {e:?}");
+ warn!(
+ "Retrying after error fetching log from
destination node {server_node:?}: {e:?}"
+ );
+ Self::handle_fetch_failure(metadata, &leader, &fetch_request).await;
return;
}
};
@@ -387,7 +435,6 @@ impl LogFetcher {
)
.await
{
- // todo: handle fail to handle fetch response
error!("Fail to handle fetch response: {e:?}");
}
});
@@ -396,6 +443,15 @@ impl LogFetcher {
Ok(())
}
+ async fn handle_fetch_failure(
+ metadata: Arc<Metadata>,
+ server_id: &i32,
+ request: &FetchLogRequest,
+ ) {
+ let table_ids = request.tables_req.iter().map(|r| r.table_id).collect();
+ metadata.invalidate_server(server_id, table_ids);
+ }
+
/// Handle fetch response and add completed fetches to buffer
async fn handle_fetch_response(
fetch_response: crate::proto::FetchLogResponse,
diff --git a/crates/fluss/src/cluster/cluster.rs b/crates/fluss/src/cluster/cluster.rs
index a6f20a8..f14d055 100644
--- a/crates/fluss/src/cluster/cluster.rs
+++ b/crates/fluss/src/cluster/cluster.rs
@@ -22,7 +22,7 @@ use crate::metadata::{JsonSerde, TableBucket, TableDescriptor, TableInfo, TableP
use crate::proto::MetadataResponse;
use crate::rpc::{from_pb_server_node, from_pb_table_path};
use rand::random_range;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
static EMPTY: Vec<BucketLocation> = Vec::new();
@@ -64,6 +64,43 @@ impl Cluster {
}
}
+ pub fn invalidate_server(&self, server_id: &i32, table_ids: Vec<i64>) -> Self {
+ let alive_tablet_servers_by_id = self
+ .alive_tablet_servers_by_id
+ .iter()
+ .filter(|&(id, _)| id != server_id)
+ .map(|(id, ts)| (*id, ts.clone()))
+ .collect();
+
+ let table_paths: HashSet<&TablePath> = table_ids
+ .iter()
+ .filter_map(|id| self.table_path_by_id.get(id))
+ .collect();
+
+ let available_locations_by_path = self
+ .available_locations_by_path
+ .iter()
+ .filter(|&(path, _)| !table_paths.contains(path))
+ .map(|(path, locations)| (path.clone(), locations.clone()))
+ .collect();
+
+ let available_locations_by_bucket = self
+ .available_locations_by_bucket
+ .iter()
+ .filter(|&(_bucket, location)| !table_paths.contains(&location.table_path))
+ .map(|(bucket, location)| (bucket.clone(), location.clone()))
+ .collect();
+
+ Cluster::new(
+ self.coordinator_server.clone(),
+ alive_tablet_servers_by_id,
+ available_locations_by_path,
+ available_locations_by_bucket,
+ self.table_id_by_path.clone(),
+ self.table_info_by_path.clone(),
+ )
+ }
+
pub fn update(&mut self, cluster: Cluster) {
let Cluster {
coordinator_server,
@@ -214,15 +251,12 @@ impl Cluster {
.unwrap_or(&EMPTY)
}
- pub fn get_one_available_server(&self) -> &ServerNode {
- assert!(
- !self.alive_tablet_servers.is_empty(),
- "no alive tablet server in cluster"
- );
+ pub fn get_one_available_server(&self) -> Option<&ServerNode> {
+ if self.alive_tablet_servers.is_empty() {
+ return None;
+ }
let offset = random_range(0..self.alive_tablet_servers.len());
- self.alive_tablet_servers
- .get(offset)
- .unwrap_or_else(|| panic!("can't find alive tab server by offset {offset}"))
+ self.alive_tablet_servers.get(offset)
}
pub fn get_bucket_count(&self, table_path: &TablePath) -> i32 {
diff --git a/crates/fluss/src/rpc/server_connection.rs b/crates/fluss/src/rpc/server_connection.rs
index fdeb56f..441b175 100644
--- a/crates/fluss/src/rpc/server_connection.rs
+++ b/crates/fluss/src/rpc/server_connection.rs
@@ -66,13 +66,25 @@ impl RpcClient {
server_node: &ServerNode,
) -> Result<ServerConnection, RpcError> {
let server_id = server_node.uid();
- {
+ let connection = {
let connections = self.connections.read();
- if let Some(connection) = connections.get(server_id) {
- return Ok(connection.clone());
+ connections.get(server_id).cloned()
+ };
+
+ if let Some(conn) = connection {
+ if !conn.is_poisoned() {
+ return Ok(conn);
}
}
- let new_server = self.connect(server_node).await?;
+
+ let new_server = match self.connect(server_node).await {
+ Ok(new_server) => new_server,
+ Err(e) => {
+ self.connections.write().remove(server_id);
+ return Err(e);
+ }
+ };
+
self.connections
.write()
.insert(server_id.clone(), new_server.clone());
@@ -231,6 +243,11 @@ where
}
}
+ fn is_poisoned(&self) -> bool {
+ let guard = self.state.lock();
+ matches!(*guard, ConnectionState::Poison(_))
+ }
+
pub async fn request<R>(&self, msg: R) -> Result<R::ResponseBody, Error>
where
R: RequestBody + Send + WriteVersionedType<Vec<u8>>,