Repository: zeppelin Updated Branches: refs/heads/master 684f48457 -> 5f3f28916
[ZEPPELIN-2016] add test case for cron job scheduler resource depletion ### What is this PR for? Follow-up test case after https://issues.apache.org/jira/browse/ZEPPELIN-2009 ### What type of PR is it? Improvement | test ### Todos * [x] - add test ### What is the Jira issue? [ZEPPELIN-2016](https://issues.apache.org/jira/browse/ZEPPELIN-2016) ### How should this be tested? CI pass ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? no * Are there breaking changes for older versions? no * Does this need documentation? no Author: Khalid Huseynov <khalid...@gmail.com> Closes #1948 from khalidhuseynov/test/cron-scheduler and squashes the following commits: eb555fb [Khalid Huseynov] add complete cron scheduler test Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/5f3f2891 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/5f3f2891 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/5f3f2891 Branch: refs/heads/master Commit: 5f3f28916793cf05b3a9c0e3195a42645e298fea Parents: 684f484 Author: Khalid Huseynov <khalid...@gmail.com> Authored: Wed Jan 25 16:41:24 2017 -0800 Committer: Lee moon soo <m...@apache.org> Committed: Sat Jan 28 18:28:18 2017 +0900 ---------------------------------------------------------------------- .../apache/zeppelin/notebook/NotebookTest.java | 70 ++++++++++++++++++++ 1 file changed, 70 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5f3f2891/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java index dd54258..4bc9459 100644 --- 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
@@ -25,8 +25,10 @@ import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
 import org.apache.zeppelin.dep.DependencyResolver;
@@ -359,6 +361,74 @@ public class NotebookTest implements JobListenerFactory{
   }
 
   @Test
+  public void testSchedulePoolUsage() throws InterruptedException, IOException {
+    // verifies repeated cron-triggered runs keep finishing and stop once the cron is removed
+    Note note = notebook.createNote(anonymous);
+    factory.setInterpreters("user", note.getId(), factory.getDefaultInterpreterSettingList());
+    Paragraph p = note.addParagraph(AuthenticationInfo.ANONYMOUS);
+    Map config = Maps.newHashMap();
+    p.setConfig(config);
+    Date dateFinished = p.getDateFinished();
+    String result = getResultString(p.getResult());
+    assertEquals(StringUtils.EMPTY, result);
+    assertNull(dateFinished);
+
+    // set cron scheduler, once a second
+    config = note.getConfig();
+    config.put("enabled", true);
+    config.put("cron", "* * * * * ?");
+    note.setConfig(config);
+    notebook.refreshCron(note.getId());
+
+    // run job maxExecutionCount times
+    int maxExecutionCount = 13;
+    int maxRetryCount = 4 * maxExecutionCount;
+    int executionCount = 0;
+    int retryCount = 0;
+    String resultTemplate = "%text repl1: p";
+    p.setText("p" + executionCount);
+    try {
+      while (executionCount < maxExecutionCount) {
+        if (p.getDateFinished() != null && !p.getDateFinished().equals(dateFinished)) {
+          // a changed finish date means the paragraph has been executed again
+          assertNotEquals(result, getResultString(p.getResult()));
+          assertEquals(resultTemplate + executionCount, getResultString(p.getResult()));
+          assertEquals(Status.FINISHED, p.getStatus());
+          executionCount++;
+          dateFinished = p.getDateFinished();
+          result = getResultString(p.getResult());
+          p.setText("p" + executionCount);
+        }
+        Thread.sleep(1100);
+        if (++retryCount > maxRetryCount) {
+          logger.error("Couldn't schedule {} number of note executions after {} retries",
+              maxExecutionCount, maxRetryCount);
+          fail("scheduled runs finished: " + executionCount + "/" + maxExecutionCount);
+        }
+      }
+      // save results and update paragraph
+      dateFinished = p.getDateFinished();
+      result = getResultString(p.getResult());
+      p.setText("new text");
+    } finally {
+      // always remove the cron so a failed assertion cannot leave the job scheduled
+      config.put("cron", null);
+      note.setConfig(config);
+      notebook.refreshCron(note.getId());
+    }
+    Thread.sleep(1100);
+
+    // ensure that hasn't been run again
+    assertEquals(dateFinished, p.getDateFinished());
+    assertEquals(result, getResultString(p.getResult()));
+  }
+
+  /** Returns {@code result.toString()}, or an empty string when {@code result} is null. */
+  private String getResultString(InterpreterResult result) {
+    return result == null ? StringUtils.EMPTY : result.toString();
+  }
+
+  @Test
   public void testAutoRestartInterpreterAfterSchedule() throws InterruptedException, IOException{
     // create a note and a paragraph
     Note note = notebook.createNote(anonymous);