This is an automated email from the ASF dual-hosted git repository. ming pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git
The following commit(s) were added to refs/heads/master by this push: new c99cfcddc fix(server):fix graph server cache notifier mechanism (#2729) c99cfcddc is described below commit c99cfcddc9f397513a7288748471b5ed58031f20 Author: haohao0103 <956322...@qq.com> AuthorDate: Wed Apr 2 09:53:22 2025 +0800 fix(server):fix graph server cache notifier mechanism (#2729) * #2728 * fix some typo & tiny improve --------- Co-authored-by: imbajin <j...@apache.org> --- .../org/apache/hugegraph/StandardHugeGraph.java | 63 +++++++++------ .../backend/cache/CachedGraphTransaction.java | 22 +++--- .../hugegraph/backend/tx/GraphTransaction.java | 92 +++++++++++----------- 3 files changed, 97 insertions(+), 80 deletions(-) diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java index eb991c0f6..ed9cd4234 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java @@ -17,6 +17,7 @@ package org.apache.hugegraph; +import java.util.Arrays; import java.util.Collection; import java.util.Iterator; import java.util.List; @@ -577,11 +578,7 @@ public class StandardHugeGraph implements HugeGraph { private AbstractSerializer serializer() { String name = this.configuration.get(CoreOptions.SERIALIZER); LOG.debug("Loading serializer '{}' for graph '{}'", name, this.name); - AbstractSerializer serializer = SerializerFactory.serializer(this.configuration, name); - if (serializer == null) { - throw new HugeException("Can't load serializer with name " + name); - } - return serializer; + return SerializerFactory.serializer(this.configuration, name); } private Analyzer analyzer() { @@ -597,7 +594,7 @@ public class StandardHugeGraph implements HugeGraph { } protected void reloadRamtable(boolean loadFromFile) { - // Expect triggered manually, like gremlin job + // Expect triggered manually, like a gremlin job if (this.ramtable != null) { this.ramtable.reload(loadFromFile, this.name); } else { @@ -1615,37 +1612,51 @@ public class StandardHugeGraph implements HugeGraph { private static class AbstractCacheNotifier implements CacheNotifier { + public static final Logger LOG = Log.logger(AbstractCacheNotifier.class); + private final EventHub hub; private final EventListener cacheEventListener; public AbstractCacheNotifier(EventHub hub, CacheNotifier proxy) { this.hub = hub; this.cacheEventListener = event -> { - Object[] args = event.args(); - E.checkArgument(args.length > 0 && args[0] instanceof String, - "Expect event action argument"); - if (Cache.ACTION_INVALIDED.equals(args[0])) { - event.checkArgs(String.class, HugeType.class, Object.class); - HugeType type = (HugeType) args[1]; - Object ids = args[2]; - if (ids instanceof Id[]) { - // argument type mismatch: proxy.invalid2(type,Id[]ids) - proxy.invalid2(type, (Id[]) ids); - } else if (ids instanceof Id) { - proxy.invalid(type, (Id) ids); - } else { - E.checkArgument(false, "Unexpected argument: %s", ids); + try { + LOG.info("Received event: {}", event); + Object[] args = event.args(); + E.checkArgument(args.length > 0 && args[0] instanceof String, + "Expect event action argument"); + String action = (String) args[0]; + LOG.debug("Event action: {}", action); + if (Cache.ACTION_INVALIDED.equals(action)) { + event.checkArgs(String.class, HugeType.class, Object.class); + HugeType type = (HugeType) args[1]; + Object ids = args[2]; + if (ids instanceof Id[]) { + LOG.debug("Calling proxy.invalid2 with type: {}, IDs: {}", type, Arrays.toString((Id[]) ids)); + proxy.invalid2(type, (Id[]) ids); + } else if (ids instanceof Id) { + LOG.debug("Calling proxy.invalid with type: {}, ID: {}", type, ids); + proxy.invalid(type, (Id) ids); + } else { + LOG.error("Unexpected argument: {}", ids); + E.checkArgument(false, "Unexpected argument: %s", ids); + } + return true; + } else if (Cache.ACTION_CLEARED.equals(action)) { + event.checkArgs(String.class, HugeType.class); + HugeType type = (HugeType) args[1]; + LOG.debug("Calling proxy.clear with type: {}", type); + proxy.clear(type); + return true; } - return true; - } else if (Cache.ACTION_CLEARED.equals(args[0])) { - event.checkArgs(String.class, HugeType.class); - HugeType type = (HugeType) args[1]; - proxy.clear(type); - return true; + } catch (Exception e) { + LOG.error("Error processing cache event: {}", e.getMessage(), e); } + LOG.warn("Event {} not handled",event); return false; }; this.hub.listen(Events.CACHE, this.cacheEventListener); + LOG.info("Cache event listener registered successfully. cacheEventListener {}",this.cacheEventListener); } @Override diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedGraphTransaction.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedGraphTransaction.java index 83ab7f51a..cbf23e14d 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedGraphTransaction.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedGraphTransaction.java @@ -133,7 +133,9 @@ public final class CachedGraphTransaction extends GraphTransaction { } return false; }; - this.store().provider().listen(this.storeEventListener); + if(storeEventListenStatus.putIfAbsent(this.params().name(),true)==null){ + this.store().provider().listen(this.storeEventListener); + } // Listen cache event: "cache"(invalid cache item) this.cacheEventListener = event -> { @@ -182,19 +184,21 @@ public final class CachedGraphTransaction extends GraphTransaction { } return false; }; - EventHub graphEventHub = this.params().graphEventHub(); - if (!graphEventHub.containsListener(Events.CACHE)) { + if(graphCacheListenStatus.putIfAbsent(this.params().name(),true)==null){ + EventHub graphEventHub = this.params().graphEventHub(); graphEventHub.listen(Events.CACHE, this.cacheEventListener); } } private void unlistenChanges() { - // Unlisten store event - this.store().provider().unlisten(this.storeEventListener); - - // Unlisten cache event - EventHub graphEventHub = this.params().graphEventHub(); - graphEventHub.unlisten(Events.CACHE, this.cacheEventListener); + String graphName = this.params().name(); + if (graphCacheListenStatus.remove(graphName) != null) { + EventHub graphEventHub = this.params().graphEventHub(); + graphEventHub.unlisten(Events.CACHE, this.cacheEventListener); + } + if (storeEventListenStatus.remove(graphName) != null) { + this.store().provider().unlisten(this.storeEventListener); + } } private void notifyChanges(String action, HugeType type, Id[] ids) { diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphTransaction.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphTransaction.java index 7f441574e..e50fa5c6f 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphTransaction.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphTransaction.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; @@ -139,6 +140,10 @@ public class GraphTransaction extends IndexableTransaction { private final int verticesCapacity; private final int edgesCapacity; + protected static final ConcurrentHashMap<String, Boolean> graphCacheListenStatus = + new ConcurrentHashMap<>(); + protected static final ConcurrentHashMap<String, Boolean> storeEventListenStatus = + new ConcurrentHashMap<>(); public GraphTransaction(HugeGraphParams graph, BackendStore store) { super(graph, store); @@ -400,8 +405,8 @@ public class GraphTransaction extends IndexableTransaction { /* * If the backend stores vertex together with edges, it's edges * would be removed after removing vertex. Otherwise, if the - * backend stores vertex which is separated from edges, it's - * edges should be removed manually when removing vertex. + * backend stores vertex which is separated from edges, + * its edges should be removed manually when removing vertex. */ this.doRemove(this.serializer.writeVertex(v.prepareRemoved())); this.indexTx.updateVertexIndex(v, true); @@ -435,7 +440,7 @@ public class GraphTransaction extends IndexableTransaction { if (this.store().features().supportsUpdateVertexProperty()) { // Update vertex index without removed property this.indexTx.updateVertexIndex(prop.element(), false); - // Eliminate the property(OUT and IN owner edge) + // Eliminate the property (OUT and IN owner edge) this.doEliminate(this.serializer.writeVertexProperty(prop)); } else { // Override vertex @@ -447,12 +452,12 @@ public class GraphTransaction extends IndexableTransaction { if (this.store().features().supportsUpdateEdgeProperty()) { // Update edge index without removed property this.indexTx.updateEdgeIndex(prop.element(), false); - // Eliminate the property(OUT and IN owner edge) + // Eliminate the property (OUT and IN owner edge) this.doEliminate(this.serializer.writeEdgeProperty(prop)); this.doEliminate(this.serializer.writeEdgeProperty( prop.switchEdgeOwner())); } else { - // Override edge(it will be in addedEdges & updatedEdges) + // Override edge (it will be in addedEdges & updatedEdges) this.addEdge(prop.element()); } } @@ -464,7 +469,7 @@ public class GraphTransaction extends IndexableTransaction { if (this.store().features().supportsUpdateVertexProperty()) { // Update vertex index with new added property this.indexTx.updateVertexIndex(prop.element(), false); - // Append new property(OUT and IN owner edge) + // Append new property (OUT and IN owner edge) this.doAppend(this.serializer.writeVertexProperty(prop)); } else { // Override vertex @@ -474,9 +479,9 @@ public class GraphTransaction extends IndexableTransaction { assert p.element().type().isEdge(); HugeEdgeProperty<?> prop = (HugeEdgeProperty<?>) p; if (this.store().features().supportsUpdateEdgeProperty()) { - // Update edge index with new added property + // Update edge-index with new added property this.indexTx.updateEdgeIndex(prop.element(), false); - // Append new property(OUT and IN owner edge) + // Append new property (OUT and IN owner edge) this.doAppend(this.serializer.writeEdgeProperty(prop)); this.doAppend(this.serializer.writeEdgeProperty( prop.switchEdgeOwner())); @@ -560,12 +565,12 @@ public class GraphTransaction extends IndexableTransaction { QueryList<Number> queries = this.optimizeQueries(query, q -> { boolean isIndexQuery = q instanceof IdQuery; assert isIndexQuery || isConditionQuery || q == query; - // Need to fallback if there are uncommitted records + // Need to fall back if there are uncommitted records boolean fallback = hasUpdate; Number result; if (fallback) { - // Here just ignore it, and do fallback later + // Here just ignore it, and do fall back later result = null; } else if (!isIndexQuery || !isConditionQuery) { // It's a sysprop-query, let parent tx do it @@ -578,7 +583,7 @@ public class GraphTransaction extends IndexableTransaction { assert query instanceof ConditionQuery; OptimizedType optimized = ((ConditionQuery) query).optimized(); if (this.optimizeAggrByIndex && optimized == OptimizedType.INDEX) { - // The ids size means results count (assume no left index) + // The id's size means result count (assume no left index) result = q.idsSize(); } else { assert !fallback; @@ -587,7 +592,7 @@ public class GraphTransaction extends IndexableTransaction { } } - // Can't be optimized, then do fallback + // Can't be optimized, then do fall back if (fallback) { assert result == null; assert q.resultType().isVertex() || q.resultType().isEdge(); @@ -629,7 +634,7 @@ public class GraphTransaction extends IndexableTransaction { /* * No need to lock VERTEX_LABEL_ADD_UPDATE, because vertex label * update only can add nullable properties and user data, which is - * unconcerned with add vertex + * unconcerned with added vertex */ this.beforeWrite(); this.addedVertices.put(vertex.id(), vertex); @@ -762,21 +767,16 @@ public class GraphTransaction extends IndexableTransaction { return this.queryVerticesByIds(vertexIds, false, false, HugeType.SERVER); } - return this.queryVerticesByIds(vertexIds, false, false, - HugeType.VERTEX); + return this.queryVerticesByIds(vertexIds, false, false, HugeType.VERTEX); } - protected Iterator<Vertex> queryVerticesByIds(Object[] vertexIds, - boolean adjacentVertex, - boolean checkMustExist) { - return this.queryVerticesByIds(vertexIds, adjacentVertex, checkMustExist, - HugeType.VERTEX); + protected Iterator<Vertex> queryVerticesByIds(Object[] vertexIds, boolean adjacentVertex, + boolean checkMustExist) { + return this.queryVerticesByIds(vertexIds, adjacentVertex, checkMustExist, HugeType.VERTEX); } - protected Iterator<Vertex> queryVerticesByIds(Object[] vertexIds, - boolean adjacentVertex, - boolean checkMustExist, - HugeType type) { + protected Iterator<Vertex> queryVerticesByIds(Object[] vertexIds, boolean adjacentVertex, + boolean checkMustExist, HugeType type) { Query.checkForceCapacity(vertexIds.length); // NOTE: allowed duplicated vertices if query by duplicated ids @@ -890,7 +890,7 @@ public class GraphTransaction extends IndexableTransaction { /* * No need to lock EDGE_LABEL_ADD_UPDATE, because edge label * update only can add nullable properties and user data, which is - * unconcerned with add edge + * unconcerned with added edge */ this.beforeWrite(); this.addedEdges.put(edge.id(), edge); @@ -1132,7 +1132,7 @@ public class GraphTransaction extends IndexableTransaction { E.checkState(vertex != null, "No owner for updating property '%s'", prop.key()); - // Add property in memory for new created vertex + // Add property in memory for newly created vertex if (vertex.fresh()) { // The owner will do property update vertex.setProperty(prop); @@ -1177,7 +1177,7 @@ public class GraphTransaction extends IndexableTransaction { List<Id> primaryKeyIds = vertex.schemaLabel().primaryKeys(); E.checkArgument(!primaryKeyIds.contains(propKey.id()), "Can't remove primary key '%s'", prop.key()); - // Remove property in memory for new created vertex + // Remove property in memory for newly created vertex if (vertex.fresh()) { // The owner will do property update vertex.removeProperty(propKey.id()); @@ -1210,7 +1210,7 @@ public class GraphTransaction extends IndexableTransaction { E.checkState(edge != null, "No owner for updating property '%s'", prop.key()); - // Add property in memory for new created edge + // Add property in memory for newly created edge if (edge.fresh()) { // The owner will do property update edge.setProperty(prop); @@ -1250,11 +1250,11 @@ public class GraphTransaction extends IndexableTransaction { if (!edge.hasProperty(propKey.id())) { return; } - // Check is removing sort key + // Check is removing a sort key List<Id> sortKeyIds = edge.schemaLabel().sortKeys(); E.checkArgument(!sortKeyIds.contains(prop.propertyKey().id()), "Can't remove sort key '%s'", prop.key()); - // Remove property in memory for new created edge + // Remove property in memory for newly created edge if (edge.fresh()) { // The owner will do property update edge.removeProperty(propKey.id()); @@ -1280,7 +1280,7 @@ public class GraphTransaction extends IndexableTransaction { } /** - * Construct one edge condition query based on source vertex, direction and + * Construct one-edge condition query based on source vertex, direction and * edge labels * * @param sourceVertex source vertex of edge @@ -1340,8 +1340,8 @@ public class GraphTransaction extends IndexableTransaction { } private static ConditionQuery constructEdgesQuery(Id sourceVertex, - Directions direction, - List<Id> edgeLabels) { + Directions direction, + List<Id> edgeLabels) { E.checkState(sourceVertex != null, "The edge query must contain source vertex"); E.checkState(direction != null, @@ -1454,7 +1454,7 @@ public class GraphTransaction extends IndexableTransaction { /* * Supported query: * 1.query just by edge label - * 2.query just by PROPERTIES (like containsKey,containsValue) + * 2.query just by PROPERTIES (like containsKey, containsValue) * 3.query with scan */ if (query.containsCondition(HugeKeys.LABEL) || @@ -1570,8 +1570,8 @@ public class GraphTransaction extends IndexableTransaction { } if (vertexIdList.size() != filterVertexList.size()) { - // Modify on the copied relation to avoid affecting other query - Condition.Relation relation = + // Modify on the copied relation to avoid affecting another query + Condition.Relation relation = query.copyRelationAndUpdateQuery(HugeKeys.OWNER_VERTEX); relation.value(filterVertexList); } @@ -1603,7 +1603,8 @@ public class GraphTransaction extends IndexableTransaction { */ query.resetUserpropConditions(); - if (this.storeFeatures().supportsFatherAndSubEdgeLabel() && query.condition(HugeKeys.SUB_LABEL) == null) { + if (this.storeFeatures().supportsFatherAndSubEdgeLabel() && + query.condition(HugeKeys.SUB_LABEL) == null) { query.eq(HugeKeys.SUB_LABEL, el.id()); } LOG.debug("Query edges by sortKeys: {}", query); @@ -1613,7 +1614,7 @@ public class GraphTransaction extends IndexableTransaction { /* * Query only by sysprops, like: by vertex label, by edge label. - * NOTE: we assume sysprops would be indexed by backend store + * NOTE: we assume sysprops would be indexed by backend store, * but we don't support query edges only by direction/target-vertex. */ if (query.allSysprop()) { @@ -1842,7 +1843,7 @@ public class GraphTransaction extends IndexableTransaction { } /* * No need to lock INDEX_LABEL_ADD_UPDATE, because index label - * update only can add user data, which is unconcerned with + * update only can add user data, which is unconcerned with * update property */ this.beforeWrite(); @@ -1919,7 +1920,8 @@ public class GraphTransaction extends IndexableTransaction { } if (cq.optimized() == OptimizedType.INDEX) { // g.E().hasLabel(xxx).has(yyy) - // consider OptimizedType.INDEX_FILTER occurred in org.apache.hugegraph.core.EdgeCoreTest.testQueryCount + // consider OptimizedType.INDEX_FILTER occurred in org.apache.hugegraph.core + // .EdgeCoreTest.testQueryCount try { this.indexTx.asyncRemoveIndexLeft(cq, elem); } catch (Throwable e) { @@ -1934,7 +1936,7 @@ public class GraphTransaction extends IndexableTransaction { if (cq.existLeftIndex(elem.id())) { /* * Both have correct and left index, wo should return true - * but also needs to cleaned up left index + * but also needs to clean up left index */ try { this.indexTx.asyncRemoveIndexLeft(cq, elem); @@ -2067,8 +2069,8 @@ public class GraphTransaction extends IndexableTransaction { Set<V> txResults = InsertionOrderUtil.newSet(); /* - * Collect added/updated records - * Records in memory have higher priority than query from backend store + * Collect added/updated records. + * Records in memory have higher priority than a query from backend store */ for (V elem : addedTxRecords.values()) { if (query.reachLimit(txResults.size())) { @@ -2275,7 +2277,7 @@ public class GraphTransaction extends IndexableTransaction { while (iter.hasNext()) { consumer.accept(iter.next()); /* - * Commit per batch to avoid too much data in single commit, + * Commit per batch to avoid too much data in a single commit, * especially for Cassandra backend */ this.commitIfGtSize(GraphTransaction.COMMIT_BATCH); @@ -2300,7 +2302,7 @@ public class GraphTransaction extends IndexableTransaction { if (label.equals(elemLabel)) { consumer.accept(e); /* - * Commit per batch to avoid too much data in single + * Commit per batch to avoid too much data in a single * commit, especially for Cassandra backend */ this.commitIfGtSize(GraphTransaction.COMMIT_BATCH);