This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new d575ad1113 [#9002] Improvement(iceberg): Optimize 
HiveBackendProxy.resetIcebergHiveClientPool() to prevent memory leaks.  (#9003)
d575ad1113 is described below

commit d575ad11132806c691f835d364bfce4f17038faf
Author: Xiaojian Sun <[email protected]>
AuthorDate: Wed Nov 5 16:41:03 2025 +0800

    [#9002] Improvement(iceberg): Optimize 
HiveBackendProxy.resetIcebergHiveClientPool() to prevent memory leaks.  (#9003)
    
    ### What changes were proposed in this pull request?
    
    Optimize HiveBackendProxy.resetIcebergHiveClientPool() to prevent memory
    leaks.
    
    ### Why are the changes needed?
    
    Fix: #9002
    
    ### Does this PR introduce _any_ user-facing change?
    
    N/A
    
    ### How was this patch tested?
    
    N/A
    
    Co-authored-by: Mini Yu <[email protected]>
---
 .../authentication/kerberos/HiveBackendProxy.java  |  79 ++++++--
 .../utils/CaffeineSchedulerExtractorUtils.java     | 200 +++++++++++++++++++++
 .../utils/TestCaffeineSchedulerExtractorUtils.java | 148 +++++++++++++++
 3 files changed, 417 insertions(+), 10 deletions(-)

diff --git 
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/authentication/kerberos/HiveBackendProxy.java
 
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/authentication/kerberos/HiveBackendProxy.java
index 4278756c96..2da071f08f 100644
--- 
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/authentication/kerberos/HiveBackendProxy.java
+++ 
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/authentication/kerberos/HiveBackendProxy.java
@@ -19,14 +19,17 @@
 
 package org.apache.gravitino.iceberg.common.authentication.kerberos;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
-import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.security.PrivilegedExceptionAction;
 import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
 import net.sf.cglib.proxy.Enhancer;
 import net.sf.cglib.proxy.MethodInterceptor;
 import net.sf.cglib.proxy.MethodProxy;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import 
org.apache.gravitino.iceberg.common.utils.CaffeineSchedulerExtractorUtils;
 import org.apache.gravitino.iceberg.common.utils.IcebergHiveCachedClientPool;
 import org.apache.gravitino.utils.PrincipalUtils;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -36,13 +39,15 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.iceberg.ClientPool;
 import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Proxy class for HiveCatalog to support kerberos authentication. We can also 
make HiveCatalog as a
  * generic type and pass it as a parameter to the constructor.
  */
 public class HiveBackendProxy implements MethodInterceptor {
-
+  private static final Logger LOG = 
LoggerFactory.getLogger(HiveBackendProxy.class);
   private final HiveCatalog target;
   private final String kerberosRealm;
   private final UserGroupInformation proxyUser;
@@ -65,7 +70,7 @@ public class HiveBackendProxy implements MethodInterceptor {
       this.newClientPool = resetIcebergHiveClientPool();
     } catch (IOException e) {
       throw new RuntimeException("Failed to get current user", e);
-    } catch (IllegalAccessException | NoSuchFieldException e) {
+    } catch (IllegalAccessException e) {
       throw new RuntimeException("Failed to reset IcebergHiveClientPool", e);
     }
   }
@@ -106,18 +111,72 @@ public class HiveBackendProxy implements 
MethodInterceptor {
   }
 
   private ClientPool<IMetaStoreClient, TException> resetIcebergHiveClientPool()
-      throws IllegalAccessException, NoSuchFieldException {
-    final Field m = HiveCatalog.class.getDeclaredField("clients");
-    m.setAccessible(true);
+      throws IllegalAccessException {
+    // Get the old client pool before replacing it
+    Object oldPool = FieldUtils.readField(target, "clients", true);
 
-    // TODO: we need to close the original client pool and thread pool, or it 
will cause memory
-    //  leak.
-    ClientPool<IMetaStoreClient, TException> newClientPool =
+    // Create and set the new client pool first
+    IcebergHiveCachedClientPool newClientPool =
         new IcebergHiveCachedClientPool(target.getConf(), properties);
-    m.set(target, newClientPool);
+    FieldUtils.writeField(target, "clients", newClientPool, true);
+
+    // Then try to close the old pool to release resources
+    if (oldPool != null) {
+      // Try standard close method if available
+      if (oldPool instanceof AutoCloseable) {
+        try {
+          ((AutoCloseable) oldPool).close();
+          LOG.info("Successfully closed old Hive client pool");
+        } catch (Exception e) {
+          LOG.warn("Failed to close old Hive client pool", e);
+        }
+      }
+
+      // Additionally, try to shutdown the internal scheduler thread pool in 
Iceberg's
+      // CachedClientPool to prevent memory leak. This is necessary because 
Iceberg's
+      // CachedClientPool does not implement Closeable and does not properly 
clean up
+      // its internal scheduler.
+      try {
+        shutdownIcebergCachedClientPoolScheduler(oldPool);
+      } catch (Exception e) {
+        LOG.warn(
+            "Failed to shutdown scheduler in old CachedClientPool, may cause 
minor resource leak",
+            e);
+      }
+    }
+
     return newClientPool;
   }
 
+  /**
+   * Shuts down the scheduler thread pool in Iceberg's CachedClientPool.
+   *
+   * <p>Required because CachedClientPool doesn't provide cleanup, causing 
thread pool leaks.
+   *
+   * @param clientPool The old CachedClientPool instance
+   */
+  @VisibleForTesting
+  void shutdownIcebergCachedClientPoolScheduler(Object clientPool) {
+    try {
+      Object cache = FieldUtils.readField(clientPool, "clientPoolCache", true);
+      if (cache == null) {
+        LOG.debug("clientPoolCache is null, no scheduler to shutdown");
+        return;
+      }
+
+      ScheduledExecutorService executor =
+          CaffeineSchedulerExtractorUtils.extractSchedulerExecutor(cache);
+      if (executor != null) {
+        LOG.info("Shutting down scheduler thread pool from old 
CachedClientPool");
+        executor.shutdownNow();
+      } else {
+        LOG.debug("Could not extract scheduler executor from cache");
+      }
+    } catch (IllegalAccessException e) {
+      LOG.debug("Failed to access clientPoolCache field", e);
+    }
+  }
+
   public HiveCatalog getProxy() {
     Enhancer e = new Enhancer();
     e.setClassLoader(target.getClass().getClassLoader());
diff --git 
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/utils/CaffeineSchedulerExtractorUtils.java
 
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/utils/CaffeineSchedulerExtractorUtils.java
new file mode 100644
index 0000000000..361d5990e9
--- /dev/null
+++ 
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/utils/CaffeineSchedulerExtractorUtils.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.iceberg.common.utils;
+
+import com.github.benmanes.caffeine.cache.Scheduler;
+import com.google.common.annotations.VisibleForTesting;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class for extracting schedulers from Caffeine caches.
+ *
+ * <p>Provides reflection-based access to Caffeine's internal 
ScheduledExecutorService when caches
+ * don't properly clean up their scheduler thread pools.
+ *
+ * <p>Reflection path (Caffeine 2.9.3): Cache → pacer → scheduler → 
ScheduledExecutorService
+ */
+public class CaffeineSchedulerExtractorUtils {
+  private static final Logger LOG = 
LoggerFactory.getLogger(CaffeineSchedulerExtractorUtils.class);
+
+  private CaffeineSchedulerExtractorUtils() {
+    // Utility class, prevent instantiation
+  }
+
+  /**
+   * Extracts the ScheduledExecutorService from a Caffeine Cache using 
reflection.
+   *
+   * <p>Works with various cache implementations: BoundedLocalManualCache, 
BoundedLocalLoadingCache,
+   * etc.
+   *
+   * @param cache The Caffeine cache instance
+   * @return The ScheduledExecutorService if found, null otherwise
+   */
+  public static ScheduledExecutorService extractSchedulerExecutor(Object 
cache) {
+    try {
+      // Step 1: Unwrap cache if it's a wrapper (e.g., BoundedLocalManualCache)
+      Object cacheImpl = unwrapCache(cache);
+
+      // Step 2: Get Pacer from cache
+      Object pacer = getPacerFromCache(cacheImpl);
+      if (pacer == null) {
+        return null;
+      }
+
+      // Step 3: Get Scheduler from Pacer
+      Scheduler scheduler = getSchedulerFromPacer(pacer);
+      if (scheduler == null) {
+        return null;
+      }
+
+      // Step 4: Extract ScheduledExecutorService from Scheduler
+      ScheduledExecutorService executorService = 
getExecutorServiceFromScheduler(scheduler);
+      if (executorService != null) {
+        LOG.info("Successfully extracted ScheduledExecutorService from 
Caffeine cache");
+        return executorService;
+      }
+
+    } catch (Exception e) {
+      LOG.warn("Failed to extract scheduler from Caffeine cache: {}", 
e.getMessage());
+    }
+
+    return null;
+  }
+
+  /**
+   * Unwraps cache wrappers (e.g., BoundedLocalManualCache) to access the 
underlying implementation.
+   *
+   * @param cache The cache object (possibly a wrapper)
+   * @return The unwrapped cache implementation
+   */
+  @VisibleForTesting
+  public static Object unwrapCache(Object cache) {
+    if (cache == null) {
+      return null;
+    }
+
+    // Try to get inner cache field (for wrapper classes)
+    try {
+      Object innerCache = FieldUtils.readField(cache, "cache", true);
+      if (innerCache != null) {
+        return innerCache;
+      }
+    } catch (IllegalAccessException e) {
+      // Not a wrapper, use cache directly
+    }
+
+    return cache;
+  }
+
+  /**
+   * Gets the Pacer from cache implementation (traverses class hierarchy if 
needed).
+   *
+   * @param cacheImpl The cache implementation
+   * @return The Pacer object, or null if not found
+   */
+  @VisibleForTesting
+  public static Object getPacerFromCache(Object cacheImpl) {
+    if (cacheImpl == null) {
+      return null;
+    }
+
+    // Traverse class hierarchy to find pacer field
+    Class<?> clazz = cacheImpl.getClass();
+    while (clazz != null && !clazz.equals(Object.class)) {
+      try {
+        Object pacer = FieldUtils.readField(cacheImpl, "pacer", true);
+        if (pacer != null && "Pacer".equals(pacer.getClass().getSimpleName())) 
{
+          return pacer;
+        }
+      } catch (IllegalAccessException e) {
+        // Continue to superclass
+      }
+      clazz = clazz.getSuperclass();
+    }
+
+    return null;
+  }
+
+  /**
+   * Extracts Scheduler from Pacer.
+   *
+   * @param pacer The Pacer object
+   * @return The Scheduler instance, or null if not found
+   */
+  @VisibleForTesting
+  public static Scheduler getSchedulerFromPacer(Object pacer) {
+    if (pacer == null) {
+      return null;
+    }
+
+    try {
+      Object scheduler = FieldUtils.readField(pacer, "scheduler", true);
+      if (scheduler instanceof Scheduler) {
+        return (Scheduler) scheduler;
+      }
+    } catch (IllegalAccessException e) {
+      // Scheduler field not accessible
+    }
+
+    return null;
+  }
+
+  /**
+   * Extracts ScheduledExecutorService from Scheduler (handles 
GuardedScheduler wrapper).
+   *
+   * @param scheduler The Scheduler instance
+   * @return The ScheduledExecutorService, or null if not found
+   */
+  @VisibleForTesting
+  public static ScheduledExecutorService 
getExecutorServiceFromScheduler(Scheduler scheduler) {
+    if (scheduler == null) {
+      return null;
+    }
+
+    // Handle GuardedScheduler wrapper
+    Object actualScheduler = scheduler;
+    if ("GuardedScheduler".equals(scheduler.getClass().getSimpleName())) {
+      try {
+        Object delegate = FieldUtils.readField(scheduler, "delegate", true);
+        if (delegate != null) {
+          actualScheduler = delegate;
+        }
+      } catch (IllegalAccessException e) {
+        // Could not unwrap, use original
+      }
+    }
+
+    // Extract from ExecutorServiceScheduler
+    if 
("ExecutorServiceScheduler".equals(actualScheduler.getClass().getSimpleName())) 
{
+      try {
+        Object executorService =
+            FieldUtils.readField(actualScheduler, "scheduledExecutorService", 
true);
+        if (executorService instanceof ScheduledExecutorService) {
+          return (ScheduledExecutorService) executorService;
+        }
+      } catch (IllegalAccessException e) {
+        // Could not extract
+      }
+    }
+    return null;
+  }
+}
diff --git 
a/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/utils/TestCaffeineSchedulerExtractorUtils.java
 
b/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/utils/TestCaffeineSchedulerExtractorUtils.java
new file mode 100644
index 0000000000..ee4526f4dd
--- /dev/null
+++ 
b/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/utils/TestCaffeineSchedulerExtractorUtils.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.iceberg.common.utils;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.util.ThreadPools;
+import org.junit.jupiter.api.Test;
+
+/** Test class for CaffeineSchedulerExtractorUtils. */
+public class TestCaffeineSchedulerExtractorUtils {
+
+  @Test
+  public void testExtractSchedulerExecutor() {
+    ScheduledExecutorService testScheduler = 
ThreadPools.newScheduledPool("test-extract", 1);
+    Cache<String, String> cache =
+        Caffeine.newBuilder()
+            .expireAfterAccess(10, TimeUnit.MINUTES)
+            .scheduler(Scheduler.forScheduledExecutorService(testScheduler))
+            .build();
+
+    try {
+      // Use CaffeineSchedulerExtractorUtils to extract scheduler
+      ScheduledExecutorService extracted =
+          CaffeineSchedulerExtractorUtils.extractSchedulerExecutor(cache);
+      assertNotNull(extracted, "Should successfully extract scheduler from 
cache");
+
+      // Verification 1: Reference equality - they are the SAME object
+      assertSame(
+          testScheduler,
+          extracted,
+          "Extracted scheduler MUST be the same instance as the one we 
provided");
+
+      // Verification 2: Initial state - both should NOT be shutdown
+      assertFalse(testScheduler.isShutdown(), "Original should not be shutdown 
initially");
+      assertFalse(extracted.isShutdown(), "Extracted should not be shutdown 
initially");
+
+      // Verification 3: Shutdown extracted and verify original is also 
shutdown
+      extracted.shutdownNow();
+
+      // Both should be shutdown (proving they are the SAME object)
+      assertTrue(extracted.isShutdown(), "Extracted should be shutdown");
+      assertTrue(testScheduler.isShutdown(), "Original MUST be shutdown, 
proving same instance!");
+
+    } catch (Exception e) {
+      testScheduler.shutdownNow();
+      throw e;
+    }
+  }
+
+  @Test
+  public void testGetPacerFromCache() throws Exception {
+    ScheduledExecutorService testScheduler = 
ThreadPools.newScheduledPool("test-pacer", 1);
+
+    Cache<String, String> cache =
+        Caffeine.newBuilder()
+            .expireAfterAccess(10, TimeUnit.MINUTES)
+            .scheduler(Scheduler.forScheduledExecutorService(testScheduler))
+            .build();
+
+    try {
+      // Unwrap cache using utility method
+      Object cacheImpl = CaffeineSchedulerExtractorUtils.unwrapCache(cache);
+      assertNotNull(cacheImpl, "Should unwrap cache");
+
+      // Get Pacer using utility method
+      Object pacer = 
CaffeineSchedulerExtractorUtils.getPacerFromCache(cacheImpl);
+      assertNotNull(pacer, "Should successfully get Pacer object");
+      assertEquals("Pacer", pacer.getClass().getSimpleName(), "Should be Pacer 
class");
+
+    } finally {
+      testScheduler.shutdownNow();
+    }
+  }
+
+  /** Test the complete extraction chain with identity verification at each 
step. */
+  @Test
+  public void testCompleteExtractionChain() throws Exception {
+    ScheduledExecutorService testScheduler = 
ThreadPools.newScheduledPool("test-chain", 1);
+
+    Cache<String, String> cache =
+        Caffeine.newBuilder()
+            .expireAfterAccess(10, TimeUnit.MINUTES)
+            .scheduler(Scheduler.forScheduledExecutorService(testScheduler))
+            .build();
+
+    try {
+      // Step 1: Unwrap cache
+      Object cacheImpl = CaffeineSchedulerExtractorUtils.unwrapCache(cache);
+      assertNotNull(cacheImpl, "Step 1: Should unwrap cache");
+
+      // Step 2: Get Pacer
+      Object pacer = 
CaffeineSchedulerExtractorUtils.getPacerFromCache(cacheImpl);
+      assertNotNull(pacer, "Step 2: Should get Pacer");
+      assertEquals("Pacer", pacer.getClass().getSimpleName(), "Step 2: Should 
be Pacer class");
+
+      // Step 3: Get Scheduler from Pacer
+      Scheduler scheduler = 
CaffeineSchedulerExtractorUtils.getSchedulerFromPacer(pacer);
+      assertNotNull(scheduler, "Step 3: Should get Scheduler");
+
+      // Step 4: Get ExecutorService from Scheduler
+      ScheduledExecutorService executor =
+          
CaffeineSchedulerExtractorUtils.getExecutorServiceFromScheduler(scheduler);
+      assertNotNull(executor, "Step 4: Should get ScheduledExecutorService");
+
+      // Step 5: CRITICAL - Identity verification
+      assertSame(testScheduler, executor, "Extracted MUST be same instance!");
+
+      // Step 6: Behavior verification - shutdown one affects the other
+      assertFalse(testScheduler.isShutdown(), "Original should not be shutdown 
initially");
+      assertFalse(executor.isShutdown(), "Extracted should not be shutdown 
initially");
+
+      executor.shutdownNow();
+
+      assertTrue(executor.isShutdown(), "Extracted should be shutdown");
+      assertTrue(testScheduler.isShutdown(), "Original MUST be shutdown - same 
object!");
+
+    } catch (Exception e) {
+      testScheduler.shutdownNow();
+      throw e;
+    }
+  }
+}

Reply via email to