Repository: oozie Updated Branches: refs/heads/branch-4.2 ce3cc4b6c -> c0148e2d3
OOZIE-1993 Rerun fails during join in certain condition (shwethags) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/c0148e2d Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/c0148e2d Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/c0148e2d Branch: refs/heads/branch-4.2 Commit: c0148e2d38718c2747f4cb3364d6eb51f7f88200 Parents: ce3cc4b Author: Shwetha GS <[email protected]> Authored: Wed May 20 12:01:44 2015 +0530 Committer: Shwetha GS <[email protected]> Committed: Wed May 20 12:01:44 2015 +0530 ---------------------------------------------------------------------- .../org/apache/oozie/WorkflowActionBean.java | 2 +- .../apache/oozie/command/wf/ReRunXCommand.java | 10 +- .../apache/oozie/command/wf/SignalXCommand.java | 1 + .../jpa/WorkflowActionQueryExecutor.java | 1 + .../oozie/service/LiteWorkflowStoreService.java | 5 +- .../org/apache/oozie/workflow/WorkflowLib.java | 3 +- .../workflow/lite/LiteWorkflowInstance.java | 190 ++++++------------- .../oozie/workflow/lite/LiteWorkflowLib.java | 4 +- core/src/main/resources/oozie-log4j.properties | 2 +- .../oozie/command/wf/TestReRunXCommand.java | 19 +- core/src/test/resources/rerun-wf-fork.xml | 14 +- release-log.txt | 1 + 12 files changed, 89 insertions(+), 163 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/c0148e2d/core/src/main/java/org/apache/oozie/WorkflowActionBean.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/WorkflowActionBean.java b/core/src/main/java/org/apache/oozie/WorkflowActionBean.java index 6289158..7f00278 100644 --- a/core/src/main/java/org/apache/oozie/WorkflowActionBean.java +++ b/core/src/main/java/org/apache/oozie/WorkflowActionBean.java @@ -68,7 +68,7 @@ import org.json.simple.JSONObject; @NamedQuery(name = "UPDATE_ACTION_END", query = "update WorkflowActionBean a set a.stats = :stats, a.errorCode = :errorCode, a.errorMessage = :errorMessage, a.retries = :retries, a.endTimestamp = :endTime, a.statusStr = :status, a.pending = :pending, a.pendingAgeTimestamp = :pendingAge, a.signalValue = :signalValue, a.userRetryCount = :userRetryCount, a.externalStatus = :externalStatus where a.id = :id"), - @NamedQuery(name = "UPDATE_ACTION_PENDING", query = "update WorkflowActionBean a set a.pending = :pending, a.pendingAgeTimestamp = :pendingAge where a.id = :id"), + @NamedQuery(name = "UPDATE_ACTION_PENDING", query = "update WorkflowActionBean a set a.pending = :pending, a.pendingAgeTimestamp = :pendingAge, a.executionPath = :executionPath where a.id = :id"), @NamedQuery(name = "UPDATE_ACTION_STATUS_PENDING", query = "update WorkflowActionBean a set a.statusStr = :status, a.pending = :pending, a.pendingAgeTimestamp = :pendingAge where a.id = :id"), http://git-wip-us.apache.org/repos/asf/oozie/blob/c0148e2d/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java index 6138278..bb6d4e7 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java @@ -178,16 +178,8 @@ public class ReRunXCommand extends WorkflowXCommand<Void> { // necessary to ensure propagation of Oozie properties to Hadoop calls downstream conf = ((XConfiguration) conf).resolve(); - // Prepare the action endtimes map - Map<String, Date> actionEndTimes = new HashMap<String, Date>(); - for (WorkflowActionBean action : actions) { - if (action.getEndTime() != null) { - actionEndTimes.put(action.getName(), action.getEndTime()); - } - } - try { - newWfInstance = workflowLib.createInstance(app, conf, jobId, actionEndTimes); + newWfInstance = workflowLib.createInstance(app, conf, jobId); } catch (WorkflowException e) { throw new CommandException(e); http://git-wip-us.apache.org/repos/asf/oozie/blob/c0148e2d/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java index bccac51..1b4b0b6 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java @@ -350,6 +350,7 @@ public class SignalXCommand extends WorkflowXCommand<Void> { WorkflowActionBean oldAction = new WorkflowActionBean(); oldAction.setId(newAction.getId()); oldAction.setPending(); + oldAction.setExecutionPath(newAction.getExecutionPath()); updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_PENDING, oldAction)); queue(new SignalXCommand(jobId, oldAction.getId())); http://git-wip-us.apache.org/repos/asf/oozie/blob/c0148e2d/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java index 0e99ae2..4dec9da 100644 --- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java +++ b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java @@ -110,6 +110,7 @@ public class WorkflowActionQueryExecutor extends case UPDATE_ACTION_PENDING: query.setParameter("pending", actionBean.getPending()); query.setParameter("pendingAge", actionBean.getPendingAgeTimestamp()); + query.setParameter("executionPath", actionBean.getExecutionPath()); query.setParameter("id", actionBean.getId()); break; case UPDATE_ACTION_STATUS_PENDING: http://git-wip-us.apache.org/repos/asf/oozie/blob/c0148e2d/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java b/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java index d661d08..f1f26ec 100644 --- a/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java +++ b/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java @@ -93,7 +93,6 @@ public abstract class LiteWorkflowStoreService extends WorkflowStoreService { if (!skipAction) { String nodeConf = context.getNodeDef().getConf(); - String executionPath = context.getExecutionPath(); if (actionType == null) { try { @@ -107,12 +106,14 @@ public abstract class LiteWorkflowStoreService extends WorkflowStoreService { } log.debug(" Creating action for node [{0}]", nodeName); action.setType(actionType); - action.setExecutionPath(executionPath); action.setConf(nodeConf); action.setLogToken(((WorkflowJobBean) context.getTransientVar(WORKFLOW_BEAN)).getLogToken()); action.setStatus(WorkflowAction.Status.PREP); action.setJobId(jobId); } + + String executionPath = context.getExecutionPath(); + action.setExecutionPath(executionPath); action.setCred(context.getNodeDef().getCred()); log.debug("Setting action for cred: '"+context.getNodeDef().getCred() + "', name: '"+ context.getNodeDef().getName() + "'"); http://git-wip-us.apache.org/repos/asf/oozie/blob/c0148e2d/core/src/main/java/org/apache/oozie/workflow/WorkflowLib.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/workflow/WorkflowLib.java b/core/src/main/java/org/apache/oozie/workflow/WorkflowLib.java index ba5b6b1..ccbca62 100644 --- a/core/src/main/java/org/apache/oozie/workflow/WorkflowLib.java +++ b/core/src/main/java/org/apache/oozie/workflow/WorkflowLib.java @@ -58,11 +58,10 @@ public interface WorkflowLib { * @param app application to create a workflow instance of. * @param conf job configuration. * @param wfId Workflow ID. - * @param actionEndTimes A map of the actions to their endtimes; actions with no endtime should be omitted * @return the newly created workflow instance. * @throws WorkflowException thrown if the instance could not be created. */ - public WorkflowInstance createInstance(WorkflowApp app, Configuration conf, String wfId, Map<String, Date> actionEndTimes) + public WorkflowInstance createInstance(WorkflowApp app, Configuration conf, String wfId) throws WorkflowException; /** http://git-wip-us.apache.org/repos/asf/oozie/blob/c0148e2d/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java index 919c95a..2b13e67 100644 --- a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java +++ b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java @@ -18,30 +18,26 @@ package org.apache.oozie.workflow.lite; -import org.apache.oozie.service.XLogService; -import org.apache.oozie.service.DagXLogInfoService; -import org.apache.oozie.client.OozieClient; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.conf.Configuration; +import org.apache.oozie.ErrorCode; +import org.apache.oozie.client.OozieClient; +import org.apache.oozie.service.DagXLogInfoService; +import org.apache.oozie.service.XLogService; +import org.apache.oozie.util.ParamChecker; +import org.apache.oozie.util.XConfiguration; +import org.apache.oozie.util.XLog; import org.apache.oozie.workflow.WorkflowApp; import org.apache.oozie.workflow.WorkflowException; import org.apache.oozie.workflow.WorkflowInstance; -import org.apache.oozie.util.ParamChecker; -import org.apache.oozie.util.XLog; -import org.apache.oozie.util.XConfiguration; -import org.apache.oozie.ErrorCode; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.io.ByteArrayOutputStream; -import java.io.ByteArrayInputStream; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -50,9 +46,7 @@ import java.util.Map; public class LiteWorkflowInstance implements Writable, WorkflowInstance { private static final String TRANSITION_TO = "transition.to"; - private final Date FAR_INTO_THE_FUTURE = new Date(Long.MAX_VALUE); - - private XLog log; + private XLog log = XLog.getLog(getClass()); private static String PATH_SEPARATOR = "/"; private static String ROOT = PATH_SEPARATOR; @@ -161,7 +155,6 @@ public class LiteWorkflowInstance implements Writable, WorkflowInstance { private Map<String, NodeInstance> executionPaths = new HashMap<String, NodeInstance>(); private Map<String, String> persistentVars = new HashMap<String, String>(); private Map<String, Object> transientVars = new HashMap<String, Object>(); - private ActionEndTimesComparator actionEndTimesComparator = null; protected LiteWorkflowInstance() { log = XLog.getLog(getClass()); @@ -176,11 +169,6 @@ public class LiteWorkflowInstance implements Writable, WorkflowInstance { status = Status.PREP; } - public LiteWorkflowInstance(LiteWorkflowApp def, Configuration conf, String instanceId, Map<String, Date> actionEndTimes) { - this(def, conf, instanceId); - actionEndTimesComparator = new ActionEndTimesComparator(actionEndTimes); - } - public synchronized boolean start() throws WorkflowException { if (status != Status.PREP) { throw new WorkflowException(ErrorCode.E0719); @@ -196,15 +184,19 @@ public class LiteWorkflowInstance implements Writable, WorkflowInstance { public synchronized boolean signal(String executionPath, String signalValue) throws WorkflowException { ParamChecker.notEmpty(executionPath, "executionPath"); ParamChecker.notNull(signalValue, "signalValue"); - log.debug(XLog.STD, "Signaling job execution path [{0}] signal value [{1}]", executionPath, signalValue); + if (status != Status.RUNNING) { throw new WorkflowException(ErrorCode.E0716); } + NodeInstance nodeJob = executionPaths.get(executionPath); + log.debug(XLog.STD, "Signaling job execution path [{0}] signal value [{1}] for node [{2}]", executionPath, + signalValue, (nodeJob == null ? null : nodeJob.nodeName)); if (nodeJob == null) { status = Status.FAILED; log.error("invalid execution path [{0}]", executionPath); } + NodeDef nodeDef = null; if (!status.isEndState()) { nodeDef = def.getNode(nodeJob.nodeName); @@ -213,6 +205,7 @@ public class LiteWorkflowInstance implements Writable, WorkflowInstance { log.error("invalid transition [{0}]", nodeJob.nodeName); } } + if (!status.isEndState()) { NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass()); boolean exiting = true; @@ -260,72 +253,49 @@ public class LiteWorkflowInstance implements Writable, WorkflowInstance { if (context.status == Status.KILLED) { status = Status.KILLED; log.debug(XLog.STD, "Completing job, kill node [{0}]", nodeJob.nodeName); - } - else { - if (context.status == Status.FAILED) { - status = Status.FAILED; - log.debug(XLog.STD, "Completing job, fail node [{0}]", nodeJob.nodeName); - } - else { - if (context.status == Status.SUCCEEDED) { - status = Status.SUCCEEDED; - log.debug(XLog.STD, "Completing job, end node [{0}]", nodeJob.nodeName); - } -/* - else if (context.status == Status.SUSPENDED) { - status = Status.SUSPENDED; + } else if (context.status == Status.FAILED) { + status = Status.FAILED; + log.debug(XLog.STD, "Completing job, fail node [{0}]", nodeJob.nodeName); + } else if (context.status == Status.SUCCEEDED) { + status = Status.SUCCEEDED; log.debug(XLog.STD, "Completing job, end node [{0}]", nodeJob.nodeName); - } -*/ - else { - for (String fullTransition : fullTransitions) { - // this is the whole trick for forking, we need the - // executionpath and the transition - // in the case of no forking last element of - // executionpath is different from transition - // in the case of forking they are the same - - log.debug(XLog.STD, "Exiting node [{0}] with transition[{1}]", nodeJob.nodeName, - fullTransition); - - String execPathFromTransition = getExecutionPath(fullTransition); - String transition = getTransitionNode(fullTransition); - def.validateTransition(nodeJob.nodeName, transition); - - NodeInstance nodeJobInPath = executionPaths.get(execPathFromTransition); - if ((nodeJobInPath == null) || (!transition.equals(nodeJobInPath.nodeName))) { - // TODO explain this IF better - // If the WfJob is signaled with the parent - // execution executionPath again - // The Fork node will execute again.. and replace - // the Node WorkflowJobBean - // so this is required to prevent that.. - // Question : Should we throw an error in this case - // ?? - executionPaths.put(execPathFromTransition, new NodeInstance(transition)); - pathsToStart.add(execPathFromTransition); - } - - } + } else { + for (String fullTransition : fullTransitions) { + //this is the whole trick for forking, we need the executionpath and the transition. + //in case of no forking, last element of executionpath is different from transition. + //in case of forking, they are the same + + log.debug(XLog.STD, "Exiting node [{0}] with transition[{1}]", nodeJob.nodeName, + fullTransition); + + String execPathFromTransition = getExecutionPath(fullTransition); + String transition = getTransitionNode(fullTransition); + def.validateTransition(nodeJob.nodeName, transition); + + NodeInstance nodeJobInPath = executionPaths.get(execPathFromTransition); + if ((nodeJobInPath == null) || (!transition.equals(nodeJobInPath.nodeName))) { + // TODO explain this IF better + // If the WfJob is signaled with the parent + // execution executionPath again + // The Fork node will execute again.. and replace + // the Node WorkflowJobBean + // so this is required to prevent that.. + // Question : Should we throw an error in this case + // ?? + executionPaths.put(execPathFromTransition, new NodeInstance(transition)); + pathsToStart.add(execPathFromTransition); + } - // If we're doing a rerun, then we need to make sure to put the actions in pathToStart into the order - // that they ended in. Otherwise, it could result in an error later on in some edge cases. - // e.g. You have a fork with two nodes, A and B, that both succeeded, followed by a join and some more - // nodes, some of which failed. If you do the rerun, it will always signal A and then B, even if in the - // original run B signaled first and then A. By sorting this, we maintain the proper signal ordering. - if (actionEndTimesComparator != null && pathsToStart.size() > 1) { - Collections.sort(pathsToStart, actionEndTimesComparator); - } + } - // signal all new synch transitions - for (String pathToStart : pathsToStart) { - signal(pathToStart, "::synch::"); - } - } + // signal all new synch transitions + for (String pathToStart : pathsToStart) { + signal(pathToStart, "::synch::"); } } } } + if (status.isEndState()) { if (status == Status.FAILED) { List<String> failedNodes = terminateNodes(status); @@ -341,6 +311,7 @@ public class LiteWorkflowInstance implements Writable, WorkflowInstance { } } } + return status.isEndState(); } @@ -608,14 +579,6 @@ public class LiteWorkflowInstance implements Writable, WorkflowInstance { dOut.writeUTF(entry.getKey()); writeStringAsBytes(entry.getValue(), dOut); } - if (actionEndTimesComparator != null) { - Map<String, Date> actionEndTimes = actionEndTimesComparator.getActionEndTimes(); - dOut.writeInt(actionEndTimes.size()); - for (Map.Entry<String, Date> entry : actionEndTimes.entrySet()) { - dOut.writeUTF(entry.getKey()); - dOut.writeLong(entry.getValue().getTime()); - } - } } @Override @@ -647,21 +610,6 @@ public class LiteWorkflowInstance implements Writable, WorkflowInstance { String vVal = readBytesAsString(dIn); persistentVars.put(vName, vVal); } - int numActionEndTimes = -1; - try { - numActionEndTimes = dIn.readInt(); - } catch (IOException ioe) { - // This means that there isn't an actionEndTimes, so just ignore - } - if (numActionEndTimes > 0) { - Map<String, Date> actionEndTimes = new HashMap<String, Date>(numActionEndTimes); - for (int x = 0; x < numActionEndTimes; x++) { - String name = dIn.readUTF(); - long endTime = dIn.readLong(); - actionEndTimes.put(name, new Date(endTime)); - } - actionEndTimesComparator = new ActionEndTimesComparator(actionEndTimes); - } refreshLog(); } @@ -716,34 +664,4 @@ public class LiteWorkflowInstance implements Writable, WorkflowInstance { public int hashCode() { return instanceId.hashCode(); } - - private class ActionEndTimesComparator implements Comparator<String> { - - private final Map<String, Date> actionEndTimes; - - public ActionEndTimesComparator(Map<String, Date> actionEndTimes) { - this.actionEndTimes = actionEndTimes; - } - - @Override - public int compare(String node1, String node2) { - Date date1 = null; - Date date2 = null; - NodeInstance node1Instance = executionPaths.get(node1); - if (node1Instance != null) { - date1 = this.actionEndTimes.get(node1Instance.nodeName); - } - NodeInstance node2Instance = executionPaths.get(node2); - if (node2Instance != null) { - date2 = this.actionEndTimes.get(node2Instance.nodeName); - } - date1 = (date1 == null) ? FAR_INTO_THE_FUTURE : date1; - date2 = (date2 == null) ? FAR_INTO_THE_FUTURE : date2; - return date1.compareTo(date2); - } - - public Map<String, Date> getActionEndTimes() { - return actionEndTimes; - } - } } http://git-wip-us.apache.org/repos/asf/oozie/blob/c0148e2d/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java index 706cc2a..23df086 100644 --- a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java +++ b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java @@ -66,10 +66,10 @@ public abstract class LiteWorkflowLib implements WorkflowLib { } @Override - public WorkflowInstance createInstance(WorkflowApp app, Configuration conf, String wfId, Map<String, Date> actionEndTimes) + public WorkflowInstance createInstance(WorkflowApp app, Configuration conf, String wfId) throws WorkflowException { ParamChecker.notNull(app, "app"); ParamChecker.notNull(wfId, "wfId"); - return new LiteWorkflowInstance((LiteWorkflowApp) app, conf, wfId, actionEndTimes); + return new LiteWorkflowInstance((LiteWorkflowApp) app, conf, wfId); } } http://git-wip-us.apache.org/repos/asf/oozie/blob/c0148e2d/core/src/main/resources/oozie-log4j.properties ---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-log4j.properties b/core/src/main/resources/oozie-log4j.properties index 4b532ca..c86b301 100644 --- a/core/src/main/resources/oozie-log4j.properties +++ b/core/src/main/resources/oozie-log4j.properties @@ -47,7 +47,7 @@ log4j.logger.org.apache.oozie.command=DEBUG, test log4j.logger.org.apache.oozie.wf.service=INFO, test log4j.logger.org.apache.oozie.wf.servlet=INFO, test log4j.logger.org.apache.oozie.store=DEBUG, test -log4j.logger.org.apache.oozie.workflow=INFO, test +log4j.logger.org.apache.oozie.workflow=DEBUG, test log4j.logger.org.apache.oozie.service=DEBUG, test log4j.logger.org.apache.oozie.servlet=INFO, test log4j.logger.org.apache.oozie.sla=DEBUG, test http://git-wip-us.apache.org/repos/asf/oozie/blob/c0148e2d/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java index ce32e51..c9bbd00 100644 --- a/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java @@ -119,7 +119,7 @@ public class TestReRunXCommand extends XDataTestCase { * This tests a specific edge case where rerun can fail when there's a fork, the actions in the fork succeed, but an action * after the fork fails. Previously, the rerun would step through the forked actions in the order they were listed in the * fork action's XML; if they happened to finish in a different order, this would cause an error during rerun. This is fixed by - * enforcing the same order in LiteWorkflowInstance#signal, which this test verifies. + * using the new execution path for LiteWorkflowInstance#signal, which this test verifies. * * @throws Exception */ @@ -139,7 +139,7 @@ public class TestReRunXCommand extends XDataTestCase { conf.setProperty("jobTracker", getJobTrackerUri()); conf.setProperty(OozieClient.APP_PATH, getTestCaseFileUri("workflow.xml")); conf.setProperty(OozieClient.USER_NAME, getTestUser()); - conf.setProperty("cmd3", "echo1"); // expected to fail + conf.setProperty("cmd4", "echo1"); //expected to fail final String jobId1 = wfClient.submit(conf); wfClient.start(jobId1); @@ -149,17 +149,21 @@ public class TestReRunXCommand extends XDataTestCase { return wfClient.getJobInfo(jobId1).getStatus() == WorkflowJob.Status.KILLED; } }); + wfClient.kill(jobId1); + assertEquals(WorkflowJob.Status.KILLED, wfClient.getJobInfo(jobId1).getStatus()); + List<WorkflowAction> actions = wfClient.getJobInfo(jobId1).getActions(); assertEquals(WorkflowAction.Status.OK, actions.get(1).getStatus()); // fork assertEquals(WorkflowAction.Status.OK, actions.get(2).getStatus()); // sh1 assertEquals(WorkflowAction.Status.OK, actions.get(3).getStatus()); // sh2 - assertEquals(WorkflowAction.Status.OK, actions.get(4).getStatus()); // join - assertEquals(WorkflowAction.Status.ERROR, actions.get(5).getStatus()); // sh3 + assertEquals(WorkflowAction.Status.OK, actions.get(4).getStatus()); // sh3 + assertEquals(WorkflowAction.Status.OK, actions.get(5).getStatus()); // j + assertEquals(WorkflowAction.Status.ERROR, actions.get(6).getStatus()); // sh4 // rerun failed node, which is after the fork conf.setProperty(OozieClient.RERUN_FAIL_NODES, "true"); - conf.setProperty("cmd3", "echo"); // expected to succeed + conf.setProperty("cmd4", "echo"); wfClient.reRun(jobId1, conf); waitFor(200 * 1000, new Predicate() { @@ -173,8 +177,9 @@ public class TestReRunXCommand extends XDataTestCase { assertEquals(WorkflowAction.Status.OK, actions.get(1).getStatus()); // fork assertEquals(WorkflowAction.Status.OK, actions.get(2).getStatus()); // sh1 assertEquals(WorkflowAction.Status.OK, actions.get(3).getStatus()); // sh2 - assertEquals(WorkflowAction.Status.OK, actions.get(4).getStatus()); // join - assertEquals(WorkflowAction.Status.OK, actions.get(5).getStatus()); // sh3 + assertEquals(WorkflowAction.Status.OK, actions.get(4).getStatus()); // sh3 + assertEquals(WorkflowAction.Status.OK, actions.get(5).getStatus()); // join + assertEquals(WorkflowAction.Status.OK, actions.get(6).getStatus()); // sh4 } /* http://git-wip-us.apache.org/repos/asf/oozie/blob/c0148e2d/core/src/test/resources/rerun-wf-fork.xml ---------------------------------------------------------------------- diff --git a/core/src/test/resources/rerun-wf-fork.xml b/core/src/test/resources/rerun-wf-fork.xml index 8fa8f34..9172207 100644 --- a/core/src/test/resources/rerun-wf-fork.xml +++ b/core/src/test/resources/rerun-wf-fork.xml @@ -41,15 +41,23 @@ <shell xmlns="uri:oozie:shell-action:0.3"> <exec>echo</exec> </shell> + <ok to="sh3"/> + <error to="k"/> + </action> + + <action name="sh3"> + <shell xmlns="uri:oozie:shell-action:0.3"> + <exec>echo</exec> + </shell> <ok to="j"/> <error to="k"/> </action> - <join name="j" to="sh3"/> + <join name="j" to="sh4"/> - <action name="sh3"> + <action name="sh4"> <shell xmlns="uri:oozie:shell-action:0.3"> - <exec>${cmd3}</exec> + <exec>${cmd4}</exec> </shell> <ok to="end"/> <error to="k"/> http://git-wip-us.apache.org/repos/asf/oozie/blob/c0148e2d/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 25ee99c..096078a 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.2.0 release (unreleased) +OOZIE-1993 Rerun fails during join in certain condition (shwethags) OOZIE-2236 Need to package hive-hcatalog-server-extensions.jar in the hcatalog sharelib (venkatnrangan via bzhang) OOZIE-2232 Oozie should invalidate bulk write command when "-filter" is missing (venkatnrangan via bzhang) OOZIE-2224 Add example worklfow.xml for hive in secure mode (venkatnrangan via bzhang)
