This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 96835eb 1,no worker condition , master will while ture wait for
worker startup 2,worker response task status sync wait for result (#2420)
96835eb is described below
commit 96835ebda2d4f9c5aa13b39bdd1e90f5d702b2f6
Author: qiaozhanwei <[email protected]>
AuthorDate: Wed Apr 15 16:19:42 2020 +0800
1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result (#2420)
* dispatch task fail will set task status failed
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
Co-authored-by: qiaozhanwei <[email protected]>
---
.../master/consumer/TaskPriorityQueueConsumer.java | 27 ++++--
.../server/master/processor/TaskAckProcessor.java | 28 ++++++-
.../master/processor/TaskResponseProcessor.java | 25 ++++++
.../worker/processor/TaskCallbackService.java | 15 +++-
.../worker/processor/TaskCallbackServiceTest.java | 98 ++++++++++++++++++++--
5 files changed, 176 insertions(+), 17 deletions(-)
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index 50c851c..b2cf53a 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -53,6 +53,8 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.dolphinscheduler.common.Constants.*;
+
/**
* TaskUpdateQueue consumer
*/
@@ -68,7 +70,7 @@ public class TaskPriorityQueueConsumer extends Thread{
* taskUpdateQueue
*/
@Autowired
- private TaskPriorityQueue taskUpdateQueue;
+ private TaskPriorityQueue taskPriorityQueue;
/**
* processService
@@ -93,7 +95,7 @@ public class TaskPriorityQueueConsumer extends Thread{
while (Stopper.isRunning()){
try {
// if not task , blocking here
- String taskPriorityInfo = taskUpdateQueue.take();
+ String taskPriorityInfo = taskPriorityQueue.take();
TaskPriority taskPriority = TaskPriority.of(taskPriorityInfo);
@@ -114,13 +116,22 @@ public class TaskPriorityQueueConsumer extends Thread{
private Boolean dispatch(int taskInstanceId){
TaskExecutionContext context = getTaskExecutionContext(taskInstanceId);
ExecutionContext executionContext = new
ExecutionContext(context.toCommand(), ExecutorType.WORKER,
context.getWorkerGroup());
- try {
- return dispatcher.dispatch(executionContext);
- } catch (ExecuteException e) {
- logger.error("execute exception", e);
- return false;
- }
+ Boolean result = false;
+ while (Stopper.isRunning()){
+ try {
+ result = dispatcher.dispatch(executionContext);
+ } catch (ExecuteException e) {
+ logger.error("dispatch error",e);
+ try {
+ Thread.sleep(SLEEP_TIME_MILLIS);
+ } catch (InterruptedException e1) {}
+ }
+ if (result){
+ break;
+ }
+ }
+ return result;
}
/**
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
index 1eb40db..7af9cdc 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
@@ -19,7 +19,9 @@ package org.apache.dolphinscheduler.server.master.processor;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.Preconditions;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
@@ -31,9 +33,12 @@ import
org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheMan
import
org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
import
org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.dolphinscheduler.common.Constants.*;
+
/**
* task ack processor
*/
@@ -51,9 +56,16 @@ public class TaskAckProcessor implements
NettyRequestProcessor {
*/
private final TaskInstanceCacheManager taskInstanceCacheManager;
+
+ /**
+ * processService
+ */
+ private ProcessService processService;
+
public TaskAckProcessor(){
this.taskResponseService =
SpringApplicationContext.getBean(TaskResponseService.class);
this.taskInstanceCacheManager =
SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
+ this.processService =
SpringApplicationContext.getBean(ProcessService.class);
}
/**
@@ -71,8 +83,10 @@ public class TaskAckProcessor implements
NettyRequestProcessor {
String workerAddress = ChannelUtils.toAddress(channel).getAddress();
+ ExecutionStatus ackStatus =
ExecutionStatus.of(taskAckCommand.getStatus());
+
// TaskResponseEvent
- TaskResponseEvent taskResponseEvent =
TaskResponseEvent.newAck(ExecutionStatus.of(taskAckCommand.getStatus()),
+ TaskResponseEvent taskResponseEvent =
TaskResponseEvent.newAck(ackStatus,
taskAckCommand.getStartTime(),
workerAddress,
taskAckCommand.getExecutePath(),
@@ -81,6 +95,18 @@ public class TaskAckProcessor implements
NettyRequestProcessor {
taskResponseService.addResponse(taskResponseEvent);
+ while (Stopper.isRunning()){
+ TaskInstance taskInstance =
processService.findTaskInstanceById(taskAckCommand.getTaskInstanceId());
+
+ if (taskInstance != null && ackStatus.typeIsRunning()){
+ break;
+ }
+
+ try {
+ Thread.sleep(SLEEP_TIME_MILLIS);
+ } catch (InterruptedException e) {}
+ }
+
}
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
index 36b3823..ecb8646 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
@@ -19,7 +19,9 @@ package org.apache.dolphinscheduler.server.master.processor;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.Preconditions;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
@@ -30,9 +32,12 @@ import
org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheMan
import
org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
import
org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.dolphinscheduler.common.Constants.*;
+
/**
* task response processor
*/
@@ -50,9 +55,15 @@ public class TaskResponseProcessor implements
NettyRequestProcessor {
*/
private final TaskInstanceCacheManager taskInstanceCacheManager;
+ /**
+ * processService
+ */
+ private ProcessService processService;
+
public TaskResponseProcessor(){
this.taskResponseService =
SpringApplicationContext.getBean(TaskResponseService.class);
this.taskInstanceCacheManager =
SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
+ this.processService =
SpringApplicationContext.getBean(ProcessService.class);
}
/**
@@ -71,6 +82,8 @@ public class TaskResponseProcessor implements
NettyRequestProcessor {
taskInstanceCacheManager.cacheTaskInstance(responseCommand);
+ ExecutionStatus responseStatus =
ExecutionStatus.of(responseCommand.getStatus());
+
// TaskResponseEvent
TaskResponseEvent taskResponseEvent =
TaskResponseEvent.newResult(ExecutionStatus.of(responseCommand.getStatus()),
responseCommand.getEndTime(),
@@ -79,6 +92,18 @@ public class TaskResponseProcessor implements
NettyRequestProcessor {
responseCommand.getTaskInstanceId());
taskResponseService.addResponse(taskResponseEvent);
+
+ while (Stopper.isRunning()){
+ TaskInstance taskInstance =
processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
+
+ if (taskInstance != null && responseStatus.typeIsFinished()){
+ break;
+ }
+
+ try {
+ Thread.sleep(SLEEP_TIME_MILLIS);
+ } catch (InterruptedException e) {}
+ }
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
index f966591..7cd25cb 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
@@ -18,9 +18,11 @@
package org.apache.dolphinscheduler.server.worker.processor;
+import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
+import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
@@ -93,8 +95,17 @@ public class TaskCallbackService {
}
logger.warn("original master : {} is not reachable, random select
master", nettyRemoteChannel.getHost());
Set<String> masterNodes =
zookeeperRegistryCenter.getMasterNodesDirectly();
- if(CollectionUtils.isEmpty(masterNodes)){
- throw new IllegalStateException("no available master node
exception");
+ while (Stopper.isRunning()) {
+ if (CollectionUtils.isEmpty(masterNodes)) {
+ logger.error("no available master node");
+ try {
+ Thread.sleep(1000);
+ }catch (Exception e){
+
+ }
+ }else {
+ break;
+ }
}
for(String masterNode : masterNodes){
newChannel = nettyRemotingClient.getChannel(Host.of(masterNode));
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
index 5f44e1c..a0fee7c 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
@@ -17,21 +17,26 @@
package org.apache.dolphinscheduler.server.worker.processor;
import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
+import
org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
+import
org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
import org.apache.dolphinscheduler.server.zk.SpringZKServer;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import org.junit.Test;
@@ -47,9 +52,10 @@ import java.util.Date;
* test task call back service
*/
@RunWith(SpringJUnit4ClassRunner.class)
-@ContextConfiguration(classes={TaskCallbackServiceTestConfig.class,
SpringZKServer.class, MasterRegistry.class, WorkerRegistry.class,
+@ContextConfiguration(classes={TaskCallbackServiceTestConfig.class,
SpringZKServer.class, SpringApplicationContext.class, MasterRegistry.class,
WorkerRegistry.class,
ZookeeperRegistryCenter.class, MasterConfig.class, WorkerConfig.class,
- ZookeeperCachedOperator.class, ZookeeperConfig.class,
ZookeeperNodeManager.class, TaskCallbackService.class})
+ ZookeeperCachedOperator.class, ZookeeperConfig.class,
ZookeeperNodeManager.class, TaskCallbackService.class,
+ TaskResponseService.class,
TaskAckProcessor.class,TaskResponseProcessor.class})
public class TaskCallbackServiceTest {
@Autowired
@@ -58,12 +64,22 @@ public class TaskCallbackServiceTest {
@Autowired
private MasterRegistry masterRegistry;
+ @Autowired
+ private TaskAckProcessor taskAckProcessor;
+
+ @Autowired
+ private TaskResponseProcessor taskResponseProcessor;
+
+ /**
+ * send ack test
+ * @throws Exception
+ */
@Test
- public void testSendAck(){
+ public void testSendAck() throws Exception{
final NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(30000);
NettyRemotingServer nettyRemotingServer = new
NettyRemotingServer(serverConfig);
- nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK,
Mockito.mock(TaskAckProcessor.class));
+ nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK,
taskAckProcessor);
nettyRemotingServer.start();
final NettyClientConfig clientConfig = new NettyClientConfig();
@@ -75,22 +91,64 @@ public class TaskCallbackServiceTest {
ackCommand.setStartTime(new Date());
taskCallbackService.sendAck(1, ackCommand.convert2Command());
+ Thread.sleep(5000);
+
+ Stopper.stop();
+
+ Thread.sleep(5000);
+
nettyRemotingServer.close();
nettyRemotingClient.close();
}
+ /**
+ * send result test
+ * @throws Exception
+ */
+ @Test
+ public void testSendResult() throws Exception{
+ final NettyServerConfig serverConfig = new NettyServerConfig();
+ serverConfig.setListenPort(30000);
+ NettyRemotingServer nettyRemotingServer = new
NettyRemotingServer(serverConfig);
+
nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE,
taskResponseProcessor);
+ nettyRemotingServer.start();
+
+ final NettyClientConfig clientConfig = new NettyClientConfig();
+ NettyRemotingClient nettyRemotingClient = new
NettyRemotingClient(clientConfig);
+ Channel channel =
nettyRemotingClient.getChannel(Host.of("localhost:30000"));
+ taskCallbackService.addRemoteChannel(1, new
NettyRemoteChannel(channel, 1));
+ TaskExecuteResponseCommand responseCommand = new
TaskExecuteResponseCommand();
+ responseCommand.setTaskInstanceId(1);
+ responseCommand.setEndTime(new Date());
+
+ taskCallbackService.sendResult(1, responseCommand.convert2Command());
+
+ Thread.sleep(5000);
+
+ Stopper.stop();
+
+ Thread.sleep(5000);
+
+ nettyRemotingServer.close();
+ nettyRemotingClient.close();
+ }
+
+
+
@Test(expected = IllegalArgumentException.class)
public void testSendAckWithIllegalArgumentException(){
TaskExecuteAckCommand ackCommand =
Mockito.mock(TaskExecuteAckCommand.class);
taskCallbackService.sendAck(1, ackCommand.convert2Command());
+ Stopper.stop();
}
@Test(expected = IllegalStateException.class)
public void testSendAckWithIllegalStateException1(){
+ masterRegistry.registry();
final NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(30000);
NettyRemotingServer nettyRemotingServer = new
NettyRemotingServer(serverConfig);
- nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK,
Mockito.mock(TaskAckProcessor.class));
+ nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK,
taskAckProcessor);
nettyRemotingServer.start();
final NettyClientConfig clientConfig = new NettyClientConfig();
@@ -103,7 +161,21 @@ public class TaskCallbackServiceTest {
ackCommand.setStartTime(new Date());
nettyRemotingServer.close();
+
taskCallbackService.sendAck(1, ackCommand.convert2Command());
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ Stopper.stop();
+
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
}
@Test(expected = IllegalStateException.class)
@@ -112,7 +184,7 @@ public class TaskCallbackServiceTest {
final NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(30000);
NettyRemotingServer nettyRemotingServer = new
NettyRemotingServer(serverConfig);
- nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK,
Mockito.mock(TaskAckProcessor.class));
+ nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK,
taskAckProcessor);
nettyRemotingServer.start();
final NettyClientConfig clientConfig = new NettyClientConfig();
@@ -125,6 +197,20 @@ public class TaskCallbackServiceTest {
ackCommand.setStartTime(new Date());
nettyRemotingServer.close();
+
taskCallbackService.sendAck(1, ackCommand.convert2Command());
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ Stopper.stop();
+
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
}
}