This is an automated email from the ASF dual-hosted git repository.
fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new d575ad1113 [#9002] Improvement(iceberg): Optimize
HiveBackendProxy.resetIcebergHiveClientPool() to prevent memory leaks. (#9003)
d575ad1113 is described below
commit d575ad11132806c691f835d364bfce4f17038faf
Author: Xiaojian Sun <[email protected]>
AuthorDate: Wed Nov 5 16:41:03 2025 +0800
[#9002] Improvement(iceberg): Optimize
HiveBackendProxy.resetIcebergHiveClientPool() to prevent memory leaks. (#9003)
### What changes were proposed in this pull request?
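Optimize HiveBackendProxy.resetIcebergHiveClientPool() to prevent memory
leaks: after installing the new IcebergHiveCachedClientPool, the previous
client pool is closed (when it is AutoCloseable) and the scheduler thread pool
behind its Caffeine cache is shut down. The scheduler is located via reflection
by the new CaffeineSchedulerExtractorUtils helper.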
### Why are the changes needed?
Fix: #9002
### Does this PR introduce _any_ user-facing change?
N/A
### How was this patch tested?
Added unit tests in TestCaffeineSchedulerExtractorUtils.
Co-authored-by: Mini Yu <[email protected]>
---
.../authentication/kerberos/HiveBackendProxy.java | 79 ++++++--
.../utils/CaffeineSchedulerExtractorUtils.java | 200 +++++++++++++++++++++
.../utils/TestCaffeineSchedulerExtractorUtils.java | 148 +++++++++++++++
3 files changed, 417 insertions(+), 10 deletions(-)
diff --git
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/authentication/kerberos/HiveBackendProxy.java
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/authentication/kerberos/HiveBackendProxy.java
index 4278756c96..2da071f08f 100644
---
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/authentication/kerberos/HiveBackendProxy.java
+++
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/authentication/kerberos/HiveBackendProxy.java
@@ -19,14 +19,17 @@
package org.apache.gravitino.iceberg.common.authentication.kerberos;
+import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
-import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.security.PrivilegedExceptionAction;
import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
import net.sf.cglib.proxy.Enhancer;
import net.sf.cglib.proxy.MethodInterceptor;
import net.sf.cglib.proxy.MethodProxy;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import
org.apache.gravitino.iceberg.common.utils.CaffeineSchedulerExtractorUtils;
import org.apache.gravitino.iceberg.common.utils.IcebergHiveCachedClientPool;
import org.apache.gravitino.utils.PrincipalUtils;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -36,13 +39,15 @@ import org.apache.hadoop.security.token.Token;
import org.apache.iceberg.ClientPool;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Proxy class for HiveCatalog to support kerberos authentication. We can also
make HiveCatalog as a
* generic type and pass it as a parameter to the constructor.
*/
public class HiveBackendProxy implements MethodInterceptor {
-
+ private static final Logger LOG =
LoggerFactory.getLogger(HiveBackendProxy.class);
private final HiveCatalog target;
private final String kerberosRealm;
private final UserGroupInformation proxyUser;
@@ -65,7 +70,7 @@ public class HiveBackendProxy implements MethodInterceptor {
this.newClientPool = resetIcebergHiveClientPool();
} catch (IOException e) {
throw new RuntimeException("Failed to get current user", e);
- } catch (IllegalAccessException | NoSuchFieldException e) {
+ } catch (IllegalAccessException e) {
throw new RuntimeException("Failed to reset IcebergHiveClientPool", e);
}
}
@@ -106,18 +111,72 @@ public class HiveBackendProxy implements
MethodInterceptor {
}
+ /**
+ * Swaps the {@code clients} field of the target HiveCatalog for a new
+ * IcebergHiveCachedClientPool and then releases the pool that was installed before.
+ *
+ * <p>The new pool is written to the field before the old one is touched. Cleanup of the old
+ * pool is best-effort: a failing {@code close()} or scheduler shutdown is logged at WARN and
+ * does not fail the reset.
+ *
+ * <p>NOTE(review): Commons Lang documents that FieldUtils.readField/writeField throw
+ * IllegalArgumentException when the named field cannot be found. Unlike the
+ * NoSuchFieldException handled before this change, that exception is not one of the types the
+ * constructor's catch clauses wrap into "Failed to reset IcebergHiveClientPool" - confirm that
+ * this is acceptable.
+ *
+ * @return the newly created client pool, already installed on the target catalog
+ * @throws IllegalAccessException if the {@code clients} field cannot be read or written
+ */
private ClientPool<IMetaStoreClient, TException> resetIcebergHiveClientPool()
- throws IllegalAccessException, NoSuchFieldException {
- final Field m = HiveCatalog.class.getDeclaredField("clients");
- m.setAccessible(true);
+ throws IllegalAccessException {
+ // Get the old client pool before replacing it
+ Object oldPool = FieldUtils.readField(target, "clients", true);
- // TODO: we need to close the original client pool and thread pool, or it
will cause memory
- // leak.
- ClientPool<IMetaStoreClient, TException> newClientPool =
+ // Create and set the new client pool first
+ IcebergHiveCachedClientPool newClientPool =
new IcebergHiveCachedClientPool(target.getConf(), properties);
- m.set(target, newClientPool);
+ FieldUtils.writeField(target, "clients", newClientPool, true);
+
+ // Then try to close the old pool to release resources
+ if (oldPool != null) {
+ // Try standard close method if available
+ if (oldPool instanceof AutoCloseable) {
+ try {
+ ((AutoCloseable) oldPool).close();
+ LOG.info("Successfully closed old Hive client pool");
+ } catch (Exception e) {
+ LOG.warn("Failed to close old Hive client pool", e);
+ }
+ }
+
+ // Additionally, try to shutdown the internal scheduler thread pool in Iceberg's
+ // CachedClientPool to prevent memory leak. This is necessary because Iceberg's
+ // CachedClientPool does not implement Closeable and does not properly clean up
+ // its internal scheduler.
+ try {
+ shutdownIcebergCachedClientPoolScheduler(oldPool);
+ } catch (Exception e) {
+ LOG.warn(
+ "Failed to shutdown scheduler in old CachedClientPool, may cause
minor resource leak",
+ e);
+ }
+ }
+
return newClientPool;
}
+ /**
+ * Shuts down the scheduler thread pool in Iceberg's CachedClientPool.
+ *
+ * <p>Required because CachedClientPool doesn't provide cleanup, causing thread pool leaks.
+ *
+ * <p>The {@code clientPoolCache} field of {@code clientPool} is read reflectively, its
+ * ScheduledExecutorService is looked up through CaffeineSchedulerExtractorUtils, and
+ * {@code shutdownNow()} is called on it. A null cache, an executor that could not be extracted
+ * and an IllegalAccessException are only logged at DEBUG.
+ *
+ * <p>NOTE(review): presumably {@code clientPoolCache} is shared between CachedClientPool
+ * instances (a static field in Iceberg); if so, this also stops the scheduler for every other
+ * user of that cache - verify against the Iceberg version in use.
+ *
+ * @param clientPool The old CachedClientPool instance
+ */
+ @VisibleForTesting
+ void shutdownIcebergCachedClientPoolScheduler(Object clientPool) {
+ try {
+ Object cache = FieldUtils.readField(clientPool, "clientPoolCache", true);
+ if (cache == null) {
+ LOG.debug("clientPoolCache is null, no scheduler to shutdown");
+ return;
+ }
+
+ ScheduledExecutorService executor =
+ CaffeineSchedulerExtractorUtils.extractSchedulerExecutor(cache);
+ if (executor != null) {
+ LOG.info("Shutting down scheduler thread pool from old
CachedClientPool");
+ executor.shutdownNow();
+ } else {
+ LOG.debug("Could not extract scheduler executor from cache");
+ }
+ } catch (IllegalAccessException e) {
+ LOG.debug("Failed to access clientPoolCache field", e);
+ }
+ }
+
public HiveCatalog getProxy() {
Enhancer e = new Enhancer();
e.setClassLoader(target.getClass().getClassLoader());
diff --git
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/utils/CaffeineSchedulerExtractorUtils.java
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/utils/CaffeineSchedulerExtractorUtils.java
new file mode 100644
index 0000000000..361d5990e9
--- /dev/null
+++
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/utils/CaffeineSchedulerExtractorUtils.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.iceberg.common.utils;
+
+import com.github.benmanes.caffeine.cache.Scheduler;
+import com.google.common.annotations.VisibleForTesting;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class for extracting schedulers from Caffeine caches.
+ *
+ * <p>Provides reflection-based access to Caffeine's internal ScheduledExecutorService when caches
+ * don't properly clean up their scheduler thread pools.
+ *
+ * <p>Reflection path (Caffeine 2.9.3): Cache → pacer → scheduler → ScheduledExecutorService
+ *
+ * <p>Every lookup is best-effort. A field that does not exist on the inspected object (for example
+ * because the cache has no expiration or the Caffeine version differs) is treated the same way as
+ * a field that cannot be accessed: the step yields {@code null} instead of throwing.
+ */
+public final class CaffeineSchedulerExtractorUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(CaffeineSchedulerExtractorUtils.class);
+
+  private CaffeineSchedulerExtractorUtils() {
+    // Utility class, prevent instantiation
+  }
+
+  /**
+   * Extracts the ScheduledExecutorService from a Caffeine Cache using reflection.
+   *
+   * <p>Works with various cache implementations: BoundedLocalManualCache,
+   * BoundedLocalLoadingCache, etc.
+   *
+   * @param cache The Caffeine cache instance
+   * @return The ScheduledExecutorService if found, null otherwise
+   */
+  public static ScheduledExecutorService extractSchedulerExecutor(Object cache) {
+    try {
+      // Step 1: Unwrap cache if it's a wrapper (e.g., BoundedLocalManualCache)
+      Object cacheImpl = unwrapCache(cache);
+
+      // Step 2: Get Pacer from cache
+      Object pacer = getPacerFromCache(cacheImpl);
+      if (pacer == null) {
+        return null;
+      }
+
+      // Step 3: Get Scheduler from Pacer
+      Scheduler scheduler = getSchedulerFromPacer(pacer);
+      if (scheduler == null) {
+        return null;
+      }
+
+      // Step 4: Extract ScheduledExecutorService from Scheduler
+      ScheduledExecutorService executorService = getExecutorServiceFromScheduler(scheduler);
+      if (executorService != null) {
+        LOG.info("Successfully extracted ScheduledExecutorService from Caffeine cache");
+      }
+      return executorService;
+    } catch (RuntimeException e) {
+      // Missing or inaccessible fields are already mapped to null by the helpers; anything that
+      // still ends up here is unexpected, so keep the stack trace (trailing Throwable argument).
+      LOG.warn("Failed to extract scheduler from Caffeine cache: {}", e.getMessage(), e);
+      return null;
+    }
+  }
+
+  /**
+   * Unwraps cache wrappers (e.g., BoundedLocalManualCache) to access the underlying
+   * implementation.
+   *
+   * @param cache The cache object (possibly a wrapper)
+   * @return The value of the object's {@code cache} field if it has a non-null one, otherwise the
+   *     object itself; null if {@code cache} is null
+   */
+  @VisibleForTesting
+  public static Object unwrapCache(Object cache) {
+    if (cache == null) {
+      return null;
+    }
+
+    // No readable "cache" field means the object is not a wrapper: use it directly.
+    Object innerCache = readFieldOrNull(cache, "cache");
+    return innerCache != null ? innerCache : cache;
+  }
+
+  /**
+   * Gets the Pacer from the cache implementation.
+   *
+   * <p>FieldUtils already searches the whole class hierarchy for the field, so a single lookup is
+   * sufficient.
+   *
+   * @param cacheImpl The cache implementation
+   * @return The Pacer object, or null if not found
+   */
+  @VisibleForTesting
+  public static Object getPacerFromCache(Object cacheImpl) {
+    if (cacheImpl == null) {
+      return null;
+    }
+
+    Object pacer = readFieldOrNull(cacheImpl, "pacer");
+    if (pacer != null && "Pacer".equals(pacer.getClass().getSimpleName())) {
+      return pacer;
+    }
+    return null;
+  }
+
+  /**
+   * Extracts Scheduler from Pacer.
+   *
+   * @param pacer The Pacer object
+   * @return The Scheduler instance, or null if not found
+   */
+  @VisibleForTesting
+  public static Scheduler getSchedulerFromPacer(Object pacer) {
+    if (pacer == null) {
+      return null;
+    }
+
+    Object scheduler = readFieldOrNull(pacer, "scheduler");
+    return scheduler instanceof Scheduler ? (Scheduler) scheduler : null;
+  }
+
+  /**
+   * Extracts ScheduledExecutorService from Scheduler (handles GuardedScheduler wrapper).
+   *
+   * @param scheduler The Scheduler instance
+   * @return The ScheduledExecutorService, or null if not found
+   */
+  @VisibleForTesting
+  public static ScheduledExecutorService getExecutorServiceFromScheduler(Scheduler scheduler) {
+    if (scheduler == null) {
+      return null;
+    }
+
+    // Handle GuardedScheduler wrapper; if it cannot be unwrapped, keep inspecting the original
+    Object actualScheduler = scheduler;
+    if ("GuardedScheduler".equals(scheduler.getClass().getSimpleName())) {
+      Object delegate = readFieldOrNull(scheduler, "delegate");
+      if (delegate != null) {
+        actualScheduler = delegate;
+      }
+    }
+
+    // Only ExecutorServiceScheduler carries a ScheduledExecutorService
+    if (!"ExecutorServiceScheduler".equals(actualScheduler.getClass().getSimpleName())) {
+      return null;
+    }
+    Object executorService = readFieldOrNull(actualScheduler, "scheduledExecutorService");
+    return executorService instanceof ScheduledExecutorService
+        ? (ScheduledExecutorService) executorService
+        : null;
+  }
+
+  /**
+   * Reads a possibly private, possibly inherited field, mapping "no such field" and "field not
+   * accessible" to null.
+   *
+   * <p>FieldUtils.readField reports a missing field with IllegalArgumentException rather than a
+   * checked exception, so both exception types have to be handled here.
+   *
+   * @param target The non-null object to read from
+   * @param fieldName The name of the field to read
+   * @return The field value, or null if the field is absent, inaccessible or holds null
+   */
+  private static Object readFieldOrNull(Object target, String fieldName) {
+    try {
+      return FieldUtils.readField(target, fieldName, true);
+    } catch (IllegalAccessException | IllegalArgumentException e) {
+      LOG.debug(
+          "Field '{}' is not readable on {}: {}",
+          fieldName,
+          target.getClass().getName(),
+          e.getMessage());
+      return null;
+    }
+  }
+}
diff --git
a/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/utils/TestCaffeineSchedulerExtractorUtils.java
b/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/utils/TestCaffeineSchedulerExtractorUtils.java
new file mode 100644
index 0000000000..ee4526f4dd
--- /dev/null
+++
b/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/utils/TestCaffeineSchedulerExtractorUtils.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.iceberg.common.utils;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.util.ThreadPools;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test class for CaffeineSchedulerExtractorUtils.
+ *
+ * <p>Each test builds a Caffeine cache backed by its own scheduled pool and always stops that pool
+ * in a {@code finally} block. JUnit reports failed assertions as AssertionError, which a {@code
+ * catch (Exception e)} clause would not see, so only {@code finally} guarantees the cleanup.
+ */
+public class TestCaffeineSchedulerExtractorUtils {
+
+  /** Verifies that the one-shot extraction returns the exact executor given to the cache. */
+  @Test
+  public void testExtractSchedulerExecutor() {
+    ScheduledExecutorService testScheduler = ThreadPools.newScheduledPool("test-extract", 1);
+    Cache<String, String> cache =
+        Caffeine.newBuilder()
+            .expireAfterAccess(10, TimeUnit.MINUTES)
+            .scheduler(Scheduler.forScheduledExecutorService(testScheduler))
+            .build();
+
+    try {
+      // Use CaffeineSchedulerExtractorUtils to extract scheduler
+      ScheduledExecutorService extracted =
+          CaffeineSchedulerExtractorUtils.extractSchedulerExecutor(cache);
+      assertNotNull(extracted, "Should successfully extract scheduler from cache");
+
+      // Verification 1: Reference equality - they are the SAME object
+      assertSame(
+          testScheduler,
+          extracted,
+          "Extracted scheduler MUST be the same instance as the one we provided");
+
+      // Verification 2: Initial state - both should NOT be shutdown
+      assertFalse(testScheduler.isShutdown(), "Original should not be shutdown initially");
+      assertFalse(extracted.isShutdown(), "Extracted should not be shutdown initially");
+
+      // Verification 3: Shutdown extracted and verify original is also shutdown
+      extracted.shutdownNow();
+
+      // Both should be shutdown (proving they are the SAME object)
+      assertTrue(extracted.isShutdown(), "Extracted should be shutdown");
+      assertTrue(testScheduler.isShutdown(), "Original MUST be shutdown, proving same instance!");
+    } finally {
+      // Runs on success, on exceptions and on assertion failures; a repeated shutdownNow() on an
+      // executor that is already shut down has no further effect.
+      testScheduler.shutdownNow();
+    }
+  }
+
+  /** Verifies that the Pacer can be located on the unwrapped cache implementation. */
+  @Test
+  public void testGetPacerFromCache() throws Exception {
+    ScheduledExecutorService testScheduler = ThreadPools.newScheduledPool("test-pacer", 1);
+
+    Cache<String, String> cache =
+        Caffeine.newBuilder()
+            .expireAfterAccess(10, TimeUnit.MINUTES)
+            .scheduler(Scheduler.forScheduledExecutorService(testScheduler))
+            .build();
+
+    try {
+      // Unwrap cache using utility method
+      Object cacheImpl = CaffeineSchedulerExtractorUtils.unwrapCache(cache);
+      assertNotNull(cacheImpl, "Should unwrap cache");
+
+      // Get Pacer using utility method
+      Object pacer = CaffeineSchedulerExtractorUtils.getPacerFromCache(cacheImpl);
+      assertNotNull(pacer, "Should successfully get Pacer object");
+      assertEquals("Pacer", pacer.getClass().getSimpleName(), "Should be Pacer class");
+    } finally {
+      testScheduler.shutdownNow();
+    }
+  }
+
+  /** Test the complete extraction chain with identity verification at each step. */
+  @Test
+  public void testCompleteExtractionChain() throws Exception {
+    ScheduledExecutorService testScheduler = ThreadPools.newScheduledPool("test-chain", 1);
+
+    Cache<String, String> cache =
+        Caffeine.newBuilder()
+            .expireAfterAccess(10, TimeUnit.MINUTES)
+            .scheduler(Scheduler.forScheduledExecutorService(testScheduler))
+            .build();
+
+    try {
+      // Step 1: Unwrap cache
+      Object cacheImpl = CaffeineSchedulerExtractorUtils.unwrapCache(cache);
+      assertNotNull(cacheImpl, "Step 1: Should unwrap cache");
+
+      // Step 2: Get Pacer
+      Object pacer = CaffeineSchedulerExtractorUtils.getPacerFromCache(cacheImpl);
+      assertNotNull(pacer, "Step 2: Should get Pacer");
+      assertEquals("Pacer", pacer.getClass().getSimpleName(), "Step 2: Should be Pacer class");
+
+      // Step 3: Get Scheduler from Pacer
+      Scheduler scheduler = CaffeineSchedulerExtractorUtils.getSchedulerFromPacer(pacer);
+      assertNotNull(scheduler, "Step 3: Should get Scheduler");
+
+      // Step 4: Get ExecutorService from Scheduler
+      ScheduledExecutorService executor =
+          CaffeineSchedulerExtractorUtils.getExecutorServiceFromScheduler(scheduler);
+      assertNotNull(executor, "Step 4: Should get ScheduledExecutorService");
+
+      // Step 5: CRITICAL - Identity verification
+      assertSame(testScheduler, executor, "Extracted MUST be same instance!");
+
+      // Step 6: Behavior verification - shutdown one affects the other
+      assertFalse(testScheduler.isShutdown(), "Original should not be shutdown initially");
+      assertFalse(executor.isShutdown(), "Extracted should not be shutdown initially");
+
+      executor.shutdownNow();
+
+      assertTrue(executor.isShutdown(), "Extracted should be shutdown");
+      assertTrue(testScheduler.isShutdown(), "Original MUST be shutdown - same object!");
+    } finally {
+      // Also covers AssertionError from the checks above, which catch (Exception e) would miss.
+      testScheduler.shutdownNow();
+    }
+  }
+}