zjffdu commented on a change in pull request #3825:
URL: https://github.com/apache/zeppelin/pull/3825#discussion_r446248037



##########
File path: zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
##########
@@ -102,76 +121,71 @@ public String getInterpreterSettingName() {
 
   @Override
   public void start(String userName) throws IOException {
-    /**
-     * If a spark interpreter process is running, userName is set in preparation for --proxy-user
-     */
-    if (isUserImpersonatedForSpark && !StringUtils.containsIgnoreCase(userName, "anonymous") && isSpark()) {
-      this.userName = userName;
-    } else {
-      this.userName = null;
-    }
+
+    Properties templateProperties = getTemplateBindings(userName);
     // create new pod
-    apply(specTempaltes, false);
-    kubectl.wait(String.format("pod/%s", getPodName()), "condition=Ready", getConnectTimeout()/1000);
+    apply(specTempaltes, false, templateProperties);
 
     if (portForward) {
       podPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
-      portForwardWatchdog = kubectl.portForward(
-          String.format("pod/%s", getPodName()),
-          new String[] {
-              String.format("%s:%s", podPort, K8S_INTERPRETER_SERVICE_PORT)
-          });
+      localPortForward = client.pods().inNamespace(namespace).withName(podName).portForward(K8S_INTERPRETER_SERVICE_PORT, podPort);
     }
 
     long startTime = System.currentTimeMillis();
+    long timeoutTime = startTime + getConnectTimeout();
 
     // wait until interpreter send started message through thrift rpc
     synchronized (started) {
-      if (!started.get()) {
+      while (!started.get()) {
+        long timetoTimeout = timeoutTime - System.currentTimeMillis();
+        if (timetoTimeout <= 0) {
+          stop();
+          throw new IOException("Launching zeppelin interpreter on kubernetes is time out, kill it now");

Review comment:
   It depends on which launcher is used. With the
StandardInterpreterLauncher, the interpreter process is launched as a Java
`Process` in `InterpreterProcessLauncher`, which monitors the process state:
https://github.com/apache/zeppelin/blob/master/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java#L116
   Spark yarn-cluster mode is a special case, because the spark-submit process
exits after submitting the YARN app. In that mode we use `YarnAppMonitor` to
monitor the YARN app, so a failed interpreter process can be detected earlier
when the YARN app fails:
https://github.com/apache/zeppelin/blob/master/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/YarnAppMonitor.java#L83
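
   To illustrate the idea of failing fast instead of waiting out the whole
connect timeout, here is a minimal sketch of a wait loop that checks a liveness
probe on every wake-up. It is not Zeppelin's implementation: `StartupWaiter`,
`awaitStarted`, `processAlive` and `pollMillis` are hypothetical names, and the
liveness probe stands in for whatever the launcher can observe (a Java
`Process`, a YARN app state, a pod phase).

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical helper, not part of Zeppelin.
class StartupWaiter {

  /**
   * Blocks until `started` becomes true. Fails early if the liveness probe
   * reports the launched process as dead, and fails with a timeout otherwise.
   * Assumes pollMillis > 0 (Object.wait(0) would block indefinitely).
   */
  static void awaitStarted(AtomicBoolean started,
                           BooleanSupplier processAlive,
                           long timeoutMillis,
                           long pollMillis)
      throws IOException, InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    synchronized (started) {
      // Loop guards against spurious wake-ups, as in the diff above.
      while (!started.get()) {
        if (!processAlive.getAsBoolean()) {
          throw new IOException("Interpreter process exited before it reported started");
        }
        long remaining = deadline - System.currentTimeMillis();
        if (remaining <= 0) {
          throw new IOException("Timed out waiting for interpreter to start");
        }
        // Wake up at least every pollMillis to re-check liveness.
        started.wait(Math.min(remaining, pollMillis));
      }
    }
  }
}
```

   The thread that receives the "started" callback would set the flag and call
`started.notifyAll()` inside `synchronized (started)`. For a locally launched
interpreter the probe could be as simple as `process::isAlive` on a
`java.lang.Process`.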
    



