luoyuxia commented on code in PR #213:
URL: https://github.com/apache/fluss-rust/pull/213#discussion_r2725549032
##########
crates/fluss/src/client/credentials.rs:
##########
@@ -88,80 +61,285 @@ fn convert_hadoop_key_to_opendal(hadoop_key: &str) ->
Option<(String, bool)> {
}
}
-pub struct CredentialsCache {
- inner: RwLock<Option<CachedToken>>,
+/// Build remote filesystem props from credentials and additional info
+fn build_remote_fs_props(
+ credentials: &Credentials,
+ addition_infos: &HashMap<String, String>,
+) -> HashMap<String, String> {
+ let mut props = HashMap::new();
+
+ props.insert(
+ "access_key_id".to_string(),
+ credentials.access_key_id.clone(),
+ );
+ props.insert(
+ "secret_access_key".to_string(),
+ credentials.access_key_secret.clone(),
+ );
+
+ if let Some(token) = &credentials.security_token {
+ props.insert("security_token".to_string(), token.clone());
+ }
+
+ for (key, value) in addition_infos {
+ if let Some((opendal_key, transform)) =
convert_hadoop_key_to_opendal(key) {
+ let final_value = if transform {
+ // Invert boolean value (path_style_access ->
enable_virtual_host_style)
+ if value == "true" {
+ "false".to_string()
+ } else {
+ "true".to_string()
+ }
+ } else {
+ value.clone()
+ };
+ props.insert(opendal_key, final_value);
+ }
+ }
+
+ props
+}
+
+/// Manager for security tokens that refreshes tokens in a background task.
+///
+/// This follows the pattern from Java's `DefaultSecurityTokenManager`, where
+/// a background thread periodically refreshes tokens based on their
expiration time.
+///
+/// Uses `tokio::sync::watch` channel to broadcast token updates to consumers.
+/// Consumers can subscribe by calling `subscribe()` to get a receiver.
+///
+/// The channel value is `Option<HashMap>`:
+/// - `None` = not yet fetched, consumers should wait
+/// - `Some(HashMap)` = fetched (may be empty if no auth needed)
+///
+/// # Example
+/// ```ignore
+/// let manager = SecurityTokenManager::new(rpc_client, metadata);
+/// let credentials_rx = manager.subscribe();
+/// manager.start();
+///
+/// // Consumer can get latest credentials via:
+/// let props = credentials_rx.borrow().clone();
+/// ```
+pub struct SecurityTokenManager {
rpc_client: Arc<RpcClient>,
metadata: Arc<Metadata>,
+ token_renewal_ratio: f64,
+ renewal_retry_backoff: Duration,
+ /// Watch channel sender for broadcasting token updates
+ credentials_tx: watch::Sender<Option<HashMap<String, String>>>,
+ /// Watch channel receiver (kept to allow cloning for new subscribers)
+ credentials_rx: watch::Receiver<Option<HashMap<String, String>>>,
+ /// Handle to the background refresh task
+ task_handle: RwLock<Option<JoinHandle<()>>>,
+ /// Sender to signal shutdown
+ shutdown_tx: RwLock<Option<oneshot::Sender<()>>>,
}
-impl CredentialsCache {
+impl SecurityTokenManager {
pub fn new(rpc_client: Arc<RpcClient>, metadata: Arc<Metadata>) -> Self {
+ let (credentials_tx, credentials_rx) = watch::channel(None);
Self {
- inner: RwLock::new(None),
rpc_client,
metadata,
+ token_renewal_ratio: DEFAULT_TOKEN_RENEWAL_RATIO,
+ renewal_retry_backoff: DEFAULT_RENEWAL_RETRY_BACKOFF,
+ credentials_tx,
+ credentials_rx,
+ task_handle: RwLock::new(None),
+ shutdown_tx: RwLock::new(None),
+ }
+ }
+
+ /// Subscribe to credential updates.
+ /// Returns a receiver that always contains the latest credentials.
+ /// Consumers can call `receiver.borrow()` to get the current value.
+ pub fn subscribe(&self) -> CredentialsReceiver {
+ self.credentials_rx.clone()
+ }
+
+ /// Start the background token refresh task.
+ /// This should be called once after creating the manager.
+ pub fn start(&self) {
+ if self.task_handle.read().is_some() {
+ warn!("SecurityTokenManager is already started");
+ return;
+ }
+
+ let (shutdown_tx, shutdown_rx) = oneshot::channel();
+ *self.shutdown_tx.write() = Some(shutdown_tx);
+
+ let rpc_client = Arc::clone(&self.rpc_client);
+ let metadata = Arc::clone(&self.metadata);
+ let token_renewal_ratio = self.token_renewal_ratio;
+ let renewal_retry_backoff = self.renewal_retry_backoff;
+ let credentials_tx = self.credentials_tx.clone();
+
+ let handle = tokio::spawn(async move {
+ Self::token_refresh_loop(
+ rpc_client,
+ metadata,
+ token_renewal_ratio,
+ renewal_retry_backoff,
+ credentials_tx,
+ shutdown_rx,
+ )
+ .await;
+ });
+
Review Comment:
The IT covers Credential updates being properly broadcast through the watch
channel.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]