Repository: incubator-apex-core
Updated Branches:
  refs/heads/master 52c16418e -> 99466a3ad


APEXCORE-360 #resolve Providing a way for operator to check how many windows 
till checkpoint.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/1a372857
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/1a372857
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/1a372857

Branch: refs/heads/master
Commit: 1a372857765bfcae137b1d212f0ae73b8fa51897
Parents: bbc4257
Author: Pramod Immaneni <[email protected]>
Authored: Thu Feb 18 17:48:15 2016 -0800
Committer: Pramod Immaneni <[email protected]>
Committed: Thu Feb 25 16:11:58 2016 -0800

----------------------------------------------------------------------
 .../main/java/com/datatorrent/api/Context.java  |   6 +
 .../stram/api/OperatorDeployInfo.java           |   6 +
 .../datatorrent/stram/engine/GenericNode.java   |  20 ++-
 .../com/datatorrent/stram/engine/InputNode.java |   7 ++
 .../java/com/datatorrent/stram/engine/Node.java |  17 +++
 .../com/datatorrent/stram/engine/OiONode.java   |   1 +
 .../stram/engine/OperatorContext.java           |  12 ++
 .../stram/engine/GenericNodeTest.java           | 123 ++++++++++++++++++-
 .../com/datatorrent/stram/engine/NodeTest.java  |   8 ++
 9 files changed, 193 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/1a372857/api/src/main/java/com/datatorrent/api/Context.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Context.java 
b/api/src/main/java/com/datatorrent/api/Context.java
index ceed8a2..90d2108 100644
--- a/api/src/main/java/com/datatorrent/api/Context.java
+++ b/api/src/main/java/com/datatorrent/api/Context.java
@@ -306,6 +306,12 @@ public interface Context
      */
     int getId();
 
+    /**
+     * Return the number of windows before the next checkpoint including the 
current window.
+     * @return Number of windows from checkpoint, 1 if the checkpoint will be 
after the current window
+     */
+    int getWindowsFromCheckpoint();
+
     @SuppressWarnings("FieldNameHidesFieldInSuperclass")
     long serialVersionUID = 
AttributeMap.AttributeInitializer.initialize(OperatorContext.class);
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/1a372857/engine/src/main/java/com/datatorrent/stram/api/OperatorDeployInfo.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/api/OperatorDeployInfo.java 
b/engine/src/main/java/com/datatorrent/stram/api/OperatorDeployInfo.java
index ae89bc9..22bebfc 100644
--- a/engine/src/main/java/com/datatorrent/stram/api/OperatorDeployInfo.java
+++ b/engine/src/main/java/com/datatorrent/stram/api/OperatorDeployInfo.java
@@ -76,6 +76,12 @@ public class OperatorDeployInfo implements Serializable, 
OperatorContext
     throw new UnsupportedOperationException("Not supported yet.");
   }
 
