This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new 052f9d1 UT Coverage rate test (#2276)
052f9d1 is described below
commit 052f9d10bf9507ab68c32ca03441a8e6bf3db991
Author: qiaozhanwei <[email protected]>
AuthorDate: Mon Mar 23 14:44:17 2020 +0800
UT Coverage rate test (#2276)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
* add TaskInstanceCacheManager receive Worker report result
* TaskInstance setExecutePath
* add TaskInstanceCacheManager to receive Worker Task result report
* TaskInstanceCacheManager add remove method
* add license
* add dispatch task method
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* 1, worker TaskPros replaced by TaskExecutionContext
2,Master kill Task , KillTaskProcessor modify
* worker remove db
* ShellTask modify
* master persistence processId and appIds
* master persistence processId and appIds
* master add kill task logic
* master add kill task logic
* master add kill task logic
* javadoc error modify
* remove chinese log
* executeDirectly method add Override
* remote module modify
* TaskKillResponseProcessor command type modify
* create buildKillCommand
* host add host:port format
* host add host:port format
* TaskAckProcessor modify
* TaskAckProcessor modify
* task priority refactor
* remove ITaskQueue
* task priority refactor
* remove ITaskQueue
* TaskPriority refactor
* remove logs
* WorkerServer refactor
* MasterSchedulerService modify
* WorkerConfig listen port modify
* modify master and worker listen port
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* Encapsulate the parameters required by sqltask
* 1,Encapsulate the parameters required by sqltask
2,SQLTask optimization
* AbstractTask modify
* ProcedureTask optimization
* MasterSchedulerService modify
* TaskUpdateQueueConsumer modify
* test
* DataxTask process run debug
* DataxTask process run debug
* add protobuf dependency; MR, Spark tasks etc. need this
* TaskUpdateQueueConsumer modify
* TaskExecutionContextBuilder set TaskInstance workgroup
* WorkerGroupService queryAllGroup modify
query available work group
* 1,get workergroup from zk modify
2,SpringConnectionFactory repeat load modify
* master and worker register ip use OSUtils.getHost()
* ProcessInstance host set ip:port format
* worker fault tolerance modify
* Constants and .env modify
* master fault tolerant bug modify
* UT add pom.xml
Co-authored-by: qiaozhanwei <[email protected]>
---
.../apache/dolphinscheduler/common/Constants.java | 2 +-
.../server/master/runner/MasterExecThread.java | 37 ++++++++++++++++++++--
pom.xml | 36 +++++++++++++++++----
3 files changed, 65 insertions(+), 10 deletions(-)
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 2aded0f..1636072 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -121,7 +121,7 @@ public final class Constants {
/**
* MasterServer directory registered in zookeeper
*/
- public static final String ZOOKEEPER_DOLPHINSCHEDULER_MASTERS =
"/nodes/masters";
+ public static final String ZOOKEEPER_DOLPHINSCHEDULER_MASTERS =
"/nodes/master";
/**
* WorkerServer directory registered in zookeeper
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index df1eac3..db7cdbb 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -725,7 +725,7 @@ public class MasterExecThread implements Runnable {
ProcessInstance instance =
processService.findProcessInstanceById(processInstance.getId());
ExecutionStatus state = instance.getState();
- if(activeTaskNode.size() > 0){
+ if(activeTaskNode.size() > 0 || haveRetryTaskStandBy()){
return runningState(state);
}
// process failure
@@ -769,6 +769,24 @@ public class MasterExecThread implements Runnable {
}
/**
+ * whether standby task list have retry tasks
+ * @return
+ */
+ private boolean haveRetryTaskStandBy() {
+
+ boolean result = false;
+
+ for(String taskName : readyToSubmitTaskList.keySet()){
+ TaskInstance task = readyToSubmitTaskList.get(taskName);
+ if(task.getState().typeIsFailure()){
+ result = true;
+ break;
+ }
+ }
+ return result;
+ }
+
+ /**
* whether complement end
* @return Boolean whether is complement end
*/
@@ -856,7 +874,11 @@ public class MasterExecThread implements Runnable {
// submit start node
submitPostNode(null);
boolean sendTimeWarning = false;
- while(!processInstance.IsProcessInstanceStop()){
+ while(Stopper.isRunning()){
+
+ if(processInstance.IsProcessInstanceStop()){
+ break;
+ }
// send warning email if process time out.
if( !sendTimeWarning && checkProcessTimeOut(processInstance) ){
@@ -871,12 +893,21 @@ public class MasterExecThread implements Runnable {
if(!future.isDone()){
continue;
}
+
// node monitor thread complete
- activeTaskNode.remove(entry.getKey());
+ task = this.processService.findTaskInstanceById(task.getId());
+
if(task == null){
this.taskFailedSubmit = true;
+ activeTaskNode.remove(entry.getKey());
continue;
}
+
+ // node monitor thread complete
+ if(task.getState().typeIsFinished()){
+ activeTaskNode.remove(entry.getKey());
+ }
+
logger.info("task :{}, id:{} complete, state is {} ",
task.getName(), task.getId(),
task.getState().toString());
//TODO node success , post node submit
diff --git a/pom.xml b/pom.xml
index b17573d..e2b0e66 100644
--- a/pom.xml
+++ b/pom.xml
@@ -712,20 +712,44 @@
<include>**/alert/utils/FuncUtilsTest.java</include>
<include>**/alert/utils/JSONUtilsTest.java</include>
<include>**/alert/utils/PropertyUtilsTest.java</include>
-
<include>**/server/utils/SparkArgsUtilsTest.java</include>
-
<include>**/server/utils/FlinkArgsUtilsTest.java</include>
- <include>**/server/utils/ParamUtilsTest.java</include>
-
<include>**/server/master/MasterExecThreadTest.java</include>
+
<include>**/alert/template/AlertTemplateFactoryTest.java</include>
+
<include>**/alert/template/impl/DefaultHTMLTemplateTest.java</include>
<include>**/dao/mapper/AccessTokenMapperTest.java</include>
<include>**/dao/mapper/AlertGroupMapperTest.java</include>
<include>**/dao/mapper/AlertMapperTest.java</include>
<include>**/dao/mapper/CommandMapperTest.java</include>
<include>**/dao/cron/CronUtilsTest.java</include>
<include>**/dao/utils/DagHelperTest.java</include>
-
<include>**/alert/template/AlertTemplateFactoryTest.java</include>
-
<include>**/alert/template/impl/DefaultHTMLTemplateTest.java</include>
<include>**/server/worker/task/datax/DataxTaskTest.java</include>
<include>**/server/utils/DataxUtilsTest.java</include>
+
<include>**/server/utils/SparkArgsUtilsTest.java</include>
+
<include>**/server/utils/FlinkArgsUtilsTest.java</include>
+ <include>**/server/utils/ParamUtilsTest.java</include>
+
<include>**/server/log/MasterLogFilterTest.java</include>
+
<include>**/server/log/SensitiveDataConverterTest.java</include>
+
<include>**/server/log/TaskLogDiscriminatorTest.java</include>
+ <include>**/server/log/TaskLogFilterTest.java</include>
+
<include>**/server/log/WorkerLogFilterTest.java</include>
+
<include>**/server/master/executor/NettyExecutorManagerTest.java</include>
+
<include>**/server/master/host/LowerWeightRoundRobinTest.java</include>
+
<include>**/server/master/register/MasterRegistryTest.java</include>
+
<include>**/server/master/AlertManagerTest.java</include>
+
<include>**/server/master/MasterCommandTest.java</include>
+
<include>**/server/master/MasterExecThreadTest.java</include>
+ <include>**/server/master/ParamsTest.java</include>
+
<include>**/server/register/ZookeeperNodeManagerTest.java</include>
+ <include>**/server/utils/DataxUtilsTest.java</include>
+
<include>**/server/utils/FlinkArgsUtilsTest.java</include>
+ <include>**/server/utils/ParamUtilsTest.java</include>
+
<include>**/server/utils/ProcessUtilsTest.java</include>
+
<include>**/server/utils/SparkArgsUtilsTest.java</include>
+
<include>**/server/worker/register/WorkerRegistryTest.java</include>
+
<include>**/server/worker/shell/ShellCommandExecutorTest.java</include>
+
<include>**/server/worker/sql/SqlExecutorTest.java</include>
+
<include>**/server/worker/task/datax/DataxTaskTest.java</include>
+
<include>**/server/worker/task/dependent/DependentTaskTest.java</include>
+
<include>**/server/worker/task/spark/SparkTaskTest.java</include>
+
<include>**/server/worker/task/EnvFileTest.java</include>
</includes>
<!-- <skip>true</skip> -->
<argLine>-Xmx2048m</argLine>