Repository: incubator-apex-core
Updated Branches:
  refs/heads/release-3.3 8655ffabe -> 3695ccf37


APEXCORE-453: Decreased the heartbeat interval in the DelayOperator unit test.
Otherwise the committed window call may occur much later than expected,
causing the test to miss the failure simulation.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/3695ccf3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/3695ccf3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/3695ccf3

Branch: refs/heads/release-3.3
Commit: 3695ccf3753a0962af995f9934bc3cbca99dd473
Parents: 8655ffa
Author: David Yan <da...@datatorrent.com>
Authored: Mon May 9 16:41:08 2016 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Tue May 10 15:14:01 2016 -0700

----------------------------------------------------------------------
 .../datatorrent/stram/plan/logical/DelayOperatorTest.java | 10 ++++++++++
 1 file changed, 10 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3695ccf3/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
index b6d6909..e4883e3 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
@@ -231,8 +231,10 @@ public class DelayOperatorTest
     {
       if (simulateFailureWindows > 0 && ((simulateFailureAfterCommit && committed) || !simulateFailureAfterCommit) &&
           !failureSimulated) {
+        LOG.debug("FailableFibonacciOperator beginWindow {} {} {}", windowId, windowCount, simulateFailureWindows);
         if (windowCount++ == simulateFailureWindows) {
           failureSimulated = true;
+          LOG.debug("FailableFibonacciOperator is simulating failure");
           throw new RuntimeException("simulating failure");
         }
       }
@@ -241,11 +243,13 @@ public class DelayOperatorTest
     @Override
     public void checkpointed(long windowId)
     {
+      LOG.debug("FailableFibonacciOperator is checkpointed {}", windowId);
     }
 
     @Override
     public void committed(long windowId)
     {
+      LOG.debug("FailableFibonacciOperator is committed {}", windowId);
       committed = true;
     }
 
@@ -270,8 +274,10 @@ public class DelayOperatorTest
       super.beginWindow(windowId);
       if (simulateFailureWindows > 0 && ((simulateFailureAfterCommit && committed) || !simulateFailureAfterCommit) &&
           !failureSimulated) {
+        LOG.debug("FailableDelayOperator beginWindow {} {} {}", windowId, windowCount, simulateFailureWindows);
         if (windowCount++ == simulateFailureWindows) {
           failureSimulated = true;
+          LOG.debug("FailableDelayOperator is simulating failure {}", windowId);
           throw new RuntimeException("simulating failure");
         }
       }
@@ -280,11 +286,13 @@ public class DelayOperatorTest
     @Override
     public void checkpointed(long windowId)
     {
+      LOG.debug("FailableDelayOperator is checkpointed {}", windowId);
     }
 
     @Override
     public void committed(long windowId)
     {
+      LOG.debug("FailableDelayOperator is committed {}", windowId);
       committed = true;
     }
 
@@ -339,6 +347,7 @@ public class DelayOperatorTest
     dag.addStream("delay_to_operator", opDelay.output, fib.input);
     dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
     dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
+    dag.getAttributes().put(LogicalPlan.HEARTBEAT_INTERVAL_MILLIS, 50);
     FailableFibonacciOperator.results.clear();
     FailableFibonacciOperator.failureSimulated = false;
     final StramLocalCluster localCluster = new StramLocalCluster(dag);
@@ -373,6 +382,7 @@ public class DelayOperatorTest
     dag.addStream("delay_to_operator", opDelay.output, fib.input);
     dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
     dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
+    dag.getAttributes().put(LogicalPlan.HEARTBEAT_INTERVAL_MILLIS, 50);
     FibonacciOperator.results.clear();
     FailableDelayOperator.failureSimulated = false;
     final StramLocalCluster localCluster = new StramLocalCluster(dag);

Reply via email to