+  @Override
+  public int getWindowsFromCheckpoint()
+  {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
   public enum OperatorType
   {
     INPUT, UNIFIER, GENERIC, OIO

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/1a372857/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java 
b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
index 1ccec31..61176e0 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
@@ -18,7 +18,13 @@
  */
 package com.datatorrent.stram.engine;
 
-import java.util.*;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
 import java.util.Map.Entry;
 
 import org.slf4j.Logger;
@@ -33,11 +39,10 @@ import com.datatorrent.api.Operator.ProcessingMode;
 import com.datatorrent.api.Operator.ShutdownException;
 import com.datatorrent.api.Sink;
 import com.datatorrent.api.annotation.Stateless;
-
 import com.datatorrent.bufferserver.packet.MessageType;
 import com.datatorrent.bufferserver.util.Codec;
-import com.datatorrent.netlet.util.DTThrowable;
 import com.datatorrent.netlet.util.CircularBuffer;
+import com.datatorrent.netlet.util.DTThrowable;
 import 
com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerStats;
 import com.datatorrent.stram.debug.TappedReservoir;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
@@ -156,6 +161,10 @@ public class GenericNode extends Node<Operator>
       controlTupleCount++;
     }
 
+    if (doCheckpoint) {
+      dagCheckpointOffsetCount = (dagCheckpointOffsetCount + 1) % 
DAG_CHECKPOINT_WINDOW_COUNT;
+    }
+
     if (++checkpointWindowCount == CHECKPOINT_WINDOW_COUNT) {
       checkpointWindowCount = 0;
       if (doCheckpoint) {
@@ -240,6 +249,8 @@ public class GenericNode extends Node<Operator>
     int receivedEndWindow = 0;
     long firstWindowId = -1;
 
+    calculateNextCheckpointWindow();
+
     TupleTracker tracker;
     LinkedList<TupleTracker> resetTupleTracker = new 
LinkedList<TupleTracker>();
     try {
@@ -291,6 +302,8 @@ public class GenericNode extends Node<Operator>
                   }
                   controlTupleCount++;
 
+                  
context.setWindowsFromCheckpoint(nextCheckpointWindowCount--);
+
                   if (applicationWindowCount == 0) {
                     insideWindow = true;
                     operator.beginWindow(currentWindowId);
@@ -360,6 +373,7 @@ public class GenericNode extends Node<Operator>
                 activePort.remove();
                 long checkpointWindow = t.getWindowId();
                 if (lastCheckpointWindowId < checkpointWindow) {
+                  dagCheckpointOffsetCount = 0;
                   if (PROCESSING_MODE == ProcessingMode.EXACTLY_ONCE) {
                     lastCheckpointWindowId = checkpointWindow;
                   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/1a372857/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java 
b/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java
index 92a61f0..318f796 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java
@@ -73,6 +73,8 @@ public class InputNode extends Node<InputOperator>
     boolean doCheckpoint = false;
     boolean insideStreamingWindow = false;
 
+    calculateNextCheckpointWindow();
+
     try {
       while (alive) {
         Tuple t = controlTuples.sweep();
@@ -135,6 +137,10 @@ public class InputNode extends Node<InputOperator>
               }
               controlTupleCount++;
 
+              if (doCheckpoint) {
+                dagCheckpointOffsetCount = (dagCheckpointOffsetCount + 1) % 
DAG_CHECKPOINT_WINDOW_COUNT;
+              }
+
               if (++checkpointWindowCount == CHECKPOINT_WINDOW_COUNT) {
                 checkpointWindowCount = 0;
                 if (doCheckpoint) {
@@ -155,6 +161,7 @@ public class InputNode extends Node<InputOperator>
               break;
 
             case CHECKPOINT:
+              dagCheckpointOffsetCount = 0;
               if (checkpointWindowCount == 0 && PROCESSING_MODE != 
ProcessingMode.EXACTLY_ONCE) {
                 checkpoint(currentWindowId);
               }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/1a372857/engine/src/main/java/com/datatorrent/stram/engine/Node.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Node.java 
b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
index d4970cd..9eae7e9 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/Node.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
@@ -55,6 +55,7 @@ import com.google.common.math.IntMath;
 
 import com.datatorrent.api.AutoMetric;
 import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
 import com.datatorrent.api.InputOperator;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.Operator.InputPort;
@@ -99,6 +100,8 @@ public abstract class Node<OPERATOR extends Operator> 
implements Component<Opera
   public static final String OUTPUT = "output";
   protected int APPLICATION_WINDOW_COUNT; /* this is write once variable */
 
+  protected int DAG_CHECKPOINT_WINDOW_COUNT; /* this is write once variable */
+
   protected int CHECKPOINT_WINDOW_COUNT; /* this is write once variable */
 
   protected boolean DATA_TUPLE_AWARE; /* this is write once variable */
@@ -118,6 +121,8 @@ public abstract class Node<OPERATOR extends Operator> 
implements Component<Opera
   protected Checkpoint checkpoint;
   public int applicationWindowCount;
   public int checkpointWindowCount;
+  public int nextCheckpointWindowCount;
+  public int dagCheckpointOffsetCount;
   protected int controlTupleCount;
   public final OperatorContext context;
   public final BlockingQueue<StatsListener.OperatorResponse> commandResponse;
@@ -540,12 +545,23 @@ public abstract class Node<OPERATOR extends Operator> 
implements Component<Opera
       }
     }
 
+    calculateNextCheckpointWindow();
+    dagCheckpointOffsetCount = 0;
     checkpoint = new Checkpoint(windowId, applicationWindowCount, 
checkpointWindowCount);
     if (operator instanceof Operator.CheckpointListener) {
       ((Operator.CheckpointListener) operator).checkpointed(windowId);
     }
   }
 
+  protected void calculateNextCheckpointWindow()
+  {
+    if (PROCESSING_MODE != ProcessingMode.EXACTLY_ONCE) {
+      nextCheckpointWindowCount = ((DAG_CHECKPOINT_WINDOW_COUNT - 
dagCheckpointOffsetCount + CHECKPOINT_WINDOW_COUNT - 
1)/CHECKPOINT_WINDOW_COUNT) * CHECKPOINT_WINDOW_COUNT;
+    } else {
+      nextCheckpointWindowCount = 1;
+    }
+  }
+
   @SuppressWarnings("unchecked")
   public static Node<?> retrieveNode(Object operator, OperatorContext context, 
OperatorDeployInfo.OperatorType type)
   {
@@ -598,6 +614,7 @@ public abstract class Node<OPERATOR extends Operator> 
implements Component<Opera
       int slidingWindowCount = 
context.getValue(OperatorContext.SLIDE_BY_WINDOW_COUNT);
       APPLICATION_WINDOW_COUNT = IntMath.gcd(APPLICATION_WINDOW_COUNT, 
slidingWindowCount);
     }
+    DAG_CHECKPOINT_WINDOW_COUNT = 
context.getValue(Context.DAGContext.CHECKPOINT_WINDOW_COUNT);
     CHECKPOINT_WINDOW_COUNT = 
context.getValue(OperatorContext.CHECKPOINT_WINDOW_COUNT);
     Collection<StatsListener> statsListeners = 
context.getValue(OperatorContext.STATS_LISTENERS);
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/1a372857/engine/src/main/java/com/datatorrent/stram/engine/OiONode.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/OiONode.java 
b/engine/src/main/java/com/datatorrent/stram/engine/OiONode.java
index c90966f..39b3fa4 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/OiONode.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/OiONode.java
@@ -88,6 +88,7 @@ public class OiONode extends GenericNode
           break;
 
         case CHECKPOINT:
+          dagCheckpointOffsetCount = 0;
           if (lastCheckpointWindowId < t.getWindowId() && !doCheckpoint) {
             if (checkpointWindowCount == 0) {
               checkpoint(t.getWindowId());

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/1a372857/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java 
b/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java
index 2967b47..424ffcc 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java
@@ -48,6 +48,7 @@ public class OperatorContext extends BaseContext implements 
Context.OperatorCont
   private final CircularBuffer<ContainerStats.OperatorStats> statsBuffer = new 
CircularBuffer<ContainerStats.OperatorStats>(1024);
   private final CircularBuffer<OperatorRequest> requests = new 
CircularBuffer<OperatorRequest>(1024);
   public final boolean stateless;
+  private int windowsFromCheckpoint;
 
   /**
    * The operator to which this context is passed, will timeout after the 
following milliseconds if no new tuple has been received by it.
@@ -97,6 +98,17 @@ public class OperatorContext extends BaseContext implements 
Context.OperatorCont
     return id;
   }
 
+  @Override
+  public int getWindowsFromCheckpoint()
+  {
+    return windowsFromCheckpoint;
+  }
+
+  public void setWindowsFromCheckpoint(int windowsFromCheckpoint)
+  {
+    this.windowsFromCheckpoint = windowsFromCheckpoint;
+  }
+
   /**
    * Reset counts for next heartbeat interval and return current counts. This 
is called as part of the heartbeat processing.
    *

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/1a372857/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
----------------------------------------------------------------------
diff --git 
a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java 
b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
index 2577504..ee00d0f 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
@@ -54,6 +54,7 @@ import com.datatorrent.bufferserver.packet.MessageType;
 import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.common.util.ScheduledThreadPoolExecutor;
 import com.datatorrent.stram.api.Checkpoint;
+import com.datatorrent.common.util.ScheduledExecutorService;
 import com.datatorrent.stram.tuple.EndStreamTuple;
 import com.datatorrent.stram.tuple.EndWindowTuple;
 import com.datatorrent.stram.tuple.Tuple;
@@ -162,6 +163,7 @@ public class GenericNodeTest
 
   public static class GenericOperator implements Operator
   {
+    Context.OperatorContext context;
     long beginWindowId;
     long endWindowId;
     public final transient DefaultInputPort<Object> ip1 = new 
DefaultInputPort<Object>()
@@ -201,17 +203,32 @@ public class GenericNodeTest
     @Override
     public void setup(Context.OperatorContext context)
     {
-      throw new UnsupportedOperationException("Not supported yet.");
+      this.context = context;
     }
 
     @Override
     public void teardown()
     {
-      throw new UnsupportedOperationException("Not supported yet.");
     }
 
   }
 
+  public static class CheckpointDistanceOperator extends GenericOperator
+  {
+    List<Integer> distances = new ArrayList<Integer>();
+    int numWindows = 0;
+    int maxWindows = 0;
+
+    @Override
+    public void beginWindow(long windowId)
+    {
+      super.beginWindow(windowId);
+      if (numWindows++ < maxWindows) {
+        distances.add(context.getWindowsFromCheckpoint());
+      }
+    }
+  }
+
   public static class GenericCheckpointOperator extends GenericOperator 
implements CheckpointListener
   {
     public Set<Long> checkpointedWindows = Sets.newHashSet();
@@ -406,8 +423,7 @@ public class GenericNodeTest
     do {
       Thread.sleep(sleeptime);
       interval += sleeptime;
-    }
-    while ((ab.get() == false) && (interval < maxSleep));
+    } while ((ab.get() == false) && (interval < maxSleep));
 
 
     int controlTupleCount = gn.controlTupleCount;
@@ -628,5 +644,104 @@ public class GenericNodeTest
     }
   }
 
+  @Test
+  public void testDefaultCheckPointDistance() throws InterruptedException
+  {
+    
testCheckpointDistance(Context.DAGContext.CHECKPOINT_WINDOW_COUNT.defaultValue, 
Context.OperatorContext.CHECKPOINT_WINDOW_COUNT.defaultValue);
+  }
+
+  @Test
+  public void testDAGGreaterCheckPointDistance() throws InterruptedException
+  {
+    testCheckpointDistance(7, 5);
+  }
+
+  @Test
+  public void testOpGreaterCheckPointDistance() throws InterruptedException
+  {
+    testCheckpointDistance(3, 5);
+  }
+
+  private void testCheckpointDistance(int dagCheckPoint, int opCheckPoint) 
throws InterruptedException
+  {
+    int windowWidth = 50;
+    long sleeptime = 25L;
+    int maxWindows = 60;
+    // Adding some extra time for the windows to finish
+    long maxSleep = windowWidth * maxWindows + 5000;
+
+    ScheduledExecutorService executorService = new 
ScheduledThreadPoolExecutor(1, "default");
+    final WindowGenerator windowGenerator = new 
WindowGenerator(executorService, 1024);
+    windowGenerator.setWindowWidth(windowWidth);
+    windowGenerator.setFirstWindow(executorService.getCurrentTimeMillis());
+    windowGenerator.setCheckpointCount(dagCheckPoint, 0);
+    //GenericOperator go = new GenericOperator();
+    CheckpointDistanceOperator go = new CheckpointDistanceOperator();
+    go.maxWindows = maxWindows;
+
+    List<Integer> checkpoints = new ArrayList<Integer>();
+
+    int window = 0;
+    while (window < maxWindows) {
+      window = (int)Math.ceil((double)(window + 1)/dagCheckPoint) * 
dagCheckPoint;
+      window = (int)Math.ceil((double)window/opCheckPoint) * opCheckPoint;
+      checkpoints.add(window);
+    }
+
+    final StreamContext stcontext = new StreamContext("s1");
+    DefaultAttributeMap attrMap = new DefaultAttributeMap();
+    attrMap.put(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, dagCheckPoint);
+    attrMap.put(Context.OperatorContext.CHECKPOINT_WINDOW_COUNT, opCheckPoint);
+    final OperatorContext context = new 
com.datatorrent.stram.engine.OperatorContext(0, attrMap, null);
+    final GenericNode gn = new GenericNode(go, context);
+    gn.setId(1);
+
+    //DefaultReservoir reservoir1 = new DefaultReservoir("ip1Res", 1024);
+    //DefaultReservoir reservoir2 = new DefaultReservoir("ip2Res", 1024);
+
+    //gn.connectInputPort("ip1", reservoir1);
+    //gn.connectInputPort("ip2", reservoir2);
+    gn.connectInputPort("ip1", windowGenerator.acquireReservoir("ip1", 1024));
+    gn.connectInputPort("ip2", windowGenerator.acquireReservoir("ip2", 1024));
+    gn.connectOutputPort("op", Sink.BLACKHOLE);
+
+    final AtomicBoolean ab = new AtomicBoolean(false);
+    Thread t = new Thread()
+    {
+      @Override
+      public void run()
+      {
+        gn.setup(context);
+        windowGenerator.activate(stcontext);
+        gn.activate();
+        ab.set(true);
+        gn.run();
+        windowGenerator.deactivate();
+        gn.deactivate();
+        gn.teardown();
+      }
+    };
+    t.start();
+
+    long interval = 0;
+    do {
+      Thread.sleep(sleeptime);
+      interval += sleeptime;
+    } while ((go.numWindows < maxWindows) && (interval < maxSleep));
+
+    Assert.assertEquals("Number distances", maxWindows, go.numWindows);
+    int chkindex = 0;
+    int nextCheckpoint = checkpoints.get(chkindex++);
+    for (int i = 0; i < maxWindows; ++i) {
+      if ((i + 1) > nextCheckpoint) {
+        nextCheckpoint = checkpoints.get(chkindex++);
+      }
+      Assert.assertEquals("Windows from checkpoint for " + i, nextCheckpoint - 
i, (int)go.distances.get(i));
+    }
+
+    gn.shutdown();
+    t.join();
+  }
+
   private static final Logger LOG = 
LoggerFactory.getLogger(GenericNodeTest.class);
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/1a372857/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java 
b/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
index c518350..2e12f63 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
@@ -214,11 +214,15 @@ public class NodeTest
 
     };
 
+    node.activate();
+
     synchronized (StorageAgentImpl.calls) {
       StorageAgentImpl.calls.clear();
       node.checkpoint(0);
       Assert.assertEquals("Calls to StorageAgent", 0, 
StorageAgentImpl.calls.size());
     }
+
+    node.deactivate();
   }
 
   @Test
@@ -243,11 +247,15 @@ public class NodeTest
 
     };
 
+    node.activate();
+
     synchronized (StorageAgentImpl.calls) {
       StorageAgentImpl.calls.clear();
       node.checkpoint(0);
       Assert.assertEquals("Calls to StorageAgent", 1, 
StorageAgentImpl.calls.size());
     }
+
+    node.deactivate();
   }
 
 }

Reply via email to