This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 7b5c80ca3d [#8251][#8249] fix(fileset-catalog): Fix OOM problem for
fileset catalog (#8252)
7b5c80ca3d is described below
commit 7b5c80ca3d56686c40e5118b51815af063914a81
Author: Mini Yu <[email protected]>
AuthorDate: Mon Sep 1 20:34:33 2025 +0800
[#8251][#8249] fix(fileset-catalog): Fix OOM problem for fileset catalog
(#8252)
### What changes were proposed in this pull request?
1. Clear thread locals and daemon thread for custom class loader to let
GC free it.
2. Fix timestamp can be 0 for azure file system.
### Why are the changes needed?
They are bugs.
Fix: #8251
Fix: #8249
### Does this PR introduce _any_ user-facing change?
N/A.
### How was this patch tested?
Test locally.
---
bundles/azure-bundle/build.gradle.kts | 7 +-
bundles/gcp-bundle/build.gradle.kts | 5 +-
.../utils/ClassLoaderResourceCleanerUtils.java | 303 +++++++++++++++++++++
.../fileset/SecureFilesetCatalogOperations.java | 3 +
.../org/apache/gravitino/dto/file/FileInfoDTO.java | 3 +-
5 files changed, 318 insertions(+), 3 deletions(-)
diff --git a/bundles/azure-bundle/build.gradle.kts
b/bundles/azure-bundle/build.gradle.kts
index 088b01a407..6ff704ea28 100644
--- a/bundles/azure-bundle/build.gradle.kts
+++ b/bundles/azure-bundle/build.gradle.kts
@@ -25,7 +25,12 @@ plugins {
}
dependencies {
- implementation(project(":bundles:azure"))
+ implementation(project(":bundles:azure")) {
+ // There is already a dependency on commons-logging v1.2 in hadoop-azure,
so exclude the one
+ // from the bundle.
+ exclude(group = "commons-logging", module = "commons-logging")
+ }
+
implementation(libs.hadoop3.abs)
implementation(libs.hadoop3.client.api)
implementation(libs.hadoop3.client.runtime)
diff --git a/bundles/gcp-bundle/build.gradle.kts
b/bundles/gcp-bundle/build.gradle.kts
index 1f0fc7ec23..df3d976b8f 100644
--- a/bundles/gcp-bundle/build.gradle.kts
+++ b/bundles/gcp-bundle/build.gradle.kts
@@ -25,7 +25,10 @@ plugins {
}
dependencies {
- implementation(project(":bundles:gcp"))
+ implementation(project(":bundles:gcp")) {
+ // There is already a dependency on commons-logging v1.2 in hadoop-gcs, so
exclude the one.
+ exclude(group = "commons-logging", module = "commons-logging")
+ }
implementation(libs.hadoop3.client.api)
implementation(libs.hadoop3.client.runtime)
implementation(libs.hadoop3.gcs)
diff --git
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/utils/ClassLoaderResourceCleanerUtils.java
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/utils/ClassLoaderResourceCleanerUtils.java
new file mode 100644
index 0000000000..3601351e36
--- /dev/null
+++
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/utils/ClassLoaderResourceCleanerUtils.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.utils;
+
+import java.lang.reflect.Field;
+import java.util.IdentityHashMap;
+import java.util.Timer;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.commons.lang3.reflect.MethodUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to clean up resources related to a specific class loader to
prevent memory leaks.
+ * Gravitino will create a new class loader for each catalog and release it
when there exist any
+ * changes to the catalog. So, it's important to clean up resources related to
the class loader to
+ * prevent memory leaks.
+ */
+public class ClassLoaderResourceCleanerUtils {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ClassLoaderResourceCleanerUtils.class);
+
+ private ClassLoaderResourceCleanerUtils() {}
+
+ /**
+ * Close all resources related to the given class loader to prevent memory
leaks.
+ *
+ * @param classLoader the classloader to be closed
+ */
+ public static void closeClassLoaderResource(ClassLoader classLoader) {
+ boolean testEnv = System.getenv("GRAVITINO_TEST") != null;
+ if (testEnv) {
+ // In test environment, we do not need to clean up class loader related
stuff
+ return;
+ }
+
+ // Clear statics threads in FileSystem and close all FileSystem instances.
+ executeAndCatch(
+ ClassLoaderResourceCleanerUtils::closeStatsDataClearerInFileSystem,
classLoader);
+
+ // Stop all threads with the current class loader and clear their
threadLocal variables for
+ // jetty threads that are loaded by the current class loader.
+ // For example, thread local `threadData` in
FileSystem#StatisticsDataCleaner is created
within jetty thread with the current class loader. However, they are
cleared by
+ // `catalog.close` in ForkJoinPool in CaffeineCache; in this case, the
thread local variable
+ // will not be cleared, so we need to clear them manually here.
+ executeAndCatch(
+
ClassLoaderResourceCleanerUtils::stopThreadsAndClearThreadLocalVariables,
classLoader);
+
+ // Release the LogFactory for the classloader, each classloader has its
own LogFactory
+ // instance.
+
executeAndCatch(ClassLoaderResourceCleanerUtils::releaseLogFactoryInCommonLogging,
classLoader);
+
+ executeAndCatch(ClassLoaderResourceCleanerUtils::closeResourceInAWS,
classLoader);
+
+ executeAndCatch(ClassLoaderResourceCleanerUtils::closeResourceInGCP,
classLoader);
+
+ executeAndCatch(ClassLoaderResourceCleanerUtils::closeResourceInAzure,
classLoader);
+
+ executeAndCatch(ClassLoaderResourceCleanerUtils::clearShutdownHooks,
classLoader);
+ }
+
+ /**
+ * Close the stats data clearer thread in Hadoop FileSystem to prevent
memory leaks when using
+ *
+ * @param targetClassLoader the classloader where Hadoop FileSystem is loaded
+ */
+ private static void closeStatsDataClearerInFileSystem(ClassLoader
targetClassLoader)
+ throws Exception {
+ Class<?> fileSystemClass =
+ Class.forName("org.apache.hadoop.fs.FileSystem", true,
targetClassLoader);
+ MethodUtils.invokeStaticMethod(fileSystemClass, "closeAll");
+
+ Class<?> mutableQuantilesClass =
+ Class.forName("org.apache.hadoop.metrics2.lib.MutableQuantiles", true,
targetClassLoader);
+ Class<?> statisticsClass =
+ Class.forName("org.apache.hadoop.fs.FileSystem$Statistics", true,
targetClassLoader);
+
+ ScheduledExecutorService scheduler =
+ (ScheduledExecutorService)
+ FieldUtils.readStaticField(mutableQuantilesClass, "scheduler",
true);
+ scheduler.shutdownNow();
+ Field statisticsCleanerField = FieldUtils.getField(statisticsClass,
"STATS_DATA_CLEANER", true);
+ Object statisticsCleaner = statisticsCleanerField.get(null);
+ if (statisticsCleaner != null) {
+ ((Thread) statisticsCleaner).interrupt();
+ ((Thread) statisticsCleaner).setContextClassLoader(null);
+ ((Thread) statisticsCleaner).join();
+ }
+ }
+
+ /**
+ * Stop all threads that are using the target class loader and clear thread
local variables to
+ * prevent memory leaks.
+ *
+ * <pre>
+ * This method aims to:
+ * 1. Stop all threads that are using the target class loader.
+ * 2. Clear thread local variables in all threads that are using the target
class loader. some thread
+ * local variables are loaded in thread jetty-webserver-* threads, which are
long-lived threads and
+ * will not be stopped when the catalog is closed.
+ * </pre>
+ */
+ private static void stopThreadsAndClearThreadLocalVariables(ClassLoader
classLoader) {
+ Thread[] threads = getAllThreads();
+ for (Thread thread : threads) {
+ // First clear thread local variables
+ clearThreadLocalMap(thread, classLoader);
+ // Close all threads that are using the FilesetCatalogOperations class
loader
+ if (runningWithClassLoader(thread, classLoader)) {
+ LOG.info("Interrupting thread: {}", thread.getName());
+ thread.setContextClassLoader(null);
+ thread.interrupt();
+ try {
+ thread.join(500);
+ } catch (InterruptedException e) {
+ LOG.warn("Failed to join thread: {}", thread.getName(), e);
+ }
+ }
+ }
+ }
+
+ private static boolean runningWithClassLoader(Thread thread, ClassLoader
targetClassLoader) {
+ return thread != null && thread.getContextClassLoader() ==
targetClassLoader;
+ }
+
+ private static Thread[] getAllThreads() {
+ ThreadGroup rootGroup = Thread.currentThread().getThreadGroup();
+ ThreadGroup parentGroup;
+ while ((parentGroup = rootGroup.getParent()) != null) {
+ rootGroup = parentGroup;
+ }
+
+ Thread[] threads = new Thread[rootGroup.activeCount()];
+ while (rootGroup.enumerate(threads, true) == threads.length) {
+ threads = new Thread[threads.length * 2];
+ }
+ return threads;
+ }
+
+ private static void clearThreadLocalMap(Thread thread, ClassLoader
targetClassLoader) {
+ if (thread == null ||
!thread.getName().startsWith("Gravitino-webserver-")) {
+ return;
+ }
+
+ try {
+ Field threadLocalsField = Thread.class.getDeclaredField("threadLocals");
+ threadLocalsField.setAccessible(true);
+ Object threadLocalMap = threadLocalsField.get(thread);
+
+ if (threadLocalMap != null) {
+ Class<?> tlmClass =
Class.forName("java.lang.ThreadLocal$ThreadLocalMap");
+ Field tableField = tlmClass.getDeclaredField("table");
+ tableField.setAccessible(true);
+ Object[] table = (Object[]) tableField.get(threadLocalMap);
+
+ for (Object entry : table) {
+ if (entry != null) {
+ Object value = FieldUtils.readField(entry, "value", true);
+ if (value != null
+ && value.getClass().getClassLoader() != null
+ && value.getClass().getClassLoader() == targetClassLoader) {
+ LOG.info(
+ "Cleaning up thread local {} for thread {} with custom class
loader",
+ value,
+ thread.getName());
+ FieldUtils.writeField(entry, "value", null, true);
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to clean up thread locals for thread {}",
thread.getName(), e);
+ }
+ }
+
+ /**
+ * Clear shutdown hooks registered by the target class loader to prevent
memory leaks.
+ *
+ * <p>All shutdown hooks are run with the system class loader, so we need to
manually clear the
+ * shutdown hooks registered by the target class loader.
+ *
+ * @param targetClassLoader the classloader where the shutdown hooks are
registered.
+ */
+ private static void clearShutdownHooks(ClassLoader targetClassLoader) throws
Exception {
+ Class<?> shutdownHooks =
Class.forName("java.lang.ApplicationShutdownHooks");
+ IdentityHashMap<Thread, Thread> hooks =
+ (IdentityHashMap<Thread, Thread>)
FieldUtils.readStaticField(shutdownHooks, "hooks", true);
+
+ hooks
+ .entrySet()
+ .removeIf(
+ entry -> {
+ Thread thread = entry.getKey();
+ return thread.getContextClassLoader() == targetClassLoader;
+ });
+ }
+
+ /**
+ * Release the LogFactory for the target class loader to prevent memory
leaks.
+ *
+ * @param currentClassLoader the classloader where the commons-logging is
loaded.
+ */
+ private static void releaseLogFactoryInCommonLogging(ClassLoader
currentClassLoader)
+ throws Exception {
+ // Release the LogFactory for the FilesetCatalogOperations class loader
+ Class<?> logFactoryClass =
+ Class.forName("org.apache.commons.logging.LogFactory", true,
currentClassLoader);
+ MethodUtils.invokeStaticMethod(logFactoryClass, "release",
currentClassLoader);
+ }
+
+ /**
+ * Close the AWS SDK metrics MBean to prevent memory leaks when using AWS S3.
+ *
+ * @param classLoader the classloader where AWS SDK is loaded
+ */
+ private static void closeResourceInAWS(ClassLoader classLoader) throws
Exception {
+ // For Aws SDK metrics, unregister the metric admin MBean
+ Class<?> awsSdkMetricsClass =
+ Class.forName("com.amazonaws.metrics.AwsSdkMetrics", true,
classLoader);
+ MethodUtils.invokeStaticMethod(awsSdkMetricsClass,
"unregisterMetricAdminMBean");
+ }
+
+ private static void closeResourceInGCP(ClassLoader classLoader) throws
Exception {
+ // For GCS
+ Class<?> relocatedLogFactory =
+ Class.forName(
+
"org.apache.gravitino.gcp.shaded.org.apache.commons.logging.LogFactory",
+ true,
+ classLoader);
+ MethodUtils.invokeStaticMethod(relocatedLogFactory, "release",
classLoader);
+ }
+
+ /**
+ * Close the timer in AbfsClientThrottlingAnalyzer to prevent memory leaks
when using Azure Blob
+ * File System.
+ *
+ * <p>Timer is a daemon thread, so it won't prevent the JVM from shutting
down, but it will
+ * prevent the class loader from being garbage collected.
+ *
+ * @param classLoader the classloader where Azure Blob File System is loaded
+ */
+ private static void closeResourceInAzure(ClassLoader classLoader) throws
Exception {
+ // Clear timer in AbfsClientThrottlingAnalyzer
+ Class<?> abfsClientThrottlingInterceptClass =
+ Class.forName(
+
"org.apache.hadoop.fs.azurebfs.services.AbfsClientThrottlingIntercept",
+ true,
+ classLoader);
+ Object abfsClientThrottlingIntercept =
+ FieldUtils.readStaticField(abfsClientThrottlingInterceptClass,
"singleton", true);
+
+ Object readThrottler =
+ FieldUtils.readField(abfsClientThrottlingIntercept, "readThrottler",
true);
+ Object writeThrottler =
+ FieldUtils.readField(abfsClientThrottlingIntercept, "writeThrottler",
true);
+
+ Timer readTimer = (Timer) FieldUtils.readField(readThrottler, "timer",
true);
+ readTimer.cancel();
+ Timer writeTimer = (Timer) FieldUtils.readField(writeThrottler, "timer",
true);
+ writeTimer.cancel();
+
+ // Release the LogFactory for the Azure shaded commons logging which has
been relocated
+ // by the Azure SDK
+ Class<?> relocatedLogFactory =
+ Class.forName(
+
"org.apache.gravitino.azure.shaded.org.apache.commons.logging.LogFactory",
+ true,
+ classLoader);
+ MethodUtils.invokeStaticMethod(relocatedLogFactory, "release",
classLoader);
+ }
+
+ @FunctionalInterface
+ private interface ThrowableConsumer<T> {
+ void accept(T t) throws Exception;
+ }
+
+ private static <T> void executeAndCatch(ThrowableConsumer<T> consumer, T
value) {
+ try {
+ consumer.accept(value);
+ } catch (Exception e) {
+ LOG.warn("Failed to execute consumer: ", e);
+ }
+ }
+}
diff --git
a/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/SecureFilesetCatalogOperations.java
b/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/SecureFilesetCatalogOperations.java
index e1ca543190..bdc7f48735 100644
---
a/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/SecureFilesetCatalogOperations.java
+++
b/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/SecureFilesetCatalogOperations.java
@@ -65,6 +65,7 @@ import org.apache.gravitino.file.FilesetCatalog;
import org.apache.gravitino.file.FilesetChange;
import org.apache.gravitino.meta.FilesetEntity;
import org.apache.gravitino.meta.SchemaEntity;
+import org.apache.gravitino.utils.ClassLoaderResourceCleanerUtils;
import org.apache.gravitino.utils.NameIdentifierUtil;
import org.apache.gravitino.utils.PrincipalUtils;
import org.slf4j.Logger;
@@ -271,6 +272,8 @@ public class SecureFilesetCatalogOperations
catalogUserContext.close();
UserContext.cleanAllUserContext();
+
+
ClassLoaderResourceCleanerUtils.closeClassLoaderResource(this.getClass().getClassLoader());
}
@Override
diff --git
a/common/src/main/java/org/apache/gravitino/dto/file/FileInfoDTO.java
b/common/src/main/java/org/apache/gravitino/dto/file/FileInfoDTO.java
index c826fa14f4..8e4720af2a 100644
--- a/common/src/main/java/org/apache/gravitino/dto/file/FileInfoDTO.java
+++ b/common/src/main/java/org/apache/gravitino/dto/file/FileInfoDTO.java
@@ -162,7 +162,8 @@ public class FileInfoDTO implements FileInfo {
public FileInfoDTO build() {
Preconditions.checkArgument(StringUtils.isNotBlank(name), "name cannot
be null or empty");
Preconditions.checkArgument(size >= 0, "size cannot be negative");
- Preconditions.checkArgument(lastModified > 0, "lastModified must be a
valid timestamp");
+ // In Azure Blob Storage, it can be 0 for newly created files.
+ Preconditions.checkArgument(lastModified >= 0, "lastModified must be a
valid timestamp");
Preconditions.checkArgument(StringUtils.isNotBlank(path), "path cannot
be null or empty");
return new FileInfoDTO(name, isDir, size, lastModified, path);