This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch branch-1.0
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/branch-1.0 by this push:
new 3cd2f97ff2 [#8815] improvement(core): Evict tags and policies first if
entity cache is full. (#8845)
3cd2f97ff2 is described below
commit 3cd2f97ff21289f587f824299d0e1d3483bc3bf4
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Oct 21 09:58:46 2025 +0800
[#8815] improvement(core): Evict tags and policies first if entity cache is
full. (#8845)
### What changes were proposed in this pull request?
Adjust the Caffeine weights for tags and policies so that they are evicted
before other entities when the entity cache is full.
### Why are the changes needed?
Tags and policies are less important than entities such as tables and filesets.
Fix: #8815
### Does this PR introduce _any_ user-facing change?
N/A.
### How was this patch tested?
UTs
---------
Co-authored-by: Mini Yu <[email protected]>
Co-authored-by: Jerry Shao <[email protected]>
---
.../apache/gravitino/cache/EntityCacheWeigher.java | 26 +++--
.../apache/gravitino/cache/TestCacheConfig.java | 127 ++++++++++++++++++++-
2 files changed, 142 insertions(+), 11 deletions(-)
diff --git
a/core/src/main/java/org/apache/gravitino/cache/EntityCacheWeigher.java
b/core/src/main/java/org/apache/gravitino/cache/EntityCacheWeigher.java
index 768f60eb64..edc3ca6b9b 100644
--- a/core/src/main/java/org/apache/gravitino/cache/EntityCacheWeigher.java
+++ b/core/src/main/java/org/apache/gravitino/cache/EntityCacheWeigher.java
@@ -40,24 +40,36 @@ import org.slf4j.LoggerFactory;
* or manually cleared.
* <li>Catalog: 0, which means that it will never be evicted from the cache
unless timeout occurs
* or manually cleared.
- * <li>Schema: 10
- * <li>Other: 100
+ * <li>Schema: 500
+ * <li>Tag: 100
+ * <li>Policy: 100
+ * <li>Other: 200
* </ul>
*/
public class EntityCacheWeigher implements Weigher<EntityCacheKey,
List<Entity>> {
- public static final int METALAKE_WEIGHT = 0;
+ public static final int METALAKE_WEIGHT = 0; // 0 means never evict
public static final int CATALOG_WEIGHT = 0;
- public static final int SCHEMA_WEIGHT = 10;
- public static final int OTHER_WEIGHT = 100;
+ public static final int SCHEMA_WEIGHT = 500; // higher weight means it will
less likely be evicted
+ public static final int OTHER_WEIGHT = 200;
+ public static final int TAG_WEIGHT = 100;
+ public static final int POLICY_WEIGHT = 100;
private static final Logger LOG =
LoggerFactory.getLogger(EntityCacheWeigher.class.getName());
private static final EntityCacheWeigher INSTANCE = new EntityCacheWeigher();
private static final Map<Entity.EntityType, Integer> ENTITY_WEIGHTS =
ImmutableMap.of(
Entity.EntityType.METALAKE, METALAKE_WEIGHT,
Entity.EntityType.CATALOG, CATALOG_WEIGHT,
- Entity.EntityType.SCHEMA, SCHEMA_WEIGHT);
+ Entity.EntityType.SCHEMA, SCHEMA_WEIGHT,
+ Entity.EntityType.TAG, TAG_WEIGHT,
+ Entity.EntityType.POLICY, POLICY_WEIGHT);
private static final long MAX_WEIGHT =
- 2 * (METALAKE_WEIGHT * 10 + CATALOG_WEIGHT * (10 * 200) + SCHEMA_WEIGHT
* (10 * 200 * 1000));
+ 2
+ * (METALAKE_WEIGHT * 10
+ + CATALOG_WEIGHT * 100
+ + SCHEMA_WEIGHT * 1000
+ + OTHER_WEIGHT * 10000
+ + TAG_WEIGHT * 10000
+ + POLICY_WEIGHT * 10000);
@VisibleForTesting
protected EntityCacheWeigher() {}
diff --git a/core/src/test/java/org/apache/gravitino/cache/TestCacheConfig.java
b/core/src/test/java/org/apache/gravitino/cache/TestCacheConfig.java
index 55c62a1ae9..f7a5724bc5 100644
--- a/core/src/test/java/org/apache/gravitino/cache/TestCacheConfig.java
+++ b/core/src/test/java/org/apache/gravitino/cache/TestCacheConfig.java
@@ -21,19 +21,25 @@ package org.apache.gravitino.cache;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.collect.ImmutableMap;
import java.time.Duration;
import java.util.List;
+import java.util.stream.IntStream;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.Config;
import org.apache.gravitino.Configs;
import org.apache.gravitino.Entity;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
+import org.apache.gravitino.file.Fileset;
import org.apache.gravitino.meta.AuditInfo;
import org.apache.gravitino.meta.BaseMetalake;
import org.apache.gravitino.meta.CatalogEntity;
+import org.apache.gravitino.meta.FilesetEntity;
import org.apache.gravitino.meta.SchemaEntity;
import org.apache.gravitino.meta.SchemaVersion;
+import org.apache.gravitino.meta.TagEntity;
+import org.apache.gravitino.utils.NameIdentifierUtil;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -47,14 +53,127 @@ public class TestCacheConfig {
Assertions.assertTrue(config.get(Configs.CACHE_WEIGHER_ENABLED));
Assertions.assertEquals(10_000, config.get(Configs.CACHE_MAX_ENTRIES));
Assertions.assertEquals(3_600_000L,
config.get(Configs.CACHE_EXPIRATION_TIME));
- Assertions.assertEquals(40_000_000L, EntityCacheWeigher.getMaxWeight());
+ Assertions.assertEquals(9_000_000L, EntityCacheWeigher.getMaxWeight());
Assertions.assertEquals("caffeine",
config.get(Configs.CACHE_IMPLEMENTATION));
}
+ @Test
+ void testPolicyAndTagCacheWeigher() throws InterruptedException {
+ Caffeine<Object, Object> builder = Caffeine.newBuilder();
+ builder.maximumWeight(2000);
+ builder.weigher(EntityCacheWeigher.getInstance());
+ Cache<EntityCacheRelationKey, List<Entity>> cache = builder.build();
+
+ BaseMetalake baseMetalake =
+ BaseMetalake.builder()
+ .withName("metalake1")
+ .withId(1L)
+ .withVersion(SchemaVersion.V_0_1)
+ .withAuditInfo(AuditInfo.EMPTY)
+ .build();
+ cache.put(
+ EntityCacheRelationKey.of(NameIdentifier.of("metalake1"),
Entity.EntityType.METALAKE),
+ List.of(baseMetalake));
+ CatalogEntity catalogEntity =
+ CatalogEntity.builder()
+ .withNamespace(Namespace.of("metalake1"))
+ .withName("catalog1")
+ .withProvider("provider")
+ .withAuditInfo(AuditInfo.EMPTY)
+ .withId(100L)
+ .withType(Catalog.Type.RELATIONAL)
+ .build();
+ cache.put(
+ EntityCacheRelationKey.of(
+ NameIdentifier.of(new String[] {"metalake1", "catalog1"}),
Entity.EntityType.CATALOG),
+ List.of(catalogEntity));
+
+ SchemaEntity schemaEntity =
+ SchemaEntity.builder()
+ .withNamespace(Namespace.of("metalake1", "catalog1"))
+ .withName("schema1")
+ .withAuditInfo(AuditInfo.EMPTY)
+ .withId(1000L)
+ .build();
+ cache.put(
+ EntityCacheRelationKey.of(
+ NameIdentifier.of(new String[] {"metalake1", "catalog1",
"schema1"}),
+ Entity.EntityType.SCHEMA),
+ List.of(schemaEntity));
+
+ for (int i = 0; i < 5; i++) {
+ String filesetName = "fileset" + i;
+ FilesetEntity fileset =
+ FilesetEntity.builder()
+ .withNamespace(Namespace.of("metalake1", "catalog1", "schema1"))
+ .withName(filesetName)
+ .withAuditInfo(AuditInfo.EMPTY)
+ .withStorageLocations(ImmutableMap.of("default",
"s3://bucket/path"))
+ .withId((long) (i + 1) * 10_000)
+ .withFilesetType(Fileset.Type.MANAGED)
+ .build();
+ cache.put(
+ EntityCacheRelationKey.of(
+ NameIdentifier.of(new String[] {"metalake1", "catalog1",
"schema1", filesetName}),
+ Entity.EntityType.FILESET),
+ List.of(fileset));
+ }
+
+ for (int i = 0; i < 10; i++) {
+ String tagName = "tag" + i;
+ NameIdentifier tagNameIdent = NameIdentifierUtil.ofTag("metalake",
tagName);
+ TagEntity tagEntity =
+ TagEntity.builder()
+ .withNamespace(tagNameIdent.namespace())
+ .withName(tagName)
+ .withAuditInfo(AuditInfo.EMPTY)
+ .withId((long) (i + 1) * 100_000)
+ .build();
+ cache.put(EntityCacheRelationKey.of(tagNameIdent,
Entity.EntityType.TAG), List.of(tagEntity));
+ }
+
+ // The weight of the cache has already exceeded 2000, so some entities will be
evicted if we continue to
+ // add fileset entities.
+ for (int i = 5; i < 15; i++) {
+ String filesetName = "fileset" + i;
+ FilesetEntity fileset =
+ FilesetEntity.builder()
+ .withNamespace(Namespace.of("metalake1", "catalog1", "schema1"))
+ .withName(filesetName)
+ .withAuditInfo(AuditInfo.EMPTY)
+ .withStorageLocations(ImmutableMap.of("default",
"s3://bucket/path"))
+ .withId((long) (i + 1) * 10_000)
+ .withFilesetType(Fileset.Type.MANAGED)
+ .build();
+ cache.put(
+ EntityCacheRelationKey.of(
+ NameIdentifier.of(new String[] {"metalake1", "catalog1",
"schema1", filesetName}),
+ Entity.EntityType.FILESET),
+ List.of(fileset));
+ }
+
+ Thread.sleep(1000);
+
+ // There should be no tag entities in the cache, because the weight of each
tag entity is 100, which
+ // is lower than the weight of each fileset entity, which is 200.
+ Awaitility.await()
+ .atMost(Duration.ofSeconds(20))
+ .pollInterval(Duration.ofMillis(10))
+ .until(
+ () ->
+ IntStream.of(0, 1, 2, 3)
+ .mapToObj(i -> NameIdentifierUtil.ofTag("metalake", "tag"
+ i))
+ .allMatch(
+ tagNameIdent ->
+ cache.getIfPresent(
+ EntityCacheRelationKey.of(tagNameIdent,
Entity.EntityType.TAG))
+ == null));
+ }
+
@Test
void testCaffeineCacheWithWeight() throws Exception {
Caffeine<Object, Object> builder = Caffeine.newBuilder();
- builder.maximumWeight(500);
+ builder.maximumWeight(5000);
builder.weigher(EntityCacheWeigher.getInstance());
Cache<EntityCacheRelationKey, List<Entity>> cache = builder.build();
@@ -121,11 +240,11 @@ public class TestCacheConfig {
NameIdentifier.of("metalake1.catalog" + i),
Entity.EntityType.CATALOG)));
}
- // Only some of the 100 schemas are still in the cache, to be exact, 500 /
10 = 50 schemas.
+ // Only some of the 100 schemas are still in the cache, to be exact, 5000
/ 500 = 10 schemas.
Awaitility.await()
.atMost(Duration.ofSeconds(5))
.pollInterval(Duration.ofMillis(10))
- .until(() -> cache.asMap().size() == 10 + 3 + 500 / 10);
+ .until(() -> cache.asMap().size() == 10 + 3 + 5000 / 500);
}
@Test