sunxiaojian commented on code in PR #9003:
URL: https://github.com/apache/gravitino/pull/9003#discussion_r2491051030


##########
iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/authentication/kerberos/HiveBackendProxy.java:
##########
@@ -110,14 +115,228 @@ private ClientPool<IMetaStoreClient, TException> resetIcebergHiveClientPool()
     final Field m = HiveCatalog.class.getDeclaredField("clients");
     m.setAccessible(true);
 
-    // TODO: we need to close the original client pool and thread pool, or it will cause memory
-    //  leak.
-    ClientPool<IMetaStoreClient, TException> newClientPool =
+    // Get the old client pool before replacing it
+    Object oldPool = m.get(target);
+
+    // Create and set the new client pool first
+    IcebergHiveCachedClientPool newClientPool =
         new IcebergHiveCachedClientPool(target.getConf(), properties);
     m.set(target, newClientPool);
+
+    // Then try to close the old pool to release resources
+    if (oldPool != null) {
+      // Try standard close method if available
+      if (oldPool instanceof AutoCloseable) {
+        try {
+          ((AutoCloseable) oldPool).close();
+          LOG.info("Successfully closed old Hive client pool");
+        } catch (Exception e) {
+          LOG.warn("Failed to close old Hive client pool", e);
+        }
+      }
+
+      // Additionally, try to shutdown the internal scheduler thread pool in Iceberg's
+      // CachedClientPool to prevent memory leak. This is necessary because Iceberg's
+      // CachedClientPool does not implement Closeable and does not properly clean up
+      // its internal scheduler.
+      try {
+        shutdownIcebergCachedClientPoolScheduler(oldPool);
+      } catch (Exception e) {
+        LOG.warn(
+            "Failed to shutdown scheduler in old CachedClientPool, may cause minor resource leak",
+            e);
+      }
+    }
+
     return newClientPool;
   }
 
+  /**
+   * Attempts to shutdown the scheduler thread pool in Iceberg's CachedClientPool using reflection.
+   * This is necessary because CachedClientPool creates a scheduler via
+   * ThreadPools.newScheduledPool() but does not provide a way to close it, leading to potential
+   * thread leaks.
+   *
+   * @param clientPool The old CachedClientPool instance
+   */
+  @VisibleForTesting
+  void shutdownIcebergCachedClientPoolScheduler(Object clientPool) {
+    try {
+      // Access the clientPoolCache field from CachedClientPool
+      Field cacheField = clientPool.getClass().getDeclaredField("clientPoolCache");
+      cacheField.setAccessible(true);
+      Object cache = cacheField.get(clientPool);
+
+      if (cache == null) {
+        LOG.debug("clientPoolCache is null, no scheduler to shutdown");
+        return;
+      }
+
+      // Try to get the scheduler from Caffeine cache and shutdown its executor
+      ScheduledExecutorService executor = extractSchedulerExecutor(cache);
+      if (executor != null) {
+        LOG.info("Shutting down scheduler thread pool from old CachedClientPool");
+        executor.shutdownNow();
+      } else {
+        LOG.debug("Could not extract scheduler executor from cache");
+      }
+    } catch (NoSuchFieldException e) {
+      LOG.debug("CachedClientPool does not have expected field structure, skipping cleanup");
+    } catch (Exception e) {
+      LOG.debug("Failed to shutdown CachedClientPool scheduler", e);
+    }
+  }
+
+  /**
+   * Extracts the ScheduledExecutorService from a Caffeine Cache instance using reflection.
+   *
+   * <p>Required because Iceberg's CachedClientPool doesn't implement Closeable, leaving the
+   * scheduler thread pool without proper cleanup.
+   *
+   * <p>Reflection path (Caffeine 2.9.3): Cache → cache field → pacer field → scheduler field →
+   * ScheduledExecutorService
+   *
+   * @param cache The Caffeine cache instance.
+   * @return The ScheduledExecutorService if found, null otherwise.
+   */
+  @VisibleForTesting
+  ScheduledExecutorService extractSchedulerExecutor(Object cache) {
+    try {
+      // Step 1: Unwrap cache if it's a wrapper (e.g., BoundedLocalManualCache)
+      Object cacheImpl = cache;

Review Comment:
   Yes, the wrapper classes all have a `cache` field pointing to `BoundedLocalCache`.
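Given that, the unwrap step can be reduced to "follow a `cache` field if the object has one, otherwise treat the object itself as the implementation". The following is a minimal sketch of that walk under stated assumptions: `FakeManualCacheWrapper`, `FakeBoundedLocalCache` and `FakePacer` are hypothetical stand-ins that mirror the wrapper → `cache` → `pacer` → `scheduler` path from the Javadoc above. They are not Caffeine classes. Real Caffeine internals may add further layers (for example, the `scheduler` field may hold a Caffeine `Scheduler` wrapper rather than the executor itself), so the field names need checking against the Caffeine version on the classpath.

```java
import java.lang.reflect.Field;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

public class SchedulerExtractionSketch {

  // Hypothetical stand-ins for Caffeine internals (NOT the real classes).
  static class FakePacer {
    private final Object scheduler;
    FakePacer(Object scheduler) { this.scheduler = scheduler; }
  }

  static class FakeBoundedLocalCache {
    private final FakePacer pacer;
    FakeBoundedLocalCache(FakePacer pacer) { this.pacer = pacer; }
  }

  static class FakeManualCacheWrapper {
    private final FakeBoundedLocalCache cache;
    FakeManualCacheWrapper(FakeBoundedLocalCache cache) { this.cache = cache; }
  }

  /** Looks for a declared field on the class or any superclass; returns null if absent. */
  static Field findField(Class<?> type, String name) {
    for (Class<?> c = type; c != null; c = c.getSuperclass()) {
      try {
        Field f = c.getDeclaredField(name);
        f.setAccessible(true);
        return f;
      } catch (NoSuchFieldException ignored) {
        // Not declared here; keep walking up the hierarchy.
      }
    }
    return null;
  }

  /** Reads a named field from target, or returns null if target or the field is missing. */
  static Object readField(Object target, String name) throws IllegalAccessException {
    if (target == null) {
      return null;
    }
    Field f = findField(target.getClass(), name);
    return f == null ? null : f.get(target);
  }

  /** Follows wrapper -> cache -> pacer -> scheduler; returns null if any step is missing. */
  static ScheduledExecutorService extract(Object cache) {
    try {
      Object impl = readField(cache, "cache"); // unwrap if this is a wrapper
      if (impl == null) {
        impl = cache; // no `cache` field: assume it is already the implementation
      }
      Object pacer = readField(impl, "pacer");
      Object scheduler = readField(pacer, "scheduler");
      return scheduler instanceof ScheduledExecutorService
          ? (ScheduledExecutorService) scheduler
          : null;
    } catch (IllegalAccessException e) {
      return null;
    }
  }

  public static void main(String[] args) {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    Object wrapped =
        new FakeManualCacheWrapper(new FakeBoundedLocalCache(new FakePacer(executor)));
    try {
      ScheduledExecutorService found = extract(wrapped);
      if (found != null) {
        found.shutdownNow();
      }
      System.out.println("found=" + (found == executor) + ", shutdown=" + executor.isShutdown());
    } finally {
      executor.shutdownNow(); // idempotent; guarantees the JVM can exit
    }
  }
}
```

`findField` walks superclasses because `getDeclaredField` only searches the exact runtime class. A concrete cache object may be a subclass of the class that actually declares `pacer` or `scheduler`. Every miss returns `null` instead of throwing, which matches the best-effort, log-and-continue behavior of the cleanup code in the diff.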


