Repository: apex-core
Updated Branches:
  refs/heads/master ad4210ba7 -> 10650b3a0
APEXCORE-617 InputNodeTest intermittently fails with ConcurrentModificationException

Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/25f1ac5c
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/25f1ac5c
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/25f1ac5c

Branch: refs/heads/master
Commit: 25f1ac5c84a6bd86879e5f947d11edafe351b25a
Parents: a469dfb
Author: Vlad Rozov <[email protected]>
Authored: Sat Jan 21 09:19:59 2017 -0800
Committer: Vlad Rozov <[email protected]>
Committed: Sat Jan 21 10:10:17 2017 -0800

----------------------------------------------------------------------
 .../stram/engine/GenericNodeTest.java | 43 -----------
 .../datatorrent/stram/engine/InputNodeTest.java | 81 ++------------------
 .../com/datatorrent/stram/engine/NodeTest.java | 62 ++++++++++++---
 3 files changed, 55 insertions(+), 131 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/apex-core/blob/25f1ac5c/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
index af99e98..88f3e42 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -39,7 +38,6 @@ import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 
 import com.google.common.collect.Lists;
-import
com.google.common.collect.Sets; import com.datatorrent.api.Attribute.AttributeMap; import com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap; @@ -47,7 +45,6 @@ import com.datatorrent.api.Context; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.Operator; -import com.datatorrent.api.Operator.CheckpointNotificationListener; import com.datatorrent.api.Operator.ProcessingMode; import com.datatorrent.api.Sink; import com.datatorrent.api.Stats.OperatorStats; @@ -240,46 +237,6 @@ public class GenericNodeTest } } - public static class GenericCheckpointOperator extends GenericOperator implements CheckpointNotificationListener - { - public Set<Long> checkpointedWindows = Sets.newHashSet(); - public volatile boolean checkpointTwice = false; - public volatile int numWindows = 0; - - public GenericCheckpointOperator() - { - } - - @Override - public void beginWindow(long windowId) - { - super.beginWindow(windowId); - } - - @Override - public void endWindow() - { - super.endWindow(); - numWindows++; - } - - @Override - public void checkpointed(long windowId) - { - checkpointTwice = checkpointTwice || !checkpointedWindows.add(windowId); - } - - @Override - public void committed(long windowId) - { - } - - @Override - public void beforeCheckpoint(long windowId) - { - } - } - @Test @SuppressWarnings("SleepWhileInLoop") public void testSynchingLogic() throws InterruptedException http://git-wip-us.apache.org/repos/asf/apex-core/blob/25f1ac5c/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java index e182b75..035bb31 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java 
@@ -18,27 +18,21 @@ */ package com.datatorrent.stram.engine; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; - import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Sets; - import com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap; -import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.InputOperator; import com.datatorrent.api.Operator.IdleTimeHandler; import com.datatorrent.api.Operator.ProcessingMode; import com.datatorrent.api.Sink; import com.datatorrent.bufferserver.packet.MessageType; +import com.datatorrent.common.util.BaseOperator; import com.datatorrent.stram.engine.GenericNodeTest.FSTestWatcher; -import com.datatorrent.stram.engine.GenericNodeTest.GenericCheckpointOperator; import com.datatorrent.stram.tuple.EndWindowTuple; import com.datatorrent.stram.tuple.ResetWindowTuple; import com.datatorrent.stram.tuple.Tuple; @@ -60,7 +54,6 @@ public class InputNodeTest emitTestHelper(false); } - @SuppressWarnings("deprecation") private void emitTestHelper(boolean trueEmitTuplesFalseHandleIdleTime) throws Exception { TestInputOperator tio = new TestInputOperator(); @@ -69,32 +62,29 @@ public class InputNodeTest dam.put(OperatorContext.APPLICATION_WINDOW_COUNT, 10); dam.put(OperatorContext.CHECKPOINT_WINDOW_COUNT, 10); - final InputNode in = new InputNode(tio, new com.datatorrent.stram.engine.OperatorContext(0, "operator", dam, null)); - in.setId(1); + final InputNode in = new InputNode(tio, new OperatorContext(0, "operator", dam, null)); TestSink testSink = new TestSink(); in.connectInputPort(Node.INPUT, new TestWindowGenerator()); in.connectOutputPort("output", testSink); - final AtomicBoolean ab = new AtomicBoolean(false); Thread t = new Thread() { @Override public void run() { - ab.set(true); in.activate(); in.run(); in.deactivate(); } - }; 
t.start(); Thread.sleep(3000); - t.stop(); + in.shutdown(); + t.join(); Assert.assertTrue("Should have emitted some tuples", testSink.collectedTuples.size() > 0); @@ -237,48 +227,7 @@ public class InputNodeTest private static final Logger LOG = LoggerFactory.getLogger(TestWindowGenerator.class); } - - public static class InputCheckpointOperator extends GenericCheckpointOperator implements InputOperator - { - public Set<Long> checkpointedWindows = Sets.newHashSet(); - public volatile boolean checkpointTwice = false; - public volatile int numWindows = 0; - - public InputCheckpointOperator() - { - } - - @Override - public void beginWindow(long windowId) - { - super.beginWindow(windowId); - } - - @Override - public void endWindow() - { - super.endWindow(); - } - - @Override - public void checkpointed(long windowId) - { - super.checkpointed(windowId); - } - - @Override - public void committed(long windowId) - { - super.committed(windowId); - } - - @Override - public void emitTuples() - { - } - } - - public static class TestInputOperator implements InputOperator, IdleTimeHandler + private static class TestInputOperator extends BaseOperator implements InputOperator, IdleTimeHandler { public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<>(); @@ -294,26 +243,6 @@ public class InputNodeTest } @Override - public void beginWindow(long windowId) - { - } - - @Override - public void endWindow() - { - } - - @Override - public void setup(OperatorContext context) - { - } - - @Override - public void teardown() - { - } - - @Override public void handleIdleTime() { if (!trueEmitTuplesFalseHandleIdleTime) { http://git-wip-us.apache.org/repos/asf/apex-core/blob/25f1ac5c/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java index 
26bd7a0..55b5eab 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java @@ -20,7 +20,7 @@ package com.datatorrent.stram.engine; import java.io.IOException; import java.util.ArrayList; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.Set; import org.junit.Assert; import org.junit.Ignore; @@ -28,18 +28,20 @@ import org.junit.Test; import org.apache.hadoop.conf.Configuration; +import com.google.common.collect.Sets; + import com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.InputOperator; import com.datatorrent.api.Operator; +import com.datatorrent.api.Operator.CheckpointNotificationListener; import com.datatorrent.api.Operator.ProcessingMode; import com.datatorrent.api.StorageAgent; import com.datatorrent.api.annotation.Stateless; import com.datatorrent.common.util.FSStorageAgent; import com.datatorrent.common.util.ScheduledThreadPoolExecutor; import com.datatorrent.stram.StramLocalCluster; -import com.datatorrent.stram.engine.GenericNodeTest.GenericCheckpointOperator; -import com.datatorrent.stram.engine.InputNodeTest.InputCheckpointOperator; +import com.datatorrent.stram.engine.GenericNodeTest.GenericOperator; import com.datatorrent.stram.plan.logical.LogicalPlan; /** @@ -276,12 +278,12 @@ public class NodeTest windowGenerator.setWindowWidth(100); windowGenerator.setCheckpointCount(1, 0); - GenericCheckpointOperator gco; + CheckpointTestOperator checkpointTestOperator; if (trueGenericFalseInput) { - gco = new GenericCheckpointOperator(); + checkpointTestOperator = new CheckpointTestOperator(); } else { - gco = new InputCheckpointOperator(); + checkpointTestOperator = new InputCheckpointTestOperator(); } DefaultAttributeMap dam = new DefaultAttributeMap(); dam.put(com.datatorrent.stram.engine.OperatorContext.APPLICATION_WINDOW_COUNT, 2); @@ -292,9 +294,9 @@ public 
class NodeTest final Node in; if (trueGenericFalseInput) { - in = new GenericNode(gco, new com.datatorrent.stram.engine.OperatorContext(0, "operator", dam, null)); + in = new GenericNode(checkpointTestOperator, new com.datatorrent.stram.engine.OperatorContext(0, "operator", dam, null)); } else { - in = new InputNode((InputCheckpointOperator)gco, new com.datatorrent.stram.engine.OperatorContext(0, "operator", + in = new InputNode((InputCheckpointTestOperator)checkpointTestOperator, new com.datatorrent.stram.engine.OperatorContext(0, "operator", dam, null)); } @@ -316,13 +318,11 @@ public class NodeTest windowGenerator.activate(null); - final AtomicBoolean ab = new AtomicBoolean(false); Thread t = new Thread() { @Override public void run() { - ab.set(true); in.activate(); in.run(); in.deactivate(); @@ -334,7 +334,7 @@ public class NodeTest long startTime = System.currentTimeMillis(); long endTime = 0; - while (gco.numWindows < 3 && ((endTime = System.currentTimeMillis()) - startTime) < 6000) { + while (checkpointTestOperator.numWindows < 3 && ((endTime = System.currentTimeMillis()) - startTime) < 6000) { Thread.sleep(50); } @@ -343,7 +343,45 @@ public class NodeTest windowGenerator.deactivate(); - Assert.assertFalse(gco.checkpointTwice); + Assert.assertFalse(checkpointTestOperator.checkpointTwice); Assert.assertTrue("Timed out", (endTime - startTime) < 5000); } + + private static class CheckpointTestOperator extends GenericOperator implements CheckpointNotificationListener + { + public Set<Long> checkpointedWindows = Sets.newHashSet(); + public volatile boolean checkpointTwice = false; + public volatile int numWindows = 0; + + @Override + public void endWindow() + { + super.endWindow(); + numWindows++; + } + + @Override + public void checkpointed(long windowId) + { + checkpointTwice = checkpointTwice || !checkpointedWindows.add(windowId); + } + + @Override + public void committed(long windowId) + { + } + + @Override + public void beforeCheckpoint(long windowId) + { + 
} + } + + private static class InputCheckpointTestOperator extends CheckpointTestOperator implements InputOperator + { + @Override + public void emitTuples() + { + } + } }
