This is an automated email from the ASF dual-hosted git repository.

pdallig pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 999b401  [ZEPPELIN-5225] Remote interpreter soft shutdown
999b401 is described below

commit 999b401629a8e98572aae8627c0a6eaf6d68fb3e
Author: Philipp Dallig <[email protected]>
AuthorDate: Fri Feb 5 14:51:51 2021 +0100

    [ZEPPELIN-5225] Remote interpreter soft shutdown
    
    ### What is this PR for?
    This PR moves the exec code to a new class called 
`ExecRemoteInterpreterProcess`. This allows other `RemoteInterpreterProcess` 
classes to use the better code of `RemoteInterpreterManagedProcess`.
    
    A soft shutdown has been implemented in the new 
`ExecRemoteInterpreterProcess` class.
    
    ### What type of PR is it?
    - Improvement
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5225
    
    ### How should this be tested?
    * CI
    
    ### Questions:
    * Does the licenses files need update? No
    * Is there breaking changes for older versions? No
    * Does this needs documentation? No
    
    Author: Philipp Dallig <[email protected]>
    
    Closes #4035 from Reamer/remote_interpreter_soft_shutdown and squashes the 
following commits:
    
    c22df382a [Philipp Dallig] Correct LOGGER messages
    8bebe6ac0 [Philipp Dallig] RemoteInterpreterManagedProcess soft shutdown 
and abstraction
    
    (cherry picked from commit 59bdb47817840e1ede35eb7292c3133174eba9ce)
    Signed-off-by: Philipp Dallig <[email protected]>
---
 .../remote/RemoteInterpreterServer.java            |   2 +-
 .../zeppelin/interpreter/util/ProcessLauncher.java |   2 +-
 .../launcher/ClusterInterpreterProcess.java        |  10 +-
 .../launcher/StandardInterpreterLauncher.java      |  11 +-
 ...cess.java => ExecRemoteInterpreterProcess.java} | 219 ++++++++-------------
 .../remote/RemoteInterpreterManagedProcess.java    | 176 ++---------------
 .../launcher/SparkInterpreterLauncherTest.java     |  30 +--
 .../launcher/StandardInterpreterLauncherTest.java  |  10 +-
 8 files changed, 140 insertions(+), 320 deletions(-)

diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 9645843..e9ebdd1 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -107,7 +107,7 @@ public class RemoteInterpreterServer extends Thread
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(RemoteInterpreterServer.class);
 
-  private static final int DEFAULT_SHUTDOWN_TIMEOUT = 2000;
+  public static final int DEFAULT_SHUTDOWN_TIMEOUT = 2000;
 
   private String interpreterGroupId;
   private InterpreterGroup interpreterGroup;
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java
index abe6d0a..eb0b65b 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java
@@ -197,7 +197,7 @@ public abstract class ProcessLauncher implements 
ExecuteResultHandler {
         try {
           redirectedContext.out.write(s + "\n");
         } catch (IOException e) {
-          e.printStackTrace();
+          LOGGER.error("unable to write to redirectedContext", e);
         }
       }
     }
diff --git 
a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java
 
b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java
index c9ae7f4..6feeec6 100644
--- 
a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java
+++ 
b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java
@@ -3,10 +3,10 @@ package org.apache.zeppelin.interpreter.launcher;
 import java.io.IOException;
 import java.util.Map;
 
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
+import org.apache.zeppelin.interpreter.remote.ExecRemoteInterpreterProcess;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
 
