This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 49d9bc4  [ZEPPELIN-5218]. Don't exit shell interpreter if there's more 
logging produced
49d9bc4 is described below

commit 49d9bc4eb0430961108e30a69bc40d286af0526d
Author: Jeff Zhang <[email protected]>
AuthorDate: Sat Feb 6 13:01:48 2021 +0800

    [ZEPPELIN-5218]. Don't exit shell interpreter if there's more logging 
produced
    
    ### What is this PR for?
    
    This PR changes the timeout behavior of the shell interpreter. A shell 
command now times out only when it has produced no output for the timeout 
period. If it keeps producing output, the shell command continues to execute.
    
    ### What type of PR is it?
    [ Improvement | Feature]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5218
    
    ### How should this be tested?
    * Test is added
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <[email protected]>
    
    Closes #4037 from zjffdu/ZEPPELIN-5218 and squashes the following commits:
    
    45110a47b [Jeff Zhang] fix test
    ba9618a46 [Jeff Zhang] soft shutdown executor
    3fa41ba77 [Jeff Zhang] shutdown executor in close
    ef5a08eee [Jeff Zhang] [ZEPPELIN-5218]. Don't exit shell interpreter if 
there's more logging produced
    
    (cherry picked from commit 05620fe689ac6e3412df605a1a231d5e4ef6012b)
    Signed-off-by: Jeff Zhang <[email protected]>
---
 .../apache/zeppelin/shell/ShellInterpreter.java    | 68 ++++++++++++++++++----
 shell/src/main/resources/interpreter-setting.json  |  7 +++
 .../zeppelin/shell/ShellInterpreterTest.java       | 33 ++++++++---
 .../zeppelin/interpreter/InterpreterOutput.java    | 20 +++++--
 4 files changed, 103 insertions(+), 25 deletions(-)

diff --git 
a/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java 
b/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java
index 7cfcd19..c43a8be 100644
--- a/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java
+++ b/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java
@@ -24,13 +24,18 @@ import org.apache.commons.exec.ExecuteWatchdog;
 import org.apache.commons.exec.PumpStreamHandler;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.zeppelin.interpreter.ZeppelinContext;
+import org.apache.zeppelin.util.ExecutorUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
@@ -47,12 +52,18 @@ public class ShellInterpreter extends KerberosInterpreter {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ShellInterpreter.class);
 
   private static final String TIMEOUT_PROPERTY = 
"shell.command.timeout.millisecs";
+  private static final String TIMEOUT_CHECK_INTERVAL_PROPERTY =
+          "shell.command.timeout.check.interval";
   private static final String DEFAULT_TIMEOUT = "60000";
+  private static final String DEFAULT_CHECK_INTERVAL = "10000";
   private static final String DIRECTORY_USER_HOME = 
"shell.working.directory.user.home";
 
   private final boolean isWindows = 
System.getProperty("os.name").startsWith("Windows");
   private final String shell = isWindows ? "cmd /c" : "bash -c";
