Repository: incubator-apex-core Updated Branches: refs/heads/release-3.3 8655ffabe -> 3695ccf37
APEXCORE-453 decreased the heartbeat interval in the DelayOperator unit test because otherwise the committed window call may occur much later than expected, hence missing the failure simulation Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/3695ccf3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/3695ccf3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/3695ccf3 Branch: refs/heads/release-3.3 Commit: 3695ccf3753a0962af995f9934bc3cbca99dd473 Parents: 8655ffa Author: David Yan <da...@datatorrent.com> Authored: Mon May 9 16:41:08 2016 -0700 Committer: David Yan <da...@datatorrent.com> Committed: Tue May 10 15:14:01 2016 -0700 ---------------------------------------------------------------------- .../datatorrent/stram/plan/logical/DelayOperatorTest.java | 10 ++++++++++ 1 file changed, 10 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3695ccf3/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java index b6d6909..e4883e3 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java @@ -231,8 +231,10 @@ public class DelayOperatorTest { if (simulateFailureWindows > 0 && ((simulateFailureAfterCommit && committed) || !simulateFailureAfterCommit) && !failureSimulated) { + LOG.debug("FailableFibonacciOperator beginWindow {} {} {}", windowId, windowCount, simulateFailureWindows); if (windowCount++ == simulateFailureWindows) { failureSimulated = true; + LOG.debug("FailableFibonacciOperator is simulating failure"); throw new RuntimeException("simulating failure"); } } @@ -241,11 +243,13 @@ public class DelayOperatorTest @Override public void checkpointed(long windowId) { + LOG.debug("FailableFibonacciOperator is checkpointed {}", windowId); } @Override public void committed(long windowId) { + LOG.debug("FailableFibonacciOperator is committed {}", windowId); committed = true; } @@ -270,8 +274,10 @@ public class DelayOperatorTest super.beginWindow(windowId); if (simulateFailureWindows > 0 && ((simulateFailureAfterCommit && committed) || !simulateFailureAfterCommit) && !failureSimulated) { + LOG.debug("FailableDelayOperator beginWindow {} {} {}", windowId, windowCount, simulateFailureWindows); if (windowCount++ == simulateFailureWindows) { failureSimulated = true; + LOG.debug("FailableDelayOperator is simulating failure {}", windowId); throw new RuntimeException("simulating failure"); } } @@ -280,11 +286,13 @@ public class DelayOperatorTest @Override public void checkpointed(long windowId) { + LOG.debug("FailableDelayOperator is checkpointed {}", windowId); } @Override public void committed(long windowId) { + LOG.debug("FailableDelayOperator is committed {}", windowId); committed = true; } @@ -339,6 +347,7 @@ public class DelayOperatorTest dag.addStream("delay_to_operator", opDelay.output, fib.input); dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2); dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300); + dag.getAttributes().put(LogicalPlan.HEARTBEAT_INTERVAL_MILLIS, 50); FailableFibonacciOperator.results.clear(); FailableFibonacciOperator.failureSimulated = false; final StramLocalCluster localCluster = new StramLocalCluster(dag); @@ -373,6 +382,7 @@ public class DelayOperatorTest dag.addStream("delay_to_operator", opDelay.output, fib.input); dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2); dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300); + dag.getAttributes().put(LogicalPlan.HEARTBEAT_INTERVAL_MILLIS, 50); FibonacciOperator.results.clear(); FailableDelayOperator.failureSimulated = false; final StramLocalCluster localCluster = new StramLocalCluster(dag);