Repository: zeppelin Updated Branches: refs/heads/master 2d60d0b67 -> 7db9ab472
[ZEPPELIN-2995] "auto-restart interpreter on cron execution" should restart interpreter to specific note, not all interpreters ### What is this PR for? Make "auto-restart interpreter on cron execution" restart the interpreters which are specific to the note, not all interpreters. This issue was reported by https://github.com/apache/zeppelin/pull/1302#issuecomment-336521420. ### What type of PR is it? [Bug Fix] ### Todos ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-2995 ### How should this be tested? * Tested manually. * I confirmed that the "auto-restart interpreter on cron execution" feature restarted only the interpreters which were specific to the notebook. ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No. * Are there breaking changes for older versions? No. * Does this need documentation? No. Author: Keiji Yoshida <[email protected]> Closes #2681 from kjmrknsn/ZEPPELIN-2995 and squashes the following commits: 43765a5 [Keiji Yoshida] [ZEPPELIN-2995] "auto-restart interpreter on cron execution" should restart interpreter to specific note, not all interpreters Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/7db9ab47 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/7db9ab47 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/7db9ab47 Branch: refs/heads/master Commit: 7db9ab47224d29c5f53b81bb65383c3581f8817f Parents: 2d60d0b Author: Keiji Yoshida <[email protected]> Authored: Mon Nov 20 22:54:39 2017 +0900 Committer: Lee moon soo <[email protected]> Committed: Tue Nov 21 20:37:47 2017 -0800 ---------------------------------------------------------------------- .../org/apache/zeppelin/notebook/Notebook.java | 11 ++- .../apache/zeppelin/notebook/NotebookTest.java | 82 ++++++++++++++++++++ 2 files changed, 90 insertions(+), 3 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7db9ab47/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index 3baf4f1..8de981e 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -901,10 +901,14 @@ public class Notebook implements NoteEventListener { } boolean releaseResource = false; + String cronExecutingUser = null; try { Map<String, Object> config = note.getConfig(); - if (config != null && config.containsKey("releaseresource")) { - releaseResource = (boolean) note.getConfig().get("releaseresource"); + if (config != null) { + if (config.containsKey("releaseresource")) { + releaseResource = (boolean) config.get("releaseresource"); + } + cronExecutingUser = (String) config.get("cronExecutingUser"); } } catch (ClassCastException e) { logger.error(e.getMessage(), e); @@ -913,7 +917,8 @@ public class Notebook implements NoteEventListener { for (InterpreterSetting setting : notebook.getInterpreterSettingManager() .getInterpreterSettings(note.getId())) { try { - notebook.getInterpreterSettingManager().restart(setting.getId()); + notebook.getInterpreterSettingManager().restart(setting.getId(), noteId, + cronExecutingUser != null ? 
cronExecutingUser : "anonymous"); } catch (InterpreterException e) { logger.error("Fail to restart interpreter: " + setting.getId(), e); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7db9ab47/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java index 511b4e5..ba9e177 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java @@ -451,6 +451,88 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener } @Test + public void testCronWithReleaseResourceClosesOnlySpecificInterpreters() + throws IOException, InterruptedException { + // create a cron scheduled note. + Note cronNote = notebook.createNote(anonymous); + interpreterSettingManager.setInterpreterBinding(anonymous.getUser(), cronNote.getId(), + Arrays.asList(interpreterSettingManager.getInterpreterSettingByName("mock1").getId())); + cronNote.setConfig(new HashMap() { + { + put("cron", "1/5 * * * * ?"); + put("cronExecutingUser", anonymous.getUser()); + put("releaseresource", true); + } + }); + RemoteInterpreter cronNoteInterpreter = + (RemoteInterpreter) interpreterFactory.getInterpreter(anonymous.getUser(), + cronNote.getId(), "mock1"); + + // create a paragraph of the cron scheduled note. 
+ Paragraph cronNoteParagraph = cronNote.addNewParagraph(AuthenticationInfo.ANONYMOUS); + cronNoteParagraph.setConfig(new HashMap() { + { put("enabled", true); } + }); + cronNoteParagraph.setText("%mock1 sleep 1000"); + + // create another note + Note anotherNote = notebook.createNote(anonymous); + interpreterSettingManager.setInterpreterBinding(anonymous.getUser(), anotherNote.getId(), + Arrays.asList(interpreterSettingManager.getInterpreterSettingByName("mock2").getId())); + RemoteInterpreter anotherNoteInterpreter = + (RemoteInterpreter) interpreterFactory.getInterpreter(anonymous.getUser(), + anotherNote.getId(), "mock2"); + + // create a paragraph of another note + Paragraph anotherNoteParagraph = anotherNote.addNewParagraph(AuthenticationInfo.ANONYMOUS); + anotherNoteParagraph.setConfig(new HashMap() { + { put("enabled", true); } + }); + anotherNoteParagraph.setText("%mock2 echo 1"); + + // run the paragraph of another note + anotherNote.run(anotherNoteParagraph.getId()); + + // wait until anotherNoteInterpreter is opened + while (!anotherNoteInterpreter.isOpened()) { + Thread.yield(); + } + + // refresh the cron schedule + notebook.refreshCron(cronNote.getId()); + + // wait until cronNoteInterpreter is opened + while (!cronNoteInterpreter.isOpened()) { + Thread.yield(); + } + + // wait until cronNoteInterpreter is closed + while (cronNoteInterpreter.isOpened()) { + Thread.yield(); + } + + // wait for a few seconds + Thread.sleep(5 * 1000); + + // test that anotherNoteInterpreter is still opened + assertTrue(anotherNoteInterpreter.isOpened()); + + // remove cron scheduler + cronNote.setConfig(new HashMap() { + { + put("cron", null); + put("cronExecutingUser", null); + put("releaseresource", null); + } + }); + notebook.refreshCron(cronNote.getId()); + + // remove notebooks + notebook.removeNote(cronNote.getId(), anonymous); + notebook.removeNote(anotherNote.getId(), anonymous); + } + + @Test public void testExportAndImportNote() throws IOException, 
CloneNotSupportedException, InterruptedException, InterpreterException, SchedulerException, RepositoryException { Note note = notebook.createNote(anonymous);
