This is an automated email from the ASF dual-hosted git repository.

guoyp pushed a commit to branch griffin-1.0.0-dev
in repository https://gitbox.apache.org/repos/asf/griffin.git


The following commit(s) were added to refs/heads/griffin-1.0.0-dev by this push:
     new dfe8675c ExecuteNodeGrpc impl (#629)
dfe8675c is described below

commit dfe8675c95a1e2567c104013747ab5ca32ae029a
Author: dabuliud <[email protected]>
AuthorDate: Fri Mar 24 21:36:22 2023 +0800

    ExecuteNodeGrpc impl (#629)
    
    * ExecuteNodeGrpc impl
    
    * ServerImpl
    
    ---------
    
    Co-authored-by: Warden <[email protected]>
---
 api/pom.xml                                        | 70 ++++++++++++++++++-
 .../apache/griffin/api}/dao/DQBusinessRuleDao.java |  5 +-
 .../org/apache/griffin/api/dao/DQContentDao.java   |  8 +++
 .../griffin/api}/dao/DQContentInstanceMapDao.java  |  4 +-
 .../org/apache/griffin/api/dao/DQInstanceDao.java  | 14 ++++
 .../griffin/api}/entity/DQResoueceEnums.java       |  2 +-
 .../griffin/api}/entity/GriffinDQBusinessRule.java |  2 +-
 .../griffin/api}/entity/GriffinDQContent.java      |  2 +-
 .../api}/entity/GriffinDQContentInstanceMap.java   |  2 +-
 .../apache/griffin/api}/entity/GriffinDQTable.java |  2 +-
 .../api}/entity/enums/DQInstanceStatus.java        | 13 +++-
 .../griffin/api/entity/pojo/DQInstanceEntity.java  | 24 +++++++
 .../apache/griffin/api}/entity/pojo/DQTable.java   |  2 +-
 .../apache/griffin/api}/entity/pojo/Metric.java    |  2 +-
 .../griffin/api}/entity/pojo/rule/DQAlertRule.java |  2 +-
 .../api}/entity/pojo/rule/DQEvaluateRule.java      |  6 +-
 .../api}/entity/pojo/rule/DQRecordRule.java        |  8 +--
 .../griffin/api/service/DQInstanceService.java     | 70 +++++++++++++++++++
 .../org/apache/griffin/api}/utils/DQDateUtils.java |  2 +-
 .../apache/griffin/api}/utils/ExpressionUtils.java |  2 +-
 api/src/main/proto/ExecuteNodeProtocols.proto      |  4 +-
 core/pom.xml                                       | 79 +---------------------
 .../griffin/core/common/dao/DQContentDao.java      |  8 ---
 .../core/common/utils/context/WorkerContext.java   | 42 ++++++------
 .../core/master/context/TaskManagerContext.java    | 21 ++++++
 .../griffin/core/master/server/TaskManager.java    | 40 ++++++++---
 .../service/DQTaskManagerInstanceService.java      | 36 ++++++++++
 .../core/master/service/TaskAssignService.java     | 30 --------
 .../core/master/transport/DQCConnection.java       | 26 +++++--
 .../master/transport/DQCConnectionManager.java     |  4 +-
 .../core/worker/client/DispatcherClient.java       |  4 +-
 .../griffin/core/worker/dao/DQInstanceDao.java     | 14 ----
 .../apache/griffin/core/worker/dao/DQTaskDao.java  |  2 -
 .../core/worker/driver/PrestoTemplateDriver.java   |  3 +-
 .../core/worker/driver/SparkTemplateDriver.java    |  3 +-
 .../griffin/core/worker/driver/TemplateDriver.java |  3 +-
 .../bo/{DQInstance.java => DQInstanceBO.java}      | 10 ++-
 .../core/worker/entity/bo/task/DQBaseTask.java     |  6 +-
 .../core/worker/entity/bo/task/DQHiveTask.java     |  1 -
 .../pojo/template/DQRecordCompostiveTemplate.java  |  4 --
 .../{pojo => }/template/DQRecordBaseTemplate.java  |  2 +-
 .../template/DQRecordCompostiveTemplate.java       |  4 ++
 .../{pojo => }/template/DQRecordTemplate.java      |  2 +-
 .../core/worker/factory/DQInstanceFactory.java     | 20 +++---
 .../core/worker/factory/DQStageFactory.java        |  5 +-
 .../griffin/core/worker/factory/DQTaskFactory.java |  4 +-
 .../worker/schedule/TaskDispatcherScheduler.java   | 52 +++++++-------
 .../griffin/core/worker/service/DQTaskService.java |  4 --
 ...ceService.java => DQWorkerInstanceService.java} | 35 ++++++----
 .../worker/service/ExecuteNodeServiceImpl.java     |  2 +-
 .../griffin/core/worker/stage/DQAbstractStage.java |  8 +--
 51 files changed, 438 insertions(+), 282 deletions(-)

diff --git a/api/pom.xml b/api/pom.xml
index 81f9a24d..435c7223 100644
--- a/api/pom.xml
+++ b/api/pom.xml
@@ -17,14 +17,80 @@
     <properties>
         <maven.compiler.source>8</maven.compiler.source>
         <maven.compiler.target>8</maven.compiler.target>
-        <!-- grpc -->
-<!--        <protobuf.version>3.7.1</protobuf.version>-->
         <protobuf.version>3.17.2</protobuf.version>
         <protobuf-plugin.version>0.6.1</protobuf-plugin.version>
         <grpc.version>1.42.1</grpc.version>
+        <spring.boot.version>2.1.7.RELEASE</spring.boot.version>
+        
<spring.security.kerberos.version>1.0.1.RELEASE</spring.security.kerberos.version>
     </properties>
 
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-dependencies</artifactId>
+                <version>${spring.boot.version}</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
     <dependencies>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.springframework.boot</groupId>
+                    <artifactId>spring-boot-starter-logging</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-log4j2</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-properties-migrator</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.vaadin.external.google</groupId>
+                    <artifactId>android-json</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-json</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-data-jpa</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.hibernate</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.aspectj</groupId>
+                    <artifactId>aspectjrt</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-aspects</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.security.kerberos</groupId>
+            <artifactId>spring-security-kerberos-client</artifactId>
+            <version>${spring.security.kerberos.version}</version>
+        </dependency>
         <!--   new dependencies     -->
         <dependency>
             <groupId>org.projectlombok</groupId>
diff --git 
a/core/src/main/java/org/apache/griffin/core/common/dao/DQBusinessRuleDao.java 
b/api/src/main/java/org/apache/griffin/api/dao/DQBusinessRuleDao.java
similarity index 62%
rename from 
core/src/main/java/org/apache/griffin/core/common/dao/DQBusinessRuleDao.java
rename to api/src/main/java/org/apache/griffin/api/dao/DQBusinessRuleDao.java
index 1a791e2e..1f67e260 100644
--- 
a/core/src/main/java/org/apache/griffin/core/common/dao/DQBusinessRuleDao.java
+++ b/api/src/main/java/org/apache/griffin/api/dao/DQBusinessRuleDao.java
@@ -1,10 +1,11 @@
-package org.apache.griffin.core.common.dao;
+package org.apache.griffin.api.dao;
 
-import org.apache.griffin.core.common.entity.GriffinDQBusinessRule;
+import org.apache.griffin.api.entity.GriffinDQBusinessRule;
 import org.springframework.stereotype.Component;
 
 import java.util.List;
 
