This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.4.0
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/dev-1.4.0 by this push:
new a7e6ecec8 Engineplugin shell Code optimization (#4499)
a7e6ecec8 is described below
commit a7e6ecec8c436ee3798643d4a9ebbca98decf0a0
Author: ChengJie1053 <[email protected]>
AuthorDate: Mon May 1 00:02:52 2023 +0800
Engineplugin shell Code optimization (#4499)
---
.../shell/conf/ShellEngineConnConf.java | 2 +
.../ShellEngineConnConcurrentExecutor.java | 295 +++------------------
.../shell/executor/ShellEngineConnExecutor.java | 113 ++++----
.../resources/conf/linkis-engineconn.properties | 3 +-
.../executor/TestShellEngineConnExecutor.java | 1 +
5 files changed, 110 insertions(+), 304 deletions(-)
diff --git
a/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/conf/ShellEngineConnConf.java
b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/conf/ShellEngineConnConf.java
index 3849bc1ee..9c8bccd8a 100644
---
a/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/conf/ShellEngineConnConf.java
+++
b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/conf/ShellEngineConnConf.java
@@ -25,4 +25,6 @@ public class ShellEngineConnConf {
public static final int LOG_SERVICE_MAX_THREAD_SIZE =
CommonVars.apply("linkis.engineconn.shell.log.max.thread.size",
50).getValue();
+ public static final int SHELL_ENGINECONN_OUTPUT_PRINT_LIMIT =
+ CommonVars.apply("wds.linkis.engineconn.shell.output.print.limit",
1000).getValue();
}
diff --git
a/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnConcurrentExecutor.java
b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnConcurrentExecutor.java
index 3f445ddd5..b426a22eb 100644
---
a/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnConcurrentExecutor.java
+++
b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnConcurrentExecutor.java
@@ -17,286 +17,91 @@
package org.apache.linkis.manager.engineplugin.shell.executor;
-import org.apache.linkis.common.utils.Utils;
import
org.apache.linkis.engineconn.computation.executor.execute.ConcurrentComputationExecutor;
import
org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext;
-import org.apache.linkis.engineconn.core.EngineConnObject;
import org.apache.linkis.governance.common.utils.GovernanceUtils;
-import org.apache.linkis.manager.common.entity.resource.CommonNodeResource;
import org.apache.linkis.manager.common.entity.resource.NodeResource;
-import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils;
-import
org.apache.linkis.manager.engineplugin.shell.common.ShellEngineConnPluginConst;
import org.apache.linkis.manager.engineplugin.shell.conf.ShellEngineConnConf;
-import
org.apache.linkis.manager.engineplugin.shell.exception.ShellCodeErrorException;
import org.apache.linkis.manager.label.entity.Label;
import org.apache.linkis.protocol.engine.JobProgressInfo;
-import org.apache.linkis.rpc.Sender;
-import org.apache.linkis.scheduler.executer.ErrorExecuteResponse;
import org.apache.linkis.scheduler.executer.ExecuteResponse;
-import org.apache.linkis.scheduler.executer.SuccessExecuteResponse;
-import org.apache.commons.lang3.StringUtils;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.InputStreamReader;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import scala.concurrent.ExecutionContextExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ShellEngineConnConcurrentExecutor extends
ConcurrentComputationExecutor {
+
private static final Logger logger =
LoggerFactory.getLogger(ShellEngineConnConcurrentExecutor.class);
- private EngineExecutionContext engineExecutionContext;
+ private ShellEngineConnExecutor shellEngineConnExecutor;
- private List<Label<?>> executorLabels = new ArrayList<>();
-
- private Map<String, ShellECTaskInfo> shellECTaskInfoCache = new
ConcurrentHashMap<>();
-
- private int id;
private int maxRunningNumber;
public ShellEngineConnConcurrentExecutor(int id, int maxRunningNumber) {
- super(maxRunningNumber);
- this.id = id;
+ super(ShellEngineConnConf.SHELL_ENGINECONN_OUTPUT_PRINT_LIMIT);
+ this.shellEngineConnExecutor = new ShellEngineConnExecutor(id);
this.maxRunningNumber = maxRunningNumber;
}
- private final ExecutionContextExecutorService logAsyncService =
- Utils.newCachedExecutionContext(
- ShellEngineConnConf.LOG_SERVICE_MAX_THREAD_SIZE,
"ShelLogService-Thread-", true);
-
@Override
- public void init() {
- logger.info("Ready to change engine state!");
- super.init();
+ public ExecuteResponse executeLine(EngineExecutionContext
engineExecutorContext, String code) {
+ return shellEngineConnExecutor.executeLine(engineExecutorContext, code);
}
@Override
public ExecuteResponse executeCompletely(
- EngineExecutionContext engineExecutionContext, String code, String
completedLine) {
- String newcode = completedLine + code;
- logger.debug("newcode is " + newcode);
- return executeLine(engineExecutionContext, newcode);
- }
-
- @Override
- public ExecuteResponse executeLine(EngineExecutionContext
engineExecutionContext, String code) {
- if (engineExecutionContext != null) {
- this.engineExecutionContext = engineExecutionContext;
- logger.info("Shell executor reset new engineExecutionContext!");
- }
-
- if (engineExecutionContext.getJobId().isEmpty()) {
- return new ErrorExecuteResponse("taskID is null", null);
- }
-
- String taskId = engineExecutionContext.getJobId().get();
- BufferedReader bufferedReader = null;
- BufferedReader errorsReader = null;
-
- AtomicBoolean completed = new AtomicBoolean(false);
- ReaderThread errReaderThread = null;
- ReaderThread inputReaderThread = null;
-
- try {
- engineExecutionContext.appendStdout(getId() + " >> " + code.trim());
-
- String[] argsArr;
- if (engineExecutionContext.getTotalParagraph() == 1
- && engineExecutionContext.getProperties() != null
- && engineExecutionContext
- .getProperties()
- .containsKey(ShellEngineConnPluginConst.RUNTIME_ARGS_KEY)) {
- ArrayList<String> argsList =
- (ArrayList<String>)
- engineExecutionContext
- .getProperties()
- .get(ShellEngineConnPluginConst.RUNTIME_ARGS_KEY);
- argsArr = argsList.toArray(new String[argsList.size()]);
- logger.info(
- "Will execute shell task with user-specified arguments: '{}'",
- StringUtils.join(argsArr, "' '"));
- } else {
- argsArr = null;
- }
-
- String workingDirectory;
- if (engineExecutionContext.getTotalParagraph() == 1
- && engineExecutionContext.getProperties() != null
- && engineExecutionContext
- .getProperties()
-
.containsKey(ShellEngineConnPluginConst.SHELL_RUNTIME_WORKING_DIRECTORY)) {
- String wdStr =
- (String)
- engineExecutionContext
- .getProperties()
-
.get(ShellEngineConnPluginConst.SHELL_RUNTIME_WORKING_DIRECTORY);
- if (isExecutePathExist(wdStr)) {
- logger.info(
- "Will execute shell task under user-specified working-directory:
'{}'", wdStr);
- workingDirectory = wdStr;
- } else {
- logger.warn(
- "User-specified working-directory: '{}' does not exist or user
does not have access permission. Will execute shell task under default
working-directory. Please contact the administrator!",
- wdStr);
- workingDirectory = null;
- }
- } else {
- workingDirectory = null;
- }
-
- String[] generatedCode =
- argsArr == null || argsArr.length == 0
- ? generateRunCode(code)
- : generateRunCodeWithArgs(code, argsArr);
-
- ProcessBuilder processBuilder = new ProcessBuilder(generatedCode);
-
- if (StringUtils.isNotBlank(workingDirectory)) {
- processBuilder.directory(new File(workingDirectory));
- }
-
- processBuilder.redirectErrorStream(false);
- YarnAppIdExtractor extractor = new YarnAppIdExtractor();
- Process process = processBuilder.start();
- bufferedReader = new BufferedReader(new
InputStreamReader(process.getInputStream()));
- errorsReader = new BufferedReader(new
InputStreamReader(process.getErrorStream()));
-
- // add task id and task Info cache
- shellECTaskInfoCache.put(taskId, new ShellECTaskInfo(taskId, process,
extractor));
-
- CountDownLatch counter = new CountDownLatch(2);
- inputReaderThread =
- new ReaderThread(engineExecutionContext, bufferedReader, extractor,
true, counter);
- errReaderThread =
- new ReaderThread(engineExecutionContext, errorsReader, extractor,
false, counter);
-
- logAsyncService.execute(inputReaderThread);
- logAsyncService.execute(errReaderThread);
-
- int exitCode = process.waitFor();
- counter.await();
-
- completed.set(true);
-
- if (exitCode != 0) {
- return new ErrorExecuteResponse("run shell failed", new
ShellCodeErrorException());
- } else {
- return new SuccessExecuteResponse();
- }
- } catch (Exception e) {
- logger.error("Execute shell code failed, reason:", e);
- return new ErrorExecuteResponse("run shell failed", e);
- } finally {
- if (errorsReader != null) {
- errReaderThread.onDestroy();
- }
- if (inputReaderThread != null) {
- inputReaderThread.onDestroy();
- }
- shellECTaskInfoCache.remove(taskId);
- }
- }
-
- private boolean isExecutePathExist(String executePath) {
- File etlHomeDir = new File(executePath);
- return (etlHomeDir.exists() && etlHomeDir.isDirectory());
- }
-
- private String[] generateRunCode(String code) {
- return new String[] {"sh", "-c", code};
- }
-
- private String[] generateRunCodeWithArgs(String code, String[] args) {
- return new String[] {
- "sh",
- "-c",
- "echo \"dummy " + StringUtils.join(args, " ") + "\" | xargs sh -c \'" +
code + "\'"
- };
+ EngineExecutionContext engineExecutorContext, String code, String
completedLine) {
+ return shellEngineConnExecutor.executeCompletely(engineExecutorContext,
code, completedLine);
}
@Override
- public String getId() {
- return Sender.getThisServiceInstance().getInstance() + "_" + id;
+ public float progress(String taskID) {
+ return shellEngineConnExecutor.progress(taskID);
}
@Override
public JobProgressInfo[] getProgressInfo(String taskID) {
- List<JobProgressInfo> jobProgressInfos = new ArrayList<>();
- if (this.engineExecutionContext == null) {
- return jobProgressInfos.toArray(new JobProgressInfo[0]);
- }
-
- String jobId =
- engineExecutionContext.getJobId().isDefined()
- ? engineExecutionContext.getJobId().get()
- : "";
- if (progress(taskID) == 0.0f) {
- jobProgressInfos.add(new JobProgressInfo(jobId, 1, 1, 0, 0));
- } else {
- jobProgressInfos.add(new JobProgressInfo(jobId, 1, 0, 0, 1));
- }
- return jobProgressInfos.toArray(new JobProgressInfo[0]);
- }
-
- @Override
- public float progress(String taskID) {
- if (this.engineExecutionContext != null) {
- return ((float) this.engineExecutionContext.getCurrentParagraph())
- / this.engineExecutionContext.getTotalParagraph();
- } else {
- return 0.0f;
- }
+ return shellEngineConnExecutor.getProgressInfo(taskID);
}
@Override
public boolean supportCallBackLogs() {
- // todo
- return true;
+ return shellEngineConnExecutor.supportCallBackLogs();
}
@Override
- public NodeResource requestExpectedResource(NodeResource expectedResource) {
- return null;
- }
-
- @Override
- public NodeResource getCurrentNodeResource() {
- CommonNodeResource resource = new CommonNodeResource();
- resource.setUsedResource(
- NodeResourceUtils.applyAsLoadInstanceResource(
- EngineConnObject.getEngineCreationContext().getOptions()));
- return resource;
+ public String getId() {
+ return shellEngineConnExecutor.getId();
}
@Override
- public List<Label<?>> getExecutorLabels() {
- return executorLabels;
+ public void close() {
+ try {
+ killAll();
+ shellEngineConnExecutor.logAsyncService.shutdown();
+ } catch (Exception e) {
+ logger.error("Shell ec failed to close ");
+ }
+ super.close();
}
@Override
- public void setExecutorLabels(List<Label<?>> labels) {
- if (labels != null) {
- executorLabels.clear();
- executorLabels.addAll(labels);
+ public void killAll() {
+ Iterator<ShellECTaskInfo> iterator =
+ shellEngineConnExecutor.shellECTaskInfoCache.values().iterator();
+ while (iterator.hasNext()) {
+ ShellECTaskInfo shellECTaskInfo = iterator.next();
+ killTask(shellECTaskInfo.getTaskId());
}
}
@Override
public void killTask(String taskID) {
- ShellECTaskInfo shellECTaskInfo = shellECTaskInfoCache.remove(taskID);
+ ShellECTaskInfo shellECTaskInfo =
shellEngineConnExecutor.shellECTaskInfoCache.remove(taskID);
if (shellECTaskInfo == null) {
return;
}
@@ -304,7 +109,7 @@ public class ShellEngineConnConcurrentExecutor extends
ConcurrentComputationExec
/*
Kill sub-processes
*/
- int pid = getPid(shellECTaskInfo.getProcess());
+ int pid = shellEngineConnExecutor.getPid(shellECTaskInfo.getProcess());
GovernanceUtils.killProcess(String.valueOf(pid), "kill task " + taskID + "
process", false);
/*
@@ -319,40 +124,28 @@ public class ShellEngineConnConcurrentExecutor extends
ConcurrentComputationExec
super.killTask(taskID);
}
- private int getPid(Process process) {
- try {
- Class<?> clazz = Class.forName("java.lang.UNIXProcess");
- Field field = clazz.getDeclaredField("pid");
- field.setAccessible(true);
- return field.getInt(process);
- } catch (Exception e) {
- logger.warn("Failed to acquire pid for shell process");
- return -1;
- }
+ @Override
+ public int getConcurrentLimit() {
+ return maxRunningNumber;
}
@Override
- public void close() {
- try {
- killAll();
- logAsyncService.shutdown();
- } catch (Exception e) {
- logger.error("Shell ec failed to close ");
- }
- super.close();
+ public List<Label<?>> getExecutorLabels() {
+ return shellEngineConnExecutor.getExecutorLabels();
}
@Override
- public void killAll() {
- Iterator<ShellECTaskInfo> iterator =
shellECTaskInfoCache.values().iterator();
- while (iterator.hasNext()) {
- ShellECTaskInfo shellECTaskInfo = iterator.next();
- killTask(shellECTaskInfo.getTaskId());
- }
+ public void setExecutorLabels(List<Label<?>> labels) {
+ shellEngineConnExecutor.setExecutorLabels(labels);
}
@Override
- public int getConcurrentLimit() {
- return maxRunningNumber;
+ public NodeResource requestExpectedResource(NodeResource expectedResource) {
+ return shellEngineConnExecutor.requestExpectedResource(expectedResource);
+ }
+
+ @Override
+ public NodeResource getCurrentNodeResource() {
+ return shellEngineConnExecutor.getCurrentNodeResource();
}
}
diff --git
a/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.java
b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.java
index 964417765..28e451d73 100644
---
a/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.java
+++
b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.java
@@ -17,6 +17,7 @@
package org.apache.linkis.manager.engineplugin.shell.executor;
+import org.apache.linkis.common.utils.Utils;
import
org.apache.linkis.engineconn.computation.executor.execute.ComputationExecutor;
import
org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext;
import org.apache.linkis.engineconn.core.EngineConnObject;
@@ -25,6 +26,7 @@ import
org.apache.linkis.manager.common.entity.resource.CommonNodeResource;
import org.apache.linkis.manager.common.entity.resource.NodeResource;
import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils;
import
org.apache.linkis.manager.engineplugin.shell.common.ShellEngineConnPluginConst;
+import org.apache.linkis.manager.engineplugin.shell.conf.ShellEngineConnConf;
import
org.apache.linkis.manager.engineplugin.shell.exception.ShellCodeErrorException;
import org.apache.linkis.manager.label.entity.Label;
import org.apache.linkis.protocol.engine.JobProgressInfo;
@@ -33,7 +35,6 @@ import
org.apache.linkis.scheduler.executer.ErrorExecuteResponse;
import org.apache.linkis.scheduler.executer.ExecuteResponse;
import org.apache.linkis.scheduler.executer.SuccessExecuteResponse;
-import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import java.io.BufferedReader;
@@ -41,28 +42,41 @@ import java.io.File;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
+import scala.concurrent.ExecutionContextExecutorService;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ShellEngineConnExecutor extends ComputationExecutor {
private static final Logger logger =
LoggerFactory.getLogger(ShellEngineConnExecutor.class);
- private int id;
private EngineExecutionContext engineExecutionContext;
+
private List<Label<?>> executorLabels = new ArrayList<>();
+
+ Map<String, ShellECTaskInfo> shellECTaskInfoCache = new
ConcurrentHashMap<>();
+
+ private int id;
+
private Process process;
+
private YarnAppIdExtractor extractor;
public ShellEngineConnExecutor(int id) {
- super(id);
+ super(ShellEngineConnConf.SHELL_ENGINECONN_OUTPUT_PRINT_LIMIT);
this.id = id;
}
+ final ExecutionContextExecutorService logAsyncService =
+ Utils.newCachedExecutionContext(
+ ShellEngineConnConf.LOG_SERVICE_MAX_THREAD_SIZE,
"ShelLogService-Thread-", true);
+
@Override
public void init() {
logger.info("Ready to change engine state!");
@@ -72,7 +86,7 @@ public class ShellEngineConnExecutor extends
ComputationExecutor {
@Override
public ExecuteResponse executeCompletely(
EngineExecutionContext engineExecutionContext, String code, String
completedLine) {
- final String newcode = completedLine + code;
+ String newcode = completedLine + code;
logger.debug("newcode is " + newcode);
return executeLine(engineExecutionContext, newcode);
}
@@ -84,6 +98,11 @@ public class ShellEngineConnExecutor extends
ComputationExecutor {
logger.info("Shell executor reset new engineExecutionContext!");
}
+ if (engineExecutionContext.getJobId().isEmpty()) {
+ return new ErrorExecuteResponse("taskID is null", null);
+ }
+
+ String taskId = engineExecutionContext.getJobId().get();
BufferedReader bufferedReader = null;
BufferedReader errorsReader = null;
@@ -94,60 +113,48 @@ public class ShellEngineConnExecutor extends
ComputationExecutor {
try {
engineExecutionContext.appendStdout(getId() + " >> " + code.trim());
- String[] argsArr = null;
+ String[] argsArr;
if (engineExecutionContext.getTotalParagraph() == 1
&& engineExecutionContext.getProperties() != null
&& engineExecutionContext
.getProperties()
.containsKey(ShellEngineConnPluginConst.RUNTIME_ARGS_KEY)) {
-
ArrayList<String> argsList =
(ArrayList<String>)
engineExecutionContext
.getProperties()
.get(ShellEngineConnPluginConst.RUNTIME_ARGS_KEY);
-
- try {
- argsArr = argsList.toArray(new String[argsList.size()]);
- logger.info(
- "Will execute shell task with user-specified arguments: '{}'",
- Arrays.toString(argsArr));
- } catch (Exception t) {
- logger.warn(
- "Cannot read user-input shell arguments. Will execute shell task
without them.", t);
- }
+ argsArr = argsList.toArray(new String[argsList.size()]);
+ logger.info(
+ "Will execute shell task with user-specified arguments: '{}'",
+ StringUtils.join(argsArr, "' '"));
+ } else {
+ argsArr = null;
}
- String workingDirectory = null;
+ String workingDirectory;
if (engineExecutionContext.getTotalParagraph() == 1
&& engineExecutionContext.getProperties() != null
&& engineExecutionContext
.getProperties()
.containsKey(ShellEngineConnPluginConst.SHELL_RUNTIME_WORKING_DIRECTORY)) {
-
String wdStr =
(String)
engineExecutionContext
.getProperties()
.get(ShellEngineConnPluginConst.SHELL_RUNTIME_WORKING_DIRECTORY);
-
- try {
- if (isExecutePathExist(wdStr)) {
- logger.info(
- "Will execute shell task under user-specified
working-directory: '" + wdStr + "'");
- workingDirectory = wdStr;
- } else {
- logger.warn(
- "User-specified working-directory: '"
- + wdStr
- + "' does not exist or user does not have access
permission. "
- + "Will execute shell task under default
working-directory. Please contact the administrator!");
- }
- } catch (Exception t) {
+ if (isExecutePathExist(wdStr)) {
+ logger.info(
+ "Will execute shell task under user-specified working-directory:
'{}'", wdStr);
+ workingDirectory = wdStr;
+ } else {
logger.warn(
- "Cannot read user-input working-directory. Will execute shell
task under default working-directory.",
- t);
+ "User-specified working-directory: '{}' does not exist or user
does not have access permission. Will execute shell task under default
working-directory. Please contact the administrator!",
+ wdStr);
+ workingDirectory = null;
}
+ } else {
+ workingDirectory = null;
}
String[] generatedCode =
@@ -156,6 +163,7 @@ public class ShellEngineConnExecutor extends
ComputationExecutor {
: generateRunCodeWithArgs(code, argsArr);
ProcessBuilder processBuilder = new ProcessBuilder(generatedCode);
+
if (StringUtils.isNotBlank(workingDirectory)) {
processBuilder.directory(new File(workingDirectory));
}
@@ -163,17 +171,20 @@ public class ShellEngineConnExecutor extends
ComputationExecutor {
processBuilder.redirectErrorStream(false);
extractor = new YarnAppIdExtractor();
process = processBuilder.start();
-
bufferedReader = new BufferedReader(new
InputStreamReader(process.getInputStream()));
errorsReader = new BufferedReader(new
InputStreamReader(process.getErrorStream()));
+
+ // add task id and task Info cache
+ shellECTaskInfoCache.put(taskId, new ShellECTaskInfo(taskId, process,
extractor));
+
CountDownLatch counter = new CountDownLatch(2);
inputReaderThread =
new ReaderThread(engineExecutionContext, bufferedReader, extractor,
true, counter);
errReaderThread =
new ReaderThread(engineExecutionContext, errorsReader, extractor,
false, counter);
- inputReaderThread.start();
- errReaderThread.start();
+ logAsyncService.execute(inputReaderThread);
+ logAsyncService.execute(errReaderThread);
int exitCode = process.waitFor();
counter.await();
@@ -185,20 +196,17 @@ public class ShellEngineConnExecutor extends
ComputationExecutor {
} else {
return new SuccessExecuteResponse();
}
-
} catch (Exception e) {
logger.error("Execute shell code failed, reason:", e);
return new ErrorExecuteResponse("run shell failed", e);
-
} finally {
if (errorsReader != null) {
- inputReaderThread.onDestroy();
+ errReaderThread.onDestroy();
}
if (inputReaderThread != null) {
- errReaderThread.onDestroy();
+ inputReaderThread.onDestroy();
}
- IOUtils.closeQuietly(bufferedReader);
- IOUtils.closeQuietly(errorsReader);
+ shellECTaskInfoCache.remove(taskId);
}
}
@@ -226,9 +234,9 @@ public class ShellEngineConnExecutor extends
ComputationExecutor {
@Override
public JobProgressInfo[] getProgressInfo(String taskID) {
- List<JobProgressInfo> jobProgressInfo = new ArrayList<>();
+ List<JobProgressInfo> jobProgressInfos = new ArrayList<>();
if (this.engineExecutionContext == null) {
- return jobProgressInfo.toArray(new JobProgressInfo[0]);
+ return jobProgressInfos.toArray(new JobProgressInfo[0]);
}
String jobId =
@@ -236,18 +244,18 @@ public class ShellEngineConnExecutor extends
ComputationExecutor {
? engineExecutionContext.getJobId().get()
: "";
if (progress(taskID) == 0.0f) {
- jobProgressInfo.add(new JobProgressInfo(jobId, 1, 1, 0, 0));
+ jobProgressInfos.add(new JobProgressInfo(jobId, 1, 1, 0, 0));
} else {
- jobProgressInfo.add(new JobProgressInfo(jobId, 1, 0, 0, 1));
+ jobProgressInfos.add(new JobProgressInfo(jobId, 1, 0, 0, 1));
}
- return jobProgressInfo.toArray(new JobProgressInfo[0]);
+ return jobProgressInfos.toArray(new JobProgressInfo[0]);
}
@Override
public float progress(String taskID) {
if (this.engineExecutionContext != null) {
- return this.engineExecutionContext.getCurrentParagraph()
- / (float) this.engineExecutionContext.getTotalParagraph();
+ return ((float) this.engineExecutionContext.getCurrentParagraph())
+ / this.engineExecutionContext.getTotalParagraph();
} else {
return 0.0f;
}
@@ -299,12 +307,13 @@ public class ShellEngineConnExecutor extends
ComputationExecutor {
Kill yarn-applications
*/
List<String> yarnAppIds = extractor.getExtractedYarnAppIds();
+
GovernanceUtils.killYarnJobApp(yarnAppIds);
logger.info("Finished kill yarn app ids in the engine of ({})", getId());
super.killTask(taskID);
}
- private int getPid(Process process) {
+ int getPid(Process process) {
try {
Class<?> clazz = Class.forName("java.lang.UNIXProcess");
Field field = clazz.getDeclaredField("pid");
diff --git
a/linkis-engineconn-plugins/shell/src/main/resources/conf/linkis-engineconn.properties
b/linkis-engineconn-plugins/shell/src/main/resources/conf/linkis-engineconn.properties
index dad0da1c4..87e5025b4 100644
---
a/linkis-engineconn-plugins/shell/src/main/resources/conf/linkis-engineconn.properties
+++
b/linkis-engineconn-plugins/shell/src/main/resources/conf/linkis-engineconn.properties
@@ -19,4 +19,5 @@
wds.linkis.engineconn.plugin.default.class=org.apache.linkis.manager.engineplugi
wds.linkis.engineconn.max.free.time=30m
wds.linkis.engineconn.log.list.count=50
wds.linkis.engineconn.support.parallelism=true
-linkis.engineconn.shell.concurrent.limit=15
\ No newline at end of file
+linkis.engineconn.shell.concurrent.limit=15
+wds.linkis.engineconn.shell.output.print.limit=1000
\ No newline at end of file
diff --git
a/linkis-engineconn-plugins/shell/src/test/java/org/apache/linkis/manager/engineplugin/shell/executor/TestShellEngineConnExecutor.java
b/linkis-engineconn-plugins/shell/src/test/java/org/apache/linkis/manager/engineplugin/shell/executor/TestShellEngineConnExecutor.java
index d327282b2..3baa00b6f 100644
---
a/linkis-engineconn-plugins/shell/src/test/java/org/apache/linkis/manager/engineplugin/shell/executor/TestShellEngineConnExecutor.java
+++
b/linkis-engineconn-plugins/shell/src/test/java/org/apache/linkis/manager/engineplugin/shell/executor/TestShellEngineConnExecutor.java
@@ -53,6 +53,7 @@ public class TestShellEngineConnExecutor {
if (!isWindows) {
EngineExecutionContext engineExecutionContext =
new EngineExecutionContext(shellEngineConnExecutor,
Utils.getJvmUser());
+ engineExecutionContext.setJobId("1");
ExecuteResponse response =
shellEngineConnExecutor.executeLine(engineExecutionContext, "id");
Assertions.assertNotNull(response);
shellEngineConnExecutor.close();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]