Repository: incubator-apex-core Updated Branches: refs/heads/master 061176656 -> 47d66b000
APEXCORE-415 #resolve #comment Fixed double checkpointing bug in InputNode. Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/f8b4d499 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/f8b4d499 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/f8b4d499 Branch: refs/heads/master Commit: f8b4d499c61de2d4acd7710e63f52d11020eda2c Parents: 9e17211 Author: Timothy Farkas <[email protected]> Authored: Fri Apr 1 16:18:41 2016 -0700 Committer: Timothy Farkas <[email protected]> Committed: Mon Apr 4 16:49:02 2016 -0700 ---------------------------------------------------------------------- .../com/datatorrent/stram/engine/InputNode.java | 18 ++-- .../stram/engine/GenericNodeTest.java | 75 +++-------------- .../datatorrent/stram/engine/InputNodeTest.java | 69 ++++++++++++++++ .../com/datatorrent/stram/engine/NodeTest.java | 87 ++++++++++++++++++++ 4 files changed, 180 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f8b4d499/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java b/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java index f28841c..09eca59 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java @@ -28,6 +28,8 @@ import com.datatorrent.api.Operator.IdleTimeHandler; import com.datatorrent.api.Operator.ProcessingMode; import com.datatorrent.api.Operator.ShutdownException; import com.datatorrent.api.Sink; +import com.datatorrent.api.annotation.Stateless; + import com.datatorrent.netlet.util.DTThrowable; import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerStats; @@ -43,6 +45,7 @@ public class InputNode extends Node<InputOperator> { private final ArrayList<SweepableReservoir> deferredInputConnections = new ArrayList<SweepableReservoir>(); protected SweepableReservoir controlTuples; + long lastCheckpointWindowId = Stateless.WINDOW_ID; public InputNode(InputOperator operator, OperatorContext context) { @@ -145,10 +148,12 @@ public class InputNode extends Node<InputOperator> checkpointWindowCount = 0; if (doCheckpoint) { checkpoint(currentWindowId); + lastCheckpointWindowId = currentWindowId; doCheckpoint = false; } else if (PROCESSING_MODE == ProcessingMode.EXACTLY_ONCE) { checkpoint(currentWindowId); + lastCheckpointWindowId = currentWindowId; } } @@ -162,11 +167,13 @@ public class InputNode extends Node<InputOperator> case CHECKPOINT: dagCheckpointOffsetCount = 0; - if (checkpointWindowCount == 0 && PROCESSING_MODE != ProcessingMode.EXACTLY_ONCE) { - checkpoint(currentWindowId); - } - else { - doCheckpoint = true; + if (lastCheckpointWindowId < currentWindowId) { + if (checkpointWindowCount == 0 && PROCESSING_MODE != ProcessingMode.EXACTLY_ONCE) { + checkpoint(currentWindowId); + lastCheckpointWindowId = currentWindowId; + } else { + doCheckpoint = true; + } } for (int i = sinks.length; i-- > 0;) { sinks[i].put(t); @@ -234,6 +241,7 @@ public class InputNode extends Node<InputOperator> checkpointWindowCount = 0; if (doCheckpoint || PROCESSING_MODE == ProcessingMode.EXACTLY_ONCE) { checkpoint(currentWindowId); + lastCheckpointWindowId = currentWindowId; } } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f8b4d499/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java index 5828844..034851b 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java @@ -44,7 +44,7 @@ import com.datatorrent.api.Context; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.Operator; -import com.datatorrent.api.Operator.CheckpointListener; +import com.datatorrent.api.Operator.CheckpointNotificationListener; import com.datatorrent.api.Operator.ProcessingMode; import com.datatorrent.api.Sink; import com.datatorrent.api.Stats.OperatorStats; @@ -52,9 +52,9 @@ import com.datatorrent.api.annotation.InputPortFieldAnnotation; import com.datatorrent.api.annotation.OutputPortFieldAnnotation; import com.datatorrent.bufferserver.packet.MessageType; import com.datatorrent.common.util.AsyncFSStorageAgent; +import com.datatorrent.common.util.ScheduledExecutorService; import com.datatorrent.common.util.ScheduledThreadPoolExecutor; import com.datatorrent.stram.api.Checkpoint; -import com.datatorrent.common.util.ScheduledExecutorService; import com.datatorrent.stram.tuple.EndStreamTuple; import com.datatorrent.stram.tuple.EndWindowTuple; import com.datatorrent.stram.tuple.Tuple; @@ -229,7 +229,7 @@ public class GenericNodeTest } } - public static class GenericCheckpointOperator extends GenericOperator implements CheckpointListener + public static class GenericCheckpointOperator extends GenericOperator implements CheckpointNotificationListener { public Set<Long> checkpointedWindows = Sets.newHashSet(); public volatile boolean checkpointTwice = false; @@ -262,6 +262,11 @@ public class GenericNodeTest public void committed(long windowId) { } + + @Override + public void beforeCheckpoint(long windowId) + { + } } @Test @@ -476,77 +481,19 @@ public class GenericNodeTest @Test public void testDoubleCheckpointAtleastOnce() throws Exception { - testDoubleCheckpointHandling(ProcessingMode.AT_LEAST_ONCE); + NodeTest.testDoubleCheckpointHandling(ProcessingMode.AT_LEAST_ONCE, true, testMeta.getDir()); } @Test public void testDoubleCheckpointAtMostOnce() throws Exception { - testDoubleCheckpointHandling(ProcessingMode.AT_MOST_ONCE); + NodeTest.testDoubleCheckpointHandling(ProcessingMode.AT_MOST_ONCE, true, testMeta.getDir()); } @Test public void testDoubleCheckpointExactlyOnce() throws Exception { - testDoubleCheckpointHandling(ProcessingMode.EXACTLY_ONCE); - } - - @SuppressWarnings("SleepWhileInLoop") - private void testDoubleCheckpointHandling(ProcessingMode processingMode) throws Exception - { - WindowGenerator windowGenerator = new WindowGenerator(new ScheduledThreadPoolExecutor(1, "WindowGenerator"), 1024); - windowGenerator.setResetWindow(0L); - windowGenerator.setFirstWindow(0L); - windowGenerator.setWindowWidth(100); - windowGenerator.setCheckpointCount(1, 0); - - GenericCheckpointOperator gco = new GenericCheckpointOperator(); - DefaultAttributeMap dam = new DefaultAttributeMap(); - dam.put(OperatorContext.APPLICATION_WINDOW_COUNT, 2); - dam.put(OperatorContext.CHECKPOINT_WINDOW_COUNT, 2); - dam.put(OperatorContext.PROCESSING_MODE, processingMode); - - final GenericNode in = new GenericNode(gco, new com.datatorrent.stram.engine.OperatorContext(0, dam, null)); - in.setId(1); - - TestSink testSink = new TestSink(); - - in.connectInputPort("ip1", windowGenerator.acquireReservoir(String.valueOf(in.id), 1024)); - in.connectOutputPort("output", testSink); - in.firstWindowMillis = 0; - in.windowWidthMillis = 100; - - windowGenerator.activate(null); - - final AtomicBoolean ab = new AtomicBoolean(false); - Thread t = new Thread() - { - @Override - public void run() - { - ab.set(true); - in.activate(); - in.run(); - in.deactivate(); - } - }; - - t.start(); - - long startTime = System.currentTimeMillis(); - long endTime = 0; - - while (gco.numWindows < 3 && ((endTime = System.currentTimeMillis()) - startTime) < 5000) { - Thread.sleep(50); - } - - in.shutdown(); - t.join(); - - windowGenerator.deactivate(); - - Assert.assertFalse(gco.checkpointTwice); - Assert.assertTrue("Timed out", (endTime - startTime) < 5000); + NodeTest.testDoubleCheckpointHandling(ProcessingMode.EXACTLY_ONCE, true, testMeta.getDir()); } /** http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f8b4d499/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java index f723c2b..94b7675 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java @@ -18,26 +18,36 @@ */ package com.datatorrent.stram.engine; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Sets; + import com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.InputOperator; import com.datatorrent.api.Operator.IdleTimeHandler; +import com.datatorrent.api.Operator.ProcessingMode; import com.datatorrent.api.Sink; import com.datatorrent.bufferserver.packet.MessageType; +import com.datatorrent.stram.engine.GenericNodeTest.FSTestWatcher; +import com.datatorrent.stram.engine.GenericNodeTest.GenericCheckpointOperator; import com.datatorrent.stram.tuple.EndWindowTuple; import com.datatorrent.stram.tuple.ResetWindowTuple; import com.datatorrent.stram.tuple.Tuple; public class InputNodeTest { + @Rule + public FSTestWatcher testMeta = new FSTestWatcher(); + @Test public void testEmitTuplesOutsideStreamingWindow() throws Exception { @@ -111,6 +121,24 @@ public class InputNodeTest } } + @Test + public void testDoubleCheckpointAtleastOnce() throws Exception + { + NodeTest.testDoubleCheckpointHandling(ProcessingMode.AT_LEAST_ONCE, false, testMeta.getDir()); + } + + @Test + public void testDoubleCheckpointAtMostOnce() throws Exception + { + NodeTest.testDoubleCheckpointHandling(ProcessingMode.AT_MOST_ONCE, false, testMeta.getDir()); + } + + @Test + public void testDoubleCheckpointExactlyOnce() throws Exception + { + NodeTest.testDoubleCheckpointHandling(ProcessingMode.EXACTLY_ONCE, false, testMeta.getDir()); + } + public static class TestWindowGenerator implements SweepableReservoir { private final long baseSeconds = (System.currentTimeMillis() / 1000L) << 32; @@ -208,6 +236,47 @@ public class InputNodeTest private static final Logger LOG = LoggerFactory.getLogger(TestWindowGenerator.class); } + + public static class InputCheckpointOperator extends GenericCheckpointOperator implements InputOperator + { + public Set<Long> checkpointedWindows = Sets.newHashSet(); + public volatile boolean checkpointTwice = false; + public volatile int numWindows = 0; + + public InputCheckpointOperator() + { + } + + @Override + public void beginWindow(long windowId) + { + super.beginWindow(windowId); + } + + @Override + public void endWindow() + { + super.endWindow(); + } + + @Override + public void checkpointed(long windowId) + { + super.checkpointed(windowId); + } + + @Override + public void committed(long windowId) + { + super.committed(windowId); + } + + @Override + public void emitTuples() + { + } + } + public static class TestInputOperator implements InputOperator, IdleTimeHandler { public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<>(); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f8b4d499/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java index 2e12f63..2595714 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java @@ -20,18 +20,26 @@ package com.datatorrent.stram.engine; import java.io.IOException; import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; +import org.apache.hadoop.conf.Configuration; + import com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.InputOperator; import com.datatorrent.api.Operator; +import com.datatorrent.api.Operator.ProcessingMode; import com.datatorrent.api.StorageAgent; import com.datatorrent.api.annotation.Stateless; +import com.datatorrent.common.util.FSStorageAgent; +import com.datatorrent.common.util.ScheduledThreadPoolExecutor; import com.datatorrent.stram.StramLocalCluster; +import com.datatorrent.stram.engine.GenericNodeTest.GenericCheckpointOperator; +import com.datatorrent.stram.engine.InputNodeTest.InputCheckpointOperator; import com.datatorrent.stram.plan.logical.LogicalPlan; /** @@ -258,4 +266,83 @@ public class NodeTest node.deactivate(); } + @SuppressWarnings("SleepWhileInLoop") + public static void testDoubleCheckpointHandling(ProcessingMode processingMode, boolean trueGenericFalseInput, String path) + throws Exception + { + WindowGenerator windowGenerator = new WindowGenerator(new ScheduledThreadPoolExecutor(1, "WindowGenerator"), 1024); + windowGenerator.setResetWindow(0L); + windowGenerator.setFirstWindow(0L); + windowGenerator.setWindowWidth(100); + windowGenerator.setCheckpointCount(1, 0); + + GenericCheckpointOperator gco; + + if (trueGenericFalseInput) { + gco = new GenericCheckpointOperator(); + } else { + gco = new InputCheckpointOperator(); + } + DefaultAttributeMap dam = new DefaultAttributeMap(); + dam.put(com.datatorrent.stram.engine.OperatorContext.APPLICATION_WINDOW_COUNT, 2); + dam.put(com.datatorrent.stram.engine.OperatorContext.CHECKPOINT_WINDOW_COUNT, 2); + dam.put(com.datatorrent.stram.engine.OperatorContext.PROCESSING_MODE, processingMode); + dam.put(com.datatorrent.stram.engine.OperatorContext.STORAGE_AGENT, new FSStorageAgent(path, new Configuration())); + + final Node in; + + if (trueGenericFalseInput) { + in = new GenericNode(gco, new com.datatorrent.stram.engine.OperatorContext(0, dam, null)); + } else { + in = new InputNode((InputCheckpointOperator) gco, new com.datatorrent.stram.engine.OperatorContext(0, dam, null)); + } + + in.setId(1); + + TestSink testSink = new TestSink(); + String inputPort; + + if (trueGenericFalseInput) { + inputPort = "ip1"; + } else { + inputPort = Node.INPUT; + } + + in.connectInputPort(inputPort, windowGenerator.acquireReservoir(String.valueOf(in.id), 1024)); + in.connectOutputPort("output", testSink); + in.firstWindowMillis = 0; + in.windowWidthMillis = 100; + + windowGenerator.activate(null); + + final AtomicBoolean ab = new AtomicBoolean(false); + Thread t = new Thread() + { + @Override + public void run() + { + ab.set(true); + in.activate(); + in.run(); + in.deactivate(); + } + }; + + t.start(); + + long startTime = System.currentTimeMillis(); + long endTime = 0; + + while (gco.numWindows < 3 && ((endTime = System.currentTimeMillis()) - startTime) < 6000) { + Thread.sleep(50); + } + + in.shutdown(); + t.join(); + + windowGenerator.deactivate(); + + Assert.assertFalse(gco.checkpointTwice); + Assert.assertTrue("Timed out", (endTime - startTime) < 5000); + } }
