APEX-78 #comment Checkpoint notification to notify operators before checkpoint is performed
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/d5e82468 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/d5e82468 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/d5e82468 Branch: refs/heads/master Commit: d5e82468cfd27cdfd09135f8e6956147ac5d9dc3 Parents: 7629be8 Author: Pramod Immaneni <[email protected]> Authored: Fri Dec 11 06:03:29 2015 -0800 Committer: Pramod Immaneni <[email protected]> Committed: Thu Jan 7 22:18:24 2016 -0800 ---------------------------------------------------------------------- .../main/java/com/datatorrent/api/Operator.java | 26 ++++++ .../java/com/datatorrent/stram/engine/Node.java | 4 + .../com/datatorrent/stram/CheckpointTest.java | 85 +++++++++++++++++--- 3 files changed, 104 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d5e82468/api/src/main/java/com/datatorrent/api/Operator.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/Operator.java b/api/src/main/java/com/datatorrent/api/Operator.java index eb69266..785c60b 100644 --- a/api/src/main/java/com/datatorrent/api/Operator.java +++ b/api/src/main/java/com/datatorrent/api/Operator.java @@ -224,6 +224,7 @@ public interface Operator extends Component<OperatorContext> * Operators must implement this interface if they are interested in being notified as * soon as the operator state is checkpointed or committed. * + * @deprecated Use {@link CheckpointNotificationListener} instead * @since 0.3.2 */ public static interface CheckpointListener @@ -270,4 +271,29 @@ public interface Operator extends Component<OperatorContext> } + /** + * Operators that need to be notified about checkpoint events should implement this interface. + * + * The notification callbacks in this interface are called outside window boundaries so the operators should not + * attempt to send any tuples in these callbacks. + * + */ + interface CheckpointNotificationListener extends CheckpointListener + { + /** + * Notify the operator before a checkpoint is performed. + * + * Operators may need to perform certain tasks before a checkpoint such as calling flush on a stream to write out + * pending data. Having this notification helps operators perform such operations optimally by doing them once + * before checkpoint as opposed to doing them at the end of every window. + * + * The method will be called before the checkpoint is performed. It will be called after + * {@link Operator#endWindow()} call of the window preceding the checkpoint and before the checkpoint is + * actually performed. + * + * @param windowId The window id of the window preceding the checkpoint + */ + void beforeCheckpoint(long windowId); + } + } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d5e82468/engine/src/main/java/com/datatorrent/stram/engine/Node.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Node.java b/engine/src/main/java/com/datatorrent/stram/engine/Node.java index 5c6b86f..068a325 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/Node.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/Node.java @@ -486,6 +486,10 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera void checkpoint(long windowId) { if (!context.stateless) { + if (operator instanceof Operator.CheckpointNotificationListener) { + ((Operator.CheckpointNotificationListener)operator).beforeCheckpoint(windowId); + } + StorageAgent ba = context.getValue(OperatorContext.STORAGE_AGENT); if (ba != null) { try { http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d5e82468/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java index 5d11b86..ee3cbc3 100644 --- a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java +++ b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java @@ -18,26 +18,37 @@ */ package com.datatorrent.stram; -import com.datatorrent.common.util.BaseOperator; - -import java.util.*; - -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -import org.junit.*; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; -import com.datatorrent.api.*; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.Operator; import com.datatorrent.api.annotation.OutputPortFieldAnnotation; import com.datatorrent.api.annotation.Stateless; - import com.datatorrent.common.util.AsyncFSStorageAgent; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.common.util.FSStorageAgent; import com.datatorrent.stram.MockContainer.MockOperatorStats; import com.datatorrent.stram.StreamingContainerManager.UpdateCheckpointsContext; import com.datatorrent.stram.api.Checkpoint; @@ -63,12 +74,14 @@ public class CheckpointTest @Rule public TestMeta testMeta = new TestMeta(); - private static class MockInputOperator extends BaseOperator implements InputOperator + private static class MockInputOperator extends BaseOperator implements InputOperator, Operator.CheckpointNotificationListener { @OutputPortFieldAnnotation( optional = true) public final transient DefaultOutputPort<Object> outport = new DefaultOutputPort<Object>(); private transient int windowCount; + private int checkpointState; + @Override public void beginWindow(long windowId) { @@ -81,6 +94,22 @@ public class CheckpointTest public void emitTuples() { } + + @Override + public void beforeCheckpoint(long windowId) + { + ++checkpointState; + } + + @Override + public void checkpointed(long windowId) + { + } + + @Override + public void committed(long windowId) + { + } } private LogicalPlan dag; @@ -454,5 +483,39 @@ public class CheckpointTest } + @Test + public void testBeforeCheckpointNotification() throws IOException, ClassNotFoundException + { + FSStorageAgent storageAgent = new FSStorageAgent(testMeta.getPath(), null); + dag.setAttribute(OperatorContext.STORAGE_AGENT, storageAgent); + dag.setAttribute(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 1); + dag.setAttribute(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 50); + + MockInputOperator o1 = dag.addOperator("o1", new MockInputOperator()); + + GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); + dag.setAttribute(o2, OperatorContext.STATELESS, true); + + dag.addStream("o1.outport", o1.outport, o2.inport1); + + StramLocalCluster sc = new StramLocalCluster(dag); + sc.setHeartbeatMonitoringEnabled(false); + sc.run(); + + StreamingContainerManager dnm = sc.dnmgr; + PhysicalPlan plan = dnm.getPhysicalPlan(); + List<PTOperator> o1ps = plan.getOperators(dag.getMeta(o1)); + Assert.assertEquals("Number partitions", 1, o1ps.size()); + + PTOperator o1p1 = o1ps.get(0); + long[] ckWIds = storageAgent.getWindowIds(o1p1.getId()); + Arrays.sort(ckWIds); + int expectedState = 0; + for (long windowId : ckWIds) { + Object ckState = storageAgent.load(o1p1.getId(), windowId); + Assert.assertEquals("Checkpointed state class", MockInputOperator.class, ckState.getClass()); + Assert.assertEquals("Checkpoint state", expectedState++, ((MockInputOperator)ckState).checkpointState); + } + } }
