Repository: incubator-apex-core
Updated Branches:
  refs/heads/master 52c16418e -> 99466a3ad
APEXCORE-360 #resolve Providing a way for operator to check how many windows till checkpoint. Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/1a372857 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/1a372857 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/1a372857 Branch: refs/heads/master Commit: 1a372857765bfcae137b1d212f0ae73b8fa51897 Parents: bbc4257 Author: Pramod Immaneni <[email protected]> Authored: Thu Feb 18 17:48:15 2016 -0800 Committer: Pramod Immaneni <[email protected]> Committed: Thu Feb 25 16:11:58 2016 -0800 ---------------------------------------------------------------------- .../main/java/com/datatorrent/api/Context.java | 6 + .../stram/api/OperatorDeployInfo.java | 6 + .../datatorrent/stram/engine/GenericNode.java | 20 ++- .../com/datatorrent/stram/engine/InputNode.java | 7 ++ .../java/com/datatorrent/stram/engine/Node.java | 17 +++ .../com/datatorrent/stram/engine/OiONode.java | 1 + .../stram/engine/OperatorContext.java | 12 ++ .../stram/engine/GenericNodeTest.java | 123 ++++++++++++++++++- .../com/datatorrent/stram/engine/NodeTest.java | 8 ++ 9 files changed, 193 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/1a372857/api/src/main/java/com/datatorrent/api/Context.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java index ceed8a2..90d2108 100644 --- a/api/src/main/java/com/datatorrent/api/Context.java +++ b/api/src/main/java/com/datatorrent/api/Context.java @@ -306,6 +306,12 @@ public interface Context */ int getId(); + /** + * Return the number of windows before the next checkpoint including the current window. 
+ * @return Number of windows from checkpoint, 1 if the checkpoint will be after the current window + */ + int getWindowsFromCheckpoint(); + @SuppressWarnings("FieldNameHidesFieldInSuperclass") long serialVersionUID = AttributeMap.AttributeInitializer.initialize(OperatorContext.class); } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/1a372857/engine/src/main/java/com/datatorrent/stram/api/OperatorDeployInfo.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/api/OperatorDeployInfo.java b/engine/src/main/java/com/datatorrent/stram/api/OperatorDeployInfo.java index ae89bc9..22bebfc 100644 --- a/engine/src/main/java/com/datatorrent/stram/api/OperatorDeployInfo.java +++ b/engine/src/main/java/com/datatorrent/stram/api/OperatorDeployInfo.java @@ -76,6 +76,12 @@ public class OperatorDeployInfo implements Serializable, OperatorContext throw new UnsupportedOperationException("Not supported yet."); } + @Override + public int getWindowsFromCheckpoint() + { + throw new UnsupportedOperationException("Not supported yet."); + } + public enum OperatorType { INPUT, UNIFIER, GENERIC, OIO http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/1a372857/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java index 1ccec31..61176e0 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java @@ -18,7 +18,13 @@ */ package com.datatorrent.stram.engine; -import java.util.*; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Map; 
import java.util.Map.Entry; import org.slf4j.Logger; @@ -33,11 +39,10 @@ import com.datatorrent.api.Operator.ProcessingMode; import com.datatorrent.api.Operator.ShutdownException; import com.datatorrent.api.Sink; import com.datatorrent.api.annotation.Stateless; - import com.datatorrent.bufferserver.packet.MessageType; import com.datatorrent.bufferserver.util.Codec; -import com.datatorrent.netlet.util.DTThrowable; import com.datatorrent.netlet.util.CircularBuffer; +import com.datatorrent.netlet.util.DTThrowable; import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerStats; import com.datatorrent.stram.debug.TappedReservoir; import com.datatorrent.stram.plan.logical.LogicalPlan; @@ -156,6 +161,10 @@ public class GenericNode extends Node<Operator> controlTupleCount++; } + if (doCheckpoint) { + dagCheckpointOffsetCount = (dagCheckpointOffsetCount + 1) % DAG_CHECKPOINT_WINDOW_COUNT; + } + if (++checkpointWindowCount == CHECKPOINT_WINDOW_COUNT) { checkpointWindowCount = 0; if (doCheckpoint) { @@ -240,6 +249,8 @@ public class GenericNode extends Node<Operator> int receivedEndWindow = 0; long firstWindowId = -1; + calculateNextCheckpointWindow(); + TupleTracker tracker; LinkedList<TupleTracker> resetTupleTracker = new LinkedList<TupleTracker>(); try { @@ -291,6 +302,8 @@ public class GenericNode extends Node<Operator> } controlTupleCount++; + context.setWindowsFromCheckpoint(nextCheckpointWindowCount--); + if (applicationWindowCount == 0) { insideWindow = true; operator.beginWindow(currentWindowId); @@ -360,6 +373,7 @@ public class GenericNode extends Node<Operator> activePort.remove(); long checkpointWindow = t.getWindowId(); if (lastCheckpointWindowId < checkpointWindow) { + dagCheckpointOffsetCount = 0; if (PROCESSING_MODE == ProcessingMode.EXACTLY_ONCE) { lastCheckpointWindowId = checkpointWindow; } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/1a372857/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java 
---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java b/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java index 92a61f0..318f796 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java @@ -73,6 +73,8 @@ public class InputNode extends Node<InputOperator> boolean doCheckpoint = false; boolean insideStreamingWindow = false; + calculateNextCheckpointWindow(); + try { while (alive) { Tuple t = controlTuples.sweep(); @@ -135,6 +137,10 @@ public class InputNode extends Node<InputOperator> } controlTupleCount++; + if (doCheckpoint) { + dagCheckpointOffsetCount = (dagCheckpointOffsetCount + 1) % DAG_CHECKPOINT_WINDOW_COUNT; + } + if (++checkpointWindowCount == CHECKPOINT_WINDOW_COUNT) { checkpointWindowCount = 0; if (doCheckpoint) { @@ -155,6 +161,7 @@ public class InputNode extends Node<InputOperator> break; case CHECKPOINT: + dagCheckpointOffsetCount = 0; if (checkpointWindowCount == 0 && PROCESSING_MODE != ProcessingMode.EXACTLY_ONCE) { checkpoint(currentWindowId); } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/1a372857/engine/src/main/java/com/datatorrent/stram/engine/Node.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Node.java b/engine/src/main/java/com/datatorrent/stram/engine/Node.java index d4970cd..9eae7e9 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/Node.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/Node.java @@ -55,6 +55,7 @@ import com.google.common.math.IntMath; import com.datatorrent.api.AutoMetric; import com.datatorrent.api.Component; +import com.datatorrent.api.Context; import com.datatorrent.api.InputOperator; import com.datatorrent.api.Operator; import com.datatorrent.api.Operator.InputPort; @@ -99,6 +100,8 @@ 
public abstract class Node<OPERATOR extends Operator> implements Component<Opera public static final String OUTPUT = "output"; protected int APPLICATION_WINDOW_COUNT; /* this is write once variable */ + protected int DAG_CHECKPOINT_WINDOW_COUNT; /* this is write once variable */ + protected int CHECKPOINT_WINDOW_COUNT; /* this is write once variable */ protected boolean DATA_TUPLE_AWARE; /* this is write once variable */ @@ -118,6 +121,8 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera protected Checkpoint checkpoint; public int applicationWindowCount; public int checkpointWindowCount; + public int nextCheckpointWindowCount; + public int dagCheckpointOffsetCount; protected int controlTupleCount; public final OperatorContext context; public final BlockingQueue<StatsListener.OperatorResponse> commandResponse; @@ -540,12 +545,23 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera } } + calculateNextCheckpointWindow(); + dagCheckpointOffsetCount = 0; checkpoint = new Checkpoint(windowId, applicationWindowCount, checkpointWindowCount); if (operator instanceof Operator.CheckpointListener) { ((Operator.CheckpointListener) operator).checkpointed(windowId); } } + protected void calculateNextCheckpointWindow() + { + if (PROCESSING_MODE != ProcessingMode.EXACTLY_ONCE) { + nextCheckpointWindowCount = ((DAG_CHECKPOINT_WINDOW_COUNT - dagCheckpointOffsetCount + CHECKPOINT_WINDOW_COUNT - 1)/CHECKPOINT_WINDOW_COUNT) * CHECKPOINT_WINDOW_COUNT; + } else { + nextCheckpointWindowCount = 1; + } + } + @SuppressWarnings("unchecked") public static Node<?> retrieveNode(Object operator, OperatorContext context, OperatorDeployInfo.OperatorType type) { @@ -598,6 +614,7 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera int slidingWindowCount = context.getValue(OperatorContext.SLIDE_BY_WINDOW_COUNT); APPLICATION_WINDOW_COUNT = IntMath.gcd(APPLICATION_WINDOW_COUNT, slidingWindowCount); } + 
DAG_CHECKPOINT_WINDOW_COUNT = context.getValue(Context.DAGContext.CHECKPOINT_WINDOW_COUNT); CHECKPOINT_WINDOW_COUNT = context.getValue(OperatorContext.CHECKPOINT_WINDOW_COUNT); Collection<StatsListener> statsListeners = context.getValue(OperatorContext.STATS_LISTENERS); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/1a372857/engine/src/main/java/com/datatorrent/stram/engine/OiONode.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/OiONode.java b/engine/src/main/java/com/datatorrent/stram/engine/OiONode.java index c90966f..39b3fa4 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/OiONode.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/OiONode.java @@ -88,6 +88,7 @@ public class OiONode extends GenericNode break; case CHECKPOINT: + dagCheckpointOffsetCount = 0; if (lastCheckpointWindowId < t.getWindowId() && !doCheckpoint) { if (checkpointWindowCount == 0) { checkpoint(t.getWindowId()); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/1a372857/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java b/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java index 2967b47..424ffcc 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java @@ -48,6 +48,7 @@ public class OperatorContext extends BaseContext implements Context.OperatorCont private final CircularBuffer<ContainerStats.OperatorStats> statsBuffer = new CircularBuffer<ContainerStats.OperatorStats>(1024); private final CircularBuffer<OperatorRequest> requests = new CircularBuffer<OperatorRequest>(1024); public final boolean stateless; + private int windowsFromCheckpoint; /** * The 
operator to which this context is passed, will timeout after the following milliseconds if no new tuple has been received by it. @@ -97,6 +98,17 @@ public class OperatorContext extends BaseContext implements Context.OperatorCont return id; } + @Override + public int getWindowsFromCheckpoint() + { + return windowsFromCheckpoint; + } + + public void setWindowsFromCheckpoint(int windowsFromCheckpoint) + { + this.windowsFromCheckpoint = windowsFromCheckpoint; + } + /** * Reset counts for next heartbeat interval and return current counts. This is called as part of the heartbeat processing. * http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/1a372857/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java index 2577504..ee00d0f 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java @@ -54,6 +54,7 @@ import com.datatorrent.bufferserver.packet.MessageType; import com.datatorrent.common.util.AsyncFSStorageAgent; import com.datatorrent.common.util.ScheduledThreadPoolExecutor; import com.datatorrent.stram.api.Checkpoint; +import com.datatorrent.common.util.ScheduledExecutorService; import com.datatorrent.stram.tuple.EndStreamTuple; import com.datatorrent.stram.tuple.EndWindowTuple; import com.datatorrent.stram.tuple.Tuple; @@ -162,6 +163,7 @@ public class GenericNodeTest public static class GenericOperator implements Operator { + Context.OperatorContext context; long beginWindowId; long endWindowId; public final transient DefaultInputPort<Object> ip1 = new DefaultInputPort<Object>() @@ -201,17 +203,32 @@ public class GenericNodeTest @Override public void setup(Context.OperatorContext context) { - throw new 
UnsupportedOperationException("Not supported yet."); + this.context = context; } @Override public void teardown() { - throw new UnsupportedOperationException("Not supported yet."); } } + public static class CheckpointDistanceOperator extends GenericOperator + { + List<Integer> distances = new ArrayList<Integer>(); + int numWindows = 0; + int maxWindows = 0; + + @Override + public void beginWindow(long windowId) + { + super.beginWindow(windowId); + if (numWindows++ < maxWindows) { + distances.add(context.getWindowsFromCheckpoint()); + } + } + } + public static class GenericCheckpointOperator extends GenericOperator implements CheckpointListener { public Set<Long> checkpointedWindows = Sets.newHashSet(); @@ -406,8 +423,7 @@ public class GenericNodeTest do { Thread.sleep(sleeptime); interval += sleeptime; - } - while ((ab.get() == false) && (interval < maxSleep)); + } while ((ab.get() == false) && (interval < maxSleep)); int controlTupleCount = gn.controlTupleCount; @@ -628,5 +644,104 @@ public class GenericNodeTest } } + @Test + public void testDefaultCheckPointDistance() throws InterruptedException + { + testCheckpointDistance(Context.DAGContext.CHECKPOINT_WINDOW_COUNT.defaultValue, Context.OperatorContext.CHECKPOINT_WINDOW_COUNT.defaultValue); + } + + @Test + public void testDAGGreaterCheckPointDistance() throws InterruptedException + { + testCheckpointDistance(7, 5); + } + + @Test + public void testOpGreaterCheckPointDistance() throws InterruptedException + { + testCheckpointDistance(3, 5); + } + + private void testCheckpointDistance(int dagCheckPoint, int opCheckPoint) throws InterruptedException + { + int windowWidth = 50; + long sleeptime = 25L; + int maxWindows = 60; + // Adding some extra time for the windows to finish + long maxSleep = windowWidth * maxWindows + 5000; + + ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, "default"); + final WindowGenerator windowGenerator = new WindowGenerator(executorService, 1024); + 
windowGenerator.setWindowWidth(windowWidth); + windowGenerator.setFirstWindow(executorService.getCurrentTimeMillis()); + windowGenerator.setCheckpointCount(dagCheckPoint, 0); + //GenericOperator go = new GenericOperator(); + CheckpointDistanceOperator go = new CheckpointDistanceOperator(); + go.maxWindows = maxWindows; + + List<Integer> checkpoints = new ArrayList<Integer>(); + + int window = 0; + while (window < maxWindows) { + window = (int)Math.ceil((double)(window + 1)/dagCheckPoint) * dagCheckPoint; + window = (int)Math.ceil((double)window/opCheckPoint) * opCheckPoint; + checkpoints.add(window); + } + + final StreamContext stcontext = new StreamContext("s1"); + DefaultAttributeMap attrMap = new DefaultAttributeMap(); + attrMap.put(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, dagCheckPoint); + attrMap.put(Context.OperatorContext.CHECKPOINT_WINDOW_COUNT, opCheckPoint); + final OperatorContext context = new com.datatorrent.stram.engine.OperatorContext(0, attrMap, null); + final GenericNode gn = new GenericNode(go, context); + gn.setId(1); + + //DefaultReservoir reservoir1 = new DefaultReservoir("ip1Res", 1024); + //DefaultReservoir reservoir2 = new DefaultReservoir("ip2Res", 1024); + + //gn.connectInputPort("ip1", reservoir1); + //gn.connectInputPort("ip2", reservoir2); + gn.connectInputPort("ip1", windowGenerator.acquireReservoir("ip1", 1024)); + gn.connectInputPort("ip2", windowGenerator.acquireReservoir("ip2", 1024)); + gn.connectOutputPort("op", Sink.BLACKHOLE); + + final AtomicBoolean ab = new AtomicBoolean(false); + Thread t = new Thread() + { + @Override + public void run() + { + gn.setup(context); + windowGenerator.activate(stcontext); + gn.activate(); + ab.set(true); + gn.run(); + windowGenerator.deactivate(); + gn.deactivate(); + gn.teardown(); + } + }; + t.start(); + + long interval = 0; + do { + Thread.sleep(sleeptime); + interval += sleeptime; + } while ((go.numWindows < maxWindows) && (interval < maxSleep)); + + Assert.assertEquals("Number 
distances", maxWindows, go.numWindows); + int chkindex = 0; + int nextCheckpoint = checkpoints.get(chkindex++); + for (int i = 0; i < maxWindows; ++i) { + if ((i + 1) > nextCheckpoint) { + nextCheckpoint = checkpoints.get(chkindex++); + } + Assert.assertEquals("Windows from checkpoint for " + i, nextCheckpoint - i, (int)go.distances.get(i)); + } + + gn.shutdown(); + t.join(); + } + private static final Logger LOG = LoggerFactory.getLogger(GenericNodeTest.class); } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/1a372857/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java index c518350..2e12f63 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java @@ -214,11 +214,15 @@ public class NodeTest }; + node.activate(); + synchronized (StorageAgentImpl.calls) { StorageAgentImpl.calls.clear(); node.checkpoint(0); Assert.assertEquals("Calls to StorageAgent", 0, StorageAgentImpl.calls.size()); } + + node.deactivate(); } @Test @@ -243,11 +247,15 @@ public class NodeTest }; + node.activate(); + synchronized (StorageAgentImpl.calls) { StorageAgentImpl.calls.clear(); node.checkpoint(0); Assert.assertEquals("Calls to StorageAgent", 1, StorageAgentImpl.calls.size()); } + + node.deactivate(); } }
