This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new fa07adf063 [#7177] feat(core): Add jcstress for CaffeineEntityCache.
(#7414)
fa07adf063 is described below
commit fa07adf0634d9b7e6f2592852a6acb31d4fd54c1
Author: Lord of Abyss <[email protected]>
AuthorDate: Mon Jun 23 10:50:57 2025 +0800
[#7177] feat(core): Add jcstress for CaffeineEntityCache. (#7414)
### What changes were proposed in this pull request?
Add JCStress tests for EntityCache and index consistency. During
testing, two issues were identified:
1. The method `put(NameIdentifier ident, Entity.EntityType type,
SupportsRelationOperations.Type relType, List<E> entities)` is missing a
synchronization lock, which may lead to race conditions in concurrent
scenarios.
2. A `java.lang.OutOfMemoryError: unable to create new native thread`
occurred during JCStress runs. This was due to creating a new
CaffeineEntityCache instance in each test iteration. Each instance
internally initializes thread pools, and with a large number of
iterations, this results in excessive thread creation and ultimately
exhausts system resources.
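The first issue can be illustrated with a minimal sketch, assuming a cache that keeps a value map and a separate key index that must change together. `LockedRelationCache`, its fields, and its `String` keys are hypothetical stand-ins, not the actual `CaffeineEntityCache` code; the sketch only shows why a multi-entity `put` should take the same lock as the other write paths.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for a cache that keeps a value map and a key index in step.
class LockedRelationCache {
  private final ReentrantLock lock = new ReentrantLock();
  private final Map<String, List<String>> data = new HashMap<>();
  private final TreeSet<String> index = new TreeSet<>();

  // Both structures are updated under one lock, so readers that take the
  // same lock never observe the map and the index out of step.
  void put(String key, List<String> entities) {
    lock.lock();
    try {
      data.put(key, new ArrayList<>(entities));
      index.add(key);
    } finally {
      lock.unlock();
    }
  }

  int size() {
    lock.lock();
    try {
      return data.size();
    } finally {
      lock.unlock();
    }
  }

  // True when the index holds exactly the keys present in the value map.
  boolean isCoherent() {
    lock.lock();
    try {
      return data.keySet().equals(index);
    } finally {
      lock.unlock();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    LockedRelationCache cache = new LockedRelationCache();
    Thread t1 = new Thread(() -> cache.put("role1:USER_REL", Arrays.asList("u1", "u2")));
    Thread t2 = new Thread(() -> cache.put("role1:GROUP_REL", Arrays.asList("g1")));
    t1.start();
    t2.start();
    t1.join();
    t2.join();
    System.out.println("size=" + cache.size() + ", coherent=" + cache.isCoherent());
  }
}
```

Without the lock, two concurrent `put` calls could interleave between the map update and the index update, which is the kind of outcome a JCStress arbiter is meant to catch.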
Refactor CaffeineEntityCache to use a static shared thread pool instead
of creating a new one per instance. This ensures that all cache
instances reuse the same executor, significantly reducing thread
overhead and preventing resource exhaustion during stress testing.
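As a rough illustration of the refactoring, the sketch below shares one static executor across all instances. `SharedPoolCache`, the pool size of 2, and the thread names are assumptions made for the example and are not taken from `CaffeineEntityCache`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical cache class: every instance reuses one static executor.
class SharedPoolCache {
  // Counts threads ever created by the shared pool (for demonstration only).
  private static final AtomicInteger THREAD_IDS = new AtomicInteger();

  // A single pool for all instances; daemon threads do not block JVM exit.
  private static final ExecutorService CLEANUP_POOL =
      Executors.newFixedThreadPool(
          2,
          r -> {
            Thread t = new Thread(r, "cache-cleanup-" + THREAD_IDS.incrementAndGet());
            t.setDaemon(true);
            return t;
          });

  ExecutorService executor() {
    return CLEANUP_POOL;
  }

  static int threadsCreated() {
    return THREAD_IDS.get();
  }

  public static void main(String[] args) throws Exception {
    // Many instances, as in a stress run, still use at most two threads.
    for (int i = 0; i < 1000; i++) {
      new SharedPoolCache().executor().submit(() -> {}).get();
    }
    System.out.println("threads created: " + SharedPoolCache.threadsCreated());
  }
}
```

With a per-instance pool, the same loop would create new threads on every iteration, which matches the thread exhaustion described above.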
Since cleanup is only triggered for evictions due to expiration, size
limit, or weight limit (excluding explicit and replacement removals),
the resource overhead of this `removalListener` remains minimal.
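The eviction-only filter can be sketched as follows. The `Cause` enum is a local stand-in written for this example (it is not the cache library's own type), and `EvictionOnlyCleanup` is a hypothetical listener; in this sketch `SIZE` is assumed to cover both size and weight limits.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener that performs cleanup only for evictions.
class EvictionOnlyCleanup {
  // Local stand-in for a removal cause; not the real library enum.
  enum Cause {
    EXPLICIT,
    REPLACED,
    EXPIRED,
    SIZE;

    // Explicit and replacement removals are not evictions.
    boolean wasEvicted() {
      return this == EXPIRED || this == SIZE;
    }
  }

  private final List<String> cleaned = new ArrayList<>();

  // Called for every removal; does work only when the entry was evicted.
  void onRemoval(String key, Cause cause) {
    if (!cause.wasEvicted()) {
      return;
    }
    cleaned.add(key);
  }

  List<String> cleaned() {
    return cleaned;
  }

  public static void main(String[] args) {
    EvictionOnlyCleanup listener = new EvictionOnlyCleanup();
    listener.onRemoval("a", Cause.EXPLICIT);
    listener.onRemoval("b", Cause.EXPIRED);
    listener.onRemoval("c", Cause.REPLACED);
    listener.onRemoval("d", Cause.SIZE);
    System.out.println("cleaned keys: " + listener.cleaned());
  }
}
```

Because explicit and replacement removals return early, the listener does work only on the comparatively rare eviction path.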
Test results as follows:

Result of `ConcurrentPutDifferentKeysWithRelationTest` as follows:

### Task list
- [X] Implement the methods related to the `SupportsEntityStoreCache`
and `SupportsRelationEntityCache` interface in `CaffeineEntityCache`.
- [X] Add unit tests for both index and cache.
- [X] Use JCStress to perform multi-threaded testing on
`CaffeineEntityCache` and the `CacheIndex`.
### Why are the changes needed?
Fix: #7177
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
local test.
---------
Co-authored-by: liuxian <[email protected]>
Co-authored-by: liuxian131 <[email protected]>
---
LICENSE.bin | 1 +
core/build.gradle.kts | 20 +-
core/src/jcstress/README.md | 281 +++++++++
.../gravitino/cache/TestCacheIndexCoherence.java | 516 ++++++++++++++++
.../cache/TestCaffeineEntityCacheCoherence.java | 661 +++++++++++++++++++++
.../gravitino/cache/CaffeineEntityCache.java | 53 +-
gradle/libs.versions.toml | 2 +
7 files changed, 1504 insertions(+), 30 deletions(-)
diff --git a/LICENSE.bin b/LICENSE.bin
index dc0d66c318..4e075212bc 100644
--- a/LICENSE.bin
+++ b/LICENSE.bin
@@ -377,6 +377,7 @@
Jettison
Awaitility
npgall concurrent-trees
+ reyerizo jcstress-gradle-plugin
This product bundles various third-party components also under the
Apache Software Foundation License 1.1
diff --git a/core/build.gradle.kts b/core/build.gradle.kts
index 29948009fa..43e4cb5574 100644
--- a/core/build.gradle.kts
+++ b/core/build.gradle.kts
@@ -1,3 +1,5 @@
+import net.ltgt.gradle.errorprone.errorprone
+
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -16,11 +18,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-import net.ltgt.gradle.errorprone.errorprone
plugins {
`maven-publish`
id("java")
id("idea")
+ alias(libs.plugins.jcstress)
}
dependencies {
@@ -61,6 +63,8 @@ dependencies {
testImplementation(libs.testcontainers)
testRuntimeOnly(libs.junit.jupiter.engine)
+
+ jcstressImplementation(libs.mockito.core)
}
tasks.test {
@@ -73,7 +77,19 @@ tasks.test {
}
tasks.withType<JavaCompile>().configureEach {
- if (name.contains("test", ignoreCase = true)) {
+ if (name.contains("jcstress", ignoreCase = true)) {
options.errorprone?.excludedPaths?.set(".*/generated/.*")
}
}
+
+jcstress {
+ /*
+ Available modes:
+ - sanity : takes seconds
+ - quick : takes tens of seconds
+ - default : takes minutes, good number of iterations
+ - tough : takes tens of minutes, large number of iterations, most reliable
+ */
+ mode = "default"
+ jvmArgsPrepend = "-Djdk.stdout.sync=true"
+}
diff --git a/core/src/jcstress/README.md b/core/src/jcstress/README.md
new file mode 100644
index 0000000000..cc4931ab05
--- /dev/null
+++ b/core/src/jcstress/README.md
@@ -0,0 +1,281 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# JCStress-Based Concurrency Tests for Gravitino
+
+## Overview
+
+This folder contains concurrency tests for Gravitino, leveraging the JCStress
framework to uncover subtle thread-safety issues. These tests are designed to
expose concurrency problems that traditional unit tests often miss—such as data
races, visibility violations, and atomicity errors.
+
+## What is JCStress?
+
+[JCStress](https://github.com/openjdk/jcstress) is a concurrency stress-testing tool from the OpenJDK project for verifying the correctness of multithreaded Java code. Unlike unit tests, JCStress:
+
+- Executes test scenarios across multiple threads.
+- Repeats executions with varying interleavings.
+- Reveals low-probability bugs due to concurrency.
+- Reports outcomes and their frequencies, categorized as acceptable,
forbidden, or interesting.
+
+## Running the Tests
+
+In Gravitino, we use the `jcstress-gradle-plugin` to integrate JCStress
testing into the Gradle build system.
+
+This plugin simplifies running JCStress tests by:
+
+- Automatically generating the test harness.
+- Providing built-in Gradle tasks (e.g., `:core:jcstress`).
+- Generating detailed HTML reports under `build/reports/jcstress/`.
+
+To run the jcstress tests in Gravitino, use the following Gradle command:
+
+```bash
+./gradlew :core:jcstress
+```
+
+To run only a subset of the tests:
+
+```bash
+./gradlew :core:jcstress --tests "MyFirstTest|MySecondTest"
+```
+
+After execution, reports are available at:
+`core/build/reports/jcstress/index.html`
+
+> The test suite may take several minutes depending on test complexity and
available CPU resources.
+
+## Understanding Test Results
+
+JCStress test results are presented in a table format showing:
+
+- Observed outcomes and their frequencies
+- Whether each outcome is expected or forbidden
+- Total number of iterations
+- Time taken for the test
+
+A "FAILED" result means that a forbidden outcome was observed, indicating a
potential concurrency issue.
+
+## Configuration
+
+To customize the plugin, add a `jcstress` block like the following to the Gradle build script:
+
+```kotlin
+jcstress {
+ verbose = "true"
+ timeMillis = "200"
+ spinStyle = "THREAD_YIELD"
+}
+```
+
+These are all possible configuration options:
+
+| Name | Description |
+|------------------|-------------|
+| `affinityMode` | Use the specific affinity mode, if available. `NONE` = No affinity whatsoever; `GLOBAL` = Affinity for the entire JVM; `LOCAL` = Affinity for the individual actors. |
+| `cpuCount` | Number of CPUs to use. Defaults to all CPUs in the system. Reducing the number of CPUs limits the amount of resources (including memory) the run is using. |
+| `heapPerFork` | Java heap size per fork, in megabytes. This affects the stride size: maximum footprint will never be exceeded, regardless of min/max stride sizes. |
+| `forkMultiplier` | Fork multiplier for randomized/stress tests. This allows more efficient randomized testing, as each fork would use a different seed. |
+| `forks` | Number of times to fork each test. Must be 1 or higher. |
+| `iterations` | Iterations per test. |
+| `jvmArgs` | Use given JVM arguments. This disables JVM flags auto-detection, and runs only the single JVM mode. Either a single space-separated option line, or multiple options are accepted. This option only affects forked runs. |
+| `jvmArgsPrepend` | Prepend given JVM arguments to auto-detected configurations. This option only affects forked runs. |
+| `mode` | Test mode preset: `sanity`, `quick`, `default`, `tough`, `stress`. |
+| `regexp` | Regexp selector for tests. |
+| `reportDir` | Target destination to put the report into. |
+| `spinStyle` | Busy loop wait style. `HARD` = hard busy loop; `THREAD_YIELD` = use `Thread.yield()`; `THREAD_SPIN_WAIT` = use `Thread.onSpinWait()`; `LOCKSUPPORT_PARK_NANOS` = use `LockSupport.parkNanos()`. |
+| `splitPerActor` | Use split per-actor compilation mode, if available. |
+| `strideCount` | Internal stride count per epoch. Larger value increases cache footprint. |
+| `strideSize` | Internal stride size. Larger value decreases the synchronization overhead, but also reduces the number of collisions. |
+| `timeMillis` | Time to spend in a single test iteration. Larger value improves test reliability, since schedulers do a better job in the long run. |
+| `verbose` | Be extra verbose. |
+
+## Writing JCStress Tests
+
+JCStress tests can be written in two main styles:
+
+### Style 1: Arbiter-Based Tests
+
+This style uses a separate method to observe the final state:
+
+1. Define shared state variables
+2. Create methods annotated with `@Actor` that will run concurrently
+3. Define an `@Arbiter` method to check the final state
+4. Use `@JCStressTest` and `@Outcome` annotations to specify expected results
+
+Example:
+```java
+@JCStressTest
+@Outcome(id = "1", expect = Expect.ACCEPTABLE, desc = "Normal outcome")
+@Outcome(id = "0", expect = Expect.FORBIDDEN, desc = "Race condition")
+@State
+public class ArbiterTest {
+ int x;
+
+ @Actor
+ void actor1() {
+ x = 1;
+ }
+
+ @Actor
+ void actor2() {
+ x = 1;
+ }
+
+ @Arbiter
+ void arbiter(I_Result r) {
+ r.r1 = x;
+ }
+}
+```
+
+### Style 2: Direct Result Reporting
+
+Actors can directly report results by accepting a result parameter:
+
+1. Define shared state variables
+2. Create methods annotated with `@Actor` that accept a result parameter
+3. Each actor records its observations directly
+4. Use `@JCStressTest` and `@Outcome` annotations to specify expected results
+
+Example:
+```java
+@JCStressTest
+@Outcome(id = "0, 1", expect = Expect.ACCEPTABLE, desc = "Thread 2 sees the write")
+@Outcome(id = "0, 0", expect = Expect.ACCEPTABLE, desc = "Thread 2 doesn't see the write")
+@State
+public class DirectReportingTest {
+ int x;
+
+ @Actor
+ void actor1() {
+ x = 1;
+ }
+
+ @Actor
+ void actor2(II_Result r) {
+ r.r1 = 0;
+ r.r2 = x; // Records what this thread observes
+ }
+}
+```
+
+You may add a `@Description` annotation to briefly explain the test scenario.
However, avoid writing overly long descriptions, as this can trigger a
`StringIndexOutOfBoundsException` due to a known limitation in the JCStress
result frame parser.
+
+Choose the style based on your test needs:
+- Use an arbiter when the final state needs to be observed after all actors
have finished.
+- Use direct reporting when actors need to record observations during their
execution.
+- Direct reporting is particularly useful for testing visibility and ordering
guarantees.
+
+A complete example is shown below. It demonstrates how to test the
thread-safety of concurrent `put()` operations on distinct keys in a cache
implementation.
+
+The test uses two `@Actor` methods to perform `put()` operations on two different entities (e.g., a schema and a table) concurrently. An `@Arbiter` method then checks how many of these entities are visible in the cache after both operations complete. The test defines multiple `@Outcome` values to capture possible results:
+
+- If both entries are present, the result is 2, which is the expected and
acceptable outcome.
+- If only one is present, the result is 1, indicating a potential visibility
or atomicity issue.
+- If neither is present, the result is 0, which reveals a serious concurrency
failure.
+
+```java
+public class xxx {
+ // Entities under test (e.g. schemaEntity and tableEntity) are defined here.
+
+ @JCStressTest
+ @Outcome.Outcomes({
+ @Outcome(id = "2", expect = Expect.ACCEPTABLE, desc = "Both put() calls succeeded; both entries are visible in the cache."),
+ @Outcome(id = "1", expect = Expect.FORBIDDEN, desc = "Only one entry is visible; potential visibility or atomicity issue."),
+ @Outcome(id = "0", expect = Expect.FORBIDDEN, desc = "Neither entry is visible; indicates a serious failure in write propagation or cache logic.")})
+ @Description("Concurrent put() on different keys. Both schema and table should be visible (result = 2). Lower results may indicate visibility or concurrency issues.")
+ @State
+ public static class ConcurrentPutDifferentKeysTest {
+ private final EntityCache cache;
+
+ public ConcurrentPutDifferentKeysTest() {
+ this.cache = new CaffeineEntityCache(new Config() {
+ });
+ }
+
+ @Actor
+ public void actor1() {
+ cache.put(schemaEntity);
+ }
+
+ @Actor
+ public void actor2() {
+ cache.put(tableEntity);
+ }
+
+ @Arbiter
+ public void arbiter(I_Result r) {
+ int count = 0;
+ if (cache.contains(schemaEntity.nameIdentifier(), schemaEntity.type())) count++;
+ if (cache.contains(tableEntity.nameIdentifier(), tableEntity.type())) count++;
+
+ r.r1 = count;
+ }
+ }
+
+ // ... other test classes
+}
+```
+
+This test helps ensure that the cache implementation handles concurrent writes
correctly and does not lose visibility or violate consistency guarantees.
+
+## JCStress Test Lifecycle and Execution Modes
+
+JCStress follows a well-defined execution model to simulate concurrent
interactions:
+
+1. **State Initialization**: A fresh instance of the test class (annotated
with `@State`) is created before each test iteration to ensure full isolation.
+2. **Concurrent Execution of Actors**: All methods annotated with `@Actor` are
invoked concurrently, each on a separate thread. These methods simulate the
real-world concurrent behavior being tested.
+3. **Synchronization Barrier**: After both (or all) actors complete, JCStress
inserts a memory fence to ensure all actions are visible to the next phase.
+4. **Arbiter Execution (Optional)**: If an `@Arbiter` method is defined, it
runs after all actor methods have completed. It observes the final state and
writes results to the `*Result` object.
+5. **Result Collection and Classification**: The outcome of each iteration is
recorded and matched against the `@Outcome` definitions. JCStress performs this
process millions of times with different thread interleavings to uncover rare
concurrency issues.
+
+JCStress offers several execution modes to control the intensity and duration
of the tests. Available modes:
+
+| Mode | Description |
+|-----------|-------------|
+| `sanity` | Runs for a few seconds with minimal iterations. Useful to check test setup. |
+| `quick` | Runs for a short duration with a moderate number of iterations. Good for fast feedback during development. |
+| `default` | Balanced mode with sufficient iterations. Takes a few minutes. Recommended for regular use. |
+| `tough` | Longest and most thorough mode. Runs many iterations over tens of minutes. Best for CI and production validation. |
+
+## Concurrency Issues We Target
+
+Our JCStress tests help identify several types of concurrency problems:
+
+1. **Race Conditions**: When multiple threads access shared data without
proper synchronization
+2. **Memory Visibility**: When changes made by one thread are not visible to
other threads
+3. **Atomicity Violations**: When operations that should be atomic can be
interrupted
+4. **Ordering Issues**: When operations execute in unexpected orders due to
compiler or CPU reordering
+
+## Best Practices
+
+When working with JCStress tests:
+
+- Keep tests focused on a single concurrency aspect
+- Use meaningful names that describe the scenario being tested
+- Document expected outcomes and their reasoning
+- Run tests multiple times, as issues may not appear in every run
+- Review test results carefully, especially for tests with multiple possible
valid outcomes
+
+## Resources
+
+- [JCStress Official Documentation](https://wiki.openjdk.java.net/display/CodeTools/jcstress)
+- [Java Memory Model Specification](https://docs.oracle.com/javase/specs/jls/se8/html/jls-17.html#jls-17.4)
+- [Doug Lea's JSR-133 Cookbook](http://gee.cs.oswego.edu/dl/jmm/cookbook.html)
+- [jcstress-gradle-plugin](https://github.com/reyerizo/jcstress-gradle-plugin)
\ No newline at end of file
diff --git a/core/src/jcstress/java/org/apache/gravitino/cache/TestCacheIndexCoherence.java b/core/src/jcstress/java/org/apache/gravitino/cache/TestCacheIndexCoherence.java
new file mode 100644
index 0000000000..ffeb04bcf8
--- /dev/null
+++ b/core/src/jcstress/java/org/apache/gravitino/cache/TestCacheIndexCoherence.java
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.cache;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.googlecode.concurrenttrees.radix.ConcurrentRadixTree;
+import com.googlecode.concurrenttrees.radix.RadixTree;
+import com.googlecode.concurrenttrees.radix.node.concrete.DefaultCharArrayNodeFactory;
+import java.time.Instant;
+import java.util.Objects;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.SupportsRelationOperations;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.SchemaEntity;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.openjdk.jcstress.annotations.Actor;
+import org.openjdk.jcstress.annotations.Arbiter;
+import org.openjdk.jcstress.annotations.Description;
+import org.openjdk.jcstress.annotations.Expect;
+import org.openjdk.jcstress.annotations.JCStressTest;
+import org.openjdk.jcstress.annotations.Outcome;
+import org.openjdk.jcstress.annotations.State;
+import org.openjdk.jcstress.infra.results.II_Result;
+import org.openjdk.jcstress.infra.results.I_Result;
+
+public class TestCacheIndexCoherence {
+ private static SchemaEntity getTestSchemaEntity(
+ long id, String name, Namespace namespace, String comment) {
+ return SchemaEntity.builder()
+ .withId(id)
+ .withName(name)
+ .withNamespace(namespace)
+ .withAuditInfo(getTestAuditInfo())
+ .withComment(comment)
+ .withProperties(ImmutableMap.of())
+ .build();
+ }
+
+ private static AuditInfo getTestAuditInfo() {
+ return AuditInfo.builder()
+ .withCreator("admin")
+ .withCreateTime(Instant.now())
+ .withLastModifier("admin")
+ .withLastModifiedTime(Instant.now())
+ .build();
+ }
+
+ @JCStressTest
+ @Outcome.Outcomes({
+ @Outcome(id = "1", expect = Expect.ACCEPTABLE, desc = "Key inserted and visible"),
+ @Outcome(id = "0", expect = Expect.FORBIDDEN, desc = "Key not found")
+ })
+ @Description(
+ "Tests the coherence of ConcurrentRadixTree under concurrent insertions with the same key. "
+ + "Both threads concurrently insert the same key-value pair into the radix tree. The expected "
+ + "behavior is that the tree maintains a single mapping without data loss or duplication. A "
+ + "forbidden result indicates insertion was lost or the tree became inconsistent, violating "
+ + "atomicity or thread-safety guarantees.")
+ @State
+ public static class InsertSameKeyCoherenceTest {
+ private final RadixTree<EntityCacheKey> indexTree =
+ new ConcurrentRadixTree<>(new DefaultCharArrayNodeFactory());
+
+ private final NameIdentifier ident = NameIdentifier.of("metalake1", "catalog1", "schema1");
+ private final Entity entity =
+ getTestSchemaEntity(123L, "schema1", Namespace.of("metalake1", "catalog1"), "ident1");
+ private final EntityCacheKey key = EntityCacheKey.of(ident, entity.type());
+ private final String keyStr = key.toString();
+
+ @Actor
+ public void actor1() {
+ indexTree.put(keyStr, key);
+ }
+
+ @Actor
+ public void actor2() {
+ indexTree.put(keyStr, key);
+ }
+
+ @Arbiter
+ public void arbiter(I_Result r) {
+ EntityCacheKey valueForExactKey = indexTree.getValueForExactKey(keyStr);
+ r.r1 =
+ (valueForExactKey != null
+ && Objects.equals(valueForExactKey, key)
+ && indexTree.size() == 1)
+ ? 1
+ : 0;
+ }
+ }
+
+ @JCStressTest
+ @Outcome.Outcomes({
+ @Outcome(id = "1", expect = Expect.ACCEPTABLE, desc = "Key inserted and visible"),
+ @Outcome(id = "0", expect = Expect.FORBIDDEN, desc = "Key not found")
+ })
+ @Description(
+ "Tests that ConcurrentRadixTree handles concurrent put() operations for the same key with a "
+ + "relation type. Both threads insert the same key-value pair where the key includes a "
+ + "relation type (ROLE_USER_REL). The test expects the tree to maintain a single entry. A "
+ + "forbidden result indicates incorrect key handling or data loss under concurrency.")
+ @State
+ public static class InsertSameKeyWithRelationTypeCoherenceTest {
+ private final RadixTree<EntityCacheKey> indexTree =
+ new ConcurrentRadixTree<>(new DefaultCharArrayNodeFactory());
+
+ private final NameIdentifier ident = NameIdentifierUtil.ofRole("metalake", "role");
+ private final EntityCacheKey key =
+ EntityCacheKey.of(
+ ident, Entity.EntityType.ROLE, SupportsRelationOperations.Type.ROLE_USER_REL);
+ private final String keyStr = key.toString();
+
+ @Actor
+ public void actor1() {
+ indexTree.put(keyStr, key);
+ }
+
+ @Actor
+ public void actor2() {
+ indexTree.put(keyStr, key);
+ }
+
+ @Arbiter
+ public void arbiter(I_Result r) {
+ EntityCacheKey valueForExactKey = indexTree.getValueForExactKey(keyStr);
+ r.r1 =
+ (valueForExactKey != null
+ && Objects.equals(valueForExactKey, key)
+ && indexTree.size() == 1)
+ ? 1
+ : 0;
+ }
+ }
+
+ @JCStressTest
+ @Outcome.Outcomes({
+ @Outcome(id = "1", expect = Expect.ACCEPTABLE, desc = "multiple Key inserted and visible"),
+ @Outcome(id = "0", expect = Expect.FORBIDDEN, desc = "some Key not found")
+ })
+ @Description(
+ "Tests that ConcurrentRadixTree maintains consistency under concurrent insertions of two "
+ + "different keys. Each thread inserts a unique key-value pair into the radix tree. "
+ + "Expected behavior is that both keys are stored and visible. A forbidden result "
+ + "indicates data loss or broken concurrency guarantees.")
+ @State
+ public static class InsertMultipleKeyCoherenceTest {
+ private final RadixTree<EntityCacheKey> indexTree =
+ new ConcurrentRadixTree<>(new DefaultCharArrayNodeFactory());
+
+ private final NameIdentifier ident1 = NameIdentifier.of("metalake1", "catalog1", "schema1");
+ private final Entity entity1 =
+ getTestSchemaEntity(123L, "schema1", Namespace.of("metalake1", "catalog1"), "ident1");
+ private final NameIdentifier ident2 = NameIdentifier.of("metalake1", "catalog1", "schema2");
+ private final Entity entity2 =
+ getTestSchemaEntity(456L, "schema2", Namespace.of("metalake1", "catalog1"), "ident2");
+ private final EntityCacheKey key1 = EntityCacheKey.of(ident1, entity1.type());
+ private final EntityCacheKey key2 = EntityCacheKey.of(ident2, entity2.type());
+ private final String key1Str = key1.toString();
+ private final String key2Str = key2.toString();
+
+ @Actor
+ public void actor1() {
+ indexTree.put(key1Str, key1);
+ }
+
+ @Actor
+ public void actor2() {
+ indexTree.put(key2Str, key2);
+ }
+
+ @Arbiter
+ public void arbiter(I_Result r) {
+ EntityCacheKey valueForExactKey1 = indexTree.getValueForExactKey(key1Str);
+ EntityCacheKey valueForExactKey2 = indexTree.getValueForExactKey(key2Str);
+ r.r1 =
+ (valueForExactKey1 != null
+ && valueForExactKey2 != null
+ && Objects.equals(valueForExactKey1, key1)
+ && Objects.equals(valueForExactKey2, key2)
+ && indexTree.size() == 2)
+ ? 1
+ : 0;
+ }
+ }
+
+ @JCStressTest
+ @Outcome.Outcomes({
+ @Outcome(id = "1", expect = Expect.ACCEPTABLE, desc = "multiple Key inserted and visible"),
+ @Outcome(id = "0", expect = Expect.FORBIDDEN, desc = "some Key not found")
+ })
+ @Description(
+ "Tests ConcurrentRadixTree under concurrent insertions of different keys with relation types. "
+ + "Thread 1 inserts a key with ROLE_USER_REL, and Thread 2 inserts a key with ROLE_GROUP_REL. "
+ + "Both keys share the same NameIdentifier but differ by relation type. The tree should retain "
+ + "both entries. A forbidden result indicates relation type is not correctly distinguished.")
+ @State
+ public static class InsertMultipleKeyWithRelationTypeCoherenceTest {
+ private final RadixTree<EntityCacheKey> indexTree =
+ new ConcurrentRadixTree<>(new DefaultCharArrayNodeFactory());
+
+ private final NameIdentifier ident1 = NameIdentifierUtil.ofRole("metalake", "role1");
+ private final NameIdentifier ident2 = NameIdentifierUtil.ofRole("metalake", "role1");
+
+ private final EntityCacheKey key1 =
+ EntityCacheKey.of(
+ ident1, Entity.EntityType.ROLE, SupportsRelationOperations.Type.ROLE_USER_REL);
+ private final EntityCacheKey key2 =
+ EntityCacheKey.of(
+ ident2, Entity.EntityType.ROLE, SupportsRelationOperations.Type.ROLE_GROUP_REL);
+ private final String key1Str = key1.toString();
+ private final String key2Str = key2.toString();
+
+ @Actor
+ public void actor1() {
+ indexTree.put(key1Str, key1);
+ }
+
+ @Actor
+ public void actor2() {
+ indexTree.put(key2Str, key2);
+ }
+
+ @Arbiter
+ public void arbiter(I_Result r) {
+ EntityCacheKey valueForExactKey1 = indexTree.getValueForExactKey(key1Str);
+ EntityCacheKey valueForExactKey2 = indexTree.getValueForExactKey(key2Str);
+ r.r1 =
+ (valueForExactKey1 != null
+ && valueForExactKey2 != null
+ && Objects.equals(valueForExactKey1, key1)
+ && Objects.equals(valueForExactKey2, key2)
+ && indexTree.size() == 2)
+ ? 1
+ : 0;
+ }
+ }
+
+ @JCStressTest
+ @Outcome.Outcomes({
+ @Outcome(id = "1", expect = Expect.ACCEPTABLE, desc = "Key does not exist after remove."),
+ @Outcome(
+ id = "0",
+ expect = Expect.ACCEPTABLE_INTERESTING,
+ desc = "Key still exists due to put/remove race.")
+ })
+ @Description(
+ "Tests the race condition between put() and remove() on the same key in ConcurrentRadixTree. "
+ + "Thread 1 inserts a key, while Thread 2 removes it concurrently. Depending on execution "
+ + "order, the final visibility may vary. If the key is removed successfully, it's acceptable. "
+ + "If the key remains due to put-after-remove, it's considered interesting but not forbidden.")
+ @State
+ public static class PutRemoveSameKeyCoherenceTest {
+
+ private final RadixTree<String> indexTree =
+ new ConcurrentRadixTree<>(new DefaultCharArrayNodeFactory());
+ private final String key = "catalog.schema.table";
+
+ @Actor
+ public void actorPut() {
+ indexTree.put(key, "v1");
+ }
+
+ @Actor
+ public void actorRemove() {
+ indexTree.remove(key);
+ }
+
+ @Arbiter
+ public void arbiter(I_Result r) {
+ String value = indexTree.getValueForExactKey(key);
+ r.r1 = (value == null) ? 1 : 0;
+ }
+ }
+
+ @JCStressTest
+ @Outcome.Outcomes({
+ @Outcome(id = "1", expect = Expect.ACCEPTABLE, desc = "Key does not exist after remove."),
+ @Outcome(
+ id = "0",
+ expect = Expect.ACCEPTABLE_INTERESTING,
+ desc = "Key still exists due to put/remove race.")
+ })
+ @Description(
+ "Tests the race condition between put() and remove() on a key with relation type in "
+ + "ConcurrentRadixTree. Thread 1 inserts a key that includes a relation type "
+ + "(ROLE_USER_REL), while Thread 2 concurrently removes the same key. If remove "
+ + "happens after put, the key is deleted, which is acceptable. If put happens after remove, "
+ + "the key remains, which is interesting but not forbidden.")
+ @State
+ public static class PutRemoveSameKeyWithRelationTypeCoherenceTest {
+
+ private final RadixTree<EntityCacheKey> indexTree =
+ new ConcurrentRadixTree<>(new DefaultCharArrayNodeFactory());
+
+ private final NameIdentifier ident = NameIdentifierUtil.ofRole("metalake", "role");
+ private final EntityCacheKey key =
+ EntityCacheKey.of(
+ ident, Entity.EntityType.ROLE, SupportsRelationOperations.Type.ROLE_USER_REL);
+ private final String keyStr = key.toString();
+
+ @Actor
+ public void actorPut() {
+ indexTree.put(keyStr, key);
+ }
+
+ @Actor
+ public void actorRemove() {
+ indexTree.remove(keyStr);
+ }
+
+ @Arbiter
+ public void arbiter(I_Result r) {
+ EntityCacheKey value = indexTree.getValueForExactKey(keyStr);
+ r.r1 = (value == null) ? 1 : 0;
+ }
+ }
+
+ @JCStressTest
+ @Outcome.Outcomes({
+ @Outcome(
+ id = "1, 1",
+ expect = Expect.ACCEPTABLE,
+ desc = "Both values are visible — correct and expected."),
+ @Outcome(
+ id = "1, 0",
+ expect = Expect.FORBIDDEN,
+ desc = "Only v1 is visible — inconsistent cache state."),
+ @Outcome(
+ id = "0, 1",
+ expect = Expect.FORBIDDEN,
+ desc = "Only v2 is visible — inconsistent cache state."),
+ @Outcome(
+ id = "0, 0",
+ expect = Expect.FORBIDDEN,
+ desc = "No value visible — inconsistent cache state.")
+ })
+ @Description(
+ "Tests race conditions between concurrent put() operations and a prefix scan. Thread 1 inserts "
+ + "'table1' and Thread 2 inserts 'table2', both under the same prefix. Arbiter performs a "
+ + "prefix scan using getValuesForKeysStartingWith(). Both values should be visible. "
+ + "Missing any indicates a broken visibility guarantee, which is forbidden.")
+ @State
+ public static class PutAndPrefixScanCoherenceTest {
+
+ private final RadixTree<String> indexTree =
+ new ConcurrentRadixTree<>(new DefaultCharArrayNodeFactory());
+
+ private static final String PREFIX = "catalog.schema.";
+ private static final String KEY1 = PREFIX + "table1";
+ private static final String KEY2 = PREFIX + "table2";
+
+ @Actor
+ public void actorPut1() {
+ indexTree.put(KEY1, "v1");
+ }
+
+ @Actor
+ public void actorPut2() {
+ indexTree.put(KEY2, "v2");
+ }
+
+ @Arbiter
+ public void arbiter(II_Result r) {
+ ImmutableList<String> values =
+ ImmutableList.copyOf(indexTree.getValuesForKeysStartingWith(PREFIX));
+ r.r1 = values.contains("v1") ? 1 : 0;
+ r.r2 = values.contains("v2") ? 1 : 0;
+ }
+ }
+
+ @JCStressTest
+ @Outcome.Outcomes({
+ @Outcome(
+ id = "1, 1",
+ expect = Expect.ACCEPTABLE,
+ desc = "Both values are visible — correct and expected."),
+ @Outcome(
+ id = "1, 0",
+ expect = Expect.FORBIDDEN,
+ desc = "Only v1 is visible — inconsistent cache state."),
+ @Outcome(
+ id = "0, 1",
+ expect = Expect.FORBIDDEN,
+ desc = "Only v2 is visible — inconsistent cache state."),
+ @Outcome(
+ id = "0, 0",
+ expect = Expect.FORBIDDEN,
+ desc = "No value visible — inconsistent cache state.")
+ })
+ @Description(
+ "Tests race conditions between concurrent put() operations on keys with different relation "
+ + "types and a prefix-based scan. Thread 1 inserts a ROLE_USER_REL key, Thread 2 inserts "
+ + "a ROLE_GROUP_REL key. The arbiter uses getValuesForKeysStartingWith() to verify both "
+ + "entries are visible. Visibility of only one is a race condition and forbidden. Missing "
+ + "both values indicates an inconsistent cache state and is also forbidden.")
+ @State
+ public static class PutAndPrefixScanWithRelationTypeCoherenceTest {
+
+ private final RadixTree<EntityCacheKey> indexTree =
+ new ConcurrentRadixTree<>(new DefaultCharArrayNodeFactory());
+
+ private final NameIdentifier ident1 = NameIdentifierUtil.ofRole("metalake", "role1");
+ private final NameIdentifier ident2 = NameIdentifierUtil.ofRole("metalake", "role2");
+
+ private final EntityCacheKey key1 =
+ EntityCacheKey.of(
+ ident1, Entity.EntityType.ROLE, SupportsRelationOperations.Type.ROLE_USER_REL);
+ private final EntityCacheKey key2 =
+ EntityCacheKey.of(
+ ident2, Entity.EntityType.ROLE, SupportsRelationOperations.Type.ROLE_GROUP_REL);
+ private final String key1Str = key1.toString();
+ private final String key2Str = key2.toString();
+
+ @Actor
+ public void actorPut1() {
+ indexTree.put(key1Str, key1);
+ }
+
+ @Actor
+ public void actorPut2() {
+ indexTree.put(key2Str, key2);
+ }
+
+ @Arbiter
+ public void arbiter(II_Result r) {
+ ImmutableList<EntityCacheKey> values =
+ ImmutableList.copyOf(indexTree.getValuesForKeysStartingWith("metalake"));
+ r.r1 = values.contains(key1) ? 1 : 0;
+ r.r2 = values.contains(key2) ? 1 : 0;
+ }
+ }
+
+ @JCStressTest
+ @Outcome.Outcomes({
+ @Outcome(id = "1", expect = Expect.ACCEPTABLE, desc = "Get sees the value"),
+ @Outcome(
+ id = "0",
+ expect = Expect.ACCEPTABLE_INTERESTING,
+ desc = "Get sees nothing due to timing")
+ })
+ @Description(
+ "Tests the race condition between put() and getValueForExactKey(). Thread 1 inserts a key-value "
+ + "pair into the radix tree. Thread 2 concurrently attempts to retrieve the value for the same "
+ + "key. If get() observes the effect of put(), it should return a non-null result. If not, it "
+ + "may return null depending on the timing. Missing value is interesting but not forbidden.")
+ @State
+ public static class PutAndGetCoherenceTest {
+ private final RadixTree<String> indexTree =
+ new ConcurrentRadixTree<>(new DefaultCharArrayNodeFactory());
+ private final String key = "catalog.schema.table";
+
+ @Actor
+ public void actorPut() {
+ indexTree.put(key, "v1");
+ }
+
+ @Actor
+ public void actorGet(I_Result r) {
+ r.r1 = indexTree.getValueForExactKey(key) != null ? 1 : 0;
+ }
+ }
+
+ @JCStressTest
+ @Outcome.Outcomes({
+ @Outcome(id = "1", expect = Expect.ACCEPTABLE, desc = "Get sees the value"),
+ @Outcome(
+ id = "0",
+ expect = Expect.ACCEPTABLE_INTERESTING,
+ desc = "Get sees nothing due to timing")
+ })
+ @Description(
+ "Tests race condition between put() and getValueForExactKey() for relation-typed cache keys. "
+ + "Thread 1 inserts a ROLE_USER_REL key into the radix tree. Thread 2 concurrently attempts to "
+ + "retrieve the same key. If the get observes the result of the put, it returns a non-null value. "
+ + "Missing value is interesting but not forbidden, depending on execution timing.")
+ @State
+ public static class PutAndGetWithRelationTypeCoherenceTest {
+ private final RadixTree<EntityCacheKey> indexTree =
+ new ConcurrentRadixTree<>(new DefaultCharArrayNodeFactory());
+ private final NameIdentifier ident = NameIdentifierUtil.ofRole("metalake", "role");
+ private final EntityCacheKey key =
+ EntityCacheKey.of(
+ ident, Entity.EntityType.ROLE, SupportsRelationOperations.Type.ROLE_USER_REL);
+ private final String keyStr = key.toString();
+
+ @Actor
+ public void actorPut() {
+ indexTree.put(keyStr, key);
+ }
+
+ @Actor
+ public void actorGet(I_Result r) {
+ r.r1 = indexTree.getValueForExactKey(keyStr) != null ? 1 : 0;
+ }
+ }
+}
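
The prefix-scan tests above rely on one property: the arbiter runs only after both actors have finished, so both inserted values must be visible to the scan. A minimal sketch of that ordering, assuming plain threads in place of the JCStress harness and a `ConcurrentSkipListMap` standing in for `ConcurrentRadixTree` (the class and method names are illustrative and not part of this commit):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;

final class PrefixScanSketch {
  private static final String PREFIX = "catalog.schema.";

  /** Returns the values of all keys starting with the prefix, in key order. */
  static List<String> valuesWithPrefix(ConcurrentSkipListMap<String, String> index, String prefix) {
    // Assumption: keys never contain U+FFFF, so [prefix, prefix + U+FFFF) covers the prefix.
    return new ArrayList<>(index.subMap(prefix, prefix + Character.MAX_VALUE).values());
  }

  /** Two writers run concurrently; the scan happens only after both have been joined. */
  static List<String> runOnce() {
    ConcurrentSkipListMap<String, String> index = new ConcurrentSkipListMap<>();
    Thread actor1 = new Thread(() -> index.put(PREFIX + "table1", "v1"));
    Thread actor2 = new Thread(() -> index.put(PREFIX + "table2", "v2"));
    actor1.start();
    actor2.start();
    try {
      // join() establishes happens-before, which plays the role of the arbiter barrier.
      actor1.join();
      actor2.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return valuesWithPrefix(index, PREFIX);
  }

  public static void main(String[] args) {
    System.out.println(runOnce());
  }
}
```

A single run like this proves little about races. JCStress repeats the same shape many times under varying schedules, which is why the tests use `@Actor` and `@Arbiter` rather than hand-rolled threads.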
diff --git a/core/src/jcstress/java/org/apache/gravitino/cache/TestCaffeineEntityCacheCoherence.java b/core/src/jcstress/java/org/apache/gravitino/cache/TestCaffeineEntityCacheCoherence.java
new file mode 100644
index 0000000000..df1978adc4
--- /dev/null
+++ b/core/src/jcstress/java/org/apache/gravitino/cache/TestCaffeineEntityCacheCoherence.java
@@ -0,0 +1,661 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.cache;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.time.Instant;
+import java.util.List;
+import java.util.Optional;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.SupportsRelationOperations;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.ColumnEntity;
+import org.apache.gravitino.meta.GroupEntity;
+import org.apache.gravitino.meta.RoleEntity;
+import org.apache.gravitino.meta.SchemaEntity;
+import org.apache.gravitino.meta.TableEntity;
+import org.apache.gravitino.meta.UserEntity;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.utils.NamespaceUtil;
+import org.openjdk.jcstress.annotations.Actor;
+import org.openjdk.jcstress.annotations.Arbiter;
+import org.openjdk.jcstress.annotations.Description;
+import org.openjdk.jcstress.annotations.Expect;
+import org.openjdk.jcstress.annotations.JCStressTest;
+import org.openjdk.jcstress.annotations.Outcome;
+import org.openjdk.jcstress.annotations.State;
+import org.openjdk.jcstress.infra.results.I_Result;
+import org.openjdk.jcstress.infra.results.L_Result;
+
+public class TestCaffeineEntityCacheCoherence {
+ private static final SchemaEntity schemaEntity =
+ getTestSchemaEntity(1L, "schema1", Namespace.of("metalake1", "catalog1"), "test_schema1");
+ private static final TableEntity tableEntity =
+ getTestTableEntity(3L, "table1", Namespace.of("metalake1", "catalog1", "schema1"));
+ private static final GroupEntity groupEntity =
+ getTestGroupEntity(4L, "group1", "metalake1", ImmutableList.of("role1"));
+ private static final UserEntity userEntity =
+ getTestUserEntity(5L, "user1", "metalake1", ImmutableList.of(6L));
+ private static final RoleEntity roleEntity = getTestRoleEntity(6L, "role1", "metalake1");
+
+ private static SchemaEntity getTestSchemaEntity(
+ long id, String name, Namespace namespace, String comment) {
+ return SchemaEntity.builder()
+ .withId(id)
+ .withName(name)
+ .withNamespace(namespace)
+ .withAuditInfo(getTestAuditInfo())
+ .withComment(comment)
+ .withProperties(ImmutableMap.of())
+ .build();
+ }
+
+ private static TableEntity getTestTableEntity(long id, String name, Namespace namespace) {
+ return TableEntity.builder()
+ .withId(id)
+ .withName(name)
+ .withAuditInfo(getTestAuditInfo())
+ .withNamespace(namespace)
+ .withColumns(ImmutableList.of(getMockColumnEntity()))
+ .build();
+ }
+
+ private static RoleEntity getTestRoleEntity(long id, String name, String metalake) {
+ return RoleEntity.builder()
+ .withId(id)
+ .withName(name)
+ .withNamespace(NamespaceUtil.ofRole(metalake))
+ .withAuditInfo(getTestAuditInfo())
+ .withSecurableObjects(ImmutableList.of())
+ .build();
+ }
+
+ private static GroupEntity getTestGroupEntity(
+ long id, String name, String metalake, List<String> roles) {
+ return GroupEntity.builder()
+ .withId(id)
+ .withName(name)
+ .withNamespace(NamespaceUtil.ofGroup(metalake))
+ .withAuditInfo(getTestAuditInfo())
+ .withRoleNames(roles)
+ .build();
+ }
+
+ private static UserEntity getTestUserEntity(
+ long id, String name, String metalake, List<Long> roles) {
+ return UserEntity.builder()
+ .withId(id)
+ .withName(name)
+ .withNamespace(NamespaceUtil.ofUser(metalake))
+ .withAuditInfo(getTestAuditInfo())
+ .withRoleIds(roles)
+ .build();
+ }
+
+ private static AuditInfo getTestAuditInfo() {
+ return AuditInfo.builder()
+ .withCreator("admin")
+ .withCreateTime(Instant.now())
+ .withLastModifier("admin")
+ .withLastModifiedTime(Instant.now())
+ .build();
+ }
+
+ private static ColumnEntity getMockColumnEntity() {
+ ColumnEntity mockColumn = mock(ColumnEntity.class);
+ when(mockColumn.name()).thenReturn("field1");
+ when(mockColumn.dataType()).thenReturn(Types.StringType.get());
+ when(mockColumn.nullable()).thenReturn(false);
+ when(mockColumn.auditInfo()).thenReturn(getTestAuditInfo());
+
+ return mockColumn;
+ }
+
+ @JCStressTest
+ @Outcome.Outcomes({
+ @Outcome(id = "ENTITY", expect = Expect.ACCEPTABLE, desc = "getIfPresent observed the entity."),
+ @Outcome(
+ id = "NULL",
+ expect = Expect.FORBIDDEN,
+ desc = "getIfPresent did not observe entity, which violates visibility guarantees.")
+ })
+ @Description(
+ "Tests visibility between put() and getIfPresent() on an existing entity. "
+ + "Entity should remain visible; NULL indicates broken visibility or invalidation.")
+ @State
+ public static class PutWithGetIfPresentCoherenceTest {
+ private final EntityCache cache;
+
+ public PutWithGetIfPresentCoherenceTest() {
+ this.cache = new CaffeineEntityCache(new Config() {});
+ cache.put(schemaEntity);
+ }
+
+ @Actor
+ public void actor1() {
+ cache.put(schemaEntity);
+ }
+
+ @Actor
+ public void actor2(L_Result r) {
+ r.r1 =
+ cache.getIfPresent(schemaEntity.nameIdentifier(), schemaEntity.type()).isPresent()
+ ? "ENTITY"
+ : "NULL";
+ }
+ }
+
+ @JCStressTest
+ @Outcome.Outcomes({
+ @Outcome(
+ id = "ENTITY",
+ expect = Expect.ACCEPTABLE,
+ desc =
+ "contains() returns true, indicating the cache retains visibility after repeated put()."),
+ @Outcome(
+ id = "NULL",
+ expect = Expect.FORBIDDEN,
+ desc =
+ "contains() returned false, indicating visibility or synchronization error during repeated put().")
+ })
+ @Description(
+ "Tests visibility with repeated put() on the same key. "
+ + "Actor1 puts again; Actor2 checks contains(). "
+ + "Entity should remain visible; NULL indicates a visibility issue.")
+ @State
+ public static class PutWithContainCoherenceTest {
+ private final EntityCache cache;
+
+ public PutWithContainCoherenceTest() {
+ this.cache = new CaffeineEntityCache(new Config() {});
+ cache.put(schemaEntity);
+ }
+
+ @Actor
+ public void actor1() {
+ cache.put(schemaEntity);
+ }
+
+ @Actor
+ public void actor2(L_Result r) {
+ boolean contains = cache.contains(schemaEntity.nameIdentifier(), schemaEntity.type());
+ r.r1 = contains ? "ENTITY" : "NULL";
+ }
+ }
+
+ @JCStressTest
+ @Outcome.Outcomes({
+ @Outcome(
+ id = "ENTITY",
+ expect = Expect.ACCEPTABLE,
+ desc = "put() completed after invalidate(), so the entity remains in the cache."),
+ @Outcome(
+ id = "NULL",
+ expect = Expect.ACCEPTABLE_INTERESTING,
+ desc = "invalidate() cleared the entity after put(), so it is no longer in the cache.")
+ })
+ @Description(
+ "Concurrent put() and invalidate() on the same key. "
+ + "If put wins, entity remains; if invalidate wins, it's removed. "
+ + "Both outcomes are acceptable; NULL is interesting for consistency checks.")
+ @State
+ public static class PutWithInvalidateCoherenceTest {
+ private final EntityCache cache;
+
+ public PutWithInvalidateCoherenceTest() {
+ this.cache = new CaffeineEntityCache(new Config() {});
+ }
+
+ @Actor
+ public void actor1() {
+ cache.put(schemaEntity);
+ }
+
+ @Actor
+ public void actor2() {
+ cache.invalidate(schemaEntity.nameIdentifier(), schemaEntity.type());
+ }
+
+ @Arbiter
+ public void arbiter(L_Result r) {
+ Entity result =
+ cache.getIfPresent(schemaEntity.nameIdentifier(), schemaEntity.type()).orElse(null);
+ r.r1 = result != null ? "ENTITY" : "NULL";
+ }
+ }
+
+ @JCStressTest
+ @Outcome.Outcomes({
+ @Outcome(
+ id = "ENTITY",
+ expect = Expect.ACCEPTABLE,
+ desc = "put() happened after clear(), so the entity remains in the cache."),
+ @Outcome(
+ id = "NULL",
+ expect = Expect.ACCEPTABLE_INTERESTING,
+ desc = "clear() happened after or concurrently with put(), so the cache is empty.")
+ })
+ @Description(
+ "Concurrent put() and clear(). If put wins, entity remains. If clear wins, entity is gone. "
+ + "NULL is acceptable but interesting due to race timing.")
+ @State
+ public static class PutWithClearCoherenceTest {
+ private final EntityCache cache;
+
+ public PutWithClearCoherenceTest() {
+ this.cache = new CaffeineEntityCache(new Config() {});
+ }
+
+ @Actor
+ public void actor1() {
+ cache.put(tableEntity);
+ }
+
+ @Actor
+ public void actor2() {
+ cache.clear();
+ }
+
+ @Arbiter
+ public void arbiter(L_Result r) {
+ Entity result =
+ cache.getIfPresent(tableEntity.nameIdentifier(), tableEntity.type()).orElse(null);
+ r.r1 = result != null ? "ENTITY" : "NULL";
+ }
+ }
+
+ @JCStressTest
+ @Outcome.Outcomes({
+ @Outcome(
+ id = "2",
+ expect = Expect.ACCEPTABLE,
+ desc = "Both put() calls succeeded; both entries are visible in the cache."),
+ @Outcome(
+ id = "1",
+ expect = Expect.FORBIDDEN,
+ desc = "Only one entry is visible; potential visibility or atomicity issue."),
+ @Outcome(
+ id = "0",
+ expect = Expect.FORBIDDEN,
+ desc =
+ "Neither entry is visible; indicates a serious failure in write propagation or cache logic.")
+ })
+ @Description(
+ "Concurrent put() on different keys. Both schema and table should be visible (result = 2). "
+ + "Lower results may indicate visibility or concurrency issues.")
+ @State
+ public static class ConcurrentPutDifferentKeysTest {
+ private final EntityCache cache;
+
+ public ConcurrentPutDifferentKeysTest() {
+ this.cache = new CaffeineEntityCache(new Config() {});
+ }
+
+ @Actor
+ public void actor1() {
+ cache.put(schemaEntity);
+ }
+
+ @Actor
+ public void actor2() {
+ cache.put(tableEntity);
+ }
+
+ @Arbiter
+ public void arbiter(I_Result r) {
+ int count = 0;
+ if (cache.contains(schemaEntity.nameIdentifier(), schemaEntity.type())) {
+ count++;
+ }
+ if (cache.contains(tableEntity.nameIdentifier(), tableEntity.type())) {
+ count++;
+ }
+
+ r.r1 = count;
+ }
+ }
+
+ @JCStressTest
+ @Outcome.Outcomes({
+ @Outcome(
+ id = "2",
+ expect = Expect.ACCEPTABLE,
+ desc = "Both put() calls succeeded; both entries are visible in the cache."),
+ @Outcome(
+ id = "1",
+ expect = Expect.FORBIDDEN,
+ desc = "Only one entry is visible; potential visibility or atomicity issue."),
+ @Outcome(
+ id = "0",
+ expect = Expect.FORBIDDEN,
+ desc =
+ "Neither entry is visible; indicates a serious failure in write propagation or cache logic.")
+ })
+ @Description("Concurrent put() with different ROLE relation types; expect both visible (2).")
+ @State
+ public static class ConcurrentPutDifferentKeysWithRelationTest {
+ private final EntityCache cache;
+
+ public ConcurrentPutDifferentKeysWithRelationTest() {
+ this.cache = new CaffeineEntityCache(new Config() {});
+ }
+
+ @Actor
+ public void actor1() {
+ cache.put(
+ roleEntity.nameIdentifier(),
+ Entity.EntityType.ROLE,
+ SupportsRelationOperations.Type.ROLE_GROUP_REL,
+ ImmutableList.of(groupEntity));
+ }
+
+ @Actor
+ public void actor2() {
+ cache.put(
+ roleEntity.nameIdentifier(),
+ Entity.EntityType.ROLE,
+ SupportsRelationOperations.Type.ROLE_USER_REL,
+ ImmutableList.of(userEntity));
+ }
+
+ @Arbiter
+ public void arbiter(I_Result r) {
+ int count = 0;
+ if (cache.contains(
+ roleEntity.nameIdentifier(),
+ Entity.EntityType.ROLE,
+ SupportsRelationOperations.Type.ROLE_USER_REL)) {
+ count++;
+ }
+ if (cache.contains(
+ roleEntity.nameIdentifier(),
+ Entity.EntityType.ROLE,
+ SupportsRelationOperations.Type.ROLE_GROUP_REL)) {
+ count++;
+ }
+
+ r.r1 = count;
+ }
+ }
+
+ @JCStressTest
+ @Outcome.Outcomes({
+ @Outcome(
+ id = "1",
+ expect = Expect.ACCEPTABLE,
+ desc = "Both put() calls succeeded on the same key; value is visible."),
+ @Outcome(
+ id = "0",
+ expect = Expect.FORBIDDEN,
+ desc = "Neither put() was visible; indicates a visibility or atomicity issue.")
+ })
+ @Description(
+ "Concurrent put() on the same key; value should remain visible. Missing entry indicates a concurrency issue.")
+ @State
+ public static class ConcurrentPutSameKeyTest {
+ private final EntityCache cache;
+
+ public ConcurrentPutSameKeyTest() {
+ this.cache = new CaffeineEntityCache(new Config() {});
+ }
+
+ @Actor
+ public void actor1() {
+ cache.put(schemaEntity);
+ }
+
+ @Actor
+ public void actor2() {
+ cache.put(schemaEntity);
+ }
+
+ @Arbiter
+ public void arbiter(I_Result r) {
+ r.r1 = cache.contains(schemaEntity.nameIdentifier(), Entity.EntityType.SCHEMA) ? 1 : 0;
+ }
+ }
+
+ @JCStressTest
+ @Outcome.Outcomes({
+ @Outcome(
+ id = "1",
+ expect = Expect.ACCEPTABLE,
+ desc = "Entry is visible; concurrent put() calls succeeded."),
+ @Outcome(
+ id = "0",
+ expect = Expect.FORBIDDEN,
+ desc = "Entry is missing; indicates visibility or atomicity issue.")
+ })
+ @Description(
+ "Tests concurrent put() on the same key with relation type. "
+ + "Entry must remain visible after concurrent writes; missing indicates a bug.")
+ @State
+ public static class ConcurrentPutSameKeyWithRelationTest {
+ private final EntityCache cache;
+
+ public ConcurrentPutSameKeyWithRelationTest() {
+ this.cache = new CaffeineEntityCache(new Config() {});
+ }
+
+ @Actor
+ public void actor1() {
+ cache.put(
+ roleEntity.nameIdentifier(),
+ Entity.EntityType.ROLE,
+ SupportsRelationOperations.Type.ROLE_USER_REL,
+ ImmutableList.of(userEntity));
+ }
+
+ @Actor
+ public void actor2() {
+ cache.put(
+ roleEntity.nameIdentifier(),
+ Entity.EntityType.ROLE,
+ SupportsRelationOperations.Type.ROLE_USER_REL,
+ ImmutableList.of(userEntity));
+ }
+
+ @Arbiter
+ public void arbiter(I_Result r) {
+ r.r1 =
+ cache.contains(
+ roleEntity.nameIdentifier(),
+ Entity.EntityType.ROLE,
+ SupportsRelationOperations.Type.ROLE_USER_REL)
+ ? 1
+ : 0;
+ }
+ }
+
+ @JCStressTest
+ @Outcome.Outcomes({
+ @Outcome(
+ id = "ENTITY",
+ expect = Expect.ACCEPTABLE,
+ desc = "GetIfPresent sees the entity before it was invalidated."),
+ @Outcome(
+ id = "NULL",
+ expect = Expect.ACCEPTABLE_INTERESTING,
+ desc = "Invalidate removed the entity before getIfPresent.")
+ })
+ @Description(
+ "Tests race between invalidate() and getIfPresent(). "
+ + "Either outcome is allowed depending on timing.")
+ @State
+ public static class InvalidateWithGetCoherenceTest {
+ private final EntityCache cache;
+
+ public InvalidateWithGetCoherenceTest() {
+ this.cache = new CaffeineEntityCache(new Config() {});
+
+ cache.put(schemaEntity);
+ cache.put(tableEntity);
+ }
+
+ @Actor
+ public void actor1() {
+ cache.invalidate(schemaEntity.nameIdentifier(), schemaEntity.type());
+ }
+
+ @Actor
+ public void actor2(L_Result r) {
+ Optional<? extends Entity> result =
+ cache.getIfPresent(tableEntity.nameIdentifier(), tableEntity.type());
+ r.r1 = result.isPresent() ? "ENTITY" : "NULL";
+ }
+ }
+
+ @JCStressTest
+ @Outcome.Outcomes({
+ @Outcome(
+ id = "SUCCESS",
+ expect = Expect.ACCEPTABLE,
+ desc = "Both invalidates executed safely."),
+ @Outcome(id = "FAILURE", expect = Expect.FORBIDDEN, desc = "One or both invalidates failed.")
+ })
+ @Description("Tests concurrent invalidate() on the same key is safe and idempotent.")
+ @State
+ public static class ConcurrentInvalidateSameKeyCoherenceTest {
+ private final EntityCache cache;
+
+ public ConcurrentInvalidateSameKeyCoherenceTest() {
+ this.cache = new CaffeineEntityCache(new Config() {});
+ cache.put(schemaEntity);
+ }
+
+ @Actor
+ public void actor1() {
+ cache.invalidate(schemaEntity.nameIdentifier(), schemaEntity.type());
+ }
+
+ @Actor
+ public void actor2() {
+ cache.invalidate(schemaEntity.nameIdentifier(), schemaEntity.type());
+ }
+
+ @Arbiter
+ public void arbiter(L_Result r) {
+ r.r1 =
+ cache.contains(schemaEntity.nameIdentifier(), schemaEntity.type())
+ ? "FAILURE"
+ : "SUCCESS";
+ }
+ }
+
+ @JCStressTest
+ @Outcome.Outcomes({
+ @Outcome(
+ id = "SUCCESS",
+ expect = Expect.ACCEPTABLE,
+ desc = "Both invalidate operations completed; neither entry remains in the cache."),
+ @Outcome(
+ id = "FAILURE",
+ expect = Expect.FORBIDDEN,
+ desc = "One or both entries remain in the cache after concurrent invalidate.")
+ })
+ @Description(
+ "Tests concurrent invalidate() on two related keys (same prefix). "
+ + "Verifies that prefix-based invalidation logic does not interfere across keys, "
+ + "and both schema and table entries are properly removed without conflict or race condition.")
+ @State
+ public static class ConcurrentInvalidateRelatedKeyCoherenceTest {
+ private final EntityCache cache;
+
+ public ConcurrentInvalidateRelatedKeyCoherenceTest() {
+ this.cache = new CaffeineEntityCache(new Config() {});
+ cache.put(schemaEntity);
+ cache.put(tableEntity);
+ }
+
+ @Actor
+ public void actor1() {
+ cache.invalidate(schemaEntity.nameIdentifier(), schemaEntity.type());
+ }
+
+ @Actor
+ public void actor2() {
+ cache.invalidate(tableEntity.nameIdentifier(), tableEntity.type());
+ }
+
+ @Arbiter
+ public void arbiter(L_Result r) {
+ if (cache.contains(schemaEntity.nameIdentifier(), schemaEntity.type())
+ || cache.contains(tableEntity.nameIdentifier(), tableEntity.type())) {
+ r.r1 = "FAILURE";
+ return;
+ }
+
+ r.r1 = "SUCCESS";
+ }
+ }
+
+ @JCStressTest
+ @Outcome.Outcomes({
+ @Outcome(
+ id = "SUCCESS",
+ expect = Expect.ACCEPTABLE,
+ desc = "Both invalidate() and clear() removed the entries as expected."),
+ @Outcome(
+ id = "FAILURE",
+ expect = Expect.FORBIDDEN,
+ desc = "One or more entries remain; indicates race condition or incomplete invalidation.")
+ })
+ @Description(
+ "Tests concurrent invalidate() and clear() operations. "
+ + "Ensures that both targeted and global removals can coexist without leaving residual entries. "
+ + "Any remaining entries indicate inconsistency in concurrent removal paths.")
+ @State
+ public static class ClearWithInvalidateCoherenceTest {
+ private final EntityCache cache;
+
+ public ClearWithInvalidateCoherenceTest() {
+ this.cache = new CaffeineEntityCache(new Config() {});
+ cache.put(schemaEntity);
+ cache.put(tableEntity);
+ }
+
+ @Actor
+ public void actor1() {
+ cache.invalidate(schemaEntity.nameIdentifier(), schemaEntity.type());
+ }
+
+ @Actor
+ public void actor2() {
+ cache.clear();
+ }
+
+ @Arbiter
+ public void arbiter(L_Result r) {
+ if (cache.contains(schemaEntity.nameIdentifier(), schemaEntity.type())
+ || cache.contains(tableEntity.nameIdentifier(), tableEntity.type())) {
+ r.r1 = "FAILURE";
+ return;
+ }
+
+ r.r1 = "SUCCESS";
+ }
+ }
+}
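
The relation-typed `put()` tests above exercise the path that was missing a lock. When a cache keeps a data map and a separate index, both updates have to happen under one lock, or a concurrent writer or invalidator can leave them out of step. A minimal sketch of that pattern, assuming a simplified cache with string keys (the class `LockedCacheSketch` and its fields are hypothetical and much smaller than `CaffeineEntityCache`):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

final class LockedCacheSketch {
  private final ReentrantLock opLock = new ReentrantLock();
  private final Map<String, List<String>> data = new ConcurrentHashMap<>();
  private final Set<String> index = ConcurrentHashMap.newKeySet();

  /** Runs the action while holding the operation lock, always releasing it afterwards. */
  private void withLock(Runnable action) {
    opLock.lock();
    try {
      action.run();
    } finally {
      opLock.unlock();
    }
  }

  /** Stores the entities and indexes the key as one locked step; empty lists are ignored. */
  void put(String key, List<String> entities) {
    if (entities == null) {
      throw new IllegalArgumentException("Entities cannot be null");
    }
    withLock(
        () -> {
          if (entities.isEmpty()) {
            return;
          }
          data.put(key, new ArrayList<>(entities));
          index.add(key);
        });
  }

  /** Removes the key from both structures under the same lock. */
  void invalidate(String key) {
    withLock(
        () -> {
          index.remove(key);
          data.remove(key);
        });
  }

  boolean contains(String key) {
    return data.containsKey(key);
  }

  /** True when the index and the data map hold exactly the same keys. */
  boolean isConsistent() {
    opLock.lock();
    try {
      return data.keySet().equals(index);
    } finally {
      opLock.unlock();
    }
  }
}
```

Both structures here are individually thread-safe, yet the pair is not: without the lock, an `invalidate()` interleaved between `data.put` and `index.add` could leave a key indexed but absent from the data map.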
diff --git a/core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java b/core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java
index 317beff0d1..91148b4a78 100644
--- a/core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java
+++ b/core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java
@@ -34,6 +34,7 @@ import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
@@ -58,6 +59,20 @@ public class CaffeineEntityCache extends BaseEntityCache {
private static final int CACHE_CLEANUP_QUEUE_CAPACITY = 100;
private static final int CACHE_MONITOR_PERIOD_MINUTES = 5;
private static final int CACHE_MONITOR_INITIAL_DELAY_MINUTES = 0;
+ private static final ExecutorService CLEANUP_EXECUTOR =
+ new ThreadPoolExecutor(
+ CACHE_CLEANUP_CORE_THREADS,
+ CACHE_CLEANUP_MAX_THREADS,
+ 0L,
+ TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<>(CACHE_CLEANUP_QUEUE_CAPACITY),
+ r -> {
+ Thread t = new Thread(r, "CaffeineEntityCache-Cleanup");
+ t.setDaemon(true);
+ return t;
+ },
+ new ThreadPoolExecutor.CallerRunsPolicy());
+
private static final Logger LOG =
LoggerFactory.getLogger(CaffeineEntityCache.class.getName());
private final ReentrantLock opLock = new ReentrantLock();
@@ -78,11 +93,10 @@ public class CaffeineEntityCache extends BaseEntityCache {
super(cacheConfig);
this.cacheIndex = new ConcurrentRadixTree<>(new DefaultCharArrayNodeFactory());
- ThreadPoolExecutor cleanupExec = buildCleanupExecutor();
Caffeine<EntityCacheKey, List<Entity>> cacheDataBuilder = newBaseBuilder(cacheConfig);
cacheDataBuilder
- .executor(cleanupExec)
+ .executor(CLEANUP_EXECUTOR)
.removalListener(
(key, value, cause) -> {
if (cause == RemovalCause.EXPLICIT || cause == RemovalCause.REPLACED) {
@@ -191,13 +205,16 @@ public class CaffeineEntityCache extends BaseEntityCache {
List<E> entities) {
checkArguments(ident, type, relType);
Preconditions.checkArgument(entities != null, "Entities cannot be null");
- if (entities.isEmpty()) {
- return;
- }
+ withLock(
+ () -> {
+ if (entities.isEmpty()) {
+ return;
+ }
- syncEntitiesToCache(
- EntityCacheKey.of(ident, type, relType),
- entities.stream().map(e -> (Entity) e).collect(Collectors.toList()));
+ syncEntitiesToCache(
+ EntityCacheKey.of(ident, type, relType),
+ entities.stream().map(e -> (Entity) e).collect(Collectors.toList()));
+ });
}
/** {@inheritDoc} */
@@ -410,26 +427,6 @@ public class CaffeineEntityCache extends BaseEntityCache {
}
}
- /**
- * Builds the cleanup executor.
- *
- * @return The cleanup executor
- */
- private ThreadPoolExecutor buildCleanupExecutor() {
- return new ThreadPoolExecutor(
- CACHE_CLEANUP_CORE_THREADS,
- CACHE_CLEANUP_MAX_THREADS,
- 0L,
- TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue<>(CACHE_CLEANUP_QUEUE_CAPACITY),
- r -> {
- Thread t = new Thread(r, "CaffeineEntityCache-Cleanup");
- t.setDaemon(true);
- return t;
- },
- new ThreadPoolExecutor.CallerRunsPolicy());
- }
-
/** Starts the cache stats monitor. */
private void startCacheStatsMonitor() {
scheduler.scheduleAtFixedRate(
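
The executor change in this file replaces one thread pool per cache instance with a single static pool. A minimal sketch of why that bounds thread creation, assuming illustrative pool sizes of one core and one maximum thread (the real `CACHE_CLEANUP_CORE_THREADS` and `CACHE_CLEANUP_MAX_THREADS` values are not shown in this diff, and `SharedExecutorSketch` is a hypothetical class):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class SharedExecutorSketch {
  // One pool for the whole JVM: creating many instances creates no additional threads.
  private static final ExecutorService SHARED_EXECUTOR =
      new ThreadPoolExecutor(
          1, // core threads (assumed value)
          1, // max threads (assumed value)
          0L,
          TimeUnit.MILLISECONDS,
          new ArrayBlockingQueue<>(100),
          r -> {
            Thread t = new Thread(r, "Sketch-Cleanup");
            // Daemon threads do not keep the JVM alive at shutdown.
            t.setDaemon(true);
            return t;
          },
          // When the queue is full, the submitting thread runs the task itself.
          new ThreadPoolExecutor.CallerRunsPolicy());

  private final ExecutorService executor;

  SharedExecutorSketch() {
    // Every instance reuses the same static pool instead of building its own.
    this.executor = SHARED_EXECUTOR;
  }

  ExecutorService executor() {
    return executor;
  }
}
```

The trade-off is that a static pool is never shut down per instance and is shared by all caches in the process, which is acceptable here because the threads are daemons and cleanup work is light.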
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index f83fc539c7..114be4a02e 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -120,6 +120,7 @@ google-auth = "1.28.0"
aliyun-credentials = "0.3.12"
openlineage = "1.29.0"
concurrent-trees = "2.6.0"
+jcstress = "0.8.15"
[libraries]
aws-iam = { group = "software.amazon.awssdk", name = "iam", version.ref = "awssdk" }
@@ -307,3 +308,4 @@ tasktree = {id = "com.dorongold.task-tree", version = "2.1.1"}
dependencyLicenseReport = {id = "com.github.jk1.dependency-license-report", version = "2.9"}
bom = {id = "org.cyclonedx.bom", version = "1.5.0"}
errorprone = {id = "net.ltgt.errorprone", version.ref = "error-prone"}
+jcstress = { id = "io.github.reyerizo.gradle.jcstress", version.ref = "jcstress" }