APEXCORE-306 Skip recovery checkpoint upgrade for entire group during deploy.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/5371bc7b Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/5371bc7b Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/5371bc7b Branch: refs/heads/master Commit: 5371bc7b7cf33e5d5c96775e241af002fc22d5ed Parents: b3402be Author: Thomas Weise <[email protected]> Authored: Sat Jan 23 00:34:42 2016 -0800 Committer: Thomas Weise <[email protected]> Committed: Sat Jan 23 00:34:42 2016 -0800 ---------------------------------------------------------------------- .../java/com/datatorrent/stram/StreamingContainerManager.java | 4 +++- .../com/datatorrent/stram/plan/logical/DelayOperatorTest.java | 1 - 2 files changed, 3 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/5371bc7b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index a687a37..df3bfc4 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -1939,6 +1939,7 @@ public class StreamingContainerManager implements PlanContext commonCheckpoints.addAll(operator.checkpoints); } Set<PTOperator> groupOpers = new HashSet<>(checkpointGroup.size()); + boolean pendingDeploy = operator.getState() == PTOperator.State.PENDING_DEPLOY; if (checkpointGroup.size() > 1) { for (OperatorMeta om : checkpointGroup) { Collection<PTOperator> operators = plan.getAllOperators(om); @@ -1949,6 +1950,7 @@ public class StreamingContainerManager implements PlanContext // visit all downstream operators of the group ctx.visited.add(groupOper); groupOpers.add(groupOper); + pendingDeploy |= operator.getState() == PTOperator.State.PENDING_DEPLOY; } } // highest common checkpoint @@ -2004,7 +2006,7 @@ public class StreamingContainerManager implements PlanContext for (PTOperator groupOper : groupOpers) { // checkpoint frozen during deployment - if (ctx.recovery || groupOper.getState() != PTOperator.State.PENDING_DEPLOY) { + if (!pendingDeploy || ctx.recovery) { // remove previous checkpoints Checkpoint c1 = Checkpoint.INITIAL_CHECKPOINT; LinkedList<Checkpoint> checkpoints = groupOper.checkpoints; http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/5371bc7b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java index 06f184f..cb4222a 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java @@ -321,7 +321,6 @@ public class DelayOperatorTest FibonacciOperator.results.subList(0, 10).toArray()); } - @Ignore // Out of sequence BEGIN_WINDOW tuple on Travis. Will tackle in the next version @Test public void testFibonacciRecovery1() throws Exception {
