This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 96835eb  1,no worker condition , master will while ture wait for 
worker startup 2,worker response task status sync wait for result (#2420)
96835eb is described below

commit 96835ebda2d4f9c5aa13b39bdd1e90f5d702b2f6
Author: qiaozhanwei <[email protected]>
AuthorDate: Wed Apr 15 16:19:42 2020 +0800

    1,no worker condition , master will while ture wait for worker startup 
2,worker response task status sync wait for result (#2420)
    
    * dispatch task fail will set task status failed
    
    * 1,no worker condition , master will while ture wait for worker startup
    2,worker response task status sync wait for result
    
    Co-authored-by: qiaozhanwei <[email protected]>
---
 .../master/consumer/TaskPriorityQueueConsumer.java | 27 ++++--
 .../server/master/processor/TaskAckProcessor.java  | 28 ++++++-
 .../master/processor/TaskResponseProcessor.java    | 25 ++++++
 .../worker/processor/TaskCallbackService.java      | 15 +++-
 .../worker/processor/TaskCallbackServiceTest.java  | 98 ++++++++++++++++++++--
 5 files changed, 176 insertions(+), 17 deletions(-)

diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index 50c851c..b2cf53a 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -53,6 +53,8 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.dolphinscheduler.common.Constants.*;
+
 /**
  * TaskUpdateQueue consumer
  */
@@ -68,7 +70,7 @@ public class TaskPriorityQueueConsumer extends Thread{
      * taskUpdateQueue
      */
     @Autowired
-    private TaskPriorityQueue taskUpdateQueue;
+    private TaskPriorityQueue taskPriorityQueue;
 
     /**
      * processService
@@ -93,7 +95,7 @@ public class TaskPriorityQueueConsumer extends Thread{
         while (Stopper.isRunning()){
             try {
                 // if not task , blocking here
-                String taskPriorityInfo = taskUpdateQueue.take();
+                String taskPriorityInfo = taskPriorityQueue.take();
 
                 TaskPriority taskPriority = TaskPriority.of(taskPriorityInfo);
 
@@ -114,13 +116,22 @@ public class TaskPriorityQueueConsumer extends Thread{
     private Boolean dispatch(int taskInstanceId){
         TaskExecutionContext context = getTaskExecutionContext(taskInstanceId);
         ExecutionContext executionContext = new 
ExecutionContext(context.toCommand(), ExecutorType.WORKER, 
context.getWorkerGroup());
-        try {
-            return dispatcher.dispatch(executionContext);
-        } catch (ExecuteException e) {
-            logger.error("execute exception", e);
-            return false;
-        }
+        Boolean result = false;
+        while (Stopper.isRunning()){
+            try {
+                result =  dispatcher.dispatch(executionContext);
+            } catch (ExecuteException e) {
+                logger.error("dispatch error",e);
+                try {
+                    Thread.sleep(SLEEP_TIME_MILLIS);
+                } catch (InterruptedException e1) {}
+            }
 
+            if (result){
+                break;
+            }
+        }
+        return result;
     }
 
     /**
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
index 1eb40db..7af9cdc 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
@@ -19,7 +19,9 @@ package org.apache.dolphinscheduler.server.master.processor;
 
 import io.netty.channel.Channel;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.utils.Preconditions;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
@@ -31,9 +33,12 @@ import 
org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheMan
 import 
org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
 import 
org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.dolphinscheduler.common.Constants.*;
+
 /**
  *  task ack processor
  */
@@ -51,9 +56,16 @@ public class TaskAckProcessor implements 
NettyRequestProcessor {
      */
     private final TaskInstanceCacheManager taskInstanceCacheManager;
 
+
+    /**
+     * processService
+     */
+    private ProcessService processService;
+
     public TaskAckProcessor(){
         this.taskResponseService = 
SpringApplicationContext.getBean(TaskResponseService.class);
         this.taskInstanceCacheManager = 
SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
+        this.processService = 
SpringApplicationContext.getBean(ProcessService.class);
     }
 
     /**
@@ -71,8 +83,10 @@ public class TaskAckProcessor implements 
NettyRequestProcessor {
 
         String workerAddress = ChannelUtils.toAddress(channel).getAddress();
 
+        ExecutionStatus ackStatus = 
ExecutionStatus.of(taskAckCommand.getStatus());
+
         // TaskResponseEvent
-        TaskResponseEvent taskResponseEvent = 
TaskResponseEvent.newAck(ExecutionStatus.of(taskAckCommand.getStatus()),
+        TaskResponseEvent taskResponseEvent = 
TaskResponseEvent.newAck(ackStatus,
                 taskAckCommand.getStartTime(),
                 workerAddress,
                 taskAckCommand.getExecutePath(),
@@ -81,6 +95,18 @@ public class TaskAckProcessor implements 
NettyRequestProcessor {
 
         taskResponseService.addResponse(taskResponseEvent);
 
+        while (Stopper.isRunning()){
+            TaskInstance taskInstance = 
processService.findTaskInstanceById(taskAckCommand.getTaskInstanceId());
+
+            if (taskInstance != null && ackStatus.typeIsRunning()){
+                break;
+            }
+
+            try {
+                Thread.sleep(SLEEP_TIME_MILLIS);
+            } catch (InterruptedException e) {}
+        }
+
     }
 
 }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
index 36b3823..ecb8646 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
@@ -19,7 +19,9 @@ package org.apache.dolphinscheduler.server.master.processor;
 
 import io.netty.channel.Channel;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.utils.Preconditions;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
@@ -30,9 +32,12 @@ import 
org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheMan
 import 
org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
 import 
org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.dolphinscheduler.common.Constants.*;
+
 /**
  *  task response processor
  */
@@ -50,9 +55,15 @@ public class TaskResponseProcessor implements 
NettyRequestProcessor {
      */
     private final TaskInstanceCacheManager taskInstanceCacheManager;
 
+    /**
+     * processService
+     */
+    private ProcessService processService;
+
     public TaskResponseProcessor(){
         this.taskResponseService = 
SpringApplicationContext.getBean(TaskResponseService.class);
         this.taskInstanceCacheManager = 
SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
+        this.processService = 
SpringApplicationContext.getBean(ProcessService.class);
     }
 
     /**
@@ -71,6 +82,8 @@ public class TaskResponseProcessor implements 
NettyRequestProcessor {
 
         taskInstanceCacheManager.cacheTaskInstance(responseCommand);
 
+        ExecutionStatus responseStatus = 
ExecutionStatus.of(responseCommand.getStatus());
+
         // TaskResponseEvent
         TaskResponseEvent taskResponseEvent = 
TaskResponseEvent.newResult(ExecutionStatus.of(responseCommand.getStatus()),
                 responseCommand.getEndTime(),
@@ -79,6 +92,18 @@ public class TaskResponseProcessor implements 
NettyRequestProcessor {
                 responseCommand.getTaskInstanceId());
 
         taskResponseService.addResponse(taskResponseEvent);
+
+        while (Stopper.isRunning()){
+            TaskInstance taskInstance = 
processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
+
+            if (taskInstance != null && responseStatus.typeIsFinished()){
+                break;
+            }
+
+            try {
+                Thread.sleep(SLEEP_TIME_MILLIS);
+            } catch (InterruptedException e) {}
+        }
     }
 
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
index f966591..7cd25cb 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
@@ -18,9 +18,11 @@
 package org.apache.dolphinscheduler.server.worker.processor;
 
 
+import com.alibaba.fastjson.JSONObject;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
+import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.utils.CollectionUtils;
 import org.apache.dolphinscheduler.remote.NettyRemotingClient;
 import org.apache.dolphinscheduler.remote.command.Command;
@@ -93,8 +95,17 @@ public class TaskCallbackService {
         }
         logger.warn("original master : {} is not reachable, random select 
master", nettyRemoteChannel.getHost());
         Set<String> masterNodes = 
zookeeperRegistryCenter.getMasterNodesDirectly();
-        if(CollectionUtils.isEmpty(masterNodes)){
-            throw new IllegalStateException("no available master node 
exception");
+        while (Stopper.isRunning()) {
+            if (CollectionUtils.isEmpty(masterNodes)) {
+                logger.error("no available master node");
+                try {
+                    Thread.sleep(1000);
+                }catch (Exception e){
+
+                }
+            }else {
+                break;
+            }
         }
         for(String masterNode : masterNodes){
             newChannel = nettyRemotingClient.getChannel(Host.of(masterNode));
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
index 5f44e1c..a0fee7c 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
@@ -17,21 +17,26 @@
 package org.apache.dolphinscheduler.server.worker.processor;
 
 import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.remote.NettyRemotingClient;
 import org.apache.dolphinscheduler.remote.NettyRemotingServer;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
 import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
 import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
+import 
org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
+import 
org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
 import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
 import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
 import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
 import org.apache.dolphinscheduler.server.zk.SpringZKServer;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
 import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
 import org.junit.Test;
@@ -47,9 +52,10 @@ import java.util.Date;
  * test task call back service
  */
 @RunWith(SpringJUnit4ClassRunner.class)
-@ContextConfiguration(classes={TaskCallbackServiceTestConfig.class, 
SpringZKServer.class, MasterRegistry.class, WorkerRegistry.class,
+@ContextConfiguration(classes={TaskCallbackServiceTestConfig.class, 
SpringZKServer.class, SpringApplicationContext.class, MasterRegistry.class, 
WorkerRegistry.class,
         ZookeeperRegistryCenter.class, MasterConfig.class, WorkerConfig.class,
-        ZookeeperCachedOperator.class, ZookeeperConfig.class, 
ZookeeperNodeManager.class, TaskCallbackService.class})
+        ZookeeperCachedOperator.class, ZookeeperConfig.class, 
ZookeeperNodeManager.class, TaskCallbackService.class,
+        TaskResponseService.class, 
TaskAckProcessor.class,TaskResponseProcessor.class})
 public class TaskCallbackServiceTest {
 
     @Autowired
@@ -58,12 +64,22 @@ public class TaskCallbackServiceTest {
     @Autowired
     private MasterRegistry masterRegistry;
 
+    @Autowired
+    private TaskAckProcessor taskAckProcessor;
+
+    @Autowired
+    private TaskResponseProcessor taskResponseProcessor;
+
+    /**
+     * send ack test
+     * @throws Exception
+     */
     @Test
-    public void testSendAck(){
+    public void testSendAck() throws Exception{
         final NettyServerConfig serverConfig = new NettyServerConfig();
         serverConfig.setListenPort(30000);
         NettyRemotingServer nettyRemotingServer = new 
NettyRemotingServer(serverConfig);
-        nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, 
Mockito.mock(TaskAckProcessor.class));
+        nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, 
taskAckProcessor);
         nettyRemotingServer.start();
 
         final NettyClientConfig clientConfig = new NettyClientConfig();
@@ -75,22 +91,64 @@ public class TaskCallbackServiceTest {
         ackCommand.setStartTime(new Date());
         taskCallbackService.sendAck(1, ackCommand.convert2Command());
 
+        Thread.sleep(5000);
+
+        Stopper.stop();
+
+        Thread.sleep(5000);
+
         nettyRemotingServer.close();
         nettyRemotingClient.close();
     }
 
+    /**
+     * send result test
+     * @throws Exception
+     */
+    @Test
+    public void testSendResult() throws Exception{
+        final NettyServerConfig serverConfig = new NettyServerConfig();
+        serverConfig.setListenPort(30000);
+        NettyRemotingServer nettyRemotingServer = new 
NettyRemotingServer(serverConfig);
+        
nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, 
taskResponseProcessor);
+        nettyRemotingServer.start();
+
+        final NettyClientConfig clientConfig = new NettyClientConfig();
+        NettyRemotingClient nettyRemotingClient = new 
NettyRemotingClient(clientConfig);
+        Channel channel = 
nettyRemotingClient.getChannel(Host.of("localhost:30000"));
+        taskCallbackService.addRemoteChannel(1, new 
NettyRemoteChannel(channel, 1));
+        TaskExecuteResponseCommand responseCommand  = new 
TaskExecuteResponseCommand();
+        responseCommand.setTaskInstanceId(1);
+        responseCommand.setEndTime(new Date());
+
+        taskCallbackService.sendResult(1, responseCommand.convert2Command());
+
+        Thread.sleep(5000);
+
+        Stopper.stop();
+
+        Thread.sleep(5000);
+
+        nettyRemotingServer.close();
+        nettyRemotingClient.close();
+    }
+
+
+
     @Test(expected = IllegalArgumentException.class)
     public void testSendAckWithIllegalArgumentException(){
         TaskExecuteAckCommand ackCommand = 
Mockito.mock(TaskExecuteAckCommand.class);
         taskCallbackService.sendAck(1, ackCommand.convert2Command());
+        Stopper.stop();
     }
 
     @Test(expected = IllegalStateException.class)
     public void testSendAckWithIllegalStateException1(){
+        masterRegistry.registry();
         final NettyServerConfig serverConfig = new NettyServerConfig();
         serverConfig.setListenPort(30000);
         NettyRemotingServer nettyRemotingServer = new 
NettyRemotingServer(serverConfig);
-        nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, 
Mockito.mock(TaskAckProcessor.class));
+        nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, 
taskAckProcessor);
         nettyRemotingServer.start();
 
         final NettyClientConfig clientConfig = new NettyClientConfig();
@@ -103,7 +161,21 @@ public class TaskCallbackServiceTest {
         ackCommand.setStartTime(new Date());
 
         nettyRemotingServer.close();
+
         taskCallbackService.sendAck(1, ackCommand.convert2Command());
+        try {
+            Thread.sleep(5000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+
+        Stopper.stop();
+
+        try {
+            Thread.sleep(5000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
     }
 
     @Test(expected = IllegalStateException.class)
@@ -112,7 +184,7 @@ public class TaskCallbackServiceTest {
         final NettyServerConfig serverConfig = new NettyServerConfig();
         serverConfig.setListenPort(30000);
         NettyRemotingServer nettyRemotingServer = new 
NettyRemotingServer(serverConfig);
-        nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, 
Mockito.mock(TaskAckProcessor.class));
+        nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, 
taskAckProcessor);
         nettyRemotingServer.start();
 
         final NettyClientConfig clientConfig = new NettyClientConfig();
@@ -125,6 +197,20 @@ public class TaskCallbackServiceTest {
         ackCommand.setStartTime(new Date());
 
         nettyRemotingServer.close();
+
         taskCallbackService.sendAck(1, ackCommand.convert2Command());
+        try {
+            Thread.sleep(5000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+
+        Stopper.stop();
+
+        try {
+            Thread.sleep(5000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
     }
 }

Reply via email to