Author: rkanter
Date: Wed Aug 7 21:58:21 2013
New Revision: 1511514
URL: http://svn.apache.org/r1511514
Log:
OOZIE-1403 forkjoin validation blocks some valid cases involving decision nodes
(rkanter)
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java
oozie/trunk/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java
oozie/trunk/release-log.txt
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java?rev=1511514&r1=1511513&r2=1511514&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java
Wed Aug 7 21:58:21 2013
@@ -96,10 +96,23 @@ public class LiteWorkflowAppParser {
VISITING, VISITED
}
+ /**
+ * We use this to store a node name and its top (eldest) decision parent
node name for the forkjoin validation
+ */
+ class NodeAndTopDecisionParent {
+ String node;
+ String topDecisionParent;
+
+ public NodeAndTopDecisionParent(String node, String topDecisionParent)
{
+ this.node = node;
+ this.topDecisionParent = topDecisionParent;
+ }
+ }
+
private List<String> forkList = new ArrayList<String>();
private List<String> joinList = new ArrayList<String>();
private StartNodeDef startNode;
- private List<String> visitedOkNodes = new ArrayList<String>();
+ private List<NodeAndTopDecisionParent> visitedOkNodes = new
ArrayList<NodeAndTopDecisionParent>();
private List<String> visitedJoinNodes = new ArrayList<String>();
public LiteWorkflowAppParser(Schema schema,
@@ -171,7 +184,8 @@ public class LiteWorkflowAppParser {
if (!forkList.isEmpty()) {
visitedOkNodes.clear();
visitedJoinNodes.clear();
- validateForkJoin(startNode, app, new LinkedList<String>(), new
LinkedList<String>(), new LinkedList<String>(), true);
+ validateForkJoin(startNode, app, new LinkedList<String>(), new
LinkedList<String>(), new LinkedList<String>(), true,
+ null);
}
}
@@ -187,10 +201,11 @@ public class LiteWorkflowAppParser {
* @param path a stack of the current path
* @param okTo false if node (or an ancestor of node) was gotten to via an
"error to" transition or via a join node that has
* already been visited at least once before
+ * @param topDecisionParent The top (eldest) decision node along the path
to this node, or null if there isn't one
* @throws WorkflowException
*/
private void validateForkJoin(NodeDef node, LiteWorkflowApp app,
Deque<String> forkNodes, Deque<String> joinNodes,
- Deque<String> path, boolean okTo) throws WorkflowException {
+ Deque<String> path, boolean okTo, String topDecisionParent) throws
WorkflowException {
if (path.contains(node.getName())) {
// cycle
throw new WorkflowException(ErrorCode.E0741, node.getName(),
Arrays.toString(path.toArray()));
@@ -202,36 +217,64 @@ public class LiteWorkflowAppParser {
// traverse through join nodes multiple times, we have to make sure
not to throw an exception here when we're really just
// re-walking the same execution path (this is why we need the
visitedJoinNodes list used later)
if (okTo && !(node instanceof KillNodeDef) && !(node instanceof
JoinNodeDef) && !(node instanceof EndNodeDef)) {
- if (visitedOkNodes.contains(node.getName())) {
- throw new WorkflowException(ErrorCode.E0743, node.getName());
+ NodeAndTopDecisionParent natdp =
findInVisitedOkNodes(node.getName());
+ if (natdp != null) {
+ // However, if we've visited the node and it's under a
decision node, we may be seeing it again and it's only
+ // illegal if that decision node is not the same as what we're
seeing now (because during execution we only go
+ // down one path of the decision node, so while we're seeing
the node multiple times here, during runtime it will
+ // only be executed once). Also, this decision node should be
the top (eldest) decision node. As null indicates
+ // that there isn't a decision node, when this happens they
must both be null to be valid. Here is a good example
+ // to visualize a node ("actionX") that has three "ok to"
paths to it, but should still be a valid workflow (it may
+ // be easier to see if you draw it):
+ // decisionA --> {actionX, decisionB}
+ // decisionB --> {actionX, actionY}
+ // actionY --> {actionX}
+ // And, if we visit this node twice under the same decision
node in an invalid way, the path cycle checking code
+ // will catch it, so we don't have to worry about that here.
+ if ((natdp.topDecisionParent == null && topDecisionParent ==
null)
+ || (natdp.topDecisionParent == null && topDecisionParent
!= null)
+ || (natdp.topDecisionParent != null && topDecisionParent
== null)
+ || !natdp.topDecisionParent.equals(topDecisionParent)) {
+ // If we get here, then we've seen this node before from
an "ok to" transition but they don't have the same
+ // decision node top parent, which means that this node
will be executed twice, which is illegal
+ throw new WorkflowException(ErrorCode.E0743,
node.getName());
+ }
+ }
+ else {
+ // If we haven't transitioned to this node before, add it and
its top decision parent node
+ visitedOkNodes.add(new
NodeAndTopDecisionParent(node.getName(), topDecisionParent));
}
- visitedOkNodes.add(node.getName());
}
if (node instanceof StartNodeDef) {
String transition = node.getTransitions().get(0); // start
always has only 1 transition
NodeDef tranNode = app.getNode(transition);
- validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo);
+ validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo,
topDecisionParent);
}
else if (node instanceof ActionNodeDef) {
String transition = node.getTransitions().get(0); // "ok to"
transition
NodeDef tranNode = app.getNode(transition);
- validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo);
// propogate okTo
+ validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo,
topDecisionParent); // propogate okTo
transition = node.getTransitions().get(1); // "error to"
transition
tranNode = app.getNode(transition);
- validateForkJoin(tranNode, app, forkNodes, joinNodes, path,
false); // use false
+ validateForkJoin(tranNode, app, forkNodes, joinNodes, path, false,
topDecisionParent); // use false
}
else if (node instanceof DecisionNodeDef) {
for(String transition : (new
HashSet<String>(node.getTransitions()))) {
NodeDef tranNode = app.getNode(transition);
- validateForkJoin(tranNode, app, forkNodes, joinNodes, path,
okTo);
+ // if there currently isn't a topDecisionParent (i.e. null),
then use this node instead of propagating null
+ String parentDecisionNode = topDecisionParent;
+ if (parentDecisionNode == null) {
+ parentDecisionNode = node.getName();
+ }
+ validateForkJoin(tranNode, app, forkNodes, joinNodes, path,
okTo, parentDecisionNode);
}
}
else if (node instanceof ForkNodeDef) {
forkNodes.push(node.getName());
for(String transition : (new
HashSet<String>(node.getTransitions()))) {
NodeDef tranNode = app.getNode(transition);
- validateForkJoin(tranNode, app, forkNodes, joinNodes, path,
okTo);
+ validateForkJoin(tranNode, app, forkNodes, joinNodes, path,
okTo, topDecisionParent);
}
forkNodes.pop();
if (!joinNodes.isEmpty()) {
@@ -258,11 +301,11 @@ public class LiteWorkflowAppParser {
// Or if we've already visited this join node, use false (because
we've already traversed this path before and we don't
// want to throw an exception from the check against
visitedOkNodes)
if (!okTo || visitedJoinNodes.contains(node.getName())) {
- validateForkJoin(tranNode, app, forkNodes, joinNodes, path,
false);
+ validateForkJoin(tranNode, app, forkNodes, joinNodes, path,
false, topDecisionParent);
// Else, use true because this is either the first time we've gone
through this join node or okTo was already false
} else {
visitedJoinNodes.add(node.getName());
- validateForkJoin(tranNode, app, forkNodes, joinNodes, path,
true);
+ validateForkJoin(tranNode, app, forkNodes, joinNodes, path,
true, topDecisionParent);
}
forkNodes.push(currentForkNode);
joinNodes.push(node.getName());
@@ -286,6 +329,24 @@ public class LiteWorkflowAppParser {
}
/**
+ * Return a {@link NodeAndTopDecisionParent} whose {@link
NodeAndTopDecisionParent#node} is equal to the passed in name, or null
+ * if it isn't in the {@link LiteWorkflowAppParser#visitedOkNodes} list.
+ *
+ * @param name The name to search for
+ * @return a NodeAndTopDecisionParent or null
+ */
+ private NodeAndTopDecisionParent findInVisitedOkNodes(String name) {
+ NodeAndTopDecisionParent natdp = null;
+ for (NodeAndTopDecisionParent v : visitedOkNodes) {
+ if (v.node.equals(name)) {
+ natdp = v;
+ break;
+ }
+ }
+ return natdp;
+ }
+
+ /**
* Parse xml to {@link LiteWorkflowApp}
*
* @param strDef
Modified:
oozie/trunk/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java?rev=1511514&r1=1511513&r2=1511514&view=diff
==============================================================================
---
oozie/trunk/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java
(original)
+++
oozie/trunk/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java
Wed Aug 7 21:58:21 2013
@@ -833,6 +833,164 @@ public class TestLiteWorkflowAppParser e
}
/*
+ *f->(2,j)
+ *2->decision node->{3,4}
+ *3->ok->4
+ *3->fail->k
+ *4->ok->j
+ *4->fail->k
+ *j->end
+ */
+ public void testDecisionTwoPathsForkJoin() throws WorkflowException{
+ LiteWorkflowAppParser parser = new LiteWorkflowAppParser(null,
+ LiteWorkflowStoreService.LiteControlNodeHandler.class,
+ LiteWorkflowStoreService.LiteDecisionHandler.class,
+ LiteWorkflowStoreService.LiteActionHandler.class);
+ LiteWorkflowApp def = new LiteWorkflowApp("name", "def",
+ new
StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "one"))
+ .addNode(new ActionNodeDef("one", dummyConf,
TestActionNodeHandler.class, "f","end"))
+ .addNode(new ForkNodeDef("f",
LiteWorkflowStoreService.LiteControlNodeHandler.class,
+ Arrays.asList(new String[]{"two", "j"})))
+ .addNode(new DecisionNodeDef("two", dummyConf,
TestDecisionNodeHandler.class,
+ Arrays.asList(new
String[]{"three","four"})))
+ .addNode(new ActionNodeDef("three", dummyConf,
TestActionNodeHandler.class, "four", "k"))
+ .addNode(new ActionNodeDef("four", dummyConf,
TestActionNodeHandler.class, "j", "k"))
+ .addNode(new JoinNodeDef("j",
LiteWorkflowStoreService.LiteControlNodeHandler.class, "end"))
+ .addNode(new KillNodeDef("k", "kill",
LiteWorkflowStoreService.LiteControlNodeHandler.class))
+ .addNode(new EndNodeDef("end",
LiteWorkflowStoreService.LiteControlNodeHandler.class));
+
+ try {
+ invokeForkJoin(parser, def);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Unexpected Exception");
+ }
+ }
+
+ /*
+ *f->(2,j)
+ *2->decision node->{3,4}
+ *3->decision node->{4,5}
+ *4->ok->j
+ *4->fail->k
+ *5->ok->4
+ *5->fail->k
+ *j->end
+ */
+ public void testMultipleDecisionThreePathsForkJoin() throws
WorkflowException{
+ LiteWorkflowAppParser parser = new LiteWorkflowAppParser(null,
+ LiteWorkflowStoreService.LiteControlNodeHandler.class,
+ LiteWorkflowStoreService.LiteDecisionHandler.class,
+ LiteWorkflowStoreService.LiteActionHandler.class);
+ LiteWorkflowApp def = new LiteWorkflowApp("name", "def",
+ new
StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "one"))
+ .addNode(new ActionNodeDef("one", dummyConf,
TestActionNodeHandler.class, "f","end"))
+ .addNode(new ForkNodeDef("f",
LiteWorkflowStoreService.LiteControlNodeHandler.class,
+ Arrays.asList(new String[]{"two", "j"})))
+ .addNode(new DecisionNodeDef("two", dummyConf,
TestDecisionNodeHandler.class,
+ Arrays.asList(new
String[]{"three","four"})))
+ .addNode(new DecisionNodeDef("three", dummyConf,
TestDecisionNodeHandler.class,
+ Arrays.asList(new String[]{"four","five"})))
+ .addNode(new ActionNodeDef("four", dummyConf,
TestActionNodeHandler.class, "j", "k"))
+ .addNode(new ActionNodeDef("five", dummyConf,
TestActionNodeHandler.class, "four", "k"))
+ .addNode(new JoinNodeDef("j",
LiteWorkflowStoreService.LiteControlNodeHandler.class, "end"))
+ .addNode(new KillNodeDef("k", "kill",
LiteWorkflowStoreService.LiteControlNodeHandler.class))
+ .addNode(new EndNodeDef("end",
LiteWorkflowStoreService.LiteControlNodeHandler.class));
+
+ try {
+ invokeForkJoin(parser, def);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Unexpected Exception");
+ }
+ }
+
+ /*
+ *f->(2,4)
+ *2->decision node->{3,4}
+ *3->decision node->{4,5}
+ *4->ok->j
+ *4->fail->k
+ *5->ok->4
+ *5->fail->k
+ *j->end
+ */
+ public void testMultipleDecisionThreePathsForkJoinFailure() throws
WorkflowException{
+ LiteWorkflowAppParser parser = new LiteWorkflowAppParser(null,
+ LiteWorkflowStoreService.LiteControlNodeHandler.class,
+ LiteWorkflowStoreService.LiteDecisionHandler.class,
+ LiteWorkflowStoreService.LiteActionHandler.class);
+ LiteWorkflowApp def = new LiteWorkflowApp("name", "def",
+ new
StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "one"))
+ .addNode(new ActionNodeDef("one", dummyConf,
TestActionNodeHandler.class, "f","end"))
+ .addNode(new ForkNodeDef("f",
LiteWorkflowStoreService.LiteControlNodeHandler.class,
+ Arrays.asList(new String[]{"two", "four"})))
+ .addNode(new DecisionNodeDef("two", dummyConf,
TestDecisionNodeHandler.class,
+ Arrays.asList(new
String[]{"three","four"})))
+ .addNode(new DecisionNodeDef("three", dummyConf,
TestDecisionNodeHandler.class,
+ Arrays.asList(new String[]{"four","five"})))
+ .addNode(new ActionNodeDef("four", dummyConf,
TestActionNodeHandler.class, "j", "k"))
+ .addNode(new ActionNodeDef("five", dummyConf,
TestActionNodeHandler.class, "four", "k"))
+ .addNode(new JoinNodeDef("j",
LiteWorkflowStoreService.LiteControlNodeHandler.class, "end"))
+ .addNode(new KillNodeDef("k", "kill",
LiteWorkflowStoreService.LiteControlNodeHandler.class))
+ .addNode(new EndNodeDef("end",
LiteWorkflowStoreService.LiteControlNodeHandler.class));
+
+ try {
+ invokeForkJoin(parser, def);
+ fail("Expected to catch an exception but did not encounter any");
+ } catch (Exception ex) {
+ WorkflowException we = (WorkflowException) ex.getCause();
+ assertEquals(ErrorCode.E0743, we.getErrorCode());
+ // Make sure the message contains the node involved in the invalid
transition
+ assertTrue(we.getMessage().contains("four"));
+ }
+ }
+
+ /*
+ *f->(2,6)
+ *2->decision node->{3,4}
+ *3->decision node->{4,5}
+ *6->decision node->{4,j}
+ *4->ok->j
+ *4->fail->k
+ *5->ok->4
+ *5->fail->k
+ *j->end
+ */
+ public void testMultipleDecisionThreePathsForkJoinFailure2() throws
WorkflowException{
+ LiteWorkflowAppParser parser = new LiteWorkflowAppParser(null,
+ LiteWorkflowStoreService.LiteControlNodeHandler.class,
+ LiteWorkflowStoreService.LiteDecisionHandler.class,
+ LiteWorkflowStoreService.LiteActionHandler.class);
+ LiteWorkflowApp def = new LiteWorkflowApp("name", "def",
+ new
StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "one"))
+ .addNode(new ActionNodeDef("one", dummyConf,
TestActionNodeHandler.class, "f","end"))
+ .addNode(new ForkNodeDef("f",
LiteWorkflowStoreService.LiteControlNodeHandler.class,
+ Arrays.asList(new String[]{"two", "four"})))
+ .addNode(new DecisionNodeDef("two", dummyConf,
TestDecisionNodeHandler.class,
+ Arrays.asList(new
String[]{"three","four"})))
+ .addNode(new DecisionNodeDef("three", dummyConf,
TestDecisionNodeHandler.class,
+ Arrays.asList(new String[]{"four","five"})))
+ .addNode(new DecisionNodeDef("six", dummyConf,
TestDecisionNodeHandler.class,
+ Arrays.asList(new String[]{"four","j"})))
+ .addNode(new ActionNodeDef("four", dummyConf,
TestActionNodeHandler.class, "j", "k"))
+ .addNode(new ActionNodeDef("five", dummyConf,
TestActionNodeHandler.class, "four", "k"))
+ .addNode(new JoinNodeDef("j",
LiteWorkflowStoreService.LiteControlNodeHandler.class, "end"))
+ .addNode(new KillNodeDef("k", "kill",
LiteWorkflowStoreService.LiteControlNodeHandler.class))
+ .addNode(new EndNodeDef("end",
LiteWorkflowStoreService.LiteControlNodeHandler.class));
+
+ try {
+ invokeForkJoin(parser, def);
+ fail("Expected to catch an exception but did not encounter any");
+ } catch (Exception ex) {
+ WorkflowException we = (WorkflowException) ex.getCause();
+ assertEquals(ErrorCode.E0743, we.getErrorCode());
+ // Make sure the message contains the node involved in the invalid
transition
+ assertTrue(we.getMessage().contains("four"));
+ }
+ }
+
+ /*
* 1->decision node->{f1, f2}
* f1->(2,3)
* f2->(4,5)
Modified: oozie/trunk/release-log.txt
URL:
http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1511514&r1=1511513&r2=1511514&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Wed Aug 7 21:58:21 2013
@@ -1,5 +1,6 @@
-- Oozie 4.1.0 release (trunk - unreleased)
+OOZIE-1403 forkjoin validation blocks some valid cases involving decision
nodes (rkanter)
OOZIE-1449 Coordinator Workflow parent relationship is broken for purge
service (rkanter)
OOZIE-1458 If a Credentials type is not defined, Oozie should say something
(rkanter)
OOZIE-1425 param checker should validate cron syntax (bowenzhangusa via
rkanter)