-  ConcurrentHashMap<String, DefaultExecutor> executors;
+  private ConcurrentHashMap<String, DefaultExecutor> executorMap;
+  private ConcurrentHashMap<String, InterpreterContext> contextMap;
+  private ScheduledExecutorService shellOutputCheckExecutor =
+          Executors.newSingleThreadScheduledExecutor();
 
   public ShellInterpreter(Properties property) {
     super(property);
@@ -61,15 +72,41 @@ public class ShellInterpreter extends KerberosInterpreter {
   @Override
   public void open() {
     super.open();
-    LOGGER.info("Command timeout property: {}", getProperty(TIMEOUT_PROPERTY));
-    executors = new ConcurrentHashMap<>();
+    long timeoutThreshold = Long.parseLong(getProperty(TIMEOUT_PROPERTY, 
DEFAULT_TIMEOUT));
+    long timeoutCheckInterval = Long.parseLong(
+            getProperty(TIMEOUT_CHECK_INTERVAL_PROPERTY, 
DEFAULT_CHECK_INTERVAL));
+    LOGGER.info("Command timeout property: {}", timeoutThreshold);
+    executorMap = new ConcurrentHashMap<>();
+    contextMap = new ConcurrentHashMap<>();
+
+    shellOutputCheckExecutor.scheduleAtFixedRate(() -> {
+      try {
+        for (Map.Entry<String, DefaultExecutor> entry : 
executorMap.entrySet()) {
+          String paragraphId = entry.getKey();
+          DefaultExecutor executor = entry.getValue();
+          InterpreterContext context = contextMap.get(paragraphId);
+          if (context == null) {
+            LOGGER.warn("No InterpreterContext associated with paragraph: {}", 
paragraphId);
+            continue;
+          }
+          if ((System.currentTimeMillis() - 
context.out.getLastWriteTimestamp()) >
+                  timeoutThreshold) {
+            LOGGER.info("No output for paragraph {} for the last {} 
milli-seconds, so kill it",
+                    paragraphId, timeoutThreshold);
+            executor.getWatchdog().destroyProcess();
+          }
+        }
+      } catch (Exception e) {
+        LOGGER.error("Error when checking shell command timeout", e);
+      }
+    }, timeoutCheckInterval, timeoutCheckInterval, TimeUnit.MILLISECONDS);
   }
 
   @Override
   public void close() {
     super.close();
-    for (String executorKey : executors.keySet()) {
-      DefaultExecutor executor = executors.remove(executorKey);
+    for (String executorKey : executorMap.keySet()) {
+      DefaultExecutor executor = executorMap.remove(executorKey);
       if (executor != null) {
         try {
           executor.getWatchdog().destroyProcess();
@@ -78,6 +115,11 @@ public class ShellInterpreter extends KerberosInterpreter {
         }
       }
     }
+
+    if (shellOutputCheckExecutor != null) {
+      ExecutorUtil.softShutdown("ShellOutputCheckExecutor", 
shellOutputCheckExecutor,
+              5, TimeUnit.SECONDS);
+    }
   }
 
   @Override
@@ -105,13 +147,14 @@ public class ShellInterpreter extends KerberosInterpreter 
{
     cmdLine.addArgument(cmd, false);
 
     try {
+      contextMap.put(context.getParagraphId(), context);
+
       DefaultExecutor executor = new DefaultExecutor();
       executor.setStreamHandler(new PumpStreamHandler(
           context.out, context.out));
+      executor.setWatchdog(new ExecuteWatchdog(Long.MAX_VALUE));
+      executorMap.put(context.getParagraphId(), executor);
 
-      executor.setWatchdog(new ExecuteWatchdog(
-          Long.valueOf(getProperty(TIMEOUT_PROPERTY, DEFAULT_TIMEOUT))));
-      executors.put(context.getParagraphId(), executor);
       if (Boolean.valueOf(getProperty(DIRECTORY_USER_HOME))) {
         executor.setWorkingDirectory(new 
File(System.getProperty("user.home")));
       }
@@ -140,13 +183,14 @@ public class ShellInterpreter extends KerberosInterpreter 
{
       LOGGER.error("Can not run command: " + cmd, e);
       return new InterpreterResult(Code.ERROR, e.getMessage());
     } finally {
-      executors.remove(context.getParagraphId());
+      executorMap.remove(context.getParagraphId());
+      contextMap.remove(context.getParagraphId());
     }
   }
 
   @Override
   public void cancel(InterpreterContext context) {
-    DefaultExecutor executor = executors.remove(context.getParagraphId());
+    DefaultExecutor executor = executorMap.remove(context.getParagraphId());
     if (executor != null) {
       try {
         executor.getWatchdog().destroyProcess();
@@ -183,6 +227,10 @@ public class ShellInterpreter extends KerberosInterpreter {
     return false;
   }
 
+  public ConcurrentHashMap<String, DefaultExecutor> getExecutorMap() {
+    return executorMap;
+  }
+
   public void createSecureConfiguration() throws InterpreterException {
     Properties properties = getProperties();
     CommandLine cmdLine = CommandLine.parse(shell);
diff --git a/shell/src/main/resources/interpreter-setting.json 
b/shell/src/main/resources/interpreter-setting.json
index 57b6fa1..7bbed28 100644
--- a/shell/src/main/resources/interpreter-setting.json
+++ b/shell/src/main/resources/interpreter-setting.json
@@ -12,6 +12,13 @@
         "description": "Shell command time out in millisecs. Default = 60000",
         "type": "number"
       },
+      "shell.command.timeout.check.interval": {
+        "envName": "",
+        "propertyName": "shell.command.timeout.check.interval",
+        "defaultValue": "60000",
+        "description": "Shell command output check interval in millisecs. 
Default = 10000",
+        "type": "number"
+      },
       "shell.working.directory.user.home": {
         "envName": "SHELL_WORKING_DIRECTORY_USER_HOME",
         "propertyName": "shell.working.directory.user.home",
diff --git 
a/shell/src/test/java/org/apache/zeppelin/shell/ShellInterpreterTest.java 
b/shell/src/test/java/org/apache/zeppelin/shell/ShellInterpreterTest.java
index 78efa1d..ddaf0db 100644
--- a/shell/src/test/java/org/apache/zeppelin/shell/ShellInterpreterTest.java
+++ b/shell/src/test/java/org/apache/zeppelin/shell/ShellInterpreterTest.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -40,10 +41,12 @@ public class ShellInterpreterTest {
   @Before
   public void setUp() throws Exception {
     Properties p = new Properties();
-    p.setProperty("shell.command.timeout.millisecs", "2000");
+    p.setProperty("shell.command.timeout.millisecs", "5000");
+    p.setProperty("shell.command.timeout.check.interval", "1000");
     shell = new ShellInterpreter(p);
-
-    context = 
InterpreterContext.builder().setParagraphId("paragraphId").build();
+    context = InterpreterContext.builder()
+            .setInterpreterOut(new InterpreterOutput())
+            .setParagraphId("paragraphId").build();
     shell.open();
   }
 
@@ -59,10 +62,10 @@ public class ShellInterpreterTest {
       result = shell.interpret("ls", context);
     }
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-    assertTrue(shell.executors.isEmpty());
+    assertTrue(shell.getExecutorMap().isEmpty());
     // it should be fine to cancel a statement that has been completed.
     shell.cancel(context);
-    assertTrue(shell.executors.isEmpty());
+    assertTrue(shell.getExecutorMap().isEmpty());
   }
 
   @Test
@@ -73,18 +76,30 @@ public class ShellInterpreterTest {
       result = shell.interpret("invalid_command\nls", context);
     }
     assertEquals(Code.SUCCESS, result.code());
-    assertTrue(shell.executors.isEmpty());
+    assertTrue(shell.getExecutorMap().isEmpty());
   }
 
   @Test
   public void testShellTimeout() throws InterpreterException {
     if (System.getProperty("os.name").startsWith("Windows")) {
-      result = shell.interpret("timeout 4", context);
+      result = shell.interpret("timeout 8", context);
     } else {
-      result = shell.interpret("sleep 4", context);
+      result = shell.interpret("sleep 8", context);
     }
-
+    // exit shell process because no output is produced during the timeout 
threshold
     assertEquals(Code.INCOMPLETE, result.code());
     assertTrue(result.message().get(0).getData().contains("Paragraph received 
a SIGTERM"));
   }
+
+  @Test
+  public void testShellTimeout2() throws InterpreterException {
+    context = InterpreterContext.builder()
+            .setParagraphId("paragraphId")
+            .setInterpreterOut(new InterpreterOutput())
+            .build();
+    result = shell.interpret("for i in {1..10}\ndo\n\tsleep 1\n\techo 
$i\ndone", context);
+    // won't exit the shell because output is produced continuously
+    assertEquals(Code.SUCCESS, result.code());
+    assertEquals("1\n2\n3\n4\n5\n6\n7\n8\n9\n10\n", context.out.toString());
+  }
 }
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
index 4462635..3c20c12 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
@@ -36,7 +36,13 @@ import java.util.List;
  * in addition to InterpreterResult which used to return from 
Interpreter.interpret().
  */
 public class InterpreterOutput extends OutputStream {
-  Logger logger = LoggerFactory.getLogger(InterpreterOutput.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(InterpreterOutput.class);
+
+  // change static var to set interpreter output limit
+  // limit will be applied to all InterpreterOutput object.
+  // so we can expect the consistent behavior
+  public static int LIMIT = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
+
   private static final int NEW_LINE_CHAR = '\n';
   private static final int LINE_FEED_CHAR = '\r';
 
@@ -56,10 +62,7 @@ public class InterpreterOutput extends OutputStream {
   // so just refresh all output for streaming application, such as flink 
streaming sql output.
   private boolean enableTableAppend = false;
 
-  // change static var to set interpreter output limit
-  // limit will be applied to all InterpreterOutput object.
-  // so we can expect the consistent behavior
-  public static int LIMIT = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
+  private long lastWriteTimestamp = System.currentTimeMillis();
 
   public InterpreterOutput() {
     changeListener = null;
@@ -177,7 +180,7 @@ public class InterpreterOutput extends OutputStream {
         try {
           out.close();
         } catch (IOException e) {
-          logger.error(e.getMessage(), e);
+          LOGGER.error(e.getMessage(), e);
         }
       }
 
@@ -212,6 +215,7 @@ public class InterpreterOutput extends OutputStream {
       return;
     }
 
+    this.lastWriteTimestamp = System.currentTimeMillis();
     synchronized (resultMessageOutputs) {
       currentOut = getCurrentOutput();
 
@@ -407,4 +411,8 @@ public class InterpreterOutput extends OutputStream {
       }
     }
   }
+
+  public long getLastWriteTimestamp() {
+    return lastWriteTimestamp;
+  }
 }

Reply via email to