This is an automated email from the ASF dual-hosted git repository.

guoyp pushed a commit to branch griffin-1.0.0-dev
in repository https://gitbox.apache.org/repos/asf/griffin.git


The following commit(s) were added to refs/heads/griffin-1.0.0-dev by this push:
     new 47261232  core: main process of task execution (#619)
47261232 is described below

commit 47261232d7eb31ad11202948e074c0c37b86b61e
Author: dabuliud <[email protected]>
AuthorDate: Thu Nov 24 10:12:33 2022 +0800

     core: main process of task execution (#619)
    
    * core:task execute
    
    * remove kafka schema dependency
    
    * removeUnnecessaryDependency
    
    Co-authored-by: Warden <[email protected]>
---
 .travis.yml                                        |  25 +-
 anomalydetection/pom.xml                           |  22 +-
 core/pom.xml                                       |  25 +-
 .../core/worker/client/DispatcherClient.java       |  27 +-
 .../griffin/core/worker/context/WorkerContext.java |  67 ++++-
 .../griffin/core/worker/dao/DQInstanceDao.java     |  11 +
 .../apache/griffin/core/worker/dao/DQTaskDao.java  |  16 +
 .../core/worker/driver/PrestoTemplateDriver.java   |   4 +-
 .../core/worker/driver/SparkTemplateDriver.java    |   3 +-
 .../griffin/core/worker/driver/TemplateDriver.java |   5 +-
 .../griffin/core/worker/entity/bo/DQInstance.java  |  43 ++-
 .../core/worker/entity/bo/task/DQBaseTask.java     |  61 +++-
 .../core/worker/entity/bo/task/DQHiveTask.java     |  16 +-
 .../core/worker/entity/bo/task/DQKafkaTask.java    |  16 +-
 .../core/worker/entity/dispatcher/JobStatus.java   |  38 +++
 .../worker/entity/dispatcher/JobStatusRequest.java |   6 +
 .../entity/dispatcher/JobStatusResponse.java       |   7 +
 .../worker/entity/dispatcher/MetricRequest.java    |  10 +
 .../worker/entity/dispatcher/MetricResponse.java   |  11 +
 .../worker/entity/dispatcher/SubmitRequest.java    |  49 ++-
 .../core/worker/entity/enums/DQEngineEnum.java     |  34 ++-
 .../core/worker/entity/enums/DQErrorCode.java      |  13 +
 .../core/worker/entity/enums/DQInstanceStatus.java |  24 ++
 .../core/worker/entity/enums/DQTaskStatus.java     |  27 +-
 .../entity/enums/DispatcherJobStatusEnum.java      |   6 +
 .../griffin/core/worker/entity/pojo/Metric.java    |  11 +
 .../worker/schedule/TaskDispatcherScheduler.java   | 328 +++++++++++++++------
 .../core/worker/service/DQInstanceService.java     |  39 +++
 .../griffin/core/worker/service/DQTaskService.java | 106 +++++++
 dispatcher/pom.xml                                 |  22 +-
 pom.xml                                            |   1 +
 scheduler/pom.xml                                  |  22 +-
 32 files changed, 893 insertions(+), 202 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 319902c4..fdc2d986 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -11,31 +11,32 @@
 # or implied. See the License for the specific language governing permissions 
and limitations under
 # the License.
 
-language: java scala node_js
+#language: java scala node_js
+language: java scala
 
-services:
-  - docker
+#services:
+#  - docker
 
-env:
-  - COMPOSE_FILE=griffin-doc/docker/compose/docker-compose-batch.yml
+#env:
+#  - COMPOSE_FILE=griffin-doc/docker/compose/docker-compose-batch.yml
 
 jdk:
   - openjdk8
 scala:
   - 2.10.6
-node_js:
-  - "6"
+#node_js:
+#  - "6"
 git:
   quiet: true
 cache:
   directories:
     - $HOME/.m2
 
-install:
-  - npm install -g bower
-
-before_script:
-  - docker-compose up -d
+#install:
+#  - npm install -g bower
+#
+#before_script:
+#  - docker-compose up -d
 
 
 # keep 30, need change according to ci logs.
diff --git a/anomalydetection/pom.xml b/anomalydetection/pom.xml
index 2a501a48..804de868 100644
--- a/anomalydetection/pom.xml
+++ b/anomalydetection/pom.xml
@@ -226,17 +226,17 @@ under the License.
         </dependency>
 
         <!-- to access confluent schema registry -->
-        <dependency>
-            <groupId>io.confluent</groupId>
-            <artifactId>kafka-schema-registry-client</artifactId>
-            <version>${confluent.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
+<!--        <dependency>-->
+<!--            <groupId>io.confluent</groupId>-->
+<!--            <artifactId>kafka-schema-registry-client</artifactId>-->
+<!--            <version>${confluent.version}</version>-->
+<!--            <exclusions>-->
+<!--                <exclusion>-->
+<!--                    <groupId>org.slf4j</groupId>-->
+<!--                    <artifactId>slf4j-log4j12</artifactId>-->
+<!--                </exclusion>-->
+<!--            </exclusions>-->
+<!--        </dependency>-->
 
         <!--schedule-->
         <dependency>
diff --git a/core/pom.xml b/core/pom.xml
index d510a5d4..4c0b7c00 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -226,17 +226,17 @@ under the License.
         </dependency>
 
         <!-- to access confluent schema registry -->
-        <dependency>
-            <groupId>io.confluent</groupId>
-            <artifactId>kafka-schema-registry-client</artifactId>
-            <version>${confluent.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
+<!--        <dependency>-->
+<!--            <groupId>io.confluent</groupId>-->
+<!--            <artifactId>kafka-schema-registry-client</artifactId>-->
+<!--            <version>${confluent.version}</version>-->
+<!--            <exclusions>-->
+<!--                <exclusion>-->
+<!--                    <groupId>org.slf4j</groupId>-->
+<!--                    <artifactId>slf4j-log4j12</artifactId>-->
+<!--                </exclusion>-->
+<!--            </exclusions>-->
+<!--        </dependency>-->
 
         <!--schedule-->
         <dependency>
@@ -411,6 +411,9 @@ under the License.
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <encoding>UTF-8</encoding>
+                </configuration>
             </plugin>
         </plugins>
     </build>
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/client/DispatcherClient.java
 
b/core/src/main/java/org/apache/griffin/core/worker/client/DispatcherClient.java
index dbbcd6cd..5422f35e 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/client/DispatcherClient.java
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/client/DispatcherClient.java
@@ -1,11 +1,15 @@
 package org.apache.griffin.core.worker.client;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.griffin.core.worker.entity.bo.DQInstance;
 import org.apache.griffin.core.worker.entity.dispatcher.*;
+import org.apache.griffin.core.worker.entity.enums.DQErrorCode;
+import org.springframework.stereotype.Service;
 
 /**
  * Dispatcher客户端 负责与dispatcher交互
  */
+@Service
 public class DispatcherClient {
 
     public SubmitResponse submitSql(SubmitRequest request) {
@@ -14,22 +18,35 @@ public class DispatcherClient {
 
 
     public JobStatusResponse getJobStatus(JobStatusRequest jobStatusRequest) {
+        // todo parse job status
         return null;
     }
 
-    public double getMetricResult() {
-        return 0.0d;
+    public MetricResponse getMetricResult(MetricRequest metricRequest) {
+        return null;
     }
 
     public SubmitRequest wrapperDQTask(DQInstance waittingTask) {
         return null;
     }
 
-    public JobStatus wrapperSubmitResponse(SubmitResponse resp) {
-        return null;
+    public JobStatus wrapperSubmitResponse(Pair<Long, SubmitResponse> resp) {
+        Long partitionTime = resp.getLeft();
+        SubmitResponse response = resp.getRight();
+        boolean finished = false;
+        if (response.getCode() != DQErrorCode.SUCCESS.getCode()) finished = 
true;
+        return JobStatus.builder()
+                .jobId(response.getJobId())
+                .finished(finished)
+                .partitionTime(partitionTime)
+                .build();
     }
 
     public JobStatusRequest wrapperJobStatusRequest(JobStatus jobStatus) {
-        return null;
+        return new JobStatusRequest(jobStatus.getJobId());
+    }
+
+    public MetricRequest wrapperMetricRequest(JobStatus jobStatus) {
+        return new MetricRequest(jobStatus.getJobId());
     }
 }
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/context/WorkerContext.java 
b/core/src/main/java/org/apache/griffin/core/worker/context/WorkerContext.java
index 17e238de..52280f36 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/context/WorkerContext.java
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/context/WorkerContext.java
@@ -4,11 +4,13 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
 import org.apache.griffin.core.worker.entity.bo.DQInstance;
 import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
+import org.apache.griffin.core.worker.entity.enums.DQEngineEnum;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
 import java.util.List;
 import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * 上下文信息  全局唯一
@@ -16,12 +18,12 @@ import java.util.Queue;
 @Component
 public class WorkerContext {
 
-    private final Queue<DQInstance> WAITTING_TASK_QUEUE;
+    private final List<DQInstance> WAITTING_TASK_QUEUE;
 //    public static final List<DQBaseTask> runningTaskIdQueue = 
Lists.newCopyOnWriteArrayList();
     // runningTaskIdList = RECORDING_TASK_LIST + EVALUATING_TASK_LIST + 
ALERTING_TASK_LIST
-    private final Queue<DQInstance> RECORDING_TASK_QUEUE;
-    private final Queue<DQInstance> EVALUATING_TASK_QUEUE;
-    private final Queue<DQInstance> ALERTING_TASK_QUEUE;
+    private final List<DQInstance> RECORDING_TASK_QUEUE;
+    private final LinkedBlockingQueue<DQInstance> EVALUATING_TASK_QUEUE;
+    private final LinkedBlockingQueue<DQInstance> ALERTING_TASK_QUEUE;
 
     // success和failed队列数据老化问题?
     public final List<DQInstance> successTaskIdList;
@@ -29,24 +31,44 @@ public class WorkerContext {
     
     public WorkerContext() {
         // 设置队列长度
-        WAITTING_TASK_QUEUE = Queues.newPriorityQueue();
-        RECORDING_TASK_QUEUE = Queues.newPriorityQueue();
-        EVALUATING_TASK_QUEUE = Queues.newPriorityQueue();
-        ALERTING_TASK_QUEUE = Queues.newPriorityQueue();
+        WAITTING_TASK_QUEUE = Lists.newCopyOnWriteArrayList();
+        RECORDING_TASK_QUEUE = Lists.newCopyOnWriteArrayList();
+        // 这两个应该是一个阻塞队列 只要有任务来就可以处理
+        EVALUATING_TASK_QUEUE = Queues.newLinkedBlockingQueue();
+        ALERTING_TASK_QUEUE =Queues.newLinkedBlockingQueue();
         successTaskIdList = Lists.newArrayList();
         failedTaskIdList = Lists.newArrayList();
     }
-    
+
+    public List<DQInstance> getWAITTING_TASK_QUEUE() {
+        return WAITTING_TASK_QUEUE;
+    }
+
+    public List<DQInstance> getRECORDING_TASK_QUEUE() {
+        return RECORDING_TASK_QUEUE;
+    }
+
+    public LinkedBlockingQueue<DQInstance> getEVALUATING_TASK_QUEUE() {
+        return EVALUATING_TASK_QUEUE;
+    }
+
+    public LinkedBlockingQueue<DQInstance> getALERTING_TASK_QUEUE() {
+        return ALERTING_TASK_QUEUE;
+    }
+
+    public List<DQInstance> getSuccessTaskIdList() {
+        return successTaskIdList;
+    }
+
+    public List<DQInstance> getFailedTaskIdList() {
+        return failedTaskIdList;
+    }
+
     @PostConstruct
     public void init() {
-        initQueueInfo();
         resetTaskStatusWhenStartUp();
     }
 
-    private void initQueueInfo() {
-        
-    }
-
     public DQInstance getWaittingTask() {
         return null;
     }
@@ -55,7 +77,8 @@ public class WorkerContext {
         return null;
     }
 
-    public void offerToRecordingTaskQueue(DQInstance dqInstance) {
+    public boolean offerToRecordingTaskQueue(DQInstance dqInstance) {
+        return false;
     }
     public void offerToAlertingTaskQueue(DQInstance dqInstance) {
     }
@@ -72,11 +95,23 @@ public class WorkerContext {
 
     }
 
-    public boolean canSubmitToDispatcher() {
+    public boolean canSubmitToSpecEngine(DQEngineEnum engine) {
         return false;
     }
 
 
     public void offerToEvaluatingTaskQueue(DQInstance dqInstance) {
     }
+
+    public void removeAll(List<DQInstance> targetList, List<DQInstance> 
waittingToRemoveFromRecordingList) {
+        targetList.removeAll(waittingToRemoveFromRecordingList);
+    }
+
+    public void addFailedDQInstanceInfo(DQInstance instance) {
+
+    }
+
+    public void addSuccessDQInstanceInfo(DQInstance instance) {
+
+    }
 }
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/dao/DQInstanceDao.java 
b/core/src/main/java/org/apache/griffin/core/worker/dao/DQInstanceDao.java
new file mode 100644
index 00000000..11dd93f9
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/dao/DQInstanceDao.java
@@ -0,0 +1,11 @@
+package org.apache.griffin.core.worker.dao;
+
+import org.apache.griffin.core.worker.entity.bo.DQInstance;
+import org.springframework.stereotype.Component;
+
+@Component
+public interface DQInstanceDao {
+
+    void updateDQInstanceStatus(DQInstance instance, int status);
+
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/dao/DQTaskDao.java 
b/core/src/main/java/org/apache/griffin/core/worker/dao/DQTaskDao.java
new file mode 100644
index 00000000..c94074cb
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/dao/DQTaskDao.java
@@ -0,0 +1,16 @@
+package org.apache.griffin.core.worker.dao;
+
+import org.apache.griffin.core.worker.entity.bo.DQInstance;
+import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
+import org.apache.griffin.core.worker.entity.enums.DQTaskStatus;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+
+@Component
+public interface DQTaskDao {
+
+    void updateDQTaskListStatus(List<DQBaseTask> tasks, int status);
+    void updateDQTaskListStatus(DQBaseTask task, int status);
+
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/driver/PrestoTemplateDriver.java
 
b/core/src/main/java/org/apache/griffin/core/worker/driver/PrestoTemplateDriver.java
index c0ab20dc..61ab2bdb 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/driver/PrestoTemplateDriver.java
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/driver/PrestoTemplateDriver.java
@@ -1,13 +1,13 @@
 package org.apache.griffin.core.worker.driver;
 
-import org.apache.griffin.core.worker.entity.enums.DQEngineEnum;
 import org.apache.griffin.core.worker.entity.pojo.template.DQRecordTemplate;
 
+import java.util.List;
 import java.util.Map;
 
 public class PrestoTemplateDriver extends TemplateDriver{
     @Override
-    public String getRecordSql(DQRecordTemplate template, Map<String, String> 
params) {
+    public List<String> getRecordSql(DQRecordTemplate template, Map<String, 
String> params) {
         return null;
     }
 }
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/driver/SparkTemplateDriver.java
 
b/core/src/main/java/org/apache/griffin/core/worker/driver/SparkTemplateDriver.java
index 6e54d4d1..eaf17b4a 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/driver/SparkTemplateDriver.java
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/driver/SparkTemplateDriver.java
@@ -2,11 +2,12 @@ package org.apache.griffin.core.worker.driver;
 
 import org.apache.griffin.core.worker.entity.pojo.template.DQRecordTemplate;
 
+import java.util.List;
 import java.util.Map;
 
 public class SparkTemplateDriver extends TemplateDriver {
     @Override
-    public String getRecordSql(DQRecordTemplate template, Map<String, String> 
params) {
+    public List<String> getRecordSql(DQRecordTemplate template, Map<String, 
String> params) {
         return null;
     }
 }
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/driver/TemplateDriver.java 
b/core/src/main/java/org/apache/griffin/core/worker/driver/TemplateDriver.java
index 74bd7079..85a23390 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/driver/TemplateDriver.java
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/driver/TemplateDriver.java
@@ -6,6 +6,7 @@ import 
org.apache.griffin.core.worker.factory.TemplateDriverFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.util.List;
 import java.util.Map;
 
 @Service
@@ -17,9 +18,9 @@ public abstract class TemplateDriver {
     /**
      * 拼出的SQL 返回值 必须是 <ruleId, Partition, Metric>  这样可以做SQL合并
      */
-    public abstract String getRecordSql(DQRecordTemplate template, Map<String, 
String> params);
+    public abstract List<String> getRecordSql(DQRecordTemplate template, 
Map<String, String> params);
 
-    public String getRecordSql(DQEngineEnum engine, DQRecordTemplate template, 
Map<String, String> params) {
+    public List<String> getRecordSql(DQEngineEnum engine, DQRecordTemplate 
template, Map<String, String> params) {
         TemplateDriver templateDriver = 
templateDriverFactory.getTemplateDrvier(engine);
         return templateDriver.getRecordSql(template, params);
     }
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/DQInstance.java 
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/DQInstance.java
index 0eacac7f..af5ff201 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/DQInstance.java
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/DQInstance.java
@@ -2,8 +2,9 @@ package org.apache.griffin.core.worker.entity.bo;
 
 import lombok.Data;
 import org.apache.griffin.core.worker.entity.dispatcher.JobStatus;
-import org.apache.griffin.core.worker.entity.enums.DQTaskStatus;
+import org.apache.griffin.core.worker.entity.enums.DQInstanceStatus;
 import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
+import org.apache.griffin.core.worker.entity.enums.DQTaskStatus;
 
 import java.util.List;
 
@@ -14,17 +15,18 @@ import java.util.List;
 @Data
 public class DQInstance {
     private Long id;
-
     // 实例状态
-    private DQTaskStatus status;
+    private DQInstanceStatus status;
     // 记录状态年龄  状态更新是重置
     private int statusAge;
     // 任务信息
     private List<DQBaseTask> subTaskList;
+    //
+    private long scanTimeStamp = 0L;
+
 
-    private List<JobStatus> jobStatusList;
 
-    public void setStatus(DQTaskStatus status) {
+    public void setStatus(DQInstanceStatus status) {
         if (this.status != status) resetStatusAge();
         this.status = status;
     }
@@ -41,4 +43,35 @@ public class DQInstance {
         // 一个状态年龄处理了5次 无法变更 说明处理该任务一直失败
         return statusAge > 5;
     }
+
+    public boolean hasTaskToSubmit() {
+        boolean hasTaskToSubmit = false;
+        for (DQBaseTask dqBaseTask : subTaskList) {
+            if (dqBaseTask.getStatus() == DQTaskStatus.WAITTING) {
+                hasTaskToSubmit = true;
+                break;
+            }
+        }
+        return hasTaskToSubmit;
+    }
+
+    public boolean isFinishRecord() {
+        boolean isFinishRecord = true;
+        for (DQBaseTask dqBaseTask : subTaskList) {
+            if (dqBaseTask.getStatus().getCode() <= 
DQTaskStatus.RECORDING.getCode()) {
+                isFinishRecord = false;
+                break;
+            }
+        }
+        return isFinishRecord;
+    }
+
+    public void doEvaluteTask() {
+        subTaskList.forEach(DQBaseTask::evaluate);
+
+    }
+
+    public void doAlertTask() {
+        // 收敛告警信息 进行告警
+    }
 }
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQBaseTask.java
 
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQBaseTask.java
index f597bbb4..fd01f036 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQBaseTask.java
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQBaseTask.java
@@ -1,11 +1,20 @@
 package org.apache.griffin.core.worker.entity.bo.task;
 
+import com.beust.jcommander.internal.Lists;
 import lombok.Data;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.griffin.core.worker.context.WorkerContext;
+import org.apache.griffin.core.worker.entity.dispatcher.JobStatus;
+import org.apache.griffin.core.worker.entity.enums.DQEngineEnum;
+import org.apache.griffin.core.worker.entity.enums.DQInstanceStatus;
+import org.apache.griffin.core.worker.entity.enums.DQTaskStatus;
+import org.apache.griffin.core.worker.entity.pojo.Metric;
 import org.apache.griffin.core.worker.entity.pojo.rule.DQAlertRule;
 import org.apache.griffin.core.worker.entity.pojo.rule.DQEvaluateRule;
 import org.apache.griffin.core.worker.entity.pojo.rule.DQRecordRule;
 
+import java.util.List;
+
 /**
  * 任务基础类
  *   一个完整的任务包含
@@ -16,15 +25,57 @@ import 
org.apache.griffin.core.worker.entity.pojo.rule.DQRecordRule;
 @Data
 public abstract class DQBaseTask {
 
+    private long id;
+    private String owner;
     private WorkerContext wc;
-
+    private DQEngineEnum engine;
     private DQRecordRule recordRule;
     private DQEvaluateRule dqEvaluateRule;
     private DQAlertRule dqAlertRule;
-
+    private DQTaskStatus status;
+    private List<JobStatus> jobStatusList;
+    private List<Metric> metricList = Lists.newArrayList();
+    // 记录状态年龄  状态更新是重置
+    private int statusAge;
     // 生成recordsql和 模板 + 参数 有关系
     // 生成SQL部分希望交给模板来做
-    public abstract void doRecord();
-    public abstract void doEvaluate();
-    public abstract void doAlert();
+    public List<Pair<Long, String>> record() {
+        // before
+        List<Pair<Long, String>> partitionTimeAndSqlList = doRecord();
+        // after
+        return partitionTimeAndSqlList;
+    }
+    public void evaluate() {
+        doEvaluate();
+    }
+    public void alert() {
+        doAlert();
+    }
+
+    // partitionTime And sql
+    public abstract List<Pair<Long, String>> doRecord();
+    public abstract boolean doEvaluate();
+    public abstract boolean doAlert();
+
+    public void setStatus(DQTaskStatus status) {
+        if (this.status != status) resetStatusAge();
+        this.status = status;
+    }
+
+    public void resetStatusAge() {
+        statusAge = 0;
+    }
+
+    public void incrStatusAge() {
+        statusAge++;
+    }
+
+    public boolean isFailed() {
+        // 一个状态年龄处理了5次 无法变更 说明处理该任务一直失败
+        return statusAge > 5;
+    }
+
+    public void addMetric(long partitionTime, Double metric) {
+        metricList.add(new Metric(partitionTime, metric));
+    }
 }
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQHiveTask.java
 
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQHiveTask.java
index 96be4deb..62c6ef3b 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQHiveTask.java
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQHiveTask.java
@@ -1,22 +1,26 @@
 package org.apache.griffin.core.worker.entity.bo.task;
 
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.List;
+
 /**
  * 表维度
  */
 public class DQHiveTask extends DQBaseTask {
 
     @Override
-    public void doRecord() {
-
+    public List<Pair<Long, String>> doRecord() {
+        return null;
     }
 
     @Override
-    public void doEvaluate() {
-
+    public boolean doEvaluate() {
+        return false;
     }
 
     @Override
-    public void doAlert() {
-
+    public boolean doAlert() {
+        return false;
     }
 }
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQKafkaTask.java
 
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQKafkaTask.java
index 7f8c13d5..356c0fac 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQKafkaTask.java
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQKafkaTask.java
@@ -1,18 +1,22 @@
 package org.apache.griffin.core.worker.entity.bo.task;
 
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.List;
+
 public class DQKafkaTask extends DQBaseTask {
     @Override
-    public void doRecord() {
-
+    public List<Pair<Long, String>> doRecord() {
+        return null;
     }
 
     @Override
-    public void doEvaluate() {
-
+    public boolean doEvaluate() {
+        return false;
     }
 
     @Override
-    public void doAlert() {
-
+    public boolean doAlert() {
+        return false;
     }
 }
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatus.java
 
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatus.java
index 57f56341..9b6a550e 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatus.java
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatus.java
@@ -6,5 +6,43 @@ import lombok.Data;
 public class JobStatus {
     private String jobId;
     private boolean finished = false;
+    private long partitionTime;
 
+    public JobStatus() {
+    }
+
+    public JobStatus(String jobId, boolean finished, long partitionTime) {
+        this.jobId = jobId;
+        this.finished = finished;
+        this.partitionTime = partitionTime;
+    }
+
+    public static Builder builder() {
+        return new JobStatus.Builder();
+    }
+
+    public static class Builder {
+        private String jobId;
+        private boolean finished = false;
+        private long partitionTime;
+
+        public Builder jobId(String jobId) {
+            this.jobId = jobId;
+            return this;
+        }
+
+        public Builder finished(boolean finished) {
+            this.finished = finished;
+            return this;
+        }
+
+        public Builder partitionTime(long partitionTime) {
+            this.partitionTime = partitionTime;
+            return this;
+        }
+
+        public JobStatus build() {
+            return new JobStatus(this.jobId, this.finished, 
this.partitionTime);
+        }
+    }
 }
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatusRequest.java
 
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatusRequest.java
index ab991c46..711706cb 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatusRequest.java
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatusRequest.java
@@ -1,4 +1,10 @@
 package org.apache.griffin.core.worker.entity.dispatcher;
 
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
 public class JobStatusRequest {
+    String jobId;
 }
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatusResponse.java
 
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatusResponse.java
index bf38b956..af7fc45e 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatusResponse.java
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatusResponse.java
@@ -1,6 +1,13 @@
 package org.apache.griffin.core.worker.entity.dispatcher;
 
+import org.apache.griffin.core.worker.entity.enums.DQErrorCode;
+import org.apache.griffin.core.worker.entity.enums.DispatcherJobStatusEnum;
+
 public class JobStatusResponse {
+    private Integer code;
+    private DispatcherJobStatusEnum jobStatus;
+    private DQErrorCode errorCode;
+    private Exception ex;
     public boolean isSuccess() {
         return false;
     }
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/MetricRequest.java
 
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/MetricRequest.java
new file mode 100644
index 00000000..5b2faa5d
--- /dev/null
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/MetricRequest.java
@@ -0,0 +1,10 @@
+package org.apache.griffin.core.worker.entity.dispatcher;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class MetricRequest {
+    String jobId;
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/MetricResponse.java
 
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/MetricResponse.java
new file mode 100644
index 00000000..4b40ae9d
--- /dev/null
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/MetricResponse.java
@@ -0,0 +1,11 @@
+package org.apache.griffin.core.worker.entity.dispatcher;
+
+import lombok.Data;
+
+@Data
+public class MetricResponse {
+    Integer code;
+    Double metric;
+//    Enum errorCode;
+    Exception ex;
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/SubmitRequest.java
 
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/SubmitRequest.java
index ae52ad7a..f93b726e 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/SubmitRequest.java
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/SubmitRequest.java
@@ -8,5 +8,52 @@ public class SubmitRequest {
     private String recordSql;
     private DQEngineEnum engine;  // Spark,Hive,Presto,etc.
     private String owner;
-    private Integer maxRetryCount;
+    private Integer maxRetryCount = 3;
+
+    public SubmitRequest() {
+    }
+
+    public SubmitRequest(String recordSql, DQEngineEnum engine, String owner, 
Integer maxRetryCount) {
+        this.recordSql = recordSql;
+        this.engine = engine;
+        this.owner = owner;
+        this.maxRetryCount = maxRetryCount;
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static class Builder {
+        private String recordSql;
+        private DQEngineEnum engine;
+        private String owner;
+        private Integer maxRetryCount = 3;
+
+
+        public Builder recordSql(String recordSql) {
+            this.recordSql = recordSql;
+            return this;
+        }
+
+        public Builder engine(DQEngineEnum engine) {
+            this.engine = engine;
+            return this;
+        }
+
+        public Builder owner(String owner) {
+            this.owner = owner;
+            return this;
+        }
+
+        public Builder retryCount(Integer maxRetryCount) {
+            this.maxRetryCount = maxRetryCount;
+            return this;
+        }
+
+        public SubmitRequest build() {
+            return new SubmitRequest(this.recordSql, this.engine, this.owner, 
this.maxRetryCount);
+        }
+    }
+
 }
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQEngineEnum.java
 
b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQEngineEnum.java
index 0e9d6629..8a28487f 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQEngineEnum.java
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQEngineEnum.java
@@ -1,7 +1,35 @@
 package org.apache.griffin.core.worker.entity.enums;
 
 public enum DQEngineEnum {
-    PRESTO,
-    SPARK,
-    HIVE;
+    PRESTO(0, 1),
+    SPARK(1, 2),
+    HIVE(2, -1);
+
+    private final int code;
+    private final int backEngineCode;
+
+    DQEngineEnum(int code, int backEngineCode) {
+        this.code = code;
+        this.backEngineCode = backEngineCode;
+    }
+
+    public int getCode() {
+        return code;
+    }
+
+    /**
+     * @return DQEngineEnum
+     */
+    public DQEngineEnum getBackEngine() {
+        if (this.backEngineCode == -1) return null;
+        return findByCode(this.code);
+    }
+
+    public DQEngineEnum findByCode(int code) {
+        DQEngineEnum[] values = DQEngineEnum.values();
+        for (DQEngineEnum value : values) {
+            if (code == value.code) return value;
+        }
+        return null;
+    }
 }
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQErrorCode.java
 
b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQErrorCode.java
index 4aa19f10..f15c94ba 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQErrorCode.java
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQErrorCode.java
@@ -1,4 +1,17 @@
 package org.apache.griffin.core.worker.entity.enums;
 
 public enum DQErrorCode {
+    SUCCESS(200),
+    INTERNAL_ERROR(500),
+    EXTERNAL_ERROR(400);
+
+    private final int code;
+
+    DQErrorCode(int code) {
+        this.code = code;
+    }
+
+    public int getCode() {
+        return code;
+    }
 }
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQInstanceStatus.java
 
b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQInstanceStatus.java
new file mode 100644
index 00000000..6e70721b
--- /dev/null
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQInstanceStatus.java
@@ -0,0 +1,24 @@
+package org.apache.griffin.core.worker.entity.enums;
+
+public enum DQInstanceStatus {
+    ACCEPTED(0),
+    WAITTING(1),
+    SUBMITTING(2), // 任务提交中
+    RUNNING(3),
+    RECORDING(4),
+    EVALUATING(5),
+    EVALUATE_ALERTING(6), // Metric 需要告警
+    FAILED_ALERTING(7),   // 任务执行失败需要告警
+    SUCCESS(8),
+    FAILED(9);
+
+    private final int code;
+
+    DQInstanceStatus(int code) {
+        this.code = code;
+    }
+
+    public int getCode() {
+        return code;
+    }
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQTaskStatus.java
 
b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQTaskStatus.java
index 68292a45..ab5e0716 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQTaskStatus.java
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQTaskStatus.java
@@ -1,12 +1,23 @@
 package org.apache.griffin.core.worker.entity.enums;
 
 public enum DQTaskStatus {
-    ACCEPTED,
-    WAITTING,
-    RUNNING,
-    RECORDING,
-    EVALUATING,
-    ALERTING,
-    SUCCESS,
-    FAILED;
+    WAITTING(0),
+    RECORDING(1),
+    RECORDED(1),
+    EVALUATING(2),
+    EVALUATED(2),
+    ALERTING(3),
+    ALERTED(3),
+    SUCCESS(4),
+    FAILED(5);
+
+    private final int code;
+
+    DQTaskStatus(int code) {
+        this.code = code;
+    }
+
+    public int getCode() {
+        return code;
+    }
 }
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DispatcherJobStatusEnum.java
 
b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DispatcherJobStatusEnum.java
new file mode 100644
index 00000000..a5ce96bb
--- /dev/null
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DispatcherJobStatusEnum.java
@@ -0,0 +1,6 @@
+package org.apache.griffin.core.worker.entity.enums;
+
+public enum DispatcherJobStatusEnum {
+    FINISHED,
+    RUNNING;
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/Metric.java 
b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/Metric.java
new file mode 100644
index 00000000..6a36e9b4
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/Metric.java
@@ -0,0 +1,11 @@
+package org.apache.griffin.core.worker.entity.pojo;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class Metric {
+    private long partitionTime;
+    private double metric;
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/schedule/TaskDispatcherScheduler.java
 
b/core/src/main/java/org/apache/griffin/core/worker/schedule/TaskDispatcherScheduler.java
index cbb498b1..79759a02 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/schedule/TaskDispatcherScheduler.java
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/schedule/TaskDispatcherScheduler.java
@@ -1,135 +1,291 @@
 package org.apache.griffin.core.worker.schedule;
 
-import org.apache.griffin.core.worker.client.DispatcherClient;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.griffin.core.worker.context.WorkerContext;
 import org.apache.griffin.core.worker.entity.bo.DQInstance;
-import org.apache.griffin.core.worker.entity.dispatcher.*;
+import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
+import org.apache.griffin.core.worker.entity.enums.DQInstanceStatus;
 import org.apache.griffin.core.worker.entity.enums.DQTaskStatus;
+import org.apache.griffin.core.worker.service.DQInstanceService;
+import org.apache.griffin.core.worker.service.DQTaskService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
 import java.util.List;
-import java.util.stream.Collectors;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 /**
  * 任务执行调度期 和 dispatcher交互
  */
+@Component
 public class TaskDispatcherScheduler {
-    private static final Logger LOG = 
LoggerFactory.getLogger(TaskDispatcherScheduler.class);
-    @Autowired
+    private static final Logger log = 
LoggerFactory.getLogger(TaskDispatcherScheduler.class);
+
     private WorkerContext wc;
+    private DQInstanceService dqInstanceService;
+    private DQTaskService dqTaskService;
+
+    @Autowired
+    public void setWc(WorkerContext wc) {
+        this.wc = wc;
+    }
+    @Autowired
+    public void setDqInstanceService(DQInstanceService dqInstanceService) {
+        this.dqInstanceService = dqInstanceService;
+    }
     @Autowired
-    private DispatcherClient dispatcherClient;
+    public void setDqTaskService(DQTaskService dqTaskService) {
+        this.dqTaskService = dqTaskService;
+    }
+
+    @PostConstruct
+    public void startEvaluetingAndAlertThread() {
+        // todo 用线程池运行
+        // These scanners loop forever; running them on daemon threads keeps
+        // @PostConstruct (and application startup) from blocking indefinitely.
+        Thread alertThread = new Thread(this::scanAlertingTask, "dq-alerting-scanner");
+        alertThread.setDaemon(true);
+        alertThread.start();
+        Thread evaluateThread = new Thread(this::scanEvaluatingTask, "dq-evaluating-scanner");
+        evaluateThread.setDaemon(true);
+        evaluateThread.start();
+    }
 
     /**
      * 进行任务调度
      */
-    @Scheduled(fixedDelay = 1 * 5 * 1000L)
+    @Scheduled(fixedDelay = 5 * 1000L)
     public void doTaskDispatcherScheduler() {
-        // 检查当前环境是否有可以提交任务到dispatcher(并发度限制)
-        if (!wc.canSubmitToDispatcher()) return;
+        log.info("doTaskDispatcherScheduler start.");
+        List<DQInstance> waittingToRemoveFromWaitingList = 
Lists.newArrayList();
+        Queue<DQInstance> waitingToRecordingDQInstanceQueue = 
Queues.newPriorityBlockingQueue(wc.getWAITTING_TASK_QUEUE());
         // 从waitting队列获取任务
-        DQInstance dqInstance = wc.getWaittingTask();
-        // 开始提交任务 放到recording队列中
-        dqInstance.setStatus(DQTaskStatus.WAITTING);
-        wc.offerToRecordingTaskQueue(dqInstance);
+        try {
+            while (true) {
+                try {
+                    DQInstance dqInstance = 
waitingToRecordingDQInstanceQueue.poll();
+                    // 队列中无元素 结束循环
+                    if (dqInstance == null) break;
+                    if (DQInstanceStatus.ACCEPTED != dqInstance.getStatus()) {
+                        // 非初始状态
+                        // 同步数据库状态 缓存中的实例状态可能不是最新的 从数据库构建最新的实例 替换缓存中的实例
+                        dqInstance = 
dqInstanceService.getById(dqInstance.getId());
+                        // 根据状态放到对应队列
+                        waittingToRemoveFromWaitingList.add(dqInstance);
+                    } else {
+                        // 提交到recording队列
+                        // 开始提交任务 放到recording队列中
+                        if (dqInstanceService.updateStatus(dqInstance, 
DQInstanceStatus.WAITTING)) {
+                            waittingToRemoveFromWaitingList.add(dqInstance);
+                        }
+                    }
+                } catch (Exception e) {
+                    // todo
+                    log.error("doTaskDispatcherScheduler scan waitting task 
failed, ex:", e);
+                }
+            }
+        } catch (Exception e) {
+            // todo
+            log.error("doTaskDispatcherScheduler failed, ex:", e);
+        } finally {
+            // 根据状态分发到指定队列
+            
waittingToRemoveFromWaitingList.forEach(this::offerToSpecQueueByStatus);
+            // 从 原队列移除 队列移除
+            if (CollectionUtils.isNotEmpty(waittingToRemoveFromWaitingList)) 
wc.removeAll(wc.getWAITTING_TASK_QUEUE(), waittingToRemoveFromWaitingList);
+        }
+        log.info("doTaskDispatcherScheduler end.");
     }
 
-    private List<SubmitRequest> getSubmitRequest(DQInstance dqInstance) {
-
-        return null;
+    private void offerToSpecQueueByStatus(DQInstance instance) {
+        DQInstanceStatus status = instance.getStatus();
+        switch (status) {
+            case WAITTING:
+            case RUNNING:
+            case SUBMITTING:
+            case RECORDING:
+                wc.offerToRecordingTaskQueue(instance);
+                break;
+            case EVALUATING:
+                wc.offerToEvaluatingTaskQueue(instance);
+                break;
+            case EVALUATE_ALERTING:
+            case FAILED_ALERTING:
+                wc.offerToAlertingTaskQueue(instance);
+                break;
+            case FAILED:
+                wc.addFailedDQInstanceInfo(instance);
+                break;
+            case SUCCESS:
+                wc.addSuccessDQInstanceInfo(instance);
+                break;
+            default:
+                // todo 未知状态 丢弃任务
+                log.warn("Unknown status, id : {}, status : {}, instance: {}", 
instance.getId(), status, instance);
+                break;
+        }
     }
 
-    @Scheduled(fixedDelay = 1 * 5 * 1000L)
+    @Scheduled(fixedDelay = 5 * 1000L)
     public void scanRecordingTask() {
+        List<DQInstance> waittingToRemoveFromRecordingList = 
Lists.newArrayList();
+        Queue<DQInstance> waitingToSubmitDQInstanceQueue = 
Queues.newPriorityBlockingQueue(wc.getRECORDING_TASK_QUEUE());
+        try {
+            while (CollectionUtils.isNotEmpty(waitingToSubmitDQInstanceQueue)) 
{
+                try {
+                    DQInstance dqInstance = 
waitingToSubmitDQInstanceQueue.poll();
+                    if (dqInstance == null) break;
+                    processRecordingInstance(dqInstance, 
waittingToRemoveFromRecordingList);
+                } catch (Exception e) {
+                    // todo
+                    log.error("scanRecordingTask failed, ex:", e);
+                }
+            }
+        } catch (Exception e) {
+            // todo
+            log.error("scanRecordingTask failed, ex:", e);
+        } finally {
+            // 根据状态分发到指定队列
+            
waittingToRemoveFromRecordingList.forEach(this::offerToSpecQueueByStatus);
+            // 从 record 队列移除
+            if (CollectionUtils.isNotEmpty(waittingToRemoveFromRecordingList)) 
wc.removeAll(wc.getRECORDING_TASK_QUEUE(), waittingToRemoveFromRecordingList);
+        }
+    }
 
-        // 遍历 recording tasks 检查状态进行更新
-        DQInstance dqInstance = wc.getRecordingTask();
-        // recording队列任务应该只有两种状态  如果服用队列的话,可能有多种状态 先做分队列的方式
-        switch (dqInstance.getStatus()) {
+    private void processRecordingInstance(DQInstance dqInstance, 
List<DQInstance> waittingToRemoveFromRecordingList) {
+        // 检查当前环境是否有可以提交任务到dispatcher(并发度限制  需要根据提交的引擎计算)
+        DQInstanceStatus instanceStatus = dqInstance.getStatus();
+        List<DQBaseTask> subTaskList = dqInstance.getSubTaskList();
+        switch (instanceStatus) {
+            case ACCEPTED:
             case WAITTING:
-                // 提交任务
-                if (doSubmitRecordingTask(dqInstance)) {
-                    dqInstance.setStatus(DQTaskStatus.RUNNING);
-                    // 提交到队尾 等待下次处理
-                    wc.offerToRecordingTaskQueue(dqInstance);
-                } else {
-                    // 提交失败
-                    dqInstance.incrStatusAge();
-                    if (dqInstance.isFailed()) {
-                        wc.offerToAlertingTaskQueue(dqInstance);
-                    } else {
-                        // 如果失败 放回队尾 等待下次提交
-                        wc.offerToRecordingTaskQueue(dqInstance);
-                    }
+                dqInstanceService.updateStatus(dqInstance, 
DQInstanceStatus.SUBMITTING);
+                submitTaskToDispatcher(subTaskList);
+                if (!dqInstance.hasTaskToSubmit()) {
+                    dqInstanceService.updateStatus(dqInstance, 
DQInstanceStatus.RUNNING);
                 }
+                break;
+            case SUBMITTING:
+                submitTaskToDispatcher(subTaskList);
+                // 检查是否所有任务都已经提交
+                if (!dqInstance.hasTaskToSubmit()) {
+                    dqInstanceService.updateStatus(dqInstance, 
DQInstanceStatus.RUNNING);
+                }
+                break;
             case RUNNING:
-                // 查询状态
-                boolean hasRunningTask = checkJobStatus(dqInstance);
-                if (hasRunningTask) {
-                    // 提交到队尾 等待下次轮询
-                    wc.offerToRecordingTaskQueue(dqInstance);
-                } else {
-                    // ? 是否拆任务提交到下一阶段 还是整体提交   先整体提交
-                    // 没有正在运行的子任务了 提交给下一阶段执行
-                    wc.offerToEvaluatingTaskQueue(dqInstance);
+                // 检查并更新任务状态
+                checkJobStatus(subTaskList);
+                if (dqInstance.isFinishRecord()) {
+                    // record 任务都完成了  准备移除
+                    waittingToRemoveFromRecordingList.add(dqInstance);
                 }
+                break;
+            default:
+                break;
         }
-        // 如果状态是完成
-            // 获取结果 放入task
-            // 从recording tasks中移除任务
-            // 放入到 evaluate 队列
-        // 未完成  等待下次轮询
     }
 
-    private boolean checkJobStatus(DQInstance dqInstance) {
-        List<JobStatus> jobStatusList = dqInstance.getJobStatusList();
-        boolean hasRunningTask = false;
-        for (JobStatus jobStatus : jobStatusList) {
-            if (jobStatus.isFinished()) continue;
-            JobStatusRequest jobStatusRequest =  
dispatcherClient.wrapperJobStatusRequest(jobStatus);
-            JobStatusResponse jobStatusResponse = 
dispatcherClient.getJobStatus(jobStatusRequest);
-            if (jobStatusResponse.isSuccess()) {
-                jobStatus.setFinished(true);
-            } else {
-                hasRunningTask = true;
+    private void checkJobStatus(List<DQBaseTask> subTaskList) {
+        subTaskList.forEach(task -> dqTaskService.checkJobStatus(task));
+    }
+
+    private void submitTaskToDispatcher(List<DQBaseTask> subTaskList) {
+        // 遍历 recording tasks 检查状态进行更新
+        subTaskList.forEach(task -> {
+            DQTaskStatus taskStatus = task.getStatus();
+            switch (taskStatus) {
+                case WAITTING:
+                    // 提交任务
+                    doSubmitTaskToDispatcher(task);
+                    if (task.isFailed()) {
+                        // 提交一直失败
+                        dqTaskService.updateTaskStatus(task, 
DQTaskStatus.FAILED);
+                    }
+                    break;
+                case RECORDING:
+                    // 查询结果
+                    boolean isFinished = dqTaskService.checkJobStatus(task);
+                    if (isFinished) {
+                        // 任务结束, 设置任务状态为record结束
+                        dqTaskService.updateTaskStatus(task, 
DQTaskStatus.RECORDED);
+                    }
+                    break;
+                default:
+                    // 其余状态不处理
+                    break;
             }
-        }
-        return hasRunningTask;
+        });
     }
 
-    private boolean doSubmitRecordingTask(DQInstance dqInstance) {
-        // 一个task对应多个dispatcher任务 分别获取所有的任务对应的请求
-        List<SubmitRequest> requestList = getSubmitRequest(dqInstance);
-        // 提交任务 获取任务对应的job信息
-        List<JobStatus> jobStatusList = requestList.stream()
-                .map(req -> dispatcherClient.submitSql(req))
-                .map(resp -> dispatcherClient.wrapperSubmitResponse(resp))
-                .collect(Collectors.toList());
-        // 设置job信息
-        dqInstance.setJobStatusList(jobStatusList);
-        return true;
+    private void doSubmitTaskToDispatcher(DQBaseTask task) {
+        // 并发度检查
+        if (!wc.canSubmitToSpecEngine(task.getEngine())) return;
+        if (dqTaskService.doSubmitRecordingTask(task)) {
+            // 任务提交成功  更新状态为recording
+            dqTaskService.updateTaskStatus(task, DQTaskStatus.RECORDING);
+        } else {
+            // 提交失败 记录一次失败
+            task.incrStatusAge();
+        }
     }
 
     public void scanEvaluatingTask() {
-        // 遍历 evaluate 队列
-        // 如果状态是完成
-        // 获取结果 放入task
-        // 从evaluate tasks中移除任务
-        // 放入到alert队列
-        // 未完成  等待下次轮询
+        LinkedBlockingQueue<DQInstance> evaluating_task_queue = 
wc.getEVALUATING_TASK_QUEUE();
+        while (true) {
+            DQInstance dqInstance = null;
+            try {
+                // Evaluating 来一个处理一个
+                dqInstance = evaluating_task_queue.poll(5, TimeUnit.SECONDS);
+                if (dqInstance == null) continue;
+                // 根据状态打回任务
+                // 执行evaluating
+                if (dqInstance.getStatus() == DQInstanceStatus.EVALUATING) {
+                    dqInstance.doEvaluteTask();
+                } else {
+                    offerToSpecQueueByStatus(dqInstance);
+                }
+            } catch (Exception e) {
+                if (dqInstance != null) {
+                    log.error("scanEvaluatingTask doEvalute failed, id : {}, 
instance : {}, ex:", dqInstance.getId(), dqInstance, e);
+                    dqInstanceService.updateStatus(dqInstance, 
DQInstanceStatus.FAILED_ALERTING);
+                    offerToSpecQueueByStatus(dqInstance);
+                } else {
+                    log.error("scanEvaluatingTask poll instance failed. ex:", 
e);
+                }
+            }
+        }
     }
 
     public void scanAlertingTask() {
-        // 遍历 evaluate 队列
-        // 如果状态是完成
-        // 获取结果 放入task
-        // 从evaluate tasks中移除任务
-        // 放入到alert队列
-        // 未完成  等待下次轮询
+        LinkedBlockingQueue<DQInstance> alerting_task_queue = 
wc.getALERTING_TASK_QUEUE();
+        while (true) {
+            DQInstance dqInstance = null;
+            try {
+                // Evaluating 来一个处理一个
+                dqInstance = alerting_task_queue.poll(1, TimeUnit.SECONDS);
+                if (dqInstance == null) continue;
+                // 根据状态打回任务
+                // 执行evaluating
+                if (dqInstance.getStatus() == DQInstanceStatus.FAILED_ALERTING 
|| dqInstance.getStatus() == DQInstanceStatus.EVALUATE_ALERTING) {
+                    dqInstance.doAlertTask();
+                } else {
+                    offerToSpecQueueByStatus(dqInstance);
+                }
+            } catch (Exception e) {
+                if (dqInstance != null) {
+                    log.error("scanAlertingTask doAlert failed, id : {}, 
instance : {}, ex:", dqInstance.getId(), dqInstance, e);
+                    if (dqInstance.isFailed()) {
+                        // 重试很多次了 直接设置为失败
+                        dqInstanceService.updateStatus(dqInstance, 
DQInstanceStatus.FAILED);
+                        // 没有失败很多次的话,状态不修改 放回队列重试
+                    }
+                    // 放回队列准备重试
+                    offerToSpecQueueByStatus(dqInstance);
+                } else {
+                    log.error("scanAlertingTask poll instance failed. ex:", e);
+                }
+            }
+        }
     }
-
 }
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/service/DQInstanceService.java
 
b/core/src/main/java/org/apache/griffin/core/worker/service/DQInstanceService.java
new file mode 100644
index 00000000..a39c17d1
--- /dev/null
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/service/DQInstanceService.java
@@ -0,0 +1,39 @@
+package org.apache.griffin.core.worker.service;
+
+import org.apache.griffin.core.worker.dao.DQInstanceDao;
+import org.apache.griffin.core.worker.entity.bo.DQInstance;
+import org.apache.griffin.core.worker.entity.enums.DQInstanceStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class DQInstanceService {
+    private static final Logger log = 
LoggerFactory.getLogger(DQInstanceService.class);
+
+    @Autowired
+    private DQInstanceDao dqInstanceDao;
+
+    /**
+     * 数据库和内存需要保持同步
+     * @param instance
+     * @param status
+     * @return
+     */
+    public boolean updateStatus(DQInstance instance, DQInstanceStatus status) {
+        try {
+            dqInstanceDao.updateDQInstanceStatus(instance, status.getCode());
+            // log the transition before mutating, so the old status is printed
+            log.info("instance {} {} => {} success.", instance.getId(), instance.getStatus(), status);
+            instance.setStatus(status);
+            return true;
+        } catch (Exception e) {
+            log.error("instance {} {} => {} failed, ex", instance.getId(), 
instance.getStatus(), status, e);
+        }
+        return false;
+    }
+
+    public DQInstance getById(long id) {
+        return null;
+    }
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/service/DQTaskService.java 
b/core/src/main/java/org/apache/griffin/core/worker/service/DQTaskService.java
new file mode 100644
index 00000000..8c0f018b
--- /dev/null
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/service/DQTaskService.java
@@ -0,0 +1,106 @@
+package org.apache.griffin.core.worker.service;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.griffin.core.worker.client.DispatcherClient;
+import org.apache.griffin.core.worker.dao.DQInstanceDao;
+import org.apache.griffin.core.worker.dao.DQTaskDao;
+import org.apache.griffin.core.worker.entity.bo.DQInstance;
+import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
+import org.apache.griffin.core.worker.entity.dispatcher.*;
+import org.apache.griffin.core.worker.entity.enums.DQInstanceStatus;
+import org.apache.griffin.core.worker.entity.enums.DQTaskStatus;
+import org.apache.griffin.core.worker.entity.pojo.rule.DQRecordRule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+@Service
+public class DQTaskService {
+    private static final Logger log = 
LoggerFactory.getLogger(DQTaskService.class);
+
+    @Autowired
+    private DQTaskDao dqTaskDao;
+    @Autowired
+    private DispatcherClient dispatcherClient;
+
+    public boolean updateTaskStatus(DQBaseTask task, DQTaskStatus status) {
+        try {
+            dqTaskDao.updateDQTaskListStatus(task, status.getCode());
+            task.setStatus(status);
+            return true;
+        } catch (Exception e) {
+            log.error("task {} {} => {} failed, ex", task.getId(), 
task.getStatus(), status, e);
+        }
+        return false;
+    }
+
+    public boolean updateTaskStatus(List<DQBaseTask> tasks, DQTaskStatus 
status) {
+        if (CollectionUtils.isEmpty(tasks)) return false;
+        DQBaseTask sampleTask = tasks.get(0);
+        List<Long> taskIdList = 
tasks.stream().map(DQBaseTask::getId).collect(Collectors.toList());
+        try {
+            dqTaskDao.updateDQTaskListStatus(tasks, status.getCode());
+            tasks.forEach(x -> x.setStatus(status));
+            return true;
+        } catch (Exception e) {
+            log.error("task {} {} => {} failed, ex", taskIdList, 
sampleTask.getStatus(), status, e);
+        }
+        return false;
+    }
+
+    public boolean doSubmitRecordingTask(DQBaseTask task) {
+        // 一个task对应多个dispatcher任务 分别获取所有的任务对应的请求
+        List<Pair<Long, SubmitRequest>> requestList = getSubmitRequest(task);
+        // 提交任务 获取任务对应的job信息
+        List<JobStatus> jobStatusList = requestList.stream()
+                .map(reqPair -> Pair.of(reqPair.getLeft(), 
dispatcherClient.submitSql(reqPair.getRight())))
+                .map(respPair -> 
dispatcherClient.wrapperSubmitResponse(respPair))
+                .collect(Collectors.toList());
+        // 设置job信息
+        task.setJobStatusList(jobStatusList);
+        return true;
+    }
+
+    public boolean checkJobStatus(DQBaseTask task) {
+        List<JobStatus> jobStatusList = task.getJobStatusList();
+        boolean isFinished = true;
+        for (JobStatus jobStatus : jobStatusList) {
+            if (jobStatus.isFinished()) continue;
+            JobStatusRequest jobStatusRequest =  
dispatcherClient.wrapperJobStatusRequest(jobStatus);
+            JobStatusResponse jobStatusResponse = 
dispatcherClient.getJobStatus(jobStatusRequest);
+            if (jobStatusResponse.isSuccess()) {
+                // 更新状态 获取结果
+                MetricRequest metricRequest = 
dispatcherClient.wrapperMetricRequest(jobStatus);
+                MetricResponse metricResponse = 
dispatcherClient.getMetricResult(metricRequest);
+                Double metric = metricResponse.getMetric();
+                task.addMetric(jobStatus.getPartitionTime(), metric);
+                jobStatus.setFinished(true);
+            } else {
+                // 只要有一个没有成功 就应该是false
+                isFinished = false;
+            }
+        }
+        return isFinished;
+    }
+
+    private List<Pair<Long, SubmitRequest>> getSubmitRequest(DQBaseTask task) {
+        List<Pair<Long, String>> partitionTimeAndSqlList = task.record();
+        return partitionTimeAndSqlList.stream()
+                .map(partitionTimeAndSql -> {
+                    Long partitionTime = partitionTimeAndSql.getLeft();
+                    String recordSql = partitionTimeAndSql.getRight();
+                    SubmitRequest request = SubmitRequest.builder()
+                            .recordSql(recordSql)
+                            .engine(task.getEngine())
+                            .owner(task.getOwner())
+                            .build();
+                    return Pair.of(partitionTime, request);
+                })
+                .collect(Collectors.toList());
+    }
+}
diff --git a/dispatcher/pom.xml b/dispatcher/pom.xml
index fce4ed45..da5cd950 100644
--- a/dispatcher/pom.xml
+++ b/dispatcher/pom.xml
@@ -226,17 +226,17 @@ under the License.
         </dependency>
 
         <!-- to access confluent schema registry -->
-        <dependency>
-            <groupId>io.confluent</groupId>
-            <artifactId>kafka-schema-registry-client</artifactId>
-            <version>${confluent.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
+<!--        <dependency>-->
+<!--            <groupId>io.confluent</groupId>-->
+<!--            <artifactId>kafka-schema-registry-client</artifactId>-->
+<!--            <version>${confluent.version}</version>-->
+<!--            <exclusions>-->
+<!--                <exclusion>-->
+<!--                    <groupId>org.slf4j</groupId>-->
+<!--                    <artifactId>slf4j-log4j12</artifactId>-->
+<!--                </exclusion>-->
+<!--            </exclusions>-->
+<!--        </dependency>-->
 
         <!--schedule-->
         <dependency>
diff --git a/pom.xml b/pom.xml
index 119d0af4..9cca4fe1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -122,6 +122,7 @@ under the License.
                     <artifactId>maven-compiler-plugin</artifactId>
                     <version>3.8.1</version>
                     <configuration>
+                        <encoding>utf8</encoding>
                         <source>${maven.compiler.source}</source>
                         <target>${maven.compiler.target}</target>
                     </configuration>
diff --git a/scheduler/pom.xml b/scheduler/pom.xml
index 03a88ff6..0cd03a89 100644
--- a/scheduler/pom.xml
+++ b/scheduler/pom.xml
@@ -226,17 +226,17 @@ under the License.
         </dependency>
 
         <!-- to access confluent schema registry -->
-        <dependency>
-            <groupId>io.confluent</groupId>
-            <artifactId>kafka-schema-registry-client</artifactId>
-            <version>${confluent.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
+<!--        <dependency>-->
+<!--            <groupId>io.confluent</groupId>-->
+<!--            <artifactId>kafka-schema-registry-client</artifactId>-->
+<!--            <version>${confluent.version}</version>-->
+<!--            <exclusions>-->
+<!--                <exclusion>-->
+<!--                    <groupId>org.slf4j</groupId>-->
+<!--                    <artifactId>slf4j-log4j12</artifactId>-->
+<!--                </exclusion>-->
+<!--            </exclusions>-->
+<!--        </dependency>-->
 
         <!--schedule-->
         <dependency>

Reply via email to