This is an automated email from the ASF dual-hosted git repository.

kerwin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 552fd17  [Plugin][Task]Fix Yarn Task Kill Not valid (#6293)
552fd17 is described below

commit 552fd17878641a638bb5a8a46339b70426ba6c8c
Author: Kirs <[email protected]>
AuthorDate: Wed Sep 22 21:59:12 2021 +0800

    [Plugin][Task]Fix Yarn Task Kill Not valid (#6293)
---
 .../server/worker/runner/TaskExecuteThread.java           | 15 +++++++++++++++
 .../plugin/task/api/AbstractYarnTask.java                 |  4 ----
 2 files changed, 15 insertions(+), 4 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index b6ad565..19dedb9 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -31,14 +31,18 @@ import org.apache.dolphinscheduler.common.utils.HadoopUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.common.utils.RetryerUtils;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.utils.ProcessUtils;
 import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
 import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager;
 import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
 import org.apache.dolphinscheduler.service.alert.AlertClientService;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.spi.exception.PluginNotFoundException;
 import org.apache.dolphinscheduler.spi.task.AbstractTask;
 import org.apache.dolphinscheduler.spi.task.TaskAlertInfo;
@@ -109,6 +113,11 @@ public class TaskExecuteThread implements Runnable, Delayed {
     private TaskPluginManager taskPluginManager;
 
     /**
+     * process database access
+     */
+    protected ProcessService processService;
+
+    /**
      * constructor
      *
      * @param taskExecutionContext taskExecutionContext
@@ -120,6 +129,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
         this.taskExecutionContext = taskExecutionContext;
         this.taskCallbackService = taskCallbackService;
         this.alertClientService = alertClientService;
+        this.processService = SpringApplicationContext.getBean(ProcessService.class);
     }
 
     public TaskExecuteThread(TaskExecutionContext taskExecutionContext,
@@ -130,6 +140,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
         this.taskCallbackService = taskCallbackService;
         this.alertClientService = alertClientService;
         this.taskPluginManager = taskPluginManager;
+        this.processService = SpringApplicationContext.getBean(ProcessService.class);
     }
 
     @Override
@@ -266,6 +277,10 @@ public class TaskExecuteThread implements Runnable, Delayed {
         if (task != null) {
             try {
                 task.cancelApplication(true);
+                TaskInstance taskInstance = processService.findTaskInstanceById(taskExecutionContext.getTaskInstanceId());
+                if (taskInstance != null) {
+                    ProcessUtils.killYarnJob(taskExecutionContext);
+                }
             } catch (Exception e) {
                 logger.error(e.getMessage(), e);
             }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
index d2c7c55..b68fe1b 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
@@ -66,10 +66,6 @@ public abstract class AbstractYarnTask extends AbstractTaskExecutor {
         cancel = true;
         // cancel process
         shellCommandExecutor.cancelApplication();
-        //  TaskInstance taskInstance = processService.findTaskInstanceById(taskExecutionContext.getTaskInstanceId());
-        // if (status && taskInstance != null){
-        //   ProcessUtils.killYarnJob(taskExecutionContext);
-        //  }
     }
 
     /**

Reply via email to