This is an automated email from the ASF dual-hosted git repository.

ming pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git


The following commit(s) were added to refs/heads/master by this push:
     new c99cfcddc fix(server):fix graph server cache notifier mechanism (#2729)
c99cfcddc is described below

commit c99cfcddc9f397513a7288748471b5ed58031f20
Author: haohao0103 <956322...@qq.com>
AuthorDate: Wed Apr 2 09:53:22 2025 +0800

    fix(server):fix graph server cache notifier mechanism (#2729)
    
    * #2728
    
    * fix some typo & tiny improve
    
    ---------
    
    Co-authored-by: imbajin <j...@apache.org>
---
 .../org/apache/hugegraph/StandardHugeGraph.java    | 63 +++++++++------
 .../backend/cache/CachedGraphTransaction.java      | 22 +++---
 .../hugegraph/backend/tx/GraphTransaction.java     | 92 +++++++++++-----------
 3 files changed, 97 insertions(+), 80 deletions(-)

diff --git 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
index eb991c0f6..ed9cd4234 100644
--- 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
+++ 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
@@ -17,6 +17,7 @@
 
 package org.apache.hugegraph;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
@@ -577,11 +578,7 @@ public class StandardHugeGraph implements HugeGraph {
     private AbstractSerializer serializer() {
         String name = this.configuration.get(CoreOptions.SERIALIZER);
         LOG.debug("Loading serializer '{}' for graph '{}'", name, this.name);
-        AbstractSerializer serializer = 
SerializerFactory.serializer(this.configuration, name);
-        if (serializer == null) {
-            throw new HugeException("Can't load serializer with name " + name);
-        }
-        return serializer;
+        return SerializerFactory.serializer(this.configuration, name);
     }
 
     private Analyzer analyzer() {
@@ -597,7 +594,7 @@ public class StandardHugeGraph implements HugeGraph {
     }
 
     protected void reloadRamtable(boolean loadFromFile) {
-        // Expect triggered manually, like gremlin job
+        // Expect triggered manually, like a gremlin job
         if (this.ramtable != null) {
             this.ramtable.reload(loadFromFile, this.name);
         } else {
@@ -1615,37 +1612,51 @@ public class StandardHugeGraph implements HugeGraph {
 
     private static class AbstractCacheNotifier implements CacheNotifier {
 
+        public static final Logger LOG = 
Log.logger(AbstractCacheNotifier.class);
+
         private final EventHub hub;
         private final EventListener cacheEventListener;
 
         public AbstractCacheNotifier(EventHub hub, CacheNotifier proxy) {
             this.hub = hub;
             this.cacheEventListener = event -> {
-                Object[] args = event.args();
-                E.checkArgument(args.length > 0 && args[0] instanceof String,
-                                "Expect event action argument");
-                if (Cache.ACTION_INVALIDED.equals(args[0])) {
-                    event.checkArgs(String.class, HugeType.class, 
Object.class);
-                    HugeType type = (HugeType) args[1];
-                    Object ids = args[2];
-                    if (ids instanceof Id[]) {
-                        // argument type mismatch: proxy.invalid2(type,Id[]ids)
-                        proxy.invalid2(type, (Id[]) ids);
-                    } else if (ids instanceof Id) {
-                        proxy.invalid(type, (Id) ids);
-                    } else {
-                        E.checkArgument(false, "Unexpected argument: %s", ids);
+                try {
+                    LOG.info("Received event: {}", event);
+                    Object[] args = event.args();
+                    E.checkArgument(args.length > 0 && args[0] instanceof 
String,
+                                    "Expect event action argument");
+                    String action = (String) args[0];
+                    LOG.debug("Event action: {}", action);
+                    if (Cache.ACTION_INVALIDED.equals(action)) {
+                        event.checkArgs(String.class, HugeType.class, 
Object.class);
+                        HugeType type = (HugeType) args[1];
+                        Object ids = args[2];
+                        if (ids instanceof Id[]) {
+                            LOG.debug("Calling proxy.invalid2 with type: {}, 
IDs: {}", type, Arrays.toString((Id[]) ids));
+                            proxy.invalid2(type, (Id[]) ids);
+                        } else if (ids instanceof Id) {
+                            LOG.debug("Calling proxy.invalid with type: {}, 
ID: {}", type, ids);
+                            proxy.invalid(type, (Id) ids);
+                        } else {
+                            LOG.error("Unexpected argument: {}", ids);
+                            E.checkArgument(false, "Unexpected argument: %s", 
ids);
+                        }
+                        return true;
+                    } else if (Cache.ACTION_CLEARED.equals(action)) {
+                        event.checkArgs(String.class, HugeType.class);
+                        HugeType type = (HugeType) args[1];
+                        LOG.debug("Calling proxy.clear with type: {}", type);
+                        proxy.clear(type);
+                        return true;
                     }
-                    return true;
-                } else if (Cache.ACTION_CLEARED.equals(args[0])) {
-                    event.checkArgs(String.class, HugeType.class);
-                    HugeType type = (HugeType) args[1];
-                    proxy.clear(type);
-                    return true;
+                } catch (Exception e) {
+                    LOG.error("Error processing cache event: {}", 
e.getMessage(), e);
                 }
+                LOG.warn("Event {} not handled",event);
                 return false;
             };
             this.hub.listen(Events.CACHE, this.cacheEventListener);
+            LOG.info("Cache event listener registered successfully. 
cacheEventListener {}",this.cacheEventListener);
         }
 
         @Override
diff --git 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedGraphTransaction.java
 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedGraphTransaction.java
index 83ab7f51a..cbf23e14d 100644
--- 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedGraphTransaction.java
+++ 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedGraphTransaction.java
@@ -133,7 +133,9 @@ public final class CachedGraphTransaction extends 
GraphTransaction {
             }
             return false;
         };
-        this.store().provider().listen(this.storeEventListener);
+        
if(storeEventListenStatus.putIfAbsent(this.params().name(),true)==null){
+            this.store().provider().listen(this.storeEventListener);
+        }
 
         // Listen cache event: "cache"(invalid cache item)
         this.cacheEventListener = event -> {
@@ -182,19 +184,21 @@ public final class CachedGraphTransaction extends 
GraphTransaction {
             }
             return false;
         };
-        EventHub graphEventHub = this.params().graphEventHub();
-        if (!graphEventHub.containsListener(Events.CACHE)) {
+        
if(graphCacheListenStatus.putIfAbsent(this.params().name(),true)==null){
+            EventHub graphEventHub = this.params().graphEventHub();
             graphEventHub.listen(Events.CACHE, this.cacheEventListener);
         }
     }
 
     private void unlistenChanges() {
-        // Unlisten store event
-        this.store().provider().unlisten(this.storeEventListener);
-
-        // Unlisten cache event
-        EventHub graphEventHub = this.params().graphEventHub();
-        graphEventHub.unlisten(Events.CACHE, this.cacheEventListener);
+        String graphName = this.params().name();
+        if (graphCacheListenStatus.remove(graphName) != null) {
+            EventHub graphEventHub = this.params().graphEventHub();
+            graphEventHub.unlisten(Events.CACHE, this.cacheEventListener);
+        }
+        if (storeEventListenStatus.remove(graphName) != null) {
+            this.store().provider().unlisten(this.storeEventListener);
+        }
     }
 
     private void notifyChanges(String action, HugeType type, Id[] ids) {
diff --git 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphTransaction.java
 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphTransaction.java
index 7f441574e..e50fa5c6f 100644
--- 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphTransaction.java
+++ 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphTransaction.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -139,6 +140,10 @@ public class GraphTransaction extends IndexableTransaction 
{
 
     private final int verticesCapacity;
     private final int edgesCapacity;
+    protected static final ConcurrentHashMap<String, Boolean> 
graphCacheListenStatus =
+            new ConcurrentHashMap<>();
+    protected static final ConcurrentHashMap<String, Boolean> 
storeEventListenStatus =
+            new ConcurrentHashMap<>();
 
     public GraphTransaction(HugeGraphParams graph, BackendStore store) {
         super(graph, store);
@@ -400,8 +405,8 @@ public class GraphTransaction extends IndexableTransaction {
             /*
              * If the backend stores vertex together with edges, it's edges
              * would be removed after removing vertex. Otherwise, if the
-             * backend stores vertex which is separated from edges, it's
-             * edges should be removed manually when removing vertex.
+             * backend stores vertex which is separated from edges,
+             * its edges should be removed manually when removing vertex.
              */
             this.doRemove(this.serializer.writeVertex(v.prepareRemoved()));
             this.indexTx.updateVertexIndex(v, true);
@@ -435,7 +440,7 @@ public class GraphTransaction extends IndexableTransaction {
                 if (this.store().features().supportsUpdateVertexProperty()) {
                     // Update vertex index without removed property
                     this.indexTx.updateVertexIndex(prop.element(), false);
-                    // Eliminate the property(OUT and IN owner edge)
+                    // Eliminate the property (OUT and IN owner edge)
                     
this.doEliminate(this.serializer.writeVertexProperty(prop));
                 } else {
                     // Override vertex
@@ -447,12 +452,12 @@ public class GraphTransaction extends 
IndexableTransaction {
                 if (this.store().features().supportsUpdateEdgeProperty()) {
                     // Update edge index without removed property
                     this.indexTx.updateEdgeIndex(prop.element(), false);
-                    // Eliminate the property(OUT and IN owner edge)
+                    // Eliminate the property (OUT and IN owner edge)
                     this.doEliminate(this.serializer.writeEdgeProperty(prop));
                     this.doEliminate(this.serializer.writeEdgeProperty(
                             prop.switchEdgeOwner()));
                 } else {
-                    // Override edge(it will be in addedEdges & updatedEdges)
+                    // Override edge (it will be in addedEdges & updatedEdges)
                     this.addEdge(prop.element());
                 }
             }
@@ -464,7 +469,7 @@ public class GraphTransaction extends IndexableTransaction {
                 if (this.store().features().supportsUpdateVertexProperty()) {
                     // Update vertex index with new added property
                     this.indexTx.updateVertexIndex(prop.element(), false);
-                    // Append new property(OUT and IN owner edge)
+                    // Append new property (OUT and IN owner edge)
                     this.doAppend(this.serializer.writeVertexProperty(prop));
                 } else {
                     // Override vertex
@@ -474,9 +479,9 @@ public class GraphTransaction extends IndexableTransaction {
                 assert p.element().type().isEdge();
                 HugeEdgeProperty<?> prop = (HugeEdgeProperty<?>) p;
                 if (this.store().features().supportsUpdateEdgeProperty()) {
-                    // Update edge index with new added property
+                    // Update edge-index with new added property
                     this.indexTx.updateEdgeIndex(prop.element(), false);
-                    // Append new property(OUT and IN owner edge)
+                    // Append new property (OUT and IN owner edge)
                     this.doAppend(this.serializer.writeEdgeProperty(prop));
                     this.doAppend(this.serializer.writeEdgeProperty(
                             prop.switchEdgeOwner()));
@@ -560,12 +565,12 @@ public class GraphTransaction extends 
IndexableTransaction {
         QueryList<Number> queries = this.optimizeQueries(query, q -> {
             boolean isIndexQuery = q instanceof IdQuery;
             assert isIndexQuery || isConditionQuery || q == query;
-            // Need to fallback if there are uncommitted records
+            // Need to fall back if there are uncommitted records
             boolean fallback = hasUpdate;
             Number result;
 
             if (fallback) {
-                // Here just ignore it, and do fallback later
+                // Here just ignore it, and do fall back later
                 result = null;
             } else if (!isIndexQuery || !isConditionQuery) {
                 // It's a sysprop-query, let parent tx do it
@@ -578,7 +583,7 @@ public class GraphTransaction extends IndexableTransaction {
                 assert query instanceof ConditionQuery;
                 OptimizedType optimized = ((ConditionQuery) query).optimized();
                 if (this.optimizeAggrByIndex && optimized == 
OptimizedType.INDEX) {
-                    // The ids size means results count (assume no left index)
+                    // The id's size means result count (assume no left index)
                     result = q.idsSize();
                 } else {
                     assert !fallback;
@@ -587,7 +592,7 @@ public class GraphTransaction extends IndexableTransaction {
                 }
             }
 
-            // Can't be optimized, then do fallback
+            // Can't be optimized, then do fall back
             if (fallback) {
                 assert result == null;
                 assert q.resultType().isVertex() || q.resultType().isEdge();
@@ -629,7 +634,7 @@ public class GraphTransaction extends IndexableTransaction {
             /*
              * No need to lock VERTEX_LABEL_ADD_UPDATE, because vertex label
              * update only can add nullable properties and user data, which is
-             * unconcerned with add vertex
+             * unconcerned with added vertex
              */
             this.beforeWrite();
             this.addedVertices.put(vertex.id(), vertex);
@@ -762,21 +767,16 @@ public class GraphTransaction extends 
IndexableTransaction {
             return this.queryVerticesByIds(vertexIds, false, false,
                                            HugeType.SERVER);
         }
-        return this.queryVerticesByIds(vertexIds, false, false,
-                                       HugeType.VERTEX);
+        return this.queryVerticesByIds(vertexIds, false, false, 
HugeType.VERTEX);
     }
 
-    protected Iterator<Vertex> queryVerticesByIds(Object[] vertexIds,
-                                              boolean adjacentVertex,
-                                              boolean checkMustExist) {
-        return this.queryVerticesByIds(vertexIds, adjacentVertex, 
checkMustExist,
-                                       HugeType.VERTEX);
+    protected Iterator<Vertex> queryVerticesByIds(Object[] vertexIds, boolean 
adjacentVertex,
+                                                  boolean checkMustExist) {
+        return this.queryVerticesByIds(vertexIds, adjacentVertex, 
checkMustExist, HugeType.VERTEX);
     }
 
-    protected Iterator<Vertex> queryVerticesByIds(Object[] vertexIds,
-                                                  boolean adjacentVertex,
-                                                  boolean checkMustExist,
-                                                  HugeType type) {
+    protected Iterator<Vertex> queryVerticesByIds(Object[] vertexIds, boolean 
adjacentVertex,
+                                                  boolean checkMustExist, 
HugeType type) {
         Query.checkForceCapacity(vertexIds.length);
 
         // NOTE: allowed duplicated vertices if query by duplicated ids
@@ -890,7 +890,7 @@ public class GraphTransaction extends IndexableTransaction {
             /*
              * No need to lock EDGE_LABEL_ADD_UPDATE, because edge label
              * update only can add nullable properties and user data, which is
-             * unconcerned with add edge
+             * unconcerned with added edge
              */
             this.beforeWrite();
             this.addedEdges.put(edge.id(), edge);
@@ -1132,7 +1132,7 @@ public class GraphTransaction extends 
IndexableTransaction {
         E.checkState(vertex != null,
                      "No owner for updating property '%s'", prop.key());
 
-        // Add property in memory for new created vertex
+        // Add property in memory for newly created vertex
         if (vertex.fresh()) {
             // The owner will do property update
             vertex.setProperty(prop);
@@ -1177,7 +1177,7 @@ public class GraphTransaction extends 
IndexableTransaction {
         List<Id> primaryKeyIds = vertex.schemaLabel().primaryKeys();
         E.checkArgument(!primaryKeyIds.contains(propKey.id()),
                         "Can't remove primary key '%s'", prop.key());
-        // Remove property in memory for new created vertex
+        // Remove property in memory for newly created vertex
         if (vertex.fresh()) {
             // The owner will do property update
             vertex.removeProperty(propKey.id());
@@ -1210,7 +1210,7 @@ public class GraphTransaction extends 
IndexableTransaction {
         E.checkState(edge != null,
                      "No owner for updating property '%s'", prop.key());
 
-        // Add property in memory for new created edge
+        // Add property in memory for newly created edge
         if (edge.fresh()) {
             // The owner will do property update
             edge.setProperty(prop);
@@ -1250,11 +1250,11 @@ public class GraphTransaction extends 
IndexableTransaction {
         if (!edge.hasProperty(propKey.id())) {
             return;
         }
-        // Check is removing sort key
+        // Check is removing a sort key
         List<Id> sortKeyIds = edge.schemaLabel().sortKeys();
         E.checkArgument(!sortKeyIds.contains(prop.propertyKey().id()),
                         "Can't remove sort key '%s'", prop.key());
-        // Remove property in memory for new created edge
+        // Remove property in memory for newly created edge
         if (edge.fresh()) {
             // The owner will do property update
             edge.removeProperty(propKey.id());
@@ -1280,7 +1280,7 @@ public class GraphTransaction extends 
IndexableTransaction {
     }
 
     /**
-     * Construct one edge condition query based on source vertex, direction and
+     * Construct one-edge condition query based on source vertex, direction and
      * edge labels
      *
      * @param sourceVertex source vertex of edge
@@ -1340,8 +1340,8 @@ public class GraphTransaction extends 
IndexableTransaction {
     }
 
     private static ConditionQuery constructEdgesQuery(Id sourceVertex,
-                                                     Directions direction,
-                                                     List<Id> edgeLabels) {
+                                                      Directions direction,
+                                                      List<Id> edgeLabels) {
         E.checkState(sourceVertex != null,
                      "The edge query must contain source vertex");
         E.checkState(direction != null,
@@ -1454,7 +1454,7 @@ public class GraphTransaction extends 
IndexableTransaction {
             /*
              * Supported query:
              *  1.query just by edge label
-             *  2.query just by PROPERTIES (like containsKey,containsValue)
+             *  2.query just by PROPERTIES (like containsKey, containsValue)
              *  3.query with scan
              */
             if (query.containsCondition(HugeKeys.LABEL) ||
@@ -1570,8 +1570,8 @@ public class GraphTransaction extends 
IndexableTransaction {
                 }
 
                 if (vertexIdList.size() != filterVertexList.size()) {
-                    // Modify on the copied relation to avoid affecting other 
query
-                    Condition.Relation relation = 
+                    // Modify on the copied relation to avoid affecting 
another query
+                    Condition.Relation relation =
                             
query.copyRelationAndUpdateQuery(HugeKeys.OWNER_VERTEX);
                     relation.value(filterVertexList);
                 }
@@ -1603,7 +1603,8 @@ public class GraphTransaction extends 
IndexableTransaction {
                  */
                 query.resetUserpropConditions();
 
-                if (this.storeFeatures().supportsFatherAndSubEdgeLabel() && 
query.condition(HugeKeys.SUB_LABEL) == null) {
+                if (this.storeFeatures().supportsFatherAndSubEdgeLabel() &&
+                    query.condition(HugeKeys.SUB_LABEL) == null) {
                     query.eq(HugeKeys.SUB_LABEL, el.id());
                 }
                 LOG.debug("Query edges by sortKeys: {}", query);
@@ -1613,7 +1614,7 @@ public class GraphTransaction extends 
IndexableTransaction {
 
         /*
          * Query only by sysprops, like: by vertex label, by edge label.
-         * NOTE: we assume sysprops would be indexed by backend store
+         * NOTE: we assume sysprops would be indexed by backend store,
          * but we don't support query edges only by direction/target-vertex.
          */
         if (query.allSysprop()) {
@@ -1842,7 +1843,7 @@ public class GraphTransaction extends 
IndexableTransaction {
             }
             /*
              * No need to lock INDEX_LABEL_ADD_UPDATE, because index label
-             * update only can add  user data, which is unconcerned with
+             * update only can add user data, which is unconcerned with
              * update property
              */
             this.beforeWrite();
@@ -1919,7 +1920,8 @@ public class GraphTransaction extends 
IndexableTransaction {
             }
             if (cq.optimized() == OptimizedType.INDEX) {
                 // g.E().hasLabel(xxx).has(yyy)
-                // consider OptimizedType.INDEX_FILTER occurred in 
org.apache.hugegraph.core.EdgeCoreTest.testQueryCount
+                // consider OptimizedType.INDEX_FILTER occurred in 
org.apache.hugegraph.core
+                // .EdgeCoreTest.testQueryCount
                 try {
                     this.indexTx.asyncRemoveIndexLeft(cq, elem);
                 } catch (Throwable e) {
@@ -1934,7 +1936,7 @@ public class GraphTransaction extends 
IndexableTransaction {
             if (cq.existLeftIndex(elem.id())) {
                 /*
                  * Both have correct and left index, wo should return true
-                 * but also needs to cleaned up left index
+                 * but also needs to clean up left index
                  */
                 try {
                     this.indexTx.asyncRemoveIndexLeft(cq, elem);
@@ -2067,8 +2069,8 @@ public class GraphTransaction extends 
IndexableTransaction {
         Set<V> txResults = InsertionOrderUtil.newSet();
 
         /*
-         * Collect added/updated records
-         * Records in memory have higher priority than query from backend store
+         * Collect added/updated records.
+         * Records in memory have higher priority than a query from backend 
store
          */
         for (V elem : addedTxRecords.values()) {
             if (query.reachLimit(txResults.size())) {
@@ -2275,7 +2277,7 @@ public class GraphTransaction extends 
IndexableTransaction {
                 while (iter.hasNext()) {
                     consumer.accept(iter.next());
                     /*
-                     * Commit per batch to avoid too much data in single 
commit,
+                     * Commit per batch to avoid too much data in a single 
commit,
                      * especially for Cassandra backend
                      */
                     this.commitIfGtSize(GraphTransaction.COMMIT_BATCH);
@@ -2300,7 +2302,7 @@ public class GraphTransaction extends 
IndexableTransaction {
                         if (label.equals(elemLabel)) {
                             consumer.accept(e);
                             /*
-                             * Commit per batch to avoid too much data in 
single
+                             * Commit per batch to avoid too much data in a 
single
                              * commit, especially for Cassandra backend
                              */
                             this.commitIfGtSize(GraphTransaction.COMMIT_BATCH);

Reply via email to