This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new fa07adf063 [#7177] feat(core): Add jcstress for CaffeineEntityCache. 
(#7414)
fa07adf063 is described below

commit fa07adf0634d9b7e6f2592852a6acb31d4fd54c1
Author: Lord of Abyss <[email protected]>
AuthorDate: Mon Jun 23 10:50:57 2025 +0800

    [#7177] feat(core): Add jcstress for CaffeineEntityCache. (#7414)
    
    ### What changes were proposed in this pull request?
    
    Add JCStress tests for EntityCache and index consistency. During
    testing, two issues were identified:
    
    1. The method `put(NameIdentifier ident, Entity.EntityType type,
    SupportsRelationOperations.Type relType, List<E> entities)` is missing a
    synchronization lock, which may lead to race conditions in concurrent
    scenarios.
    2. A `java.lang.OutOfMemoryError: unable to create new native thread`
    error occurred during JCStress runs. This was due to creating a new
    CaffeineEntityCache instance in each test iteration. Each instance
    internally initializes thread pools, and with a large number of
    iterations, this results in excessive thread creation and ultimately
    exhausts system resources.
    
    Refactor CaffeineEntityCache to use a static shared thread pool instead
    of creating a new one per instance. This ensures that all cache
    instances reuse the same executor, significantly reducing thread
    overhead and preventing resource exhaustion during stress testing.
    Since cleanup is only triggered for evictions due to expiration, size
    limit, or weight limit (excluding explicit and replacement removals),
    the resource overhead of this `removalListener` remains minimal.
    
    Test results as follows:
    
    
    
![image](https://github.com/user-attachments/assets/31aa5029-fffd-4d64-b62f-232fbc468003)
    
    Result of `ConcurrentPutDifferentKeysWithRelationTest` as follows:
    
    
    
![image](https://github.com/user-attachments/assets/a7f97700-5c5f-4577-a8dc-1f7282ed7b92)
    
    
    ### Task list
    
    - [X] Implement the methods related to the `SupportsEntityStoreCache`
    and `SupportsRelationEntityCache` interface in `CaffeineEntityCache`.
    - [X] Add unit tests for both index and cache.
    - [X] Use JCStress to perform multi-threaded testing on
    `CaffeineEntityCache` and the `CacheIndex`.
    
    ### Why are the changes needed?
    
    Fix: #7177
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    local test.
    
    ---------
    
    Co-authored-by: liuxian <[email protected]>
    Co-authored-by: liuxian131 <[email protected]>
---
 LICENSE.bin                                        |   1 +
 core/build.gradle.kts                              |  20 +-
 core/src/jcstress/README.md                        | 281 +++++++++
 .../gravitino/cache/TestCacheIndexCoherence.java   | 516 ++++++++++++++++
 .../cache/TestCaffeineEntityCacheCoherence.java    | 661 +++++++++++++++++++++
 .../gravitino/cache/CaffeineEntityCache.java       |  53 +-
 gradle/libs.versions.toml                          |   2 +
 7 files changed, 1504 insertions(+), 30 deletions(-)

diff --git a/LICENSE.bin b/LICENSE.bin
index dc0d66c318..4e075212bc 100644
--- a/LICENSE.bin
+++ b/LICENSE.bin
@@ -377,6 +377,7 @@
    Jettison
    Awaitility
    npgall concurrent-trees
+   reyerizo jcstress-gradle-plugin
 
    This product bundles various third-party components also under the
    Apache Software Foundation License 1.1
diff --git a/core/build.gradle.kts b/core/build.gradle.kts
index 29948009fa..43e4cb5574 100644
--- a/core/build.gradle.kts
+++ b/core/build.gradle.kts
@@ -1,3 +1,5 @@
+import net.ltgt.gradle.errorprone.errorprone
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -16,11 +18,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-import net.ltgt.gradle.errorprone.errorprone
 plugins {
   `maven-publish`
   id("java")
   id("idea")
+  alias(libs.plugins.jcstress) // jcstress-gradle-plugin: adds the :core:jcstress task and the jcstressImplementation configuration
 }
 
 dependencies {
@@ -61,6 +63,8 @@ dependencies {
   testImplementation(libs.testcontainers)
 
   testRuntimeOnly(libs.junit.jupiter.engine)
+
+  jcstressImplementation(libs.mockito.core)
 }
 
 tasks.test {
@@ -73,7 +77,19 @@ tasks.test {
 }
 
 tasks.withType<JavaCompile>().configureEach {
-  if (name.contains("test", ignoreCase = true)) {
+  if (name.contains("jcstress", ignoreCase = true)) { // NOTE(review): this replaced the "test" match, so test compile tasks (e.g. compileTestJava) no longer exclude generated sources from Error Prone; confirm intended, or match both names
     options.errorprone?.excludedPaths?.set(".*/generated/.*")
   }
 }
+
+jcstress {
+  /*
+   Available modes:
+   - sanity : takes seconds
+   - quick : takes tens of seconds
+   - default : takes minutes, good number of iterations
+   - tough : takes tens of minutes, large number of iterations, most reliable
+    */
+  mode = "default" // see core/src/jcstress/README.md for the full list of plugin options
+  jvmArgsPrepend = "-Djdk.stdout.sync=true" // prepended to the auto-detected JVM flags; only affects forked runs
+}
diff --git a/core/src/jcstress/README.md b/core/src/jcstress/README.md
new file mode 100644
index 0000000000..cc4931ab05
--- /dev/null
+++ b/core/src/jcstress/README.md
@@ -0,0 +1,281 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# JCStress-Based Concurrency Tests for Gravitino
+
+## Overview
+
+This folder contains concurrency tests for Gravitino, leveraging the JCStress 
framework to uncover subtle thread-safety issues. These tests are designed to 
expose concurrency problems that traditional unit tests often miss—such as data 
races, visibility violations, and atomicity errors.
+
+## What is JCStress?
+
+[JCStress](https://github.com/openjdk/jcstress) is a concurrency stress-testing tool from the OpenJDK project for verifying the correctness of multithreaded Java code. Unlike unit tests, JCStress:
+
+- Executes test scenarios across multiple threads.
+- Repeats executions with varying interleavings.
+- Reveals low-probability bugs due to concurrency.
+- Reports outcomes and their frequencies, categorized as acceptable, 
forbidden, or interesting.
+
+## Running the Tests
+
+In Gravitino, we use the `jcstress-gradle-plugin` to integrate JCStress 
testing into the Gradle build system.
+
+This plugin simplifies running JCStress tests by:
+
+- Automatically generating the test harness.
+- Providing built-in Gradle tasks (e.g., `:core:jcstress`).
+- Generating detailed HTML reports under `build/reports/jcstress/`.
+
+To run the jcstress tests in Gravitino, use the following Gradle command:
+
+```bash
+./gradlew :core:jcstress 
+```
+
+or run only a subset of the tests:
+
+```bash
+./gradlew :core:jcstress --tests "MyFirstTest|MySecondTest"
+```
+
+After execution, reports are available at:
+`core/build/reports/jcstress/index.html`
+
+> The test suite may take several minutes depending on test complexity and 
available CPU resources.
+
+## Understanding Test Results
+
+JCStress test results are presented in a table format showing:
+
+- Observed outcomes and their frequencies
+- Whether each outcome is expected or forbidden
+- Total number of iterations
+- Time taken for the test
+
+A "FAILED" result means that a forbidden outcome was observed, indicating a 
potential concurrency issue.
+
+## Configuration
+
+If you need to customize the configuration, add a `jcstress` block like the following to `core/build.gradle.kts`:
+
+```kotlin
+jcstress {
+    verbose = "true"
+    timeMillis = "200"
+    spinStyle = "THREAD_YIELD"
+}
+```
+
+These are all possible configuration options:
+
+| Name             | Description                                               
                                                                                
                                                                               |
+|------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `affinityMode`   | Use the specific affinity mode, if available. `NONE` = No affinity whatsoever; `GLOBAL` = Affinity for the entire JVM; `LOCAL` = Affinity for the individual actors. |
+| `cpuCount`       | Number of CPUs to use. Defaults to all CPUs in the 
system. Reducing the number of CPUs limits the amount of resources (including 
memory) the run is using.                                                       
        |
+| `heapPerFork`    | Java heap size per fork, in megabytes. This affects the 
stride size: maximum footprint will never be exceeded, regardless of min/max 
stride sizes.                                                                   
    |
+| `forkMultiplier` | Fork multiplier for randomized/stress tests. This allows more efficient randomized testing, as each fork would use a different seed. |
+| `forks`          | Should fork each test N times. Must be 1 or higher.       
                                                                                
                                                                               |
+| `iterations`     | Iterations per test.                                      
                                                                                
                                                                               |
+| `jvmArgs`        | Use given JVM arguments. This disables JVM flags 
auto-detection, and runs only the single JVM mode. Either a single 
space-separated option line, or multiple options are accepted. This option only 
affects forked runs. |
+| `jvmArgsPrepend` | Prepend given JVM arguments to auto-detected configurations. This option only affects forked runs. |
+| `mode`           | Test mode preset: `sanity`, `quick`, `default`, `tough`, 
`stress`.                                                                       
                                                                                
|
+| `regexp`         | Regexp selector for tests.                                
                                                                                
                                                                               |
+| `reportDir`      | Target destination to put the report into.                
                                                                                
                                                                               |
+| `spinStyle`      | Busy loop wait style. `HARD` = hard busy loop; 
`THREAD_YIELD` = use `Thread.yield()`; `THREAD_SPIN_WAIT` = use 
`Thread.onSpinWait()`; `LOCKSUPPORT_PARK_NANOS` = use 
`LockSupport.parkNanos()`.                          |
+| `splitPerActor`  | Use split per-actor compilation mode, if available.       
                                                                                
                                                                               |
+| `strideCount`    | Internal stride count per epoch. Larger value increases 
cache footprint.                                                                
                                                                                
 |
+| `strideSize`     | Internal stride size. Larger value decreases the 
synchronization overhead, but also reduces the number of collisions.            
                                                                                
        |
+| `timeMillis`     | Time to spend in a single test iteration. A larger value improves test reliability, since schedulers do a better job in the long run. |
+| `verbose`        | Be extra verbose.                                         
                                                                                
                                                                               |
+
+## Writing JCStress Tests
+
+JCStress tests can be written in two main styles:
+
+### Style 1: Arbiter-Based Tests
+
+This style uses a separate method to observe the final state:
+
+1. Define shared state variables
+2. Create methods annotated with `@Actor` that will run concurrently
+3. Define an `@Arbiter` method to check the final state
+4. Use `@JCStressTest` and `@Outcome` annotations to specify expected results
+
+Example:
+```java
+@JCStressTest
+@Outcome(id = "1", expect = Expect.ACCEPTABLE, desc = "Normal outcome")
+@Outcome(id = "0", expect = Expect.FORBIDDEN, desc = "Race condition")
+@State
+public class ArbiterTest {
+    int x;
+
+    @Actor
+    void actor1() {
+        x = 1;
+    }
+
+    @Actor
+    void actor2() {
+        x = 1;
+    }
+
+    @Arbiter
+    void arbiter(I_Result r) {
+        r.r1 = x;
+    }
+}
+```
+
+### Style 2: Direct Result Reporting
+
+Actors can directly report results by accepting a result parameter:
+
+1. Define shared state variables
+2. Create methods annotated with `@Actor` that accept a result parameter
+3. Each actor records its observations directly
+4. Use `@JCStressTest` and `@Outcome` annotations to specify expected results
+
+Example:
+```java
+@JCStressTest
+@Outcome(id = "0, 1", expect = Expect.ACCEPTABLE, desc = "Thread 2 sees the 
write")
+@Outcome(id = "0, 0", expect = Expect.ACCEPTABLE, desc = "Thread 2 doesn't see 
the write")
+@State
+public class DirectReportingTest {
+    int x;
+
+    @Actor
+    void actor1() {
+        x = 1;
+    }
+
+    @Actor
+    void actor2(II_Result r) {
+        r.r1 = 0;
+        r.r2 = x; // Records what this thread observes
+    }
+}
+```
+
+You may add a `@Description` annotation to briefly explain the test scenario. 
However, avoid writing overly long descriptions, as this can trigger a 
`StringIndexOutOfBoundsException` due to a known limitation in the JCStress 
result frame parser.
+
+Choose the style based on your test needs:
+- Use an arbiter when the final state needs to be observed after all actors 
have finished.
+- Use direct reporting when actors need to record observations during their 
execution.
+- Direct reporting is particularly useful for testing visibility and ordering 
guarantees.
+
+A complete example is shown below. It demonstrates how to test the 
thread-safety of concurrent `put()` operations on distinct keys in a cache 
implementation.
+
+The test uses two `@Actor` methods to perform `put()` operations on two different entities (e.g., a schema and a table) concurrently. An `@Arbiter` method then counts how many of these entities are visible in the cache after both operations complete. The test defines multiple `@Outcome` values to capture the possible results:
+
+- If both entries are present, the result is 2, which is the expected and 
acceptable outcome.
+- If only one is present, the result is 1, indicating a potential visibility 
or atomicity issue.
+- If neither is present, the result is 0, which reveals a serious concurrency 
failure.
+
+```java
+public class xxx {
+  // some entity to test.
+  
+  @JCStressTest
+  @Outcome.Outcomes({
+      @Outcome(id = "2", expect = Expect.ACCEPTABLE, desc = "Both put() calls 
succeeded; both entries are visible in the cache."),
+      @Outcome(id = "1", expect = Expect.FORBIDDEN, desc = "Only one entry is 
visible; potential visibility or atomicity issue."),
+      @Outcome(id = "0", expect = Expect.FORBIDDEN, desc = "Neither entry is 
visible; indicates a serious failure in write propagation or cache logic.")})
+  @Description("Concurrent put() on different keys. Both schema and table 
should be visible (result = 2). " + "Lower results may indicate visibility or 
concurrency issues.")
+  @State
+
+  public static class ConcurrentPutDifferentKeysTest {
+    private final EntityCache cache;
+
+    public ConcurrentPutDifferentKeysTest() {
+      this.cache = new CaffeineEntityCache(new Config() {
+      });
+    }
+
+    @Actor
+    public void actor1() {
+      cache.put(schemaEntity);
+    }
+
+    @Actor
+    public void actor2() {
+      cache.put(tableEntity);
+    }
+
+    @Arbiter
+    public void arbiter(I_Result r) {
+      int count = 0;
+      if (cache.contains(schemaEntity.nameIdentifier(), schemaEntity.type())) 
count++;
+      if (cache.contains(tableEntity.nameIdentifier(), tableEntity.type())) 
count++;
+
+      r.r1 = count;
+    }
+  }
+  
+  // ... other test classes
+}
+```
+
+This test helps ensure that the cache implementation handles concurrent writes 
correctly and does not lose visibility or violate consistency guarantees.
+
+## JCStress Test Lifecycle and Execution Modes
+
+JCStress follows a well-defined execution model to simulate concurrent 
interactions:
+
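+1. **State Initialization**: A fresh instance of the test class (annotated with `@State`) is created for every run of the actors, so runs never share state. Because actors are run a very large number of times, a state constructor that allocates heavyweight resources (for example, thread pools) can exhaust system resources.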
+2. **Concurrent Execution of Actors**: All methods annotated with `@Actor` are 
invoked concurrently, each on a separate thread. These methods simulate the 
real-world concurrent behavior being tested.
+3. **Synchronization Barrier**: After both (or all) actors complete, JCStress 
inserts a memory fence to ensure all actions are visible to the next phase.
+4. **Arbiter Execution (Optional)**: If an `@Arbiter` method is defined, it 
runs after all actor methods have completed. It observes the final state and 
writes results to the `*Result` object.
+5. **Result Collection and Classification**: The outcome of each iteration is 
recorded and matched against the `@Outcome` definitions. JCStress performs this 
process millions of times with different thread interleavings to uncover rare 
concurrency issues.
+
+JCStress offers several execution modes to control the intensity and duration 
of the tests. Available modes:
+
+| Mode      | Description                                                      
                                                 |
+|-----------|-------------------------------------------------------------------------------------------------------------------|
+| `sanity`  | Runs for a few seconds with minimal iterations. Useful to check 
test setup.                                       |
+| `quick`   | Runs for a short duration with a moderate number of iterations. 
Good for fast feedback during development.        |
+| `default` | Balanced mode with sufficient iterations. Takes a few minutes. 
Recommended for regular use.                       |
+| `tough`   | Longest and most thorough mode. Runs many iterations over tens 
of minutes. Best for CI and production validation. |
+
+## Concurrency Issues We Target
+
+Our JCStress tests help identify several types of concurrency problems:
+
+1. **Race Conditions**: When multiple threads access shared data without 
proper synchronization
+2. **Memory Visibility**: When changes made by one thread are not visible to 
other threads
+3. **Atomicity Violations**: When operations that should be atomic can be 
interrupted
+4. **Ordering Issues**: When operations execute in unexpected orders due to 
compiler or CPU reordering
+
+## Best Practices
+
+When working with JCStress tests:
+
+- Keep tests focused on a single concurrency aspect
+- Use meaningful names that describe the scenario being tested
+- Document expected outcomes and their reasoning
+- Run tests multiple times, as issues may not appear in every run
+- Review test results carefully, especially for tests with multiple possible 
valid outcomes
+
+## Resources
+
+- [JCStress Official Documentation](https://wiki.openjdk.org/display/CodeTools/jcstress)
+- [Java Memory Model 
Specification](https://docs.oracle.com/javase/specs/jls/se8/html/jls-17.html#jls-17.4)
+- [Doug Lea's JSR-133 Cookbook](http://gee.cs.oswego.edu/dl/jmm/cookbook.html)
+- [jcstress-gradle-plugin](https://github.com/reyerizo/jcstress-gradle-plugin)
\ No newline at end of file
diff --git 
a/core/src/jcstress/java/org/apache/gravitino/cache/TestCacheIndexCoherence.java
 
b/core/src/jcstress/java/org/apache/gravitino/cache/TestCacheIndexCoherence.java
new file mode 100644
index 0000000000..ffeb04bcf8
--- /dev/null
+++ 
b/core/src/jcstress/java/org/apache/gravitino/cache/TestCacheIndexCoherence.java
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.cache;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.googlecode.concurrenttrees.radix.ConcurrentRadixTree;
+import com.googlecode.concurrenttrees.radix.RadixTree;
+import 
com.googlecode.concurrenttrees.radix.node.concrete.DefaultCharArrayNodeFactory;
+import java.time.Instant;
+import java.util.Objects;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.SupportsRelationOperations;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.SchemaEntity;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.openjdk.jcstress.annotations.Actor;
+import org.openjdk.jcstress.annotations.Arbiter;
+import org.openjdk.jcstress.annotations.Description;
+import org.openjdk.jcstress.annotations.Expect;
+import org.openjdk.jcstress.annotations.JCStressTest;
+import org.openjdk.jcstress.annotations.Outcome;
+import org.openjdk.jcstress.annotations.State;
+import org.openjdk.jcstress.infra.results.II_Result;
+import org.openjdk.jcstress.infra.results.I_Result;
+
+public class TestCacheIndexCoherence {
+  private static SchemaEntity getTestSchemaEntity(
+      long id, String name, Namespace namespace, String comment) { // Fixture: schema with the given fields, empty properties and freshly built audit info.
+    return SchemaEntity.builder()
+        .withId(id)
+        .withName(name)
+        .withNamespace(namespace)
+        .withAuditInfo(getTestAuditInfo())
+        .withComment(comment)
+        .withProperties(ImmutableMap.of())
+        .build();
+  }
+
+  private static AuditInfo getTestAuditInfo() { // Fixture: audit info with "admin" as creator and last modifier.
+    return AuditInfo.builder()
+        .withCreator("admin")
+        .withCreateTime(Instant.now()) // Instant.now() is read separately for each timestamp, so the two may differ slightly.
+        .withLastModifier("admin")
+        .withLastModifiedTime(Instant.now())
+        .build();
+  }
+
+  @JCStressTest
+  @Outcome.Outcomes({
+    @Outcome(id = "1", expect = Expect.ACCEPTABLE, desc = "Key inserted and 
visible"),
+    @Outcome(id = "0", expect = Expect.FORBIDDEN, desc = "Key not found")
+  })
+  @Description(
+      "Tests the coherence of ConcurrentRadixTree under concurrent insertions 
with the same key. "
+          + "Both threads concurrently insert the same key-value pair into the 
radix tree. The expected "
+          + "behavior is that the tree maintains a single mapping without data 
loss or duplication. A "
+          + "forbidden result indicates insertion was lost or the tree became 
inconsistent, violating "
+          + "atomicity or thread-safety guarantees.")
+  @State
+  public static class InsertSameKeyCoherenceTest { // Both actors put the identical (keyStr, key) pair into a fresh tree.
+    private final RadixTree<EntityCacheKey> indexTree =
+        new ConcurrentRadixTree<>(new DefaultCharArrayNodeFactory());
+
+    private final NameIdentifier ident = NameIdentifier.of("metalake1", 
"catalog1", "schema1");
+    private final Entity entity =
+        getTestSchemaEntity(123L, "schema1", Namespace.of("metalake1", 
"catalog1"), "ident1");
+    private final EntityCacheKey key = EntityCacheKey.of(ident, entity.type());
+    private final String keyStr = key.toString(); // The radix tree is keyed by the cache key's string form.
+
+    @Actor
+    public void actor1() {
+      indexTree.put(keyStr, key);
+    }
+
+    @Actor
+    public void actor2() {
+      indexTree.put(keyStr, key);
+    }
+
+    @Arbiter
+    public void arbiter(I_Result r) { // 1 iff keyStr maps to key and the tree holds exactly one entry.
+      EntityCacheKey valueForExactKey = indexTree.getValueForExactKey(keyStr);
+      r.r1 =
+          (valueForExactKey != null
+                  && Objects.equals(valueForExactKey, key)
+                  && indexTree.size() == 1)
+              ? 1
+              : 0;
+    }
+  }
+
+  @JCStressTest
+  @Outcome.Outcomes({
+    @Outcome(id = "1", expect = Expect.ACCEPTABLE, desc = "Key inserted and 
visible"),
+    @Outcome(id = "0", expect = Expect.FORBIDDEN, desc = "Key not found")
+  })
+  @Description(
+      "Tests that ConcurrentRadixTree handles concurrent put() operations for 
the same key with a "
+          + "relation type. Both threads insert the same key-value pair where 
the key includes a "
+          + "relation type (ROLE_USER_REL). The test expects the tree to 
maintain a single entry. A "
+          + "forbidden result indicates incorrect key handling or data loss 
under concurrency.")
+  @State
+  public static class InsertSameKeyWithRelationTypeCoherenceTest { // Both actors put the identical (keyStr, key) pair; the key carries ROLE_USER_REL.
+    private final RadixTree<EntityCacheKey> indexTree =
+        new ConcurrentRadixTree<>(new DefaultCharArrayNodeFactory());
+
+    private final NameIdentifier ident = NameIdentifierUtil.ofRole("metalake", 
"role");
+    private final EntityCacheKey key =
+        EntityCacheKey.of(
+            ident, Entity.EntityType.ROLE, 
SupportsRelationOperations.Type.ROLE_USER_REL);
+    private final String keyStr = key.toString(); // The radix tree is keyed by the cache key's string form.
+
+    @Actor
+    public void actor1() {
+      indexTree.put(keyStr, key);
+    }
+
+    @Actor
+    public void actor2() {
+      indexTree.put(keyStr, key);
+    }
+
+    @Arbiter
+    public void arbiter(I_Result r) { // 1 iff keyStr maps to key and the tree holds exactly one entry.
+      EntityCacheKey valueForExactKey = indexTree.getValueForExactKey(keyStr);
+      r.r1 =
+          (valueForExactKey != null
+                  && Objects.equals(valueForExactKey, key)
+                  && indexTree.size() == 1)
+              ? 1
+              : 0;
+    }
+  }
+
+  @JCStressTest
+  @Outcome.Outcomes({
+    @Outcome(id = "1", expect = Expect.ACCEPTABLE, desc = "multiple Key 
inserted and visible"),
+    @Outcome(id = "0", expect = Expect.FORBIDDEN, desc = "some Key not found")
+  })
+  @Description(
+      "Tests that ConcurrentRadixTree maintains consistency under concurrent 
insertions of two "
+          + "different keys. Each thread inserts a unique key-value pair into 
the radix tree. "
+          + "Expected behavior is that both keys are stored and visible. A 
forbidden result "
+          + "indicates data loss or broken concurrency guarantees.")
+  @State
+  public static class InsertMultipleKeyCoherenceTest { // Each actor inserts a different schema key (schema1 / schema2) concurrently.
+    private final RadixTree<EntityCacheKey> indexTree =
+        new ConcurrentRadixTree<>(new DefaultCharArrayNodeFactory());
+
+    private final NameIdentifier ident1 = NameIdentifier.of("metalake1", 
"catalog1", "schema1");
+    private final Entity entity1 =
+        getTestSchemaEntity(123L, "schema1", Namespace.of("metalake1", 
"catalog1"), "ident1");
+    private final NameIdentifier ident2 = NameIdentifier.of("metalake1", 
"catalog1", "schema2");
+    private final Entity entity2 =
+        getTestSchemaEntity(456L, "schema2", Namespace.of("metalake1", 
"catalog1"), "ident2");
+    private final EntityCacheKey key1 = EntityCacheKey.of(ident1, 
entity1.type());
+    private final EntityCacheKey key2 = EntityCacheKey.of(ident2, 
entity2.type());
+    private final String key1Str = key1.toString();
+    private final String key2Str = key2.toString();
+
+    @Actor
+    public void actor1() {
+      indexTree.put(key1Str, key1);
+    }
+
+    @Actor
+    public void actor2() {
+      indexTree.put(key2Str, key2);
+    }
+
+    @Arbiter
+    public void arbiter(I_Result r) { // 1 iff both keys map to themselves and the tree holds exactly two entries.
+      EntityCacheKey valueForExactKey1 = 
indexTree.getValueForExactKey(key1Str);
+      EntityCacheKey valueForExactKey2 = 
indexTree.getValueForExactKey(key2Str);
+      r.r1 =
+          (valueForExactKey1 != null
+                  && valueForExactKey2 != null
+                  && Objects.equals(valueForExactKey1, key1)
+                  && Objects.equals(valueForExactKey2, key2)
+                  && indexTree.size() == 2)
+              ? 1
+              : 0;
+    }
+  }
+
+  @JCStressTest
+  @Outcome.Outcomes({
+    @Outcome(id = "1", expect = Expect.ACCEPTABLE, desc = "multiple Key 
inserted and visible"),
+    @Outcome(id = "0", expect = Expect.FORBIDDEN, desc = "some Key not found")
+  })
+  @Description(
+      "Tests ConcurrentRadixTree under concurrent insertions of different keys 
with relation types. "
+          + "Thread 1 inserts a key with ROLE_USER_REL, and Thread 2 inserts a 
key with ROLE_GROUP_REL. "
+          + "Both keys share the same NameIdentifier but differ by relation 
type. The tree should retain "
+          + "both entries. A forbidden result indicates relation type is not 
correctly distinguished.")
+  @State
+  public static class InsertMultipleKeyWithRelationTypeCoherenceTest { // Actors insert keys for the same role that differ only in relation type.
+    private final RadixTree<EntityCacheKey> indexTree =
+        new ConcurrentRadixTree<>(new DefaultCharArrayNodeFactory());
+
+    private final NameIdentifier ident1 = 
NameIdentifierUtil.ofRole("metalake", "role1");
+    private final NameIdentifier ident2 = 
NameIdentifierUtil.ofRole("metalake", "role1");
+
+    private final EntityCacheKey key1 =
+        EntityCacheKey.of(
+            ident1, Entity.EntityType.ROLE, 
SupportsRelationOperations.Type.ROLE_USER_REL);
+    private final EntityCacheKey key2 =
+        EntityCacheKey.of(
+            ident2, Entity.EntityType.ROLE, 
SupportsRelationOperations.Type.ROLE_GROUP_REL);
+    private final String key1Str = key1.toString(); // ident1 and ident2 are both role1, so the test relies on
+    private final String key2Str = key2.toString(); // the relation type making the two key strings differ.
+
+    @Actor
+    public void actor1() {
+      indexTree.put(key1Str, key1);
+    }
+
+    @Actor
+    public void actor2() {
+      indexTree.put(key2Str, key2);
+    }
+
+    @Arbiter
+    public void arbiter(I_Result r) { // 1 iff both keys map to themselves and the tree holds exactly two entries.
+      EntityCacheKey valueForExactKey1 = 
indexTree.getValueForExactKey(key1Str);
+      EntityCacheKey valueForExactKey2 = 
indexTree.getValueForExactKey(key2Str);
+      r.r1 =
+          (valueForExactKey1 != null
+                  && valueForExactKey2 != null
+                  && Objects.equals(valueForExactKey1, key1)
+                  && Objects.equals(valueForExactKey2, key2)
+                  && indexTree.size() == 2)
+              ? 1
+              : 0;
+    }
+  }
+
+  @JCStressTest
+  @Outcome.Outcomes({
+    @Outcome(id = "1", expect = Expect.ACCEPTABLE, desc = "Key does not exist 
after remove."),
+    @Outcome(
+        id = "0",
+        expect = Expect.ACCEPTABLE_INTERESTING,
+        desc = "Key still exists due to put/remove race.")
+  })
+  @Description(
+      "Tests the race condition between put() and remove() on the same key in 
ConcurrentRadixTree. "
+          + "Thread 1 inserts a key, while Thread 2 removes it concurrently. 
Depending on execution "
+          + "order, the final visibility may vary. If the key is removed 
successfully, it's acceptable. "
+          + "If the key remains due to put-after-remove, it's considered 
interesting but not forbidden.")
+  @State
+  public static class PutRemoveSameKeyCoherenceTest { // One actor puts a plain string key while the other removes it.
+
+    private final RadixTree<String> indexTree =
+        new ConcurrentRadixTree<>(new DefaultCharArrayNodeFactory());
+    private final String key = "catalog.schema.table";
+
+    @Actor
+    public void actorPut() {
+      indexTree.put(key, "v1");
+    }
+
+    @Actor
+    public void actorRemove() {
+      indexTree.remove(key); // If this runs before actorPut, there is nothing to remove yet.
+    }
+
+    @Arbiter
+    public void arbiter(I_Result r) {
+      String value = indexTree.getValueForExactKey(key);
+      r.r1 = (value == null) ? 1 : 0; // 1 = key absent at the end; 0 = key still present (put ordered after remove).
+    }
+  }
+
+  @JCStressTest
+  @Outcome.Outcomes({
+    @Outcome(id = "1", expect = Expect.ACCEPTABLE, desc = "Key does not exist 
after remove."),
+    @Outcome(
+        id = "0",
+        expect = Expect.ACCEPTABLE_INTERESTING,
+        desc = "Key still exists due to put/remove race.")
+  })
+  @Description(
+      "Tests the race condition between put() and remove() on a key with 
relation type in "
+          + "ConcurrentRadixTree. Thread 1 inserts a key that includes a 
relation type "
+          + "(ROLE_USER_REL), while Thread 2 concurrently removes the same 
key. If remove "
+          + "happens after put, the key is deleted, which is acceptable. If 
put happens after remove, "
+          + "the key remains, which is interesting but not forbidden.")
+  @State
+  public static class PutRemoveSameKeyWithRelationTypeCoherenceTest {
+
+    private final RadixTree<EntityCacheKey> indexTree =
+        new ConcurrentRadixTree<>(new DefaultCharArrayNodeFactory());
+
+    private final NameIdentifier ident = NameIdentifierUtil.ofRole("metalake", 
"role");
+    private final EntityCacheKey key =
+        EntityCacheKey.of(
+            ident, Entity.EntityType.ROLE, 
SupportsRelationOperations.Type.ROLE_USER_REL);
+    private final String keyStr = key.toString();
+
+    @Actor
+    public void actorPut() {
+      indexTree.put(keyStr, key);
+    }
+
+    @Actor
+    public void actorRemove() {
+      indexTree.remove(keyStr);
+    }
+
+    @Arbiter
+    public void arbiter(I_Result r) {
+      EntityCacheKey value = indexTree.getValueForExactKey(keyStr);
+      r.r1 = (value == null) ? 1 : 0;
+    }
+  }
+
+  @JCStressTest
+  @Outcome.Outcomes({
+    @Outcome(
+        id = "1, 1",
+        expect = Expect.ACCEPTABLE,
+        desc = "Both values are visible — correct and expected."),
+    @Outcome(
+        id = "1, 0",
+        expect = Expect.FORBIDDEN,
+        desc = "Only v1 is visible — inconsistent cache state."),
+    @Outcome(
+        id = "0, 1",
+        expect = Expect.FORBIDDEN,
+        desc = "Only v2 is visible — inconsistent cache state."),
+    @Outcome(
+        id = "0, 0",
+        expect = Expect.FORBIDDEN,
+        desc = "No value visible — inconsistent cache state.")
+  })
+  @Description(
+      "Tests race conditions between concurrent put() operations and a prefix scan. Thread 1 inserts "
+          + "'table1' and Thread 2 inserts 'table2', both under the same prefix. Arbiter performs a "
+          + "prefix scan using getValuesForKeysStartingWith(). Both values should be visible. "
+          + "Missing any indicates a broken visibility guarantee, which is forbidden.")
+  @State
+  public static class PutAndPrefixScanCoherenceTest {
+
+    // Both keys share PREFIX, so a single prefix scan should return both values.
+    private static final String PREFIX = "catalog.schema.";
+    private static final String KEY1 = PREFIX + "table1";
+    private static final String KEY2 = PREFIX + "table2";
+
+    private final RadixTree<String> tree =
+        new ConcurrentRadixTree<>(new DefaultCharArrayNodeFactory());
+
+    /** Inserts the first key under {@link #PREFIX}. */
+    @Actor
+    public void actorPut1() {
+      tree.put(KEY1, "v1");
+    }
+
+    /** Inserts the second key under {@link #PREFIX}. */
+    @Actor
+    public void actorPut2() {
+      tree.put(KEY2, "v2");
+    }
+
+    /** Snapshots every value under {@link #PREFIX} and reports whether v1 and v2 were seen. */
+    @Arbiter
+    public void arbiter(II_Result r) {
+      ImmutableList<String> scanned =
+          ImmutableList.copyOf(tree.getValuesForKeysStartingWith(PREFIX));
+      r.r1 = asFlag(scanned.contains("v1"));
+      r.r2 = asFlag(scanned.contains("v2"));
+    }
+
+    /** Maps a presence check to the 1/0 encoding used by the outcome ids. */
+    private static int asFlag(boolean present) {
+      return present ? 1 : 0;
+    }
+  }
+
+  @JCStressTest
+  @Outcome.Outcomes({
+    @Outcome(
+        id = "1, 1",
+        expect = Expect.ACCEPTABLE,
+        desc = "Both values are visible — correct and expected."),
+    @Outcome(
+        id = "1, 0",
+        expect = Expect.FORBIDDEN,
+        desc = "Only v1 is visible — inconsistent cache state."),
+    @Outcome(
+        id = "0, 1",
+        expect = Expect.FORBIDDEN,
+        desc = "Only v2 is visible — inconsistent cache state."),
+    @Outcome(
+        id = "0, 0",
+        expect = Expect.FORBIDDEN,
+        desc = "No value visible — inconsistent cache state.")
+  })
+  @Description(
+      "Tests race conditions between concurrent put() operations on keys with different relation "
+          + "types and a prefix-based scan. Thread 1 inserts a ROLE_USER_REL key, Thread 2 inserts "
+          + "a ROLE_GROUP_REL key. The arbiter uses getValuesForKeysStartingWith() to verify both "
+          + "entries are visible. Visibility of only one is a race condition and forbidden. Missing "
+          + "both values indicates an inconsistent cache state and is also forbidden.")
+  @State
+  public static class PutAndPrefixScanWithRelationTypeCoherenceTest {
+
+    private final RadixTree<EntityCacheKey> tree =
+        new ConcurrentRadixTree<>(new DefaultCharArrayNodeFactory());
+
+    /** ROLE_USER_REL key for role1. */
+    private final EntityCacheKey userRelKey =
+        EntityCacheKey.of(
+            NameIdentifierUtil.ofRole("metalake", "role1"),
+            Entity.EntityType.ROLE,
+            SupportsRelationOperations.Type.ROLE_USER_REL);
+
+    /** ROLE_GROUP_REL key for role2. */
+    private final EntityCacheKey groupRelKey =
+        EntityCacheKey.of(
+            NameIdentifierUtil.ofRole("metalake", "role2"),
+            Entity.EntityType.ROLE,
+            SupportsRelationOperations.Type.ROLE_GROUP_REL);
+
+    private final String userRelKeyStr = userRelKey.toString();
+    private final String groupRelKeyStr = groupRelKey.toString();
+
+    /** Inserts the ROLE_USER_REL key. */
+    @Actor
+    public void actorPut1() {
+      tree.put(userRelKeyStr, userRelKey);
+    }
+
+    /** Inserts the ROLE_GROUP_REL key. */
+    @Actor
+    public void actorPut2() {
+      tree.put(groupRelKeyStr, groupRelKey);
+    }
+
+    /**
+     * Scans every key starting with "metalake" and reports whether each inserted key was found.
+     *
+     * <p>NOTE(review): assumes EntityCacheKey#toString() begins with the metalake name of the
+     * identifier — TODO confirm against EntityCacheKey.
+     */
+    @Arbiter
+    public void arbiter(II_Result r) {
+      ImmutableList<EntityCacheKey> underMetalake =
+          ImmutableList.copyOf(tree.getValuesForKeysStartingWith("metalake"));
+      if (underMetalake.contains(userRelKey)) {
+        r.r1 = 1;
+      } else {
+        r.r1 = 0;
+      }
+      if (underMetalake.contains(groupRelKey)) {
+        r.r2 = 1;
+      } else {
+        r.r2 = 0;
+      }
+    }
+  }
+
+  @JCStressTest
+  @Outcome.Outcomes({
+    @Outcome(id = "1", expect = Expect.ACCEPTABLE, desc = "Get sees the 
value"),
+    @Outcome(
+        id = "0",
+        expect = Expect.ACCEPTABLE_INTERESTING,
+        desc = "Get sees nothing due to timing")
+  })
+  @Description(
+      "Tests the race condition between put() and getValueForExactKey(). 
Thread 1 inserts a key-value "
+          + "pair into the radix tree. Thread 2 concurrently attempts to 
retrieve the value for the same "
+          + "key. If get() observes the effect of put(), it should return a 
non-null result. If not, it "
+          + "may return null depending on the timing. Missing value is 
interesting but not forbidden.")
+  @State
+  public static class PutAndGetCoherenceTest {
+    private final RadixTree<String> indexTree =
+        new ConcurrentRadixTree<>(new DefaultCharArrayNodeFactory());
+    private final String key = "catalog.schema.table";
+
+    @Actor
+    public void actorPut() {
+      indexTree.put(key, "v1");
+    }
+
+    @Actor
+    public void actorGet(I_Result r) {
+      r.r1 = indexTree.getValueForExactKey(key) != null ? 1 : 0;
+    }
+  }
+
+  @JCStressTest
+  @Outcome.Outcomes({
+    @Outcome(id = "1", expect = Expect.ACCEPTABLE, desc = "Get sees the 
value"),
+    @Outcome(
+        id = "0",
+        expect = Expect.ACCEPTABLE_INTERESTING,
+        desc = "Get sees nothing due to timing")
+  })
+  @Description(
+      "Tests race condition between put() and getValueForExactKey() for 
relation-typed cache keys. "
+          + "Thread 1 inserts a ROLE_USER_REL key into the radix tree. Thread 
2 concurrently attempts to "
+          + "retrieve the same key. If the get observes the result of the put, 
it returns a non-null value. "
+          + "Missing value is interesting but not forbidden, depending on 
execution timing.")
+  @State
+  public static class PutAndGetWithRelationTypeCoherenceTest {
+    private final RadixTree<EntityCacheKey> indexTree =
+        new ConcurrentRadixTree<>(new DefaultCharArrayNodeFactory());
+    private final NameIdentifier ident = NameIdentifierUtil.ofRole("metalake", 
"role");
+    private final EntityCacheKey key =
+        EntityCacheKey.of(
+            ident, Entity.EntityType.ROLE, 
SupportsRelationOperations.Type.ROLE_USER_REL);
+    private final String keyStr = key.toString();
+
+    @Actor
+    public void actorPut() {
+      indexTree.put(keyStr, key);
+    }
+
+    @Actor
+    public void actorGet(I_Result r) {
+      r.r1 = indexTree.getValueForExactKey(keyStr) != null ? 1 : 0;
+    }
+  }
+}
diff --git 
a/core/src/jcstress/java/org/apache/gravitino/cache/TestCaffeineEntityCacheCoherence.java
 
b/core/src/jcstress/java/org/apache/gravitino/cache/TestCaffeineEntityCacheCoherence.java
new file mode 100644
index 0000000000..df1978adc4
--- /dev/null
+++ 
b/core/src/jcstress/java/org/apache/gravitino/cache/TestCaffeineEntityCacheCoherence.java
@@ -0,0 +1,661 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.cache;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.time.Instant;
+import java.util.List;
+import java.util.Optional;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.SupportsRelationOperations;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.ColumnEntity;
+import org.apache.gravitino.meta.GroupEntity;
+import org.apache.gravitino.meta.RoleEntity;
+import org.apache.gravitino.meta.SchemaEntity;
+import org.apache.gravitino.meta.TableEntity;
+import org.apache.gravitino.meta.UserEntity;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.utils.NamespaceUtil;
+import org.openjdk.jcstress.annotations.Actor;
+import org.openjdk.jcstress.annotations.Arbiter;
+import org.openjdk.jcstress.annotations.Description;
+import org.openjdk.jcstress.annotations.Expect;
+import org.openjdk.jcstress.annotations.JCStressTest;
+import org.openjdk.jcstress.annotations.Outcome;
+import org.openjdk.jcstress.annotations.State;
+import org.openjdk.jcstress.infra.results.I_Result;
+import org.openjdk.jcstress.infra.results.L_Result;
+
+public class TestCaffeineEntityCacheCoherence {
+  // ---- Authorization fixtures: user 5 and group 4 both reference role 6 ("role1"). ----
+
+  /** Role "role1" (id 6) in metalake1. */
+  private static final RoleEntity roleEntity = getTestRoleEntity(6L, "role1", "metalake1");
+
+  /** User "user1" (id 5) in metalake1, holding the role with id 6. */
+  private static final UserEntity userEntity =
+      getTestUserEntity(5L, "user1", "metalake1", ImmutableList.of(6L));
+
+  /** Group "group1" (id 4) in metalake1, holding the role named "role1". */
+  private static final GroupEntity groupEntity =
+      getTestGroupEntity(4L, "group1", "metalake1", ImmutableList.of("role1"));
+
+  // ---- Metadata fixtures: table1 lives under the namespace metalake1.catalog1.schema1. ----
+
+  /** Schema "schema1" (id 1) under metalake1.catalog1. */
+  private static final SchemaEntity schemaEntity =
+      getTestSchemaEntity(1L, "schema1", Namespace.of("metalake1", "catalog1"), "test_schema1");
+
+  /** Table "table1" (id 3) under metalake1.catalog1.schema1. */
+  private static final TableEntity tableEntity =
+      getTestTableEntity(3L, "table1", Namespace.of("metalake1", "catalog1", "schema1"));
+
+  /**
+   * Builds a schema fixture with fresh audit info and no properties.
+   *
+   * @param schemaId entity id
+   * @param schemaName schema name
+   * @param schemaNamespace namespace the schema lives in
+   * @param schemaComment schema comment
+   * @return the built schema entity
+   */
+  private static SchemaEntity getTestSchemaEntity(
+      long schemaId, String schemaName, Namespace schemaNamespace, String schemaComment) {
+    AuditInfo audit = getTestAuditInfo();
+    return SchemaEntity.builder()
+        .withId(schemaId)
+        .withName(schemaName)
+        .withNamespace(schemaNamespace)
+        .withAuditInfo(audit)
+        .withComment(schemaComment)
+        .withProperties(ImmutableMap.of())
+        .build();
+  }
+
+  /**
+   * Builds a table fixture with fresh audit info and a single mocked column.
+   *
+   * @param tableId entity id
+   * @param tableName table name
+   * @param tableNamespace namespace the table lives in
+   * @return the built table entity
+   */
+  private static TableEntity getTestTableEntity(
+      long tableId, String tableName, Namespace tableNamespace) {
+    AuditInfo audit = getTestAuditInfo();
+    ColumnEntity onlyColumn = getMockColumnEntity();
+    return TableEntity.builder()
+        .withId(tableId)
+        .withName(tableName)
+        .withAuditInfo(audit)
+        .withNamespace(tableNamespace)
+        .withColumns(ImmutableList.of(onlyColumn))
+        .build();
+  }
+
+  /**
+   * Builds a role fixture in the role namespace of the given metalake, with no securable objects.
+   *
+   * @param roleId entity id
+   * @param roleName role name
+   * @param metalakeName metalake that owns the role
+   * @return the built role entity
+   */
+  private static RoleEntity getTestRoleEntity(long roleId, String roleName, String metalakeName) {
+    Namespace roleNamespace = NamespaceUtil.ofRole(metalakeName);
+    AuditInfo audit = getTestAuditInfo();
+    return RoleEntity.builder()
+        .withId(roleId)
+        .withName(roleName)
+        .withNamespace(roleNamespace)
+        .withAuditInfo(audit)
+        .withSecurableObjects(ImmutableList.of())
+        .build();
+  }
+
+  /**
+   * Builds a group fixture in the group namespace of the given metalake.
+   *
+   * @param groupId entity id
+   * @param groupName group name
+   * @param metalakeName metalake that owns the group
+   * @param roleNames names of the roles granted to the group
+   * @return the built group entity
+   */
+  private static GroupEntity getTestGroupEntity(
+      long groupId, String groupName, String metalakeName, List<String> roleNames) {
+    Namespace groupNamespace = NamespaceUtil.ofGroup(metalakeName);
+    AuditInfo audit = getTestAuditInfo();
+    return GroupEntity.builder()
+        .withId(groupId)
+        .withName(groupName)
+        .withNamespace(groupNamespace)
+        .withAuditInfo(audit)
+        .withRoleNames(roleNames)
+        .build();
+  }
+
+  /**
+   * Builds a user fixture in the user namespace of the given metalake.
+   *
+   * @param userId entity id
+   * @param userName user name
+   * @param metalakeName metalake that owns the user
+   * @param roleIds ids of the roles granted to the user
+   * @return the built user entity
+   */
+  private static UserEntity getTestUserEntity(
+      long userId, String userName, String metalakeName, List<Long> roleIds) {
+    Namespace userNamespace = NamespaceUtil.ofUser(metalakeName);
+    AuditInfo audit = getTestAuditInfo();
+    return UserEntity.builder()
+        .withId(userId)
+        .withName(userName)
+        .withNamespace(userNamespace)
+        .withAuditInfo(audit)
+        .withRoleIds(roleIds)
+        .build();
+  }
+
+  /**
+   * Builds audit info attributed to "admin".
+   *
+   * <p>The clock is read once, so the create time and the last-modified time are the same
+   * instant. Two separate reads could make a freshly built fixture look as if it had already
+   * been modified.
+   *
+   * @return audit info with identical create and last-modified timestamps
+   */
+  private static AuditInfo getTestAuditInfo() {
+    Instant now = Instant.now();
+    return AuditInfo.builder()
+        .withCreator("admin")
+        .withCreateTime(now)
+        .withLastModifier("admin")
+        .withLastModifiedTime(now)
+        .build();
+  }
+
+  /**
+   * Creates a Mockito mock of a non-nullable string column for use as a table column fixture.
+   *
+   * @return the stubbed column mock
+   */
+  private static ColumnEntity getMockColumnEntity() {
+    ColumnEntity column = mock(ColumnEntity.class);
+    when(column.name()).thenReturn("filed1");
+    when(column.dataType()).thenReturn(Types.StringType.get());
+    when(column.nullable()).thenReturn(false);
+    when(column.auditInfo()).thenReturn(getTestAuditInfo());
+    return column;
+  }
+
+  @JCStressTest
+  @Outcome.Outcomes({
+    @Outcome(id = "ENTITY", expect = Expect.ACCEPTABLE, desc = "getIfPresent 
observed the entity."),
+    @Outcome(
+        id = "NULL",
+        expect = Expect.FORBIDDEN,
+        desc = "getIfPresent did not observe entity, which violates visibility 
guarantees.")
+  })
+  @Description(
+      "Tests visibility between put() and getIfPresent() on an existing 
entity. "
+          + "Entity should remain visible; NULL indicates broken visibility or 
invalidation.")
+  @State
+  public static class PutWithGetIfPresentCoherenceTest {
+    private final EntityCache cache;
+
+    public PutWithGetIfPresentCoherenceTest() {
+      this.cache = new CaffeineEntityCache(new Config() {});
+      cache.put(schemaEntity);
+    }
+
+    @Actor
+    public void actor1() {
+      cache.put(schemaEntity);
+    }
+
+    @Actor
+    public void actor2(L_Result r) {
+      r.r1 =
+          cache.getIfPresent(schemaEntity.nameIdentifier(), 
schemaEntity.type()).isPresent()
+              ? "ENTITY"
+              : "NULL";
+    }
+  }
+
+  @JCStressTest
+  @Outcome.Outcomes({
+    @Outcome(
+        id = "ENTITY",
+        expect = Expect.ACCEPTABLE,
+        desc =
+            "contains() returns true, indicating the cache retains visibility 
after repeated put()."),
+    @Outcome(
+        id = "NULL",
+        expect = Expect.FORBIDDEN,
+        desc =
+            "contains() returned false, indicating visibility or 
synchronization error during repeated put().")
+  })
+  @Description(
+      "Tests visibility with repeated put() on the same key. "
+          + "Actor1 puts again; Actor2 checks contains(). "
+          + "Entity should remain visible; NULL indicates a visibility issue.")
+  @State
+  public static class PutWithContainCoherenceTest {
+    private final EntityCache cache;
+
+    public PutWithContainCoherenceTest() {
+      this.cache = new CaffeineEntityCache(new Config() {});
+      cache.put(schemaEntity);
+    }
+
+    @Actor
+    public void actor1() {
+      cache.put(schemaEntity);
+    }
+
+    @Actor
+    public void actor2(L_Result r) {
+      boolean contains = cache.contains(schemaEntity.nameIdentifier(), 
schemaEntity.type());
+      r.r1 = contains ? "ENTITY" : "NULL";
+    }
+  }
+
+  @JCStressTest
+  @Outcome.Outcomes({
+    @Outcome(
+        id = "ENTITY",
+        expect = Expect.ACCEPTABLE,
+        desc = "put() completed after invalidate(), so the entity remains in 
the cache."),
+    @Outcome(
+        id = "NULL",
+        expect = Expect.ACCEPTABLE_INTERESTING,
+        desc = "invalidate() cleared the entity after put(), so it is no 
longer in the cache.")
+  })
+  @Description(
+      "Concurrent put() and invalidate() on the same key. "
+          + "If put wins, entity remains; if invalidate wins, it's removed. "
+          + "Both outcomes are acceptable; NULL is interesting for consistency 
checks.")
+  @State
+  public static class PutWithInvalidateCoherenceTest {
+    private final EntityCache cache;
+
+    public PutWithInvalidateCoherenceTest() {
+      this.cache = new CaffeineEntityCache(new Config() {});
+    }
+
+    @Actor
+    public void actor1() {
+      cache.put(schemaEntity);
+    }
+
+    @Actor
+    public void actor2() {
+      cache.invalidate(schemaEntity.nameIdentifier(), schemaEntity.type());
+    }
+
+    @Arbiter
+    public void arbiter(L_Result r) {
+      Entity result =
+          cache.getIfPresent(schemaEntity.nameIdentifier(), 
schemaEntity.type()).orElse(null);
+      r.r1 = result != null ? "ENTITY" : "NULL";
+    }
+  }
+
+  @JCStressTest
+  @Outcome.Outcomes({
+    @Outcome(
+        id = "ENTITY",
+        expect = Expect.ACCEPTABLE,
+        desc = "put() happened after clear(), so the entity remains in the 
cache."),
+    @Outcome(
+        id = "NULL",
+        expect = Expect.ACCEPTABLE_INTERESTING,
+        desc = "clear() happened after or concurrently with put(), so the 
cache is empty.")
+  })
+  @Description(
+      "Concurrent put() and clear(). If put wins, entity remains. If clear 
wins, entity is gone. "
+          + "NULL is acceptable but interesting due to race timing.")
+  @State
+  public static class PutWithClearCoherenceTest {
+    private final EntityCache cache;
+
+    public PutWithClearCoherenceTest() {
+      this.cache = new CaffeineEntityCache(new Config() {});
+    }
+
+    @Actor
+    public void actor1() {
+      cache.put(tableEntity);
+    }
+
+    @Actor
+    public void actor2() {
+      cache.clear();
+    }
+
+    @Arbiter
+    public void arbiter(L_Result r) {
+      Entity result =
+          cache.getIfPresent(tableEntity.nameIdentifier(), 
tableEntity.type()).orElse(null);
+      r.r1 = result != null ? "ENTITY" : "NULL";
+    }
+  }
+
+  @JCStressTest
+  @Outcome.Outcomes({
+    @Outcome(
+        id = "2",
+        expect = Expect.ACCEPTABLE,
+        desc = "Both put() calls succeeded; both entries are visible in the cache."),
+    @Outcome(
+        id = "1",
+        expect = Expect.FORBIDDEN,
+        desc = "Only one entry is visible; potential visibility or atomicity issue."),
+    @Outcome(
+        id = "0",
+        expect = Expect.FORBIDDEN,
+        desc =
+            "Neither entry is visible; indicates a serious failure in write propagation or cache logic.")
+  })
+  @Description(
+      "Concurrent put() on different keys. Both schema and table should be visible (result = 2). "
+          + "Lower results may indicate visibility or concurrency issues.")
+  @State
+  public static class ConcurrentPutDifferentKeysTest {
+    /** Cache that starts empty for every test instance. */
+    private final EntityCache entityCache;
+
+    public ConcurrentPutDifferentKeysTest() {
+      entityCache = new CaffeineEntityCache(new Config() {});
+    }
+
+    /** Inserts the schema. */
+    @Actor
+    public void actor1() {
+      entityCache.put(schemaEntity);
+    }
+
+    /** Inserts the table concurrently with the schema. */
+    @Actor
+    public void actor2() {
+      entityCache.put(tableEntity);
+    }
+
+    /** Counts how many of the two inserted entities are visible afterwards. */
+    @Arbiter
+    public void arbiter(I_Result r) {
+      int visible = 0;
+      visible += entityCache.contains(schemaEntity.nameIdentifier(), schemaEntity.type()) ? 1 : 0;
+      visible += entityCache.contains(tableEntity.nameIdentifier(), tableEntity.type()) ? 1 : 0;
+      r.r1 = visible;
+    }
+  }
+
+  @JCStressTest
+  @Outcome.Outcomes({
+    @Outcome(
+        id = "2",
+        expect = Expect.ACCEPTABLE,
+        desc = "Both put() calls succeeded; both entries are visible in the cache."),
+    @Outcome(
+        id = "1",
+        expect = Expect.FORBIDDEN,
+        desc = "Only one entry is visible; potential visibility or atomicity issue."),
+    @Outcome(
+        id = "0",
+        expect = Expect.FORBIDDEN,
+        desc =
+            "Neither entry is visible; indicates a serious failure in write propagation or cache logic.")
+  })
+  @Description("Concurrent put() with different ROLE relation types; expect both visible (2).")
+  @State
+  public static class ConcurrentPutDifferentKeysWithRelationTest {
+    /** Cache that starts empty for every test instance. */
+    private final EntityCache entityCache;
+
+    public ConcurrentPutDifferentKeysWithRelationTest() {
+      entityCache = new CaffeineEntityCache(new Config() {});
+    }
+
+    /** Caches the role's group relation. */
+    @Actor
+    public void actor1() {
+      entityCache.put(
+          roleEntity.nameIdentifier(),
+          Entity.EntityType.ROLE,
+          SupportsRelationOperations.Type.ROLE_GROUP_REL,
+          ImmutableList.of(groupEntity));
+    }
+
+    /** Caches the role's user relation concurrently with the group relation. */
+    @Actor
+    public void actor2() {
+      entityCache.put(
+          roleEntity.nameIdentifier(),
+          Entity.EntityType.ROLE,
+          SupportsRelationOperations.Type.ROLE_USER_REL,
+          ImmutableList.of(userEntity));
+    }
+
+    /** Counts how many of the two relation entries are visible (user relation checked first). */
+    @Arbiter
+    public void arbiter(I_Result r) {
+      int visible = 0;
+      if (hasRoleRelation(SupportsRelationOperations.Type.ROLE_USER_REL)) {
+        visible++;
+      }
+      if (hasRoleRelation(SupportsRelationOperations.Type.ROLE_GROUP_REL)) {
+        visible++;
+      }
+      r.r1 = visible;
+    }
+
+    /** Returns whether the cache holds the given relation type for {@code roleEntity}. */
+    private boolean hasRoleRelation(SupportsRelationOperations.Type relType) {
+      return entityCache.contains(roleEntity.nameIdentifier(), Entity.EntityType.ROLE, relType);
+    }
+  }
+
+  @JCStressTest
+  @Outcome.Outcomes({
+    @Outcome(
+        id = "1",
+        expect = Expect.ACCEPTABLE,
+        desc = "Both put() calls succeeded on the same key; value is 
visible."),
+    @Outcome(
+        id = "0",
+        expect = Expect.FORBIDDEN,
+        desc = "Neither put() was visible; indicates a visibility or atomicity 
issue.")
+  })
+  @Description(
+      "Concurrent put() on the same key; value should remain visible. Missing 
entry indicates a concurrency issue.")
+  @State
+  public static class ConcurrentPutSameKeyTest {
+    private final EntityCache cache;
+
+    public ConcurrentPutSameKeyTest() {
+      this.cache = new CaffeineEntityCache(new Config() {});
+    }
+
+    @Actor
+    public void actor1() {
+      cache.put(schemaEntity);
+    }
+
+    @Actor
+    public void actor2() {
+      cache.put(schemaEntity);
+    }
+
+    @Arbiter
+    public void arbiter(I_Result r) {
+      r.r1 = cache.contains(schemaEntity.nameIdentifier(), 
Entity.EntityType.SCHEMA) ? 1 : 0;
+    }
+  }
+
+  @JCStressTest
+  @Outcome.Outcomes({
+    @Outcome(
+        id = "1",
+        expect = Expect.ACCEPTABLE,
+        desc = "Entry is visible; concurrent put() calls succeeded."),
+    @Outcome(
+        id = "0",
+        expect = Expect.FORBIDDEN,
+        desc = "Entry is missing; indicates visibility or atomicity issue.")
+  })
+  @Description(
+      "Tests concurrent put() on the same key with relation type. "
+          + "Entry must remain visible after concurrent writes; missing 
indicates a bug.")
+  @State
+  public static class ConcurrentPutSameKeyWithRelationTest {
+    private final EntityCache cache;
+
+    public ConcurrentPutSameKeyWithRelationTest() {
+      this.cache = new CaffeineEntityCache(new Config() {});
+    }
+
+    @Actor
+    public void actor1() {
+      cache.put(
+          roleEntity.nameIdentifier(),
+          Entity.EntityType.ROLE,
+          SupportsRelationOperations.Type.ROLE_USER_REL,
+          ImmutableList.of(userEntity));
+    }
+
+    @Actor
+    public void actor2() {
+      cache.put(
+          roleEntity.nameIdentifier(),
+          Entity.EntityType.ROLE,
+          SupportsRelationOperations.Type.ROLE_USER_REL,
+          ImmutableList.of(userEntity));
+    }
+
+    @Arbiter
+    public void arbiter(I_Result r) {
+      r.r1 =
+          cache.contains(
+                  roleEntity.nameIdentifier(),
+                  Entity.EntityType.ROLE,
+                  SupportsRelationOperations.Type.ROLE_USER_REL)
+              ? 1
+              : 0;
+    }
+  }
+
+  @JCStressTest
+  @Outcome.Outcomes({
+    @Outcome(
+        id = "ENTITY",
+        expect = Expect.ACCEPTABLE,
+        desc = "GetIfPresent sees the entity before it was invalidated."),
+    @Outcome(
+        id = "NULL",
+        expect = Expect.ACCEPTABLE_INTERESTING,
+        desc = "Invalidate removed the entity before getIfPresent.")
+  })
+  @Description(
+      "Tests race between invalidate() and getIfPresent(). "
+          + "Either outcome is allowed depending on timing.")
+  @State
+  public static class InvalidateWithGetCoherenceTest {
+    private final EntityCache cache;
+
+    public InvalidateWithGetCoherenceTest() {
+      this.cache = new CaffeineEntityCache(new Config() {});
+
+      cache.put(schemaEntity);
+      cache.put(tableEntity);
+    }
+
+    @Actor
+    public void actor1() {
+      cache.invalidate(schemaEntity.nameIdentifier(), schemaEntity.type());
+    }
+
+    @Actor
+    public void actor2(L_Result r) {
+      Optional<? extends Entity> result =
+          cache.getIfPresent(tableEntity.nameIdentifier(), tableEntity.type());
+      r.r1 = result.isPresent() ? "ENTITY" : "NULL";
+    }
+  }
+
+  @JCStressTest
+  @Outcome.Outcomes({
+    @Outcome(
+        id = "SUCCESS",
+        expect = Expect.ACCEPTABLE,
+        desc = "Both invalidates executed safely."),
+    @Outcome(id = "FAILURE", expect = Expect.FORBIDDEN, desc = "One or both 
invalidates failed.")
+  })
+  @Description("Tests concurrent invalidate() on the same key is safe and 
idempotent.")
+  @State
+  public static class ConcurrentInvalidateSameKeyCoherenceTest {
+    private final EntityCache cache;
+
+    public ConcurrentInvalidateSameKeyCoherenceTest() {
+      this.cache = new CaffeineEntityCache(new Config() {});
+      cache.put(schemaEntity);
+    }
+
+    @Actor
+    public void actor1() {
+      cache.invalidate(schemaEntity.nameIdentifier(), schemaEntity.type());
+    }
+
+    @Actor
+    public void actor2() {
+      cache.invalidate(schemaEntity.nameIdentifier(), schemaEntity.type());
+    }
+
+    @Arbiter
+    public void arbiter(L_Result r) {
+      r.r1 =
+          cache.contains(schemaEntity.nameIdentifier(), schemaEntity.type())
+              ? "FAILURE"
+              : "SUCCESS";
+    }
+  }
+
+  @JCStressTest
+  @Outcome.Outcomes({
+    @Outcome(
+        id = "SUCCESS",
+        expect = Expect.ACCEPTABLE,
+        desc = "Both invalidate operations completed; neither entry remains in 
the cache."),
+    @Outcome(
+        id = "FAILURE",
+        expect = Expect.FORBIDDEN,
+        desc = "One or both entries remain in the cache after concurrent 
invalidate.")
+  })
+  @Description(
+      "Tests concurrent invalidate() on two related keys (same prefix). "
+          + "Verifies that prefix-based invalidation logic does not interfere 
across keys, "
+          + "and both schema and table entries are properly removed without 
conflict or race condition.")
+  @State
+  public static class ConcurrentInvalidateRelatedKeyCoherenceTest {
+    private final EntityCache cache;
+
+    public ConcurrentInvalidateRelatedKeyCoherenceTest() {
+      this.cache = new CaffeineEntityCache(new Config() {});
+      cache.put(schemaEntity);
+      cache.put(tableEntity);
+    }
+
+    @Actor
+    public void actor1() {
+      cache.invalidate(schemaEntity.nameIdentifier(), schemaEntity.type());
+    }
+
+    @Actor
+    public void actor2() {
+      cache.invalidate(tableEntity.nameIdentifier(), tableEntity.type());
+    }
+
+    @Arbiter
+    public void arbiter(L_Result r) {
+      if (cache.contains(schemaEntity.nameIdentifier(), schemaEntity.type())
+          || cache.contains(tableEntity.nameIdentifier(), tableEntity.type())) 
{
+        r.r1 = "FAILURE";
+        return;
+      }
+
+      r.r1 = "SUCCESS";
+    }
+  }
+
+  @JCStressTest
+  @Outcome.Outcomes({
+    @Outcome(
+        id = "SUCCESS",
+        expect = Expect.ACCEPTABLE,
+        desc = "Both invalidate() and clear() removed the entries as 
expected."),
+    @Outcome(
+        id = "FAILURE",
+        expect = Expect.FORBIDDEN,
+        desc = "One or more entries remain; indicates race condition or 
incomplete invalidation.")
+  })
+  @Description(
+      "Tests concurrent invalidate() and clear() operations. "
+          + "Ensures that both targeted and global removals can coexist 
without leaving residual entries. "
+          + "Any remaining entries indicate inconsistency in concurrent 
removal paths.")
+  @State
+  public static class ClearWithInvalidateCoherenceTest {
+    private final EntityCache cache;
+
+    public ClearWithInvalidateCoherenceTest() {
+      this.cache = new CaffeineEntityCache(new Config() {});
+      cache.put(schemaEntity);
+      cache.put(tableEntity);
+    }
+
+    @Actor
+    public void actor1() {
+      cache.invalidate(schemaEntity.nameIdentifier(), schemaEntity.type());
+    }
+
+    @Actor
+    public void actor2() {
+      cache.clear();
+    }
+
+    @Arbiter
+    public void arbiter(L_Result r) {
+      if (cache.contains(schemaEntity.nameIdentifier(), schemaEntity.type())
+          || cache.contains(tableEntity.nameIdentifier(), tableEntity.type())) 
{
+        r.r1 = "FAILURE";
+        return;
+      }
+
+      r.r1 = "SUCCESS";
+    }
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java 
b/core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java
index 317beff0d1..91148b4a78 100644
--- a/core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java
+++ b/core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java
@@ -34,6 +34,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -58,6 +59,20 @@ public class CaffeineEntityCache extends BaseEntityCache {
   private static final int CACHE_CLEANUP_QUEUE_CAPACITY = 100;
   private static final int CACHE_MONITOR_PERIOD_MINUTES = 5;
   private static final int CACHE_MONITOR_INITIAL_DELAY_MINUTES = 0;
+  private static final ExecutorService CLEANUP_EXECUTOR =
+      new ThreadPoolExecutor(
+          CACHE_CLEANUP_CORE_THREADS,
+          CACHE_CLEANUP_MAX_THREADS,
+          0L,
+          TimeUnit.MILLISECONDS,
+          new ArrayBlockingQueue<>(CACHE_CLEANUP_QUEUE_CAPACITY),
+          r -> {
+            Thread t = new Thread(r, "CaffeineEntityCache-Cleanup");
+            t.setDaemon(true);
+            return t;
+          },
+          new ThreadPoolExecutor.CallerRunsPolicy());
+
   private static final Logger LOG = 
LoggerFactory.getLogger(CaffeineEntityCache.class.getName());
   private final ReentrantLock opLock = new ReentrantLock();
 
@@ -78,11 +93,10 @@ public class CaffeineEntityCache extends BaseEntityCache {
     super(cacheConfig);
     this.cacheIndex = new ConcurrentRadixTree<>(new 
DefaultCharArrayNodeFactory());
 
-    ThreadPoolExecutor cleanupExec = buildCleanupExecutor();
     Caffeine<EntityCacheKey, List<Entity>> cacheDataBuilder = 
newBaseBuilder(cacheConfig);
 
     cacheDataBuilder
-        .executor(cleanupExec)
+        .executor(CLEANUP_EXECUTOR)
         .removalListener(
             (key, value, cause) -> {
               if (cause == RemovalCause.EXPLICIT || cause == 
RemovalCause.REPLACED) {
@@ -191,13 +205,16 @@ public class CaffeineEntityCache extends BaseEntityCache {
       List<E> entities) {
     checkArguments(ident, type, relType);
     Preconditions.checkArgument(entities != null, "Entities cannot be null");
-    if (entities.isEmpty()) {
-      return;
-    }
+    withLock(
+        () -> {
+          if (entities.isEmpty()) {
+            return;
+          }
 
-    syncEntitiesToCache(
-        EntityCacheKey.of(ident, type, relType),
-        entities.stream().map(e -> (Entity) e).collect(Collectors.toList()));
+          syncEntitiesToCache(
+              EntityCacheKey.of(ident, type, relType),
+              entities.stream().map(e -> (Entity) 
e).collect(Collectors.toList()));
+        });
   }
 
   /** {@inheritDoc} */
@@ -410,26 +427,6 @@ public class CaffeineEntityCache extends BaseEntityCache {
     }
   }
 
-  /**
-   * Builds the cleanup executor.
-   *
-   * @return The cleanup executor
-   */
-  private ThreadPoolExecutor buildCleanupExecutor() {
-    return new ThreadPoolExecutor(
-        CACHE_CLEANUP_CORE_THREADS,
-        CACHE_CLEANUP_MAX_THREADS,
-        0L,
-        TimeUnit.MILLISECONDS,
-        new ArrayBlockingQueue<>(CACHE_CLEANUP_QUEUE_CAPACITY),
-        r -> {
-          Thread t = new Thread(r, "CaffeineEntityCache-Cleanup");
-          t.setDaemon(true);
-          return t;
-        },
-        new ThreadPoolExecutor.CallerRunsPolicy());
-  }
-
   /** Starts the cache stats monitor. */
   private void startCacheStatsMonitor() {
     scheduler.scheduleAtFixedRate(
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index f83fc539c7..114be4a02e 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -120,6 +120,7 @@ google-auth = "1.28.0"
 aliyun-credentials = "0.3.12"
 openlineage = "1.29.0"
 concurrent-trees = "2.6.0"
+jcstress = "0.8.15"
 
 [libraries]
 aws-iam = { group = "software.amazon.awssdk", name = "iam", version.ref = 
"awssdk" }
@@ -307,3 +308,4 @@ tasktree = {id = "com.dorongold.task-tree", version = 
"2.1.1"}
 dependencyLicenseReport = {id = "com.github.jk1.dependency-license-report", 
version = "2.9"}
 bom = {id = "org.cyclonedx.bom", version = "1.5.0"}
 errorprone = {id = "net.ltgt.errorprone", version.ref = "error-prone"}
+jcstress = { id = "io.github.reyerizo.gradle.jcstress", version.ref = 
"jcstress" }

Reply via email to