This is an automated email from the ASF dual-hosted git repository. vgalaxies pushed a commit to branch pd-store in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git
commit d4a20a120a000b1d68a1dbf5fd8bbc837ac16138 Author: VGalaxies <[email protected]> AuthorDate: Sat Feb 3 22:04:21 2024 +0800 chore: sync adapted code to internal 4.0 version only the code within hugegraph-core module --- .../backend/cache/CachedSchemaTransactionV2.java | 17 +++--- .../org/apache/hugegraph/config/CoreOptions.java | 8 +++ .../hugegraph/meta/lock/PdDistributedLock.java | 4 +- .../hugegraph/task/DistributedTaskScheduler.java | 61 ++++++++-------------- 4 files changed, 43 insertions(+), 47 deletions(-) diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java index b29a4227c..ef6bcc03b 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java @@ -293,13 +293,9 @@ public class CachedSchemaTransactionV2 extends SchemaTransactionV2 { if (value == null) { value = super.getSchema(type, name); if (value != null) { - this.resetCachedAllIfReachedCapacity(); - - this.nameCache.update(prefixedName, value); - - SchemaElement schema = (SchemaElement) value; - Id prefixedId = generateId(schema.type(), schema.id()); - this.idCache.update(prefixedId, schema); + // Note: reload all schema if the cache is inconsistent with storage layer + this.clearCache(false); + this.loadAllSchema(); } } return (T) value; @@ -338,6 +334,13 @@ public class CachedSchemaTransactionV2 extends SchemaTransactionV2 { } } + private void loadAllSchema() { + getAllSchema(HugeType.PROPERTY_KEY); + getAllSchema(HugeType.VERTEX_LABEL); + getAllSchema(HugeType.EDGE_LABEL); + getAllSchema(HugeType.INDEX_LABEL); + } + @Override public void clear() { // Clear schema info firstly diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java index bb8745327..9f17b0cae 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java @@ -297,6 +297,14 @@ public class CoreOptions extends OptionHolder { 0 ); + public static final ConfigOption<Long> TASK_SCHEDULE_PERIOD = + new ConfigOption<>( + "task.schedule_period", + "Period time when scheduler to schedule task", + rangeInt(0L, Long.MAX_VALUE), + 10L + ); + public static final ConfigOption<Long> TASK_WAIT_TIMEOUT = new ConfigOption<>( "task.wait_timeout", diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/lock/PdDistributedLock.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/lock/PdDistributedLock.java index 7da78af06..3e8c66ea1 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/lock/PdDistributedLock.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/lock/PdDistributedLock.java @@ -44,7 +44,7 @@ public class PdDistributedLock { public LockResult lock(String key, long second) { long ttl = second * 1000L; try { - LockResponse response = this.client.lock(key, ttl); + LockResponse response = this.client.lockWithoutReentrant(key, ttl); boolean succeed = response.getSucceed(); LockResult result = new LockResult(); if (succeed) { @@ -56,7 +56,7 @@ public class PdDistributedLock { synchronized (result) { keepAlive(key); } - }, 10, period, TimeUnit.MILLISECONDS); + }, period, period, TimeUnit.MILLISECONDS); result.setFuture(future); } return result; diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java index 09d6c3d50..c29d7a3b8 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java @@ -17,7 +17,6 @@ package org.apache.hugegraph.task; -import java.util.Collection; import java.util.Iterator; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; @@ -30,16 +29,12 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Lock; import org.apache.hugegraph.HugeException; import org.apache.hugegraph.HugeGraph; import org.apache.hugegraph.HugeGraphParams; import org.apache.hugegraph.backend.id.Id; -import org.apache.hugegraph.backend.page.PageInfo; import org.apache.hugegraph.backend.query.QueryResults; -import org.apache.hugegraph.concurrent.LockGroup; -import org.apache.hugegraph.concurrent.LockManager; import org.apache.hugegraph.config.CoreOptions; import org.apache.hugegraph.exception.ConnectionException; import org.apache.hugegraph.exception.NotFoundException; @@ -53,7 +48,7 @@ import org.apache.tinkerpop.gremlin.structure.Vertex; import org.slf4j.Logger; public class DistributedTaskScheduler extends TaskAndResultScheduler { - protected static final int SCHEDULE_PERIOD = 10; + private final long schedulePeriod; private static final Logger LOG = Log.logger(DistributedTaskScheduler.class); private final ExecutorService taskDbExecutor; private final ExecutorService schemaTaskExecutor; @@ -63,8 +58,6 @@ public class DistributedTaskScheduler extends TaskAndResultScheduler { private final ScheduledThreadPoolExecutor schedulerExecutor; private final ScheduledFuture<?> cronFuture; - private final String lockGroupName; - /** * the status of scheduler */ @@ -90,11 +83,11 @@ public class DistributedTaskScheduler extends TaskAndResultScheduler { this.schedulerExecutor = schedulerExecutor; - lockGroupName = String.format("%s_%s_distributed", graphSpace, graph); - LockManager.instance().create(lockGroupName); - this.closed.set(false); + this.schedulePeriod = this.graph.configuration() + .get(CoreOptions.TASK_SCHEDULE_PERIOD); + this.cronFuture = this.schedulerExecutor.scheduleWithFixedDelay( () -> { // TODO: uncomment later - graph space @@ -105,14 +98,15 @@ public class DistributedTaskScheduler extends TaskAndResultScheduler { // TaskManager.useAdmin(); this.cronSchedule(); } catch (Throwable t) { - LOG.info("cronScheduler exception ", t); + // TODO: log with graph space + LOG.info("cronScheduler exception graph: {}", this.graphName(), t); } finally { // TODO: uncomment later - graph space LockUtil.unlock("", LockUtil.GRAPH_LOCK); // LockUtil.unlock(this.graph().spaceGraphName(), LockUtil.GRAPH_LOCK); } }, - 10L, SCHEDULE_PERIOD, + 10L, schedulePeriod, TimeUnit.SECONDS); } @@ -158,9 +152,14 @@ public class DistributedTaskScheduler extends TaskAndResultScheduler { LOG.info("Try to update task({})@({}/{}) status" + "(RUNNING->FAILED)", running.id(), this.graphSpace, this.graphName); - updateStatusWithLock(running.id(), TaskStatus.RUNNING, - TaskStatus.FAILED); - runningTasks.remove(running.id()); + if (updateStatusWithLock(running.id(), TaskStatus.RUNNING, + TaskStatus.FAILED)) { + runningTasks.remove(running.id()); + } else { + LOG.warn("Update task({})@({}/{}) status" + + "(RUNNING->FAILED) failed", + running.id(), this.graphSpace, this.graphName); + } } } @@ -484,7 +483,7 @@ public class DistributedTaskScheduler extends TaskAndResultScheduler { return true; } else { - LOG.info("Update task({}) status conflict: current({}), " + + LOG.warn("Update task({}) status conflict: current({}), " + "pre({}), status({})", id, task.status(), prestatus, status); return false; @@ -542,7 +541,7 @@ public class DistributedTaskScheduler extends TaskAndResultScheduler { if (executor.getActiveCount() < executor.getMaximumPoolSize()) { TaskRunner<?> runner = new TaskRunner<>(task); chosenExecutor.submit(runner); - LOG.info("Start task({})@({}/{})", task.id(), + LOG.info("Submit task({})@({}/{})", task.id(), this.graphSpace, this.graphName); return true; @@ -570,23 +569,12 @@ public class DistributedTaskScheduler extends TaskAndResultScheduler { LockResult lockResult = new LockResult(); - LockGroup lockGroup = LockManager.instance().get(lockGroupName); - Lock localLock = lockGroup.lock(taskId); - - if (!localLock.tryLock()) { - return new LockResult(); - } - try { lockResult = MetaManager.instance().tryLockTask(graphSpace, graphName, taskId); } catch (Throwable t) { - LOG.info(String.format("try to lock task(%s) error", taskId), t); - } - - if (!lockResult.lockSuccess()) { - localLock.unlock(); + LOG.warn(String.format("try to lock task(%s) error", taskId), t); } return lockResult; @@ -598,14 +586,9 @@ public class DistributedTaskScheduler extends TaskAndResultScheduler { MetaManager.instance().unlockTask(graphSpace, graphName, taskId, lockResult); } catch (Throwable t) { - LOG.info(String.format("try to unlock task(%s) error", + LOG.warn(String.format("try to unlock task(%s) error", taskId), t); } - - LockGroup lockGroup = LockManager.instance().get(lockGroupName); - Lock localLock = lockGroup.lock(taskId); - - localLock.unlock(); } private boolean isLockedTask(String taskId) { @@ -626,7 +609,9 @@ public class DistributedTaskScheduler extends TaskAndResultScheduler { LockResult lockResult = tryLockTask(task.id().asString()); initTaskParams(task); - if (lockResult.lockSuccess()) { + if (lockResult.lockSuccess() && !task.completed()) { + + LOG.info("Start task({})", task.id()); TaskManager.setContext(task.context()); try { @@ -644,7 +629,7 @@ public class DistributedTaskScheduler extends TaskAndResultScheduler { // 任务执行不会抛出异常,HugeTask 在执行过程中,会捕获异常,并存储到 DB 中 task.run(); } catch (Throwable t) { - LOG.info("exception when execute task", t); + LOG.warn("exception when execute task", t); } finally { runningTasks.remove(task.id()); unlockTask(task.id().asString(), lockResult);
