Repository: apex-core
Updated Branches:
  refs/heads/master e9b01c53f -> e4022674e


APEXCORE-654 fix update recovery window when delay part of group


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/e4022674
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/e4022674
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/e4022674

Branch: refs/heads/master
Commit: e4022674efb6a4db8fc2ec25d72920756b1ebd20
Parents: e9b01c5
Author: bhupeshchawda <[email protected]>
Authored: Sun Mar 26 10:57:51 2017 +0530
Committer: bhupeshchawda <[email protected]>
Committed: Tue Apr 18 07:21:12 2017 +0530

----------------------------------------------------------------------
 .../stram/StreamingContainerManager.java        | 20 ++++++-
 .../com/datatorrent/stram/CheckpointTest.java   | 55 ++++++++++++++++++++
 2 files changed, 74 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/e4022674/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java 
b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 51e85f7..18d6787 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -2082,6 +2082,8 @@ public class StreamingContainerManager implements 
PlanContext
     if (checkpointGroup.size() > 1) {
       for (OperatorMeta om : checkpointGroup) {
         Collection<PTOperator> operators = plan.getAllOperators(om);
+        Collection<PTOperator> unifiers = 
getUnifiersInCheckpointGroup(operators);
+        operators.addAll(unifiers);
         for (PTOperator groupOper : operators) {
           synchronized (groupOper.checkpoints) {
             commonCheckpoints.retainAll(groupOper.checkpoints);
@@ -2175,6 +2177,22 @@ public class StreamingContainerManager implements 
PlanContext
 
   }
 
+  private static Collection<PTOperator> 
getUnifiersInCheckpointGroup(Collection<PTOperator> operators)
+  {
+    Set<PTOperator> unifiers = Sets.newHashSet();
+    for (PTOperator op : operators) {
+      for (PTOperator.PTOutput out : op.getOutputs()) {
+        for (PTOperator.PTInput in : out.sinks) {
+          PTOperator target = in.target;
+          if (target.isUnifier()) {
+            unifiers.add(target);
+          }
+        }
+      }
+    }
+    return unifiers;
+  }
+
   public long windowIdToMillis(long windowId)
   {
     int widthMillis = 
plan.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS);
@@ -2186,7 +2204,7 @@ public class StreamingContainerManager implements 
PlanContext
     return this.vars.windowStartMillis;
   }
 
-  private Map<OperatorMeta, Set<OperatorMeta>> getCheckpointGroups()
+  protected Map<OperatorMeta, Set<OperatorMeta>> getCheckpointGroups()
   {
     if (this.checkpointGroups == null) {
       this.checkpointGroups = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/apex-core/blob/e4022674/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java 
b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
index 0c997ec..b2b4c0c 100644
--- a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.util.SystemClock;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.DefaultOutputPort;
@@ -46,8 +47,10 @@ import com.datatorrent.api.InputOperator;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
 import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.common.util.DefaultDelayOperator;
 import com.datatorrent.common.util.FSStorageAgent;
 import com.datatorrent.stram.MockContainer.MockOperatorStats;
 import 
com.datatorrent.stram.StreamingContainerManager.UpdateCheckpointsContext;
@@ -55,6 +58,7 @@ import com.datatorrent.stram.api.Checkpoint;
 import 
com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHeartbeat.DeployState;
 import com.datatorrent.stram.engine.GenericTestOperator;
 import com.datatorrent.stram.engine.OperatorContext;
+import com.datatorrent.stram.engine.TestGeneratorInputOperator;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
 import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
 import com.datatorrent.stram.plan.physical.PTContainer;
@@ -282,6 +286,57 @@ public class CheckpointTest
   }
 
   @Test
+  public void testUpdateRecoveryCheckpointWithCycle() throws Exception
+  {
+    Clock clock = new SystemClock();
+
+    
dag.setAttribute(com.datatorrent.api.Context.OperatorContext.STORAGE_AGENT, new 
MemoryStorageAgent());
+
+    // Simulate a DAG with a loop which has a unifier operator
+    TestGeneratorInputOperator o1 = dag.addOperator("o1", 
TestGeneratorInputOperator.class);
+    GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
+    GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
+    GenericTestOperator o4 = dag.addOperator("o4", GenericTestOperator.class);
+    DefaultDelayOperator d = dag.addOperator("d", DefaultDelayOperator.class);
+
+    dag.addStream("o1.output1", o1.outport, o2.inport1);
+    dag.addStream("o2.output1", o2.outport1, o3.inport1);
+    dag.addStream("o3.output1", o3.outport1, o4.inport1);
+    dag.addStream("o4.output1", o4.outport1, d.input);
+    dag.addStream("d.output", d.output, o2.inport2);
+    dag.setOperatorAttribute(o3, Context.OperatorContext.PARTITIONER, new 
StatelessPartitioner<Operator>(2));
+
+    dag.validate();
+    StreamingContainerManager dnm = new StreamingContainerManager(dag);
+    PhysicalPlan plan = dnm.getPhysicalPlan();
+
+    for (PTOperator oper : plan.getAllOperators().values()) {
+      Assert.assertEquals("Initial activation windowId" + oper, 
Checkpoint.INITIAL_CHECKPOINT, oper.getRecoveryCheckpoint());
+      Assert.assertEquals("Checkpoints empty" + oper, Collections.emptyList(), 
oper.checkpoints);
+    }
+
+    Checkpoint cp1 = new Checkpoint(1L, 0, 0);
+    Checkpoint cp2 = new Checkpoint(2L, 0, 0);
+
+    Map<OperatorMeta, Set<OperatorMeta>> checkpointGroups = 
dnm.getCheckpointGroups();
+
+    Map<Integer, PTOperator> allOperators = plan.getAllOperators();
+    for (PTOperator operator: allOperators.values()) {
+      operator.setState(PTOperator.State.ACTIVE);
+      operator.checkpoints.add(cp1);
+      dnm.updateRecoveryCheckpoints(operator,
+          new UpdateCheckpointsContext(clock, false, checkpointGroups), false);
+    }
+
+    List<PTOperator> physicalO1 = plan.getOperators(dag.getOperatorMeta("o1"));
+    physicalO1.get(0).checkpoints.add(cp2);
+    dnm.updateRecoveryCheckpoints(physicalO1.get(0),
+        new UpdateCheckpointsContext(clock, false, checkpointGroups), false);
+
+    Assert.assertEquals("Recovery checkpoint updated ", 
physicalO1.get(0).getRecoveryCheckpoint(), cp1);
+  }
+
+  @Test
   public void testUpdateCheckpointsRecovery()
   {
     MockClock clock = new MockClock();

Reply via email to