This is an automated email from the ASF dual-hosted git repository.

yuqi4733 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new f80b501d87 [#9966] improvement(iceberg): IRC should expire the catalog 
wrapper cache timely (#9969)
f80b501d87 is described below

commit f80b501d87bebb22c7fc26dc997eee94740fb266
Author: roryqi <[email protected]>
AuthorDate: Thu Feb 12 21:55:55 2026 +0800

    [#9966] improvement(iceberg): IRC should expire the catalog wrapper cache 
timely (#9969)
    
    ### What changes were proposed in this pull request?
    
    RC should expire the catalog wrapper tmely
    
    ### Why are the changes needed?
    
    Fix: #9966
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Add a UT.
---
 .../apache/gravitino/catalog/CatalogManager.java   | 22 +++++++++++++
 .../gravitino/catalog/TestCatalogManager.java      | 36 ++++++++++++++++++++++
 .../service/IcebergCatalogWrapperManager.java      | 12 ++++++++
 .../TestIcebergCatalogWrapperManagerForREST.java   | 19 ++++++++++++
 4 files changed, 89 insertions(+)

diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java 
b/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
index abc0b42517..e320decaac 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
@@ -53,6 +53,7 @@ import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import lombok.Getter;
@@ -285,6 +286,7 @@ public class CatalogManager implements CatalogDispatcher, 
Closeable {
   private final EntityStore store;
 
   private final IdGenerator idGenerator;
+  private final List<Consumer<NameIdentifier>> removalListeners = 
Lists.newArrayList();
 
   /**
    * Constructs a CatalogManager instance.
@@ -304,6 +306,11 @@ public class CatalogManager implements CatalogDispatcher, 
Closeable {
             .expireAfterAccess(cacheEvictionIntervalInMs, 
TimeUnit.MILLISECONDS)
             .removalListener(
                 (k, v, c) -> {
+                  for (Consumer<NameIdentifier> listener : removalListeners) {
+                    if (k != null) {
+                      listener.accept((NameIdentifier) k);
+                    }
+                  }
                   LOG.info("Closing catalog {}.", k);
                   ((CatalogWrapper) v).close();
                 })
@@ -327,6 +334,21 @@ public class CatalogManager implements CatalogDispatcher, 
Closeable {
     catalogCache.invalidateAll();
   }
 
+  /**
+   * Adds a listener that will be notified when a catalog is removed from the 
cache.
+   *
+   * <p>Note: Cache eviction is invoked asynchronously but uses a single 
thread to process removal
+   * events. To avoid blocking the eviction thread and delaying subsequent 
cache operations,
+   * listeners should avoid performing heavy operations (such as I/O, network 
calls, or complex
+   * computations) directly. Instead, consider offloading heavy work to a 
separate thread or
+   * executor.
+   *
+   * @param listener The consumer to be called with the NameIdentifier of the 
removed catalog.
+   */
+  public void addCatalogCacheRemoveListener(Consumer<NameIdentifier> listener) 
{
+    removalListeners.add(listener);
+  }
+
   /**
    * Lists the catalogs within the specified namespace.
    *
diff --git 
a/core/src/test/java/org/apache/gravitino/catalog/TestCatalogManager.java 
b/core/src/test/java/org/apache/gravitino/catalog/TestCatalogManager.java
index ba08eba42f..6f5148b4c5 100644
--- a/core/src/test/java/org/apache/gravitino/catalog/TestCatalogManager.java
+++ b/core/src/test/java/org/apache/gravitino/catalog/TestCatalogManager.java
@@ -25,12 +25,14 @@ import static 
org.apache.gravitino.TestCatalog.PROPERTY_KEY3;
 import static org.apache.gravitino.TestCatalog.PROPERTY_KEY4;
 import static org.apache.gravitino.TestCatalog.PROPERTY_KEY5_PREFIX;
 import static org.apache.gravitino.TestCatalog.PROPERTY_KEY6_PREFIX;
+import static org.awaitility.Awaitility.await;
 import static org.mockito.ArgumentMatchers.any;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import java.io.IOException;
+import java.time.Duration;
 import java.time.Instant;
 import java.util.Map;
 import java.util.Set;
@@ -680,6 +682,40 @@ public class TestCatalogManager {
     
Assertions.assertNull(catalogManager.getCatalogCache().getIfPresent(ident));
   }
 
+  @Test
+  public void testCatalogCacheRemoveListener() {
+    NameIdentifier ident = NameIdentifier.of(metalake, "catalog");
+    Map<String, String> props =
+        ImmutableMap.of(
+            PROPERTY_KEY1, "value1", PROPERTY_KEY2, "value2", 
PROPERTY_KEY5_PREFIX + "1", "value3");
+
+    // Create a catalog
+    catalogManager.createCatalog(ident, Catalog.Type.RELATIONAL, provider, 
"comment", props);
+
+    // Load the catalog to add it to the cache
+    catalogManager.loadCatalog(ident);
+    
Assertions.assertNotNull(catalogManager.getCatalogCache().getIfPresent(ident));
+
+    // Add a listener to track removed catalogs
+    Set<NameIdentifier> removedCatalogs = Sets.newConcurrentHashSet();
+    catalogManager.addCatalogCacheRemoveListener(removedCatalogs::add);
+
+    // Invalidate the cache to trigger the removal listener
+    catalogManager.getCatalogCache().invalidate(ident);
+
+    // Wait for the async eviction to complete
+    await()
+        .atMost(Duration.ofSeconds(5))
+        .untilAsserted(
+            () -> {
+              Assertions.assertTrue(
+                  removedCatalogs.contains(ident),
+                  "Listener should be notified of catalog removal");
+              Assertions.assertEquals(
+                  1, removedCatalogs.size(), "Only one catalog should be 
removed");
+            });
+  }
+
   private void testProperties(Map<String, String> expectedProps, Map<String, 
String> testProps) {
     expectedProps.forEach(
         (k, v) -> {
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java
index a98ccd3beb..a8b82622d4 100644
--- 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.exceptions.NoSuchCatalogException;
 import org.apache.gravitino.iceberg.common.IcebergConfig;
 import org.apache.gravitino.iceberg.common.authentication.AuthenticationConfig;
@@ -71,6 +72,17 @@ public class IcebergCatalogWrapperManager implements 
AutoCloseable {
                             
.setNameFormat("iceberg-catalog-wrapper-cleaner-%d")
                             .build())))
             .build();
+    IcebergRESTServerContext context = IcebergRESTServerContext.getInstance();
+    if (context.isAuxMode()) {
+      GravitinoEnv.getInstance()
+          .catalogManager()
+          .addCatalogCacheRemoveListener(
+              ident -> {
+                if (ident.namespace().level(0).equals(context.metalakeName())) 
{
+                  catalogWrapperCache.invalidate(ident.name());
+                }
+              });
+    }
   }
 
   /**
diff --git 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergCatalogWrapperManagerForREST.java
 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergCatalogWrapperManagerForREST.java
index aa4064fc67..e7ceb73ba5 100644
--- 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergCatalogWrapperManagerForREST.java
+++ 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergCatalogWrapperManagerForREST.java
@@ -21,19 +21,38 @@ package org.apache.gravitino.iceberg.service;
 import com.google.common.collect.Maps;
 import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.catalog.CatalogManager;
 import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
 import 
org.apache.gravitino.iceberg.service.authorization.IcebergRESTServerContext;
 import org.apache.gravitino.iceberg.service.provider.IcebergConfigProvider;
 import 
org.apache.gravitino.iceberg.service.provider.IcebergConfigProviderFactory;
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.Mockito;
 
 public class TestIcebergCatalogWrapperManagerForREST {
 
   private static final String DEFAULT_CATALOG = "memory";
 
+  @BeforeAll
+  public static void setup() throws IllegalAccessException {
+    // Mock CatalogManager for GravitinoEnv to avoid initialization errors
+    CatalogManager mockCatalogManager = Mockito.mock(CatalogManager.class);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "catalogManager", 
mockCatalogManager, true);
+  }
+
+  @AfterAll
+  public static void tearDown() throws IllegalAccessException {
+    // Clean up GravitinoEnv
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "catalogManager", null, 
true);
+  }
+
   @ParameterizedTest
   @ValueSource(strings = {"", "hello/", "\\\n\t\\\'/", "\u0024/", "\100/", 
"[_~/"})
   public void testValidGetOps(String rawPrefix) {

Reply via email to