APEXCORE-306 Skip recovery checkpoint upgrade for entire group during deploy.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/5371bc7b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/5371bc7b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/5371bc7b

Branch: refs/heads/master
Commit: 5371bc7b7cf33e5d5c96775e241af002fc22d5ed
Parents: b3402be
Author: Thomas Weise <[email protected]>
Authored: Sat Jan 23 00:34:42 2016 -0800
Committer: Thomas Weise <[email protected]>
Committed: Sat Jan 23 00:34:42 2016 -0800

----------------------------------------------------------------------
 .../java/com/datatorrent/stram/StreamingContainerManager.java    | 4 +++-
 .../com/datatorrent/stram/plan/logical/DelayOperatorTest.java    | 1 -
 2 files changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/5371bc7b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index a687a37..df3bfc4 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -1939,6 +1939,7 @@ public class StreamingContainerManager implements PlanContext
       commonCheckpoints.addAll(operator.checkpoints);
     }
     Set<PTOperator> groupOpers = new HashSet<>(checkpointGroup.size());
+    boolean pendingDeploy = operator.getState() == PTOperator.State.PENDING_DEPLOY;
     if (checkpointGroup.size() > 1) {
       for (OperatorMeta om : checkpointGroup) {
         Collection<PTOperator> operators = plan.getAllOperators(om);
@@ -1949,6 +1950,7 @@ public class StreamingContainerManager implements PlanContext
           // visit all downstream operators of the group
           ctx.visited.add(groupOper);
           groupOpers.add(groupOper);
+          pendingDeploy |= operator.getState() == PTOperator.State.PENDING_DEPLOY;
         }
       }
       // highest common checkpoint
@@ -2004,7 +2006,7 @@ public class StreamingContainerManager implements PlanContext
 
     for (PTOperator groupOper : groupOpers) {
       // checkpoint frozen during deployment
-      if (ctx.recovery || groupOper.getState() != PTOperator.State.PENDING_DEPLOY) {
+      if (!pendingDeploy || ctx.recovery) {
         // remove previous checkpoints
         Checkpoint c1 = Checkpoint.INITIAL_CHECKPOINT;
         LinkedList<Checkpoint> checkpoints = groupOper.checkpoints;

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/5371bc7b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
index 06f184f..cb4222a 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
@@ -321,7 +321,6 @@ public class DelayOperatorTest
         FibonacciOperator.results.subList(0, 10).toArray());
   }
 
-  @Ignore // Out of sequence BEGIN_WINDOW tuple on Travis. Will tackle in the next version
   @Test
   public void testFibonacciRecovery1() throws Exception
   {

Reply via email to