This is an automated email from the ASF dual-hosted git repository.

guoyp pushed a commit to branch griffin-1.0.0-dev
in repository https://gitbox.apache.org/repos/asf/griffin.git


The following commit(s) were added to refs/heads/griffin-1.0.0-dev by this push:
     new d4fe792a Griffin 1.0.0 dev (#613)
d4fe792a is described below

commit d4fe792a8b1e58656f5bbd9eb84c02e23e1130c8
Author: dabuliud <[email protected]>
AuthorDate: Thu Nov 3 09:05:46 2022 +0800

    Griffin 1.0.0 dev (#613)
    
    * Update GriffinWebApplication.java
    
    * test  update
    
    * worker
    
    * first commit
    
    * dev
    
    * recording流程处理
    
    * Griffin 1.0.0 dev (#611)
    
    * init griffin new workflow
    
    * model data quality request
    
    Co-authored-by: guoyp <[email protected]>
    
    Co-authored-by: Warden <[email protected]>
    Co-authored-by: William Guo <[email protected]>
    Co-authored-by: guoyp <[email protected]>
---
 core/pom.xml                                       |   8 ++
 .../core/worker/client/DispatcherClient.java       |  35 ++++++
 .../griffin/core/worker/context/WorkerContext.java |  82 +++++++++++++
 .../core/worker/driver/PrestoTemplateDriver.java   |  13 ++
 .../core/worker/driver/SparkTemplateDriver.java    |  12 ++
 .../griffin/core/worker/driver/TemplateDriver.java |  27 +++++
 .../griffin/core/worker/entity/bo/DQInstance.java  |  44 +++++++
 .../core/worker/entity/bo/task/DQBaseTask.java     |  30 +++++
 .../core/worker/entity/bo/task/DQHiveTask.java     |  22 ++++
 .../core/worker/entity/bo/task/DQKafkaTask.java    |  18 +++
 .../core/worker/entity/dispatcher/JobStatus.java   |  10 ++
 .../worker/entity/dispatcher/JobStatusRequest.java |   4 +
 .../entity/dispatcher/JobStatusResponse.java       |   7 ++
 .../worker/entity/dispatcher/SubmitRequest.java    |  12 ++
 .../worker/entity/dispatcher/SubmitResponse.java   |  12 ++
 .../core/worker/entity/enums/DQEngineEnum.java     |   7 ++
 .../core/worker/entity/enums/DQErrorCode.java      |   4 +
 .../core/worker/entity/enums/DQTaskStatus.java     |  12 ++
 .../core/worker/entity/pojo/rule/DQAlertRule.java  |   4 +
 .../worker/entity/pojo/rule/DQEvaluateRule.java    |   4 +
 .../core/worker/entity/pojo/rule/DQRecordRule.java |   9 ++
 .../entity/pojo/template/DQRecordBaseTemplate.java |   4 +
 .../pojo/template/DQRecordCompostiveTemplate.java  |   4 +
 .../entity/pojo/template/DQRecordTemplate.java     |   5 +
 .../griffin/core/worker/factory/TaskFactory.java   |   9 ++
 .../core/worker/factory/TemplateDriverFactory.java |  12 ++
 .../worker/schedule/TaskDispatcherScheduler.java   | 135 +++++++++++++++++++++
 .../core/worker/service/DQTemplateService.java     |  15 +++
 .../core/worker/service/WorkCoreService.java       |  33 +++++
 .../core/worker/service/WorkerManageService.java   |  26 ++++
 30 files changed, 619 insertions(+)

diff --git a/core/pom.xml b/core/pom.xml
index 57aadedf..d510a5d4 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -297,6 +297,14 @@ under the License.
             <artifactId>elasticsearch-rest-client</artifactId>
             <version>${elasticsearch-rest-client.version}</version>
         </dependency>
+
+        <!--   新增依赖     -->
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>1.18.20</version>
+            <scope>provided</scope>
+        </dependency>
     </dependencies>
     <profiles>
         <!--if you need mysql, please uncomment mysql-connector-java -->
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/client/DispatcherClient.java
 
b/core/src/main/java/org/apache/griffin/core/worker/client/DispatcherClient.java
new file mode 100644
index 00000000..dbbcd6cd
--- /dev/null
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/client/DispatcherClient.java
@@ -0,0 +1,35 @@
+package org.apache.griffin.core.worker.client;
+
+import org.apache.griffin.core.worker.entity.bo.DQInstance;
+import org.apache.griffin.core.worker.entity.dispatcher.*;
+
+/**
+ * Dispatcher客户端 负责与dispatcher交互
+ */
+public class DispatcherClient {
+
+    public SubmitResponse submitSql(SubmitRequest request) {
+        return null;
+    }
+
+
+    public JobStatusResponse getJobStatus(JobStatusRequest jobStatusRequest) {
+        return null;
+    }
+
+    public double getMetricResult() {
+        return 0.0d;
+    }
+
+    public SubmitRequest wrapperDQTask(DQInstance waittingTask) {
+        return null;
+    }
+
+    public JobStatus wrapperSubmitResponse(SubmitResponse resp) {
+        return null;
+    }
+
+    public JobStatusRequest wrapperJobStatusRequest(JobStatus jobStatus) {
+        return null;
+    }
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/context/WorkerContext.java 
b/core/src/main/java/org/apache/griffin/core/worker/context/WorkerContext.java
new file mode 100644
index 00000000..17e238de
--- /dev/null
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/context/WorkerContext.java
@@ -0,0 +1,82 @@
+package org.apache.griffin.core.worker.context;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
+import org.apache.griffin.core.worker.entity.bo.DQInstance;
+import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * 上下文信息  全局唯一
+ */
+@Component
+public class WorkerContext {
+
+    private final Queue<DQInstance> WAITTING_TASK_QUEUE;
+//    public static final List<DQBaseTask> runningTaskIdQueue = 
Lists.newCopyOnWriteArrayList();
+    // runningTaskIdList = RECORDING_TASK_LIST + EVALUATING_TASK_LIST + 
ALERTING_TASK_LIST
+    private final Queue<DQInstance> RECORDING_TASK_QUEUE;
+    private final Queue<DQInstance> EVALUATING_TASK_QUEUE;
+    private final Queue<DQInstance> ALERTING_TASK_QUEUE;
+
+    // success和failed队列数据老化问题?
+    public final List<DQInstance> successTaskIdList;
+    public final List<DQInstance> failedTaskIdList;
+    
+    public WorkerContext() {
+        // 设置队列长度
+        WAITTING_TASK_QUEUE = Queues.newPriorityQueue();
+        RECORDING_TASK_QUEUE = Queues.newPriorityQueue();
+        EVALUATING_TASK_QUEUE = Queues.newPriorityQueue();
+        ALERTING_TASK_QUEUE = Queues.newPriorityQueue();
+        successTaskIdList = Lists.newArrayList();
+        failedTaskIdList = Lists.newArrayList();
+    }
+    
+    @PostConstruct
+    public void init() {
+        initQueueInfo();
+        resetTaskStatusWhenStartUp();
+    }
+
+    private void initQueueInfo() {
+        
+    }
+
+    public DQInstance getWaittingTask() {
+        return null;
+    }
+
+    public DQInstance getRecordingTask() {
+        return null;
+    }
+
+    public void offerToRecordingTaskQueue(DQInstance dqInstance) {
+    }
+    public void offerToAlertingTaskQueue(DQInstance dqInstance) {
+    }
+
+    /**
+     * 启动时,加载让分配在该节点的任务信息到
+     */
+    public void resetTaskStatusWhenStartUp() {
+
+    }
+
+    // 统计当前节点任务信息
+    public void getWorkerTaskStatus() {
+
+    }
+
+    public boolean canSubmitToDispatcher() {
+        return false;
+    }
+
+
+    public void offerToEvaluatingTaskQueue(DQInstance dqInstance) {
+    }
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/driver/PrestoTemplateDriver.java
 
b/core/src/main/java/org/apache/griffin/core/worker/driver/PrestoTemplateDriver.java
new file mode 100644
index 00000000..c0ab20dc
--- /dev/null
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/driver/PrestoTemplateDriver.java
@@ -0,0 +1,13 @@
+package org.apache.griffin.core.worker.driver;
+
+import org.apache.griffin.core.worker.entity.enums.DQEngineEnum;
+import org.apache.griffin.core.worker.entity.pojo.template.DQRecordTemplate;
+
+import java.util.Map;
+
+public class PrestoTemplateDriver extends TemplateDriver{
+    @Override
+    public String getRecordSql(DQRecordTemplate template, Map<String, String> 
params) {
+        return null;
+    }
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/driver/SparkTemplateDriver.java
 
b/core/src/main/java/org/apache/griffin/core/worker/driver/SparkTemplateDriver.java
new file mode 100644
index 00000000..6e54d4d1
--- /dev/null
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/driver/SparkTemplateDriver.java
@@ -0,0 +1,12 @@
+package org.apache.griffin.core.worker.driver;
+
+import org.apache.griffin.core.worker.entity.pojo.template.DQRecordTemplate;
+
+import java.util.Map;
+
+public class SparkTemplateDriver extends TemplateDriver {
+    @Override
+    public String getRecordSql(DQRecordTemplate template, Map<String, String> 
params) {
+        return null;
+    }
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/driver/TemplateDriver.java 
b/core/src/main/java/org/apache/griffin/core/worker/driver/TemplateDriver.java
new file mode 100644
index 00000000..74bd7079
--- /dev/null
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/driver/TemplateDriver.java
@@ -0,0 +1,27 @@
+package org.apache.griffin.core.worker.driver;
+
+import org.apache.griffin.core.worker.entity.enums.DQEngineEnum;
+import org.apache.griffin.core.worker.entity.pojo.template.DQRecordTemplate;
+import org.apache.griffin.core.worker.factory.TemplateDriverFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Map;
+
+@Service
+public abstract class TemplateDriver {
+
+    @Autowired
+    private TemplateDriverFactory templateDriverFactory;
+
+    /**
+     * 拼出的SQL 返回值 必须是 <ruleId, Partition, Metric>  这样可以做SQL合并
+     */
+    public abstract String getRecordSql(DQRecordTemplate template, Map<String, 
String> params);
+
+    public String getRecordSql(DQEngineEnum engine, DQRecordTemplate template, 
Map<String, String> params) {
+        TemplateDriver templateDriver = 
templateDriverFactory.getTemplateDrvier(engine);
+        return templateDriver.getRecordSql(template, params);
+    }
+
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/DQInstance.java 
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/DQInstance.java
new file mode 100644
index 00000000..0eacac7f
--- /dev/null
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/DQInstance.java
@@ -0,0 +1,44 @@
+package org.apache.griffin.core.worker.entity.bo;
+
+import lombok.Data;
+import org.apache.griffin.core.worker.entity.dispatcher.JobStatus;
+import org.apache.griffin.core.worker.entity.enums.DQTaskStatus;
+import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
+
+import java.util.List;
+
+/**
+ * 任务实例, 当前任务运行时的快照
+ *      一个实例包含多个子任务
+ */
+@Data
+public class DQInstance {
+    private Long id;
+
+    // 实例状态
+    private DQTaskStatus status;
+    // 记录状态年龄  状态更新是重置
+    private int statusAge;
+    // 任务信息
+    private List<DQBaseTask> subTaskList;
+
+    private List<JobStatus> jobStatusList;
+
+    public void setStatus(DQTaskStatus status) {
+        if (this.status != status) resetStatusAge();
+        this.status = status;
+    }
+
+    public void resetStatusAge() {
+        statusAge = 0;
+    }
+
+    public void incrStatusAge() {
+        statusAge++;
+    }
+
+    public boolean isFailed() {
+        // 一个状态年龄处理了5次 无法变更 说明处理该任务一直失败
+        return statusAge > 5;
+    }
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQBaseTask.java
 
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQBaseTask.java
new file mode 100644
index 00000000..f597bbb4
--- /dev/null
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQBaseTask.java
@@ -0,0 +1,30 @@
+package org.apache.griffin.core.worker.entity.bo.task;
+
+import lombok.Data;
+import org.apache.griffin.core.worker.context.WorkerContext;
+import org.apache.griffin.core.worker.entity.pojo.rule.DQAlertRule;
+import org.apache.griffin.core.worker.entity.pojo.rule.DQEvaluateRule;
+import org.apache.griffin.core.worker.entity.pojo.rule.DQRecordRule;
+
+/**
+ * 任务基础类
+ *   一个完整的任务包含
+ *      record
+ *      evaluate
+ *      alert
+ */
+@Data
+public abstract class DQBaseTask {
+
+    private WorkerContext wc;
+
+    private DQRecordRule recordRule;
+    private DQEvaluateRule dqEvaluateRule;
+    private DQAlertRule dqAlertRule;
+
+    // 生成recordsql和 模板 + 参数 有关系
+    // 生成SQL部分希望交给模板来做
+    public abstract void doRecord();
+    public abstract void doEvaluate();
+    public abstract void doAlert();
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQHiveTask.java
 
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQHiveTask.java
new file mode 100644
index 00000000..96be4deb
--- /dev/null
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQHiveTask.java
@@ -0,0 +1,22 @@
+package org.apache.griffin.core.worker.entity.bo.task;
+
+/**
+ * 表维度
+ */
+public class DQHiveTask extends DQBaseTask {
+
+    @Override
+    public void doRecord() {
+
+    }
+
+    @Override
+    public void doEvaluate() {
+
+    }
+
+    @Override
+    public void doAlert() {
+
+    }
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQKafkaTask.java
 
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQKafkaTask.java
new file mode 100644
index 00000000..7f8c13d5
--- /dev/null
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQKafkaTask.java
@@ -0,0 +1,18 @@
+package org.apache.griffin.core.worker.entity.bo.task;
+
+public class DQKafkaTask extends DQBaseTask {
+    @Override
+    public void doRecord() {
+
+    }
+
+    @Override
+    public void doEvaluate() {
+
+    }
+
+    @Override
+    public void doAlert() {
+
+    }
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatus.java
 
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatus.java
new file mode 100644
index 00000000..57f56341
--- /dev/null
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatus.java
@@ -0,0 +1,10 @@
+package org.apache.griffin.core.worker.entity.dispatcher;
+
+import lombok.Data;
+
+@Data
+public class JobStatus {
+    private String jobId;
+    private boolean finished = false;
+
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatusRequest.java
 
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatusRequest.java
new file mode 100644
index 00000000..ab991c46
--- /dev/null
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatusRequest.java
@@ -0,0 +1,4 @@
+package org.apache.griffin.core.worker.entity.dispatcher;
+
+public class JobStatusRequest {
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatusResponse.java
 
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatusResponse.java
new file mode 100644
index 00000000..bf38b956
--- /dev/null
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatusResponse.java
@@ -0,0 +1,7 @@
+package org.apache.griffin.core.worker.entity.dispatcher;
+
+public class JobStatusResponse {
+    public boolean isSuccess() {
+        return false;
+    }
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/SubmitRequest.java
 
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/SubmitRequest.java
new file mode 100644
index 00000000..ae52ad7a
--- /dev/null
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/SubmitRequest.java
@@ -0,0 +1,12 @@
+package org.apache.griffin.core.worker.entity.dispatcher;
+
+import lombok.Data;
+import org.apache.griffin.core.worker.entity.enums.DQEngineEnum;
+
+@Data
+public class SubmitRequest {
+    private String recordSql;
+    private DQEngineEnum engine;  // Spark,Hive,Presto,etc.
+    private String owner;
+    private Integer maxRetryCount;
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/SubmitResponse.java
 
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/SubmitResponse.java
new file mode 100644
index 00000000..b367e09a
--- /dev/null
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/SubmitResponse.java
@@ -0,0 +1,12 @@
+package org.apache.griffin.core.worker.entity.dispatcher;
+
+import lombok.Data;
+import org.apache.griffin.core.worker.entity.enums.DQErrorCode;
+
+@Data
+public class SubmitResponse {
+    private Integer code;
+    private String jobId;
+    private DQErrorCode errorCode;
+    private Exception ex;
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQEngineEnum.java
 
b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQEngineEnum.java
new file mode 100644
index 00000000..0e9d6629
--- /dev/null
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQEngineEnum.java
@@ -0,0 +1,7 @@
+package org.apache.griffin.core.worker.entity.enums;
+
+public enum DQEngineEnum {
+    PRESTO,
+    SPARK,
+    HIVE;
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQErrorCode.java
 
b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQErrorCode.java
new file mode 100644
index 00000000..4aa19f10
--- /dev/null
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQErrorCode.java
@@ -0,0 +1,4 @@
+package org.apache.griffin.core.worker.entity.enums;
+
+public enum DQErrorCode {
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQTaskStatus.java
 
b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQTaskStatus.java
new file mode 100644
index 00000000..68292a45
--- /dev/null
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQTaskStatus.java
@@ -0,0 +1,12 @@
+package org.apache.griffin.core.worker.entity.enums;
+
+public enum DQTaskStatus {
+    ACCEPTED,
+    WAITTING,
+    RUNNING,
+    RECORDING,
+    EVALUATING,
+    ALERTING,
+    SUCCESS,
+    FAILED;
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQAlertRule.java
 
b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQAlertRule.java
new file mode 100644
index 00000000..86d4fca8
--- /dev/null
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQAlertRule.java
@@ -0,0 +1,4 @@
+package org.apache.griffin.core.worker.entity.pojo.rule;
+
+public class DQAlertRule {
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQEvaluateRule.java
 
b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQEvaluateRule.java
new file mode 100644
index 00000000..5fc51854
--- /dev/null
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQEvaluateRule.java
@@ -0,0 +1,4 @@
+package org.apache.griffin.core.worker.entity.pojo.rule;
+
+public class DQEvaluateRule {
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQRecordRule.java
 
b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQRecordRule.java
new file mode 100644
index 00000000..94deba4b
--- /dev/null
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQRecordRule.java
@@ -0,0 +1,9 @@
+package org.apache.griffin.core.worker.entity.pojo.rule;
+
+import lombok.Data;
+import org.apache.griffin.core.worker.entity.pojo.template.DQRecordTemplate;
+
+@Data
+public class DQRecordRule {
+    private DQRecordTemplate template;
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordBaseTemplate.java
 
b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordBaseTemplate.java
new file mode 100644
index 00000000..fbe51281
--- /dev/null
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordBaseTemplate.java
@@ -0,0 +1,4 @@
+package org.apache.griffin.core.worker.entity.pojo.template;
+
+public class DQRecordBaseTemplate {
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordCompostiveTemplate.java
 
b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordCompostiveTemplate.java
new file mode 100644
index 00000000..cfccf84a
--- /dev/null
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordCompostiveTemplate.java
@@ -0,0 +1,4 @@
+package org.apache.griffin.core.worker.entity.pojo.template;
+
+public class DQRecordCompostiveTemplate {
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordTemplate.java
 
b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordTemplate.java
new file mode 100644
index 00000000..d4b12047
--- /dev/null
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordTemplate.java
@@ -0,0 +1,5 @@
+package org.apache.griffin.core.worker.entity.pojo.template;
+
+public class DQRecordTemplate {
+
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/factory/TaskFactory.java 
b/core/src/main/java/org/apache/griffin/core/worker/factory/TaskFactory.java
new file mode 100644
index 00000000..b6b79f32
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/factory/TaskFactory.java
@@ -0,0 +1,9 @@
+package org.apache.griffin.core.worker.factory;
+
+public class TaskFactory {
+
+
+
+
+
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/factory/TemplateDriverFactory.java
 
b/core/src/main/java/org/apache/griffin/core/worker/factory/TemplateDriverFactory.java
new file mode 100644
index 00000000..32970020
--- /dev/null
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/factory/TemplateDriverFactory.java
@@ -0,0 +1,12 @@
+package org.apache.griffin.core.worker.factory;
+
+import org.apache.griffin.core.worker.driver.TemplateDriver;
+import org.apache.griffin.core.worker.entity.enums.DQEngineEnum;
+import org.springframework.stereotype.Service;
+
+@Service
+public class TemplateDriverFactory {
+    public TemplateDriver getTemplateDrvier(DQEngineEnum engine) {
+        return null;
+    }
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/schedule/TaskDispatcherScheduler.java
 
b/core/src/main/java/org/apache/griffin/core/worker/schedule/TaskDispatcherScheduler.java
new file mode 100644
index 00000000..cbb498b1
--- /dev/null
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/schedule/TaskDispatcherScheduler.java
@@ -0,0 +1,135 @@
+package org.apache.griffin.core.worker.schedule;
+
+import org.apache.griffin.core.worker.client.DispatcherClient;
+import org.apache.griffin.core.worker.context.WorkerContext;
+import org.apache.griffin.core.worker.entity.bo.DQInstance;
+import org.apache.griffin.core.worker.entity.dispatcher.*;
+import org.apache.griffin.core.worker.entity.enums.DQTaskStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Scheduled;
+
+import javax.annotation.PostConstruct;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * 任务执行调度期 和 dispatcher交互
+ */
+public class TaskDispatcherScheduler {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TaskDispatcherScheduler.class);
+    @Autowired
+    private WorkerContext wc;
+    @Autowired
+    private DispatcherClient dispatcherClient;
+
+    /**
+     * 进行任务调度
+     */
+    @Scheduled(fixedDelay = 1 * 5 * 1000L)
+    public void doTaskDispatcherScheduler() {
+        // 检查当前环境是否有可以提交任务到dispatcher(并发度限制)
+        if (!wc.canSubmitToDispatcher()) return;
+        // 从waitting队列获取任务
+        DQInstance dqInstance = wc.getWaittingTask();
+        // 开始提交任务 放到recording队列中
+        dqInstance.setStatus(DQTaskStatus.WAITTING);
+        wc.offerToRecordingTaskQueue(dqInstance);
+    }
+
+    private List<SubmitRequest> getSubmitRequest(DQInstance dqInstance) {
+
+        return null;
+    }
+
+    @Scheduled(fixedDelay = 1 * 5 * 1000L)
+    public void scanRecordingTask() {
+
+        // 遍历 recording tasks 检查状态进行更新
+        DQInstance dqInstance = wc.getRecordingTask();
+        // recording队列任务应该只有两种状态  如果服用队列的话,可能有多种状态 先做分队列的方式
+        switch (dqInstance.getStatus()) {
+            case WAITTING:
+                // 提交任务
+                if (doSubmitRecordingTask(dqInstance)) {
+                    dqInstance.setStatus(DQTaskStatus.RUNNING);
+                    // 提交到队尾 等待下次处理
+                    wc.offerToRecordingTaskQueue(dqInstance);
+                } else {
+                    // 提交失败
+                    dqInstance.incrStatusAge();
+                    if (dqInstance.isFailed()) {
+                        wc.offerToAlertingTaskQueue(dqInstance);
+                    } else {
+                        // 如果失败 放回队尾 等待下次提交
+                        wc.offerToRecordingTaskQueue(dqInstance);
+                    }
+                }
+            case RUNNING:
+                // 查询状态
+                boolean hasRunningTask = checkJobStatus(dqInstance);
+                if (hasRunningTask) {
+                    // 提交到队尾 等待下次轮询
+                    wc.offerToRecordingTaskQueue(dqInstance);
+                } else {
+                    // ? 是否拆任务提交到下一阶段 还是整体提交   先整体提交
+                    // 没有正在运行的子任务了 提交给下一阶段执行
+                    wc.offerToEvaluatingTaskQueue(dqInstance);
+                }
+        }
+        // 如果状态是完成
+            // 获取结果 放入task
+            // 从recording tasks中移除任务
+            // 放入到 evaluate 队列
+        // 未完成  等待下次轮询
+    }
+
+    private boolean checkJobStatus(DQInstance dqInstance) {
+        List<JobStatus> jobStatusList = dqInstance.getJobStatusList();
+        boolean hasRunningTask = false;
+        for (JobStatus jobStatus : jobStatusList) {
+            if (jobStatus.isFinished()) continue;
+            JobStatusRequest jobStatusRequest =  
dispatcherClient.wrapperJobStatusRequest(jobStatus);
+            JobStatusResponse jobStatusResponse = 
dispatcherClient.getJobStatus(jobStatusRequest);
+            if (jobStatusResponse.isSuccess()) {
+                jobStatus.setFinished(true);
+            } else {
+                hasRunningTask = true;
+            }
+        }
+        return hasRunningTask;
+    }
+
+    private boolean doSubmitRecordingTask(DQInstance dqInstance) {
+        // 一个task对应多个dispatcher任务 分别获取所有的任务对应的请求
+        List<SubmitRequest> requestList = getSubmitRequest(dqInstance);
+        // 提交任务 获取任务对应的job信息
+        List<JobStatus> jobStatusList = requestList.stream()
+                .map(req -> dispatcherClient.submitSql(req))
+                .map(resp -> dispatcherClient.wrapperSubmitResponse(resp))
+                .collect(Collectors.toList());
+        // 设置job信息
+        dqInstance.setJobStatusList(jobStatusList);
+        return true;
+    }
+
+    public void scanEvaluatingTask() {
+        // 遍历 evaluate 队列
+        // 如果状态是完成
+        // 获取结果 放入task
+        // 从evaluate tasks中移除任务
+        // 放入到alert队列
+        // 未完成  等待下次轮询
+    }
+
+    public void scanAlertingTask() {
+        // 遍历 evaluate 队列
+        // 如果状态是完成
+        // 获取结果 放入task
+        // 从evaluate tasks中移除任务
+        // 放入到alert队列
+        // 未完成  等待下次轮询
+    }
+
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/service/DQTemplateService.java
 
b/core/src/main/java/org/apache/griffin/core/worker/service/DQTemplateService.java
new file mode 100644
index 00000000..cc236aba
--- /dev/null
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/service/DQTemplateService.java
@@ -0,0 +1,15 @@
+package org.apache.griffin.core.worker.service;
+
+import org.apache.griffin.core.worker.driver.TemplateDriver;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class DQTemplateService {
+
+    @Autowired
+    private TemplateDriver templateDriver;
+
+//    public void void
+
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/service/WorkCoreService.java
 
b/core/src/main/java/org/apache/griffin/core/worker/service/WorkCoreService.java
new file mode 100644
index 00000000..314a328d
--- /dev/null
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/service/WorkCoreService.java
@@ -0,0 +1,33 @@
+package org.apache.griffin.core.worker.service;
+
+import org.apache.griffin.core.worker.context.WorkerContext;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class WorkCoreService {
+
+    @Autowired
+    private WorkerContext workerContext;
+
+    public void submitDQTask(Long instanceId) {
+        // 收到提交请求
+        // 参数校验
+        // 判断环境是否可以接收任务
+        // 构建任务实例 怎么保证实例生成的ID一样? 1 实例ID由master指派  2 entity id + partition 3 
实例由master生成
+            // 构建新实例
+            // 恢复实例 (任务重新分配过来的)
+        // 根据状态提交队列等待执行
+            // 新构建的实例 -> waitting 等待调度
+            // 恢复的实例  根据状态调度
+    }
+
+    public void stopDQTask(Long instanceId) {
+
+    }
+
+    public void querySingleDQTask(Long instanceId) {
+
+    }
+
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/service/WorkerManageService.java
 
b/core/src/main/java/org/apache/griffin/core/worker/service/WorkerManageService.java
new file mode 100644
index 00000000..2fd2ff4e
--- /dev/null
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/service/WorkerManageService.java
@@ -0,0 +1,26 @@
+package org.apache.griffin.core.worker.service;
+
+import org.springframework.stereotype.Service;
+
+/**
+ * Worker管理服务
+ */
+@Service
+public class WorkerManageService {
+
+    // 注册节点
+    public void registDQWorkerNode() {
+
+    }
+
+    // 获取节点任务状态信息
+    public void getDQWorkNodeStatus() {
+
+    }
+
+    // 上报心跳
+    public void nodeHeartBeat() {
+
+    }
+
+}

Reply via email to