This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new eb45ff9 TaskPriority refactor (#2097)
eb45ff9 is described below
commit eb45ff9e83534e569352511c2a7bf03ac06f5106
Author: qiaozhanwei <[email protected]>
AuthorDate: Fri Mar 6 12:02:14 2020 +0800
TaskPriority refactor (#2097)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
* add TaskInstanceCacheManager receive Worker report result
* TaskInstance setExecutePath
* add TaskInstanceCacheManager to receive Worker Task result report
* TaskInstanceCacheManager add remove method
* add license
* add dispatcht task method
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* 1,worker TaskPros use TaskExecutionContext replase
2,Master kill Task , KillTaskProcessor modify
* worker remove db
* ShellTask modify
* master persistence processId and appIds
* master persistence processId and appIds
* master add kill task logic
* master add kill task logic
* master add kill task logic
* javadoc error modify
* remove chinese log
* executeDirectly method add Override
* remote module modify
* TaskKillResponseProcessor command type modify
* create buildKillCommand
* host add host:port format
* host add host:port format
* TaskAckProcessor modify
* TaskAckProcessor modify
* task prioriry refator
* remove ITaskQueue
* task prioriry refator
* remove ITaskQueue
* TaskPriority refactor
* remove logs
Co-authored-by: qiaozhanwei <[email protected]>
---
.../api/controller/ProcessInstanceController.java | 7 +--
.../api/service/DataAnalysisService.java | 7 +--
.../api/service/ProcessInstanceService.java | 52 +---------------------
.../api/service/DataAnalysisServiceTest.java | 28 +-----------
.../dolphinscheduler/dao/entity/TaskInstance.java | 1 +
.../server/entity/TaskPriority.java | 3 +-
.../server/master/MasterServer.java | 5 ++-
.../master/runner/MasterBaseTaskExecThread.java | 5 +--
8 files changed, 15 insertions(+), 93 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
index 80db6c8..b6533ad 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
@@ -26,7 +26,6 @@ import
org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import io.swagger.annotations.*;
-import org.apache.dolphinscheduler.service.queue.ITaskQueue;
import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -240,8 +239,7 @@ public class ProcessInstanceController extends
BaseController{
logger.info("delete process instance by id, login user:{}, project
name:{}, process instance id:{}",
loginUser.getUserName(), projectName, processInstanceId);
// task queue
- ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance();
- Map<String, Object> result =
processInstanceService.deleteProcessInstanceById(loginUser, projectName,
processInstanceId,tasksQueue);
+ Map<String, Object> result =
processInstanceService.deleteProcessInstanceById(loginUser, projectName,
processInstanceId);
return returnDataList(result);
}catch (Exception e){
logger.error(DELETE_PROCESS_INSTANCE_BY_ID_ERROR.getMsg(),e);
@@ -370,7 +368,6 @@ public class ProcessInstanceController extends
BaseController{
logger.info("delete process instance by ids, login user:{},
project name:{}, process instance ids :{}",
loginUser.getUserName(), projectName, processInstanceIds);
// task queue
- ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance();
Map<String, Object> result = new HashMap<>(5);
List<String> deleteFailedIdList = new ArrayList<>();
if(StringUtils.isNotEmpty(processInstanceIds)){
@@ -379,7 +376,7 @@ public class ProcessInstanceController extends
BaseController{
for (String strProcessInstanceId:processInstanceIdArray) {
int processInstanceId =
Integer.parseInt(strProcessInstanceId);
try {
- Map<String, Object> deleteResult =
processInstanceService.deleteProcessInstanceById(loginUser, projectName,
processInstanceId,tasksQueue);
+ Map<String, Object> deleteResult =
processInstanceService.deleteProcessInstanceById(loginUser, projectName,
processInstanceId);
if(!Status.SUCCESS.equals(deleteResult.get(Constants.STATUS))){
deleteFailedIdList.add(strProcessInstanceId);
logger.error((String)deleteResult.get(Constants.MSG));
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataAnalysisService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataAnalysisService.java
index bafe833..0c93e00 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataAnalysisService.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataAnalysisService.java
@@ -29,8 +29,6 @@ import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.queue.ITaskQueue;
-import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -318,9 +316,8 @@ public class DataAnalysisService extends BaseService{
return result;
}
- ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance();
- List<String> tasksQueueList =
tasksQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
- List<String> tasksKillList =
tasksQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_KILL);
+ List<String> tasksQueueList = new ArrayList<>();
+ List<String> tasksKillList = new ArrayList<>();
Map<String,Integer> dataMap = new HashMap<>();
if (loginUser.getUserType() == UserType.ADMIN_USER){
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
index 2b1f04e..4b809a8 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
@@ -38,7 +38,6 @@ import
com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.queue.ITaskQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -404,8 +403,6 @@ public class ProcessInstanceService extends BaseDAGService {
processInstance.setProcessInstanceJson(processInstanceJson);
processInstance.setGlobalParams(globalParams);
}
-// int update = processDao.updateProcessInstance(processInstanceId,
processInstanceJson,
-// globalParams, schedule, flag, locations, connects);
int update = processService.updateProcessInstance(processInstance);
int updateDefine = 1;
if (syncDefine && StringUtils.isNotEmpty(processInstanceJson)) {
@@ -472,11 +469,10 @@ public class ProcessInstanceService extends
BaseDAGService {
* @param loginUser login user
* @param projectName project name
* @param processInstanceId process instance id
- * @param tasksQueue task queue
* @return delete result code
*/
@Transactional(rollbackFor = Exception.class)
- public Map<String, Object> deleteProcessInstanceById(User loginUser,
String projectName, Integer processInstanceId, ITaskQueue tasksQueue) {
+ public Map<String, Object> deleteProcessInstanceById(User loginUser,
String projectName, Integer processInstanceId) {
Map<String, Object> result = new HashMap<>(5);
Project project = projectMapper.queryByName(projectName);
@@ -494,52 +490,6 @@ public class ProcessInstanceService extends BaseDAGService
{
return result;
}
- //process instance priority
- int processInstancePriority =
processInstance.getProcessInstancePriority().ordinal();
- // delete zk queue
- if (CollectionUtils.isNotEmpty(taskInstanceList)){
- for (TaskInstance taskInstance : taskInstanceList){
- // task instance priority
- int taskInstancePriority =
taskInstance.getTaskInstancePriority().ordinal();
-
- StringBuilder nodeValueSb = new StringBuilder(100);
- nodeValueSb.append(processInstancePriority)
- .append(UNDERLINE)
- .append(processInstanceId)
- .append(UNDERLINE)
- .append(taskInstancePriority)
- .append(UNDERLINE)
- .append(taskInstance.getId())
- .append(UNDERLINE);
-
- int taskWorkerGroupId =
processService.getTaskWorkerGroupId(taskInstance);
- WorkerGroup workerGroup =
workerGroupMapper.selectById(taskWorkerGroupId);
-
- if(workerGroup == null){
- nodeValueSb.append(DEFAULT_WORKER_ID);
- }else {
-
- String ips = workerGroup.getIpList();
- StringBuilder ipSb = new StringBuilder(100);
- String[] ipArray = ips.split(COMMA);
-
- for (String ip : ipArray) {
- long ipLong = IpUtils.ipToLong(ip);
- ipSb.append(ipLong).append(COMMA);
- }
-
- if(ipSb.length() > 0) {
- ipSb.deleteCharAt(ipSb.length() - 1);
- }
- nodeValueSb.append(ipSb);
- }
-
- logger.info("delete task queue node :
{}",nodeValueSb.toString());
-
tasksQueue.removeNode(org.apache.dolphinscheduler.common.Constants.DOLPHINSCHEDULER_TASKS_QUEUE,
nodeValueSb.toString());
-
- }
- }
-
// delete database cascade
int delete =
processService.deleteWorkProcessInstanceById(processInstanceId);
processService.deleteAllSubWorkProcessByParentId(processInstanceId);
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java
index 6f308e7..10220e2 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java
@@ -28,7 +28,6 @@ import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.queue.ITaskQueue;
import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
import org.junit.After;
import org.junit.Assert;
@@ -74,8 +73,7 @@ public class DataAnalysisServiceTest {
@Mock
TaskInstanceMapper taskInstanceMapper;
- @Mock
- ITaskQueue taskQueue;
+
@Mock
ProcessService processService;
@@ -183,30 +181,6 @@ public class DataAnalysisServiceTest {
}
- @Test
- public void testCountQueueState(){
-
- PowerMockito.mockStatic(TaskQueueFactory.class);
- List<String> taskQueueList = new ArrayList<>(1);
- taskQueueList.add("1_0_1_1_-1");
- List<String> taskKillList = new ArrayList<>(1);
- taskKillList.add("1-0");
-
PowerMockito.when(taskQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE)).thenReturn(taskQueueList);
-
PowerMockito.when(taskQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_KILL)).thenReturn(taskKillList);
-
PowerMockito.when(TaskQueueFactory.getTaskQueueInstance()).thenReturn(taskQueue);
- //checkProject false
- Map<String, Object> result =
dataAnalysisService.countQueueState(user,2);
- Assert.assertTrue(result.isEmpty());
-
- result = dataAnalysisService.countQueueState(user,1);
- Assert.assertEquals(Status.SUCCESS,result.get(Constants.STATUS));
- //admin
- user.setUserType(UserType.ADMIN_USER);
- result = dataAnalysisService.countQueueState(user,1);
- Assert.assertEquals(Status.SUCCESS,result.get(Constants.STATUS));
-
- }
-
/**
* get list
* @return
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index dc463fe..3fc40ca 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -194,6 +194,7 @@ public class TaskInstance implements Serializable {
/**
* workerGroup
*/
+ @TableField(exist = false)
private String workerGroup;
public ProcessInstance getProcessInstance() {
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskPriority.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskPriority.java
index 7db5f45..991eeed 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskPriority.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskPriority.java
@@ -132,7 +132,8 @@ public class TaskPriority {
*/
public static TaskPriority of(String taskPriorityInfo){
String[] parts = taskPriorityInfo.split(UNDERLINE);
- if (parts.length != 4) {
+
+ if (parts.length != 5) {
throw new IllegalArgumentException(String.format("TaskPriority :
%s illegal.", taskPriorityInfo));
}
TaskPriority taskPriority = new TaskPriority(
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 6923cd0..12fe25b 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import
org.apache.dolphinscheduler.server.master.consumer.TaskUpdateQueueConsumer;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import
org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import
org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
@@ -37,6 +38,7 @@ import
org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob;
import org.apache.dolphinscheduler.service.quartz.QuartzExecutors;
+import org.apache.dolphinscheduler.service.queue.TaskUpdateQueueImpl;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -166,7 +168,8 @@ public class MasterServer implements IStoppable {
logger.error("start Quartz failed", e);
}
-
+ TaskUpdateQueueConsumer taskUpdateQueueConsumer =
SpringApplicationContext.getBean(TaskUpdateQueueConsumer.class);
+ taskUpdateQueueConsumer.start();
/**
* register hooks, which are called before the process exits
*/
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
index c8b7b0e..b0fd632 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
@@ -89,7 +89,7 @@ public class MasterBaseTaskExecThread implements
Callable<Boolean> {
this.cancel = false;
this.taskInstance = taskInstance;
this.masterConfig =
SpringApplicationContext.getBean(MasterConfig.class);
- this.taskUpdateQueue = new TaskUpdateQueueImpl();
+ this.taskUpdateQueue =
SpringApplicationContext.getBean(TaskUpdateQueueImpl.class);
}
/**
@@ -180,8 +180,7 @@ public class MasterBaseTaskExecThread implements
Callable<Boolean> {
processInstance.getId(),
taskInstance.getProcessInstancePriority().getCode(),
taskInstance.getId(),
- taskInstance.getWorkerGroup());
-
+
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP);
taskUpdateQueue.put(taskPriorityInfo);
logger.info(String.format("master submit success, task : %s",
taskInstance.getName()) );
return true;