lanxing2 opened a new issue, #16268: URL: https://github.com/apache/dolphinscheduler/issues/16268
### Search before asking

- [X] I had searched in the [issues](https://github.com/apache/dolphinscheduler/issues?q=is%3Aissue) and found no similar issues.

### What happened

DolphinScheduler version: 3.2.1

When stopping a process that contains a Flink task in CLUSTER mode, DolphinScheduler first kills the Flink job's YARN application. `YarnApplicationManager.execYarnKillCommand` is invoked, and the generated kill script fails because the `yarn` command cannot be found (the original error message was in Chinese, "行6: yarn:未找到命令", which means "line 6: yarn: command not found"):

```
[ERROR] 2024-07-03 10:16:53.690 +0800 - Kill yarn application [[application_1714114694986_0041]] failed
org.apache.dolphinscheduler.common.shell.AbstractShell$ExitCodeException: /tmp/dolphinscheduler/exec/process/default/13289909089664/13776450314784_11/190/200/application_1714114694986_0041.kill: line 6: yarn: command not found
	at org.apache.dolphinscheduler.common.shell.AbstractShell.runCommand(AbstractShell.java:205)
	at org.apache.dolphinscheduler.common.shell.AbstractShell.run(AbstractShell.java:118)
	at org.apache.dolphinscheduler.common.shell.ShellExecutor.execute(ShellExecutor.java:125)
	at org.apache.dolphinscheduler.common.shell.ShellExecutor.execCommand(ShellExecutor.java:103)
	at org.apache.dolphinscheduler.common.shell.ShellExecutor.execCommand(ShellExecutor.java:86)
	at org.apache.dolphinscheduler.common.utils.OSUtils.exeShell(OSUtils.java:345)
	at org.apache.dolphinscheduler.common.utils.OSUtils.exeCmd(OSUtils.java:334)
	at org.apache.dolphinscheduler.plugin.task.api.am.YarnApplicationManager.execYarnKillCommand(YarnApplicationManager.java:89)
	at org.apache.dolphinscheduler.plugin.task.api.am.YarnApplicationManager.killApplication(YarnApplicationManager.java:48)
	at org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils.cancelApplication(ProcessUtils.java:192)
	at org.apache.dolphinscheduler.server.worker.runner.operator.TaskInstanceKillOperationFunction.doKill(TaskInstanceKillOperationFunction.java:100)
	at org.apache.dolphinscheduler.server.worker.runner.operator.TaskInstanceKillOperationFunction.operate(TaskInstanceKillOperationFunction.java:69)
	at org.apache.dolphinscheduler.server.worker.rpc.TaskInstanceOperatorImpl.killTask(TaskInstanceOperatorImpl.java:49)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.dolphinscheduler.extract.base.server.ServerMethodInvokerImpl.invoke(ServerMethodInvokerImpl.java:41)
	at org.apache.dolphinscheduler.extract.base.server.JdkDynamicServerHandler.lambda$processReceived$0(JdkDynamicServerHandler.java:108)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
[ERROR] 2024-07-03 10:16:53.691 +0800 - Cancel application failed: /tmp/dolphinscheduler/exec/process/default/13289909089664/13776450314784_11/190/200/application_1714114694986_0041.kill: line 6: yarn: command not found
```

The root cause is that the generated shell file is executed by `sh`, not `bash`:

https://github.com/apache/dolphinscheduler/blob/dev/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/YarnApplicationManager.java#L69-L90

`sh` does not load `/etc/profile` automatically, so the `PATH` it sees does not contain the `yarn` command. The generated script needs to source `/etc/profile` to load the `PATH` before running the `yarn` command. I changed the code as follows:

```java
    private void execYarnKillCommand(String tenantCode, String commandFile, String cmd) throws Exception {
        StringBuilder sb = new StringBuilder();
        sb.append("#!/bin/sh\n");
        sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n");
        sb.append("cd $BASEDIR\n");
        // Load PATH entries exported in /etc/profile so "yarn" can be resolved.
        // Note: "source" is a bash built-in; ". /etc/profile" is the portable sh spelling.
        sb.append("source /etc/profile\n");
        sb.append("\n\n");
        sb.append(cmd);

        File f =
                new File(commandFile);
        if (!f.exists()) {
            org.apache.commons.io.FileUtils.writeStringToFile(new File(commandFile), sb.toString(),
                    StandardCharsets.UTF_8);
        }

        String runCmd = String.format("%s %s", Constants.SH, commandFile);
        runCmd = org.apache.dolphinscheduler.common.utils.OSUtils.getSudoCmd(tenantCode, runCmd);
        log.info("kill cmd:{}", runCmd);
        org.apache.dolphinscheduler.common.utils.OSUtils.exeCmd(runCmd);
    }
```

After making this change, `YarnApplicationManager.execYarnKillCommand` kills the YARN application successfully when the Flink task is stopped. However, errors still show up in the logs, even though the captured output contains only INFO lines:

```
[ERROR] 2024-07-03 15:10:04.875 +0800 - Kill yarn application [[application_1714114694986_0057]] failed
org.apache.dolphinscheduler.common.shell.AbstractShell$ExitCodeException: 2024-07-03 15:10:04,274 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at node1/172.0.107.57:8032
2024-07-03 15:10:04,863 INFO impl.YarnClientImpl: Killed application application_1714114694986_0057
	at org.apache.dolphinscheduler.common.shell.AbstractShell.runCommand(AbstractShell.java:205)
	at org.apache.dolphinscheduler.common.shell.AbstractShell.run(AbstractShell.java:118)
	at org.apache.dolphinscheduler.common.shell.ShellExecutor.execute(ShellExecutor.java:125)
	at org.apache.dolphinscheduler.common.shell.ShellExecutor.execCommand(ShellExecutor.java:103)
	at org.apache.dolphinscheduler.common.shell.ShellExecutor.execCommand(ShellExecutor.java:86)
	at org.apache.dolphinscheduler.common.utils.OSUtils.exeShell(OSUtils.java:345)
	at org.apache.dolphinscheduler.common.utils.OSUtils.exeCmd(OSUtils.java:334)
	at org.apache.dolphinscheduler.plugin.task.api.am.YarnApplicationManager.execYarnKillCommand(YarnApplicationManager.java:89)
	at org.apache.dolphinscheduler.plugin.task.api.am.YarnApplicationManager.killApplication(YarnApplicationManager.java:48)
	at org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils.cancelApplication(ProcessUtils.java:192)
	at org.apache.dolphinscheduler.server.worker.runner.operator.TaskInstanceKillOperationFunction.doKill(TaskInstanceKillOperationFunction.java:100)
	at org.apache.dolphinscheduler.server.worker.runner.operator.TaskInstanceKillOperationFunction.operate(TaskInstanceKillOperationFunction.java:69)
	at org.apache.dolphinscheduler.server.worker.rpc.TaskInstanceOperatorImpl.killTask(TaskInstanceOperatorImpl.java:49)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.dolphinscheduler.extract.base.server.ServerMethodInvokerImpl.invoke(ServerMethodInvokerImpl.java:41)
	at org.apache.dolphinscheduler.extract.base.server.JdkDynamicServerHandler.lambda$processReceived$0(JdkDynamicServerHandler.java:108)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
[ERROR] 2024-07-03 15:10:04.876 +0800 - Cancel application failed: 2024-07-03 15:10:04,274 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at node1/172.0.107.57:8032
```

I started another Flink task, created the kill command file locally, and ran it by hand.
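One plausible explanation for the "failure" log above, which contains only INFO lines (this is my assumption, not verified against AbstractShell): log4j-based CLIs such as the `yarn` client normally write their log output to stderr, so captured stderr content does not by itself mean the command failed; only a non-zero exit status does. A small, self-contained sketch of that distinction:

```java
import java.io.ByteArrayOutputStream;
import java.io.InputStream;

// Demonstrates that a process can write log lines to stderr and still exit 0,
// the way log4j-based CLIs such as the yarn client typically do. Stderr
// content alone is not a failure signal; the exit status is.
public class StderrIsNotFailure {

    static String readAll(InputStream in) throws Exception {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        int b;
        while ((b = in.read()) != -1) {
            buf.write(b);
        }
        return buf.toString("UTF-8");
    }

    public static void main(String[] args) throws Exception {
        // Simulate a CLI that logs INFO to stderr and then succeeds.
        Process p = new ProcessBuilder("sh", "-c",
                "echo 'INFO impl.YarnClientImpl: Killed application' 1>&2; exit 0").start();
        String stderr = readAll(p.getErrorStream());
        int exit = p.waitFor();

        System.out.println("exit=" + exit);                     // exit=0
        System.out.println("stderr-empty=" + stderr.isEmpty()); // stderr-empty=false
    }
}
```

If AbstractShell were raising purely on stderr content this would explain the report; if it raises on exit code, the script itself must be returning non-zero when run under the tenant user.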
The command succeeded, with this output:

```
2024-07-03 15:37:18,381 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at node1/172.0.107.57:8032
Killing application application_1714114694986_0059
2024-07-03 15:37:18,883 INFO impl.YarnClientImpl: Killed application application_1714114694986_0059
```

I am not sure why AbstractShell does not treat this as a successful execution, and why the INFO lines end up on the error stream.

### What you expected to happen

1. `YarnApplicationManager.execYarnKillCommand` kills the YARN application successfully, without any error.
2. `AbstractYarnTask` keeps tracking the YARN application status; as long as the application is still running, the task stays in the executing state.

### How to reproduce

Currently, the Flink task does not track the YARN application status. If you run a task in CLUSTER mode, the task succeeds and finishes as soon as the job is submitted to YARN. To actually stop the Flink job, you have to go to the YARN application UI and stop it there. However, I want to track the YARN application status and end the task from DolphinScheduler, because we do not want to expose our YARN web UI to our users.
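Tracking the application from the worker first requires turning the tracked id string (e.g. `application_1714114694986_0041`) back into its numeric parts, `application_<clusterTimestamp>_<sequenceNumber>`. A minimal, dependency-free sketch of just that parsing step (the `YarnAppId` class and `parse` helper are stand-ins of mine, not the Hadoop `ApplicationId` API):

```java
// Parses a YARN application id string without any Hadoop dependency.
public class AppIdParser {

    // Stand-in for Hadoop's ApplicationId, holding the two numeric components.
    static final class YarnAppId {
        final long clusterTimestamp;
        final int sequenceNumber;

        YarnAppId(long clusterTimestamp, int sequenceNumber) {
            this.clusterTimestamp = clusterTimestamp;
            this.sequenceNumber = sequenceNumber;
        }
    }

    // Accepts "application_<clusterTimestamp>_<sequenceNumber>" as produced by YARN.
    static YarnAppId parse(String appId) {
        String[] parts = appId.split("_");
        if (parts.length != 3 || !"application".equals(parts[0])) {
            throw new IllegalArgumentException("Not a YARN application id: " + appId);
        }
        return new YarnAppId(Long.parseLong(parts[1]), Integer.parseInt(parts[2]));
    }

    public static void main(String[] args) {
        YarnAppId id = parse("application_1714114694986_0041");
        // prints: 1714114694986 41
        System.out.println(id.clusterTimestamp + " " + id.sequenceNumber);
    }
}
```

Validating the string before splitting would also harden the `appIds.split("_")` call in the code below against malformed ids.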
I added the following code to `FlinkTask` to monitor the YARN application status:

```java
    @Override
    public void handle(TaskCallBack taskCallBack) throws TaskException {
        super.handle(taskCallBack);
        if (FlinkDeployMode.CLUSTER.equals(flinkParameters.getDeployMode())
                || FlinkDeployMode.APPLICATION.equals(flinkParameters.getDeployMode())) {
            trackApplicationStatus();
        }
    }

    @Override
    public void trackApplicationStatus() throws TaskException {
        log.info("Flink Task Yarn Application Id is " + appIds);
        YarnClient yarnClient = YarnClient.createYarnClient();
        try {
            initialYarnClient(yarnClient);
            String[] splitAppIds = appIds.split("_");
            ApplicationId applicationId =
                    ApplicationId.newInstance(Long.parseLong(splitAppIds[1]), Integer.parseInt(splitAppIds[2]));
            boolean yarnRunningFlag = true;
            while (yarnRunningFlag) {
                ApplicationReport appReport = yarnClient.getApplicationReport(applicationId);
                YarnApplicationState appState = appReport.getYarnApplicationState();
                log.info("Yarn Application State is " + appState);
                if (YarnApplicationState.FAILED.equals(appState)) {
                    yarnRunningFlag = false;
                    setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
                } else if (YarnApplicationState.FINISHED.equals(appState)
                        || YarnApplicationState.KILLED.equals(appState)) {
                    yarnRunningFlag = false;
                }
                Thread.sleep(FlinkConstants.FLINK_YARN_TRACKING_SLEEP_MILLIS);
            }
        } catch (YarnException | IOException | NullPointerException e) {
            log.error("Failed to track application status", e);
            throw new RuntimeException("Failed to track application status");
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            log.info("The current yarn task has been interrupted", ex);
            setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
            throw new TaskException("The current yarn task has been interrupted", ex);
        } finally {
            try {
                // Stop and close the YarnClient.
                yarnClient.stop();
                yarnClient.close();
            } catch (IOException e) {
                log.error("Close Yarn Client Failed!", e);
            }
        }
    }

    private void initialYarnClient(YarnClient yarnClient) throws MalformedURLException {
        YarnConfiguration conf = new YarnConfiguration();
        conf.addResource(new File(System.getenv("HADOOP_CONF_DIR").concat("/hdfs-site.xml")).toURI().toURL());
        conf.addResource(new File(System.getenv("HADOOP_CONF_DIR").concat("/core-site.xml")).toURI().toURL());
        conf.addResource(new File(System.getenv("HADOOP_CONF_DIR").concat("/yarn-site.xml")).toURI().toURL());
        yarnClient.init(conf);
        yarnClient.start();
    }
```

After adding this code, the process with the Flink task stays in the EXECUTING state, and when you stop the process, DolphinScheduler tries to kill the Flink YARN application with the kill command while stopping the task.

### Anything else

_No response_

### Version

3.2.x

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
