This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch 2.0.2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/2.0.2-prepare by this push:
     new 6c819ee  [Bug] [dolphinscheduler-server] workflow is always running 
when worker sendResult success but master not received  (#7610)
6c819ee is described below

commit 6c819ee16d575e62039d98183b93fd3a06c628ed
Author: zwZjut <[email protected]>
AuthorDate: Fri Dec 24 16:04:15 2021 +0800

    [Bug] [dolphinscheduler-server] workflow is always running when worker 
sendResult success but master not received  (#7610)
    
    * [Feature][dolphinscheduler-api] parse traceId in http header for Cross 
system delivery to #7237 (#7238)
    
    * to #7237
    
    * rerun test
    
    Co-authored-by: honghuo.zw <[email protected]>
    
    * cherry-pick 05aef27 and handle conflicts
    
    * to #7065: fix ExecutorService and schedulerService (#7072)
    
    Co-authored-by: honghuo.zw <[email protected]>
    
    * [Feature][dolphinscheduler-api] access control of taskDefinition and 
taskInstance in project to #7081  (#7082)
    
    * to #7081
    
    * fix #7081
    
    * to #7081
    
    Co-authored-by: honghuo.zw <[email protected]>
    
    * cherry-pick 8ebe060 and handle conflicts
    
    * cherry-pick 1f18444 and handle conflicts
    
    * fix #6807: dolphinscheduler.zookeeper.env_vars -> dolphinscheduler.registry.env_vars (#6808)
    
    Co-authored-by: honghuo.zw <[email protected]>
    Co-authored-by: Kirs <[email protected]>
    
    * add default constructor (#6780)
    
    Co-authored-by: honghuo.zw <[email protected]>
    
    * to #7108 (#7109)
    
    * to #7609
    
    Co-authored-by: honghuo.zw <[email protected]>
    Co-authored-by: Kirs <[email protected]>
---
 .../server/worker/processor/DBTaskResponseProcessor.java         | 6 ++++--
 .../server/worker/processor/TaskCallbackService.java             | 9 ++++-----
 .../server/worker/processor/TaskKillProcessor.java               | 2 ++
 3 files changed, 10 insertions(+), 7 deletions(-)

diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
index 6da9fdd..f9e206e 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
@@ -47,13 +47,15 @@ public class DBTaskResponseProcessor implements 
NettyRequestProcessor {
         DBTaskResponseCommand taskResponseCommand = JSONUtils.parseObject(
                 command.getBody(), DBTaskResponseCommand.class);
 
-        if (taskResponseCommand == null){
+        if (taskResponseCommand == null) {
             return;
         }
 
-        if (taskResponseCommand.getStatus() == 
ExecutionStatus.SUCCESS.getCode()){
+        if (taskResponseCommand.getStatus() == 
ExecutionStatus.SUCCESS.getCode()) {
             
ResponceCache.get().removeResponseCache(taskResponseCommand.getTaskInstanceId());
             logger.debug("removeResponseCache: taskinstance id:{}", 
taskResponseCommand.getTaskInstanceId());
+            
TaskCallbackService.remove(taskResponseCommand.getTaskInstanceId());
+            logger.debug("remove REMOTE_CHANNELS, task instance id:{}", 
taskResponseCommand.getTaskInstanceId());
         }
     }
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
index 96ec36b..09b2c3a 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
@@ -19,14 +19,13 @@ package org.apache.dolphinscheduler.server.worker.processor;
 
 import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
 
-import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.dolphinscheduler.remote.NettyRemotingClient;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
 import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
+
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -125,7 +124,7 @@ public class TaskCallbackService {
      *
      * @param taskInstanceId taskInstanceId
      */
-    public void remove(int taskInstanceId) {
+    public static void remove(int taskInstanceId) {
         REMOTE_CHANNELS.remove(taskInstanceId);
     }
 
@@ -156,7 +155,7 @@ public class TaskCallbackService {
                 @Override
                 public void operationComplete(ChannelFuture future) throws 
Exception {
                     if (future.isSuccess()) {
-                        remove(taskInstanceId);
+                        // remove(taskInstanceId);
                         return;
                     }
                 }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index c0ecd67..4341b8c 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -99,6 +99,8 @@ public class TaskKillProcessor implements 
NettyRequestProcessor {
         TaskKillResponseCommand taskKillResponseCommand = 
buildKillTaskResponseCommand(killCommand, result);
         
taskCallbackService.sendResult(taskKillResponseCommand.getTaskInstanceId(), 
taskKillResponseCommand.convert2Command());
         
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskKillResponseCommand.getTaskInstanceId());
+        TaskCallbackService.remove(killCommand.getTaskInstanceId());
+        logger.info("remove REMOTE_CHANNELS, task instance id:{}", 
killCommand.getTaskInstanceId());
     }
 
     /**

Reply via email to