Repository: oozie
Updated Branches:
  refs/heads/master cc4b4398f -> c707867b7


OOZIE-1797 Workflow rerun command should use existing workflow properties (puru 
via rohini)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/c707867b
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/c707867b
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/c707867b

Branch: refs/heads/master
Commit: c707867b75319cf2a3209ed7f664e78fde75541d
Parents: cc4b439
Author: Rohini Palaniswamy <[email protected]>
Authored: Mon Apr 21 16:57:05 2014 -0700
Committer: Rohini Palaniswamy <[email protected]>
Committed: Mon Apr 21 16:57:05 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/oozie/cli/OozieCLI.java     |  37 ++++++-
 .../main/java/org/apache/oozie/DagEngine.java   |  18 +++-
 .../apache/oozie/servlet/BaseJobServlet.java    |   6 +-
 .../oozie/command/wf/TestReRunXCommand.java     | 101 ++++++++++++++++++-
 .../apache/oozie/servlet/TestV0JobServlet.java  |   9 --
 docs/src/site/twiki/DG_WorkflowReRun.twiki      |   2 +
 release-log.txt                                 |   1 +
 7 files changed, 152 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/c707867b/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/oozie/cli/OozieCLI.java 
b/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
index c31c8ee..d964889 100644
--- a/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
+++ b/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
@@ -707,12 +707,12 @@ public class OozieCLI {
     }
 
     private Properties getConfiguration(OozieClient wc, CommandLine 
commandLine) throws IOException {
+        if (!isConfigurationSpecified(wc, commandLine)) {
+            throw new IOException("configuration is not specified");
+        }
         Properties conf = wc.createConfiguration();
         String configFile = commandLine.getOptionValue(CONFIG_OPTION);
-        if (configFile == null) {
-            throw new IOException("configuration file not specified");
-        }
-        else {
+        if (configFile != null) {
             File file = new File(configFile);
             if (!file.exists()) {
                 throw new IOException("configuration file [" + configFile + "] 
not found");
@@ -735,6 +735,28 @@ public class OozieCLI {
     }
 
     /**
+     * Check if configuration has been specified
+     * @param wc
+     * @param commandLine
+     * @return
+     * @throws IOException
+     */
+    private boolean isConfigurationSpecified(OozieClient wc, CommandLine 
commandLine) throws IOException {
+        boolean isConf = false;
+        String configFile = commandLine.getOptionValue(CONFIG_OPTION);
+        if (configFile == null) {
+            isConf = false;
+        }
+        else {
+            isConf = new File(configFile).exists();
+        }
+        if (commandLine.hasOption("D")) {
+            isConf = true;
+        }
+        return isConf;
+    }
+
+    /**
      * @param commandLine command line string.
      * @return change value specified by -value.
      * @throws OozieCLIException
@@ -904,7 +926,12 @@ public class OozieCLI {
             }
             else if (options.contains(RERUN_OPTION)) {
                 if (commandLine.getOptionValue(RERUN_OPTION).contains("-W")) {
-                    wc.reRun(commandLine.getOptionValue(RERUN_OPTION), 
getConfiguration(wc, commandLine));
+                    if (isConfigurationSpecified(wc, commandLine)) {
+                        wc.reRun(commandLine.getOptionValue(RERUN_OPTION), 
getConfiguration(wc, commandLine));
+                    }
+                    else {
+                        wc.reRun(commandLine.getOptionValue(RERUN_OPTION), new 
Properties());
+                    }
                 }
                 else if 
(commandLine.getOptionValue(RERUN_OPTION).contains("-B")) {
                     String bundleJobId = 
commandLine.getOptionValue(RERUN_OPTION);

http://git-wip-us.apache.org/repos/asf/oozie/blob/c707867b/core/src/main/java/org/apache/oozie/DagEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/DagEngine.java 
b/core/src/main/java/org/apache/oozie/DagEngine.java
index 7d316b8..cad5ddd 100644
--- a/core/src/main/java/org/apache/oozie/DagEngine.java
+++ b/core/src/main/java/org/apache/oozie/DagEngine.java
@@ -42,13 +42,18 @@ import org.apache.oozie.command.wf.SubmitSqoopXCommand;
 import org.apache.oozie.command.wf.SubmitXCommand;
 import org.apache.oozie.command.wf.SuspendXCommand;
 import org.apache.oozie.command.wf.WorkflowActionInfoXCommand;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.CallableQueueService;
 import org.apache.oozie.util.ParamChecker;
 import org.apache.oozie.util.XCallable;
+import org.apache.oozie.util.XConfiguration;
 import org.apache.oozie.util.XLog;
 import org.apache.oozie.service.XLogStreamingService;
 
+import java.io.StringReader;
 import java.io.Writer;
 import java.util.Date;
 import java.util.List;
@@ -279,13 +284,22 @@ public class DagEngine extends BaseEngine {
     @Override
     public void reRun(String jobId, Configuration conf) throws 
DagEngineException {
         try {
-            validateReRunConfiguration(conf);
-            new ReRunXCommand(jobId, conf).call();
+            WorkflowJobBean wfBean = 
WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, 
jobId);
+            Configuration wfConf = new XConfiguration(new 
StringReader(wfBean.getConf()));
+            XConfiguration.copy(conf, wfConf);
+            validateReRunConfiguration(wfConf);
+            new ReRunXCommand(jobId, wfConf).call();
             start(jobId);
         }
         catch (CommandException ex) {
             throw new DagEngineException(ex);
         }
+        catch (JPAExecutorException ex) {
+            throw new DagEngineException(ex);
+        }
+        catch (IOException ex) {
+            throw new DagEngineException(ErrorCode.E0803, ex.getMessage());
+        }
     }
 
     private void validateReRunConfiguration(Configuration conf) throws 
DagEngineException {

http://git-wip-us.apache.org/repos/asf/oozie/blob/c707867b/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java 
b/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java
index 6b82d7b..8941a02 100644
--- a/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java
+++ b/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java
@@ -117,8 +117,10 @@ public abstract class BaseJobServlet extends 
JsonRestServlet {
             if (!requestUser.equals(UNDEF)) {
                 conf.set(OozieClient.USER_NAME, requestUser);
             }
-            BaseJobServlet.checkAuthorizationForApp(conf);
-            JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), 
conf.get(OozieClient.GROUP_NAME), conf);
+            if (conf.get(OozieClient.APP_PATH) != null) {
+                BaseJobServlet.checkAuthorizationForApp(conf);
+                JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), 
conf.get(OozieClient.GROUP_NAME), conf);
+            }
             reRunJob(request, response, conf);
             startCron();
             response.setStatus(HttpServletResponse.SC_OK);

http://git-wip-us.apache.org/repos/asf/oozie/blob/c707867b/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java 
b/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java
index e97fec3..1688dc9 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java
@@ -17,25 +17,33 @@
  */
 package org.apache.oozie.command.wf;
 
+import java.util.Date;
 import java.util.Properties;
 import java.io.File;
-import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.Reader;
 import java.io.Writer;
-
 import org.apache.hadoop.fs.Path;
 import org.apache.oozie.local.LocalOozie;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.OozieClientException;
-import org.apache.oozie.test.XFsTestCase;
+import org.apache.oozie.command.coord.CoordActionStartXCommand;
+import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.ErrorCode;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
 import org.apache.oozie.service.XLogService;
 
-public class TestReRunXCommand extends XFsTestCase {
+public class TestReRunXCommand extends XDataTestCase {
     @Override
     protected void setUp() throws Exception {
         super.setUp();
@@ -227,4 +235,89 @@ public class TestReRunXCommand extends XFsTestCase {
         assertEquals(WorkflowJob.Status.SUCCEEDED, 
wfClient.getJobInfo(jobId1).getStatus());
         assertEquals("wf_test-feed_test", 
wfClient.getJobInfo(jobId1).getAppName());
     }
+
+    //rerun should use existing wf conf
+    public void testRerunWithExistingConf() throws IOException, 
OozieClientException {
+        Reader reader = IOUtils.getResourceAsReader("rerun-wf.xml", -1);
+        Writer writer = new FileWriter(new File(getTestCaseDir(), 
"workflow.xml"));
+        IOUtils.copyCharStream(reader, writer);
+        Path path = getFsTestCaseDir();
+        getFileSystem().create(new Path(path, "p2"));
+        final OozieClient wfClient = LocalOozie.getClient();
+        final Properties conf = wfClient.createConfiguration();
+        conf.setProperty(OozieClient.APP_PATH, 
getTestCaseFileUri("workflow.xml"));
+        conf.setProperty(OozieClient.USER_NAME, getTestUser());
+        conf.setProperty("nnbase", path.toString());
+        conf.setProperty("base", path.toUri().getPath());
+
+        Properties newConf = wfClient.createConfiguration();
+        newConf.setProperty("base", path.toUri().getPath());
+        final String jobId = wfClient.submit(conf);
+        wfClient.start(jobId);
+        waitFor(15 * 1000, new Predicate() {
+            public boolean evaluate() throws Exception {
+                return wfClient.getJobInfo(jobId).getStatus() == 
WorkflowJob.Status.KILLED;
+            }
+        });
+        assertEquals(WorkflowJob.Status.KILLED, 
wfClient.getJobInfo(jobId).getStatus());
+        try {
+            wfClient.reRun(jobId, newConf);
+        }
+        catch (OozieClientException e) {
+            
assertTrue(e.getCause().getMessage().contains(ErrorCode.E0401.toString()));
+        }
+        newConf = wfClient.createConfiguration();
+        // Skip a non-executed node
+        getFileSystem().delete(new Path(path, "p2"), true);
+        newConf.setProperty(OozieClient.RERUN_SKIP_NODES, "fs1");
+        wfClient.reRun(jobId, newConf);
+        waitFor(15 * 1000, new Predicate() {
+            public boolean evaluate() throws Exception {
+                return wfClient.getJobInfo(jobId).getStatus() == 
WorkflowJob.Status.SUCCEEDED;
+            }
+        });
+        assertEquals(WorkflowJob.Status.SUCCEEDED, 
wfClient.getJobInfo(jobId).getStatus());
+    }
+
+    //rerun should use existing coord conf
+    public void testRerunWithExistingCoodConf() throws Exception {
+        final OozieClient wfClient = LocalOozie.getClient();
+
+        Date start = DateUtils.parseDateOozieTZ("2009-12-15T01:00Z");
+        Date end = DateUtils.parseDateOozieTZ("2009-12-16T01:00Z");
+        CoordinatorJobBean coordJob = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, false, 
false,
+                1);
+
+        CoordinatorActionBean action = 
addRecordToCoordActionTable(coordJob.getId(), 1,
+                CoordinatorAction.Status.SUBMITTED, 
"coord-action-start-escape-strings.xml", 0);
+
+        String actionId = action.getId();
+        new CoordActionStartXCommand(actionId, getTestUser(), "myapp", 
"myjob").call();
+
+        final JPAService jpaService = Services.get().get(JPAService.class);
+        action = jpaService.execute(new CoordActionGetJPAExecutor(actionId));
+
+        if (action.getStatus() == CoordinatorAction.Status.SUBMITTED) {
+            fail("CoordActionStartCommand didn't work because the status for 
action id" + actionId + " is :"
+                    + action.getStatus() + " expected to be NOT SUBMITTED 
(i.e. RUNNING)");
+        }
+        final String wfId = action.getExternalId();
+        wfClient.kill(wfId);
+        waitFor(15 * 1000, new Predicate() {
+            public boolean evaluate() throws Exception {
+                return wfClient.getJobInfo(wfId).getStatus() == 
WorkflowJob.Status.KILLED;
+            }
+        });
+        Properties newConf = wfClient.createConfiguration();
+        newConf.setProperty(OozieClient.RERUN_FAIL_NODES, "true");
+        wfClient.reRun(wfId, newConf);
+        waitFor(15 * 1000, new Predicate() {
+            public boolean evaluate() throws Exception {
+                return wfClient.getJobInfo(wfId).getStatus() == 
WorkflowJob.Status.SUCCEEDED;
+            }
+        });
+        assertEquals(WorkflowJob.Status.SUCCEEDED, 
wfClient.getJobInfo(wfId).getStatus());
+
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/c707867b/core/src/test/java/org/apache/oozie/servlet/TestV0JobServlet.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/servlet/TestV0JobServlet.java 
b/core/src/test/java/org/apache/oozie/servlet/TestV0JobServlet.java
index cdd4903..8b4026a 100644
--- a/core/src/test/java/org/apache/oozie/servlet/TestV0JobServlet.java
+++ b/core/src/test/java/org/apache/oozie/servlet/TestV0JobServlet.java
@@ -124,15 +124,6 @@ public class TestV0JobServlet extends DagServletTestCase {
         _testAction(RestConstants.JOB_ACTION_RERUN, conf);
     }
 
-    public void testInvalidReRunConfigurations() throws Exception {
-        Configuration conf = new XConfiguration();
-        Path appPath = new Path(getFsTestCaseDir(), "app");
-        getFileSystem().mkdirs(appPath);
-        getFileSystem().create(new Path(appPath, "workflow.xml")).close();
-        conf.set(OozieClient.APP_PATH, appPath.toString());
-        _testAction(RestConstants.JOB_ACTION_RERUN, conf);
-    }
-
     private void _testNonJsonResponses(final String show, final String 
contentType, final String response)
             throws Exception {
         runTest("/v0/job/*", V0JobServlet.class, IS_SECURITY_ENABLED, new 
Callable<Void>() {

http://git-wip-us.apache.org/repos/asf/oozie/blob/c707867b/docs/src/site/twiki/DG_WorkflowReRun.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/DG_WorkflowReRun.twiki 
b/docs/src/site/twiki/DG_WorkflowReRun.twiki
index 94084fc..e6cca80 100644
--- a/docs/src/site/twiki/DG_WorkflowReRun.twiki
+++ b/docs/src/site/twiki/DG_WorkflowReRun.twiki
@@ -25,6 +25,8 @@
 ---++ ReRun
 
    * Reloads the configs.
+   * If no configuration is passed, the existing coordinator/workflow 
configuration will be used. If a configuration is passed, it will be merged 
with the existing workflow configuration. The input configuration takes 
precedence.
+   * Currently there is no way to remove an existing configuration; it can only be 
overridden by passing a different value in the input configuration.
    * Creates a new Workflow Instance with the same wfId.
    * Deletes the actions that are not skipped from the DB and copies data from 
old Workflow Instance to new one for skipped actions.
    * Action handler will skip the nodes given in the config with the same exit 
transition as before.

http://git-wip-us.apache.org/repos/asf/oozie/blob/c707867b/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 1a11559..d2b7e44 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1797 Workflow rerun command should use existing workflow properties 
(puru via rohini)
 OOZIE-1769 An option to update coord properties/definition (puru via rohini)
 OOZIE-1796 Job status should not transition from KILLED (puru via rohini)
 OOZIE-1781 UI - Last Modified time is not displayed for coord action in coord 
job info grid (puru via mona)

Reply via email to