imbajin commented on code in PR #3017:
URL: https://github.com/apache/hugegraph/pull/3017#discussion_r3208957441


##########
hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedGraphTransaction.java:
##########
@@ -184,17 +185,29 @@ private void listenChanges() {
             }
             return false;
         };
-        if (graphCacheListenStatus.putIfAbsent(this.params().spaceGraphName(), true) == null) {
-            EventHub graphEventHub = this.params().graphEventHub();
+        EventHub graphEventHub = this.params().graphEventHub();
+        EventListener previous =
+                graphCacheEventListeners.putIfAbsent(
+                        this.params().spaceGraphName(), listener);
+        if (previous == null) {
+            this.cacheEventListener = listener;
+            this.registeredCacheEventListener = true;
             graphEventHub.listen(Events.CACHE, this.cacheEventListener);
+        } else {
+            this.cacheEventListener = previous;
+            this.registeredCacheEventListener = false;
         }
     }
 
     private void unlistenChanges() {
         String graphName = this.params().spaceGraphName();
-        if (graphCacheListenStatus.remove(graphName) != null) {
+        if (this.registeredCacheEventListener) {
             EventHub graphEventHub = this.params().graphEventHub();
-            graphEventHub.unlisten(Events.CACHE, this.cacheEventListener);
+            if (graphCacheEventListeners.remove(graphName,
+                                                this.cacheEventListener)) {
+                graphEventHub.unlisten(Events.CACHE, this.cacheEventListener);
+            }
+            this.registeredCacheEventListener = false;

Review Comment:
   ‼️ What happens in the reverse ordering, where the owner closes first while non-owners are still alive? The new tests only cover non-owner close.
   
   Trace:
   1. T1 opens (owner), registers `listener`, puts it in `graphCacheEventListeners`.
   2. T2 opens (non-owner), caches `this.cacheEventListener = <T1's listener>`.
   3. T1 closes → `unlistenChanges()` removes the map entry and calls `hub.unlisten(...)`.
   4. T2 still alive. Its `notifyChanges(...)` calls `notifyExcept(hub, <T1's listener>, ...)`. That listener is no longer on the hub, so the exclusion is a no-op and the emit fans out to whatever else is registered (e.g. `AbstractCacheNotifier`'s bridge). Meanwhile T2 has no working local cache listener at all: the per-graph listener slot is empty until a new transaction repopulates it.
   
   Two questions:
   - Is T1-closes-before-T2 a realistic lifecycle in practice (e.g. thread-local transactions on a long-lived graph)? If yes, a ref-counted ownership holder (unregister only when the last referent closes) would be safer than "first-wins" ownership.
   - Either way, worth adding a companion test to `testClosingNonOwnerKeepsGraphCacheListenerRegistered` that exercises the owner-first-close path and asserts whatever the intended semantics are, so the invariant is captured.
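
A minimal sketch of the ref-counted holder suggested above, not taken from the PR. `RefCountedListeners`, `acquire`, and `release` are hypothetical names, and the `register` / `unregister` callbacks stand in for `hub.listen(...)` / `hub.unlisten(...)`. The first referent registers the shared listener and only the last one to release unregisters it, so close order no longer matters.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical helper (not a HugeGraph class): one shared listener per graph,
// registered by the first referent and unregistered by the last.
final class RefCountedListeners<L> {

    private static final class Holder<L> {
        final L listener;
        int refs; // only mutated inside compute(), which is atomic per key

        Holder(L listener) {
            this.listener = listener;
        }
    }

    private final ConcurrentMap<String, Holder<L>> holders =
            new ConcurrentHashMap<>();

    // Returns the shared listener; `register` runs only for the first referent.
    // The callbacks run inside compute(), so they must not touch this map.
    L acquire(String graph, Supplier<L> factory, Consumer<L> register) {
        return this.holders.compute(graph, (name, holder) -> {
            if (holder == null) {
                holder = new Holder<>(factory.get());
                register.accept(holder.listener);
            }
            holder.refs++;
            return holder;
        }).listener;
    }

    // Returns true only when the last referent released and `unregister` ran.
    boolean release(String graph, Consumer<L> unregister) {
        boolean[] last = {false};
        this.holders.computeIfPresent(graph, (name, holder) -> {
            if (--holder.refs > 0) {
                return holder; // other referents remain: keep the registration
            }
            unregister.accept(holder.listener);
            last[0] = true;
            return null; // drop the map entry together with the registration
        });
        return last[0];
    }
}
```

With this shape, the T1/T2 trace above would keep the listener on the hub after T1 closes and drop it only when T2 closes. Whether that is the intended semantics is exactly what the companion test should pin down.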



##########
hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CachedGraphTransactionTest.java:
##########
@@ -138,6 +147,96 @@ public void testEventInvalid() throws Exception {
                             Whitebox.invoke(cache, "verticesCache", "size"));
     }
 
+    @Test
+    public void testClearCacheEmitsActionClear() throws Exception {
+        // Producers must emit the present-tense ACTION_CLEAR / ACTION_INVALID,
+        // not the legacy past-tense variants - otherwise local listeners that
+        // match only the present-tense actions silently drop the event.
+        CachedGraphTransaction cache = this.cache();
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicReference<String> action = new AtomicReference<>();
+        EventListener listener = event -> {
+            Object[] args = event.args();
+            if (args.length > 0 && args[0] instanceof String) {
+                action.set((String) args[0]);
+                latch.countDown();
+            }
+            return true;
+        };
+        this.params.graphEventHub().listen(Events.CACHE, listener);
+        try {
+            cache.clearCache(HugeType.VERTEX, true);
+
+            Assert.assertTrue(latch.await(1L, TimeUnit.SECONDS));
+            Assert.assertEquals(Cache.ACTION_CLEAR, action.get());
+        } finally {
+            this.params.graphEventHub().unlisten(Events.CACHE, listener);
+        }
+    }
+
+    @Test
+    public void testVertexMutationEmitsActionInvalid() throws Exception {
+        CachedGraphTransaction cache = this.cache();
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicReference<String> action = new AtomicReference<>();
+        EventListener listener = event -> {
+            Object[] args = event.args();
+            if (args.length > 0 && Cache.ACTION_INVALID.equals(args[0])) {
+                action.set((String) args[0]);
+                latch.countDown();
+            }
+            return true;
+        };
+        this.params.graphEventHub().listen(Events.CACHE, listener);
+        try {
+            cache.addVertex(this.newVertex(IdGenerator.of(1)));
+            cache.commit();
+
+            Assert.assertTrue(latch.await(1L, TimeUnit.SECONDS));
+            Assert.assertEquals(Cache.ACTION_INVALID, action.get());
+        } finally {
+            this.params.graphEventHub().unlisten(Events.CACHE, listener);
+        }
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testClosingNonOwnerKeepsGraphCacheListenerRegistered()
+            throws Exception {
+        Field cacheField = GraphTransaction.class
+                .getDeclaredField("graphCacheEventListeners");
+        cacheField.setAccessible(true);
+        ConcurrentMap<String, EventListener> cacheListeners =
+                (ConcurrentMap<String, EventListener>) cacheField.get(null);
+        Field storeField = GraphTransaction.class
+                .getDeclaredField("storeEventListenStatus");
+        storeField.setAccessible(true);
+        ConcurrentMap<String, Boolean> storeListeners =
+                (ConcurrentMap<String, Boolean>) storeField.get(null);
+
+        String graphName = this.params.spaceGraphName();
+        EventListener registered = cacheListeners.get(graphName);
+        Assert.assertNotNull(registered);
+
+        CachedGraphTransaction second = new CachedGraphTransaction(
+                this.params, this.params.loadGraphStore());
+        Assert.assertSame(registered, cacheListeners.get(graphName));
+
+        try {
+            second.close();
+
+            Assert.assertSame(registered, cacheListeners.get(graphName));
+            Assert.assertTrue(this.params.graphEventHub()
+                                         .listeners(Events.CACHE)
+                                         .contains(registered));
+        } finally {
+            // Closing the secondary transaction exercises the pre-existing
+            // store listener guard too; restore it so teardown unregisters the
+            // primary transaction's store listener.
+            storeListeners.putIfAbsent(graphName, true);

Review Comment:
   ⚠️ TODO (follow-up): this `putIfAbsent` is a workaround for a pre-existing bug in `storeEventListenStatus` that mirrors exactly the one this PR fixes for `graphCacheEventListeners`. In `unlistenChanges()`:
   
   ```java
   if (storeEventListenStatus.remove(graphName) != null) {
       this.store().provider().unlisten(this.storeEventListener);
   }
   ```
   
   When a non-owner transaction closes, it (a) pops the map entry even though it doesn't own the registration, (b) unlistens its own per-instance `storeEventListener`, which was never registered on the provider (no-op), and (c) leaves the primary's real store listener registered but untracked, so the owner's later close silently skips the unlisten and leaks it.
   
   Suggest a follow-up PR that applies the same ref-counted / owner-aware pattern introduced here for the cache listener (i.e. a `Map<String, EventListener>` + `registeredStoreEventListener` flag). Keeping as TODO for now since it's out of scope.
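
The (a)/(b)/(c) sequence can be reproduced in isolation. This is a minimal sketch with hypothetical stand-ins: `StoreListenerLeakDemo` models a transaction, the `provider` set models the listeners registered on the store provider, and the `putIfAbsent` guard on the listen side is an assumption based on the old cache-listener code in this diff. Only the `remove(graphName) != null` guard is taken from the snippet quoted above.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical model of the store-listener guard; not HugeGraph code.
final class StoreListenerLeakDemo {

    // Mirrors the boolean-per-graph status map.
    static final ConcurrentMap<String, Boolean> status =
            new ConcurrentHashMap<>();
    // Stand-in for the listeners currently registered on the store provider.
    static final Set<Object> provider = new HashSet<>();

    final String graph;
    final Object storeEventListener = new Object(); // one per instance

    StoreListenerLeakDemo(String graph) {
        this.graph = graph;
        // Assumed listen-side guard: only the first transaction registers.
        if (status.putIfAbsent(graph, true) == null) {
            provider.add(this.storeEventListener);
        }
    }

    void close() {
        // Guard as quoted in the review: any closer may pop the entry.
        if (status.remove(this.graph) != null) {
            provider.remove(this.storeEventListener);
        }
    }
}
```

Opening an owner and a second instance for the same graph, then closing the second one, empties `status` while the owner's listener stays in `provider`. The owner's own `close()` then finds no entry and never removes it.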



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

