This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new aef5524ee7 Clearer task runnable. (#13689)
aef5524ee7 is described below

commit aef5524ee7d44cfed55aab3304dd259772c99ff3
Author: Wenjun Ruan <[email protected]>
AuthorDate: Thu Mar 9 20:46:29 2023 +0800

    Clearer task runnable. (#13689)
    
    * Clean unused method in AbstractTask
    
    * Kill task by process.destroy
    
    * wait 5/s after destroy process
---
 .../plugin/task/api/AbstractCommandExecutor.java   | 63 +++++-----------------
 .../plugin/task/api/AbstractTask.java              | 18 +------
 .../plugin/task/api/TaskChannel.java               |  2 +
 .../plugin/task/api/parameters/IParameters.java    | 33 ++++++------
 .../plugin/task/api/utils/ProcessUtils.java        |  1 +
 .../plugin/task/java/JavaTask.java                 | 16 ------
 .../plugin/task/openmldb/OpenmldbTask.java         |  6 ---
 .../plugin/task/python/PythonTask.java             | 43 ---------------
 .../server/worker/processor/TaskKillProcessor.java |  1 +
 9 files changed, 35 insertions(+), 148 deletions(-)

diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
index 92297558d7..684db5d414 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.plugin.task.api;
 
 import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
 import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
-import static 
org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils.getPidsStr;
 
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.PropertyUtils;
@@ -191,7 +190,7 @@ public abstract class AbstractCommandExecutor {
         command.add(String.format("--uid=%s", taskRequest.getTenantCode()));
     }
 
-    public TaskResponse run(String execCommand, TaskCallBack taskCallBack) 
throws IOException, InterruptedException {
+    public TaskResponse run(String execCommand, TaskCallBack taskCallBack) 
throws Exception {
         TaskResponse result = new TaskResponse();
         int taskInstanceId = taskRequest.getTaskInstanceId();
         if (null == 
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
@@ -223,8 +222,8 @@ public abstract class AbstractCommandExecutor {
         boolean updateTaskExecutionContextStatus =
                 
TaskExecutionContextCacheManager.updateTaskExecutionContext(taskRequest);
         if (Boolean.FALSE.equals(updateTaskExecutionContextStatus)) {
-            ProcessUtils.kill(taskRequest);
             result.setExitStatusCode(EXIT_CODE_KILL);
+            cancelApplication();
             return result;
         }
         // print process id
@@ -262,14 +261,13 @@ public abstract class AbstractCommandExecutor {
         } else {
             logger.error("process has failure, the task timeout configuration 
value is:{}, ready to kill ...",
                     taskRequest.getTaskTimeout());
-            ProcessUtils.kill(taskRequest);
             result.setExitStatusCode(EXIT_CODE_FAILURE);
+            cancelApplication();
         }
         int exitCode = process.exitValue();
         String exitLogMessage = EXIT_CODE_KILL == exitCode ? "process has 
killed." : "process has exited.";
-        logger.info(exitLogMessage
-                + " execute path:{}, processId:{} ,exitStatusCode:{} 
,processWaitForStatus:{} ,processExitValue:{}",
-                taskRequest.getExecutePath(), processId, 
result.getExitStatusCode(), status, exitCode);
+        logger.info("{} execute path:{}, processId:{} ,exitStatusCode:{} 
,processWaitForStatus:{} ,processExitValue:{}",
+                exitLogMessage, taskRequest.getExecutePath(), processId, 
result.getExitStatusCode(), status, exitCode);
         return result;
 
     }
@@ -278,53 +276,18 @@ public abstract class AbstractCommandExecutor {
         return varPool.toString();
     }
 
-    /**
-     * cancel application
-     *
-     * @throws Exception exception
-     */
-    public void cancelApplication() throws Exception {
+    public void cancelApplication() throws InterruptedException {
         if (process == null) {
             return;
         }
 
-        int processId = getProcessId(process);
-        logger.info("Begin to kill process process, pid is : {}", processId);
-        // kill , waiting for completion
-        boolean alive = softKill(processId);
-
-        if (alive) {
-            String cmd = String.format("kill -9 %s", getPidsStr(processId));
-            cmd = OSUtils.getSudoCmd(taskRequest.getTenantCode(), cmd);
-            OSUtils.exeCmd(cmd);
-            logger.info("Success kill task: {}, pid: {}, cmd: {}", 
taskRequest.getTaskAppId(), processId, cmd);
-        } else {
-            logger.info("The process: {} is not alive, no need to kill", 
processId);
+        // soft kill
+        logger.info("Begin to kill process process, pid is : {}", 
taskRequest.getProcessId());
+        process.destroy();
+        if (!process.waitFor(5, TimeUnit.SECONDS)) {
+            process.destroyForcibly();
         }
-    }
-
-    /**
-     * soft kill
-     *
-     * @param processId process id
-     * @return process is alive
-     */
-    private boolean softKill(int processId) {
-
-        if (processId != 0 && process.isAlive()) {
-            try {
-                // sudo -u user command to run command
-                String cmd = String.format("kill %d", processId);
-                cmd = OSUtils.getSudoCmd(taskRequest.getTenantCode(), cmd);
-                logger.info("soft kill task:{}, process id:{}, cmd:{}", 
taskRequest.getTaskAppId(), processId, cmd);
-
-                Runtime.getRuntime().exec(cmd);
-            } catch (IOException e) {
-                logger.info("kill attempt failed", e);
-            }
-        }
-
-        return process.isAlive();
+        logger.info("Success kill task: {}, pid: {}", 
taskRequest.getTaskAppId(), taskRequest.getProcessId());
     }
 
     private void printCommand(List<String> commands) {
@@ -424,7 +387,7 @@ public abstract class AbstractCommandExecutor {
             f.setAccessible(true);
 
             processId = f.getInt(process);
-        } catch (Throwable e) {
+        } catch (Exception e) {
             logger.error("Get task pid failed", e);
         }
 
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java
index 1c51594b9e..1986957c98 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java
@@ -55,11 +55,6 @@ public abstract class AbstractTask {
      */
     protected int processId;
 
-    /**
-     * SHELL result string
-     */
-    protected String resultString;
-
     /**
      * other resource manager appId , for example : YARN etc
      */
@@ -89,10 +84,7 @@ public abstract class AbstractTask {
     public void init() {
     }
 
-    public String getPreScript() {
-        return null;
-    }
-
+    // todo: return TaskResult rather than store the result in Task
     public abstract void handle(TaskCallBack taskCallBack) throws 
TaskException;
 
     public abstract void cancel() throws TaskException;
@@ -126,14 +118,6 @@ public abstract class AbstractTask {
         this.processId = processId;
     }
 
-    public String getResultString() {
-        return resultString;
-    }
-
-    public void setResultString(String resultString) {
-        this.resultString = resultString;
-    }
-
     public String getAppIds() {
         return appIds;
     }
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskChannel.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskChannel.java
index 421a2646c2..77abae5047 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskChannel.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskChannel.java
@@ -25,8 +25,10 @@ public interface TaskChannel {
 
     void cancelApplication(boolean status);
 
+    // todo: return ITask
     AbstractTask createTask(TaskExecutionContext taskRequest);
 
+    // todo: return IParameters
     AbstractParameters parseParameters(ParametersNode parametersNode);
 
     ResourceParametersHelper getResources(String parameters);
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/IParameters.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/IParameters.java
index bcd914c8be..a23c25dd07 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/IParameters.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/IParameters.java
@@ -1,19 +1,20 @@
-package org.apache.dolphinscheduler.plugin.task.api.parameters;/*
-                                                               * Licensed to 
the Apache Software Foundation (ASF) under one or more
-                                                               * contributor 
license agreements.  See the NOTICE file distributed with
-                                                               * this work for 
additional information regarding copyright ownership.
-                                                               * The ASF 
licenses this file to You under the Apache License, Version 2.0
-                                                               * (the 
"License"); you may not use this file except in compliance with
-                                                               * the License.  
You may obtain a copy of the License at
-                                                               *
-                                                               *    
http://www.apache.org/licenses/LICENSE-2.0
-                                                               *
-                                                               * Unless 
required by applicable law or agreed to in writing, software
-                                                               * distributed 
under the License is distributed on an "AS IS" BASIS,
-                                                               * WITHOUT 
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-                                                               * See the 
License for the specific language governing permissions and
-                                                               * limitations 
under the License.
-                                                               */
+package org.apache.dolphinscheduler.plugin.task.api.parameters;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
 
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java
index 9ec8480589..e260277020 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java
@@ -79,6 +79,7 @@ public final class ProcessUtils {
     /**
      * kill tasks according to different task types.
      */
+    @Deprecated
     public static boolean kill(@NonNull TaskExecutionContext request) {
         try {
             log.info("Begin kill task instance, processId: {}", 
request.getProcessId());
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
index 8628c01ea6..280ede0d7e 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
@@ -99,22 +99,6 @@ public class JavaTask extends AbstractTask {
         log.info("Initialize java task params {}", 
JSONUtils.toPrettyJsonString(javaParameters));
     }
 
-    /**
-     *  Gets the Java source file that was initially processed
-     *
-     * @return String
-     **/
-    @Override
-    public String getPreScript() {
-        String rawJavaScript = 
javaParameters.getRawScript().replaceAll("\\r\\n", "\n");
-        try {
-            rawJavaScript = convertJavaSourceCodePlaceholders(rawJavaScript);
-        } catch (StringIndexOutOfBoundsException e) {
-            log.error("setShareVar field format error, raw java script: {}", 
rawJavaScript);
-        }
-        return rawJavaScript;
-    }
-
     /**
      * Execute Java tasks
      *
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java
index 75e6b3bce3..88371675ef 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java
@@ -70,12 +70,6 @@ public class OpenmldbTask extends PythonTask {
         }
     }
 
-    @Override
-    @Deprecated
-    public String getPreScript() {
-        return "";
-    }
-
     /**
      * build python command file path
      *
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
index cb2cdfdcdc..f2853bab5e 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
@@ -87,17 +87,6 @@ public class PythonTask extends AbstractTask {
         }
     }
 
-    @Override
-    public String getPreScript() {
-        String rawPythonScript = 
pythonParameters.getRawScript().replaceAll("\\r\\n", System.lineSeparator());
-        try {
-            rawPythonScript = convertPythonScriptPlaceholders(rawPythonScript);
-        } catch (StringIndexOutOfBoundsException e) {
-            log.error("setShareVar field format error, raw python script : 
{}", rawPythonScript);
-        }
-        return rawPythonScript;
-    }
-
     @Override
     public void handle(TaskCallBack taskCallBack) throws TaskException {
         try {
@@ -137,38 +126,6 @@ public class PythonTask extends AbstractTask {
         return pythonParameters;
     }
 
-    /**
-     * convertPythonScriptPlaceholders
-     *
-     * @param rawScript rawScript
-     * @return String
-     * @throws StringIndexOutOfBoundsException if substring index is out of 
bounds
-     */
-    private static String convertPythonScriptPlaceholders(String rawScript) 
throws StringIndexOutOfBoundsException {
-        int len = "${setShareVar(${".length();
-        int scriptStart = 0;
-        while ((scriptStart = rawScript.indexOf("${setShareVar(${", 
scriptStart)) != -1) {
-            int start = -1;
-            int end = rawScript.indexOf('}', scriptStart + len);
-            String prop = rawScript.substring(scriptStart + len, end);
-
-            start = rawScript.indexOf(',', end);
-            end = rawScript.indexOf(')', start);
-
-            String value = rawScript.substring(start + 1, end);
-
-            start = rawScript.indexOf('}', start) + 1;
-            end = rawScript.length();
-
-            String replaceScript = 
String.format("print(\"${{setValue({},{})}}\".format(\"%s\",%s))", prop, value);
-
-            rawScript = rawScript.substring(0, scriptStart) + replaceScript + 
rawScript.substring(start, end);
-
-            scriptStart += replaceScript.length();
-        }
-        return rawScript;
-    }
-
     /**
      * create python command file if not exists
      *
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index 9afc4708c7..6421dc7490 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -186,6 +186,7 @@ public class TaskKillProcessor implements 
NettyRequestProcessor {
      * @param processId
      */
     protected boolean killProcess(String tenantCode, Integer processId) {
+        // todo: directly interrupt the process
         boolean processFlag = true;
         if (processId == null || processId.equals(0)) {
             return true;

Reply via email to