This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.4.0
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/dev-1.4.0 by this push:
     new a7e6ecec8 Engineplugin shell Code optimization (#4499)
a7e6ecec8 is described below

commit a7e6ecec8c436ee3798643d4a9ebbca98decf0a0
Author: ChengJie1053 <[email protected]>
AuthorDate: Mon May 1 00:02:52 2023 +0800

    Engineplugin shell Code optimization (#4499)
---
 .../shell/conf/ShellEngineConnConf.java            |   2 +
 .../ShellEngineConnConcurrentExecutor.java         | 295 +++------------------
 .../shell/executor/ShellEngineConnExecutor.java    | 113 ++++----
 .../resources/conf/linkis-engineconn.properties    |   3 +-
 .../executor/TestShellEngineConnExecutor.java      |   1 +
 5 files changed, 110 insertions(+), 304 deletions(-)

diff --git a/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/conf/ShellEngineConnConf.java b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/conf/ShellEngineConnConf.java
index 3849bc1ee..9c8bccd8a 100644
--- a/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/conf/ShellEngineConnConf.java
+++ b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/conf/ShellEngineConnConf.java
@@ -25,4 +25,6 @@ public class ShellEngineConnConf {
 
   public static final int LOG_SERVICE_MAX_THREAD_SIZE =
       CommonVars.apply("linkis.engineconn.shell.log.max.thread.size", 50).getValue();
+  public static final int SHELL_ENGINECONN_OUTPUT_PRINT_LIMIT =
+      CommonVars.apply("wds.linkis.engineconn.shell.output.print.limit", 1000).getValue();
 }
diff --git a/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnConcurrentExecutor.java b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnConcurrentExecutor.java
index 3f445ddd5..b426a22eb 100644
--- a/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnConcurrentExecutor.java
+++ b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnConcurrentExecutor.java
@@ -17,286 +17,91 @@
 
 package org.apache.linkis.manager.engineplugin.shell.executor;
 
-import org.apache.linkis.common.utils.Utils;
 import 
org.apache.linkis.engineconn.computation.executor.execute.ConcurrentComputationExecutor;
 import 
org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext;
-import org.apache.linkis.engineconn.core.EngineConnObject;
 import org.apache.linkis.governance.common.utils.GovernanceUtils;
-import org.apache.linkis.manager.common.entity.resource.CommonNodeResource;
 import org.apache.linkis.manager.common.entity.resource.NodeResource;
-import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils;
-import 
org.apache.linkis.manager.engineplugin.shell.common.ShellEngineConnPluginConst;
 import org.apache.linkis.manager.engineplugin.shell.conf.ShellEngineConnConf;
-import 
org.apache.linkis.manager.engineplugin.shell.exception.ShellCodeErrorException;
 import org.apache.linkis.manager.label.entity.Label;
 import org.apache.linkis.protocol.engine.JobProgressInfo;
-import org.apache.linkis.rpc.Sender;
-import org.apache.linkis.scheduler.executer.ErrorExecuteResponse;
 import org.apache.linkis.scheduler.executer.ExecuteResponse;
-import org.apache.linkis.scheduler.executer.SuccessExecuteResponse;
 
-import org.apache.commons.lang3.StringUtils;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.InputStreamReader;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import scala.concurrent.ExecutionContextExecutorService;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ShellEngineConnConcurrentExecutor extends 
ConcurrentComputationExecutor {
+
   private static final Logger logger =
       LoggerFactory.getLogger(ShellEngineConnConcurrentExecutor.class);
 
-  private EngineExecutionContext engineExecutionContext;
+  private ShellEngineConnExecutor shellEngineConnExecutor;
 
-  private List<Label<?>> executorLabels = new ArrayList<>();
-
-  private Map<String, ShellECTaskInfo> shellECTaskInfoCache = new 
ConcurrentHashMap<>();
-
-  private int id;
   private int maxRunningNumber;
 
   public ShellEngineConnConcurrentExecutor(int id, int maxRunningNumber) {
-    super(maxRunningNumber);
-    this.id = id;
+    super(ShellEngineConnConf.SHELL_ENGINECONN_OUTPUT_PRINT_LIMIT);
+    this.shellEngineConnExecutor = new ShellEngineConnExecutor(id);
     this.maxRunningNumber = maxRunningNumber;
   }
 
-  private final ExecutionContextExecutorService logAsyncService =
-      Utils.newCachedExecutionContext(
-          ShellEngineConnConf.LOG_SERVICE_MAX_THREAD_SIZE, 
"ShelLogService-Thread-", true);
-
   @Override
-  public void init() {
-    logger.info("Ready to change engine state!");
-    super.init();
+  public ExecuteResponse executeLine(EngineExecutionContext 
engineExecutorContext, String code) {
+    return shellEngineConnExecutor.executeLine(engineExecutorContext, code);
   }
 
   @Override
   public ExecuteResponse executeCompletely(
-      EngineExecutionContext engineExecutionContext, String code, String 
completedLine) {
-    String newcode = completedLine + code;
-    logger.debug("newcode is " + newcode);
-    return executeLine(engineExecutionContext, newcode);
-  }
-
-  @Override
-  public ExecuteResponse executeLine(EngineExecutionContext 
engineExecutionContext, String code) {
-    if (engineExecutionContext != null) {
-      this.engineExecutionContext = engineExecutionContext;
-      logger.info("Shell executor reset new engineExecutionContext!");
-    }
-
-    if (engineExecutionContext.getJobId().isEmpty()) {
-      return new ErrorExecuteResponse("taskID is null", null);
-    }
-
-    String taskId = engineExecutionContext.getJobId().get();
-    BufferedReader bufferedReader = null;
-    BufferedReader errorsReader = null;
-
-    AtomicBoolean completed = new AtomicBoolean(false);
-    ReaderThread errReaderThread = null;
-    ReaderThread inputReaderThread = null;
-
-    try {
-      engineExecutionContext.appendStdout(getId() + " >> " + code.trim());
-
-      String[] argsArr;
-      if (engineExecutionContext.getTotalParagraph() == 1
-          && engineExecutionContext.getProperties() != null
-          && engineExecutionContext
-              .getProperties()
-              .containsKey(ShellEngineConnPluginConst.RUNTIME_ARGS_KEY)) {
-        ArrayList<String> argsList =
-            (ArrayList<String>)
-                engineExecutionContext
-                    .getProperties()
-                    .get(ShellEngineConnPluginConst.RUNTIME_ARGS_KEY);
-        argsArr = argsList.toArray(new String[argsList.size()]);
-        logger.info(
-            "Will execute shell task with user-specified arguments: '{}'",
-            StringUtils.join(argsArr, "' '"));
-      } else {
-        argsArr = null;
-      }
-
-      String workingDirectory;
-      if (engineExecutionContext.getTotalParagraph() == 1
-          && engineExecutionContext.getProperties() != null
-          && engineExecutionContext
-              .getProperties()
-              
.containsKey(ShellEngineConnPluginConst.SHELL_RUNTIME_WORKING_DIRECTORY)) {
-        String wdStr =
-            (String)
-                engineExecutionContext
-                    .getProperties()
-                    
.get(ShellEngineConnPluginConst.SHELL_RUNTIME_WORKING_DIRECTORY);
-        if (isExecutePathExist(wdStr)) {
-          logger.info(
-              "Will execute shell task under user-specified working-directory: 
'{}'", wdStr);
-          workingDirectory = wdStr;
-        } else {
-          logger.warn(
-              "User-specified working-directory: '{}' does not exist or user 
does not have access permission. Will execute shell task under default 
working-directory. Please contact the administrator!",
-              wdStr);
-          workingDirectory = null;
-        }
-      } else {
-        workingDirectory = null;
-      }
-
-      String[] generatedCode =
-          argsArr == null || argsArr.length == 0
-              ? generateRunCode(code)
-              : generateRunCodeWithArgs(code, argsArr);
-
-      ProcessBuilder processBuilder = new ProcessBuilder(generatedCode);
-
-      if (StringUtils.isNotBlank(workingDirectory)) {
-        processBuilder.directory(new File(workingDirectory));
-      }
-
-      processBuilder.redirectErrorStream(false);
-      YarnAppIdExtractor extractor = new YarnAppIdExtractor();
-      Process process = processBuilder.start();
-      bufferedReader = new BufferedReader(new 
InputStreamReader(process.getInputStream()));
-      errorsReader = new BufferedReader(new 
InputStreamReader(process.getErrorStream()));
-
-      // add task id and task Info cache
-      shellECTaskInfoCache.put(taskId, new ShellECTaskInfo(taskId, process, 
extractor));
-
-      CountDownLatch counter = new CountDownLatch(2);
-      inputReaderThread =
-          new ReaderThread(engineExecutionContext, bufferedReader, extractor, 
true, counter);
-      errReaderThread =
-          new ReaderThread(engineExecutionContext, errorsReader, extractor, 
false, counter);
-
-      logAsyncService.execute(inputReaderThread);
-      logAsyncService.execute(errReaderThread);
-
-      int exitCode = process.waitFor();
-      counter.await();
-
-      completed.set(true);
-
-      if (exitCode != 0) {
-        return new ErrorExecuteResponse("run shell failed", new 
ShellCodeErrorException());
-      } else {
-        return new SuccessExecuteResponse();
-      }
-    } catch (Exception e) {
-      logger.error("Execute shell code failed, reason:", e);
-      return new ErrorExecuteResponse("run shell failed", e);
-    } finally {
-      if (errorsReader != null) {
-        errReaderThread.onDestroy();
-      }
-      if (inputReaderThread != null) {
-        inputReaderThread.onDestroy();
-      }
-      shellECTaskInfoCache.remove(taskId);
-    }
-  }
-
-  private boolean isExecutePathExist(String executePath) {
-    File etlHomeDir = new File(executePath);
-    return (etlHomeDir.exists() && etlHomeDir.isDirectory());
-  }
-
-  private String[] generateRunCode(String code) {
-    return new String[] {"sh", "-c", code};
-  }
-
-  private String[] generateRunCodeWithArgs(String code, String[] args) {
-    return new String[] {
-      "sh",
-      "-c",
-      "echo \"dummy " + StringUtils.join(args, " ") + "\" | xargs sh -c \'" + 
code + "\'"
-    };
+      EngineExecutionContext engineExecutorContext, String code, String 
completedLine) {
+    return shellEngineConnExecutor.executeCompletely(engineExecutorContext, 
code, completedLine);
   }
 
   @Override
-  public String getId() {
-    return Sender.getThisServiceInstance().getInstance() + "_" + id;
+  public float progress(String taskID) {
+    return shellEngineConnExecutor.progress(taskID);
   }
 
   @Override
   public JobProgressInfo[] getProgressInfo(String taskID) {
-    List<JobProgressInfo> jobProgressInfos = new ArrayList<>();
-    if (this.engineExecutionContext == null) {
-      return jobProgressInfos.toArray(new JobProgressInfo[0]);
-    }
-
-    String jobId =
-        engineExecutionContext.getJobId().isDefined()
-            ? engineExecutionContext.getJobId().get()
-            : "";
-    if (progress(taskID) == 0.0f) {
-      jobProgressInfos.add(new JobProgressInfo(jobId, 1, 1, 0, 0));
-    } else {
-      jobProgressInfos.add(new JobProgressInfo(jobId, 1, 0, 0, 1));
-    }
-    return jobProgressInfos.toArray(new JobProgressInfo[0]);
-  }
-
-  @Override
-  public float progress(String taskID) {
-    if (this.engineExecutionContext != null) {
-      return ((float) this.engineExecutionContext.getCurrentParagraph())
-          / this.engineExecutionContext.getTotalParagraph();
-    } else {
-      return 0.0f;
-    }
+    return shellEngineConnExecutor.getProgressInfo(taskID);
   }
 
   @Override
   public boolean supportCallBackLogs() {
-    // todo
-    return true;
+    return shellEngineConnExecutor.supportCallBackLogs();
   }
 
   @Override
-  public NodeResource requestExpectedResource(NodeResource expectedResource) {
-    return null;
-  }
-
-  @Override
-  public NodeResource getCurrentNodeResource() {
-    CommonNodeResource resource = new CommonNodeResource();
-    resource.setUsedResource(
-        NodeResourceUtils.applyAsLoadInstanceResource(
-            EngineConnObject.getEngineCreationContext().getOptions()));
-    return resource;
+  public String getId() {
+    return shellEngineConnExecutor.getId();
   }
 
   @Override
-  public List<Label<?>> getExecutorLabels() {
-    return executorLabels;
+  public void close() {
+    try {
+      killAll();
+      shellEngineConnExecutor.logAsyncService.shutdown();
+    } catch (Exception e) {
+      logger.error("Shell ec failed to close ");
+    }
+    super.close();
   }
 
   @Override
-  public void setExecutorLabels(List<Label<?>> labels) {
-    if (labels != null) {
-      executorLabels.clear();
-      executorLabels.addAll(labels);
+  public void killAll() {
+    Iterator<ShellECTaskInfo> iterator =
+        shellEngineConnExecutor.shellECTaskInfoCache.values().iterator();
+    while (iterator.hasNext()) {
+      ShellECTaskInfo shellECTaskInfo = iterator.next();
+      killTask(shellECTaskInfo.getTaskId());
     }
   }
 
   @Override
   public void killTask(String taskID) {
-    ShellECTaskInfo shellECTaskInfo = shellECTaskInfoCache.remove(taskID);
+    ShellECTaskInfo shellECTaskInfo = 
shellEngineConnExecutor.shellECTaskInfoCache.remove(taskID);
     if (shellECTaskInfo == null) {
       return;
     }
@@ -304,7 +109,7 @@ public class ShellEngineConnConcurrentExecutor extends 
ConcurrentComputationExec
     /*
      Kill sub-processes
     */
-    int pid = getPid(shellECTaskInfo.getProcess());
+    int pid = shellEngineConnExecutor.getPid(shellECTaskInfo.getProcess());
     GovernanceUtils.killProcess(String.valueOf(pid), "kill task " + taskID + " 
process", false);
 
     /*
@@ -319,40 +124,28 @@ public class ShellEngineConnConcurrentExecutor extends 
ConcurrentComputationExec
     super.killTask(taskID);
   }
 
-  private int getPid(Process process) {
-    try {
-      Class<?> clazz = Class.forName("java.lang.UNIXProcess");
-      Field field = clazz.getDeclaredField("pid");
-      field.setAccessible(true);
-      return field.getInt(process);
-    } catch (Exception e) {
-      logger.warn("Failed to acquire pid for shell process");
-      return -1;
-    }
+  @Override
+  public int getConcurrentLimit() {
+    return maxRunningNumber;
   }
 
   @Override
-  public void close() {
-    try {
-      killAll();
-      logAsyncService.shutdown();
-    } catch (Exception e) {
-      logger.error("Shell ec failed to close ");
-    }
-    super.close();
+  public List<Label<?>> getExecutorLabels() {
+    return shellEngineConnExecutor.getExecutorLabels();
   }
 
   @Override
-  public void killAll() {
-    Iterator<ShellECTaskInfo> iterator = 
shellECTaskInfoCache.values().iterator();
-    while (iterator.hasNext()) {
-      ShellECTaskInfo shellECTaskInfo = iterator.next();
-      killTask(shellECTaskInfo.getTaskId());
-    }
+  public void setExecutorLabels(List<Label<?>> labels) {
+    shellEngineConnExecutor.setExecutorLabels(labels);
   }
 
   @Override
-  public int getConcurrentLimit() {
-    return maxRunningNumber;
+  public NodeResource requestExpectedResource(NodeResource expectedResource) {
+    return shellEngineConnExecutor.requestExpectedResource(expectedResource);
+  }
+
+  @Override
+  public NodeResource getCurrentNodeResource() {
+    return shellEngineConnExecutor.getCurrentNodeResource();
   }
 }
diff --git a/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.java b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.java
index 964417765..28e451d73 100644
--- a/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.java
+++ b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.java
@@ -17,6 +17,7 @@
 
 package org.apache.linkis.manager.engineplugin.shell.executor;
 
+import org.apache.linkis.common.utils.Utils;
 import 
org.apache.linkis.engineconn.computation.executor.execute.ComputationExecutor;
 import 
org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext;
 import org.apache.linkis.engineconn.core.EngineConnObject;
@@ -25,6 +26,7 @@ import 
org.apache.linkis.manager.common.entity.resource.CommonNodeResource;
 import org.apache.linkis.manager.common.entity.resource.NodeResource;
 import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils;
 import 
org.apache.linkis.manager.engineplugin.shell.common.ShellEngineConnPluginConst;
+import org.apache.linkis.manager.engineplugin.shell.conf.ShellEngineConnConf;
 import 
org.apache.linkis.manager.engineplugin.shell.exception.ShellCodeErrorException;
 import org.apache.linkis.manager.label.entity.Label;
 import org.apache.linkis.protocol.engine.JobProgressInfo;
@@ -33,7 +35,6 @@ import 
org.apache.linkis.scheduler.executer.ErrorExecuteResponse;
 import org.apache.linkis.scheduler.executer.ExecuteResponse;
 import org.apache.linkis.scheduler.executer.SuccessExecuteResponse;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 
 import java.io.BufferedReader;
@@ -41,28 +42,41 @@ import java.io.File;
 import java.io.InputStreamReader;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import scala.concurrent.ExecutionContextExecutorService;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ShellEngineConnExecutor extends ComputationExecutor {
   private static final Logger logger = 
LoggerFactory.getLogger(ShellEngineConnExecutor.class);
 
-  private int id;
   private EngineExecutionContext engineExecutionContext;
+
   private List<Label<?>> executorLabels = new ArrayList<>();
+
+  Map<String, ShellECTaskInfo> shellECTaskInfoCache = new 
ConcurrentHashMap<>();
+
+  private int id;
+
   private Process process;
+
   private YarnAppIdExtractor extractor;
 
   public ShellEngineConnExecutor(int id) {
-    super(id);
+    super(ShellEngineConnConf.SHELL_ENGINECONN_OUTPUT_PRINT_LIMIT);
     this.id = id;
   }
 
+  final ExecutionContextExecutorService logAsyncService =
+      Utils.newCachedExecutionContext(
+          ShellEngineConnConf.LOG_SERVICE_MAX_THREAD_SIZE, 
"ShelLogService-Thread-", true);
+
   @Override
   public void init() {
     logger.info("Ready to change engine state!");
@@ -72,7 +86,7 @@ public class ShellEngineConnExecutor extends 
ComputationExecutor {
   @Override
   public ExecuteResponse executeCompletely(
       EngineExecutionContext engineExecutionContext, String code, String 
completedLine) {
-    final String newcode = completedLine + code;
+    String newcode = completedLine + code;
     logger.debug("newcode is " + newcode);
     return executeLine(engineExecutionContext, newcode);
   }
@@ -84,6 +98,11 @@ public class ShellEngineConnExecutor extends 
ComputationExecutor {
       logger.info("Shell executor reset new engineExecutionContext!");
     }
 
+    if (engineExecutionContext.getJobId().isEmpty()) {
+      return new ErrorExecuteResponse("taskID is null", null);
+    }
+
+    String taskId = engineExecutionContext.getJobId().get();
     BufferedReader bufferedReader = null;
     BufferedReader errorsReader = null;
 
@@ -94,60 +113,48 @@ public class ShellEngineConnExecutor extends 
ComputationExecutor {
     try {
       engineExecutionContext.appendStdout(getId() + " >> " + code.trim());
 
-      String[] argsArr = null;
+      String[] argsArr;
       if (engineExecutionContext.getTotalParagraph() == 1
           && engineExecutionContext.getProperties() != null
           && engineExecutionContext
               .getProperties()
               .containsKey(ShellEngineConnPluginConst.RUNTIME_ARGS_KEY)) {
-
         ArrayList<String> argsList =
             (ArrayList<String>)
                 engineExecutionContext
                     .getProperties()
                     .get(ShellEngineConnPluginConst.RUNTIME_ARGS_KEY);
-
-        try {
-          argsArr = argsList.toArray(new String[argsList.size()]);
-          logger.info(
-              "Will execute shell task with user-specified arguments: '{}'",
-              Arrays.toString(argsArr));
-        } catch (Exception t) {
-          logger.warn(
-              "Cannot read user-input shell arguments. Will execute shell task 
without them.", t);
-        }
+        argsArr = argsList.toArray(new String[argsList.size()]);
+        logger.info(
+            "Will execute shell task with user-specified arguments: '{}'",
+            StringUtils.join(argsArr, "' '"));
+      } else {
+        argsArr = null;
       }
 
-      String workingDirectory = null;
+      String workingDirectory;
       if (engineExecutionContext.getTotalParagraph() == 1
           && engineExecutionContext.getProperties() != null
           && engineExecutionContext
               .getProperties()
               
.containsKey(ShellEngineConnPluginConst.SHELL_RUNTIME_WORKING_DIRECTORY)) {
-
         String wdStr =
             (String)
                 engineExecutionContext
                     .getProperties()
                     
.get(ShellEngineConnPluginConst.SHELL_RUNTIME_WORKING_DIRECTORY);
-
-        try {
-          if (isExecutePathExist(wdStr)) {
-            logger.info(
-                "Will execute shell task under user-specified 
working-directory: '" + wdStr + "'");
-            workingDirectory = wdStr;
-          } else {
-            logger.warn(
-                "User-specified working-directory: '"
-                    + wdStr
-                    + "' does not exist or user does not have access 
permission. "
-                    + "Will execute shell task under default 
working-directory. Please contact the administrator!");
-          }
-        } catch (Exception t) {
+        if (isExecutePathExist(wdStr)) {
+          logger.info(
+              "Will execute shell task under user-specified working-directory: 
'{}'", wdStr);
+          workingDirectory = wdStr;
+        } else {
           logger.warn(
-              "Cannot read user-input working-directory. Will execute shell 
task under default working-directory.",
-              t);
+              "User-specified working-directory: '{}' does not exist or user 
does not have access permission. Will execute shell task under default 
working-directory. Please contact the administrator!",
+              wdStr);
+          workingDirectory = null;
         }
+      } else {
+        workingDirectory = null;
       }
 
       String[] generatedCode =
@@ -156,6 +163,7 @@ public class ShellEngineConnExecutor extends 
ComputationExecutor {
               : generateRunCodeWithArgs(code, argsArr);
 
       ProcessBuilder processBuilder = new ProcessBuilder(generatedCode);
+
       if (StringUtils.isNotBlank(workingDirectory)) {
         processBuilder.directory(new File(workingDirectory));
       }
@@ -163,17 +171,20 @@ public class ShellEngineConnExecutor extends 
ComputationExecutor {
       processBuilder.redirectErrorStream(false);
       extractor = new YarnAppIdExtractor();
       process = processBuilder.start();
-
       bufferedReader = new BufferedReader(new 
InputStreamReader(process.getInputStream()));
       errorsReader = new BufferedReader(new 
InputStreamReader(process.getErrorStream()));
+
+      // add task id and task Info cache
+      shellECTaskInfoCache.put(taskId, new ShellECTaskInfo(taskId, process, 
extractor));
+
       CountDownLatch counter = new CountDownLatch(2);
       inputReaderThread =
           new ReaderThread(engineExecutionContext, bufferedReader, extractor, 
true, counter);
       errReaderThread =
           new ReaderThread(engineExecutionContext, errorsReader, extractor, 
false, counter);
 
-      inputReaderThread.start();
-      errReaderThread.start();
+      logAsyncService.execute(inputReaderThread);
+      logAsyncService.execute(errReaderThread);
 
       int exitCode = process.waitFor();
       counter.await();
@@ -185,20 +196,17 @@ public class ShellEngineConnExecutor extends 
ComputationExecutor {
       } else {
         return new SuccessExecuteResponse();
       }
-
     } catch (Exception e) {
       logger.error("Execute shell code failed, reason:", e);
       return new ErrorExecuteResponse("run shell failed", e);
-
     } finally {
       if (errorsReader != null) {
-        inputReaderThread.onDestroy();
+        errReaderThread.onDestroy();
       }
       if (inputReaderThread != null) {
-        errReaderThread.onDestroy();
+        inputReaderThread.onDestroy();
       }
-      IOUtils.closeQuietly(bufferedReader);
-      IOUtils.closeQuietly(errorsReader);
+      shellECTaskInfoCache.remove(taskId);
     }
   }
 
@@ -226,9 +234,9 @@ public class ShellEngineConnExecutor extends 
ComputationExecutor {
 
   @Override
   public JobProgressInfo[] getProgressInfo(String taskID) {
-    List<JobProgressInfo> jobProgressInfo = new ArrayList<>();
+    List<JobProgressInfo> jobProgressInfos = new ArrayList<>();
     if (this.engineExecutionContext == null) {
-      return jobProgressInfo.toArray(new JobProgressInfo[0]);
+      return jobProgressInfos.toArray(new JobProgressInfo[0]);
     }
 
     String jobId =
@@ -236,18 +244,18 @@ public class ShellEngineConnExecutor extends 
ComputationExecutor {
             ? engineExecutionContext.getJobId().get()
             : "";
     if (progress(taskID) == 0.0f) {
-      jobProgressInfo.add(new JobProgressInfo(jobId, 1, 1, 0, 0));
+      jobProgressInfos.add(new JobProgressInfo(jobId, 1, 1, 0, 0));
     } else {
-      jobProgressInfo.add(new JobProgressInfo(jobId, 1, 0, 0, 1));
+      jobProgressInfos.add(new JobProgressInfo(jobId, 1, 0, 0, 1));
     }
-    return jobProgressInfo.toArray(new JobProgressInfo[0]);
+    return jobProgressInfos.toArray(new JobProgressInfo[0]);
   }
 
   @Override
   public float progress(String taskID) {
     if (this.engineExecutionContext != null) {
-      return this.engineExecutionContext.getCurrentParagraph()
-          / (float) this.engineExecutionContext.getTotalParagraph();
+      return ((float) this.engineExecutionContext.getCurrentParagraph())
+          / this.engineExecutionContext.getTotalParagraph();
     } else {
       return 0.0f;
     }
@@ -299,12 +307,13 @@ public class ShellEngineConnExecutor extends 
ComputationExecutor {
      Kill yarn-applications
     */
     List<String> yarnAppIds = extractor.getExtractedYarnAppIds();
+
     GovernanceUtils.killYarnJobApp(yarnAppIds);
     logger.info("Finished kill yarn app ids in the engine of ({})", getId());
     super.killTask(taskID);
   }
 
-  private int getPid(Process process) {
+  int getPid(Process process) {
     try {
       Class<?> clazz = Class.forName("java.lang.UNIXProcess");
       Field field = clazz.getDeclaredField("pid");
diff --git a/linkis-engineconn-plugins/shell/src/main/resources/conf/linkis-engineconn.properties b/linkis-engineconn-plugins/shell/src/main/resources/conf/linkis-engineconn.properties
index dad0da1c4..87e5025b4 100644
--- a/linkis-engineconn-plugins/shell/src/main/resources/conf/linkis-engineconn.properties
+++ b/linkis-engineconn-plugins/shell/src/main/resources/conf/linkis-engineconn.properties
@@ -19,4 +19,5 @@ 
wds.linkis.engineconn.plugin.default.class=org.apache.linkis.manager.engineplugi
 wds.linkis.engineconn.max.free.time=30m
 wds.linkis.engineconn.log.list.count=50
 wds.linkis.engineconn.support.parallelism=true
-linkis.engineconn.shell.concurrent.limit=15
\ No newline at end of file
+linkis.engineconn.shell.concurrent.limit=15
+wds.linkis.engineconn.shell.output.print.limit=1000
\ No newline at end of file
diff --git a/linkis-engineconn-plugins/shell/src/test/java/org/apache/linkis/manager/engineplugin/shell/executor/TestShellEngineConnExecutor.java b/linkis-engineconn-plugins/shell/src/test/java/org/apache/linkis/manager/engineplugin/shell/executor/TestShellEngineConnExecutor.java
index d327282b2..3baa00b6f 100644
--- a/linkis-engineconn-plugins/shell/src/test/java/org/apache/linkis/manager/engineplugin/shell/executor/TestShellEngineConnExecutor.java
+++ b/linkis-engineconn-plugins/shell/src/test/java/org/apache/linkis/manager/engineplugin/shell/executor/TestShellEngineConnExecutor.java
@@ -53,6 +53,7 @@ public class TestShellEngineConnExecutor {
     if (!isWindows) {
       EngineExecutionContext engineExecutionContext =
           new EngineExecutionContext(shellEngineConnExecutor, 
Utils.getJvmUser());
+      engineExecutionContext.setJobId("1");
       ExecuteResponse response = 
shellEngineConnExecutor.executeLine(engineExecutionContext, "id");
       Assertions.assertNotNull(response);
       shellEngineConnExecutor.close();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to