Repository: incubator-apex-core Updated Branches: refs/heads/devel-3 1873f5562 -> 93bdf2d10
- APEX-263 Fixed case where double checkpointing could occur. Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/a4207c56 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/a4207c56 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/a4207c56 Branch: refs/heads/devel-3 Commit: a4207c5685822a22020d253c0a00231bd83af2c0 Parents: 1873f55 Author: Timothy Farkas <[email protected]> Authored: Sat Nov 14 17:33:34 2015 -0800 Committer: Timothy Farkas <[email protected]> Committed: Mon Nov 16 12:23:01 2015 -0800 ---------------------------------------------------------------------- .../datatorrent/stram/engine/GenericNode.java | 2 + .../stram/engine/GenericNodeTest.java | 119 ++++++++++++++++++- 2 files changed, 119 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/a4207c56/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java index 26ba98a..93cee49 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java @@ -156,10 +156,12 @@ public class GenericNode extends Node<Operator> checkpointWindowCount = 0; if (doCheckpoint) { checkpoint(currentWindowId); + lastCheckpointWindowId = currentWindowId; doCheckpoint = false; } else if (PROCESSING_MODE == ProcessingMode.EXACTLY_ONCE) { checkpoint(currentWindowId); + lastCheckpointWindowId = currentWindowId; } } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/a4207c56/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java 
---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java index d5ceae6..f2c23b2 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java @@ -19,16 +19,22 @@ package com.datatorrent.stram.engine; import java.util.ArrayList; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; -import org.junit.Assert; +import org.junit.Assert; import org.junit.Test; -import com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap; +import com.google.common.collect.Sets; + import com.datatorrent.api.*; +import com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap; +import com.datatorrent.api.Operator.CheckpointListener; +import com.datatorrent.api.Operator.ProcessingMode; import com.datatorrent.api.annotation.InputPortFieldAnnotation; import com.datatorrent.api.annotation.OutputPortFieldAnnotation; import com.datatorrent.bufferserver.packet.MessageType; +import com.datatorrent.common.util.ScheduledThreadPoolExecutor; import com.datatorrent.stram.tuple.EndStreamTuple; import com.datatorrent.stram.tuple.EndWindowTuple; import com.datatorrent.stram.tuple.Tuple; @@ -90,6 +96,41 @@ public class GenericNodeTest } + public static class GenericCheckpointOperator extends GenericOperator implements CheckpointListener + { + public Set<Long> checkpointedWindows = Sets.newHashSet(); + public volatile boolean checkpointTwice = false; + public volatile int numWindows = 0; + + public GenericCheckpointOperator() + { + } + + @Override + public void beginWindow(long windowId) + { + super.beginWindow(windowId); + } + + @Override + public void endWindow() + { + super.endWindow(); + numWindows++; + } + + @Override + public void checkpointed(long windowId) + { + checkpointTwice = checkpointTwice || 
!checkpointedWindows.add(windowId); + } + + @Override + public void committed(long windowId) + { + } + } + @Test @SuppressWarnings("SleepWhileInLoop") public void testSynchingLogic() throws InterruptedException @@ -296,4 +337,78 @@ public class GenericNodeTest Assert.assertTrue("End window not called", go.endWindowId != go.beginWindowId); } + @Test + public void testDoubleCheckpointAtleastOnce() throws Exception + { + testDoubleCheckpointHandling(ProcessingMode.AT_LEAST_ONCE); + } + + @Test + public void testDoubleCheckpointAtMostOnce() throws Exception + { + testDoubleCheckpointHandling(ProcessingMode.AT_MOST_ONCE); + } + + @Test + public void testDoubleCheckpointExactlyOnce() throws Exception + { + testDoubleCheckpointHandling(ProcessingMode.EXACTLY_ONCE); + } + + @SuppressWarnings("SleepWhileInLoop") + private void testDoubleCheckpointHandling(ProcessingMode processingMode) throws Exception + { + WindowGenerator windowGenerator = new WindowGenerator(new ScheduledThreadPoolExecutor(1, "WindowGenerator"), 1024); + windowGenerator.setResetWindow(0L); + windowGenerator.setFirstWindow(0L); + windowGenerator.setWindowWidth(100); + windowGenerator.setCheckpointCount(1, 0); + + GenericCheckpointOperator gco = new GenericCheckpointOperator(); + DefaultAttributeMap dam = new DefaultAttributeMap(); + dam.put(OperatorContext.APPLICATION_WINDOW_COUNT, 2); + dam.put(OperatorContext.CHECKPOINT_WINDOW_COUNT, 2); + dam.put(OperatorContext.PROCESSING_MODE, processingMode); + + final GenericNode in = new GenericNode(gco, new com.datatorrent.stram.engine.OperatorContext(0, dam, null)); + in.setId(1); + + TestSink testSink = new TestSink(); + + in.connectInputPort("ip1", windowGenerator.acquireReservoir(String.valueOf(in.id), 1024)); + in.connectOutputPort("output", testSink); + + windowGenerator.activate(null); + + final AtomicBoolean ab = new AtomicBoolean(false); + Thread t = new Thread() + { + @Override + public void run() + { + ab.set(true); + in.activate(); + in.run(); + 
in.deactivate(); + } + + }; + + t.start(); + + long startTime = System.currentTimeMillis(); + long endTime = 0; + + while (gco.numWindows < 3 && ((endTime = System.currentTimeMillis()) - startTime) < 5000) { + Thread.sleep(50); + } + + in.shutdown(); + t.join(); + + windowGenerator.deactivate(); + + Assert.assertFalse(gco.checkpointTwice); + Assert.assertTrue("Timed out", (endTime - startTime) < 5000); + } }
