VGalaxies commented on code in PR #2301:
URL:
https://github.com/apache/incubator-hugegraph/pull/2301#discussion_r1325433577
##########
hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java:
##########
@@ -0,0 +1,469 @@
+package org.apache.hugegraph.backend.cache;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+import org.apache.hugegraph.HugeGraphParams;
+import org.apache.hugegraph.backend.id.Id;
+import org.apache.hugegraph.backend.id.IdGenerator;
+import org.apache.hugegraph.backend.store.ram.IntObjectMap;
+import org.apache.hugegraph.backend.tx.SchemaTransactionV2;
+import org.apache.hugegraph.config.CoreOptions;
+import org.apache.hugegraph.event.EventHub;
+import org.apache.hugegraph.event.EventListener;
+import org.apache.hugegraph.meta.MetaDriver;
+import org.apache.hugegraph.meta.MetaManager;
+import org.apache.hugegraph.perf.PerfUtil;
+import org.apache.hugegraph.schema.SchemaElement;
+import org.apache.hugegraph.type.HugeType;
+import org.apache.hugegraph.util.E;
+import org.apache.hugegraph.util.Events;
+
+import com.google.common.collect.ImmutableSet;
+
+public class CachedSchemaTransactionV2 extends SchemaTransactionV2 {
+ // Schema cache keyed by type-prefixed schema id (created as "schema-id")
+ private final Cache<Id, Object> idCache;
+ // Schema cache keyed by type-prefixed schema name (created as "schema-name")
+ private final Cache<Id, Object> nameCache;
+
+ // Array caches for numeric ids, stored as the attachment of idCache
+ private final SchemaCaches<SchemaElement> arrayCaches;
+
+ // Listener for store init/clear/truncate events, set in listenChanges()
+ private EventListener storeEventListener;
+ // Listener for schema cache invalid/clear events, set in listenChanges()
+ private EventListener cacheEventListener;
+
+    /**
+     * Create a cached schema transaction: obtain the id/name caches sized by
+     * {@code CoreOptions.SCHEMA_CACHE_CAPACITY}, attach (or reuse) the array
+     * caches on the id cache, and start listening for store/cache events.
+     *
+     * @param metaDriver  passed through to the super constructor
+     * @param cluster     passed through to the super constructor
+     * @param graphParams graph parameters, also used to read the configuration
+     */
+    public CachedSchemaTransactionV2(MetaDriver metaDriver,
+                                     String cluster,
+                                     HugeGraphParams graphParams) {
+        super(metaDriver, cluster, graphParams);
+
+        final long capacity = graphParams.configuration()
+                                         .get(CoreOptions.SCHEMA_CACHE_CAPACITY);
+        this.idCache = this.cache("schema-id", capacity);
+        this.nameCache = this.cache("schema-name", capacity);
+
+        // Reuse the array caches already attached to the id cache, if any
+        SchemaCaches<SchemaElement> caches = this.idCache.attachment();
+        if (caches == null) {
+            // Array caches are sized at 1/8 of the schema cache capacity
+            int arraySize = (int) (capacity >> 3);
+            caches = this.idCache.attachment(new SchemaCaches<>(arraySize));
+        }
+        this.arrayCaches = caches;
+
+        this.listenChanges();
+    }
+
+    /**
+     * Build the cache key for a schema id: "{type.string()}-{id.asString()}".
+     *
+     * @param type schema type used as the key prefix
+     * @param id   schema id
+     * @return the type-prefixed key
+     */
+    private static Id generateId(HugeType type, Id id) {
+        // NOTE: plain concatenation is used on purpose, it's faster than
+        // String.format("%x-%s", type.code(), name)
+        return generateId(type, id.asString());
+    }
+
+    /**
+     * Build the cache key for a schema name: "{type.string()}-{name}".
+     *
+     * @param type schema type used as the key prefix
+     * @param name schema name
+     * @return the type-prefixed key
+     */
+    private static Id generateId(HugeType type, String name) {
+        String prefixed = type.string() + "-" + name;
+        return IdGenerator.of(prefixed);
+    }
+
+ // Clear all schema caches (with notify=false), then unregister the
+ // store-event and cache-event listeners.
+ public void close() {
+ this.clearCache(false);
+ this.unlistenChanges();
+ }
+
+    /**
+     * Look up the cache named "{prefix}-" with the given capacity from the
+     * {@code CacheManager} singleton.
+     *
+     * @param prefix   cache name prefix, e.g. "schema-id"
+     * @param capacity capacity passed to the cache manager
+     * @return the cache returned by the cache manager
+     */
+    private Cache<Id, Object> cache(String prefix, long capacity) {
+        // TODO: uncomment later - graph space
+        //final String name = prefix + "-" + this.graph().spaceGraphName();
+        // Until then the graph-space suffix is empty
+        final String cacheName = prefix + "-";
+        // NOTE: must disable schema cache-expire due to getAllSchema()
+        return CacheManager.instance().cache(cacheName, capacity);
+    }
+
+    /**
+     * Register the two listeners used to keep the schema caches in sync:
+     * a store listener that clears all caches on store init/clear/truncate,
+     * and a cache listener that handles "invalid" (single item) and "clear"
+     * cache actions. The cache listener is only registered when the schema
+     * event hub has no listener for {@code Events.CACHE} yet.
+     */
+    private void listenChanges() {
+        // Listen store event: "store.init", "store.clear", ...
+        Set<String> storeEvents = ImmutableSet.of(Events.STORE_INIT,
+                                                  Events.STORE_CLEAR,
+                                                  Events.STORE_TRUNCATE);
+        this.storeEventListener = event -> {
+            if (storeEvents.contains(event.name())) {
+                LOG.debug("Graph {} clear schema cache on event '{}'",
+                          this.graph(), event.name());
+                this.clearCache(true);
+                return true;
+            }
+            return false;
+        };
+        this.graphParams().loadGraphStore().provider()
+            .listen(this.storeEventListener);
+
+        // Listen cache event: "cache"(invalid cache item)
+        this.cacheEventListener = event -> {
+            LOG.debug("Graph {} received schema cache event: {}",
+                      this.graph(), event);
+            Object[] args = event.args();
+            E.checkArgument(args.length > 0 && args[0] instanceof String,
+                            "Expect event action argument");
+            if (Cache.ACTION_INVALID.equals(args[0])) {
+                event.checkArgs(String.class, HugeType.class, Id.class);
+                HugeType type = (HugeType) args[1];
+                Id id = (Id) args[2];
+                // Drop the item from the id, name and array caches
+                this.invalidateCache(type, id);
+                // The cached set of this type is no longer complete
+                this.resetCachedAll(type);
+                return true;
+            } else if (Cache.ACTION_CLEAR.equals(args[0])) {
+                event.checkArgs(String.class, HugeType.class);
+                this.clearCache(false);
+                return true;
+            }
+            return false;
+        };
+        EventHub schemaEventHub = this.graphParams().schemaEventHub();
+        if (!schemaEventHub.containsListener(Events.CACHE)) {
+            schemaEventHub.listen(Events.CACHE, this.cacheEventListener);
+        }
+    }
+
+ // Clear the id cache, the name cache and the array caches.
+ // NOTE(review): the 'notify' parameter is never read in this body —
+ // confirm whether a notification was meant to be sent when it is true.
+ public void clearCache(boolean notify) {
+ this.idCache.clear();
+ this.nameCache.clear();
+ this.arrayCaches.clear();
+ }
+
+    /**
+     * When the id cache is full, log a warning and clear every per-type
+     * "cached all" flag.
+     */
+    private void resetCachedAllIfReachedCapacity() {
+        if (this.idCache.size() < this.idCache.capacity()) {
+            // Still has room, nothing to reset
+            return;
+        }
+        LOG.warn("Schema cache reached capacity({}): {}",
+                 this.idCache.capacity(), this.idCache.size());
+        this.cachedTypes().clear();
+    }
+
+    /**
+     * Unregister the store-event listener from the store provider and the
+     * cache-event listener from the schema event hub.
+     */
+    private void unlistenChanges() {
+        // Unlisten store event
+        this.graphParams().loadGraphStore().provider()
+            .unlisten(this.storeEventListener);
+
+        // Unlisten cache event
+        this.graphParams().schemaEventHub()
+            .unlisten(Events.CACHE, this.cacheEventListener);
+    }
+
+ // Return the per-type "cached all" flags held by the array caches.
+ private CachedTypes cachedTypes() {
+ return this.arrayCaches.cachedTypes();
+ }
+
+    /**
+     * Mark the given schema type as "not fully cached".
+     *
+     * @param type schema type whose flag is set to false
+     */
+    private void resetCachedAll(HugeType type) {
+        CachedTypes flags = this.cachedTypes();
+        flags.put(type, false);
+    }
+
+    /**
+     * Remove one schema element from the id cache, the name cache and the
+     * array caches. The name-cache key is derived from the element found
+     * in the id cache, so the name cache is only touched on an id-cache hit.
+     *
+     * @param type schema type of the element
+     * @param id   schema id of the element (without the type prefix)
+     */
+    private void invalidateCache(HugeType type, Id id) {
+        // Id cache and name cache
+        Id idKey = generateId(type, id);
+        Object cached = this.idCache.get(idKey);
+        if (cached != null) {
+            this.idCache.invalidate(idKey);
+
+            SchemaElement element = (SchemaElement) cached;
+            this.nameCache.invalidate(generateId(element.type(),
+                                                 element.name()));
+        }
+
+        // Optimized array cache
+        this.arrayCaches.remove(type, id);
+    }
+
+ // Delegate the update to the parent transaction, then write the schema
+ // into the caches via updateCache().
+ @Override
+ protected void updateSchema(SchemaElement schema,
+ Consumer<SchemaElement> updateCallback) {
+ super.updateSchema(schema, updateCallback);
+
+ this.updateCache(schema);
+ }
+
+    /**
+     * Add the schema through the parent transaction, cache it, and - unless
+     * {@code CoreOptions.TASK_SYNC_DELETION} is enabled - ask the
+     * {@code MetaManager} to notify a schema cache clear for this graph.
+     *
+     * @param schema schema element to add
+     */
+    @Override
+    protected void addSchema(SchemaElement schema) {
+        super.addSchema(schema);
+
+        this.updateCache(schema);
+
+        if (this.graph().option(CoreOptions.TASK_SYNC_DELETION)) {
+            return;
+        }
+        // TODO: uncomment later - graph space
+        //MetaManager.instance()
+        //           .notifySchemaCacheClear(this.graph().graphSpace(),
+        //                                   this.graph().name());
+        MetaManager.instance()
+                   .notifySchemaCacheClear("", this.graph().name());
+    }
+
+    /**
+     * Write a schema element into the id cache, the name cache and (when
+     * applicable) the array caches, after resetting the "cached all" flags
+     * if the id cache is full.
+     *
+     * @param schema schema element to cache
+     */
+    private void updateCache(SchemaElement schema) {
+        this.resetCachedAllIfReachedCapacity();
+
+        HugeType type = schema.type();
+
+        // Id cache
+        this.idCache.update(generateId(type, schema.id()), schema);
+
+        // Name cache
+        this.nameCache.update(generateId(type, schema.name()), schema);
+
+        // Optimized array cache
+        this.arrayCaches.updateIfNeeded(schema);
+    }
+
+    /**
+     * Remove the schema through the parent transaction, drop it from the
+     * caches, and - unless {@code CoreOptions.TASK_SYNC_DELETION} is
+     * enabled - ask the {@code MetaManager} to notify a schema cache clear
+     * for this graph.
+     *
+     * @param schema schema element to remove
+     */
+    @Override
+    protected void removeSchema(SchemaElement schema) {
+        super.removeSchema(schema);
+
+        this.invalidateCache(schema.type(), schema.id());
+
+        if (this.graph().option(CoreOptions.TASK_SYNC_DELETION)) {
+            return;
+        }
+        // TODO: uncomment later - graph space
+        //MetaManager.instance()
+        //           .notifySchemaCacheClear(this.graph().graphSpace(),
+        //                                   this.graph().name());
+        MetaManager.instance()
+                   .notifySchemaCacheClear("", this.graph().name());
+    }
+
+    /**
+     * Get a schema element by id. Lookup order: array caches (only for
+     * positive numeric ids), id cache, then the parent transaction. A value
+     * loaded from the parent is written to the id and name caches, and any
+     * value found is offered to the array caches.
+     *
+     * @param type schema type
+     * @param id   schema id
+     * @return the schema element, or null if none was found
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    protected <T extends SchemaElement> T getSchema(HugeType type, Id id) {
+        // Fast path: optimized array cache
+        boolean arrayCacheable = id.number() && id.asLong() > 0L;
+        if (arrayCacheable) {
+            SchemaElement hit = this.arrayCaches.get(type, id);
+            if (hit != null) {
+                return (T) hit;
+            }
+        }
+
+        Id idKey = generateId(type, id);
+        Object found = this.idCache.get(idKey);
+        if (found == null) {
+            found = super.getSchema(type, id);
+            if (found != null) {
+                this.resetCachedAllIfReachedCapacity();
+
+                this.idCache.update(idKey, found);
+
+                SchemaElement element = (SchemaElement) found;
+                Id nameKey = generateId(element.type(), element.name());
+                this.nameCache.update(nameKey, element);
+            }
+        }
+
+        // Keep the array cache warm (a null value is ignored there)
+        this.arrayCaches.updateIfNeeded((SchemaElement) found);
+
+        return (T) found;
+    }
+
+    /**
+     * Get a schema element by name. The name cache is tried first; on a
+     * miss the parent transaction is queried and a non-null result is
+     * written to both the name cache and the id cache.
+     *
+     * @param type schema type
+     * @param name schema name
+     * @return the schema element, or null if none was found
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    protected <T extends SchemaElement> T getSchema(HugeType type,
+                                                    String name) {
+        Id nameKey = generateId(type, name);
+        Object found = this.nameCache.get(nameKey);
+        if (found != null) {
+            return (T) found;
+        }
+
+        found = super.getSchema(type, name);
+        if (found != null) {
+            this.resetCachedAllIfReachedCapacity();
+
+            this.nameCache.update(nameKey, found);
+
+            SchemaElement element = (SchemaElement) found;
+            Id idKey = generateId(element.type(), element.id());
+            this.idCache.update(idKey, element);
+        }
+        return (T) found;
+    }
+
+    /**
+     * Get all schema elements of a type. When the type is flagged as fully
+     * cached, the result is collected from the id cache; otherwise it is
+     * loaded from the parent transaction and, if the id cache has enough
+     * free room, written to the id/name caches and flagged as fully cached.
+     *
+     * @param type schema type
+     * @return all schema elements of that type
+     */
+    @Override
+    protected <T extends SchemaElement> List<T> getAllSchema(HugeType type) {
+        if (this.cachedTypes().getOrDefault(type, false)) {
+            // Everything of this type is cached: filter the id cache
+            List<T> cached = new ArrayList<>();
+            this.idCache.traverse(value -> {
+                @SuppressWarnings("unchecked")
+                T schema = (T) value;
+                if (schema.type() == type) {
+                    cached.add(schema);
+                }
+            });
+            return cached;
+        }
+
+        List<T> loaded = super.getAllSchema(type);
+        long free = this.idCache.capacity() - this.idCache.size();
+        if (loaded.size() > free) {
+            // Not enough room to cache the whole type
+            return loaded;
+        }
+
+        // Update cache
+        for (T schema : loaded) {
+            Id idKey = generateId(schema.type(), schema.id());
+            this.idCache.update(idKey, schema);
+
+            Id nameKey = generateId(schema.type(), schema.name());
+            this.nameCache.update(nameKey, schema);
+        }
+        /*
+         * NOTE(review): assuming CachedTypes follows java.util.Map
+         * semantics, putIfAbsent() keeps an existing entry, so once
+         * resetCachedAll() has stored false for this type the flag is not
+         * switched back to true here - confirm this is intended.
+         */
+        this.cachedTypes().putIfAbsent(type, true);
+        return loaded;
+    }
+
+ // Clear through the parent transaction first, then clear the caches
+ // (with notify=false).
+ @Override
+ public void clear() {
+ // Clear schema info firstly
+ super.clear();
+ this.clearCache(false);
+ }
+
+ private static final class SchemaCaches<V extends SchemaElement> {
+
+ private final int size;
+
+ private final IntObjectMap<V> pks;
+ private final IntObjectMap<V> vls;
+ private final IntObjectMap<V> els;
+ private final IntObjectMap<V> ils;
+
+ private final CachedTypes cachedTypes;
+
+        /**
+         * Create the four per-type array maps, each with the same size,
+         * plus an empty set of "cached all" flags.
+         *
+         * @param size size passed to every {@code IntObjectMap}
+         */
+        public SchemaCaches(int size) {
+            // TODO: improve size of each type for optimized array cache
+            this.size = size;
+            this.cachedTypes = new CachedTypes();
+
+            this.pks = new IntObjectMap<>(size);
+            this.vls = new IntObjectMap<>(size);
+            this.els = new IntObjectMap<>(size);
+            this.ils = new IntObjectMap<>(size);
+        }
+
+ public void updateIfNeeded(V schema) {
+ if (schema == null) {
+ return;
+ }
+ Id id = schema.id();
+ if (id.number() && id.asLong() > 0L) {
+ this.set(schema.type(), id, schema);
+ }
+ }
+
+ @PerfUtil.Watched
+ public V get(HugeType type, Id id) {
+ assert id.number();
+ long longId = id.asLong();
+ if (longId <= 0L) {
+ assert false : id;
+ return null;
+ }
+ int key = (int) longId;
Review Comment:
@imbajin Was this comment ignored? The existing `CachedSchemaTransaction` has
the same issue.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]