Copilot commented on code in PR #369:
URL: https://github.com/apache/fluss-rust/pull/369#discussion_r2870767395


##########
crates/fluss/src/client/connection.rs:
##########
@@ -76,7 +79,16 @@ impl FlussConnection {
     }
 
     pub async fn get_admin(&self) -> Result<FlussAdmin> {
-        FlussAdmin::new(self.network_connects.clone(), self.metadata.clone()).await
+        // Lazily initialize and cache the FlussAdmin instance. The cached FlussAdmin
+        // holds a reference to RpcClient, which manages connection reuse and re-acquisition
+        // when a cached connection becomes poisoned. Subsequent calls clone cheaply —
+        // all internal fields (ServerConnection, Arc<Metadata>, Arc<RpcClient>) are
+        // Arc-backed so cloning is just a reference-count bump.
+        let admin = self
+            .admin_client
+            .get_or_try_init(|| FlussAdmin::new(self.network_connects.clone(), self.metadata.clone()))
+            .await?;
+        Ok(admin.clone())

Review Comment:
   Caching `FlussAdmin` in a `tokio::sync::OnceCell` makes the admin instance 
effectively permanent for the lifetime of the connection. If the coordinator 
changes (metadata refresh) or the underlying `ServerConnection` becomes 
poisoned, `get_admin()` will keep returning the same cached instance and 
callers may be unable to recover without constructing a new `FlussConnection`. 
Consider using a cache that supports invalidation/refresh (e.g., async lock 
around an `Option<FlussAdmin>`), or make `FlussAdmin` acquire/refresh its 
`admin_gateway` on demand when the current connection is poisoned or when 
requests return `InvalidCoordinatorException`.
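
A minimal sketch of the first option (a lock around an `Option`), using only the standard library so it compiles on its own. `Admin`, `AdminCache`, `generation`, and `invalidate` are hypothetical stand-ins rather than names from the fluss-rust crate, and the real `get_admin()` is async, so an actual change would presumably use an async lock such as `tokio::sync::Mutex` instead of `std::sync::Mutex`.

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Mutex;

/// Hypothetical stand-in for `FlussAdmin`. `generation` exists only so
/// that a rebuild is observable from the outside.
#[derive(Clone, Debug)]
pub struct Admin {
    pub generation: u32,
}

/// Hypothetical invalidatable cache: a lock around `Option<Admin>`.
pub struct AdminCache {
    slot: Mutex<Option<Admin>>,
    builds: AtomicU32,
}

impl AdminCache {
    pub fn new() -> Self {
        Self {
            slot: Mutex::new(None),
            builds: AtomicU32::new(0),
        }
    }

    /// Return the cached admin, building it on first use or after
    /// `invalidate()` has cleared the slot.
    pub fn get_or_init(&self) -> Admin {
        let mut slot = self.slot.lock().unwrap();
        if let Some(admin) = slot.as_ref() {
            return admin.clone();
        }
        // Assumption: this is where the real code would call `FlussAdmin::new(...)`.
        let generation = self.builds.fetch_add(1, Ordering::SeqCst) + 1;
        let admin = Admin { generation };
        *slot = Some(admin.clone());
        admin
    }

    /// Drop the cached admin so that the next `get_or_init()` rebuilds it,
    /// e.g. after a poisoned connection or a coordinator change.
    pub fn invalidate(&self) {
        *self.slot.lock().unwrap() = None;
    }
}
```

Unlike a `OnceCell`, the slot can be cleared, so a caller that hits a poisoned gateway can call `invalidate()` and retry without constructing a new `FlussConnection`.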



##########
crates/fluss/src/client/connection.rs:
##########
@@ -76,7 +79,16 @@ impl FlussConnection {
     }
 
     pub async fn get_admin(&self) -> Result<FlussAdmin> {
-        FlussAdmin::new(self.network_connects.clone(), self.metadata.clone()).await
+        // Lazily initialize and cache the FlussAdmin instance. The cached FlussAdmin
+        // holds a reference to RpcClient, which manages connection reuse and re-acquisition
+        // when a cached connection becomes poisoned. Subsequent calls clone cheaply —
+        // all internal fields (ServerConnection, Arc<Metadata>, Arc<RpcClient>) are
+        // Arc-backed so cloning is just a reference-count bump.

Review Comment:
   The code comment here suggests the cached `FlussAdmin` will automatically 
re-acquire connections when a cached connection becomes poisoned, but 
`FlussAdmin` stores a concrete `admin_gateway: ServerConnection` created once 
in `FlussAdmin::new()`. `RpcClient` only re-connects when `get_connection()` is 
called again, so this comment is misleading and the cached admin can keep 
returning a poisoned gateway indefinitely. Please adjust the comment and/or 
implement a reconnection path in `FlussAdmin`/`get_admin()` that actually 
re-acquires a coordinator connection on poison/failover.
   ```suggestion
           // Lazily initialize and cache the FlussAdmin instance. Subsequent calls
           // return a clone of the same cached admin. All internal fields
           // (ServerConnection, Arc<Metadata>, Arc<RpcClient>) are Arc-backed, so
           // cloning is effectively just a reference-count bump.
   ```
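
One possible shape for the reconnection path mentioned above, sketched with standard-library types only. `Conn`, `Rpc`, `Admin`, and the `gateway()` accessor are hypothetical stand-ins for `ServerConnection`, `RpcClient`, and `FlussAdmin`. The stub `get_connection()` takes no arguments, is synchronous, and always returns a fresh connection, which is a simplification of the real API.

```rust
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for `ServerConnection`: a cheap handle that can
/// be marked poisoned. Clones share the same poison flag.
#[derive(Clone)]
pub struct Conn {
    pub id: u32,
    poisoned: Arc<AtomicBool>,
}

impl Conn {
    pub fn is_poisoned(&self) -> bool {
        self.poisoned.load(Ordering::SeqCst)
    }

    pub fn poison(&self) {
        self.poisoned.store(true, Ordering::SeqCst);
    }
}

/// Hypothetical stand-in for `RpcClient`.
pub struct Rpc {
    next_id: AtomicU32,
}

impl Rpc {
    pub fn new() -> Self {
        Self { next_id: AtomicU32::new(0) }
    }

    /// Simplified: always hands out a fresh, healthy connection.
    pub fn get_connection(&self) -> Conn {
        Conn {
            id: self.next_id.fetch_add(1, Ordering::SeqCst),
            poisoned: Arc::new(AtomicBool::new(false)),
        }
    }
}

/// Hypothetical stand-in for `FlussAdmin`. The gateway sits behind a
/// shared lock so every clone of the admin sees a replacement.
#[derive(Clone)]
pub struct Admin {
    rpc: Arc<Rpc>,
    gateway: Arc<Mutex<Conn>>,
}

impl Admin {
    pub fn new(rpc: Arc<Rpc>) -> Self {
        let gateway = Arc::new(Mutex::new(rpc.get_connection()));
        Self { rpc, gateway }
    }

    /// Return a healthy gateway, re-acquiring one from `Rpc` if the
    /// stored connection has been poisoned.
    pub fn gateway(&self) -> Conn {
        let mut current = self.gateway.lock().unwrap();
        if current.is_poisoned() {
            *current = self.rpc.get_connection();
        }
        current.clone()
    }
}
```

With this shape the cached admin can stay in the `OnceCell`, because each request goes through `gateway()` rather than through a connection captured once in `new()`. Handling a coordinator change would additionally need the lookup to consult the refreshed metadata, which this sketch does not model.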



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
