This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch 3.0.1-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/3.0.1-prepare by this push:
     new c4c943277c [3.0.1-prepare]cherry-pick [Bug] [Worker] Optimize the getAppId method to avoid worker OOM when kill task (#11994)
c4c943277c is described below

commit c4c943277c82ef944c912d4a0dfc493b42586d25
Author: Kerwin <[email protected]>
AuthorDate: Sun Sep 18 14:23:35 2022 +0800

    [3.0.1-prepare]cherry-pick [Bug] [Worker] Optimize the getAppId method to avoid worker OOM when kill task (#11994)
    
    * cherry-pick [Bug] [Worker] Optimize the getAppId method to avoid worker OOM when kill task
    Co-authored-by: Wenjun Ruan <[email protected]>
---
 .../dolphinscheduler/common/utils/LoggerUtils.java | 47 ++++----------
 .../server/log/LoggerRequestProcessor.java         | 12 ++++
 .../remote/command/CommandType.java                | 27 +-------
 .../remote/command/log/GetAppIdRequestCommand.java | 44 +++++++++++++
 .../command/log/GetAppIdResponseCommand.java       | 44 +++++++++++++
 .../dolphinscheduler/rpc/remote/NettyClient.java   |  2 +-
 .../server/utils/ProcessUtils.java                 | 33 ++++------
 .../server/utils/ProcessUtilsTest.java             |  3 +-
 .../service/log/LogClientService.java              | 31 ++++++++-
 .../plugin/task/api/AbstractCommandExecutor.java   | 22 +++----
 .../plugin/task/api/TaskConstants.java             |  2 +-
 .../plugin/task/api/utils/LogUtils.java            | 75 ++++++++++++++++++++++
 .../plugin/task/api/utils/LogUtilsTest.java        | 36 +++++++++++
 .../src/test/resources/appId.txt                   | 29 +++++++++
 .../server/worker/config/WorkerConfig.java         |  2 +-
 .../server/worker/processor/TaskKillProcessor.java | 59 +++++++++--------
 .../server/worker/rpc/WorkerRpcServer.java         |  1 +
 17 files changed, 341 insertions(+), 128 deletions(-)

diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java
index 8c749c53fc..231d84877b 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java
@@ -17,24 +17,29 @@
 
 package org.apache.dolphinscheduler.common.utils;
 
+import lombok.experimental.UtilityClass;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
 
 import java.io.BufferedReader;
+import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-
-import lombok.experimental.UtilityClass;
+import java.util.stream.Stream;
 
 /**
  * logger utils
@@ -44,11 +49,6 @@ public class LoggerUtils {
 
     private static final Logger logger = 
LoggerFactory.getLogger(LoggerUtils.class);
 
-    /**
-     * rules for extracting application ID
-     */
-    private static final Pattern APPLICATION_REGEX = 
Pattern.compile(Constants.APPLICATION_REGEX);
-
     /**
      * build job id
      *
@@ -65,31 +65,6 @@ public class LoggerUtils {
                 TaskConstants.TASK_APPID_LOG_FORMAT, 
TaskConstants.TASK_LOGGER_INFO_PREFIX, firstSubmitTimeStr, processDefineCode, 
processDefineVersion, processInstId, taskId);
     }
 
-    /**
-     * processing log
-     * get yarn application id list
-     *
-     * @param log log content
-     * @param logger logger
-     * @return app id list
-     */
-    public static List<String> getAppIds(String log, Logger logger) {
-
-        List<String> appIds = new ArrayList<>();
-
-        Matcher matcher = APPLICATION_REGEX.matcher(log);
-
-        // analyse logs to get all submit yarn application id
-        while (matcher.find()) {
-            String appId = matcher.group();
-            if (!appIds.contains(appId)) {
-                logger.info("find app id: {}", appId);
-                appIds.add(appId);
-            }
-        }
-        return appIds;
-    }
-
     /**
      * read whole file content
      *
diff --git 
a/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
 
b/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
index b5cae3562d..ea60fe918c 100644
--- 
a/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
+++ 
b/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
@@ -19,8 +19,11 @@ package org.apache.dolphinscheduler.server.log;
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.log.GetAppIdRequestCommand;
+import org.apache.dolphinscheduler.remote.command.log.GetAppIdResponseCommand;
 import 
org.apache.dolphinscheduler.remote.command.log.GetLogBytesRequestCommand;
 import 
org.apache.dolphinscheduler.remote.command.log.GetLogBytesResponseCommand;
 import 
org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogRequestCommand;
@@ -157,6 +160,15 @@ public class LoggerRequestProcessor implements 
NettyRequestProcessor {
                 RemoveTaskLogResponseCommand removeTaskLogResponse = new 
RemoveTaskLogResponseCommand(status);
                 
channel.writeAndFlush(removeTaskLogResponse.convert2Command(command.getOpaque()));
                 break;
+            case GET_APP_ID_REQUEST:
+                GetAppIdRequestCommand getAppIdRequestCommand = 
JSONUtils.parseObject(command.getBody(), GetAppIdRequestCommand.class);
+                String logPath = getAppIdRequestCommand.getLogPath();
+                if (!checkPathSecurity(logPath)) {
+                    throw new IllegalArgumentException("Illegal path");
+                }
+                List<String> appIds = LogUtils.getAppIdsFromLogFile(logPath);
+                channel.writeAndFlush(new 
GetAppIdResponseCommand(appIds).convert2Command(command.getOpaque()));
+                break;
             default:
                 throw new IllegalArgumentException("unknown commandType: " + 
commandType);
         }
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index 62bc7a53fc..2c8b9a8bdb 100644
--- 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -19,44 +19,23 @@ package org.apache.dolphinscheduler.remote.command;
 
 public enum CommandType {
 
-    /**
-     * remove task log request,
-     */
+    GET_APP_ID_REQUEST,
+    GET_APP_ID_RESPONSE,
+
     REMOVE_TAK_LOG_REQUEST,
 
-    /**
-     * remove task log response
-     */
     REMOVE_TAK_LOG_RESPONSE,
 
-    /**
-     * roll view log request
-     */
     ROLL_VIEW_LOG_REQUEST,
 
-    /**
-     * roll view log response
-     */
     ROLL_VIEW_LOG_RESPONSE,
 
-    /**
-     * view whole log request
-     */
     VIEW_WHOLE_LOG_REQUEST,
 
-    /**
-     * view whole log response
-     */
     VIEW_WHOLE_LOG_RESPONSE,
 
-    /**
-     * get log bytes request
-     */
     GET_LOG_BYTES_REQUEST,
 
-    /**
-     * get log bytes response
-     */
     GET_LOG_BYTES_RESPONSE,
 
 
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetAppIdRequestCommand.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetAppIdRequestCommand.java
new file mode 100644
index 0000000000..26412b8283
--- /dev/null
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetAppIdRequestCommand.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.log;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+
+import java.io.Serializable;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class GetAppIdRequestCommand implements Serializable {
+
+    private String logPath;
+
+    public Command convert2Command() {
+        Command command = new Command();
+        command.setType(CommandType.GET_APP_ID_REQUEST);
+        byte[] body = JSONUtils.toJsonByteArray(this);
+        command.setBody(body);
+        return command;
+    }
+
+}
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetAppIdResponseCommand.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetAppIdResponseCommand.java
new file mode 100644
index 0000000000..2e54a9008d
--- /dev/null
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetAppIdResponseCommand.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.log;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class GetAppIdResponseCommand implements Serializable {
+
+    private List<String> appIds;
+
+    public Command convert2Command(long opaque) {
+        Command command = new Command(opaque);
+        command.setType(CommandType.GET_APP_ID_RESPONSE);
+        byte[] body = JSONUtils.toJsonByteArray(this);
+        command.setBody(body);
+        return command;
+    }
+}
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java
index 9206da6ac2..9b8930d139 100644
--- 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java
@@ -93,7 +93,7 @@ public class NettyClient {
     /**
      * channels
      */
-    private final ConcurrentHashMap<Host, Channel> channels = new 
ConcurrentHashMap(128);
+    private final ConcurrentHashMap<Host, Channel> channels = new 
ConcurrentHashMap<>(128);
 
     /**
      * get channel
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
index 4c5ed4a3bc..26cad39534 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
@@ -17,11 +17,14 @@
 
 package org.apache.dolphinscheduler.server.utils;
 
+import lombok.NonNull;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.SystemUtils;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.utils.CommonUtils;
 import org.apache.dolphinscheduler.common.utils.FileUtils;
 import org.apache.dolphinscheduler.common.utils.HadoopUtils;
-import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.common.utils.PropertyUtils;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -29,9 +32,8 @@ import 
org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.service.log.LogClientService;
 
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.SystemUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.nio.charset.StandardCharsets;
@@ -41,11 +43,6 @@ import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import lombok.NonNull;
-
 /**
  * mainly used to get the start command line of a process.
  */
@@ -190,12 +187,12 @@ public class ProcessUtils {
         }
         try {
             Thread.sleep(Constants.SLEEP_TIME_MILLIS);
-            String log;
+            List<String> appIds;
             try (LogClientService logClient = new LogClientService()) {
                 Host host = Host.of(taskExecutionContext.getHost());
-                log = logClient.viewLog(host.getIp(), host.getPort(), 
taskExecutionContext.getLogPath());
+                appIds = logClient.getAppIds(host.getIp(), host.getPort(), 
taskExecutionContext.getLogPath());
             }
-            if (!StringUtils.isEmpty(log)) {
+            if (CollectionUtils.isNotEmpty(appIds)) {
                 if 
(StringUtils.isEmpty(taskExecutionContext.getExecutePath())) {
                     
taskExecutionContext.setExecutePath(FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(),
                             taskExecutionContext.getProcessDefineCode(),
@@ -204,15 +201,13 @@ public class ProcessUtils {
                             taskExecutionContext.getTaskInstanceId()));
                 }
                 
FileUtils.createWorkDirIfAbsent(taskExecutionContext.getExecutePath());
-                List<String> appIds = LoggerUtils.getAppIds(log, logger);
-                if (CollectionUtils.isNotEmpty(appIds)) {
-                    cancelApplication(appIds, logger, 
taskExecutionContext.getTenantCode(), taskExecutionContext.getExecutePath());
-                    return appIds;
-                }
+                cancelApplication(appIds, logger, 
taskExecutionContext.getTenantCode(), taskExecutionContext.getExecutePath());
+                return appIds;
+            } else {
+                logger.info("The current appId is empty, don't need to kill 
the yarn job, taskInstanceId: {}", taskExecutionContext.getTaskInstanceId());
             }
-
         } catch (Exception e) {
-            logger.error("kill yarn job failure", e);
+            logger.error("Kill yarn job failure, taskInstanceId: {}", 
taskExecutionContext.getTaskInstanceId(), e);
         }
         return Collections.emptyList();
     }
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ProcessUtilsTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ProcessUtilsTest.java
index a6243198b7..18527abd42 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ProcessUtilsTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ProcessUtilsTest.java
@@ -19,14 +19,13 @@ package org.apache.dolphinscheduler.server.utils;
 
 import static org.powermock.api.mockito.PowerMockito.when;
 
+import org.apache.commons.lang3.SystemUtils;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.utils.HadoopUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.common.utils.PropertyUtils;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 
-import org.apache.commons.lang.SystemUtils;
-
 import java.util.ArrayList;
 import java.util.List;
 
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
index f682780ef7..3088724ed4 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
@@ -17,11 +17,15 @@
 
 package org.apache.dolphinscheduler.service.log;
 
+import lombok.NonNull;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
 import org.apache.dolphinscheduler.remote.NettyRemotingClient;
 import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.log.GetAppIdRequestCommand;
+import org.apache.dolphinscheduler.remote.command.log.GetAppIdResponseCommand;
 import 
org.apache.dolphinscheduler.remote.command.log.GetLogBytesRequestCommand;
 import 
org.apache.dolphinscheduler.remote.command.log.GetLogBytesResponseCommand;
 import 
org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogRequestCommand;
@@ -31,11 +35,14 @@ import 
org.apache.dolphinscheduler.remote.command.log.RollViewLogResponseCommand
 import org.apache.dolphinscheduler.remote.command.log.ViewLogRequestCommand;
 import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand;
 import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
 import org.apache.dolphinscheduler.remote.utils.Host;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+import java.util.List;
+
 /**
  * log client
  */
@@ -195,6 +202,28 @@ public class LogClientService implements AutoCloseable {
         return result;
     }
 
+    public @Nullable List<String> getAppIds(@NonNull String host, int port, 
@NonNull String taskLogFilePath) throws RemotingException, InterruptedException 
{
+        logger.info("Begin to get appIds from worker: {}:{} taskLogPath: {}", 
host, port, taskLogFilePath);
+        final Host workerAddress = new Host(host, port);
+        List<String> appIds = null;
+        try {
+            if (NetUtils.getHost().equals(host)) {
+                appIds = LogUtils.getAppIdsFromLogFile(taskLogFilePath);
+            } else {
+                final Command command = new 
GetAppIdRequestCommand(taskLogFilePath).convert2Command();
+                Command response = this.client.sendSync(workerAddress, 
command, LOG_REQUEST_TIMEOUT);
+                if (response != null) {
+                    GetAppIdResponseCommand responseCommand = 
JSONUtils.parseObject(response.getBody(), GetAppIdResponseCommand.class);
+                    appIds = responseCommand.getAppIds();
+                }
+            }
+        } finally {
+            client.closeChannel(workerAddress);
+        }
+        logger.info("Get appIds: {} from worker: {}:{} taskLogPath: {}", 
appIds, host, port, taskLogFilePath);
+        return appIds;
+    }
+
     public boolean isRunning() {
         return isRunning;
     }
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
index a2401e30fe..5faa07bba8 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
@@ -17,18 +17,14 @@
 
 package org.apache.dolphinscheduler.plugin.task.api;
 
-import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
-import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
-
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
 import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils;
 import org.apache.dolphinscheduler.spi.utils.StringUtils;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
+import org.slf4j.Logger;
+
+import java.io.*;
 import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -44,9 +40,8 @@ import java.util.function.Consumer;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.slf4j.Logger;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
+import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
 
 /**
  * abstract command executor
@@ -55,8 +50,8 @@ public abstract class AbstractCommandExecutor {
     /**
      * rules for extracting application ID
      */
-    protected static final Pattern APPLICATION_REGEX = 
Pattern.compile(TaskConstants.APPLICATION_REGEX);
-    
+    protected static final Pattern APPLICATION_REGEX = 
Pattern.compile(TaskConstants.YARN_APPLICATION_REGEX);
+
     /**
      * rules for extracting Var Pool
      */
@@ -406,6 +401,7 @@ public abstract class AbstractCommandExecutor {
     
     /**
      * find var pool
+     *
      * @param line
      * @return
      */
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
index bb00b6257b..d087773d64 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
@@ -26,7 +26,7 @@ public class TaskConstants {
         throw new IllegalStateException("Utility class");
     }
 
-    public static final String APPLICATION_REGEX = "application_\\d+_\\d+";
+    public static final String YARN_APPLICATION_REGEX = 
"application_\\d+_\\d+";
 
     public static final String SETVALUE_REGEX = 
"[\\$#]\\{setValue\\(([^)]*)\\)}";
 
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java
new file mode 100644
index 0000000000..220d2583d2
--- /dev/null
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.utils;
+
+import lombok.NonNull;
+import lombok.experimental.UtilityClass;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+
+@Slf4j
+@UtilityClass
+public class LogUtils {
+
+    private static final Pattern APPLICATION_REGEX = 
Pattern.compile(TaskConstants.YARN_APPLICATION_REGEX);
+
+    public List<String> getAppIdsFromLogFile(@NonNull String logPath) {
+        return getAppIdsFromLogFile(logPath, log);
+    }
+
+    public List<String> getAppIdsFromLogFile(@NonNull String logPath, Logger 
logger) {
+        File logFile = new File(logPath);
+        if (!logFile.exists() || !logFile.isFile()) {
+            return Collections.emptyList();
+        }
+        Set<String> appIds = new HashSet<>();
+        try (Stream<String> stream = Files.lines(Paths.get(logPath))) {
+            stream.filter(line -> {
+                        Matcher matcher = APPLICATION_REGEX.matcher(line);
+                        return matcher.find();
+                    }
+            ).forEach(line -> {
+                Matcher matcher = APPLICATION_REGEX.matcher(line);
+                if (matcher.find()) {
+                    String appId = matcher.group();
+                    if (appIds.add(appId)) {
+                        logger.info("Find appId: {} from {}", appId, logPath);
+                    }
+                }
+            });
+            return new ArrayList<>(appIds);
+        } catch (IOException e) {
+            logger.error("Get appId from log file erro, logPath: {}", logPath, 
e);
+            return Collections.emptyList();
+        }
+    }
+}
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtilsTest.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtilsTest.java
new file mode 100644
index 0000000000..feb10ad859
--- /dev/null
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtilsTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.utils;
+
+import com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+public class LogUtilsTest {
+
+    private static final String APP_ID_FILE = 
LogUtilsTest.class.getResource("/appId.txt")
+            .getFile();
+
+    @Test
+    public void getAppIdsFromLogFile() {
+        List<String> appIds = LogUtils.getAppIdsFromLogFile(APP_ID_FILE);
+        
Assert.assertEquals(Lists.newArrayList("application_1548381669007_1234"), 
appIds);
+    }
+}
\ No newline at end of file
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/appId.txt
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/appId.txt
new file mode 100644
index 0000000000..7de4480eea
--- /dev/null
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/appId.txt
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+19/04/02 11:40:22 INFO yarn.Client: Application report for 
application_1548381669007_1234 (state: ACCEPTED)
+19/04/02 11:40:23 INFO yarn.Client: Application report for 
application_1548381669007_1234 (state: ACCEPTED)
+19/04/02 11:40:24 INFO yarn.Client: Application report for 
application_1548381669007_1234 (state: ACCEPTED)
+19/04/02 11:40:25 INFO yarn.Client: Application report for 
application_1548381669007_1234 (state: ACCEPTED)
+19/04/02 11:40:26 INFO yarn.Client: Application report for 
application_1548381669007_1234 (state: ACCEPTED)
+19/04/02 11:40:27 INFO yarn.Client: Application report for 
application_1548381669007_1234 (state: ACCEPTED)
+19/04/02 11:40:28 INFO yarn.Client: Application report for 
application_1548381669007_1234 (state: ACCEPTED)
+19/04/02 11:40:29 INFO yarn.Client: Application report for 
application_1548381669007_1234 (state: ACCEPTED)
+19/04/02 11:40:30 INFO yarn.Client: Application report for 
application_1548381669007_1234 (state: ACCEPTED)
+19/04/02 11:40:31 INFO yarn.Client: Application report for 
application_1548381669007_1234 (state: ACCEPTED)
+19/04/02 11:40:32 INFO yarn.Client: Application report for 
application_1548381669007_1234 (state: RUNNING)
+19/04/02 11:40:33 INFO yarn.Client: Application report for 
application_1548381669007_1234 (state: RUNNING)
\ No newline at end of file
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index 2367975715..4e1274456e 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -68,7 +68,7 @@ public class WorkerConfig implements Validator {
         if (workerConfig.getExecThreads() <= 0) {
             errors.rejectValue("exec-threads", null, "should be a positive 
value");
         }
-        if (workerConfig.getHeartbeatInterval().toMillis() <= 0) {
+        if (workerConfig.getHeartbeatInterval().getSeconds() <= 0) {
             errors.rejectValue("heartbeat-interval", null, "shoule be a valid 
duration");
         }
         if (workerConfig.getMaxCpuLoadAvg() <= 0) {
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index eece06ce09..1006b4cb23 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -17,8 +17,14 @@
 
 package org.apache.dolphinscheduler.server.worker.processor;
 
+import com.google.common.base.Preconditions;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import lombok.NonNull;
+import org.apache.commons.collections4.CollectionUtils;
+
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
@@ -49,12 +55,6 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import com.google.common.base.Preconditions;
-
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-
 /**
  * task kill processor
  */
@@ -140,11 +140,6 @@ public class TaskKillProcessor implements 
NettyRequestProcessor {
         });
     }
 
-    /**
-     * do kill
-     *
-     * @return kill result
-     */
     private Pair<Boolean, List<String>> doKill(TaskExecutionContext 
taskExecutionContext) {
         // kill system process
         boolean processFlag = 
killProcess(taskExecutionContext.getTenantCode(), 
taskExecutionContext.getProcessId());
@@ -207,29 +202,33 @@ public class TaskKillProcessor implements 
NettyRequestProcessor {
     /**
      * kill yarn job
      *
-     * @param host host
-     * @param logPath logPath
+     * @param host        host
+     * @param logPath     logPath
      * @param executePath executePath
-     * @param tenantCode tenantCode
+     * @param tenantCode  tenantCode
      * @return Pair<Boolean, List < String>> yarn kill result
      */
-    private Pair<Boolean, List<String>> killYarnJob(Host host, String logPath, 
String executePath, String tenantCode) {
-        try (LogClientService logClient = new LogClientService();) {
-            logger.info("log host : {} , logPath : {} , port : {}", 
host.getIp(), logPath,
-                    host.getPort());
-            String log = logClient.viewLog(host.getIp(), host.getPort(), 
logPath);
-            List<String> appIds = Collections.emptyList();
-            if (!StringUtils.isEmpty(log)) {
-                appIds = LoggerUtils.getAppIds(log, logger);
-                if (StringUtils.isEmpty(executePath)) {
-                    logger.error("task instance execute path is empty");
-                    throw new RuntimeException("task instance execute path is 
empty");
-                }
-                if (appIds.size() > 0) {
-                    ProcessUtils.cancelApplication(appIds, logger, tenantCode, 
executePath);
-                }
+    private Pair<Boolean, List<String>> killYarnJob(@NonNull Host host,
+                                                    String logPath,
+                                                    String executePath,
+                                                    String tenantCode) {
+        if (logPath == null || executePath == null || tenantCode == null) {
+            logger.error("Kill yarn job error, the input params is illegal, 
host: {}, logPath: {}, executePath: {}, tenantCode: {}",
+                    host, logPath, executePath, tenantCode);
+            return Pair.of(false, Collections.emptyList());
+        }
+        try (LogClientService logClient = new LogClientService()) {
+            logger.info("Get appIds from worker {}:{} taskLogPath: {}", 
host.getIp(), host.getPort(), logPath);
+            List<String> appIds = logClient.getAppIds(host.getIp(), 
host.getPort(), logPath);
+            if (CollectionUtils.isEmpty(appIds)) {
+                return Pair.of(true, Collections.emptyList());
             }
+
+            ProcessUtils.cancelApplication(appIds, logger, tenantCode, 
executePath);
             return Pair.of(true, appIds);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            logger.error("kill yarn job error, the current thread has been 
interrtpted", e);
         } catch (Exception e) {
             logger.error("kill yarn job error", e);
         }
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
index 8fcf6966f8..dafd089975 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
@@ -74,6 +74,7 @@ public class WorkerRpcServer implements Closeable {
         
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESULT_ACK, 
taskExecuteResultAckProcessor);
         
this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST,
 hostUpdateProcessor);
         // logger server
+        
this.nettyRemotingServer.registerProcessor(CommandType.GET_APP_ID_REQUEST, 
loggerRequestProcessor);
         
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, 
loggerRequestProcessor);
         
this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, 
loggerRequestProcessor);
         
this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, 
loggerRequestProcessor);


Reply via email to