Repository: apex-core
Updated Branches:
  refs/heads/master e9b01c53f -> e4022674e
APEXCORE-654 fix update recovery window when delay part of group


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/e4022674
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/e4022674
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/e4022674

Branch: refs/heads/master
Commit: e4022674efb6a4db8fc2ec25d72920756b1ebd20
Parents: e9b01c5
Author: bhupeshchawda <[email protected]>
Authored: Sun Mar 26 10:57:51 2017 +0530
Committer: bhupeshchawda <[email protected]>
Committed: Tue Apr 18 07:21:12 2017 +0530

----------------------------------------------------------------------
 .../stram/StreamingContainerManager.java      | 29 ++++++++++-
 .../com/datatorrent/stram/CheckpointTest.java | 55 ++++++++++++++++++++
 2 files changed, 83 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/e4022674/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 51e85f7..18d6787 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -2082,6 +2082,9 @@ public class StreamingContainerManager implements PlanContext
     if (checkpointGroup.size() > 1) {
       for (OperatorMeta om : checkpointGroup) {
         Collection<PTOperator> operators = plan.getAllOperators(om);
+        // include unifiers fed by this group's operators so their checkpoints are intersected too
+        Collection<PTOperator> unifiers = getUnifiersInCheckpointGroup(operators);
+        operators.addAll(unifiers);
         for (PTOperator groupOper : operators) {
           synchronized (groupOper.checkpoints) {
             commonCheckpoints.retainAll(groupOper.checkpoints);
@@ -2175,6 +2178,30 @@ public class StreamingContainerManager implements PlanContext
 
   }
 
+  /**
+   * Returns the unifiers that directly consume an output of any of the given operators.
+   * Only the immediate sinks of {@code operators} are examined; unifiers further downstream
+   * are not followed.
+   *
+   * @param operators physical operators whose downstream unifiers are collected
+   * @return the distinct unifiers found, possibly empty
+   */
+  private static Collection<PTOperator> getUnifiersInCheckpointGroup(Collection<PTOperator> operators)
+  {
+    Set<PTOperator> unifiers = Sets.newHashSet();
+    for (PTOperator op : operators) {
+      for (PTOperator.PTOutput out : op.getOutputs()) {
+        for (PTOperator.PTInput in : out.sinks) {
+          PTOperator target = in.target;
+          if (target.isUnifier()) {
+            unifiers.add(target);
+          }
+        }
+      }
+    }
+    return unifiers;
+  }
+
   public long windowIdToMillis(long windowId)
   {
     int widthMillis = plan.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS);
@@ -2186,7 +2213,7 @@ public class StreamingContainerManager implements PlanContext
     return this.vars.windowStartMillis;
   }
 
-  private Map<OperatorMeta, Set<OperatorMeta>> getCheckpointGroups()
+  protected Map<OperatorMeta, Set<OperatorMeta>> getCheckpointGroups()
   {
     if (this.checkpointGroups == null) {
       this.checkpointGroups = new HashMap<>();


http://git-wip-us.apache.org/repos/asf/apex-core/blob/e4022674/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
index 0c997ec..b2b4c0c 100644
--- a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.util.SystemClock;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.DefaultOutputPort;
@@ -46,8 +47,10 @@ import com.datatorrent.api.InputOperator;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
 import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.common.util.DefaultDelayOperator;
 import com.datatorrent.common.util.FSStorageAgent;
 import com.datatorrent.stram.MockContainer.MockOperatorStats;
 import com.datatorrent.stram.StreamingContainerManager.UpdateCheckpointsContext;
@@ -55,6 +58,7 @@ import com.datatorrent.stram.api.Checkpoint;
 import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHeartbeat.DeployState;
 import com.datatorrent.stram.engine.GenericTestOperator;
 import com.datatorrent.stram.engine.OperatorContext;
+import com.datatorrent.stram.engine.TestGeneratorInputOperator;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
 import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
 import com.datatorrent.stram.plan.physical.PTContainer;
@@ -282,6 +286,57 @@ public class CheckpointTest
   }
 
   @Test
+  public void testUpdateRecoveryCheckpointWithCycle() throws Exception
+  {
+    Clock clock = new SystemClock();
+
+    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
+
+    // Simulate a DAG with a loop which has a unifier operator
+    TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
+    GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
+    GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
+    GenericTestOperator o4 = dag.addOperator("o4", GenericTestOperator.class);
+    DefaultDelayOperator d = dag.addOperator("d", DefaultDelayOperator.class);
+
+    dag.addStream("o1.output1", o1.outport, o2.inport1);
+    dag.addStream("o2.output1", o2.outport1, o3.inport1);
+    dag.addStream("o3.output1", o3.outport1, o4.inport1);
+    dag.addStream("o4.output1", o4.outport1, d.input);
+    dag.addStream("d.output", d.output, o2.inport2);
+    dag.setOperatorAttribute(o3, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<Operator>(2));
+
+    dag.validate();
+    StreamingContainerManager dnm = new StreamingContainerManager(dag);
+    PhysicalPlan plan = dnm.getPhysicalPlan();
+
+    for (PTOperator oper : plan.getAllOperators().values()) {
+      Assert.assertEquals("Initial activation windowId" + oper, Checkpoint.INITIAL_CHECKPOINT, oper.getRecoveryCheckpoint());
+      Assert.assertEquals("Checkpoints empty" + oper, Collections.emptyList(), oper.checkpoints);
+    }
+
+    Checkpoint cp1 = new Checkpoint(1L, 0, 0);
+    Checkpoint cp2 = new Checkpoint(2L, 0, 0);
+
+    Map<OperatorMeta, Set<OperatorMeta>> checkpointGroups = dnm.getCheckpointGroups();
+
+    Map<Integer, PTOperator> allOperators = plan.getAllOperators();
+    for (PTOperator operator : allOperators.values()) {
+      operator.setState(PTOperator.State.ACTIVE);
+      operator.checkpoints.add(cp1);
+      dnm.updateRecoveryCheckpoints(operator,
+          new UpdateCheckpointsContext(clock, false, checkpointGroups), false);
+    }
+
+    List<PTOperator> physicalO1 = plan.getOperators(dag.getOperatorMeta("o1"));
+    physicalO1.get(0).checkpoints.add(cp2);
+    dnm.updateRecoveryCheckpoints(physicalO1.get(0),
+        new UpdateCheckpointsContext(clock, false, checkpointGroups), false);
+
+    Assert.assertEquals("Recovery checkpoint updated ", cp1, physicalO1.get(0).getRecoveryCheckpoint());
+  }
+
+  @Test
   public void testUpdateCheckpointsRecovery()
   {
     MockClock clock = new MockClock();