+
 @Component
 public interface DQBusinessRuleDao {
     List<GriffinDQBusinessRule> getListByDqcId(Long dqcId);
diff --git a/api/src/main/java/org/apache/griffin/api/dao/DQContentDao.java 
b/api/src/main/java/org/apache/griffin/api/dao/DQContentDao.java
new file mode 100644
index 00000000..c7e12279
--- /dev/null
+++ b/api/src/main/java/org/apache/griffin/api/dao/DQContentDao.java
@@ -0,0 +1,8 @@
+package org.apache.griffin.api.dao;
+
+
+import org.apache.griffin.api.entity.GriffinDQContent;
+
+public interface DQContentDao {
+    GriffinDQContent getById(Long id);
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/common/dao/DQContentInstanceMapDao.java
 b/api/src/main/java/org/apache/griffin/api/dao/DQContentInstanceMapDao.java
similarity index 60%
rename from 
core/src/main/java/org/apache/griffin/core/common/dao/DQContentInstanceMapDao.java
rename to 
api/src/main/java/org/apache/griffin/api/dao/DQContentInstanceMapDao.java
index 94ad5018..fd9f30af 100644
--- 
a/core/src/main/java/org/apache/griffin/core/common/dao/DQContentInstanceMapDao.java
+++ b/api/src/main/java/org/apache/griffin/api/dao/DQContentInstanceMapDao.java
@@ -1,6 +1,6 @@
-package org.apache.griffin.core.common.dao;
+package org.apache.griffin.api.dao;
 
-import org.apache.griffin.core.common.entity.GriffinDQContentInstanceMap;
+import org.apache.griffin.api.entity.GriffinDQContentInstanceMap;
 import org.springframework.stereotype.Component;
 
 @Component
diff --git a/api/src/main/java/org/apache/griffin/api/dao/DQInstanceDao.java 
b/api/src/main/java/org/apache/griffin/api/dao/DQInstanceDao.java
new file mode 100644
index 00000000..10289d92
--- /dev/null
+++ b/api/src/main/java/org/apache/griffin/api/dao/DQInstanceDao.java
@@ -0,0 +1,14 @@
+package org.apache.griffin.api.dao;
+
+import org.apache.griffin.api.entity.pojo.DQInstanceEntity;
+import org.springframework.stereotype.Component;
+
+@Component
+public interface DQInstanceDao {
+
+    DQInstanceEntity getById(Long id);
+
+    void updateDQInstanceStatus(Long id, int status);
+
+    void insert(DQInstanceEntity instance);
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/common/entity/DQResoueceEnums.java 
b/api/src/main/java/org/apache/griffin/api/entity/DQResoueceEnums.java
similarity index 51%
rename from 
core/src/main/java/org/apache/griffin/core/common/entity/DQResoueceEnums.java
rename to api/src/main/java/org/apache/griffin/api/entity/DQResoueceEnums.java
index 1e7f94b2..a6c1ec50 100644
--- 
a/core/src/main/java/org/apache/griffin/core/common/entity/DQResoueceEnums.java
+++ b/api/src/main/java/org/apache/griffin/api/entity/DQResoueceEnums.java
@@ -1,4 +1,4 @@
-package org.apache.griffin.core.common.entity;
+package org.apache.griffin.api.entity;
 
 public enum DQResoueceEnums {
     HIVE, KAFKA;
diff --git 
a/core/src/main/java/org/apache/griffin/core/common/entity/GriffinDQBusinessRule.java
 b/api/src/main/java/org/apache/griffin/api/entity/GriffinDQBusinessRule.java
similarity index 70%
rename from 
core/src/main/java/org/apache/griffin/core/common/entity/GriffinDQBusinessRule.java
rename to 
api/src/main/java/org/apache/griffin/api/entity/GriffinDQBusinessRule.java
index 214010b0..66892dc0 100644
--- 
a/core/src/main/java/org/apache/griffin/core/common/entity/GriffinDQBusinessRule.java
+++ b/api/src/main/java/org/apache/griffin/api/entity/GriffinDQBusinessRule.java
@@ -1,4 +1,4 @@
-package org.apache.griffin.core.common.entity;
+package org.apache.griffin.api.entity;
 
 import lombok.Data;
 
diff --git 
a/core/src/main/java/org/apache/griffin/core/common/entity/GriffinDQContent.java
 b/api/src/main/java/org/apache/griffin/api/entity/GriffinDQContent.java
similarity index 86%
rename from 
core/src/main/java/org/apache/griffin/core/common/entity/GriffinDQContent.java
rename to api/src/main/java/org/apache/griffin/api/entity/GriffinDQContent.java
index de773467..1e9ee24b 100644
--- 
a/core/src/main/java/org/apache/griffin/core/common/entity/GriffinDQContent.java
+++ b/api/src/main/java/org/apache/griffin/api/entity/GriffinDQContent.java
@@ -1,4 +1,4 @@
-package org.apache.griffin.core.common.entity;
+package org.apache.griffin.api.entity;
 
 import lombok.Data;
 
diff --git 
a/core/src/main/java/org/apache/griffin/core/common/entity/GriffinDQContentInstanceMap.java
 
b/api/src/main/java/org/apache/griffin/api/entity/GriffinDQContentInstanceMap.java
similarity index 75%
rename from 
core/src/main/java/org/apache/griffin/core/common/entity/GriffinDQContentInstanceMap.java
rename to 
api/src/main/java/org/apache/griffin/api/entity/GriffinDQContentInstanceMap.java
index d46d7cc4..44156ba6 100644
--- 
a/core/src/main/java/org/apache/griffin/core/common/entity/GriffinDQContentInstanceMap.java
+++ 
b/api/src/main/java/org/apache/griffin/api/entity/GriffinDQContentInstanceMap.java
@@ -1,4 +1,4 @@
-package org.apache.griffin.core.common.entity;
+package org.apache.griffin.api.entity;
 
 import lombok.Data;
 
diff --git 
a/core/src/main/java/org/apache/griffin/core/common/entity/GriffinDQTable.java 
b/api/src/main/java/org/apache/griffin/api/entity/GriffinDQTable.java
similarity index 78%
rename from 
core/src/main/java/org/apache/griffin/core/common/entity/GriffinDQTable.java
rename to api/src/main/java/org/apache/griffin/api/entity/GriffinDQTable.java
index acbde9e5..0c92e531 100644
--- 
a/core/src/main/java/org/apache/griffin/core/common/entity/GriffinDQTable.java
+++ b/api/src/main/java/org/apache/griffin/api/entity/GriffinDQTable.java
@@ -1,4 +1,4 @@
-package org.apache.griffin.core.common.entity;
+package org.apache.griffin.api.entity;
 
 import lombok.Data;
 
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQInstanceStatus.java
 b/api/src/main/java/org/apache/griffin/api/entity/enums/DQInstanceStatus.java
similarity index 52%
rename from 
core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQInstanceStatus.java
rename to 
api/src/main/java/org/apache/griffin/api/entity/enums/DQInstanceStatus.java
index cd626a7e..fbd1ad98 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQInstanceStatus.java
+++ 
b/api/src/main/java/org/apache/griffin/api/entity/enums/DQInstanceStatus.java
@@ -1,4 +1,4 @@
-package org.apache.griffin.core.worker.entity.enums;
+package org.apache.griffin.api.entity.enums;
 
 public enum DQInstanceStatus {
     ACCEPTED(0),
@@ -11,7 +11,8 @@ public enum DQInstanceStatus {
 //    EVALUATE_ALERTING(6), // Metric 需要告警
     FAILED_ALERTING(7),   // 任务执行失败需要告警
     SUCCESS(8),
-    FAILED(9);
+    FAILED(9),
+    STOPPED(10);
 
     private final int code;
 
@@ -22,4 +23,12 @@ public enum DQInstanceStatus {
     public int getCode() {
         return code;
     }
+
+    public static DQInstanceStatus findByCode(int statusCode) throws Exception 
{
+        DQInstanceStatus[] values = DQInstanceStatus.values();
+        for (DQInstanceStatus value : values) {
+            if (value.code == statusCode) return value;
+        }
+        throw new Exception("Unknown DQInstanceStatus Code: " + statusCode);
+    }
 }
diff --git 
a/api/src/main/java/org/apache/griffin/api/entity/pojo/DQInstanceEntity.java 
b/api/src/main/java/org/apache/griffin/api/entity/pojo/DQInstanceEntity.java
new file mode 100644
index 00000000..18f20f37
--- /dev/null
+++ b/api/src/main/java/org/apache/griffin/api/entity/pojo/DQInstanceEntity.java
@@ -0,0 +1,24 @@
+package org.apache.griffin.api.entity.pojo;
+
+import lombok.Data;
+import org.apache.griffin.api.entity.enums.DQInstanceStatus;
+
+/**
+ * 任务实例, 当前任务运行时的快照
+ *      一个实例包含多个子任务
+ */
+@Data
+public class DQInstanceEntity implements Comparable<DQInstanceEntity> {
+
+    private Long id;
+    private Long dqcId;
+    // 实例状态
+    private DQInstanceStatus status;
+    // 记录状态年龄  状态更新是重置
+    private int statusAge;
+
+    @Override
+    public int compareTo(DQInstanceEntity o) {
+        return 0;
+    }
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/DQTable.java 
b/api/src/main/java/org/apache/griffin/api/entity/pojo/DQTable.java
similarity index 81%
rename from 
core/src/main/java/org/apache/griffin/core/worker/entity/pojo/DQTable.java
rename to api/src/main/java/org/apache/griffin/api/entity/pojo/DQTable.java
index 7be0c5ac..5bfc6d78 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/DQTable.java
+++ b/api/src/main/java/org/apache/griffin/api/entity/pojo/DQTable.java
@@ -1,4 +1,4 @@
-package org.apache.griffin.core.worker.entity.pojo;
+package org.apache.griffin.api.entity.pojo;
 
 import lombok.Data;
 
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/Metric.java 
b/api/src/main/java/org/apache/griffin/api/entity/pojo/Metric.java
similarity index 76%
rename from 
core/src/main/java/org/apache/griffin/core/worker/entity/pojo/Metric.java
rename to api/src/main/java/org/apache/griffin/api/entity/pojo/Metric.java
index 6a36e9b4..41354d01 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/Metric.java
+++ b/api/src/main/java/org/apache/griffin/api/entity/pojo/Metric.java
@@ -1,4 +1,4 @@
-package org.apache.griffin.core.worker.entity.pojo;
+package org.apache.griffin.api.entity.pojo;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQAlertRule.java
 b/api/src/main/java/org/apache/griffin/api/entity/pojo/rule/DQAlertRule.java
similarity index 75%
rename from 
core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQAlertRule.java
rename to 
api/src/main/java/org/apache/griffin/api/entity/pojo/rule/DQAlertRule.java
index 37dd63f1..47388b7c 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQAlertRule.java
+++ b/api/src/main/java/org/apache/griffin/api/entity/pojo/rule/DQAlertRule.java
@@ -1,4 +1,4 @@
-package org.apache.griffin.core.worker.entity.pojo.rule;
+package org.apache.griffin.api.entity.pojo.rule;
 
 import java.util.List;
 
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQEvaluateRule.java
 b/api/src/main/java/org/apache/griffin/api/entity/pojo/rule/DQEvaluateRule.java
similarity index 68%
rename from 
core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQEvaluateRule.java
rename to 
api/src/main/java/org/apache/griffin/api/entity/pojo/rule/DQEvaluateRule.java
index 2ee5ec6a..42ec4ef9 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQEvaluateRule.java
+++ 
b/api/src/main/java/org/apache/griffin/api/entity/pojo/rule/DQEvaluateRule.java
@@ -1,7 +1,7 @@
-package org.apache.griffin.core.worker.entity.pojo.rule;
+package org.apache.griffin.api.entity.pojo.rule;
 
-import org.apache.griffin.core.worker.entity.pojo.Metric;
-import org.apache.griffin.core.worker.utils.ExpressionUtils;
+import org.apache.griffin.api.entity.pojo.Metric;
+import org.apache.griffin.api.utils.ExpressionUtils;
 
 import java.util.List;
 
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQRecordRule.java
 b/api/src/main/java/org/apache/griffin/api/entity/pojo/rule/DQRecordRule.java
similarity index 87%
rename from 
core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQRecordRule.java
rename to 
api/src/main/java/org/apache/griffin/api/entity/pojo/rule/DQRecordRule.java
index 3cf5605d..c47eb4a6 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQRecordRule.java
+++ 
b/api/src/main/java/org/apache/griffin/api/entity/pojo/rule/DQRecordRule.java
@@ -1,13 +1,13 @@
-package org.apache.griffin.core.worker.entity.pojo.rule;
+package org.apache.griffin.api.entity.pojo.rule;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import lombok.Data;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.griffin.core.worker.entity.enums.DQEngineEnum;
-import org.apache.griffin.core.worker.entity.pojo.DQTable;
-import 
org.apache.griffin.core.worker.entity.pojo.template.DQRecordBaseTemplate;
-import org.apache.griffin.core.worker.entity.pojo.template.DQRecordTemplate;
+import org.apache.griffin.api.entity.pojo.DQTable;
+import org.apache.griffin.core.worker.entity.template.DQRecordBaseTemplate;
+import org.apache.griffin.core.worker.entity.template.DQRecordTemplate;
 import org.apache.griffin.core.worker.utils.DQDateUtils;
 
 import java.util.List;
diff --git 
a/api/src/main/java/org/apache/griffin/api/service/DQInstanceService.java 
b/api/src/main/java/org/apache/griffin/api/service/DQInstanceService.java
new file mode 100644
index 00000000..0cbcf2b6
--- /dev/null
+++ b/api/src/main/java/org/apache/griffin/api/service/DQInstanceService.java
@@ -0,0 +1,70 @@
+package org.apache.griffin.api.service;
+
+import org.apache.griffin.api.dao.DQInstanceDao;
+import org.apache.griffin.api.entity.enums.DQInstanceStatus;
+import org.apache.griffin.api.entity.pojo.DQInstanceEntity;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class DQInstanceService {
+    private static final Logger log = 
LoggerFactory.getLogger(DQInstanceService.class);
+
+    @Autowired
+    private DQInstanceDao dqInstanceDao;
+//    @Autowired
+//    private DQContentInstanceMapDao dqContentInstanceMapDao;
+//    @Autowired
+//    private DQInstanceFactory dqInstanceFactory;
+
+    /**
+     * 数据库和内存需要保持同步
+     * @param instance obj
+     * @param status stat
+     * @return true or false
+     */
+    public boolean updateStatus(DQInstanceEntity instance, DQInstanceStatus 
status) {
+        try {
+            dqInstanceDao.updateDQInstanceStatus(instance, status.getCode());
+            instance.setStatus(status);
+            log.info("instance {} {} => {} success.", instance.getId(), 
instance.getStatus(), status);
+            return true;
+        } catch (Exception e) {
+            log.error("instance {} {} => {} failed, ex", instance.getId(), 
instance.getStatus(), status, e);
+        }
+        return false;
+    }
+
+    /**
+     * construct instance
+     * @param id master assign id
+     * @return DQInstance
+     */
+    public DQInstanceEntity getById(Long id) {
+        if (id == null) {
+            log.error("Unknown instance id: null");
+        }
+        DQInstanceEntity ins = dqInstanceDao.getById(id);
+        return ins;
+//        if (ins == null) {
+//            // the ins id has no task info
+//            return constructInstance(id);
+//        }
+//        return recoveryInstance(ins);
+    }
+
+//    private DQInstanceEntity constructInstance(Long id) {
+//        log.info("constructInstance id: {}", id);
+//        GriffinDQContentInstanceMap contentInstanceMap = 
dqContentInstanceMapDao.getContentInstanceMapByInstanceId(id);
+//        Long instanceId = contentInstanceMap.getInstanceId();
+//        Long dqcId = contentInstanceMap.getDqcId();
+//        return dqInstanceFactory.constructInstance(instanceId, dqcId);
+//    }
+//
+//    private DQInstance recoveryInstance(DQInstance instance) {
+//        return dqInstanceFactory.recoveryInstance(instance);
+//    }
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/utils/DQDateUtils.java 
b/api/src/main/java/org/apache/griffin/api/utils/DQDateUtils.java
similarity index 86%
rename from 
core/src/main/java/org/apache/griffin/core/worker/utils/DQDateUtils.java
rename to api/src/main/java/org/apache/griffin/api/utils/DQDateUtils.java
index 8e5a1cf4..02ef18de 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/utils/DQDateUtils.java
+++ b/api/src/main/java/org/apache/griffin/api/utils/DQDateUtils.java
@@ -1,4 +1,4 @@
-package org.apache.griffin.core.worker.utils;
+package org.apache.griffin.api.utils;
 
 import java.util.concurrent.TimeUnit;
 
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/utils/ExpressionUtils.java 
b/api/src/main/java/org/apache/griffin/api/utils/ExpressionUtils.java
similarity index 75%
rename from 
core/src/main/java/org/apache/griffin/core/worker/utils/ExpressionUtils.java
rename to api/src/main/java/org/apache/griffin/api/utils/ExpressionUtils.java
index 23884c88..3114b0aa 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/utils/ExpressionUtils.java
+++ b/api/src/main/java/org/apache/griffin/api/utils/ExpressionUtils.java
@@ -1,4 +1,4 @@
-package org.apache.griffin.core.worker.utils;
+package org.apache.griffin.api.utils;
 
 public class ExpressionUtils {
     public static boolean evaluate(String expression, double metricValue) {
diff --git a/api/src/main/proto/ExecuteNodeProtocols.proto 
b/api/src/main/proto/ExecuteNodeProtocols.proto
index 871dab7b..4d775118 100644
--- a/api/src/main/proto/ExecuteNodeProtocols.proto
+++ b/api/src/main/proto/ExecuteNodeProtocols.proto
@@ -16,7 +16,7 @@ service ExecuteNodeService {
   // Ask ExecuteNode to stop one task
   rpc stopDQTask(StopDQTaskRequest) returns (StopDQTaskResponse) {}
   // Ask ExecuteNode to stop one task
-  rpc querySingleDQTask(SQuerySingleDQTaskRequest) returns 
(QuerySingleDQTaskResponse) {}
+  rpc querySingleDQTask(QuerySingleDQTaskRequest) returns 
(QuerySingleDQTaskResponse) {}
   rpc sayHello(SayHelloRequest) returns (SayHelloResponse) {}
 }
 
@@ -38,7 +38,7 @@ message StopDQTaskResponse {
   optional int32 status = 2;
 }
 
-message SQuerySingleDQTaskRequest {
+message QuerySingleDQTaskRequest {
   int64 instanceId = 1;
 }
 
diff --git a/core/pom.xml b/core/pom.xml
index d07312a8..296bbf27 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -48,10 +48,6 @@ under the License.
         <livy.core.version>0.3.0</livy.core.version>
         
<elasticsearch-rest-client.version>6.2.4</elasticsearch-rest-client.version>
         <jackson-databind.version>2.9.9.3</jackson-databind.version>
-        <!-- grpc -->
-<!--        <protobuf.version>3.19.1</protobuf.version>-->
-<!--        <protobuf-plugin.version>0.6.1</protobuf-plugin.version>-->
-<!--        <grpc.version>1.42.1</grpc.version>-->
     </properties>
 
     <repositories>
@@ -139,11 +135,7 @@ under the License.
             <artifactId>postgresql</artifactId>
             <version>${postgresql.version}</version>
         </dependency>
-        <!--<dependency>-->
-        <!--<groupId>mysql</groupId>-->
-        <!--<artifactId>mysql-connector-java</artifactId>-->
-        <!--<version>${mysql.java.version}</version>-->
-        <!--</dependency>-->
+
         <dependency>
             <groupId>com.h2database</groupId>
             <artifactId>h2</artifactId>
@@ -159,75 +151,6 @@ under the License.
             <version>${jackson-databind.version}</version>
         </dependency>
 
-        <!-- to access metastore from hive-->
-<!--        <dependency>-->
-<!--            <groupId>org.apache.hadoop</groupId>-->
-<!--            <artifactId>hadoop-client</artifactId>-->
-<!--            <version>${hadoop.version}</version>-->
-<!--            &lt;!&ndash;<scope>provided</scope>&ndash;&gt;-->
-<!--            <exclusions>-->
-<!--                <exclusion>-->
-<!--                    <groupId>javax.servlet</groupId>-->
-<!--                    <artifactId>servlet-api</artifactId>-->
-<!--                </exclusion>-->
-<!--                <exclusion>-->
-<!--                    <groupId>org.slf4j</groupId>-->
-<!--                    <artifactId>slf4j-log4j12</artifactId>-->
-<!--                </exclusion>-->
-<!--            </exclusions>-->
-<!--        </dependency>-->
-<!--        <dependency>-->
-<!--            <groupId>org.apache.hive</groupId>-->
-<!--            <artifactId>hive-metastore</artifactId>-->
-<!--            <version>${hive.version}</version>-->
-<!--            <exclusions>-->
-<!--                <exclusion>-->
-<!--                    <groupId>org.eclipse.jetty.aggregate</groupId>-->
-<!--                    <artifactId>jetty-all</artifactId>-->
-<!--                </exclusion>-->
-<!--                <exclusion>-->
-<!--                    <groupId>org.eclipse.jetty.orbit</groupId>-->
-<!--                    <artifactId>javax.servlet</artifactId>-->
-<!--                </exclusion>-->
-<!--                <exclusion>-->
-<!--                    <groupId>javax.servlet</groupId>-->
-<!--                    <artifactId>servlet-api</artifactId>-->
-<!--                </exclusion>-->
-<!--                <exclusion>-->
-<!--                    <groupId>de.ruedigermoeller</groupId>-->
-<!--                    <artifactId>fst</artifactId>-->
-<!--                </exclusion>-->
-<!--            </exclusions>-->
-<!--        </dependency>-->
-
-        <!-- to access Hive using JDBC -->
-<!--        <dependency>-->
-<!--            <groupId>org.apache.hive</groupId>-->
-<!--            <artifactId>hive-jdbc</artifactId>-->
-<!--            <version>${hive.version}</version>-->
-<!--            <exclusions>-->
-<!--                <exclusion>-->
-<!--                    <groupId>org.eclipse.jetty.aggregate</groupId>-->
-<!--                    <artifactId>*</artifactId>-->
-<!--                </exclusion>-->
-<!--                <exclusion>-->
-<!--                    <groupId>org.eclipse.jetty.orbit</groupId>-->
-<!--                    <artifactId>javax.servlet</artifactId>-->
-<!--                </exclusion>-->
-<!--                <exclusion>-->
-<!--                    <groupId>javax.servlet</groupId>-->
-<!--                    <artifactId>servlet-api</artifactId>-->
-<!--                </exclusion>-->
-<!--                <exclusion>-->
-<!--                    <groupId>org.mortbay.jetty</groupId>-->
-<!--                    <artifactId>servlet-api-2.5</artifactId>-->
-<!--                </exclusion>-->
-<!--                <exclusion>-->
-<!--                    <groupId>org.slf4j</groupId>-->
-<!--                    <artifactId>slf4j-log4j12</artifactId>-->
-<!--                </exclusion>-->
-<!--            </exclusions>-->
-<!--        </dependency>-->
         <!--schedule-->
         <dependency>
             <groupId>org.springframework</groupId>
diff --git 
a/core/src/main/java/org/apache/griffin/core/common/dao/DQContentDao.java 
b/core/src/main/java/org/apache/griffin/core/common/dao/DQContentDao.java
deleted file mode 100644
index 97182d9f..00000000
--- a/core/src/main/java/org/apache/griffin/core/common/dao/DQContentDao.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package org.apache.griffin.core.common.dao;
-
-
-import org.apache.griffin.core.common.entity.GriffinDQContent;
-
-public interface DQContentDao {
-    GriffinDQContent getById(Long id);
-}
diff --git 
a/core/src/main/java/org/apache/griffin/core/common/utils/context/WorkerContext.java
 
b/core/src/main/java/org/apache/griffin/core/common/utils/context/WorkerContext.java
index 49f99949..40da7a6e 100644
--- 
a/core/src/main/java/org/apache/griffin/core/common/utils/context/WorkerContext.java
+++ 
b/core/src/main/java/org/apache/griffin/core/common/utils/context/WorkerContext.java
@@ -2,7 +2,7 @@ package org.apache.griffin.core.common.utils.context;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
-import org.apache.griffin.core.worker.entity.bo.DQInstance;
+import org.apache.griffin.core.worker.entity.bo.DQInstanceBO;
 import org.apache.griffin.core.worker.entity.enums.DQEngineEnum;
 import org.springframework.stereotype.Component;
 
@@ -17,14 +17,14 @@ import java.util.concurrent.LinkedBlockingQueue;
 @Component
 public class WorkerContext {
 
-    private final List<DQInstance> WAITTING_TASK_QUEUE;
-    private final List<DQInstance> RECORDING_TASK_QUEUE;
-    private final LinkedBlockingQueue<DQInstance> EVALUATING_TASK_QUEUE;
-    private final LinkedBlockingQueue<DQInstance> ALERTING_TASK_QUEUE;
+    private final List<DQInstanceBO> WAITTING_TASK_QUEUE;
+    private final List<DQInstanceBO> RECORDING_TASK_QUEUE;
+    private final LinkedBlockingQueue<DQInstanceBO> EVALUATING_TASK_QUEUE;
+    private final LinkedBlockingQueue<DQInstanceBO> ALERTING_TASK_QUEUE;
 
     // success和failed队列数据老化问题?
-    public final List<DQInstance> successTaskIdList;
-    public final List<DQInstance> failedTaskIdList;
+    public final List<DQInstanceBO> successTaskIdList;
+    public final List<DQInstanceBO> failedTaskIdList;
     
     public WorkerContext() {
         // 设置队列长度
@@ -37,27 +37,27 @@ public class WorkerContext {
         failedTaskIdList = Lists.newArrayList();
     }
 
-    public List<DQInstance> getWAITTING_TASK_QUEUE() {
+    public List<DQInstanceBO> getWAITTING_TASK_QUEUE() {
         return WAITTING_TASK_QUEUE;
     }
 
-    public List<DQInstance> getRECORDING_TASK_QUEUE() {
+    public List<DQInstanceBO> getRECORDING_TASK_QUEUE() {
         return RECORDING_TASK_QUEUE;
     }
 
-    public LinkedBlockingQueue<DQInstance> getEVALUATING_TASK_QUEUE() {
+    public LinkedBlockingQueue<DQInstanceBO> getEVALUATING_TASK_QUEUE() {
         return EVALUATING_TASK_QUEUE;
     }
 
-    public LinkedBlockingQueue<DQInstance> getALERTING_TASK_QUEUE() {
+    public LinkedBlockingQueue<DQInstanceBO> getALERTING_TASK_QUEUE() {
         return ALERTING_TASK_QUEUE;
     }
 
-    public List<DQInstance> getSuccessTaskIdList() {
+    public List<DQInstanceBO> getSuccessTaskIdList() {
         return successTaskIdList;
     }
 
-    public List<DQInstance> getFailedTaskIdList() {
+    public List<DQInstanceBO> getFailedTaskIdList() {
         return failedTaskIdList;
     }
 
@@ -66,18 +66,18 @@ public class WorkerContext {
         resetTaskStatusWhenStartUp();
     }
 
-    public DQInstance getWaittingTask() {
+    public DQInstanceBO getWaittingTask() {
         return null;
     }
 
-    public DQInstance getRecordingTask() {
+    public DQInstanceBO getRecordingTask() {
         return null;
     }
 
-    public boolean offerToRecordingTaskQueue(DQInstance dqInstance) {
+    public boolean offerToRecordingTaskQueue(DQInstanceBO dqInstance) {
         return false;
     }
-    public void offerToAlertingTaskQueue(DQInstance dqInstance) {
+    public void offerToAlertingTaskQueue(DQInstanceBO dqInstance) {
     }
 
     /**
@@ -97,18 +97,18 @@ public class WorkerContext {
     }
 
 
-    public void offerToEvaluatingTaskQueue(DQInstance dqInstance) {
+    public void offerToEvaluatingTaskQueue(DQInstanceBO dqInstance) {
     }
 
-    public void removeAll(List<DQInstance> targetList, List<DQInstance> 
waittingToRemoveFromRecordingList) {
+    public void removeAll(List<DQInstanceBO> targetList, List<DQInstanceBO> 
waittingToRemoveFromRecordingList) {
         targetList.removeAll(waittingToRemoveFromRecordingList);
     }
 
-    public void addFailedDQInstanceInfo(DQInstance instance) {
+    public void addFailedDQInstanceInfo(DQInstanceBO instance) {
 
     }
 
-    public void addSuccessDQInstanceInfo(DQInstance instance) {
+    public void addSuccessDQInstanceInfo(DQInstanceBO instance) {
 
     }
 }
diff --git 
a/core/src/main/java/org/apache/griffin/core/master/context/TaskManagerContext.java
 
b/core/src/main/java/org/apache/griffin/core/master/context/TaskManagerContext.java
new file mode 100644
index 00000000..4184b2c9
--- /dev/null
+++ 
b/core/src/main/java/org/apache/griffin/core/master/context/TaskManagerContext.java
@@ -0,0 +1,21 @@
+package org.apache.griffin.core.master.context;
+
+import com.google.common.collect.Maps;
+import org.apache.griffin.core.master.transport.DQCConnection;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+
+@Component
+public class TaskManagerContext {
+
+    public static final Map<Long, DQCConnection> submitTaskContainer = 
Maps.newConcurrentMap();
+
+    public void addTask(Long instanceId, DQCConnection client) {
+        submitTaskContainer.put(instanceId, client);
+    }
+
+    public void clearTask(Long instanceId) {
+        submitTaskContainer.remove(instanceId);
+    }
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/master/server/TaskManager.java 
b/core/src/main/java/org/apache/griffin/core/master/server/TaskManager.java
index 258dce68..4d30f51b 100644
--- a/core/src/main/java/org/apache/griffin/core/master/server/TaskManager.java
+++ b/core/src/main/java/org/apache/griffin/core/master/server/TaskManager.java
@@ -1,14 +1,9 @@
 package org.apache.griffin.core.master.server;
 
 
-import com.google.common.collect.Maps;
-import io.grpc.ManagedChannel;
-import io.grpc.ManagedChannelBuilder;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.griffin.api.common.GRPCCode;
-import org.apache.griffin.api.proto.protocol.ExecuteNodeServiceGrpc;
-import org.apache.griffin.api.proto.protocol.SayHelloRequest;
-import org.apache.griffin.api.proto.protocol.SayHelloResponse;
+import org.apache.griffin.api.entity.enums.DQInstanceStatus;
+import org.apache.griffin.core.master.context.TaskManagerContext;
 import org.apache.griffin.core.master.transport.DQCConnection;
 import org.apache.griffin.core.master.transport.DQCConnectionManager;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -22,6 +17,8 @@ public class TaskManager {
 
     @Autowired
     private DQCConnectionManager dqcConnectionManager;
+    @Autowired
+    private TaskManagerContext taskManagerContext;
 
     public void registerWorker(String hostName, int port) throws 
UnknownHostException {
         try {
@@ -36,10 +33,35 @@ public class TaskManager {
     public void submitDQTask(Long instanceId) {
         DQCConnection aliveClient = dqcConnectionManager.getAliveClient();
         if (aliveClient == null) {
-
+            log.error("getAliveClient Failed, Please check");
+            return;
         }
         if (aliveClient.submitDQTask(instanceId)) {
-            // todo add task and client info to cache
+            taskManagerContext.addTask(instanceId, aliveClient);
+            // todo flush to db
+        }
+    }
+
+    public DQInstanceStatus querySingleDQTask(Long instanceId) throws 
Exception {
+        DQCConnection aliveClient = dqcConnectionManager.getAliveClient();
+        if (aliveClient == null) {
+            log.error("getAliveClient Failed, Please check");
+            return null;
+        }
+        DQInstanceStatus dqInstanceStatus = 
aliveClient.querySingleDQTask(instanceId);
+        return dqInstanceStatus;
+    }
+
+    public boolean stopDQTask(Long instanceId) {
+        DQCConnection aliveClient = dqcConnectionManager.getAliveClient();
+        if (aliveClient == null) {
+            log.error("getAliveClient Failed, Please check");
+            return false;
+        }
+        if (aliveClient.stopDQTask(instanceId)) {
+            taskManagerContext.clearTask(instanceId);
+            return true;
         }
+        return false;
     }
 }
diff --git 
a/core/src/main/java/org/apache/griffin/core/master/service/DQTaskManagerInstanceService.java
 
b/core/src/main/java/org/apache/griffin/core/master/service/DQTaskManagerInstanceService.java
new file mode 100644
index 00000000..fec88cb9
--- /dev/null
+++ 
b/core/src/main/java/org/apache/griffin/core/master/service/DQTaskManagerInstanceService.java
@@ -0,0 +1,36 @@
+package org.apache.griffin.core.master.service;
+
+import org.apache.griffin.api.entity.enums.DQInstanceStatus;
+import org.apache.griffin.api.dao.DQInstanceDao;
+import org.apache.griffin.core.master.server.TaskManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class DQTaskManagerInstanceService {
+    private static final Logger log = 
LoggerFactory.getLogger(DQTaskManagerInstanceService.class);
+
+    @Autowired
+    private DQInstanceDao dqInstanceDao;
+    @Autowired
+    private TaskManager taskManager;
+
+    public boolean stopTask(Long instanceId) {
+        try {
+            boolean stopResp = taskManager.stopDQTask(instanceId);
+            if (stopResp) {
+                dqInstanceDao.updateDQInstanceStatus(instanceId, 
DQInstanceStatus.STOPPED.getCode());
+                log.info("instance {} STOPPED success.", instanceId);
+                return true;
+            } else {
+                log.info("instance {} STOPPED failed.", instanceId);
+                return false;
+            }
+        } catch (Exception e) {
+            log.error("instance {} STOPPED failed, ex", instanceId, e);
+        }
+        return false;
+    }
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/master/service/TaskAssignService.java
 
b/core/src/main/java/org/apache/griffin/core/master/service/TaskAssignService.java
deleted file mode 100644
index 05e8a761..00000000
--- 
a/core/src/main/java/org/apache/griffin/core/master/service/TaskAssignService.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package org.apache.griffin.core.master.service;
-
-import org.apache.griffin.core.master.strategy.AbstractAssignStrategy;
-import org.apache.griffin.core.master.strategy.AssignStrategyFactory;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-import org.springframework.util.Assert;
-
-import javax.annotation.PostConstruct;
-
-
-//@Component
-public class TaskAssignService {
-
-    @Value("${task.assign.strategy}")
-    private String assignTaskStrategtClass;
-
-    private AbstractAssignStrategy strategy;
-
-    @PostConstruct
-    public void init() {
-        strategy = AssignStrategyFactory.getStrategy(assignTaskStrategtClass);
-        Assert.notNull(strategy, "Task Assign Strategy init failed");
-    }
-
-
-//    public String assignTask(long instanceId) {
-//        return strategy.assignTask(instanceId);
-//    }
-}
diff --git 
a/core/src/main/java/org/apache/griffin/core/master/transport/DQCConnection.java
 
b/core/src/main/java/org/apache/griffin/core/master/transport/DQCConnection.java
index d2d4c7be..8c1d2532 100644
--- 
a/core/src/main/java/org/apache/griffin/core/master/transport/DQCConnection.java
+++ 
b/core/src/main/java/org/apache/griffin/core/master/transport/DQCConnection.java
@@ -1,16 +1,12 @@
 package org.apache.griffin.core.master.transport;
 
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import io.grpc.stub.StreamObserver;
 import lombok.Builder;
 import lombok.Data;
 import org.apache.griffin.api.common.GRPCCode;
+import org.apache.griffin.api.entity.enums.DQInstanceStatus;
 import org.apache.griffin.api.proto.protocol.*;
 
 import java.net.UnknownHostException;
-import java.nio.channels.ServerSocketChannel;
-import java.util.concurrent.Future;
 
 /**
  * the obj has a socketChannel to worker node
@@ -44,4 +40,24 @@ public class DQCConnection {
         return submitDQTaskResponse.getCode() == GRPCCode.SUCCESS.getCode();
     }
 
+    public DQInstanceStatus querySingleDQTask(Long instanceId) throws 
Exception {
+        QuerySingleDQTaskRequest querySingleDQTaskRequest = 
QuerySingleDQTaskRequest.newBuilder()
+                .setInstanceId(instanceId)
+                .build();
+        QuerySingleDQTaskResponse querySingleDQTaskResponse = 
client.querySingleDQTask(querySingleDQTaskRequest);
+        if (querySingleDQTaskResponse.getCode() == GRPCCode.SUCCESS.getCode()) 
{
+            int status = querySingleDQTaskResponse.getStatus();
+            DQInstanceStatus dqInstanceStatus = 
DQInstanceStatus.findByCode(status);
+            return dqInstanceStatus;
+        }
+        return null;
+    }
+
+    public boolean stopDQTask(Long instanceId) {
+        StopDQTaskRequest stopDQTaskRequest = StopDQTaskRequest.newBuilder()
+                .setInstanceId(instanceId)
+                .build();
+        StopDQTaskResponse stopDQTaskResponse = 
client.stopDQTask(stopDQTaskRequest);
+        return stopDQTaskResponse.getCode() == GRPCCode.SUCCESS.getCode();
+    }
 }
diff --git 
a/core/src/main/java/org/apache/griffin/core/master/transport/DQCConnectionManager.java
 
b/core/src/main/java/org/apache/griffin/core/master/transport/DQCConnectionManager.java
index 9cddce5c..620ab38f 100644
--- 
a/core/src/main/java/org/apache/griffin/core/master/transport/DQCConnectionManager.java
+++ 
b/core/src/main/java/org/apache/griffin/core/master/transport/DQCConnectionManager.java
@@ -22,8 +22,10 @@ import java.util.stream.Collectors;
 public class DQCConnectionManager {
 
     private List<DQCConnection> clientList = Lists.newCopyOnWriteArrayList();
-    private String assignStrategyClassName = 
"org.apache.griffin.core.master.strategy.LooperAssignStrategy";
+    // todo read from conf
+    private final String assignStrategyClassName = 
"org.apache.griffin.core.master.strategy.LooperAssignStrategy";
     private AbstractAssignStrategy strategy;
+
     @PostConstruct
     public void init() {
         strategy = AssignStrategyFactory.getStrategy(assignStrategyClassName);
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/client/DispatcherClient.java
 
b/core/src/main/java/org/apache/griffin/core/worker/client/DispatcherClient.java
index dbd4bd39..75035470 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/client/DispatcherClient.java
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/client/DispatcherClient.java
@@ -1,7 +1,7 @@
 package org.apache.griffin.core.worker.client;
 
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.griffin.core.worker.entity.bo.DQInstance;
+import org.apache.griffin.api.entity.pojo.DQInstanceEntity;
 import org.apache.griffin.core.worker.entity.dispatcher.*;
 import org.apache.griffin.core.worker.entity.enums.DQEngineEnum;
 import org.apache.griffin.core.worker.entity.enums.DQErrorCode;
@@ -32,7 +32,7 @@ public class DispatcherClient {
         return null;
     }
 
-    public SubmitRequest wrapperDQTask(DQInstance waittingTask) {
+    public SubmitRequest wrapperDQTask(DQInstanceEntity waittingTask) {
         return null;
     }
 
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/dao/DQInstanceDao.java 
b/core/src/main/java/org/apache/griffin/core/worker/dao/DQInstanceDao.java
deleted file mode 100644
index 544adb36..00000000
--- a/core/src/main/java/org/apache/griffin/core/worker/dao/DQInstanceDao.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package org.apache.griffin.core.worker.dao;
-
-import org.apache.griffin.core.worker.entity.bo.DQInstance;
-import org.springframework.stereotype.Component;
-
-@Component
-public interface DQInstanceDao {
-
-    DQInstance getById(Long id);
-
-    void updateDQInstanceStatus(DQInstance instance, int status);
-
-    void insert(DQInstance instance);
-}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/dao/DQTaskDao.java 
b/core/src/main/java/org/apache/griffin/core/worker/dao/DQTaskDao.java
index f87fbc46..ff8bc3fd 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/dao/DQTaskDao.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/dao/DQTaskDao.java
@@ -1,8 +1,6 @@
 package org.apache.griffin.core.worker.dao;
 
-import org.apache.griffin.core.worker.entity.bo.DQInstance;
 import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
-import org.apache.griffin.core.worker.entity.enums.DQTaskStatus;
 import org.springframework.stereotype.Component;
 
 import java.util.List;
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/driver/PrestoTemplateDriver.java
 
b/core/src/main/java/org/apache/griffin/core/worker/driver/PrestoTemplateDriver.java
index b5d2cf62..826aa1f0 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/driver/PrestoTemplateDriver.java
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/driver/PrestoTemplateDriver.java
@@ -1,8 +1,7 @@
 package org.apache.griffin.core.worker.driver;
 
-import org.apache.griffin.core.worker.entity.pojo.template.DQRecordTemplate;
+import org.apache.griffin.core.worker.entity.template.DQRecordTemplate;
 
-import java.util.List;
 import java.util.Map;
 
 public class PrestoTemplateDriver extends TemplateDriver{
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/driver/SparkTemplateDriver.java
 
b/core/src/main/java/org/apache/griffin/core/worker/driver/SparkTemplateDriver.java
index 4d500832..df9f2c32 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/driver/SparkTemplateDriver.java
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/driver/SparkTemplateDriver.java
@@ -1,8 +1,7 @@
 package org.apache.griffin.core.worker.driver;
 
-import org.apache.griffin.core.worker.entity.pojo.template.DQRecordTemplate;
+import org.apache.griffin.core.worker.entity.template.DQRecordTemplate;
 
-import java.util.List;
 import java.util.Map;
 
 public class SparkTemplateDriver extends TemplateDriver {
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/driver/TemplateDriver.java 
b/core/src/main/java/org/apache/griffin/core/worker/driver/TemplateDriver.java
index 9e2aab97..0516d556 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/driver/TemplateDriver.java
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/driver/TemplateDriver.java
@@ -1,12 +1,11 @@
 package org.apache.griffin.core.worker.driver;
 
 import org.apache.griffin.core.worker.entity.enums.DQEngineEnum;
-import org.apache.griffin.core.worker.entity.pojo.template.DQRecordTemplate;
+import org.apache.griffin.core.worker.entity.template.DQRecordTemplate;
 import org.apache.griffin.core.worker.factory.TemplateDriverFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-import java.util.List;
 import java.util.Map;
 
 @Service
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/DQInstance.java 
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/DQInstanceBO.java
similarity index 82%
rename from 
core/src/main/java/org/apache/griffin/core/worker/entity/bo/DQInstance.java
rename to 
core/src/main/java/org/apache/griffin/core/worker/entity/bo/DQInstanceBO.java
index 303c7d13..9f22f35b 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/DQInstance.java
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/DQInstanceBO.java
@@ -1,13 +1,11 @@
 package org.apache.griffin.core.worker.entity.bo;
 
 import lombok.Data;
-import org.apache.griffin.core.worker.entity.dispatcher.JobStatus;
-import org.apache.griffin.core.worker.entity.enums.DQInstanceStatus;
+import org.apache.griffin.api.entity.enums.DQInstanceStatus;
+import org.apache.griffin.api.entity.pojo.rule.DQAlertRule;
 import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
 import org.apache.griffin.core.worker.entity.enums.DQTaskStatus;
-import org.apache.griffin.core.worker.entity.pojo.rule.DQAlertRule;
 import org.apache.griffin.core.worker.stage.DQAbstractStage;
-import org.apache.griffin.core.worker.stage.DQStage;
 
 import java.util.List;
 
@@ -16,7 +14,7 @@ import java.util.List;
  *      一个实例包含多个子任务
  */
 @Data
-public class DQInstance implements Comparable<DQInstance> {
+public class DQInstanceBO implements Comparable<DQInstanceBO> {
     private Long id;
 
     private Long dqcId;
@@ -70,7 +68,7 @@ public class DQInstance implements Comparable<DQInstance> {
     }
 
     @Override
-    public int compareTo(DQInstance o) {
+    public int compareTo(DQInstanceBO o) {
         return 0;
     }
 }
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQBaseTask.java
 
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQBaseTask.java
index 5fdf4ed9..22d9d9e3 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQBaseTask.java
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQBaseTask.java
@@ -7,9 +7,9 @@ import 
org.apache.griffin.core.common.utils.context.WorkerContext;
 import org.apache.griffin.core.worker.entity.dispatcher.JobStatus;
 import org.apache.griffin.core.worker.entity.enums.DQEngineEnum;
 import org.apache.griffin.core.worker.entity.enums.DQTaskStatus;
-import org.apache.griffin.core.worker.entity.pojo.Metric;
-import org.apache.griffin.core.worker.entity.pojo.rule.DQEvaluateRule;
-import org.apache.griffin.core.worker.entity.pojo.rule.DQRecordRule;
+import org.apache.griffin.api.entity.pojo.Metric;
+import org.apache.griffin.api.entity.pojo.rule.DQEvaluateRule;
+import org.apache.griffin.api.entity.pojo.rule.DQRecordRule;
 
 import java.util.List;
 
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQHiveTask.java
 
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQHiveTask.java
index be6aee5e..e793a1f2 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQHiveTask.java
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQHiveTask.java
@@ -1,7 +1,6 @@
 package org.apache.griffin.core.worker.entity.bo.task;
 
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.griffin.core.worker.entity.pojo.rule.DQRecordRule;
 
 import java.util.List;
 
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordCompostiveTemplate.java
 
b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordCompostiveTemplate.java
deleted file mode 100644
index cfccf84a..00000000
--- 
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordCompostiveTemplate.java
+++ /dev/null
@@ -1,4 +0,0 @@
-package org.apache.griffin.core.worker.entity.pojo.template;
-
-public class DQRecordCompostiveTemplate {
-}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordBaseTemplate.java
 
b/core/src/main/java/org/apache/griffin/core/worker/entity/template/DQRecordBaseTemplate.java
similarity index 51%
rename from 
core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordBaseTemplate.java
rename to 
core/src/main/java/org/apache/griffin/core/worker/entity/template/DQRecordBaseTemplate.java
index 44646da8..aa31523d 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordBaseTemplate.java
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/template/DQRecordBaseTemplate.java
@@ -1,4 +1,4 @@
-package org.apache.griffin.core.worker.entity.pojo.template;
+package org.apache.griffin.core.worker.entity.template;
 
 public class DQRecordBaseTemplate extends DQRecordTemplate {
 }
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/template/DQRecordCompostiveTemplate.java
 
b/core/src/main/java/org/apache/griffin/core/worker/entity/template/DQRecordCompostiveTemplate.java
new file mode 100644
index 00000000..37bcbba9
--- /dev/null
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/template/DQRecordCompostiveTemplate.java
@@ -0,0 +1,4 @@
+package org.apache.griffin.core.worker.entity.template;
+
+public class DQRecordCompostiveTemplate {
+}
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordTemplate.java
 
b/core/src/main/java/org/apache/griffin/core/worker/entity/template/DQRecordTemplate.java
similarity index 86%
rename from 
core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordTemplate.java
rename to 
core/src/main/java/org/apache/griffin/core/worker/entity/template/DQRecordTemplate.java
index 49db5868..3af68a33 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordTemplate.java
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/entity/template/DQRecordTemplate.java
@@ -1,4 +1,4 @@
-package org.apache.griffin.core.worker.entity.pojo.template;
+package org.apache.griffin.core.worker.entity.template;
 
 import org.apache.griffin.core.worker.driver.TemplateDriver;
 import org.apache.griffin.core.worker.entity.enums.DQEngineEnum;
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/factory/DQInstanceFactory.java
 
b/core/src/main/java/org/apache/griffin/core/worker/factory/DQInstanceFactory.java
index 7ea41cb1..ed70ea0d 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/factory/DQInstanceFactory.java
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/factory/DQInstanceFactory.java
@@ -1,11 +1,12 @@
 package org.apache.griffin.core.worker.factory;
 
-import org.apache.griffin.core.common.dao.DQBusinessRuleDao;
-import org.apache.griffin.core.common.dao.DQContentDao;
-import org.apache.griffin.core.common.entity.GriffinDQBusinessRule;
-import org.apache.griffin.core.common.entity.GriffinDQContent;
-import org.apache.griffin.core.worker.dao.DQInstanceDao;
-import org.apache.griffin.core.worker.entity.bo.DQInstance;
+import org.apache.griffin.api.dao.DQBusinessRuleDao;
+import org.apache.griffin.api.dao.DQContentDao;
+import org.apache.griffin.api.entity.GriffinDQBusinessRule;
+import org.apache.griffin.api.entity.GriffinDQContent;
+import org.apache.griffin.api.dao.DQInstanceDao;
+import org.apache.griffin.api.entity.pojo.DQInstanceEntity;
+import org.apache.griffin.core.worker.entity.bo.DQInstanceBO;
 import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
 import org.apache.griffin.core.worker.entity.enums.DQStageTypeEnum;
 import org.apache.griffin.core.worker.stage.DQAbstractStage;
@@ -28,8 +29,8 @@ public class DQInstanceFactory {
     @Autowired
     private DQTaskFactory dqTaskFactory;
 
-    public DQInstance constructInstance(Long id, Long dqcId) {
-        DQInstance instance = new DQInstance();
+    public DQInstanceBO constructInstance(Long id, Long dqcId) {
+        DQInstanceBO instance = new DQInstanceBO();
         instance.setId(id);
         instance.setDqcId(dqcId);
         GriffinDQContent griffinDQContent = dqContentDao.getById(dqcId);
@@ -46,11 +47,10 @@ public class DQInstanceFactory {
         // construct alert stage
         DQAbstractStage alertStage = 
dqStageFactory.constructStage(DQStageTypeEnum.ALERT, instance);
         instance.setAlertingStage(alertStage);
-        dqInstanceDao.insert(instance);
         return instance;
     }
 
-    public DQInstance recoveryInstance(DQInstance instance) {
+    public DQInstanceBO recoveryInstance(DQInstanceEntity instance) {
         return null;
     }
 }
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/factory/DQStageFactory.java 
b/core/src/main/java/org/apache/griffin/core/worker/factory/DQStageFactory.java
index b8f4ec56..6f287b53 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/factory/DQStageFactory.java
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/factory/DQStageFactory.java
@@ -2,7 +2,8 @@ package org.apache.griffin.core.worker.factory;
 
 import org.apache.griffin.core.common.utils.SpringUtils;
 import org.apache.griffin.core.worker.dao.DQStageDao;
-import org.apache.griffin.core.worker.entity.bo.DQInstance;
+import org.apache.griffin.api.entity.pojo.DQInstanceEntity;
+import org.apache.griffin.core.worker.entity.bo.DQInstanceBO;
 import org.apache.griffin.core.worker.entity.enums.DQStageTypeEnum;
 import org.apache.griffin.core.worker.service.DQStageService;
 import org.apache.griffin.core.worker.service.DQTaskService;
@@ -23,7 +24,7 @@ public class DQStageFactory {
     @Autowired
     private DQStageDao dqStageDao;
 
-    public DQAbstractStage constructStage(DQStageTypeEnum stageTypeEnum, 
DQInstance instance) {
+    public DQAbstractStage constructStage(DQStageTypeEnum stageTypeEnum, 
DQInstanceBO instance) {
         DQAbstractStage stage = null;
         switch (stageTypeEnum) {
             case RECORD:
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/factory/DQTaskFactory.java 
b/core/src/main/java/org/apache/griffin/core/worker/factory/DQTaskFactory.java
index 11d6d900..73801705 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/factory/DQTaskFactory.java
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/factory/DQTaskFactory.java
@@ -1,7 +1,7 @@
 package org.apache.griffin.core.worker.factory;
 
-import org.apache.griffin.core.common.entity.DQResoueceEnums;
-import org.apache.griffin.core.common.entity.GriffinDQBusinessRule;
+import org.apache.griffin.api.entity.DQResoueceEnums;
+import org.apache.griffin.api.entity.GriffinDQBusinessRule;
 import org.apache.griffin.core.worker.dao.DQTaskDao;
 import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
 import org.slf4j.Logger;
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/schedule/TaskDispatcherScheduler.java
 
b/core/src/main/java/org/apache/griffin/core/worker/schedule/TaskDispatcherScheduler.java
index 4ddc34cd..a71e442b 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/schedule/TaskDispatcherScheduler.java
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/schedule/TaskDispatcherScheduler.java
@@ -3,13 +3,13 @@ package org.apache.griffin.core.worker.schedule;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.griffin.api.entity.enums.DQInstanceStatus;
 import org.apache.griffin.core.common.utils.context.WorkerContext;
-import org.apache.griffin.core.worker.entity.bo.DQInstance;
-import org.apache.griffin.core.worker.entity.enums.DQInstanceStatus;
+import org.apache.griffin.core.worker.entity.bo.DQInstanceBO;
 import org.apache.griffin.core.worker.entity.enums.DQStageStatus;
 import org.apache.griffin.core.worker.exception.StageSubmitException;
-import org.apache.griffin.core.worker.service.DQInstanceService;
 import org.apache.griffin.core.worker.service.DQStageService;
+import org.apache.griffin.core.worker.service.DQWorkerInstanceService;
 import org.apache.griffin.core.worker.stage.DQAbstractStage;
 import org.apache.griffin.core.worker.stage.DQStage;
 import org.slf4j.Logger;
@@ -33,7 +33,7 @@ public class TaskDispatcherScheduler {
     private static final Logger log = 
LoggerFactory.getLogger(TaskDispatcherScheduler.class);
 
     private WorkerContext wc;
-    private DQInstanceService dqInstanceService;
+    private DQWorkerInstanceService dqWorkerInstanceService;
     private DQStageService dqStageService;
 
     @Autowired
@@ -41,8 +41,8 @@ public class TaskDispatcherScheduler {
         this.wc = wc;
     }
     @Autowired
-    public void setDqInstanceService(DQInstanceService dqInstanceService) {
-        this.dqInstanceService = dqInstanceService;
+    public void setDqWorkerInstanceService(DQWorkerInstanceService 
dqWorkerInstanceService) {
+        this.dqWorkerInstanceService = dqWorkerInstanceService;
     }
     @Autowired
     public void setDqStageService(DQStageService dqStageService) {
@@ -63,23 +63,23 @@ public class TaskDispatcherScheduler {
     @Scheduled(fixedDelay = 5 * 1000L)
     public void doTaskDispatcherScheduler() {
         log.info("doTaskDispatcherScheduler start.");
-        List<DQInstance> waittingToRemoveFromWaitingList = 
Lists.newArrayList();
-        Queue<DQInstance> waitingToRecordingDQInstanceQueue = 
Queues.newPriorityBlockingQueue(wc.getWAITTING_TASK_QUEUE());
+        List<DQInstanceBO> waittingToRemoveFromWaitingList = 
Lists.newArrayList();
+        Queue<DQInstanceBO> waitingToRecordingDQInstanceQueue = 
Queues.newPriorityBlockingQueue(wc.getWAITTING_TASK_QUEUE());
 
         try {
             while (true) {
                 try {
-                    DQInstance dqInstance = 
waitingToRecordingDQInstanceQueue.poll();
+                    DQInstanceBO dqInstance = 
waitingToRecordingDQInstanceQueue.poll();
                     // queue is empty, quit
                     if (dqInstance == null) break;
                     if (DQInstanceStatus.ACCEPTED != dqInstance.getStatus()) {
                         // State is not init, sync from database and reassign 
it
-                        dqInstance = 
dqInstanceService.getById(dqInstance.getId());
+                        dqInstance = 
dqWorkerInstanceService.getById(dqInstance.getId());
                         // assign task by status
                         waittingToRemoveFromWaitingList.add(dqInstance);
                     } else {
                         // normal, update status and remove from queue, then 
put it to the queue of recording task
-                        if (dqInstanceService.updateStatus(dqInstance, 
DQInstanceStatus.WAITTING)) {
+                        if (dqWorkerInstanceService.updateStatus(dqInstance, 
DQInstanceStatus.WAITTING)) {
                             waittingToRemoveFromWaitingList.add(dqInstance);
                         }
                     }
@@ -100,7 +100,7 @@ public class TaskDispatcherScheduler {
         log.info("doTaskDispatcherScheduler end.");
     }
 
-    private void offerToSpecQueueByStatus(DQInstance instance) {
+    private void offerToSpecQueueByStatus(DQInstanceBO instance) {
         DQInstanceStatus status = instance.getStatus();
         switch (status) {
             case WAITTING:
@@ -132,12 +132,12 @@ public class TaskDispatcherScheduler {
 
     @Scheduled(fixedDelay = 5 * 1000L)
     public void scanRecordingTask() {
-        List<DQInstance> waittingToRemoveFromRecordingList = 
Lists.newArrayList();
-        Queue<DQInstance> waitingToSubmitDQInstanceQueue = 
Queues.newPriorityBlockingQueue(wc.getRECORDING_TASK_QUEUE());
+        List<DQInstanceBO> waittingToRemoveFromRecordingList = 
Lists.newArrayList();
+        Queue<DQInstanceBO> waitingToSubmitDQInstanceQueue = 
Queues.newPriorityBlockingQueue(wc.getRECORDING_TASK_QUEUE());
         try {
             while (CollectionUtils.isNotEmpty(waitingToSubmitDQInstanceQueue)) 
{
                 try {
-                    DQInstance dqInstance = 
waitingToSubmitDQInstanceQueue.poll();
+                    DQInstanceBO dqInstance = 
waitingToSubmitDQInstanceQueue.poll();
                     if (dqInstance == null) break;
                     processRecordingInstance(dqInstance, 
waittingToRemoveFromRecordingList);
                 } catch (Exception e) {
@@ -156,7 +156,7 @@ public class TaskDispatcherScheduler {
         }
     }
 
-    private void processRecordingInstance(DQInstance dqInstance, 
List<DQInstance> waittingToRemoveFromRecordingList) {
+    private void processRecordingInstance(DQInstanceBO dqInstance, 
List<DQInstanceBO> waittingToRemoveFromRecordingList) {
         try {
             DQAbstractStage recordingStage = dqInstance.getRecordingStage();
             DQStageStatus stageStatus = recordingStage.getStatus();
@@ -164,13 +164,13 @@ public class TaskDispatcherScheduler {
                 if (!dqStageService.submitStage(recordingStage)) {
                     throw new StageSubmitException("Submit stage failed!, 
instance id: " + dqInstance.getId());
                 } else {
-                    dqInstanceService.updateStatus(dqInstance, 
DQInstanceStatus.RECORDING);
+                    dqWorkerInstanceService.updateStatus(dqInstance, 
DQInstanceStatus.RECORDING);
                 }
             } else if (stageStatus == DQStageStatus.FINISH) {
                 // if there is one task success, the instance should be 
EVALUATING;
                 // if all tasks are failed, the instance should be 
FAILED_ALERTING;
                 DQInstanceStatus instanceStatus = recordingStage.hasSuccess()? 
DQInstanceStatus.EVALUATING: DQInstanceStatus.FAILED_ALERTING;
-                dqInstanceService.updateStatus(dqInstance, instanceStatus);
+                dqWorkerInstanceService.updateStatus(dqInstance, 
instanceStatus);
                 waittingToRemoveFromRecordingList.add(dqInstance);
             }
         } catch (Exception e) {
@@ -181,9 +181,9 @@ public class TaskDispatcherScheduler {
 
     public void scanEvaluatingTask() {
         Executors.newCachedThreadPool().execute(() -> {
-            LinkedBlockingQueue<DQInstance> evaluating_task_queue = 
wc.getEVALUATING_TASK_QUEUE();
+            LinkedBlockingQueue<DQInstanceBO> evaluating_task_queue = 
wc.getEVALUATING_TASK_QUEUE();
             while (true) {
-                DQInstance dqInstance = null;
+                DQInstanceBO dqInstance = null;
                 try {
                     dqInstance = evaluating_task_queue.poll(5, 
TimeUnit.SECONDS);
                     if (dqInstance == null) continue;
@@ -192,13 +192,13 @@ public class TaskDispatcherScheduler {
                     if (dqInstance.getStatus() == DQInstanceStatus.EVALUATING) 
{
                         dqStageService.executeStage(evaluatingStage);
                         DQInstanceStatus dqInstanceStatus = 
evaluatingStage.hasSuccess() ? DQInstanceStatus.ALERTING : 
DQInstanceStatus.FAILED_ALERTING;
-                        dqInstanceService.updateStatus(dqInstance, 
dqInstanceStatus);
+                        dqWorkerInstanceService.updateStatus(dqInstance, 
dqInstanceStatus);
                     }
                     offerToSpecQueueByStatus(dqInstance);
                 } catch (Exception e) {
                     if (dqInstance != null) {
                         log.error("scanEvaluatingTask doEvalute failed, id : 
{}, instance : {}, ex:", dqInstance.getId(), dqInstance, e);
-                        dqInstanceService.updateStatus(dqInstance, 
DQInstanceStatus.FAILED_ALERTING);
+                        dqWorkerInstanceService.updateStatus(dqInstance, 
DQInstanceStatus.FAILED_ALERTING);
                         offerToSpecQueueByStatus(dqInstance);
                     } else {
                         log.error("scanEvaluatingTask poll instance failed. 
ex:", e);
@@ -210,9 +210,9 @@ public class TaskDispatcherScheduler {
 
     public void scanAlertingTask() {
         Executors.newCachedThreadPool().execute(() -> {
-            LinkedBlockingQueue<DQInstance> alerting_task_queue = 
wc.getALERTING_TASK_QUEUE();
+            LinkedBlockingQueue<DQInstanceBO> alerting_task_queue = 
wc.getALERTING_TASK_QUEUE();
             while (true) {
-                DQInstance dqInstance = null;
+                DQInstanceBO dqInstance = null;
                 try {
                     dqInstance = alerting_task_queue.poll(1, TimeUnit.SECONDS);
                     if (dqInstance == null) continue;
@@ -221,7 +221,7 @@ public class TaskDispatcherScheduler {
                         DQStage alertingStage = dqInstance.getAlertingStage();
                         dqStageService.executeStage(alertingStage);
                         DQInstanceStatus dqInstanceStatus = 
alertingStage.hasSuccess()? DQInstanceStatus.SUCCESS : DQInstanceStatus.FAILED;
-                        dqInstanceService.updateStatus(dqInstance, 
dqInstanceStatus);
+                        dqWorkerInstanceService.updateStatus(dqInstance, 
dqInstanceStatus);
                     }
                     offerToSpecQueueByStatus(dqInstance);
                 } catch (Exception e) {
@@ -229,7 +229,7 @@ public class TaskDispatcherScheduler {
                         log.error("scanAlertingTask doAlert failed, id : {}, 
instance : {}, ex:", dqInstance.getId(), dqInstance, e);
                         if (dqInstance.isFailed()) {
                             // retry 5 times, set failed
-                            dqInstanceService.updateStatus(dqInstance, 
DQInstanceStatus.FAILED);
+                            dqWorkerInstanceService.updateStatus(dqInstance, 
DQInstanceStatus.FAILED);
                             // retry times less than 5, do not modify status 
and put task back
                         }
                         // put task back
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/service/DQTaskService.java 
b/core/src/main/java/org/apache/griffin/core/worker/service/DQTaskService.java
index bf5b38b6..bd50374e 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/service/DQTaskService.java
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/service/DQTaskService.java
@@ -3,14 +3,10 @@ package org.apache.griffin.core.worker.service;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.griffin.core.worker.client.DispatcherClient;
-import org.apache.griffin.core.worker.dao.DQInstanceDao;
 import org.apache.griffin.core.worker.dao.DQTaskDao;
-import org.apache.griffin.core.worker.entity.bo.DQInstance;
 import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
 import org.apache.griffin.core.worker.entity.dispatcher.*;
-import org.apache.griffin.core.worker.entity.enums.DQInstanceStatus;
 import org.apache.griffin.core.worker.entity.enums.DQTaskStatus;
-import org.apache.griffin.core.worker.entity.pojo.rule.DQRecordRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/service/DQInstanceService.java
 
b/core/src/main/java/org/apache/griffin/core/worker/service/DQWorkerInstanceService.java
similarity index 62%
rename from 
core/src/main/java/org/apache/griffin/core/worker/service/DQInstanceService.java
rename to 
core/src/main/java/org/apache/griffin/core/worker/service/DQWorkerInstanceService.java
index 904de4ed..ae12415d 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/service/DQInstanceService.java
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/service/DQWorkerInstanceService.java
@@ -1,10 +1,11 @@
 package org.apache.griffin.core.worker.service;
 
-import org.apache.griffin.core.common.entity.GriffinDQContentInstanceMap;
-import org.apache.griffin.core.common.dao.DQContentInstanceMapDao;
-import org.apache.griffin.core.worker.dao.DQInstanceDao;
-import org.apache.griffin.core.worker.entity.bo.DQInstance;
-import org.apache.griffin.core.worker.entity.enums.DQInstanceStatus;
+import org.apache.griffin.api.dao.DQInstanceDao;
+import org.apache.griffin.api.entity.GriffinDQContentInstanceMap;
+import org.apache.griffin.api.entity.enums.DQInstanceStatus;
+import org.apache.griffin.api.entity.pojo.DQInstanceEntity;
+import org.apache.griffin.api.dao.DQContentInstanceMapDao;
+import org.apache.griffin.core.worker.entity.bo.DQInstanceBO;
 import org.apache.griffin.core.worker.factory.DQInstanceFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -12,8 +13,8 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 @Service
-public class DQInstanceService {
-    private static final Logger log = 
LoggerFactory.getLogger(DQInstanceService.class);
+public class DQWorkerInstanceService {
+    private static final Logger log = 
LoggerFactory.getLogger(DQWorkerInstanceService.class);
 
     @Autowired
     private DQInstanceDao dqInstanceDao;
@@ -28,9 +29,10 @@ public class DQInstanceService {
      * @param status stat
      * @return true or false
      */
-    public boolean updateStatus(DQInstance instance, DQInstanceStatus status) {
+    public boolean updateStatus(DQInstanceBO instance, DQInstanceStatus 
status) {
         try {
-            dqInstanceDao.updateDQInstanceStatus(instance, status.getCode());
+            // todo  report to master
+            dqInstanceDao.updateDQInstanceStatus(instance.getId(), 
status.getCode());
             instance.setStatus(status);
             log.info("instance {} {} => {} success.", instance.getId(), 
instance.getStatus(), status);
             return true;
@@ -45,19 +47,24 @@ public class DQInstanceService {
      * @param id master assign id
      * @return DQInstance
      */
-    public DQInstance getById(Long id) {
+    public DQInstanceBO getById(Long id) {
         if (id == null) {
             log.error("Unknown instance id: null");
         }
-        DQInstance ins = dqInstanceDao.getById(id);
+        DQInstanceEntity ins = dqInstanceDao.getById(id);
         if (ins == null) {
             // the ins id has no task info
-            return constructInstance(id);
+            DQInstanceBO dqInstanceBO = constructInstance(id);
+            saveDqcInstanceEntityFromBO(dqInstanceBO);
+            return dqInstanceBO;
         }
         return recoveryInstance(ins);
     }
 
-    private DQInstance constructInstance(Long id) {
+    private void saveDqcInstanceEntityFromBO(DQInstanceBO dqInstanceBO) {
+    }
+
+    private DQInstanceBO constructInstance(Long id) {
         log.info("constructInstance id: {}", id);
         GriffinDQContentInstanceMap contentInstanceMap = 
dqContentInstanceMapDao.getContentInstanceMapByInstanceId(id);
         Long instanceId = contentInstanceMap.getInstanceId();
@@ -65,7 +72,7 @@ public class DQInstanceService {
         return dqInstanceFactory.constructInstance(instanceId, dqcId);
     }
 
-    private DQInstance recoveryInstance(DQInstance instance) {
+    private DQInstanceBO recoveryInstance(DQInstanceEntity instance) {
         return dqInstanceFactory.recoveryInstance(instance);
     }
 }
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/service/ExecuteNodeServiceImpl.java
 
b/core/src/main/java/org/apache/griffin/core/worker/service/ExecuteNodeServiceImpl.java
index ebe9091f..dfdd589c 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/service/ExecuteNodeServiceImpl.java
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/service/ExecuteNodeServiceImpl.java
@@ -16,7 +16,7 @@ public class ExecuteNodeServiceImpl extends 
ExecuteNodeServiceGrpc.ExecuteNodeSe
     }
 
     @Override
-    public void querySingleDQTask(SQuerySingleDQTaskRequest request, 
StreamObserver<QuerySingleDQTaskResponse> responseObserver) {
+    public void querySingleDQTask(QuerySingleDQTaskRequest request, 
StreamObserver<QuerySingleDQTaskResponse> responseObserver) {
         super.querySingleDQTask(request, responseObserver);
     }
 }
diff --git 
a/core/src/main/java/org/apache/griffin/core/worker/stage/DQAbstractStage.java 
b/core/src/main/java/org/apache/griffin/core/worker/stage/DQAbstractStage.java
index f866a533..13444ece 100644
--- 
a/core/src/main/java/org/apache/griffin/core/worker/stage/DQAbstractStage.java
+++ 
b/core/src/main/java/org/apache/griffin/core/worker/stage/DQAbstractStage.java
@@ -1,6 +1,6 @@
 package org.apache.griffin.core.worker.stage;
 
-import org.apache.griffin.core.worker.entity.bo.DQInstance;
+import org.apache.griffin.core.worker.entity.bo.DQInstanceBO;
 import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
 import org.apache.griffin.core.worker.entity.enums.DQStageStatus;
 import org.apache.griffin.core.worker.service.DQStageService;
@@ -13,7 +13,7 @@ public abstract class DQAbstractStage implements DQStage {
     protected DQStageService dqStageService;
 
     protected DQStageStatus status;
-    protected DQInstance instance;
+    protected DQInstanceBO instance;
 
     protected List<DQBaseTask> subTaskList;
 
@@ -21,11 +21,11 @@ public abstract class DQAbstractStage implements DQStage {
         this.status = DQStageStatus.INIT;
     }
 
-    public DQInstance getInstance() {
+    public DQInstanceBO getInstance() {
         return instance;
     }
 
-    public void setInstance(DQInstance instance) {
+    public void setInstance(DQInstanceBO instance) {
         this.instance = instance;
     }
 

Reply via email to