imbajin commented on code in PR #3017:
URL: https://github.com/apache/hugegraph/pull/3017#discussion_r3242011697
##########
hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedGraphTransaction.java:
##########
@@ -60,11 +62,20 @@ public final class CachedGraphTransaction extends
GraphTransaction {
private static final long AVG_VERTEX_ENTRY_SIZE = 40L;
private static final long AVG_EDGE_ENTRY_SIZE = 100L;
+ /*
+ * Listener lifetime must cover all active transactions for the graph.
+ * The holder is removed from the registry and unregistered from EventHub
+ * only when the last transaction releases it.
+ */
+ private static final ConcurrentMap<String, CacheListenerHolder>
+ graphCacheEventListeners = new ConcurrentHashMap<>();
Review Comment:
⚠️ **Naming inconsistency with `SCHEMA_CACHE_EVENT_LISTENERS`**
Both maps are `private static final` and should follow
`SCREAMING_SNAKE_CASE` per Java convention, but this one uses `lowerCamelCase`
while the schema counterpart in `CachedSchemaTransaction` does not. Suggest
renaming to `GRAPH_CACHE_EVENT_LISTENERS` for consistency.
```suggestion
GRAPH_CACHE_EVENT_LISTENERS = new ConcurrentHashMap<>();
```
##########
hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CacheListenerHolder.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.backend.cache;
+
+import org.apache.hugegraph.event.EventHub;
+import org.apache.hugegraph.event.EventListener;
+
+/*
+ * Listener lifetime must cover all active transactions for the graph.
+ * The holder is removed from the registry and unregistered from EventHub
+ * only when the last transaction releases it.
+ */
+final class CacheListenerHolder {
+
+ final EventListener listener;
+ final EventHub hub;
+ int refCount;
Review Comment:
⚠️ **`refCount` is a bare `int` with no synchronization contract documented**
The field is safe today because all mutations happen inside
`ConcurrentMap.compute()`, which serialises per-key — but that invariant is
invisible to future maintainers. A stray read/write outside `compute()` would
introduce a data race with no compiler warning.
Either document the contract:
```suggestion
// Must only be read or written inside ConcurrentMap.compute() for the
// enclosing registry; ConcurrentHashMap.compute() serialises per-key
access.
int refCount;
```
Or use `AtomicInteger` — consistent with `BackendSessionPool.sessionCount`
and other ref-counted fields in the codebase — which makes the intent
self-documenting.
##########
hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CachedGraphTransactionTest.java:
##########
@@ -138,6 +186,176 @@ public void testEventInvalid() throws Exception {
Whitebox.invoke(cache, "verticesCache", "size"));
}
+ @Test
+ public void testClearCacheEmitsActionClear() throws Exception {
+ // Producers must emit the present-tense ACTION_CLEAR / ACTION_INVALID,
+ // not the legacy past-tense variants - otherwise local listeners that
+ // match only the present-tense actions silently drop the event.
+ CachedGraphTransaction cache = this.cache();
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference<String> action = new AtomicReference<>();
+ EventListener listener = event -> {
+ Object[] args = event.args();
+ if (args.length > 0 && args[0] instanceof String) {
+ action.set((String) args[0]);
+ latch.countDown();
+ }
+ return true;
+ };
+ this.params.graphEventHub().listen(Events.CACHE, listener);
+ try {
+ cache.clearCache(HugeType.VERTEX, true);
+
+ Assert.assertTrue(latch.await(1L, TimeUnit.SECONDS));
+ Assert.assertEquals(Cache.ACTION_CLEAR, action.get());
+ } finally {
+ this.params.graphEventHub().unlisten(Events.CACHE, listener);
+ }
+ }
+
+ @Test
+ public void testVertexMutationEmitsActionInvalid() throws Exception {
+ CachedGraphTransaction cache = this.cache();
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference<String> action = new AtomicReference<>();
+ EventListener listener = event -> {
+ Object[] args = event.args();
+ if (args.length > 0 && Cache.ACTION_INVALID.equals(args[0])) {
+ action.set((String) args[0]);
+ latch.countDown();
+ }
+ return true;
+ };
+ this.params.graphEventHub().listen(Events.CACHE, listener);
+ try {
+ cache.addVertex(this.newVertex(IdGenerator.of(1)));
+ cache.commit();
+
+ Assert.assertTrue(latch.await(1L, TimeUnit.SECONDS));
+ Assert.assertEquals(Cache.ACTION_INVALID, action.get());
+ } finally {
+ this.params.graphEventHub().unlisten(Events.CACHE, listener);
+ }
+ }
+
+ @Test
+ public void testClosingNonOwnerKeepsGraphCacheListenerRegistered()
+ throws Exception {
+ ConcurrentMap<String, Object> cacheListeners =
+ graphCacheEventListeners();
+ ConcurrentMap<String, Boolean> storeListeners =
+ storeEventListenStatus();
+
+ String graphName = this.params.spaceGraphName();
+ Object holder = cacheListeners.get(graphName);
+ Assert.assertNotNull(holder);
+ EventListener registered = holderListener(holder);
+ int refCount = holderRefCount(holder);
+
+ CachedGraphTransaction second = new CachedGraphTransaction(
+ this.params, this.params.loadGraphStore());
+ Assert.assertSame(holder, cacheListeners.get(graphName));
+ Assert.assertEquals(refCount + 1, holderRefCount(holder));
+
+ try {
+ second.close();
+
+ Assert.assertSame(holder, cacheListeners.get(graphName));
+ Assert.assertEquals(refCount, holderRefCount(holder));
+ Assert.assertTrue(this.params.graphEventHub()
+ .listeners(Events.CACHE)
+ .contains(registered));
+ } finally {
+ // Closing the secondary transaction exercises the pre-existing
Review Comment:
⚠️ **Test manipulates internal production state to compensate for a known
bug**
`storeListeners.putIfAbsent(graphName, true)` restores
`storeEventListenStatus` so that `teardown` can unlisten the primary store
listener — but only because closing `second` already consumed the map entry
(the ownership-first-close bug acknowledged in the TODO inside
`unlistenChanges`).
This means the test is silently accepting broken teardown behaviour rather
than isolating the intended scenario. If `putIfAbsent` fails for any reason,
teardown will fail with an error unrelated to the test's actual assertion.
Consider adding a comment linking this workaround to the follow-up PR that
will fix `storeEventListenStatus`, or isolating it in a shared helper that
makes the intent explicit.
##########
hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CacheManagerTest.java:
##########
@@ -285,17 +288,23 @@ public void testCacheExpire() {
mockCache2);
Mockito.when(this.mockCaches.entrySet()).thenReturn(caches.entrySet());
- cache1.update(IdGenerator.of("fake-id"), "fake-value");
- cache2.update(IdGenerator.of("fake-id"), "fake-value");
+ TimerTask task = Whitebox.invoke(CacheManager.class, "scheduleTimer",
+ manager, 0.1F);
+ try {
+ cache1.update(IdGenerator.of("fake-id"), "fake-value",
+ -30 * 1000L);
+ cache2.update(IdGenerator.of("fake-id"), "fake-value");
- waitTillNext(40);
+ waitTillNext(1);
Review Comment:
🧹 **`waitTillNext(1)` may be flaky under CI load**
`scheduleTimer(0.1F)` creates a 100 ms `Timer`, and 1 second should give ~10
ticks in theory. However, `Timer` uses a single background thread with no
scheduling-lag compensation, and `waitTillNext` is a spin-wait rather than a
latch. Under a CPU-starved CI runner the timer thread can be delayed enough
that fewer ticks fire within the window.
Consider extending the wait to 3 seconds for a reasonable safety margin, or
replacing the spin-wait with a `CountDownLatch` triggered from inside the
`tick()` mock.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]