This is an automated email from the ASF dual-hosted git repository.
yuqi4733 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new f80b501d87 [#9966] improvement(iceberg): IRC should expire the catalog
wrapper cache timely (#9969)
f80b501d87 is described below
commit f80b501d87bebb22c7fc26dc997eee94740fb266
Author: roryqi <[email protected]>
AuthorDate: Thu Feb 12 21:55:55 2026 +0800
[#9966] improvement(iceberg): IRC should expire the catalog wrapper cache
timely (#9969)
### What changes were proposed in this pull request?
IRC should expire the catalog wrapper cache in a timely manner.
### Why are the changes needed?
Fix: #9966
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added a unit test.
---
.../apache/gravitino/catalog/CatalogManager.java | 22 +++++++++++++
.../gravitino/catalog/TestCatalogManager.java | 36 ++++++++++++++++++++++
.../service/IcebergCatalogWrapperManager.java | 12 ++++++++
.../TestIcebergCatalogWrapperManagerForREST.java | 19 ++++++++++++
4 files changed, 89 insertions(+)
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
b/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
index abc0b42517..e320decaac 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
@@ -53,6 +53,7 @@ import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.Getter;
@@ -285,6 +286,7 @@ public class CatalogManager implements CatalogDispatcher,
Closeable {
private final EntityStore store;
private final IdGenerator idGenerator;
+ private final List<Consumer<NameIdentifier>> removalListeners =
Lists.newArrayList();
/**
* Constructs a CatalogManager instance.
@@ -304,6 +306,11 @@ public class CatalogManager implements CatalogDispatcher,
Closeable {
.expireAfterAccess(cacheEvictionIntervalInMs,
TimeUnit.MILLISECONDS)
.removalListener(
(k, v, c) -> {
+ for (Consumer<NameIdentifier> listener : removalListeners) {
+ if (k != null) {
+ listener.accept((NameIdentifier) k);
+ }
+ }
LOG.info("Closing catalog {}.", k);
((CatalogWrapper) v).close();
})
@@ -327,6 +334,21 @@ public class CatalogManager implements CatalogDispatcher,
Closeable {
catalogCache.invalidateAll();
}
+ /**
+ * Adds a listener that will be notified when a catalog is removed from the
cache.
+ *
+ * <p>Note: Cache eviction is invoked asynchronously but uses a single
thread to process removal
+ * events. To avoid blocking the eviction thread and delaying subsequent
cache operations,
+ * listeners should avoid performing heavy operations (such as I/O, network
calls, or complex
+ * computations) directly. Instead, consider offloading heavy work to a
separate thread or
+ * executor.
+ *
+ * @param listener The consumer to be called with the NameIdentifier of the
removed catalog.
+ */
+ public void addCatalogCacheRemoveListener(Consumer<NameIdentifier> listener)
{
+ removalListeners.add(listener);
+ }
+
/**
* Lists the catalogs within the specified namespace.
*
diff --git
a/core/src/test/java/org/apache/gravitino/catalog/TestCatalogManager.java
b/core/src/test/java/org/apache/gravitino/catalog/TestCatalogManager.java
index ba08eba42f..6f5148b4c5 100644
--- a/core/src/test/java/org/apache/gravitino/catalog/TestCatalogManager.java
+++ b/core/src/test/java/org/apache/gravitino/catalog/TestCatalogManager.java
@@ -25,12 +25,14 @@ import static
org.apache.gravitino.TestCatalog.PROPERTY_KEY3;
import static org.apache.gravitino.TestCatalog.PROPERTY_KEY4;
import static org.apache.gravitino.TestCatalog.PROPERTY_KEY5_PREFIX;
import static org.apache.gravitino.TestCatalog.PROPERTY_KEY6_PREFIX;
+import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.IOException;
+import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Set;
@@ -680,6 +682,40 @@ public class TestCatalogManager {
Assertions.assertNull(catalogManager.getCatalogCache().getIfPresent(ident));
}
+ @Test
+ public void testCatalogCacheRemoveListener() {
+ NameIdentifier ident = NameIdentifier.of(metalake, "catalog");
+ Map<String, String> props =
+ ImmutableMap.of(
+ PROPERTY_KEY1, "value1", PROPERTY_KEY2, "value2",
PROPERTY_KEY5_PREFIX + "1", "value3");
+
+ // Create a catalog
+ catalogManager.createCatalog(ident, Catalog.Type.RELATIONAL, provider,
"comment", props);
+
+ // Load the catalog to add it to the cache
+ catalogManager.loadCatalog(ident);
+
Assertions.assertNotNull(catalogManager.getCatalogCache().getIfPresent(ident));
+
+ // Add a listener to track removed catalogs
+ Set<NameIdentifier> removedCatalogs = Sets.newConcurrentHashSet();
+ catalogManager.addCatalogCacheRemoveListener(removedCatalogs::add);
+
+ // Invalidate the cache to trigger the removal listener
+ catalogManager.getCatalogCache().invalidate(ident);
+
+ // Wait for the async eviction to complete
+ await()
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(
+ () -> {
+ Assertions.assertTrue(
+ removedCatalogs.contains(ident),
+ "Listener should be notified of catalog removal");
+ Assertions.assertEquals(
+ 1, removedCatalogs.size(), "Only one catalog should be
removed");
+ });
+ }
+
private void testProperties(Map<String, String> expectedProps, Map<String,
String> testProps) {
expectedProps.forEach(
(k, v) -> {
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java
index a98ccd3beb..a8b82622d4 100644
---
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.exceptions.NoSuchCatalogException;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.iceberg.common.authentication.AuthenticationConfig;
@@ -71,6 +72,17 @@ public class IcebergCatalogWrapperManager implements
AutoCloseable {
.setNameFormat("iceberg-catalog-wrapper-cleaner-%d")
.build())))
.build();
+ IcebergRESTServerContext context = IcebergRESTServerContext.getInstance();
+ if (context.isAuxMode()) {
+ GravitinoEnv.getInstance()
+ .catalogManager()
+ .addCatalogCacheRemoveListener(
+ ident -> {
+ if (ident.namespace().level(0).equals(context.metalakeName()))
{
+ catalogWrapperCache.invalidate(ident.name());
+ }
+ });
+ }
}
/**
diff --git
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergCatalogWrapperManagerForREST.java
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergCatalogWrapperManagerForREST.java
index aa4064fc67..e7ceb73ba5 100644
---
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergCatalogWrapperManagerForREST.java
+++
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergCatalogWrapperManagerForREST.java
@@ -21,19 +21,38 @@ package org.apache.gravitino.iceberg.service;
import com.google.common.collect.Maps;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.catalog.CatalogManager;
import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
import
org.apache.gravitino.iceberg.service.authorization.IcebergRESTServerContext;
import org.apache.gravitino.iceberg.service.provider.IcebergConfigProvider;
import
org.apache.gravitino.iceberg.service.provider.IcebergConfigProviderFactory;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.Mockito;
public class TestIcebergCatalogWrapperManagerForREST {
private static final String DEFAULT_CATALOG = "memory";
+ @BeforeAll
+ public static void setup() throws IllegalAccessException {
+ // Mock CatalogManager for GravitinoEnv to avoid initialization errors
+ CatalogManager mockCatalogManager = Mockito.mock(CatalogManager.class);
+ FieldUtils.writeField(GravitinoEnv.getInstance(), "catalogManager",
mockCatalogManager, true);
+ }
+
+ @AfterAll
+ public static void tearDown() throws IllegalAccessException {
+ // Clean up GravitinoEnv
+ FieldUtils.writeField(GravitinoEnv.getInstance(), "catalogManager", null,
true);
+ }
+
@ParameterizedTest
@ValueSource(strings = {"", "hello/", "\\\n\t\\\'/", "\u0024/", "\100/",
"[_~/"})
public void testValidGetOps(String rawPrefix) {