This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new f112415 TaskManager refactor (#2302)
f112415 is described below
commit f112415b13d7fdd18cf668c2cc639e79cdf08458
Author: qiaozhanwei <[email protected]>
AuthorDate: Tue Mar 24 18:54:45 2020 +0800
TaskManager refactor (#2302)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
* add TaskInstanceCacheManager receive Worker report result
* TaskInstance setExecutePath
* add TaskInstanceCacheManager to receive Worker Task result report
* TaskInstanceCacheManager add remove method
* add license
* add dispatch task method
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* 1,worker TaskPros use TaskExecutionContext replace
2,Master kill Task , KillTaskProcessor modify
* worker remove db
* ShellTask modify
* master persistence processId and appIds
* master persistence processId and appIds
* master add kill task logic
* master add kill task logic
* master add kill task logic
* javadoc error modify
* remove chinese log
* executeDirectly method add Override
* remote module modify
* TaskKillResponseProcessor command type modify
* create buildKillCommand
* host add host:port format
* host add host:port format
* TaskAckProcessor modify
* TaskAckProcessor modify
* task priority refactor
* remove ITaskQueue
* task priority refactor
* remove ITaskQueue
* TaskPriority refactor
* remove logs
* WorkerServer refactor
* MasterSchedulerService modify
* WorkerConfig listen port modify
* modify master and worker listen port
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* Encapsulate the parameters required by sqltask
* 1,Encapsulate the parameters required by sqltask
2,SQLTask optimization
* AbstractTask modify
* ProcedureTask optimization
* MasterSchedulerService modify
* TaskUpdateQueueConsumer modify
* test
* DataxTask process run debug
* DataxTask process run debug
* add protobuf dependency; MR, Spark tasks etc. need this
* TaskUpdateQueueConsumer modify
* TaskExecutionContextBuilder set TaskInstance workgroup
* WorkerGroupService queryAllGroup modify
query available work group
* 1,get workergroup from zk modify
2,SpringConnectionFactory repeat load modify
* master and worker register ip use OSUtils.getHost()
* ProcessInstance host set ip:port format
* worker fault tolerance modify
* Constants and .env modify
* master fault tolerant bug modify
* UT add pom.xml
* timing online modify
* when taskResponse is written to db faster than taskAck, the task state will be wrong
add an async queue and a new thread to resolve this problem
* TaskExecutionContext set host
* 1,TaskManager refactor
2, api start load server dolphinscheduler-daemon.sh modify
* 1,TaskManager refactor
2, api start load server dolphinscheduler-daemon.sh modify
* add UT in pom.xml
* revert dolphinscheduler-daemon.sh
Co-authored-by: qiaozhanwei <[email protected]>
---
.../master/consumer/TaskUpdateQueueConsumer.java | 8 ++-
.../server/master/manager/TaskEvent.java | 36 ++++++--------
.../server/master/manager/TaskEventEnum.java | 58 ++++++++++++++++++++++
.../server/master/manager/TaskManager.java | 44 +++++++++-------
.../server/master/processor/TaskAckProcessor.java | 6 +--
.../master/processor/TaskResponseProcessor.java | 6 +--
pom.xml | 4 ++
7 files changed, 109 insertions(+), 53 deletions(-)
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
index 5f66c1b..e7b6327 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
@@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.server.master.consumer;
import com.alibaba.fastjson.JSONObject;
-import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.UdfType;
@@ -89,12 +88,11 @@ public class TaskUpdateQueueConsumer extends Thread{
public void run() {
while (Stopper.isRunning()){
try {
- if (taskUpdateQueue.size() == 0){
- Thread.sleep(Constants.SLEEP_TIME_MILLIS);
- continue;
- }
+ // if not task , blocking here
String taskPriorityInfo = taskUpdateQueue.take();
+
TaskPriority taskPriority = TaskPriority.of(taskPriorityInfo);
+
dispatch(taskPriority.getTaskId());
}catch (Exception e){
logger.error("dispatcher task error",e);
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEvent.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEvent.java
index 5c6740f..98b8d62 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEvent.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEvent.java
@@ -26,9 +26,6 @@ import java.util.Date;
*/
public class TaskEvent {
- public static final String ACK = "ack";
- public static final String RESPONSE = "response";
-
/**
* taskInstanceId
*/
@@ -77,7 +74,7 @@ public class TaskEvent {
/**
* ack / response
*/
- private String type;
+ private TaskEventEnum type;
/**
@@ -88,22 +85,21 @@ public class TaskEvent {
* @param executePath executePath
* @param logPath logPath
* @param taskInstanceId taskInstanceId
- * @param type type
*/
- public void receiveAck(ExecutionStatus state,
- Date startTime,
- String workerAddress,
- String executePath,
- String logPath,
- int taskInstanceId,
- String type){
+ public TaskEvent(ExecutionStatus state,
+ Date startTime,
+ String workerAddress,
+ String executePath,
+ String logPath,
+ int taskInstanceId){
this.state = state;
this.startTime = startTime;
this.workerAddress = workerAddress;
this.executePath = executePath;
this.logPath = logPath;
this.taskInstanceId = taskInstanceId;
- this.type = type;
+ this.type = TaskEventEnum.ACK;
+
}
/**
@@ -113,20 +109,18 @@ public class TaskEvent {
* @param processId processId
* @param appIds appIds
* @param taskInstanceId taskInstanceId
- * @param type type
*/
- public void receiveResponse(ExecutionStatus state,
+ public TaskEvent(ExecutionStatus state,
Date endTime,
int processId,
String appIds,
- int taskInstanceId,
- String type){
+ int taskInstanceId){
this.state = state;
this.endTime = endTime;
this.processId = processId;
this.appIds = appIds;
this.taskInstanceId = taskInstanceId;
- this.type = type;
+ this.type = TaskEventEnum.RESPONSE;
}
public int getTaskInstanceId() {
@@ -201,11 +195,11 @@ public class TaskEvent {
this.appIds = appIds;
}
- public String getType() {
+ public TaskEventEnum getType() {
return type;
}
- public void setType(String type) {
+ public void setType(TaskEventEnum type) {
this.type = type;
}
@@ -221,7 +215,7 @@ public class TaskEvent {
", logPath='" + logPath + '\'' +
", processId=" + processId +
", appIds='" + appIds + '\'' +
- ", type='" + type + '\'' +
+ ", type=" + type +
'}';
}
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEventEnum.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEventEnum.java
new file mode 100644
index 0000000..f3d7497
--- /dev/null
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEventEnum.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.manager;
+
+import com.baomidou.mybatisplus.annotation.EnumValue;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+
+import java.util.Date;
+
+/**
+ * task event enum
+ */
+public enum TaskEventEnum {
+
+ ACK(0, "task ack"),
+ RESPONSE(1, "task response result");
+
+ TaskEventEnum(int code, String descp){
+ this.code = code;
+ this.descp = descp;
+ }
+
+ @EnumValue
+ private final int code;
+ private final String descp;
+
+ public String getDescp() {
+ return descp;
+ }
+
+ public int getCode() {
+ return code;
+ }
+
+ public static TaskEventEnum of(int status){
+ for(TaskEventEnum es : values()){
+ if(es.getCode() == status){
+ return es;
+ }
+ }
+ throw new IllegalArgumentException("invalid status : " + status);
+ }
+}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskManager.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskManager.java
index a2710ee..ade0aea 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskManager.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskManager.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.master.manager;
-import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
@@ -28,6 +27,7 @@ import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import static
org.apache.dolphinscheduler.server.master.manager.TaskEventEnum.*;
/**
* task manager
@@ -56,6 +56,7 @@ public class TaskManager {
@PostConstruct
public void init(){
TaskWorker taskWorker = new TaskWorker();
+ taskWorker.setName("TaskWorkerThread");
taskWorker.start();
}
@@ -83,12 +84,8 @@ public class TaskManager {
while (Stopper.isRunning()){
try {
- if (attemptQueue.size() == 0){
- Thread.sleep(Constants.SLEEP_TIME_MILLIS);
- continue;
- }
+ // if not task , blocking here
TaskEvent taskEvent = attemptQueue.take();
-
persist(taskEvent);
}catch (Exception e){
@@ -102,19 +99,28 @@ public class TaskManager {
* @param taskEvent taskEvent
*/
private void persist(TaskEvent taskEvent){
- if (TaskEvent.ACK.equals(taskEvent.getType())){
- processService.changeTaskState(taskEvent.getState(),
- taskEvent.getStartTime(),
- taskEvent.getWorkerAddress(),
- taskEvent.getExecutePath(),
- taskEvent.getLogPath(),
- taskEvent.getTaskInstanceId());
- }else if (TaskEvent.RESPONSE.equals(taskEvent.getType())){
- processService.changeTaskState(taskEvent.getState(),
- taskEvent.getEndTime(),
- taskEvent.getProcessId(),
- taskEvent.getAppIds(),
- taskEvent.getTaskInstanceId());
+ // task event type
+ TaskEventEnum type = taskEvent.getType();
+
+ switch (type){
+ case ACK:
+ processService.changeTaskState(taskEvent.getState(),
+ taskEvent.getStartTime(),
+ taskEvent.getWorkerAddress(),
+ taskEvent.getExecutePath(),
+ taskEvent.getLogPath(),
+ taskEvent.getTaskInstanceId());
+ break;
+ case RESPONSE:
+ processService.changeTaskState(taskEvent.getState(),
+ taskEvent.getEndTime(),
+ taskEvent.getProcessId(),
+ taskEvent.getAppIds(),
+ taskEvent.getTaskInstanceId());
+ break;
+ default:
+ throw new IllegalArgumentException("invalid task event
type : " + type);
+
}
}
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
index a678cad..67832a1 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
@@ -73,14 +73,12 @@ public class TaskAckProcessor implements
NettyRequestProcessor {
String workerAddress = ChannelUtils.toAddress(channel).getAddress();
// TaskEvent
- TaskEvent taskEvent = new TaskEvent();
- taskEvent.receiveAck(ExecutionStatus.of(taskAckCommand.getStatus()),
+ TaskEvent taskEvent = new
TaskEvent(ExecutionStatus.of(taskAckCommand.getStatus()),
taskAckCommand.getStartTime(),
workerAddress,
taskAckCommand.getExecutePath(),
taskAckCommand.getLogPath(),
- taskAckCommand.getTaskInstanceId(),
- TaskEvent.ACK);
+ taskAckCommand.getTaskInstanceId());
taskManager.putTask(taskEvent);
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
index ffc5d72..9cf10b9 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
@@ -73,13 +73,11 @@ public class TaskResponseProcessor implements
NettyRequestProcessor {
taskInstanceCacheManager.cacheTaskInstance(responseCommand);
// TaskEvent
- TaskEvent taskEvent = new TaskEvent();
-
taskEvent.receiveResponse(ExecutionStatus.of(responseCommand.getStatus()),
+ TaskEvent taskEvent = new
TaskEvent(ExecutionStatus.of(responseCommand.getStatus()),
responseCommand.getEndTime(),
responseCommand.getProcessId(),
responseCommand.getAppIds(),
- responseCommand.getTaskInstanceId(),
- TaskEvent.RESPONSE);
+ responseCommand.getTaskInstanceId());
taskManager.putTask(taskEvent);
}
diff --git a/pom.xml b/pom.xml
index e2b0e66..e7b95d9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -732,6 +732,8 @@
<include>**/server/log/WorkerLogFilterTest.java</include>
<include>**/server/master/executor/NettyExecutorManagerTest.java</include>
<include>**/server/master/host/LowerWeightRoundRobinTest.java</include>
+
<include>**/server/master/host/RandomSelectorTest.java</include>
+
<include>**/server/master/host/RoundRobinSelectorTest.java</include>
<include>**/server/master/register/MasterRegistryTest.java</include>
<include>**/server/master/AlertManagerTest.java</include>
<include>**/server/master/MasterCommandTest.java</include>
@@ -743,6 +745,7 @@
<include>**/server/utils/ParamUtilsTest.java</include>
<include>**/server/utils/ProcessUtilsTest.java</include>
<include>**/server/utils/SparkArgsUtilsTest.java</include>
+
<include>**/server/worker/processor/TaskCallbackServiceTest.java</include>
<include>**/server/worker/register/WorkerRegistryTest.java</include>
<include>**/server/worker/shell/ShellCommandExecutorTest.java</include>
<include>**/server/worker/sql/SqlExecutorTest.java</include>
@@ -750,6 +753,7 @@
<include>**/server/worker/task/dependent/DependentTaskTest.java</include>
<include>**/server/worker/task/spark/SparkTaskTest.java</include>
<include>**/server/worker/task/EnvFileTest.java</include>
+
</includes>
<!-- <skip>true</skip> -->
<argLine>-Xmx2048m</argLine>