This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 653eae2  [BUG][Master] process cannot finish and its status is always running (#6886)
653eae2 is described below

commit 653eae24195957b01d1a911aada020372d1742e6
Author: OS <[email protected]>
AuthorDate: Wed Nov 17 17:39:22 2021 +0800

    [BUG][Master] process cannot finish and its status is always running (#6886)
    
    * fix 6882: process cannot finish and its status is always running
    
    * fix 6882: process cannot finish and its status is always running
---
 .../dolphinscheduler/server/master/runner/EventExecuteService.java    | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
index bfd8b39..c40618a 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
@@ -118,6 +118,7 @@ public class EventExecuteService extends Thread {
         for (WorkflowExecuteThread workflowExecuteThread : 
this.processInstanceExecCacheManager.getAll()) {
             if (workflowExecuteThread.eventSize() == 0
                     || StringUtils.isEmpty(workflowExecuteThread.getKey())
+                    || !workflowExecuteThread.isStart()
                     || 
eventHandlerMap.containsKey(workflowExecuteThread.getKey())) {
                 continue;
             }
@@ -186,12 +187,13 @@ public class EventExecuteService extends Thread {
                     StateEventChangeCommand stateEventChangeCommand = new 
StateEventChangeCommand(
                             processInstanceId, 0, 
workflowExecuteThread.getProcessInstance().getState(), processInstance.getId(), 
taskInstance.getId()
                     );
-
                     stateEventCallbackService.sendResult(address, port, 
stateEventChangeCommand.convert2Command());
                 }
 
                 @Override
                 public void onFailure(Throwable throwable) {
+                    logger.info("handle events {} failed.", processInstanceId);
+                    logger.info("handle events failed.", throwable);
                 }
             };
             Futures.addCallback(future, futureCallback, 
this.listeningExecutorService);

Reply via email to