imbajin commented on code in PR #3011:
URL: https://github.com/apache/hugegraph/pull/3011#discussion_r3195543465


##########
hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java:
##########
@@ -238,13 +369,25 @@ public void removeSchema(SchemaElement schema) {
 
         this.invalidateCache(schema.type(), schema.id());
 
+        this.maybeNotifySchemaCacheClear();
+    }
+
+    private void maybeNotifySchemaCacheClear() {
+        // Only suppress notifications for removal tasks when
+        // TASK_SYNC_DELETION=true: the caller propagates cache invalidation
+        // synchronously, so the meta-event broadcast would be redundant.
         if (!this.graph().option(CoreOptions.TASK_SYNC_DELETION)) {
-            MetaManager.instance()
-                       .notifySchemaCacheClear(this.graph().graphSpace(),
-                                               this.graph().name());
+            this.notifySchemaCacheClear();
         }
     }
 
+    private void notifySchemaCacheClear() {
+        MetaManager.instance()
+                   .notifySchemaCacheClear(this.graph().graphSpace(),
+                                           this.graph().name(),
+                                           SCHEMA_CACHE_CLEAR_SOURCE);

Review Comment:
   🧹 **Nit (non-blocking): consider adding a `@VisibleForTesting` or explicit visibility note on `resetMetaListenerForReconnect()`**
   
   The method is `public static` but the Javadoc makes clear it's an escape 
hatch, not part of the stable API. A `@VisibleForTesting` (Guava) or at least a 
`@apiNote` line clarifying "internal recovery hook; do not call from 
application code" would prevent external callers from building on what's 
intended as a temporary shim until MetaManager exposes a reconnect callback.
   
   ```suggestion
       /**
        * Manually reset the JVM-global meta listener flag after detecting that
        * the MetaManager transport reconnected and dropped the underlying gRPC
        * watch. This method is not wired to a MetaManager/MetaDriver reconnect
        * callback today; callers must invoke it explicitly after detecting that
        * condition. Without such a manual reset {@link #metaEventListenerRegistered}
        * would stay {@code true} forever and this JVM would stop receiving
        * cross-node schema cache clear events with no error or warning.
        *
        * <p>TODO: wire this into MetaManager once it exposes a transport
        * reconnect callback (e.g. {@code listenReconnect} /
        * {@code onTransportReconnect}). Until then it must be invoked
        * explicitly by code that detects the reconnect.
        *
        * @apiNote Internal recovery hook. Not part of the stable API; external
        *          callers should not depend on this method; it may be replaced
        *          by an automatic reconnect callback in a future release.
        */
       public static void resetMetaListenerForReconnect() {
   ```
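
The reasoning in the Javadoc above (a JVM-global flag that makes registration idempotent, and that stays set forever unless something resets it) can be illustrated with a minimal, self-contained sketch. This is not the PR's implementation: `ListenerGuardSketch`, `listenOnce` and `resetForReconnect` are hypothetical names, and the `Runnable` merely stands in for whatever call registers the real watch.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; names do not come from the PR.
class ListenerGuardSketch {

    // Stand-in for a JVM-global "listener already registered" flag.
    private static final AtomicBoolean REGISTERED = new AtomicBoolean(false);

    // Runs the registration action at most once while the flag is set.
    // Returns true only for the caller that actually registered.
    static boolean listenOnce(Runnable register) {
        if (!REGISTERED.compareAndSet(false, true)) {
            return false; // short-circuit: someone already registered
        }
        register.run();
        return true;
    }

    // Manual recovery hook: clear the flag so the next listenOnce()
    // registers again, e.g. after the underlying watch was dropped.
    static void resetForReconnect() {
        REGISTERED.set(false);
    }

    public static void main(String[] args) {
        AtomicInteger registrations = new AtomicInteger();
        listenOnce(registrations::incrementAndGet); // registers
        listenOnce(registrations::incrementAndGet); // skipped
        resetForReconnect();                        // simulate reconnect handling
        listenOnce(registrations::incrementAndGet); // registers again
        System.out.println("registrations = " + registrations.get());
    }
}
```

Without the reset call, the second and every later `listenOnce` would be skipped silently, which is the failure mode the Javadoc warns about and the reason the hook needs a clear "internal only" marker.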



##########
hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CachedSchemaTransactionTest.java:
##########
@@ -165,6 +187,528 @@ public void testGetSchema() throws Exception {
                             cache.getPropertyKey(IdGenerator.of(1)).name());
     }
 
+    @Test
+    public void testClearV2SchemaCacheByGraphName() {
+        String graphName = "DEFAULT-unit-test-v2";
+        String otherGraphName = "DEFAULT-other-v2";
+
+        Cache<Id, Object> idCache = CacheManager.instance()
+                                                .cache("schema-id-" +
+                                                       graphName, 10L);
+        Cache<Id, Object> nameCache = CacheManager.instance()
+                                                  .cache("schema-name-" +
+                                                         graphName, 10L);
+        Cache<Id, Object> otherIdCache = CacheManager.instance()
+                                                     .cache("schema-id-" +
+                                                            otherGraphName,
+                                                            10L);
+        Object arrayCaches = idCache.attachment(newV2SchemaCaches(10));
+        Id arrayCacheId = IdGenerator.of(1);
+        SchemaElement arrayCacheSchema =
+                new FakeObjects("unit-test-v2")
+                        .newPropertyKey(arrayCacheId, "fake-pk-array");
+
+        try {
+            clearV2SchemaCaches(arrayCaches);
+            setV2SchemaCache(arrayCaches, HugeType.PROPERTY_KEY, arrayCacheId,
+                             arrayCacheSchema);
+            idCache.update(IdGenerator.of(1), "fake-pk-by-id");
+            nameCache.update(IdGenerator.of("fake-pk"), "fake-pk-by-name");
+            otherIdCache.update(IdGenerator.of(2), "other-pk-by-id");
+
+            Assert.assertEquals(1L, idCache.size());
+            Assert.assertEquals(1L, nameCache.size());
+            Assert.assertEquals(1L, otherIdCache.size());
+            Assert.assertSame(arrayCacheSchema,
+                              getV2SchemaCache(arrayCaches,
+                                               HugeType.PROPERTY_KEY,
+                                               arrayCacheId));
+
+            Whitebox.invokeStatic(CachedSchemaTransactionV2.class,
+                                  new Class<?>[]{String.class},
+                                  "clearSchemaCache", graphName);
+
+            Assert.assertEquals(0L, idCache.size());
+            Assert.assertEquals(0L, nameCache.size());
+            Assert.assertEquals(1L, otherIdCache.size());
+            Assert.assertNull(getV2SchemaCache(arrayCaches,
+                                               HugeType.PROPERTY_KEY,
+                                               arrayCacheId));
+        } finally {
+            clearV2SchemaCaches(arrayCaches);
+            idCache.clear();
+            nameCache.clear();
+            otherIdCache.clear();
+        }
+    }
+
+    private static Object newV2SchemaCaches(int size) {
+        for (Class<?> clazz :
+             CachedSchemaTransactionV2.class.getDeclaredClasses()) {
+            if (!"SchemaCaches".equals(clazz.getSimpleName())) {
+                continue;
+            }
+            try {
+                Constructor<?> constructor =
+                        clazz.getDeclaredConstructor(int.class);
+                constructor.setAccessible(true);
+                return constructor.newInstance(size);
+            } catch (ReflectiveOperationException e) {
+                throw new AssertionError("Failed to create SchemaCaches", e);
+            }
+        }
+        throw new AssertionError("SchemaCaches class not found");
+    }
+
+    private static void clearV2SchemaCaches(Object arrayCaches) {
+        Whitebox.invoke(arrayCaches.getClass(), "clear", arrayCaches);
+    }
+
+    private static void setV2SchemaCache(Object arrayCaches, HugeType type,
+                                         Id id, SchemaElement schema) {
+        Whitebox.invoke(arrayCaches.getClass(),
+                        new Class<?>[]{HugeType.class, Id.class,
+                                       SchemaElement.class},
+                        "set", arrayCaches, type, id, schema);
+    }
+
+    private static SchemaElement getV2SchemaCache(Object arrayCaches,
+                                                  HugeType type, Id id) {
+        return Whitebox.invoke(arrayCaches.getClass(),
+                               new Class<?>[]{HugeType.class, Id.class},
+                               "get", arrayCaches, type, id);
+    }
+
+    @Test
+    public void testListenSchemaCacheClearIsIdempotent() throws Exception {
+        // Once the JVM-global registration flag is set, every subsequent
+        // call to listenSchemaCacheClear() must short-circuit before
+        // touching MetaManager, even under concurrent invocation. Pre-set
+        // the flag, race N threads, and verify none of them propagated an
+        // exception (which would happen if MetaManager.instance()
+        // .listenSchemaCacheClear were invoked without an initialised
+        // driver).
+        Field flagField = CachedSchemaTransactionV2.class
+                .getDeclaredField("metaEventListenerRegistered");
+        flagField.setAccessible(true);
+        AtomicBoolean flag = (AtomicBoolean) flagField.get(null);
+        boolean previous = flag.getAndSet(true);
+        try {
+            int threads = 8;
+            CountDownLatch start = new CountDownLatch(1);
+            CountDownLatch done = new CountDownLatch(threads);
+            AtomicInteger failures = new AtomicInteger();
+            for (int i = 0; i < threads; i++) {
+                new Thread(() -> {
+                    try {
+                        start.await();
+                        Whitebox.invokeStatic(CachedSchemaTransactionV2.class,
+                                              "listenSchemaCacheClear");
+                    } catch (Throwable t) {
+                        failures.incrementAndGet();
+                    } finally {
+                        done.countDown();
+                    }
+                }).start();
+            }
+            start.countDown();
+            Assert.assertTrue("listenSchemaCacheClear race timed out",
+                              done.await(10, TimeUnit.SECONDS));
+            Assert.assertEquals("listenSchemaCacheClear must short-circuit " +
+                                "when already registered", 0, failures.get());
+            Assert.assertTrue("registration flag must remain set", flag.get());
+        } finally {
+            flag.set(previous);
+        }
+    }
+
+    @Test
+    public void testClearSchemaCacheClearsArrayAttachmentMaps()
+            throws Exception {
+        // clearSchemaCache() must wipe idCache, nameCache and every internal
+        // IntObjectMap (pks/vls/els/ils) inside the array attachment so
+        // stale entries are not served after a meta event.
+        String graphName = "DEFAULT-unit-test-v2-array";
+        Cache<Id, Object> idCache =
+                CacheManager.instance().cache("schema-id-" + graphName, 10L);
+        Cache<Id, Object> nameCache =
+                CacheManager.instance().cache("schema-name-" + graphName, 10L);
+        // Size must comfortably exceed the largest id below: IntObjectMap
+        // grows by doubling and refuses to write past currentSize even after
+        // a single expansion, so a small capacity rejects mid-range keys.
+        Object arrayCaches = idCache.attachment(newV2SchemaCaches(64));
+        Id pkId = IdGenerator.of(1);
+        Id vlId = IdGenerator.of(2);
+        Id elId = IdGenerator.of(3);
+        Id ilId = IdGenerator.of(4);
+        FakeObjects fakeObjects = new FakeObjects("unit-test-v2-array");
+        SchemaElement pk = fakeObjects.newPropertyKey(pkId, "fake-pk");
+
+        try {
+            clearV2SchemaCaches(arrayCaches);
+            setV2SchemaCache(arrayCaches, HugeType.PROPERTY_KEY, pkId, pk);
+            setV2SchemaCache(arrayCaches, HugeType.VERTEX_LABEL, vlId, pk);
+            setV2SchemaCache(arrayCaches, HugeType.EDGE_LABEL, elId, pk);
+            setV2SchemaCache(arrayCaches, HugeType.INDEX_LABEL, ilId, pk);
+            idCache.update(pkId, "fake-pk-by-id");
+            nameCache.update(IdGenerator.of("fake-pk"), "fake-pk-by-name");
+
+            Assert.assertEquals(1L, idCache.size());
+            Assert.assertEquals(1L, nameCache.size());
+            Assert.assertNotNull(getV2SchemaCache(arrayCaches,
+                                                  HugeType.PROPERTY_KEY, pkId));
+
+            Whitebox.invokeStatic(CachedSchemaTransactionV2.class,
+                                  new Class<?>[]{String.class},
+                                  "clearSchemaCache", graphName);
+
+            Assert.assertEquals(0L, idCache.size());
+            Assert.assertEquals(0L, nameCache.size());
+            for (String mapName : new String[]{"pks", "vls", "els", "ils"}) {
+                Object intMap = readField(arrayCaches, mapName);
+                assertIntObjectMapEmpty(intMap, mapName);
+            }
+            Map<HugeType, Boolean> cachedTypes = readField(arrayCaches,
+                                                           "cachedTypes");
+            Assert.assertTrue("cachedTypes must be empty after clear",
+                              cachedTypes.isEmpty());
+        } finally {
+            clearV2SchemaCaches(arrayCaches);
+            idCache.clear();
+            nameCache.clear();
+        }
+    }
+
+    // TASK_SYNC_DELETION gating of removeSchema notifications and the
+    // unconditional addSchema notification require an initialised
+    // CachedSchemaTransactionV2 instance, which in turn needs an hstore
+    // backend and a connected MetaManager. Both prerequisites are out of
+    // scope for this unit test class. They are exercised end-to-end by the
+    // hstore integration tests in CoreTestSuite. TODO(#2617): port these
+    // assertions into a dedicated CachedSchemaTransactionV2IT once
+    // mockito-inline becomes available so MetaManager.instance() can be
+    // stubbed without an hstore cluster.
+
+    @Test
+    public void testHandleSchemaCacheClearEventSkipsLocalSource()
+            throws Exception {
+        String graphName = "DEFAULT-meta-local-source-v2";
+        Cache<Id, Object> idCache =
+                CacheManager.instance().cache("schema-id-" + graphName, 10L);
+        Cache<Id, Object> nameCache =
+                CacheManager.instance()
+                            .cache("schema-name-" + graphName, 10L);
+
+        MetaDriver mockDriver = Mockito.mock(MetaDriver.class);
+        Object localResponse = new Object();
+        Object remoteResponse = new Object();
+        String localSource = schemaCacheClearSource();
+        Mockito.when(mockDriver.extractValuesFromResponse(localResponse))
+               .thenReturn(Collections.singletonList(
+                       MetaManager.schemaCacheClearEventValue(graphName,
+                                                              localSource)));
+        Mockito.when(mockDriver.extractValuesFromResponse(remoteResponse))
+               .thenReturn(Collections.singletonList(
+                       MetaManager.schemaCacheClearEventValue(graphName,
+                                                              "remote")));
+
+        MetaDriver originalDriver = swapMetaDriver(mockDriver);
+        try {
+            idCache.update(IdGenerator.of(1), "v");
+            nameCache.update(IdGenerator.of("n"), "v");
+
+            Whitebox.invokeStatic(CachedSchemaTransactionV2.class,
+                                  new Class<?>[]{Object.class},
+                                  "handleSchemaCacheClearEvent",
+                                  localResponse);
+
+            Assert.assertEquals("local echo must not clear id cache",
+                                1L, idCache.size());
+            Assert.assertEquals("local echo must not clear name cache",
+                                1L, nameCache.size());
+
+            Whitebox.invokeStatic(CachedSchemaTransactionV2.class,
+                                  new Class<?>[]{Object.class},
+                                  "handleSchemaCacheClearEvent",
+                                  remoteResponse);
+
+            Assert.assertEquals(0L, idCache.size());
+            Assert.assertEquals(0L, nameCache.size());
+        } finally {
+            swapMetaDriver(originalDriver);
+            idCache.clear();
+            nameCache.clear();
+        }
+    }
+
+    @Test
+    public void testHandleSchemaCacheClearEventClearsTargetGraphOnly()
+            throws Exception {
+        // End-to-end coverage of the meta-event consumer:
+        //   publish (response) -> MetaManager extract -> clearSchemaCache
+        // We bypass the live etcd/PD watch by stubbing MetaDriver on the
+        // MetaManager singleton and invoking the package-private consumer
+        // directly. This validates that only the targeted graph's caches are
+        // cleared and that other graphs in the same JVM are left untouched.
+        String targetGraph = "DEFAULT-meta-target-v2";
+        String otherGraph = "DEFAULT-meta-other-v2";
+
+        Cache<Id, Object> targetIdCache =
+                CacheManager.instance().cache("schema-id-" + targetGraph, 10L);
+        Cache<Id, Object> targetNameCache =
+                CacheManager.instance()
+                            .cache("schema-name-" + targetGraph, 10L);
+        Cache<Id, Object> otherIdCache =

Review Comment:
   🧹 **Nit (non-blocking): the TODO references `#2617` but that's the parent issue, not a dedicated follow-up for the IT gap**
   
   The comment says `TODO(#2617): port these assertions into a dedicated 
CachedSchemaTransactionV2IT`. Since #2617 is about to close when this PR lands, 
the TODO will point to an issue nobody's tracking anymore. Consider either:
   
   1. File a new follow-up issue for the integration test specifically (like 
you did for #3013 / #3012), and reference that here; or
   2. Drop the issue number and just leave the rationale inline so future 
readers understand the gap without chasing a closed ticket.
   
   Not a merge blocker, just a small housekeeping item to keep the TODO 
actionable after merge.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

