Copilot commented on code in PR #8252:
URL: https://github.com/apache/gravitino/pull/8252#discussion_r2293271500
##########
core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java:
##########
@@ -244,6 +244,9 @@ public void close() {
catalog = null;
return null;
});
+ // Let hooks close first before classLoader close, so that the hook can
+ // release any resources that the catalog might be using.
+ Thread.sleep(500);
Review Comment:
Using Thread.sleep() for timing coordination is unreliable and blocks the
calling thread. Consider using a CountDownLatch or CompletableFuture to wait
for hook completion, or implement proper shutdown coordination through the
catalog interface.
```suggestion
      // Removed Thread.sleep(500); -- hooks should be closed synchronously.
```
##########
catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/SecureFilesetCatalogOperations.java:
##########
@@ -271,6 +280,176 @@ public void close() throws IOException {
catalogUserContext.close();
UserContext.cleanAllUserContext();
+
+ try {
+
+ closeStatsDataClearerInFileSystem();
+
+ FileSystem.closeAll();
+
+ // Clear all thread references to the ClosableHiveCatalog class loader.
+ Thread[] threads = getAllThreads();
+ for (Thread thread : threads) {
+ // Clear thread local map for webserver threads in the current class
loader
+ clearThreadLocalMap(thread);
+
+ // Close all threads that are using the FilesetCatalogOperations class
loader
+ if (runningWithCurrentClassLoader(thread)) {
+ LOG.info("Interrupting peer cache thread: {}", thread.getName());
+ thread.setContextClassLoader(null);
+ thread.interrupt();
+ try {
+ thread.join(5000);
+ } catch (InterruptedException e) {
+ LOG.warn("Failed to join peer cache thread: {}", thread.getName(),
e);
+ }
+ }
+ }
+
+ // Release the LogFactory for the FilesetCatalogOperations class loader
+
LogFactory.release(SecureFilesetCatalogOperations.class.getClassLoader());
+
+ // For Aws SDK metrics, unregister the metric admin MBean
+ try {
+ Class<?> methodUtilsClass =
Class.forName("com.amazonaws.metrics.AwsSdkMetrics");
+ MethodUtils.invokeStaticMethod(methodUtilsClass,
"unregisterMetricAdminMBean");
+ } catch (Exception e) {
+ LOG.warn("Failed to unregister AWS SDK metrics admin MBean", e);
+ // This is not critical, so we just log the warning
+ }
+
+ // For GCS
+ try {
+ Class<?> relocatedLogFactory =
+
Class.forName("org.apache.gravitino.gcp.shaded.org.apache.commons.logging.LogFactory");
+ MethodUtils.invokeStaticMethod(
+ relocatedLogFactory, "release",
SecureFilesetCatalogOperations.class.getClassLoader());
+ } catch (Exception e) {
+ LOG.warn("Failed to find GCS shaded LogFactory", e);
+ }
+
+ // For Azure
+ try {
+ Class<?> relocatedLogFactory =
+ Class.forName(
+
"org.apache.gravitino.azure.shaded.org.apache.commons.logging.LogFactory");
+ MethodUtils.invokeStaticMethod(
+ relocatedLogFactory, "release",
SecureFilesetCatalogOperations.class.getClassLoader());
+
+ // Clear timer in AbfsClientThrottlingAnalyzer
+ Class<?> abfsClientThrottlingInterceptClass =
+
Class.forName("org.apache.hadoop.fs.azurebfs.services.AbfsClientThrottlingIntercept");
+ Object abfsClientThrottlingIntercept =
+ FieldUtils.readStaticField(abfsClientThrottlingInterceptClass,
"singleton", true);
+
+ Object readThrottler =
+ FieldUtils.readField(abfsClientThrottlingIntercept,
"readThrottler", true);
+ Object writeThrottler =
+ FieldUtils.readField(abfsClientThrottlingIntercept,
"writeThrottler", true);
+
+ Timer readTimer = (Timer) FieldUtils.readField(readThrottler, "timer",
true);
+ readTimer.cancel();
+ Timer writeTimer = (Timer) FieldUtils.readField(writeThrottler,
"timer", true);
+ writeTimer.cancel();
+ } catch (Exception e) {
+ LOG.warn("Failed to find GCS shaded LogFactory", e);
+ }
+
+ clearShutdownHooks();
+ } catch (Exception e) {
+ LOG.warn("Failed to clear FileSystem statistics cleaner thread", e);
+ }
+ }
+
+ private static void clearThreadLocalMap(Thread thread) {
+ if (thread != null && thread.getName().startsWith("Gravitino-webserver-"))
{
+ // try to
+ try {
+ Field threadLocalsField =
Thread.class.getDeclaredField("threadLocals");
+ threadLocalsField.setAccessible(true);
+ Object threadLocalMap = threadLocalsField.get(thread);
+
+ if (threadLocalMap != null) {
+ Class<?> tlmClass =
Class.forName("java.lang.ThreadLocal$ThreadLocalMap");
+ Field tableField = tlmClass.getDeclaredField("table");
+ tableField.setAccessible(true);
+ Object[] table = (Object[]) tableField.get(threadLocalMap);
+
+ for (Object entry : table) {
+ if (entry != null) {
+ Object value = FieldUtils.readField(entry, "value", true);
+ if (value != null
+ && value.getClass().getClassLoader() != null
+ && value.getClass().getClassLoader()
+ ==
SecureFilesetCatalogOperations.class.getClassLoader()) {
+ LOG.info(
+ "Cleaning up thread local {} for thread {} with custom
class loader",
+ value,
+ thread.getName());
+ FieldUtils.writeField(entry, "value", null, true);
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to clean up thread locals for thread {}",
thread.getName(), e);
+ }
+ }
+ }
+
+ private static void closeStatsDataClearerInFileSystem()
+ throws IllegalAccessException, InterruptedException {
+ ScheduledExecutorService scheduler =
+ (ScheduledExecutorService)
+ FieldUtils.readStaticField(MutableQuantiles.class, "scheduler",
true);
+ scheduler.shutdownNow();
+ Field statisticsCleanerField =
+ FieldUtils.getField(FileSystem.Statistics.class, "STATS_DATA_CLEANER",
true);
+ Object statisticsCleaner = statisticsCleanerField.get(null);
+ if (statisticsCleaner != null) {
+ ((Thread) statisticsCleaner).interrupt();
+ ((Thread) statisticsCleaner).setContextClassLoader(null);
+ ((Thread) statisticsCleaner).join();
+ }
+ }
+
+ private static Thread[] getAllThreads() {
+ ThreadGroup rootGroup = Thread.currentThread().getThreadGroup();
+ ThreadGroup parentGroup;
+ while ((parentGroup = rootGroup.getParent()) != null) {
+ rootGroup = parentGroup;
+ }
+
+ Thread[] threads = new Thread[rootGroup.activeCount()];
+ while (rootGroup.enumerate(threads, true) == threads.length) {
+ threads = new Thread[threads.length * 2];
+ }
+ return threads;
+ }
+
+ private static boolean runningWithCurrentClassLoader(Thread thread) {
+ return thread != null
+ && thread.getContextClassLoader() ==
FilesetCatalogOperations.class.getClassLoader();
+ }
+
+ public static void clearShutdownHooks() {
+ try {
+ Class<?> shutdownHooks =
Class.forName("java.lang.ApplicationShutdownHooks");
+ IdentityHashMap<Thread, Thread> hooks =
+ (IdentityHashMap<Thread, Thread>)
+ FieldUtils.readStaticField(shutdownHooks, "hooks", true);
+
+ hooks
+ .entrySet()
+ .removeIf(
+ entry -> {
+ Thread thread = entry.getKey();
+ return thread.getContextClassLoader()
+ == FilesetCatalogOperations.class.getClassLoader();
+ });
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to clean shutdown hooks", e);
Review Comment:
Throwing a RuntimeException during cleanup could prevent other cleanup
operations from completing. Consider logging the error instead of throwing, or
ensure this is the last cleanup operation.
##########
catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/SecureFilesetCatalogOperations.java:
##########
@@ -271,6 +280,176 @@ public void close() throws IOException {
catalogUserContext.close();
UserContext.cleanAllUserContext();
+
+ try {
+
+ closeStatsDataClearerInFileSystem();
+
+ FileSystem.closeAll();
+
+ // Clear all thread references to the ClosableHiveCatalog class loader.
+ Thread[] threads = getAllThreads();
+ for (Thread thread : threads) {
+ // Clear thread local map for webserver threads in the current class
loader
+ clearThreadLocalMap(thread);
+
+ // Close all threads that are using the FilesetCatalogOperations class
loader
+ if (runningWithCurrentClassLoader(thread)) {
+ LOG.info("Interrupting peer cache thread: {}", thread.getName());
+ thread.setContextClassLoader(null);
+ thread.interrupt();
+ try {
+ thread.join(5000);
+ } catch (InterruptedException e) {
+ LOG.warn("Failed to join peer cache thread: {}", thread.getName(),
e);
+ }
+ }
+ }
+
+ // Release the LogFactory for the FilesetCatalogOperations class loader
+
LogFactory.release(SecureFilesetCatalogOperations.class.getClassLoader());
+
+ // For Aws SDK metrics, unregister the metric admin MBean
+ try {
+ Class<?> methodUtilsClass =
Class.forName("com.amazonaws.metrics.AwsSdkMetrics");
+ MethodUtils.invokeStaticMethod(methodUtilsClass,
"unregisterMetricAdminMBean");
+ } catch (Exception e) {
+ LOG.warn("Failed to unregister AWS SDK metrics admin MBean", e);
+ // This is not critical, so we just log the warning
+ }
+
+ // For GCS
+ try {
+ Class<?> relocatedLogFactory =
+
Class.forName("org.apache.gravitino.gcp.shaded.org.apache.commons.logging.LogFactory");
+ MethodUtils.invokeStaticMethod(
+ relocatedLogFactory, "release",
SecureFilesetCatalogOperations.class.getClassLoader());
+ } catch (Exception e) {
+ LOG.warn("Failed to find GCS shaded LogFactory", e);
+ }
+
+ // For Azure
+ try {
+ Class<?> relocatedLogFactory =
+ Class.forName(
+
"org.apache.gravitino.azure.shaded.org.apache.commons.logging.LogFactory");
+ MethodUtils.invokeStaticMethod(
+ relocatedLogFactory, "release",
SecureFilesetCatalogOperations.class.getClassLoader());
+
+ // Clear timer in AbfsClientThrottlingAnalyzer
+ Class<?> abfsClientThrottlingInterceptClass =
+
Class.forName("org.apache.hadoop.fs.azurebfs.services.AbfsClientThrottlingIntercept");
+ Object abfsClientThrottlingIntercept =
+ FieldUtils.readStaticField(abfsClientThrottlingInterceptClass,
"singleton", true);
+
+ Object readThrottler =
+ FieldUtils.readField(abfsClientThrottlingIntercept,
"readThrottler", true);
+ Object writeThrottler =
+ FieldUtils.readField(abfsClientThrottlingIntercept,
"writeThrottler", true);
+
+ Timer readTimer = (Timer) FieldUtils.readField(readThrottler, "timer",
true);
+ readTimer.cancel();
+ Timer writeTimer = (Timer) FieldUtils.readField(writeThrottler,
"timer", true);
+ writeTimer.cancel();
+ } catch (Exception e) {
+ LOG.warn("Failed to find GCS shaded LogFactory", e);
Review Comment:
The error message incorrectly refers to "GCS shaded LogFactory" when the
code is actually handling Azure LogFactory cleanup. The message should be
"Failed to find Azure shaded LogFactory".
```suggestion
LOG.warn("Failed to find Azure shaded LogFactory", e);
```
##########
catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/SecureFilesetCatalogOperations.java:
##########
@@ -271,6 +280,176 @@ public void close() throws IOException {
catalogUserContext.close();
UserContext.cleanAllUserContext();
+
+ try {
+
+ closeStatsDataClearerInFileSystem();
+
+ FileSystem.closeAll();
+
+ // Clear all thread references to the ClosableHiveCatalog class loader.
+ Thread[] threads = getAllThreads();
+ for (Thread thread : threads) {
+ // Clear thread local map for webserver threads in the current class
loader
+ clearThreadLocalMap(thread);
+
+ // Close all threads that are using the FilesetCatalogOperations class
loader
+ if (runningWithCurrentClassLoader(thread)) {
+ LOG.info("Interrupting peer cache thread: {}", thread.getName());
+ thread.setContextClassLoader(null);
+ thread.interrupt();
+ try {
+ thread.join(5000);
+ } catch (InterruptedException e) {
+ LOG.warn("Failed to join peer cache thread: {}", thread.getName(),
e);
+ }
+ }
+ }
+
+ // Release the LogFactory for the FilesetCatalogOperations class loader
+
LogFactory.release(SecureFilesetCatalogOperations.class.getClassLoader());
+
+ // For Aws SDK metrics, unregister the metric admin MBean
+ try {
+ Class<?> methodUtilsClass =
Class.forName("com.amazonaws.metrics.AwsSdkMetrics");
+ MethodUtils.invokeStaticMethod(methodUtilsClass,
"unregisterMetricAdminMBean");
+ } catch (Exception e) {
+ LOG.warn("Failed to unregister AWS SDK metrics admin MBean", e);
+ // This is not critical, so we just log the warning
+ }
+
+ // For GCS
+ try {
+ Class<?> relocatedLogFactory =
+
Class.forName("org.apache.gravitino.gcp.shaded.org.apache.commons.logging.LogFactory");
+ MethodUtils.invokeStaticMethod(
+ relocatedLogFactory, "release",
SecureFilesetCatalogOperations.class.getClassLoader());
+ } catch (Exception e) {
+ LOG.warn("Failed to find GCS shaded LogFactory", e);
+ }
+
+ // For Azure
+ try {
+ Class<?> relocatedLogFactory =
+ Class.forName(
+
"org.apache.gravitino.azure.shaded.org.apache.commons.logging.LogFactory");
+ MethodUtils.invokeStaticMethod(
+ relocatedLogFactory, "release",
SecureFilesetCatalogOperations.class.getClassLoader());
+
+ // Clear timer in AbfsClientThrottlingAnalyzer
+ Class<?> abfsClientThrottlingInterceptClass =
+
Class.forName("org.apache.hadoop.fs.azurebfs.services.AbfsClientThrottlingIntercept");
+ Object abfsClientThrottlingIntercept =
+ FieldUtils.readStaticField(abfsClientThrottlingInterceptClass,
"singleton", true);
+
+ Object readThrottler =
+ FieldUtils.readField(abfsClientThrottlingIntercept,
"readThrottler", true);
+ Object writeThrottler =
+ FieldUtils.readField(abfsClientThrottlingIntercept,
"writeThrottler", true);
+
+ Timer readTimer = (Timer) FieldUtils.readField(readThrottler, "timer",
true);
+ readTimer.cancel();
+ Timer writeTimer = (Timer) FieldUtils.readField(writeThrottler,
"timer", true);
+ writeTimer.cancel();
+ } catch (Exception e) {
+ LOG.warn("Failed to find GCS shaded LogFactory", e);
+ }
+
+ clearShutdownHooks();
+ } catch (Exception e) {
+ LOG.warn("Failed to clear FileSystem statistics cleaner thread", e);
+ }
+ }
+
+ private static void clearThreadLocalMap(Thread thread) {
+ if (thread != null && thread.getName().startsWith("Gravitino-webserver-"))
{
+ // try to
Review Comment:
The comment on line 366 is incomplete ("try to"). It should be completed to
explain what the code is attempting to do, such as "try to clear thread local
variables for webserver threads".
```suggestion
      // Try to clear thread local variables for Gravitino webserver threads using the custom class loader
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]