This is an automated email from the ASF dual-hosted git repository. mchades pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push: new 5159313d70 [#7179] improvement(core): Remove old cached System (#7459) 5159313d70 is described below commit 5159313d700977ac677ca7a8ff34c4ebfe4fc71b Author: Lord of Abyss <103809695+abyss-l...@users.noreply.github.com> AuthorDate: Mon Jun 30 09:34:46 2025 +0800 [#7179] improvement(core): Remove old cached System (#7459) ### What changes were proposed in this pull request? Remove old cached System. To unify the caching architecture, we should remove legacy cache modules that directly proxy the underlying EntityStore, especially those that simply cache raw entity results. For example: - `MetalakeManager`: its cache is a direct wrapper around EntityStore. - `FilesetCatalogOperations`: its caching logic is also a shallow reuse of the underlying store. In contrast, cache modules whose content is not directly sourced from the EntityStore, and which provide additional encapsulation or are relied on by external systems, should be preserved. For example: - `CatalogManager`: it caches CatalogWrapper objects instead of raw Entity instances, and is externally depended upon. In summary, the cache cleanup strategy should be based on two principles: - Whether the cache directly proxies the EntityStore. - Whether it is externally depended upon or involves semantic wrapping. This helps clearly distinguish between caches that should be removed and those that should be retained. ### Why are the changes needed? Fix: #7179 ### Does this PR introduce _any_ user-facing change? no. ### How was this patch tested? local test. --- .../catalog/fileset/FilesetCatalogOperations.java | 95 +++++------ .../apache/gravitino/metalake/MetalakeManager.java | 174 ++++++++------------- .../gravitino/metalake/TestMetalakeManager.java | 60 ------- 3 files changed, 100 insertions(+), 229 deletions(-) diff --git a/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java b/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java index ef88bdf175..9fa992d333 100644 --- a/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java +++ b/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java @@ -52,7 +52,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; -import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.gravitino.Catalog; import org.apache.gravitino.Entity; @@ -130,8 +129,6 @@ public class FilesetCatalogOperations extends ManagedSchemaOperations private boolean disableFSOps; - private Cache<NameIdentifier, FilesetImpl> filesetCache; - FilesetCatalogOperations(EntityStore store) { this.store = store; } @@ -194,7 +191,6 @@ public class FilesetCatalogOperations extends ManagedSchemaOperations } this.catalogStorageLocations = getAndCheckCatalogStorageLocations(config); - this.filesetCache = initializeFilesetCache(config); } @Override @@ -217,28 +213,24 @@ public class FilesetCatalogOperations extends ManagedSchemaOperations @Override public Fileset loadFileset(NameIdentifier ident) throws NoSuchFilesetException { - return filesetCache.get( - ident, - k -> { - try { - FilesetEntity filesetEntity = - store.get(ident, Entity.EntityType.FILESET, FilesetEntity.class); - - return FilesetImpl.builder() - .withName(ident.name()) - .withType(filesetEntity.filesetType()) - .withComment(filesetEntity.comment()) - .withStorageLocations(filesetEntity.storageLocations()) - .withProperties(filesetEntity.properties()) - .withAuditInfo(filesetEntity.auditInfo()) - .build(); - - } catch (NoSuchEntityException exception) { - throw new NoSuchFilesetException(exception, FILESET_DOES_NOT_EXIST_MSG, ident); - } catch (IOException ioe) { - throw new RuntimeException("Failed to load fileset %s" + ident, ioe); - } - }); + try { + FilesetEntity filesetEntity = + store.get(ident, Entity.EntityType.FILESET, FilesetEntity.class); + + return FilesetImpl.builder() + .withName(ident.name()) + .withType(filesetEntity.filesetType()) + .withComment(filesetEntity.comment()) + .withStorageLocations(filesetEntity.storageLocations()) + .withProperties(filesetEntity.properties()) + .withAuditInfo(filesetEntity.auditInfo()) + .build(); + + } catch (NoSuchEntityException exception) { + throw new NoSuchFilesetException(exception, FILESET_DOES_NOT_EXIST_MSG, ident); + } catch (IOException ioe) { + throw new RuntimeException("Failed to load fileset %s" + ident, ioe); + } } @Override @@ -300,12 +292,6 @@ public class FilesetCatalogOperations extends ManagedSchemaOperations } }); - // Check if the fileset already existed in cache first. If it does, it means the fileset is - // already created, so we should throw an exception. - if (filesetCache.getIfPresent(ident) != null) { - throw new FilesetAlreadyExistsException("Fileset %s already exists", ident); - } - try { if (store.exists(ident, Entity.EntityType.FILESET)) { throw new FilesetAlreadyExistsException("Fileset %s already exists", ident); @@ -448,17 +434,14 @@ public class FilesetCatalogOperations extends ManagedSchemaOperations throw new RuntimeException("Failed to create fileset " + ident, ioe); } - FilesetImpl fileset = - FilesetImpl.builder() - .withName(ident.name()) - .withComment(comment) - .withType(type) - .withStorageLocations(formattedStorageLocations) - .withProperties(filesetEntity.properties()) - .withAuditInfo(filesetEntity.auditInfo()) - .build(); - filesetCache.put(ident, fileset); - return fileset; + return FilesetImpl.builder() + .withName(ident.name()) + .withComment(comment) + .withType(type) + .withStorageLocations(formattedStorageLocations) + .withProperties(filesetEntity.properties()) + .withAuditInfo(filesetEntity.auditInfo()) + .build(); } private Map<String, String> setDefaultLocationIfAbsent( @@ -512,7 +495,6 @@ public class FilesetCatalogOperations extends ManagedSchemaOperations throw new RuntimeException("Failed to load fileset " + ident, ioe); } - filesetCache.invalidate(ident); try { FilesetEntity updatedFilesetEntity = store.update( @@ -521,18 +503,14 @@ public class FilesetCatalogOperations extends ManagedSchemaOperations Entity.EntityType.FILESET, e -> updateFilesetEntity(ident, e, changes)); - FilesetImpl fileset = - FilesetImpl.builder() - .withName(updatedFilesetEntity.name()) - .withComment(updatedFilesetEntity.comment()) - .withType(updatedFilesetEntity.filesetType()) - .withStorageLocations(updatedFilesetEntity.storageLocations()) - .withProperties(updatedFilesetEntity.properties()) - .withAuditInfo(updatedFilesetEntity.auditInfo()) - .build(); - filesetCache.put(updatedFilesetEntity.nameIdentifier(), fileset); - return fileset; - + return FilesetImpl.builder() + .withName(updatedFilesetEntity.name()) + .withComment(updatedFilesetEntity.comment()) + .withType(updatedFilesetEntity.filesetType()) + .withStorageLocations(updatedFilesetEntity.storageLocations()) + .withProperties(updatedFilesetEntity.properties()) + .withAuditInfo(updatedFilesetEntity.auditInfo()) + .build(); } catch (IOException ioe) { throw new RuntimeException("Failed to update fileset " + ident, ioe); } catch (NoSuchEntityException nsee) { @@ -589,7 +567,6 @@ public class FilesetCatalogOperations extends ManagedSchemaOperations } } - filesetCache.invalidate(ident); return store.delete(ident, Entity.EntityType.FILESET); } catch (NoSuchEntityException ne) { LOG.warn("Fileset {} does not exist", ident); @@ -788,8 +765,6 @@ public class FilesetCatalogOperations extends ManagedSchemaOperations Map<String, Path> schemaPaths = getAndCheckSchemaPaths(ident.name(), properties); boolean dropped = super.dropSchema(ident, cascade); - filesetCache.invalidateAll( - filesets.stream().map(FilesetEntity::nameIdentifier).collect(Collectors.toList())); if (disableFSOps) { return dropped; } @@ -920,7 +895,7 @@ public class FilesetCatalogOperations extends ManagedSchemaOperations @Override public void close() throws IOException { - filesetCache.invalidateAll(); + // do nothing } private Cache<NameIdentifier, FilesetImpl> initializeFilesetCache(Map<String, String> config) { diff --git a/core/src/main/java/org/apache/gravitino/metalake/MetalakeManager.java b/core/src/main/java/org/apache/gravitino/metalake/MetalakeManager.java index cbe1fa6cb0..9b2a968acf 100644 --- a/core/src/main/java/org/apache/gravitino/metalake/MetalakeManager.java +++ b/core/src/main/java/org/apache/gravitino/metalake/MetalakeManager.java @@ -20,12 +20,7 @@ package org.apache.gravitino.metalake; import static org.apache.gravitino.Metalake.PROPERTY_IN_USE; -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; -import com.github.benmanes.caffeine.cache.Scheduler; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.Closeable; import java.io.IOException; import java.time.Instant; @@ -33,8 +28,6 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import org.apache.gravitino.Entity.EntityType; import org.apache.gravitino.EntityAlreadyExistsException; import org.apache.gravitino.EntityStore; @@ -72,26 +65,9 @@ public class MetalakeManager implements MetalakeDispatcher, Closeable { private final IdGenerator idGenerator; - // Currently, there will be only one MetalakeManager instance in the system. In this case - // we can clear or close the cache when the instance is destroyed. - @VisibleForTesting - static final Cache<NameIdentifier, BaseMetalake> METALAKE_CACHE = - Caffeine.newBuilder() - .expireAfterAccess(24, TimeUnit.HOURS) - .removalListener((k, v, c) -> LOG.info("Closing metalake {}.", k)) - .scheduler( - Scheduler.forScheduledExecutorService( - new ScheduledThreadPoolExecutor( - 1, - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("metalake-cleaner-%d") - .build()))) - .build(); - @Override public void close() { - METALAKE_CACHE.invalidateAll(); + // do nothing } /** @@ -104,11 +80,11 @@ public class MetalakeManager implements MetalakeDispatcher, Closeable { this.store = store; this.idGenerator = idGenerator; - // pre-load all metalakes and put them into cache, this is useful when user load schema/table + // preload all metalakes and put them into cache, this is useful when user load schema/table // directly without list/get metalake first. - BaseMetalake[] metalakes = listMetalakes(); - for (BaseMetalake metalake : metalakes) { - METALAKE_CACHE.put(metalake.nameIdentifier(), metalake); + BaseMetalake[] baseMetalakes = listMetalakes(); + for (BaseMetalake baseMetalake : baseMetalakes) { + loadMetalake(baseMetalake.nameIdentifier()); } } @@ -140,10 +116,7 @@ public class MetalakeManager implements MetalakeDispatcher, Closeable { public static boolean metalakeInUse(EntityStore store, NameIdentifier ident) throws NoSuchMetalakeException { try { - BaseMetalake metalake = METALAKE_CACHE.getIfPresent(ident); - if (metalake == null) { - metalake = store.get(ident, EntityType.METALAKE, BaseMetalake.class); - } + BaseMetalake metalake = store.get(ident, EntityType.METALAKE, BaseMetalake.class); return (boolean) metalake.propertiesMetadata().getOrDefault(metalake.properties(), PROPERTY_IN_USE); } catch (NoSuchEntityException e) { @@ -191,22 +164,18 @@ public class MetalakeManager implements MetalakeDispatcher, Closeable { return TreeLockUtils.doWithTreeLock( ident, LockType.READ, - () -> - METALAKE_CACHE.get( - ident, - k -> { - try { - BaseMetalake baseMetalake = - store.get(ident, EntityType.METALAKE, BaseMetalake.class); - return newMetalakeWithResolvedProperties(baseMetalake); - } catch (NoSuchEntityException e) { - LOG.warn("Metalake {} does not exist", ident, e); - throw new NoSuchMetalakeException(METALAKE_DOES_NOT_EXIST_MSG, ident); - } catch (IOException ioe) { - LOG.error("Loading Metalake {} failed due to storage issues", ident, ioe); - throw new RuntimeException(ioe); - } - })); + () -> { + try { + BaseMetalake baseMetalake = store.get(ident, EntityType.METALAKE, BaseMetalake.class); + return newMetalakeWithResolvedProperties(baseMetalake); + } catch (NoSuchEntityException e) { + LOG.warn("Metalake {} does not exist", ident, e); + throw new NoSuchMetalakeException(METALAKE_DOES_NOT_EXIST_MSG, ident); + } catch (IOException ioe) { + LOG.error("Loading Metalake {} failed due to storage issues", ident, ioe); + throw new RuntimeException(ioe); + } + }); } private BaseMetalake newMetalakeWithResolvedProperties(BaseMetalake metalakeEntity) { @@ -266,7 +235,6 @@ public class MetalakeManager implements MetalakeDispatcher, Closeable { () -> { try { store.put(metalake, false /* overwritten */); - METALAKE_CACHE.put(ident, newMetalakeWithResolvedProperties(metalake)); return metalake; } catch (EntityAlreadyExistsException | AlreadyExistsException e) { LOG.warn("Metalake {} already exists", ident, e); @@ -298,25 +266,21 @@ public class MetalakeManager implements MetalakeDispatcher, Closeable { throw new MetalakeNotInUseException( "Metalake %s is not in use, please enable it first", ident); } - METALAKE_CACHE.invalidate(ident); - BaseMetalake baseMetalake = - store.update( - ident, - BaseMetalake.class, - EntityType.METALAKE, - metalake -> { - BaseMetalake.Builder builder = newMetalakeBuilder(metalake); - Map<String, String> newProps = - metalake.properties() == null - ? Maps.newHashMap() - : Maps.newHashMap(metalake.properties()); - builder = updateEntity(builder, newProps, changes); - - return builder.build(); - }); - METALAKE_CACHE.put( - baseMetalake.nameIdentifier(), newMetalakeWithResolvedProperties(baseMetalake)); - return baseMetalake; + + return store.update( + ident, + BaseMetalake.class, + EntityType.METALAKE, + metalake -> { + BaseMetalake.Builder builder = newMetalakeBuilder(metalake); + Map<String, String> newProps = + metalake.properties() == null + ? Maps.newHashMap() + : Maps.newHashMap(metalake.properties()); + builder = updateEntity(builder, newProps, changes); + + return builder.build(); + }); } catch (NoSuchEntityException ne) { LOG.warn("Metalake {} does not exist", ident, ne); throw new NoSuchMetalakeException(METALAKE_DOES_NOT_EXIST_MSG, ident); @@ -353,8 +317,6 @@ public class MetalakeManager implements MetalakeDispatcher, Closeable { "Metalake %s is in use, please disable it first or use force option", ident); } - METALAKE_CACHE.invalidate(ident); - List<CatalogEntity> catalogEntities = store.list(Namespace.of(ident.name()), CatalogEntity.class, EntityType.CATALOG); if (!catalogEntities.isEmpty() && !force) { @@ -381,25 +343,22 @@ public class MetalakeManager implements MetalakeDispatcher, Closeable { try { boolean inUse = metalakeInUse(store, ident); if (!inUse) { - METALAKE_CACHE.invalidate(ident); - BaseMetalake baseMetalake = - store.update( - ident, - BaseMetalake.class, - EntityType.METALAKE, - metalake -> { - BaseMetalake.Builder builder = newMetalakeBuilder(metalake); - - Map<String, String> newProps = - metalake.properties() == null - ? Maps.newHashMap() - : Maps.newHashMap(metalake.properties()); - newProps.put(PROPERTY_IN_USE, "true"); - builder.withProperties(newProps); - - return builder.build(); - }); - METALAKE_CACHE.put(ident, newMetalakeWithResolvedProperties(baseMetalake)); + store.update( + ident, + BaseMetalake.class, + EntityType.METALAKE, + metalake -> { + BaseMetalake.Builder builder = newMetalakeBuilder(metalake); + + Map<String, String> newProps = + metalake.properties() == null + ? Maps.newHashMap() + : Maps.newHashMap(metalake.properties()); + newProps.put(PROPERTY_IN_USE, "true"); + builder.withProperties(newProps); + + return builder.build(); + }); } return null; @@ -418,25 +377,22 @@ public class MetalakeManager implements MetalakeDispatcher, Closeable { try { boolean inUse = metalakeInUse(store, ident); if (inUse) { - METALAKE_CACHE.invalidate(ident); - BaseMetalake baseMetalake = - store.update( - ident, - BaseMetalake.class, - EntityType.METALAKE, - metalake -> { - BaseMetalake.Builder builder = newMetalakeBuilder(metalake); - - Map<String, String> newProps = - metalake.properties() == null - ? Maps.newHashMap() - : Maps.newHashMap(metalake.properties()); - newProps.put(PROPERTY_IN_USE, "false"); - builder.withProperties(newProps); - - return builder.build(); - }); - METALAKE_CACHE.put(ident, newMetalakeWithResolvedProperties(baseMetalake)); + store.update( + ident, + BaseMetalake.class, + EntityType.METALAKE, + metalake -> { + BaseMetalake.Builder builder = newMetalakeBuilder(metalake); + + Map<String, String> newProps = + metalake.properties() == null + ? Maps.newHashMap() + : Maps.newHashMap(metalake.properties()); + newProps.put(PROPERTY_IN_USE, "false"); + builder.withProperties(newProps); + + return builder.build(); + }); } return null; } catch (IOException e) { diff --git a/core/src/test/java/org/apache/gravitino/metalake/TestMetalakeManager.java b/core/src/test/java/org/apache/gravitino/metalake/TestMetalakeManager.java index 4765ddc716..03624f28da 100644 --- a/core/src/test/java/org/apache/gravitino/metalake/TestMetalakeManager.java +++ b/core/src/test/java/org/apache/gravitino/metalake/TestMetalakeManager.java @@ -23,7 +23,6 @@ import static org.apache.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY; import static org.apache.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY; import static org.mockito.Mockito.doReturn; -import com.github.benmanes.caffeine.cache.Cache; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; import java.io.IOException; @@ -210,65 +209,6 @@ public class TestMetalakeManager { Assertions.assertFalse(dropped1, "metalake should be non-existent"); } - @Test - public void testMetalakeCache() { - NameIdentifier ident = NameIdentifier.of("test51"); - Map<String, String> props = ImmutableMap.of("key1", "value1"); - BaseMetalake metalake = metalakeManager.createMetalake(ident, "comment", props); - Assertions.assertEquals("test51", metalake.name()); - Assertions.assertEquals("comment", metalake.comment()); - - Cache<NameIdentifier, BaseMetalake> cache = MetalakeManager.METALAKE_CACHE; - - BaseMetalake baseMetalake = cache.getIfPresent(ident); - Assertions.assertNotNull(baseMetalake); - Assertions.assertEquals("test51", baseMetalake.name()); - Assertions.assertEquals("comment", baseMetalake.comment()); - - metalakeManager.disableMetalake(ident); - baseMetalake = cache.getIfPresent(ident); - Assertions.assertNotNull(baseMetalake); - metalakeManager.dropMetalake(ident); - baseMetalake = cache.getIfPresent(ident); - Assertions.assertNull(baseMetalake); - - metalakeManager.createMetalake(ident, "comment", props); - baseMetalake = cache.getIfPresent(ident); - Assertions.assertNotNull(baseMetalake); - - metalakeManager.disableMetalake(ident); - metalakeManager.dropMetalake(ident); - baseMetalake = cache.getIfPresent(ident); - Assertions.assertNull(baseMetalake); - - metalakeManager.createMetalake(ident, "comment", props); - baseMetalake = cache.getIfPresent(ident); - Assertions.assertNotNull(baseMetalake); - metalakeManager.disableMetalake(ident); - metalakeManager.dropMetalake(ident); - baseMetalake = cache.getIfPresent(ident); - Assertions.assertNull(baseMetalake); - - metalakeManager.createMetalake(ident, "comment", props); - baseMetalake = cache.getIfPresent(ident); - Assertions.assertNotNull(baseMetalake); - metalakeManager.disableMetalake(ident); - baseMetalake = cache.getIfPresent(ident); - Assertions.assertNotNull(baseMetalake); - Assertions.assertEquals("false", baseMetalake.properties().get("in-use")); - metalakeManager.enableMetalake(ident); - baseMetalake = cache.getIfPresent(ident); - Assertions.assertNotNull(baseMetalake); - Assertions.assertEquals("true", baseMetalake.properties().get("in-use")); - - metalakeManager.loadMetalake(ident); - baseMetalake = cache.getIfPresent(ident); - Assertions.assertNotNull(baseMetalake); - metalakeManager.disableMetalake(ident); - baseMetalake = cache.getIfPresent(ident); - Assertions.assertNotNull(baseMetalake); - } - private void testProperties(Map<String, String> expectedProps, Map<String, String> testProps) { expectedProps.forEach( (k, v) -> {