Copilot commented on code in PR #10416:
URL: https://github.com/apache/gravitino/pull/10416#discussion_r2924400819


##########
core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java:
##########
@@ -293,29 +314,98 @@ public <E extends Entity & HasIdentifier> List<E> 
updateEntityRelations(
     // backend. For example, if we are adding a tag to table, we need to 
invalidate the cache for
     // that table and the tag being added or removed. Otherwise, we might 
return stale data if we
     // list all tags for that table or all tables for that tag.
-    cache.invalidate(srcEntityIdent, srcEntityType, relType);
+    List<E> updated =
+        backend.updateEntityRelations(
+            relType, srcEntityIdent, srcEntityType, destEntitiesToAdd, 
destEntitiesToRemove);
+    publishOrApplyRelationInvalidation(srcEntityIdent, srcEntityType, relType);
     for (NameIdentifier destToAdd : destEntitiesToAdd) {
-      cache.invalidate(destToAdd, srcEntityType, relType);
+      publishOrApplyRelationInvalidation(destToAdd, srcEntityType, relType);
     }
 
     for (NameIdentifier destToRemove : destEntitiesToRemove) {
-      cache.invalidate(destToRemove, srcEntityType, relType);
+      publishOrApplyRelationInvalidation(destToRemove, srcEntityType, relType);
     }
-
-    return backend.updateEntityRelations(
-        relType, srcEntityIdent, srcEntityType, destEntitiesToAdd, 
destEntitiesToRemove);
+    return updated;
   }
 
   @Override
   public int batchDelete(
       List<Pair<NameIdentifier, Entity.EntityType>> entitiesToDelete, boolean 
cascade)
       throws IOException {
-    return backend.batchDelete(entitiesToDelete, cascade);
+    int result = backend.batchDelete(entitiesToDelete, cascade);
+    for (Pair<NameIdentifier, Entity.EntityType> entity : entitiesToDelete) {
+      publishOrApplyEntityInvalidation(entity.getLeft(), entity.getRight());
+    }
+    return result;
   }
 
   @Override
   public <E extends Entity & HasIdentifier> void batchPut(List<E> entities, 
boolean overwritten)
       throws IOException, EntityAlreadyExistsException {
     backend.batchPut(entities, overwritten);
+    entities.forEach(entity -> 
publishOrApplyEntityUpsert(entity.nameIdentifier(), entity.type()));
+  }
+
+  private void publishOrApplyEntityInvalidation(
+      NameIdentifier ident, Entity.EntityType entityType) {
+    if (!cacheInvalidationService.isEnabled()) {
+      cache.invalidate(ident, entityType);
+      return;
+    }
+
+    cacheInvalidationService.publish(
+        CacheInvalidationEvent.of(
+            CacheDomain.ENTITY,
+            CacheInvalidationOperation.INVALIDATE_KEY,
+            EntityCacheInvalidationKey.of(ident, entityType, null),
+            cacheInvalidationService.localNodeId()));
+  }
+
+  private void publishOrApplyRelationInvalidation(
+      NameIdentifier ident, Entity.EntityType entityType, 
SupportsRelationOperations.Type relType) {
+    if (!cacheInvalidationService.isEnabled()) {
+      cache.invalidate(ident, entityType, relType);
+      return;
+    }
+
+    cacheInvalidationService.publish(
+        CacheInvalidationEvent.of(
+            CacheDomain.ENTITY,
+            CacheInvalidationOperation.INVALIDATE_KEY,
+            EntityCacheInvalidationKey.of(ident, entityType, relType),
+            cacheInvalidationService.localNodeId()));
+  }
+
+  private void publishOrApplyEntityUpsert(NameIdentifier ident, 
Entity.EntityType entityType) {
+    if (!cacheInvalidationService.isEnabled()) {
+      return;
+    }
+
+    cacheInvalidationService.publish(
+        CacheInvalidationEvent.of(
+            CacheDomain.ENTITY,
+            CacheInvalidationOperation.UPSERT_KEY,
+            EntityCacheInvalidationKey.of(ident, entityType, null),
+            cacheInvalidationService.localNodeId()));
+  }
+
+  private void handleEntityInvalidationEvent(CacheInvalidationEvent event) {
+    if (event.operation() == CacheInvalidationOperation.INVALIDATE_ALL) {
+      cache.clear();
+      return;
+    }
+
+    if (event.operation() != CacheInvalidationOperation.INVALIDATE_KEY

Review Comment:
   `publishOrApplyEntityUpsert` publishes `UPSERT_KEY`, but the registered 
handler only processes `INVALIDATE_ALL` and `INVALIDATE_KEY`. As a result, 
`put()` / `batchPut()` will not invalidate caches on other nodes (and not even 
via the handler locally), which can leave stale data. Consider either (a) 
publishing `INVALIDATE_KEY` for upserts in Phase-1, or (b) updating 
`handleEntityInvalidationEvent` to also process `UPSERT_KEY` (e.g., treat it as 
invalidate-by-key unless you plan to distribute values).
   ```suggestion
       if ((event.operation() != CacheInvalidationOperation.INVALIDATE_KEY
               && event.operation() != CacheInvalidationOperation.UPSERT_KEY)
   ```



##########
core/src/main/java/org/apache/gravitino/cache/invalidation/CacheInvalidationEvent.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.cache.invalidation;
+
+import com.google.common.base.Preconditions;
+import java.util.Objects;
+import javax.annotation.Nullable;
+
+/** Immutable cache invalidation event. */
+public final class CacheInvalidationEvent {
+  private final CacheDomain domain;
+  private final CacheInvalidationOperation operation;
+  private final String sourceNode;
+  private final long timestampMillis;
+  @Nullable private final Object key;
+
+  private CacheInvalidationEvent(
+      CacheDomain domain,
+      CacheInvalidationOperation operation,
+      @Nullable Object key,
+      String sourceNode,
+      long timestampMillis) {
+    this.domain = Preconditions.checkNotNull(domain, "domain cannot be null");
+    this.operation = Preconditions.checkNotNull(operation, "operation cannot 
be null");
+    this.sourceNode = Preconditions.checkNotNull(sourceNode, "sourceNode 
cannot be null");
+    this.timestampMillis = timestampMillis;
+    this.key = key;
+  }
+
+  public static CacheInvalidationEvent of(
+      CacheDomain domain,
+      CacheInvalidationOperation operation,
+      @Nullable Object key,
+      String sourceNode) {
+    return new CacheInvalidationEvent(
+        domain, operation, key, sourceNode, System.currentTimeMillis());
+  }
+
+  public CacheDomain domain() {
+    return domain;
+  }
+
+  public CacheInvalidationOperation operation() {
+    return operation;
+  }
+
+  @Nullable
+  public Object key() {
+    return key;
+  }
+
+  public String sourceNode() {
+    return sourceNode;
+  }
+
+  public long timestampMillis() {
+    return timestampMillis;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof CacheInvalidationEvent)) {
+      return false;
+    }
+    CacheInvalidationEvent that = (CacheInvalidationEvent) o;
+    return timestampMillis == that.timestampMillis
+        && domain == that.domain
+        && operation == that.operation
+        && Objects.equals(sourceNode, that.sourceNode)
+        && Objects.equals(key, that.key);
+  }

Review Comment:
   Including `timestampMillis` in `equals/hashCode` makes event equality 
extremely brittle (two logically identical events won’t compare equal unless 
created at the same millisecond), which is rarely what callers expect for an 
“event” value object. If `equals/hashCode` are needed for tests/collections, 
consider excluding `timestampMillis` (and potentially `sourceNode`) from 
equality, or avoid overriding `equals/hashCode` entirely and rely on explicit 
field assertions where needed.



##########
server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java:
##########
@@ -401,19 +419,44 @@ public boolean hasMetadataPrivilegePermission(
 
   @Override
   public void handleRolePrivilegeChange(Long roleId) {
-    loadedRoles.invalidate(roleId);
+    if (!cacheInvalidationService.isEnabled()) {
+      loadedRoles.invalidate(roleId);
+      return;
+    }
+
+    cacheInvalidationService.publish(
+        CacheInvalidationEvent.of(
+            CacheDomain.AUTH_ROLE,
+            CacheInvalidationOperation.INVALIDATE_KEY,
+            RoleCacheInvalidationKey.of(roleId),
+            cacheInvalidationService.localNodeId()));
   }
 
   @Override
   public void handleMetadataOwnerChange(
       String metalake, Long oldOwnerId, NameIdentifier nameIdentifier, 
Entity.EntityType type) {
     MetadataObject metadataObject = 
NameIdentifierUtil.toMetadataObject(nameIdentifier, type);
     Long metadataId = MetadataIdConverter.getID(metadataObject, metalake);
-    ownerRel.invalidate(metadataId);
+    if (!cacheInvalidationService.isEnabled()) {
+      ownerRel.invalidate(metadataId);
+      return;
+    }
+
+    cacheInvalidationService.publish(
+        CacheInvalidationEvent.of(
+            CacheDomain.AUTH_OWNER,
+            CacheInvalidationOperation.INVALIDATE_KEY,
+            OwnerCacheInvalidationKey.of(metadataId),
+            cacheInvalidationService.localNodeId()));
   }
 
   @Override
   public void close() throws IOException {
+    if (cacheInvalidationService != null) {
+      cacheInvalidationService.unregisterHandler(CacheDomain.AUTH_ROLE, 
roleInvalidationHandlerId);
+      cacheInvalidationService.unregisterHandler(
+          CacheDomain.AUTH_OWNER, ownerInvalidationHandlerId);

Review Comment:
   `unregisterHandler` is called with handler IDs that may be null/blank if 
`initialize()` didn’t complete (or if construction/initialization order 
changes). While the current local implementation tolerates blank IDs, the 
`CacheInvalidationService` contract doesn’t guarantee that. Guard the 
unregister calls with non-blank handler IDs (or initialize them eagerly) to 
avoid shutdown-time exceptions in alternate implementations.
   ```suggestion
         if (StringUtils.isNotBlank(roleInvalidationHandlerId)) {
           cacheInvalidationService.unregisterHandler(
               CacheDomain.AUTH_ROLE, roleInvalidationHandlerId);
         }
         if (StringUtils.isNotBlank(ownerInvalidationHandlerId)) {
           cacheInvalidationService.unregisterHandler(
               CacheDomain.AUTH_OWNER, ownerInvalidationHandlerId);
         }
   ```



##########
core/src/main/java/org/apache/gravitino/cache/invalidation/LocalCacheInvalidationService.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.cache.invalidation;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** In-JVM cache invalidation service implementation. */
+public class LocalCacheInvalidationService implements CacheInvalidationService 
{
+  private static final Logger LOG = 
LoggerFactory.getLogger(LocalCacheInvalidationService.class);
+
+  private final boolean enabled;
+  private final String localNodeId;
+  private final Map<CacheDomain, ConcurrentMap<String, 
CacheInvalidationHandler>> handlers;
+  private final ConcurrentMap<String, AtomicLong> publishedCounters;
+  private final ConcurrentMap<String, AtomicLong> handledCounters;
+
+  public LocalCacheInvalidationService(boolean enabled) {
+    this.enabled = enabled;
+    this.localNodeId = buildLocalNodeId();
+    this.handlers = new EnumMap<>(CacheDomain.class);
+    for (CacheDomain domain : CacheDomain.values()) {
+      handlers.put(domain, new ConcurrentHashMap<>());
+    }
+    this.publishedCounters = new ConcurrentHashMap<>();
+    this.handledCounters = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void publish(CacheInvalidationEvent event) {
+    Preconditions.checkNotNull(event, "event cannot be null");
+    if (!enabled) {
+      return;
+    }
+
+    String counterKey = getCounterKey(event.domain(), event.operation());
+    publishedCounters.computeIfAbsent(counterKey, key -> new 
AtomicLong()).incrementAndGet();
+
+    ConcurrentMap<String, CacheInvalidationHandler> domainHandlers = 
handlers.get(event.domain());
+    if (domainHandlers.isEmpty()) {
+      LOG.warn("No cache invalidation handlers registered for domain {}", 
event.domain());
+      return;
+    }
+
+    domainHandlers.forEach(
+        (handlerId, handler) -> {
+          handler.handle(event);
+          handledCounters.computeIfAbsent(counterKey, key -> new 
AtomicLong()).incrementAndGet();

Review Comment:
   Handler exceptions will currently propagate out of `publish()`, which can 
prevent remaining handlers from running and can break the caller’s mutation 
path (e.g., entity store update/delete) due to a cache-only side effect. Wrap 
each `handler.handle(event)` in a try/catch, log failures (including 
`handlerId`/domain/operation), and continue dispatching to the rest to improve 
resilience.
   ```suggestion
             try {
               handler.handle(event);
               handledCounters.computeIfAbsent(counterKey, key -> new 
AtomicLong()).incrementAndGet();
             } catch (Exception e) {
               LOG.error(
                   "Cache invalidation handler '{}' failed for domain {} and 
operation {}",
                   handlerId,
                   event.domain(),
                   event.operation(),
                   e);
             }
   ```



##########
core/src/main/java/org/apache/gravitino/cache/invalidation/CacheInvalidationServiceFactory.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.cache.invalidation;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Configs;
+
+/** Singleton factory for cache invalidation service. */
+public final class CacheInvalidationServiceFactory {
+  private static volatile CacheInvalidationService invalidationService;
+
+  private CacheInvalidationServiceFactory() {}
+
+  public static CacheInvalidationService getOrCreate(Config config) {
+    if (invalidationService == null) {
+      synchronized (CacheInvalidationServiceFactory.class) {
+        if (invalidationService == null) {
+          invalidationService =
+              new 
LocalCacheInvalidationService(config.get(Configs.CACHE_INVALIDATION_ENABLED));
+        }
+      }
+    }
+    return invalidationService;
+  }
+
+  @VisibleForTesting
+  public static void resetForTest() {
+    synchronized (CacheInvalidationServiceFactory.class) {
+      invalidationService = null;

Review Comment:
   Because this is a process-wide singleton, the `enabled` value is effectively 
“latched” based on the first `getOrCreate(config)` call; later calls with 
different configs can’t change the behavior. This can lead to surprising 
behavior across modules/tests or when config initialization order changes. 
Consider enforcing a single initialization point (e.g., explicit init), or 
detect/log when subsequent configs disagree, or make the service’s enabled 
state derived from config rather than constructor-time.
   ```suggestion
   import java.util.logging.Logger;
   import org.apache.gravitino.Config;
   import org.apache.gravitino.Configs;
   
   /** Singleton factory for cache invalidation service. */
   public final class CacheInvalidationServiceFactory {
     private static final Logger LOGGER =
         Logger.getLogger(CacheInvalidationServiceFactory.class.getName());
   
     private static volatile CacheInvalidationService invalidationService;
   
     /**
      * Remembers the initial value of {@link 
Configs#CACHE_INVALIDATION_ENABLED} used to construct
      * the singleton. This allows us to detect and log subsequent calls that 
use a different value.
      */
     private static volatile Boolean initialEnabledConfig;
   
     private CacheInvalidationServiceFactory() {}
   
     public static CacheInvalidationService getOrCreate(Config config) {
       boolean enabled = config.get(Configs.CACHE_INVALIDATION_ENABLED);
   
       CacheInvalidationService localService = invalidationService;
       if (localService != null) {
         Boolean initial = initialEnabledConfig;
         if (initial != null && initial.booleanValue() != enabled) {
           LOGGER.warning(
               "Cache invalidation service already initialized with "
                   + "CACHE_INVALIDATION_ENABLED="
                   + initial
                   + " but getOrCreate was called with a different value: "
                   + enabled);
         }
         return localService;
       }
   
       synchronized (CacheInvalidationServiceFactory.class) {
         if (invalidationService == null) {
           initialEnabledConfig = enabled;
           invalidationService = new LocalCacheInvalidationService(enabled);
         } else {
           Boolean initial = initialEnabledConfig;
           if (initial != null && initial.booleanValue() != enabled) {
             LOGGER.warning(
                 "Cache invalidation service already initialized with "
                     + "CACHE_INVALIDATION_ENABLED="
                     + initial
                     + " but getOrCreate was called with a different value: "
                     + enabled);
           }
         }
         return invalidationService;
       }
     }
   
     @VisibleForTesting
     public static void resetForTest() {
       synchronized (CacheInvalidationServiceFactory.class) {
         invalidationService = null;
         initialEnabledConfig = null;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to