This is an automated email from the ASF dual-hosted git repository.
guoyp pushed a commit to branch griffin-1.0.0-dev
in repository https://gitbox.apache.org/repos/asf/griffin.git
The following commit(s) were added to refs/heads/griffin-1.0.0-dev by this push:
new d4fe792a Griffin 1.0.0 dev (#613)
d4fe792a is described below
commit d4fe792a8b1e58656f5bbd9eb84c02e23e1130c8
Author: dabuliud <[email protected]>
AuthorDate: Thu Nov 3 09:05:46 2022 +0800
Griffin 1.0.0 dev (#613)
* Update GriffinWebApplication.java
* test update
* worker
* first commit
* dev
* recording流程处理
* Griffin 1.0.0 dev (#611)
* init griffin new workflow
* model data quality request
Co-authored-by: guoyp <[email protected]>
Co-authored-by: Warden <[email protected]>
Co-authored-by: William Guo <[email protected]>
Co-authored-by: guoyp <[email protected]>
---
core/pom.xml | 8 ++
.../core/worker/client/DispatcherClient.java | 35 ++++++
.../griffin/core/worker/context/WorkerContext.java | 82 +++++++++++++
.../core/worker/driver/PrestoTemplateDriver.java | 13 ++
.../core/worker/driver/SparkTemplateDriver.java | 12 ++
.../griffin/core/worker/driver/TemplateDriver.java | 27 +++++
.../griffin/core/worker/entity/bo/DQInstance.java | 44 +++++++
.../core/worker/entity/bo/task/DQBaseTask.java | 30 +++++
.../core/worker/entity/bo/task/DQHiveTask.java | 22 ++++
.../core/worker/entity/bo/task/DQKafkaTask.java | 18 +++
.../core/worker/entity/dispatcher/JobStatus.java | 10 ++
.../worker/entity/dispatcher/JobStatusRequest.java | 4 +
.../entity/dispatcher/JobStatusResponse.java | 7 ++
.../worker/entity/dispatcher/SubmitRequest.java | 12 ++
.../worker/entity/dispatcher/SubmitResponse.java | 12 ++
.../core/worker/entity/enums/DQEngineEnum.java | 7 ++
.../core/worker/entity/enums/DQErrorCode.java | 4 +
.../core/worker/entity/enums/DQTaskStatus.java | 12 ++
.../core/worker/entity/pojo/rule/DQAlertRule.java | 4 +
.../worker/entity/pojo/rule/DQEvaluateRule.java | 4 +
.../core/worker/entity/pojo/rule/DQRecordRule.java | 9 ++
.../entity/pojo/template/DQRecordBaseTemplate.java | 4 +
.../pojo/template/DQRecordCompostiveTemplate.java | 4 +
.../entity/pojo/template/DQRecordTemplate.java | 5 +
.../griffin/core/worker/factory/TaskFactory.java | 9 ++
.../core/worker/factory/TemplateDriverFactory.java | 12 ++
.../worker/schedule/TaskDispatcherScheduler.java | 135 +++++++++++++++++++++
.../core/worker/service/DQTemplateService.java | 15 +++
.../core/worker/service/WorkCoreService.java | 33 +++++
.../core/worker/service/WorkerManageService.java | 26 ++++
30 files changed, 619 insertions(+)
diff --git a/core/pom.xml b/core/pom.xml
index 57aadedf..d510a5d4 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -297,6 +297,14 @@ under the License.
<artifactId>elasticsearch-rest-client</artifactId>
<version>${elasticsearch-rest-client.version}</version>
</dependency>
+
+ <!-- 新增依赖 -->
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <version>1.18.20</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
<profiles>
<!--if you need mysql, please uncomment mysql-connector-java -->
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/client/DispatcherClient.java
b/core/src/main/java/org/apache/griffin/core/worker/client/DispatcherClient.java
new file mode 100644
index 00000000..dbbcd6cd
--- /dev/null
+++
b/core/src/main/java/org/apache/griffin/core/worker/client/DispatcherClient.java
@@ -0,0 +1,35 @@
+package org.apache.griffin.core.worker.client;
+
+import org.apache.griffin.core.worker.entity.bo.DQInstance;
+import org.apache.griffin.core.worker.entity.dispatcher.*;
+
+/**
+ * Dispatcher client; responsible for interacting with the dispatcher.
+ */
+public class DispatcherClient {
+ // NOTE: every method below is currently a placeholder returning null or 0.0
+ public SubmitResponse submitSql(SubmitRequest request) {
+ return null;
+ }
+
+
+ public JobStatusResponse getJobStatus(JobStatusRequest jobStatusRequest) {
+ return null;
+ }
+
+ public double getMetricResult() {
+ return 0.0d;
+ }
+
+ public SubmitRequest wrapperDQTask(DQInstance waittingTask) {
+ return null;
+ }
+
+ public JobStatus wrapperSubmitResponse(SubmitResponse resp) {
+ return null;
+ }
+
+ public JobStatusRequest wrapperJobStatusRequest(JobStatus jobStatus) {
+ return null;
+ }
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/context/WorkerContext.java
b/core/src/main/java/org/apache/griffin/core/worker/context/WorkerContext.java
new file mode 100644
index 00000000..17e238de
--- /dev/null
+++
b/core/src/main/java/org/apache/griffin/core/worker/context/WorkerContext.java
@@ -0,0 +1,82 @@
+package org.apache.griffin.core.worker.context;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
+import org.apache.griffin.core.worker.entity.bo.DQInstance;
+import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * Worker context information; globally unique. The queue accessors are still stubs.
+ */
+@Component
+public class WorkerContext {
+
+ private final Queue<DQInstance> WAITTING_TASK_QUEUE;
+// public static final List<DQBaseTask> runningTaskIdQueue =
Lists.newCopyOnWriteArrayList();
+ // runningTaskIdList = RECORDING_TASK_LIST + EVALUATING_TASK_LIST +
ALERTING_TASK_LIST
+ private final Queue<DQInstance> RECORDING_TASK_QUEUE;
+ private final Queue<DQInstance> EVALUATING_TASK_QUEUE;
+ private final Queue<DQInstance> ALERTING_TASK_QUEUE;
+
+ // Open question: how should entries in the success and failed lists be aged out?
+ public final List<DQInstance> successTaskIdList;
+ public final List<DQInstance> failedTaskIdList;
+
+ public WorkerContext() {
+ // FIFO queues: DQInstance is not Comparable, so a PriorityQueue cannot hold it. TODO: bound the queue length
+ WAITTING_TASK_QUEUE = Queues.newConcurrentLinkedQueue();
+ RECORDING_TASK_QUEUE = Queues.newConcurrentLinkedQueue();
+ EVALUATING_TASK_QUEUE = Queues.newConcurrentLinkedQueue();
+ ALERTING_TASK_QUEUE = Queues.newConcurrentLinkedQueue();
+ successTaskIdList = Lists.newArrayList();
+ failedTaskIdList = Lists.newArrayList();
+ }
+
+ @PostConstruct
+ public void init() {
+ initQueueInfo();
+ resetTaskStatusWhenStartUp();
+ }
+
+ private void initQueueInfo() {
+ // TODO: not implemented yet
+ }
+
+ public DQInstance getWaittingTask() {
+ return null;
+ }
+
+ public DQInstance getRecordingTask() {
+ return null;
+ }
+
+ public void offerToRecordingTaskQueue(DQInstance dqInstance) {
+ }
+ public void offerToAlertingTaskQueue(DQInstance dqInstance) {
+ }
+
+ /**
+ * On startup, load the tasks assigned to this node (not implemented yet).
+ */
+ public void resetTaskStatusWhenStartUp() {
+ // TODO: not implemented yet
+ }
+
+ // Collect task statistics for the current node
+ public void getWorkerTaskStatus() {
+ // TODO: not implemented yet
+ }
+
+ public boolean canSubmitToDispatcher() {
+ return false;
+ }
+
+
+ public void offerToEvaluatingTaskQueue(DQInstance dqInstance) {
+ }
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/driver/PrestoTemplateDriver.java
b/core/src/main/java/org/apache/griffin/core/worker/driver/PrestoTemplateDriver.java
new file mode 100644
index 00000000..c0ab20dc
--- /dev/null
+++
b/core/src/main/java/org/apache/griffin/core/worker/driver/PrestoTemplateDriver.java
@@ -0,0 +1,13 @@
+package org.apache.griffin.core.worker.driver;
+
+import org.apache.griffin.core.worker.entity.enums.DQEngineEnum;
+import org.apache.griffin.core.worker.entity.pojo.template.DQRecordTemplate;
+
+import java.util.Map;
+
+public class PrestoTemplateDriver extends TemplateDriver{
+ @Override
+ public String getRecordSql(DQRecordTemplate template, Map<String, String>
params) {
+ return null;
+ }
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/driver/SparkTemplateDriver.java
b/core/src/main/java/org/apache/griffin/core/worker/driver/SparkTemplateDriver.java
new file mode 100644
index 00000000..6e54d4d1
--- /dev/null
+++
b/core/src/main/java/org/apache/griffin/core/worker/driver/SparkTemplateDriver.java
@@ -0,0 +1,12 @@
+package org.apache.griffin.core.worker.driver;
+
+import org.apache.griffin.core.worker.entity.pojo.template.DQRecordTemplate;
+
+import java.util.Map;
+
+public class SparkTemplateDriver extends TemplateDriver {
+ @Override
+ public String getRecordSql(DQRecordTemplate template, Map<String, String>
params) {
+ return null;
+ }
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/driver/TemplateDriver.java
b/core/src/main/java/org/apache/griffin/core/worker/driver/TemplateDriver.java
new file mode 100644
index 00000000..74bd7079
--- /dev/null
+++
b/core/src/main/java/org/apache/griffin/core/worker/driver/TemplateDriver.java
@@ -0,0 +1,27 @@
+package org.apache.griffin.core.worker.driver;
+
+import org.apache.griffin.core.worker.entity.enums.DQEngineEnum;
+import org.apache.griffin.core.worker.entity.pojo.template.DQRecordTemplate;
+import org.apache.griffin.core.worker.factory.TemplateDriverFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Map;
+
+@Service
+public abstract class TemplateDriver {
+ // NOTE(review): @Service on an abstract class - confirm a concrete TemplateDriver bean is registered
+ @Autowired
+ private TemplateDriverFactory templateDriverFactory;
+
+ /**
+ * The generated SQL must return <ruleId, Partition, Metric> so that SQL statements can be merged.
+ */
+ public abstract String getRecordSql(DQRecordTemplate template, Map<String,
String> params);
+ // Looks up the driver for the given engine through the factory and delegates to it
+ public String getRecordSql(DQEngineEnum engine, DQRecordTemplate template,
Map<String, String> params) {
+ TemplateDriver templateDriver =
templateDriverFactory.getTemplateDrvier(engine);
+ return templateDriver.getRecordSql(template, params);
+ }
+
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/DQInstance.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/DQInstance.java
new file mode 100644
index 00000000..0eacac7f
--- /dev/null
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/DQInstance.java
@@ -0,0 +1,44 @@
+package org.apache.griffin.core.worker.entity.bo;
+
+import lombok.Data;
+import org.apache.griffin.core.worker.entity.dispatcher.JobStatus;
+import org.apache.griffin.core.worker.entity.enums.DQTaskStatus;
+import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
+
+import java.util.List;
+
+/**
+ * Task instance: a snapshot of the task at run time.
+ * One instance contains multiple sub-tasks.
+ */
+@Data
+public class DQInstance {
+ private Long id;
+
+ // Instance status
+ private DQTaskStatus status;
+ // Age of the current status; reset whenever the status changes
+ private int statusAge;
+ // Task information
+ private List<DQBaseTask> subTaskList;
+
+ private List<JobStatus> jobStatusList;
+
+ public void setStatus(DQTaskStatus status) {
+ if (this.status != status) resetStatusAge();
+ this.status = status;
+ }
+
+ public void resetStatusAge() {
+ statusAge = 0;
+ }
+
+ public void incrStatusAge() {
+ statusAge++;
+ }
+
+ public boolean isFailed() {
+ // Status age above 5 (original note: "handled 5 times") without a status change means processing keeps failing
+ return statusAge > 5;
+ }
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQBaseTask.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQBaseTask.java
new file mode 100644
index 00000000..f597bbb4
--- /dev/null
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQBaseTask.java
@@ -0,0 +1,30 @@
+package org.apache.griffin.core.worker.entity.bo.task;
+
+import lombok.Data;
+import org.apache.griffin.core.worker.context.WorkerContext;
+import org.apache.griffin.core.worker.entity.pojo.rule.DQAlertRule;
+import org.apache.griffin.core.worker.entity.pojo.rule.DQEvaluateRule;
+import org.apache.griffin.core.worker.entity.pojo.rule.DQRecordRule;
+
+/**
+ * Base class for tasks.
+ * A complete task consists of
+ * record
+ * evaluate
+ * alert
+ */
+@Data
+public abstract class DQBaseTask {
+
+ private WorkerContext wc;
+
+ private DQRecordRule recordRule;
+ private DQEvaluateRule dqEvaluateRule;
+ private DQAlertRule dqAlertRule;
+
+ // Generating the record SQL depends on the template plus its parameters
+ // The SQL generation part is meant to be delegated to the template
+ public abstract void doRecord();
+ public abstract void doEvaluate();
+ public abstract void doAlert();
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQHiveTask.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQHiveTask.java
new file mode 100644
index 00000000..96be4deb
--- /dev/null
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQHiveTask.java
@@ -0,0 +1,22 @@
+package org.apache.griffin.core.worker.entity.bo.task;
+
+/**
+ * Table dimension (table-level task).
+ */
+public class DQHiveTask extends DQBaseTask {
+
+ @Override
+ public void doRecord() {
+ // TODO: not implemented yet
+ }
+
+ @Override
+ public void doEvaluate() {
+ // TODO: not implemented yet
+ }
+
+ @Override
+ public void doAlert() {
+ // TODO: not implemented yet
+ }
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQKafkaTask.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQKafkaTask.java
new file mode 100644
index 00000000..7f8c13d5
--- /dev/null
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQKafkaTask.java
@@ -0,0 +1,18 @@
+package org.apache.griffin.core.worker.entity.bo.task;
+
+public class DQKafkaTask extends DQBaseTask {
+ @Override
+ public void doRecord() {
+
+ }
+
+ @Override
+ public void doEvaluate() {
+
+ }
+
+ @Override
+ public void doAlert() {
+
+ }
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatus.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatus.java
new file mode 100644
index 00000000..57f56341
--- /dev/null
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatus.java
@@ -0,0 +1,10 @@
+package org.apache.griffin.core.worker.entity.dispatcher;
+
+import lombok.Data;
+
+@Data
+public class JobStatus {
+ private String jobId;
+ private boolean finished = false;
+
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatusRequest.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatusRequest.java
new file mode 100644
index 00000000..ab991c46
--- /dev/null
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatusRequest.java
@@ -0,0 +1,4 @@
+package org.apache.griffin.core.worker.entity.dispatcher;
+
+public class JobStatusRequest {
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatusResponse.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatusResponse.java
new file mode 100644
index 00000000..bf38b956
--- /dev/null
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatusResponse.java
@@ -0,0 +1,7 @@
+package org.apache.griffin.core.worker.entity.dispatcher;
+
+public class JobStatusResponse {
+ public boolean isSuccess() {
+ return false;
+ }
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/SubmitRequest.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/SubmitRequest.java
new file mode 100644
index 00000000..ae52ad7a
--- /dev/null
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/SubmitRequest.java
@@ -0,0 +1,12 @@
+package org.apache.griffin.core.worker.entity.dispatcher;
+
+import lombok.Data;
+import org.apache.griffin.core.worker.entity.enums.DQEngineEnum;
+
+@Data
+public class SubmitRequest {
+ private String recordSql;
+ private DQEngineEnum engine; // Spark,Hive,Presto,etc.
+ private String owner;
+ private Integer maxRetryCount;
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/SubmitResponse.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/SubmitResponse.java
new file mode 100644
index 00000000..b367e09a
--- /dev/null
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/SubmitResponse.java
@@ -0,0 +1,12 @@
+package org.apache.griffin.core.worker.entity.dispatcher;
+
+import lombok.Data;
+import org.apache.griffin.core.worker.entity.enums.DQErrorCode;
+
+@Data
+public class SubmitResponse {
+ private Integer code;
+ private String jobId;
+ private DQErrorCode errorCode;
+ private Exception ex;
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQEngineEnum.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQEngineEnum.java
new file mode 100644
index 00000000..0e9d6629
--- /dev/null
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQEngineEnum.java
@@ -0,0 +1,7 @@
+package org.apache.griffin.core.worker.entity.enums;
+
+public enum DQEngineEnum {
+ PRESTO,
+ SPARK,
+ HIVE;
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQErrorCode.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQErrorCode.java
new file mode 100644
index 00000000..4aa19f10
--- /dev/null
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQErrorCode.java
@@ -0,0 +1,4 @@
+package org.apache.griffin.core.worker.entity.enums;
+
+public enum DQErrorCode {
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQTaskStatus.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQTaskStatus.java
new file mode 100644
index 00000000..68292a45
--- /dev/null
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQTaskStatus.java
@@ -0,0 +1,12 @@
+package org.apache.griffin.core.worker.entity.enums;
+
+public enum DQTaskStatus {
+ ACCEPTED,
+ WAITTING,
+ RUNNING,
+ RECORDING,
+ EVALUATING,
+ ALERTING,
+ SUCCESS,
+ FAILED;
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQAlertRule.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQAlertRule.java
new file mode 100644
index 00000000..86d4fca8
--- /dev/null
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQAlertRule.java
@@ -0,0 +1,4 @@
+package org.apache.griffin.core.worker.entity.pojo.rule;
+
+public class DQAlertRule {
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQEvaluateRule.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQEvaluateRule.java
new file mode 100644
index 00000000..5fc51854
--- /dev/null
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQEvaluateRule.java
@@ -0,0 +1,4 @@
+package org.apache.griffin.core.worker.entity.pojo.rule;
+
+public class DQEvaluateRule {
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQRecordRule.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQRecordRule.java
new file mode 100644
index 00000000..94deba4b
--- /dev/null
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQRecordRule.java
@@ -0,0 +1,9 @@
+package org.apache.griffin.core.worker.entity.pojo.rule;
+
+import lombok.Data;
+import org.apache.griffin.core.worker.entity.pojo.template.DQRecordTemplate;
+
+@Data
+public class DQRecordRule {
+ private DQRecordTemplate template;
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordBaseTemplate.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordBaseTemplate.java
new file mode 100644
index 00000000..fbe51281
--- /dev/null
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordBaseTemplate.java
@@ -0,0 +1,4 @@
+package org.apache.griffin.core.worker.entity.pojo.template;
+
+public class DQRecordBaseTemplate {
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordCompostiveTemplate.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordCompostiveTemplate.java
new file mode 100644
index 00000000..cfccf84a
--- /dev/null
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordCompostiveTemplate.java
@@ -0,0 +1,4 @@
+package org.apache.griffin.core.worker.entity.pojo.template;
+
+public class DQRecordCompostiveTemplate {
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordTemplate.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordTemplate.java
new file mode 100644
index 00000000..d4b12047
--- /dev/null
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordTemplate.java
@@ -0,0 +1,5 @@
+package org.apache.griffin.core.worker.entity.pojo.template;
+
+public class DQRecordTemplate {
+
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/factory/TaskFactory.java
b/core/src/main/java/org/apache/griffin/core/worker/factory/TaskFactory.java
new file mode 100644
index 00000000..b6b79f32
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/factory/TaskFactory.java
@@ -0,0 +1,9 @@
+package org.apache.griffin.core.worker.factory;
+
+public class TaskFactory {
+
+
+
+
+
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/factory/TemplateDriverFactory.java
b/core/src/main/java/org/apache/griffin/core/worker/factory/TemplateDriverFactory.java
new file mode 100644
index 00000000..32970020
--- /dev/null
+++
b/core/src/main/java/org/apache/griffin/core/worker/factory/TemplateDriverFactory.java
@@ -0,0 +1,12 @@
+package org.apache.griffin.core.worker.factory;
+
+import org.apache.griffin.core.worker.driver.TemplateDriver;
+import org.apache.griffin.core.worker.entity.enums.DQEngineEnum;
+import org.springframework.stereotype.Service;
+
+@Service
+public class TemplateDriverFactory {
+ public TemplateDriver getTemplateDrvier(DQEngineEnum engine) {
+ return null;
+ }
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/schedule/TaskDispatcherScheduler.java
b/core/src/main/java/org/apache/griffin/core/worker/schedule/TaskDispatcherScheduler.java
new file mode 100644
index 00000000..cbb498b1
--- /dev/null
+++
b/core/src/main/java/org/apache/griffin/core/worker/schedule/TaskDispatcherScheduler.java
@@ -0,0 +1,135 @@
+package org.apache.griffin.core.worker.schedule;
+
+import org.apache.griffin.core.worker.client.DispatcherClient;
+import org.apache.griffin.core.worker.context.WorkerContext;
+import org.apache.griffin.core.worker.entity.bo.DQInstance;
+import org.apache.griffin.core.worker.entity.dispatcher.*;
+import org.apache.griffin.core.worker.entity.enums.DQTaskStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Scheduled;
+
+import javax.annotation.PostConstruct;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Task execution scheduler; moves instances through the recording stage and talks to the dispatcher.
+ */
+public class TaskDispatcherScheduler {
+ private static final Logger LOG =
LoggerFactory.getLogger(TaskDispatcherScheduler.class);
+ @Autowired
+ private WorkerContext wc;
+ @Autowired
+ private DispatcherClient dispatcherClient;
+ // NOTE(review): no Spring stereotype annotation on this class - confirm it is registered as a bean
+ /**
+ * Moves one instance from the waiting queue to the recording queue.
+ */
+ @Scheduled(fixedDelay = 1 * 5 * 1000L)
+ public void doTaskDispatcherScheduler() {
+ // Check whether another task may be submitted to the dispatcher (concurrency limit)
+ if (!wc.canSubmitToDispatcher()) return;
+ DQInstance dqInstance = wc.getWaittingTask();
+ if (dqInstance == null) return; // waiting queue is empty; skip this round
+ // Mark the instance as waiting for submission and hand it to the recording queue
+ dqInstance.setStatus(DQTaskStatus.WAITTING);
+ wc.offerToRecordingTaskQueue(dqInstance);
+ }
+
+ private List<SubmitRequest> getSubmitRequest(DQInstance dqInstance) {
+ // TODO: not implemented yet; callers must cope with the null result
+ return null;
+ }
+
+ @Scheduled(fixedDelay = 1 * 5 * 1000L)
+ public void scanRecordingTask() {
+ // Take the next instance from the recording queue and advance it by one step
+ DQInstance dqInstance = wc.getRecordingTask();
+ if (dqInstance == null) return; // recording queue is empty; nothing to do
+ // Only WAITTING and RUNNING are expected here because every stage has its own queue
+ switch (dqInstance.getStatus()) {
+ case WAITTING:
+ // Submit the jobs to the dispatcher
+ if (doSubmitRecordingTask(dqInstance)) {
+ dqInstance.setStatus(DQTaskStatus.RUNNING);
+ // Back to the tail; the job status is polled on a later run
+ wc.offerToRecordingTaskQueue(dqInstance);
+ } else {
+ // Submission failed
+ dqInstance.incrStatusAge();
+ if (dqInstance.isFailed()) {
+ wc.offerToAlertingTaskQueue(dqInstance);
+ } else {
+ // Back to the tail; submission is retried on a later run
+ wc.offerToRecordingTaskQueue(dqInstance);
+ }
+ }
+ break; // stop here: falling through to RUNNING would enqueue the instance a second time
+ case RUNNING:
+ // Query the job status
+ boolean hasRunningTask = checkJobStatus(dqInstance);
+ if (hasRunningTask) {
+ // Back to the tail; wait for the next poll
+ wc.offerToRecordingTaskQueue(dqInstance);
+ } else {
+ // Open question: split sub-tasks per stage or hand over the whole instance? Whole instance for now
+ // No sub-task is running any more: hand over to the next stage
+ wc.offerToEvaluatingTaskQueue(dqInstance);
+ }
+ break;
+ default:
+ LOG.warn("Dropping instance {} with unexpected status {} in recording queue", dqInstance.getId(), dqInstance.getStatus());
+ }
+ // TODO: fetch the finished results into the task before handing it to the evaluate queue
+ }
+ // Polls the dispatcher for every unfinished job; returns true while any job is still running
+ private boolean checkJobStatus(DQInstance dqInstance) {
+ List<JobStatus> jobStatusList = dqInstance.getJobStatusList();
+ boolean hasRunningTask = false;
+ for (JobStatus jobStatus : jobStatusList) {
+ if (jobStatus.isFinished()) continue;
+ JobStatusRequest jobStatusRequest =
dispatcherClient.wrapperJobStatusRequest(jobStatus);
+ JobStatusResponse jobStatusResponse =
dispatcherClient.getJobStatus(jobStatusRequest);
+ if (jobStatusResponse != null && jobStatusResponse.isSuccess()) {
+ jobStatus.setFinished(true);
+ } else { // no response or not successful yet: the job still counts as running
+ hasRunningTask = true;
+ }
+ }
+ return hasRunningTask;
+ }
+ // Submits all dispatcher jobs of the instance; returns false when no requests could be built
+ private boolean doSubmitRecordingTask(DQInstance dqInstance) {
+ // One instance maps to several dispatcher jobs; build a request for each of them
+ List<SubmitRequest> requestList = getSubmitRequest(dqInstance);
+ if (requestList == null) return false; // report a failed submission instead of failing on null
+ // Submit every request and keep the job info returned for it on the instance
+ List<JobStatus> jobStatusList = requestList.stream()
+ .map(req -> dispatcherClient.submitSql(req))
+ .map(resp -> dispatcherClient.wrapperSubmitResponse(resp))
+ .collect(Collectors.toList());
+ dqInstance.setJobStatusList(jobStatusList);
+ return true;
+ }
+
+ public void scanEvaluatingTask() {
+ // Iterate over the evaluate queue
+ // If the status is finished
+ // fetch the result and store it in the task
+ // remove the task from the evaluate tasks
+ // put it into the alert queue
+ // Not finished: wait for the next poll
+ }
+
+ public void scanAlertingTask() {
+ // NOTE(review): these notes repeat scanEvaluatingTask verbatim - confirm the alerting steps
+ // Iterate over the evaluate queue; if the status is finished:
+ // fetch the result and store it in the task
+ // remove the task from the evaluate tasks
+ // put it into the alert queue
+ // Not finished: wait for the next poll
+ }
+
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/service/DQTemplateService.java
b/core/src/main/java/org/apache/griffin/core/worker/service/DQTemplateService.java
new file mode 100644
index 00000000..cc236aba
--- /dev/null
+++
b/core/src/main/java/org/apache/griffin/core/worker/service/DQTemplateService.java
@@ -0,0 +1,15 @@
+package org.apache.griffin.core.worker.service;
+
+import org.apache.griffin.core.worker.driver.TemplateDriver;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class DQTemplateService {
+
+ @Autowired
+ private TemplateDriver templateDriver;
+
+// TODO: no operations are defined yet; templateDriver is injected but unused
+
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/service/WorkCoreService.java
b/core/src/main/java/org/apache/griffin/core/worker/service/WorkCoreService.java
new file mode 100644
index 00000000..314a328d
--- /dev/null
+++
b/core/src/main/java/org/apache/griffin/core/worker/service/WorkCoreService.java
@@ -0,0 +1,33 @@
+package org.apache.griffin.core.worker.service;
+
+import org.apache.griffin.core.worker.context.WorkerContext;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class WorkCoreService {
+
+ @Autowired
+ private WorkerContext workerContext;
+
+ public void submitDQTask(Long instanceId) {
+ // Receive the submit request
+ // Validate the parameters
+ // Check whether this environment can accept the task
+ // Build the task instance. How to keep the generated instance ID identical? 1 ID assigned by master 2 entity id + partition 3
instance generated by master
+ // Build a new instance
+ // Restore an instance (a task reassigned to this node)
+ // Enqueue according to the status and wait for execution
+ // Newly built instance -> waitting, wait for scheduling
+ // Restored instance: scheduled according to its status
+ }
+
+ public void stopDQTask(Long instanceId) {
+ // TODO: not implemented yet
+ }
+
+ public void querySingleDQTask(Long instanceId) {
+ // TODO: not implemented yet
+ }
+
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/service/WorkerManageService.java
b/core/src/main/java/org/apache/griffin/core/worker/service/WorkerManageService.java
new file mode 100644
index 00000000..2fd2ff4e
--- /dev/null
+++
b/core/src/main/java/org/apache/griffin/core/worker/service/WorkerManageService.java
@@ -0,0 +1,26 @@
+package org.apache.griffin.core.worker.service;
+
+import org.springframework.stereotype.Service;
+
+/**
+ * Worker management service.
+ */
+@Service
+public class WorkerManageService {
+
+ // Register the node
+ public void registDQWorkerNode() {
+ // TODO: not implemented yet
+ }
+
+ // Get the task status information of the node
+ public void getDQWorkNodeStatus() {
+ // TODO: not implemented yet
+ }
+
+ // Report the heartbeat
+ public void nodeHeartBeat() {
+ // TODO: not implemented yet
+ }
+
+}
+}