Repository: apex-core Updated Branches: refs/heads/master 81b8c922c -> eaf041931
APEXCORE-532: Fix issue where new operators added to dag starts from initial checkpoint Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/eaf04193 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/eaf04193 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/eaf04193 Branch: refs/heads/master Commit: eaf04193168ecc43f53958d750ff8f69f6b2c75a Parents: 81b8c92 Author: Tushar R. Gosavi <[email protected]> Authored: Mon Oct 3 12:37:59 2016 +0530 Committer: Tushar R. Gosavi <[email protected]> Committed: Thu Oct 20 12:34:46 2016 +0530 ---------------------------------------------------------------------- .../stram/plan/physical/PhysicalPlan.java | 7 ++-- .../stram/LogicalPlanModificationTest.java | 34 ++++++++++++++++++++ .../stram/plan/physical/PhysicalPlanTest.java | 6 ++-- 3 files changed, 42 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/eaf04193/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java index 11df7ba..4181971 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java @@ -781,7 +781,7 @@ public class PhysicalPlan implements Serializable // create operator instance per partition Map<Integer, Partition<Operator>> operatorIdToPartition = Maps.newHashMapWithExpectedSize(partitions.size()); for (Partition<Operator> partition : partitions) { - PTOperator p = addPTOperator(m, partition, Checkpoint.INITIAL_CHECKPOINT); + PTOperator p = addPTOperator(m, partition, null); operatorIdToPartition.put(p.getId(), partition); } @@ -1268,10 +1268,11 @@ public class PhysicalPlan implements Serializable Checkpoint activationCheckpoint = Checkpoint.INITIAL_CHECKPOINT; for (PTInput input : oper.inputs) { PTOperator sourceOper = input.source.source; + Checkpoint checkpoint = sourceOper.recoveryCheckpoint; if (sourceOper.checkpoints.isEmpty()) { - getActivationCheckpoint(sourceOper); + checkpoint = getActivationCheckpoint(sourceOper); } - activationCheckpoint = Checkpoint.max(activationCheckpoint, sourceOper.recoveryCheckpoint); + activationCheckpoint = Checkpoint.max(activationCheckpoint, checkpoint); } return activationCheckpoint; } http://git-wip-us.apache.org/repos/asf/apex-core/blob/eaf04193/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java b/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java index 1704e55..ef44767 100644 --- a/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java +++ b/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java @@ -52,6 +52,7 @@ import com.datatorrent.stram.plan.logical.requests.LogicalPlanRequest; import com.datatorrent.stram.plan.physical.PTContainer; import com.datatorrent.stram.plan.physical.PTOperator; import com.datatorrent.stram.plan.physical.PhysicalPlan; +import com.datatorrent.stram.plan.physical.PhysicalPlanTest; import com.datatorrent.stram.plan.physical.PlanModifier; import com.datatorrent.stram.support.StramTestSupport; import com.datatorrent.stram.support.StramTestSupport.TestMeta; @@ -408,4 +409,37 @@ public class LogicalPlanModificationTest testExecutionManager(new AsyncFSStorageAgent(testMeta.getPath(), null)); } + @Test + public void testNewOperatorRecoveryWindowIds() + { + GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class); + + TestPlanContext ctx = new TestPlanContext(); + dag.setAttribute(OperatorContext.STORAGE_AGENT, ctx); + PhysicalPlan plan = new PhysicalPlan(dag, ctx); + ctx.deploy.clear(); + ctx.undeploy.clear(); + + LogicalPlan.OperatorMeta o1Meta = dag.getMeta(o1); + List<PTOperator> o1Partitions = plan.getOperators(o1Meta); + PhysicalPlanTest.setActivationCheckpoint(o1Partitions.get(0), 10); + + PlanModifier pm = new PlanModifier(plan); + GenericTestOperator o2 = new GenericTestOperator(); + GenericTestOperator o3 = new GenericTestOperator(); + pm.addOperator("o2", o2); + pm.addOperator("o3", o3); + pm.addStream("s1", o1.outport1, o2.inport2); + pm.addStream("s2", o2.outport1, o3.inport1); + + pm.applyChanges(ctx); + + LogicalPlan.OperatorMeta o2Meta = plan.getLogicalPlan().getMeta(o2); + List<PTOperator> o2Partitions = plan.getOperators(o2Meta); + Assert.assertEquals("o2 activation checkpoint " + o2Meta, 10, o2Partitions.get(0).getRecoveryCheckpoint().windowId); + + LogicalPlan.OperatorMeta o3Meta = plan.getLogicalPlan().getMeta(o3); + List<PTOperator> o3Partitions = plan.getOperators(o3Meta); + Assert.assertEquals("o3 activation checkpoint " + o2Meta, 10, o3Partitions.get(0).getRecoveryCheckpoint().windowId); + } } http://git-wip-us.apache.org/repos/asf/apex-core/blob/eaf04193/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java index e426194..61a85a5 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java @@ -710,11 +710,13 @@ public class PhysicalPlanTest Assert.assertEquals("unifier activation checkpoint " + o1Meta, 3, o1NewUnifiers.get(0).recoveryCheckpoint.windowId); } - private void setActivationCheckpoint(PTOperator oper, long windowId) + public static void setActivationCheckpoint(PTOperator oper, long windowId) { try { oper.operatorMeta.getValue(OperatorContext.STORAGE_AGENT).save(oper.operatorMeta.getOperator(), oper.id, windowId); - oper.setRecoveryCheckpoint(new Checkpoint(3, 0, 0)); + Checkpoint cp = new Checkpoint(windowId, 0, 0); + oper.setRecoveryCheckpoint(cp); + oper.checkpoints.add(cp); } catch (Exception e) { Assert.fail(e.toString()); }
