Repository: oozie Updated Branches: refs/heads/master cc4b4398f -> c707867b7
OOZIE-1797 Workflow rerun command should use existing workflow properties (puru via rohini) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/c707867b Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/c707867b Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/c707867b Branch: refs/heads/master Commit: c707867b75319cf2a3209ed7f664e78fde75541d Parents: cc4b439 Author: Rohini Palaniswamy <[email protected]> Authored: Mon Apr 21 16:57:05 2014 -0700 Committer: Rohini Palaniswamy <[email protected]> Committed: Mon Apr 21 16:57:05 2014 -0700 ---------------------------------------------------------------------- .../java/org/apache/oozie/cli/OozieCLI.java | 37 ++++++- .../main/java/org/apache/oozie/DagEngine.java | 18 +++- .../apache/oozie/servlet/BaseJobServlet.java | 6 +- .../oozie/command/wf/TestReRunXCommand.java | 101 ++++++++++++++++++- .../apache/oozie/servlet/TestV0JobServlet.java | 9 -- docs/src/site/twiki/DG_WorkflowReRun.twiki | 2 + release-log.txt | 1 + 7 files changed, 152 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/c707867b/client/src/main/java/org/apache/oozie/cli/OozieCLI.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/oozie/cli/OozieCLI.java b/client/src/main/java/org/apache/oozie/cli/OozieCLI.java index c31c8ee..d964889 100644 --- a/client/src/main/java/org/apache/oozie/cli/OozieCLI.java +++ b/client/src/main/java/org/apache/oozie/cli/OozieCLI.java @@ -707,12 +707,12 @@ public class OozieCLI { } private Properties getConfiguration(OozieClient wc, CommandLine commandLine) throws IOException { + if (!isConfigurationSpecified(wc, commandLine)) { + throw new IOException("configuration is not specified"); + } Properties conf = wc.createConfiguration(); String configFile = commandLine.getOptionValue(CONFIG_OPTION); - if (configFile == null) { - throw new IOException("configuration file not specified"); - } - else { + if (configFile != null) { File file = new File(configFile); if (!file.exists()) { throw new IOException("configuration file [" + configFile + "] not found"); @@ -735,6 +735,28 @@ public class OozieCLI { } /** + * Check if configuration has specified + * @param wc + * @param commandLine + * @return + * @throws IOException + */ + private boolean isConfigurationSpecified(OozieClient wc, CommandLine commandLine) throws IOException { + boolean isConf = false; + String configFile = commandLine.getOptionValue(CONFIG_OPTION); + if (configFile == null) { + isConf = false; + } + else { + isConf = new File(configFile).exists(); + } + if (commandLine.hasOption("D")) { + isConf = true; + } + return isConf; + } + + /** * @param commandLine command line string. * @return change value specified by -value. * @throws OozieCLIException @@ -904,7 +926,12 @@ public class OozieCLI { } else if (options.contains(RERUN_OPTION)) { if (commandLine.getOptionValue(RERUN_OPTION).contains("-W")) { - wc.reRun(commandLine.getOptionValue(RERUN_OPTION), getConfiguration(wc, commandLine)); + if (isConfigurationSpecified(wc, commandLine)) { + wc.reRun(commandLine.getOptionValue(RERUN_OPTION), getConfiguration(wc, commandLine)); + } + else { + wc.reRun(commandLine.getOptionValue(RERUN_OPTION), new Properties()); + } } else if (commandLine.getOptionValue(RERUN_OPTION).contains("-B")) { String bundleJobId = commandLine.getOptionValue(RERUN_OPTION); http://git-wip-us.apache.org/repos/asf/oozie/blob/c707867b/core/src/main/java/org/apache/oozie/DagEngine.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/DagEngine.java b/core/src/main/java/org/apache/oozie/DagEngine.java index 7d316b8..cad5ddd 100644 --- a/core/src/main/java/org/apache/oozie/DagEngine.java +++ b/core/src/main/java/org/apache/oozie/DagEngine.java @@ -42,13 +42,18 @@ import org.apache.oozie.command.wf.SubmitSqoopXCommand; import org.apache.oozie.command.wf.SubmitXCommand; import org.apache.oozie.command.wf.SuspendXCommand; import org.apache.oozie.command.wf.WorkflowActionInfoXCommand; +import org.apache.oozie.executor.jpa.JPAExecutorException; +import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; +import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; import org.apache.oozie.service.Services; import org.apache.oozie.service.CallableQueueService; import org.apache.oozie.util.ParamChecker; import org.apache.oozie.util.XCallable; +import org.apache.oozie.util.XConfiguration; import org.apache.oozie.util.XLog; import org.apache.oozie.service.XLogStreamingService; +import java.io.StringReader; import java.io.Writer; import java.util.Date; import java.util.List; @@ -279,13 +284,22 @@ public class DagEngine extends BaseEngine { @Override public void reRun(String jobId, Configuration conf) throws DagEngineException { try { - validateReRunConfiguration(conf); - new ReRunXCommand(jobId, conf).call(); + WorkflowJobBean wfBean = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId); + Configuration wfConf = new XConfiguration(new StringReader(wfBean.getConf())); + XConfiguration.copy(conf, wfConf); + validateReRunConfiguration(wfConf); + new ReRunXCommand(jobId, wfConf).call(); start(jobId); } catch (CommandException ex) { throw new DagEngineException(ex); } + catch (JPAExecutorException ex) { + throw new DagEngineException(ex); + } + catch (IOException ex) { + throw new DagEngineException(ErrorCode.E0803, ex.getMessage()); + } } private void validateReRunConfiguration(Configuration conf) throws DagEngineException { http://git-wip-us.apache.org/repos/asf/oozie/blob/c707867b/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java b/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java index 6b82d7b..8941a02 100644 --- a/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java +++ b/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java @@ -117,8 +117,10 @@ public abstract class BaseJobServlet extends JsonRestServlet { if (!requestUser.equals(UNDEF)) { conf.set(OozieClient.USER_NAME, requestUser); } - BaseJobServlet.checkAuthorizationForApp(conf); - JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf); + if (conf.get(OozieClient.APP_PATH) != null) { + BaseJobServlet.checkAuthorizationForApp(conf); + JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf); + } reRunJob(request, response, conf); startCron(); response.setStatus(HttpServletResponse.SC_OK); http://git-wip-us.apache.org/repos/asf/oozie/blob/c707867b/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java index e97fec3..1688dc9 100644 --- a/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java @@ -17,25 +17,33 @@ */ package org.apache.oozie.command.wf; +import java.util.Date; import java.util.Properties; import java.io.File; -import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; import java.io.Reader; import java.io.Writer; - import org.apache.hadoop.fs.Path; import org.apache.oozie.local.LocalOozie; +import org.apache.oozie.client.CoordinatorAction; +import org.apache.oozie.client.CoordinatorJob; import org.apache.oozie.client.WorkflowJob; import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.OozieClientException; -import org.apache.oozie.test.XFsTestCase; +import org.apache.oozie.command.coord.CoordActionStartXCommand; +import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor; +import org.apache.oozie.test.XDataTestCase; +import org.apache.oozie.util.DateUtils; import org.apache.oozie.util.IOUtils; +import org.apache.oozie.CoordinatorActionBean; +import org.apache.oozie.CoordinatorJobBean; import org.apache.oozie.ErrorCode; +import org.apache.oozie.service.JPAService; +import org.apache.oozie.service.Services; import org.apache.oozie.service.XLogService; -public class TestReRunXCommand extends XFsTestCase { +public class TestReRunXCommand extends XDataTestCase { @Override protected void setUp() throws Exception { super.setUp(); @@ -227,4 +235,89 @@ public class TestReRunXCommand extends XFsTestCase { assertEquals(WorkflowJob.Status.SUCCEEDED, wfClient.getJobInfo(jobId1).getStatus()); assertEquals("wf_test-feed_test", wfClient.getJobInfo(jobId1).getAppName()); } + + //rerun should use existing wf conf + public void testRerunWithExistingConf() throws IOException, OozieClientException { + Reader reader = IOUtils.getResourceAsReader("rerun-wf.xml", -1); + Writer writer = new FileWriter(new File(getTestCaseDir(), "workflow.xml")); + IOUtils.copyCharStream(reader, writer); + Path path = getFsTestCaseDir(); + getFileSystem().create(new Path(path, "p2")); + final OozieClient wfClient = LocalOozie.getClient(); + final Properties conf = wfClient.createConfiguration(); + conf.setProperty(OozieClient.APP_PATH, getTestCaseFileUri("workflow.xml")); + conf.setProperty(OozieClient.USER_NAME, getTestUser()); + conf.setProperty("nnbase", path.toString()); + conf.setProperty("base", path.toUri().getPath()); + + Properties newConf = wfClient.createConfiguration(); + newConf.setProperty("base", path.toUri().getPath()); + final String jobId = wfClient.submit(conf); + wfClient.start(jobId); + waitFor(15 * 1000, new Predicate() { + public boolean evaluate() throws Exception { + return wfClient.getJobInfo(jobId).getStatus() == WorkflowJob.Status.KILLED; + } + }); + assertEquals(WorkflowJob.Status.KILLED, wfClient.getJobInfo(jobId).getStatus()); + try { + wfClient.reRun(jobId, newConf); + } + catch (OozieClientException e) { + assertTrue(e.getCause().getMessage().contains(ErrorCode.E0401.toString())); + } + newConf = wfClient.createConfiguration(); + // Skip a non-executed node + getFileSystem().delete(new Path(path, "p2"), true); + newConf.setProperty(OozieClient.RERUN_SKIP_NODES, "fs1"); + wfClient.reRun(jobId, newConf); + waitFor(15 * 1000, new Predicate() { + public boolean evaluate() throws Exception { + return wfClient.getJobInfo(jobId).getStatus() == WorkflowJob.Status.SUCCEEDED; + } + }); + assertEquals(WorkflowJob.Status.SUCCEEDED, wfClient.getJobInfo(jobId).getStatus()); + } + + //rerun should use existing coord conf + public void testRerunWithExistingCoodConf() throws Exception { + final OozieClient wfClient = LocalOozie.getClient(); + + Date start = DateUtils.parseDateOozieTZ("2009-12-15T01:00Z"); + Date end = DateUtils.parseDateOozieTZ("2009-12-16T01:00Z"); + CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, false, false, + 1); + + CoordinatorActionBean action = addRecordToCoordActionTable(coordJob.getId(), 1, + CoordinatorAction.Status.SUBMITTED, "coord-action-start-escape-strings.xml", 0); + + String actionId = action.getId(); + new CoordActionStartXCommand(actionId, getTestUser(), "myapp", "myjob").call(); + + final JPAService jpaService = Services.get().get(JPAService.class); + action = jpaService.execute(new CoordActionGetJPAExecutor(actionId)); + + if (action.getStatus() == CoordinatorAction.Status.SUBMITTED) { + fail("CoordActionStartCommand didn't work because the status for action id" + actionId + " is :" + + action.getStatus() + " expected to be NOT SUBMITTED (i.e. RUNNING)"); + } + final String wfId = action.getExternalId(); + wfClient.kill(wfId); + waitFor(15 * 1000, new Predicate() { + public boolean evaluate() throws Exception { + return wfClient.getJobInfo(wfId).getStatus() == WorkflowJob.Status.KILLED; + } + }); + Properties newConf = wfClient.createConfiguration(); + newConf.setProperty(OozieClient.RERUN_FAIL_NODES, "true"); + wfClient.reRun(wfId, newConf); + waitFor(15 * 1000, new Predicate() { + public boolean evaluate() throws Exception { + return wfClient.getJobInfo(wfId).getStatus() == WorkflowJob.Status.SUCCEEDED; + } + }); + assertEquals(WorkflowJob.Status.SUCCEEDED, wfClient.getJobInfo(wfId).getStatus()); + + } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/c707867b/core/src/test/java/org/apache/oozie/servlet/TestV0JobServlet.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/servlet/TestV0JobServlet.java b/core/src/test/java/org/apache/oozie/servlet/TestV0JobServlet.java index cdd4903..8b4026a 100644 --- a/core/src/test/java/org/apache/oozie/servlet/TestV0JobServlet.java +++ b/core/src/test/java/org/apache/oozie/servlet/TestV0JobServlet.java @@ -124,15 +124,6 @@ public class TestV0JobServlet extends DagServletTestCase { _testAction(RestConstants.JOB_ACTION_RERUN, conf); } - public void testInvalidReRunConfigurations() throws Exception { - Configuration conf = new XConfiguration(); - Path appPath = new Path(getFsTestCaseDir(), "app"); - getFileSystem().mkdirs(appPath); - getFileSystem().create(new Path(appPath, "workflow.xml")).close(); - conf.set(OozieClient.APP_PATH, appPath.toString()); - _testAction(RestConstants.JOB_ACTION_RERUN, conf); - } - private void _testNonJsonResponses(final String show, final String contentType, final String response) throws Exception { runTest("/v0/job/*", V0JobServlet.class, IS_SECURITY_ENABLED, new Callable<Void>() { http://git-wip-us.apache.org/repos/asf/oozie/blob/c707867b/docs/src/site/twiki/DG_WorkflowReRun.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/DG_WorkflowReRun.twiki b/docs/src/site/twiki/DG_WorkflowReRun.twiki index 94084fc..e6cca80 100644 --- a/docs/src/site/twiki/DG_WorkflowReRun.twiki +++ b/docs/src/site/twiki/DG_WorkflowReRun.twiki @@ -25,6 +25,8 @@ ---++ ReRun * Reloads the configs. + * If no configuration is passed, existing coordinator/workflow configuration will be used. If configuration is passed then, it will be merged with existing workflow configuration. Input configuration will take the precedence. + * Currently there is no way to remove an existing configuration but only override by passing a different value in the input configuration. * Creates a new Workflow Instance with the same wfId. * Deletes the actions that are not skipped from the DB and copies data from old Workflow Instance to new one for skipped actions. * Action handler will skip the nodes given in the config with the same exit transition as before. http://git-wip-us.apache.org/repos/asf/oozie/blob/c707867b/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 1a11559..d2b7e44 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.1.0 release (trunk - unreleased) +OOZIE-1797 Workflow rerun command should use existing workflow properties (puru via rohini) OOZIE-1769 An option to update coord properties/definition (puru via rohini) OOZIE-1796 Job status should not transition from KILLED (puru via rohini) OOZIE-1781 UI - Last Modified time is not displayed for coord action in coord job info grid (puru via mona)
