Author: rkanter
Date: Wed Aug  7 21:58:21 2013
New Revision: 1511514

URL: http://svn.apache.org/r1511514
Log:
OOZIE-1403 forkjoin validation blocks some valid cases involving decision nodes (rkanter)

Modified:
    oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java
    oozie/trunk/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java
    oozie/trunk/release-log.txt

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java?rev=1511514&r1=1511513&r2=1511514&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java Wed Aug  7 21:58:21 2013
@@ -96,10 +96,23 @@ public class LiteWorkflowAppParser {
         VISITING, VISITED
     }
 
+    /**
+     * We use this to store a node name and its top (eldest) decision parent 
node name for the forkjoin validation
+     */
+    class NodeAndTopDecisionParent {
+        String node;
+        String topDecisionParent;
+
+        public NodeAndTopDecisionParent(String node, String topDecisionParent) 
{
+            this.node = node;
+            this.topDecisionParent = topDecisionParent;
+        }
+    }
+
     private List<String> forkList = new ArrayList<String>();
     private List<String> joinList = new ArrayList<String>();
     private StartNodeDef startNode;
-    private List<String> visitedOkNodes = new ArrayList<String>();
+    private List<NodeAndTopDecisionParent> visitedOkNodes = new 
ArrayList<NodeAndTopDecisionParent>();
     private List<String> visitedJoinNodes = new ArrayList<String>();
 
     public LiteWorkflowAppParser(Schema schema,
@@ -171,7 +184,8 @@ public class LiteWorkflowAppParser {
         if (!forkList.isEmpty()) {
             visitedOkNodes.clear();
             visitedJoinNodes.clear();
-            validateForkJoin(startNode, app, new LinkedList<String>(), new 
LinkedList<String>(), new LinkedList<String>(), true);
+            validateForkJoin(startNode, app, new LinkedList<String>(), new 
LinkedList<String>(), new LinkedList<String>(), true,
+                    null);
         }
     }
 
@@ -187,10 +201,11 @@ public class LiteWorkflowAppParser {
      * @param path a stack of the current path
      * @param okTo false if node (or an ancestor of node) was gotten to via an 
"error to" transition or via a join node that has
      * already been visited at least once before
+     * @param topDecisionParent The top (eldest) decision node along the path 
to this node, or null if there isn't one
      * @throws WorkflowException
      */
     private void validateForkJoin(NodeDef node, LiteWorkflowApp app, 
Deque<String> forkNodes, Deque<String> joinNodes,
-            Deque<String> path, boolean okTo) throws WorkflowException {
+            Deque<String> path, boolean okTo, String topDecisionParent) throws 
WorkflowException {
         if (path.contains(node.getName())) {
             // cycle
             throw new WorkflowException(ErrorCode.E0741, node.getName(), 
Arrays.toString(path.toArray()));
@@ -202,36 +217,64 @@ public class LiteWorkflowAppParser {
         // traverse through join nodes multiple times, we have to make sure 
not to throw an exception here when we're really just
         // re-walking the same execution path (this is why we need the 
visitedJoinNodes list used later)
         if (okTo && !(node instanceof KillNodeDef) && !(node instanceof 
JoinNodeDef) && !(node instanceof EndNodeDef)) {
-            if (visitedOkNodes.contains(node.getName())) {
-                throw new WorkflowException(ErrorCode.E0743, node.getName());
+            NodeAndTopDecisionParent natdp = 
findInVisitedOkNodes(node.getName());
+            if (natdp != null) {
+                // However, if we've visited the node and it's under a 
decision node, we may be seeing it again and it's only
+                // illegal if that decision node is not the same as what we're 
seeing now (because during execution we only go
+                // down one path of the decision node, so while we're seeing 
the node multiple times here, during runtime it will
+                // only be executed once).  Also, this decision node should be 
the top (eldest) decision node.  As null indicates
+                // that there isn't a decision node, when this happens they 
must both be null to be valid.  Here is a good example
+                // to visualize a node ("actionX") that has three "ok to" 
paths to it, but should still be a valid workflow (it may
+                // be easier to see if you draw it):
+                    // decisionA --> {actionX, decisionB}
+                    // decisionB --> {actionX, actionY}
+                    // actionY   --> {actionX}
+                // And, if we visit this node twice under the same decision 
node in an invalid way, the path cycle checking code
+                // will catch it, so we don't have to worry about that here.
+                if ((natdp.topDecisionParent == null && topDecisionParent == 
null)
+                     || (natdp.topDecisionParent == null && topDecisionParent 
!= null)
+                     || (natdp.topDecisionParent != null && topDecisionParent 
== null)
+                     || !natdp.topDecisionParent.equals(topDecisionParent)) {
+                    // If we get here, then we've seen this node before from 
an "ok to" transition but they don't have the same
+                    // decision node top parent, which means that this node 
will be executed twice, which is illegal
+                    throw new WorkflowException(ErrorCode.E0743, 
node.getName());
+                }
+            }
+            else {
+                // If we haven't transitioned to this node before, add it and 
its top decision parent node
+                visitedOkNodes.add(new 
NodeAndTopDecisionParent(node.getName(), topDecisionParent));
             }
-            visitedOkNodes.add(node.getName());
         }
 
         if (node instanceof StartNodeDef) {
             String transition = node.getTransitions().get(0);   // start 
always has only 1 transition
             NodeDef tranNode = app.getNode(transition);
-            validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo);
+            validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo, 
topDecisionParent);
         }
         else if (node instanceof ActionNodeDef) {
             String transition = node.getTransitions().get(0);   // "ok to" 
transition
             NodeDef tranNode = app.getNode(transition);
-            validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo); 
 // propogate okTo
+            validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo, 
topDecisionParent);  // propogate okTo
             transition = node.getTransitions().get(1);          // "error to" 
transition
             tranNode = app.getNode(transition);
-            validateForkJoin(tranNode, app, forkNodes, joinNodes, path, 
false); // use false
+            validateForkJoin(tranNode, app, forkNodes, joinNodes, path, false, 
topDecisionParent); // use false
         }
         else if (node instanceof DecisionNodeDef) {
             for(String transition : (new 
HashSet<String>(node.getTransitions()))) {
                 NodeDef tranNode = app.getNode(transition);
-                validateForkJoin(tranNode, app, forkNodes, joinNodes, path, 
okTo);
+                // if there currently isn't a topDecisionParent (i.e. null), 
then use this node instead of propagating null
+                String parentDecisionNode = topDecisionParent;
+                if (parentDecisionNode == null) {
+                    parentDecisionNode = node.getName();
+                }
+                validateForkJoin(tranNode, app, forkNodes, joinNodes, path, 
okTo, parentDecisionNode);
             }
         }
         else if (node instanceof ForkNodeDef) {
             forkNodes.push(node.getName());
             for(String transition : (new 
HashSet<String>(node.getTransitions()))) {
                 NodeDef tranNode = app.getNode(transition);
-                validateForkJoin(tranNode, app, forkNodes, joinNodes, path, 
okTo);
+                validateForkJoin(tranNode, app, forkNodes, joinNodes, path, 
okTo, topDecisionParent);
             }
             forkNodes.pop();
             if (!joinNodes.isEmpty()) {
@@ -258,11 +301,11 @@ public class LiteWorkflowAppParser {
             // Or if we've already visited this join node, use false (because 
we've already traversed this path before and we don't
             // want to throw an exception from the check against 
visitedOkNodes)
             if (!okTo || visitedJoinNodes.contains(node.getName())) {
-                validateForkJoin(tranNode, app, forkNodes, joinNodes, path, 
false);
+                validateForkJoin(tranNode, app, forkNodes, joinNodes, path, 
false, topDecisionParent);
             // Else, use true because this is either the first time we've gone 
through this join node or okTo was already false
             } else {
                 visitedJoinNodes.add(node.getName());
-                validateForkJoin(tranNode, app, forkNodes, joinNodes, path, 
true);
+                validateForkJoin(tranNode, app, forkNodes, joinNodes, path, 
true, topDecisionParent);
             }
             forkNodes.push(currentForkNode);
             joinNodes.push(node.getName());
@@ -286,6 +329,24 @@ public class LiteWorkflowAppParser {
     }
 
     /**
+     * Return a {@link NodeAndTopDecisionParent} whose {@link 
NodeAndTopDecisionParent#node} is equal to the passed in name, or null
+     * if it isn't in the {@link LiteWorkflowAppParser#visitedOkNodes} list.
+     *
+     * @param name The name to search for
+     * @return a NodeAndTopDecisionParent or null
+     */
+    private NodeAndTopDecisionParent findInVisitedOkNodes(String name) {
+        NodeAndTopDecisionParent natdp = null;
+        for (NodeAndTopDecisionParent v : visitedOkNodes) {
+            if (v.node.equals(name)) {
+                natdp = v;
+                break;
+            }
+        }
+        return natdp;
+    }
+
+    /**
      * Parse xml to {@link LiteWorkflowApp}
      *
      * @param strDef

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java?rev=1511514&r1=1511513&r2=1511514&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java Wed Aug  7 21:58:21 2013
@@ -833,6 +833,164 @@ public class TestLiteWorkflowAppParser e
     }
 
     /*
+     *f->(2,j)
+     *2->decision node->{3,4}
+     *3->ok->4
+     *3->fail->k
+     *4->ok->j
+     *4->fail->k
+     *j->end
+     */
+    public void testDecisionTwoPathsForkJoin() throws WorkflowException{
+        LiteWorkflowAppParser parser = new LiteWorkflowAppParser(null,
+                LiteWorkflowStoreService.LiteControlNodeHandler.class,
+                LiteWorkflowStoreService.LiteDecisionHandler.class,
+                LiteWorkflowStoreService.LiteActionHandler.class);
+        LiteWorkflowApp def = new LiteWorkflowApp("name", "def",
+            new 
StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "one"))
+        .addNode(new ActionNodeDef("one", dummyConf, 
TestActionNodeHandler.class, "f","end"))
+        .addNode(new ForkNodeDef("f", 
LiteWorkflowStoreService.LiteControlNodeHandler.class,
+                                 Arrays.asList(new String[]{"two", "j"})))
+        .addNode(new DecisionNodeDef("two", dummyConf, 
TestDecisionNodeHandler.class,
+                                     Arrays.asList(new 
String[]{"three","four"})))
+        .addNode(new ActionNodeDef("three", dummyConf, 
TestActionNodeHandler.class, "four", "k"))
+        .addNode(new ActionNodeDef("four", dummyConf, 
TestActionNodeHandler.class, "j", "k"))
+        .addNode(new JoinNodeDef("j", 
LiteWorkflowStoreService.LiteControlNodeHandler.class, "end"))
+        .addNode(new KillNodeDef("k", "kill", 
LiteWorkflowStoreService.LiteControlNodeHandler.class))
+        .addNode(new EndNodeDef("end", 
LiteWorkflowStoreService.LiteControlNodeHandler.class));
+
+        try {
+            invokeForkJoin(parser, def);
+        } catch (Exception e) {
+            e.printStackTrace();
+            fail("Unexpected Exception");
+        }
+    }
+
+    /*
+     *f->(2,j)
+     *2->decision node->{3,4}
+     *3->decision node->{4,5}
+     *4->ok->j
+     *4->fail->k
+     *5->ok->4
+     *5->fail->k
+     *j->end
+     */
+    public void testMultipleDecisionThreePathsForkJoin() throws 
WorkflowException{
+        LiteWorkflowAppParser parser = new LiteWorkflowAppParser(null,
+                LiteWorkflowStoreService.LiteControlNodeHandler.class,
+                LiteWorkflowStoreService.LiteDecisionHandler.class,
+                LiteWorkflowStoreService.LiteActionHandler.class);
+        LiteWorkflowApp def = new LiteWorkflowApp("name", "def",
+            new 
StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "one"))
+        .addNode(new ActionNodeDef("one", dummyConf, 
TestActionNodeHandler.class, "f","end"))
+        .addNode(new ForkNodeDef("f", 
LiteWorkflowStoreService.LiteControlNodeHandler.class,
+                                 Arrays.asList(new String[]{"two", "j"})))
+        .addNode(new DecisionNodeDef("two", dummyConf, 
TestDecisionNodeHandler.class,
+                                     Arrays.asList(new 
String[]{"three","four"})))
+        .addNode(new DecisionNodeDef("three", dummyConf, 
TestDecisionNodeHandler.class,
+                             Arrays.asList(new String[]{"four","five"})))
+        .addNode(new ActionNodeDef("four", dummyConf, 
TestActionNodeHandler.class, "j", "k"))
+        .addNode(new ActionNodeDef("five", dummyConf, 
TestActionNodeHandler.class, "four", "k"))
+        .addNode(new JoinNodeDef("j", 
LiteWorkflowStoreService.LiteControlNodeHandler.class, "end"))
+        .addNode(new KillNodeDef("k", "kill", 
LiteWorkflowStoreService.LiteControlNodeHandler.class))
+        .addNode(new EndNodeDef("end", 
LiteWorkflowStoreService.LiteControlNodeHandler.class));
+
+        try {
+            invokeForkJoin(parser, def);
+        } catch (Exception e) {
+            e.printStackTrace();
+            fail("Unexpected Exception");
+        }
+    }
+
+    /*
+     *f->(2,4)
+     *2->decision node->{3,4}
+     *3->decision node->{4,5}
+     *4->ok->j
+     *4->fail->k
+     *5->ok->4
+     *5->fail->k
+     *j->end
+     */
+    public void testMultipleDecisionThreePathsForkJoinFailure() throws 
WorkflowException{
+        LiteWorkflowAppParser parser = new LiteWorkflowAppParser(null,
+                LiteWorkflowStoreService.LiteControlNodeHandler.class,
+                LiteWorkflowStoreService.LiteDecisionHandler.class,
+                LiteWorkflowStoreService.LiteActionHandler.class);
+        LiteWorkflowApp def = new LiteWorkflowApp("name", "def",
+            new 
StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "one"))
+        .addNode(new ActionNodeDef("one", dummyConf, 
TestActionNodeHandler.class, "f","end"))
+        .addNode(new ForkNodeDef("f", 
LiteWorkflowStoreService.LiteControlNodeHandler.class,
+                                 Arrays.asList(new String[]{"two", "four"})))
+        .addNode(new DecisionNodeDef("two", dummyConf, 
TestDecisionNodeHandler.class,
+                                     Arrays.asList(new 
String[]{"three","four"})))
+        .addNode(new DecisionNodeDef("three", dummyConf, 
TestDecisionNodeHandler.class,
+                             Arrays.asList(new String[]{"four","five"})))
+        .addNode(new ActionNodeDef("four", dummyConf, 
TestActionNodeHandler.class, "j", "k"))
+        .addNode(new ActionNodeDef("five", dummyConf, 
TestActionNodeHandler.class, "four", "k"))
+        .addNode(new JoinNodeDef("j", 
LiteWorkflowStoreService.LiteControlNodeHandler.class, "end"))
+        .addNode(new KillNodeDef("k", "kill", 
LiteWorkflowStoreService.LiteControlNodeHandler.class))
+        .addNode(new EndNodeDef("end", 
LiteWorkflowStoreService.LiteControlNodeHandler.class));
+
+        try {
+            invokeForkJoin(parser, def);
+            fail("Expected to catch an exception but did not encounter any");
+        } catch (Exception ex) {
+            WorkflowException we = (WorkflowException) ex.getCause();
+            assertEquals(ErrorCode.E0743, we.getErrorCode());
+            // Make sure the message contains the node involved in the invalid 
transition
+            assertTrue(we.getMessage().contains("four"));
+        }
+    }
+
+    /*
+     *f->(2,6)
+     *2->decision node->{3,4}
+     *3->decision node->{4,5}
+     *6->decision node->{4,j}
+     *4->ok->j
+     *4->fail->k
+     *5->ok->4
+     *5->fail->k
+     *j->end
+     */
+    public void testMultipleDecisionThreePathsForkJoinFailure2() throws 
WorkflowException{
+        LiteWorkflowAppParser parser = new LiteWorkflowAppParser(null,
+                LiteWorkflowStoreService.LiteControlNodeHandler.class,
+                LiteWorkflowStoreService.LiteDecisionHandler.class,
+                LiteWorkflowStoreService.LiteActionHandler.class);
+        LiteWorkflowApp def = new LiteWorkflowApp("name", "def",
+            new 
StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "one"))
+        .addNode(new ActionNodeDef("one", dummyConf, 
TestActionNodeHandler.class, "f","end"))
+        .addNode(new ForkNodeDef("f", 
LiteWorkflowStoreService.LiteControlNodeHandler.class,
+                                 Arrays.asList(new String[]{"two", "four"})))
+        .addNode(new DecisionNodeDef("two", dummyConf, 
TestDecisionNodeHandler.class,
+                                     Arrays.asList(new 
String[]{"three","four"})))
+        .addNode(new DecisionNodeDef("three", dummyConf, 
TestDecisionNodeHandler.class,
+                             Arrays.asList(new String[]{"four","five"})))
+        .addNode(new DecisionNodeDef("six", dummyConf, 
TestDecisionNodeHandler.class,
+                     Arrays.asList(new String[]{"four","j"})))
+        .addNode(new ActionNodeDef("four", dummyConf, 
TestActionNodeHandler.class, "j", "k"))
+        .addNode(new ActionNodeDef("five", dummyConf, 
TestActionNodeHandler.class, "four", "k"))
+        .addNode(new JoinNodeDef("j", 
LiteWorkflowStoreService.LiteControlNodeHandler.class, "end"))
+        .addNode(new KillNodeDef("k", "kill", 
LiteWorkflowStoreService.LiteControlNodeHandler.class))
+        .addNode(new EndNodeDef("end", 
LiteWorkflowStoreService.LiteControlNodeHandler.class));
+
+        try {
+            invokeForkJoin(parser, def);
+            fail("Expected to catch an exception but did not encounter any");
+        } catch (Exception ex) {
+            WorkflowException we = (WorkflowException) ex.getCause();
+            assertEquals(ErrorCode.E0743, we.getErrorCode());
+            // Make sure the message contains the node involved in the invalid 
transition
+            assertTrue(we.getMessage().contains("four"));
+        }
+    }
+
+    /*
      * 1->decision node->{f1, f2}
      * f1->(2,3)
      * f2->(4,5)

Modified: oozie/trunk/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1511514&r1=1511513&r2=1511514&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Wed Aug  7 21:58:21 2013
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1403 forkjoin validation blocks some valid cases involving decision nodes (rkanter)
 OOZIE-1449 Coordinator Workflow parent relationship is broken for purge service (rkanter)
 OOZIE-1458 If a Credentials type is not defined, Oozie should say something (rkanter)
 OOZIE-1425 param checker should validate cron syntax (bowenzhangusa via rkanter)


Reply via email to