This is an automated email from the ASF dual-hosted git repository.
guoyp pushed a commit to branch griffin-1.0.0-dev
in repository https://gitbox.apache.org/repos/asf/griffin.git
The following commit(s) were added to refs/heads/griffin-1.0.0-dev by this push:
new dfe8675c ExecuteNodeGrpc impl (#629)
dfe8675c is described below
commit dfe8675c95a1e2567c104013747ab5ca32ae029a
Author: dabuliud <[email protected]>
AuthorDate: Fri Mar 24 21:36:22 2023 +0800
ExecuteNodeGrpc impl (#629)
* ExecuteNodeGrpc impl
* ServerImpl
---------
Co-authored-by: Warden <[email protected]>
---
api/pom.xml | 70 ++++++++++++++++++-
.../apache/griffin/api}/dao/DQBusinessRuleDao.java | 5 +-
.../org/apache/griffin/api/dao/DQContentDao.java | 8 +++
.../griffin/api}/dao/DQContentInstanceMapDao.java | 4 +-
.../org/apache/griffin/api/dao/DQInstanceDao.java | 14 ++++
.../griffin/api}/entity/DQResoueceEnums.java | 2 +-
.../griffin/api}/entity/GriffinDQBusinessRule.java | 2 +-
.../griffin/api}/entity/GriffinDQContent.java | 2 +-
.../api}/entity/GriffinDQContentInstanceMap.java | 2 +-
.../apache/griffin/api}/entity/GriffinDQTable.java | 2 +-
.../api}/entity/enums/DQInstanceStatus.java | 13 +++-
.../griffin/api/entity/pojo/DQInstanceEntity.java | 24 +++++++
.../apache/griffin/api}/entity/pojo/DQTable.java | 2 +-
.../apache/griffin/api}/entity/pojo/Metric.java | 2 +-
.../griffin/api}/entity/pojo/rule/DQAlertRule.java | 2 +-
.../api}/entity/pojo/rule/DQEvaluateRule.java | 6 +-
.../api}/entity/pojo/rule/DQRecordRule.java | 8 +--
.../griffin/api/service/DQInstanceService.java | 70 +++++++++++++++++++
.../org/apache/griffin/api}/utils/DQDateUtils.java | 2 +-
.../apache/griffin/api}/utils/ExpressionUtils.java | 2 +-
api/src/main/proto/ExecuteNodeProtocols.proto | 4 +-
core/pom.xml | 79 +---------------------
.../griffin/core/common/dao/DQContentDao.java | 8 ---
.../core/common/utils/context/WorkerContext.java | 42 ++++++------
.../core/master/context/TaskManagerContext.java | 21 ++++++
.../griffin/core/master/server/TaskManager.java | 40 ++++++++---
.../service/DQTaskManagerInstanceService.java | 36 ++++++++++
.../core/master/service/TaskAssignService.java | 30 --------
.../core/master/transport/DQCConnection.java | 26 +++++--
.../master/transport/DQCConnectionManager.java | 4 +-
.../core/worker/client/DispatcherClient.java | 4 +-
.../griffin/core/worker/dao/DQInstanceDao.java | 14 ----
.../apache/griffin/core/worker/dao/DQTaskDao.java | 2 -
.../core/worker/driver/PrestoTemplateDriver.java | 3 +-
.../core/worker/driver/SparkTemplateDriver.java | 3 +-
.../griffin/core/worker/driver/TemplateDriver.java | 3 +-
.../bo/{DQInstance.java => DQInstanceBO.java} | 10 ++-
.../core/worker/entity/bo/task/DQBaseTask.java | 6 +-
.../core/worker/entity/bo/task/DQHiveTask.java | 1 -
.../pojo/template/DQRecordCompostiveTemplate.java | 4 --
.../{pojo => }/template/DQRecordBaseTemplate.java | 2 +-
.../template/DQRecordCompostiveTemplate.java | 4 ++
.../{pojo => }/template/DQRecordTemplate.java | 2 +-
.../core/worker/factory/DQInstanceFactory.java | 20 +++---
.../core/worker/factory/DQStageFactory.java | 5 +-
.../griffin/core/worker/factory/DQTaskFactory.java | 4 +-
.../worker/schedule/TaskDispatcherScheduler.java | 52 +++++++-------
.../griffin/core/worker/service/DQTaskService.java | 4 --
...ceService.java => DQWorkerInstanceService.java} | 35 ++++++----
.../worker/service/ExecuteNodeServiceImpl.java | 2 +-
.../griffin/core/worker/stage/DQAbstractStage.java | 8 +--
51 files changed, 438 insertions(+), 282 deletions(-)
diff --git a/api/pom.xml b/api/pom.xml
index 81f9a24d..435c7223 100644
--- a/api/pom.xml
+++ b/api/pom.xml
@@ -17,14 +17,80 @@
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
- <!-- grpc -->
-<!-- <protobuf.version>3.7.1</protobuf.version>-->
<protobuf.version>3.17.2</protobuf.version>
<protobuf-plugin.version>0.6.1</protobuf-plugin.version>
<grpc.version>1.42.1</grpc.version>
+ <spring.boot.version>2.1.7.RELEASE</spring.boot.version>
+
<spring.security.kerberos.version>1.0.1.RELEASE</spring.security.kerberos.version>
</properties>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-dependencies</artifactId>
+ <version>${spring.boot.version}</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
<dependencies>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-web</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-log4j2</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-properties-migrator</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>com.vaadin.external.google</groupId>
+ <artifactId>android-json</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-json</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-data-jpa</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.hibernate</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.aspectj</groupId>
+ <artifactId>aspectjrt</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-aspects</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.security.kerberos</groupId>
+ <artifactId>spring-security-kerberos-client</artifactId>
+ <version>${spring.security.kerberos.version}</version>
+ </dependency>
<!-- new dependencies -->
<dependency>
<groupId>org.projectlombok</groupId>
diff --git
a/core/src/main/java/org/apache/griffin/core/common/dao/DQBusinessRuleDao.java
b/api/src/main/java/org/apache/griffin/api/dao/DQBusinessRuleDao.java
similarity index 62%
rename from
core/src/main/java/org/apache/griffin/core/common/dao/DQBusinessRuleDao.java
rename to api/src/main/java/org/apache/griffin/api/dao/DQBusinessRuleDao.java
index 1a791e2e..1f67e260 100644
---
a/core/src/main/java/org/apache/griffin/core/common/dao/DQBusinessRuleDao.java
+++ b/api/src/main/java/org/apache/griffin/api/dao/DQBusinessRuleDao.java
@@ -1,10 +1,11 @@
-package org.apache.griffin.core.common.dao;
+package org.apache.griffin.api.dao;
-import org.apache.griffin.core.common.entity.GriffinDQBusinessRule;
+import org.apache.griffin.api.entity.GriffinDQBusinessRule;
import org.springframework.stereotype.Component;
import java.util.List;
+
@Component
public interface DQBusinessRuleDao {
List<GriffinDQBusinessRule> getListByDqcId(Long dqcId);
diff --git a/api/src/main/java/org/apache/griffin/api/dao/DQContentDao.java
b/api/src/main/java/org/apache/griffin/api/dao/DQContentDao.java
new file mode 100644
index 00000000..c7e12279
--- /dev/null
+++ b/api/src/main/java/org/apache/griffin/api/dao/DQContentDao.java
@@ -0,0 +1,8 @@
+package org.apache.griffin.api.dao;
+
+
+import org.apache.griffin.api.entity.GriffinDQContent;
+
+public interface DQContentDao { // NOTE(review): no @Component here, unlike DQBusinessRuleDao/DQInstanceDao in this commit; same as the deleted core version - confirm intentional
+ GriffinDQContent getById(Long id); // returns the GriffinDQContent for the given id
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/common/dao/DQContentInstanceMapDao.java
b/api/src/main/java/org/apache/griffin/api/dao/DQContentInstanceMapDao.java
similarity index 60%
rename from
core/src/main/java/org/apache/griffin/core/common/dao/DQContentInstanceMapDao.java
rename to
api/src/main/java/org/apache/griffin/api/dao/DQContentInstanceMapDao.java
index 94ad5018..fd9f30af 100644
---
a/core/src/main/java/org/apache/griffin/core/common/dao/DQContentInstanceMapDao.java
+++ b/api/src/main/java/org/apache/griffin/api/dao/DQContentInstanceMapDao.java
@@ -1,6 +1,6 @@
-package org.apache.griffin.core.common.dao;
+package org.apache.griffin.api.dao;
-import org.apache.griffin.core.common.entity.GriffinDQContentInstanceMap;
+import org.apache.griffin.api.entity.GriffinDQContentInstanceMap;
import org.springframework.stereotype.Component;
@Component
diff --git a/api/src/main/java/org/apache/griffin/api/dao/DQInstanceDao.java
b/api/src/main/java/org/apache/griffin/api/dao/DQInstanceDao.java
new file mode 100644
index 00000000..10289d92
--- /dev/null
+++ b/api/src/main/java/org/apache/griffin/api/dao/DQInstanceDao.java
@@ -0,0 +1,14 @@
+package org.apache.griffin.api.dao;
+
+import org.apache.griffin.api.entity.pojo.DQInstanceEntity;
+import org.springframework.stereotype.Component;
+
+@Component // NOTE(review): annotation is on an interface and no implementation is visible in this excerpt - confirm how the bean is provided
+public interface DQInstanceDao {
+
+ DQInstanceEntity getById(Long id);
+
+ void updateDQInstanceStatus(Long id, int status); // status is a numeric code; DQInstanceService in this commit passes DQInstanceStatus.getCode()
+
+ void insert(DQInstanceEntity instance);
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/common/entity/DQResoueceEnums.java
b/api/src/main/java/org/apache/griffin/api/entity/DQResoueceEnums.java
similarity index 51%
rename from
core/src/main/java/org/apache/griffin/core/common/entity/DQResoueceEnums.java
rename to api/src/main/java/org/apache/griffin/api/entity/DQResoueceEnums.java
index 1e7f94b2..a6c1ec50 100644
---
a/core/src/main/java/org/apache/griffin/core/common/entity/DQResoueceEnums.java
+++ b/api/src/main/java/org/apache/griffin/api/entity/DQResoueceEnums.java
@@ -1,4 +1,4 @@
-package org.apache.griffin.core.common.entity;
+package org.apache.griffin.api.entity;
public enum DQResoueceEnums {
HIVE, KAFKA;
diff --git
a/core/src/main/java/org/apache/griffin/core/common/entity/GriffinDQBusinessRule.java
b/api/src/main/java/org/apache/griffin/api/entity/GriffinDQBusinessRule.java
similarity index 70%
rename from
core/src/main/java/org/apache/griffin/core/common/entity/GriffinDQBusinessRule.java
rename to
api/src/main/java/org/apache/griffin/api/entity/GriffinDQBusinessRule.java
index 214010b0..66892dc0 100644
---
a/core/src/main/java/org/apache/griffin/core/common/entity/GriffinDQBusinessRule.java
+++ b/api/src/main/java/org/apache/griffin/api/entity/GriffinDQBusinessRule.java
@@ -1,4 +1,4 @@
-package org.apache.griffin.core.common.entity;
+package org.apache.griffin.api.entity;
import lombok.Data;
diff --git
a/core/src/main/java/org/apache/griffin/core/common/entity/GriffinDQContent.java
b/api/src/main/java/org/apache/griffin/api/entity/GriffinDQContent.java
similarity index 86%
rename from
core/src/main/java/org/apache/griffin/core/common/entity/GriffinDQContent.java
rename to api/src/main/java/org/apache/griffin/api/entity/GriffinDQContent.java
index de773467..1e9ee24b 100644
---
a/core/src/main/java/org/apache/griffin/core/common/entity/GriffinDQContent.java
+++ b/api/src/main/java/org/apache/griffin/api/entity/GriffinDQContent.java
@@ -1,4 +1,4 @@
-package org.apache.griffin.core.common.entity;
+package org.apache.griffin.api.entity;
import lombok.Data;
diff --git
a/core/src/main/java/org/apache/griffin/core/common/entity/GriffinDQContentInstanceMap.java
b/api/src/main/java/org/apache/griffin/api/entity/GriffinDQContentInstanceMap.java
similarity index 75%
rename from
core/src/main/java/org/apache/griffin/core/common/entity/GriffinDQContentInstanceMap.java
rename to
api/src/main/java/org/apache/griffin/api/entity/GriffinDQContentInstanceMap.java
index d46d7cc4..44156ba6 100644
---
a/core/src/main/java/org/apache/griffin/core/common/entity/GriffinDQContentInstanceMap.java
+++
b/api/src/main/java/org/apache/griffin/api/entity/GriffinDQContentInstanceMap.java
@@ -1,4 +1,4 @@
-package org.apache.griffin.core.common.entity;
+package org.apache.griffin.api.entity;
import lombok.Data;
diff --git
a/core/src/main/java/org/apache/griffin/core/common/entity/GriffinDQTable.java
b/api/src/main/java/org/apache/griffin/api/entity/GriffinDQTable.java
similarity index 78%
rename from
core/src/main/java/org/apache/griffin/core/common/entity/GriffinDQTable.java
rename to api/src/main/java/org/apache/griffin/api/entity/GriffinDQTable.java
index acbde9e5..0c92e531 100644
---
a/core/src/main/java/org/apache/griffin/core/common/entity/GriffinDQTable.java
+++ b/api/src/main/java/org/apache/griffin/api/entity/GriffinDQTable.java
@@ -1,4 +1,4 @@
-package org.apache.griffin.core.common.entity;
+package org.apache.griffin.api.entity;
import lombok.Data;
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQInstanceStatus.java
b/api/src/main/java/org/apache/griffin/api/entity/enums/DQInstanceStatus.java
similarity index 52%
rename from
core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQInstanceStatus.java
rename to
api/src/main/java/org/apache/griffin/api/entity/enums/DQInstanceStatus.java
index cd626a7e..fbd1ad98 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQInstanceStatus.java
+++
b/api/src/main/java/org/apache/griffin/api/entity/enums/DQInstanceStatus.java
@@ -1,4 +1,4 @@
-package org.apache.griffin.core.worker.entity.enums;
+package org.apache.griffin.api.entity.enums;
public enum DQInstanceStatus {
ACCEPTED(0),
@@ -11,7 +11,8 @@ public enum DQInstanceStatus {
// EVALUATE_ALERTING(6), // Metric 需要告警
FAILED_ALERTING(7), // 任务执行失败需要告警
SUCCESS(8),
- FAILED(9);
+ FAILED(9),
+ STOPPED(10);
private final int code;
@@ -22,4 +23,12 @@ public enum DQInstanceStatus {
public int getCode() {
return code;
}
+
+ public static DQInstanceStatus findByCode(int statusCode) throws Exception
{
+ DQInstanceStatus[] values = DQInstanceStatus.values(); // reverse lookup: numeric code -> enum constant
+ for (DQInstanceStatus value : values) {
+ if (value.code == statusCode) return value; // first constant whose code matches is returned
+ }
+ throw new Exception("Unknown DQInstanceStatus Code: " + statusCode); // NOTE(review): raw checked Exception - every caller must catch or declare it
+ }
}
diff --git
a/api/src/main/java/org/apache/griffin/api/entity/pojo/DQInstanceEntity.java
b/api/src/main/java/org/apache/griffin/api/entity/pojo/DQInstanceEntity.java
new file mode 100644
index 00000000..18f20f37
--- /dev/null
+++ b/api/src/main/java/org/apache/griffin/api/entity/pojo/DQInstanceEntity.java
@@ -0,0 +1,24 @@
+package org.apache.griffin.api.entity.pojo;
+
+import lombok.Data;
+import org.apache.griffin.api.entity.enums.DQInstanceStatus;
+
+/**
+ * Task instance: a snapshot of the task at run time.
+ * One instance contains multiple sub-tasks.
+ */
+@Data
+public class DQInstanceEntity implements Comparable<DQInstanceEntity> {
+
+ private Long id;
+ private Long dqcId;
+ // Instance status
+ private DQInstanceStatus status;
+ // Age of the current status; reset when the status is updated
+ private int statusAge;
+
+ @Override
+ public int compareTo(DQInstanceEntity o) {
+ return 0; // NOTE(review): always 0, so every pair of instances compares as equal - presumably a placeholder, confirm
+ }
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/DQTable.java
b/api/src/main/java/org/apache/griffin/api/entity/pojo/DQTable.java
similarity index 81%
rename from
core/src/main/java/org/apache/griffin/core/worker/entity/pojo/DQTable.java
rename to api/src/main/java/org/apache/griffin/api/entity/pojo/DQTable.java
index 7be0c5ac..5bfc6d78 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/DQTable.java
+++ b/api/src/main/java/org/apache/griffin/api/entity/pojo/DQTable.java
@@ -1,4 +1,4 @@
-package org.apache.griffin.core.worker.entity.pojo;
+package org.apache.griffin.api.entity.pojo;
import lombok.Data;
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/Metric.java
b/api/src/main/java/org/apache/griffin/api/entity/pojo/Metric.java
similarity index 76%
rename from
core/src/main/java/org/apache/griffin/core/worker/entity/pojo/Metric.java
rename to api/src/main/java/org/apache/griffin/api/entity/pojo/Metric.java
index 6a36e9b4..41354d01 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/Metric.java
+++ b/api/src/main/java/org/apache/griffin/api/entity/pojo/Metric.java
@@ -1,4 +1,4 @@
-package org.apache.griffin.core.worker.entity.pojo;
+package org.apache.griffin.api.entity.pojo;
import lombok.AllArgsConstructor;
import lombok.Data;
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQAlertRule.java
b/api/src/main/java/org/apache/griffin/api/entity/pojo/rule/DQAlertRule.java
similarity index 75%
rename from
core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQAlertRule.java
rename to
api/src/main/java/org/apache/griffin/api/entity/pojo/rule/DQAlertRule.java
index 37dd63f1..47388b7c 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQAlertRule.java
+++ b/api/src/main/java/org/apache/griffin/api/entity/pojo/rule/DQAlertRule.java
@@ -1,4 +1,4 @@
-package org.apache.griffin.core.worker.entity.pojo.rule;
+package org.apache.griffin.api.entity.pojo.rule;
import java.util.List;
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQEvaluateRule.java
b/api/src/main/java/org/apache/griffin/api/entity/pojo/rule/DQEvaluateRule.java
similarity index 68%
rename from
core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQEvaluateRule.java
rename to
api/src/main/java/org/apache/griffin/api/entity/pojo/rule/DQEvaluateRule.java
index 2ee5ec6a..42ec4ef9 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQEvaluateRule.java
+++
b/api/src/main/java/org/apache/griffin/api/entity/pojo/rule/DQEvaluateRule.java
@@ -1,7 +1,7 @@
-package org.apache.griffin.core.worker.entity.pojo.rule;
+package org.apache.griffin.api.entity.pojo.rule;
-import org.apache.griffin.core.worker.entity.pojo.Metric;
-import org.apache.griffin.core.worker.utils.ExpressionUtils;
+import org.apache.griffin.api.entity.pojo.Metric;
+import org.apache.griffin.api.utils.ExpressionUtils;
import java.util.List;
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQRecordRule.java
b/api/src/main/java/org/apache/griffin/api/entity/pojo/rule/DQRecordRule.java
similarity index 87%
rename from
core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQRecordRule.java
rename to
api/src/main/java/org/apache/griffin/api/entity/pojo/rule/DQRecordRule.java
index 3cf5605d..c47eb4a6 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQRecordRule.java
+++
b/api/src/main/java/org/apache/griffin/api/entity/pojo/rule/DQRecordRule.java
@@ -1,13 +1,13 @@
-package org.apache.griffin.core.worker.entity.pojo.rule;
+package org.apache.griffin.api.entity.pojo.rule;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.Data;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.griffin.core.worker.entity.enums.DQEngineEnum;
-import org.apache.griffin.core.worker.entity.pojo.DQTable;
-import
org.apache.griffin.core.worker.entity.pojo.template.DQRecordBaseTemplate;
-import org.apache.griffin.core.worker.entity.pojo.template.DQRecordTemplate;
+import org.apache.griffin.api.entity.pojo.DQTable;
+import org.apache.griffin.core.worker.entity.template.DQRecordBaseTemplate;
+import org.apache.griffin.core.worker.entity.template.DQRecordTemplate;
import org.apache.griffin.core.worker.utils.DQDateUtils;
import java.util.List;
diff --git
a/api/src/main/java/org/apache/griffin/api/service/DQInstanceService.java
b/api/src/main/java/org/apache/griffin/api/service/DQInstanceService.java
new file mode 100644
index 00000000..0cbcf2b6
--- /dev/null
+++ b/api/src/main/java/org/apache/griffin/api/service/DQInstanceService.java
@@ -0,0 +1,70 @@
+package org.apache.griffin.api.service;
+
+import org.apache.griffin.api.dao.DQInstanceDao;
+import org.apache.griffin.api.entity.enums.DQInstanceStatus;
+import org.apache.griffin.api.entity.pojo.DQInstanceEntity;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class DQInstanceService {
+ private static final Logger log =
LoggerFactory.getLogger(DQInstanceService.class);
+
+ @Autowired
+ private DQInstanceDao dqInstanceDao;
+// @Autowired
+// private DQContentInstanceMapDao dqContentInstanceMapDao;
+// @Autowired
+// private DQInstanceFactory dqInstanceFactory;
+
+ /**
+ * Writes the new status through the DAO, then mirrors it on the in-memory instance (database and memory must stay in sync).
+ * @param instance instance whose status is being changed
+ * @param status status to apply
+ * @return true when the DAO call and the in-memory update complete; false when any Exception is caught and logged
+ */
+ public boolean updateStatus(DQInstanceEntity instance, DQInstanceStatus
status) {
+ try {
+ dqInstanceDao.updateDQInstanceStatus(instance, status.getCode()); // NOTE(review): DQInstanceDao declares (Long id, int status); passing the entity looks like it should be instance.getId() - confirm
+ instance.setStatus(status); // NOTE(review): set before the success log below, so both status placeholders there print the new status
+ log.info("instance {} {} => {} success.", instance.getId(),
instance.getStatus(), status);
+ return true;
+ } catch (Exception e) {
+ log.error("instance {} {} => {} failed, ex", instance.getId(),
instance.getStatus(), status, e);
+ }
+ return false;
+ }
+
+ /**
+ * Fetches the instance with the given id from DQInstanceDao (the construct/recover logic is commented out below).
+ * @param id instance id (original note: assigned by the master)
+ * @return whatever DQInstanceDao.getById returns for that id
+ */
+ public DQInstanceEntity getById(Long id) {
+ if (id == null) {
+ log.error("Unknown instance id: null");
+ }
+ DQInstanceEntity ins = dqInstanceDao.getById(id); // NOTE(review): still reached when id is null - the guard above only logs
+ return ins;
+// if (ins == null) {
+// // the ins id has no task info
+// return constructInstance(id);
+// }
+// return recoveryInstance(ins);
+ }
+
+// private DQInstanceEntity constructInstance(Long id) {
+// log.info("constructInstance id: {}", id);
+// GriffinDQContentInstanceMap contentInstanceMap =
dqContentInstanceMapDao.getContentInstanceMapByInstanceId(id);
+// Long instanceId = contentInstanceMap.getInstanceId();
+// Long dqcId = contentInstanceMap.getDqcId();
+// return dqInstanceFactory.constructInstance(instanceId, dqcId);
+// }
+//
+// private DQInstance recoveryInstance(DQInstance instance) {
+// return dqInstanceFactory.recoveryInstance(instance);
+// }
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/utils/DQDateUtils.java
b/api/src/main/java/org/apache/griffin/api/utils/DQDateUtils.java
similarity index 86%
rename from
core/src/main/java/org/apache/griffin/core/worker/utils/DQDateUtils.java
rename to api/src/main/java/org/apache/griffin/api/utils/DQDateUtils.java
index 8e5a1cf4..02ef18de 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/utils/DQDateUtils.java
+++ b/api/src/main/java/org/apache/griffin/api/utils/DQDateUtils.java
@@ -1,4 +1,4 @@
-package org.apache.griffin.core.worker.utils;
+package org.apache.griffin.api.utils;
import java.util.concurrent.TimeUnit;
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/utils/ExpressionUtils.java
b/api/src/main/java/org/apache/griffin/api/utils/ExpressionUtils.java
similarity index 75%
rename from
core/src/main/java/org/apache/griffin/core/worker/utils/ExpressionUtils.java
rename to api/src/main/java/org/apache/griffin/api/utils/ExpressionUtils.java
index 23884c88..3114b0aa 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/utils/ExpressionUtils.java
+++ b/api/src/main/java/org/apache/griffin/api/utils/ExpressionUtils.java
@@ -1,4 +1,4 @@
-package org.apache.griffin.core.worker.utils;
+package org.apache.griffin.api.utils;
public class ExpressionUtils {
public static boolean evaluate(String expression, double metricValue) {
diff --git a/api/src/main/proto/ExecuteNodeProtocols.proto
b/api/src/main/proto/ExecuteNodeProtocols.proto
index 871dab7b..4d775118 100644
--- a/api/src/main/proto/ExecuteNodeProtocols.proto
+++ b/api/src/main/proto/ExecuteNodeProtocols.proto
@@ -16,7 +16,7 @@ service ExecuteNodeService {
// Ask ExecuteNode to stop one task
rpc stopDQTask(StopDQTaskRequest) returns (StopDQTaskResponse) {}
// Ask ExecuteNode to query one task
- rpc querySingleDQTask(SQuerySingleDQTaskRequest) returns
(QuerySingleDQTaskResponse) {}
+ rpc querySingleDQTask(QuerySingleDQTaskRequest) returns
(QuerySingleDQTaskResponse) {}
rpc sayHello(SayHelloRequest) returns (SayHelloResponse) {}
}
@@ -38,7 +38,7 @@ message StopDQTaskResponse {
optional int32 status = 2;
}
-message SQuerySingleDQTaskRequest {
+message QuerySingleDQTaskRequest {
int64 instanceId = 1;
}
diff --git a/core/pom.xml b/core/pom.xml
index d07312a8..296bbf27 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -48,10 +48,6 @@ under the License.
<livy.core.version>0.3.0</livy.core.version>
<elasticsearch-rest-client.version>6.2.4</elasticsearch-rest-client.version>
<jackson-databind.version>2.9.9.3</jackson-databind.version>
- <!-- grpc -->
-<!-- <protobuf.version>3.19.1</protobuf.version>-->
-<!-- <protobuf-plugin.version>0.6.1</protobuf-plugin.version>-->
-<!-- <grpc.version>1.42.1</grpc.version>-->
</properties>
<repositories>
@@ -139,11 +135,7 @@ under the License.
<artifactId>postgresql</artifactId>
<version>${postgresql.version}</version>
</dependency>
- <!--<dependency>-->
- <!--<groupId>mysql</groupId>-->
- <!--<artifactId>mysql-connector-java</artifactId>-->
- <!--<version>${mysql.java.version}</version>-->
- <!--</dependency>-->
+
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
@@ -159,75 +151,6 @@ under the License.
<version>${jackson-databind.version}</version>
</dependency>
- <!-- to access metastore from hive-->
-<!-- <dependency>-->
-<!-- <groupId>org.apache.hadoop</groupId>-->
-<!-- <artifactId>hadoop-client</artifactId>-->
-<!-- <version>${hadoop.version}</version>-->
-<!-- <!–<scope>provided</scope>–>-->
-<!-- <exclusions>-->
-<!-- <exclusion>-->
-<!-- <groupId>javax.servlet</groupId>-->
-<!-- <artifactId>servlet-api</artifactId>-->
-<!-- </exclusion>-->
-<!-- <exclusion>-->
-<!-- <groupId>org.slf4j</groupId>-->
-<!-- <artifactId>slf4j-log4j12</artifactId>-->
-<!-- </exclusion>-->
-<!-- </exclusions>-->
-<!-- </dependency>-->
-<!-- <dependency>-->
-<!-- <groupId>org.apache.hive</groupId>-->
-<!-- <artifactId>hive-metastore</artifactId>-->
-<!-- <version>${hive.version}</version>-->
-<!-- <exclusions>-->
-<!-- <exclusion>-->
-<!-- <groupId>org.eclipse.jetty.aggregate</groupId>-->
-<!-- <artifactId>jetty-all</artifactId>-->
-<!-- </exclusion>-->
-<!-- <exclusion>-->
-<!-- <groupId>org.eclipse.jetty.orbit</groupId>-->
-<!-- <artifactId>javax.servlet</artifactId>-->
-<!-- </exclusion>-->
-<!-- <exclusion>-->
-<!-- <groupId>javax.servlet</groupId>-->
-<!-- <artifactId>servlet-api</artifactId>-->
-<!-- </exclusion>-->
-<!-- <exclusion>-->
-<!-- <groupId>de.ruedigermoeller</groupId>-->
-<!-- <artifactId>fst</artifactId>-->
-<!-- </exclusion>-->
-<!-- </exclusions>-->
-<!-- </dependency>-->
-
- <!-- to access Hive using JDBC -->
-<!-- <dependency>-->
-<!-- <groupId>org.apache.hive</groupId>-->
-<!-- <artifactId>hive-jdbc</artifactId>-->
-<!-- <version>${hive.version}</version>-->
-<!-- <exclusions>-->
-<!-- <exclusion>-->
-<!-- <groupId>org.eclipse.jetty.aggregate</groupId>-->
-<!-- <artifactId>*</artifactId>-->
-<!-- </exclusion>-->
-<!-- <exclusion>-->
-<!-- <groupId>org.eclipse.jetty.orbit</groupId>-->
-<!-- <artifactId>javax.servlet</artifactId>-->
-<!-- </exclusion>-->
-<!-- <exclusion>-->
-<!-- <groupId>javax.servlet</groupId>-->
-<!-- <artifactId>servlet-api</artifactId>-->
-<!-- </exclusion>-->
-<!-- <exclusion>-->
-<!-- <groupId>org.mortbay.jetty</groupId>-->
-<!-- <artifactId>servlet-api-2.5</artifactId>-->
-<!-- </exclusion>-->
-<!-- <exclusion>-->
-<!-- <groupId>org.slf4j</groupId>-->
-<!-- <artifactId>slf4j-log4j12</artifactId>-->
-<!-- </exclusion>-->
-<!-- </exclusions>-->
-<!-- </dependency>-->
<!--schedule-->
<dependency>
<groupId>org.springframework</groupId>
diff --git
a/core/src/main/java/org/apache/griffin/core/common/dao/DQContentDao.java
b/core/src/main/java/org/apache/griffin/core/common/dao/DQContentDao.java
deleted file mode 100644
index 97182d9f..00000000
--- a/core/src/main/java/org/apache/griffin/core/common/dao/DQContentDao.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package org.apache.griffin.core.common.dao;
-
-
-import org.apache.griffin.core.common.entity.GriffinDQContent;
-
-public interface DQContentDao {
- GriffinDQContent getById(Long id);
-}
diff --git
a/core/src/main/java/org/apache/griffin/core/common/utils/context/WorkerContext.java
b/core/src/main/java/org/apache/griffin/core/common/utils/context/WorkerContext.java
index 49f99949..40da7a6e 100644
---
a/core/src/main/java/org/apache/griffin/core/common/utils/context/WorkerContext.java
+++
b/core/src/main/java/org/apache/griffin/core/common/utils/context/WorkerContext.java
@@ -2,7 +2,7 @@ package org.apache.griffin.core.common.utils.context;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
-import org.apache.griffin.core.worker.entity.bo.DQInstance;
+import org.apache.griffin.core.worker.entity.bo.DQInstanceBO;
import org.apache.griffin.core.worker.entity.enums.DQEngineEnum;
import org.springframework.stereotype.Component;
@@ -17,14 +17,14 @@ import java.util.concurrent.LinkedBlockingQueue;
@Component
public class WorkerContext {
- private final List<DQInstance> WAITTING_TASK_QUEUE;
- private final List<DQInstance> RECORDING_TASK_QUEUE;
- private final LinkedBlockingQueue<DQInstance> EVALUATING_TASK_QUEUE;
- private final LinkedBlockingQueue<DQInstance> ALERTING_TASK_QUEUE;
+ private final List<DQInstanceBO> WAITTING_TASK_QUEUE;
+ private final List<DQInstanceBO> RECORDING_TASK_QUEUE;
+ private final LinkedBlockingQueue<DQInstanceBO> EVALUATING_TASK_QUEUE;
+ private final LinkedBlockingQueue<DQInstanceBO> ALERTING_TASK_QUEUE;
// TODO: how should stale data in the success and failed queues be aged out?
- public final List<DQInstance> successTaskIdList;
- public final List<DQInstance> failedTaskIdList;
+ public final List<DQInstanceBO> successTaskIdList;
+ public final List<DQInstanceBO> failedTaskIdList;
public WorkerContext() {
// Set the queue length
@@ -37,27 +37,27 @@ public class WorkerContext {
failedTaskIdList = Lists.newArrayList();
}
- public List<DQInstance> getWAITTING_TASK_QUEUE() {
+ public List<DQInstanceBO> getWAITTING_TASK_QUEUE() {
return WAITTING_TASK_QUEUE;
}
- public List<DQInstance> getRECORDING_TASK_QUEUE() {
+ public List<DQInstanceBO> getRECORDING_TASK_QUEUE() {
return RECORDING_TASK_QUEUE;
}
- public LinkedBlockingQueue<DQInstance> getEVALUATING_TASK_QUEUE() {
+ public LinkedBlockingQueue<DQInstanceBO> getEVALUATING_TASK_QUEUE() {
return EVALUATING_TASK_QUEUE;
}
- public LinkedBlockingQueue<DQInstance> getALERTING_TASK_QUEUE() {
+ public LinkedBlockingQueue<DQInstanceBO> getALERTING_TASK_QUEUE() {
return ALERTING_TASK_QUEUE;
}
- public List<DQInstance> getSuccessTaskIdList() {
+ public List<DQInstanceBO> getSuccessTaskIdList() {
return successTaskIdList;
}
- public List<DQInstance> getFailedTaskIdList() {
+ public List<DQInstanceBO> getFailedTaskIdList() {
return failedTaskIdList;
}
@@ -66,18 +66,18 @@ public class WorkerContext {
resetTaskStatusWhenStartUp();
}
- public DQInstance getWaittingTask() {
+ public DQInstanceBO getWaittingTask() {
return null;
}
- public DQInstance getRecordingTask() {
+ public DQInstanceBO getRecordingTask() {
return null;
}
- public boolean offerToRecordingTaskQueue(DQInstance dqInstance) {
+ public boolean offerToRecordingTaskQueue(DQInstanceBO dqInstance) {
return false;
}
- public void offerToAlertingTaskQueue(DQInstance dqInstance) {
+ public void offerToAlertingTaskQueue(DQInstanceBO dqInstance) {
}
/**
@@ -97,18 +97,18 @@ public class WorkerContext {
}
- public void offerToEvaluatingTaskQueue(DQInstance dqInstance) {
+ public void offerToEvaluatingTaskQueue(DQInstanceBO dqInstance) {
}
- public void removeAll(List<DQInstance> targetList, List<DQInstance>
waittingToRemoveFromRecordingList) {
+ public void removeAll(List<DQInstanceBO> targetList, List<DQInstanceBO>
waittingToRemoveFromRecordingList) {
targetList.removeAll(waittingToRemoveFromRecordingList);
}
- public void addFailedDQInstanceInfo(DQInstance instance) {
+ public void addFailedDQInstanceInfo(DQInstanceBO instance) {
}
- public void addSuccessDQInstanceInfo(DQInstance instance) {
+ public void addSuccessDQInstanceInfo(DQInstanceBO instance) {
}
}
diff --git
a/core/src/main/java/org/apache/griffin/core/master/context/TaskManagerContext.java
b/core/src/main/java/org/apache/griffin/core/master/context/TaskManagerContext.java
new file mode 100644
index 00000000..4184b2c9
--- /dev/null
+++
b/core/src/main/java/org/apache/griffin/core/master/context/TaskManagerContext.java
@@ -0,0 +1,47 @@
+package org.apache.griffin.core.master.context;
+
+import com.google.common.collect.Maps;
+import org.apache.griffin.core.master.transport.DQCConnection;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+
+/**
+ * Registry of submitted DQ instances, keyed by instance id, remembering the
+ * {@link DQCConnection} each one was registered with.
+ */
+@Component
+public class TaskManagerContext {
+
+    /** Instance id to connection. Static, so the same map backs every instance of this class. */
+    public static final Map<Long, DQCConnection> submitTaskContainer = Maps.newConcurrentMap();
+
+    /**
+     * Registers {@code client} for {@code instanceId}, replacing any earlier entry.
+     *
+     * @param instanceId id of the submitted instance
+     * @param client connection to associate with the instance
+     */
+    public void addTask(Long instanceId, DQCConnection client) {
+        submitTaskContainer.put(instanceId, client);
+    }
+
+    /**
+     * Looks up the connection registered for an instance.
+     *
+     * @param instanceId id of the submitted instance
+     * @return the registered connection, or {@code null} when there is no entry
+     */
+    public DQCConnection getTask(Long instanceId) {
+        return submitTaskContainer.get(instanceId);
+    }
+
+    /**
+     * Removes the entry for an instance; a missing entry is not an error.
+     *
+     * @param instanceId id of the instance to forget
+     */
+    public void clearTask(Long instanceId) {
+        submitTaskContainer.remove(instanceId);
+    }
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/master/server/TaskManager.java
b/core/src/main/java/org/apache/griffin/core/master/server/TaskManager.java
index 258dce68..4d30f51b 100644
--- a/core/src/main/java/org/apache/griffin/core/master/server/TaskManager.java
+++ b/core/src/main/java/org/apache/griffin/core/master/server/TaskManager.java
@@ -1,14 +1,9 @@
package org.apache.griffin.core.master.server;
-import com.google.common.collect.Maps;
-import io.grpc.ManagedChannel;
-import io.grpc.ManagedChannelBuilder;
import lombok.extern.slf4j.Slf4j;
-import org.apache.griffin.api.common.GRPCCode;
-import org.apache.griffin.api.proto.protocol.ExecuteNodeServiceGrpc;
-import org.apache.griffin.api.proto.protocol.SayHelloRequest;
-import org.apache.griffin.api.proto.protocol.SayHelloResponse;
+import org.apache.griffin.api.entity.enums.DQInstanceStatus;
+import org.apache.griffin.core.master.context.TaskManagerContext;
import org.apache.griffin.core.master.transport.DQCConnection;
import org.apache.griffin.core.master.transport.DQCConnectionManager;
import org.springframework.beans.factory.annotation.Autowired;
@@ -22,6 +17,8 @@ public class TaskManager {
@Autowired
private DQCConnectionManager dqcConnectionManager;
+ @Autowired
+ private TaskManagerContext taskManagerContext;
public void registerWorker(String hostName, int port) throws
UnknownHostException {
try {
@@ -36,10 +33,69 @@ public class TaskManager {
+    /**
+     * Submits a DQ instance through a connection obtained from the connection manager and,
+     * when the submission is accepted, records the instance-to-connection mapping.
+     * Logs an error and returns without submitting when no connection is available.
+     *
+     * @param instanceId id of the DQ instance to run
+     */
     public void submitDQTask(Long instanceId) {
         DQCConnection aliveClient = dqcConnectionManager.getAliveClient();
         if (aliveClient == null) {
-
+            log.error("getAliveClient Failed, Please check");
+            return;
         }
         if (aliveClient.submitDQTask(instanceId)) {
-            // todo add task and client info to cache
+            // remember which connection accepted the instance so query/stop can reuse it
+            taskManagerContext.addTask(instanceId, aliveClient);
+            // todo flush to db
+        }
+    }
+
+    /**
+     * Picks the connection for an already submitted instance: the one recorded by
+     * {@link #submitDQTask(Long)} when present, otherwise whatever the connection manager
+     * currently hands out (e.g. the mapping was never recorded or has been cleared).
+     *
+     * @return the connection to use, or {@code null} when none is available
+     */
+    private DQCConnection resolveClient(Long instanceId) {
+        DQCConnection owner = instanceId == null ? null : TaskManagerContext.submitTaskContainer.get(instanceId);
+        return owner != null ? owner : dqcConnectionManager.getAliveClient();
+    }
+
+    /**
+     * Asks the connection handling {@code instanceId} for the instance's current status.
+     *
+     * @param instanceId id of the DQ instance to query
+     * @return the reported status, or {@code null} when no connection is available or the
+     *         connection returns {@code null}
+     * @throws Exception propagated from the underlying connection call
+     */
+    public DQInstanceStatus querySingleDQTask(Long instanceId) throws Exception {
+        DQCConnection client = resolveClient(instanceId);
+        if (client == null) {
+            log.error("getAliveClient Failed, Please check");
+            return null;
+        }
+        return client.querySingleDQTask(instanceId);
+    }
+
+    /**
+     * Asks the connection handling {@code instanceId} to stop it and, on success, drops the
+     * instance-to-connection mapping.
+     *
+     * @param instanceId id of the DQ instance to stop
+     * @return {@code true} only when the stop request was reported as successful
+     */
+    public boolean stopDQTask(Long instanceId) {
+        DQCConnection client = resolveClient(instanceId);
+        if (client == null) {
+            log.error("getAliveClient Failed, Please check");
+            return false;
+        }
+        if (client.stopDQTask(instanceId)) {
+            taskManagerContext.clearTask(instanceId);
+            return true;
         }
+        return false;
     }
 }
diff --git
a/core/src/main/java/org/apache/griffin/core/master/service/DQTaskManagerInstanceService.java
b/core/src/main/java/org/apache/griffin/core/master/service/DQTaskManagerInstanceService.java
new file mode 100644
index 00000000..fec88cb9
--- /dev/null
+++
b/core/src/main/java/org/apache/griffin/core/master/service/DQTaskManagerInstanceService.java
@@ -0,0 +1,47 @@
+package org.apache.griffin.core.master.service;
+
+import org.apache.griffin.api.entity.enums.DQInstanceStatus;
+import org.apache.griffin.api.dao.DQInstanceDao;
+import org.apache.griffin.core.master.server.TaskManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Service that combines the task manager with the instance DAO for operations on
+ * DQ instances.
+ */
+@Service
+public class DQTaskManagerInstanceService {
+    private static final Logger log = LoggerFactory.getLogger(DQTaskManagerInstanceService.class);
+
+    @Autowired
+    private DQInstanceDao dqInstanceDao;
+    @Autowired
+    private TaskManager taskManager;
+
+    /**
+     * Stops a DQ instance and, once the task manager reports success, marks it STOPPED
+     * through the instance DAO.
+     *
+     * @param instanceId id of the instance to stop
+     * @return {@code true} when the stop succeeded and the status update returned normally;
+     *         {@code false} when the stop was refused or any exception was thrown
+     */
+    public boolean stopTask(Long instanceId) {
+        try {
+            if (!taskManager.stopDQTask(instanceId)) {
+                log.info("instance {} STOPPED failed.", instanceId);
+                return false;
+            }
+            dqInstanceDao.updateDQInstanceStatus(instanceId, DQInstanceStatus.STOPPED.getCode());
+            log.info("instance {} STOPPED success.", instanceId);
+            return true;
+        } catch (Exception e) {
+            // failures are logged and reported to the caller as "not stopped"
+            log.error("instance {} STOPPED failed, ex", instanceId, e);
+            return false;
+        }
+    }
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/master/service/TaskAssignService.java
b/core/src/main/java/org/apache/griffin/core/master/service/TaskAssignService.java
deleted file mode 100644
index 05e8a761..00000000
---
a/core/src/main/java/org/apache/griffin/core/master/service/TaskAssignService.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package org.apache.griffin.core.master.service;
-
-import org.apache.griffin.core.master.strategy.AbstractAssignStrategy;
-import org.apache.griffin.core.master.strategy.AssignStrategyFactory;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-import org.springframework.util.Assert;
-
-import javax.annotation.PostConstruct;
-
-
-//@Component
-public class TaskAssignService {
-
- @Value("${task.assign.strategy}")
- private String assignTaskStrategtClass;
-
- private AbstractAssignStrategy strategy;
-
- @PostConstruct
- public void init() {
- strategy = AssignStrategyFactory.getStrategy(assignTaskStrategtClass);
- Assert.notNull(strategy, "Task Assign Strategy init failed");
- }
-
-
-// public String assignTask(long instanceId) {
-// return strategy.assignTask(instanceId);
-// }
-}
diff --git
a/core/src/main/java/org/apache/griffin/core/master/transport/DQCConnection.java
b/core/src/main/java/org/apache/griffin/core/master/transport/DQCConnection.java
index d2d4c7be..8c1d2532 100644
---
a/core/src/main/java/org/apache/griffin/core/master/transport/DQCConnection.java
+++
b/core/src/main/java/org/apache/griffin/core/master/transport/DQCConnection.java
@@ -1,16 +1,12 @@
package org.apache.griffin.core.master.transport;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import io.grpc.stub.StreamObserver;
import lombok.Builder;
import lombok.Data;
import org.apache.griffin.api.common.GRPCCode;
+import org.apache.griffin.api.entity.enums.DQInstanceStatus;
import org.apache.griffin.api.proto.protocol.*;
import java.net.UnknownHostException;
-import java.nio.channels.ServerSocketChannel;
-import java.util.concurrent.Future;
/**
* the obj has a socketChannel to worker node
@@ -44,4 +40,35 @@ public class DQCConnection {
         return submitDQTaskResponse.getCode() == GRPCCode.SUCCESS.getCode();
     }
+    /**
+     * Queries the status of one DQ instance through this connection's {@code client}.
+     *
+     * @param instanceId id of the instance to look up
+     * @return the status mapped from the response's status code, or {@code null} when the
+     *         response code is not {@code GRPCCode.SUCCESS}
+     * @throws Exception kept from the original signature; the concrete causes are not visible here
+     */
+    public DQInstanceStatus querySingleDQTask(Long instanceId) throws Exception {
+        QuerySingleDQTaskRequest request = QuerySingleDQTaskRequest.newBuilder()
+                .setInstanceId(instanceId)
+                .build();
+        QuerySingleDQTaskResponse response = client.querySingleDQTask(request);
+        if (response.getCode() != GRPCCode.SUCCESS.getCode()) {
+            return null;
+        }
+        return DQInstanceStatus.findByCode(response.getStatus());
+    }
+
+    /**
+     * Sends a stop request for one DQ instance through this connection's {@code client}.
+     *
+     * @param instanceId id of the instance to stop
+     * @return {@code true} when the response code equals {@code GRPCCode.SUCCESS}
+     */
+    public boolean stopDQTask(Long instanceId) {
+        StopDQTaskRequest request = StopDQTaskRequest.newBuilder()
+                .setInstanceId(instanceId)
+                .build();
+        return client.stopDQTask(request).getCode() == GRPCCode.SUCCESS.getCode();
+    }
 }
diff --git
a/core/src/main/java/org/apache/griffin/core/master/transport/DQCConnectionManager.java
b/core/src/main/java/org/apache/griffin/core/master/transport/DQCConnectionManager.java
index 9cddce5c..620ab38f 100644
---
a/core/src/main/java/org/apache/griffin/core/master/transport/DQCConnectionManager.java
+++
b/core/src/main/java/org/apache/griffin/core/master/transport/DQCConnectionManager.java
@@ -22,8 +22,10 @@ import java.util.stream.Collectors;
public class DQCConnectionManager {
private List<DQCConnection> clientList = Lists.newCopyOnWriteArrayList();
- private String assignStrategyClassName =
"org.apache.griffin.core.master.strategy.LooperAssignStrategy";
+ // todo read from conf
+ private final String assignStrategyClassName =
"org.apache.griffin.core.master.strategy.LooperAssignStrategy";
private AbstractAssignStrategy strategy;
+
@PostConstruct
public void init() {
strategy = AssignStrategyFactory.getStrategy(assignStrategyClassName);
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/client/DispatcherClient.java
b/core/src/main/java/org/apache/griffin/core/worker/client/DispatcherClient.java
index dbd4bd39..75035470 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/client/DispatcherClient.java
+++
b/core/src/main/java/org/apache/griffin/core/worker/client/DispatcherClient.java
@@ -1,7 +1,7 @@
package org.apache.griffin.core.worker.client;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.griffin.core.worker.entity.bo.DQInstance;
+import org.apache.griffin.api.entity.pojo.DQInstanceEntity;
import org.apache.griffin.core.worker.entity.dispatcher.*;
import org.apache.griffin.core.worker.entity.enums.DQEngineEnum;
import org.apache.griffin.core.worker.entity.enums.DQErrorCode;
@@ -32,7 +32,7 @@ public class DispatcherClient {
return null;
}
- public SubmitRequest wrapperDQTask(DQInstance waittingTask) {
+ public SubmitRequest wrapperDQTask(DQInstanceEntity waittingTask) {
return null;
}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/dao/DQInstanceDao.java
b/core/src/main/java/org/apache/griffin/core/worker/dao/DQInstanceDao.java
deleted file mode 100644
index 544adb36..00000000
--- a/core/src/main/java/org/apache/griffin/core/worker/dao/DQInstanceDao.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package org.apache.griffin.core.worker.dao;
-
-import org.apache.griffin.core.worker.entity.bo.DQInstance;
-import org.springframework.stereotype.Component;
-
-@Component
-public interface DQInstanceDao {
-
- DQInstance getById(Long id);
-
- void updateDQInstanceStatus(DQInstance instance, int status);
-
- void insert(DQInstance instance);
-}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/dao/DQTaskDao.java
b/core/src/main/java/org/apache/griffin/core/worker/dao/DQTaskDao.java
index f87fbc46..ff8bc3fd 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/dao/DQTaskDao.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/dao/DQTaskDao.java
@@ -1,8 +1,6 @@
package org.apache.griffin.core.worker.dao;
-import org.apache.griffin.core.worker.entity.bo.DQInstance;
import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
-import org.apache.griffin.core.worker.entity.enums.DQTaskStatus;
import org.springframework.stereotype.Component;
import java.util.List;
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/driver/PrestoTemplateDriver.java
b/core/src/main/java/org/apache/griffin/core/worker/driver/PrestoTemplateDriver.java
index b5d2cf62..826aa1f0 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/driver/PrestoTemplateDriver.java
+++
b/core/src/main/java/org/apache/griffin/core/worker/driver/PrestoTemplateDriver.java
@@ -1,8 +1,7 @@
package org.apache.griffin.core.worker.driver;
-import org.apache.griffin.core.worker.entity.pojo.template.DQRecordTemplate;
+import org.apache.griffin.core.worker.entity.template.DQRecordTemplate;
-import java.util.List;
import java.util.Map;
public class PrestoTemplateDriver extends TemplateDriver{
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/driver/SparkTemplateDriver.java
b/core/src/main/java/org/apache/griffin/core/worker/driver/SparkTemplateDriver.java
index 4d500832..df9f2c32 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/driver/SparkTemplateDriver.java
+++
b/core/src/main/java/org/apache/griffin/core/worker/driver/SparkTemplateDriver.java
@@ -1,8 +1,7 @@
package org.apache.griffin.core.worker.driver;
-import org.apache.griffin.core.worker.entity.pojo.template.DQRecordTemplate;
+import org.apache.griffin.core.worker.entity.template.DQRecordTemplate;
-import java.util.List;
import java.util.Map;
public class SparkTemplateDriver extends TemplateDriver {
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/driver/TemplateDriver.java
b/core/src/main/java/org/apache/griffin/core/worker/driver/TemplateDriver.java
index 9e2aab97..0516d556 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/driver/TemplateDriver.java
+++
b/core/src/main/java/org/apache/griffin/core/worker/driver/TemplateDriver.java
@@ -1,12 +1,11 @@
package org.apache.griffin.core.worker.driver;
import org.apache.griffin.core.worker.entity.enums.DQEngineEnum;
-import org.apache.griffin.core.worker.entity.pojo.template.DQRecordTemplate;
+import org.apache.griffin.core.worker.entity.template.DQRecordTemplate;
import org.apache.griffin.core.worker.factory.TemplateDriverFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import java.util.List;
import java.util.Map;
@Service
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/DQInstance.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/DQInstanceBO.java
similarity index 82%
rename from
core/src/main/java/org/apache/griffin/core/worker/entity/bo/DQInstance.java
rename to
core/src/main/java/org/apache/griffin/core/worker/entity/bo/DQInstanceBO.java
index 303c7d13..9f22f35b 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/DQInstance.java
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/DQInstanceBO.java
@@ -1,13 +1,11 @@
package org.apache.griffin.core.worker.entity.bo;
import lombok.Data;
-import org.apache.griffin.core.worker.entity.dispatcher.JobStatus;
-import org.apache.griffin.core.worker.entity.enums.DQInstanceStatus;
+import org.apache.griffin.api.entity.enums.DQInstanceStatus;
+import org.apache.griffin.api.entity.pojo.rule.DQAlertRule;
import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
import org.apache.griffin.core.worker.entity.enums.DQTaskStatus;
-import org.apache.griffin.core.worker.entity.pojo.rule.DQAlertRule;
import org.apache.griffin.core.worker.stage.DQAbstractStage;
-import org.apache.griffin.core.worker.stage.DQStage;
import java.util.List;
@@ -16,7 +14,7 @@ import java.util.List;
* 一个实例包含多个子任务
*/
@Data
-public class DQInstance implements Comparable<DQInstance> {
+public class DQInstanceBO implements Comparable<DQInstanceBO> {
private Long id;
private Long dqcId;
@@ -70,7 +68,7 @@ public class DQInstance implements Comparable<DQInstance> {
}
@Override
- public int compareTo(DQInstance o) {
+ public int compareTo(DQInstanceBO o) {
return 0;
}
}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQBaseTask.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQBaseTask.java
index 5fdf4ed9..22d9d9e3 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQBaseTask.java
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQBaseTask.java
@@ -7,9 +7,9 @@ import
org.apache.griffin.core.common.utils.context.WorkerContext;
import org.apache.griffin.core.worker.entity.dispatcher.JobStatus;
import org.apache.griffin.core.worker.entity.enums.DQEngineEnum;
import org.apache.griffin.core.worker.entity.enums.DQTaskStatus;
-import org.apache.griffin.core.worker.entity.pojo.Metric;
-import org.apache.griffin.core.worker.entity.pojo.rule.DQEvaluateRule;
-import org.apache.griffin.core.worker.entity.pojo.rule.DQRecordRule;
+import org.apache.griffin.api.entity.pojo.Metric;
+import org.apache.griffin.api.entity.pojo.rule.DQEvaluateRule;
+import org.apache.griffin.api.entity.pojo.rule.DQRecordRule;
import java.util.List;
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQHiveTask.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQHiveTask.java
index be6aee5e..e793a1f2 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQHiveTask.java
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQHiveTask.java
@@ -1,7 +1,6 @@
package org.apache.griffin.core.worker.entity.bo.task;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.griffin.core.worker.entity.pojo.rule.DQRecordRule;
import java.util.List;
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordCompostiveTemplate.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordCompostiveTemplate.java
deleted file mode 100644
index cfccf84a..00000000
---
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordCompostiveTemplate.java
+++ /dev/null
@@ -1,4 +0,0 @@
-package org.apache.griffin.core.worker.entity.pojo.template;
-
-public class DQRecordCompostiveTemplate {
-}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordBaseTemplate.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/template/DQRecordBaseTemplate.java
similarity index 51%
rename from
core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordBaseTemplate.java
rename to
core/src/main/java/org/apache/griffin/core/worker/entity/template/DQRecordBaseTemplate.java
index 44646da8..aa31523d 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordBaseTemplate.java
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/template/DQRecordBaseTemplate.java
@@ -1,4 +1,4 @@
-package org.apache.griffin.core.worker.entity.pojo.template;
+package org.apache.griffin.core.worker.entity.template;
public class DQRecordBaseTemplate extends DQRecordTemplate {
}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/template/DQRecordCompostiveTemplate.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/template/DQRecordCompostiveTemplate.java
new file mode 100644
index 00000000..37bcbba9
--- /dev/null
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/template/DQRecordCompostiveTemplate.java
@@ -0,0 +1,4 @@
+package org.apache.griffin.core.worker.entity.template;
+
+public class DQRecordCompostiveTemplate {
+}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordTemplate.java
b/core/src/main/java/org/apache/griffin/core/worker/entity/template/DQRecordTemplate.java
similarity index 86%
rename from
core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordTemplate.java
rename to
core/src/main/java/org/apache/griffin/core/worker/entity/template/DQRecordTemplate.java
index 49db5868..3af68a33 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordTemplate.java
+++
b/core/src/main/java/org/apache/griffin/core/worker/entity/template/DQRecordTemplate.java
@@ -1,4 +1,4 @@
-package org.apache.griffin.core.worker.entity.pojo.template;
+package org.apache.griffin.core.worker.entity.template;
import org.apache.griffin.core.worker.driver.TemplateDriver;
import org.apache.griffin.core.worker.entity.enums.DQEngineEnum;
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/factory/DQInstanceFactory.java
b/core/src/main/java/org/apache/griffin/core/worker/factory/DQInstanceFactory.java
index 7ea41cb1..ed70ea0d 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/factory/DQInstanceFactory.java
+++
b/core/src/main/java/org/apache/griffin/core/worker/factory/DQInstanceFactory.java
@@ -1,11 +1,12 @@
package org.apache.griffin.core.worker.factory;
-import org.apache.griffin.core.common.dao.DQBusinessRuleDao;
-import org.apache.griffin.core.common.dao.DQContentDao;
-import org.apache.griffin.core.common.entity.GriffinDQBusinessRule;
-import org.apache.griffin.core.common.entity.GriffinDQContent;
-import org.apache.griffin.core.worker.dao.DQInstanceDao;
-import org.apache.griffin.core.worker.entity.bo.DQInstance;
+import org.apache.griffin.api.dao.DQBusinessRuleDao;
+import org.apache.griffin.api.dao.DQContentDao;
+import org.apache.griffin.api.entity.GriffinDQBusinessRule;
+import org.apache.griffin.api.entity.GriffinDQContent;
+import org.apache.griffin.api.dao.DQInstanceDao;
+import org.apache.griffin.api.entity.pojo.DQInstanceEntity;
+import org.apache.griffin.core.worker.entity.bo.DQInstanceBO;
import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
import org.apache.griffin.core.worker.entity.enums.DQStageTypeEnum;
import org.apache.griffin.core.worker.stage.DQAbstractStage;
@@ -28,8 +29,8 @@ public class DQInstanceFactory {
@Autowired
private DQTaskFactory dqTaskFactory;
- public DQInstance constructInstance(Long id, Long dqcId) {
- DQInstance instance = new DQInstance();
+ public DQInstanceBO constructInstance(Long id, Long dqcId) {
+ DQInstanceBO instance = new DQInstanceBO();
instance.setId(id);
instance.setDqcId(dqcId);
GriffinDQContent griffinDQContent = dqContentDao.getById(dqcId);
@@ -46,11 +47,10 @@ public class DQInstanceFactory {
// construct alert stage
DQAbstractStage alertStage =
dqStageFactory.constructStage(DQStageTypeEnum.ALERT, instance);
instance.setAlertingStage(alertStage);
- dqInstanceDao.insert(instance);
return instance;
}
- public DQInstance recoveryInstance(DQInstance instance) {
+ public DQInstanceBO recoveryInstance(DQInstanceEntity instance) {
return null;
}
}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/factory/DQStageFactory.java
b/core/src/main/java/org/apache/griffin/core/worker/factory/DQStageFactory.java
index b8f4ec56..6f287b53 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/factory/DQStageFactory.java
+++
b/core/src/main/java/org/apache/griffin/core/worker/factory/DQStageFactory.java
@@ -2,7 +2,8 @@ package org.apache.griffin.core.worker.factory;
import org.apache.griffin.core.common.utils.SpringUtils;
import org.apache.griffin.core.worker.dao.DQStageDao;
-import org.apache.griffin.core.worker.entity.bo.DQInstance;
+import org.apache.griffin.api.entity.pojo.DQInstanceEntity;
+import org.apache.griffin.core.worker.entity.bo.DQInstanceBO;
import org.apache.griffin.core.worker.entity.enums.DQStageTypeEnum;
import org.apache.griffin.core.worker.service.DQStageService;
import org.apache.griffin.core.worker.service.DQTaskService;
@@ -23,7 +24,7 @@ public class DQStageFactory {
@Autowired
private DQStageDao dqStageDao;
- public DQAbstractStage constructStage(DQStageTypeEnum stageTypeEnum,
DQInstance instance) {
+ public DQAbstractStage constructStage(DQStageTypeEnum stageTypeEnum,
DQInstanceBO instance) {
DQAbstractStage stage = null;
switch (stageTypeEnum) {
case RECORD:
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/factory/DQTaskFactory.java
b/core/src/main/java/org/apache/griffin/core/worker/factory/DQTaskFactory.java
index 11d6d900..73801705 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/factory/DQTaskFactory.java
+++
b/core/src/main/java/org/apache/griffin/core/worker/factory/DQTaskFactory.java
@@ -1,7 +1,7 @@
package org.apache.griffin.core.worker.factory;
-import org.apache.griffin.core.common.entity.DQResoueceEnums;
-import org.apache.griffin.core.common.entity.GriffinDQBusinessRule;
+import org.apache.griffin.api.entity.DQResoueceEnums;
+import org.apache.griffin.api.entity.GriffinDQBusinessRule;
import org.apache.griffin.core.worker.dao.DQTaskDao;
import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
import org.slf4j.Logger;
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/schedule/TaskDispatcherScheduler.java
b/core/src/main/java/org/apache/griffin/core/worker/schedule/TaskDispatcherScheduler.java
index 4ddc34cd..a71e442b 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/schedule/TaskDispatcherScheduler.java
+++
b/core/src/main/java/org/apache/griffin/core/worker/schedule/TaskDispatcherScheduler.java
@@ -3,13 +3,13 @@ package org.apache.griffin.core.worker.schedule;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.griffin.api.entity.enums.DQInstanceStatus;
import org.apache.griffin.core.common.utils.context.WorkerContext;
-import org.apache.griffin.core.worker.entity.bo.DQInstance;
-import org.apache.griffin.core.worker.entity.enums.DQInstanceStatus;
+import org.apache.griffin.core.worker.entity.bo.DQInstanceBO;
import org.apache.griffin.core.worker.entity.enums.DQStageStatus;
import org.apache.griffin.core.worker.exception.StageSubmitException;
-import org.apache.griffin.core.worker.service.DQInstanceService;
import org.apache.griffin.core.worker.service.DQStageService;
+import org.apache.griffin.core.worker.service.DQWorkerInstanceService;
import org.apache.griffin.core.worker.stage.DQAbstractStage;
import org.apache.griffin.core.worker.stage.DQStage;
import org.slf4j.Logger;
@@ -33,7 +33,7 @@ public class TaskDispatcherScheduler {
private static final Logger log =
LoggerFactory.getLogger(TaskDispatcherScheduler.class);
private WorkerContext wc;
- private DQInstanceService dqInstanceService;
+ private DQWorkerInstanceService dqWorkerInstanceService;
private DQStageService dqStageService;
@Autowired
@@ -41,8 +41,8 @@ public class TaskDispatcherScheduler {
this.wc = wc;
}
@Autowired
- public void setDqInstanceService(DQInstanceService dqInstanceService) {
- this.dqInstanceService = dqInstanceService;
+ public void setDqWorkerInstanceService(DQWorkerInstanceService
dqWorkerInstanceService) {
+ this.dqWorkerInstanceService = dqWorkerInstanceService;
}
@Autowired
public void setDqStageService(DQStageService dqStageService) {
@@ -63,23 +63,23 @@ public class TaskDispatcherScheduler {
@Scheduled(fixedDelay = 5 * 1000L)
public void doTaskDispatcherScheduler() {
log.info("doTaskDispatcherScheduler start.");
- List<DQInstance> waittingToRemoveFromWaitingList =
Lists.newArrayList();
- Queue<DQInstance> waitingToRecordingDQInstanceQueue =
Queues.newPriorityBlockingQueue(wc.getWAITTING_TASK_QUEUE());
+ List<DQInstanceBO> waittingToRemoveFromWaitingList =
Lists.newArrayList();
+ Queue<DQInstanceBO> waitingToRecordingDQInstanceQueue =
Queues.newPriorityBlockingQueue(wc.getWAITTING_TASK_QUEUE());
try {
while (true) {
try {
- DQInstance dqInstance =
waitingToRecordingDQInstanceQueue.poll();
+ DQInstanceBO dqInstance =
waitingToRecordingDQInstanceQueue.poll();
// queue is empty, quit
if (dqInstance == null) break;
if (DQInstanceStatus.ACCEPTED != dqInstance.getStatus()) {
// State is not init, sync from database and reassign
it
- dqInstance =
dqInstanceService.getById(dqInstance.getId());
+ dqInstance =
dqWorkerInstanceService.getById(dqInstance.getId());
// assign task by status
waittingToRemoveFromWaitingList.add(dqInstance);
} else {
// normal, update status and remove from queue, then
put it to the queue of recording task
- if (dqInstanceService.updateStatus(dqInstance,
DQInstanceStatus.WAITTING)) {
+ if (dqWorkerInstanceService.updateStatus(dqInstance,
DQInstanceStatus.WAITTING)) {
waittingToRemoveFromWaitingList.add(dqInstance);
}
}
@@ -100,7 +100,7 @@ public class TaskDispatcherScheduler {
log.info("doTaskDispatcherScheduler end.");
}
- private void offerToSpecQueueByStatus(DQInstance instance) {
+ private void offerToSpecQueueByStatus(DQInstanceBO instance) {
DQInstanceStatus status = instance.getStatus();
switch (status) {
case WAITTING:
@@ -132,12 +132,12 @@ public class TaskDispatcherScheduler {
@Scheduled(fixedDelay = 5 * 1000L)
public void scanRecordingTask() {
- List<DQInstance> waittingToRemoveFromRecordingList =
Lists.newArrayList();
- Queue<DQInstance> waitingToSubmitDQInstanceQueue =
Queues.newPriorityBlockingQueue(wc.getRECORDING_TASK_QUEUE());
+ List<DQInstanceBO> waittingToRemoveFromRecordingList =
Lists.newArrayList();
+ Queue<DQInstanceBO> waitingToSubmitDQInstanceQueue =
Queues.newPriorityBlockingQueue(wc.getRECORDING_TASK_QUEUE());
try {
while (CollectionUtils.isNotEmpty(waitingToSubmitDQInstanceQueue))
{
try {
- DQInstance dqInstance =
waitingToSubmitDQInstanceQueue.poll();
+ DQInstanceBO dqInstance =
waitingToSubmitDQInstanceQueue.poll();
if (dqInstance == null) break;
processRecordingInstance(dqInstance,
waittingToRemoveFromRecordingList);
} catch (Exception e) {
@@ -156,7 +156,7 @@ public class TaskDispatcherScheduler {
}
}
- private void processRecordingInstance(DQInstance dqInstance,
List<DQInstance> waittingToRemoveFromRecordingList) {
+ private void processRecordingInstance(DQInstanceBO dqInstance,
List<DQInstanceBO> waittingToRemoveFromRecordingList) {
try {
DQAbstractStage recordingStage = dqInstance.getRecordingStage();
DQStageStatus stageStatus = recordingStage.getStatus();
@@ -164,13 +164,13 @@ public class TaskDispatcherScheduler {
if (!dqStageService.submitStage(recordingStage)) {
throw new StageSubmitException("Submit stage failed!,
instance id: " + dqInstance.getId());
} else {
- dqInstanceService.updateStatus(dqInstance,
DQInstanceStatus.RECORDING);
+ dqWorkerInstanceService.updateStatus(dqInstance,
DQInstanceStatus.RECORDING);
}
} else if (stageStatus == DQStageStatus.FINISH) {
// if there is one task success, the instance should be
EVALUATING;
// if all tasks are failed, the instance should be
FAILED_ALERTING;
DQInstanceStatus instanceStatus = recordingStage.hasSuccess()?
DQInstanceStatus.EVALUATING: DQInstanceStatus.FAILED_ALERTING;
- dqInstanceService.updateStatus(dqInstance, instanceStatus);
+ dqWorkerInstanceService.updateStatus(dqInstance,
instanceStatus);
waittingToRemoveFromRecordingList.add(dqInstance);
}
} catch (Exception e) {
@@ -181,9 +181,9 @@ public class TaskDispatcherScheduler {
public void scanEvaluatingTask() {
Executors.newCachedThreadPool().execute(() -> {
- LinkedBlockingQueue<DQInstance> evaluating_task_queue =
wc.getEVALUATING_TASK_QUEUE();
+ LinkedBlockingQueue<DQInstanceBO> evaluating_task_queue =
wc.getEVALUATING_TASK_QUEUE();
while (true) {
- DQInstance dqInstance = null;
+ DQInstanceBO dqInstance = null;
try {
dqInstance = evaluating_task_queue.poll(5,
TimeUnit.SECONDS);
if (dqInstance == null) continue;
@@ -192,13 +192,13 @@ public class TaskDispatcherScheduler {
if (dqInstance.getStatus() == DQInstanceStatus.EVALUATING)
{
dqStageService.executeStage(evaluatingStage);
DQInstanceStatus dqInstanceStatus =
evaluatingStage.hasSuccess() ? DQInstanceStatus.ALERTING :
DQInstanceStatus.FAILED_ALERTING;
- dqInstanceService.updateStatus(dqInstance,
dqInstanceStatus);
+ dqWorkerInstanceService.updateStatus(dqInstance,
dqInstanceStatus);
}
offerToSpecQueueByStatus(dqInstance);
} catch (Exception e) {
if (dqInstance != null) {
log.error("scanEvaluatingTask doEvalute failed, id :
{}, instance : {}, ex:", dqInstance.getId(), dqInstance, e);
- dqInstanceService.updateStatus(dqInstance,
DQInstanceStatus.FAILED_ALERTING);
+ dqWorkerInstanceService.updateStatus(dqInstance,
DQInstanceStatus.FAILED_ALERTING);
offerToSpecQueueByStatus(dqInstance);
} else {
log.error("scanEvaluatingTask poll instance failed.
ex:", e);
@@ -210,9 +210,9 @@ public class TaskDispatcherScheduler {
public void scanAlertingTask() {
Executors.newCachedThreadPool().execute(() -> {
- LinkedBlockingQueue<DQInstance> alerting_task_queue =
wc.getALERTING_TASK_QUEUE();
+ LinkedBlockingQueue<DQInstanceBO> alerting_task_queue =
wc.getALERTING_TASK_QUEUE();
while (true) {
- DQInstance dqInstance = null;
+ DQInstanceBO dqInstance = null;
try {
dqInstance = alerting_task_queue.poll(1, TimeUnit.SECONDS);
if (dqInstance == null) continue;
@@ -221,7 +221,7 @@ public class TaskDispatcherScheduler {
DQStage alertingStage = dqInstance.getAlertingStage();
dqStageService.executeStage(alertingStage);
DQInstanceStatus dqInstanceStatus =
alertingStage.hasSuccess()? DQInstanceStatus.SUCCESS : DQInstanceStatus.FAILED;
- dqInstanceService.updateStatus(dqInstance,
dqInstanceStatus);
+ dqWorkerInstanceService.updateStatus(dqInstance,
dqInstanceStatus);
}
offerToSpecQueueByStatus(dqInstance);
} catch (Exception e) {
@@ -229,7 +229,7 @@ public class TaskDispatcherScheduler {
log.error("scanAlertingTask doAlert failed, id : {},
instance : {}, ex:", dqInstance.getId(), dqInstance, e);
if (dqInstance.isFailed()) {
// retry 5 times, set failed
- dqInstanceService.updateStatus(dqInstance,
DQInstanceStatus.FAILED);
+ dqWorkerInstanceService.updateStatus(dqInstance,
DQInstanceStatus.FAILED);
// retry times less than 5, do not modify status
and put task back
}
// put task back
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/service/DQTaskService.java
b/core/src/main/java/org/apache/griffin/core/worker/service/DQTaskService.java
index bf5b38b6..bd50374e 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/service/DQTaskService.java
+++
b/core/src/main/java/org/apache/griffin/core/worker/service/DQTaskService.java
@@ -3,14 +3,10 @@ package org.apache.griffin.core.worker.service;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.griffin.core.worker.client.DispatcherClient;
-import org.apache.griffin.core.worker.dao.DQInstanceDao;
import org.apache.griffin.core.worker.dao.DQTaskDao;
-import org.apache.griffin.core.worker.entity.bo.DQInstance;
import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
import org.apache.griffin.core.worker.entity.dispatcher.*;
-import org.apache.griffin.core.worker.entity.enums.DQInstanceStatus;
import org.apache.griffin.core.worker.entity.enums.DQTaskStatus;
-import org.apache.griffin.core.worker.entity.pojo.rule.DQRecordRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/service/DQInstanceService.java
b/core/src/main/java/org/apache/griffin/core/worker/service/DQWorkerInstanceService.java
similarity index 62%
rename from
core/src/main/java/org/apache/griffin/core/worker/service/DQInstanceService.java
rename to
core/src/main/java/org/apache/griffin/core/worker/service/DQWorkerInstanceService.java
index 904de4ed..ae12415d 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/service/DQInstanceService.java
+++
b/core/src/main/java/org/apache/griffin/core/worker/service/DQWorkerInstanceService.java
@@ -1,10 +1,11 @@
package org.apache.griffin.core.worker.service;
-import org.apache.griffin.core.common.entity.GriffinDQContentInstanceMap;
-import org.apache.griffin.core.common.dao.DQContentInstanceMapDao;
-import org.apache.griffin.core.worker.dao.DQInstanceDao;
-import org.apache.griffin.core.worker.entity.bo.DQInstance;
-import org.apache.griffin.core.worker.entity.enums.DQInstanceStatus;
+import org.apache.griffin.api.dao.DQInstanceDao;
+import org.apache.griffin.api.entity.GriffinDQContentInstanceMap;
+import org.apache.griffin.api.entity.enums.DQInstanceStatus;
+import org.apache.griffin.api.entity.pojo.DQInstanceEntity;
+import org.apache.griffin.api.dao.DQContentInstanceMapDao;
+import org.apache.griffin.core.worker.entity.bo.DQInstanceBO;
import org.apache.griffin.core.worker.factory.DQInstanceFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -12,8 +13,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
-public class DQInstanceService {
- private static final Logger log =
LoggerFactory.getLogger(DQInstanceService.class);
+public class DQWorkerInstanceService {
+ private static final Logger log =
LoggerFactory.getLogger(DQWorkerInstanceService.class);
@Autowired
private DQInstanceDao dqInstanceDao;
@@ -28,9 +29,10 @@ public class DQInstanceService {
* @param status stat
* @return true or false
*/
- public boolean updateStatus(DQInstance instance, DQInstanceStatus status) {
+ public boolean updateStatus(DQInstanceBO instance, DQInstanceStatus
status) {
try {
- dqInstanceDao.updateDQInstanceStatus(instance, status.getCode());
+ // todo report to master
+ dqInstanceDao.updateDQInstanceStatus(instance.getId(),
status.getCode());
instance.setStatus(status);
log.info("instance {} {} => {} success.", instance.getId(),
instance.getStatus(), status);
return true;
@@ -45,19 +47,24 @@ public class DQInstanceService {
* @param id master assign id
* @return DQInstance
*/
- public DQInstance getById(Long id) {
+ public DQInstanceBO getById(Long id) {
if (id == null) {
log.error("Unknown instance id: null");
}
- DQInstance ins = dqInstanceDao.getById(id);
+ DQInstanceEntity ins = dqInstanceDao.getById(id);
if (ins == null) {
// the ins id has no task info
- return constructInstance(id);
+ DQInstanceBO dqInstanceBO = constructInstance(id);
+ saveDqcInstanceEntityFromBO(dqInstanceBO);
+ return dqInstanceBO;
}
return recoveryInstance(ins);
}
- private DQInstance constructInstance(Long id) {
+ private void saveDqcInstanceEntityFromBO(DQInstanceBO dqInstanceBO) {
+ }
+
+ private DQInstanceBO constructInstance(Long id) {
log.info("constructInstance id: {}", id);
GriffinDQContentInstanceMap contentInstanceMap =
dqContentInstanceMapDao.getContentInstanceMapByInstanceId(id);
Long instanceId = contentInstanceMap.getInstanceId();
@@ -65,7 +72,7 @@ public class DQInstanceService {
return dqInstanceFactory.constructInstance(instanceId, dqcId);
}
- private DQInstance recoveryInstance(DQInstance instance) {
+ private DQInstanceBO recoveryInstance(DQInstanceEntity instance) {
return dqInstanceFactory.recoveryInstance(instance);
}
}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/service/ExecuteNodeServiceImpl.java
b/core/src/main/java/org/apache/griffin/core/worker/service/ExecuteNodeServiceImpl.java
index ebe9091f..dfdd589c 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/service/ExecuteNodeServiceImpl.java
+++
b/core/src/main/java/org/apache/griffin/core/worker/service/ExecuteNodeServiceImpl.java
@@ -16,7 +16,7 @@ public class ExecuteNodeServiceImpl extends
ExecuteNodeServiceGrpc.ExecuteNodeSe
}
@Override
- public void querySingleDQTask(SQuerySingleDQTaskRequest request,
StreamObserver<QuerySingleDQTaskResponse> responseObserver) {
+ public void querySingleDQTask(QuerySingleDQTaskRequest request,
StreamObserver<QuerySingleDQTaskResponse> responseObserver) {
super.querySingleDQTask(request, responseObserver);
}
}
diff --git
a/core/src/main/java/org/apache/griffin/core/worker/stage/DQAbstractStage.java
b/core/src/main/java/org/apache/griffin/core/worker/stage/DQAbstractStage.java
index f866a533..13444ece 100644
---
a/core/src/main/java/org/apache/griffin/core/worker/stage/DQAbstractStage.java
+++
b/core/src/main/java/org/apache/griffin/core/worker/stage/DQAbstractStage.java
@@ -1,6 +1,6 @@
package org.apache.griffin.core.worker.stage;
-import org.apache.griffin.core.worker.entity.bo.DQInstance;
+import org.apache.griffin.core.worker.entity.bo.DQInstanceBO;
import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
import org.apache.griffin.core.worker.entity.enums.DQStageStatus;
import org.apache.griffin.core.worker.service.DQStageService;
@@ -13,7 +13,7 @@ public abstract class DQAbstractStage implements DQStage {
protected DQStageService dqStageService;
protected DQStageStatus status;
- protected DQInstance instance;
+ protected DQInstanceBO instance;
protected List<DQBaseTask> subTaskList;
@@ -21,11 +21,11 @@ public abstract class DQAbstractStage implements DQStage {
this.status = DQStageStatus.INIT;
}
- public DQInstance getInstance() {
+ public DQInstanceBO getInstance() {
return instance;
}
- public void setInstance(DQInstance instance) {
+ public void setInstance(DQInstanceBO instance) {
this.instance = instance;
}