This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch 3.0.1-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/3.0.1-prepare by this push:
new c4c943277c [3.0.1-prepare]cherry-pick [Bug] [Worker] Optimize the
getAppId method to avoid worker OOM when kill task (#11994)
c4c943277c is described below
commit c4c943277c82ef944c912d4a0dfc493b42586d25
Author: Kerwin <[email protected]>
AuthorDate: Sun Sep 18 14:23:35 2022 +0800
[3.0.1-prepare]cherry-pick [Bug] [Worker] Optimize the getAppId method to
avoid worker OOM when kill task (#11994)
* cherry-pick [Bug] [Worker] Optimize the getAppId method to avoid worker
OOM when kill task
Co-authored-by: Wenjun Ruan <[email protected]>
---
.../dolphinscheduler/common/utils/LoggerUtils.java | 47 ++++----------
.../server/log/LoggerRequestProcessor.java | 12 ++++
.../remote/command/CommandType.java | 27 +-------
.../remote/command/log/GetAppIdRequestCommand.java | 44 +++++++++++++
.../command/log/GetAppIdResponseCommand.java | 44 +++++++++++++
.../dolphinscheduler/rpc/remote/NettyClient.java | 2 +-
.../server/utils/ProcessUtils.java | 33 ++++------
.../server/utils/ProcessUtilsTest.java | 3 +-
.../service/log/LogClientService.java | 31 ++++++++-
.../plugin/task/api/AbstractCommandExecutor.java | 22 +++----
.../plugin/task/api/TaskConstants.java | 2 +-
.../plugin/task/api/utils/LogUtils.java | 75 ++++++++++++++++++++++
.../plugin/task/api/utils/LogUtilsTest.java | 36 +++++++++++
.../src/test/resources/appId.txt | 29 +++++++++
.../server/worker/config/WorkerConfig.java | 2 +-
.../server/worker/processor/TaskKillProcessor.java | 59 +++++++++--------
.../server/worker/rpc/WorkerRpcServer.java | 1 +
17 files changed, 341 insertions(+), 128 deletions(-)
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java
index 8c749c53fc..231d84877b 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java
@@ -17,24 +17,29 @@
package org.apache.dolphinscheduler.common.utils;
+import lombok.experimental.UtilityClass;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
import java.io.BufferedReader;
+import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Date;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-
-import lombok.experimental.UtilityClass;
+import java.util.stream.Stream;
/**
* logger utils
@@ -44,11 +49,6 @@ public class LoggerUtils {
private static final Logger logger =
LoggerFactory.getLogger(LoggerUtils.class);
- /**
- * rules for extracting application ID
- */
- private static final Pattern APPLICATION_REGEX =
Pattern.compile(Constants.APPLICATION_REGEX);
-
/**
* build job id
*
@@ -65,31 +65,6 @@ public class LoggerUtils {
TaskConstants.TASK_APPID_LOG_FORMAT,
TaskConstants.TASK_LOGGER_INFO_PREFIX, firstSubmitTimeStr, processDefineCode,
processDefineVersion, processInstId, taskId);
}
- /**
- * processing log
- * get yarn application id list
- *
- * @param log log content
- * @param logger logger
- * @return app id list
- */
- public static List<String> getAppIds(String log, Logger logger) {
-
- List<String> appIds = new ArrayList<>();
-
- Matcher matcher = APPLICATION_REGEX.matcher(log);
-
- // analyse logs to get all submit yarn application id
- while (matcher.find()) {
- String appId = matcher.group();
- if (!appIds.contains(appId)) {
- logger.info("find app id: {}", appId);
- appIds.add(appId);
- }
- }
- return appIds;
- }
-
/**
* read whole file content
*
diff --git
a/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
b/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
index b5cae3562d..ea60fe918c 100644
---
a/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
+++
b/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
@@ -19,8 +19,11 @@ package org.apache.dolphinscheduler.server.log;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.log.GetAppIdRequestCommand;
+import org.apache.dolphinscheduler.remote.command.log.GetAppIdResponseCommand;
import
org.apache.dolphinscheduler.remote.command.log.GetLogBytesRequestCommand;
import
org.apache.dolphinscheduler.remote.command.log.GetLogBytesResponseCommand;
import
org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogRequestCommand;
@@ -157,6 +160,15 @@ public class LoggerRequestProcessor implements
NettyRequestProcessor {
RemoveTaskLogResponseCommand removeTaskLogResponse = new
RemoveTaskLogResponseCommand(status);
channel.writeAndFlush(removeTaskLogResponse.convert2Command(command.getOpaque()));
break;
+ case GET_APP_ID_REQUEST:
+ GetAppIdRequestCommand getAppIdRequestCommand =
JSONUtils.parseObject(command.getBody(), GetAppIdRequestCommand.class);
+ String logPath = getAppIdRequestCommand.getLogPath();
+ if (!checkPathSecurity(logPath)) {
+ throw new IllegalArgumentException("Illegal path");
+ }
+ List<String> appIds = LogUtils.getAppIdsFromLogFile(logPath);
+ channel.writeAndFlush(new
GetAppIdResponseCommand(appIds).convert2Command(command.getOpaque()));
+ break;
default:
throw new IllegalArgumentException("unknown commandType: " +
commandType);
}
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index 62bc7a53fc..2c8b9a8bdb 100644
---
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -19,44 +19,23 @@ package org.apache.dolphinscheduler.remote.command;
public enum CommandType {
- /**
- * remove task log request,
- */
+ GET_APP_ID_REQUEST,
+ GET_APP_ID_RESPONSE,
+
REMOVE_TAK_LOG_REQUEST,
- /**
- * remove task log response
- */
REMOVE_TAK_LOG_RESPONSE,
- /**
- * roll view log request
- */
ROLL_VIEW_LOG_REQUEST,
- /**
- * roll view log response
- */
ROLL_VIEW_LOG_RESPONSE,
- /**
- * view whole log request
- */
VIEW_WHOLE_LOG_REQUEST,
- /**
- * view whole log response
- */
VIEW_WHOLE_LOG_RESPONSE,
- /**
- * get log bytes request
- */
GET_LOG_BYTES_REQUEST,
- /**
- * get log bytes response
- */
GET_LOG_BYTES_RESPONSE,
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetAppIdRequestCommand.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetAppIdRequestCommand.java
new file mode 100644
index 0000000000..26412b8283
--- /dev/null
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetAppIdRequestCommand.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.log;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+
+import java.io.Serializable;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class GetAppIdRequestCommand implements Serializable {
+
+ private String logPath;
+
+ public Command convert2Command() {
+ Command command = new Command();
+ command.setType(CommandType.GET_APP_ID_REQUEST);
+ byte[] body = JSONUtils.toJsonByteArray(this);
+ command.setBody(body);
+ return command;
+ }
+
+}
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetAppIdResponseCommand.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetAppIdResponseCommand.java
new file mode 100644
index 0000000000..2e54a9008d
--- /dev/null
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetAppIdResponseCommand.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.log;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class GetAppIdResponseCommand implements Serializable {
+
+ private List<String> appIds;
+
+ public Command convert2Command(long opaque) {
+ Command command = new Command(opaque);
+ command.setType(CommandType.GET_APP_ID_RESPONSE);
+ byte[] body = JSONUtils.toJsonByteArray(this);
+ command.setBody(body);
+ return command;
+ }
+}
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java
index 9206da6ac2..9b8930d139 100644
---
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java
@@ -93,7 +93,7 @@ public class NettyClient {
/**
* channels
*/
- private final ConcurrentHashMap<Host, Channel> channels = new
ConcurrentHashMap(128);
+ private final ConcurrentHashMap<Host, Channel> channels = new
ConcurrentHashMap<>(128);
/**
* get channel
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
index 4c5ed4a3bc..26cad39534 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
@@ -17,11 +17,14 @@
package org.apache.dolphinscheduler.server.utils;
+import lombok.NonNull;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.SystemUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
-import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -29,9 +32,8 @@ import
org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.log.LogClientService;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.SystemUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.nio.charset.StandardCharsets;
@@ -41,11 +43,6 @@ import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import lombok.NonNull;
-
/**
* mainly used to get the start command line of a process.
*/
@@ -190,12 +187,12 @@ public class ProcessUtils {
}
try {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
- String log;
+ List<String> appIds;
try (LogClientService logClient = new LogClientService()) {
Host host = Host.of(taskExecutionContext.getHost());
- log = logClient.viewLog(host.getIp(), host.getPort(),
taskExecutionContext.getLogPath());
+ appIds = logClient.getAppIds(host.getIp(), host.getPort(),
taskExecutionContext.getLogPath());
}
- if (!StringUtils.isEmpty(log)) {
+ if (CollectionUtils.isNotEmpty(appIds)) {
if
(StringUtils.isEmpty(taskExecutionContext.getExecutePath())) {
taskExecutionContext.setExecutePath(FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(),
taskExecutionContext.getProcessDefineCode(),
@@ -204,15 +201,13 @@ public class ProcessUtils {
taskExecutionContext.getTaskInstanceId()));
}
FileUtils.createWorkDirIfAbsent(taskExecutionContext.getExecutePath());
- List<String> appIds = LoggerUtils.getAppIds(log, logger);
- if (CollectionUtils.isNotEmpty(appIds)) {
- cancelApplication(appIds, logger,
taskExecutionContext.getTenantCode(), taskExecutionContext.getExecutePath());
- return appIds;
- }
+ cancelApplication(appIds, logger,
taskExecutionContext.getTenantCode(), taskExecutionContext.getExecutePath());
+ return appIds;
+ } else {
+ logger.info("The current appId is empty, don't need to kill
the yarn job, taskInstanceId: {}", taskExecutionContext.getTaskInstanceId());
}
-
} catch (Exception e) {
- logger.error("kill yarn job failure", e);
+ logger.error("Kill yarn job failure, taskInstanceId: {}",
taskExecutionContext.getTaskInstanceId(), e);
}
return Collections.emptyList();
}
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ProcessUtilsTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ProcessUtilsTest.java
index a6243198b7..18527abd42 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ProcessUtilsTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ProcessUtilsTest.java
@@ -19,14 +19,13 @@ package org.apache.dolphinscheduler.server.utils;
import static org.powermock.api.mockito.PowerMockito.when;
+import org.apache.commons.lang3.SystemUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import org.apache.commons.lang.SystemUtils;
-
import java.util.ArrayList;
import java.util.List;
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
index f682780ef7..3088724ed4 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
@@ -17,11 +17,15 @@
package org.apache.dolphinscheduler.service.log;
+import lombok.NonNull;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.log.GetAppIdRequestCommand;
+import org.apache.dolphinscheduler.remote.command.log.GetAppIdResponseCommand;
import
org.apache.dolphinscheduler.remote.command.log.GetLogBytesRequestCommand;
import
org.apache.dolphinscheduler.remote.command.log.GetLogBytesResponseCommand;
import
org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogRequestCommand;
@@ -31,11 +35,14 @@ import
org.apache.dolphinscheduler.remote.command.log.RollViewLogResponseCommand
import org.apache.dolphinscheduler.remote.command.log.ViewLogRequestCommand;
import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Host;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+import java.util.List;
+
/**
* log client
*/
@@ -195,6 +202,28 @@ public class LogClientService implements AutoCloseable {
return result;
}
+ public @Nullable List<String> getAppIds(@NonNull String host, int port,
@NonNull String taskLogFilePath) throws RemotingException, InterruptedException
{
+ logger.info("Begin to get appIds from worker: {}:{} taskLogPath: {}",
host, port, taskLogFilePath);
+ final Host workerAddress = new Host(host, port);
+ List<String> appIds = null;
+ try {
+ if (NetUtils.getHost().equals(host)) {
+ appIds = LogUtils.getAppIdsFromLogFile(taskLogFilePath);
+ } else {
+ final Command command = new
GetAppIdRequestCommand(taskLogFilePath).convert2Command();
+ Command response = this.client.sendSync(workerAddress,
command, LOG_REQUEST_TIMEOUT);
+ if (response != null) {
+ GetAppIdResponseCommand responseCommand =
JSONUtils.parseObject(response.getBody(), GetAppIdResponseCommand.class);
+ appIds = responseCommand.getAppIds();
+ }
+ }
+ } finally {
+ client.closeChannel(workerAddress);
+ }
+ logger.info("Get appIds: {} from worker: {}:{} taskLogPath: {}",
appIds, host, port, taskLogFilePath);
+ return appIds;
+ }
+
public boolean isRunning() {
return isRunning;
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
index a2401e30fe..5faa07bba8 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
@@ -17,18 +17,14 @@
package org.apache.dolphinscheduler.plugin.task.api;
-import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
-import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
-
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
+import org.slf4j.Logger;
+
+import java.io.*;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -44,9 +40,8 @@ import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import org.slf4j.Logger;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
+import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
/**
* abstract command executor
@@ -55,8 +50,8 @@ public abstract class AbstractCommandExecutor {
/**
* rules for extracting application ID
*/
- protected static final Pattern APPLICATION_REGEX =
Pattern.compile(TaskConstants.APPLICATION_REGEX);
-
+ protected static final Pattern APPLICATION_REGEX =
Pattern.compile(TaskConstants.YARN_APPLICATION_REGEX);
+
/**
* rules for extracting Var Pool
*/
@@ -406,6 +401,7 @@ public abstract class AbstractCommandExecutor {
/**
* find var pool
+ *
* @param line
* @return
*/
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
index bb00b6257b..d087773d64 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
@@ -26,7 +26,7 @@ public class TaskConstants {
throw new IllegalStateException("Utility class");
}
- public static final String APPLICATION_REGEX = "application_\\d+_\\d+";
+ public static final String YARN_APPLICATION_REGEX =
"application_\\d+_\\d+";
public static final String SETVALUE_REGEX =
"[\\$#]\\{setValue\\(([^)]*)\\)}";
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java
new file mode 100644
index 0000000000..220d2583d2
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.utils;
+
+import lombok.NonNull;
+import lombok.experimental.UtilityClass;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+
+@Slf4j
+@UtilityClass
+public class LogUtils {
+
+ private static final Pattern APPLICATION_REGEX =
Pattern.compile(TaskConstants.YARN_APPLICATION_REGEX);
+
+ public List<String> getAppIdsFromLogFile(@NonNull String logPath) {
+ return getAppIdsFromLogFile(logPath, log);
+ }
+
+ public List<String> getAppIdsFromLogFile(@NonNull String logPath, Logger
logger) {
+ File logFile = new File(logPath);
+ if (!logFile.exists() || !logFile.isFile()) {
+ return Collections.emptyList();
+ }
+ Set<String> appIds = new HashSet<>();
+ try (Stream<String> stream = Files.lines(Paths.get(logPath))) {
+ stream.filter(line -> {
+ Matcher matcher = APPLICATION_REGEX.matcher(line);
+ return matcher.find();
+ }
+ ).forEach(line -> {
+ Matcher matcher = APPLICATION_REGEX.matcher(line);
+ if (matcher.find()) {
+ String appId = matcher.group();
+ if (appIds.add(appId)) {
+ logger.info("Find appId: {} from {}", appId, logPath);
+ }
+ }
+ });
+ return new ArrayList<>(appIds);
+ } catch (IOException e) {
+ logger.error("Get appId from log file erro, logPath: {}", logPath,
e);
+ return Collections.emptyList();
+ }
+ }
+}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtilsTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtilsTest.java
new file mode 100644
index 0000000000..feb10ad859
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtilsTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.utils;
+
+import com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+public class LogUtilsTest {
+
+ private static final String APP_ID_FILE =
LogUtilsTest.class.getResource("/appId.txt")
+ .getFile();
+
+ @Test
+ public void getAppIdsFromLogFile() {
+ List<String> appIds = LogUtils.getAppIdsFromLogFile(APP_ID_FILE);
+
Assert.assertEquals(Lists.newArrayList("application_1548381669007_1234"),
appIds);
+ }
+}
\ No newline at end of file
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/appId.txt
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/appId.txt
new file mode 100644
index 0000000000..7de4480eea
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/appId.txt
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+19/04/02 11:40:22 INFO yarn.Client: Application report for
application_1548381669007_1234 (state: ACCEPTED)
+19/04/02 11:40:23 INFO yarn.Client: Application report for
application_1548381669007_1234 (state: ACCEPTED)
+19/04/02 11:40:24 INFO yarn.Client: Application report for
application_1548381669007_1234 (state: ACCEPTED)
+19/04/02 11:40:25 INFO yarn.Client: Application report for
application_1548381669007_1234 (state: ACCEPTED)
+19/04/02 11:40:26 INFO yarn.Client: Application report for
application_1548381669007_1234 (state: ACCEPTED)
+19/04/02 11:40:27 INFO yarn.Client: Application report for
application_1548381669007_1234 (state: ACCEPTED)
+19/04/02 11:40:28 INFO yarn.Client: Application report for
application_1548381669007_1234 (state: ACCEPTED)
+19/04/02 11:40:29 INFO yarn.Client: Application report for
application_1548381669007_1234 (state: ACCEPTED)
+19/04/02 11:40:30 INFO yarn.Client: Application report for
application_1548381669007_1234 (state: ACCEPTED)
+19/04/02 11:40:31 INFO yarn.Client: Application report for
application_1548381669007_1234 (state: ACCEPTED)
+19/04/02 11:40:32 INFO yarn.Client: Application report for
application_1548381669007_1234 (state: RUNNING)
+19/04/02 11:40:33 INFO yarn.Client: Application report for
application_1548381669007_1234 (state: RUNNING)
\ No newline at end of file
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index 2367975715..4e1274456e 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -68,7 +68,7 @@ public class WorkerConfig implements Validator {
if (workerConfig.getExecThreads() <= 0) {
errors.rejectValue("exec-threads", null, "should be a positive
value");
}
- if (workerConfig.getHeartbeatInterval().toMillis() <= 0) {
+ if (workerConfig.getHeartbeatInterval().getSeconds() <= 0) {
errors.rejectValue("heartbeat-interval", null, "shoule be a valid
duration");
}
if (workerConfig.getMaxCpuLoadAvg() <= 0) {
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index eece06ce09..1006b4cb23 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -17,8 +17,14 @@
package org.apache.dolphinscheduler.server.worker.processor;
+import com.google.common.base.Preconditions;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import lombok.NonNull;
+import org.apache.commons.collections4.CollectionUtils;
+
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
@@ -49,12 +55,6 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import com.google.common.base.Preconditions;
-
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-
/**
* task kill processor
*/
@@ -140,11 +140,6 @@ public class TaskKillProcessor implements
NettyRequestProcessor {
});
}
- /**
- * do kill
- *
- * @return kill result
- */
private Pair<Boolean, List<String>> doKill(TaskExecutionContext
taskExecutionContext) {
// kill system process
boolean processFlag =
killProcess(taskExecutionContext.getTenantCode(),
taskExecutionContext.getProcessId());
@@ -207,29 +202,33 @@ public class TaskKillProcessor implements
NettyRequestProcessor {
/**
* kill yarn job
*
- * @param host host
- * @param logPath logPath
+ * @param host host
+ * @param logPath logPath
* @param executePath executePath
- * @param tenantCode tenantCode
+ * @param tenantCode tenantCode
* @return Pair<Boolean, List < String>> yarn kill result
*/
- private Pair<Boolean, List<String>> killYarnJob(Host host, String logPath,
String executePath, String tenantCode) {
- try (LogClientService logClient = new LogClientService();) {
- logger.info("log host : {} , logPath : {} , port : {}",
host.getIp(), logPath,
- host.getPort());
- String log = logClient.viewLog(host.getIp(), host.getPort(),
logPath);
- List<String> appIds = Collections.emptyList();
- if (!StringUtils.isEmpty(log)) {
- appIds = LoggerUtils.getAppIds(log, logger);
- if (StringUtils.isEmpty(executePath)) {
- logger.error("task instance execute path is empty");
- throw new RuntimeException("task instance execute path is
empty");
- }
- if (appIds.size() > 0) {
- ProcessUtils.cancelApplication(appIds, logger, tenantCode,
executePath);
- }
+ private Pair<Boolean, List<String>> killYarnJob(@NonNull Host host,
+ String logPath,
+ String executePath,
+ String tenantCode) {
+ if (logPath == null || executePath == null || tenantCode == null) {
+ logger.error("Kill yarn job error, the input params is illegal,
host: {}, logPath: {}, executePath: {}, tenantCode: {}",
+ host, logPath, executePath, tenantCode);
+ return Pair.of(false, Collections.emptyList());
+ }
+ try (LogClientService logClient = new LogClientService()) {
+ logger.info("Get appIds from worker {}:{} taskLogPath: {}",
host.getIp(), host.getPort(), logPath);
+ List<String> appIds = logClient.getAppIds(host.getIp(),
host.getPort(), logPath);
+ if (CollectionUtils.isEmpty(appIds)) {
+ return Pair.of(true, Collections.emptyList());
}
+
+ ProcessUtils.cancelApplication(appIds, logger, tenantCode,
executePath);
return Pair.of(true, appIds);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.error("kill yarn job error, the current thread has been
interrtpted", e);
} catch (Exception e) {
logger.error("kill yarn job error", e);
}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
index 8fcf6966f8..dafd089975 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
@@ -74,6 +74,7 @@ public class WorkerRpcServer implements Closeable {
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESULT_ACK,
taskExecuteResultAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST,
hostUpdateProcessor);
// logger server
+
this.nettyRemotingServer.registerProcessor(CommandType.GET_APP_ID_REQUEST,
loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST,
loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST,
loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST,
loggerRequestProcessor);