Repository: incubator-apex-core
Updated Branches:
  refs/heads/master 2047a8a8b -> 6de29c12e


APEXCORE-326: Bug fix. Added unit test.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/f0178e18
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/f0178e18
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/f0178e18

Branch: refs/heads/master
Commit: f0178e18a080e7411df2672bd7bd383c119a8923
Parents: 6130bca
Author: bhupesh <[email protected]>
Authored: Sat Mar 19 14:52:36 2016 +0530
Committer: bhupesh <[email protected]>
Committed: Sat Mar 19 14:52:36 2016 +0530

----------------------------------------------------------------------
 .../stram/plan/logical/LogicalPlan.java          |  4 ++--
 .../stram/plan/logical/DelayOperatorTest.java    | 19 +++++++++++++++++++
 2 files changed, 21 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f0178e18/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 173298b..adc1847 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -1966,8 +1966,8 @@ public class LogicalPlan implements Serializable, DAG
         else if (ctx.stack.contains(successor)) {
           om.lowlink = Math.min(om.lowlink, successor.nindex);
           boolean isDelayLoop = false;
-          for (int i=ctx.path.size(); i>0; i--) {
-            OperatorMeta om2 = ctx.path.get(i-1);
+          for (int i=ctx.stack.size(); i>0; i--) {
+            OperatorMeta om2 = ctx.stack.get(i-1);
             if (om2.getOperator() instanceof Operator.DelayOperator) {
               isDelayLoop = true;
             }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f0178e18/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
index 0bcc791..20f032c 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
@@ -454,4 +454,23 @@ public class DelayOperatorTest
 
   }
 
+  @Test
+  public void testValidationWithMultipleStreamLoops()
+  {
+    LogicalPlan dag = StramTestSupport.createDAG(testMeta);
+
+    TestGeneratorInputOperator source = dag.addOperator("A", TestGeneratorInputOperator.class);
+    GenericTestOperator op1 = dag.addOperator("Op1", GenericTestOperator.class);
+    GenericTestOperator op2 = dag.addOperator("Op2", GenericTestOperator.class);
+    DefaultDelayOperator<Object> delay = dag.addOperator("Delay", DefaultDelayOperator.class);
+
+    dag.addStream("Source", source.outport, op1.inport1);
+    dag.addStream("Stream1", op1.outport1, op2.inport1);
+    dag.addStream("Stream2", op1.outport2, op2.inport2);
+    dag.addStream("Op to Delay", op2.outport1, delay.input);
+    dag.addStream("Delay to Op", delay.output, op1.inport2);
+
+    dag.validate();
+  }
+
 }

Reply via email to