This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch branch-1.0
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/branch-1.0 by this push:
new 273475fdca [#8377] improvement(core): Optimize CaffeineEntityCache
using segmented locking (#8492)
273475fdca is described below
commit 273475fdca7983a85fd10e1af8082ba66451ef7a
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Sep 10 15:58:16 2025 +0800
[#8377] improvement(core): Optimize CaffeineEntityCache using segmented
locking (#8492)
### What changes were proposed in this pull request?
Optimize CaffeineEntityCache using segmented locking
### Why are the changes needed?
Fix: [#8377](https://github.com/apache/gravitino/issues/8377)
### Does this PR introduce _any_ user-facing change?
N/A
### How was this patch tested?
TestSegmentedLock.java
Co-authored-by: Xiaojian Sun <[email protected]>
---
.../fileset/TestFilesetCatalogOperations.java | 1 +
.../catalog/kafka/TestKafkaCatalogOperations.java | 1 +
.../catalog/model/TestModelCatalogOperations.java | 1 +
.../main/java/org/apache/gravitino/Configs.java | 17 +
.../gravitino/cache/CaffeineEntityCache.java | 137 ++------
.../org/apache/gravitino/cache/EntityCache.java | 5 +-
.../org/apache/gravitino/cache/NoOpsCache.java | 33 +-
.../org/apache/gravitino/cache/SegmentedLock.java | 236 +++++++++++++
.../storage/relational/RelationalEntityStore.java | 3 +
.../authorization/TestAccessControlManager.java | 1 +
.../gravitino/authorization/TestOwnerManager.java | 1 +
.../apache/gravitino/cache/TestSegmentedLock.java | 383 +++++++++++++++++++++
.../apache/gravitino/policy/TestPolicyManager.java | 1 +
.../gravitino/stats/TestStatisticManager.java | 1 +
.../gravitino/storage/TestEntityStorage.java | 1 +
.../org/apache/gravitino/tag/TestTagManager.java | 1 +
docs/gravitino-server-config.md | 4 +
17 files changed, 695 insertions(+), 132 deletions(-)
diff --git
a/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
b/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
index dc28f69bce..993f92acf7 100644
---
a/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
+++
b/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
@@ -239,6 +239,7 @@ public class TestFilesetCatalogOperations {
Mockito.when(config.get(Configs.CACHE_WEIGHER_ENABLED)).thenReturn(true);
Mockito.when(config.get(Configs.CACHE_STATS_ENABLED)).thenReturn(false);
Mockito.when(config.get(Configs.CACHE_IMPLEMENTATION)).thenReturn("caffeine");
+ Mockito.when(config.get(Configs.CACHE_LOCK_SEGMENTS)).thenReturn(16);
store = EntityStoreFactory.createEntityStore(config);
store.initialize(config);
diff --git
a/catalogs/catalog-kafka/src/test/java/org/apache/gravitino/catalog/kafka/TestKafkaCatalogOperations.java
b/catalogs/catalog-kafka/src/test/java/org/apache/gravitino/catalog/kafka/TestKafkaCatalogOperations.java
index ef90c81893..6b8f87888d 100644
---
a/catalogs/catalog-kafka/src/test/java/org/apache/gravitino/catalog/kafka/TestKafkaCatalogOperations.java
+++
b/catalogs/catalog-kafka/src/test/java/org/apache/gravitino/catalog/kafka/TestKafkaCatalogOperations.java
@@ -164,6 +164,7 @@ public class TestKafkaCatalogOperations extends
KafkaClusterEmbedded {
Mockito.when(config.get(Configs.CACHE_WEIGHER_ENABLED)).thenReturn(true);
Mockito.when(config.get(Configs.CACHE_STATS_ENABLED)).thenReturn(false);
Mockito.when(config.get(Configs.CACHE_IMPLEMENTATION)).thenReturn("caffeine");
+ Mockito.when(config.get(Configs.CACHE_LOCK_SEGMENTS)).thenReturn(16);
// Mock
MetalakeMetaService metalakeMetaService =
MetalakeMetaService.getInstance();
diff --git
a/catalogs/catalog-model/src/test/java/org/apache/gravtitino/catalog/model/TestModelCatalogOperations.java
b/catalogs/catalog-model/src/test/java/org/apache/gravtitino/catalog/model/TestModelCatalogOperations.java
index 980568f769..f8119b1fec 100644
---
a/catalogs/catalog-model/src/test/java/org/apache/gravtitino/catalog/model/TestModelCatalogOperations.java
+++
b/catalogs/catalog-model/src/test/java/org/apache/gravtitino/catalog/model/TestModelCatalogOperations.java
@@ -128,6 +128,7 @@ public class TestModelCatalogOperations {
Mockito.when(config.get(Configs.CACHE_WEIGHER_ENABLED)).thenReturn(true);
Mockito.when(config.get(Configs.CACHE_STATS_ENABLED)).thenReturn(false);
Mockito.when(config.get(Configs.CACHE_IMPLEMENTATION)).thenReturn("caffeine");
+ Mockito.when(config.get(Configs.CACHE_LOCK_SEGMENTS)).thenReturn(16);
store = EntityStoreFactory.createEntityStore(config);
store.initialize(config);
diff --git a/core/src/main/java/org/apache/gravitino/Configs.java
b/core/src/main/java/org/apache/gravitino/Configs.java
index 479aa0c701..56392fadf3 100644
--- a/core/src/main/java/org/apache/gravitino/Configs.java
+++ b/core/src/main/java/org/apache/gravitino/Configs.java
@@ -408,6 +408,23 @@ public class Configs {
.checkValue(StringUtils::isNotBlank,
ConfigConstants.NOT_BLANK_ERROR_MSG)
.createWithDefault("caffeine");
+ // Number of lock segments for cache concurrency optimization
+ public static final ConfigEntry<Integer> CACHE_LOCK_SEGMENTS =
+ new ConfigBuilder("gravitino.cache.lockSegments")
+ .doc(
+ "Number of lock segments for cache concurrency optimization. "
+ + "This configuration controls the granularity of locking in
the cache system. "
+ + "Instead of using a single global lock, Gravitino uses
Guava's Striped<Lock> "
+ + "to divide locks into segments, allowing concurrent access
to different cache "
+ + "entries while maintaining thread safety. Higher values
reduce lock contention "
+ + "but increase memory overhead. The actual number of
segments will be rounded "
+ + "up to the nearest power of 2 for optimal performance. "
+ + "See:
https://github.com/google/guava/wiki/StripedExplained")
+ .version(ConfigConstants.VERSION_1_0_0)
+ .intConf()
+ .checkValue(value -> value > 0, "Lock segments must be positive.")
+ .createWithDefault(16);
+
public static final ConfigEntry<String> JOB_STAGING_DIR =
new ConfigBuilder("gravitino.job.stagingDir")
.doc("Directory for managing staging files when running jobs.")
diff --git
a/core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java
b/core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java
index 3faf543d30..8abde5117f 100644
--- a/core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java
+++ b/core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java
@@ -29,7 +29,6 @@ import com.google.common.collect.Sets;
import com.googlecode.concurrenttrees.radix.ConcurrentRadixTree;
import com.googlecode.concurrenttrees.radix.RadixTree;
import
com.googlecode.concurrenttrees.radix.node.concrete.DefaultCharArrayNodeFactory;
-import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
@@ -43,8 +42,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.gravitino.Config;
import org.apache.gravitino.Configs;
@@ -78,7 +75,9 @@ public class CaffeineEntityCache extends BaseEntityCache {
new ThreadPoolExecutor.CallerRunsPolicy());
private static final Logger LOG =
LoggerFactory.getLogger(CaffeineEntityCache.class.getName());
- private final ReentrantLock opLock = new ReentrantLock();
+
+ /** Segmented locking for better concurrency */
+ private final SegmentedLock segmentedLock;
/** Cache data structure. */
private final Cache<EntityCacheRelationKey, List<Entity>> cacheData;
@@ -101,6 +100,10 @@ public class CaffeineEntityCache extends BaseEntityCache {
this.cacheIndex = new ConcurrentRadixTree<>(new
DefaultCharArrayNodeFactory());
this.reverseIndex = new ReverseIndexCache();
+ // Initialize segmented lock
+ int lockSegments = cacheConfig.get(Configs.CACHE_LOCK_SEGMENTS);
+ this.segmentedLock = new SegmentedLock(lockSegments);
+
Caffeine<EntityCacheKey, List<Entity>> cacheDataBuilder =
newBaseBuilder(cacheConfig);
cacheDataBuilder
@@ -162,15 +165,18 @@ public class CaffeineEntityCache extends BaseEntityCache {
NameIdentifier ident, Entity.EntityType type,
SupportsRelationOperations.Type relType) {
checkArguments(ident, type, relType);
- return withLock(() -> invalidateEntities(ident, type,
Optional.of(relType)));
+ return segmentedLock.withLock(
+ EntityCacheRelationKey.of(ident, type, relType),
+ () -> invalidateEntities(ident, type, Optional.of(relType)));
}
/** {@inheritDoc} */
@Override
public boolean invalidate(NameIdentifier ident, Entity.EntityType type) {
checkArguments(ident, type);
-
- return withLock(() -> invalidateEntities(ident, type, Optional.empty()));
+ return segmentedLock.withLock(
+ EntityCacheRelationKey.of(ident, type),
+ () -> invalidateEntities(ident, type, Optional.empty()));
}
/** {@inheritDoc} */
@@ -197,7 +203,7 @@ public class CaffeineEntityCache extends BaseEntityCache {
/** {@inheritDoc} */
@Override
public void clear() {
- withLock(
+ segmentedLock.withGlobalLock(
() -> {
cacheData.invalidateAll();
});
@@ -212,15 +218,16 @@ public class CaffeineEntityCache extends BaseEntityCache {
List<E> entities) {
checkArguments(ident, type, relType);
Preconditions.checkArgument(entities != null, "Entities cannot be null");
- withLock(
+ EntityCacheRelationKey entityCacheKey = EntityCacheRelationKey.of(ident,
type, relType);
+ segmentedLock.withLock(
+ entityCacheKey,
() -> {
if (entities.isEmpty()) {
return;
}
syncEntitiesToCache(
- EntityCacheRelationKey.of(ident, type, relType),
- entities.stream().map(e -> (Entity)
e).collect(Collectors.toList()));
+ entityCacheKey, entities.stream().map(e -> (Entity)
e).collect(Collectors.toList()));
});
}
@@ -229,13 +236,13 @@ public class CaffeineEntityCache extends BaseEntityCache {
public <E extends Entity & HasIdentifier> void put(E entity) {
Preconditions.checkArgument(entity != null, "Entity cannot be null");
- withLock(
+ NameIdentifier identifier = getIdentFromEntity(entity);
+ EntityCacheRelationKey entityCacheKey =
EntityCacheRelationKey.of(identifier, entity.type());
+
+ segmentedLock.withLock(
+ entityCacheKey,
() -> {
invalidateOnKeyChange(entity);
- NameIdentifier identifier = getIdentFromEntity(entity);
- EntityCacheRelationKey entityCacheKey =
- EntityCacheRelationKey.of(identifier, entity.type());
-
syncEntitiesToCache(entityCacheKey, Lists.newArrayList(entity));
});
}
@@ -254,18 +261,22 @@ public class CaffeineEntityCache extends BaseEntityCache {
/** {@inheritDoc} */
@Override
- public <E extends Exception> void withCacheLock(ThrowingRunnable<E> action)
throws E {
+ public <E extends Exception> void withCacheLock(
+ EntityCacheKey key, EntityCache.ThrowingRunnable<E> action) throws E {
+ Preconditions.checkArgument(key != null, "Key cannot be null");
Preconditions.checkArgument(action != null, "Action cannot be null");
- withLockAndThrow(action);
+ segmentedLock.withLockAndThrow(key, action);
}
/** {@inheritDoc} */
@Override
- public <E, T extends Exception> E withCacheLock(ThrowingSupplier<E, T>
action) throws T {
+ public <E, T extends Exception> E withCacheLock(
+ EntityCacheKey key, EntityCache.ThrowingSupplier<E, T> action) throws T {
+ Preconditions.checkArgument(key != null, "Key cannot be null");
Preconditions.checkArgument(action != null, "Action cannot be null");
- return withLockAndThrow(action);
+ return segmentedLock.withLockAndThrow(key, action);
}
/**
@@ -276,7 +287,8 @@ public class CaffeineEntityCache extends BaseEntityCache {
*/
@Override
protected void invalidateExpiredItem(EntityCacheKey key) {
- withLock(
+ segmentedLock.withLock(
+ key,
() -> {
reverseIndex.remove(key);
cacheIndex.remove(key.toString());
@@ -402,89 +414,6 @@ public class CaffeineEntityCache extends BaseEntityCache {
return true;
}
- /**
- * Runs the given action with the lock.
- *
- * @param action The action to run with the lock
- */
- private void withLock(Runnable action) {
- try {
- opLock.lockInterruptibly();
- try {
- action.run();
- } finally {
- opLock.unlock();
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IllegalStateException("Thread was interrupted while waiting
for lock", e);
- }
- }
-
- /**
- * Runs the given action with the lock and returns the result.
- *
- * @param action The action to run with the lock
- * @param <T> The type of the result
- * @return The result of the action
- */
- private <T> T withLock(Supplier<T> action) {
- try {
- opLock.lockInterruptibly();
- try {
- return action.get();
- } finally {
- opLock.unlock();
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IllegalStateException("Thread was interrupted while waiting
for lock", e);
- }
- }
-
- /**
- * Runs the given action with the lock and throws the exception if it occurs.
- *
- * @param action The action to run with the lock
- * @param <T> The type of the result
- * @return The result of the action
- * @throws IOException If an exception occurs during the action
- */
- private <T, E extends Exception> T withLockAndThrow(ThrowingSupplier<T, E>
action) throws E {
- try {
- opLock.lockInterruptibly();
- try {
- return action.get();
- } finally {
- opLock.unlock();
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IllegalStateException("Thread was interrupted while waiting
for lock", e);
- }
- }
-
- /**
- * Runs the given action with the lock and throws the exception if it occurs.
- *
- * @param action The action to run with the lock
- * @param <E> The type of the exception
- * @throws E If an exception occurs during the action
- */
- private <E extends Exception> void withLockAndThrow(ThrowingRunnable<E>
action) throws E {
- try {
- opLock.lockInterruptibly();
- try {
- action.run();
- } finally {
- opLock.unlock();
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IllegalStateException("Thread was interrupted while waiting
for lock", e);
- }
- }
-
/** Starts the cache stats monitor. */
private void startCacheStatsMonitor() {
scheduler.scheduleAtFixedRate(
diff --git a/core/src/main/java/org/apache/gravitino/cache/EntityCache.java
b/core/src/main/java/org/apache/gravitino/cache/EntityCache.java
index 8ccdb6c146..3ad6de1e8a 100644
--- a/core/src/main/java/org/apache/gravitino/cache/EntityCache.java
+++ b/core/src/main/java/org/apache/gravitino/cache/EntityCache.java
@@ -47,7 +47,7 @@ public interface EntityCache extends
SupportsEntityStoreCache, SupportsRelationE
* @param <E> The type of exception that may be thrown
* @throws E if the action throws an exception of type E
*/
- <E extends Exception> void withCacheLock(ThrowingRunnable<E> action) throws
E;
+ <E extends Exception> void withCacheLock(EntityCacheKey key,
ThrowingRunnable<E> action) throws E;
/**
* Executes the given action within a cache context and returns the result.
@@ -58,7 +58,8 @@ public interface EntityCache extends
SupportsEntityStoreCache, SupportsRelationE
* @param <T> The type of the result
* @throws E if the action throws an exception of type E
*/
- <T, E extends Exception> T withCacheLock(ThrowingSupplier<T, E> action)
throws E;
+ <T, E extends Exception> T withCacheLock(EntityCacheKey key,
ThrowingSupplier<T, E> action)
+ throws E;
/**
* A functional interface that represents a supplier that may throw an
exception.
diff --git a/core/src/main/java/org/apache/gravitino/cache/NoOpsCache.java
b/core/src/main/java/org/apache/gravitino/cache/NoOpsCache.java
index 56cdb93511..74babb75a5 100644
--- a/core/src/main/java/org/apache/gravitino/cache/NoOpsCache.java
+++ b/core/src/main/java/org/apache/gravitino/cache/NoOpsCache.java
@@ -21,7 +21,6 @@ package org.apache.gravitino.cache;
import java.util.List;
import java.util.Optional;
-import java.util.concurrent.locks.ReentrantLock;
import org.apache.gravitino.Config;
import org.apache.gravitino.Entity;
import org.apache.gravitino.HasIdentifier;
@@ -30,7 +29,7 @@ import org.apache.gravitino.SupportsRelationOperations;
/** A cache implementation that does not cache anything. */
public class NoOpsCache extends BaseEntityCache {
- private final ReentrantLock opLock = new ReentrantLock();
+ private final SegmentedLock opLock = new SegmentedLock(1);
/**
* Constructs a new {@link BaseEntityCache} instance.
*
@@ -60,34 +59,16 @@ public class NoOpsCache extends BaseEntityCache {
/** {@inheritDoc} */
@Override
- public <E extends Exception> void withCacheLock(ThrowingRunnable<E> action)
throws E {
- try {
- opLock.lockInterruptibly();
- try {
- action.run();
- } finally {
- opLock.unlock();
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IllegalStateException("Thread was interrupted while waiting
for lock", e);
- }
+ public <E extends Exception> void withCacheLock(EntityCacheKey key,
ThrowingRunnable<E> action)
+ throws E {
+ opLock.withLockAndThrow(key, action);
}
/** {@inheritDoc} */
@Override
- public <T, E extends Exception> T withCacheLock(ThrowingSupplier<T, E>
action) throws E {
- try {
- opLock.lockInterruptibly();
- try {
- return action.get();
- } finally {
- opLock.unlock();
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IllegalStateException("Thread was interrupted while waiting
for lock", e);
- }
+ public <T, E extends Exception> T withCacheLock(EntityCacheKey key,
ThrowingSupplier<T, E> action)
+ throws E {
+ return opLock.withLockAndThrow(key, action);
}
/** {@inheritDoc} */
diff --git a/core/src/main/java/org/apache/gravitino/cache/SegmentedLock.java
b/core/src/main/java/org/apache/gravitino/cache/SegmentedLock.java
new file mode 100644
index 0000000000..dce8eb63ce
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/cache/SegmentedLock.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.cache;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Striped;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Segmented lock for improved concurrency. Divides locks into segments to
reduce contention.
+ * Supports global clearing operations that require exclusive access to all
segments.
+ */
+public class SegmentedLock {
+ private final Striped<Lock> stripedLocks;
+ private static final Object NULL_KEY = new Object();
+
+ /** CountDownLatch for global operations - null when no operation is in
progress */
+ private final AtomicReference<CountDownLatch> globalOperationLatch = new
AtomicReference<>();
+
+ /**
+ * Creates a SegmentedLock with the specified number of segments. Guava's
Striped automatically
+ * rounds up to the nearest power of 2 for optimal performance.
+ *
+ * @param numSegments Number of segments (must be positive)
+ * @throws IllegalArgumentException if numSegments is not positive
+ */
+ public SegmentedLock(int numSegments) {
+ if (numSegments <= 0) {
+ throw new IllegalArgumentException(
+ "Number of segments must be positive, got: " + numSegments);
+ }
+
+ this.stripedLocks = Striped.lock(numSegments);
+ }
+
+ /**
+ * Gets the segment lock for the given key.
+ *
+ * @param key Object to determine the segment
+ * @return Segment lock for the key
+ */
+ public Lock getSegmentLock(Object key) {
+ return stripedLocks.get(normalizeKey(key));
+ }
+
+ /**
+ * Normalizes the key to handle null values consistently.
+ *
+ * @param key The input key
+ * @return Normalized key (never null)
+ */
+ private Object normalizeKey(Object key) {
+ return key != null ? key : NULL_KEY;
+ }
+
+ /**
+ * Runs action with segment lock for the given key. Will wait if a global
clearing operation is in
+ * progress.
+ *
+ * @param key Key to determine segment
+ * @param action Action to run
+ * @throws RuntimeException if interrupted
+ */
+ public void withLock(Object key, Runnable action) {
+ waitForGlobalComplete();
+ Lock lock = getSegmentLock(key);
+ try {
+ lock.lockInterruptibly();
+ try {
+ action.run();
+ } finally {
+ lock.unlock();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Thread was interrupted while waiting for
lock", e);
+ }
+ }
+
+ /**
+ * Runs action with segment lock and returns result. Will wait if a global
clearing operation is
+ * in progress.
+ *
+ * @param key Key to determine segment
+ * @param action Action to run
+ * @param <T> Result type
+ * @return Action result
+ * @throws RuntimeException if interrupted
+ */
+ public <T> T withLock(Object key, java.util.function.Supplier<T> action) {
+ waitForGlobalComplete();
+ Lock lock = getSegmentLock(key);
+ try {
+ lock.lockInterruptibly();
+ try {
+ return action.get();
+ } finally {
+ lock.unlock();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Thread was interrupted while waiting for
lock", e);
+ }
+ }
+
+ /**
+ * Runs action with segment lock for the given key. Will wait if a global
clearing operation is in
+ * progress.
+ *
+ * @param key Key to determine segment
+ * @param action Action to run
+ * @param <T> Result type
+ * @param <E> Exception type
+ * @return Action result
+ * @throws E Exception
+ */
+ public <T, E extends Exception> T withLockAndThrow(
+ Object key, EntityCache.ThrowingSupplier<T, E> action) throws E {
+ waitForGlobalComplete();
+ Lock lock = getSegmentLock(key);
+ try {
+ lock.lockInterruptibly();
+ try {
+ return action.get();
+ } finally {
+ lock.unlock();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException("Thread was interrupted while waiting
for lock", e);
+ }
+ }
+
+ /**
+ * Runs action with segment lock for the given key. Will wait if a global
clearing operation is in
+ * progress.
+ *
+ * @param key Key to determine segment
+ * @param action Action to run
+ * @param <E> Exception type
+ * @throws E Exception
+ */
+ public <E extends Exception> void withLockAndThrow(
+ Object key, EntityCache.ThrowingRunnable<E> action) throws E {
+ waitForGlobalComplete();
+ Lock lock = getSegmentLock(key);
+ try {
+ lock.lockInterruptibly();
+ try {
+ action.run();
+ } finally {
+ lock.unlock();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException("Thread was interrupted while waiting
for lock", e);
+ }
+ }
+
+ /**
+ * Executes a global clearing operation with exclusive access to all
segments. This method sets
+ * the clearing flag and ensures no other operations can proceed until the
clearing is complete.
+ *
+ * @param action The clearing action to execute
+ */
+ public void withGlobalLock(Runnable action) {
+ // Create a new CountDownLatch for this operation
+ CountDownLatch latch = new CountDownLatch(1);
+
+ // Atomically set the latch, fail if another operation is already in
progress
+ if (!globalOperationLatch.compareAndSet(null, latch)) {
+ throw new IllegalStateException("Global operation already in progress");
+ }
+
+ try {
+ synchronized (this) {
+ action.run();
+ }
+ } finally {
+ // Clear state first, then signal completion
+ globalOperationLatch.set(null);
+ latch.countDown();
+ }
+ }
+
+ /**
+ * Waits for any ongoing global operation to complete. This method is called
by regular operations
+ * to ensure they don't interfere with global operations.
+ */
+ private void waitForGlobalComplete() {
+ CountDownLatch latch = globalOperationLatch.get();
+ if (latch != null) {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(
+ "Thread was interrupted while waiting for global operation to
complete", e);
+ }
+ }
+ }
+
+ /** Checks if a global operation is currently in progress. */
+ @VisibleForTesting
+ public boolean isClearing() {
+ return globalOperationLatch.get() != null;
+ }
+
+ /**
+ * Returns number of lock segments.
+ *
+ * @return Number of segments
+ */
+ public int getNumSegments() {
+ return stripedLocks.size();
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
index 3968381135..15bf0eabd6 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
@@ -38,6 +38,7 @@ import org.apache.gravitino.Namespace;
import org.apache.gravitino.SupportsRelationOperations;
import org.apache.gravitino.cache.CacheFactory;
import org.apache.gravitino.cache.EntityCache;
+import org.apache.gravitino.cache.EntityCacheRelationKey;
import org.apache.gravitino.cache.NoOpsCache;
import org.apache.gravitino.exceptions.NoSuchEntityException;
import org.apache.gravitino.meta.TagEntity;
@@ -129,6 +130,7 @@ public class RelationalEntityStore
NameIdentifier ident, Entity.EntityType entityType, Class<E> e)
throws NoSuchEntityException, IOException {
return cache.withCacheLock(
+ EntityCacheRelationKey.of(ident, entityType),
() -> {
Optional<E> entityFromCache = cache.getIfPresent(ident, entityType);
if (entityFromCache.isPresent()) {
@@ -210,6 +212,7 @@ public class RelationalEntityStore
Type relType, NameIdentifier nameIdentifier, Entity.EntityType
identType, boolean allFields)
throws IOException {
return cache.withCacheLock(
+ EntityCacheRelationKey.of(nameIdentifier, identType, relType),
() -> {
Optional<List<E>> entities = cache.getIfPresent(relType,
nameIdentifier, identType);
if (entities.isPresent()) {
diff --git
a/core/src/test/java/org/apache/gravitino/authorization/TestAccessControlManager.java
b/core/src/test/java/org/apache/gravitino/authorization/TestAccessControlManager.java
index 21d4dcb477..23e8a71f5f 100644
---
a/core/src/test/java/org/apache/gravitino/authorization/TestAccessControlManager.java
+++
b/core/src/test/java/org/apache/gravitino/authorization/TestAccessControlManager.java
@@ -138,6 +138,7 @@ public class TestAccessControlManager {
Mockito.when(config.get(Configs.CACHE_WEIGHER_ENABLED)).thenReturn(true);
Mockito.when(config.get(Configs.CACHE_STATS_ENABLED)).thenReturn(false);
Mockito.when(config.get(Configs.CACHE_IMPLEMENTATION)).thenReturn("caffeine");
+ Mockito.when(config.get(Configs.CACHE_LOCK_SEGMENTS)).thenReturn(16);
Mockito.doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY);
Mockito.doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY);
diff --git
a/core/src/test/java/org/apache/gravitino/authorization/TestOwnerManager.java
b/core/src/test/java/org/apache/gravitino/authorization/TestOwnerManager.java
index 889e4a120f..184449d2ec 100644
---
a/core/src/test/java/org/apache/gravitino/authorization/TestOwnerManager.java
+++
b/core/src/test/java/org/apache/gravitino/authorization/TestOwnerManager.java
@@ -111,6 +111,7 @@ public class TestOwnerManager {
Mockito.when(config.get(Configs.CACHE_WEIGHER_ENABLED)).thenReturn(true);
Mockito.when(config.get(Configs.CACHE_STATS_ENABLED)).thenReturn(false);
Mockito.when(config.get(Configs.CACHE_IMPLEMENTATION)).thenReturn("caffeine");
+ Mockito.when(config.get(Configs.CACHE_LOCK_SEGMENTS)).thenReturn(16);
Mockito.doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY);
Mockito.doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY);
diff --git
a/core/src/test/java/org/apache/gravitino/cache/TestSegmentedLock.java
b/core/src/test/java/org/apache/gravitino/cache/TestSegmentedLock.java
new file mode 100644
index 0000000000..e377c7751f
--- /dev/null
+++ b/core/src/test/java/org/apache/gravitino/cache/TestSegmentedLock.java
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.cache;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+/** Test class for SegmentedLock. */
+public class TestSegmentedLock {
+
+ @Test
+ void testPowerOfTwoRounding() {
+ // Test rounding up to power of 2
+ assertEquals(4, new SegmentedLock(3).getNumSegments());
+ assertEquals(8, new SegmentedLock(5).getNumSegments());
+ assertEquals(8, new SegmentedLock(7).getNumSegments());
+ assertEquals(16, new SegmentedLock(9).getNumSegments());
+ assertEquals(16, new SegmentedLock(15).getNumSegments());
+ assertEquals(32, new SegmentedLock(17).getNumSegments());
+ assertEquals(32, new SegmentedLock(31).getNumSegments());
+ }
+
+ // ========== Basic Locking Tests ==========
+
+ @Test
+ void testBasicLocking() {
+ SegmentedLock lock = new SegmentedLock(4);
+ AtomicInteger counter = new AtomicInteger(0);
+
+ lock.withLock("key1", () -> counter.incrementAndGet());
+ assertEquals(1, counter.get());
+
+ lock.withLock("key2", () -> counter.incrementAndGet());
+ assertEquals(2, counter.get());
+ }
+
+ @Test
+ void testLockingWithReturnValue() {
+ SegmentedLock lock = new SegmentedLock(4);
+
+ String result = lock.withLock("key1", () -> "test result");
+ assertEquals("test result", result);
+
+ Integer number = lock.withLock("key2", () -> 42);
+ assertEquals(42, number);
+ }
+
+ // ========== Concurrency Tests ==========
+
+ @Test
+ @Timeout(30)
+ void testConcurrentSegmentAccess() throws InterruptedException {
+ SegmentedLock lock = new SegmentedLock(8);
+ AtomicInteger counter = new AtomicInteger(0);
+ int numThreads = 16;
+ int operationsPerThread = 50;
+
+ ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+ CountDownLatch startLatch = new CountDownLatch(1);
+ CountDownLatch finishLatch = new CountDownLatch(numThreads);
+
+ for (int i = 0; i < numThreads; i++) {
+ final int threadId = i;
+ executor.submit(
+ () -> {
+ try {
+ startLatch.await();
+ for (int j = 0; j < operationsPerThread; j++) {
+ String key = "key" + (threadId % 8);
+ lock.withLock(key, () -> counter.incrementAndGet());
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ finishLatch.countDown();
+ }
+ });
+ }
+
+ startLatch.countDown();
+ assertTrue(finishLatch.await(30, TimeUnit.SECONDS));
+ assertEquals(numThreads * operationsPerThread, counter.get());
+
+ executor.shutdown();
+ assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS));
+ }
+
+ @Test
+ void testSegmentDistribution() {
+ SegmentedLock lock = new SegmentedLock(4);
+
+ // Test that different keys can be processed without exceptions
+ for (int i = 0; i < 100; i++) {
+ String key = "key" + i;
+ assertDoesNotThrow(() -> lock.withLock(key, () -> {}));
+ }
+ }
+
+ @Test
+ void testNullKeyHandling() {
+ SegmentedLock lock = new SegmentedLock(4);
+
+ assertDoesNotThrow(() -> lock.withLock(null, () -> {}));
+ assertDoesNotThrow(() -> lock.withLock(null, () -> "result"));
+ }
+
+ @Test
+ void testDifferentKeyTypes() {
+ SegmentedLock lock = new SegmentedLock(4);
+ AtomicInteger counter = new AtomicInteger(0);
+
+ lock.withLock("stringKey", () -> counter.incrementAndGet());
+ lock.withLock(123, () -> counter.incrementAndGet());
+ lock.withLock(123L, () -> counter.incrementAndGet());
+ lock.withLock(123.45, () -> counter.incrementAndGet());
+ lock.withLock(new Object(), () -> counter.incrementAndGet());
+
+ assertEquals(5, counter.get());
+ }
+
+ @Test
+ void testInterruptedExceptionHandling() throws InterruptedException {
+ SegmentedLock lock = new SegmentedLock(4);
+ AtomicInteger counter = new AtomicInteger(0);
+
+ Thread testThread =
+ new Thread(
+ () -> {
+ try {
+ lock.withLock(
+ "testKey",
+ () -> {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted during
operation", e);
+ }
+ counter.incrementAndGet();
+ });
+ } catch (RuntimeException e) {
+ assertTrue(e.getMessage().contains("interrupted"));
+ }
+ });
+
+ testThread.start();
+ Thread.sleep(100);
+ testThread.interrupt();
+ testThread.join(2000);
+
+ assertEquals(0, counter.get());
+ }
+
+ @Test
+ void testLockReentrancy() {
+ SegmentedLock lock = new SegmentedLock(4);
+ AtomicInteger counter = new AtomicInteger(0);
+
+ lock.withLock(
+ "sameKey",
+ () -> {
+ counter.incrementAndGet();
+ lock.withLock(
+ "sameKey",
+ () -> {
+ counter.incrementAndGet();
+ lock.withLock("sameKey", () -> counter.incrementAndGet());
+ });
+ });
+
+ assertEquals(3, counter.get());
+ }
+
+ // ========== Performance and Stress Tests ==========
+
+ @Test
+ @Timeout(60)
+ void testHighConcurrencyStress() throws InterruptedException {
+ SegmentedLock lock = new SegmentedLock(16);
+ AtomicInteger counter = new AtomicInteger(0);
+ int numThreads = 32;
+ int operationsPerThread = 1000;
+
+ ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+ CountDownLatch startLatch = new CountDownLatch(1);
+ CountDownLatch finishLatch = new CountDownLatch(numThreads);
+
+ for (int i = 0; i < numThreads; i++) {
+ final int threadId = i;
+ executor.submit(
+ () -> {
+ try {
+ startLatch.await();
+ for (int j = 0; j < operationsPerThread; j++) {
+ String key = "key" + (threadId % 16);
+ lock.withLock(key, () -> counter.incrementAndGet());
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ finishLatch.countDown();
+ }
+ });
+ }
+
+ startLatch.countDown();
+ assertTrue(finishLatch.await(60, TimeUnit.SECONDS));
+ assertEquals(numThreads * operationsPerThread, counter.get());
+
+ executor.shutdown();
+ assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS));
+ }
+
+ @Test
+ void testLargeNumberOfSegments() {
+ SegmentedLock lock = new SegmentedLock(1024);
+ assertEquals(1024, lock.getNumSegments());
+
+ AtomicInteger counter = new AtomicInteger(0);
+ for (int i = 0; i < 1000; i++) {
+ lock.withLock("key" + i, () -> counter.incrementAndGet());
+ }
+ assertEquals(1000, counter.get());
+ }
+
+ @Test
+ void testLongRunningOperations() throws InterruptedException {
+ SegmentedLock lock = new SegmentedLock(4);
+ AtomicInteger completed = new AtomicInteger(0);
+ int numThreads = 4;
+ int operationDurationMs = 100;
+
+ ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+ CountDownLatch startLatch = new CountDownLatch(1);
+ CountDownLatch finishLatch = new CountDownLatch(numThreads);
+
+ for (int i = 0; i < numThreads; i++) {
+ final int threadId = i;
+ executor.submit(
+ () -> {
+ try {
+ startLatch.await();
+ String key = "key" + threadId;
+ lock.withLock(
+ key,
+ () -> {
+ try {
+ Thread.sleep(operationDurationMs);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ completed.incrementAndGet();
+ });
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ finishLatch.countDown();
+ }
+ });
+ }
+
+ startLatch.countDown();
+ assertTrue(finishLatch.await(30, TimeUnit.SECONDS));
+ assertEquals(numThreads, completed.get());
+
+ executor.shutdown();
+ assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS));
+ }
+
+ @Test
+ void testGlobalClearingBlocksOtherOperations() throws InterruptedException {
+ SegmentedLock lock = new SegmentedLock(4);
+ AtomicInteger counter = new AtomicInteger(0);
+ CountDownLatch clearingStarted = new CountDownLatch(1);
+ CountDownLatch clearingFinished = new CountDownLatch(1);
+ CountDownLatch operationFinished = new CountDownLatch(1);
+
+ // Start a long-running clearing operation
+ Thread clearingThread =
+ new Thread(
+ () -> {
+ try {
+ lock.withGlobalLock(
+ () -> {
+ clearingStarted.countDown();
+ try {
+ Thread.sleep(100); // Simulate long clearing operation
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ counter.incrementAndGet();
+ });
+ } finally {
+ clearingFinished.countDown();
+ }
+ });
+
+ // Start a regular operation that should wait
+ Thread operationThread =
+ new Thread(
+ () -> {
+ try {
+ clearingStarted.await(); // Wait for clearing to start
+ lock.withLock("key1", () -> counter.incrementAndGet());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ operationFinished.countDown();
+ }
+ });
+
+ clearingThread.start();
+ operationThread.start();
+
+ // Wait for clearing to start
+ assertTrue(clearingStarted.await(1, TimeUnit.SECONDS));
+
+ // Check that clearing is in progress
+ assertTrue(lock.isClearing());
+
+ // Wait for clearing to finish
+ assertTrue(clearingFinished.await(2, TimeUnit.SECONDS));
+
+ // Wait for operation to finish
+ assertTrue(operationFinished.await(2, TimeUnit.SECONDS));
+
+ // Both operations should have completed
+ assertEquals(2, counter.get());
+
+ // Clearing should no longer be in progress
+ assertFalse(lock.isClearing());
+
+ clearingThread.join();
+ operationThread.join();
+ }
+
+ @Test
+ void testConcurrentGlobalClearing() {
+ SegmentedLock lock = new SegmentedLock(4);
+
+ // First clearing operation should succeed
+ assertDoesNotThrow(() -> lock.withGlobalLock(() -> {}));
+
+ // Second concurrent clearing operation should fail
+ assertThrows(
+ IllegalStateException.class,
+ () -> {
+ lock.withGlobalLock(
+ () -> {
+ // Try to start another clearing operation
+ lock.withGlobalLock(() -> {});
+ });
+ });
+ }
+}
diff --git
a/core/src/test/java/org/apache/gravitino/policy/TestPolicyManager.java
b/core/src/test/java/org/apache/gravitino/policy/TestPolicyManager.java
index aafe2adc5f..ac76ee81f2 100644
--- a/core/src/test/java/org/apache/gravitino/policy/TestPolicyManager.java
+++ b/core/src/test/java/org/apache/gravitino/policy/TestPolicyManager.java
@@ -202,6 +202,7 @@ public class TestPolicyManager {
Mockito.when(config.get(Configs.CACHE_WEIGHER_ENABLED)).thenReturn(true);
Mockito.when(config.get(Configs.CACHE_STATS_ENABLED)).thenReturn(false);
Mockito.when(config.get(Configs.CACHE_IMPLEMENTATION)).thenReturn("caffeine");
+ Mockito.when(config.get(Configs.CACHE_LOCK_SEGMENTS)).thenReturn(16);
Mockito.doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY);
Mockito.doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY);
diff --git
a/core/src/test/java/org/apache/gravitino/stats/TestStatisticManager.java
b/core/src/test/java/org/apache/gravitino/stats/TestStatisticManager.java
index 03cb28245d..2529702f7b 100644
--- a/core/src/test/java/org/apache/gravitino/stats/TestStatisticManager.java
+++ b/core/src/test/java/org/apache/gravitino/stats/TestStatisticManager.java
@@ -116,6 +116,7 @@ public class TestStatisticManager {
Mockito.when(config.get(Configs.CACHE_WEIGHER_ENABLED)).thenReturn(true);
Mockito.when(config.get(Configs.CACHE_STATS_ENABLED)).thenReturn(false);
Mockito.when(config.get(Configs.CACHE_IMPLEMENTATION)).thenReturn("caffeine");
+ Mockito.when(config.get(Configs.CACHE_LOCK_SEGMENTS)).thenReturn(16);
Mockito.when(config.get(Configs.PARTITION_STATS_STORAGE_FACTORY_CLASS))
.thenReturn(MemoryPartitionStatsStorageFactory.class.getCanonicalName());
diff --git
a/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java
b/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java
index 44d9134531..0e8298a2a0 100644
--- a/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java
+++ b/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java
@@ -151,6 +151,7 @@ public class TestEntityStorage {
Mockito.when(config.get(Configs.CACHE_WEIGHER_ENABLED)).thenReturn(true);
Mockito.when(config.get(Configs.CACHE_STATS_ENABLED)).thenReturn(false);
Mockito.when(config.get(Configs.CACHE_IMPLEMENTATION)).thenReturn("caffeine");
+ Mockito.when(config.get(Configs.CACHE_LOCK_SEGMENTS)).thenReturn(16);
BaseIT baseIT = new BaseIT();
diff --git a/core/src/test/java/org/apache/gravitino/tag/TestTagManager.java
b/core/src/test/java/org/apache/gravitino/tag/TestTagManager.java
index 5a7fc87d32..c1ca78e8cf 100644
--- a/core/src/test/java/org/apache/gravitino/tag/TestTagManager.java
+++ b/core/src/test/java/org/apache/gravitino/tag/TestTagManager.java
@@ -139,6 +139,7 @@ public class TestTagManager {
Mockito.when(config.get(Configs.CACHE_WEIGHER_ENABLED)).thenReturn(true);
Mockito.when(config.get(Configs.CACHE_STATS_ENABLED)).thenReturn(false);
Mockito.when(config.get(Configs.CACHE_IMPLEMENTATION)).thenReturn("caffeine");
+ Mockito.when(config.get(Configs.CACHE_LOCK_SEGMENTS)).thenReturn(16);
Mockito.doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY);
Mockito.doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY);
diff --git a/docs/gravitino-server-config.md b/docs/gravitino-server-config.md
index 3d2a90bd6b..211cb0faa2 100644
--- a/docs/gravitino-server-config.md
+++ b/docs/gravitino-server-config.md
@@ -85,6 +85,9 @@ gravitino.cache.enabled=true
# Specify the cache implementation (no need to use the fully qualified class
name)
gravitino.cache.implementation=caffeine
+
+# Number of lock segments for cache concurrency optimization
+gravitino.cache.lockSegments=16
```
| Configuration Key | Description
| Default Value | Required | Since Version |
@@ -95,6 +98,7 @@ gravitino.cache.implementation=caffeine
| `gravitino.cache.expireTimeInMs` | Cache expiration time (in milliseconds)
| `3600000` (about 1 hr) | No | 1.0.0 |
| `gravitino.cache.enableStats` | Whether to enable cache statistics
logging | `false` | No | 1.0.0 |
| `gravitino.cache.enableWeigher` | Whether to enable weight-based eviction
| `true` | No | 1.0.0 |
+| `gravitino.cache.lockSegments` | Number of lock segments used for cache concurrency optimization.
| `16` | No | 1.0.0 |
- `gravitino.cache.enableWeigher`: When enabled, eviction is based on weight
and `maxEntries` will be ignored.
- `gravitino.cache.expireTimeInMs`: Controls the cache TTL in milliseconds.