Copilot commented on code in PR #213:
URL: https://github.com/apache/fluss-rust/pull/213#discussion_r2725336342


##########
crates/fluss/src/client/credentials.rs:
##########
@@ -88,80 +61,285 @@ fn convert_hadoop_key_to_opendal(hadoop_key: &str) -> Option<(String, bool)> {
     }
 }
 
-pub struct CredentialsCache {
-    inner: RwLock<Option<CachedToken>>,
+/// Build remote filesystem props from credentials and additional info
+fn build_remote_fs_props(
+    credentials: &Credentials,
+    addition_infos: &HashMap<String, String>,
+) -> HashMap<String, String> {
+    let mut props = HashMap::new();
+
+    props.insert(
+        "access_key_id".to_string(),
+        credentials.access_key_id.clone(),
+    );
+    props.insert(
+        "secret_access_key".to_string(),
+        credentials.access_key_secret.clone(),
+    );
+
+    if let Some(token) = &credentials.security_token {
+        props.insert("security_token".to_string(), token.clone());
+    }
+
+    for (key, value) in addition_infos {
+        if let Some((opendal_key, transform)) = convert_hadoop_key_to_opendal(key) {
+            let final_value = if transform {
+                // Invert boolean value (path_style_access -> enable_virtual_host_style)
+                if value == "true" {
+                    "false".to_string()
+                } else {
+                    "true".to_string()
+                }
+            } else {
+                value.clone()
+            };
+            props.insert(opendal_key, final_value);
+        }
+    }
+
+    props
+}
+
+/// Manager for security tokens that refreshes tokens in a background task.
+///
+/// This follows the pattern from Java's `DefaultSecurityTokenManager`, where
+/// a background thread periodically refreshes tokens based on their expiration time.
+///
+/// Uses `tokio::sync::watch` channel to broadcast token updates to consumers.
+/// Consumers can subscribe by calling `subscribe()` to get a receiver.
+///
+/// The channel value is `Option<HashMap>`:
+/// - `None` = not yet fetched, consumers should wait
+/// - `Some(HashMap)` = fetched (may be empty if no auth needed)
+///
+/// # Example
+/// ```ignore
+/// let manager = SecurityTokenManager::new(rpc_client, metadata);
+/// let credentials_rx = manager.subscribe();
+/// manager.start();
+///
+/// // Consumer can get latest credentials via:
+/// let props = credentials_rx.borrow().clone();
+/// ```
+pub struct SecurityTokenManager {
     rpc_client: Arc<RpcClient>,
     metadata: Arc<Metadata>,
+    token_renewal_ratio: f64,
+    renewal_retry_backoff: Duration,
+    /// Watch channel sender for broadcasting token updates
+    credentials_tx: watch::Sender<Option<HashMap<String, String>>>,
+    /// Watch channel receiver (kept to allow cloning for new subscribers)
+    credentials_rx: watch::Receiver<Option<HashMap<String, String>>>,
+    /// Handle to the background refresh task
+    task_handle: RwLock<Option<JoinHandle<()>>>,
+    /// Sender to signal shutdown
+    shutdown_tx: RwLock<Option<oneshot::Sender<()>>>,
 }
 
-impl CredentialsCache {
+impl SecurityTokenManager {
     pub fn new(rpc_client: Arc<RpcClient>, metadata: Arc<Metadata>) -> Self {
+        let (credentials_tx, credentials_rx) = watch::channel(None);
         Self {
-            inner: RwLock::new(None),
             rpc_client,
             metadata,
+            token_renewal_ratio: DEFAULT_TOKEN_RENEWAL_RATIO,
+            renewal_retry_backoff: DEFAULT_RENEWAL_RETRY_BACKOFF,
+            credentials_tx,
+            credentials_rx,
+            task_handle: RwLock::new(None),
+            shutdown_tx: RwLock::new(None),
+        }
+    }
+
+    /// Subscribe to credential updates.
+    /// Returns a receiver that always contains the latest credentials.
+    /// Consumers can call `receiver.borrow()` to get the current value.
+    pub fn subscribe(&self) -> CredentialsReceiver {
+        self.credentials_rx.clone()
+    }
+
+    /// Start the background token refresh task.
+    /// This should be called once after creating the manager.
+    pub fn start(&self) {
+        if self.task_handle.read().is_some() {
+            warn!("SecurityTokenManager is already started");
+            return;
+        }
+
+        let (shutdown_tx, shutdown_rx) = oneshot::channel();
+        *self.shutdown_tx.write() = Some(shutdown_tx);
+
+        let rpc_client = Arc::clone(&self.rpc_client);
+        let metadata = Arc::clone(&self.metadata);
+        let token_renewal_ratio = self.token_renewal_ratio;
+        let renewal_retry_backoff = self.renewal_retry_backoff;
+        let credentials_tx = self.credentials_tx.clone();
+
+        let handle = tokio::spawn(async move {
+            Self::token_refresh_loop(
+                rpc_client,
+                metadata,
+                token_renewal_ratio,
+                renewal_retry_backoff,
+                credentials_tx,
+                shutdown_rx,
+            )
+            .await;
+        });
+
+        *self.task_handle.write() = Some(handle);
+        info!("SecurityTokenManager started");
+    }
+
+    /// Stop the background token refresh task.
+    pub fn stop(&self) {
+        if let Some(tx) = self.shutdown_tx.write().take() {
+            let _ = tx.send(());
+        }
+        if let Some(handle) = self.task_handle.write().take() {
+            handle.abort();
         }
+        info!("SecurityTokenManager stopped");
     }
 
-    pub async fn get_or_refresh(&self) -> Result<HashMap<String, String>> {
-        {
-            let guard = self.inner.read();
-            if let Some(cached) = guard.as_ref() {
-                if cached.cached_at.elapsed() < CACHE_TTL {
-                    return Ok(cached.to_remote_fs_props());
+    /// Background task that periodically refreshes tokens.
+    async fn token_refresh_loop(
+        rpc_client: Arc<RpcClient>,
+        metadata: Arc<Metadata>,
+        token_renewal_ratio: f64,
+        renewal_retry_backoff: Duration,
+        credentials_tx: watch::Sender<Option<HashMap<String, String>>>,
+        mut shutdown_rx: oneshot::Receiver<()>,
+    ) {
+        info!("Starting token refresh loop");
+
+        loop {
+            // Fetch token and send to channel
+            let result = Self::fetch_token(&rpc_client, &metadata).await;
+
+            let next_delay = match result {
+                Ok((props, expiration_time)) => {
+                    // Send credentials via watch channel (Some indicates fetched)
+                    if let Err(e) = credentials_tx.send(Some(props)) {
+                        log::debug!("No active subscribers for credentials update: {:?}", e);
+                    }
+
+                    // Calculate next renewal delay based on expiration time
+                    if let Some(exp_time) = expiration_time {
+                        Self::calculate_renewal_delay(exp_time, token_renewal_ratio)
+                    } else {
+                        // No expiration time - token never expires, use long refresh interval
+                        log::info!(
+                            "Token has no expiration time (never expires), 
next refresh in {:?}",
+                            DEFAULT_NON_EXPIRING_REFRESH_INTERVAL
+                        );
+                        DEFAULT_NON_EXPIRING_REFRESH_INTERVAL
+                    }
+                }
+                Err(e) => {
+                    log::warn!(
+                        "Failed to obtain security token: {:?}, will retry in 
{:?}",
+                        e,
+                        renewal_retry_backoff
+                    );
+                    renewal_retry_backoff
+                }
+            };
+
+            log::debug!("Next token refresh in {:?}", next_delay);
+
+            // Wait for either the delay to elapse or shutdown signal
+            tokio::select! {
+                _ = tokio::time::sleep(next_delay) => {
+                    // Continue to next iteration to refresh
+                }
+                _ = &mut shutdown_rx => {
+                     log::info!("Token refresh loop received shutdown signal");
+                    break;
                 }
             }
         }
-
-        self.refresh_from_server().await
     }
 
-    async fn refresh_from_server(&self) -> Result<HashMap<String, String>> {
-        let cluster = self.metadata.get_cluster();
-        let server_node = cluster
-            .get_one_available_server()
-            .expect("no tablet server available");
-        let conn = self.rpc_client.get_connection(server_node).await?;
+    /// Fetch token from server.
+    /// Returns the props and expiration time if available.
+    async fn fetch_token(
+        rpc_client: &Arc<RpcClient>,
+        metadata: &Arc<Metadata>,
+    ) -> Result<(HashMap<String, String>, Option<i64>)> {
+        let cluster = metadata.get_cluster();
+        let server_node =
+            cluster
+                .get_one_available_server()
+                .ok_or_else(|| Error::UnexpectedError {
+                    message: "No tablet server available for token 
refresh".to_string(),
+                    source: None,
+                })?;
 
+        let conn = rpc_client.get_connection(server_node).await?;
         let request = GetSecurityTokenRequest::new();
         let response = conn.request(request).await?;
 
-        // the token may be empty if the remote filesystem
-        // doesn't require token to access
+        // The token may be empty if remote filesystem doesn't require authentication
         if response.token.is_empty() {
-            return Ok(HashMap::new());
+            log::info!("Empty token received, remote filesystem may not 
require authentication");
+            return Ok((HashMap::new(), response.expiration_time));
         }
 
         let credentials: Credentials =
            serde_json::from_slice(&response.token).map_err(|e| Error::JsonSerdeError {
-                message: format!("Error when parse token from server: {e}"),
+                message: format!("Error when parsing token from server: {e}"),
             })?;
 
         let mut addition_infos = HashMap::new();
         for kv in &response.addition_info {
             addition_infos.insert(kv.key.clone(), kv.value.clone());
         }
 
-        let cached = CachedToken {
-            access_key_id: credentials.access_key_id,
-            secret_access_key: credentials.access_key_secret,
-            security_token: credentials.security_token,
-            addition_infos,
-            cached_at: Instant::now(),
-        };
+        let props = build_remote_fs_props(&credentials, &addition_infos);
+        log::debug!("Security token fetched successfully");
+
+        Ok((props, response.expiration_time))
+    }
+
+    /// Calculate the delay before next token renewal.
+    /// Uses the renewal ratio to refresh before actual expiration.
+    fn calculate_renewal_delay(expiration_time: i64, renewal_ratio: f64) -> Duration {
+        let now = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .unwrap()
+            .as_millis() as i64;
+

Review Comment:
   Unbounded renewal delay. If `time_until_expiry` is very large (e.g., the 
token expires far in the future), multiplying by `renewal_ratio` (0.8) and 
casting to `u64` cannot overflow, because Rust's `as` casts from float to 
integer saturate, but the result can still be an unreasonably long delay.
   
   For example, if `time_until_expiry` is close to `i64::MAX`, the 
multiplication `(time_until_expiry as f64 * renewal_ratio)` schedules the next 
refresh centuries in the future. While this is unlikely in practice, consider 
capping the maximum delay to a reasonable value like 24 hours.
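   
   A minimal sketch of such a cap, assuming a hypothetical `MAX_RENEWAL_DELAY` 
constant (the name and the 24-hour value are illustrative, not part of the PR):
   ```rust
   use std::time::{Duration, SystemTime, UNIX_EPOCH};

   /// Illustrative upper bound on how far out a renewal may be scheduled.
   const MAX_RENEWAL_DELAY: Duration = Duration::from_secs(24 * 60 * 60);

   fn calculate_renewal_delay(expiration_time: i64, renewal_ratio: f64) -> Duration {
       let now = SystemTime::now()
           .duration_since(UNIX_EPOCH)
           .expect("system clock before UNIX epoch")
           .as_millis() as i64;

       // Clamp at zero so an already-expired token refreshes immediately.
       let time_until_expiry = (expiration_time - now).max(0);

       // The f64 -> u64 `as` cast saturates, so it cannot overflow, but the
       // result could still be an absurdly distant refresh; cap it.
       let delay_ms = (time_until_expiry as f64 * renewal_ratio) as u64;
       Duration::from_millis(delay_ms).min(MAX_RENEWAL_DELAY)
   }
   ```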



##########
crates/fluss/src/client/credentials.rs:
##########
@@ -88,80 +61,285 @@ fn convert_hadoop_key_to_opendal(hadoop_key: &str) -> Option<(String, bool)> {
     }
 }
 
-pub struct CredentialsCache {
-    inner: RwLock<Option<CachedToken>>,
+/// Build remote filesystem props from credentials and additional info
+fn build_remote_fs_props(
+    credentials: &Credentials,
+    addition_infos: &HashMap<String, String>,
+) -> HashMap<String, String> {
+    let mut props = HashMap::new();
+
+    props.insert(
+        "access_key_id".to_string(),
+        credentials.access_key_id.clone(),
+    );
+    props.insert(
+        "secret_access_key".to_string(),
+        credentials.access_key_secret.clone(),
+    );
+
+    if let Some(token) = &credentials.security_token {
+        props.insert("security_token".to_string(), token.clone());
+    }
+
+    for (key, value) in addition_infos {
+        if let Some((opendal_key, transform)) = convert_hadoop_key_to_opendal(key) {
+            let final_value = if transform {
+                // Invert boolean value (path_style_access -> enable_virtual_host_style)
+                if value == "true" {
+                    "false".to_string()
+                } else {
+                    "true".to_string()
+                }
+            } else {
+                value.clone()
+            };
+            props.insert(opendal_key, final_value);
+        }
+    }
+
+    props
+}
+
+/// Manager for security tokens that refreshes tokens in a background task.
+///
+/// This follows the pattern from Java's `DefaultSecurityTokenManager`, where
+/// a background thread periodically refreshes tokens based on their expiration time.
+///
+/// Uses `tokio::sync::watch` channel to broadcast token updates to consumers.
+/// Consumers can subscribe by calling `subscribe()` to get a receiver.
+///
+/// The channel value is `Option<HashMap>`:
+/// - `None` = not yet fetched, consumers should wait
+/// - `Some(HashMap)` = fetched (may be empty if no auth needed)
+///
+/// # Example
+/// ```ignore
+/// let manager = SecurityTokenManager::new(rpc_client, metadata);
+/// let credentials_rx = manager.subscribe();
+/// manager.start();
+///
+/// // Consumer can get latest credentials via:
+/// let props = credentials_rx.borrow().clone();
+/// ```
+pub struct SecurityTokenManager {
     rpc_client: Arc<RpcClient>,
     metadata: Arc<Metadata>,
+    token_renewal_ratio: f64,
+    renewal_retry_backoff: Duration,
+    /// Watch channel sender for broadcasting token updates
+    credentials_tx: watch::Sender<Option<HashMap<String, String>>>,
+    /// Watch channel receiver (kept to allow cloning for new subscribers)
+    credentials_rx: watch::Receiver<Option<HashMap<String, String>>>,
+    /// Handle to the background refresh task
+    task_handle: RwLock<Option<JoinHandle<()>>>,
+    /// Sender to signal shutdown
+    shutdown_tx: RwLock<Option<oneshot::Sender<()>>>,
 }
 
-impl CredentialsCache {
+impl SecurityTokenManager {
     pub fn new(rpc_client: Arc<RpcClient>, metadata: Arc<Metadata>) -> Self {
+        let (credentials_tx, credentials_rx) = watch::channel(None);
         Self {
-            inner: RwLock::new(None),
             rpc_client,
             metadata,
+            token_renewal_ratio: DEFAULT_TOKEN_RENEWAL_RATIO,
+            renewal_retry_backoff: DEFAULT_RENEWAL_RETRY_BACKOFF,
+            credentials_tx,
+            credentials_rx,
+            task_handle: RwLock::new(None),
+            shutdown_tx: RwLock::new(None),
+        }
+    }
+
+    /// Subscribe to credential updates.
+    /// Returns a receiver that always contains the latest credentials.
+    /// Consumers can call `receiver.borrow()` to get the current value.
+    pub fn subscribe(&self) -> CredentialsReceiver {
+        self.credentials_rx.clone()
+    }
+
+    /// Start the background token refresh task.
+    /// This should be called once after creating the manager.
+    pub fn start(&self) {
+        if self.task_handle.read().is_some() {
+            warn!("SecurityTokenManager is already started");
+            return;
+        }
+
+        let (shutdown_tx, shutdown_rx) = oneshot::channel();
+        *self.shutdown_tx.write() = Some(shutdown_tx);
+
+        let rpc_client = Arc::clone(&self.rpc_client);
+        let metadata = Arc::clone(&self.metadata);
+        let token_renewal_ratio = self.token_renewal_ratio;
+        let renewal_retry_backoff = self.renewal_retry_backoff;
+        let credentials_tx = self.credentials_tx.clone();
+
+        let handle = tokio::spawn(async move {
+            Self::token_refresh_loop(
+                rpc_client,
+                metadata,
+                token_renewal_ratio,
+                renewal_retry_backoff,
+                credentials_tx,
+                shutdown_rx,
+            )
+            .await;
+        });
+
+        *self.task_handle.write() = Some(handle);
+        info!("SecurityTokenManager started");
+    }
+
+    /// Stop the background token refresh task.
+    pub fn stop(&self) {
+        if let Some(tx) = self.shutdown_tx.write().take() {
+            let _ = tx.send(());
+        }
+        if let Some(handle) = self.task_handle.write().take() {
+            handle.abort();
         }
+        info!("SecurityTokenManager stopped");
     }
 
-    pub async fn get_or_refresh(&self) -> Result<HashMap<String, String>> {
-        {
-            let guard = self.inner.read();
-            if let Some(cached) = guard.as_ref() {
-                if cached.cached_at.elapsed() < CACHE_TTL {
-                    return Ok(cached.to_remote_fs_props());
+    /// Background task that periodically refreshes tokens.
+    async fn token_refresh_loop(
+        rpc_client: Arc<RpcClient>,
+        metadata: Arc<Metadata>,
+        token_renewal_ratio: f64,
+        renewal_retry_backoff: Duration,
+        credentials_tx: watch::Sender<Option<HashMap<String, String>>>,
+        mut shutdown_rx: oneshot::Receiver<()>,
+    ) {
+        info!("Starting token refresh loop");
+
+        loop {
+            // Fetch token and send to channel
+            let result = Self::fetch_token(&rpc_client, &metadata).await;
+
+            let next_delay = match result {
+                Ok((props, expiration_time)) => {
+                    // Send credentials via watch channel (Some indicates fetched)
+                    if let Err(e) = credentials_tx.send(Some(props)) {
+                        log::debug!("No active subscribers for credentials update: {:?}", e);
+                    }
+
+                    // Calculate next renewal delay based on expiration time
+                    if let Some(exp_time) = expiration_time {
+                        Self::calculate_renewal_delay(exp_time, token_renewal_ratio)
+                    } else {
+                        // No expiration time - token never expires, use long refresh interval
+                        log::info!(
+                            "Token has no expiration time (never expires), 
next refresh in {:?}",
+                            DEFAULT_NON_EXPIRING_REFRESH_INTERVAL
+                        );
+                        DEFAULT_NON_EXPIRING_REFRESH_INTERVAL
+                    }
+                }
+                Err(e) => {
+                    log::warn!(
+                        "Failed to obtain security token: {:?}, will retry in 
{:?}",
+                        e,
+                        renewal_retry_backoff
+                    );
+                    renewal_retry_backoff
+                }
+            };
+
+            log::debug!("Next token refresh in {:?}", next_delay);
+
+            // Wait for either the delay to elapse or shutdown signal
+            tokio::select! {
+                _ = tokio::time::sleep(next_delay) => {
+                    // Continue to next iteration to refresh
+                }
+                _ = &mut shutdown_rx => {
+                     log::info!("Token refresh loop received shutdown signal");
+                    break;
                 }
             }
         }
-
-        self.refresh_from_server().await
     }
 
-    async fn refresh_from_server(&self) -> Result<HashMap<String, String>> {
-        let cluster = self.metadata.get_cluster();
-        let server_node = cluster
-            .get_one_available_server()
-            .expect("no tablet server available");
-        let conn = self.rpc_client.get_connection(server_node).await?;
+    /// Fetch token from server.
+    /// Returns the props and expiration time if available.
+    async fn fetch_token(
+        rpc_client: &Arc<RpcClient>,
+        metadata: &Arc<Metadata>,
+    ) -> Result<(HashMap<String, String>, Option<i64>)> {
+        let cluster = metadata.get_cluster();
+        let server_node =
+            cluster
+                .get_one_available_server()
+                .ok_or_else(|| Error::UnexpectedError {
+                    message: "No tablet server available for token 
refresh".to_string(),
+                    source: None,
+                })?;

Review Comment:
   Inconsistent logging style. Lines 188, 199, and 211 use `info!` (without 
`log::` prefix), while lines 229, 250, and 277 use `log::info!` (with prefix). 
Similarly for `warn!` at line 162. 
   
   Since all logging macros in this file use the `log` crate, choose one style 
consistently. Either:
   1. Use `log::info!`, `log::warn!`, etc. everywhere (and remove `info`, `warn` from imports)
   2. Import all needed macros and use `info!`, `warn!`, etc. without prefix (sketched below)
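   
   A sketch of option 2 (the imports plus unprefixed macros; only the style is 
the point here, not the exact call sites):
   ```rust
   use log::{debug, info, warn};

   fn logging_style_example() {
       info!("SecurityTokenManager started");
       debug!("Next token refresh in {:?}", std::time::Duration::from_secs(60));
       warn!("SecurityTokenManager is already started");
   }
   ```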



##########
crates/fluss/src/client/credentials.rs:
##########
@@ -88,80 +61,285 @@ fn convert_hadoop_key_to_opendal(hadoop_key: &str) -> Option<(String, bool)> {
     }
 }
 
-pub struct CredentialsCache {
-    inner: RwLock<Option<CachedToken>>,
+/// Build remote filesystem props from credentials and additional info
+fn build_remote_fs_props(
+    credentials: &Credentials,
+    addition_infos: &HashMap<String, String>,
+) -> HashMap<String, String> {
+    let mut props = HashMap::new();
+
+    props.insert(
+        "access_key_id".to_string(),
+        credentials.access_key_id.clone(),
+    );
+    props.insert(
+        "secret_access_key".to_string(),
+        credentials.access_key_secret.clone(),
+    );
+
+    if let Some(token) = &credentials.security_token {
+        props.insert("security_token".to_string(), token.clone());
+    }
+
+    for (key, value) in addition_infos {
+        if let Some((opendal_key, transform)) = convert_hadoop_key_to_opendal(key) {
+            let final_value = if transform {
+                // Invert boolean value (path_style_access -> enable_virtual_host_style)
+                if value == "true" {
+                    "false".to_string()
+                } else {
+                    "true".to_string()
+                }
+            } else {
+                value.clone()
+            };
+            props.insert(opendal_key, final_value);
+        }
+    }
+
+    props
+}
+
+/// Manager for security tokens that refreshes tokens in a background task.
+///
+/// This follows the pattern from Java's `DefaultSecurityTokenManager`, where
+/// a background thread periodically refreshes tokens based on their expiration time.
+///
+/// Uses `tokio::sync::watch` channel to broadcast token updates to consumers.
+/// Consumers can subscribe by calling `subscribe()` to get a receiver.
+///
+/// The channel value is `Option<HashMap>`:
+/// - `None` = not yet fetched, consumers should wait
+/// - `Some(HashMap)` = fetched (may be empty if no auth needed)
+///
+/// # Example
+/// ```ignore
+/// let manager = SecurityTokenManager::new(rpc_client, metadata);
+/// let credentials_rx = manager.subscribe();
+/// manager.start();
+///
+/// // Consumer can get latest credentials via:
+/// let props = credentials_rx.borrow().clone();
+/// ```
+pub struct SecurityTokenManager {
     rpc_client: Arc<RpcClient>,
     metadata: Arc<Metadata>,
+    token_renewal_ratio: f64,
+    renewal_retry_backoff: Duration,
+    /// Watch channel sender for broadcasting token updates
+    credentials_tx: watch::Sender<Option<HashMap<String, String>>>,
+    /// Watch channel receiver (kept to allow cloning for new subscribers)
+    credentials_rx: watch::Receiver<Option<HashMap<String, String>>>,
+    /// Handle to the background refresh task
+    task_handle: RwLock<Option<JoinHandle<()>>>,
+    /// Sender to signal shutdown
+    shutdown_tx: RwLock<Option<oneshot::Sender<()>>>,
 }
 
-impl CredentialsCache {
+impl SecurityTokenManager {
     pub fn new(rpc_client: Arc<RpcClient>, metadata: Arc<Metadata>) -> Self {
+        let (credentials_tx, credentials_rx) = watch::channel(None);
         Self {
-            inner: RwLock::new(None),
             rpc_client,
             metadata,
+            token_renewal_ratio: DEFAULT_TOKEN_RENEWAL_RATIO,
+            renewal_retry_backoff: DEFAULT_RENEWAL_RETRY_BACKOFF,
+            credentials_tx,
+            credentials_rx,
+            task_handle: RwLock::new(None),
+            shutdown_tx: RwLock::new(None),
+        }
+    }
+
+    /// Subscribe to credential updates.
+    /// Returns a receiver that always contains the latest credentials.
+    /// Consumers can call `receiver.borrow()` to get the current value.
+    pub fn subscribe(&self) -> CredentialsReceiver {
+        self.credentials_rx.clone()
+    }
+
+    /// Start the background token refresh task.
+    /// This should be called once after creating the manager.
+    pub fn start(&self) {
+        if self.task_handle.read().is_some() {
+            warn!("SecurityTokenManager is already started");
+            return;
+        }
+
+        let (shutdown_tx, shutdown_rx) = oneshot::channel();
+        *self.shutdown_tx.write() = Some(shutdown_tx);
+
+        let rpc_client = Arc::clone(&self.rpc_client);
+        let metadata = Arc::clone(&self.metadata);
+        let token_renewal_ratio = self.token_renewal_ratio;
+        let renewal_retry_backoff = self.renewal_retry_backoff;
+        let credentials_tx = self.credentials_tx.clone();
+
+        let handle = tokio::spawn(async move {
+            Self::token_refresh_loop(
+                rpc_client,
+                metadata,
+                token_renewal_ratio,
+                renewal_retry_backoff,
+                credentials_tx,
+                shutdown_rx,
+            )
+            .await;
+        });
+
+        *self.task_handle.write() = Some(handle);
+        info!("SecurityTokenManager started");
+    }
+
+    /// Stop the background token refresh task.
+    pub fn stop(&self) {
+        if let Some(tx) = self.shutdown_tx.write().take() {
+            let _ = tx.send(());
+        }
+        if let Some(handle) = self.task_handle.write().take() {
+            handle.abort();
         }

Review Comment:
   Both the shutdown signal sender and task handle are being dropped/aborted in 
the `stop` method. However, there's a potential race condition: if `stop()` is 
called while the task is in the middle of a token fetch operation, calling 
`handle.abort()` will immediately cancel the task. This is fine, but the order 
of operations matters.
   
   Consider sending the shutdown signal first and giving the task a brief 
moment to finish gracefully before aborting. Alternatively, you could just rely 
on the shutdown signal and not abort the handle at all, allowing the task to 
complete its current iteration and exit cleanly. The current implementation 
aborts immediately which could leave resources in an inconsistent state if the 
fetch was in progress.
   ```suggestion
           // Take and drop the task handle so the task can finish gracefully
           let _ = self.task_handle.write().take();
   ```
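   
   If aborting is still wanted as a backstop, an async variant of `stop` could 
wait briefly before aborting; the `async fn` signature and the 5-second grace 
period below are assumptions, not part of the PR:
   ```rust
   pub async fn stop(&self) {
       // Signal the refresh loop to exit at its next `select!` point.
       if let Some(tx) = self.shutdown_tx.write().take() {
           let _ = tx.send(());
       }
       // Take the handle out first so the lock guard is not held across `.await`.
       let handle = self.task_handle.write().take();
       if let Some(mut handle) = handle {
           // Grace period so an in-flight fetch can finish; abort only if the
           // task has not exited by then.
           if tokio::time::timeout(std::time::Duration::from_secs(5), &mut handle)
               .await
               .is_err()
           {
               handle.abort();
           }
       }
       log::info!("SecurityTokenManager stopped");
   }
   ```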



##########
crates/fluss/src/client/table/remote_log.rs:
##########
@@ -317,14 +318,29 @@ impl RemoteLogFetcher for ProductionFetcher {
                 remote_log_tablet_dir, segment.segment_id, offset_prefix
             );
 
-            let remote_fs_props_map = remote_fs_props.read().clone();
+            // Get credentials from watch channel, waiting if not yet fetched
+            // - None = not yet fetched, wait
+            // - Some(props) = fetched (may be empty if no auth needed)
+            let remote_fs_props = {
+                let maybe_props = credentials_rx.borrow().clone();
+                match maybe_props {
+                    Some(props) => props,
+                    None => {
+                        // Credentials not yet fetched, wait for first update
+                        log::info!("Waiting for credentials to be 
available...");
+                        let _ = credentials_rx.changed().await;
+                        // After change, unwrap or use empty (should be Some 
now)
+                        credentials_rx.borrow().clone().unwrap_or_default()

Review Comment:
   Potential issue when watch channel is closed. If the `SecurityTokenManager` 
is dropped before credentials are fetched (e.g., during shutdown), 
`credentials_rx.changed().await` will return an error because the sender is 
dropped. This error is currently ignored with `let _ =`, but the subsequent 
`unwrap_or_default()` will still return an empty HashMap.
   
   However, if remote filesystem authentication is required, using an empty 
HashMap could cause download failures. Consider explicitly handling the error 
case from `changed().await` and returning a proper error to indicate that the 
credential manager was shut down before credentials could be obtained.
   ```suggestion
                           // If the sender side has been dropped (e.g. during shutdown),
                           // this will return an error. Surface that as a proper error
                           // instead of silently falling back to empty credentials.
                           if let Err(e) = credentials_rx.changed().await {
                               let io_err = io::Error::new(
                                   io::ErrorKind::BrokenPipe,
                                   format!(
                                       "credentials manager shut down before 
credentials were obtained: {e}"
                                   ),
                               );
                               return Err(io_err.into());
                           }
                           // After a successful change notification, credentials should be set.
                           // If they are still missing, treat this as an error instead of
                           // defaulting to an empty map (which could break auth flows).
                           credentials_rx
                               .borrow()
                               .clone()
                               .ok_or_else(|| {
                                   io::Error::new(
                                       io::ErrorKind::Other,
                                       "credentials not available after watch 
notification",
                                   )
                               })
                               .map_err(Into::into)?
   ```



##########
crates/fluss/src/client/credentials.rs:
##########
@@ -88,80 +61,285 @@ fn convert_hadoop_key_to_opendal(hadoop_key: &str) -> Option<(String, bool)> {
     }
 }
 
-pub struct CredentialsCache {
-    inner: RwLock<Option<CachedToken>>,
+/// Build remote filesystem props from credentials and additional info
+fn build_remote_fs_props(
+    credentials: &Credentials,
+    addition_infos: &HashMap<String, String>,
+) -> HashMap<String, String> {
+    let mut props = HashMap::new();
+
+    props.insert(
+        "access_key_id".to_string(),
+        credentials.access_key_id.clone(),
+    );
+    props.insert(
+        "secret_access_key".to_string(),
+        credentials.access_key_secret.clone(),
+    );
+
+    if let Some(token) = &credentials.security_token {
+        props.insert("security_token".to_string(), token.clone());
+    }
+
+    for (key, value) in addition_infos {
+        if let Some((opendal_key, transform)) = convert_hadoop_key_to_opendal(key) {
+            let final_value = if transform {
+                // Invert boolean value (path_style_access -> enable_virtual_host_style)
+                if value == "true" {
+                    "false".to_string()
+                } else {
+                    "true".to_string()
+                }
+            } else {
+                value.clone()
+            };
+            props.insert(opendal_key, final_value);
+        }
+    }
+
+    props
+}
+
+/// Manager for security tokens that refreshes tokens in a background task.
+///
+/// This follows the pattern from Java's `DefaultSecurityTokenManager`, where
+/// a background thread periodically refreshes tokens based on their expiration time.
+///
+/// Uses `tokio::sync::watch` channel to broadcast token updates to consumers.
+/// Consumers can subscribe by calling `subscribe()` to get a receiver.
+///
+/// The channel value is `Option<HashMap>`:
+/// - `None` = not yet fetched, consumers should wait
+/// - `Some(HashMap)` = fetched (may be empty if no auth needed)
+///
+/// # Example
+/// ```ignore
+/// let manager = SecurityTokenManager::new(rpc_client, metadata);
+/// let credentials_rx = manager.subscribe();
+/// manager.start();
+///
+/// // Consumer can get latest credentials via:
+/// let props = credentials_rx.borrow().clone();
+/// ```
+pub struct SecurityTokenManager {
     rpc_client: Arc<RpcClient>,
     metadata: Arc<Metadata>,
+    token_renewal_ratio: f64,
+    renewal_retry_backoff: Duration,
+    /// Watch channel sender for broadcasting token updates
+    credentials_tx: watch::Sender<Option<HashMap<String, String>>>,
+    /// Watch channel receiver (kept to allow cloning for new subscribers)
+    credentials_rx: watch::Receiver<Option<HashMap<String, String>>>,
+    /// Handle to the background refresh task
+    task_handle: RwLock<Option<JoinHandle<()>>>,
+    /// Sender to signal shutdown
+    shutdown_tx: RwLock<Option<oneshot::Sender<()>>>,
 }
 
-impl CredentialsCache {
+impl SecurityTokenManager {
     pub fn new(rpc_client: Arc<RpcClient>, metadata: Arc<Metadata>) -> Self {
+        let (credentials_tx, credentials_rx) = watch::channel(None);
         Self {
-            inner: RwLock::new(None),
             rpc_client,
             metadata,
+            token_renewal_ratio: DEFAULT_TOKEN_RENEWAL_RATIO,
+            renewal_retry_backoff: DEFAULT_RENEWAL_RETRY_BACKOFF,
+            credentials_tx,
+            credentials_rx,
+            task_handle: RwLock::new(None),
+            shutdown_tx: RwLock::new(None),
+        }
+    }
+
+    /// Subscribe to credential updates.
+    /// Returns a receiver that always contains the latest credentials.
+    /// Consumers can call `receiver.borrow()` to get the current value.
+    pub fn subscribe(&self) -> CredentialsReceiver {
+        self.credentials_rx.clone()
+    }
+
+    /// Start the background token refresh task.
+    /// This should be called once after creating the manager.
+    pub fn start(&self) {
+        if self.task_handle.read().is_some() {
+            warn!("SecurityTokenManager is already started");
+            return;
+        }
+
+        let (shutdown_tx, shutdown_rx) = oneshot::channel();
+        *self.shutdown_tx.write() = Some(shutdown_tx);
+
+        let rpc_client = Arc::clone(&self.rpc_client);
+        let metadata = Arc::clone(&self.metadata);
+        let token_renewal_ratio = self.token_renewal_ratio;
+        let renewal_retry_backoff = self.renewal_retry_backoff;
+        let credentials_tx = self.credentials_tx.clone();
+
+        let handle = tokio::spawn(async move {
+            Self::token_refresh_loop(
+                rpc_client,
+                metadata,
+                token_renewal_ratio,
+                renewal_retry_backoff,
+                credentials_tx,
+                shutdown_rx,
+            )
+            .await;
+        });
+
+        *self.task_handle.write() = Some(handle);
+        info!("SecurityTokenManager started");
+    }
+
+    /// Stop the background token refresh task.
+    pub fn stop(&self) {
+        if let Some(tx) = self.shutdown_tx.write().take() {
+            let _ = tx.send(());
+        }
+        if let Some(handle) = self.task_handle.write().take() {
+            handle.abort();
         }
+        info!("SecurityTokenManager stopped");
     }
 
-    pub async fn get_or_refresh(&self) -> Result<HashMap<String, String>> {
-        {
-            let guard = self.inner.read();
-            if let Some(cached) = guard.as_ref() {
-                if cached.cached_at.elapsed() < CACHE_TTL {
-                    return Ok(cached.to_remote_fs_props());
+    /// Background task that periodically refreshes tokens.
+    async fn token_refresh_loop(
+        rpc_client: Arc<RpcClient>,
+        metadata: Arc<Metadata>,
+        token_renewal_ratio: f64,
+        renewal_retry_backoff: Duration,
+        credentials_tx: watch::Sender<Option<HashMap<String, String>>>,
+        mut shutdown_rx: oneshot::Receiver<()>,
+    ) {
+        info!("Starting token refresh loop");
+
+        loop {
+            // Fetch token and send to channel
+            let result = Self::fetch_token(&rpc_client, &metadata).await;
+
+            let next_delay = match result {
+                Ok((props, expiration_time)) => {
+                    // Send credentials via watch channel (Some indicates fetched)
+                    if let Err(e) = credentials_tx.send(Some(props)) {
+                        log::debug!("No active subscribers for credentials update: {:?}", e);
+                    }
+
+                    // Calculate next renewal delay based on expiration time
+                    if let Some(exp_time) = expiration_time {
+                        Self::calculate_renewal_delay(exp_time, token_renewal_ratio)
+                    } else {
+                        // No expiration time - token never expires, use long refresh interval
+                        log::info!(
+                            "Token has no expiration time (never expires), 
next refresh in {:?}",
+                            DEFAULT_NON_EXPIRING_REFRESH_INTERVAL
+                        );
+                        DEFAULT_NON_EXPIRING_REFRESH_INTERVAL
+                    }
+                }
+                Err(e) => {

Review Comment:
   When the first token fetch fails, consumers waiting for credentials may 
block indefinitely. The token refresh loop retries after 
`renewal_retry_backoff` (60 seconds) but doesn't send any credentials to the 
watch channel on failure. This means:
   
   1. If `fetch_token` fails initially, the watch channel remains at `None`
   2. Consumers waiting on `credentials_rx.changed().await` will continue 
waiting
   3. The loop sleeps for 60 seconds before retrying
   4. During this time, any remote log fetch operations will be blocked
   
   Consider sending an empty HashMap on first fetch failure (similar to the 
success case when auth is not required) to unblock consumers, or add a timeout 
to the `changed().await` call in remote_log.rs so consumers can fail fast 
rather than blocking indefinitely.
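   
   A sketch of the consumer-side timeout; the 30-second bound and the error 
type are illustrative:
   ```rust
   use std::collections::HashMap;
   use std::time::Duration;
   use tokio::sync::watch;

   async fn wait_for_credentials(
       rx: &mut watch::Receiver<Option<HashMap<String, String>>>,
   ) -> Result<HashMap<String, String>, String> {
       loop {
           // Clone out of the borrow guard before awaiting anything.
           let current = rx.borrow().clone();
           if let Some(props) = current {
               return Ok(props);
           }
           // Bound the wait so consumers fail fast instead of blocking while
           // the manager retries in the background.
           match tokio::time::timeout(Duration::from_secs(30), rx.changed()).await {
               Ok(Ok(())) => {} // value changed, loop and re-check it
               Ok(Err(_)) => return Err("credentials manager shut down".to_string()),
               Err(_) => return Err("timed out waiting for credentials".to_string()),
           }
       }
   }
   ```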



##########
crates/fluss/src/client/credentials.rs:
##########
@@ -88,80 +61,285 @@ fn convert_hadoop_key_to_opendal(hadoop_key: &str) -> Option<(String, bool)> {
     }
 }
 
-pub struct CredentialsCache {
-    inner: RwLock<Option<CachedToken>>,
+/// Build remote filesystem props from credentials and additional info
+fn build_remote_fs_props(
+    credentials: &Credentials,
+    addition_infos: &HashMap<String, String>,
+) -> HashMap<String, String> {
+    let mut props = HashMap::new();
+
+    props.insert(
+        "access_key_id".to_string(),
+        credentials.access_key_id.clone(),
+    );
+    props.insert(
+        "secret_access_key".to_string(),
+        credentials.access_key_secret.clone(),
+    );
+
+    if let Some(token) = &credentials.security_token {
+        props.insert("security_token".to_string(), token.clone());
+    }
+
+    for (key, value) in addition_infos {
+        if let Some((opendal_key, transform)) = convert_hadoop_key_to_opendal(key) {
+            let final_value = if transform {
+                // Invert boolean value (path_style_access -> enable_virtual_host_style)
+                if value == "true" {
+                    "false".to_string()
+                } else {
+                    "true".to_string()
+                }
+            } else {
+                value.clone()
+            };
+            props.insert(opendal_key, final_value);
+        }
+    }
+
+    props
+}
+
+/// Manager for security tokens that refreshes tokens in a background task.
+///
+/// This follows the pattern from Java's `DefaultSecurityTokenManager`, where
+/// a background thread periodically refreshes tokens based on their expiration time.
+///
+/// Uses `tokio::sync::watch` channel to broadcast token updates to consumers.
+/// Consumers can subscribe by calling `subscribe()` to get a receiver.
+///
+/// The channel value is `Option<HashMap>`:
+/// - `None` = not yet fetched, consumers should wait
+/// - `Some(HashMap)` = fetched (may be empty if no auth needed)
+///
+/// # Example
+/// ```ignore
+/// let manager = SecurityTokenManager::new(rpc_client, metadata);
+/// let credentials_rx = manager.subscribe();
+/// manager.start();
+///
+/// // Consumer can get latest credentials via:
+/// let props = credentials_rx.borrow().clone();
+/// ```
+pub struct SecurityTokenManager {
     rpc_client: Arc<RpcClient>,
     metadata: Arc<Metadata>,
+    token_renewal_ratio: f64,
+    renewal_retry_backoff: Duration,
+    /// Watch channel sender for broadcasting token updates
+    credentials_tx: watch::Sender<Option<HashMap<String, String>>>,
+    /// Watch channel receiver (kept to allow cloning for new subscribers)
+    credentials_rx: watch::Receiver<Option<HashMap<String, String>>>,
+    /// Handle to the background refresh task
+    task_handle: RwLock<Option<JoinHandle<()>>>,
+    /// Sender to signal shutdown
+    shutdown_tx: RwLock<Option<oneshot::Sender<()>>>,
 }
 
-impl CredentialsCache {
+impl SecurityTokenManager {
     pub fn new(rpc_client: Arc<RpcClient>, metadata: Arc<Metadata>) -> Self {
+        let (credentials_tx, credentials_rx) = watch::channel(None);
         Self {
-            inner: RwLock::new(None),
             rpc_client,
             metadata,
+            token_renewal_ratio: DEFAULT_TOKEN_RENEWAL_RATIO,
+            renewal_retry_backoff: DEFAULT_RENEWAL_RETRY_BACKOFF,
+            credentials_tx,
+            credentials_rx,
+            task_handle: RwLock::new(None),
+            shutdown_tx: RwLock::new(None),
+        }
+    }
+
+    /// Subscribe to credential updates.
+    /// Returns a receiver that always contains the latest credentials.
+    /// Consumers can call `receiver.borrow()` to get the current value.
+    pub fn subscribe(&self) -> CredentialsReceiver {
+        self.credentials_rx.clone()
+    }
+
+    /// Start the background token refresh task.
+    /// This should be called once after creating the manager.
+    pub fn start(&self) {
+        if self.task_handle.read().is_some() {
+            warn!("SecurityTokenManager is already started");
+            return;
+        }
+
+        let (shutdown_tx, shutdown_rx) = oneshot::channel();
+        *self.shutdown_tx.write() = Some(shutdown_tx);
+
+        let rpc_client = Arc::clone(&self.rpc_client);
+        let metadata = Arc::clone(&self.metadata);
+        let token_renewal_ratio = self.token_renewal_ratio;
+        let renewal_retry_backoff = self.renewal_retry_backoff;
+        let credentials_tx = self.credentials_tx.clone();
+
+        let handle = tokio::spawn(async move {
+            Self::token_refresh_loop(
+                rpc_client,
+                metadata,
+                token_renewal_ratio,
+                renewal_retry_backoff,
+                credentials_tx,
+                shutdown_rx,
+            )
+            .await;
+        });
+

Review Comment:
   Missing test coverage for the core SecurityTokenManager functionality. While 
there are tests for `calculate_renewal_delay` and `build_remote_fs_props`, 
there are no tests for:
   1. The background refresh loop starting and stopping correctly
   2. Token refresh triggering at appropriate intervals
   3. Handling of token fetch failures and retry logic
   4. Credential updates being properly broadcast through the watch channel
   5. Multiple subscribers receiving updates
   
   Consider adding integration tests that verify the background task behavior, 
especially around error handling and timing. You can use mock RPC clients to 
simulate token fetch success/failure scenarios.
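   
   As a starting point, the broadcast behavior can be exercised against the 
watch channel alone; this sketch stands in for the manager's internals, and a 
full test would still need a mockable RPC client:
   ```rust
   use std::collections::HashMap;
   use tokio::sync::watch;

   #[tokio::test]
   async fn subscribers_observe_credential_updates() {
       // Stand-in for the manager's channel: None = not yet fetched.
       let (tx, rx) = watch::channel::<Option<HashMap<String, String>>>(None);

       let mut rx1 = rx.clone();
       let mut rx2 = rx.clone();
       assert!(rx1.borrow().is_none());

       // Simulate the refresh loop publishing a fetched token.
       let mut props = HashMap::new();
       props.insert("access_key_id".to_string(), "test-key".to_string());
       tx.send(Some(props)).unwrap();

       // Both subscribers observe the same update.
       rx1.changed().await.unwrap();
       rx2.changed().await.unwrap();
       assert_eq!(
           rx1.borrow().as_ref().unwrap().get("access_key_id"),
           Some(&"test-key".to_string())
       );
   }
   ```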



##########
crates/fluss/src/client/credentials.rs:
##########
@@ -19,13 +19,28 @@ use crate::client::metadata::Metadata;
 use crate::error::{Error, Result};
 use crate::rpc::RpcClient;
 use crate::rpc::message::GetSecurityTokenRequest;
+use log::{debug, info, warn};

Review Comment:
   The `debug` identifier in the import is unused. All debug logging uses the 
`log::debug!` macro, which doesn't require importing `debug` directly. Consider 
removing `debug` from the import list, keeping only `info` and `warn`.
   ```suggestion
   use log::{info, warn};
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

