This is an automated email from the ASF dual-hosted git repository.
guoyp pushed a commit to branch griffin-1.0.0-dev
in repository https://gitbox.apache.org/repos/asf/griffin.git
The following commit(s) were added to refs/heads/griffin-1.0.0-dev by this push:
new 47261232 core: main process of task execution (#619)
47261232 is described below
commit 47261232d7eb31ad11202948e074c0c37b86b61e
Author: dabuliud <[email protected]>
AuthorDate: Thu Nov 24 10:12:33 2022 +0800
core: main process of task execution (#619)
* core:task execute
* remove kafka schema dependency
* removeUnnecessaryDependency
Co-authored-by: Warden <[email protected]>
---
.travis.yml | 25 +-
anomalydetection/pom.xml | 22 +-
core/pom.xml | 25 +-
.../core/worker/client/DispatcherClient.java | 27 +-
.../griffin/core/worker/context/WorkerContext.java | 67 ++++-
.../griffin/core/worker/dao/DQInstanceDao.java | 11 +
.../apache/griffin/core/worker/dao/DQTaskDao.java | 16 +
.../core/worker/driver/PrestoTemplateDriver.java | 4 +-
.../core/worker/driver/SparkTemplateDriver.java | 3 +-
.../griffin/core/worker/driver/TemplateDriver.java | 5 +-
.../griffin/core/worker/entity/bo/DQInstance.java | 43 ++-
.../core/worker/entity/bo/task/DQBaseTask.java | 61 +++-
.../core/worker/entity/bo/task/DQHiveTask.java | 16 +-
.../core/worker/entity/bo/task/DQKafkaTask.java | 16 +-
.../core/worker/entity/dispatcher/JobStatus.java | 38 +++
.../worker/entity/dispatcher/JobStatusRequest.java | 6 +
.../entity/dispatcher/JobStatusResponse.java | 7 +
.../worker/entity/dispatcher/MetricRequest.java | 10 +
.../worker/entity/dispatcher/MetricResponse.java | 11 +
.../worker/entity/dispatcher/SubmitRequest.java | 49 ++-
.../core/worker/entity/enums/DQEngineEnum.java | 34 ++-
.../core/worker/entity/enums/DQErrorCode.java | 13 +
.../core/worker/entity/enums/DQInstanceStatus.java | 24 ++
.../core/worker/entity/enums/DQTaskStatus.java | 27 +-
.../entity/enums/DispatcherJobStatusEnum.java | 6 +
.../griffin/core/worker/entity/pojo/Metric.java | 11 +
.../worker/schedule/TaskDispatcherScheduler.java | 328 +++++++++++++++------
.../core/worker/service/DQInstanceService.java | 39 +++
.../griffin/core/worker/service/DQTaskService.java | 106 +++++++
dispatcher/pom.xml | 22 +-
pom.xml | 1 +
scheduler/pom.xml | 22 +-
32 files changed, 893 insertions(+), 202 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index 319902c4..fdc2d986 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -11,31 +11,32 @@
# or implied. See the License for the specific language governing permissions and limitations under
# the License.
-language: java scala node_js
+#language: java scala node_js
+language: java scala
-services:
- - docker
+#services:
+# - docker
-env:
- - COMPOSE_FILE=griffin-doc/docker/compose/docker-compose-batch.yml
+#env:
+# - COMPOSE_FILE=griffin-doc/docker/compose/docker-compose-batch.yml
jdk:
- openjdk8
scala:
- 2.10.6
-node_js:
- - "6"
+#node_js:
+# - "6"
git:
quiet: true
cache:
directories:
- $HOME/.m2
-install:
- - npm install -g bower
-
-before_script:
- - docker-compose up -d
+#install:
+# - npm install -g bower
+#
+#before_script:
+# - docker-compose up -d
# keep 30, need change according to ci logs.
diff --git a/anomalydetection/pom.xml b/anomalydetection/pom.xml
index 2a501a48..804de868 100644
--- a/anomalydetection/pom.xml
+++ b/anomalydetection/pom.xml
@@ -226,17 +226,17 @@ under the License.
</dependency>
<!-- to access confluent schema registry -->
- <dependency>
- <groupId>io.confluent</groupId>
- <artifactId>kafka-schema-registry-client</artifactId>
- <version>${confluent.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
+<!-- <dependency>-->
+<!-- <groupId>io.confluent</groupId>-->
+<!-- <artifactId>kafka-schema-registry-client</artifactId>-->
+<!-- <version>${confluent.version}</version>-->
+<!-- <exclusions>-->
+<!-- <exclusion>-->
+<!-- <groupId>org.slf4j</groupId>-->
+<!-- <artifactId>slf4j-log4j12</artifactId>-->
+<!-- </exclusion>-->
+<!-- </exclusions>-->
+<!-- </dependency>-->
<!--schedule-->
<dependency>
diff --git a/core/pom.xml b/core/pom.xml
index d510a5d4..4c0b7c00 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -226,17 +226,17 @@ under the License.
</dependency>
<!-- to access confluent schema registry -->
- <dependency>
- <groupId>io.confluent</groupId>
- <artifactId>kafka-schema-registry-client</artifactId>
- <version>${confluent.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
+<!-- <dependency>-->
+<!-- <groupId>io.confluent</groupId>-->
+<!-- <artifactId>kafka-schema-registry-client</artifactId>-->
+<!-- <version>${confluent.version}</version>-->
+<!-- <exclusions>-->
+<!-- <exclusion>-->
+<!-- <groupId>org.slf4j</groupId>-->
+<!-- <artifactId>slf4j-log4j12</artifactId>-->
+<!-- </exclusion>-->
+<!-- </exclusions>-->
+<!-- </dependency>-->
<!--schedule-->
<dependency>
@@ -411,6 +411,9 @@ under the License.
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <encoding>UTF-8</encoding>
+ </configuration>
</plugin>
</plugins>
</build>
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/client/DispatcherClient.java
b/core/src/main/java/org/apache/griffin/core/worker/client/DispatcherClient.java
index dbbcd6cd..5422f35e 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/client/DispatcherClient.java
+++
b/core/src/main/java/org/apache/griffin/core/worker/client/DispatcherClient.java
@@ -1,11 +1,15 @@
package org.apache.griffin.core.worker.client;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.griffin.core.worker.entity.bo.DQInstance;
import org.apache.griffin.core.worker.entity.dispatcher.*;
+import org.apache.griffin.core.worker.entity.enums.DQErrorCode;
+import org.springframework.stereotype.Service;
/**
* Dispatcher客户端 负责与dispatcher交互
*/
+@Service
public class DispatcherClient {
public SubmitResponse submitSql(SubmitRequest request) {
@@ -14,22 +18,35 @@ public class DispatcherClient {
public JobStatusResponse getJobStatus(JobStatusRequest jobStatusRequest) {
+ // todo parse job status
return null;
}
- public double getMetricResult() {
- return 0.0d;
+ public MetricResponse getMetricResult(MetricRequest metricRequest) {
+ return null;
}
public SubmitRequest wrapperDQTask(DQInstance waittingTask) {
return null;
}
- public JobStatus wrapperSubmitResponse(SubmitResponse resp) {
- return null;
+ public JobStatus wrapperSubmitResponse(Pair<Long, SubmitResponse> resp) {
+ Long partitionTime = resp.getLeft();
+ SubmitResponse response = resp.getRight();
+ boolean finished = false;
+ if (response.getCode() != DQErrorCode.SUCCESS.getCode()) finished = true;
+ return JobStatus.builder()
+ .jobId(response.getJobId())
+ .finished(finished)
+ .partitionTime(partitionTime)
+ .build();
}
public JobStatusRequest wrapperJobStatusRequest(JobStatus jobStatus) {
- return null;
+ return new JobStatusRequest(jobStatus.getJobId());
+ }
+
+ public MetricRequest wrapperMetricRequest(JobStatus jobStatus) {
+ return new MetricRequest(jobStatus.getJobId());
}
}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/context/WorkerContext.java
b/core/src/main/java/org/apache/griffin/core/worker/context/WorkerContext.java
index 17e238de..52280f36 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/context/WorkerContext.java
+++
b/core/src/main/java/org/apache/griffin/core/worker/context/WorkerContext.java
@@ -4,11 +4,13 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import org.apache.griffin.core.worker.entity.bo.DQInstance;
import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
+import org.apache.griffin.core.worker.entity.enums.DQEngineEnum;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.List;
import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
/**
* 上下文信息 全局唯一
@@ -16,12 +18,12 @@ import java.util.Queue;
@Component
public class WorkerContext {
- private final Queue<DQInstance> WAITTING_TASK_QUEUE;
+ private final List<DQInstance> WAITTING_TASK_QUEUE;
// public static final List<DQBaseTask> runningTaskIdQueue = Lists.newCopyOnWriteArrayList();
// runningTaskIdList = RECORDING_TASK_LIST + EVALUATING_TASK_LIST + ALERTING_TASK_LIST
- private final Queue<DQInstance> RECORDING_TASK_QUEUE;
- private final Queue<DQInstance> EVALUATING_TASK_QUEUE;
- private final Queue<DQInstance> ALERTING_TASK_QUEUE;
+ private final List<DQInstance> RECORDING_TASK_QUEUE;
+ private final LinkedBlockingQueue<DQInstance> EVALUATING_TASK_QUEUE;
+ private final LinkedBlockingQueue<DQInstance> ALERTING_TASK_QUEUE;
// success和failed队列数据老化问题?
public final List<DQInstance> successTaskIdList;
@@ -29,24 +31,44 @@ public class WorkerContext {
public WorkerContext() {
// 设置队列长度
- WAITTING_TASK_QUEUE = Queues.newPriorityQueue();
- RECORDING_TASK_QUEUE = Queues.newPriorityQueue();
- EVALUATING_TASK_QUEUE = Queues.newPriorityQueue();
- ALERTING_TASK_QUEUE = Queues.newPriorityQueue();
+ WAITTING_TASK_QUEUE = Lists.newCopyOnWriteArrayList();
+ RECORDING_TASK_QUEUE = Lists.newCopyOnWriteArrayList();
+ // 这两个应该是一个阻塞队列 只要有任务来就可以处理
+ EVALUATING_TASK_QUEUE = Queues.newLinkedBlockingQueue();
+ ALERTING_TASK_QUEUE =Queues.newLinkedBlockingQueue();
successTaskIdList = Lists.newArrayList();
failedTaskIdList = Lists.newArrayList();
}
-
+
+ public List<DQInstance> getWAITTING_TASK_QUEUE() {
+ return WAITTING_TASK_QUEUE;
+ }
+
+ public List<DQInstance> getRECORDING_TASK_QUEUE() {
+ return RECORDING_TASK_QUEUE;
+ }
+
+ public LinkedBlockingQueue<DQInstance> getEVALUATING_TASK_QUEUE() {
+ return EVALUATING_TASK_QUEUE;
+ }
+
+ public LinkedBlockingQueue<DQInstance> getALERTING_TASK_QUEUE() {
+ return ALERTING_TASK_QUEUE;
+ }
+
+ public List<DQInstance> getSuccessTaskIdList() {
+ return successTaskIdList;
+ }
+
+ public List<DQInstance> getFailedTaskIdList() {
+ return failedTaskIdList;
+ }
+
@PostConstruct
public void init() {
- initQueueInfo();
resetTaskStatusWhenStartUp();
}
- private void initQueueInfo() {
-
- }
-
public DQInstance getWaittingTask() {
return null;
}
@@ -55,7 +77,8 @@ public class WorkerContext {
return null;
}
- public void offerToRecordingTaskQueue(DQInstance dqInstance) {
+ public boolean offerToRecordingTaskQueue(DQInstance dqInstance) {
+ return false;
}
public void offerToAlertingTaskQueue(DQInstance dqInstance) {
}
@@ -72,11 +95,23 @@ public class WorkerContext {
}
- public boolean canSubmitToDispatcher() {
+ public boolean canSubmitToSpecEngine(DQEngineEnum engine) {
return false;
}
public void offerToEvaluatingTaskQueue(DQInstance dqInstance) {
}
+
+ public void removeAll(List<DQInstance> targetList, List<DQInstance> waittingToRemoveFromRecordingList) {
+ targetList.removeAll(waittingToRemoveFromRecordingList);
+ }
+
+ public void addFailedDQInstanceInfo(DQInstance instance) {
+
+ }
+
+ public void addSuccessDQInstanceInfo(DQInstance instance) {
+
+ }
}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/dao/DQInstanceDao.java
b/core/src/main/java/org/apache/griffin/core/worker/dao/DQInstanceDao.java
new file mode 100644
index 00000000..11dd93f9
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/dao/DQInstanceDao.java
@@ -0,0 +1,11 @@
+package org.apache.griffin.core.worker.dao;
+
+import org.apache.griffin.core.worker.entity.bo.DQInstance;
+import org.springframework.stereotype.Component;
+
+@Component
+public interface DQInstanceDao {
+
+ void updateDQInstanceStatus(DQInstance instance, int status);
+
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/dao/DQTaskDao.java
b/core/src/main/java/org/apache/griffin/core/worker/dao/DQTaskDao.java
new file mode 100644
index 00000000..c94074cb
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/dao/DQTaskDao.java
@@ -0,0 +1,16 @@
+package org.apache.griffin.core.worker.dao;
+
+import org.apache.griffin.core.worker.entity.bo.DQInstance;
+import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
+import org.apache.griffin.core.worker.entity.enums.DQTaskStatus;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+
+@Component
+public interface DQTaskDao {
+
+ void updateDQTaskListStatus(List<DQBaseTask> tasks, int status);
+ void updateDQTaskListStatus(DQBaseTask task, int status);
+
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/driver/PrestoTemplateDriver.java
b/core/src/main/java/org/apache/griffin/core/worker/driver/PrestoTemplateDriver.java
index c0ab20dc..61ab2bdb 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/driver/PrestoTemplateDriver.java
+++
b/core/src/main/java/org/apache/griffin/core/worker/driver/PrestoTemplateDriver.java
@@ -1,13 +1,13 @@
package org.apache.griffin.core.worker.driver;
-import org.apache.griffin.core.worker.entity.enums.DQEngineEnum;
import org.apache.griffin.core.worker.entity.pojo.template.DQRecordTemplate;
+import java.util.List;
import java.util.Map;
public class PrestoTemplateDriver extends TemplateDriver{
@Override
- public String getRecordSql(DQRecordTemplate template, Map<String, String>
params) {
+ public List<String> getRecordSql(DQRecordTemplate template, Map<String,
String> params) {
return null;
}
}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/driver/SparkTemplateDriver.java
b/core/src/main/java/org/apache/griffin/core/worker/driver/SparkTemplateDriver.java
index 6e54d4d1..eaf17b4a 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/driver/SparkTemplateDriver.java
+++
b/core/src/main/java/org/apache/griffin/core/worker/driver/SparkTemplateDriver.java
@@ -2,11 +2,12 @@ package org.apache.griffin.core.worker.driver;
import org.apache.griffin.core.worker.entity.pojo.template.DQRecordTemplate;
+import java.util.List;
import java.util.Map;
public class SparkTemplateDriver extends TemplateDriver {
@Override
- public String getRecordSql(DQRecordTemplate template, Map<String, String>
params) {
+ public List<String> getRecordSql(DQRecordTemplate template, Map<String,
String> params) {
return null;
}
}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/driver/TemplateDriver.java
b/core/src/main/java/org/apache/griffin/core/worker/driver/TemplateDriver.java
index 74bd7079..85a23390 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/driver/TemplateDriver.java
+++
b/core/src/main/java/org/apache/griffin/core/worker/driver/TemplateDriver.java
@@ -6,6 +6,7 @@ import
org.apache.griffin.core.worker.factory.TemplateDriverFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import java.util.List;
import java.util.Map;
@Service
@@ -17,9 +18,9 @@ public abstract class TemplateDriver {
/**
* 拼出的SQL 返回值 必须是 <ruleId, Partition, Metric> 这样可以做SQL合并
*/
- public abstract String getRecordSql(DQRecordTemplate template, Map<String,
String> params);
+ public abstract List<String> getRecordSql(DQRecordTemplate template,
Map<String, String> params);
- public String getRecordSql(DQEngineEnum engine, DQRecordTemplate template,
Map<String, String> params) {
+ public List<String> getRecordSql(DQEngineEnum engine, DQRecordTemplate
template, Map<String, String> params) {
TemplateDriver templateDriver =
templateDriverFactory.getTemplateDrvier(engine);
return templateDriver.getRecordSql(template, params);
}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/DQInstance.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/DQInstance.java
index 0eacac7f..af5ff201 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/DQInstance.java
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/DQInstance.java
@@ -2,8 +2,9 @@ package org.apache.griffin.core.worker.entity.bo;
import lombok.Data;
import org.apache.griffin.core.worker.entity.dispatcher.JobStatus;
-import org.apache.griffin.core.worker.entity.enums.DQTaskStatus;
+import org.apache.griffin.core.worker.entity.enums.DQInstanceStatus;
import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
+import org.apache.griffin.core.worker.entity.enums.DQTaskStatus;
import java.util.List;
@@ -14,17 +15,18 @@ import java.util.List;
@Data
public class DQInstance {
private Long id;
-
// 实例状态
- private DQTaskStatus status;
+ private DQInstanceStatus status;
// 记录状态年龄 状态更新是重置
private int statusAge;
// 任务信息
private List<DQBaseTask> subTaskList;
+ //
+ private long scanTimeStamp = 0L;
+
- private List<JobStatus> jobStatusList;
- public void setStatus(DQTaskStatus status) {
+ public void setStatus(DQInstanceStatus status) {
if (this.status != status) resetStatusAge();
this.status = status;
}
@@ -41,4 +43,35 @@ public class DQInstance {
// 一个状态年龄处理了5次 无法变更 说明处理该任务一直失败
return statusAge > 5;
}
+
+ public boolean hasTaskToSubmit() {
+ boolean hasTaskToSubmit = false;
+ for (DQBaseTask dqBaseTask : subTaskList) {
+ if (dqBaseTask.getStatus() == DQTaskStatus.WAITTING) {
+ hasTaskToSubmit = true;
+ break;
+ }
+ }
+ return hasTaskToSubmit;
+ }
+
+ public boolean isFinishRecord() {
+ boolean isFinishRecord = true;
+ for (DQBaseTask dqBaseTask : subTaskList) {
+ if (dqBaseTask.getStatus().getCode() <= DQTaskStatus.RECORDING.getCode()) {
+ isFinishRecord = false;
+ break;
+ }
+ }
+ return isFinishRecord;
+ }
+
+ public void doEvaluteTask() {
+ subTaskList.forEach(DQBaseTask::evaluate);
+
+ }
+
+ public void doAlertTask() {
+ // 收敛告警信息 进行告警
+ }
}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQBaseTask.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQBaseTask.java
index f597bbb4..fd01f036 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQBaseTask.java
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQBaseTask.java
@@ -1,11 +1,20 @@
package org.apache.griffin.core.worker.entity.bo.task;
+import com.beust.jcommander.internal.Lists;
import lombok.Data;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.griffin.core.worker.context.WorkerContext;
+import org.apache.griffin.core.worker.entity.dispatcher.JobStatus;
+import org.apache.griffin.core.worker.entity.enums.DQEngineEnum;
+import org.apache.griffin.core.worker.entity.enums.DQInstanceStatus;
+import org.apache.griffin.core.worker.entity.enums.DQTaskStatus;
+import org.apache.griffin.core.worker.entity.pojo.Metric;
import org.apache.griffin.core.worker.entity.pojo.rule.DQAlertRule;
import org.apache.griffin.core.worker.entity.pojo.rule.DQEvaluateRule;
import org.apache.griffin.core.worker.entity.pojo.rule.DQRecordRule;
+import java.util.List;
+
/**
* 任务基础类
* 一个完整的任务包含
@@ -16,15 +25,57 @@ import
org.apache.griffin.core.worker.entity.pojo.rule.DQRecordRule;
@Data
public abstract class DQBaseTask {
+ private long id;
+ private String owner;
private WorkerContext wc;
-
+ private DQEngineEnum engine;
private DQRecordRule recordRule;
private DQEvaluateRule dqEvaluateRule;
private DQAlertRule dqAlertRule;
-
+ private DQTaskStatus status;
+ private List<JobStatus> jobStatusList;
+ private List<Metric> metricList = Lists.newArrayList();
+ // 记录状态年龄 状态更新是重置
+ private int statusAge;
// 生成recordsql和 模板 + 参数 有关系
// 生成SQL部分希望交给模板来做
- public abstract void doRecord();
- public abstract void doEvaluate();
- public abstract void doAlert();
+ public List<Pair<Long, String>> record() {
+ // before
+ List<Pair<Long, String>> partitionTimeAndSqlList = doRecord();
+ // after
+ return partitionTimeAndSqlList;
+ }
+ public void evaluate() {
+ doEvaluate();
+ }
+ public void alert() {
+ doAlert();
+ }
+
+ // partitionTime And sql
+ public abstract List<Pair<Long, String>> doRecord();
+ public abstract boolean doEvaluate();
+ public abstract boolean doAlert();
+
+ public void setStatus(DQTaskStatus status) {
+ if (this.status != status) resetStatusAge();
+ this.status = status;
+ }
+
+ public void resetStatusAge() {
+ statusAge = 0;
+ }
+
+ public void incrStatusAge() {
+ statusAge++;
+ }
+
+ public boolean isFailed() {
+ // 一个状态年龄处理了5次 无法变更 说明处理该任务一直失败
+ return statusAge > 5;
+ }
+
+ public void addMetric(long partitionTime, Double metric) {
+ metricList.add(new Metric(partitionTime, metric));
+ }
}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQHiveTask.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQHiveTask.java
index 96be4deb..62c6ef3b 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQHiveTask.java
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQHiveTask.java
@@ -1,22 +1,26 @@
package org.apache.griffin.core.worker.entity.bo.task;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.List;
+
/**
* 表维度
*/
public class DQHiveTask extends DQBaseTask {
@Override
- public void doRecord() {
-
+ public List<Pair<Long, String>> doRecord() {
+ return null;
}
@Override
- public void doEvaluate() {
-
+ public boolean doEvaluate() {
+ return false;
}
@Override
- public void doAlert() {
-
+ public boolean doAlert() {
+ return false;
}
}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQKafkaTask.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQKafkaTask.java
index 7f8c13d5..356c0fac 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQKafkaTask.java
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQKafkaTask.java
@@ -1,18 +1,22 @@
package org.apache.griffin.core.worker.entity.bo.task;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.List;
+
public class DQKafkaTask extends DQBaseTask {
@Override
- public void doRecord() {
-
+ public List<Pair<Long, String>> doRecord() {
+ return null;
}
@Override
- public void doEvaluate() {
-
+ public boolean doEvaluate() {
+ return false;
}
@Override
- public void doAlert() {
-
+ public boolean doAlert() {
+ return false;
}
}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatus.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatus.java
index 57f56341..9b6a550e 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatus.java
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatus.java
@@ -6,5 +6,43 @@ import lombok.Data;
public class JobStatus {
private String jobId;
private boolean finished = false;
+ private long partitionTime;
+ public JobStatus() {
+ }
+
+ public JobStatus(String jobId, boolean finished, long partitionTime) {
+ this.jobId = jobId;
+ this.finished = finished;
+ this.partitionTime = partitionTime;
+ }
+
+ public static Builder builder() {
+ return new JobStatus.Builder();
+ }
+
+ public static class Builder {
+ private String jobId;
+ private boolean finished = false;
+ private long partitionTime;
+
+ public Builder jobId(String jobId) {
+ this.jobId = jobId;
+ return this;
+ }
+
+ public Builder finished(boolean finished) {
+ this.finished = finished;
+ return this;
+ }
+
+ public Builder partitionTime(long partitionTime) {
+ this.partitionTime = partitionTime;
+ return this;
+ }
+
+ public JobStatus build() {
+ return new JobStatus(this.jobId, this.finished, this.partitionTime);
+ }
+ }
}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatusRequest.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatusRequest.java
index ab991c46..711706cb 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatusRequest.java
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatusRequest.java
@@ -1,4 +1,10 @@
package org.apache.griffin.core.worker.entity.dispatcher;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
public class JobStatusRequest {
+ String jobId;
}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatusResponse.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatusResponse.java
index bf38b956..af7fc45e 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatusResponse.java
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/JobStatusResponse.java
@@ -1,6 +1,13 @@
package org.apache.griffin.core.worker.entity.dispatcher;
+import org.apache.griffin.core.worker.entity.enums.DQErrorCode;
+import org.apache.griffin.core.worker.entity.enums.DispatcherJobStatusEnum;
+
public class JobStatusResponse {
+ private Integer code;
+ private DispatcherJobStatusEnum jobStatus;
+ private DQErrorCode errorCode;
+ private Exception ex;
public boolean isSuccess() {
return false;
}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/MetricRequest.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/MetricRequest.java
new file mode 100644
index 00000000..5b2faa5d
--- /dev/null
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/MetricRequest.java
@@ -0,0 +1,10 @@
+package org.apache.griffin.core.worker.entity.dispatcher;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class MetricRequest {
+ String jobId;
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/MetricResponse.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/MetricResponse.java
new file mode 100644
index 00000000..4b40ae9d
--- /dev/null
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/MetricResponse.java
@@ -0,0 +1,11 @@
+package org.apache.griffin.core.worker.entity.dispatcher;
+
+import lombok.Data;
+
+@Data
+public class MetricResponse {
+ Integer code;
+ Double metric;
+// Enum errorCode;
+ Exception ex;
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/SubmitRequest.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/SubmitRequest.java
index ae52ad7a..f93b726e 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/SubmitRequest.java
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/dispatcher/SubmitRequest.java
@@ -8,5 +8,52 @@ public class SubmitRequest {
private String recordSql;
private DQEngineEnum engine; // Spark,Hive,Presto,etc.
private String owner;
- private Integer maxRetryCount;
+ private Integer maxRetryCount = 3;
+
+ public SubmitRequest() {
+ }
+
+ public SubmitRequest(String recordSql, DQEngineEnum engine, String owner, Integer maxRetryCount) {
+ this.recordSql = recordSql;
+ this.engine = engine;
+ this.owner = owner;
+ this.maxRetryCount = maxRetryCount;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private String recordSql;
+ private DQEngineEnum engine;
+ private String owner;
+ private Integer maxRetryCount = 3;
+
+
+ public Builder recordSql(String recordSql) {
+ this.recordSql = recordSql;
+ return this;
+ }
+
+ public Builder engine(DQEngineEnum engine) {
+ this.engine = engine;
+ return this;
+ }
+
+ public Builder owner(String owner) {
+ this.owner = owner;
+ return this;
+ }
+
+ public Builder retryCount(Integer maxRetryCount) {
+ this.maxRetryCount = maxRetryCount;
+ return this;
+ }
+
+ public SubmitRequest build() {
+ return new SubmitRequest(this.recordSql, this.engine, this.owner, this.maxRetryCount);
+ }
+ }
+
}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQEngineEnum.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQEngineEnum.java
index 0e9d6629..8a28487f 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQEngineEnum.java
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQEngineEnum.java
@@ -1,7 +1,35 @@
package org.apache.griffin.core.worker.entity.enums;
public enum DQEngineEnum {
- PRESTO,
- SPARK,
- HIVE;
+ PRESTO(0, 1),
+ SPARK(1, 2),
+ HIVE(2, -1);
+
+ private final int code;
+ private final int backEngineCode;
+
+ DQEngineEnum(int code, int backEngineCode) {
+ this.code = code;
+ this.backEngineCode = backEngineCode;
+ }
+
+ public int getCode() {
+ return code;
+ }
+
+ /**
+ * @return DQEngineEnum
+ */
+ public DQEngineEnum getBackEngine() {
+ if (this.backEngineCode == -1) return null;
+ return findByCode(this.code);
+ }
+
+ public DQEngineEnum findByCode(int code) {
+ DQEngineEnum[] values = DQEngineEnum.values();
+ for (DQEngineEnum value : values) {
+ if (code == value.code) return value;
+ }
+ return null;
+ }
}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQErrorCode.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQErrorCode.java
index 4aa19f10..f15c94ba 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQErrorCode.java
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQErrorCode.java
@@ -1,4 +1,17 @@
package org.apache.griffin.core.worker.entity.enums;
public enum DQErrorCode {
+ SUCCESS(200),
+ INTERNAL_ERROR(500),
+ EXTERNAL_ERROR(400);
+
+ private final int code;
+
+ DQErrorCode(int code) {
+ this.code = code;
+ }
+
+ public int getCode() {
+ return code;
+ }
}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQInstanceStatus.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQInstanceStatus.java
new file mode 100644
index 00000000..6e70721b
--- /dev/null
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQInstanceStatus.java
@@ -0,0 +1,24 @@
+package org.apache.griffin.core.worker.entity.enums;
+
+public enum DQInstanceStatus {
+ ACCEPTED(0),
+ WAITTING(1),
+ SUBMITTING(2), // 任务提交中
+ RUNNING(3),
+ RECORDING(4),
+ EVALUATING(5),
+ EVALUATE_ALERTING(6), // Metric 需要告警
+ FAILED_ALERTING(7), // 任务执行失败需要告警
+ SUCCESS(8),
+ FAILED(9);
+
+ private final int code;
+
+ DQInstanceStatus(int code) {
+ this.code = code;
+ }
+
+ public int getCode() {
+ return code;
+ }
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQTaskStatus.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQTaskStatus.java
index 68292a45..ab5e0716 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQTaskStatus.java
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQTaskStatus.java
@@ -1,12 +1,23 @@
package org.apache.griffin.core.worker.entity.enums;
public enum DQTaskStatus {
- ACCEPTED,
- WAITTING,
- RUNNING,
- RECORDING,
- EVALUATING,
- ALERTING,
- SUCCESS,
- FAILED;
+ WAITTING(0),
+ RECORDING(1),
+ RECORDED(1),
+ EVALUATING(2),
+ EVALUATED(2),
+ ALERTING(3),
+ ALERTED(3),
+ SUCCESS(4),
+ FAILED(5);
+
+ private final int code;
+
+ DQTaskStatus(int code) {
+ this.code = code;
+ }
+
+ public int getCode() {
+ return code;
+ }
}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DispatcherJobStatusEnum.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DispatcherJobStatusEnum.java
new file mode 100644
index 00000000..a5ce96bb
--- /dev/null
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DispatcherJobStatusEnum.java
@@ -0,0 +1,6 @@
+package org.apache.griffin.core.worker.entity.enums;
+
+public enum DispatcherJobStatusEnum {
+ FINISHED,
+ RUNNING;
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/Metric.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/Metric.java
new file mode 100644
index 00000000..6a36e9b4
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/Metric.java
@@ -0,0 +1,11 @@
+package org.apache.griffin.core.worker.entity.pojo;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+/**
+ * Value holder pairing a partition time with a metric value.
+ * Accessors and the all-arguments constructor are generated by the Lombok annotations.
+ */
+@Data
+@AllArgsConstructor
+public class Metric {
+ // NOTE(review): unit and epoch of partitionTime are not visible here - confirm with the callers
+ private long partitionTime;
+ private double metric;
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/schedule/TaskDispatcherScheduler.java
b/core/src/main/java/org/apache/griffin/core/worker/schedule/TaskDispatcherScheduler.java
index cbb498b1..79759a02 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/schedule/TaskDispatcherScheduler.java
+++
b/core/src/main/java/org/apache/griffin/core/worker/schedule/TaskDispatcherScheduler.java
@@ -1,135 +1,291 @@
package org.apache.griffin.core.worker.schedule;
-import org.apache.griffin.core.worker.client.DispatcherClient;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.griffin.core.worker.context.WorkerContext;
import org.apache.griffin.core.worker.entity.bo.DQInstance;
-import org.apache.griffin.core.worker.entity.dispatcher.*;
+import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
+import org.apache.griffin.core.worker.entity.enums.DQInstanceStatus;
import org.apache.griffin.core.worker.entity.enums.DQTaskStatus;
+import org.apache.griffin.core.worker.service.DQInstanceService;
+import org.apache.griffin.core.worker.service.DQTaskService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.List;
-import java.util.stream.Collectors;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
/**
* 任务执行调度期 和 dispatcher交互
*/
+@Component
public class TaskDispatcherScheduler {
- private static final Logger LOG =
LoggerFactory.getLogger(TaskDispatcherScheduler.class);
- @Autowired
+ private static final Logger log =
LoggerFactory.getLogger(TaskDispatcherScheduler.class);
+
private WorkerContext wc;
+ private DQInstanceService dqInstanceService;
+ private DQTaskService dqTaskService;
+
+ // Collaborators are supplied by Spring through the @Autowired setter methods below.
+ @Autowired
+ public void setWc(WorkerContext wc) {
+ this.wc = wc;
+ }
+ @Autowired
+ public void setDqInstanceService(DQInstanceService dqInstanceService) {
+ this.dqInstanceService = dqInstanceService;
+ }
@Autowired
- private DispatcherClient dispatcherClient;
+ public void setDqTaskService(DQTaskService dqTaskService) {
+ this.dqTaskService = dqTaskService;
+ }
+
+    /**
+     * Starts the alerting and evaluating scanners once dependency injection is done.
+     *
+     * Both {@link #scanAlertingTask()} and {@link #scanEvaluatingTask()} loop forever, so
+     * they must not be called directly from this callback: the first call would never
+     * return, the second scanner would never start and bean initialization would hang.
+     * Each scanner therefore runs on its own daemon thread.
+     */
+    @PostConstruct
+    public void startEvaluetingAndAlertThread() {
+        // todo: run the scanners on a managed thread pool
+        startScannerThread("dq-alerting-scanner", this::scanAlertingTask);
+        startScannerThread("dq-evaluating-scanner", this::scanEvaluatingTask);
+    }
+
+    /** Runs {@code scanner} on a new daemon thread called {@code name}. */
+    private void startScannerThread(String name, Runnable scanner) {
+        Thread thread = new Thread(scanner, name);
+        // daemon: an endless scan loop must not keep the JVM alive on shutdown
+        thread.setDaemon(true);
+        thread.start();
+    }
/**
* 进行任务调度
*/
- @Scheduled(fixedDelay = 1 * 5 * 1000L)
+    @Scheduled(fixedDelay = 5 * 1000L)
public void doTaskDispatcherScheduler() {
- // 检查当前环境是否有可以提交任务到dispatcher(并发度限制)
- if (!wc.canSubmitToDispatcher()) return;
+        // Scans a snapshot of the waiting queue. ACCEPTED instances are switched to WAITTING;
+        // instances in any other status are reloaded from the database. Everything collected
+        // is then routed by status and removed from the waiting queue in the finally block.
+        log.info("doTaskDispatcherScheduler start.");
+        // instances to hand over to their target queue and to drop from the waiting queue
+        List<DQInstance> waittingToRemoveFromWaitingList = Lists.newArrayList();
+        // poll from a copy so the shared waiting queue is not drained while scanning
+        Queue<DQInstance> waitingToRecordingDQInstanceQueue = Queues.newPriorityBlockingQueue(wc.getWAITTING_TASK_QUEUE());
// 从waitting队列获取任务
- DQInstance dqInstance = wc.getWaittingTask();
- // 开始提交任务 放到recording队列中
- dqInstance.setStatus(DQTaskStatus.WAITTING);
- wc.offerToRecordingTaskQueue(dqInstance);
+        try {
+            while (true) {
+                try {
+                    DQInstance dqInstance = waitingToRecordingDQInstanceQueue.poll();
+                    // snapshot exhausted: stop scanning
+                    if (dqInstance == null) break;
+                    if (DQInstanceStatus.ACCEPTED != dqInstance.getStatus()) {
+                        // Not in the initial state: the cached instance may be stale, so rebuild
+                        // it from the database and route the fresh copy instead.
+                        DQInstance latest = dqInstanceService.getById(dqInstance.getId());
+                        if (latest == null) {
+                            // A null entry would blow up in the finally block below and abort
+                            // the whole hand-over, so leave the cached instance where it is.
+                            log.warn("doTaskDispatcherScheduler instance {} not found, keep it in the waiting queue.", dqInstance.getId());
+                            continue;
+                        }
+                        // NOTE(review): the reloaded object is what gets removed from the waiting
+                        // queue below - confirm that wc.removeAll matches it with the cached one.
+                        waittingToRemoveFromWaitingList.add(latest);
+                    } else {
+                        // initial state: mark it WAITTING; it is routed to the recording queue below
+                        if (dqInstanceService.updateStatus(dqInstance, DQInstanceStatus.WAITTING)) {
+                            waittingToRemoveFromWaitingList.add(dqInstance);
+                        }
+                    }
+                } catch (Exception e) {
+                    // todo
+                    log.error("doTaskDispatcherScheduler scan waitting task failed, ex:", e);
+                }
+            }
+        } catch (Exception e) {
+            // todo
+            log.error("doTaskDispatcherScheduler failed, ex:", e);
+        } finally {
+            // route every collected instance to the queue matching its status
+            waittingToRemoveFromWaitingList.forEach(this::offerToSpecQueueByStatus);
+            // then drop them from the waiting queue
+            if (CollectionUtils.isNotEmpty(waittingToRemoveFromWaitingList)) {
+                wc.removeAll(wc.getWAITTING_TASK_QUEUE(), waittingToRemoveFromWaitingList);
+            }
+        }
+        log.info("doTaskDispatcherScheduler end.");
}
- private List<SubmitRequest> getSubmitRequest(DQInstance dqInstance) {
-
- return null;
+    /**
+     * Routes an instance to the queue or result set that matches its current status.
+     *
+     * WAITTING, RUNNING, SUBMITTING and RECORDING go to the recording queue, EVALUATING to
+     * the evaluating queue, both alerting statuses to the alerting queue, FAILED and SUCCESS
+     * to the corresponding result sets. Any other status is logged and the instance dropped.
+     *
+     * @param instance instance to route; a null instance or null status is logged and skipped
+     */
+    private void offerToSpecQueueByStatus(DQInstance instance) {
+        DQInstanceStatus status = instance == null ? null : instance.getStatus();
+        if (status == null) {
+            // Switching on null throws. This method is called from finally blocks and from
+            // the scanners' failure handlers, where an exception would abort the hand-over
+            // or end the scan loop.
+            log.warn("Skip routing, instance or status is null, instance: {}", instance);
+            return;
+        }
+        switch (status) {
+            case WAITTING:
+            case RUNNING:
+            case SUBMITTING:
+            case RECORDING:
+                wc.offerToRecordingTaskQueue(instance);
+                break;
+            case EVALUATING:
+                wc.offerToEvaluatingTaskQueue(instance);
+                break;
+            case EVALUATE_ALERTING:
+            case FAILED_ALERTING:
+                wc.offerToAlertingTaskQueue(instance);
+                break;
+            case FAILED:
+                wc.addFailedDQInstanceInfo(instance);
+                break;
+            case SUCCESS:
+                wc.addSuccessDQInstanceInfo(instance);
+                break;
+            default:
+                // todo: unknown status, the instance is dropped
+                log.warn("Unknown status, id : {}, status : {}, instance: {}", instance.getId(), status, instance);
+                break;
+        }
}
- @Scheduled(fixedDelay = 1 * 5 * 1000L)
+    /**
+     * Periodically walks a snapshot of the recording queue and lets
+     * processRecordingInstance advance every instance. Instances reported as done are
+     * routed by status and then removed from the recording queue.
+     */
+    @Scheduled(fixedDelay = 5 * 1000L)
public void scanRecordingTask() {
+        final List<DQInstance> finished = Lists.newArrayList();
+        // poll from a copy; the shared recording queue is only touched in the finally block
+        final Queue<DQInstance> snapshot = Queues.newPriorityBlockingQueue(wc.getRECORDING_TASK_QUEUE());
+        try {
+            while (!snapshot.isEmpty()) {
+                try {
+                    final DQInstance current = snapshot.poll();
+                    if (current == null) {
+                        break;
+                    }
+                    processRecordingInstance(current, finished);
+                } catch (Exception e) {
+                    // todo: a failing instance must not stop the scan of the others
+                    log.error("scanRecordingTask failed, ex:", e);
+                }
+            }
+        } catch (Exception e) {
+            // todo
+            log.error("scanRecordingTask failed, ex:", e);
+        } finally {
+            // route by status first, then drop from the recording queue
+            for (DQInstance instance : finished) {
+                offerToSpecQueueByStatus(instance);
+            }
+            if (!finished.isEmpty()) {
+                wc.removeAll(wc.getRECORDING_TASK_QUEUE(), finished);
+            }
+        }
+    }
- // 遍历 recording tasks 检查状态进行更新
- DQInstance dqInstance = wc.getRecordingTask();
- // recording队列任务应该只有两种状态 如果服用队列的话,可能有多种状态 先做分队列的方式
- switch (dqInstance.getStatus()) {
+ /**
+ * Advances one instance of the recording queue according to its status.
+ * ACCEPTED/WAITTING: mark it SUBMITTING, submit its sub tasks and mark it RUNNING once
+ * hasTaskToSubmit() is false. SUBMITTING: submit again and apply the same RUNNING check.
+ * RUNNING: check the job status of the sub tasks and, when isFinishRecord() is true, add
+ * the instance to the given list. Any other status is left untouched.
+ *
+ * @param dqInstance instance to advance
+ * @param waittingToRemoveFromRecordingList receives the instance once its record phase is finished
+ */
+ private void processRecordingInstance(DQInstance dqInstance,
List<DQInstance> waittingToRemoveFromRecordingList) {
+ // Check whether the environment can accept another submission to the dispatcher
+ // (concurrency limit, computed per target engine); the check itself is made in doSubmitTaskToDispatcher.
+ DQInstanceStatus instanceStatus = dqInstance.getStatus();
+ List<DQBaseTask> subTaskList = dqInstance.getSubTaskList();
+ switch (instanceStatus) {
+ case ACCEPTED:
case WAITTING:
- // 提交任务
- if (doSubmitRecordingTask(dqInstance)) {
- dqInstance.setStatus(DQTaskStatus.RUNNING);
- // 提交到队尾 等待下次处理
- wc.offerToRecordingTaskQueue(dqInstance);
- } else {
- // 提交失败
- dqInstance.incrStatusAge();
- if (dqInstance.isFailed()) {
- wc.offerToAlertingTaskQueue(dqInstance);
- } else {
- // 如果失败 放回队尾 等待下次提交
- wc.offerToRecordingTaskQueue(dqInstance);
- }
+ dqInstanceService.updateStatus(dqInstance,
DQInstanceStatus.SUBMITTING);
+ submitTaskToDispatcher(subTaskList);
+ if (!dqInstance.hasTaskToSubmit()) {
+ dqInstanceService.updateStatus(dqInstance,
DQInstanceStatus.RUNNING);
}
+ break;
+ case SUBMITTING:
+ submitTaskToDispatcher(subTaskList);
+ // check whether every task has been submitted
+ if (!dqInstance.hasTaskToSubmit()) {
+ dqInstanceService.updateStatus(dqInstance,
DQInstanceStatus.RUNNING);
+ }
+ break;
case RUNNING:
- // 查询状态
- boolean hasRunningTask = checkJobStatus(dqInstance);
- if (hasRunningTask) {
- // 提交到队尾 等待下次轮询
- wc.offerToRecordingTaskQueue(dqInstance);
- } else {
- // ? 是否拆任务提交到下一阶段 还是整体提交 先整体提交
- // 没有正在运行的子任务了 提交给下一阶段执行
- wc.offerToEvaluatingTaskQueue(dqInstance);
+ // check and update the task status
+ checkJobStatus(subTaskList);
+ if (dqInstance.isFinishRecord()) {
+ // all record tasks are finished: schedule the instance for removal
+ waittingToRemoveFromRecordingList.add(dqInstance);
}
+ break;
+ default:
+ break;
}
- // 如果状态是完成
- // 获取结果 放入task
- // 从recording tasks中移除任务
- // 放入到 evaluate 队列
- // 未完成 等待下次轮询
}
- private boolean checkJobStatus(DQInstance dqInstance) {
- List<JobStatus> jobStatusList = dqInstance.getJobStatusList();
- boolean hasRunningTask = false;
- for (JobStatus jobStatus : jobStatusList) {
- if (jobStatus.isFinished()) continue;
- JobStatusRequest jobStatusRequest =
dispatcherClient.wrapperJobStatusRequest(jobStatus);
- JobStatusResponse jobStatusResponse =
dispatcherClient.getJobStatus(jobStatusRequest);
- if (jobStatusResponse.isSuccess()) {
- jobStatus.setFinished(true);
- } else {
- hasRunningTask = true;
+    /** Asks the task service to check the job status of every sub task; the results are discarded. */
+    private void checkJobStatus(List<DQBaseTask> subTaskList) {
+        for (DQBaseTask subTask : subTaskList) {
+            dqTaskService.checkJobStatus(subTask);
+        }
+    }
+
+ /**
+ * Advances every sub task by one step: WAITTING tasks are submitted (and marked FAILED
+ * once task.isFailed() reports true), RECORDING tasks are polled and marked RECORDED when
+ * their jobs are finished. Tasks in any other status are skipped.
+ */
+ private void submitTaskToDispatcher(List<DQBaseTask> subTaskList) {
+ // iterate over the recording tasks, check their status and update it
+ subTaskList.forEach(task -> {
+ DQTaskStatus taskStatus = task.getStatus();
+ switch (taskStatus) {
+ case WAITTING:
+ // submit the task
+ doSubmitTaskToDispatcher(task);
+ if (task.isFailed()) {
+ // submission kept failing
+ dqTaskService.updateTaskStatus(task,
DQTaskStatus.FAILED);
+ }
+ break;
+ case RECORDING:
+ // query the result
+ boolean isFinished = dqTaskService.checkJobStatus(task);
+ if (isFinished) {
+ // jobs finished: mark the task as RECORDED
+ dqTaskService.updateTaskStatus(task,
DQTaskStatus.RECORDED);
+ }
+ break;
+ default:
+ // other statuses are not handled here
+ break;
}
- }
- return hasRunningTask;
+ });
}
- private boolean doSubmitRecordingTask(DQInstance dqInstance) {
- // 一个task对应多个dispatcher任务 分别获取所有的任务对应的请求
- List<SubmitRequest> requestList = getSubmitRequest(dqInstance);
- // 提交任务 获取任务对应的job信息
- List<JobStatus> jobStatusList = requestList.stream()
- .map(req -> dispatcherClient.submitSql(req))
- .map(resp -> dispatcherClient.wrapperSubmitResponse(resp))
- .collect(Collectors.toList());
- // 设置job信息
- dqInstance.setJobStatusList(jobStatusList);
- return true;
+    /**
+     * Submits one task when its engine still accepts submissions. A successful submission
+     * moves the task to RECORDING; a rejected one is counted through incrStatusAge().
+     */
+    private void doSubmitTaskToDispatcher(DQBaseTask task) {
+        // concurrency check for the engine the task targets
+        boolean engineHasCapacity = wc.canSubmitToSpecEngine(task.getEngine());
+        if (engineHasCapacity) {
+            boolean submitted = dqTaskService.doSubmitRecordingTask(task);
+            if (!submitted) {
+                // submission failed: record one more failure
+                task.incrStatusAge();
+            } else {
+                // submission succeeded: the task is now recording
+                dqTaskService.updateTaskStatus(task, DQTaskStatus.RECORDING);
+            }
+        }
}
public void scanEvaluatingTask() {
- // 遍历 evaluate 队列
- // 如果状态是完成
- // 获取结果 放入task
- // 从evaluate tasks中移除任务
- // 放入到alert队列
- // 未完成 等待下次轮询
+ LinkedBlockingQueue<DQInstance> evaluating_task_queue =
wc.getEVALUATING_TASK_QUEUE();
+ while (true) {
+ DQInstance dqInstance = null;
+ try {
+ // Evaluating 来一个处理一个
+ dqInstance = evaluating_task_queue.poll(5, TimeUnit.SECONDS);
+ if (dqInstance == null) continue;
+ // 根据状态打回任务
+ // 执行evaluating
+ if (dqInstance.getStatus() == DQInstanceStatus.EVALUATING) {
+ dqInstance.doEvaluteTask();
+ } else {
+ offerToSpecQueueByStatus(dqInstance);
+ }
+ } catch (Exception e) {
+ if (dqInstance != null) {
+ log.error("scanEvaluatingTask doEvalute failed, id : {},
instance : {}, ex:", dqInstance.getId(), dqInstance, e);
+ dqInstanceService.updateStatus(dqInstance,
DQInstanceStatus.FAILED_ALERTING);
+ offerToSpecQueueByStatus(dqInstance);
+ } else {
+ log.error("scanEvaluatingTask poll instance failed. ex:",
e);
+ }
+ }
+ }
}
public void scanAlertingTask() {
- // 遍历 evaluate 队列
- // 如果状态是完成
- // 获取结果 放入task
- // 从evaluate tasks中移除任务
- // 放入到alert队列
- // 未完成 等待下次轮询
+ LinkedBlockingQueue<DQInstance> alerting_task_queue =
wc.getALERTING_TASK_QUEUE();
+ while (true) {
+ DQInstance dqInstance = null;
+ try {
+ // Evaluating 来一个处理一个
+ dqInstance = alerting_task_queue.poll(1, TimeUnit.SECONDS);
+ if (dqInstance == null) continue;
+ // 根据状态打回任务
+ // 执行evaluating
+ if (dqInstance.getStatus() == DQInstanceStatus.FAILED_ALERTING
|| dqInstance.getStatus() == DQInstanceStatus.EVALUATE_ALERTING) {
+ dqInstance.doAlertTask();
+ } else {
+ offerToSpecQueueByStatus(dqInstance);
+ }
+ } catch (Exception e) {
+ if (dqInstance != null) {
+ log.error("scanAlertingTask doAlert failed, id : {},
instance : {}, ex:", dqInstance.getId(), dqInstance, e);
+ if (dqInstance.isFailed()) {
+ // 重试很多次了 直接设置为失败
+ dqInstanceService.updateStatus(dqInstance,
DQInstanceStatus.FAILED);
+ // 没有失败很多次的话,状态不修改 放回队列重试
+ }
+ // 放回队列准备重试
+ offerToSpecQueueByStatus(dqInstance);
+ } else {
+ log.error("scanAlertingTask poll instance failed. ex:", e);
+ }
+ }
+ }
}
-
}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/service/DQInstanceService.java
b/core/src/main/java/org/apache/griffin/core/worker/service/DQInstanceService.java
new file mode 100644
index 00000000..a39c17d1
--- /dev/null
+++
b/core/src/main/java/org/apache/griffin/core/worker/service/DQInstanceService.java
@@ -0,0 +1,39 @@
+package org.apache.griffin.core.worker.service;
+
+import org.apache.griffin.core.worker.dao.DQInstanceDao;
+import org.apache.griffin.core.worker.entity.bo.DQInstance;
+import org.apache.griffin.core.worker.entity.enums.DQInstanceStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class DQInstanceService {
+ private static final Logger log =
LoggerFactory.getLogger(DQInstanceService.class);
+
+ @Autowired
+ private DQInstanceDao dqInstanceDao;
+
+ /**
+ * 数据库和内存需要保持同步
+ * @param instance
+ * @param status
+ * @return
+ */
+ public boolean updateStatus(DQInstance instance, DQInstanceStatus status) {
+ try {
+ dqInstanceDao.updateDQInstanceStatus(instance, status.getCode());
+ instance.setStatus(status);
+ log.info("instance {} {} => {} success.", instance.getId(),
instance.getStatus(), status);
+ return true;
+ } catch (Exception e) {
+ log.error("instance {} {} => {} failed, ex", instance.getId(),
instance.getStatus(), status, e);
+ }
+ return false;
+ }
+
+ public DQInstance getById(long id) {
+ return null;
+ }
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/service/DQTaskService.java
b/core/src/main/java/org/apache/griffin/core/worker/service/DQTaskService.java
new file mode 100644
index 00000000..8c0f018b
--- /dev/null
+++
b/core/src/main/java/org/apache/griffin/core/worker/service/DQTaskService.java
@@ -0,0 +1,106 @@
+package org.apache.griffin.core.worker.service;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.griffin.core.worker.client.DispatcherClient;
+import org.apache.griffin.core.worker.dao.DQInstanceDao;
+import org.apache.griffin.core.worker.dao.DQTaskDao;
+import org.apache.griffin.core.worker.entity.bo.DQInstance;
+import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
+import org.apache.griffin.core.worker.entity.dispatcher.*;
+import org.apache.griffin.core.worker.entity.enums.DQInstanceStatus;
+import org.apache.griffin.core.worker.entity.enums.DQTaskStatus;
+import org.apache.griffin.core.worker.entity.pojo.rule.DQRecordRule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+@Service
+public class DQTaskService {
+ private static final Logger log =
LoggerFactory.getLogger(DQTaskService.class);
+
+ @Autowired
+ private DQTaskDao dqTaskDao;
+ @Autowired
+ private DispatcherClient dispatcherClient;
+
+ public boolean updateTaskStatus(DQBaseTask task, DQTaskStatus status) {
+ try {
+ dqTaskDao.updateDQTaskListStatus(task, status.getCode());
+ task.setStatus(status);
+ return true;
+ } catch (Exception e) {
+ log.error("task {} {} => {} failed, ex", task.getId(),
task.getStatus(), status, e);
+ }
+ return false;
+ }
+
+ public boolean updateTaskStatus(List<DQBaseTask> tasks, DQTaskStatus
status) {
+ if (CollectionUtils.isEmpty(tasks)) return false;
+ DQBaseTask sampleTask = tasks.get(0);
+ List<Long> taskIdList =
tasks.stream().map(DQBaseTask::getId).collect(Collectors.toList());
+ try {
+ dqTaskDao.updateDQTaskListStatus(tasks, status.getCode());
+ tasks.forEach(x -> x.setStatus(status));
+ return true;
+ } catch (Exception e) {
+ log.error("task {} {} => {} failed, ex", taskIdList,
sampleTask.getStatus(), status, e);
+ }
+ return false;
+ }
+
+ public boolean doSubmitRecordingTask(DQBaseTask task) {
+ // 一个task对应多个dispatcher任务 分别获取所有的任务对应的请求
+ List<Pair<Long, SubmitRequest>> requestList = getSubmitRequest(task);
+ // 提交任务 获取任务对应的job信息
+ List<JobStatus> jobStatusList = requestList.stream()
+ .map(reqPair -> Pair.of(reqPair.getLeft(),
dispatcherClient.submitSql(reqPair.getRight())))
+ .map(respPair ->
dispatcherClient.wrapperSubmitResponse(respPair))
+ .collect(Collectors.toList());
+ // 设置job信息
+ task.setJobStatusList(jobStatusList);
+ return true;
+ }
+
+ public boolean checkJobStatus(DQBaseTask task) {
+ List<JobStatus> jobStatusList = task.getJobStatusList();
+ boolean isFinished = true;
+ for (JobStatus jobStatus : jobStatusList) {
+ if (jobStatus.isFinished()) continue;
+ JobStatusRequest jobStatusRequest =
dispatcherClient.wrapperJobStatusRequest(jobStatus);
+ JobStatusResponse jobStatusResponse =
dispatcherClient.getJobStatus(jobStatusRequest);
+ if (jobStatusResponse.isSuccess()) {
+ // 更新状态 获取结果
+ MetricRequest metricRequest =
dispatcherClient.wrapperMetricRequest(jobStatus);
+ MetricResponse metricResponse =
dispatcherClient.getMetricResult(metricRequest);
+ Double metric = metricResponse.getMetric();
+ task.addMetric(jobStatus.getPartitionTime(), metric);
+ jobStatus.setFinished(true);
+ } else {
+ // 只要有一个没有成功 就应该是false
+ isFinished = false;
+ }
+ }
+ return isFinished;
+ }
+
+ private List<Pair<Long, SubmitRequest>> getSubmitRequest(DQBaseTask task) {
+ List<Pair<Long, String>> partitionTimeAndSqlList = task.record();
+ return partitionTimeAndSqlList.stream()
+ .map(partitionTimeAndSql -> {
+ Long paprtitionTime = partitionTimeAndSql.getLeft();
+ String recordSql = partitionTimeAndSql.getRight();
+ SubmitRequest request = SubmitRequest.builder()
+ .recordSql(recordSql)
+ .engine(task.getEngine())
+ .owner(task.getOwner())
+ .build();
+ return Pair.of(paprtitionTime, request);
+ })
+ .collect(Collectors.toList());
+ }
+}
diff --git a/dispatcher/pom.xml b/dispatcher/pom.xml
index fce4ed45..da5cd950 100644
--- a/dispatcher/pom.xml
+++ b/dispatcher/pom.xml
@@ -226,17 +226,17 @@ under the License.
</dependency>
<!-- to access confluent schema registry -->
- <dependency>
- <groupId>io.confluent</groupId>
- <artifactId>kafka-schema-registry-client</artifactId>
- <version>${confluent.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
+<!-- <dependency>-->
+<!-- <groupId>io.confluent</groupId>-->
+<!-- <artifactId>kafka-schema-registry-client</artifactId>-->
+<!-- <version>${confluent.version}</version>-->
+<!-- <exclusions>-->
+<!-- <exclusion>-->
+<!-- <groupId>org.slf4j</groupId>-->
+<!-- <artifactId>slf4j-log4j12</artifactId>-->
+<!-- </exclusion>-->
+<!-- </exclusions>-->
+<!-- </dependency>-->
<!--schedule-->
<dependency>
diff --git a/pom.xml b/pom.xml
index 119d0af4..9cca4fe1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -122,6 +122,7 @@ under the License.
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
+ <encoding>utf8</encoding>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
</configuration>
diff --git a/scheduler/pom.xml b/scheduler/pom.xml
index 03a88ff6..0cd03a89 100644
--- a/scheduler/pom.xml
+++ b/scheduler/pom.xml
@@ -226,17 +226,17 @@ under the License.
</dependency>
<!-- to access confluent schema registry -->
- <dependency>
- <groupId>io.confluent</groupId>
- <artifactId>kafka-schema-registry-client</artifactId>
- <version>${confluent.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
+<!-- <dependency>-->
+<!-- <groupId>io.confluent</groupId>-->
+<!-- <artifactId>kafka-schema-registry-client</artifactId>-->
+<!-- <version>${confluent.version}</version>-->
+<!-- <exclusions>-->
+<!-- <exclusion>-->
+<!-- <groupId>org.slf4j</groupId>-->
+<!-- <artifactId>slf4j-log4j12</artifactId>-->
+<!-- </exclusion>-->
+<!-- </exclusions>-->
+<!-- </dependency>-->
<!--schedule-->
<dependency>