This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch 3.1.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/3.1.0-prepare by this push:
     new f034a09d25 [Bug-11650][worker] #11650 fix SQL type task, stop task 
cause NPE (#11668) (#11958)
f034a09d25 is described below

commit f034a09d25188ed432572780041f804ad8e4f2c2
Author: caishunfeng <[email protected]>
AuthorDate: Thu Sep 15 14:24:34 2022 +0800

    [Bug-11650][worker] #11650 fix SQL type task, stop task cause NPE (#11668) 
(#11958)
    
    Co-authored-by: 冯剑 <[email protected]>
---
 .../server/worker/processor/TaskKillProcessor.java | 39 ++++++++++++++--------
 1 file changed, 26 insertions(+), 13 deletions(-)

diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index 3aadb5a5cc..d73cc5b3db 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -19,11 +19,11 @@ package org.apache.dolphinscheduler.server.worker.processor;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+
+import io.micrometer.core.lang.NonNull;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
-import lombok.NonNull;
-import org.apache.commons.collections4.CollectionUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
@@ -43,6 +43,11 @@ import 
org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import 
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnable;
 import org.apache.dolphinscheduler.service.log.LogClientService;
+
+import org.apache.commons.collections.CollectionUtils;
+
+
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -78,7 +83,7 @@ public class TaskKillProcessor implements 
NettyRequestProcessor {
     @Override
     public void process(Channel channel, Command command) {
         Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == 
command.getType(),
-                String.format("invalid command type : %s", command.getType()));
+            String.format("invalid command type : %s", command.getType()));
         TaskKillRequestCommand killCommand = 
JSONUtils.parseObject(command.getBody(), TaskKillRequestCommand.class);
         if (killCommand == null) {
             logger.error("task kill request command is null");
@@ -88,7 +93,7 @@ public class TaskKillProcessor implements 
NettyRequestProcessor {
 
         int taskInstanceId = killCommand.getTaskInstanceId();
         TaskExecutionContext taskExecutionContext =
-                
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
+            
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
         if (taskExecutionContext == null) {
             logger.error("taskRequest cache is null, taskInstanceId: {}", 
killCommand.getTaskInstanceId());
             return;
@@ -110,7 +115,7 @@ public class TaskKillProcessor implements 
NettyRequestProcessor {
         Pair<Boolean, List<String>> result = doKill(taskExecutionContext);
 
         taskExecutionContext.setCurrentExecutionStatus(
-                result.getLeft() ? TaskExecutionStatus.SUCCESS : 
TaskExecutionStatus.FAILURE);
+            result.getLeft() ? TaskExecutionStatus.SUCCESS : 
TaskExecutionStatus.FAILURE);
         taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, 
result.getRight()));
         sendTaskKillResponseCommand(channel, taskExecutionContext);
 
@@ -123,7 +128,9 @@ public class TaskKillProcessor implements 
NettyRequestProcessor {
     private void sendTaskKillResponseCommand(Channel channel, 
TaskExecutionContext taskExecutionContext) {
         TaskKillResponseCommand taskKillResponseCommand = new 
TaskKillResponseCommand();
         
taskKillResponseCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus());
-        
taskKillResponseCommand.setAppIds(Arrays.asList(taskExecutionContext.getAppIds().split(TaskConstants.COMMA)));
+        if (taskExecutionContext.getAppIds() != null) {
+            
taskKillResponseCommand.setAppIds(Arrays.asList(taskExecutionContext.getAppIds().split(TaskConstants.COMMA)));
+        }
         
taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
         taskKillResponseCommand.setHost(taskExecutionContext.getHost());
         
taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId());
@@ -138,14 +145,20 @@ public class TaskKillProcessor implements 
NettyRequestProcessor {
         });
     }
 
+    /**
+     * do kill
+     *
+     * @return kill result
+     */
     private Pair<Boolean, List<String>> doKill(TaskExecutionContext 
taskExecutionContext) {
         // kill system process
         boolean processFlag = 
killProcess(taskExecutionContext.getTenantCode(), 
taskExecutionContext.getProcessId());
+
         // find log and kill yarn job
         Pair<Boolean, List<String>> yarnResult = 
killYarnJob(Host.of(taskExecutionContext.getHost()),
-                taskExecutionContext.getLogPath(),
-                taskExecutionContext.getExecutePath(),
-                taskExecutionContext.getTenantCode());
+            taskExecutionContext.getLogPath(),
+            taskExecutionContext.getExecutePath(),
+            taskExecutionContext.getTenantCode());
         return Pair.of(processFlag && yarnResult.getLeft(), 
yarnResult.getRight());
     }
 
@@ -200,8 +213,8 @@ public class TaskKillProcessor implements 
NettyRequestProcessor {
     /**
      * kill yarn job
      *
-     * @param host        host
-     * @param logPath     logPath
+     * @param host host
+     * @param logPath logPath
      * @param executePath executePath
      * @param tenantCode  tenantCode
      * @return Pair<Boolean, List < String>> yarn kill result
@@ -212,7 +225,7 @@ public class TaskKillProcessor implements 
NettyRequestProcessor {
                                                     String tenantCode) {
         if (logPath == null || executePath == null || tenantCode == null) {
             logger.error("Kill yarn job error, the input params is illegal, 
host: {}, logPath: {}, executePath: {}, tenantCode: {}",
-                    host, logPath, executePath, tenantCode);
+                host, logPath, executePath, tenantCode);
             return Pair.of(false, Collections.emptyList());
         }
         try (LogClientService logClient = new LogClientService()) {
@@ -233,4 +246,4 @@ public class TaskKillProcessor implements 
NettyRequestProcessor {
         return Pair.of(false, Collections.emptyList());
     }
 
-}
+}
\ No newline at end of file

Reply via email to