Repository: oozie Updated Branches: refs/heads/master c69d98c91 -> 870e47ccc
OOZIE-1391 Sub wf suspend doesn't update parent wf (jaydeepvishwakarma via shwethags) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/870e47cc Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/870e47cc Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/870e47cc Branch: refs/heads/master Commit: 870e47ccc0f28f08f2507ed17ed5e8fc6d16d0c9 Parents: c69d98c Author: Shwetha GS <[email protected]> Authored: Tue Oct 14 11:24:32 2014 +0530 Committer: Shwetha GS <[email protected]> Committed: Tue Oct 14 11:24:32 2014 +0530 ---------------------------------------------------------------------- .../oozie/command/wf/SuspendXCommand.java | 23 +++- .../oozie/action/oozie/JavaSleepAction.java | 25 ++++ .../oozie/TestSubWorkflowActionExecutor.java | 120 +++++++++++++++++-- release-log.txt | 1 + 4 files changed, 152 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/870e47cc/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java index 5ba69f8..199af36 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java @@ -18,10 +18,6 @@ package org.apache.oozie.command.wf; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; - import org.apache.oozie.ErrorCode; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; @@ -45,6 +41,10 @@ import org.apache.oozie.workflow.WorkflowException; import org.apache.oozie.workflow.WorkflowInstance; import org.apache.oozie.workflow.lite.LiteWorkflowInstance; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + public class SuspendXCommand extends WorkflowXCommand<Void> { private final String wfid; private WorkflowJobBean wfJobBean; @@ -71,6 +71,8 @@ public class SuspendXCommand extends WorkflowXCommand<Void> { this.wfJobBean)); BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null); queue(new NotificationXCommand(this.wfJobBean)); + //Calling suspend recursively to handle parent workflow + suspendParentWorkFlow(); } catch (WorkflowException e) { throw new CommandException(e); @@ -85,6 +87,19 @@ public class SuspendXCommand extends WorkflowXCommand<Void> { } /** + * It will suspend the parent workflow + * @throws CommandException + */ + private void suspendParentWorkFlow() throws CommandException { + if (this.wfJobBean.getParentId() != null && this.wfJobBean.getParentId().contains("-W")) { + new SuspendXCommand(this.wfJobBean.getParentId()).call(); + } else { + // update the action of the parent workflow if it is launched by coordinator + updateParentIfNecessary(wfJobBean); + } + } + + /** * Suspend the workflow job and pending flag to false for the actions that are START_RETRY or START_MANUAL or * END_RETRY or END_MANUAL * http://git-wip-us.apache.org/repos/asf/oozie/blob/870e47cc/core/src/test/java/org/apache/oozie/action/oozie/JavaSleepAction.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/oozie/JavaSleepAction.java b/core/src/test/java/org/apache/oozie/action/oozie/JavaSleepAction.java new file mode 100644 index 0000000..77783aa --- /dev/null +++ b/core/src/test/java/org/apache/oozie/action/oozie/JavaSleepAction.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.action.oozie; + +public class JavaSleepAction { + public static void main(String[] args) throws InterruptedException { + Thread.sleep(20000); + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/870e47cc/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java b/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java index 96f1376..0d7e926 100644 --- a/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java +++ b/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java @@ -18,25 +18,25 @@ package org.apache.oozie.action.oozie; -import org.apache.oozie.action.hadoop.ActionExecutorTestCase; -import org.apache.oozie.WorkflowJobBean; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.oozie.WorkflowActionBean; -import org.apache.oozie.service.Services; -import org.apache.oozie.service.WorkflowAppService; -import org.apache.oozie.util.XConfiguration; +import org.apache.oozie.WorkflowJobBean; +import org.apache.oozie.action.hadoop.ActionExecutorTestCase; import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.client.WorkflowJob; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.FileSystem; -import java.io.File; -import java.io.StringReader; -import java.io.Writer; -import java.io.OutputStreamWriter; -import java.util.Properties; +import org.apache.oozie.command.wf.SuspendXCommand; import org.apache.oozie.local.LocalOozie; +import org.apache.oozie.service.Services; +import org.apache.oozie.service.WorkflowAppService; import org.apache.oozie.service.XLogService; +import org.apache.oozie.util.XConfiguration; + +import java.io.*; +import java.net.URI; +import java.util.Properties; public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase { private static final int JOB_TIMEOUT = 100 * 1000; @@ -416,6 +416,7 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase { FileSystem fs = getFileSystem(); Writer writer = new OutputStreamWriter(fs.create(new Path(subWorkflowAppPath, "workflow.xml"))); // Infinitly recursive workflow + String appStr = "<workflow-app xmlns=\"uri:oozie:workflow:0.4\" name=\"workflow\">" + "<start to=\"subwf\"/>" + "<action name=\"subwf\">" + @@ -468,4 +469,97 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase { LocalOozie.stop(); } } + + public void testSubWorkflowSuspend() throws Exception { + try { + Path subWorkflowAppPath = getFsTestCaseDir(); + FileSystem fs = getFileSystem(); + Path subWorkflowPath = new Path(subWorkflowAppPath, "workflow.xml"); + Writer writer = new OutputStreamWriter(fs.create(subWorkflowPath)); + writer.write(getLazyWorkflow()); + writer.close(); + + String workflowUri = getTestCaseFileUri("workflow.xml"); + String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:0.4\" name=\"workflow\">" + + "<start to=\"subwf\"/>" + + "<action name=\"subwf\">" + + " <sub-workflow xmlns='uri:oozie:workflow:0.4'>" + + " <app-path>" + subWorkflowAppPath.toString() + "</app-path>" + + " </sub-workflow>" + + " <ok to=\"end\"/>" + + " <error to=\"fail\"/>" + + "</action>" + + "<kill name=\"fail\">" + + " <message>Sub workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>" + + "</kill>" + + "<end name=\"end\"/>" + + "</workflow-app>"; + + writeToFile(appXml, workflowUri); + LocalOozie.start(); + final OozieClient wfClient = LocalOozie.getClient(); + Properties conf = wfClient.createConfiguration(); + conf.setProperty(OozieClient.APP_PATH, workflowUri); + conf.setProperty(OozieClient.USER_NAME, getTestUser()); + conf.setProperty("appName", "var-app-name"); + final String jobId = wfClient.submit(conf); + wfClient.start(jobId); + + waitFor(JOB_TIMEOUT, new Predicate() { + public boolean evaluate() throws Exception { + return (wfClient.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING) && + (wfClient.getJobInfo(jobId).getActions().get(1).getStatus() == WorkflowAction.Status.RUNNING); + } + }); + WorkflowJob wf = wfClient.getJobInfo(jobId); + // Suspending subworkflow + new SuspendXCommand(wf.getActions().get(1).getExternalId()).call(); + // Check suspend for base workflow + assertEquals(WorkflowJob.Status.SUSPENDED, wfClient.getJobInfo(jobId).getStatus()); + //Check suspend for sub workflow + assertEquals(WorkflowJob.Status.SUSPENDED, wfClient.getJobInfo(wf.getActions().get(1).getExternalId()).getStatus()); + + } finally { + LocalOozie.stop(); + } + + } + + private void writeToFile(String appXml, String appPath) throws IOException { + // TODO Auto-generated method stub + File wf = new File(URI.create(appPath)); + PrintWriter out = null; + try { + out = new PrintWriter(new FileWriter(wf)); + out.println(appXml); + } + catch (IOException iex) { + throw iex; + } + finally { + if (out != null) { + out.close(); + } + } + } + + public String getLazyWorkflow() { + return "<workflow-app xmlns='uri:oozie:workflow:0.3' name='app'>" + + "<start to='java' />" + + " <action name='java'>" + + "<java>" + + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + + "<name-node>" + getNameNodeUri() + "</name-node>" + + "<main-class>" + JavaSleepAction.class.getName() + "</main-class>" + + "<arg>exit0</arg>" + + "</java>" + + "<ok to='end' />" + + "<error to='fail' />" + + "</action>" + + "<kill name='fail'>" + + "<message>shell action fail, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>" + + "</kill>" + + "<end name='end' />" + + "</workflow-app>"; + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/870e47cc/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 6d43c99..46a400d 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.2.0 release (trunk - unreleased) +OOZIE-1391 Sub wf suspend doesn't update parent wf (jaydeepvishwakarma via shwethags) OOZIE-2023 Job rerun can stuck in prep (puru) OOZIE-1940 StatusTransitService has race condition (puru) OOZIE-1696 Document how to get the action conf in the Java action (jrkinley via rkanter)
