Repository: zeppelin Updated Branches: refs/heads/master a4a86862e -> c934b3a47
[ZEPPELIN-1015] Cron job fails to run a paragraph when multiple type of interpreter is being used ### What is this PR for? Cron job can fail when notebook uses multiple types of paragraphs. Problem reported here http://apache-zeppelin-users-incubating-mailing-list.75479.x6.nabble.com/Cron-job-fails-to-run-a-paragraph-that-runs-correctly-manually-tt2265.html ### What type of PR is it? Bug Fix ### Todos * [x] - Fix * [x] - Unittest ### What is the Jira issue? https://issues.apache.org/jira/browse/ZEPPELIN-1015 ### How should this be tested? Create two paragraphs in the notebook First takes longer than second (last) paragraph. First paragraph and second paragraph should use different interpreter. If cron schedule the notebook with 'auto-restart interpreter on cron execution' checked. Then interpreters will be restarted when second paragraph finished, but first paragraph is still running. That may cause abort of first paragraph run. ### Questions: * Does the licenses files need update? no * Is there breaking changes for older versions? no * Does this needs documentation? no Author: Lee moon soo <[email protected]> Closes #1019 from Leemoonsoo/ZEPPELIN-1015 and squashes the following commits: ccee60a [Lee moon soo] update unittest 9ad4cbb [Lee moon soo] Fix problem by waiting all paragraphs in note be finished Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/c934b3a4 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/c934b3a4 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/c934b3a4 Branch: refs/heads/master Commit: c934b3a47c8147e58f90c0dc2bb7b24b6abc5974 Parents: a4a8686 Author: Lee moon soo <[email protected]> Authored: Wed Jun 15 21:00:14 2016 -0700 Committer: Lee moon soo <[email protected]> Committed: Sat Jun 18 10:17:49 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/zeppelin/notebook/Note.java | 17 ++++++- .../org/apache/zeppelin/notebook/Notebook.java | 2 +- .../interpreter/mock/MockInterpreter1.java | 16 +++++++ .../interpreter/mock/MockInterpreter2.java | 16 +++++++ .../apache/zeppelin/notebook/NotebookTest.java | 49 +++++++++++--------- 5 files changed, 77 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c934b3a4/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java index 01d625a..de19fe0 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java @@ -428,6 +428,22 @@ public class Note implements Serializable, JobListener { } } + /** + * Check whether all paragraphs belongs to this note has terminated + * @return + */ + public boolean isTerminated() { + synchronized (paragraphs) { + for (Paragraph p : paragraphs) { + if (!p.isTerminated()) { + return false; + } + } + } + + return true; + } + public List<InterpreterCompletion> completion(String paragraphId, String buffer, int cursor) { Paragraph p = getParagraph(paragraphId); p.setNoteReplLoader(replLoader); @@ -561,5 +577,4 @@ public class Note implements Serializable, JobListener { @Override public void onProgressUpdate(Job job, int progress) {} - } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c934b3a4/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index 58a552d..30faeea 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -634,7 +634,7 @@ public class Notebook { Note note = notebook.getNote(noteId); note.runAll(); - while (!note.getLastParagraph().isTerminated()) { + while (!note.isTerminated()) { try { Thread.sleep(1000); } catch (InterruptedException e) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c934b3a4/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java index 1b0ec1a..794ab6c 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java @@ -35,13 +35,22 @@ public class MockInterpreter1 extends Interpreter{ public MockInterpreter1(Properties property) { super(property); } + boolean open; + @Override public void open() { + open = true; } @Override public void close() { + open = false; + } + + + public boolean isOpen() { + return open; } @Override @@ -51,6 +60,13 @@ public class MockInterpreter1 extends Interpreter{ if ("getId".equals(st)) { // get unique id of this interpreter instance result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "" + this.hashCode()); + } else if (st.startsWith("sleep")) { + try { + Thread.sleep(Integer.parseInt(st.split(" ")[1])); + } catch (InterruptedException e) { + // nothing to do + } + result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl1: " + st); } else { result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl1: " + st); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c934b3a4/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter2.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter2.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter2.java index 0fe3a16..169bc3c 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter2.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter2.java @@ -36,14 +36,23 @@ public class MockInterpreter2 extends Interpreter{ super(property); } + boolean open; + @Override public void open() { + open = true; } @Override public void close() { + open = false; + } + + public boolean isOpen() { + return open; } + @Override public InterpreterResult interpret(String st, InterpreterContext context) { InterpreterResult result; @@ -51,6 +60,13 @@ public class MockInterpreter2 extends Interpreter{ if ("getId".equals(st)) { // get unique id of this interpreter instance result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "" + this.hashCode()); + } else if (st.startsWith("sleep")) { + try { + Thread.sleep(Integer.parseInt(st.split(" ")[1])); + } catch (InterruptedException e) { + // nothing to do + } + result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl2: " + st); } else { result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl2: " + st); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c934b3a4/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java index 53749d1..1f7d5c0 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java @@ -17,11 +17,7 @@ package org.apache.zeppelin.notebook; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; import static org.mockito.Mockito.mock; import java.io.File; @@ -284,36 +280,47 @@ public class NotebookTest implements JobListenerFactory{ Paragraph p = note.addParagraph(); Map config = new HashMap<String, Object>(); p.setConfig(config); - p.setText("p1"); + p.setText("sleep 1000"); + + Paragraph p2 = note.addParagraph(); + p2.setConfig(config); + p2.setText("%mock2 sleep 500"); // set cron scheduler, once a second config = note.getConfig(); config.put("enabled", true); - config.put("cron", "* * * * * ?"); - config.put("releaseresource", "true"); + config.put("cron", "1/3 * * * * ?"); + config.put("releaseresource", true); note.setConfig(config); notebook.refreshCron(note.id()); - while (p.getStatus() != Status.FINISHED) { - Thread.sleep(100); - } - Date dateFinished = p.getDateFinished(); - assertNotNull(dateFinished); - // restart interpreter - for (InterpreterSetting setting : note.getNoteReplLoader().getInterpreterSettings()) { - notebook.getInterpreterFactory().restart(setting.id()); + + MockInterpreter1 mock1 = ((MockInterpreter1) (((ClassloaderInterpreter) + ((LazyOpenInterpreter) note.getNoteReplLoader().get("mock1")).getInnerInterpreter()) + .getInnerInterpreter())); + + MockInterpreter2 mock2 = ((MockInterpreter2) (((ClassloaderInterpreter) + ((LazyOpenInterpreter) note.getNoteReplLoader().get("mock2")).getInnerInterpreter()) + .getInnerInterpreter())); + + // wait until interpreters are started + while (!mock1.isOpen() || !mock2.isOpen()) { + Thread.yield(); } - Thread.sleep(1000); - while (p.getStatus() != Status.FINISHED) { - Thread.sleep(100); + // wait until interpreters are closed + while (mock1.isOpen() || mock2.isOpen()) { + Thread.yield(); } - assertNotEquals(dateFinished, p.getDateFinished()); - + // remove cron scheduler. config.put("cron", null); note.setConfig(config); notebook.refreshCron(note.id()); + + // make sure all paragraph has been executed + assertNotNull(p.getDateFinished()); + assertNotNull(p2.getDateFinished()); } @Test
