Repository: incubator-apex-core Updated Branches: refs/heads/master 2047a8a8b -> 6de29c12e
APEXCORE-326: Bug fix. Added unit test. Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/f0178e18 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/f0178e18 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/f0178e18 Branch: refs/heads/master Commit: f0178e18a080e7411df2672bd7bd383c119a8923 Parents: 6130bca Author: bhupesh <[email protected]> Authored: Sat Mar 19 14:52:36 2016 +0530 Committer: bhupesh <[email protected]> Committed: Sat Mar 19 14:52:36 2016 +0530 ---------------------------------------------------------------------- .../stram/plan/logical/LogicalPlan.java | 4 ++-- .../stram/plan/logical/DelayOperatorTest.java | 19 +++++++++++++++++++ 2 files changed, 21 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f0178e18/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java index 173298b..adc1847 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java @@ -1966,8 +1966,8 @@ public class LogicalPlan implements Serializable, DAG else if (ctx.stack.contains(successor)) { om.lowlink = Math.min(om.lowlink, successor.nindex); boolean isDelayLoop = false; - for (int i=ctx.path.size(); i>0; i--) { - OperatorMeta om2 = ctx.path.get(i-1); + for (int i=ctx.stack.size(); i>0; i--) { + OperatorMeta om2 = ctx.stack.get(i-1); if (om2.getOperator() instanceof Operator.DelayOperator) { isDelayLoop = true; } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f0178e18/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java index 0bcc791..20f032c 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java @@ -454,4 +454,23 @@ public class DelayOperatorTest } + @Test + public void testValidationWithMultipleStreamLoops() + { + LogicalPlan dag = StramTestSupport.createDAG(testMeta); + + TestGeneratorInputOperator source = dag.addOperator("A", TestGeneratorInputOperator.class); + GenericTestOperator op1 = dag.addOperator("Op1", GenericTestOperator.class); + GenericTestOperator op2 = dag.addOperator("Op2", GenericTestOperator.class); + DefaultDelayOperator<Object> delay = dag.addOperator("Delay", DefaultDelayOperator.class); + + dag.addStream("Source", source.outport, op1.inport1); + dag.addStream("Stream1", op1.outport1, op2.inport1); + dag.addStream("Stream2", op1.outport2, op2.inport2); + dag.addStream("Op to Delay", op2.outport1, delay.input); + dag.addStream("Delay to Op", delay.output, op1.inport2); + + dag.validate(); + } + }