-public class ClusterInterpreterProcess extends RemoteInterpreterManagedProcess 
{
+public class ClusterInterpreterProcess extends ExecRemoteInterpreterProcess {
 
   public ClusterInterpreterProcess(
       String intpRunner,
@@ -22,8 +22,7 @@ public class ClusterInterpreterProcess extends 
RemoteInterpreterManagedProcess {
       String interpreterGroupId,
       boolean isUserImpersonated) {
 
-    super(intpRunner,
-      intpEventServerPort,
+    super(intpEventServerPort,
       intpEventServerHost,
       interpreterPortRange,
       intpDir,
@@ -33,7 +32,8 @@ public class ClusterInterpreterProcess extends 
RemoteInterpreterManagedProcess {
       connectionPoolSize,
       interpreterSettingName,
       interpreterGroupId,
-      isUserImpersonated);
+      isUserImpersonated,
+      intpRunner);
   }
 
   @Override
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
index d52276d..46caee9 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
@@ -24,12 +24,13 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.interpreter.InterpreterOption;
 import org.apache.zeppelin.interpreter.InterpreterRunner;
 import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
+import org.apache.zeppelin.interpreter.remote.ExecRemoteInterpreterProcess;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Map;
 
@@ -68,14 +69,14 @@ public class StandardInterpreterLauncher extends 
InterpreterLauncher {
           false);
     } else {
       // create new remote process
-      String localRepoPath = zConf.getInterpreterLocalRepoPath() + "/"
+      String localRepoPath = zConf.getInterpreterLocalRepoPath() + 
File.separator
           + context.getInterpreterSettingId();
-      return new RemoteInterpreterManagedProcess(
-          runner != null ? runner.getPath() : 
zConf.getInterpreterRemoteRunnerPath(),
+      return new ExecRemoteInterpreterProcess(
           context.getIntpEventServerPort(), context.getIntpEventServerHost(), 
zConf.getInterpreterPortRange(),
           zConf.getInterpreterDir() + "/" + groupName, localRepoPath,
           buildEnvFromProperties(context), connectTimeout, connectionPoolSize, 
name,
-          context.getInterpreterGroupId(), option.isUserImpersonate());
+          context.getInterpreterGroupId(), option.isUserImpersonate(),
+          runner != null ? runner.getPath() : 
zConf.getInterpreterRemoteRunnerPath());
     }
   }
 
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ExecRemoteInterpreterProcess.java
similarity index 53%
copy from 
zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
copy to 
zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ExecRemoteInterpreterProcess.java
index 2436000..1141513 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ExecRemoteInterpreterProcess.java
@@ -17,7 +17,11 @@
 
 package org.apache.zeppelin.interpreter.remote;
 
-import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 import org.apache.commons.exec.CommandLine;
 import org.apache.commons.exec.ExecuteException;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -26,36 +30,18 @@ import org.apache.zeppelin.interpreter.util.ProcessLauncher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import com.google.common.annotations.VisibleForTesting;
 
-/**
- * This class manages start / stop of remote interpreter process
- */
-public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess {
-  private static final Logger LOGGER = LoggerFactory.getLogger(
-      RemoteInterpreterManagedProcess.class);
-  private static final Pattern YARN_APP_PATTER =
-          Pattern.compile("Submitted application (\\w+)");
+public class ExecRemoteInterpreterProcess extends 
RemoteInterpreterManagedProcess {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ExecRemoteInterpreterProcess.class);
+
+  private static final Pattern YARN_APP_PATTER = Pattern.compile("Submitted 
application (\\w+)");
 
   private final String interpreterRunner;
-  private final String interpreterPortRange;
   private InterpreterProcessLauncher interpreterProcessLauncher;
-  private String host = null;
-  private int port = -1;
-  private final String interpreterDir;
-  private final String localRepoDir;
-  private final String interpreterSettingName;
-  private final String interpreterGroupId;
-  private final boolean isUserImpersonated;
-  private String errorMessage;
-
-  private Map<String, String> env;
 
-  public RemoteInterpreterManagedProcess(
-      String intpRunner,
+  public ExecRemoteInterpreterProcess(
       int intpEventServerPort,
       String intpEventServerHost,
       String interpreterPortRange,
@@ -66,26 +52,11 @@ public class RemoteInterpreterManagedProcess extends 
RemoteInterpreterProcess {
       int connectionPoolSize,
       String interpreterSettingName,
       String interpreterGroupId,
-      boolean isUserImpersonated) {
-    super(connectTimeout, connectionPoolSize, intpEventServerHost, 
intpEventServerPort);
+      boolean isUserImpersonated,
+      String intpRunner) {
+    super(intpEventServerPort, intpEventServerHost, interpreterPortRange, 
intpDir, localRepoDir, env, connectTimeout,
+        connectionPoolSize, interpreterSettingName, interpreterGroupId, 
isUserImpersonated);
     this.interpreterRunner = intpRunner;
-    this.interpreterPortRange = interpreterPortRange;
-    this.env = env;
-    this.interpreterDir = intpDir;
-    this.localRepoDir = localRepoDir;
-    this.interpreterSettingName = interpreterSettingName;
-    this.interpreterGroupId = interpreterGroupId;
-    this.isUserImpersonated = isUserImpersonated;
-  }
-
-  @Override
-  public String getHost() {
-    return host;
-  }
-
-  @Override
-  public int getPort() {
-    return port;
   }
 
   @Override
@@ -93,37 +64,37 @@ public class RemoteInterpreterManagedProcess extends 
RemoteInterpreterProcess {
     // start server process
     CommandLine cmdLine = CommandLine.parse(interpreterRunner);
     cmdLine.addArgument("-d", false);
-    cmdLine.addArgument(interpreterDir, false);
+    cmdLine.addArgument(getInterpreterDir(), false);
     cmdLine.addArgument("-c", false);
-    cmdLine.addArgument(intpEventServerHost, false);
+    cmdLine.addArgument(getIntpEventServerHost(), false);
     cmdLine.addArgument("-p", false);
     cmdLine.addArgument(String.valueOf(intpEventServerPort), false);
     cmdLine.addArgument("-r", false);
-    cmdLine.addArgument(interpreterPortRange, false);
+    cmdLine.addArgument(getInterpreterPortRange(), false);
     cmdLine.addArgument("-i", false);
-    cmdLine.addArgument(interpreterGroupId, false);
-    if (isUserImpersonated && !userName.equals("anonymous")) {
+    cmdLine.addArgument(getInterpreterGroupId(), false);
+    if (isUserImpersonated() && !userName.equals("anonymous")) {
       cmdLine.addArgument("-u", false);
       cmdLine.addArgument(userName, false);
     }
     cmdLine.addArgument("-l", false);
-    cmdLine.addArgument(localRepoDir, false);
+    cmdLine.addArgument(getLocalRepoDir(), false);
     cmdLine.addArgument("-g", false);
-    cmdLine.addArgument(interpreterSettingName, false);
+    cmdLine.addArgument(getInterpreterSettingName(), false);
 
-    interpreterProcessLauncher = new InterpreterProcessLauncher(cmdLine, env);
+    interpreterProcessLauncher = new InterpreterProcessLauncher(cmdLine, 
getEnv());
     interpreterProcessLauncher.launch();
     interpreterProcessLauncher.waitForReady(getConnectTimeout());
     if (interpreterProcessLauncher.isLaunchTimeout()) {
-      throw new IOException(String.format("Interpreter Process creation is 
time out in %d seconds",
-              getConnectTimeout()/1000) + "\n" + "You can increase timeout 
threshold via " +
-              "setting zeppelin.interpreter.connect.timeout of this 
interpreter.\n" +
-              interpreterProcessLauncher.getErrorMessage());
+      throw new IOException(
+          String.format("Interpreter Process creation is time out in %d 
seconds", getConnectTimeout() / 1000) + "\n"
+              + "You can increase timeout threshold via "
+              + "setting zeppelin.interpreter.connect.timeout of this 
interpreter.\n"
+              + interpreterProcessLauncher.getErrorMessage());
     }
 
     if (!interpreterProcessLauncher.isRunning()) {
-      throw new IOException("Fail to launch interpreter process:\n" +
-              interpreterProcessLauncher.getErrorMessage());
+      throw new IOException("Fail to launch interpreter process:\n" + 
interpreterProcessLauncher.getErrorMessage());
     } else {
       String launchOutput = 
interpreterProcessLauncher.getProcessLaunchOutput();
       Matcher m = YARN_APP_PATTER.matcher(launchOutput);
@@ -136,63 +107,25 @@ public class RemoteInterpreterManagedProcess extends 
RemoteInterpreterProcess {
   }
 
   @Override
-  public void stop() {
-    if (isRunning()) {
-      LOGGER.info("Kill interpreter process for interpreter group: {}", 
getInterpreterGroupId());
-      try {
-        callRemoteFunction(client -> {
-          client.shutdown();
-          return null;
-        });
-      } catch (Exception e) {
-        LOGGER.warn("ignore the exception when shutting down", e);
-      }
-
-      // Shutdown connection
-      shutdown();
-      this.interpreterProcessLauncher.stop();
-      this.interpreterProcessLauncher = null;
-      LOGGER.info("Remote process of interpreter group: {} is terminated", 
getInterpreterGroupId());
-    }
-  }
-
-  @Override
   public void processStarted(int port, String host) {
-    this.port = port;
-    this.host = host;
+    super.processStarted(port, host);
     // for yarn cluster it may be transitioned from COMPLETED to RUNNING.
     interpreterProcessLauncher.onProcessRunning();
   }
 
-  // called when remote interpreter process is stopped, e.g. YarnAppsMonitor 
will call this
-  // after detecting yarn app is killed/failed.
-  public void processStopped(String errorMessage) {
-    this.errorMessage = errorMessage;
-  }
-
-  @VisibleForTesting
-  public Map<String, String> getEnv() {
-    return env;
-  }
-
-  @VisibleForTesting
-  public String getLocalRepoDir() {
-    return localRepoDir;
-  }
-
-  @VisibleForTesting
-  public String getInterpreterDir() {
-    return interpreterDir;
-  }
-
-  @Override
-  public String getInterpreterSettingName() {
-    return interpreterSettingName;
-  }
-
   @Override
-  public String getInterpreterGroupId() {
-    return interpreterGroupId;
+  public void stop() {
+    if (isRunning()) {
+      super.stop();
+      // wait for a clean shutdown
+      
this.interpreterProcessLauncher.waitForShutdown(RemoteInterpreterServer.DEFAULT_SHUTDOWN_TIMEOUT
 + 500);
+      // kill process
+      this.interpreterProcessLauncher.stop();
+      this.interpreterProcessLauncher = null;
+      LOGGER.info("Remote exec process of interpreter group: {} is 
terminated", getInterpreterGroupId());
+    } else {
+      LOGGER.warn("Try to stop a not running interpreter process of 
interpreter group: {}", getInterpreterGroupId());
+    }
   }
 
   @VisibleForTesting
@@ -200,39 +133,60 @@ public class RemoteInterpreterManagedProcess extends 
RemoteInterpreterProcess {
     return interpreterRunner;
   }
 
-  @VisibleForTesting
-  public boolean isUserImpersonated() {
-    return isUserImpersonated;
-  }
-
   @Override
   public boolean isRunning() {
-    return interpreterProcessLauncher != null && 
interpreterProcessLauncher.isRunning()
-            && errorMessage == null;
+    return interpreterProcessLauncher != null && 
interpreterProcessLauncher.isRunning();
   }
 
   @Override
   public String getErrorMessage() {
-    String interpreterProcessError = this.interpreterProcessLauncher != null
-            ? this.interpreterProcessLauncher.getErrorMessage() : "";
-    return errorMessage != null ? errorMessage : interpreterProcessError;
+    return this.interpreterProcessLauncher != null
+        ? this.interpreterProcessLauncher.getErrorMessage()
+        : "";
   }
 
   private class InterpreterProcessLauncher extends ProcessLauncher {
 
-    public InterpreterProcessLauncher(CommandLine commandLine,
-                                      Map<String, String> envs) {
+    public InterpreterProcessLauncher(CommandLine commandLine, Map<String, 
String> envs) {
       super(commandLine, envs);
     }
 
+    public void waitForShutdown(int timeout) {
+      synchronized (this) {
+        long startTime = System.currentTimeMillis();
+        long timeoutTime = startTime + timeout;
+        while (state == State.RUNNING && 
!Thread.currentThread().isInterrupted()) {
+          long timetoTimeout = timeoutTime - System.currentTimeMillis();
+          if (timetoTimeout <= 0) {
+            LOGGER.warn("Shutdown timeout reached");
+            break;
+          }
+          try {
+            wait(timetoTimeout);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            LOGGER.error("waitForShutdown interrupted", e);
+          }
+        }
+      }
+    }
+
     @Override
     public void waitForReady(int timeout) {
       synchronized (this) {
-        if (state != State.RUNNING) {
+        long startTime = System.currentTimeMillis();
+        long timeoutTime = startTime + timeout;
+        while (state != State.RUNNING && 
!Thread.currentThread().isInterrupted()) {
+          long timetoTimeout = timeoutTime - System.currentTimeMillis();
+          if (timetoTimeout <= 0) {
+            LOGGER.warn("Ready timeout reached");
+            break;
+          }
           try {
-            wait(timeout);
+            wait(timetoTimeout);
           } catch (InterruptedException e) {
-            LOGGER.error("Remote interpreter is not accessible", e);
+            Thread.currentThread().interrupt();
+            LOGGER.error("waitForReady interrupted", e);
           }
         }
       }
@@ -245,22 +199,23 @@ public class RemoteInterpreterManagedProcess extends 
RemoteInterpreterProcess {
     @Override
     public void onProcessRunning() {
       super.onProcessRunning();
-      synchronized(this) {
-        notify();
+      synchronized (this) {
+        notifyAll();
       }
     }
 
     @Override
     public void onProcessComplete(int exitValue) {
-      LOGGER.warn("Process is exited with exit value " + exitValue);
-      if (env.getOrDefault("ZEPPELIN_SPARK_YARN_CLUSTER", 
"false").equals("false")) {
+      LOGGER.warn("Process is exited with exit value {}", exitValue);
+      if (getEnv().getOrDefault("ZEPPELIN_SPARK_YARN_CLUSTER", 
"false").equals("false")) {
         // don't call notify in yarn-cluster mode
         synchronized (this) {
-          notify();
+          notifyAll();
         }
       }
       // For yarn-cluster mode, client process will exit with exit value 0
-      // after submitting spark app. So don't move to TERMINATED state when 
exitValue is 0.
+      // after submitting spark app. So don't move to TERMINATED state when 
exitValue
+      // is 0.
       if (exitValue != 0) {
         transition(State.TERMINATED);
       } else {
@@ -272,7 +227,7 @@ public class RemoteInterpreterManagedProcess extends 
RemoteInterpreterProcess {
     public void onProcessFailed(ExecuteException e) {
       super.onProcessFailed(e);
       synchronized (this) {
-        notify();
+        notifyAll();
       }
     }
   }
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
index 2436000..c2aca53 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
@@ -17,32 +17,21 @@
 
 package org.apache.zeppelin.interpreter.remote;
 
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.exec.CommandLine;
-import org.apache.commons.exec.ExecuteException;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.zeppelin.interpreter.YarnAppMonitor;
-import org.apache.zeppelin.interpreter.util.ProcessLauncher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 /**
  * This class manages start / stop of remote interpreter process
  */
-public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess {
+public abstract class RemoteInterpreterManagedProcess extends 
RemoteInterpreterProcess {
   private static final Logger LOGGER = LoggerFactory.getLogger(
       RemoteInterpreterManagedProcess.class);
-  private static final Pattern YARN_APP_PATTER =
-          Pattern.compile("Submitted application (\\w+)");
 
-  private final String interpreterRunner;
+
   private final String interpreterPortRange;
-  private InterpreterProcessLauncher interpreterProcessLauncher;
+
   private String host = null;
   private int port = -1;
   private final String interpreterDir;
@@ -55,7 +44,6 @@ public class RemoteInterpreterManagedProcess extends 
RemoteInterpreterProcess {
   private Map<String, String> env;
 
   public RemoteInterpreterManagedProcess(
-      String intpRunner,
       int intpEventServerPort,
       String intpEventServerHost,
       String interpreterPortRange,
@@ -68,7 +56,6 @@ public class RemoteInterpreterManagedProcess extends 
RemoteInterpreterProcess {
       String interpreterGroupId,
       boolean isUserImpersonated) {
     super(connectTimeout, connectionPoolSize, intpEventServerHost, 
intpEventServerPort);
-    this.interpreterRunner = intpRunner;
     this.interpreterPortRange = interpreterPortRange;
     this.env = env;
     this.interpreterDir = intpDir;
@@ -89,70 +76,17 @@ public class RemoteInterpreterManagedProcess extends 
RemoteInterpreterProcess {
   }
 
   @Override
-  public void start(String userName) throws IOException {
-    // start server process
-    CommandLine cmdLine = CommandLine.parse(interpreterRunner);
-    cmdLine.addArgument("-d", false);
-    cmdLine.addArgument(interpreterDir, false);
-    cmdLine.addArgument("-c", false);
-    cmdLine.addArgument(intpEventServerHost, false);
-    cmdLine.addArgument("-p", false);
-    cmdLine.addArgument(String.valueOf(intpEventServerPort), false);
-    cmdLine.addArgument("-r", false);
-    cmdLine.addArgument(interpreterPortRange, false);
-    cmdLine.addArgument("-i", false);
-    cmdLine.addArgument(interpreterGroupId, false);
-    if (isUserImpersonated && !userName.equals("anonymous")) {
-      cmdLine.addArgument("-u", false);
-      cmdLine.addArgument(userName, false);
-    }
-    cmdLine.addArgument("-l", false);
-    cmdLine.addArgument(localRepoDir, false);
-    cmdLine.addArgument("-g", false);
-    cmdLine.addArgument(interpreterSettingName, false);
-
-    interpreterProcessLauncher = new InterpreterProcessLauncher(cmdLine, env);
-    interpreterProcessLauncher.launch();
-    interpreterProcessLauncher.waitForReady(getConnectTimeout());
-    if (interpreterProcessLauncher.isLaunchTimeout()) {
-      throw new IOException(String.format("Interpreter Process creation is 
time out in %d seconds",
-              getConnectTimeout()/1000) + "\n" + "You can increase timeout 
threshold via " +
-              "setting zeppelin.interpreter.connect.timeout of this 
interpreter.\n" +
-              interpreterProcessLauncher.getErrorMessage());
-    }
-
-    if (!interpreterProcessLauncher.isRunning()) {
-      throw new IOException("Fail to launch interpreter process:\n" +
-              interpreterProcessLauncher.getErrorMessage());
-    } else {
-      String launchOutput = 
interpreterProcessLauncher.getProcessLaunchOutput();
-      Matcher m = YARN_APP_PATTER.matcher(launchOutput);
-      if (m.find()) {
-        String appId = m.group(1);
-        LOGGER.info("Detected yarn app: {}, add it to YarnAppMonitor", appId);
-        YarnAppMonitor.get().addYarnApp(ConverterUtils.toApplicationId(appId), 
this);
-      }
-    }
-  }
-
-  @Override
   public void stop() {
-    if (isRunning()) {
-      LOGGER.info("Kill interpreter process for interpreter group: {}", 
getInterpreterGroupId());
-      try {
-        callRemoteFunction(client -> {
-          client.shutdown();
-          return null;
-        });
-      } catch (Exception e) {
-        LOGGER.warn("ignore the exception when shutting down", e);
-      }
-
+    LOGGER.info("Stop interpreter process for interpreter group: {}", 
getInterpreterGroupId());
+    try {
+      callRemoteFunction(client -> {
+        client.shutdown();
+        return null;
+      });
       // Shutdown connection
       shutdown();
-      this.interpreterProcessLauncher.stop();
-      this.interpreterProcessLauncher = null;
-      LOGGER.info("Remote process of interpreter group: {} is terminated", 
getInterpreterGroupId());
+    } catch (Exception e) {
+      LOGGER.warn("ignore the exception when shutting down", e);
     }
   }
 
@@ -160,8 +94,6 @@ public class RemoteInterpreterManagedProcess extends 
RemoteInterpreterProcess {
   public void processStarted(int port, String host) {
     this.port = port;
     this.host = host;
-    // for yarn cluster it may be transitioned from COMPLETED to RUNNING.
-    interpreterProcessLauncher.onProcessRunning();
   }
 
   // called when remote interpreter process is stopped, e.g. YarnAppsMonitor 
will call this
@@ -170,21 +102,26 @@ public class RemoteInterpreterManagedProcess extends 
RemoteInterpreterProcess {
     this.errorMessage = errorMessage;
   }
 
-  @VisibleForTesting
   public Map<String, String> getEnv() {
     return env;
   }
 
-  @VisibleForTesting
   public String getLocalRepoDir() {
     return localRepoDir;
   }
 
-  @VisibleForTesting
   public String getInterpreterDir() {
     return interpreterDir;
   }
 
+  public String getIntpEventServerHost() {
+    return intpEventServerHost;
+  }
+
+  public String getInterpreterPortRange() {
+    return interpreterPortRange;
+  }
+
   @Override
   public String getInterpreterSettingName() {
     return interpreterSettingName;
@@ -195,85 +132,12 @@ public class RemoteInterpreterManagedProcess extends 
RemoteInterpreterProcess {
     return interpreterGroupId;
   }
 
-  @VisibleForTesting
-  public String getInterpreterRunner() {
-    return interpreterRunner;
-  }
-
-  @VisibleForTesting
   public boolean isUserImpersonated() {
     return isUserImpersonated;
   }
 
   @Override
-  public boolean isRunning() {
-    return interpreterProcessLauncher != null && 
interpreterProcessLauncher.isRunning()
-            && errorMessage == null;
-  }
-
-  @Override
   public String getErrorMessage() {
-    String interpreterProcessError = this.interpreterProcessLauncher != null
-            ? this.interpreterProcessLauncher.getErrorMessage() : "";
-    return errorMessage != null ? errorMessage : interpreterProcessError;
-  }
-
-  private class InterpreterProcessLauncher extends ProcessLauncher {
-
-    public InterpreterProcessLauncher(CommandLine commandLine,
-                                      Map<String, String> envs) {
-      super(commandLine, envs);
-    }
-
-    @Override
-    public void waitForReady(int timeout) {
-      synchronized (this) {
-        if (state != State.RUNNING) {
-          try {
-            wait(timeout);
-          } catch (InterruptedException e) {
-            LOGGER.error("Remote interpreter is not accessible", e);
-          }
-        }
-      }
-      this.stopCatchLaunchOutput();
-      if (state == State.LAUNCHED) {
-        onTimeout();
-      }
-    }
-
-    @Override
-    public void onProcessRunning() {
-      super.onProcessRunning();
-      synchronized(this) {
-        notify();
-      }
-    }
-
-    @Override
-    public void onProcessComplete(int exitValue) {
-      LOGGER.warn("Process is exited with exit value " + exitValue);
-      if (env.getOrDefault("ZEPPELIN_SPARK_YARN_CLUSTER", 
"false").equals("false")) {
-        // don't call notify in yarn-cluster mode
-        synchronized (this) {
-          notify();
-        }
-      }
-      // For yarn-cluster mode, client process will exit with exit value 0
-      // after submitting spark app. So don't move to TERMINATED state when 
exitValue is 0.
-      if (exitValue != 0) {
-        transition(State.TERMINATED);
-      } else {
-        transition(State.COMPLETED);
-      }
-    }
-
-    @Override
-    public void onProcessFailed(ExecuteException e) {
-      super.onProcessFailed(e);
-      synchronized (this) {
-        notify();
-      }
-    }
+    return errorMessage;
   }
 }
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
index 6aff86a..5c997e1 100644
--- 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
@@ -21,7 +21,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.interpreter.InterpreterOption;
 import org.apache.zeppelin.interpreter.integration.DownloadUtils;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
+import org.apache.zeppelin.interpreter.remote.ExecRemoteInterpreterProcess;
 import org.apache.zeppelin.util.Util;
 import org.junit.Before;
 import org.junit.Test;
@@ -72,8 +72,8 @@ public class SparkInterpreterLauncherTest {
     option.setUserImpersonate(true);
     InterpreterLaunchContext context = new 
InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", 
"groupId", "groupName", "name", 0, "host");
     InterpreterClient client = launcher.launch(context);
-    assertTrue( client instanceof RemoteInterpreterManagedProcess);
-    RemoteInterpreterManagedProcess interpreterProcess = 
(RemoteInterpreterManagedProcess) client;
+    assertTrue(client instanceof ExecRemoteInterpreterProcess);
+    ExecRemoteInterpreterProcess interpreterProcess = 
(ExecRemoteInterpreterProcess) client;
     assertEquals("name", interpreterProcess.getInterpreterSettingName());
     assertEquals(zeppelinHome + "/interpreter/groupName", 
interpreterProcess.getInterpreterDir());
     assertEquals(zeppelinHome + "/local-repo/groupId", 
interpreterProcess.getLocalRepoDir());
@@ -98,8 +98,8 @@ public class SparkInterpreterLauncherTest {
     InterpreterOption option = new InterpreterOption();
     InterpreterLaunchContext context = new 
InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", 
"groupId", "spark", "spark", 0, "host");
     InterpreterClient client = launcher.launch(context);
-    assertTrue( client instanceof RemoteInterpreterManagedProcess);
-    RemoteInterpreterManagedProcess interpreterProcess = 
(RemoteInterpreterManagedProcess) client;
+    assertTrue( client instanceof ExecRemoteInterpreterProcess);
+    ExecRemoteInterpreterProcess interpreterProcess = 
(ExecRemoteInterpreterProcess) client;
     assertEquals("spark", interpreterProcess.getInterpreterSettingName());
     
assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
     
assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
@@ -126,8 +126,8 @@ public class SparkInterpreterLauncherTest {
     InterpreterOption option = new InterpreterOption();
     InterpreterLaunchContext context = new 
InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", 
"groupId", "spark", "spark", 0, "host");
     InterpreterClient client = launcher.launch(context);
-    assertTrue( client instanceof RemoteInterpreterManagedProcess);
-    RemoteInterpreterManagedProcess interpreterProcess = 
(RemoteInterpreterManagedProcess) client;
+    assertTrue( client instanceof ExecRemoteInterpreterProcess);
+    ExecRemoteInterpreterProcess interpreterProcess = 
(ExecRemoteInterpreterProcess) client;
     assertEquals("spark", interpreterProcess.getInterpreterSettingName());
     
assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
     
assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
@@ -159,8 +159,8 @@ public class SparkInterpreterLauncherTest {
     InterpreterOption option = new InterpreterOption();
     InterpreterLaunchContext context = new 
InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", 
"groupId", "spark", "spark", 0, "host");
     InterpreterClient client = launcher.launch(context);
-    assertTrue( client instanceof RemoteInterpreterManagedProcess);
-    RemoteInterpreterManagedProcess interpreterProcess = 
(RemoteInterpreterManagedProcess) client;
+    assertTrue( client instanceof ExecRemoteInterpreterProcess);
+    ExecRemoteInterpreterProcess interpreterProcess = 
(ExecRemoteInterpreterProcess) client;
     assertEquals("spark", interpreterProcess.getInterpreterSettingName());
     
assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
     
assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
@@ -192,8 +192,8 @@ public class SparkInterpreterLauncherTest {
     InterpreterOption option = new InterpreterOption();
     InterpreterLaunchContext context = new 
InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", 
"groupId", "spark", "spark", 0, "host");
     InterpreterClient client = launcher.launch(context);
-    assertTrue( client instanceof RemoteInterpreterManagedProcess);
-    RemoteInterpreterManagedProcess interpreterProcess = 
(RemoteInterpreterManagedProcess) client;
+    assertTrue( client instanceof ExecRemoteInterpreterProcess);
+    ExecRemoteInterpreterProcess interpreterProcess = 
(ExecRemoteInterpreterProcess) client;
     assertEquals("spark", interpreterProcess.getInterpreterSettingName());
     
assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
     
assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
@@ -238,8 +238,8 @@ public class SparkInterpreterLauncherTest {
     Files.createFile(Paths.get(localRepoPath.toAbsolutePath().toString(), 
"test.jar"));
 
     InterpreterClient client = launcher.launch(context);
-    assertTrue( client instanceof RemoteInterpreterManagedProcess);
-    RemoteInterpreterManagedProcess interpreterProcess = 
(RemoteInterpreterManagedProcess) client;
+    assertTrue(client instanceof ExecRemoteInterpreterProcess);
+    ExecRemoteInterpreterProcess interpreterProcess = 
(ExecRemoteInterpreterProcess) client;
     assertEquals("spark", interpreterProcess.getInterpreterSettingName());
     
assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
     
assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
@@ -285,8 +285,8 @@ public class SparkInterpreterLauncherTest {
     Files.createDirectories(localRepoPath);
 
     InterpreterClient client = launcher.launch(context);
-    assertTrue( client instanceof RemoteInterpreterManagedProcess);
-    RemoteInterpreterManagedProcess interpreterProcess = 
(RemoteInterpreterManagedProcess) client;
+    assertTrue(client instanceof ExecRemoteInterpreterProcess);
+    ExecRemoteInterpreterProcess interpreterProcess = 
(ExecRemoteInterpreterProcess) client;
     assertEquals("spark", interpreterProcess.getInterpreterSettingName());
     
assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
     
assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncherTest.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncherTest.java
index f0f60d9..8e695f3 100644
--- 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncherTest.java
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncherTest.java
@@ -19,7 +19,7 @@ package org.apache.zeppelin.interpreter.launcher;
 
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.interpreter.InterpreterOption;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
+import org.apache.zeppelin.interpreter.remote.ExecRemoteInterpreterProcess;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -48,8 +48,8 @@ public class StandardInterpreterLauncherTest {
     option.setUserImpersonate(true);
     InterpreterLaunchContext context = new 
InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", 
"groupId", "groupName", "name", 0, "host");
     InterpreterClient client = launcher.launch(context);
-    assertTrue( client instanceof RemoteInterpreterManagedProcess);
-    RemoteInterpreterManagedProcess interpreterProcess = 
(RemoteInterpreterManagedProcess) client;
+    assertTrue(client instanceof ExecRemoteInterpreterProcess);
+    ExecRemoteInterpreterProcess interpreterProcess = 
(ExecRemoteInterpreterProcess) client;
     assertEquals("name", interpreterProcess.getInterpreterSettingName());
     assertEquals(".//interpreter/groupName", 
interpreterProcess.getInterpreterDir());
     assertEquals(".//local-repo/groupId", 
interpreterProcess.getLocalRepoDir());
@@ -73,8 +73,8 @@ public class StandardInterpreterLauncherTest {
     option.setUserImpersonate(true);
     InterpreterLaunchContext context = new 
InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", 
"groupId", "groupName", "name", 0, "host");
     InterpreterClient client = launcher.launch(context);
-    assertTrue( client instanceof RemoteInterpreterManagedProcess);
-    RemoteInterpreterManagedProcess interpreterProcess = 
(RemoteInterpreterManagedProcess) client;
+    assertTrue(client instanceof ExecRemoteInterpreterProcess);
+    ExecRemoteInterpreterProcess interpreterProcess = 
(ExecRemoteInterpreterProcess) client;
     assertEquals("name", interpreterProcess.getInterpreterSettingName());
     assertEquals(".//interpreter/groupName", 
interpreterProcess.getInterpreterDir());
     assertEquals(".//local-repo/groupId", 
interpreterProcess.getLocalRepoDir());

Reply via email to