This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch branch-1.0
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/branch-1.0 by this push:
     new 273475fdca [#8377] improvement(core): Optimize CaffeineEntityCache 
using segmented locking (#8492)
273475fdca is described below

commit 273475fdca7983a85fd10e1af8082ba66451ef7a
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Sep 10 15:58:16 2025 +0800

    [#8377] improvement(core): Optimize CaffeineEntityCache using segmented 
locking (#8492)
    
    ### What changes were proposed in this pull request?
    
    Optimize CaffeineEntityCache using segmented locking
    
    ### Why are the changes needed?
    
    Fix: [#8377](https://github.com/apache/gravitino/issues/8377)
    
    ### Does this PR introduce _any_ user-facing change?
    
    N/A
    
    ### How was this patch tested?
    
    TestSegmentedLock.java
    
    Co-authored-by: Xiaojian Sun <[email protected]>
---
 .../fileset/TestFilesetCatalogOperations.java      |   1 +
 .../catalog/kafka/TestKafkaCatalogOperations.java  |   1 +
 .../catalog/model/TestModelCatalogOperations.java  |   1 +
 .../main/java/org/apache/gravitino/Configs.java    |  17 +
 .../gravitino/cache/CaffeineEntityCache.java       | 137 ++------
 .../org/apache/gravitino/cache/EntityCache.java    |   5 +-
 .../org/apache/gravitino/cache/NoOpsCache.java     |  33 +-
 .../org/apache/gravitino/cache/SegmentedLock.java  | 236 +++++++++++++
 .../storage/relational/RelationalEntityStore.java  |   3 +
 .../authorization/TestAccessControlManager.java    |   1 +
 .../gravitino/authorization/TestOwnerManager.java  |   1 +
 .../apache/gravitino/cache/TestSegmentedLock.java  | 383 +++++++++++++++++++++
 .../apache/gravitino/policy/TestPolicyManager.java |   1 +
 .../gravitino/stats/TestStatisticManager.java      |   1 +
 .../gravitino/storage/TestEntityStorage.java       |   1 +
 .../org/apache/gravitino/tag/TestTagManager.java   |   1 +
 docs/gravitino-server-config.md                    |   4 +
 17 files changed, 695 insertions(+), 132 deletions(-)

diff --git 
a/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
 
b/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
index dc28f69bce..993f92acf7 100644
--- 
a/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
+++ 
b/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
@@ -239,6 +239,7 @@ public class TestFilesetCatalogOperations {
     Mockito.when(config.get(Configs.CACHE_WEIGHER_ENABLED)).thenReturn(true);
     Mockito.when(config.get(Configs.CACHE_STATS_ENABLED)).thenReturn(false);
     
Mockito.when(config.get(Configs.CACHE_IMPLEMENTATION)).thenReturn("caffeine");
+    Mockito.when(config.get(Configs.CACHE_LOCK_SEGMENTS)).thenReturn(16);
 
     store = EntityStoreFactory.createEntityStore(config);
     store.initialize(config);
diff --git 
a/catalogs/catalog-kafka/src/test/java/org/apache/gravitino/catalog/kafka/TestKafkaCatalogOperations.java
 
b/catalogs/catalog-kafka/src/test/java/org/apache/gravitino/catalog/kafka/TestKafkaCatalogOperations.java
index ef90c81893..6b8f87888d 100644
--- 
a/catalogs/catalog-kafka/src/test/java/org/apache/gravitino/catalog/kafka/TestKafkaCatalogOperations.java
+++ 
b/catalogs/catalog-kafka/src/test/java/org/apache/gravitino/catalog/kafka/TestKafkaCatalogOperations.java
@@ -164,6 +164,7 @@ public class TestKafkaCatalogOperations extends 
KafkaClusterEmbedded {
     Mockito.when(config.get(Configs.CACHE_WEIGHER_ENABLED)).thenReturn(true);
     Mockito.when(config.get(Configs.CACHE_STATS_ENABLED)).thenReturn(false);
     
Mockito.when(config.get(Configs.CACHE_IMPLEMENTATION)).thenReturn("caffeine");
+    Mockito.when(config.get(Configs.CACHE_LOCK_SEGMENTS)).thenReturn(16);
 
     // Mock
     MetalakeMetaService metalakeMetaService = 
MetalakeMetaService.getInstance();
diff --git 
a/catalogs/catalog-model/src/test/java/org/apache/gravtitino/catalog/model/TestModelCatalogOperations.java
 
b/catalogs/catalog-model/src/test/java/org/apache/gravtitino/catalog/model/TestModelCatalogOperations.java
index 980568f769..f8119b1fec 100644
--- 
a/catalogs/catalog-model/src/test/java/org/apache/gravtitino/catalog/model/TestModelCatalogOperations.java
+++ 
b/catalogs/catalog-model/src/test/java/org/apache/gravtitino/catalog/model/TestModelCatalogOperations.java
@@ -128,6 +128,7 @@ public class TestModelCatalogOperations {
     Mockito.when(config.get(Configs.CACHE_WEIGHER_ENABLED)).thenReturn(true);
     Mockito.when(config.get(Configs.CACHE_STATS_ENABLED)).thenReturn(false);
     
Mockito.when(config.get(Configs.CACHE_IMPLEMENTATION)).thenReturn("caffeine");
+    Mockito.when(config.get(Configs.CACHE_LOCK_SEGMENTS)).thenReturn(16);
 
     store = EntityStoreFactory.createEntityStore(config);
     store.initialize(config);
diff --git a/core/src/main/java/org/apache/gravitino/Configs.java 
b/core/src/main/java/org/apache/gravitino/Configs.java
index 479aa0c701..56392fadf3 100644
--- a/core/src/main/java/org/apache/gravitino/Configs.java
+++ b/core/src/main/java/org/apache/gravitino/Configs.java
@@ -408,6 +408,23 @@ public class Configs {
           .checkValue(StringUtils::isNotBlank, 
ConfigConstants.NOT_BLANK_ERROR_MSG)
           .createWithDefault("caffeine");
 
+  // Number of lock segments for cache concurrency optimization
+  public static final ConfigEntry<Integer> CACHE_LOCK_SEGMENTS =
+      new ConfigBuilder("gravitino.cache.lockSegments")
+          .doc(
+              "Number of lock segments for cache concurrency optimization. "
+                  + "This configuration controls the granularity of locking in 
the cache system. "
+                  + "Instead of using a single global lock, Gravitino uses 
Guava's Striped<Lock> "
+                  + "to divide locks into segments, allowing concurrent access 
to different cache "
+                  + "entries while maintaining thread safety. Higher values 
reduce lock contention "
+                  + "but increase memory overhead. The actual number of 
segments will be rounded "
+                  + "up to the nearest power of 2 for optimal performance. "
+                  + "See: 
https://github.com/google/guava/wiki/StripedExplained";)
+          .version(ConfigConstants.VERSION_1_0_0)
+          .intConf()
+          .checkValue(value -> value > 0, "Lock segments must be positive.")
+          .createWithDefault(16);
+
   public static final ConfigEntry<String> JOB_STAGING_DIR =
       new ConfigBuilder("gravitino.job.stagingDir")
           .doc("Directory for managing staging files when running jobs.")
diff --git 
a/core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java 
b/core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java
index 3faf543d30..8abde5117f 100644
--- a/core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java
+++ b/core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java
@@ -29,7 +29,6 @@ import com.google.common.collect.Sets;
 import com.googlecode.concurrenttrees.radix.ConcurrentRadixTree;
 import com.googlecode.concurrenttrees.radix.RadixTree;
 import 
com.googlecode.concurrenttrees.radix.node.concrete.DefaultCharArrayNodeFactory;
-import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
@@ -43,8 +42,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.gravitino.Config;
 import org.apache.gravitino.Configs;
@@ -78,7 +75,9 @@ public class CaffeineEntityCache extends BaseEntityCache {
           new ThreadPoolExecutor.CallerRunsPolicy());
 
   private static final Logger LOG = 
LoggerFactory.getLogger(CaffeineEntityCache.class.getName());
-  private final ReentrantLock opLock = new ReentrantLock();
+
+  /** Segmented locking for better concurrency */
+  private final SegmentedLock segmentedLock;
 
   /** Cache data structure. */
   private final Cache<EntityCacheRelationKey, List<Entity>> cacheData;
@@ -101,6 +100,10 @@ public class CaffeineEntityCache extends BaseEntityCache {
     this.cacheIndex = new ConcurrentRadixTree<>(new 
DefaultCharArrayNodeFactory());
     this.reverseIndex = new ReverseIndexCache();
 
+    // Initialize segmented lock
+    int lockSegments = cacheConfig.get(Configs.CACHE_LOCK_SEGMENTS);
+    this.segmentedLock = new SegmentedLock(lockSegments);
+
     Caffeine<EntityCacheKey, List<Entity>> cacheDataBuilder = 
newBaseBuilder(cacheConfig);
 
     cacheDataBuilder
@@ -162,15 +165,18 @@ public class CaffeineEntityCache extends BaseEntityCache {
       NameIdentifier ident, Entity.EntityType type, 
SupportsRelationOperations.Type relType) {
     checkArguments(ident, type, relType);
 
-    return withLock(() -> invalidateEntities(ident, type, 
Optional.of(relType)));
+    return segmentedLock.withLock(
+        EntityCacheRelationKey.of(ident, type, relType),
+        () -> invalidateEntities(ident, type, Optional.of(relType)));
   }
 
   /** {@inheritDoc} */
   @Override
   public boolean invalidate(NameIdentifier ident, Entity.EntityType type) {
     checkArguments(ident, type);
-
-    return withLock(() -> invalidateEntities(ident, type, Optional.empty()));
+    return segmentedLock.withLock(
+        EntityCacheRelationKey.of(ident, type),
+        () -> invalidateEntities(ident, type, Optional.empty()));
   }
 
   /** {@inheritDoc} */
@@ -197,7 +203,7 @@ public class CaffeineEntityCache extends BaseEntityCache {
   /** {@inheritDoc} */
   @Override
   public void clear() {
-    withLock(
+    segmentedLock.withGlobalLock(
         () -> {
           cacheData.invalidateAll();
         });
@@ -212,15 +218,16 @@ public class CaffeineEntityCache extends BaseEntityCache {
       List<E> entities) {
     checkArguments(ident, type, relType);
     Preconditions.checkArgument(entities != null, "Entities cannot be null");
-    withLock(
+    EntityCacheRelationKey entityCacheKey = EntityCacheRelationKey.of(ident, 
type, relType);
+    segmentedLock.withLock(
+        entityCacheKey,
         () -> {
           if (entities.isEmpty()) {
             return;
           }
 
           syncEntitiesToCache(
-              EntityCacheRelationKey.of(ident, type, relType),
-              entities.stream().map(e -> (Entity) 
e).collect(Collectors.toList()));
+              entityCacheKey, entities.stream().map(e -> (Entity) 
e).collect(Collectors.toList()));
         });
   }
 
@@ -229,13 +236,13 @@ public class CaffeineEntityCache extends BaseEntityCache {
   public <E extends Entity & HasIdentifier> void put(E entity) {
     Preconditions.checkArgument(entity != null, "Entity cannot be null");
 
-    withLock(
+    NameIdentifier identifier = getIdentFromEntity(entity);
+    EntityCacheRelationKey entityCacheKey = 
EntityCacheRelationKey.of(identifier, entity.type());
+
+    segmentedLock.withLock(
+        entityCacheKey,
         () -> {
           invalidateOnKeyChange(entity);
-          NameIdentifier identifier = getIdentFromEntity(entity);
-          EntityCacheRelationKey entityCacheKey =
-              EntityCacheRelationKey.of(identifier, entity.type());
-
           syncEntitiesToCache(entityCacheKey, Lists.newArrayList(entity));
         });
   }
@@ -254,18 +261,22 @@ public class CaffeineEntityCache extends BaseEntityCache {
 
   /** {@inheritDoc} */
   @Override
-  public <E extends Exception> void withCacheLock(ThrowingRunnable<E> action) 
throws E {
+  public <E extends Exception> void withCacheLock(
+      EntityCacheKey key, EntityCache.ThrowingRunnable<E> action) throws E {
+    Preconditions.checkArgument(key != null, "Key cannot be null");
     Preconditions.checkArgument(action != null, "Action cannot be null");
 
-    withLockAndThrow(action);
+    segmentedLock.withLockAndThrow(key, action);
   }
 
   /** {@inheritDoc} */
   @Override
-  public <E, T extends Exception> E withCacheLock(ThrowingSupplier<E, T> 
action) throws T {
+  public <E, T extends Exception> E withCacheLock(
+      EntityCacheKey key, EntityCache.ThrowingSupplier<E, T> action) throws T {
+    Preconditions.checkArgument(key != null, "Key cannot be null");
     Preconditions.checkArgument(action != null, "Action cannot be null");
 
-    return withLockAndThrow(action);
+    return segmentedLock.withLockAndThrow(key, action);
   }
 
   /**
@@ -276,7 +287,8 @@ public class CaffeineEntityCache extends BaseEntityCache {
    */
   @Override
   protected void invalidateExpiredItem(EntityCacheKey key) {
-    withLock(
+    segmentedLock.withLock(
+        key,
         () -> {
           reverseIndex.remove(key);
           cacheIndex.remove(key.toString());
@@ -402,89 +414,6 @@ public class CaffeineEntityCache extends BaseEntityCache {
     return true;
   }
 
-  /**
-   * Runs the given action with the lock.
-   *
-   * @param action The action to run with the lock
-   */
-  private void withLock(Runnable action) {
-    try {
-      opLock.lockInterruptibly();
-      try {
-        action.run();
-      } finally {
-        opLock.unlock();
-      }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new IllegalStateException("Thread was interrupted while waiting 
for lock", e);
-    }
-  }
-
-  /**
-   * Runs the given action with the lock and returns the result.
-   *
-   * @param action The action to run with the lock
-   * @param <T> The type of the result
-   * @return The result of the action
-   */
-  private <T> T withLock(Supplier<T> action) {
-    try {
-      opLock.lockInterruptibly();
-      try {
-        return action.get();
-      } finally {
-        opLock.unlock();
-      }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new IllegalStateException("Thread was interrupted while waiting 
for lock", e);
-    }
-  }
-
-  /**
-   * Runs the given action with the lock and throws the exception if it occurs.
-   *
-   * @param action The action to run with the lock
-   * @param <T> The type of the result
-   * @return The result of the action
-   * @throws IOException If an exception occurs during the action
-   */
-  private <T, E extends Exception> T withLockAndThrow(ThrowingSupplier<T, E> 
action) throws E {
-    try {
-      opLock.lockInterruptibly();
-      try {
-        return action.get();
-      } finally {
-        opLock.unlock();
-      }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new IllegalStateException("Thread was interrupted while waiting 
for lock", e);
-    }
-  }
-
-  /**
-   * Runs the given action with the lock and throws the exception if it occurs.
-   *
-   * @param action The action to run with the lock
-   * @param <E> The type of the exception
-   * @throws E If an exception occurs during the action
-   */
-  private <E extends Exception> void withLockAndThrow(ThrowingRunnable<E> 
action) throws E {
-    try {
-      opLock.lockInterruptibly();
-      try {
-        action.run();
-      } finally {
-        opLock.unlock();
-      }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new IllegalStateException("Thread was interrupted while waiting 
for lock", e);
-    }
-  }
-
   /** Starts the cache stats monitor. */
   private void startCacheStatsMonitor() {
     scheduler.scheduleAtFixedRate(
diff --git a/core/src/main/java/org/apache/gravitino/cache/EntityCache.java 
b/core/src/main/java/org/apache/gravitino/cache/EntityCache.java
index 8ccdb6c146..3ad6de1e8a 100644
--- a/core/src/main/java/org/apache/gravitino/cache/EntityCache.java
+++ b/core/src/main/java/org/apache/gravitino/cache/EntityCache.java
@@ -47,7 +47,7 @@ public interface EntityCache extends 
SupportsEntityStoreCache, SupportsRelationE
    * @param <E> The type of exception that may be thrown
    * @throws E if the action throws an exception of type E
    */
-  <E extends Exception> void withCacheLock(ThrowingRunnable<E> action) throws 
E;
+  <E extends Exception> void withCacheLock(EntityCacheKey key, 
ThrowingRunnable<E> action) throws E;
 
   /**
    * Executes the given action within a cache context and returns the result.
@@ -58,7 +58,8 @@ public interface EntityCache extends 
SupportsEntityStoreCache, SupportsRelationE
    * @param <T> The type of the result
    * @throws E if the action throws an exception of type E
    */
-  <T, E extends Exception> T withCacheLock(ThrowingSupplier<T, E> action) 
throws E;
+  <T, E extends Exception> T withCacheLock(EntityCacheKey key, 
ThrowingSupplier<T, E> action)
+      throws E;
 
   /**
    * A functional interface that represents a supplier that may throw an 
exception.
diff --git a/core/src/main/java/org/apache/gravitino/cache/NoOpsCache.java 
b/core/src/main/java/org/apache/gravitino/cache/NoOpsCache.java
index 56cdb93511..74babb75a5 100644
--- a/core/src/main/java/org/apache/gravitino/cache/NoOpsCache.java
+++ b/core/src/main/java/org/apache/gravitino/cache/NoOpsCache.java
@@ -21,7 +21,6 @@ package org.apache.gravitino.cache;
 
 import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.locks.ReentrantLock;
 import org.apache.gravitino.Config;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.HasIdentifier;
@@ -30,7 +29,7 @@ import org.apache.gravitino.SupportsRelationOperations;
 
 /** A cache implementation that does not cache anything. */
 public class NoOpsCache extends BaseEntityCache {
-  private final ReentrantLock opLock = new ReentrantLock();
+  private final SegmentedLock opLock = new SegmentedLock(1);
   /**
    * Constructs a new {@link BaseEntityCache} instance.
    *
@@ -60,34 +59,16 @@ public class NoOpsCache extends BaseEntityCache {
 
   /** {@inheritDoc} */
   @Override
-  public <E extends Exception> void withCacheLock(ThrowingRunnable<E> action) 
throws E {
-    try {
-      opLock.lockInterruptibly();
-      try {
-        action.run();
-      } finally {
-        opLock.unlock();
-      }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new IllegalStateException("Thread was interrupted while waiting 
for lock", e);
-    }
+  public <E extends Exception> void withCacheLock(EntityCacheKey key, 
ThrowingRunnable<E> action)
+      throws E {
+    opLock.withLockAndThrow(key, action);
   }
 
   /** {@inheritDoc} */
   @Override
-  public <T, E extends Exception> T withCacheLock(ThrowingSupplier<T, E> 
action) throws E {
-    try {
-      opLock.lockInterruptibly();
-      try {
-        return action.get();
-      } finally {
-        opLock.unlock();
-      }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new IllegalStateException("Thread was interrupted while waiting 
for lock", e);
-    }
+  public <T, E extends Exception> T withCacheLock(EntityCacheKey key, 
ThrowingSupplier<T, E> action)
+      throws E {
+    return opLock.withLockAndThrow(key, action);
   }
 
   /** {@inheritDoc} */
diff --git a/core/src/main/java/org/apache/gravitino/cache/SegmentedLock.java 
b/core/src/main/java/org/apache/gravitino/cache/SegmentedLock.java
new file mode 100644
index 0000000000..dce8eb63ce
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/cache/SegmentedLock.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.cache;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Striped;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Segmented lock for improved concurrency. Divides locks into segments to 
reduce contention.
+ * Supports global clearing operations that require exclusive access to all 
segments.
+ */
+public class SegmentedLock {
+  private final Striped<Lock> stripedLocks;
+  private static final Object NULL_KEY = new Object();
+
+  /** CountDownLatch for global operations - null when no operation is in 
progress */
+  private final AtomicReference<CountDownLatch> globalOperationLatch = new 
AtomicReference<>();
+
+  /**
+   * Creates a SegmentedLock with the specified number of segments. Guava's 
Striped automatically
+   * rounds up to the nearest power of 2 for optimal performance.
+   *
+   * @param numSegments Number of segments (must be positive)
+   * @throws IllegalArgumentException if numSegments is not positive
+   */
+  public SegmentedLock(int numSegments) {
+    if (numSegments <= 0) {
+      throw new IllegalArgumentException(
+          "Number of segments must be positive, got: " + numSegments);
+    }
+
+    this.stripedLocks = Striped.lock(numSegments);
+  }
+
+  /**
+   * Gets the segment lock for the given key.
+   *
+   * @param key Object to determine the segment
+   * @return Segment lock for the key
+   */
+  public Lock getSegmentLock(Object key) {
+    return stripedLocks.get(normalizeKey(key));
+  }
+
+  /**
+   * Normalizes the key to handle null values consistently.
+   *
+   * @param key The input key
+   * @return Normalized key (never null)
+   */
+  private Object normalizeKey(Object key) {
+    return key != null ? key : NULL_KEY;
+  }
+
+  /**
+   * Runs action with segment lock for the given key. Will wait if a global 
clearing operation is in
+   * progress.
+   *
+   * @param key Key to determine segment
+   * @param action Action to run
+   * @throws RuntimeException if interrupted
+   */
+  public void withLock(Object key, Runnable action) {
+    waitForGlobalComplete();
+    Lock lock = getSegmentLock(key);
+    try {
+      lock.lockInterruptibly();
+      try {
+        action.run();
+      } finally {
+        lock.unlock();
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Thread was interrupted while waiting for 
lock", e);
+    }
+  }
+
+  /**
+   * Runs action with segment lock and returns result. Will wait if a global 
clearing operation is
+   * in progress.
+   *
+   * @param key Key to determine segment
+   * @param action Action to run
+   * @param <T> Result type
+   * @return Action result
+   * @throws RuntimeException if interrupted
+   */
+  public <T> T withLock(Object key, java.util.function.Supplier<T> action) {
+    waitForGlobalComplete();
+    Lock lock = getSegmentLock(key);
+    try {
+      lock.lockInterruptibly();
+      try {
+        return action.get();
+      } finally {
+        lock.unlock();
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Thread was interrupted while waiting for 
lock", e);
+    }
+  }
+
+  /**
+   * Runs action with segment lock for the given key. Will wait if a global 
clearing operation is in
+   * progress.
+   *
+   * @param key Key to determine segment
+   * @param action Action to run
+   * @param <T> Result type
+   * @param <E> Exception type
+   * @return Action result
+   * @throws E Exception
+   */
+  public <T, E extends Exception> T withLockAndThrow(
+      Object key, EntityCache.ThrowingSupplier<T, E> action) throws E {
+    waitForGlobalComplete();
+    Lock lock = getSegmentLock(key);
+    try {
+      lock.lockInterruptibly();
+      try {
+        return action.get();
+      } finally {
+        lock.unlock();
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException("Thread was interrupted while waiting 
for lock", e);
+    }
+  }
+
+  /**
+   * Runs action with segment lock for the given key. Will wait if a global 
clearing operation is in
+   * progress.
+   *
+   * @param key Key to determine segment
+   * @param action Action to run
+   * @param <E> Exception type
+   * @throws E Exception
+   */
+  public <E extends Exception> void withLockAndThrow(
+      Object key, EntityCache.ThrowingRunnable<E> action) throws E {
+    waitForGlobalComplete();
+    Lock lock = getSegmentLock(key);
+    try {
+      lock.lockInterruptibly();
+      try {
+        action.run();
+      } finally {
+        lock.unlock();
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException("Thread was interrupted while waiting 
for lock", e);
+    }
+  }
+
+  /**
+   * Executes a global clearing operation with exclusive access to all 
segments. This method sets
+   * the clearing flag and ensures no other operations can proceed until the 
clearing is complete.
+   *
+   * @param action The clearing action to execute
+   */
+  public void withGlobalLock(Runnable action) {
+    // Create a new CountDownLatch for this operation
+    CountDownLatch latch = new CountDownLatch(1);
+
+    // Atomically set the latch, fail if another operation is already in 
progress
+    if (!globalOperationLatch.compareAndSet(null, latch)) {
+      throw new IllegalStateException("Global operation already in progress");
+    }
+
+    try {
+      synchronized (this) {
+        action.run();
+      }
+    } finally {
+      // Clear state first, then signal completion
+      globalOperationLatch.set(null);
+      latch.countDown();
+    }
+  }
+
+  /**
+   * Waits for any ongoing global operation to complete. This method is called 
by regular operations
+   * to ensure they don't interfere with global operations.
+   */
+  private void waitForGlobalComplete() {
+    CountDownLatch latch = globalOperationLatch.get();
+    if (latch != null) {
+      try {
+        latch.await();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(
+            "Thread was interrupted while waiting for global operation to 
complete", e);
+      }
+    }
+  }
+
+  /** Checks if a global operation is currently in progress. */
+  @VisibleForTesting
+  public boolean isClearing() {
+    return globalOperationLatch.get() != null;
+  }
+
+  /**
+   * Returns number of lock segments.
+   *
+   * @return Number of segments
+   */
+  public int getNumSegments() {
+    return stripedLocks.size();
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
index 3968381135..15bf0eabd6 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
@@ -38,6 +38,7 @@ import org.apache.gravitino.Namespace;
 import org.apache.gravitino.SupportsRelationOperations;
 import org.apache.gravitino.cache.CacheFactory;
 import org.apache.gravitino.cache.EntityCache;
+import org.apache.gravitino.cache.EntityCacheRelationKey;
 import org.apache.gravitino.cache.NoOpsCache;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.meta.TagEntity;
@@ -129,6 +130,7 @@ public class RelationalEntityStore
       NameIdentifier ident, Entity.EntityType entityType, Class<E> e)
       throws NoSuchEntityException, IOException {
     return cache.withCacheLock(
+        EntityCacheRelationKey.of(ident, entityType),
         () -> {
           Optional<E> entityFromCache = cache.getIfPresent(ident, entityType);
           if (entityFromCache.isPresent()) {
@@ -210,6 +212,7 @@ public class RelationalEntityStore
       Type relType, NameIdentifier nameIdentifier, Entity.EntityType 
identType, boolean allFields)
       throws IOException {
     return cache.withCacheLock(
+        EntityCacheRelationKey.of(nameIdentifier, identType, relType),
         () -> {
           Optional<List<E>> entities = cache.getIfPresent(relType, 
nameIdentifier, identType);
           if (entities.isPresent()) {
diff --git 
a/core/src/test/java/org/apache/gravitino/authorization/TestAccessControlManager.java
 
b/core/src/test/java/org/apache/gravitino/authorization/TestAccessControlManager.java
index 21d4dcb477..23e8a71f5f 100644
--- 
a/core/src/test/java/org/apache/gravitino/authorization/TestAccessControlManager.java
+++ 
b/core/src/test/java/org/apache/gravitino/authorization/TestAccessControlManager.java
@@ -138,6 +138,7 @@ public class TestAccessControlManager {
     Mockito.when(config.get(Configs.CACHE_WEIGHER_ENABLED)).thenReturn(true);
     Mockito.when(config.get(Configs.CACHE_STATS_ENABLED)).thenReturn(false);
     
Mockito.when(config.get(Configs.CACHE_IMPLEMENTATION)).thenReturn("caffeine");
+    Mockito.when(config.get(Configs.CACHE_LOCK_SEGMENTS)).thenReturn(16);
 
     Mockito.doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY);
     Mockito.doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY);
diff --git 
a/core/src/test/java/org/apache/gravitino/authorization/TestOwnerManager.java 
b/core/src/test/java/org/apache/gravitino/authorization/TestOwnerManager.java
index 889e4a120f..184449d2ec 100644
--- 
a/core/src/test/java/org/apache/gravitino/authorization/TestOwnerManager.java
+++ 
b/core/src/test/java/org/apache/gravitino/authorization/TestOwnerManager.java
@@ -111,6 +111,7 @@ public class TestOwnerManager {
     Mockito.when(config.get(Configs.CACHE_WEIGHER_ENABLED)).thenReturn(true);
     Mockito.when(config.get(Configs.CACHE_STATS_ENABLED)).thenReturn(false);
     
Mockito.when(config.get(Configs.CACHE_IMPLEMENTATION)).thenReturn("caffeine");
+    Mockito.when(config.get(Configs.CACHE_LOCK_SEGMENTS)).thenReturn(16);
 
     Mockito.doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY);
     Mockito.doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY);
diff --git 
a/core/src/test/java/org/apache/gravitino/cache/TestSegmentedLock.java 
b/core/src/test/java/org/apache/gravitino/cache/TestSegmentedLock.java
new file mode 100644
index 0000000000..e377c7751f
--- /dev/null
+++ b/core/src/test/java/org/apache/gravitino/cache/TestSegmentedLock.java
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.cache;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+/** Test class for SegmentedLock. */
+public class TestSegmentedLock {
+
+  @Test
+  void testPowerOfTwoRounding() {
+    // Test rounding up to power of 2
+    assertEquals(4, new SegmentedLock(3).getNumSegments());
+    assertEquals(8, new SegmentedLock(5).getNumSegments());
+    assertEquals(8, new SegmentedLock(7).getNumSegments());
+    assertEquals(16, new SegmentedLock(9).getNumSegments());
+    assertEquals(16, new SegmentedLock(15).getNumSegments());
+    assertEquals(32, new SegmentedLock(17).getNumSegments());
+    assertEquals(32, new SegmentedLock(31).getNumSegments());
+  }
+
+  // ========== Basic Locking Tests ==========
+
+  @Test
+  void testBasicLocking() {
+    SegmentedLock lock = new SegmentedLock(4);
+    AtomicInteger counter = new AtomicInteger(0);
+
+    lock.withLock("key1", () -> counter.incrementAndGet());
+    assertEquals(1, counter.get());
+
+    lock.withLock("key2", () -> counter.incrementAndGet());
+    assertEquals(2, counter.get());
+  }
+
+  @Test
+  void testLockingWithReturnValue() {
+    SegmentedLock lock = new SegmentedLock(4);
+
+    String result = lock.withLock("key1", () -> "test result");
+    assertEquals("test result", result);
+
+    Integer number = lock.withLock("key2", () -> 42);
+    assertEquals(42, number);
+  }
+
+  // ========== Concurrency Tests ==========
+
+  @Test
+  @Timeout(30)
+  void testConcurrentSegmentAccess() throws InterruptedException {
+    SegmentedLock lock = new SegmentedLock(8);
+    AtomicInteger counter = new AtomicInteger(0);
+    int numThreads = 16;
+    int operationsPerThread = 50;
+
+    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+    CountDownLatch startLatch = new CountDownLatch(1);
+    CountDownLatch finishLatch = new CountDownLatch(numThreads);
+
+    for (int i = 0; i < numThreads; i++) {
+      final int threadId = i;
+      executor.submit(
+          () -> {
+            try {
+              startLatch.await();
+              for (int j = 0; j < operationsPerThread; j++) {
+                String key = "key" + (threadId % 8);
+                lock.withLock(key, () -> counter.incrementAndGet());
+              }
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+            } finally {
+              finishLatch.countDown();
+            }
+          });
+    }
+
+    startLatch.countDown();
+    assertTrue(finishLatch.await(30, TimeUnit.SECONDS));
+    assertEquals(numThreads * operationsPerThread, counter.get());
+
+    executor.shutdown();
+    assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS));
+  }
+
+  @Test
+  void testSegmentDistribution() {
+    SegmentedLock lock = new SegmentedLock(4);
+
+    // Test that different keys can be processed without exceptions
+    for (int i = 0; i < 100; i++) {
+      String key = "key" + i;
+      assertDoesNotThrow(() -> lock.withLock(key, () -> {}));
+    }
+  }
+
+  @Test
+  void testNullKeyHandling() {
+    SegmentedLock lock = new SegmentedLock(4);
+
+    assertDoesNotThrow(() -> lock.withLock(null, () -> {}));
+    assertDoesNotThrow(() -> lock.withLock(null, () -> "result"));
+  }
+
+  @Test
+  void testDifferentKeyTypes() {
+    SegmentedLock lock = new SegmentedLock(4);
+    AtomicInteger counter = new AtomicInteger(0);
+
+    lock.withLock("stringKey", () -> counter.incrementAndGet());
+    lock.withLock(123, () -> counter.incrementAndGet());
+    lock.withLock(123L, () -> counter.incrementAndGet());
+    lock.withLock(123.45, () -> counter.incrementAndGet());
+    lock.withLock(new Object(), () -> counter.incrementAndGet());
+
+    assertEquals(5, counter.get());
+  }
+
+  @Test
+  void testInterruptedExceptionHandling() throws InterruptedException {
+    SegmentedLock lock = new SegmentedLock(4);
+    AtomicInteger counter = new AtomicInteger(0);
+
+    Thread testThread =
+        new Thread(
+            () -> {
+              try {
+                lock.withLock(
+                    "testKey",
+                    () -> {
+                      try {
+                        Thread.sleep(1000);
+                      } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        throw new RuntimeException("Interrupted during 
operation", e);
+                      }
+                      counter.incrementAndGet();
+                    });
+              } catch (RuntimeException e) {
+                assertTrue(e.getMessage().contains("interrupted"));
+              }
+            });
+
+    testThread.start();
+    Thread.sleep(100);
+    testThread.interrupt();
+    testThread.join(2000);
+
+    assertEquals(0, counter.get());
+  }
+
+  @Test
+  void testLockReentrancy() {
+    SegmentedLock lock = new SegmentedLock(4);
+    AtomicInteger counter = new AtomicInteger(0);
+
+    lock.withLock(
+        "sameKey",
+        () -> {
+          counter.incrementAndGet();
+          lock.withLock(
+              "sameKey",
+              () -> {
+                counter.incrementAndGet();
+                lock.withLock("sameKey", () -> counter.incrementAndGet());
+              });
+        });
+
+    assertEquals(3, counter.get());
+  }
+
+  // ========== Performance and Stress Tests ==========
+
+  @Test
+  @Timeout(60)
+  void testHighConcurrencyStress() throws InterruptedException {
+    SegmentedLock lock = new SegmentedLock(16);
+    AtomicInteger counter = new AtomicInteger(0);
+    int numThreads = 32;
+    int operationsPerThread = 1000;
+
+    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+    CountDownLatch startLatch = new CountDownLatch(1);
+    CountDownLatch finishLatch = new CountDownLatch(numThreads);
+
+    for (int i = 0; i < numThreads; i++) {
+      final int threadId = i;
+      executor.submit(
+          () -> {
+            try {
+              startLatch.await();
+              for (int j = 0; j < operationsPerThread; j++) {
+                String key = "key" + (threadId % 16);
+                lock.withLock(key, () -> counter.incrementAndGet());
+              }
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+            } finally {
+              finishLatch.countDown();
+            }
+          });
+    }
+
+    startLatch.countDown();
+    assertTrue(finishLatch.await(60, TimeUnit.SECONDS));
+    assertEquals(numThreads * operationsPerThread, counter.get());
+
+    executor.shutdown();
+    assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS));
+  }
+
+  @Test
+  void testLargeNumberOfSegments() {
+    SegmentedLock lock = new SegmentedLock(1024);
+    assertEquals(1024, lock.getNumSegments());
+
+    AtomicInteger counter = new AtomicInteger(0);
+    for (int i = 0; i < 1000; i++) {
+      lock.withLock("key" + i, () -> counter.incrementAndGet());
+    }
+    assertEquals(1000, counter.get());
+  }
+
+  @Test
+  void testLongRunningOperations() throws InterruptedException {
+    SegmentedLock lock = new SegmentedLock(4);
+    AtomicInteger completed = new AtomicInteger(0);
+    int numThreads = 4;
+    int operationDurationMs = 100;
+
+    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+    CountDownLatch startLatch = new CountDownLatch(1);
+    CountDownLatch finishLatch = new CountDownLatch(numThreads);
+
+    for (int i = 0; i < numThreads; i++) {
+      final int threadId = i;
+      executor.submit(
+          () -> {
+            try {
+              startLatch.await();
+              String key = "key" + threadId;
+              lock.withLock(
+                  key,
+                  () -> {
+                    try {
+                      Thread.sleep(operationDurationMs);
+                    } catch (InterruptedException e) {
+                      Thread.currentThread().interrupt();
+                    }
+                    completed.incrementAndGet();
+                  });
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+            } finally {
+              finishLatch.countDown();
+            }
+          });
+    }
+
+    startLatch.countDown();
+    assertTrue(finishLatch.await(30, TimeUnit.SECONDS));
+    assertEquals(numThreads, completed.get());
+
+    executor.shutdown();
+    assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS));
+  }
+
+  @Test
+  void testGlobalClearingBlocksOtherOperations() throws InterruptedException {
+    SegmentedLock lock = new SegmentedLock(4);
+    AtomicInteger counter = new AtomicInteger(0);
+    CountDownLatch clearingStarted = new CountDownLatch(1);
+    CountDownLatch clearingFinished = new CountDownLatch(1);
+    CountDownLatch operationFinished = new CountDownLatch(1);
+
+    // Start a long-running clearing operation
+    Thread clearingThread =
+        new Thread(
+            () -> {
+              try {
+                lock.withGlobalLock(
+                    () -> {
+                      clearingStarted.countDown();
+                      try {
+                        Thread.sleep(100); // Simulate long clearing operation
+                      } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                      }
+                      counter.incrementAndGet();
+                    });
+              } finally {
+                clearingFinished.countDown();
+              }
+            });
+
+    // Start a regular operation that should wait
+    Thread operationThread =
+        new Thread(
+            () -> {
+              try {
+                clearingStarted.await(); // Wait for clearing to start
+                lock.withLock("key1", () -> counter.incrementAndGet());
+              } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+              } finally {
+                operationFinished.countDown();
+              }
+            });
+
+    clearingThread.start();
+    operationThread.start();
+
+    // Wait for clearing to start
+    assertTrue(clearingStarted.await(1, TimeUnit.SECONDS));
+
+    // Check that clearing is in progress
+    assertTrue(lock.isClearing());
+
+    // Wait for clearing to finish
+    assertTrue(clearingFinished.await(2, TimeUnit.SECONDS));
+
+    // Wait for operation to finish
+    assertTrue(operationFinished.await(2, TimeUnit.SECONDS));
+
+    // Both operations should have completed
+    assertEquals(2, counter.get());
+
+    // Clearing should no longer be in progress
+    assertFalse(lock.isClearing());
+
+    clearingThread.join();
+    operationThread.join();
+  }
+
+  @Test
+  void testConcurrentGlobalClearing() {
+    SegmentedLock lock = new SegmentedLock(4);
+
+    // First clearing operation should succeed
+    assertDoesNotThrow(() -> lock.withGlobalLock(() -> {}));
+
+    // Second concurrent clearing operation should fail
+    assertThrows(
+        IllegalStateException.class,
+        () -> {
+          lock.withGlobalLock(
+              () -> {
+                // Try to start another clearing operation
+                lock.withGlobalLock(() -> {});
+              });
+        });
+  }
+}
diff --git 
a/core/src/test/java/org/apache/gravitino/policy/TestPolicyManager.java 
b/core/src/test/java/org/apache/gravitino/policy/TestPolicyManager.java
index aafe2adc5f..ac76ee81f2 100644
--- a/core/src/test/java/org/apache/gravitino/policy/TestPolicyManager.java
+++ b/core/src/test/java/org/apache/gravitino/policy/TestPolicyManager.java
@@ -202,6 +202,7 @@ public class TestPolicyManager {
     Mockito.when(config.get(Configs.CACHE_WEIGHER_ENABLED)).thenReturn(true);
     Mockito.when(config.get(Configs.CACHE_STATS_ENABLED)).thenReturn(false);
     
Mockito.when(config.get(Configs.CACHE_IMPLEMENTATION)).thenReturn("caffeine");
+    Mockito.when(config.get(Configs.CACHE_LOCK_SEGMENTS)).thenReturn(16);
 
     Mockito.doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY);
     Mockito.doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY);
diff --git 
a/core/src/test/java/org/apache/gravitino/stats/TestStatisticManager.java 
b/core/src/test/java/org/apache/gravitino/stats/TestStatisticManager.java
index 03cb28245d..2529702f7b 100644
--- a/core/src/test/java/org/apache/gravitino/stats/TestStatisticManager.java
+++ b/core/src/test/java/org/apache/gravitino/stats/TestStatisticManager.java
@@ -116,6 +116,7 @@ public class TestStatisticManager {
     Mockito.when(config.get(Configs.CACHE_WEIGHER_ENABLED)).thenReturn(true);
     Mockito.when(config.get(Configs.CACHE_STATS_ENABLED)).thenReturn(false);
     
Mockito.when(config.get(Configs.CACHE_IMPLEMENTATION)).thenReturn("caffeine");
+    Mockito.when(config.get(Configs.CACHE_LOCK_SEGMENTS)).thenReturn(16);
     Mockito.when(config.get(Configs.PARTITION_STATS_STORAGE_FACTORY_CLASS))
         
.thenReturn(MemoryPartitionStatsStorageFactory.class.getCanonicalName());
 
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java 
b/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java
index 44d9134531..0e8298a2a0 100644
--- a/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java
+++ b/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java
@@ -151,6 +151,7 @@ public class TestEntityStorage {
     Mockito.when(config.get(Configs.CACHE_WEIGHER_ENABLED)).thenReturn(true);
     Mockito.when(config.get(Configs.CACHE_STATS_ENABLED)).thenReturn(false);
     
Mockito.when(config.get(Configs.CACHE_IMPLEMENTATION)).thenReturn("caffeine");
+    Mockito.when(config.get(Configs.CACHE_LOCK_SEGMENTS)).thenReturn(16);
 
     BaseIT baseIT = new BaseIT();
 
diff --git a/core/src/test/java/org/apache/gravitino/tag/TestTagManager.java 
b/core/src/test/java/org/apache/gravitino/tag/TestTagManager.java
index 5a7fc87d32..c1ca78e8cf 100644
--- a/core/src/test/java/org/apache/gravitino/tag/TestTagManager.java
+++ b/core/src/test/java/org/apache/gravitino/tag/TestTagManager.java
@@ -139,6 +139,7 @@ public class TestTagManager {
     Mockito.when(config.get(Configs.CACHE_WEIGHER_ENABLED)).thenReturn(true);
     Mockito.when(config.get(Configs.CACHE_STATS_ENABLED)).thenReturn(false);
     
Mockito.when(config.get(Configs.CACHE_IMPLEMENTATION)).thenReturn("caffeine");
+    Mockito.when(config.get(Configs.CACHE_LOCK_SEGMENTS)).thenReturn(16);
 
     Mockito.doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY);
     Mockito.doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY);
diff --git a/docs/gravitino-server-config.md b/docs/gravitino-server-config.md
index 3d2a90bd6b..211cb0faa2 100644
--- a/docs/gravitino-server-config.md
+++ b/docs/gravitino-server-config.md
@@ -85,6 +85,9 @@ gravitino.cache.enabled=true
 
 # Specify the cache implementation (no need to use the fully qualified class 
name)
 gravitino.cache.implementation=caffeine
+
+# Number of lock segments for cache concurrency optimization
+gravitino.cache.lockSegments=16
 ```
 
 | Configuration Key                | Description                               
 | Default Value          | Required | Since Version |
@@ -95,6 +98,7 @@ gravitino.cache.implementation=caffeine
 | `gravitino.cache.expireTimeInMs` | Cache expiration time (in milliseconds)   
 | `3600000` (about 1 hr) | No       | 1.0.0         |
 | `gravitino.cache.enableStats`    | Whether to enable cache statistics 
logging | `false`                | No       | 1.0.0         |
 | `gravitino.cache.enableWeigher`  | Whether to enable weight-based eviction   
 | `true`                 | No       | 1.0.0         |
+| `gravitino.cache.lockSegments`   | Number of lock segments.                  
 | `16`                   | No       | 1.0.0         |
 
 - `gravitino.cache.enableWeigher`: When enabled, eviction is based on weight 
and `maxEntries` will be ignored.
 - `gravitino.cache.expireTimeInMs`: Controls the cache TTL in milliseconds.


Reply via email to