Repository: zeppelin
Updated Branches:
  refs/heads/master a4a86862e -> c934b3a47


[ZEPPELIN-1015] Cron job fails to run a paragraph when multiple type of 
interpreter is being used

### What is this PR for?
Cron job can fail when notebook uses multiple types of paragraphs.
Problem reported here 
http://apache-zeppelin-users-incubating-mailing-list.75479.x6.nabble.com/Cron-job-fails-to-run-a-paragraph-that-runs-correctly-manually-tt2265.html

### What type of PR is it?
Bug Fix

### Todos
* [x] - Fix
* [x] - Unittest

### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-1015

### How should this be tested?
Create two paragraphs in the notebook
First takes longer than second (last) paragraph.
First paragraph and second paragraph should use different interpreter.

If cron schedule the notebook with 'auto-restart interpreter on cron execution' 
checked.
Then interpreters will be restarted when second paragraph finished, but first 
paragraph is still running.
That may cause abort of first paragraph run.

### Questions:
* Does the licenses files need update? no
* Is there breaking changes for older versions? no
* Does this needs documentation? no

Author: Lee moon soo <[email protected]>

Closes #1019 from Leemoonsoo/ZEPPELIN-1015 and squashes the following commits:

ccee60a [Lee moon soo] update unittest
9ad4cbb [Lee moon soo] Fix problem by waiting all paragraphs in note be finished


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/c934b3a4
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/c934b3a4
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/c934b3a4

Branch: refs/heads/master
Commit: c934b3a47c8147e58f90c0dc2bb7b24b6abc5974
Parents: a4a8686
Author: Lee moon soo <[email protected]>
Authored: Wed Jun 15 21:00:14 2016 -0700
Committer: Lee moon soo <[email protected]>
Committed: Sat Jun 18 10:17:49 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/zeppelin/notebook/Note.java | 17 ++++++-
 .../org/apache/zeppelin/notebook/Notebook.java  |  2 +-
 .../interpreter/mock/MockInterpreter1.java      | 16 +++++++
 .../interpreter/mock/MockInterpreter2.java      | 16 +++++++
 .../apache/zeppelin/notebook/NotebookTest.java  | 49 +++++++++++---------
 5 files changed, 77 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c934b3a4/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
index 01d625a..de19fe0 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
@@ -428,6 +428,22 @@ public class Note implements Serializable, JobListener {
     }
   }
 
