Repository: incubator-apex-core Updated Branches: refs/heads/release-3.4 f6ec4a599 -> 4fb1a5097
APEXCORE-453 decreased the heartbeat interval in the DelayOperator unit test because otherwise the committed window call may occur much later than expected, hence missing the failure simulation Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/4fb1a509 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/4fb1a509 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/4fb1a509 Branch: refs/heads/release-3.4 Commit: 4fb1a509758bdc1343146c1fa4d79c3c6d49c5ad Parents: f6ec4a5 Author: David Yan <da...@datatorrent.com> Authored: Mon May 9 16:41:08 2016 -0700 Committer: David Yan <da...@datatorrent.com> Committed: Tue May 10 15:13:32 2016 -0700 ---------------------------------------------------------------------- .../stram/plan/logical/DelayOperatorTest.java | 22 +++++++++----------- 1 file changed, 10 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4fb1a509/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java index 6b31d56..75b20fe 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java @@ -233,8 +233,10 @@ public class DelayOperatorTest { if (simulateFailureWindows > 0 && ((simulateFailureAfterCommit && committed) || !simulateFailureAfterCommit) && !failureSimulated) { + LOG.debug("FailableFibonacciOperator beginWindow {} {} {}", windowId, windowCount, simulateFailureWindows); if (windowCount++ == simulateFailureWindows) { failureSimulated = true; + LOG.debug("FailableFibonacciOperator is simulating failure"); throw new RuntimeException("simulating failure"); } } @@ -243,11 +245,13 @@ public class DelayOperatorTest @Override public void checkpointed(long windowId) { + LOG.debug("FailableFibonacciOperator is checkpointed {}", windowId); } @Override public void committed(long windowId) { + LOG.debug("FailableFibonacciOperator is committed {}", windowId); committed = true; } @@ -272,8 +276,10 @@ public class DelayOperatorTest super.beginWindow(windowId); if (simulateFailureWindows > 0 && ((simulateFailureAfterCommit && committed) || !simulateFailureAfterCommit) && !failureSimulated) { + LOG.debug("FailableDelayOperator beginWindow {} {} {}", windowId, windowCount, simulateFailureWindows); if (windowCount++ == simulateFailureWindows) { failureSimulated = true; + LOG.debug("FailableDelayOperator is simulating failure {}", windowId); throw new RuntimeException("simulating failure"); } } @@ -282,11 +288,13 @@ public class DelayOperatorTest @Override public void checkpointed(long windowId) { + LOG.debug("FailableDelayOperator is checkpointed {}", windowId); } @Override public void committed(long windowId) { + LOG.debug("FailableDelayOperator is committed {}", windowId); committed = true; } @@ -328,12 +336,6 @@ public class DelayOperatorTest @Test public void testFibonacciRecovery1() throws Exception { - if (StramTestSupport.isInTravis()) { - // disable this test in travis because the failure is apparently intermittently not invoked only on travis - // We should remove this when we find a solution to this. - LOG.info("Test testFibonacciRecovery1 is disabled in Travis"); - return; - } LogicalPlan dag = StramTestSupport.createDAG(testMeta); TestGeneratorInputOperator dummyInput = dag.addOperator("DUMMY", TestGeneratorInputOperator.class); @@ -347,6 +349,7 @@ public class DelayOperatorTest dag.addStream("delay_to_operator", opDelay.output, fib.input); dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2); dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300); + dag.getAttributes().put(LogicalPlan.HEARTBEAT_INTERVAL_MILLIS, 50); FailableFibonacciOperator.results.clear(); FailableFibonacciOperator.failureSimulated = false; final StramLocalCluster localCluster = new StramLocalCluster(dag); @@ -368,12 +371,6 @@ public class DelayOperatorTest @Test public void testFibonacciRecovery2() throws Exception { - if (StramTestSupport.isInTravis()) { - // disable this test in travis because the failure is apparently intermittently not invoked only on travis - // We should remove this when we find a solution to this. - LOG.info("Test testFibonacciRecovery2 is disabled in Travis"); - return; - } LogicalPlan dag = StramTestSupport.createDAG(testMeta); TestGeneratorInputOperator dummyInput = dag.addOperator("DUMMY", TestGeneratorInputOperator.class); @@ -387,6 +384,7 @@ public class DelayOperatorTest dag.addStream("delay_to_operator", opDelay.output, fib.input); dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2); dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300); + dag.getAttributes().put(LogicalPlan.HEARTBEAT_INTERVAL_MILLIS, 50); FibonacciOperator.results.clear(); FailableDelayOperator.failureSimulated = false; final StramLocalCluster localCluster = new StramLocalCluster(dag);