This is an automated email from the ASF dual-hosted git repository.

vaughn pushed a commit to branch pd-store
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git


The following commit(s) were added to refs/heads/pd-store by this push:
     new f34743b5f feat: preliminary support for distributed task scheduler 
(#2319)
f34743b5f is described below

commit f34743b5fbd2b0dd2da10932417d8a4d047bda9a
Author: V_Galaxy <[email protected]>
AuthorDate: Mon Oct 16 11:22:50 2023 +0800

    feat: preliminary support for distributed task scheduler (#2319)
    
    * chore: migrate DistributedTaskScheduler
    
    * fix: basic setup for DistributedTaskScheduler
    
    * chore: set default scheduler type to local
    
    * chore: code style
    
    * chore: code style
    
    * fix: impl checkRequirement for DistributedTaskScheduler
    
    * chore: add comment for task server id
    
    * fix: task result vertex type
    
    * chore: review
    
    * fix: StandardClusterRoleStore queryVertex
    
    * chore: add comment
    
    duplicated commit in hstore for clusterRole
---
 .../hugegraph/api/filter/RedirectFilter.java       |  24 +
 .../java/org/apache/hugegraph/api/job/TaskAPI.java |   5 +-
 .../apache/hugegraph/auth/HugeGraphAuthProxy.java  |  36 +-
 .../java/org/apache/hugegraph/HugeGraphParams.java |   2 +
 .../org/apache/hugegraph/StandardHugeGraph.java    |   8 +
 .../backend/cache/CachedSchemaTransactionV2.java   |   2 +-
 .../org/apache/hugegraph/config/CoreOptions.java   |  16 +
 .../masterelection/StandardClusterRoleStore.java   |   7 +-
 .../org/apache/hugegraph/structure/HugeVertex.java |  10 +-
 .../hugegraph/task/DistributedTaskScheduler.java   | 667 +++++++++++++++++++++
 .../java/org/apache/hugegraph/task/HugeTask.java   |  95 ++-
 .../org/apache/hugegraph/task/HugeTaskResult.java  | 122 ++++
 .../hugegraph/task/StandardTaskScheduler.java      | 157 +----
 .../hugegraph/task/TaskAndResultScheduler.java     | 335 +++++++++++
 .../hugegraph/task/TaskAndResultTransaction.java   | 103 ++++
 .../org/apache/hugegraph/task/TaskManager.java     | 221 +++++--
 .../org/apache/hugegraph/task/TaskScheduler.java   |  18 +-
 .../java/org/apache/hugegraph/task/TaskStatus.java |   4 +-
 .../org/apache/hugegraph/task/TaskTransaction.java | 165 +++++
 .../backend/store/hstore/HstoreSessionsImpl.java   |   4 +
 .../org/apache/hugegraph/core/TaskCoreTest.java    |   6 +-
 .../org/apache/hugegraph/tinkerpop/TestGraph.java  |   2 +-
 22 files changed, 1796 insertions(+), 213 deletions(-)

diff --git 
a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/filter/RedirectFilter.java
 
b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/filter/RedirectFilter.java
index e675dd955..044b60ca0 100644
--- 
a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/filter/RedirectFilter.java
+++ 
b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/filter/RedirectFilter.java
@@ -26,17 +26,23 @@ import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.http.client.utils.URIBuilder;
+import org.apache.hugegraph.HugeGraph;
 import org.apache.hugegraph.core.GraphManager;
 import org.apache.hugegraph.masterelection.GlobalMasterInfo;
+import org.apache.hugegraph.task.DistributedTaskScheduler;
 import org.apache.hugegraph.util.E;
 import org.apache.hugegraph.util.Log;
 import org.glassfish.hk2.api.IterableProvider;
 import org.glassfish.hk2.api.ServiceHandle;
 import org.glassfish.jersey.message.internal.HeaderUtils;
+import org.glassfish.jersey.server.ContainerRequest;
 import org.slf4j.Logger;
 
 import jakarta.ws.rs.NameBinding;
@@ -56,6 +62,10 @@ public class RedirectFilter implements 
ContainerRequestFilter {
 
     private static final String X_HG_REDIRECT = "x-hg-redirect";
 
+    private static final String TASK_API_REGEX = "graphs/(.*?)/tasks/.*";
+
+    private static final Pattern TASK_API_PATTERN = 
Pattern.compile(TASK_API_REGEX);
+
     private static volatile Client client = null;
 
     @Context
@@ -91,6 +101,20 @@ public class RedirectFilter implements 
ContainerRequestFilter {
             StringUtils.isEmpty(masterNodeInfo.url())) {
             return;
         }
+
+        // skip filter if call TaskAPI to graph with DistributedTaskScheduler
+        if (context instanceof ContainerRequest) {
+            String relativePath = ((ContainerRequest) context).getPath(true);
+            Matcher matcher = TASK_API_PATTERN.matcher(relativePath);
+            if (matcher.matches()) {
+                HugeGraph graph = manager.graph(matcher.group(1));
+                if (Objects.nonNull(graph) &&
+                    graph.taskScheduler() instanceof DistributedTaskScheduler) 
{
+                    return;
+                }
+            }
+        }
+
         String url = masterNodeInfo.url();
 
         URI redirectUri;
diff --git 
a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/job/TaskAPI.java
 
b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/job/TaskAPI.java
index c2dedf32a..f295e3e00 100644
--- 
a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/job/TaskAPI.java
+++ 
b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/job/TaskAPI.java
@@ -136,11 +136,12 @@ public class TaskAPI extends API {
     @RedirectFilter.RedirectMasterRole
     public void delete(@Context GraphManager manager,
                        @PathParam("graph") String graph,
-                       @PathParam("id") long id) {
+                       @PathParam("id") long id,
+                       @DefaultValue("false") @QueryParam("force") boolean 
force) {
         LOG.debug("Graph [{}] delete task: {}", graph, id);
 
         TaskScheduler scheduler = graph(manager, graph).taskScheduler();
-        HugeTask<?> task = scheduler.delete(IdGenerator.of(id));
+        HugeTask<?> task = scheduler.delete(IdGenerator.of(id), force);
         E.checkArgument(task != null, "There is no task with id '%s'", id);
     }
 
diff --git 
a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/auth/HugeGraphAuthProxy.java
 
b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/auth/HugeGraphAuthProxy.java
index 04cfac30d..96841dbe6 100644
--- 
a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/auth/HugeGraphAuthProxy.java
+++ 
b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/auth/HugeGraphAuthProxy.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
@@ -70,6 +71,7 @@ import org.apache.hugegraph.structure.HugeElement;
 import org.apache.hugegraph.structure.HugeFeatures;
 import org.apache.hugegraph.structure.HugeVertex;
 import org.apache.hugegraph.task.HugeTask;
+import org.apache.hugegraph.task.ServerInfoManager;
 import org.apache.hugegraph.task.TaskManager;
 import org.apache.hugegraph.task.TaskScheduler;
 import org.apache.hugegraph.task.TaskStatus;
@@ -1085,10 +1087,10 @@ public final class HugeGraphAuthProxy implements 
HugeGraph {
         }
 
         @Override
-        public <V> HugeTask<V> delete(Id id) {
+        public <V> HugeTask<V> delete(Id id, boolean force) {
             verifyTaskPermission(HugePermission.DELETE,
                                  this.taskScheduler.task(id));
-            return this.taskScheduler.delete(id);
+            return this.taskScheduler.delete(id, force);
         }
 
         @Override
@@ -1124,6 +1126,36 @@ public final class HugeGraphAuthProxy implements 
HugeGraph {
             this.taskScheduler.checkRequirement(op);
         }
 
+        @Override
+        public <V> V call(Callable<V> callable) {
+            verifyAnyPermission();
+            return this.taskScheduler.call(callable);
+        }
+
+        @Override
+        public <V> V call(Runnable runnable) {
+            verifyAnyPermission();
+            return this.taskScheduler.call(runnable);
+        }
+
+        @Override
+        public ServerInfoManager serverManager() {
+            verifyAnyPermission();
+            return this.taskScheduler.serverManager();
+        }
+
+        @Override
+        public String graphName() {
+            verifyAnyPermission();
+            return this.taskScheduler.graphName();
+        }
+
+        @Override
+        public void taskDone(HugeTask<?> task) {
+            verifyAnyPermission();
+            this.taskScheduler.taskDone(task);
+        }
+
         private void verifyTaskPermission(HugePermission actionPerm) {
             verifyPermission(actionPerm, ResourceType.TASK);
         }
diff --git 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/HugeGraphParams.java
 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/HugeGraphParams.java
index dd611ae68..dca98b1d0 100644
--- 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/HugeGraphParams.java
+++ 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/HugeGraphParams.java
@@ -94,4 +94,6 @@ public interface HugeGraphParams {
     RamTable ramtable();
 
     <T> void submitEphemeralJob(EphemeralJob<T> job);
+
+    String schedulerType();
 }
diff --git 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
index c2ea1ea43..805cf78a1 100644
--- 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
+++ 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
@@ -181,6 +181,8 @@ public class StandardHugeGraph implements HugeGraph {
 
     private final MetaManager metaManager = MetaManager.instance();
 
+    private final String schedulerType;
+
     public StandardHugeGraph(HugeConfig config) {
         this.params = new StandardHugeGraphParams();
         this.configuration = config;
@@ -214,6 +216,7 @@ public class StandardHugeGraph implements HugeGraph {
         this.closed = false;
         this.mode = GraphMode.NONE;
         this.readMode = GraphReadMode.OLTP_ONLY;
+        this.schedulerType = config.get(CoreOptions.SCHEDULER_TYPE);
 
         LockUtil.init(this.name);
 
@@ -1339,6 +1342,11 @@ public class StandardHugeGraph implements HugeGraph {
         public <T> void submitEphemeralJob(EphemeralJob<T> job) {
             this.ephemeralJobQueue.add(job);
         }
+
+        @Override
+        public String schedulerType() {
+            return StandardHugeGraph.this.schedulerType;
+        }
     }
 
     private class TinkerPopTransaction extends AbstractThreadLocalTransaction {
diff --git 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java
 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java
index 6e5d6dca1..b113e0f37 100644
--- 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java
+++ 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java
@@ -25,7 +25,7 @@ import org.apache.hugegraph.util.Events;
 import com.google.common.collect.ImmutableSet;
 
 public class CachedSchemaTransactionV2 extends SchemaTransactionV2 {
-   private final Cache<Id, Object> idCache;
+    private final Cache<Id, Object> idCache;
     private final Cache<Id, Object> nameCache;
 
     private final SchemaCaches<SchemaElement> arrayCaches;
diff --git 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java
 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java
index fe8dfc2e2..bb8745327 100644
--- 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java
+++ 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java
@@ -331,6 +331,14 @@ public class CoreOptions extends OptionHolder {
                     1
             );
 
+    public static final ConfigOption<String> SCHEDULER_TYPE =
+            new ConfigOption<>(
+                    "task.scheduler_type",
+                    "The type of scheduler used in distribution system.",
+                    allowValues("local", "distributed"),
+                    "local"
+            );
+
     public static final ConfigOption<Boolean> TASK_SYNC_DELETION =
             new ConfigOption<>(
                     "task.sync_deletion",
@@ -339,6 +347,14 @@ public class CoreOptions extends OptionHolder {
                     false
             );
 
+    public static final ConfigOption<Integer> TASK_RETRY =
+            new ConfigOption<>(
+                    "task.retry",
+                    "Task retry times.",
+                    rangeInt(0, 3),
+                    0
+            );
+
     public static final ConfigOption<Long> STORE_CONN_DETECT_INTERVAL =
             new ConfigOption<>(
                     "store.connection_detect_interval",
diff --git 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/StandardClusterRoleStore.java
 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/StandardClusterRoleStore.java
index f0f69319d..fa7a0777e 100644
--- 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/StandardClusterRoleStore.java
+++ 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/StandardClusterRoleStore.java
@@ -146,7 +146,12 @@ public class StandardClusterRoleStore implements 
ClusterRoleStore {
 
     private Optional<Vertex> queryVertex() {
         GraphTransaction tx = this.graph.systemTransaction();
-        ConditionQuery query = new ConditionQuery(HugeType.VERTEX);
+        ConditionQuery query;
+        if (this.graph.backendStoreFeatures().supportsTaskAndServerVertex()) {
+            query = new ConditionQuery(HugeType.SERVER);
+        } else {
+            query = new ConditionQuery(HugeType.VERTEX);
+        }
         VertexLabel vl = this.graph.graph().vertexLabel(P.ROLE_DATA);
         query.eq(HugeKeys.LABEL, vl.id());
         query.query(Condition.eq(vl.primaryKeys().get(0), "default"));
diff --git 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/structure/HugeVertex.java
 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/structure/HugeVertex.java
index 731bfa444..47f13a956 100644
--- 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/structure/HugeVertex.java
+++ 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/structure/HugeVertex.java
@@ -39,12 +39,14 @@ import org.apache.hugegraph.backend.query.QueryResults;
 import org.apache.hugegraph.backend.serializer.BytesBuffer;
 import org.apache.hugegraph.backend.tx.GraphTransaction;
 import org.apache.hugegraph.config.CoreOptions;
+import org.apache.hugegraph.masterelection.StandardClusterRoleStore;
 import org.apache.hugegraph.perf.PerfUtil.Watched;
 import org.apache.hugegraph.schema.EdgeLabel;
 import org.apache.hugegraph.schema.PropertyKey;
 import org.apache.hugegraph.schema.VertexLabel;
 import org.apache.hugegraph.task.HugeServerInfo;
 import org.apache.hugegraph.task.HugeTask;
+import org.apache.hugegraph.task.HugeTaskResult;
 import org.apache.hugegraph.type.HugeType;
 import org.apache.hugegraph.type.define.Cardinality;
 import org.apache.hugegraph.type.define.CollectionType;
@@ -92,10 +94,14 @@ public class HugeVertex extends HugeElement implements 
Vertex, Cloneable {
 
     @Override
     public HugeType type() {
-        if (label != null && label.name().equals(HugeTask.P.TASK)) {
+        if (label != null &&
+            (label.name().equals(HugeTask.P.TASK) ||
+             label.name().equals(HugeTaskResult.P.TASKRESULT))) {
             return HugeType.TASK;
         }
-        if (label != null && label.name().equals(HugeServerInfo.P.SERVER)) {
+        if (label != null &&
+            (label.name().equals(HugeServerInfo.P.SERVER) ||
+             label.name().equals(StandardClusterRoleStore.P.ROLE_DATA))) {
             return HugeType.SERVER;
         }
         return HugeType.VERTEX;
diff --git 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java
 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java
new file mode 100644
index 000000000..0ab97b7ca
--- /dev/null
+++ 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java
@@ -0,0 +1,667 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.task;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+
+import org.apache.hugegraph.HugeException;
+import org.apache.hugegraph.HugeGraph;
+import org.apache.hugegraph.HugeGraphParams;
+import org.apache.hugegraph.backend.id.Id;
+import org.apache.hugegraph.backend.page.PageInfo;
+import org.apache.hugegraph.backend.query.QueryResults;
+import org.apache.hugegraph.concurrent.LockGroup;
+import org.apache.hugegraph.concurrent.LockManager;
+import org.apache.hugegraph.config.CoreOptions;
+import org.apache.hugegraph.exception.ConnectionException;
+import org.apache.hugegraph.exception.NotFoundException;
+import org.apache.hugegraph.meta.MetaManager;
+import org.apache.hugegraph.meta.lock.LockResult;
+import org.apache.hugegraph.structure.HugeVertex;
+import org.apache.hugegraph.util.E;
+import org.apache.hugegraph.util.LockUtil;
+import org.apache.hugegraph.util.Log;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.slf4j.Logger;
+
+public class DistributedTaskScheduler extends TaskAndResultScheduler {
+    protected static final int SCHEDULE_PERIOD = 10;
+    private static final Logger LOG = 
Log.logger(DistributedTaskScheduler.class);
+    private final ExecutorService taskDbExecutor;
+    private final ExecutorService schemaTaskExecutor;
+    private final ExecutorService olapTaskExecutor;
+    private final ExecutorService ephemeralTaskExecutor;
+    private final ExecutorService gremlinTaskExecutor;
+    private final ScheduledThreadPoolExecutor schedulerExecutor;
+    private final ScheduledFuture<?> cronFuture;
+
+    private final String lockGroupName;
+
+    /**
+     * the status of scheduler
+     */
+    private final AtomicBoolean closed = new AtomicBoolean(true);
+
+    private final ConcurrentHashMap<Id, HugeTask<?>> runningTasks = new 
ConcurrentHashMap<>();
+
+    public DistributedTaskScheduler(HugeGraphParams graph,
+                                    ScheduledThreadPoolExecutor 
schedulerExecutor,
+                                    ExecutorService taskDbExecutor,
+                                    ExecutorService schemaTaskExecutor,
+                                    ExecutorService olapTaskExecutor,
+                                    ExecutorService gremlinTaskExecutor,
+                                    ExecutorService ephemeralTaskExecutor,
+                                    ExecutorService serverInfoDbExecutor) {
+        super(graph, serverInfoDbExecutor);
+
+        this.taskDbExecutor = taskDbExecutor;
+        this.schemaTaskExecutor = schemaTaskExecutor;
+        this.olapTaskExecutor = olapTaskExecutor;
+        this.gremlinTaskExecutor = gremlinTaskExecutor;
+        this.ephemeralTaskExecutor = ephemeralTaskExecutor;
+
+        this.schedulerExecutor = schedulerExecutor;
+
+        lockGroupName = String.format("%s_%s_distributed", graphSpace, graph);
+        LockManager.instance().create(lockGroupName);
+
+        this.closed.set(false);
+
+        this.cronFuture = this.schedulerExecutor.scheduleWithFixedDelay(
+            () -> {
+                // TODO: uncomment later - graph space
+                // LockUtil.lock(this.graph().spaceGraphName(), 
LockUtil.GRAPH_LOCK);
+                LockUtil.lock("", LockUtil.GRAPH_LOCK);
+                try {
+                    // TODO: query tasks with super-admin permission
+                    // TaskManager.useAdmin();
+                    this.cronSchedule();
+                } catch (Throwable t) {
+                    LOG.info("cronScheduler exception ", t);
+                } finally {
+                    // TODO: uncomment later - graph space
+                    LockUtil.unlock("", LockUtil.GRAPH_LOCK);
+                    // LockUtil.unlock(this.graph().spaceGraphName(), 
LockUtil.GRAPH_LOCK);
+                }
+            },
+            10L, SCHEDULE_PERIOD,
+            TimeUnit.SECONDS);
+    }
+
+    private static boolean sleep(long ms) {
+        try {
+            Thread.sleep(ms);
+            return true;
+        } catch (InterruptedException ignored) {
+            // Interrupted: report it via false; the interrupt flag is not restored
+            return false;
+        }
+    }
+
+    public void cronSchedule() {
+        // Run one round of periodic task scheduling
+
+        if (!this.graph.started() || this.graph.closed()) {
+            return;
+        }
+
+        // Handle tasks in NEW status
+        Iterator<HugeTask<Object>> news = queryTaskWithoutResultByStatus(
+            TaskStatus.NEW);
+
+        while (!this.closed.get() && news.hasNext()) {
+            HugeTask<?> newTask = news.next();
+            LOG.info("Try to start task({})@({}/{})", newTask.id(),
+                     this.graphSpace, this.graphName);
+            if (!tryStartHugeTask(newTask)) {
+                // Task submission failed: the thread pool is already full
+                break;
+            }
+        }
+
+        // Handle tasks in RUNNING status
+        Iterator<HugeTask<Object>> runnings =
+            queryTaskWithoutResultByStatus(TaskStatus.RUNNING);
+
+        while (!this.closed.get() && runnings.hasNext()) {
+            HugeTask<?> running = runnings.next();
+            initTaskParams(running);
+            if (!isLockedTask(running.id().toString())) {
+                LOG.info("Try to update task({})@({}/{}) status" +
+                         "(RUNNING->FAILED)", running.id(), this.graphSpace,
+                         this.graphName);
+                updateStatusWithLock(running.id(), TaskStatus.RUNNING,
+                                     TaskStatus.FAILED);
+                runningTasks.remove(running.id());
+            }
+        }
+
+        // Handle tasks in FAILED/HANGING status (only FAILED is queried below)
+        Iterator<HugeTask<Object>> faileds =
+            queryTaskWithoutResultByStatus(TaskStatus.FAILED);
+
+        while (!this.closed.get() && faileds.hasNext()) {
+            HugeTask<?> failed = faileds.next();
+            initTaskParams(failed);
+            if (failed.retries() < 
this.graph().option(CoreOptions.TASK_RETRY)) {
+                LOG.info("Try to update task({})@({}/{}) status(FAILED->NEW)",
+                         failed.id(), this.graphSpace, this.graphName);
+                updateStatusWithLock(failed.id(), TaskStatus.FAILED,
+                                     TaskStatus.NEW);
+            }
+        }
+
+        // Handle tasks in CANCELLING status
+        Iterator<HugeTask<Object>> cancellings = 
queryTaskWithoutResultByStatus(
+            TaskStatus.CANCELLING);
+
+        while (!this.closed.get() && cancellings.hasNext()) {
+            Id cancellingId = cancellings.next().id();
+            if (runningTasks.containsKey(cancellingId)) {
+                HugeTask<?> cancelling = runningTasks.get(cancellingId);
+                initTaskParams(cancelling);
+                LOG.info("Try to cancel task({})@({}/{})",
+                         cancelling.id(), this.graphSpace, this.graphName);
+                cancelling.cancel(true);
+
+                runningTasks.remove(cancellingId);
+            } else {
+                // Task is not running locally and no node is executing it any more
+                if (!isLockedTask(cancellingId.toString())) {
+                    updateStatusWithLock(cancellingId, TaskStatus.CANCELLING,
+                                         TaskStatus.CANCELLED);
+                }
+            }
+        }
+
+        // Handle tasks in DELETING status
+        Iterator<HugeTask<Object>> deletings = queryTaskWithoutResultByStatus(
+            TaskStatus.DELETING);
+
+        while (!this.closed.get() && deletings.hasNext()) {
+            Id deletingId = deletings.next().id();
+            if (runningTasks.containsKey(deletingId)) {
+                HugeTask<?> deleting = runningTasks.get(deletingId);
+                initTaskParams(deleting);
+                deleting.cancel(true);
+
+                // Delete the stored task info
+                deleteFromDB(deletingId);
+
+                runningTasks.remove(deletingId);
+            } else {
+                // Task is not running locally and no node is executing it any more
+                if (!isLockedTask(deletingId.toString())) {
+                    deleteFromDB(deletingId);
+                }
+            }
+        }
+    }
+
+    protected <V> Iterator<HugeTask<V>> 
queryTaskWithoutResultByStatus(TaskStatus status) {
+        if (this.closed.get()) {
+            return QueryResults.emptyIterator();
+        }
+        return queryTaskWithoutResult(HugeTask.P.STATUS, status.code(), 
NO_LIMIT, null);
+    }
+
+    @Override
+    public HugeGraph graph() {
+        return this.graph.graph();
+    }
+
+    @Override
+    public int pendingTasks() {
+        return this.runningTasks.size();
+    }
+
+    @Override
+    public <V> void restoreTasks() {
+        // DO Nothing!
+    }
+
+    @Override
+    public <V> Future<?> schedule(HugeTask<V> task) {
+        E.checkArgumentNotNull(task, "Task can't be null");
+
+        initTaskParams(task);
+
+        if (task.ephemeralTask()) {
+            // Ephemeral task: no scheduling needed, execute it directly
+            return this.ephemeralTaskExecutor.submit(task);
+        }
+
+        // Handle schema tasks
+        // Handle gremlin tasks
+        // Handle olap computing tasks
+        // Add the task to DB, its status is NEW at this point
+        // TODO: save server id for task
+        this.save(task);
+
+        if (!this.closed.get()) {
+            LOG.info("Try to start task({})@({}/{}) immediately", task.id(),
+                     this.graphSpace, this.graphName);
+            tryStartHugeTask(task);
+        } else {
+            LOG.info("TaskScheduler has closed");
+        }
+
+        return null;
+    }
+
+    protected <V> void initTaskParams(HugeTask<V> task) {
+        // Bind the environment that the task needs for execution
+        // This method must be called before both task deserialization and execution
+        task.scheduler(this);
+        TaskCallable<V> callable = task.callable();
+        callable.task(task);
+        callable.graph(this.graph());
+
+        if (callable instanceof TaskCallable.SysTaskCallable) {
+            ((TaskCallable.SysTaskCallable<?>) callable).params(this.graph);
+        }
+    }
+
+    @Override
+    public <V> void cancel(HugeTask<V> task) {
+        // Update status to CANCELLING
+        if (!task.completed()) {
+            // Only an uncompleted task can be moved to CANCELLING status
+            this.updateStatus(task.id(), null, TaskStatus.CANCELLING);
+        } else {
+            LOG.info("cancel task({}) error, task has completed", task.id());
+        }
+    }
+
+    @Override
+    public void init() {
+        this.call(() -> this.tx().initSchema());
+    }
+
+    protected <V> HugeTask<V> deleteFromDB(Id id) {
+        // Delete the task from DB without checking its status
+        return this.call(() -> {
+            Iterator<Vertex> vertices = this.tx().queryTaskInfos(id);
+            HugeVertex vertex = (HugeVertex) QueryResults.one(vertices);
+            if (vertex == null) {
+                return null;
+            }
+            HugeTask<V> result = HugeTask.fromVertex(vertex);
+            this.tx().removeVertex(vertex);
+            return result;
+        });
+    }
+
+    @Override
+    public <V> HugeTask<V> delete(Id id, boolean force) {
+        if (!force) {
+            // Change status to DELETING and let the periodic scheduling do the deletion
+            this.updateStatus(id, null, TaskStatus.DELETING);
+            return null;
+        } else {
+            return this.deleteFromDB(id);
+        }
+    }
+
+    @Override
+    public boolean close() {
+        if (this.closed.get()) {
+            return true;
+        }
+
+        // set closed
+        this.closed.set(true);
+
+        // cancel all running tasks
+        for (HugeTask<?> task : this.runningTasks.values()) {
+            LOG.info("cancel task({}) @({}/{}) when closing scheduler",
+                     task.id(), graphSpace, graphName);
+            this.cancel(task);
+        }
+
+        try {
+            this.waitUntilAllTasksCompleted(10);
+        } catch (TimeoutException e) {
+            LOG.warn("Tasks not completed when close distributed task 
scheduler", e);
+        }
+
+        // cancel cron thread
+        if (!cronFuture.isDone() && !cronFuture.isCancelled()) {
+            cronFuture.cancel(false);
+        }
+
+        if (!this.taskDbExecutor.isShutdown()) {
+            this.call(() -> {
+                try {
+                    this.tx().close();
+                } catch (ConnectionException ignored) {
+                    // ConnectionException means no connection established
+                }
+                this.graph.closeTx();
+            });
+        }
+        return true;
+    }
+
+    @Override
+    public <V> HugeTask<V> waitUntilTaskCompleted(Id id, long seconds)
+        throws TimeoutException {
+        return this.waitUntilTaskCompleted(id, seconds, QUERY_INTERVAL);
+    }
+
+    @Override
+    public <V> HugeTask<V> waitUntilTaskCompleted(Id id)
+        throws TimeoutException {
+        // This method is just used by tests
+        long timeout = this.graph.configuration()
+                                 .get(CoreOptions.TASK_WAIT_TIMEOUT);
+        return this.waitUntilTaskCompleted(id, timeout, 1L);
+    }
+
+    private <V> HugeTask<V> waitUntilTaskCompleted(Id id, long seconds,
+                                                   long intervalMs)
+        throws TimeoutException {
+        long passes = seconds * 1000 / intervalMs;
+        HugeTask<V> task = null;
+        for (long pass = 0; ; pass++) {
+            try {
+                task = this.taskWithoutResult(id);
+            } catch (NotFoundException e) {
+                if (task != null && task.completed()) {
+                    assert task.id().asLong() < 0L : task.id();
+                    sleep(intervalMs);
+                    return task;
+                }
+                throw e;
+            }
+            if (task.completed()) {
+                // Wait for task result being set after status is completed
+                sleep(intervalMs);
+                // Query the task info together with its result
+                task = this.task(id);
+                return task;
+            }
+            if (pass >= passes) {
+                break;
+            }
+            sleep(intervalMs);
+        }
+        throw new TimeoutException(String.format(
+            "Task '%s' was not completed in %s seconds", id, seconds));
+    }
+
+    /**
+     * Block until pendingTasks() reports zero, polling every QUERY_INTERVAL.
+     *
+     * @param seconds maximum time to wait, in seconds
+     * @throws TimeoutException if tasks are still pending after the computed
+     *                          number of polls
+     */
+    @Override
+    public void waitUntilAllTasksCompleted(long seconds)
+        throws TimeoutException {
+        final long maxPasses = seconds * 1000 / QUERY_INTERVAL;
+        long pass = 0;
+        while (true) {
+            int pending = this.pendingTasks();
+            if (pending == 0) {
+                // One extra interval before reporting success
+                sleep(QUERY_INTERVAL);
+                return;
+            }
+            if (pass >= maxPasses) {
+                throw new TimeoutException(String.format(
+                    "There are still %s incomplete tasks after %s seconds",
+                    pending, seconds));
+            }
+            pass++;
+            sleep(QUERY_INTERVAL);
+        }
+    }
+
+    /**
+     * Reject the operation unless serverManager() reports this server as
+     * master.
+     *
+     * @param op operation name, only used in the error message
+     * @throws HugeException if this server is not the master
+     */
+    @Override
+    public void checkRequirement(String op) {
+        if (!this.serverManager().master()) {
+            throw new HugeException("Can't %s task on non-master server", op);
+        }
+    }
+
+    /**
+     * Run the callable on the task-db executor and wait for its result.
+     *
+     * @param callable the work to execute
+     * @return the value produced by the callable
+     */
+    @Override
+    public <V> V call(Callable<V> callable) {
+        return this.call(callable, this.taskDbExecutor);
+    }
+
+    /**
+     * Run the runnable through call(Callable) and wait until it has finished.
+     * The runnable is adapted with a null result, so null is returned.
+     *
+     * @param runnable the work to execute
+     * @return always null
+     */
+    @Override
+    public <V> V call(Runnable runnable) {
+        return this.call(Executors.callable(runnable, null));
+    }
+
+    /**
+     * Submit the callable (wrapped in a TaskManager.ContextCallable) to the
+     * given executor and block until it returns.
+     *
+     * @param callable the work to execute
+     * @param executor the executor that runs the work
+     * @return the value produced by the callable
+     * @throws HugeException if submitting, waiting for or executing the
+     *                       callable fails; the original exception is kept
+     *                       as the cause
+     */
+    private <V> V call(Callable<V> callable, ExecutorService executor) {
+        try {
+            callable = new TaskManager.ContextCallable<>(callable);
+            return executor.submit(callable).get();
+        } catch (Exception e) {
+            if (e instanceof InterruptedException) {
+                // Future.get() clears the interrupt flag when it throws;
+                // restore it so that callers can still observe the interrupt
+                Thread.currentThread().interrupt();
+            }
+            throw new HugeException("Failed to update/query TaskStore for " +
+                                    "graph(%s/%s): %s", e, this.graphSpace,
+                                    this.graph.name(), e.toString());
+        }
+    }
+
+    /**
+     * Load the task and move it to the given status, provided its current
+     * status equals prestatus (or prestatus is null).
+     *
+     * @param id        id of the task to update
+     * @param prestatus status the task is expected to be in, or null to
+     *                  update unconditionally
+     * @param status    the new status
+     * @return true if the task was updated and saved, false on a status
+     *         conflict
+     */
+    protected boolean updateStatus(Id id, TaskStatus prestatus,
+                                   TaskStatus status) {
+        HugeTask<Object> current = this.taskWithoutResult(id);
+        initTaskParams(current);
+
+        boolean expected = prestatus == null ||
+                           current.status() == prestatus;
+        if (!expected) {
+            LOG.info("Update task({}) status conflict: current({}), " +
+                     "pre({}), status({})", id, current.status(),
+                     prestatus, status);
+            return false;
+        }
+
+        current.overwriteStatus(status);
+        // A FAILED -> NEW transition counts as one more retry of the task
+        if (prestatus == TaskStatus.FAILED && status == TaskStatus.NEW) {
+            current.retry();
+        }
+        this.save(current);
+        LOG.info("Update task({}) success: pre({}), status({})",
+                 id, prestatus, status);
+        return true;
+    }
+
+    /**
+     * Same as updateStatus(), but performed while holding the task lock.
+     *
+     * @param id        id of the task to update
+     * @param prestatus status the task is expected to be in, or null
+     * @param status    the new status
+     * @return the result of updateStatus(), or false if the task lock could
+     *         not be acquired
+     */
+    protected boolean updateStatusWithLock(Id id, TaskStatus prestatus,
+                                           TaskStatus status) {
+        String taskId = id.asString();
+        LockResult taskLock = tryLockTask(taskId);
+        if (!taskLock.lockSuccess()) {
+            return false;
+        }
+
+        try {
+            return updateStatus(id, prestatus, status);
+        } finally {
+            unlockTask(taskId, taskLock);
+        }
+    }
+
+    /**
+     * try to start task;
+     *
+     * @param task
+     * @return true if the task have start
+     */
+    private boolean tryStartHugeTask(HugeTask<?> task) {
+        // Print Scheduler status
+        logCurrentState();
+
+        initTaskParams(task);
+
+        ExecutorService chosenExecutor = gremlinTaskExecutor;
+
+        if (task.computer()) {
+            chosenExecutor = this.olapTaskExecutor;
+        }
+
+        // TODO: uncomment later - vermeer job
+        //if (task.vermeer()) {
+        //    chosenExecutor = this.olapTaskExecutor;
+        //}
+
+        if (task.gremlinTask()) {
+            chosenExecutor = this.gremlinTaskExecutor;
+        }
+
+        if (task.schemaTask()) {
+            chosenExecutor = schemaTaskExecutor;
+        }
+
+        ThreadPoolExecutor executor = (ThreadPoolExecutor) chosenExecutor;
+        if (executor.getActiveCount() < executor.getMaximumPoolSize()) {
+            TaskRunner<?> runner = new TaskRunner<>(task);
+            chosenExecutor.submit(runner);
+            LOG.info("Start task({})@({}/{})", task.id(),
+                     this.graphSpace, this.graphName);
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Log the number of active threads of each of the four task executors.
+     */
+    protected void logCurrentState() {
+        ThreadPoolExecutor gremlinPool =
+            (ThreadPoolExecutor) gremlinTaskExecutor;
+        ThreadPoolExecutor schemaPool =
+            (ThreadPoolExecutor) schemaTaskExecutor;
+        ThreadPoolExecutor ephemeralPool =
+            (ThreadPoolExecutor) ephemeralTaskExecutor;
+        ThreadPoolExecutor olapPool =
+            (ThreadPoolExecutor) olapTaskExecutor;
+
+        LOG.info("Current State: gremlinTaskExecutor({}), schemaTaskExecutor" +
+                 "({}), ephemeralTaskExecutor({}), olapTaskExecutor({})",
+                 gremlinPool.getActiveCount(), schemaPool.getActiveCount(),
+                 ephemeralPool.getActiveCount(), olapPool.getActiveCount());
+    }
+
+    /**
+     * Try to lock a task: first the in-process lock from LockManager, then
+     * the lock provided by MetaManager. The in-process lock is given back
+     * when the MetaManager lock is not obtained.
+     *
+     * @param taskId id of the task, as string
+     * @return the lock result; lockSuccess() tells whether both locks are
+     *         held
+     */
+    private LockResult tryLockTask(String taskId) {
+        // Unsuccessful result used when the meta lock attempt throws
+        LockResult outcome = new LockResult();
+
+        Lock localLock = LockManager.instance().get(lockGroupName)
+                                    .lock(taskId);
+        if (!localLock.tryLock()) {
+            return new LockResult();
+        }
+
+        try {
+            outcome = MetaManager.instance().tryLockTask(graphSpace,
+                                                         graphName, taskId);
+        } catch (Throwable t) {
+            LOG.info(String.format("try to lock task(%s) error", taskId), t);
+        }
+
+        if (!outcome.lockSuccess()) {
+            localLock.unlock();
+        }
+        return outcome;
+    }
+
+    /**
+     * Release the locks taken by tryLockTask(): the MetaManager lock first,
+     * then the in-process lock from LockManager.
+     *
+     * @param taskId     id of the task, as string
+     * @param lockResult the result returned by the matching tryLockTask()
+     */
+    private void unlockTask(String taskId, LockResult lockResult) {
+
+        try {
+            MetaManager.instance().unlockTask(graphSpace, graphName, taskId,
+                                              lockResult);
+        } catch (Throwable t) {
+            // Only logged, so that the local lock below is still released
+            LOG.info(String.format("try to unlock task(%s) error",
+                                   taskId), t);
+        }
+
+        LockGroup lockGroup = LockManager.instance().get(lockGroupName);
+        Lock localLock = lockGroup.lock(taskId);
+
+        // NOTE(review): presumably must run on the thread that called
+        // tryLock() if the lock enforces ownership - confirm
+        localLock.unlock();
+    }
+
+    /**
+     * Ask MetaManager whether the task of this graph space / graph is locked.
+     *
+     * @param taskId id of the task, as string
+     * @return the answer of MetaManager.isLockedTask()
+     */
+    private boolean isLockedTask(String taskId) {
+        return MetaManager.instance().isLockedTask(graphSpace,
+                                                   graphName, taskId);
+    }
+
+    /**
+     * Runnable that executes one HugeTask while holding its task lock and
+     * tracks it in runningTasks for the duration of the run.
+     */
+    private class TaskRunner<V> implements Runnable {
+
+        // The task to execute
+        private final HugeTask<V> task;
+
+        public TaskRunner(HugeTask<V> task) {
+            this.task = task;
+        }
+
+        /**
+         * Lock the task, re-check that its stored status is still NEW and
+         * then run it. Nothing is executed when the lock is not obtained.
+         */
+        @Override
+        public void run() {
+            LockResult lockResult = tryLockTask(task.id().asString());
+
+            initTaskParams(task);
+            if (lockResult.lockSuccess()) {
+
+                TaskManager.setContext(task.context());
+                try {
+                    // 1. start task can be from schedule() & cronSchedule()
+                    // 2. recheck the status of task, in case one same task
+                    // called by both methods at same time;
+                    HugeTask<Object> queryTask = task(this.task.id());
+                    if (queryTask != null &&
+                        !TaskStatus.NEW.equals(queryTask.status())) {
+                        // Not NEW any more: skip it (finally still unlocks)
+                        return;
+                    }
+
+                    runningTasks.put(task.id(), task);
+
+                    // Running the task is not expected to throw: HugeTask
+                    // catches exceptions during execution and stores them
+                    // in the DB
+                    task.run();
+                } catch (Throwable t) {
+                    LOG.info("exception when execute task", t);
+                } finally {
+                    runningTasks.remove(task.id());
+                    unlockTask(task.id().asString(), lockResult);
+
+                    LOG.info("task({}) finished.", task.id().toString());
+                }
+            }
+        }
+    }
+
+    /**
+     * @return the name of the graph this scheduler belongs to
+     */
+    @Override
+    public String graphName() {
+        return this.graph.name();
+    }
+
+    /**
+     * Callback invoked when a task is done; this scheduler takes no action.
+     *
+     * @param task the task that is done (unused)
+     */
+    @Override
+    public void taskDone(HugeTask<?> task) {
+        // Intentionally empty: nothing to do in this scheduler
+    }
+}
diff --git 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeTask.java
 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeTask.java
index 5a3439ac7..cfa1bbccc 100644
--- 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeTask.java
+++ 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeTask.java
@@ -32,6 +32,8 @@ import java.util.stream.Collectors;
 import org.apache.hugegraph.backend.id.Id;
 import org.apache.hugegraph.backend.id.IdGenerator;
 import org.apache.hugegraph.config.CoreOptions;
+import org.apache.hugegraph.job.GremlinJob;
+import org.apache.hugegraph.job.schema.SchemaJob;
 import org.apache.hugegraph.type.define.SerialEnum;
 import org.apache.hugegraph.util.*;
 import org.apache.tinkerpop.gremlin.structure.Graph.Hidden;
@@ -218,6 +220,10 @@ public class HugeTask<V> extends FutureTask<V> {
         return this.result;
     }
 
+    /**
+     * Set the result of this task from a separately stored HugeTaskResult.
+     *
+     * @param result the task result whose result string is copied
+     */
+    public synchronized void result(HugeTaskResult result) {
+        this.result = result.result();
+    }
+
     private synchronized boolean result(TaskStatus status, String result) {
         checkPropertySize(result, P.RESULT);
         if (this.status(status)) {
@@ -263,6 +269,18 @@ public class HugeTask<V> extends FutureTask<V> {
         return ComputerJob.COMPUTER.equals(this.type);
     }
 
+    /**
+     * @return true if the callable of this task is a SchemaJob
+     */
+    public boolean schemaTask() {
+        return this.callable instanceof SchemaJob;
+    }
+
+    /**
+     * @return true if the callable of this task is a GremlinJob
+     */
+    public boolean gremlinTask() {
+        return this.callable instanceof GremlinJob;
+    }
+
+    /**
+     * @return true if the callable of this task is an EphemeralJob
+     */
+    public boolean ephemeralTask() {
+        return this.callable instanceof EphemeralJob;
+    }
+
     @Override
     public String toString() {
         return String.format("HugeTask(%s)%s", this.id, this.asMap());
@@ -346,9 +364,7 @@ public class HugeTask<V> extends FutureTask<V> {
         } catch (Throwable e) {
             LOG.error("An exception occurred when calling done()", e);
         } finally {
-            StandardTaskScheduler scheduler = (StandardTaskScheduler)
-                                              this.scheduler();
-            scheduler.taskDone(this);
+            this.scheduler().taskDone(this);
         }
     }
 
@@ -428,6 +444,10 @@ public class HugeTask<V> extends FutureTask<V> {
         return false;
     }
 
+    /**
+     * Assign the status field directly, without any check of the current
+     * status.
+     *
+     * @param status the status to store
+     */
+    public synchronized void overwriteStatus(TaskStatus status) {
+        this.status = status;
+    }
+
     protected void property(String key, Object value) {
         E.checkNotNull(key, "property key");
         switch (key) {
@@ -560,6 +580,75 @@ public class HugeTask<V> extends FutureTask<V> {
         return list.toArray();
     }
 
+    /**
+     * Flatten this task into an alternating key/value array, starting with
+     * the label and id entries. No result entry is added here.
+     *
+     * @return key/value pairs: the mandatory entries first, followed by the
+     *         optional entries whose field is not null
+     * @throws IllegalStateException presumably, via E.checkState, if type or
+     *                               name is null - confirm exception type
+     */
+    protected synchronized Object[] asArrayWithoutResult() {
+        E.checkState(this.type != null, "Task type can't be null");
+        E.checkState(this.name != null, "Task name can't be null");
+
+        // Initial capacity only; the list grows if more entries are added
+        List<Object> list = new ArrayList<>(28);
+
+        list.add(T.label);
+        list.add(P.TASK);
+
+        list.add(T.id);
+        list.add(this.id);
+
+        list.add(P.TYPE);
+        list.add(this.type);
+
+        list.add(P.NAME);
+        list.add(this.name);
+
+        // Only the class name of the callable is stored
+        list.add(P.CALLABLE);
+        list.add(this.callable.getClass().getName());
+
+        list.add(P.STATUS);
+        list.add(this.status.code());
+
+        list.add(P.PROGRESS);
+        list.add(this.progress);
+
+        list.add(P.CREATE);
+        list.add(this.create);
+
+        list.add(P.RETRIES);
+        list.add(this.retries);
+
+        // The entries below are optional and skipped when null
+        if (this.description != null) {
+            list.add(P.DESCRIPTION);
+            list.add(this.description);
+        }
+
+        if (this.context != null) {
+            list.add(P.CONTEXT);
+            list.add(this.context);
+        }
+
+        if (this.update != null) {
+            list.add(P.UPDATE);
+            list.add(this.update);
+        }
+
+        if (this.dependencies != null) {
+            // Dependencies are stored as a set of long ids
+            list.add(P.DEPENDENCIES);
+            list.add(this.dependencies.stream().map(Id::asLong)
+                                      .collect(toOrderSet()));
+        }
+
+        if (this.input != null) {
+            // Input is stored compressed; the size check is applied to the
+            // compressed length
+            byte[] bytes = StringEncoding.compress(this.input);
+            checkPropertySize(bytes.length, P.INPUT);
+            list.add(P.INPUT);
+            list.add(bytes);
+        }
+
+        if (this.server != null) {
+            list.add(P.SERVER);
+            list.add(this.server.asString());
+        }
+
+        return list.toArray();
+    }
+
     public Map<String, Object> asMap() {
         return this.asMap(true);
     }
diff --git 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeTaskResult.java
 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeTaskResult.java
new file mode 100644
index 000000000..24fc186cd
--- /dev/null
+++ 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeTaskResult.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.task;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hugegraph.HugeGraph;
+import org.apache.hugegraph.backend.id.Id;
+import org.apache.hugegraph.schema.PropertyKey;
+import org.apache.hugegraph.schema.SchemaManager;
+import org.apache.hugegraph.schema.VertexLabel;
+import org.apache.hugegraph.type.define.Cardinality;
+import org.apache.hugegraph.type.define.DataType;
+import org.apache.hugegraph.util.Blob;
+import org.apache.hugegraph.util.E;
+import org.apache.hugegraph.util.Log;
+import org.apache.hugegraph.util.StringEncoding;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.VertexProperty;
+import org.slf4j.Logger;
+
+/**
+ * The result of a task, kept as an object of its own (separate from
+ * HugeTask) that can be converted to and from a vertex.
+ */
+public class HugeTaskResult {
+
+    private static final Logger LOG = Log.logger(HugeTaskResult.class);
+    // Ratio passed to StringEncoding.decompress() when reading the result
+    private static final float DECOMPRESS_RATIO = 10.0F;
+    private final String taskResultId;
+    // Written by result(String) without holding the monitor, hence volatile
+    private volatile String result;
+
+    /**
+     * Create an empty task result.
+     *
+     * @param taskId the id stored as the id of this task result
+     */
+    public HugeTaskResult(String taskId) {
+        this.taskResultId = taskId;
+        this.result = null;
+    }
+
+    /**
+     * Build the task-result id that belongs to a task id.
+     *
+     * @param taskId id of the task
+     * @return "task_result_" followed by the numeric task id
+     */
+    public static String genId(Id taskId) {
+        return String.format("task_result_%d", taskId.asLong());
+    }
+
+    /**
+     * Rebuild a task result from a vertex: the vertex id becomes the task
+     * result id and every vertex property is applied through property().
+     *
+     * @param vertex the vertex to read
+     * @return the task result read from the vertex
+     */
+    public static HugeTaskResult fromVertex(Vertex vertex) {
+        Id taskResultId = (Id) vertex.id();
+        HugeTaskResult taskResult =
+            new HugeTaskResult(taskResultId.asString());
+        for (Iterator<VertexProperty<Object>> iter = vertex.properties();
+             iter.hasNext(); ) {
+            VertexProperty<Object> prop = iter.next();
+            taskResult.property(prop.key(), prop.value());
+        }
+        return taskResult;
+    }
+
+    public String taskResultId() {
+        return this.taskResultId;
+    }
+
+    public void result(String result) {
+        this.result = result;
+    }
+
+    public String result() {
+        return this.result;
+    }
+
+    /**
+     * Flatten this task result into an alternating key/value array: label,
+     * id and, if a result is set, the compressed result bytes.
+     *
+     * @return the key/value pairs describing this task result
+     */
+    protected synchronized Object[] asArray() {
+        // result(String) is not synchronized, so read the volatile field
+        // once; otherwise it could change between the null check and the
+        // compress() call
+        String current = this.result;
+
+        List<Object> list = new ArrayList<>(6);
+
+        list.add(T.label);
+        list.add(HugeTaskResult.P.TASKRESULT);
+
+        list.add(T.id);
+        list.add(this.taskResultId);
+
+        if (current != null) {
+            byte[] bytes = StringEncoding.compress(current);
+            list.add(HugeTaskResult.P.RESULT);
+            list.add(bytes);
+        }
+
+        return list.toArray();
+    }
+
+    /**
+     * Apply one stored property to this object. Only P.RESULT is supported;
+     * its value is cast to Blob and decompressed into the result string.
+     *
+     * @param key   property key, must not be null
+     * @param value property value
+     * @throws AssertionError if the key is not supported
+     */
+    protected void property(String key, Object value) {
+        E.checkNotNull(key, "property key");
+        switch (key) {
+            case P.RESULT:
+                this.result = StringEncoding.decompress(
+                              ((Blob) value).bytes(), DECOMPRESS_RATIO);
+                break;
+            default:
+                throw new AssertionError("Unsupported key: " + key);
+        }
+    }
+
+    /**
+     * Label and property-key names used for task results.
+     */
+    public static final class P {
+
+        public static final String TASKRESULT =
+            Graph.Hidden.hide("taskresult");
+
+        public static final String RESULT = "~result_result";
+
+        /**
+         * Strip the hidden "result_" prefix from a key, if present.
+         *
+         * @param key the key to convert
+         * @return the key without the prefix, or the key itself when it does
+         *         not start with the prefix
+         */
+        public static String unhide(String key) {
+            final String prefix = Graph.Hidden.hide("result_");
+            if (key.startsWith(prefix)) {
+                return key.substring(prefix.length());
+            }
+            return key;
+        }
+    }
+}
diff --git 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java
 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java
index 50d0a13c7..7c97f302e 100644
--- 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java
+++ 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java
@@ -79,11 +79,6 @@ public class StandardTaskScheduler implements TaskScheduler {
 
     private volatile TaskTransaction taskTx;
 
-    private static final long NO_LIMIT = -1L;
-    private static final long PAGE_SIZE = 500L;
-    private static final long QUERY_INTERVAL = 100L;
-    private static final int MAX_PENDING_TASKS = 10000;
-
     public StandardTaskScheduler(HugeGraphParams graph,
                                  ExecutorService taskExecutor,
                                  ExecutorService taskDbExecutor,
@@ -107,6 +102,7 @@ public class StandardTaskScheduler implements TaskScheduler 
{
         return this.graph.graph();
     }
 
+    @Override
     public String graphName() {
         return this.graph.name();
     }
@@ -299,7 +295,8 @@ public class StandardTaskScheduler implements TaskScheduler 
{
                                 task.id(), task.status());
     }
 
-    protected ServerInfoManager serverManager() {
+    @Override
+    public ServerInfoManager serverManager() {
         return this.serverManager;
     }
 
@@ -419,7 +416,8 @@ public class StandardTaskScheduler implements TaskScheduler 
{
         } while (page != null);
     }
 
-    protected void taskDone(HugeTask<?> task) {
+    @Override
+    public void taskDone(HugeTask<?> task) {
         this.remove(task);
 
         Id selfServerId = this.serverManager().selfServerId();
@@ -433,13 +431,17 @@ public class StandardTaskScheduler implements 
TaskScheduler {
     }
 
     protected void remove(HugeTask<?> task) {
+        this.remove(task, false);
+    }
+
+    protected void remove(HugeTask<?> task, boolean force) {
         E.checkNotNull(task, "remove task");
         HugeTask<?> delTask = this.tasks.remove(task.id());
         if (delTask != null && delTask != task) {
             LOG.warn("Task '{}' may be inconsistent status {}(expect {})",
                       task.id(), task.status(), delTask.status());
         }
-        assert delTask == null || delTask.completed() ||
+        assert force || delTask == null || delTask.completed() ||
                delTask.cancelling() || delTask.isCancelled() : delTask;
     }
 
@@ -550,7 +552,7 @@ public class StandardTaskScheduler implements TaskScheduler 
{
     }
 
     @Override
-    public <V> HugeTask<V> delete(Id id) {
+    public <V> HugeTask<V> delete(Id id, boolean force) {
         this.checkOnMasterNode("delete");
 
         HugeTask<?> task = this.task(id);
@@ -565,11 +567,11 @@ public class StandardTaskScheduler implements 
TaskScheduler {
          * when the database status is inconsistent.
          */
         if (task != null) {
-            E.checkArgument(task.completed(),
+            E.checkArgument(force || task.completed(),
                             "Can't delete incomplete task '%s' in status %s" +
                             ", Please try to cancel the task first",
                             id, task.status());
-            this.remove(task);
+            this.remove(task, force);
         }
 
         return this.call(() -> {
@@ -579,7 +581,7 @@ public class StandardTaskScheduler implements TaskScheduler 
{
                 return null;
             }
             HugeTask<V> result = HugeTask.fromVertex(vertex);
-            E.checkState(result.completed(),
+            E.checkState(force || result.completed(),
                          "Can't delete incomplete task '%s' in status %s",
                          id, result.status());
             this.tx().removeVertex(vertex);
@@ -704,11 +706,13 @@ public class StandardTaskScheduler implements 
TaskScheduler {
         });
     }
 
-    private <V> V call(Runnable runnable) {
+    @Override
+    public <V> V call(Runnable runnable) {
         return this.call(Executors.callable(runnable, null));
     }
 
-    private <V> V call(Callable<V> callable) {
+    @Override
+    public  <V> V call(Callable<V> callable) {
         assert !Thread.currentThread().getName().startsWith(
                "task-db-worker") : "can't call by itself";
         try {
@@ -741,129 +745,4 @@ public class StandardTaskScheduler implements 
TaskScheduler {
             return false;
         }
     }
-
-    private static class TaskTransaction extends GraphTransaction {
-
-        public static final String TASK = P.TASK;
-
-        public TaskTransaction(HugeGraphParams graph, BackendStore store) {
-            super(graph, store);
-            this.autoCommit(true);
-        }
-
-        public HugeVertex constructVertex(HugeTask<?> task) {
-            if (!this.graph().existsVertexLabel(TASK)) {
-                throw new HugeException("Schema is missing for task(%s) '%s'",
-                                        task.id(), task.name());
-            }
-            return this.constructVertex(false, task.asArray());
-        }
-
-        public void deleteIndex(HugeVertex vertex) {
-            // Delete the old record if exist
-            Iterator<Vertex> old = this.queryTaskInfos(vertex.id());
-            HugeVertex oldV = (HugeVertex) QueryResults.one(old);
-            if (oldV == null) {
-                return;
-            }
-            this.deleteIndexIfNeeded(oldV, vertex);
-        }
-
-        private boolean deleteIndexIfNeeded(HugeVertex oldV, HugeVertex newV) {
-            if (!oldV.value(P.STATUS).equals(newV.value(P.STATUS))) {
-                // Only delete vertex if index value changed else override it
-                this.updateIndex(this.indexLabel(P.STATUS).id(), oldV, true);
-                return true;
-            }
-            return false;
-        }
-
-        public void initSchema() {
-            if (this.existVertexLabel(TASK)) {
-                return;
-            }
-
-            HugeGraph graph = this.graph();
-            String[] properties = this.initProperties();
-
-            // Create vertex label '~task'
-            VertexLabel label = graph.schema().vertexLabel(TASK)
-                                     .properties(properties)
-                                     .useCustomizeNumberId()
-                                     .nullableKeys(P.DESCRIPTION, P.CONTEXT,
-                                                   P.UPDATE, P.INPUT, P.RESULT,
-                                                   P.DEPENDENCIES, P.SERVER)
-                                     .enableLabelIndex(true)
-                                     .build();
-            this.params().schemaTransaction().addVertexLabel(label);
-
-            // Create index
-            this.createIndexLabel(label, P.STATUS);
-        }
-
-        private boolean existVertexLabel(String label) {
-            return this.params().schemaTransaction()
-                                .getVertexLabel(label) != null;
-        }
-
-        private String[] initProperties() {
-            List<String> props = new ArrayList<>();
-
-            props.add(createPropertyKey(P.TYPE));
-            props.add(createPropertyKey(P.NAME));
-            props.add(createPropertyKey(P.CALLABLE));
-            props.add(createPropertyKey(P.DESCRIPTION));
-            props.add(createPropertyKey(P.CONTEXT));
-            props.add(createPropertyKey(P.STATUS, DataType.BYTE));
-            props.add(createPropertyKey(P.PROGRESS, DataType.INT));
-            props.add(createPropertyKey(P.CREATE, DataType.DATE));
-            props.add(createPropertyKey(P.UPDATE, DataType.DATE));
-            props.add(createPropertyKey(P.RETRIES, DataType.INT));
-            props.add(createPropertyKey(P.INPUT, DataType.BLOB));
-            props.add(createPropertyKey(P.RESULT, DataType.BLOB));
-            props.add(createPropertyKey(P.DEPENDENCIES, DataType.LONG,
-                                        Cardinality.SET));
-            props.add(createPropertyKey(P.SERVER));
-
-            return props.toArray(new String[0]);
-        }
-
-        private String createPropertyKey(String name) {
-            return this.createPropertyKey(name, DataType.TEXT);
-        }
-
-        private String createPropertyKey(String name, DataType dataType) {
-            return this.createPropertyKey(name, dataType, Cardinality.SINGLE);
-        }
-
-        private String createPropertyKey(String name, DataType dataType,
-                                         Cardinality cardinality) {
-            HugeGraph graph = this.graph();
-            SchemaManager schema = graph.schema();
-            PropertyKey propertyKey = schema.propertyKey(name)
-                                            .dataType(dataType)
-                                            .cardinality(cardinality)
-                                            .build();
-            this.params().schemaTransaction().addPropertyKey(propertyKey);
-            return name;
-        }
-
-        private IndexLabel createIndexLabel(VertexLabel label, String field) {
-            HugeGraph graph = this.graph();
-            SchemaManager schema = graph.schema();
-            String name = Hidden.hide("task-index-by-" + field);
-            IndexLabel indexLabel = schema.indexLabel(name)
-                                          .on(HugeType.VERTEX_LABEL, TASK)
-                                          .by(field)
-                                          .build();
-            this.params().schemaTransaction().addIndexLabel(label, indexLabel);
-            return indexLabel;
-        }
-
-        private IndexLabel indexLabel(String field) {
-            String name = Hidden.hide("task-index-by-" + field);
-            HugeGraph graph = this.graph();
-            return graph.indexLabel(name);
-        }
-    }
 }
diff --git 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskAndResultScheduler.java
 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskAndResultScheduler.java
new file mode 100644
index 000000000..f076f6c46
--- /dev/null
+++ 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskAndResultScheduler.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.task;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.hugegraph.HugeGraphParams;
+import org.apache.hugegraph.backend.id.Id;
+import org.apache.hugegraph.backend.query.Condition;
+import org.apache.hugegraph.backend.query.ConditionQuery;
+import org.apache.hugegraph.backend.query.QueryResults;
+import org.apache.hugegraph.backend.store.BackendStore;
+import org.apache.hugegraph.exception.NotFoundException;
+import org.apache.hugegraph.iterator.ListIterator;
+import org.apache.hugegraph.iterator.MapperIterator;
+import org.apache.hugegraph.schema.PropertyKey;
+import org.apache.hugegraph.schema.VertexLabel;
+import org.apache.hugegraph.structure.HugeVertex;
+import org.apache.hugegraph.type.HugeType;
+import org.apache.hugegraph.type.define.HugeKeys;
+import org.apache.hugegraph.util.E;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Base class of task & result scheduler.
+ *
+ * A task and its result are persisted as two separate vertices through a
+ * {@link TaskAndResultTransaction}: the task vertex is written without the
+ * result, and the result (when present) is written as a task-result vertex
+ * whose id is derived from the task id by {@code HugeTaskResult.genId()}.
+ * All transaction access in the methods below goes through {@code call()}.
+ */
+public abstract class TaskAndResultScheduler implements TaskScheduler {
+    /**
+     * Which graph the scheduler belongs to
+     */
+    protected final HugeGraphParams graph;
+    // Currently always "" (see the TODO in the constructor)
+    protected final String graphSpace;
+    // Taken from graph.name() in the constructor
+    protected final String graphName;
+
+    /**
+     * Task transactions, for persistence.
+     * Created lazily by tx(), which first reads this field outside of the
+     * synchronized block.
+     */
+    protected volatile TaskAndResultTransaction taskTx = null;
+
+    // Created in the constructor; also used as the monitor object in tx()
+    private final ServerInfoManager serverManager;
+
+    /**
+     * Create the scheduler base for a graph.
+     *
+     * @param graph                the graph params this scheduler serves,
+     *                             must not be null
+     * @param serverInfoDbExecutor executor handed to the ServerInfoManager
+     *                             that is created here
+     */
+    public TaskAndResultScheduler(
+        HugeGraphParams graph,
+        ExecutorService serverInfoDbExecutor) {
+        E.checkNotNull(graph, "graph");
+
+        this.graph = graph;
+        // TODO: uncomment later - graph space
+        // this.graphSpace = graph.graph().graphSpace();
+        this.graphSpace = "";
+        this.graphName = graph.name();
+
+        this.serverManager = new ServerInfoManager(graph, 
serverInfoDbExecutor);
+    }
+
+    /**
+     * Save a task as two vertices: first the task itself (without result),
+     * then, if task.result() is not null, a separate task-result vertex.
+     * The two writes are issued through two separate call() invocations.
+     *
+     * @param task the task to save, must not be null
+     */
+    @Override
+    public <V> void save(HugeTask<V> task) {
+        E.checkArgumentNotNull(task, "Task can't be null");
+        String rawResult = task.result();
+
+        // Save task without result;
+        this.call(() -> {
+            // Construct vertex from task
+            HugeVertex vertex = this.tx().constructTaskVertex(task);
+            // Delete index of old vertex to avoid stale index
+            this.tx().deleteIndex(vertex);
+            // Add or update task info to backend store
+            return this.tx().addVertex(vertex);
+        });
+
+        // Save the task result
+        if (rawResult != null) {
+            HugeTaskResult result =
+                new HugeTaskResult(HugeTaskResult.genId(task.id()));
+            result.result(rawResult);
+
+            this.call(() -> {
+                // Construct vertex from task result
+                HugeVertex vertex = 
this.tx().constructTaskResultVertex(result);
+                // Add or update task result to backend store
+                return this.tx().addVertex(vertex);
+            });
+        }
+    }
+
+    /**
+     * Load a task by id and attach its stored result if one exists.
+     *
+     * @param id the task id
+     * @return the task, with its result set when a task-result vertex is
+     *         found by queryTaskResult(id)
+     * @throws NotFoundException if no task vertex is found for the id
+     */
+    @Override
+    public <V> HugeTask<V> task(Id id) {
+        HugeTask<V> task = this.call(() -> {
+            Iterator<Vertex> vertices = this.tx().queryTaskInfos(id);
+            Vertex vertex = QueryResults.one(vertices);
+            if (vertex == null) {
+                return null;
+            }
+            return HugeTask.fromVertex(vertex);
+        });
+
+        if (task == null) {
+            throw new NotFoundException("Can't find task with id '%s'", id);
+        }
+
+        HugeTaskResult taskResult = queryTaskResult(id);
+        if (taskResult != null) {
+            task.result(taskResult);
+        }
+
+        return task;
+    }
+
+    /**
+     * Load tasks by ids. Delegates to tasksWithoutResult(ids), so no
+     * task-result lookup is performed for the returned tasks.
+     */
+    @Override
+    public <V> Iterator<HugeTask<V>> tasks(List<Id> ids) {
+        return this.tasksWithoutResult(ids);
+    }
+
+    /**
+     * List tasks, optionally filtered by status; no task-result lookup is
+     * performed.
+     *
+     * @param status only return tasks with this status, or null for no
+     *               status condition
+     * @param limit  max number of tasks, or NO_LIMIT
+     * @param page   page passed to the query when not null
+     */
+    @Override
+    public <V> Iterator<HugeTask<V>> tasks(TaskStatus status, long limit,
+                                           String page) {
+        if (status == null) {
+            return this.queryTaskWithoutResult(ImmutableMap.of(), limit, page);
+        }
+        return this.queryTaskWithoutResult(HugeTask.P.STATUS, status.code(),
+                                           limit, page);
+    }
+
+    /**
+     * Query tasks (with results attached) by one property equality
+     * condition; see the Map-based overload.
+     */
+    protected <V> Iterator<HugeTask<V>> queryTask(String key, Object value,
+                                                  long limit, String page) {
+        return this.queryTask(ImmutableMap.of(key, value), limit, page);
+    }
+
+    /**
+     * Query tasks matching all the given property equality conditions and
+     * attach each task's stored result.
+     * The task query is converted to a list inside call(); the result of
+     * each task is then fetched by the mapper of the returned
+     * MapperIterator through queryTaskResult(Id), i.e. one lookup per task.
+     * NOTE(review): queryTask(List) loads results in one batch instead;
+     * consider whether the same is wanted here.
+     *
+     * @param conditions property name -> expected value
+     * @param limit      max number of tasks, or NO_LIMIT
+     * @param page       page passed to the query when not null
+     */
+    protected <V> Iterator<HugeTask<V>> queryTask(Map<String, Object> 
conditions,
+                                                  long limit, String page) {
+        Iterator<HugeTask<V>> ts = this.call(() -> {
+            ConditionQuery query = new ConditionQuery(HugeType.TASK);
+            if (page != null) {
+                query.page(page);
+            }
+            VertexLabel vl = this.graph().vertexLabel(HugeTask.P.TASK);
+            query.eq(HugeKeys.LABEL, vl.id());
+            for (Map.Entry<String, Object> entry : conditions.entrySet()) {
+                PropertyKey pk = this.graph().propertyKey(entry.getKey());
+                query.query(Condition.eq(pk.id(), entry.getValue()));
+            }
+            query.showHidden(true);
+            if (limit != NO_LIMIT) {
+                query.limit(limit);
+            }
+            Iterator<Vertex> vertices = this.tx().queryTaskInfos(query);
+            Iterator<HugeTask<V>> tasks =
+                new MapperIterator<>(vertices, HugeTask::fromVertex);
+            // Convert iterator to list to avoid accessing the tx across threads
+            return QueryResults.toList(tasks);
+        });
+
+        return new MapperIterator<>(ts, (task) -> {
+            HugeTaskResult taskResult = queryTaskResult(task.id());
+            if (taskResult != null) {
+                task.result(taskResult);
+            }
+            return task;
+        });
+    }
+
+    /**
+     * Load tasks by ids and attach their stored results.
+     * Results are loaded with one queryTaskResult(List) call and matched to
+     * tasks by task-result id.
+     *
+     * @param ids the task ids
+     */
+    protected <V> Iterator<HugeTask<V>> queryTask(List<Id> ids) {
+        ListIterator<HugeTask<V>> ts = this.call(
+            () -> {
+                Object[] idArray = ids.toArray(new Id[ids.size()]);
+                Iterator<Vertex> vertices = this.tx()
+                                                .queryTaskInfos(idArray);
+                Iterator<HugeTask<V>> tasks =
+                    new MapperIterator<>(vertices,
+                                         HugeTask::fromVertex);
+                // Convert iterator to list to avoid accessing the tx across threads
+                return QueryResults.toList(tasks);
+            });
+
+        Iterator<HugeTaskResult> results = queryTaskResult(ids);
+
+        // Index the results by task-result id for the lookup below
+        HashMap<String, HugeTaskResult> resultCaches = new HashMap<>();
+        while (results.hasNext()) {
+            HugeTaskResult entry = results.next();
+            resultCaches.put(entry.taskResultId(), entry);
+        }
+
+        return new MapperIterator<>(ts, (task) -> {
+            HugeTaskResult taskResult =
+                resultCaches.get(HugeTaskResult.genId(task.id()));
+            if (taskResult != null) {
+                task.result(taskResult);
+            }
+            return task;
+        });
+    }
+
+    /**
+     * Load a task by id without looking up its result.
+     * Unlike task(Id), this returns null instead of throwing when no task
+     * vertex is found.
+     */
+    protected <V> HugeTask<V> taskWithoutResult(Id id) {
+        HugeTask<V> result = this.call(() -> {
+            Iterator<Vertex> vertices = this.tx().queryTaskInfos(id);
+            Vertex vertex = QueryResults.one(vertices);
+            if (vertex == null) {
+                return null;
+            }
+            return HugeTask.fromVertex(vertex);
+        });
+
+        return result;
+    }
+
+    /**
+     * Load tasks by ids without looking up their results.
+     */
+    protected <V> Iterator<HugeTask<V>> tasksWithoutResult(List<Id> ids) {
+        return this.call(() -> {
+            Object[] idArray = ids.toArray(new Id[ids.size()]);
+            Iterator<Vertex> vertices = this.tx().queryTaskInfos(idArray);
+            Iterator<HugeTask<V>> tasks =
+                new MapperIterator<>(vertices, HugeTask::fromVertex);
+            // Convert iterator to list to avoid accessing the tx across threads
+            return QueryResults.toList(tasks);
+        });
+    }
+
+    /**
+     * List tasks, optionally filtered by status, without looking up results.
+     * NOTE(review): the body is identical to tasks(TaskStatus, long, String).
+     */
+    protected <V> Iterator<HugeTask<V>> tasksWithoutResult(TaskStatus status,
+                                                           long limit,
+                                                           String page) {
+        if (status == null) {
+            return this.queryTaskWithoutResult(ImmutableMap.of(), limit, page);
+        }
+        return this.queryTaskWithoutResult(HugeTask.P.STATUS, status.code(),
+                                           limit, page);
+    }
+
+    /**
+     * Query tasks by one property equality condition without looking up
+     * results; see the Map-based overload.
+     */
+    protected <V> Iterator<HugeTask<V>> queryTaskWithoutResult(String key,
+                                                               Object value,
+                                                               long limit, 
String page) {
+        return this.queryTaskWithoutResult(ImmutableMap.of(key, value), limit, 
page);
+    }
+
+    /**
+     * Query tasks matching all the given property equality conditions,
+     * without looking up results. Builds the same query as
+     * queryTask(Map, long, String).
+     *
+     * @param conditions property name -> expected value
+     * @param limit      max number of tasks, or NO_LIMIT
+     * @param page       page passed to the query when not null
+     */
+    protected <V> Iterator<HugeTask<V>> queryTaskWithoutResult(Map<String,
+        Object> conditions, long limit, String page) {
+        return this.call(() -> {
+            ConditionQuery query = new ConditionQuery(HugeType.TASK);
+            if (page != null) {
+                query.page(page);
+            }
+            VertexLabel vl = this.graph().vertexLabel(HugeTask.P.TASK);
+            query.eq(HugeKeys.LABEL, vl.id());
+            for (Map.Entry<String, Object> entry : conditions.entrySet()) {
+                PropertyKey pk = this.graph().propertyKey(entry.getKey());
+                query.query(Condition.eq(pk.id(), entry.getValue()));
+            }
+            query.showHidden(true);
+            if (limit != NO_LIMIT) {
+                query.limit(limit);
+            }
+            Iterator<Vertex> vertices = this.tx().queryTaskInfos(query);
+            Iterator<HugeTask<V>> tasks =
+                new MapperIterator<>(vertices, HugeTask::fromVertex);
+            // Convert iterator to list to avoid accessing the tx across threads
+            return QueryResults.toList(tasks);
+        });
+    }
+
+    /**
+     * Load the stored result of a task.
+     *
+     * @param taskid the task id; the vertex is looked up by
+     *               HugeTaskResult.genId(taskid)
+     * @return the task result, or null if no such vertex is found
+     */
+    protected HugeTaskResult queryTaskResult(Id taskid) {
+        HugeTaskResult result = this.call(() -> {
+            Iterator<Vertex> vertices =
+                this.tx().queryTaskInfos(HugeTaskResult.genId(taskid));
+            Vertex vertex = QueryResults.one(vertices);
+            if (vertex == null) {
+                return null;
+            }
+
+            return HugeTaskResult.fromVertex(vertex);
+        });
+
+        return result;
+    }
+
+    /**
+     * Load the stored results of several tasks with one query.
+     *
+     * @param taskIds the task ids; each is mapped through
+     *                HugeTaskResult.genId() before querying
+     */
+    protected Iterator<HugeTaskResult> queryTaskResult(List<Id> taskIds) {
+        return this.call(() -> {
+            Object[] idArray =
+                taskIds.stream().map(HugeTaskResult::genId).toArray();
+            Iterator<Vertex> vertices = this.tx()
+                                            .queryTaskInfos(idArray);
+            Iterator<HugeTaskResult> tasks =
+                new MapperIterator<>(vertices,
+                                     HugeTaskResult::fromVertex);
+            // Convert iterator to list to avoid accessing the tx across threads
+            return QueryResults.toList(tasks);
+        });
+    }
+
+    /**
+     * Return the task transaction, creating it on first use from the
+     * graph's system store. Creation is guarded by a null check, a
+     * synchronized block on this.serverManager, and a second null check.
+     */
+    protected TaskAndResultTransaction tx() {
+        // NOTE: only the owner thread can access task tx
+        if (this.taskTx == null) {
+            /*
+             * NOTE: don't synchronized(this) due to scheduler thread hold
+             * this lock through scheduleTasks(), then query tasks and wait
+             * for db-worker thread after call(), the tx may not be initialized
+             * but can't catch this lock, then cause deadlock.
+             * We just use this.serverManager as a monitor here
+             */
+            synchronized (this.serverManager) {
+                if (this.taskTx == null) {
+                    BackendStore store = this.graph.loadSystemStore();
+                    TaskAndResultTransaction tx = new 
TaskAndResultTransaction(this.graph, store);
+                    assert this.taskTx == null; // may be reentrant?
+                    this.taskTx = tx;
+                }
+            }
+        }
+        assert this.taskTx != null;
+        return this.taskTx;
+    }
+
+    /**
+     * Return the ServerInfoManager created in the constructor.
+     */
+    @Override
+    public ServerInfoManager serverManager() {
+        return this.serverManager;
+    }
+}
diff --git 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskAndResultTransaction.java
 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskAndResultTransaction.java
new file mode 100644
index 000000000..c39ae2361
--- /dev/null
+++ 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskAndResultTransaction.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.task;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hugegraph.HugeException;
+import org.apache.hugegraph.HugeGraph;
+import org.apache.hugegraph.HugeGraphParams;
+import org.apache.hugegraph.backend.store.BackendStore;
+import org.apache.hugegraph.schema.PropertyKey;
+import org.apache.hugegraph.schema.SchemaManager;
+import org.apache.hugegraph.schema.VertexLabel;
+import org.apache.hugegraph.structure.HugeVertex;
+import org.apache.hugegraph.type.define.Cardinality;
+import org.apache.hugegraph.type.define.DataType;
+
+/**
+ * A TaskTransaction that additionally handles the task-result vertex label:
+ * it builds task vertices without the result, builds task-result vertices,
+ * and creates the task-result schema in initSchema().
+ * Auto-commit is enabled in the constructor.
+ */
+public class TaskAndResultTransaction extends TaskTransaction {
+
+    // Name of the task-result vertex label
+    public static final String TASKRESULT = HugeTaskResult.P.TASKRESULT;
+
+    /**
+     * Task transactions, for persistence
+     * NOTE(review): this field is not referenced anywhere in this class as
+     * shown; it looks like a leftover copied from the scheduler - confirm
+     * before relying on it.
+     */
+    protected volatile TaskAndResultTransaction taskTx = null;
+
+    /**
+     * @param graph the graph params, passed to the parent transaction
+     * @param store the backend store, passed to the parent transaction
+     */
+    public TaskAndResultTransaction(HugeGraphParams graph, BackendStore store) 
{
+        super(graph, store);
+        this.autoCommit(true);
+    }
+
+    /**
+     * Build a vertex from task.asArrayWithoutResult().
+     *
+     * @param task the task to convert
+     * @return the vertex returned by constructVertex(false, ...)
+     * @throws HugeException if the task vertex label doesn't exist
+     */
+    public HugeVertex constructTaskVertex(HugeTask<?> task) {
+        if (!this.graph().existsVertexLabel(TASK)) {
+            throw new HugeException("Schema is missing for task(%s) '%s'",
+                                    task.id(), task.name());
+        }
+
+        return this.constructVertex(false, task.asArrayWithoutResult());
+    }
+
+    /**
+     * Build a vertex from taskResult.asArray().
+     *
+     * @param taskResult the task result to convert
+     * @return the vertex returned by constructVertex(false, ...)
+     * @throws HugeException if the task-result vertex label doesn't exist
+     */
+    public HugeVertex constructTaskResultVertex(HugeTaskResult taskResult) {
+        if (!this.graph().existsVertexLabel(TASKRESULT)) {
+            throw new HugeException("Schema is missing for task result");
+        }
+
+        return this.constructVertex(false, taskResult.asArray());
+    }
+
+    /**
+     * Run the parent's initSchema(), then create the task-result vertex
+     * label unless it already exists. The label has the single property
+     * HugeTaskResult.P.RESULT (nullable), uses customized string ids and
+     * enables the label index.
+     */
+    @Override
+    public void initSchema() {
+        super.initSchema();
+
+        // Nothing to do if the task-result label was created before
+        if (this.graph().existsVertexLabel(TASKRESULT)) {
+            return;
+        }
+
+        HugeGraph graph = this.graph();
+        String[] properties = this.initTaskResultProperties();
+
+        // Create vertex label '~taskresult'
+        VertexLabel label =
+            
graph.schema().vertexLabel(HugeTaskResult.P.TASKRESULT).properties(properties)
+                 .nullableKeys(HugeTaskResult.P.RESULT)
+                 .useCustomizeStringId().enableLabelIndex(true).build();
+
+        graph.addVertexLabel(label);
+    }
+
+    /**
+     * Create the property keys used by the task-result label (currently
+     * only RESULT, of type BLOB) and return their names.
+     */
+    private String[] initTaskResultProperties() {
+        List<String> props = new ArrayList<>();
+        props.add(createPropertyKey(HugeTaskResult.P.RESULT, DataType.BLOB));
+
+        return props.toArray(new String[0]);
+    }
+
+    /**
+     * Create a property key with Cardinality.SINGLE; returns its name.
+     */
+    private String createPropertyKey(String name, DataType dataType) {
+        return createPropertyKey(name, dataType, Cardinality.SINGLE);
+    }
+
+    /**
+     * Build a property key with the given data type and cardinality, add it
+     * through graph().addPropertyKey(), and return the given name.
+     */
+    private String createPropertyKey(String name, DataType dataType, 
Cardinality cardinality) {
+        SchemaManager schema = this.graph().schema();
+        PropertyKey propertyKey =
+            
schema.propertyKey(name).dataType(dataType).cardinality(cardinality).build();
+        this.graph().addPropertyKey(propertyKey);
+        return name;
+    }
+}
diff --git 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java
 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java
index 524a1f759..056b7ac5a 100644
--- 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java
+++ 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java
@@ -47,6 +47,11 @@ public final class TaskManager {
                                "server-info-db-worker-%d";
     public static final String TASK_SCHEDULER = "task-scheduler-%d";
 
+    public static final String OLAP_TASK_WORKER = "olap-task-worker-%d";
+    public static final String SCHEMA_TASK_WORKER = "schema-task-worker-%d";
+    public static final String EPHEMERAL_TASK_WORKER = 
"ephemeral-task-worker-%d";
+    public static final String DISTRIBUTED_TASK_SCHEDULER = 
"distributed-scheduler-%d";
+
     protected static final long SCHEDULE_PERIOD = 1000L; // unit ms
 
     private static final int THREADS = 4;
@@ -59,6 +64,11 @@ public final class TaskManager {
     private final ExecutorService serverInfoDbExecutor;
     private final PausableScheduledThreadPool schedulerExecutor;
 
+    private final ExecutorService schemaTaskExecutor;
+    private final ExecutorService olapTaskExecutor;
+    private final ExecutorService ephemeralTaskExecutor;
+    private final PausableScheduledThreadPool distributedSchedulerExecutor;
+
     private boolean enableRoleElected = false;
 
     public static TaskManager instance() {
@@ -75,6 +85,17 @@ public final class TaskManager {
                               1, TASK_DB_WORKER);
         this.serverInfoDbExecutor = ExecutorUtil.newFixedThreadPool(
                                     1, SERVER_INFO_DB_WORKER);
+
+        this.schemaTaskExecutor = ExecutorUtil.newFixedThreadPool(pool,
+                                                                  
SCHEMA_TASK_WORKER);
+        this.olapTaskExecutor = ExecutorUtil.newFixedThreadPool(pool,
+                                                                
OLAP_TASK_WORKER);
+        this.ephemeralTaskExecutor = ExecutorUtil.newFixedThreadPool(pool,
+                                                                     
EPHEMERAL_TASK_WORKER);
+        this.distributedSchedulerExecutor =
+                ExecutorUtil.newPausableScheduledThreadPool(1,
+                                                            
DISTRIBUTED_TASK_SCHEDULER);
+
         // For schedule task to run, just one thread is ok
         this.schedulerExecutor = ExecutorUtil.newPausableScheduledThreadPool(
                                  1, TASK_SCHEDULER);
@@ -87,11 +108,36 @@ public final class TaskManager {
 
     public void addScheduler(HugeGraphParams graph) {
         E.checkArgumentNotNull(graph, "The graph can't be null");
-
-        TaskScheduler scheduler = new StandardTaskScheduler(graph,
-                                  this.taskExecutor, this.taskDbExecutor,
-                                  this.serverInfoDbExecutor);
-        this.schedulers.put(graph, scheduler);
+        LOG.info("Use {} as the scheduler of graph ({})",
+                 graph.schedulerType(), graph.name());
+        // TODO: if this server is bound to a specific non-DEFAULT graph space, don't create task schedulers for graphs outside that graph space (graph space)
+        switch (graph.schedulerType()) {
+            case "distributed": {
+                TaskScheduler scheduler =
+                    new DistributedTaskScheduler(
+                        graph,
+                        distributedSchedulerExecutor,
+                        taskDbExecutor,
+                        schemaTaskExecutor,
+                        olapTaskExecutor,
+                        taskExecutor, /* gremlinTaskExecutor */
+                        ephemeralTaskExecutor,
+                        serverInfoDbExecutor);
+                this.schedulers.put(graph, scheduler);
+                break;
+            }
+            case "local":
+            default: {
+                TaskScheduler scheduler =
+                    new StandardTaskScheduler(
+                        graph,
+                        this.taskExecutor,
+                        this.taskDbExecutor,
+                        this.serverInfoDbExecutor);
+                this.schedulers.put(graph, scheduler);
+                break;
+            }
+        }
     }
 
     public void closeScheduler(HugeGraphParams graph) {
@@ -122,6 +168,10 @@ public final class TaskManager {
         if (!this.schedulerExecutor.isTerminated()) {
             this.closeSchedulerTx(graph);
         }
+
+        if (!this.distributedSchedulerExecutor.isTerminated()) {
+            this.closeDistributedSchedulerTx(graph);
+        }
     }
 
     private void closeTaskTx(HugeGraphParams graph) {
@@ -156,6 +206,21 @@ public final class TaskManager {
         }
     }
 
+    /**
+     * Close the graph tx on the distributed scheduler executor: submits a
+     * callable that calls graph.closeTx() and blocks until it completes.
+     *
+     * @param graph the graph whose tx is closed
+     * @throws HugeException wrapping any exception raised while submitting
+     *                       or waiting for the callable
+     */
+    private void closeDistributedSchedulerTx(HugeGraphParams graph) {
+        final Callable<Void> closeTx = () -> {
+            // Do close-tx for current thread
+            graph.closeTx();
+            // Let other threads run
+            Thread.yield();
+            return null;
+        };
+        try {
+            // Wait for the close to finish on the executor thread
+            this.distributedSchedulerExecutor.submit(closeTx).get();
+        } catch (Exception e) {
+            throw new HugeException("Exception when closing scheduler tx", e);
+        }
+    }
+
     public void pauseScheduledThreadPool() {
         this.schedulerExecutor.pauseSchedule();
     }
@@ -169,8 +234,7 @@ public final class TaskManager {
     }
 
     public ServerInfoManager getServerInfoManager(HugeGraphParams graph) {
-        StandardTaskScheduler scheduler = (StandardTaskScheduler)
-                                          this.getScheduler(graph);
+        TaskScheduler scheduler = this.getScheduler(graph);
         if (scheduler == null) {
             return null;
         }
@@ -194,10 +258,21 @@ public final class TaskManager {
             }
         }
 
+        if (terminated && !this.distributedSchedulerExecutor.isShutdown()) {
+            this.distributedSchedulerExecutor.shutdown();
+            try {
+                terminated = 
this.distributedSchedulerExecutor.awaitTermination(timeout,
+                                                                               
 unit);
+            } catch (Throwable e) {
+                ex = e;
+            }
+        }
+
         if (terminated && !this.taskExecutor.isShutdown()) {
             this.taskExecutor.shutdown();
             try {
-                terminated = this.taskExecutor.awaitTermination(timeout, unit);
+                terminated = this.taskExecutor.awaitTermination(timeout,
+                                                                unit);
             } catch (Throwable e) {
                 ex = e;
             }
@@ -216,7 +291,38 @@ public final class TaskManager {
         if (terminated && !this.taskDbExecutor.isShutdown()) {
             this.taskDbExecutor.shutdown();
             try {
-                terminated = this.taskDbExecutor.awaitTermination(timeout, 
unit);
+                terminated = this.taskDbExecutor.awaitTermination(timeout,
+                                                                  unit);
+            } catch (Throwable e) {
+                ex = e;
+            }
+        }
+
+        if (terminated && !this.ephemeralTaskExecutor.isShutdown()) {
+            this.ephemeralTaskExecutor.shutdown();
+            try {
+                terminated = 
this.ephemeralTaskExecutor.awaitTermination(timeout,
+                                                                         unit);
+            } catch (Throwable e) {
+                ex = e;
+            }
+        }
+
+        if (terminated && !this.schemaTaskExecutor.isShutdown()) {
+            this.schemaTaskExecutor.shutdown();
+            try {
+                terminated = this.schemaTaskExecutor.awaitTermination(timeout,
+                                                                      unit);
+            } catch (Throwable e) {
+                ex = e;
+            }
+        }
+
+        if (terminated && !this.olapTaskExecutor.isShutdown()) {
+            this.olapTaskExecutor.shutdown();
+            try {
+                terminated = this.olapTaskExecutor.awaitTermination(timeout,
+                                                                    unit);
             } catch (Throwable e) {
                 ex = e;
             }
@@ -260,8 +366,7 @@ public final class TaskManager {
     public void onAsRoleMaster() {
         try {
             for (TaskScheduler entry : this.schedulers.values()) {
-                StandardTaskScheduler scheduler = (StandardTaskScheduler) 
entry;
-                ServerInfoManager serverInfoManager = 
scheduler.serverManager();
+                ServerInfoManager serverInfoManager = entry.serverManager();
                 
serverInfoManager.forceInitServerInfo(serverInfoManager.selfServerId(), 
NodeRole.MASTER);
             }
         } catch (Throwable e) {
@@ -273,8 +378,7 @@ public final class TaskManager {
     public void onAsRoleWorker() {
         try {
             for (TaskScheduler entry : this.schedulers.values()) {
-                StandardTaskScheduler scheduler = (StandardTaskScheduler) 
entry;
-                ServerInfoManager serverInfoManager = 
scheduler.serverManager();
+                ServerInfoManager serverInfoManager = entry.serverManager();
                 
serverInfoManager.forceInitServerInfo(serverInfoManager.selfServerId(), 
NodeRole.WORKER);
             }
         } catch (Throwable e) {
@@ -291,7 +395,7 @@ public final class TaskManager {
         // Called by scheduler timer
         try {
             for (TaskScheduler entry : this.schedulers.values()) {
-                StandardTaskScheduler scheduler = (StandardTaskScheduler) 
entry;
+                TaskScheduler scheduler = entry;
                 // Maybe other thread close&remove scheduler at the same time
                 synchronized (scheduler) {
                     this.scheduleOrExecuteJobForGraph(scheduler);
@@ -302,56 +406,59 @@ public final class TaskManager {
         }
     }
 
-    private void scheduleOrExecuteJobForGraph(StandardTaskScheduler scheduler) 
{
+    private void scheduleOrExecuteJobForGraph(TaskScheduler scheduler) {
         E.checkNotNull(scheduler, "scheduler");
 
-        ServerInfoManager serverManager = scheduler.serverManager();
-        String graph = scheduler.graphName();
-
-        LockUtil.lock(graph, LockUtil.GRAPH_LOCK);
-        try {
-            /*
-             * Skip if:
-             * graph is closed (iterate schedulers before graph is closing)
-             *  or
-             * graph is not initialized(maybe truncated or cleared).
-             *
-             * If graph is closing by other thread, current thread get
-             * serverManager and try lock graph, at the same time other
-             * thread deleted the lock-group, current thread would get
-             * exception 'LockGroup xx does not exists'.
-             * If graph is closed, don't call serverManager.initialized()
-             * due to it will reopen graph tx.
-             */
-            if (!serverManager.graphReady()) {
-                return;
-            }
-
-            // Update server heartbeat
-            serverManager.heartbeat();
+        if (scheduler instanceof StandardTaskScheduler) {
+            StandardTaskScheduler standardTaskScheduler = 
(StandardTaskScheduler) (scheduler);
+            ServerInfoManager serverManager = scheduler.serverManager();
+            String graph = scheduler.graphName();
 
-            /*
-             * Master schedule tasks to suitable servers.
-             * Worker maybe become Master, so Master also need perform tasks 
assigned by
-             * previous Master when enableRoleElected is true.
-             * However, the master only needs to take the assignment,
-             * because the master stays the same when enableRoleElected is 
false.
-             * There is no suitable server when these tasks are created
-             */
-            if (serverManager.master()) {
-                scheduler.scheduleTasks();
-                if (!this.enableRoleElected && 
!serverManager.onlySingleNode()) {
+            LockUtil.lock(graph, LockUtil.GRAPH_LOCK);
+            try {
+                /*
+                 * Skip if:
+                 * graph is closed (iterate schedulers before graph is closing)
+                 *  or
+                 * graph is not initialized(maybe truncated or cleared).
+                 *
+                 * If graph is closing by other thread, current thread get
+                 * serverManager and try lock graph, at the same time other
+                 * thread deleted the lock-group, current thread would get
+                 * exception 'LockGroup xx does not exists'.
+                 * If graph is closed, don't call serverManager.initialized()
+                 * due to it will reopen graph tx.
+                 */
+                if (!serverManager.graphReady()) {
                     return;
                 }
-            }
 
-            // Schedule queued tasks scheduled to current server
-            scheduler.executeTasksOnWorker(serverManager.selfServerId());
+                // Update server heartbeat
+                serverManager.heartbeat();
+
+                /*
+                 * Master schedule tasks to suitable servers.
+                 * Worker maybe become Master, so Master also need perform 
tasks assigned by
+                 * previous Master when enableRoleElected is true.
+                 * However, the master only needs to take the assignment,
+                 * because the master stays the same when enableRoleElected is 
false.
+                 * There is no suitable server when these tasks are created
+                 */
+                if (serverManager.master()) {
+                    standardTaskScheduler.scheduleTasks();
+                    if (!this.enableRoleElected && 
!serverManager.onlySingleNode()) {
+                        return;
+                    }
+                }
 
-            // Cancel tasks scheduled to current server
-            scheduler.cancelTasksOnWorker(serverManager.selfServerId());
-        } finally {
-            LockUtil.unlock(graph, LockUtil.GRAPH_LOCK);
+                // Schedule queued tasks scheduled to current server
+                
standardTaskScheduler.executeTasksOnWorker(serverManager.selfServerId());
+
+                // Cancel tasks scheduled to current server
+                
standardTaskScheduler.cancelTasksOnWorker(serverManager.selfServerId());
+            } finally {
+                LockUtil.unlock(graph, LockUtil.GRAPH_LOCK);
+            }
         }
     }
 
diff --git 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskScheduler.java
 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskScheduler.java
index 69304f43c..4ee4c992a 100644
--- 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskScheduler.java
+++ 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskScheduler.java
@@ -19,6 +19,7 @@ package org.apache.hugegraph.task;
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
 
@@ -27,6 +28,11 @@ import org.apache.hugegraph.HugeGraph;
 
 public interface TaskScheduler {
 
+    long NO_LIMIT = -1L;
+    long PAGE_SIZE = 500L;
+    long QUERY_INTERVAL = 100L;
+    int MAX_PENDING_TASKS = 10000;
+
     HugeGraph graph();
 
     int pendingTasks();
@@ -39,7 +45,7 @@ public interface TaskScheduler {
 
     <V> void save(HugeTask<V> task);
 
-    <V> HugeTask<V> delete(Id id);
+    <V> HugeTask<V> delete(Id id, boolean force);
 
     <V> HugeTask<V> task(Id id);
 
@@ -62,4 +68,14 @@ public interface TaskScheduler {
                                     throws TimeoutException;
 
     void checkRequirement(String op);
+
+    <V> V call(Callable<V> callable);
+
+    <V> V call(Runnable runnable);
+
+    ServerInfoManager serverManager();
+
+    String graphName();
+
+    void taskDone(HugeTask<?> task);
 }
diff --git 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskStatus.java
 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskStatus.java
index dd65206b0..a9c3101d2 100644
--- 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskStatus.java
+++ 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskStatus.java
@@ -37,7 +37,9 @@ public enum TaskStatus implements SerialEnum {
     SUCCESS(7, "success"),
     CANCELLING(8, "cancelling"),
     CANCELLED(9, "cancelled"),
-    FAILED(10, "failed");
+    FAILED(10, "failed"),
+    HANGING(11, "hanging"),
+    DELETING(12, "deleting");
 
     // NOTE: order is important(RESTORING > RUNNING > QUEUED) when restoring
     public static final List<TaskStatus> PENDING_STATUSES = ImmutableList.of(
diff --git 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskTransaction.java
 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskTransaction.java
new file mode 100644
index 000000000..2b27019c7
--- /dev/null
+++ 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskTransaction.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.task;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hugegraph.HugeException;
+import org.apache.hugegraph.HugeGraph;
+import org.apache.hugegraph.HugeGraphParams;
+import org.apache.hugegraph.backend.query.QueryResults;
+import org.apache.hugegraph.backend.store.BackendStore;
+import org.apache.hugegraph.backend.tx.GraphTransaction;
+import org.apache.hugegraph.schema.IndexLabel;
+import org.apache.hugegraph.schema.PropertyKey;
+import org.apache.hugegraph.schema.SchemaManager;
+import org.apache.hugegraph.schema.VertexLabel;
+import org.apache.hugegraph.structure.HugeVertex;
+import org.apache.hugegraph.type.HugeType;
+import org.apache.hugegraph.type.define.Cardinality;
+import org.apache.hugegraph.type.define.DataType;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+
+/**
+ * A {@link GraphTransaction} specialised for task vertices: it creates the
+ * schema (property keys, vertex label, status index) for the task vertex
+ * label {@link #TASK}, builds vertices from {@link HugeTask} objects and
+ * maintains the status index of stored tasks.
+ * Auto-commit is switched on in the constructor.
+ */
+public class TaskTransaction extends GraphTransaction {
+
+    // Name of the task vertex label, taken from HugeTask.P.TASK
+    public static final String TASK = HugeTask.P.TASK;
+
+    /**
+     * Create the transaction on the given store and enable auto-commit.
+     *
+     * @param graph graph parameters handed to the parent transaction
+     * @param store backend store handed to the parent transaction
+     */
+    public TaskTransaction(HugeGraphParams graph, BackendStore store) {
+        super(graph, store);
+        this.autoCommit(true);
+    }
+
+    /**
+     * Build a vertex from the property array of a task.
+     *
+     * @param task the task to convert, its {@code asArray()} supplies the
+     *             key/value array
+     * @return the constructed vertex
+     * @throws HugeException if the task vertex label does not exist in the
+     *                       graph
+     */
+    public HugeVertex constructVertex(HugeTask<?> task) {
+        if (!this.graph().existsVertexLabel(TASK)) {
+            throw new HugeException("Schema is missing for task(%s) '%s'",
+                                    task.id(), task.name());
+        }
+        // NOTE(review): the leading `false` presumably disables vertex label
+        // verification in the parent constructVertex - confirm
+        return this.constructVertex(false, task.asArray());
+    }
+
+    /**
+     * Look up the currently stored vertex that has the same id as the given
+     * one and, if it exists and its status differs, update the status index
+     * for the stored vertex. Does nothing when no stored vertex is found.
+     *
+     * @param vertex the new version of the task vertex
+     */
+    public void deleteIndex(HugeVertex vertex) {
+        // Fetch the stored (old) record with the same id, if there is one
+        // NOTE(review): queryTaskInfos is not defined in this class,
+        // presumably it is inherited - confirm
+        Iterator<Vertex> old = this.queryTaskInfos(vertex.id());
+        HugeVertex oldV = (HugeVertex) QueryResults.one(old);
+        if (oldV == null) {
+            return;
+        }
+        this.deleteIndexIfNeeded(oldV, vertex);
+    }
+
+    /**
+     * Update the status index for the old vertex when the status values of
+     * the old and the new vertex are not equal.
+     *
+     * @param oldV the stored vertex
+     * @param newV the vertex about to replace it
+     * @return true if the statuses differ and the index was updated,
+     *         false otherwise
+     */
+    private boolean deleteIndexIfNeeded(HugeVertex oldV, HugeVertex newV) {
+        if 
(!oldV.value(HugeTask.P.STATUS).equals(newV.value(HugeTask.P.STATUS))) {
+            // Only touch the old index entry (not the vertex) if the indexed
+            // status value changed, else it is simply overridden.
+            // NOTE(review): the trailing `true` presumably means "remove
+            // the entry" - confirm against updateIndex
+            this.updateIndex(this.indexLabel(HugeTask.P.STATUS).id(), oldV, 
true);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Create the task schema if the task vertex label is not present yet:
+     * the property keys, the vertex label itself and an index on the status
+     * property. Returns immediately when the vertex label already exists.
+     */
+    public void initSchema() {
+        if (this.existVertexLabel(TASK)) {
+            return;
+        }
+
+        HugeGraph graph = this.graph();
+        String[] properties = this.initProperties();
+
+        // Create vertex label '~task'
+        VertexLabel label = graph.schema().vertexLabel(TASK)
+                                 .properties(properties)
+                                 .useCustomizeNumberId()
+                                 .nullableKeys(HugeTask.P.DESCRIPTION, 
HugeTask.P.CONTEXT,
+                                               HugeTask.P.UPDATE, 
HugeTask.P.INPUT,
+                                               HugeTask.P.RESULT,
+                                               HugeTask.P.DEPENDENCIES, 
HugeTask.P.SERVER)
+                                 .enableLabelIndex(true)
+                                 .build();
+        this.params().schemaTransaction().addVertexLabel(label);
+
+        // Create the index on the status property
+        this.createIndexLabel(label, HugeTask.P.STATUS);
+    }
+
+    /**
+     * @param label vertex label name to look up
+     * @return true if the schema transaction returns a non-null vertex label
+     *         for the name
+     */
+    private boolean existVertexLabel(String label) {
+        return this.params().schemaTransaction()
+                   .getVertexLabel(label) != null;
+    }
+
+    /**
+     * Create every property key used by a task vertex and collect the names.
+     * Keys without an explicit data type are created as TEXT, keys without
+     * an explicit cardinality as SINGLE.
+     *
+     * @return the names of the created property keys, in creation order
+     */
+    private String[] initProperties() {
+        List<String> props = new ArrayList<>();
+
+        props.add(createPropertyKey(HugeTask.P.TYPE));
+        props.add(createPropertyKey(HugeTask.P.NAME));
+        props.add(createPropertyKey(HugeTask.P.CALLABLE));
+        props.add(createPropertyKey(HugeTask.P.DESCRIPTION));
+        props.add(createPropertyKey(HugeTask.P.CONTEXT));
+        props.add(createPropertyKey(HugeTask.P.STATUS, DataType.BYTE));
+        props.add(createPropertyKey(HugeTask.P.PROGRESS, DataType.INT));
+        props.add(createPropertyKey(HugeTask.P.CREATE, DataType.DATE));
+        props.add(createPropertyKey(HugeTask.P.UPDATE, DataType.DATE));
+        props.add(createPropertyKey(HugeTask.P.RETRIES, DataType.INT));
+        props.add(createPropertyKey(HugeTask.P.INPUT, DataType.BLOB));
+        props.add(createPropertyKey(HugeTask.P.RESULT, DataType.BLOB));
+        props.add(createPropertyKey(HugeTask.P.DEPENDENCIES, DataType.LONG,
+                                    Cardinality.SET));
+        props.add(createPropertyKey(HugeTask.P.SERVER));
+
+        return props.toArray(new String[0]);
+    }
+
+    /** Create a TEXT property key with SINGLE cardinality; returns its name. */
+    private String createPropertyKey(String name) {
+        return this.createPropertyKey(name, DataType.TEXT);
+    }
+
+    /** Create a property key of the given type with SINGLE cardinality. */
+    private String createPropertyKey(String name, DataType dataType) {
+        return this.createPropertyKey(name, dataType, Cardinality.SINGLE);
+    }
+
+    /**
+     * Build a property key and add it through the schema transaction.
+     *
+     * @param name        property key name
+     * @param dataType    data type of the key
+     * @param cardinality cardinality of the key
+     * @return the given name, so that callers can collect it directly
+     */
+    private String createPropertyKey(String name, DataType dataType,
+                                     Cardinality cardinality) {
+        HugeGraph graph = this.graph();
+        SchemaManager schema = graph.schema();
+        PropertyKey propertyKey = schema.propertyKey(name)
+                                        .dataType(dataType)
+                                        .cardinality(cardinality)
+                                        .build();
+        this.params().schemaTransaction().addPropertyKey(propertyKey);
+        return name;
+    }
+
+    /**
+     * Build an index label on the task vertex label by the given field and
+     * add it through the schema transaction. The label name is the hidden
+     * form of "task-index-by-" + field.
+     *
+     * @param label the task vertex label the index belongs to
+     * @param field the property to index
+     * @return the built index label
+     */
+    private IndexLabel createIndexLabel(VertexLabel label, String field) {
+        HugeGraph graph = this.graph();
+        SchemaManager schema = graph.schema();
+        String name = Graph.Hidden.hide("task-index-by-" + field);
+        IndexLabel indexLabel = schema.indexLabel(name)
+                                      .on(HugeType.VERTEX_LABEL, TASK)
+                                      .by(field)
+                                      .build();
+        this.params().schemaTransaction().addIndexLabel(label, indexLabel);
+        return indexLabel;
+    }
+
+    /**
+     * Fetch the index label for the given field from the graph, using the
+     * same naming rule as createIndexLabel.
+     *
+     * @param field the indexed property
+     * @return the index label returned by the graph for that name
+     */
+    private IndexLabel indexLabel(String field) {
+        String name = Graph.Hidden.hide("task-index-by-" + field);
+        HugeGraph graph = this.graph();
+        return graph.indexLabel(name);
+    }
+}
diff --git 
a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreSessionsImpl.java
 
b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreSessionsImpl.java
index dad89db79..bc38aedd9 100755
--- 
a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreSessionsImpl.java
+++ 
b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreSessionsImpl.java
@@ -450,6 +450,10 @@ public class HstoreSessionsImpl extends HstoreSessions {
          */
         @Override
         public Integer commit() {
+            if (!this.hasChanges()) {
+                // TODO: log a message with level WARNING
+                return 0;
+            }
             int commitSize = this.changedSize;
             if (TRANSACTIONAL) {
                 this.graph.commit();
diff --git 
a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/TaskCoreTest.java
 
b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/TaskCoreTest.java
index dabf3ad0f..27cd880fa 100644
--- 
a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/TaskCoreTest.java
+++ 
b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/TaskCoreTest.java
@@ -56,7 +56,7 @@ public class TaskCoreTest extends BaseCoreTest {
 
         Iterator<HugeTask<Object>> iter = scheduler.tasks(null, -1, null);
         while (iter.hasNext()) {
-            scheduler.delete(iter.next().id());
+            scheduler.delete(iter.next().id(), true);
         }
     }
 
@@ -77,7 +77,7 @@ public class TaskCoreTest extends BaseCoreTest {
         Assert.assertFalse(task.completed());
 
         Assert.assertThrows(IllegalArgumentException.class, () -> {
-            scheduler.delete(id);
+            scheduler.delete(id, true);
         }, e -> {
             Assert.assertContains("Can't delete incomplete task '88888'",
                                   e.getMessage());
@@ -107,7 +107,7 @@ public class TaskCoreTest extends BaseCoreTest {
         Assert.assertEquals("test-task", iter.next().name());
         Assert.assertFalse(iter.hasNext());
 
-        scheduler.delete(id);
+        scheduler.delete(id, true);
         iter = scheduler.tasks(null, 10, null);
         Assert.assertFalse(iter.hasNext());
         Assert.assertThrows(NotFoundException.class, () -> {
diff --git 
a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/tinkerpop/TestGraph.java
 
b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/tinkerpop/TestGraph.java
index c6dcff4a8..1dfa85fb9 100644
--- 
a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/tinkerpop/TestGraph.java
+++ 
b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/tinkerpop/TestGraph.java
@@ -148,7 +148,7 @@ public class TestGraph implements Graph {
 
         TaskScheduler scheduler = this.graph.taskScheduler();
         scheduler.tasks(null, -1, null).forEachRemaining(elem -> {
-            scheduler.delete(elem.id());
+            scheduler.delete(elem.id(), true);
         });
     }
 

Reply via email to