This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new 5730bfe Refactor worker (#2319)
5730bfe is described below
commit 5730bfe2bede516d8879533ec6bf6a619415dcdb
Author: Tboy <[email protected]>
AuthorDate: Fri Mar 27 09:43:45 2020 +0800
Refactor worker (#2319)
* let quartz use the same datasource
* move master/worker config from dao.properties to each config
add master/worker registry test
* move mybatis config from application.properties to SpringConnectionFactory
* move mybatis-plus config from application.properties to SpringConnectionFactory
* refactor TaskCallbackService
* add ZookeeperNodeManagerTest
* add NettyExecutorManagerTest
* refactor TaskKillProcessor
* add RandomSelectorTest, RoundRobinSelectorTest, TaskCallbackServiceTest
* add RoundRobinHostManagerTest, ExecutorDispatcherTest
* refactor task response service
---
.../server/master/manager/TaskEventEnum.java | 58 -------------
.../server/master/processor/TaskAckProcessor.java | 15 ++--
.../master/processor/TaskResponseProcessor.java | 15 ++--
.../queue/TaskResponseEvent.java} | 99 +++++++---------------
.../queue/TaskResponseService.java} | 59 ++++++-------
.../server/registry/DependencyConfig.java | 6 +-
6 files changed, 77 insertions(+), 175 deletions(-)
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEventEnum.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEventEnum.java
deleted file mode 100644
index f3d7497..0000000
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEventEnum.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.manager;
-
-import com.baomidou.mybatisplus.annotation.EnumValue;
-import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
-
-import java.util.Date;
-
-/**
- * task event enum
- */
-public enum TaskEventEnum {
-
- ACK(0, "task ack"),
- RESPONSE(1, "task response result");
-
- TaskEventEnum(int code, String descp){
- this.code = code;
- this.descp = descp;
- }
-
- @EnumValue
- private final int code;
- private final String descp;
-
- public String getDescp() {
- return descp;
- }
-
- public int getCode() {
- return code;
- }
-
- public static TaskEventEnum of(int status){
- for(TaskEventEnum es : values()){
- if(es.getCode() == status){
- return es;
- }
- }
- throw new IllegalArgumentException("invalid status : " + status);
- }
-}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
index 67832a1..1eb40db 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
@@ -28,10 +28,9 @@ import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import
org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import
org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
-import org.apache.dolphinscheduler.server.master.manager.TaskEvent;
-import org.apache.dolphinscheduler.server.master.manager.TaskManager;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,7 +44,7 @@ public class TaskAckProcessor implements
NettyRequestProcessor {
/**
* process service
*/
- private final TaskManager taskManager;
+ private final TaskResponseService taskResponseService;
/**
* taskInstance cache manager
@@ -53,7 +52,7 @@ public class TaskAckProcessor implements
NettyRequestProcessor {
private final TaskInstanceCacheManager taskInstanceCacheManager;
public TaskAckProcessor(){
- this.taskManager = SpringApplicationContext.getBean(TaskManager.class);
+ this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class);
this.taskInstanceCacheManager =
SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
}
@@ -72,15 +71,15 @@ public class TaskAckProcessor implements
NettyRequestProcessor {
String workerAddress = ChannelUtils.toAddress(channel).getAddress();
- // TaskEvent
- TaskEvent taskEvent = new TaskEvent(ExecutionStatus.of(taskAckCommand.getStatus()),
+ // TaskResponseEvent
+ TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ExecutionStatus.of(taskAckCommand.getStatus()),
taskAckCommand.getStartTime(),
workerAddress,
taskAckCommand.getExecutePath(),
taskAckCommand.getLogPath(),
taskAckCommand.getTaskInstanceId());
- taskManager.putTask(taskEvent);
+ taskResponseService.addResponse(taskResponseEvent);
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
index 9cf10b9..36b3823 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
@@ -27,10 +27,9 @@ import
org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import
org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import
org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
-import org.apache.dolphinscheduler.server.master.manager.TaskEvent;
-import org.apache.dolphinscheduler.server.master.manager.TaskManager;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,7 +43,7 @@ public class TaskResponseProcessor implements
NettyRequestProcessor {
/**
* process service
*/
- private final TaskManager taskManager;
+ private final TaskResponseService taskResponseService;
/**
* taskInstance cache manager
@@ -52,7 +51,7 @@ public class TaskResponseProcessor implements
NettyRequestProcessor {
private final TaskInstanceCacheManager taskInstanceCacheManager;
public TaskResponseProcessor(){
- this.taskManager = SpringApplicationContext.getBean(TaskManager.class);
+ this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class);
this.taskInstanceCacheManager =
SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
}
@@ -72,14 +71,14 @@ public class TaskResponseProcessor implements
NettyRequestProcessor {
taskInstanceCacheManager.cacheTaskInstance(responseCommand);
- // TaskEvent
- TaskEvent taskEvent = new TaskEvent(ExecutionStatus.of(responseCommand.getStatus()),
+ // TaskResponseEvent
+ TaskResponseEvent taskResponseEvent = TaskResponseEvent.newResult(ExecutionStatus.of(responseCommand.getStatus()),
responseCommand.getEndTime(),
responseCommand.getProcessId(),
responseCommand.getAppIds(),
responseCommand.getTaskInstanceId());
- taskManager.putTask(taskEvent);
+ taskResponseService.addResponse(taskResponseEvent);
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEvent.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
similarity index 57%
rename from
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEvent.java
rename to
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
index 98b8d62..9e8813f 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEvent.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.manager;
+package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
@@ -24,7 +24,7 @@ import java.util.Date;
/**
* task event
*/
-public class TaskEvent {
+public class TaskResponseEvent {
/**
* taskInstanceId
@@ -74,53 +74,29 @@ public class TaskEvent {
/**
* ack / response
*/
- private TaskEventEnum type;
-
-
- /**
- * receive ack info
- * @param state state
- * @param startTime startTime
- * @param workerAddress workerAddress
- * @param executePath executePath
- * @param logPath logPath
- * @param taskInstanceId taskInstanceId
- */
- public TaskEvent(ExecutionStatus state,
- Date startTime,
- String workerAddress,
- String executePath,
- String logPath,
- int taskInstanceId){
- this.state = state;
- this.startTime = startTime;
- this.workerAddress = workerAddress;
- this.executePath = executePath;
- this.logPath = logPath;
- this.taskInstanceId = taskInstanceId;
- this.type = TaskEventEnum.ACK;
-
- }
-
- /**
- * receive response info
- * @param state state
- * @param endTime endTime
- * @param processId processId
- * @param appIds appIds
- * @param taskInstanceId taskInstanceId
- */
- public TaskEvent(ExecutionStatus state,
- Date endTime,
- int processId,
- String appIds,
- int taskInstanceId){
- this.state = state;
- this.endTime = endTime;
- this.processId = processId;
- this.appIds = appIds;
- this.taskInstanceId = taskInstanceId;
- this.type = TaskEventEnum.RESPONSE;
+ private Event event;
+
+ public static TaskResponseEvent newAck(ExecutionStatus state, Date startTime, String workerAddress, String executePath, String logPath, int taskInstanceId){
+ TaskResponseEvent event = new TaskResponseEvent();
+ event.setState(state);
+ event.setStartTime(startTime);
+ event.setWorkerAddress(workerAddress);
+ event.setExecutePath(executePath);
+ event.setLogPath(logPath);
+ event.setTaskInstanceId(taskInstanceId);
+ event.setEvent(Event.ACK);
+ return event;
+ }
+
+ public static TaskResponseEvent newResult(ExecutionStatus state, Date endTime, int processId, String appIds, int taskInstanceId){
+ TaskResponseEvent event = new TaskResponseEvent();
+ event.setState(state);
+ event.setEndTime(endTime);
+ event.setProcessId(processId);
+ event.setAppIds(appIds);
+ event.setTaskInstanceId(taskInstanceId);
+ event.setEvent(Event.RESULT);
+ return event;
}
public int getTaskInstanceId() {
@@ -195,27 +171,16 @@ public class TaskEvent {
this.appIds = appIds;
}
- public TaskEventEnum getType() {
- return type;
+ public Event getEvent() {
+ return event;
}
- public void setType(TaskEventEnum type) {
- this.type = type;
+ public void setEvent(Event event) {
+ this.event = event;
}
- @Override
- public String toString() {
- return "TaskEvent{" +
- "taskInstanceId=" + taskInstanceId +
- ", workerAddress='" + workerAddress + '\'' +
- ", state=" + state +
- ", startTime=" + startTime +
- ", endTime=" + endTime +
- ", executePath='" + executePath + '\'' +
- ", logPath='" + logPath + '\'' +
- ", processId=" + processId +
- ", appIds='" + appIds + '\'' +
- ", type=" + type +
- '}';
+ public enum Event{
+ ACK,
+ RESULT;
}
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskManager.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
similarity index 59%
rename from
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskManager.java
rename to
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
index ade0aea..c471c9c 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskManager.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.manager;
+package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -27,23 +27,22 @@ import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-import static org.apache.dolphinscheduler.server.master.manager.TaskEventEnum.*;
/**
* task manager
*/
@Component
-public class TaskManager {
+public class TaskResponseService {
/**
* logger
*/
- private static final Logger logger = LoggerFactory.getLogger(TaskManager.class);
+ private static final Logger logger = LoggerFactory.getLogger(TaskResponseService.class);
/**
* attemptQueue
*/
- private final BlockingQueue<TaskEvent> attemptQueue = new LinkedBlockingQueue<>(5000);
+ private final BlockingQueue<TaskResponseEvent> attemptQueue = new LinkedBlockingQueue<>(5000);
/**
@@ -63,13 +62,13 @@ public class TaskManager {
/**
* put task to attemptQueue
*
- * @param taskEvent taskEvent
+ * @param taskResponseEvent taskResponseEvent
*/
- public void putTask(TaskEvent taskEvent){
+ public void addResponse(TaskResponseEvent taskResponseEvent){
try {
- attemptQueue.put(taskEvent);
+ attemptQueue.put(taskResponseEvent);
} catch (InterruptedException e) {
- logger.error("put task : {} error :{}",taskEvent,e);
+ logger.error("put task : {} error :{}", taskResponseEvent,e);
}
}
@@ -85,8 +84,8 @@ public class TaskManager {
while (Stopper.isRunning()){
try {
// if not task , blocking here
- TaskEvent taskEvent = attemptQueue.take();
- persist(taskEvent);
+ TaskResponseEvent taskResponseEvent = attemptQueue.take();
+ persist(taskResponseEvent);
}catch (Exception e){
logger.error("persist task error",e);
@@ -95,32 +94,30 @@ public class TaskManager {
}
/**
- * persist taskEvent
- * @param taskEvent taskEvent
+ * persist taskResponseEvent
+ * @param taskResponseEvent taskResponseEvent
*/
- private void persist(TaskEvent taskEvent){
- // task event type
- TaskEventEnum type = taskEvent.getType();
+ private void persist(TaskResponseEvent taskResponseEvent){
+ TaskResponseEvent.Event event = taskResponseEvent.getEvent();
- switch (type){
+ switch (event){
case ACK:
- processService.changeTaskState(taskEvent.getState(),
- taskEvent.getStartTime(),
- taskEvent.getWorkerAddress(),
- taskEvent.getExecutePath(),
- taskEvent.getLogPath(),
- taskEvent.getTaskInstanceId());
+ processService.changeTaskState(taskResponseEvent.getState(),
+ taskResponseEvent.getStartTime(),
+ taskResponseEvent.getWorkerAddress(),
+ taskResponseEvent.getExecutePath(),
+ taskResponseEvent.getLogPath(),
+ taskResponseEvent.getTaskInstanceId());
break;
- case RESPONSE:
- processService.changeTaskState(taskEvent.getState(),
- taskEvent.getEndTime(),
- taskEvent.getProcessId(),
- taskEvent.getAppIds(),
- taskEvent.getTaskInstanceId());
+ case RESULT:
+ processService.changeTaskState(taskResponseEvent.getState(),
+ taskResponseEvent.getEndTime(),
+ taskResponseEvent.getProcessId(),
+ taskResponseEvent.getAppIds(),
+ taskResponseEvent.getTaskInstanceId());
break;
default:
- throw new IllegalArgumentException("invalid task event type : " + type);
-
+ throw new IllegalArgumentException("invalid event type : " + event);
}
}
}
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java
index 2d6e3ec..0adea44 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java
@@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.dao.mapper.*;
import
org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.dispatch.host.HostManager;
import
org.apache.dolphinscheduler.server.master.dispatch.host.RandomHostManager;
-import org.apache.dolphinscheduler.server.master.manager.TaskManager;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.mockito.Mockito;
@@ -141,7 +141,7 @@ public class DependencyConfig {
}
@Bean
- public TaskManager taskManager(){
- return Mockito.mock(TaskManager.class);
+ public TaskResponseService taskResponseService(){
+ return Mockito.mock(TaskResponseService.class);
}
}