This is an automated email from the ASF dual-hosted git repository.
vaughn pushed a commit to branch pd-store
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git
The following commit(s) were added to refs/heads/pd-store by this push:
new f34743b5f feat: preliminary support for distributed task scheduler (#2319)
f34743b5f is described below
commit f34743b5fbd2b0dd2da10932417d8a4d047bda9a
Author: V_Galaxy <[email protected]>
AuthorDate: Mon Oct 16 11:22:50 2023 +0800
feat: preliminary support for distributed task scheduler (#2319)
* chore: migrate DistributedTaskScheduler
* fix: basic setup for DistributedTaskScheduler
* chore: set default scheduler type to local
* chore: code style
* chore: code style
* fix: impl checkRequirement for DistributedTaskScheduler
* chore: add comment for task server id
* fix: task result vertex type
* chore: review
* fix: StandardClusterRoleStore queryVertex
* chore: add comment
duplicated commit in hstore for clusterRole
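A minimal configuration sketch for trying the new scheduler, assuming the options are set in the graph's properties file (the exact path, e.g. conf/graphs/hugegraph.properties, is only illustrative; option names and defaults come from CoreOptions in this commit):

    # switch the task scheduler from the default "local" to "distributed"
    task.scheduler_type=distributed
    # optional: retry failed tasks, allowed range 0-3 (default 0)
    task.retry=1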
---
.../hugegraph/api/filter/RedirectFilter.java | 24 +
.../java/org/apache/hugegraph/api/job/TaskAPI.java | 5 +-
.../apache/hugegraph/auth/HugeGraphAuthProxy.java | 36 +-
.../java/org/apache/hugegraph/HugeGraphParams.java | 2 +
.../org/apache/hugegraph/StandardHugeGraph.java | 8 +
.../backend/cache/CachedSchemaTransactionV2.java | 2 +-
.../org/apache/hugegraph/config/CoreOptions.java | 16 +
.../masterelection/StandardClusterRoleStore.java | 7 +-
.../org/apache/hugegraph/structure/HugeVertex.java | 10 +-
.../hugegraph/task/DistributedTaskScheduler.java | 667 +++++++++++++++++++++
.../java/org/apache/hugegraph/task/HugeTask.java | 95 ++-
.../org/apache/hugegraph/task/HugeTaskResult.java | 122 ++++
.../hugegraph/task/StandardTaskScheduler.java | 157 +----
.../hugegraph/task/TaskAndResultScheduler.java | 335 +++++++++++
.../hugegraph/task/TaskAndResultTransaction.java | 103 ++++
.../org/apache/hugegraph/task/TaskManager.java | 221 +++++--
.../org/apache/hugegraph/task/TaskScheduler.java | 18 +-
.../java/org/apache/hugegraph/task/TaskStatus.java | 4 +-
.../org/apache/hugegraph/task/TaskTransaction.java | 165 +++++
.../backend/store/hstore/HstoreSessionsImpl.java | 4 +
.../org/apache/hugegraph/core/TaskCoreTest.java | 6 +-
.../org/apache/hugegraph/tinkerpop/TestGraph.java | 2 +-
22 files changed, 1796 insertions(+), 213 deletions(-)
diff --git
a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/filter/RedirectFilter.java
b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/filter/RedirectFilter.java
index e675dd955..044b60ca0 100644
---
a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/filter/RedirectFilter.java
+++
b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/filter/RedirectFilter.java
@@ -26,17 +26,23 @@ import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.utils.URIBuilder;
+import org.apache.hugegraph.HugeGraph;
import org.apache.hugegraph.core.GraphManager;
import org.apache.hugegraph.masterelection.GlobalMasterInfo;
+import org.apache.hugegraph.task.DistributedTaskScheduler;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Log;
import org.glassfish.hk2.api.IterableProvider;
import org.glassfish.hk2.api.ServiceHandle;
import org.glassfish.jersey.message.internal.HeaderUtils;
+import org.glassfish.jersey.server.ContainerRequest;
import org.slf4j.Logger;
import jakarta.ws.rs.NameBinding;
@@ -56,6 +62,10 @@ public class RedirectFilter implements
ContainerRequestFilter {
private static final String X_HG_REDIRECT = "x-hg-redirect";
+ private static final String TASK_API_REGEX = "graphs/(.*?)/tasks/.*";
+
+ private static final Pattern TASK_API_PATTERN = Pattern.compile(TASK_API_REGEX);
+
private static volatile Client client = null;
@Context
@@ -91,6 +101,20 @@ public class RedirectFilter implements
ContainerRequestFilter {
StringUtils.isEmpty(masterNodeInfo.url())) {
return;
}
+
+ // Skip the redirect filter if the TaskAPI call targets a graph that uses DistributedTaskScheduler
+ if (context instanceof ContainerRequest) {
+ String relativePath = ((ContainerRequest) context).getPath(true);
+ Matcher matcher = TASK_API_PATTERN.matcher(relativePath);
+ if (matcher.matches()) {
+ HugeGraph graph = manager.graph(matcher.group(1));
+ if (Objects.nonNull(graph) &&
+ graph.taskScheduler() instanceof DistributedTaskScheduler) {
+ return;
+ }
+ }
+ }
+
String url = masterNodeInfo.url();
URI redirectUri;
diff --git
a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/job/TaskAPI.java
b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/job/TaskAPI.java
index c2dedf32a..f295e3e00 100644
---
a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/job/TaskAPI.java
+++
b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/job/TaskAPI.java
@@ -136,11 +136,12 @@ public class TaskAPI extends API {
@RedirectFilter.RedirectMasterRole
public void delete(@Context GraphManager manager,
@PathParam("graph") String graph,
- @PathParam("id") long id) {
+ @PathParam("id") long id,
+ @DefaultValue("false") @QueryParam("force") boolean
force) {
LOG.debug("Graph [{}] delete task: {}", graph, id);
TaskScheduler scheduler = graph(manager, graph).taskScheduler();
- HugeTask<?> task = scheduler.delete(IdGenerator.of(id));
+ HugeTask<?> task = scheduler.delete(IdGenerator.of(id), force);
E.checkArgument(task != null, "There is no task with id '%s'", id);
}
diff --git
a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/auth/HugeGraphAuthProxy.java
b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/auth/HugeGraphAuthProxy.java
index 04cfac30d..96841dbe6 100644
---
a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/auth/HugeGraphAuthProxy.java
+++
b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/auth/HugeGraphAuthProxy.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
@@ -70,6 +71,7 @@ import org.apache.hugegraph.structure.HugeElement;
import org.apache.hugegraph.structure.HugeFeatures;
import org.apache.hugegraph.structure.HugeVertex;
import org.apache.hugegraph.task.HugeTask;
+import org.apache.hugegraph.task.ServerInfoManager;
import org.apache.hugegraph.task.TaskManager;
import org.apache.hugegraph.task.TaskScheduler;
import org.apache.hugegraph.task.TaskStatus;
@@ -1085,10 +1087,10 @@ public final class HugeGraphAuthProxy implements
HugeGraph {
}
@Override
- public <V> HugeTask<V> delete(Id id) {
+ public <V> HugeTask<V> delete(Id id, boolean force) {
verifyTaskPermission(HugePermission.DELETE,
this.taskScheduler.task(id));
- return this.taskScheduler.delete(id);
+ return this.taskScheduler.delete(id, force);
}
@Override
@@ -1124,6 +1126,36 @@ public final class HugeGraphAuthProxy implements
HugeGraph {
this.taskScheduler.checkRequirement(op);
}
+ @Override
+ public <V> V call(Callable<V> callable) {
+ verifyAnyPermission();
+ return this.taskScheduler.call(callable);
+ }
+
+ @Override
+ public <V> V call(Runnable runnable) {
+ verifyAnyPermission();
+ return this.taskScheduler.call(runnable);
+ }
+
+ @Override
+ public ServerInfoManager serverManager() {
+ verifyAnyPermission();
+ return this.taskScheduler.serverManager();
+ }
+
+ @Override
+ public String graphName() {
+ verifyAnyPermission();
+ return this.taskScheduler.graphName();
+ }
+
+ @Override
+ public void taskDone(HugeTask<?> task) {
+ verifyAnyPermission();
+ this.taskScheduler.taskDone(task);
+ }
+
private void verifyTaskPermission(HugePermission actionPerm) {
verifyPermission(actionPerm, ResourceType.TASK);
}
diff --git
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/HugeGraphParams.java
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/HugeGraphParams.java
index dd611ae68..dca98b1d0 100644
---
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/HugeGraphParams.java
+++
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/HugeGraphParams.java
@@ -94,4 +94,6 @@ public interface HugeGraphParams {
RamTable ramtable();
<T> void submitEphemeralJob(EphemeralJob<T> job);
+
+ String schedulerType();
}
diff --git
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
index c2ea1ea43..805cf78a1 100644
---
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
+++
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
@@ -181,6 +181,8 @@ public class StandardHugeGraph implements HugeGraph {
private final MetaManager metaManager = MetaManager.instance();
+ private final String schedulerType;
+
public StandardHugeGraph(HugeConfig config) {
this.params = new StandardHugeGraphParams();
this.configuration = config;
@@ -214,6 +216,7 @@ public class StandardHugeGraph implements HugeGraph {
this.closed = false;
this.mode = GraphMode.NONE;
this.readMode = GraphReadMode.OLTP_ONLY;
+ this.schedulerType = config.get(CoreOptions.SCHEDULER_TYPE);
LockUtil.init(this.name);
@@ -1339,6 +1342,11 @@ public class StandardHugeGraph implements HugeGraph {
public <T> void submitEphemeralJob(EphemeralJob<T> job) {
this.ephemeralJobQueue.add(job);
}
+
+ @Override
+ public String schedulerType() {
+ return StandardHugeGraph.this.schedulerType;
+ }
}
private class TinkerPopTransaction extends AbstractThreadLocalTransaction {
diff --git
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java
index 6e5d6dca1..b113e0f37 100644
---
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java
+++
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java
@@ -25,7 +25,7 @@ import org.apache.hugegraph.util.Events;
import com.google.common.collect.ImmutableSet;
public class CachedSchemaTransactionV2 extends SchemaTransactionV2 {
- private final Cache<Id, Object> idCache;
+ private final Cache<Id, Object> idCache;
private final Cache<Id, Object> nameCache;
private final SchemaCaches<SchemaElement> arrayCaches;
diff --git
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java
index fe8dfc2e2..bb8745327 100644
---
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java
+++
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java
@@ -331,6 +331,14 @@ public class CoreOptions extends OptionHolder {
1
);
+ public static final ConfigOption<String> SCHEDULER_TYPE =
+ new ConfigOption<>(
+ "task.scheduler_type",
+ "The type of scheduler used in distribution system.",
+ allowValues("local", "distributed"),
+ "local"
+ );
+
public static final ConfigOption<Boolean> TASK_SYNC_DELETION =
new ConfigOption<>(
"task.sync_deletion",
@@ -339,6 +347,14 @@ public class CoreOptions extends OptionHolder {
false
);
+ public static final ConfigOption<Integer> TASK_RETRY =
+ new ConfigOption<>(
+ "task.retry",
+ "Task retry times.",
+ rangeInt(0, 3),
+ 0
+ );
+
public static final ConfigOption<Long> STORE_CONN_DETECT_INTERVAL =
new ConfigOption<>(
"store.connection_detect_interval",
diff --git
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/StandardClusterRoleStore.java
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/StandardClusterRoleStore.java
index f0f69319d..fa7a0777e 100644
---
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/StandardClusterRoleStore.java
+++
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/StandardClusterRoleStore.java
@@ -146,7 +146,12 @@ public class StandardClusterRoleStore implements
ClusterRoleStore {
private Optional<Vertex> queryVertex() {
GraphTransaction tx = this.graph.systemTransaction();
- ConditionQuery query = new ConditionQuery(HugeType.VERTEX);
+ ConditionQuery query;
+ if (this.graph.backendStoreFeatures().supportsTaskAndServerVertex()) {
+ query = new ConditionQuery(HugeType.SERVER);
+ } else {
+ query = new ConditionQuery(HugeType.VERTEX);
+ }
VertexLabel vl = this.graph.graph().vertexLabel(P.ROLE_DATA);
query.eq(HugeKeys.LABEL, vl.id());
query.query(Condition.eq(vl.primaryKeys().get(0), "default"));
diff --git
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/structure/HugeVertex.java
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/structure/HugeVertex.java
index 731bfa444..47f13a956 100644
---
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/structure/HugeVertex.java
+++
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/structure/HugeVertex.java
@@ -39,12 +39,14 @@ import org.apache.hugegraph.backend.query.QueryResults;
import org.apache.hugegraph.backend.serializer.BytesBuffer;
import org.apache.hugegraph.backend.tx.GraphTransaction;
import org.apache.hugegraph.config.CoreOptions;
+import org.apache.hugegraph.masterelection.StandardClusterRoleStore;
import org.apache.hugegraph.perf.PerfUtil.Watched;
import org.apache.hugegraph.schema.EdgeLabel;
import org.apache.hugegraph.schema.PropertyKey;
import org.apache.hugegraph.schema.VertexLabel;
import org.apache.hugegraph.task.HugeServerInfo;
import org.apache.hugegraph.task.HugeTask;
+import org.apache.hugegraph.task.HugeTaskResult;
import org.apache.hugegraph.type.HugeType;
import org.apache.hugegraph.type.define.Cardinality;
import org.apache.hugegraph.type.define.CollectionType;
@@ -92,10 +94,14 @@ public class HugeVertex extends HugeElement implements
Vertex, Cloneable {
@Override
public HugeType type() {
- if (label != null && label.name().equals(HugeTask.P.TASK)) {
+ if (label != null &&
+ (label.name().equals(HugeTask.P.TASK) ||
+ label.name().equals(HugeTaskResult.P.TASKRESULT))) {
return HugeType.TASK;
}
- if (label != null && label.name().equals(HugeServerInfo.P.SERVER)) {
+ if (label != null &&
+ (label.name().equals(HugeServerInfo.P.SERVER) ||
+ label.name().equals(StandardClusterRoleStore.P.ROLE_DATA))) {
return HugeType.SERVER;
}
return HugeType.VERTEX;
diff --git
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java
new file mode 100644
index 000000000..0ab97b7ca
--- /dev/null
+++
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java
@@ -0,0 +1,667 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.task;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+
+import org.apache.hugegraph.HugeException;
+import org.apache.hugegraph.HugeGraph;
+import org.apache.hugegraph.HugeGraphParams;
+import org.apache.hugegraph.backend.id.Id;
+import org.apache.hugegraph.backend.page.PageInfo;
+import org.apache.hugegraph.backend.query.QueryResults;
+import org.apache.hugegraph.concurrent.LockGroup;
+import org.apache.hugegraph.concurrent.LockManager;
+import org.apache.hugegraph.config.CoreOptions;
+import org.apache.hugegraph.exception.ConnectionException;
+import org.apache.hugegraph.exception.NotFoundException;
+import org.apache.hugegraph.meta.MetaManager;
+import org.apache.hugegraph.meta.lock.LockResult;
+import org.apache.hugegraph.structure.HugeVertex;
+import org.apache.hugegraph.util.E;
+import org.apache.hugegraph.util.LockUtil;
+import org.apache.hugegraph.util.Log;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.slf4j.Logger;
+
+public class DistributedTaskScheduler extends TaskAndResultScheduler {
+ protected static final int SCHEDULE_PERIOD = 10;
+ private static final Logger LOG = Log.logger(DistributedTaskScheduler.class);
+ private final ExecutorService taskDbExecutor;
+ private final ExecutorService schemaTaskExecutor;
+ private final ExecutorService olapTaskExecutor;
+ private final ExecutorService ephemeralTaskExecutor;
+ private final ExecutorService gremlinTaskExecutor;
+ private final ScheduledThreadPoolExecutor schedulerExecutor;
+ private final ScheduledFuture<?> cronFuture;
+
+ private final String lockGroupName;
+
+ /**
+ * Whether this scheduler has been closed
+ */
+ private final AtomicBoolean closed = new AtomicBoolean(true);
+
+ private final ConcurrentHashMap<Id, HugeTask<?>> runningTasks = new ConcurrentHashMap<>();
+
+ public DistributedTaskScheduler(HugeGraphParams graph,
+ ScheduledThreadPoolExecutor schedulerExecutor,
+ ExecutorService taskDbExecutor,
+ ExecutorService schemaTaskExecutor,
+ ExecutorService olapTaskExecutor,
+ ExecutorService gremlinTaskExecutor,
+ ExecutorService ephemeralTaskExecutor,
+ ExecutorService serverInfoDbExecutor) {
+ super(graph, serverInfoDbExecutor);
+
+ this.taskDbExecutor = taskDbExecutor;
+ this.schemaTaskExecutor = schemaTaskExecutor;
+ this.olapTaskExecutor = olapTaskExecutor;
+ this.gremlinTaskExecutor = gremlinTaskExecutor;
+ this.ephemeralTaskExecutor = ephemeralTaskExecutor;
+
+ this.schedulerExecutor = schedulerExecutor;
+
+ lockGroupName = String.format("%s_%s_distributed", graphSpace, graph);
+ LockManager.instance().create(lockGroupName);
+
+ this.closed.set(false);
+
+ this.cronFuture = this.schedulerExecutor.scheduleWithFixedDelay(
+ () -> {
+ // TODO: uncomment later - graph space
+ // LockUtil.lock(this.graph().spaceGraphName(), LockUtil.GRAPH_LOCK);
+ LockUtil.lock("", LockUtil.GRAPH_LOCK);
+ try {
+ // TODO: query tasks with super admin permission
+ // TaskManager.useAdmin();
+ this.cronSchedule();
+ } catch (Throwable t) {
+ LOG.info("cronScheduler exception ", t);
+ } finally {
+ // TODO: uncomment later - graph space
+ LockUtil.unlock("", LockUtil.GRAPH_LOCK);
+ // LockUtil.unlock(this.graph().spaceGraphName(), LockUtil.GRAPH_LOCK);
+ }
+ },
+ 10L, SCHEDULE_PERIOD,
+ TimeUnit.SECONDS);
+ }
+
+ private static boolean sleep(long ms) {
+ try {
+ Thread.sleep(ms);
+ return true;
+ } catch (InterruptedException ignored) {
+ // Ignore InterruptedException
+ return false;
+ }
+ }
+
+ public void cronSchedule() {
+ // Execute periodic task scheduling
+
+ if (!this.graph.started() || this.graph.closed()) {
+ return;
+ }
+
+ // Handle tasks in NEW status
+ Iterator<HugeTask<Object>> news = queryTaskWithoutResultByStatus(
+ TaskStatus.NEW);
+
+ while (!this.closed.get() && news.hasNext()) {
+ HugeTask<?> newTask = news.next();
+ LOG.info("Try to start task({})@({}/{})", newTask.id(),
+ this.graphSpace, this.graphName);
+ if (!tryStartHugeTask(newTask)) {
+ // Task submission failed because the thread pool is full
+ break;
+ }
+ }
+
+ // Handle tasks in RUNNING status
+ Iterator<HugeTask<Object>> runnings =
+ queryTaskWithoutResultByStatus(TaskStatus.RUNNING);
+
+ while (!this.closed.get() && runnings.hasNext()) {
+ HugeTask<?> running = runnings.next();
+ initTaskParams(running);
+ if (!isLockedTask(running.id().toString())) {
+ LOG.info("Try to update task({})@({}/{}) status" +
+ "(RUNNING->FAILED)", running.id(), this.graphSpace,
+ this.graphName);
+ updateStatusWithLock(running.id(), TaskStatus.RUNNING,
+ TaskStatus.FAILED);
+ runningTasks.remove(running.id());
+ }
+ }
+
+ // Handle tasks in FAILED/HANGING status
+ Iterator<HugeTask<Object>> faileds =
+ queryTaskWithoutResultByStatus(TaskStatus.FAILED);
+
+ while (!this.closed.get() && faileds.hasNext()) {
+ HugeTask<?> failed = faileds.next();
+ initTaskParams(failed);
+ if (failed.retries() < this.graph().option(CoreOptions.TASK_RETRY)) {
+ LOG.info("Try to update task({})@({}/{}) status(FAILED->NEW)",
+ failed.id(), this.graphSpace, this.graphName);
+ updateStatusWithLock(failed.id(), TaskStatus.FAILED,
+ TaskStatus.NEW);
+ }
+ }
+
+ // Handle tasks in CANCELLING status
+ Iterator<HugeTask<Object>> cancellings = queryTaskWithoutResultByStatus(
+ TaskStatus.CANCELLING);
+
+ while (!this.closed.get() && cancellings.hasNext()) {
+ Id cancellingId = cancellings.next().id();
+ if (runningTasks.containsKey(cancellingId)) {
+ HugeTask<?> cancelling = runningTasks.get(cancellingId);
+ initTaskParams(cancelling);
+ LOG.info("Try to cancel task({})@({}/{})",
+ cancelling.id(), this.graphSpace, this.graphName);
+ cancelling.cancel(true);
+
+ runningTasks.remove(cancellingId);
+ } else {
+ // The task is not running locally, and no node is executing it anymore
+ if (!isLockedTask(cancellingId.toString())) {
+ updateStatusWithLock(cancellingId, TaskStatus.CANCELLING,
+ TaskStatus.CANCELLED);
+ }
+ }
+ }
+
+ // Handle tasks in DELETING status
+ Iterator<HugeTask<Object>> deletings = queryTaskWithoutResultByStatus(
+ TaskStatus.DELETING);
+
+ while (!this.closed.get() && deletings.hasNext()) {
+ Id deletingId = deletings.next().id();
+ if (runningTasks.containsKey(deletingId)) {
+ HugeTask<?> deleting = runningTasks.get(deletingId);
+ initTaskParams(deleting);
+ deleting.cancel(true);
+
+ // Delete the stored task info
+ deleteFromDB(deletingId);
+
+ runningTasks.remove(deletingId);
+ } else {
+ // The task is not running locally, and no node is executing it anymore
+ if (!isLockedTask(deletingId.toString())) {
+ deleteFromDB(deletingId);
+ }
+ }
+ }
+ }
+
+ protected <V> Iterator<HugeTask<V>> queryTaskWithoutResultByStatus(TaskStatus status) {
+ if (this.closed.get()) {
+ return QueryResults.emptyIterator();
+ }
+ return queryTaskWithoutResult(HugeTask.P.STATUS, status.code(), NO_LIMIT, null);
+ }
+
+ @Override
+ public HugeGraph graph() {
+ return this.graph.graph();
+ }
+
+ @Override
+ public int pendingTasks() {
+ return this.runningTasks.size();
+ }
+
+ @Override
+ public <V> void restoreTasks() {
+ // DO Nothing!
+ }
+
+ @Override
+ public <V> Future<?> schedule(HugeTask<V> task) {
+ E.checkArgumentNotNull(task, "Task can't be null");
+
+ initTaskParams(task);
+
+ if (task.ephemeralTask()) {
+ // Ephemeral tasks are executed directly without scheduling
+ return this.ephemeralTaskExecutor.submit(task);
+ }
+
+ // Handle schema tasks
+ // Handle gremlin tasks
+ // Handle olap computation tasks
+ // Add the task to the DB with status NEW
+ // TODO: save server id for task
+ this.save(task);
+
+ if (!this.closed.get()) {
+ LOG.info("Try to start task({})@({}/{}) immediately", task.id(),
+ this.graphSpace, this.graphName);
+ tryStartHugeTask(task);
+ } else {
+ LOG.info("TaskScheduler has closed");
+ }
+
+ return null;
+ }
+
+ protected <V> void initTaskParams(HugeTask<V> task) {
+ // Bind the environment variables needed to execute the task
+ // This method must be called before task deserialization and execution
+ task.scheduler(this);
+ TaskCallable<V> callable = task.callable();
+ callable.task(task);
+ callable.graph(this.graph());
+
+ if (callable instanceof TaskCallable.SysTaskCallable) {
+ ((TaskCallable.SysTaskCallable<?>) callable).params(this.graph);
+ }
+ }
+
+ @Override
+ public <V> void cancel(HugeTask<V> task) {
+ // Update the status to CANCELLING
+ if (!task.completed()) {
+ // Only an incomplete task can be set to CANCELLING
+ this.updateStatus(task.id(), null, TaskStatus.CANCELLING);
+ } else {
+ LOG.info("cancel task({}) error, task has completed", task.id());
+ }
+ }
+
+ @Override
+ public void init() {
+ this.call(() -> this.tx().initSchema());
+ }
+
+ protected <V> HugeTask<V> deleteFromDB(Id id) {
+ // Delete the task from the DB without checking its status
+ return this.call(() -> {
+ Iterator<Vertex> vertices = this.tx().queryTaskInfos(id);
+ HugeVertex vertex = (HugeVertex) QueryResults.one(vertices);
+ if (vertex == null) {
+ return null;
+ }
+ HugeTask<V> result = HugeTask.fromVertex(vertex);
+ this.tx().removeVertex(vertex);
+ return result;
+ });
+ }
+
+ @Override
+ public <V> HugeTask<V> delete(Id id, boolean force) {
+ if (!force) {
+ // Change the status to DELETING; the deletion is performed by the cron schedule
+ this.updateStatus(id, null, TaskStatus.DELETING);
+ return null;
+ } else {
+ return this.deleteFromDB(id);
+ }
+ }
+
+ @Override
+ public boolean close() {
+ if (this.closed.get()) {
+ return true;
+ }
+
+ // set closed
+ this.closed.set(true);
+
+ // cancel all running tasks
+ for (HugeTask<?> task : this.runningTasks.values()) {
+ LOG.info("cancel task({}) @({}/{}) when closing scheduler",
+ task.id(), graphSpace, graphName);
+ this.cancel(task);
+ }
+
+ try {
+ this.waitUntilAllTasksCompleted(10);
+ } catch (TimeoutException e) {
+ LOG.warn("Tasks not completed when close distributed task
scheduler", e);
+ }
+
+ // cancel cron thread
+ if (!cronFuture.isDone() && !cronFuture.isCancelled()) {
+ cronFuture.cancel(false);
+ }
+
+ if (!this.taskDbExecutor.isShutdown()) {
+ this.call(() -> {
+ try {
+ this.tx().close();
+ } catch (ConnectionException ignored) {
+ // ConnectionException means no connection established
+ }
+ this.graph.closeTx();
+ });
+ }
+ return true;
+ }
+
+ @Override
+ public <V> HugeTask<V> waitUntilTaskCompleted(Id id, long seconds)
+ throws TimeoutException {
+ return this.waitUntilTaskCompleted(id, seconds, QUERY_INTERVAL);
+ }
+
+ @Override
+ public <V> HugeTask<V> waitUntilTaskCompleted(Id id)
+ throws TimeoutException {
+ // This method is just used by tests
+ long timeout = this.graph.configuration()
+ .get(CoreOptions.TASK_WAIT_TIMEOUT);
+ return this.waitUntilTaskCompleted(id, timeout, 1L);
+ }
+
+ private <V> HugeTask<V> waitUntilTaskCompleted(Id id, long seconds,
+ long intervalMs)
+ throws TimeoutException {
+ long passes = seconds * 1000 / intervalMs;
+ HugeTask<V> task = null;
+ for (long pass = 0; ; pass++) {
+ try {
+ task = this.taskWithoutResult(id);
+ } catch (NotFoundException e) {
+ if (task != null && task.completed()) {
+ assert task.id().asLong() < 0L : task.id();
+ sleep(intervalMs);
+ return task;
+ }
+ throw e;
+ }
+ if (task.completed()) {
+ // Wait for task result being set after status is completed
+ sleep(intervalMs);
+ // Query the task info together with its result
+ task = this.task(id);
+ return task;
+ }
+ if (pass >= passes) {
+ break;
+ }
+ sleep(intervalMs);
+ }
+ throw new TimeoutException(String.format(
+ "Task '%s' was not completed in %s seconds", id, seconds));
+ }
+
+ @Override
+ public void waitUntilAllTasksCompleted(long seconds)
+ throws TimeoutException {
+ long passes = seconds * 1000 / QUERY_INTERVAL;
+ int taskSize = 0;
+ for (long pass = 0; ; pass++) {
+ taskSize = this.pendingTasks();
+ if (taskSize == 0) {
+ sleep(QUERY_INTERVAL);
+ return;
+ }
+ if (pass >= passes) {
+ break;
+ }
+ sleep(QUERY_INTERVAL);
+ }
+ throw new TimeoutException(String.format(
+ "There are still %s incomplete tasks after %s seconds",
+ taskSize, seconds));
+
+ }
+
+ @Override
+ public void checkRequirement(String op) {
+ if (!this.serverManager().master()) {
+ throw new HugeException("Can't %s task on non-master server", op);
+ }
+ }
+
+ @Override
+ public <V> V call(Callable<V> callable) {
+ return this.call(callable, this.taskDbExecutor);
+ }
+
+ @Override
+ public <V> V call(Runnable runnable) {
+ return this.call(Executors.callable(runnable, null));
+ }
+
+ private <V> V call(Callable<V> callable, ExecutorService executor) {
+ try {
+ callable = new TaskManager.ContextCallable<>(callable);
+ return executor.submit(callable).get();
+ } catch (Exception e) {
+ throw new HugeException("Failed to update/query TaskStore for " +
+ "graph(%s/%s): %s", e, this.graphSpace,
+ this.graph.name(), e.toString());
+ }
+ }
+
+ protected boolean updateStatus(Id id, TaskStatus prestatus,
+ TaskStatus status) {
+ HugeTask<Object> task = this.taskWithoutResult(id);
+ initTaskParams(task);
+ if (prestatus == null || task.status() == prestatus) {
+ task.overwriteStatus(status);
+ // Increase the retry count when the status changes from FAILED to NEW
+ if (prestatus == TaskStatus.FAILED && status == TaskStatus.NEW) {
+ task.retry();
+ }
+ this.save(task);
+ LOG.info("Update task({}) success: pre({}), status({})",
+ id, prestatus, status);
+
+ return true;
+ } else {
+ LOG.info("Update task({}) status conflict: current({}), " +
+ "pre({}), status({})", id, task.status(),
+ prestatus, status);
+ return false;
+ }
+ }
+
+ protected boolean updateStatusWithLock(Id id, TaskStatus prestatus,
+ TaskStatus status) {
+
+ LockResult lockResult = tryLockTask(id.asString());
+
+ if (lockResult.lockSuccess()) {
+ try {
+ return updateStatus(id, prestatus, status);
+ } finally {
+ unlockTask(id.asString(), lockResult);
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Try to start the task.
+ *
+ * @param task the task to start
+ * @return true if the task has been submitted to an executor
+ */
+ private boolean tryStartHugeTask(HugeTask<?> task) {
+ // Print Scheduler status
+ logCurrentState();
+
+ initTaskParams(task);
+
+ ExecutorService chosenExecutor = gremlinTaskExecutor;
+
+ if (task.computer()) {
+ chosenExecutor = this.olapTaskExecutor;
+ }
+
+ // TODO: uncomment later - vermeer job
+ //if (task.vermeer()) {
+ // chosenExecutor = this.olapTaskExecutor;
+ //}
+
+ if (task.gremlinTask()) {
+ chosenExecutor = this.gremlinTaskExecutor;
+ }
+
+ if (task.schemaTask()) {
+ chosenExecutor = schemaTaskExecutor;
+ }
+
+ ThreadPoolExecutor executor = (ThreadPoolExecutor) chosenExecutor;
+ if (executor.getActiveCount() < executor.getMaximumPoolSize()) {
+ TaskRunner<?> runner = new TaskRunner<>(task);
+ chosenExecutor.submit(runner);
+ LOG.info("Start task({})@({}/{})", task.id(),
+ this.graphSpace, this.graphName);
+
+ return true;
+ }
+
+ return false;
+ }
+
+ protected void logCurrentState() {
+ int gremlinActive =
+ ((ThreadPoolExecutor) gremlinTaskExecutor).getActiveCount();
+ int schemaActive =
+ ((ThreadPoolExecutor) schemaTaskExecutor).getActiveCount();
+ int ephemeralActive =
+ ((ThreadPoolExecutor) ephemeralTaskExecutor).getActiveCount();
+ int olapActive =
+ ((ThreadPoolExecutor) olapTaskExecutor).getActiveCount();
+
+ LOG.info("Current State: gremlinTaskExecutor({}), schemaTaskExecutor" +
+ "({}), ephemeralTaskExecutor({}), olapTaskExecutor({})",
+ gremlinActive, schemaActive, ephemeralActive, olapActive);
+ }
+
+ private LockResult tryLockTask(String taskId) {
+
+ LockResult lockResult = new LockResult();
+
+ LockGroup lockGroup = LockManager.instance().get(lockGroupName);
+ Lock localLock = lockGroup.lock(taskId);
+
+ if (!localLock.tryLock()) {
+ return new LockResult();
+ }
+
+ try {
+ lockResult = MetaManager.instance().tryLockTask(graphSpace, graphName, taskId);
+ } catch (Throwable t) {
+ LOG.info(String.format("try to lock task(%s) error", taskId), t);
+ }
+
+ if (!lockResult.lockSuccess()) {
+ localLock.unlock();
+ }
+
+ return lockResult;
+ }
+
+ private void unlockTask(String taskId, LockResult lockResult) {
+
+ try {
+ MetaManager.instance().unlockTask(graphSpace, graphName, taskId,
+ lockResult);
+ } catch (Throwable t) {
+ LOG.info(String.format("try to unlock task(%s) error",
+ taskId), t);
+ }
+
+ LockGroup lockGroup = LockManager.instance().get(lockGroupName);
+ Lock localLock = lockGroup.lock(taskId);
+
+ localLock.unlock();
+ }
+
+ private boolean isLockedTask(String taskId) {
+ return MetaManager.instance().isLockedTask(graphSpace,
+ graphName, taskId);
+ }
+
+ private class TaskRunner<V> implements Runnable {
+
+ private final HugeTask<V> task;
+
+ public TaskRunner(HugeTask<V> task) {
+ this.task = task;
+ }
+
+ @Override
+ public void run() {
+ LockResult lockResult = tryLockTask(task.id().asString());
+
+ initTaskParams(task);
+ if (lockResult.lockSuccess()) {
+
+ TaskManager.setContext(task.context());
+ try {
+ // 1. A task may be started from both schedule() and cronSchedule()
+ // 2. Recheck the task status in case the same task is
+ //    started by both methods at the same time
+ HugeTask<Object> queryTask = task(this.task.id());
+ if (queryTask != null &&
+ !TaskStatus.NEW.equals(queryTask.status())) {
+ return;
+ }
+
+ runningTasks.put(task.id(), task);
+
+ // Task execution will not throw: HugeTask catches exceptions during execution and stores them in the DB
+ task.run();
+ } catch (Throwable t) {
+ LOG.info("exception when execute task", t);
+ } finally {
+ runningTasks.remove(task.id());
+ unlockTask(task.id().asString(), lockResult);
+
+ LOG.info("task({}) finished.", task.id().toString());
+ }
+ }
+ }
+ }
+
+ @Override
+ public String graphName() {
+ return this.graph.name();
+ }
+
+ @Override
+ public void taskDone(HugeTask<?> task) {
+ // DO Nothing
+ }
+}
diff --git
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeTask.java
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeTask.java
index 5a3439ac7..cfa1bbccc 100644
---
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeTask.java
+++
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeTask.java
@@ -32,6 +32,8 @@ import java.util.stream.Collectors;
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.id.IdGenerator;
import org.apache.hugegraph.config.CoreOptions;
+import org.apache.hugegraph.job.GremlinJob;
+import org.apache.hugegraph.job.schema.SchemaJob;
import org.apache.hugegraph.type.define.SerialEnum;
import org.apache.hugegraph.util.*;
import org.apache.tinkerpop.gremlin.structure.Graph.Hidden;
@@ -218,6 +220,10 @@ public class HugeTask<V> extends FutureTask<V> {
return this.result;
}
+ public synchronized void result(HugeTaskResult result) {
+ this.result = result.result();
+ }
+
private synchronized boolean result(TaskStatus status, String result) {
checkPropertySize(result, P.RESULT);
if (this.status(status)) {
@@ -263,6 +269,18 @@ public class HugeTask<V> extends FutureTask<V> {
return ComputerJob.COMPUTER.equals(this.type);
}
+ public boolean schemaTask() {
+ return this.callable instanceof SchemaJob;
+ }
+
+ public boolean gremlinTask() {
+ return this.callable instanceof GremlinJob;
+ }
+
+ public boolean ephemeralTask() {
+ return this.callable instanceof EphemeralJob;
+ }
+
@Override
public String toString() {
return String.format("HugeTask(%s)%s", this.id, this.asMap());
@@ -346,9 +364,7 @@ public class HugeTask<V> extends FutureTask<V> {
} catch (Throwable e) {
LOG.error("An exception occurred when calling done()", e);
} finally {
- StandardTaskScheduler scheduler = (StandardTaskScheduler)
- this.scheduler();
- scheduler.taskDone(this);
+ this.scheduler().taskDone(this);
}
}
@@ -428,6 +444,10 @@ public class HugeTask<V> extends FutureTask<V> {
return false;
}
+ public synchronized void overwriteStatus(TaskStatus status) {
+ this.status = status;
+ }
+
protected void property(String key, Object value) {
E.checkNotNull(key, "property key");
switch (key) {
@@ -560,6 +580,75 @@ public class HugeTask<V> extends FutureTask<V> {
return list.toArray();
}
+ protected synchronized Object[] asArrayWithoutResult() {
+ E.checkState(this.type != null, "Task type can't be null");
+ E.checkState(this.name != null, "Task name can't be null");
+
+ List<Object> list = new ArrayList<>(28);
+
+ list.add(T.label);
+ list.add(P.TASK);
+
+ list.add(T.id);
+ list.add(this.id);
+
+ list.add(P.TYPE);
+ list.add(this.type);
+
+ list.add(P.NAME);
+ list.add(this.name);
+
+ list.add(P.CALLABLE);
+ list.add(this.callable.getClass().getName());
+
+ list.add(P.STATUS);
+ list.add(this.status.code());
+
+ list.add(P.PROGRESS);
+ list.add(this.progress);
+
+ list.add(P.CREATE);
+ list.add(this.create);
+
+ list.add(P.RETRIES);
+ list.add(this.retries);
+
+ if (this.description != null) {
+ list.add(P.DESCRIPTION);
+ list.add(this.description);
+ }
+
+ if (this.context != null) {
+ list.add(P.CONTEXT);
+ list.add(this.context);
+ }
+
+ if (this.update != null) {
+ list.add(P.UPDATE);
+ list.add(this.update);
+ }
+
+ if (this.dependencies != null) {
+ list.add(P.DEPENDENCIES);
+ list.add(this.dependencies.stream().map(Id::asLong)
+ .collect(toOrderSet()));
+ }
+
+ if (this.input != null) {
+ byte[] bytes = StringEncoding.compress(this.input);
+ checkPropertySize(bytes.length, P.INPUT);
+ list.add(P.INPUT);
+ list.add(bytes);
+ }
+
+ if (this.server != null) {
+ list.add(P.SERVER);
+ list.add(this.server.asString());
+ }
+
+ return list.toArray();
+ }
+
public Map<String, Object> asMap() {
return this.asMap(true);
}
diff --git
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeTaskResult.java
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeTaskResult.java
new file mode 100644
index 000000000..24fc186cd
--- /dev/null
+++
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeTaskResult.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.task;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hugegraph.HugeGraph;
+import org.apache.hugegraph.backend.id.Id;
+import org.apache.hugegraph.schema.PropertyKey;
+import org.apache.hugegraph.schema.SchemaManager;
+import org.apache.hugegraph.schema.VertexLabel;
+import org.apache.hugegraph.type.define.Cardinality;
+import org.apache.hugegraph.type.define.DataType;
+import org.apache.hugegraph.util.Blob;
+import org.apache.hugegraph.util.E;
+import org.apache.hugegraph.util.Log;
+import org.apache.hugegraph.util.StringEncoding;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.VertexProperty;
+import org.slf4j.Logger;
+
+public class HugeTaskResult {
+ private static final Logger LOG = Log.logger(HugeTaskResult.class);
+ private static final float DECOMPRESS_RATIO = 10.0F;
+ private final String taskResultId;
+ private volatile String result;
+
+ public HugeTaskResult(String taskId) {
+ this.taskResultId = taskId;
+ this.result = null;
+ }
+
+ public static String genId(Id taskId) {
+ return String.format("task_result_%d", taskId.asLong());
+ }
+
+ public static HugeTaskResult fromVertex(Vertex vertex) {
+ Id taskResultId = (Id) vertex.id();
+ HugeTaskResult taskResult = new HugeTaskResult(taskResultId.asString());
+ for (Iterator<VertexProperty<Object>> iter = vertex.properties(); iter.hasNext(); ) {
+ VertexProperty<Object> prop = iter.next();
+ taskResult.property(prop.key(), prop.value());
+ }
+ return taskResult;
+ }
+
+ public String taskResultId() {
+ return this.taskResultId;
+ }
+
+ public void result(String result) {
+ this.result = result;
+ }
+
+ public String result() {
+ return this.result;
+ }
+
+ protected synchronized Object[] asArray() {
+
+ List<Object> list = new ArrayList<>(6);
+
+ list.add(T.label);
+ list.add(HugeTaskResult.P.TASKRESULT);
+
+ list.add(T.id);
+ list.add(this.taskResultId);
+
+ if (this.result != null) {
+ byte[] bytes = StringEncoding.compress(this.result);
+ list.add(HugeTaskResult.P.RESULT);
+ list.add(bytes);
+ }
+
+ return list.toArray();
+ }
+
+ protected void property(String key, Object value) {
+ E.checkNotNull(key, "property key");
+ switch (key) {
+ case P.RESULT:
+ this.result = StringEncoding.decompress(((Blob) value).bytes(), DECOMPRESS_RATIO);
+ break;
+ default:
+ throw new AssertionError("Unsupported key: " + key);
+ }
+ }
+
+ public static final class P {
+
+ public static final String TASKRESULT = Graph.Hidden.hide("taskresult");
+
+ public static final String RESULT = "~result_result";
+
+ public static String unhide(String key) {
+ final String prefix = Graph.Hidden.hide("result_");
+ if (key.startsWith(prefix)) {
+ return key.substring(prefix.length());
+ }
+ return key;
+ }
+ }
+}
diff --git
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java
index 50d0a13c7..7c97f302e 100644
---
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java
+++
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java
@@ -79,11 +79,6 @@ public class StandardTaskScheduler implements TaskScheduler {
private volatile TaskTransaction taskTx;
- private static final long NO_LIMIT = -1L;
- private static final long PAGE_SIZE = 500L;
- private static final long QUERY_INTERVAL = 100L;
- private static final int MAX_PENDING_TASKS = 10000;
-
public StandardTaskScheduler(HugeGraphParams graph,
ExecutorService taskExecutor,
ExecutorService taskDbExecutor,
@@ -107,6 +102,7 @@ public class StandardTaskScheduler implements TaskScheduler
{
return this.graph.graph();
}
+ @Override
public String graphName() {
return this.graph.name();
}
@@ -299,7 +295,8 @@ public class StandardTaskScheduler implements TaskScheduler
{
task.id(), task.status());
}
- protected ServerInfoManager serverManager() {
+ @Override
+ public ServerInfoManager serverManager() {
return this.serverManager;
}
@@ -419,7 +416,8 @@ public class StandardTaskScheduler implements TaskScheduler
{
} while (page != null);
}
- protected void taskDone(HugeTask<?> task) {
+ @Override
+ public void taskDone(HugeTask<?> task) {
this.remove(task);
Id selfServerId = this.serverManager().selfServerId();
@@ -433,13 +431,17 @@ public class StandardTaskScheduler implements
TaskScheduler {
}
protected void remove(HugeTask<?> task) {
+ this.remove(task, false);
+ }
+
+ protected void remove(HugeTask<?> task, boolean force) {
E.checkNotNull(task, "remove task");
HugeTask<?> delTask = this.tasks.remove(task.id());
if (delTask != null && delTask != task) {
LOG.warn("Task '{}' may be inconsistent status {}(expect {})",
task.id(), task.status(), delTask.status());
}
- assert delTask == null || delTask.completed() ||
+ assert force || delTask == null || delTask.completed() ||
delTask.cancelling() || delTask.isCancelled() : delTask;
}
@@ -550,7 +552,7 @@ public class StandardTaskScheduler implements TaskScheduler
{
}
@Override
- public <V> HugeTask<V> delete(Id id) {
+ public <V> HugeTask<V> delete(Id id, boolean force) {
this.checkOnMasterNode("delete");
HugeTask<?> task = this.task(id);
@@ -565,11 +567,11 @@ public class StandardTaskScheduler implements
TaskScheduler {
* when the database status is inconsistent.
*/
if (task != null) {
- E.checkArgument(task.completed(),
+ E.checkArgument(force || task.completed(),
"Can't delete incomplete task '%s' in status %s" +
", Please try to cancel the task first",
id, task.status());
- this.remove(task);
+ this.remove(task, force);
}
return this.call(() -> {
@@ -579,7 +581,7 @@ public class StandardTaskScheduler implements TaskScheduler
{
return null;
}
HugeTask<V> result = HugeTask.fromVertex(vertex);
- E.checkState(result.completed(),
+ E.checkState(force || result.completed(),
"Can't delete incomplete task '%s' in status %s",
id, result.status());
this.tx().removeVertex(vertex);
@@ -704,11 +706,13 @@ public class StandardTaskScheduler implements
TaskScheduler {
});
}
- private <V> V call(Runnable runnable) {
+ @Override
+ public <V> V call(Runnable runnable) {
return this.call(Executors.callable(runnable, null));
}
- private <V> V call(Callable<V> callable) {
+ @Override
+ public <V> V call(Callable<V> callable) {
assert !Thread.currentThread().getName().startsWith(
"task-db-worker") : "can't call by itself";
try {
@@ -741,129 +745,4 @@ public class StandardTaskScheduler implements
TaskScheduler {
return false;
}
}
-
- private static class TaskTransaction extends GraphTransaction {
-
- public static final String TASK = P.TASK;
-
- public TaskTransaction(HugeGraphParams graph, BackendStore store) {
- super(graph, store);
- this.autoCommit(true);
- }
-
- public HugeVertex constructVertex(HugeTask<?> task) {
- if (!this.graph().existsVertexLabel(TASK)) {
- throw new HugeException("Schema is missing for task(%s) '%s'",
- task.id(), task.name());
- }
- return this.constructVertex(false, task.asArray());
- }
-
- public void deleteIndex(HugeVertex vertex) {
- // Delete the old record if exist
- Iterator<Vertex> old = this.queryTaskInfos(vertex.id());
- HugeVertex oldV = (HugeVertex) QueryResults.one(old);
- if (oldV == null) {
- return;
- }
- this.deleteIndexIfNeeded(oldV, vertex);
- }
-
- private boolean deleteIndexIfNeeded(HugeVertex oldV, HugeVertex newV) {
- if (!oldV.value(P.STATUS).equals(newV.value(P.STATUS))) {
- // Only delete vertex if index value changed else override it
- this.updateIndex(this.indexLabel(P.STATUS).id(), oldV, true);
- return true;
- }
- return false;
- }
-
- public void initSchema() {
- if (this.existVertexLabel(TASK)) {
- return;
- }
-
- HugeGraph graph = this.graph();
- String[] properties = this.initProperties();
-
- // Create vertex label '~task'
- VertexLabel label = graph.schema().vertexLabel(TASK)
- .properties(properties)
- .useCustomizeNumberId()
- .nullableKeys(P.DESCRIPTION, P.CONTEXT,
- P.UPDATE, P.INPUT, P.RESULT,
- P.DEPENDENCIES, P.SERVER)
- .enableLabelIndex(true)
- .build();
- this.params().schemaTransaction().addVertexLabel(label);
-
- // Create index
- this.createIndexLabel(label, P.STATUS);
- }
-
- private boolean existVertexLabel(String label) {
- return this.params().schemaTransaction()
- .getVertexLabel(label) != null;
- }
-
- private String[] initProperties() {
- List<String> props = new ArrayList<>();
-
- props.add(createPropertyKey(P.TYPE));
- props.add(createPropertyKey(P.NAME));
- props.add(createPropertyKey(P.CALLABLE));
- props.add(createPropertyKey(P.DESCRIPTION));
- props.add(createPropertyKey(P.CONTEXT));
- props.add(createPropertyKey(P.STATUS, DataType.BYTE));
- props.add(createPropertyKey(P.PROGRESS, DataType.INT));
- props.add(createPropertyKey(P.CREATE, DataType.DATE));
- props.add(createPropertyKey(P.UPDATE, DataType.DATE));
- props.add(createPropertyKey(P.RETRIES, DataType.INT));
- props.add(createPropertyKey(P.INPUT, DataType.BLOB));
- props.add(createPropertyKey(P.RESULT, DataType.BLOB));
- props.add(createPropertyKey(P.DEPENDENCIES, DataType.LONG,
- Cardinality.SET));
- props.add(createPropertyKey(P.SERVER));
-
- return props.toArray(new String[0]);
- }
-
- private String createPropertyKey(String name) {
- return this.createPropertyKey(name, DataType.TEXT);
- }
-
- private String createPropertyKey(String name, DataType dataType) {
- return this.createPropertyKey(name, dataType, Cardinality.SINGLE);
- }
-
- private String createPropertyKey(String name, DataType dataType,
- Cardinality cardinality) {
- HugeGraph graph = this.graph();
- SchemaManager schema = graph.schema();
- PropertyKey propertyKey = schema.propertyKey(name)
- .dataType(dataType)
- .cardinality(cardinality)
- .build();
- this.params().schemaTransaction().addPropertyKey(propertyKey);
- return name;
- }
-
- private IndexLabel createIndexLabel(VertexLabel label, String field) {
- HugeGraph graph = this.graph();
- SchemaManager schema = graph.schema();
- String name = Hidden.hide("task-index-by-" + field);
- IndexLabel indexLabel = schema.indexLabel(name)
- .on(HugeType.VERTEX_LABEL, TASK)
- .by(field)
- .build();
- this.params().schemaTransaction().addIndexLabel(label, indexLabel);
- return indexLabel;
- }
-
- private IndexLabel indexLabel(String field) {
- String name = Hidden.hide("task-index-by-" + field);
- HugeGraph graph = this.graph();
- return graph.indexLabel(name);
- }
- }
}
diff --git
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskAndResultScheduler.java
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskAndResultScheduler.java
new file mode 100644
index 000000000..f076f6c46
--- /dev/null
+++
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskAndResultScheduler.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.task;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.hugegraph.HugeGraphParams;
+import org.apache.hugegraph.backend.id.Id;
+import org.apache.hugegraph.backend.query.Condition;
+import org.apache.hugegraph.backend.query.ConditionQuery;
+import org.apache.hugegraph.backend.query.QueryResults;
+import org.apache.hugegraph.backend.store.BackendStore;
+import org.apache.hugegraph.exception.NotFoundException;
+import org.apache.hugegraph.iterator.ListIterator;
+import org.apache.hugegraph.iterator.MapperIterator;
+import org.apache.hugegraph.schema.PropertyKey;
+import org.apache.hugegraph.schema.VertexLabel;
+import org.apache.hugegraph.structure.HugeVertex;
+import org.apache.hugegraph.type.HugeType;
+import org.apache.hugegraph.type.define.HugeKeys;
+import org.apache.hugegraph.util.E;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Base class of task & result scheduler
+ */
+public abstract class TaskAndResultScheduler implements TaskScheduler {
+ /**
+ * Which graph the scheduler belongs to
+ */
+ protected final HugeGraphParams graph;
+ protected final String graphSpace;
+ protected final String graphName;
+
+ /**
+ * Task transactions, for persistence
+ */
+ protected volatile TaskAndResultTransaction taskTx = null;
+
+ private final ServerInfoManager serverManager;
+
+ public TaskAndResultScheduler(
+ HugeGraphParams graph,
+ ExecutorService serverInfoDbExecutor) {
+ E.checkNotNull(graph, "graph");
+
+ this.graph = graph;
+ // TODO: uncomment later - graph space
+ // this.graphSpace = graph.graph().graphSpace();
+ this.graphSpace = "";
+ this.graphName = graph.name();
+
+ this.serverManager = new ServerInfoManager(graph, serverInfoDbExecutor);
+ }
+
+ @Override
+ public <V> void save(HugeTask<V> task) {
+ E.checkArgumentNotNull(task, "Task can't be null");
+ String rawResult = task.result();
+
+ // Save task without result;
+ this.call(() -> {
+ // Construct vertex from task
+ HugeVertex vertex = this.tx().constructTaskVertex(task);
+ // Delete index of old vertex to avoid stale index
+ this.tx().deleteIndex(vertex);
+ // Add or update task info to backend store
+ return this.tx().addVertex(vertex);
+ });
+
+ // Save the task result
+ if (rawResult != null) {
+ HugeTaskResult result =
+ new HugeTaskResult(HugeTaskResult.genId(task.id()));
+ result.result(rawResult);
+
+ this.call(() -> {
+ // Construct vertex from task
+ HugeVertex vertex = this.tx().constructTaskResultVertex(result);
+ // Add or update task info to backend store
+ return this.tx().addVertex(vertex);
+ });
+ }
+ }
+
+ @Override
+ public <V> HugeTask<V> task(Id id) {
+ HugeTask<V> task = this.call(() -> {
+ Iterator<Vertex> vertices = this.tx().queryTaskInfos(id);
+ Vertex vertex = QueryResults.one(vertices);
+ if (vertex == null) {
+ return null;
+ }
+ return HugeTask.fromVertex(vertex);
+ });
+
+ if (task == null) {
+ throw new NotFoundException("Can't find task with id '%s'", id);
+ }
+
+ HugeTaskResult taskResult = queryTaskResult(id);
+ if (taskResult != null) {
+ task.result(taskResult);
+ }
+
+ return task;
+ }
+
+ @Override
+ public <V> Iterator<HugeTask<V>> tasks(List<Id> ids) {
+ return this.tasksWithoutResult(ids);
+ }
+
+ @Override
+ public <V> Iterator<HugeTask<V>> tasks(TaskStatus status, long limit,
+ String page) {
+ if (status == null) {
+ return this.queryTaskWithoutResult(ImmutableMap.of(), limit, page);
+ }
+ return this.queryTaskWithoutResult(HugeTask.P.STATUS, status.code(),
+ limit, page);
+ }
+
+ protected <V> Iterator<HugeTask<V>> queryTask(String key, Object value,
+ long limit, String page) {
+ return this.queryTask(ImmutableMap.of(key, value), limit, page);
+ }
+
+ protected <V> Iterator<HugeTask<V>> queryTask(Map<String, Object> conditions,
+ long limit, String page) {
+ Iterator<HugeTask<V>> ts = this.call(() -> {
+ ConditionQuery query = new ConditionQuery(HugeType.TASK);
+ if (page != null) {
+ query.page(page);
+ }
+ VertexLabel vl = this.graph().vertexLabel(HugeTask.P.TASK);
+ query.eq(HugeKeys.LABEL, vl.id());
+ for (Map.Entry<String, Object> entry : conditions.entrySet()) {
+ PropertyKey pk = this.graph().propertyKey(entry.getKey());
+ query.query(Condition.eq(pk.id(), entry.getValue()));
+ }
+ query.showHidden(true);
+ if (limit != NO_LIMIT) {
+ query.limit(limit);
+ }
+ Iterator<Vertex> vertices = this.tx().queryTaskInfos(query);
+ Iterator<HugeTask<V>> tasks =
+ new MapperIterator<>(vertices, HugeTask::fromVertex);
+ // Convert iterator to list to avoid across thread tx accessed
+ return QueryResults.toList(tasks);
+ });
+
+ return new MapperIterator<>(ts, (task) -> {
+ HugeTaskResult taskResult = queryTaskResult(task.id());
+ if (taskResult != null) {
+ task.result(taskResult);
+ }
+ return task;
+ });
+ }
+
+ protected <V> Iterator<HugeTask<V>> queryTask(List<Id> ids) {
+ ListIterator<HugeTask<V>> ts = this.call(
+ () -> {
+ Object[] idArray = ids.toArray(new Id[ids.size()]);
+ Iterator<Vertex> vertices = this.tx()
+ .queryTaskInfos(idArray);
+ Iterator<HugeTask<V>> tasks =
+ new MapperIterator<>(vertices,
+ HugeTask::fromVertex);
+ // Convert iterator to list to avoid across thread tx accessed
+ return QueryResults.toList(tasks);
+ });
+
+ Iterator<HugeTaskResult> results = queryTaskResult(ids);
+
+ HashMap<String, HugeTaskResult> resultCaches = new HashMap<>();
+ while (results.hasNext()) {
+ HugeTaskResult entry = results.next();
+ resultCaches.put(entry.taskResultId(), entry);
+ }
+
+ return new MapperIterator<>(ts, (task) -> {
+ HugeTaskResult taskResult =
+ resultCaches.get(HugeTaskResult.genId(task.id()));
+ if (taskResult != null) {
+ task.result(taskResult);
+ }
+ return task;
+ });
+ }
+
+ protected <V> HugeTask<V> taskWithoutResult(Id id) {
+ HugeTask<V> result = this.call(() -> {
+ Iterator<Vertex> vertices = this.tx().queryTaskInfos(id);
+ Vertex vertex = QueryResults.one(vertices);
+ if (vertex == null) {
+ return null;
+ }
+ return HugeTask.fromVertex(vertex);
+ });
+
+ return result;
+ }
+
+ protected <V> Iterator<HugeTask<V>> tasksWithoutResult(List<Id> ids) {
+ return this.call(() -> {
+ Object[] idArray = ids.toArray(new Id[ids.size()]);
+ Iterator<Vertex> vertices = this.tx().queryTaskInfos(idArray);
+ Iterator<HugeTask<V>> tasks =
+ new MapperIterator<>(vertices, HugeTask::fromVertex);
+ // Convert iterator to list to avoid across thread tx accessed
+ return QueryResults.toList(tasks);
+ });
+ }
+
+ protected <V> Iterator<HugeTask<V>> tasksWithoutResult(TaskStatus status,
+ long limit,
+ String page) {
+ if (status == null) {
+ return this.queryTaskWithoutResult(ImmutableMap.of(), limit, page);
+ }
+ return this.queryTaskWithoutResult(HugeTask.P.STATUS, status.code(),
+ limit, page);
+ }
+
+ protected <V> Iterator<HugeTask<V>> queryTaskWithoutResult(String key,
+ Object value,
+ long limit, String page) {
+ return this.queryTaskWithoutResult(ImmutableMap.of(key, value), limit, page);
+ }
+
+ protected <V> Iterator<HugeTask<V>> queryTaskWithoutResult(Map<String,
+ Object> conditions, long limit, String page) {
+ return this.call(() -> {
+ ConditionQuery query = new ConditionQuery(HugeType.TASK);
+ if (page != null) {
+ query.page(page);
+ }
+ VertexLabel vl = this.graph().vertexLabel(HugeTask.P.TASK);
+ query.eq(HugeKeys.LABEL, vl.id());
+ for (Map.Entry<String, Object> entry : conditions.entrySet()) {
+ PropertyKey pk = this.graph().propertyKey(entry.getKey());
+ query.query(Condition.eq(pk.id(), entry.getValue()));
+ }
+ query.showHidden(true);
+ if (limit != NO_LIMIT) {
+ query.limit(limit);
+ }
+ Iterator<Vertex> vertices = this.tx().queryTaskInfos(query);
+ Iterator<HugeTask<V>> tasks =
+ new MapperIterator<>(vertices, HugeTask::fromVertex);
+ // Convert iterator to list to avoid across thread tx accessed
+ return QueryResults.toList(tasks);
+ });
+ }
+
+ protected HugeTaskResult queryTaskResult(Id taskid) {
+ HugeTaskResult result = this.call(() -> {
+ Iterator<Vertex> vertices =
+ this.tx().queryTaskInfos(HugeTaskResult.genId(taskid));
+ Vertex vertex = QueryResults.one(vertices);
+ if (vertex == null) {
+ return null;
+ }
+
+ return HugeTaskResult.fromVertex(vertex);
+ });
+
+ return result;
+ }
+
+ protected Iterator<HugeTaskResult> queryTaskResult(List<Id> taskIds) {
+ return this.call(() -> {
+ Object[] idArray =
+ taskIds.stream().map(HugeTaskResult::genId).toArray();
+ Iterator<Vertex> vertices = this.tx()
+ .queryTaskInfos(idArray);
+ Iterator<HugeTaskResult> tasks =
+ new MapperIterator<>(vertices,
+ HugeTaskResult::fromVertex);
+ // Convert iterator to list to avoid across thread tx accessed
+ return QueryResults.toList(tasks);
+ });
+ }
+
+ protected TaskAndResultTransaction tx() {
+ // NOTE: only the owner thread can access task tx
+ if (this.taskTx == null) {
+ /*
+ * NOTE: don't synchronized(this) due to scheduler thread hold
+ * this lock through scheduleTasks(), then query tasks and wait
+ * for db-worker thread after call(), the tx may not be initialized
+ * but can't catch this lock, then cause deadlock.
+ * We just use this.serverManager as a monitor here
+ */
+ synchronized (this.serverManager) {
+ if (this.taskTx == null) {
+ BackendStore store = this.graph.loadSystemStore();
+ TaskAndResultTransaction tx = new TaskAndResultTransaction(this.graph, store);
+ assert this.taskTx == null; // may be reentrant?
+ this.taskTx = tx;
+ }
+ }
+ }
+ assert this.taskTx != null;
+ return this.taskTx;
+ }
+
+ @Override
+ public ServerInfoManager serverManager() {
+ return this.serverManager;
+ }
+}
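For readers following the new read path above, here is a minimal usage sketch (not part of the patch): task(id) loads the '~task' vertex and merges the matching '~taskresult' vertex when one exists, while tasks(ids) deliberately skips result vertices to keep bulk listing cheap. IdGenerator.of(long) and the HugeTask status()/result() getters are assumed from the existing core module.

    import org.apache.hugegraph.HugeGraph;
    import org.apache.hugegraph.backend.id.IdGenerator;
    import org.apache.hugegraph.task.HugeTask;
    import org.apache.hugegraph.task.TaskScheduler;

    public final class TaskLookupSketch {
        // Fetch one task together with its merged result payload
        static void printTask(HugeGraph graph, long taskId) {
            TaskScheduler scheduler = graph.taskScheduler();
            HugeTask<Object> task = scheduler.task(IdGenerator.of(taskId));
            System.out.println(task.status() + ": " + task.result());
        }
    }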
diff --git
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskAndResultTransaction.java
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskAndResultTransaction.java
new file mode 100644
index 000000000..c39ae2361
--- /dev/null
+++
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskAndResultTransaction.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.task;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hugegraph.HugeException;
+import org.apache.hugegraph.HugeGraph;
+import org.apache.hugegraph.HugeGraphParams;
+import org.apache.hugegraph.backend.store.BackendStore;
+import org.apache.hugegraph.schema.PropertyKey;
+import org.apache.hugegraph.schema.SchemaManager;
+import org.apache.hugegraph.schema.VertexLabel;
+import org.apache.hugegraph.structure.HugeVertex;
+import org.apache.hugegraph.type.define.Cardinality;
+import org.apache.hugegraph.type.define.DataType;
+
+public class TaskAndResultTransaction extends TaskTransaction {
+
+ public static final String TASKRESULT = HugeTaskResult.P.TASKRESULT;
+
+ /**
+ * Task transactions, for persistence
+ */
+ protected volatile TaskAndResultTransaction taskTx = null;
+
+ public TaskAndResultTransaction(HugeGraphParams graph, BackendStore store) {
+ super(graph, store);
+ this.autoCommit(true);
+ }
+
+ public HugeVertex constructTaskVertex(HugeTask<?> task) {
+ if (!this.graph().existsVertexLabel(TASK)) {
+ throw new HugeException("Schema is missing for task(%s) '%s'",
+ task.id(), task.name());
+ }
+
+ return this.constructVertex(false, task.asArrayWithoutResult());
+ }
+
+ public HugeVertex constructTaskResultVertex(HugeTaskResult taskResult) {
+ if (!this.graph().existsVertexLabel(TASKRESULT)) {
+ throw new HugeException("Schema is missing for task result");
+ }
+
+ return this.constructVertex(false, taskResult.asArray());
+ }
+
+ @Override
+ public void initSchema() {
+ super.initSchema();
+
+ if (this.graph().existsVertexLabel(TASKRESULT)) {
+ return;
+ }
+
+ HugeGraph graph = this.graph();
+ String[] properties = this.initTaskResultProperties();
+
+ // Create vertex label '~taskresult'
+ VertexLabel label =
+ graph.schema().vertexLabel(HugeTaskResult.P.TASKRESULT).properties(properties)
+ .nullableKeys(HugeTaskResult.P.RESULT)
+ .useCustomizeStringId().enableLabelIndex(true).build();
+
+ graph.addVertexLabel(label);
+ }
+
+ private String[] initTaskResultProperties() {
+ List<String> props = new ArrayList<>();
+ props.add(createPropertyKey(HugeTaskResult.P.RESULT, DataType.BLOB));
+
+ return props.toArray(new String[0]);
+ }
+
+ private String createPropertyKey(String name, DataType dataType) {
+ return createPropertyKey(name, dataType, Cardinality.SINGLE);
+ }
+
+ private String createPropertyKey(String name, DataType dataType, Cardinality cardinality) {
+ SchemaManager schema = this.graph().schema();
+ PropertyKey propertyKey =
+ schema.propertyKey(name).dataType(dataType).cardinality(cardinality).build();
+ this.graph().addPropertyKey(propertyKey);
+ return name;
+ }
+}
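As a quick orientation for the class above, the following hedged sketch shows the write path it enables: task metadata and the potentially large result BLOB are persisted as two separate vertices. Only constructTaskVertex() and constructTaskResultVertex() come from this change; addVertex() and commit() are assumed to be the inherited GraphTransaction calls.

    import org.apache.hugegraph.structure.HugeVertex;
    import org.apache.hugegraph.task.HugeTask;
    import org.apache.hugegraph.task.HugeTaskResult;
    import org.apache.hugegraph.task.TaskAndResultTransaction;

    public final class TaskPersistenceSketch {
        // Persist a task and its result as separate '~task' / '~taskresult' vertices
        static void persist(TaskAndResultTransaction tx, HugeTask<?> task,
                            HugeTaskResult result) {
            HugeVertex taskVertex = tx.constructTaskVertex(task);
            HugeVertex resultVertex = tx.constructTaskResultVertex(result);
            tx.addVertex(taskVertex);
            tx.addVertex(resultVertex);
            tx.commit();
        }
    }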
diff --git
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java
index 524a1f759..056b7ac5a 100644
---
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java
+++
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java
@@ -47,6 +47,11 @@ public final class TaskManager {
"server-info-db-worker-%d";
public static final String TASK_SCHEDULER = "task-scheduler-%d";
+ public static final String OLAP_TASK_WORKER = "olap-task-worker-%d";
+ public static final String SCHEMA_TASK_WORKER = "schema-task-worker-%d";
+ public static final String EPHEMERAL_TASK_WORKER = "ephemeral-task-worker-%d";
+ public static final String DISTRIBUTED_TASK_SCHEDULER = "distributed-scheduler-%d";
+
protected static final long SCHEDULE_PERIOD = 1000L; // unit ms
private static final int THREADS = 4;
@@ -59,6 +64,11 @@ public final class TaskManager {
private final ExecutorService serverInfoDbExecutor;
private final PausableScheduledThreadPool schedulerExecutor;
+ private final ExecutorService schemaTaskExecutor;
+ private final ExecutorService olapTaskExecutor;
+ private final ExecutorService ephemeralTaskExecutor;
+ private final PausableScheduledThreadPool distributedSchedulerExecutor;
+
private boolean enableRoleElected = false;
public static TaskManager instance() {
@@ -75,6 +85,17 @@ public final class TaskManager {
1, TASK_DB_WORKER);
this.serverInfoDbExecutor = ExecutorUtil.newFixedThreadPool(
1, SERVER_INFO_DB_WORKER);
+
+ this.schemaTaskExecutor = ExecutorUtil.newFixedThreadPool(pool,
+ SCHEMA_TASK_WORKER);
+ this.olapTaskExecutor = ExecutorUtil.newFixedThreadPool(pool,
+ OLAP_TASK_WORKER);
+ this.ephemeralTaskExecutor = ExecutorUtil.newFixedThreadPool(pool,
+ EPHEMERAL_TASK_WORKER);
+ this.distributedSchedulerExecutor =
+ ExecutorUtil.newPausableScheduledThreadPool(1,
+ DISTRIBUTED_TASK_SCHEDULER);
+
// For schedule task to run, just one thread is ok
this.schedulerExecutor = ExecutorUtil.newPausableScheduledThreadPool(
1, TASK_SCHEDULER);
@@ -87,11 +108,36 @@ public final class TaskManager {
public void addScheduler(HugeGraphParams graph) {
E.checkArgumentNotNull(graph, "The graph can't be null");
-
- TaskScheduler scheduler = new StandardTaskScheduler(graph,
- this.taskExecutor, this.taskDbExecutor,
- this.serverInfoDbExecutor);
- this.schedulers.put(graph, scheduler);
+ LOG.info("Use {} as the scheduler of graph ({})",
+ graph.schedulerType(), graph.name());
+ // TODO: if the current server is bound to a specific non-DEFAULT graph space, do not create task schedulers for graphs outside that graph space
+ switch (graph.schedulerType()) {
+ case "distributed": {
+ TaskScheduler scheduler =
+ new DistributedTaskScheduler(
+ graph,
+ distributedSchedulerExecutor,
+ taskDbExecutor,
+ schemaTaskExecutor,
+ olapTaskExecutor,
+ taskExecutor, /* gremlinTaskExecutor */
+ ephemeralTaskExecutor,
+ serverInfoDbExecutor);
+ this.schedulers.put(graph, scheduler);
+ break;
+ }
+ case "local":
+ default: {
+ TaskScheduler scheduler =
+ new StandardTaskScheduler(
+ graph,
+ this.taskExecutor,
+ this.taskDbExecutor,
+ this.serverInfoDbExecutor);
+ this.schedulers.put(graph, scheduler);
+ break;
+ }
+ }
}
public void closeScheduler(HugeGraphParams graph) {
@@ -122,6 +168,10 @@ public final class TaskManager {
if (!this.schedulerExecutor.isTerminated()) {
this.closeSchedulerTx(graph);
}
+
+ if (!this.distributedSchedulerExecutor.isTerminated()) {
+ this.closeDistributedSchedulerTx(graph);
+ }
}
private void closeTaskTx(HugeGraphParams graph) {
@@ -156,6 +206,21 @@ public final class TaskManager {
}
}
+ private void closeDistributedSchedulerTx(HugeGraphParams graph) {
+ final Callable<Void> closeTx = () -> {
+ // Do close-tx for current thread
+ graph.closeTx();
+ // Let other threads run
+ Thread.yield();
+ return null;
+ };
+ try {
+ this.distributedSchedulerExecutor.submit(closeTx).get();
+ } catch (Exception e) {
+ throw new HugeException("Exception when closing scheduler tx", e);
+ }
+ }
+
public void pauseScheduledThreadPool() {
this.schedulerExecutor.pauseSchedule();
}
@@ -169,8 +234,7 @@ public final class TaskManager {
}
public ServerInfoManager getServerInfoManager(HugeGraphParams graph) {
- StandardTaskScheduler scheduler = (StandardTaskScheduler)
- this.getScheduler(graph);
+ TaskScheduler scheduler = this.getScheduler(graph);
if (scheduler == null) {
return null;
}
@@ -194,10 +258,21 @@ public final class TaskManager {
}
}
+ if (terminated && !this.distributedSchedulerExecutor.isShutdown()) {
+ this.distributedSchedulerExecutor.shutdown();
+ try {
+ terminated = this.distributedSchedulerExecutor.awaitTermination(timeout,
+ unit);
+ } catch (Throwable e) {
+ ex = e;
+ }
+ }
+
if (terminated && !this.taskExecutor.isShutdown()) {
this.taskExecutor.shutdown();
try {
- terminated = this.taskExecutor.awaitTermination(timeout, unit);
+ terminated = this.taskExecutor.awaitTermination(timeout,
+ unit);
} catch (Throwable e) {
ex = e;
}
@@ -216,7 +291,38 @@ public final class TaskManager {
if (terminated && !this.taskDbExecutor.isShutdown()) {
this.taskDbExecutor.shutdown();
try {
- terminated = this.taskDbExecutor.awaitTermination(timeout, unit);
+ terminated = this.taskDbExecutor.awaitTermination(timeout,
+ unit);
+ } catch (Throwable e) {
+ ex = e;
+ }
+ }
+
+ if (terminated && !this.ephemeralTaskExecutor.isShutdown()) {
+ this.ephemeralTaskExecutor.shutdown();
+ try {
+ terminated = this.ephemeralTaskExecutor.awaitTermination(timeout,
+ unit);
+ } catch (Throwable e) {
+ ex = e;
+ }
+ }
+
+ if (terminated && !this.schemaTaskExecutor.isShutdown()) {
+ this.schemaTaskExecutor.shutdown();
+ try {
+ terminated = this.schemaTaskExecutor.awaitTermination(timeout,
+ unit);
+ } catch (Throwable e) {
+ ex = e;
+ }
+ }
+
+ if (terminated && !this.olapTaskExecutor.isShutdown()) {
+ this.olapTaskExecutor.shutdown();
+ try {
+ terminated = this.olapTaskExecutor.awaitTermination(timeout,
+ unit);
} catch (Throwable e) {
ex = e;
}
@@ -260,8 +366,7 @@ public final class TaskManager {
public void onAsRoleMaster() {
try {
for (TaskScheduler entry : this.schedulers.values()) {
- StandardTaskScheduler scheduler = (StandardTaskScheduler) entry;
- ServerInfoManager serverInfoManager = scheduler.serverManager();
+ ServerInfoManager serverInfoManager = entry.serverManager();
serverInfoManager.forceInitServerInfo(serverInfoManager.selfServerId(),
NodeRole.MASTER);
}
} catch (Throwable e) {
@@ -273,8 +378,7 @@ public final class TaskManager {
public void onAsRoleWorker() {
try {
for (TaskScheduler entry : this.schedulers.values()) {
- StandardTaskScheduler scheduler = (StandardTaskScheduler) entry;
- ServerInfoManager serverInfoManager = scheduler.serverManager();
+ ServerInfoManager serverInfoManager = entry.serverManager();
serverInfoManager.forceInitServerInfo(serverInfoManager.selfServerId(),
NodeRole.WORKER);
}
} catch (Throwable e) {
@@ -291,7 +395,7 @@ public final class TaskManager {
// Called by scheduler timer
try {
for (TaskScheduler entry : this.schedulers.values()) {
- StandardTaskScheduler scheduler = (StandardTaskScheduler) entry;
+ TaskScheduler scheduler = entry;
// Maybe other thread close&remove scheduler at the same time
synchronized (scheduler) {
this.scheduleOrExecuteJobForGraph(scheduler);
@@ -302,56 +406,59 @@ public final class TaskManager {
}
}
- private void scheduleOrExecuteJobForGraph(StandardTaskScheduler scheduler) {
+ private void scheduleOrExecuteJobForGraph(TaskScheduler scheduler) {
E.checkNotNull(scheduler, "scheduler");
- ServerInfoManager serverManager = scheduler.serverManager();
- String graph = scheduler.graphName();
-
- LockUtil.lock(graph, LockUtil.GRAPH_LOCK);
- try {
- /*
- * Skip if:
- * graph is closed (iterate schedulers before graph is closing)
- * or
- * graph is not initialized(maybe truncated or cleared).
- *
- * If graph is closing by other thread, current thread get
- * serverManager and try lock graph, at the same time other
- * thread deleted the lock-group, current thread would get
- * exception 'LockGroup xx does not exists'.
- * If graph is closed, don't call serverManager.initialized()
- * due to it will reopen graph tx.
- */
- if (!serverManager.graphReady()) {
- return;
- }
-
- // Update server heartbeat
- serverManager.heartbeat();
+ if (scheduler instanceof StandardTaskScheduler) {
+ StandardTaskScheduler standardTaskScheduler = (StandardTaskScheduler) (scheduler);
+ ServerInfoManager serverManager = scheduler.serverManager();
+ String graph = scheduler.graphName();
- /*
- * Master schedule tasks to suitable servers.
- * Worker maybe become Master, so Master also need perform tasks assigned by
- * previous Master when enableRoleElected is true.
- * However, the master only needs to take the assignment,
- * because the master stays the same when enableRoleElected is false.
- * There is no suitable server when these tasks are created
- */
- if (serverManager.master()) {
- scheduler.scheduleTasks();
- if (!this.enableRoleElected && !serverManager.onlySingleNode()) {
+ LockUtil.lock(graph, LockUtil.GRAPH_LOCK);
+ try {
+ /*
+ * Skip if:
+ * graph is closed (iterate schedulers before graph is closing)
+ * or
+ * graph is not initialized(maybe truncated or cleared).
+ *
+ * If graph is closing by other thread, current thread get
+ * serverManager and try lock graph, at the same time other
+ * thread deleted the lock-group, current thread would get
+ * exception 'LockGroup xx does not exists'.
+ * If graph is closed, don't call serverManager.initialized()
+ * due to it will reopen graph tx.
+ */
+ if (!serverManager.graphReady()) {
return;
}
- }
- // Schedule queued tasks scheduled to current server
- scheduler.executeTasksOnWorker(serverManager.selfServerId());
+ // Update server heartbeat
+ serverManager.heartbeat();
+
+ /*
+ * Master schedule tasks to suitable servers.
+ * Worker maybe become Master, so Master also need perform tasks assigned by
+ * previous Master when enableRoleElected is true.
+ * However, the master only needs to take the assignment,
+ * because the master stays the same when enableRoleElected is false.
+ * There is no suitable server when these tasks are created
+ */
+ if (serverManager.master()) {
+ standardTaskScheduler.scheduleTasks();
+ if (!this.enableRoleElected && !serverManager.onlySingleNode()) {
+ return;
+ }
+ }
- // Cancel tasks scheduled to current server
- scheduler.cancelTasksOnWorker(serverManager.selfServerId());
- } finally {
- LockUtil.unlock(graph, LockUtil.GRAPH_LOCK);
+ // Schedule queued tasks scheduled to current server
+ standardTaskScheduler.executeTasksOnWorker(serverManager.selfServerId());
+
+ // Cancel tasks scheduled to current server
+ standardTaskScheduler.cancelTasksOnWorker(serverManager.selfServerId());
+ } finally {
+ LockUtil.unlock(graph, LockUtil.GRAPH_LOCK);
+ }
}
}
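To make the dispatch above concrete, here is a minimal registration sketch (assuming TaskManager.instance() and addScheduler() keep the public signatures shown in this diff): the scheduler implementation is chosen purely from HugeGraphParams.schedulerType(), with "distributed" wiring up DistributedTaskScheduler and every other value falling back to the local StandardTaskScheduler.

    import org.apache.hugegraph.HugeGraphParams;
    import org.apache.hugegraph.task.TaskManager;

    public final class SchedulerRegistrationSketch {
        // Register a graph; its scheduler type string decides the implementation
        static void register(HugeGraphParams graphParams) {
            TaskManager.instance().addScheduler(graphParams);
        }
    }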
diff --git
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskScheduler.java
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskScheduler.java
index 69304f43c..4ee4c992a 100644
---
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskScheduler.java
+++
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskScheduler.java
@@ -19,6 +19,7 @@ package org.apache.hugegraph.task;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
@@ -27,6 +28,11 @@ import org.apache.hugegraph.HugeGraph;
public interface TaskScheduler {
+ long NO_LIMIT = -1L;
+ long PAGE_SIZE = 500L;
+ long QUERY_INTERVAL = 100L;
+ int MAX_PENDING_TASKS = 10000;
+
HugeGraph graph();
int pendingTasks();
@@ -39,7 +45,7 @@ public interface TaskScheduler {
<V> void save(HugeTask<V> task);
- <V> HugeTask<V> delete(Id id);
+ <V> HugeTask<V> delete(Id id, boolean force);
<V> HugeTask<V> task(Id id);
@@ -62,4 +68,14 @@ public interface TaskScheduler {
throws TimeoutException;
void checkRequirement(String op);
+
+ <V> V call(Callable<V> callable);
+
+ <V> V call(Runnable runnable);
+
+ ServerInfoManager serverManager();
+
+ String graphName();
+
+ void taskDone(HugeTask<?> task);
}
diff --git
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskStatus.java
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskStatus.java
index dd65206b0..a9c3101d2 100644
---
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskStatus.java
+++
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskStatus.java
@@ -37,7 +37,9 @@ public enum TaskStatus implements SerialEnum {
SUCCESS(7, "success"),
CANCELLING(8, "cancelling"),
CANCELLED(9, "cancelled"),
- FAILED(10, "failed");
+ FAILED(10, "failed"),
+ HANGING(11, "hanging"),
+ DELETING(12, "deleting");
// NOTE: order is important(RESTORING > RUNNING > QUEUED) when restoring
public static final List<TaskStatus> PENDING_STATUSES = ImmutableList.of(
diff --git
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskTransaction.java
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskTransaction.java
new file mode 100644
index 000000000..2b27019c7
--- /dev/null
+++
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskTransaction.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.task;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hugegraph.HugeException;
+import org.apache.hugegraph.HugeGraph;
+import org.apache.hugegraph.HugeGraphParams;
+import org.apache.hugegraph.backend.query.QueryResults;
+import org.apache.hugegraph.backend.store.BackendStore;
+import org.apache.hugegraph.backend.tx.GraphTransaction;
+import org.apache.hugegraph.schema.IndexLabel;
+import org.apache.hugegraph.schema.PropertyKey;
+import org.apache.hugegraph.schema.SchemaManager;
+import org.apache.hugegraph.schema.VertexLabel;
+import org.apache.hugegraph.structure.HugeVertex;
+import org.apache.hugegraph.type.HugeType;
+import org.apache.hugegraph.type.define.Cardinality;
+import org.apache.hugegraph.type.define.DataType;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+
+public class TaskTransaction extends GraphTransaction {
+
+ public static final String TASK = HugeTask.P.TASK;
+
+ public TaskTransaction(HugeGraphParams graph, BackendStore store) {
+ super(graph, store);
+ this.autoCommit(true);
+ }
+
+ public HugeVertex constructVertex(HugeTask<?> task) {
+ if (!this.graph().existsVertexLabel(TASK)) {
+ throw new HugeException("Schema is missing for task(%s) '%s'",
+ task.id(), task.name());
+ }
+ return this.constructVertex(false, task.asArray());
+ }
+
+ public void deleteIndex(HugeVertex vertex) {
+ // Delete the old record if exist
+ Iterator<Vertex> old = this.queryTaskInfos(vertex.id());
+ HugeVertex oldV = (HugeVertex) QueryResults.one(old);
+ if (oldV == null) {
+ return;
+ }
+ this.deleteIndexIfNeeded(oldV, vertex);
+ }
+
+ private boolean deleteIndexIfNeeded(HugeVertex oldV, HugeVertex newV) {
+ if (!oldV.value(HugeTask.P.STATUS).equals(newV.value(HugeTask.P.STATUS))) {
+ // Only delete vertex if index value changed else override it
+ this.updateIndex(this.indexLabel(HugeTask.P.STATUS).id(), oldV, true);
+ return true;
+ }
+ return false;
+ }
+
+ public void initSchema() {
+ if (this.existVertexLabel(TASK)) {
+ return;
+ }
+
+ HugeGraph graph = this.graph();
+ String[] properties = this.initProperties();
+
+ // Create vertex label '~task'
+ VertexLabel label = graph.schema().vertexLabel(TASK)
+ .properties(properties)
+ .useCustomizeNumberId()
+ .nullableKeys(HugeTask.P.DESCRIPTION, HugeTask.P.CONTEXT,
+ HugeTask.P.UPDATE, HugeTask.P.INPUT,
+ HugeTask.P.RESULT,
+ HugeTask.P.DEPENDENCIES, HugeTask.P.SERVER)
+ .enableLabelIndex(true)
+ .build();
+ this.params().schemaTransaction().addVertexLabel(label);
+
+ // Create index
+ this.createIndexLabel(label, HugeTask.P.STATUS);
+ }
+
+ private boolean existVertexLabel(String label) {
+ return this.params().schemaTransaction()
+ .getVertexLabel(label) != null;
+ }
+
+ private String[] initProperties() {
+ List<String> props = new ArrayList<>();
+
+ props.add(createPropertyKey(HugeTask.P.TYPE));
+ props.add(createPropertyKey(HugeTask.P.NAME));
+ props.add(createPropertyKey(HugeTask.P.CALLABLE));
+ props.add(createPropertyKey(HugeTask.P.DESCRIPTION));
+ props.add(createPropertyKey(HugeTask.P.CONTEXT));
+ props.add(createPropertyKey(HugeTask.P.STATUS, DataType.BYTE));
+ props.add(createPropertyKey(HugeTask.P.PROGRESS, DataType.INT));
+ props.add(createPropertyKey(HugeTask.P.CREATE, DataType.DATE));
+ props.add(createPropertyKey(HugeTask.P.UPDATE, DataType.DATE));
+ props.add(createPropertyKey(HugeTask.P.RETRIES, DataType.INT));
+ props.add(createPropertyKey(HugeTask.P.INPUT, DataType.BLOB));
+ props.add(createPropertyKey(HugeTask.P.RESULT, DataType.BLOB));
+ props.add(createPropertyKey(HugeTask.P.DEPENDENCIES, DataType.LONG,
+ Cardinality.SET));
+ props.add(createPropertyKey(HugeTask.P.SERVER));
+
+ return props.toArray(new String[0]);
+ }
+
+ private String createPropertyKey(String name) {
+ return this.createPropertyKey(name, DataType.TEXT);
+ }
+
+ private String createPropertyKey(String name, DataType dataType) {
+ return this.createPropertyKey(name, dataType, Cardinality.SINGLE);
+ }
+
+ private String createPropertyKey(String name, DataType dataType,
+ Cardinality cardinality) {
+ HugeGraph graph = this.graph();
+ SchemaManager schema = graph.schema();
+ PropertyKey propertyKey = schema.propertyKey(name)
+ .dataType(dataType)
+ .cardinality(cardinality)
+ .build();
+ this.params().schemaTransaction().addPropertyKey(propertyKey);
+ return name;
+ }
+
+ private IndexLabel createIndexLabel(VertexLabel label, String field) {
+ HugeGraph graph = this.graph();
+ SchemaManager schema = graph.schema();
+ String name = Graph.Hidden.hide("task-index-by-" + field);
+ IndexLabel indexLabel = schema.indexLabel(name)
+ .on(HugeType.VERTEX_LABEL, TASK)
+ .by(field)
+ .build();
+ this.params().schemaTransaction().addIndexLabel(label, indexLabel);
+ return indexLabel;
+ }
+
+ private IndexLabel indexLabel(String field) {
+ String name = Graph.Hidden.hide("task-index-by-" + field);
+ HugeGraph graph = this.graph();
+ return graph.indexLabel(name);
+ }
+}
diff --git
a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreSessionsImpl.java
b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreSessionsImpl.java
index dad89db79..bc38aedd9 100755
---
a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreSessionsImpl.java
+++
b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreSessionsImpl.java
@@ -450,6 +450,10 @@ public class HstoreSessionsImpl extends HstoreSessions {
*/
@Override
public Integer commit() {
+ if (!this.hasChanges()) {
+ // TODO: log a message with level WARNING
+ return 0;
+ }
int commitSize = this.changedSize;
if (TRANSACTIONAL) {
this.graph.commit();
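Regarding the TODO in the hunk above, a possible follow-up is sketched below; it is not part of this change, and the Log.logger wiring is only an assumption about how such a warning would be emitted.

    import org.apache.hugegraph.util.Log;
    import org.slf4j.Logger;

    public final class EmptyCommitWarnSketch {
        private static final Logger LOG = Log.logger(EmptyCommitWarnSketch.class);

        // Warn when commit() is invoked although the session has no pending changes
        static void warnIfEmptyCommit(boolean hasChanges, String session) {
            if (!hasChanges) {
                LOG.warn("commit() called on session '{}' without pending changes", session);
            }
        }
    }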
diff --git
a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/TaskCoreTest.java
b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/TaskCoreTest.java
index dabf3ad0f..27cd880fa 100644
---
a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/TaskCoreTest.java
+++
b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/TaskCoreTest.java
@@ -56,7 +56,7 @@ public class TaskCoreTest extends BaseCoreTest {
Iterator<HugeTask<Object>> iter = scheduler.tasks(null, -1, null);
while (iter.hasNext()) {
- scheduler.delete(iter.next().id());
+ scheduler.delete(iter.next().id(), true);
}
}
@@ -77,7 +77,7 @@ public class TaskCoreTest extends BaseCoreTest {
Assert.assertFalse(task.completed());
Assert.assertThrows(IllegalArgumentException.class, () -> {
- scheduler.delete(id);
+ scheduler.delete(id, true);
}, e -> {
Assert.assertContains("Can't delete incomplete task '88888'",
e.getMessage());
@@ -107,7 +107,7 @@ public class TaskCoreTest extends BaseCoreTest {
Assert.assertEquals("test-task", iter.next().name());
Assert.assertFalse(iter.hasNext());
- scheduler.delete(id);
+ scheduler.delete(id, true);
iter = scheduler.tasks(null, 10, null);
Assert.assertFalse(iter.hasNext());
Assert.assertThrows(NotFoundException.class, () -> {
diff --git
a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/tinkerpop/TestGraph.java
b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/tinkerpop/TestGraph.java
index c6dcff4a8..1dfa85fb9 100644
---
a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/tinkerpop/TestGraph.java
+++
b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/tinkerpop/TestGraph.java
@@ -148,7 +148,7 @@ public class TestGraph implements Graph {
TaskScheduler scheduler = this.graph.taskScheduler();
scheduler.tasks(null, -1, null).forEachRemaining(elem -> {
- scheduler.delete(elem.id());
+ scheduler.delete(elem.id(), true);
});
}