This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new aef5524ee7 Clearer task runnable. (#13689)
aef5524ee7 is described below
commit aef5524ee7d44cfed55aab3304dd259772c99ff3
Author: Wenjun Ruan <[email protected]>
AuthorDate: Thu Mar 9 20:46:29 2023 +0800
Clearer task runnable. (#13689)
* Clean unused method in AbstractTask
* Kill task by process.destroy
* wait 5/s after destroy process
---
.../plugin/task/api/AbstractCommandExecutor.java | 63 +++++-----------------
.../plugin/task/api/AbstractTask.java | 18 +------
.../plugin/task/api/TaskChannel.java | 2 +
.../plugin/task/api/parameters/IParameters.java | 33 ++++++------
.../plugin/task/api/utils/ProcessUtils.java | 1 +
.../plugin/task/java/JavaTask.java | 16 ------
.../plugin/task/openmldb/OpenmldbTask.java | 6 ---
.../plugin/task/python/PythonTask.java | 43 ---------------
.../server/worker/processor/TaskKillProcessor.java | 1 +
9 files changed, 35 insertions(+), 148 deletions(-)
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
index 92297558d7..684db5d414 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.plugin.task.api;
import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
-import static
org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils.getPidsStr;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
@@ -191,7 +190,7 @@ public abstract class AbstractCommandExecutor {
command.add(String.format("--uid=%s", taskRequest.getTenantCode()));
}
- public TaskResponse run(String execCommand, TaskCallBack taskCallBack)
throws IOException, InterruptedException {
+ public TaskResponse run(String execCommand, TaskCallBack taskCallBack)
throws Exception {
TaskResponse result = new TaskResponse();
int taskInstanceId = taskRequest.getTaskInstanceId();
if (null ==
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
@@ -223,8 +222,8 @@ public abstract class AbstractCommandExecutor {
boolean updateTaskExecutionContextStatus =
TaskExecutionContextCacheManager.updateTaskExecutionContext(taskRequest);
if (Boolean.FALSE.equals(updateTaskExecutionContextStatus)) {
- ProcessUtils.kill(taskRequest);
result.setExitStatusCode(EXIT_CODE_KILL);
+ cancelApplication();
return result;
}
// print process id
@@ -262,14 +261,13 @@ public abstract class AbstractCommandExecutor {
} else {
logger.error("process has failure, the task timeout configuration
value is:{}, ready to kill ...",
taskRequest.getTaskTimeout());
- ProcessUtils.kill(taskRequest);
result.setExitStatusCode(EXIT_CODE_FAILURE);
+ cancelApplication();
}
int exitCode = process.exitValue();
String exitLogMessage = EXIT_CODE_KILL == exitCode ? "process has
killed." : "process has exited.";
- logger.info(exitLogMessage
- + " execute path:{}, processId:{} ,exitStatusCode:{}
,processWaitForStatus:{} ,processExitValue:{}",
- taskRequest.getExecutePath(), processId,
result.getExitStatusCode(), status, exitCode);
+ logger.info("{} execute path:{}, processId:{} ,exitStatusCode:{}
,processWaitForStatus:{} ,processExitValue:{}",
+ exitLogMessage, taskRequest.getExecutePath(), processId,
result.getExitStatusCode(), status, exitCode);
return result;
}
@@ -278,53 +276,18 @@ public abstract class AbstractCommandExecutor {
return varPool.toString();
}
- /**
- * cancel application
- *
- * @throws Exception exception
- */
- public void cancelApplication() throws Exception {
+ public void cancelApplication() throws InterruptedException {
if (process == null) {
return;
}
- int processId = getProcessId(process);
- logger.info("Begin to kill process process, pid is : {}", processId);
- // kill , waiting for completion
- boolean alive = softKill(processId);
-
- if (alive) {
- String cmd = String.format("kill -9 %s", getPidsStr(processId));
- cmd = OSUtils.getSudoCmd(taskRequest.getTenantCode(), cmd);
- OSUtils.exeCmd(cmd);
- logger.info("Success kill task: {}, pid: {}, cmd: {}",
taskRequest.getTaskAppId(), processId, cmd);
- } else {
- logger.info("The process: {} is not alive, no need to kill",
processId);
+ // soft kill
+ logger.info("Begin to kill process process, pid is : {}",
taskRequest.getProcessId());
+ process.destroy();
+ if (!process.waitFor(5, TimeUnit.SECONDS)) {
+ process.destroyForcibly();
}
- }
-
- /**
- * soft kill
- *
- * @param processId process id
- * @return process is alive
- */
- private boolean softKill(int processId) {
-
- if (processId != 0 && process.isAlive()) {
- try {
- // sudo -u user command to run command
- String cmd = String.format("kill %d", processId);
- cmd = OSUtils.getSudoCmd(taskRequest.getTenantCode(), cmd);
- logger.info("soft kill task:{}, process id:{}, cmd:{}",
taskRequest.getTaskAppId(), processId, cmd);
-
- Runtime.getRuntime().exec(cmd);
- } catch (IOException e) {
- logger.info("kill attempt failed", e);
- }
- }
-
- return process.isAlive();
+ logger.info("Success kill task: {}, pid: {}",
taskRequest.getTaskAppId(), taskRequest.getProcessId());
}
private void printCommand(List<String> commands) {
@@ -424,7 +387,7 @@ public abstract class AbstractCommandExecutor {
f.setAccessible(true);
processId = f.getInt(process);
- } catch (Throwable e) {
+ } catch (Exception e) {
logger.error("Get task pid failed", e);
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java
index 1c51594b9e..1986957c98 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java
@@ -55,11 +55,6 @@ public abstract class AbstractTask {
*/
protected int processId;
- /**
- * SHELL result string
- */
- protected String resultString;
-
/**
* other resource manager appId , for example : YARN etc
*/
@@ -89,10 +84,7 @@ public abstract class AbstractTask {
public void init() {
}
- public String getPreScript() {
- return null;
- }
-
+ // todo: return TaskResult rather than store the result in Task
public abstract void handle(TaskCallBack taskCallBack) throws
TaskException;
public abstract void cancel() throws TaskException;
@@ -126,14 +118,6 @@ public abstract class AbstractTask {
this.processId = processId;
}
- public String getResultString() {
- return resultString;
- }
-
- public void setResultString(String resultString) {
- this.resultString = resultString;
- }
-
public String getAppIds() {
return appIds;
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskChannel.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskChannel.java
index 421a2646c2..77abae5047 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskChannel.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskChannel.java
@@ -25,8 +25,10 @@ public interface TaskChannel {
void cancelApplication(boolean status);
+ // todo: return ITask
AbstractTask createTask(TaskExecutionContext taskRequest);
+ // todo: return IParameters
AbstractParameters parseParameters(ParametersNode parametersNode);
ResourceParametersHelper getResources(String parameters);
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/IParameters.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/IParameters.java
index bcd914c8be..a23c25dd07 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/IParameters.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/IParameters.java
@@ -1,19 +1,20 @@
-package org.apache.dolphinscheduler.plugin.task.api.parameters;/*
- * Licensed to
the Apache Software Foundation (ASF) under one or more
- * contributor
license agreements. See the NOTICE file distributed with
- * this work for
additional information regarding copyright ownership.
- * The ASF
licenses this file to You under the Apache License, Version 2.0
- * (the
"License"); you may not use this file except in compliance with
- * the License.
You may obtain a copy of the License at
- *
- *
http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless
required by applicable law or agreed to in writing, software
- * distributed
under the License is distributed on an "AS IS" BASIS,
- * WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the
License for the specific language governing permissions and
- * limitations
under the License.
- */
+package org.apache.dolphinscheduler.plugin.task.api.parameters;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java
index 9ec8480589..e260277020 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java
@@ -79,6 +79,7 @@ public final class ProcessUtils {
/**
* kill tasks according to different task types.
*/
+ @Deprecated
public static boolean kill(@NonNull TaskExecutionContext request) {
try {
log.info("Begin kill task instance, processId: {}",
request.getProcessId());
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
index 8628c01ea6..280ede0d7e 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
@@ -99,22 +99,6 @@ public class JavaTask extends AbstractTask {
log.info("Initialize java task params {}",
JSONUtils.toPrettyJsonString(javaParameters));
}
- /**
- * Gets the Java source file that was initially processed
- *
- * @return String
- **/
- @Override
- public String getPreScript() {
- String rawJavaScript =
javaParameters.getRawScript().replaceAll("\\r\\n", "\n");
- try {
- rawJavaScript = convertJavaSourceCodePlaceholders(rawJavaScript);
- } catch (StringIndexOutOfBoundsException e) {
- log.error("setShareVar field format error, raw java script: {}",
rawJavaScript);
- }
- return rawJavaScript;
- }
-
/**
* Execute Java tasks
*
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java
index 75e6b3bce3..88371675ef 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java
@@ -70,12 +70,6 @@ public class OpenmldbTask extends PythonTask {
}
}
- @Override
- @Deprecated
- public String getPreScript() {
- return "";
- }
-
/**
* build python command file path
*
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
index cb2cdfdcdc..f2853bab5e 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
@@ -87,17 +87,6 @@ public class PythonTask extends AbstractTask {
}
}
- @Override
- public String getPreScript() {
- String rawPythonScript =
pythonParameters.getRawScript().replaceAll("\\r\\n", System.lineSeparator());
- try {
- rawPythonScript = convertPythonScriptPlaceholders(rawPythonScript);
- } catch (StringIndexOutOfBoundsException e) {
- log.error("setShareVar field format error, raw python script :
{}", rawPythonScript);
- }
- return rawPythonScript;
- }
-
@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
@@ -137,38 +126,6 @@ public class PythonTask extends AbstractTask {
return pythonParameters;
}
- /**
- * convertPythonScriptPlaceholders
- *
- * @param rawScript rawScript
- * @return String
- * @throws StringIndexOutOfBoundsException if substring index is out of
bounds
- */
- private static String convertPythonScriptPlaceholders(String rawScript)
throws StringIndexOutOfBoundsException {
- int len = "${setShareVar(${".length();
- int scriptStart = 0;
- while ((scriptStart = rawScript.indexOf("${setShareVar(${",
scriptStart)) != -1) {
- int start = -1;
- int end = rawScript.indexOf('}', scriptStart + len);
- String prop = rawScript.substring(scriptStart + len, end);
-
- start = rawScript.indexOf(',', end);
- end = rawScript.indexOf(')', start);
-
- String value = rawScript.substring(start + 1, end);
-
- start = rawScript.indexOf('}', start) + 1;
- end = rawScript.length();
-
- String replaceScript =
String.format("print(\"${{setValue({},{})}}\".format(\"%s\",%s))", prop, value);
-
- rawScript = rawScript.substring(0, scriptStart) + replaceScript +
rawScript.substring(start, end);
-
- scriptStart += replaceScript.length();
- }
- return rawScript;
- }
-
/**
* create python command file if not exists
*
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index 9afc4708c7..6421dc7490 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -186,6 +186,7 @@ public class TaskKillProcessor implements
NettyRequestProcessor {
* @param processId
*/
protected boolean killProcess(String tenantCode, Integer processId) {
+ // todo: directly interrupt the process
boolean processFlag = true;
if (processId == null || processId.equals(0)) {
return true;