Copilot commented on code in PR #213:
URL: https://github.com/apache/fluss-rust/pull/213#discussion_r2725336342
##########
crates/fluss/src/client/credentials.rs:
##########
@@ -88,80 +61,285 @@ fn convert_hadoop_key_to_opendal(hadoop_key: &str) -> Option<(String, bool)> {
}
}
-pub struct CredentialsCache {
- inner: RwLock<Option<CachedToken>>,
+/// Build remote filesystem props from credentials and additional info
+fn build_remote_fs_props(
+ credentials: &Credentials,
+ addition_infos: &HashMap<String, String>,
+) -> HashMap<String, String> {
+ let mut props = HashMap::new();
+
+ props.insert(
+ "access_key_id".to_string(),
+ credentials.access_key_id.clone(),
+ );
+ props.insert(
+ "secret_access_key".to_string(),
+ credentials.access_key_secret.clone(),
+ );
+
+ if let Some(token) = &credentials.security_token {
+ props.insert("security_token".to_string(), token.clone());
+ }
+
+ for (key, value) in addition_infos {
+ if let Some((opendal_key, transform)) = convert_hadoop_key_to_opendal(key) {
+ let final_value = if transform {
+ // Invert boolean value (path_style_access -> enable_virtual_host_style)
+ if value == "true" {
+ "false".to_string()
+ } else {
+ "true".to_string()
+ }
+ } else {
+ value.clone()
+ };
+ props.insert(opendal_key, final_value);
+ }
+ }
+
+ props
+}
+
+/// Manager for security tokens that refreshes tokens in a background task.
+///
+/// This follows the pattern from Java's `DefaultSecurityTokenManager`, where
+/// a background thread periodically refreshes tokens based on their expiration time.
+///
+/// Uses `tokio::sync::watch` channel to broadcast token updates to consumers.
+/// Consumers can subscribe by calling `subscribe()` to get a receiver.
+///
+/// The channel value is `Option<HashMap>`:
+/// - `None` = not yet fetched, consumers should wait
+/// - `Some(HashMap)` = fetched (may be empty if no auth needed)
+///
+/// # Example
+/// ```ignore
+/// let manager = SecurityTokenManager::new(rpc_client, metadata);
+/// let credentials_rx = manager.subscribe();
+/// manager.start();
+///
+/// // Consumer can get latest credentials via:
+/// let props = credentials_rx.borrow().clone();
+/// ```
+pub struct SecurityTokenManager {
rpc_client: Arc<RpcClient>,
metadata: Arc<Metadata>,
+ token_renewal_ratio: f64,
+ renewal_retry_backoff: Duration,
+ /// Watch channel sender for broadcasting token updates
+ credentials_tx: watch::Sender<Option<HashMap<String, String>>>,
+ /// Watch channel receiver (kept to allow cloning for new subscribers)
+ credentials_rx: watch::Receiver<Option<HashMap<String, String>>>,
+ /// Handle to the background refresh task
+ task_handle: RwLock<Option<JoinHandle<()>>>,
+ /// Sender to signal shutdown
+ shutdown_tx: RwLock<Option<oneshot::Sender<()>>>,
}
-impl CredentialsCache {
+impl SecurityTokenManager {
pub fn new(rpc_client: Arc<RpcClient>, metadata: Arc<Metadata>) -> Self {
+ let (credentials_tx, credentials_rx) = watch::channel(None);
Self {
- inner: RwLock::new(None),
rpc_client,
metadata,
+ token_renewal_ratio: DEFAULT_TOKEN_RENEWAL_RATIO,
+ renewal_retry_backoff: DEFAULT_RENEWAL_RETRY_BACKOFF,
+ credentials_tx,
+ credentials_rx,
+ task_handle: RwLock::new(None),
+ shutdown_tx: RwLock::new(None),
+ }
+ }
+
+ /// Subscribe to credential updates.
+ /// Returns a receiver that always contains the latest credentials.
+ /// Consumers can call `receiver.borrow()` to get the current value.
+ pub fn subscribe(&self) -> CredentialsReceiver {
+ self.credentials_rx.clone()
+ }
+
+ /// Start the background token refresh task.
+ /// This should be called once after creating the manager.
+ pub fn start(&self) {
+ if self.task_handle.read().is_some() {
+ warn!("SecurityTokenManager is already started");
+ return;
+ }
+
+ let (shutdown_tx, shutdown_rx) = oneshot::channel();
+ *self.shutdown_tx.write() = Some(shutdown_tx);
+
+ let rpc_client = Arc::clone(&self.rpc_client);
+ let metadata = Arc::clone(&self.metadata);
+ let token_renewal_ratio = self.token_renewal_ratio;
+ let renewal_retry_backoff = self.renewal_retry_backoff;
+ let credentials_tx = self.credentials_tx.clone();
+
+ let handle = tokio::spawn(async move {
+ Self::token_refresh_loop(
+ rpc_client,
+ metadata,
+ token_renewal_ratio,
+ renewal_retry_backoff,
+ credentials_tx,
+ shutdown_rx,
+ )
+ .await;
+ });
+
+ *self.task_handle.write() = Some(handle);
+ info!("SecurityTokenManager started");
+ }
+
+ /// Stop the background token refresh task.
+ pub fn stop(&self) {
+ if let Some(tx) = self.shutdown_tx.write().take() {
+ let _ = tx.send(());
+ }
+ if let Some(handle) = self.task_handle.write().take() {
+ handle.abort();
}
+ info!("SecurityTokenManager stopped");
}
- pub async fn get_or_refresh(&self) -> Result<HashMap<String, String>> {
- {
- let guard = self.inner.read();
- if let Some(cached) = guard.as_ref() {
- if cached.cached_at.elapsed() < CACHE_TTL {
- return Ok(cached.to_remote_fs_props());
+ /// Background task that periodically refreshes tokens.
+ async fn token_refresh_loop(
+ rpc_client: Arc<RpcClient>,
+ metadata: Arc<Metadata>,
+ token_renewal_ratio: f64,
+ renewal_retry_backoff: Duration,
+ credentials_tx: watch::Sender<Option<HashMap<String, String>>>,
+ mut shutdown_rx: oneshot::Receiver<()>,
+ ) {
+ info!("Starting token refresh loop");
+
+ loop {
+ // Fetch token and send to channel
+ let result = Self::fetch_token(&rpc_client, &metadata).await;
+
+ let next_delay = match result {
+ Ok((props, expiration_time)) => {
+ // Send credentials via watch channel (Some indicates fetched)
+ if let Err(e) = credentials_tx.send(Some(props)) {
+ log::debug!("No active subscribers for credentials update: {:?}", e);
+ }
+
+ // Calculate next renewal delay based on expiration time
+ if let Some(exp_time) = expiration_time {
+ Self::calculate_renewal_delay(exp_time, token_renewal_ratio)
+ } else {
+ // No expiration time - token never expires, use long refresh interval
+ log::info!(
+ "Token has no expiration time (never expires), next refresh in {:?}",
+ DEFAULT_NON_EXPIRING_REFRESH_INTERVAL
+ );
+ DEFAULT_NON_EXPIRING_REFRESH_INTERVAL
+ }
+ }
+ Err(e) => {
+ log::warn!(
+ "Failed to obtain security token: {:?}, will retry in
{:?}",
+ e,
+ renewal_retry_backoff
+ );
+ renewal_retry_backoff
+ }
+ };
+
+ log::debug!("Next token refresh in {:?}", next_delay);
+
+ // Wait for either the delay to elapse or shutdown signal
+ tokio::select! {
+ _ = tokio::time::sleep(next_delay) => {
+ // Continue to next iteration to refresh
+ }
+ _ = &mut shutdown_rx => {
+ log::info!("Token refresh loop received shutdown signal");
+ break;
}
}
}
-
- self.refresh_from_server().await
}
- async fn refresh_from_server(&self) -> Result<HashMap<String, String>> {
- let cluster = self.metadata.get_cluster();
- let server_node = cluster
- .get_one_available_server()
- .expect("no tablet server available");
- let conn = self.rpc_client.get_connection(server_node).await?;
+ /// Fetch token from server.
+ /// Returns the props and expiration time if available.
+ async fn fetch_token(
+ rpc_client: &Arc<RpcClient>,
+ metadata: &Arc<Metadata>,
+ ) -> Result<(HashMap<String, String>, Option<i64>)> {
+ let cluster = metadata.get_cluster();
+ let server_node = cluster
+ .get_one_available_server()
+ .ok_or_else(|| Error::UnexpectedError {
+ message: "No tablet server available for token refresh".to_string(),
+ source: None,
+ })?;
+ let conn = rpc_client.get_connection(server_node).await?;
let request = GetSecurityTokenRequest::new();
let response = conn.request(request).await?;
- // the token may be empty if the remote filesystem
- // doesn't require token to access
+ // The token may be empty if remote filesystem doesn't require authentication
if response.token.is_empty() {
- return Ok(HashMap::new());
+ log::info!("Empty token received, remote filesystem may not
require authentication");
+ return Ok((HashMap::new(), response.expiration_time));
}
let credentials: Credentials =
serde_json::from_slice(&response.token).map_err(|e| Error::JsonSerdeError {
- message: format!("Error when parse token from server: {e}"),
+ message: format!("Error when parsing token from server: {e}"),
})?;
let mut addition_infos = HashMap::new();
for kv in &response.addition_info {
addition_infos.insert(kv.key.clone(), kv.value.clone());
}
- let cached = CachedToken {
- access_key_id: credentials.access_key_id,
- secret_access_key: credentials.access_key_secret,
- security_token: credentials.security_token,
- addition_infos,
- cached_at: Instant::now(),
- };
+ let props = build_remote_fs_props(&credentials, &addition_infos);
+ log::debug!("Security token fetched successfully");
+
+ Ok((props, response.expiration_time))
+ }
+
+ /// Calculate the delay before next token renewal.
+ /// Uses the renewal ratio to refresh before actual expiration.
+ fn calculate_renewal_delay(expiration_time: i64, renewal_ratio: f64) -> Duration {
+ let now = SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .unwrap()
+ .as_millis() as i64;
+
Review Comment:
Potential arithmetic issue when calculating the delay. If `time_until_expiry` is very large (e.g., the token expires far in the future), multiplying by `renewal_ratio` (0.8) can produce a value that behaves unexpectedly when cast to `u64`.
For example, if `time_until_expiry` is close to `i64::MAX`, the multiplication `(time_until_expiry as f64 * renewal_ratio)` can yield an f64 that exceeds `u64::MAX`; the `as u64` cast then saturates, producing an absurdly long delay. While this is unlikely in practice (tokens typically don't expire centuries in the future), consider adding bounds checking or capping the maximum delay at a reasonable value such as 24 hours.
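A minimal sketch of that capping approach, assuming the function keeps its current signature; `MAX_RENEWAL_DELAY` and the exact clamping policy are illustrative, not part of the PR:
```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

// Hypothetical cap; pick whatever bound fits the deployment.
const MAX_RENEWAL_DELAY: Duration = Duration::from_secs(24 * 60 * 60);

fn calculate_renewal_delay(expiration_time: i64, renewal_ratio: f64) -> Duration {
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock before UNIX epoch")
        .as_millis() as i64;

    // Clamp already-expired tokens to zero before scaling.
    let time_until_expiry = (expiration_time - now).max(0);
    // The float-to-int cast saturates rather than wrapping, but the cap
    // keeps the result sane even for implausibly distant expiration times.
    let delay_ms = (time_until_expiry as f64 * renewal_ratio) as u64;
    Duration::from_millis(delay_ms).min(MAX_RENEWAL_DELAY)
}
```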
##########
crates/fluss/src/client/credentials.rs:
##########
@@ -88,80 +61,285 @@ fn convert_hadoop_key_to_opendal(hadoop_key: &str) -> Option<(String, bool)> {
}
}
+ /// Fetch token from server.
+ /// Returns the props and expiration time if available.
+ async fn fetch_token(
+ rpc_client: &Arc<RpcClient>,
+ metadata: &Arc<Metadata>,
+ ) -> Result<(HashMap<String, String>, Option<i64>)> {
+ let cluster = metadata.get_cluster();
+ let server_node = cluster
+ .get_one_available_server()
+ .ok_or_else(|| Error::UnexpectedError {
+ message: "No tablet server available for token refresh".to_string(),
+ source: None,
+ })?;
Review Comment:
Inconsistent logging style. Lines 188, 199, and 211 use `info!` (without the `log::` prefix), while lines 229, 250, and 277 use `log::info!` (with the prefix); the same applies to the `warn!` usage at line 162.
Since all logging macros in this file come from the `log` crate, pick one style and use it consistently. Either:
1. Use `log::info!`, `log::warn!`, etc. everywhere (and remove `info`, `warn` from the imports), or
2. Import all needed macros and use `info!`, `warn!`, etc. without the prefix.
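As a sketch of option 2, the file would import the macros once and drop the `log::` prefixes (the exact macro list depends on what the file ends up using):
```rust
use log::{debug, info, warn};

fn demo() {
    info!("SecurityTokenManager started");
    debug!("Next token refresh in {:?}", std::time::Duration::from_secs(60));
    warn!("Failed to obtain security token, will retry");
}
```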
##########
crates/fluss/src/client/credentials.rs:
##########
@@ -88,80 +61,285 @@ fn convert_hadoop_key_to_opendal(hadoop_key: &str) -> Option<(String, bool)> {
}
}
+ /// Stop the background token refresh task.
+ pub fn stop(&self) {
+ if let Some(tx) = self.shutdown_tx.write().take() {
+ let _ = tx.send(());
+ }
+ if let Some(handle) = self.task_handle.write().take() {
+ handle.abort();
}
Review Comment:
Both the shutdown signal and `handle.abort()` are issued in `stop`, but the ordering makes the signal largely moot: `abort()` cancels the task immediately, so if `stop()` is called while the task is mid-fetch, the task never gets a chance to observe the shutdown signal.
Consider sending the shutdown signal first and giving the task a brief moment to finish gracefully before aborting. Alternatively, rely on the shutdown signal alone and don't abort the handle at all, letting the task complete its current iteration and exit cleanly. The current implementation aborts immediately, which could leave resources in an inconsistent state if a fetch was in progress.
```suggestion
// Take and drop the task handle so the task can finish gracefully
let _ = self.task_handle.write().take();
```
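A sketch of the graceful variant, under the assumption that callers can use an async `stop`; it reuses the struct's fields from the diff above, and the `SHUTDOWN_GRACE` constant and `stop_graceful` name are hypothetical:
```rust
use std::time::Duration;

// Hypothetical grace period before falling back to abort.
const SHUTDOWN_GRACE: Duration = Duration::from_secs(5);

impl SecurityTokenManager {
    /// Signal shutdown, wait briefly for the loop to exit on its own,
    /// and only abort if it does not.
    pub async fn stop_graceful(&self) {
        if let Some(tx) = self.shutdown_tx.write().take() {
            let _ = tx.send(());
        }
        // Take the handle first so the lock guard is dropped before awaiting.
        let handle = self.task_handle.write().take();
        if let Some(mut handle) = handle {
            if tokio::time::timeout(SHUTDOWN_GRACE, &mut handle).await.is_err() {
                // Loop did not observe the signal in time; abort as a last resort.
                handle.abort();
            }
        }
        info!("SecurityTokenManager stopped");
    }
}
```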
##########
crates/fluss/src/client/table/remote_log.rs:
##########
@@ -317,14 +318,29 @@ impl RemoteLogFetcher for ProductionFetcher {
remote_log_tablet_dir, segment.segment_id, offset_prefix
);
- let remote_fs_props_map = remote_fs_props.read().clone();
+ // Get credentials from watch channel, waiting if not yet fetched
+ // - None = not yet fetched, wait
+ // - Some(props) = fetched (may be empty if no auth needed)
+ let remote_fs_props = {
+ let maybe_props = credentials_rx.borrow().clone();
+ match maybe_props {
+ Some(props) => props,
+ None => {
+ // Credentials not yet fetched, wait for first update
+ log::info!("Waiting for credentials to be
available...");
+ let _ = credentials_rx.changed().await;
+ // After change, unwrap or use empty (should be Some
now)
+ credentials_rx.borrow().clone().unwrap_or_default()
Review Comment:
Potential issue when watch channel is closed. If the `SecurityTokenManager`
is dropped before credentials are fetched (e.g., during shutdown),
`credentials_rx.changed().await` will return an error because the sender is
dropped. This error is currently ignored with `let _ =`, but the subsequent
`unwrap_or_default()` will still return an empty HashMap.
However, if remote filesystem authentication is required, using an empty
HashMap could cause download failures. Consider explicitly handling the error
case from `changed().await` and returning a proper error to indicate that the
credential manager was shut down before credentials could be obtained.
```suggestion
                    // If the sender side has been dropped (e.g. during shutdown),
                    // this will return an error. Surface that as a proper error
                    // instead of silently falling back to empty credentials.
                    if let Err(e) = credentials_rx.changed().await {
                        let io_err = io::Error::new(
                            io::ErrorKind::BrokenPipe,
                            format!(
                                "credentials manager shut down before credentials were obtained: {e}"
                            ),
                        );
                        return Err(io_err.into());
                    }
                    // After a successful change notification, credentials should be set.
                    // If they are still missing, treat this as an error instead of
                    // defaulting to an empty map (which could break auth flows).
                    credentials_rx
                        .borrow()
                        .clone()
                        .ok_or_else(|| {
                            io::Error::new(
                                io::ErrorKind::Other,
                                "credentials not available after watch notification",
                            )
                        })
                        .map_err(Into::into)?
```
##########
crates/fluss/src/client/credentials.rs:
##########
@@ -88,80 +61,285 @@ fn convert_hadoop_key_to_opendal(hadoop_key: &str) -> Option<(String, bool)> {
}
}
+ /// Background task that periodically refreshes tokens.
+ async fn token_refresh_loop(
+ rpc_client: Arc<RpcClient>,
+ metadata: Arc<Metadata>,
+ token_renewal_ratio: f64,
+ renewal_retry_backoff: Duration,
+ credentials_tx: watch::Sender<Option<HashMap<String, String>>>,
+ mut shutdown_rx: oneshot::Receiver<()>,
+ ) {
+ info!("Starting token refresh loop");
+
+ loop {
+ // Fetch token and send to channel
+ let result = Self::fetch_token(&rpc_client, &metadata).await;
+
+ let next_delay = match result {
+ Ok((props, expiration_time)) => {
+ // Send credentials via watch channel (Some indicates fetched)
+ if let Err(e) = credentials_tx.send(Some(props)) {
+ log::debug!("No active subscribers for credentials update: {:?}", e);
+ }
+
+ // Calculate next renewal delay based on expiration time
+ if let Some(exp_time) = expiration_time {
+ Self::calculate_renewal_delay(exp_time, token_renewal_ratio)
+ } else {
+ // No expiration time - token never expires, use long refresh interval
+ log::info!(
+ "Token has no expiration time (never expires), next refresh in {:?}",
+ DEFAULT_NON_EXPIRING_REFRESH_INTERVAL
+ );
+ DEFAULT_NON_EXPIRING_REFRESH_INTERVAL
+ }
+ }
+ Err(e) => {
Review Comment:
When the first token fetch fails, consumers waiting for credentials may
block indefinitely. The token refresh loop retries after
`renewal_retry_backoff` (60 seconds) but doesn't send any credentials to the
watch channel on failure. This means:
1. If `fetch_token` fails initially, the watch channel remains at `None`
2. Consumers waiting on `credentials_rx.changed().await` will continue
waiting
3. The loop sleeps for 60 seconds before retrying
4. During this time, any remote log fetch operations will be blocked
Consider sending an empty HashMap on first fetch failure (similar to the
success case when auth is not required) to unblock consumers, or add a timeout
to the `changed().await` call in remote_log.rs so consumers can fail fast
rather than blocking indefinitely.
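A consumer-side sketch of the timeout approach; `CREDENTIALS_WAIT_TIMEOUT`, the helper name, and the error type are placeholders, assuming only the watch-channel shape shown in the diff:
```rust
use std::collections::HashMap;
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::timeout;

// Hypothetical bound on how long a fetch will wait for first credentials.
const CREDENTIALS_WAIT_TIMEOUT: Duration = Duration::from_secs(30);

async fn wait_for_credentials(
    rx: &mut watch::Receiver<Option<HashMap<String, String>>>,
) -> Result<HashMap<String, String>, String> {
    // Fast path: credentials already published.
    if let Some(props) = rx.borrow().clone() {
        return Ok(props);
    }
    // Otherwise wait for the first update, but bounded, so consumers
    // fail fast instead of blocking through the 60s retry backoff.
    match timeout(CREDENTIALS_WAIT_TIMEOUT, rx.changed()).await {
        Ok(Ok(())) => rx
            .borrow()
            .clone()
            .ok_or_else(|| "credentials still unset after update".to_string()),
        Ok(Err(_)) => Err("credentials manager shut down".to_string()),
        Err(_) => Err("timed out waiting for initial credentials".to_string()),
    }
}
```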
##########
crates/fluss/src/client/credentials.rs:
##########
@@ -88,80 +61,285 @@ fn convert_hadoop_key_to_opendal(hadoop_key: &str) -> Option<(String, bool)> {
}
}
+ /// Start the background token refresh task.
+ /// This should be called once after creating the manager.
+ pub fn start(&self) {
+ if self.task_handle.read().is_some() {
+ warn!("SecurityTokenManager is already started");
+ return;
+ }
+
+ let (shutdown_tx, shutdown_rx) = oneshot::channel();
+ *self.shutdown_tx.write() = Some(shutdown_tx);
+
+ let rpc_client = Arc::clone(&self.rpc_client);
+ let metadata = Arc::clone(&self.metadata);
+ let token_renewal_ratio = self.token_renewal_ratio;
+ let renewal_retry_backoff = self.renewal_retry_backoff;
+ let credentials_tx = self.credentials_tx.clone();
+
+ let handle = tokio::spawn(async move {
+ Self::token_refresh_loop(
+ rpc_client,
+ metadata,
+ token_renewal_ratio,
+ renewal_retry_backoff,
+ credentials_tx,
+ shutdown_rx,
+ )
+ .await;
+ });
+
Review Comment:
Missing test coverage for the core SecurityTokenManager functionality. While
there are tests for `calculate_renewal_delay` and `build_remote_fs_props`,
there are no tests for:
1. The background refresh loop starting and stopping correctly
2. Token refresh triggering at appropriate intervals
3. Handling of token fetch failures and retry logic
4. Credential updates being properly broadcast through the watch channel
5. Multiple subscribers receiving updates
Consider adding integration tests that verify the background task behavior,
especially around error handling and timing. You can use mock RPC clients to
simulate token fetch success/failure scenarios.
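As a starting point, points 4 and 5 can be pinned down without a mock RPC client by testing the watch-channel contract the manager relies on; a full integration test of the refresh loop would additionally need a mock `RpcClient`, which is assumed out of scope here:
```rust
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use tokio::sync::watch;

    #[tokio::test]
    async fn all_subscribers_see_latest_credentials() {
        // Same channel shape as SecurityTokenManager: None = not yet fetched.
        let (tx, mut rx_a) = watch::channel::<Option<HashMap<String, String>>>(None);
        let mut rx_b = rx_a.clone();

        let mut props = HashMap::new();
        props.insert("access_key_id".to_string(), "test-key".to_string());
        tx.send(Some(props.clone())).unwrap();

        // Both subscribers observe the update and read the same value.
        rx_a.changed().await.unwrap();
        rx_b.changed().await.unwrap();
        assert_eq!(rx_a.borrow().as_ref(), Some(&props));
        assert_eq!(rx_b.borrow().as_ref(), Some(&props));
    }
}
```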
##########
crates/fluss/src/client/credentials.rs:
##########
@@ -19,13 +19,28 @@ use crate::client::metadata::Metadata;
use crate::error::{Error, Result};
use crate::rpc::RpcClient;
use crate::rpc::message::GetSecurityTokenRequest;
+use log::{debug, info, warn};
Review Comment:
The `debug` identifier in the import is unused. All debug logging uses the
`log::debug!` macro, which doesn't require importing `debug` directly. Consider
removing `debug` from the import list, keeping only `info` and `warn`.
```suggestion
use log::{info, warn};
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]