Repository: storm Updated Branches: refs/heads/master 50d55a951 -> 66ff5fd94
STORM-2748: Fix TickTupleTest to actually test something Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/54f0bf22 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/54f0bf22 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/54f0bf22 Branch: refs/heads/master Commit: 54f0bf22fdcc7e68118ed5184aa8c1f65678e218 Parents: 50d55a9 Author: Robert (Bobby) Evans <[email protected]> Authored: Wed Sep 20 10:42:26 2017 -0500 Committer: Robert (Bobby) Evans <[email protected]> Committed: Wed Sep 20 12:06:44 2017 -0500 ---------------------------------------------------------------------- .../java/org/apache/storm/TickTupleTest.java | 118 ++++++++++++------- storm-server/src/test/resources/log4j2.xml | 6 +- 2 files changed, 75 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/54f0bf22/storm-server/src/test/java/org/apache/storm/TickTupleTest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/TickTupleTest.java b/storm-server/src/test/java/org/apache/storm/TickTupleTest.java index 2ee9a0c..daa706b 100644 --- a/storm-server/src/test/java/org/apache/storm/TickTupleTest.java +++ b/storm-server/src/test/java/org/apache/storm/TickTupleTest.java @@ -18,85 +18,113 @@ package org.apache.storm; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.storm.ILocalCluster.ILocalTopology; import org.apache.storm.generated.StormTopology; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.IRichSpout; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; -import org.apache.storm.utils.Utils; -import org.junit.Assert; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.TupleUtils; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.util.Map; +import static org.junit.Assert.*; public class TickTupleTest { + private final static Logger LOG = LoggerFactory.getLogger(TickTupleTest.class); + private static LinkedBlockingQueue<Long> tickTupleTimes = new LinkedBlockingQueue<>(); + private static AtomicReference<Tuple> nonTickTuple = new AtomicReference<>(null); @Test public void testTickTupleWorksWithSystemBolt() throws Exception { - ILocalCluster cluster = null; - try { - cluster = new LocalCluster.Builder().withSimulatedTime().withNimbusDaemon(true).build(); + try (ILocalCluster cluster = new LocalCluster.Builder().withSimulatedTime().withNimbusDaemon(true).build()){ StormTopology topology = createNoOpTopology(); Config topoConf = new Config(); - topoConf.putAll(Utils.readDefaultConfig()); - topoConf.put("storm.cluster.mode", "local"); topoConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 1); - cluster.submitTopology("test", topoConf, topology); - cluster.advanceClusterTime(2); - Assert.assertTrue("Test is passed", true); - } finally { - cluster.close(); + try (ILocalTopology topo = cluster.submitTopology("test", topoConf, topology)) { + //Give the cluster some time to come up + long time = 0; + while (tickTupleTimes.size() <= 0) { + assert time <= 100_000 : "took over " + time + " ms of simulated time to get a message back..."; + cluster.advanceClusterTime(10); + time += 10_000; + } + tickTupleTimes.clear(); + cluster.advanceClusterTime(1); + time += 1000; + assertEquals(time, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue()); + cluster.advanceClusterTime(1); + time += 1000; + assertEquals(time, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue()); + cluster.advanceClusterTime(1); + time += 1000; + assertEquals(time, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue()); + cluster.advanceClusterTime(1); + time += 1000; + assertEquals(time, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue()); + cluster.advanceClusterTime(1); + time += 1000; + assertEquals(time, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS).longValue()); + } + assertNull("The bolt got a tuple that is not a tick tuple " + nonTickTuple.get(), nonTickTuple.get()); } - } - private IRichSpout makeNoOpSpout() { - return new BaseRichSpout() { - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("tuple")); - } - - @Override - public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) { - } + private static class NoopSpout extends BaseRichSpout { + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("tuple")); + } - @Override - public void nextTuple() { - } + @Override + public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) { + } - private void writeObject(java.io.ObjectOutputStream stream) { - } - }; + @Override + public void nextTuple() { + } } - private BaseRichBolt makeNoOpBolt() { - return new BaseRichBolt() { - @Override - public void prepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector outputCollector) {} - @Override - public void execute(Tuple tuple) {} + private static class NoopBolt extends BaseRichBolt { + @Override + public void prepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector outputCollector) {} - @Override - public void cleanup() { } + @Override + public void execute(Tuple tuple) { + LOG.info("GOT {}", tuple); + if (TupleUtils.isTick(tuple)) { + try { + tickTupleTimes.put(Time.currentTimeMillis()); + } catch (InterruptedException e) { + //Ignored + } + } else { + nonTickTuple.set(tuple); + } + } - @Override - public void declareOutputFields(OutputFieldsDeclarer ofd) {} + @Override + public void cleanup() { } - private void writeObject(java.io.ObjectOutputStream stream) {} - }; + @Override + public void declareOutputFields(OutputFieldsDeclarer ofd) {} } private StormTopology createNoOpTopology() { TopologyBuilder builder = new TopologyBuilder(); - builder.setSpout("1", makeNoOpSpout()); - builder.setBolt("2", makeNoOpBolt()).fieldsGrouping("1", new Fields("tuple")); + builder.setSpout("1", new NoopSpout()); + builder.setBolt("2", new NoopBolt()).fieldsGrouping("1", new Fields("tuple")); return builder.createTopology(); } } http://git-wip-us.apache.org/repos/asf/storm/blob/54f0bf22/storm-server/src/test/resources/log4j2.xml ---------------------------------------------------------------------- diff --git a/storm-server/src/test/resources/log4j2.xml b/storm-server/src/test/resources/log4j2.xml index fe097c6..2348548 100644 --- a/storm-server/src/test/resources/log4j2.xml +++ b/storm-server/src/test/resources/log4j2.xml @@ -22,11 +22,9 @@ </Console> </Appenders> <Loggers> - <!-- suppress ERROR org.apache.storm.blobstore.BlobStoreUtils - Could not update the blob with key: key when testing --> - <Logger name="org.apache.storm.blobstore" level="FATAL" /> - <Root level="ERROR"> + <Root level="INFO"> <appender-ref ref="console" /> </Root> </Loggers> </Configuration> - \ No newline at end of file +