+  /**
+   * Check whether all paragraphs belongs to this note has terminated
+   * @return
+   */
+  public boolean isTerminated() {
+    synchronized (paragraphs) {
+      for (Paragraph p : paragraphs) {
+        if (!p.isTerminated()) {
+          return false;
+        }
+      }
+    }
+
+    return true;
+  }
+
   public List<InterpreterCompletion> completion(String paragraphId, String 
buffer, int cursor) {
     Paragraph p = getParagraph(paragraphId);
     p.setNoteReplLoader(replLoader);
@@ -561,5 +577,4 @@ public class Note implements Serializable, JobListener {
 
   @Override
   public void onProgressUpdate(Job job, int progress) {}
-
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c934b3a4/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
index 58a552d..30faeea 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
@@ -634,7 +634,7 @@ public class Notebook {
       Note note = notebook.getNote(noteId);
       note.runAll();
     
-      while (!note.getLastParagraph().isTerminated()) {
+      while (!note.isTerminated()) {
         try {
           Thread.sleep(1000);
         } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c934b3a4/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java
index 1b0ec1a..794ab6c 100644
--- 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java
@@ -35,13 +35,22 @@ public class MockInterpreter1 extends Interpreter{
        public MockInterpreter1(Properties property) {
                super(property);
        }
+       boolean open;
+
 
        @Override
        public void open() {
+               open = true;
        }
 
        @Override
        public void close() {
+               open = false;
+       }
+
+
+       public boolean isOpen() {
+               return open;
        }
 
        @Override
@@ -51,6 +60,13 @@ public class MockInterpreter1 extends Interpreter{
                if ("getId".equals(st)) {
                        // get unique id of this interpreter instance
                        result = new 
InterpreterResult(InterpreterResult.Code.SUCCESS, "" + this.hashCode());
+               } else if (st.startsWith("sleep")) {
+                       try {
+                               Thread.sleep(Integer.parseInt(st.split(" 
")[1]));
+                       } catch (InterruptedException e) {
+                               // nothing to do
+                       }
+                       result = new 
InterpreterResult(InterpreterResult.Code.SUCCESS, "repl1: " + st);
                } else {
                        result = new 
InterpreterResult(InterpreterResult.Code.SUCCESS, "repl1: " + st);
                }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c934b3a4/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter2.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter2.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter2.java
index 0fe3a16..169bc3c 100644
--- 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter2.java
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter2.java
@@ -36,14 +36,23 @@ public class MockInterpreter2 extends Interpreter{
                super(property);
        }
 
+       boolean open;
+
        @Override
        public void open() {
+               open = true;
        }
 
        @Override
        public void close() {
+               open = false;
+       }
+
+       public boolean isOpen() {
+               return open;
        }
 
+
        @Override
        public InterpreterResult interpret(String st, InterpreterContext 
context) {
                InterpreterResult result;
@@ -51,6 +60,13 @@ public class MockInterpreter2 extends Interpreter{
                if ("getId".equals(st)) {
                        // get unique id of this interpreter instance
                        result = new 
InterpreterResult(InterpreterResult.Code.SUCCESS, "" + this.hashCode());
+               } else if (st.startsWith("sleep")) {
+                       try {
+                               Thread.sleep(Integer.parseInt(st.split(" 
")[1]));
+                       } catch (InterruptedException e) {
+                               // nothing to do
+                       }
+                       result = new 
InterpreterResult(InterpreterResult.Code.SUCCESS, "repl2: " + st);
                } else {
                        result = new 
InterpreterResult(InterpreterResult.Code.SUCCESS, "repl2: " + st);
                }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c934b3a4/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
index 53749d1..1f7d5c0 100644
--- 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
@@ -17,11 +17,7 @@
 
 package org.apache.zeppelin.notebook;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 import static org.mockito.Mockito.mock;
 
 import java.io.File;
@@ -284,36 +280,47 @@ public class NotebookTest implements JobListenerFactory{
     Paragraph p = note.addParagraph();
     Map config = new HashMap<String, Object>();
     p.setConfig(config);
-    p.setText("p1");
+    p.setText("sleep 1000");
+
+    Paragraph p2 = note.addParagraph();
+    p2.setConfig(config);
+    p2.setText("%mock2 sleep 500");
 
     // set cron scheduler, once a second
     config = note.getConfig();
     config.put("enabled", true);
-    config.put("cron", "* * * * * ?");
-    config.put("releaseresource", "true");
+    config.put("cron", "1/3 * * * * ?");
+    config.put("releaseresource", true);
     note.setConfig(config);
     notebook.refreshCron(note.id());
-    while (p.getStatus() != Status.FINISHED) {
-      Thread.sleep(100);
-    }
-    Date dateFinished = p.getDateFinished();
-    assertNotNull(dateFinished);
 
-    // restart interpreter
-    for (InterpreterSetting setting : 
note.getNoteReplLoader().getInterpreterSettings()) {
-      notebook.getInterpreterFactory().restart(setting.id());
+
+    MockInterpreter1 mock1 = ((MockInterpreter1) (((ClassloaderInterpreter)
+        ((LazyOpenInterpreter) 
note.getNoteReplLoader().get("mock1")).getInnerInterpreter())
+        .getInnerInterpreter()));
+
+    MockInterpreter2 mock2 = ((MockInterpreter2) (((ClassloaderInterpreter)
+        ((LazyOpenInterpreter) 
note.getNoteReplLoader().get("mock2")).getInnerInterpreter())
+        .getInnerInterpreter()));
+
+    // wait until interpreters are started
+    while (!mock1.isOpen() || !mock2.isOpen()) {
+      Thread.yield();
     }
 
-    Thread.sleep(1000);
-    while (p.getStatus() != Status.FINISHED) {
-      Thread.sleep(100);
+    // wait until interpreters are closed
+    while (mock1.isOpen() || mock2.isOpen()) {
+      Thread.yield();
     }
-    assertNotEquals(dateFinished, p.getDateFinished());
-    
+
     // remove cron scheduler.
     config.put("cron", null);
     note.setConfig(config);
     notebook.refreshCron(note.id());
+
+    // make sure all paragraph has been executed
+    assertNotNull(p.getDateFinished());
+    assertNotNull(p2.getDateFinished());
   }
 
   @Test

Reply via email to