Fixing stylecheck problems with storm-perf
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0e409ecd Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0e409ecd Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0e409ecd Branch: refs/heads/master Commit: 0e409ecd8b2d6956ed38f78bb068ce8fef67e83b Parents: f4ba7c9 Author: Kishor Patil <[email protected]> Authored: Sun Apr 22 23:14:26 2018 -0400 Committer: Kishor Patil <[email protected]> Committed: Mon Apr 23 02:32:40 2018 -0400 ---------------------------------------------------------------------- examples/storm-perf/pom.xml | 2 +- .../perf/ConstSpoutIdBoltNullBoltTopo.java | 12 ++-- .../storm/perf/ConstSpoutNullBoltTopo.java | 4 +- .../storm/perf/FileReadWordCountTopo.java | 33 +++++------ .../storm/perf/HdfsSpoutNullBoltTopo.java | 3 +- .../org/apache/storm/perf/JCQueuePerfTest.java | 31 +++++----- .../org/apache/storm/perf/JCToolsPerfTest.java | 27 ++++----- .../apache/storm/perf/KafkaClientHdfsTopo.java | 8 +-- .../perf/KafkaClientSpoutNullBoltTopo.java | 15 ++--- .../apache/storm/perf/LowThroughputTopo.java | 9 ++- .../storm/perf/SimplifiedWordCountTopo.java | 1 - .../storm/perf/StrGenSpoutHdfsBoltTopo.java | 3 +- .../org/apache/storm/perf/ThroughputMeter.java | 16 ++--- .../org/apache/storm/perf/bolt/CountBolt.java | 1 - .../org/apache/storm/perf/bolt/DevNullBolt.java | 1 - .../java/org/apache/storm/perf/bolt/IdBolt.java | 1 - .../storm/perf/bolt/SplitSentenceBolt.java | 1 - .../org/apache/storm/perf/spout/ConstSpout.java | 1 - .../apache/storm/perf/spout/FileReadSpout.java | 1 - .../apache/storm/perf/spout/StringGenSpout.java | 27 ++++----- .../apache/storm/perf/spout/WordGenSpout.java | 62 ++++++++++---------- .../storm/perf/utils/BasicMetricsCollector.java | 23 ++++---- .../org/apache/storm/perf/utils/Helper.java | 7 ++- .../apache/storm/perf/utils/IdentityBolt.java | 3 +- .../apache/storm/perf/utils/MetricsSample.java | 3 +- 25 files changed, 139 insertions(+), 156 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/0e409ecd/examples/storm-perf/pom.xml ---------------------------------------------------------------------- diff --git a/examples/storm-perf/pom.xml b/examples/storm-perf/pom.xml index f8eb965..4bbabfa3 100644 --- a/examples/storm-perf/pom.xml +++ b/examples/storm-perf/pom.xml @@ -95,7 +95,7 @@ <artifactId>maven-checkstyle-plugin</artifactId> <!--Note - the version would be inherited--> <configuration> - <maxAllowedViolations>100</maxAllowedViolations> + <maxAllowedViolations>65</maxAllowedViolations> </configuration> </plugin> </plugins> http://git-wip-us.apache.org/repos/asf/storm/blob/0e409ecd/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java index dc3649e..a65efe6 100644 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java @@ -19,7 +19,6 @@ package org.apache.storm.perf; import java.util.Map; - import org.apache.storm.Config; import org.apache.storm.generated.StormTopology; import org.apache.storm.perf.bolt.DevNullBolt; @@ -30,11 +29,8 @@ import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.utils.Utils; /** - * ConstSpout -> IdBolt -> DevNullBolt - * This topology measures speed of messaging between spouts->bolt and bolt->bolt - * ConstSpout : Continuously emits a constant string - * IdBolt : clones and emits input tuples - * DevNullBolt : discards incoming tuples + * ConstSpout -> IdBolt -> DevNullBolt This topology measures speed of messaging between spouts->bolt and bolt->bolt ConstSpout : + * Continuously emits a constant string IdBolt : clones and emits input tuples DevNullBolt : discards incoming tuples */ public class ConstSpoutIdBoltNullBoltTopo { @@ -66,11 +62,11 @@ public class ConstSpoutIdBoltNullBoltTopo { int numBolt1 = Helper.getInt(conf, BOLT1_COUNT, 1); builder.setBolt(BOLT1_ID, bolt1, numBolt1) - .localOrShuffleGrouping(SPOUT_ID); + .localOrShuffleGrouping(SPOUT_ID); int numBolt2 = Helper.getInt(conf, BOLT2_COUNT, 1); builder.setBolt(BOLT2_ID, bolt2, numBolt2) - .localOrShuffleGrouping(BOLT1_ID); + .localOrShuffleGrouping(BOLT1_ID); System.err.printf("====> Using : numSpouts = %d , numBolt1 = %d, numBolt2=%d\n", numSpouts, numBolt1, numBolt2); return builder.createTopology(); } http://git-wip-us.apache.org/repos/asf/storm/blob/0e409ecd/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java index ee778fb..9683e08 100755 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java @@ -19,7 +19,6 @@ package org.apache.storm.perf; import java.util.Map; - import org.apache.storm.Config; import org.apache.storm.generated.StormTopology; import org.apache.storm.perf.bolt.DevNullBolt; @@ -89,7 +88,8 @@ public class ConstSpoutNullBoltTopo { // For reference : numbers taken on MacBook Pro mid 2015 // -- ACKer=0: ~8 mill/sec (batchSz=2k & recvQsize=50k). 6.7 mill/sec (batchSz=1 & recvQsize=1k) // -- ACKer=1: ~1 mill/sec, lat= ~1 microsec (batchSz=1 & bolt.wait.strategy=Park bolt.wait.park.micros=0) - // -- ACKer=1: ~1.3 mill/sec, lat= ~11 micros (batchSz=1 & receive.buffer.size=1k, bolt.wait & bp.wait = Progressive[defaults]) + // -- ACKer=1: ~1.3 mill/sec, lat= ~11 micros (batchSz=1 & receive.buffer.size=1k, bolt.wait & bp.wait = + // Progressive[defaults]) // -- ACKer=1: ~1.6 mill/sec, lat= ~300 micros (batchSz=500 & bolt.wait.strategy=Park bolt.wait.park.micros=0) topoConf.put(Config.TOPOLOGY_SPOUT_RECVQ_SKIPS, 8); topoConf.put(Config.TOPOLOGY_PRODUCER_BATCH_SIZE, 500); http://git-wip-us.apache.org/repos/asf/storm/blob/0e409ecd/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java index 7e9256e..827337e 100644 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java @@ -1,25 +1,24 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License -*/ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ package org.apache.storm.perf; import java.util.Map; - import org.apache.storm.Config; import org.apache.storm.generated.StormTopology; import org.apache.storm.perf.bolt.CountBolt; http://git-wip-us.apache.org/repos/asf/storm/blob/0e409ecd/examples/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java index e288a5c..4f97dfa 100644 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java @@ -19,7 +19,6 @@ package org.apache.storm.perf; import java.util.Map; - import org.apache.storm.Config; import org.apache.storm.generated.StormTopology; import org.apache.storm.hdfs.spout.HdfsSpout; @@ -78,7 +77,7 @@ public class HdfsSpoutNullBoltTopo { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(SPOUT_ID, spout, spoutNum); builder.setBolt(BOLT_ID, bolt, boltNum) - .localOrShuffleGrouping(SPOUT_ID); + .localOrShuffleGrouping(SPOUT_ID); return builder.createTopology(); } http://git-wip-us.apache.org/repos/asf/storm/blob/0e409ecd/examples/storm-perf/src/main/java/org/apache/storm/perf/JCQueuePerfTest.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/JCQueuePerfTest.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/JCQueuePerfTest.java index 700ce4e..f2bfd31 100644 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/JCQueuePerfTest.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/JCQueuePerfTest.java @@ -19,7 +19,6 @@ package org.apache.storm.perf; import java.util.concurrent.locks.LockSupport; - import org.apache.storm.policy.WaitStrategyPark; import org.apache.storm.utils.JCQueue; import org.apache.storm.utils.MutableLong; @@ -28,15 +27,15 @@ public class JCQueuePerfTest { // Usage: Let it and then explicitly terminate. // Metrics will be printed when application is terminated. public static void main(String[] args) throws Exception { -// oneProducer1Consumer(1000); // -- measurement 1 -// twoProducer1Consumer(1000); // -- measurement 2 -// threeProducer1Consumer(1); // -- measurement 3 + // oneProducer1Consumer(1000); // -- measurement 1 + // twoProducer1Consumer(1000); // -- measurement 2 + // threeProducer1Consumer(1); // -- measurement 3 -// oneProducer2Consumers(); // -- measurement 4 + // oneProducer2Consumers(); // -- measurement 4 -// producerFwdConsumer(); // -- measurement 5 + // producerFwdConsumer(); // -- measurement 5 -// ackingProducerSimulation(); // -- measurement 6 + // ackingProducerSimulation(); // -- measurement 6 while (true) { Thread.sleep(1000); @@ -46,8 +45,8 @@ public class JCQueuePerfTest { private static void ackingProducerSimulation() { WaitStrategyPark ws = new WaitStrategyPark(100); - JCQueue spoutQ = new JCQueue("spoutQ", 1024, 0, 100, ws, "test", "test",1000, 1000); - JCQueue ackQ = new JCQueue("ackQ", 1024, 0, 100, ws, "test", "test",1000, 1000); + JCQueue spoutQ = new JCQueue("spoutQ", 1024, 0, 100, ws, "test", "test", 1000, 1000); + JCQueue ackQ = new JCQueue("ackQ", 1024, 0, 100, ws, "test", "test", 1000, 1000); final AckingProducer ackingProducer = new AckingProducer(spoutQ, ackQ); final Acker acker = new Acker(ackQ, spoutQ); @@ -57,8 +56,8 @@ public class JCQueuePerfTest { private static void producerFwdConsumer(int prodBatchSz) { WaitStrategyPark ws = new WaitStrategyPark(100); - JCQueue q1 = new JCQueue("q1", 1024, 0, prodBatchSz, ws, "test", "test",1000, 1000); - JCQueue q2 = new JCQueue("q2", 1024, 0, prodBatchSz, ws, "test", "test",1000, 1000); + JCQueue q1 = new JCQueue("q1", 1024, 0, prodBatchSz, ws, "test", "test", 1000, 1000); + JCQueue q2 = new JCQueue("q2", 1024, 0, prodBatchSz, ws, "test", "test", 1000, 1000); final Producer prod = new Producer(q1); final Forwarder fwd = new Forwarder(q1, q2); @@ -69,7 +68,7 @@ public class JCQueuePerfTest { private static void oneProducer1Consumer(int prodBatchSz) { - JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",1000, 1000); + JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test", 1000, 1000); final Producer prod1 = new Producer(q1); final Consumer cons1 = new Consumer(q1); @@ -78,7 +77,7 @@ public class JCQueuePerfTest { } private static void twoProducer1Consumer(int prodBatchSz) { - JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",1000, 1000); + JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test", 1000, 1000); final Producer prod1 = new Producer(q1); final Producer prod2 = new Producer(q1); @@ -88,7 +87,7 @@ public class JCQueuePerfTest { } private static void threeProducer1Consumer(int prodBatchSz) { - JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",1000, 1000); + JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test", 1000, 1000); final Producer prod1 = new Producer(q1); final Producer prod2 = new Producer(q1); @@ -101,8 +100,8 @@ public class JCQueuePerfTest { private static void oneProducer2Consumers(int prodBatchSz) { WaitStrategyPark ws = new WaitStrategyPark(100); - JCQueue q1 = new JCQueue("q1", 1024, 0, prodBatchSz, ws, "test", "test",1000, 1000); - JCQueue q2 = new JCQueue("q2", 1024, 0, prodBatchSz, ws, "test", "test",1000, 1000); + JCQueue q1 = new JCQueue("q1", 1024, 0, prodBatchSz, ws, "test", "test", 1000, 1000); + JCQueue q2 = new JCQueue("q2", 1024, 0, prodBatchSz, ws, "test", "test", 1000, 1000); final Producer2 prod1 = new Producer2(q1, q2); final Consumer cons1 = new Consumer(q1); http://git-wip-us.apache.org/repos/asf/storm/blob/0e409ecd/examples/storm-perf/src/main/java/org/apache/storm/perf/JCToolsPerfTest.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/JCToolsPerfTest.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/JCToolsPerfTest.java index 98525ed..a439522 100644 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/JCToolsPerfTest.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/JCToolsPerfTest.java @@ -19,25 +19,24 @@ package org.apache.storm.perf; import java.util.concurrent.locks.LockSupport; - import org.apache.storm.utils.MutableLong; import org.jctools.queues.MpscArrayQueue; public class JCToolsPerfTest { public static void main(String[] args) throws Exception { -// oneProducer1Consumer(); -// twoProducer1Consumer(); -// threeProducer1Consumer(); -// oneProducer2Consumers(); -// producerFwdConsumer(); - -// JCQueue spoutQ = new JCQueue("spoutQ", 1024, 100, 0); -// JCQueue ackQ = new JCQueue("ackQ", 1024, 100, 0); -// -// final AckingProducer ackingProducer = new AckingProducer(spoutQ, ackQ); -// final Acker acker = new Acker(ackQ, spoutQ); -// -// runAllThds(ackingProducer, acker); + // oneProducer1Consumer(); + // twoProducer1Consumer(); + // threeProducer1Consumer(); + // oneProducer2Consumers(); + // producerFwdConsumer(); + + // JCQueue spoutQ = new JCQueue("spoutQ", 1024, 100, 0); + // JCQueue ackQ = new JCQueue("ackQ", 1024, 100, 0); + // + // final AckingProducer ackingProducer = new AckingProducer(spoutQ, ackQ); + // final Acker acker = new Acker(ackQ, spoutQ); + // + // runAllThds(ackingProducer, acker); while (true) { Thread.sleep(1000); http://git-wip-us.apache.org/repos/asf/storm/blob/0e409ecd/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientHdfsTopo.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientHdfsTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientHdfsTopo.java index 0432d9e..67117e4 100755 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientHdfsTopo.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientHdfsTopo.java @@ -19,7 +19,6 @@ package org.apache.storm.perf; import java.util.Map; - import org.apache.storm.Config; import org.apache.storm.generated.StormTopology; import org.apache.storm.hdfs.bolt.HdfsBolt; @@ -80,8 +79,9 @@ public class KafkaClientHdfsTopo { String topicName = getStr(config, KAFKA_TOPIC); KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig.builder(bootstrapHosts, topicName) - .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST) - .build(); + .setFirstPollOffsetStrategy( + KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST) + .build(); KafkaSpout<String, String> spout = new KafkaSpout<>(spoutConfig); @@ -106,7 +106,7 @@ public class KafkaClientHdfsTopo { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(SPOUT_ID, spout, spoutNum); builder.setBolt(BOLT_ID, bolt, boltNum) - .localOrShuffleGrouping(SPOUT_ID); + .localOrShuffleGrouping(SPOUT_ID); return builder.createTopology(); } http://git-wip-us.apache.org/repos/asf/storm/blob/0e409ecd/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientSpoutNullBoltTopo.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientSpoutNullBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientSpoutNullBoltTopo.java index 4d88702..a75785d 100644 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientSpoutNullBoltTopo.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientSpoutNullBoltTopo.java @@ -65,15 +65,16 @@ public class KafkaClientSpoutNullBoltTopo { String kafkaTopic = Optional.ofNullable(Helper.getStr(config, KAFKA_TOPIC)).orElse("storm-perf-null-bolt-topic"); ProcessingGuarantee processingGuarantee = ProcessingGuarantee.valueOf( Optional.ofNullable(Helper.getStr(config, PROCESSING_GUARANTEE)) - .orElse(ProcessingGuarantee.AT_LEAST_ONCE.name())); + .orElse(ProcessingGuarantee.AT_LEAST_ONCE.name())); int offsetCommitPeriodMs = Helper.getInt(config, OFFSET_COMMIT_PERIOD_MS, 30_000); KafkaSpoutConfig<String, String> kafkaSpoutConfig = KafkaSpoutConfig.builder(bootstrapServers, kafkaTopic) - .setProcessingGuarantee(processingGuarantee) - .setOffsetCommitPeriodMs(offsetCommitPeriodMs) - .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST) - .setTupleTrackingEnforced(true) - .build(); + .setProcessingGuarantee(processingGuarantee) + .setOffsetCommitPeriodMs(offsetCommitPeriodMs) + .setFirstPollOffsetStrategy( + KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST) + .setTupleTrackingEnforced(true) + .build(); KafkaSpout<String, String> spout = new KafkaSpout<>(kafkaSpoutConfig); @@ -84,7 +85,7 @@ public class KafkaClientSpoutNullBoltTopo { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(SPOUT_ID, spout, spoutNum); builder.setBolt(BOLT_ID, bolt, boltNum) - .localOrShuffleGrouping(SPOUT_ID); + .localOrShuffleGrouping(SPOUT_ID); return builder.createTopology(); } http://git-wip-us.apache.org/repos/asf/storm/blob/0e409ecd/examples/storm-perf/src/main/java/org/apache/storm/perf/LowThroughputTopo.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/LowThroughputTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/LowThroughputTopo.java index 5560780..e4972a4 100644 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/LowThroughputTopo.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/LowThroughputTopo.java @@ -22,7 +22,6 @@ package org.apache.storm.perf; import java.util.Collections; import java.util.List; import java.util.Map; - import org.apache.storm.Config; import org.apache.storm.generated.StormTopology; import org.apache.storm.perf.utils.Helper; @@ -64,7 +63,7 @@ public class LowThroughputTopo { BoltDeclarer bd = builder.setBolt(BOLT_ID, bolt, Helper.getInt(conf, BOLT_COUNT, 1)); bd.localOrShuffleGrouping(SPOUT_ID); -// bd.shuffleGrouping(SPOUT_ID); + // bd.shuffleGrouping(SPOUT_ID); return builder.createTopology(); } @@ -111,7 +110,7 @@ public class LowThroughputTopo { @Override public void open(Map<String, Object> conf, TopologyContext context, - SpoutOutputCollector collector) { + SpoutOutputCollector collector) { this.collector = collector; } @@ -135,8 +134,8 @@ public class LowThroughputTopo { @Override public void prepare(Map<String, Object> topoConf, - TopologyContext context, - OutputCollector collector) { + TopologyContext context, + OutputCollector collector) { this.collector = collector; } http://git-wip-us.apache.org/repos/asf/storm/blob/0e409ecd/examples/storm-perf/src/main/java/org/apache/storm/perf/SimplifiedWordCountTopo.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/SimplifiedWordCountTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/SimplifiedWordCountTopo.java index ea5be88..0a9148c 100644 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/SimplifiedWordCountTopo.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/SimplifiedWordCountTopo.java @@ -19,7 +19,6 @@ package org.apache.storm.perf; import java.util.Map; - import org.apache.storm.Config; import org.apache.storm.generated.StormTopology; import org.apache.storm.perf.bolt.CountBolt; http://git-wip-us.apache.org/repos/asf/storm/blob/0e409ecd/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java index a3650c6..110bf79 100755 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java @@ -20,7 +20,6 @@ package org.apache.storm.perf; import java.util.Map; - import org.apache.storm.Config; import org.apache.storm.generated.StormTopology; import org.apache.storm.hdfs.bolt.HdfsBolt; @@ -96,7 +95,7 @@ public class StrGenSpoutHdfsBoltTopo { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(SPOUT_ID, spout, spoutNum); builder.setBolt(BOLT_ID, bolt, boltNum) - .localOrShuffleGrouping(SPOUT_ID); + .localOrShuffleGrouping(SPOUT_ID); return builder.createTopology(); } http://git-wip-us.apache.org/repos/asf/storm/blob/0e409ecd/examples/storm-perf/src/main/java/org/apache/storm/perf/ThroughputMeter.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/ThroughputMeter.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/ThroughputMeter.java index 09aec7d..bd5d1e1 100644 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/ThroughputMeter.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/ThroughputMeter.java @@ -30,6 +30,14 @@ public class ThroughputMeter { this.startTime = System.currentTimeMillis(); } + /** + * @return events/sec + */ + private static double calcThroughput(long count, long startTime, long endTime) { + long gap = (endTime - startTime); + return (count / gap) * 1000; + } + public String getName() { return name; } @@ -61,12 +69,4 @@ public class ThroughputMeter { count = 0; return result; } - - /** - * @return events/sec - */ - private static double calcThroughput(long count, long startTime, long endTime) { - long gap = (endTime - startTime); - return (count / gap) * 1000; - } } http://git-wip-us.apache.org/repos/asf/storm/blob/0e409ecd/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java index 368699b..ee1bf7e 100644 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java @@ -20,7 +20,6 @@ package org.apache.storm.perf.bolt; import java.util.HashMap; import java.util.Map; - import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; http://git-wip-us.apache.org/repos/asf/storm/blob/0e409ecd/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java index 5f9c710..abb397d 100755 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java @@ -20,7 +20,6 @@ package org.apache.storm.perf.bolt; import java.util.Map; import java.util.concurrent.locks.LockSupport; - import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; http://git-wip-us.apache.org/repos/asf/storm/blob/0e409ecd/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java index 0644e31..5387499 100644 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java @@ -19,7 +19,6 @@ package org.apache.storm.perf.bolt; import java.util.Map; - import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; http://git-wip-us.apache.org/repos/asf/storm/blob/0e409ecd/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java index abb5af8..85a3f5a 100644 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java @@ -19,7 +19,6 @@ package org.apache.storm.perf.bolt; import java.util.Map; - import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; http://git-wip-us.apache.org/repos/asf/storm/blob/0e409ecd/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java index 46f12ab..656f992 100755 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java @@ -20,7 +20,6 @@ package org.apache.storm.perf.spout; import java.util.ArrayList; import java.util.Map; - import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; http://git-wip-us.apache.org/repos/asf/storm/blob/0e409ecd/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java index 4f27d3b..a819fa2 100644 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java @@ -27,7 +27,6 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Map; - import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; http://git-wip-us.apache.org/repos/asf/storm/blob/0e409ecd/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/StringGenSpout.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/StringGenSpout.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/StringGenSpout.java index 530f7b6..01964ea 100755 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/StringGenSpout.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/StringGenSpout.java @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; - import org.apache.commons.lang.RandomStringUtils; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; @@ -30,18 +29,18 @@ import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; -/** Spout pre-computes a list with 30k fixed length random strings. - * Emits sequentially from this list, over and over again. +/** + * Spout pre-computes a list with 30k fixed length random strings. Emits sequentially from this list, over and over again. */ public class StringGenSpout extends BaseRichSpout { private static final String DEFAULT_FIELD_NAME = "str"; - private int strLen; private final int strCount = 30_000; + ArrayList<String> records; + private int strLen; private String fieldName = DEFAULT_FIELD_NAME; private SpoutOutputCollector collector = null; - ArrayList<String> records; private int curr = 0; private int count = 0; @@ -49,6 +48,14 @@ public class StringGenSpout extends BaseRichSpout { this.strLen = strLen; } + private static ArrayList<String> genStringList(int strLen, int count) { + ArrayList<String> result = new ArrayList<String>(count); + for (int i = 0; i < count; i++) { + result.add(RandomStringUtils.random(strLen)); + } + return result; + } + public StringGenSpout withFieldName(String fieldName) { this.fieldName = fieldName; return this; @@ -66,18 +73,10 @@ public class StringGenSpout extends BaseRichSpout { this.collector = collector; } - private static ArrayList<String> genStringList(int strLen, int count) { - ArrayList<String> result = new ArrayList<String>(count); - for (int i = 0; i < count; i++) { - result.add( RandomStringUtils.random(strLen) ); - } - return result; - } - @Override public void nextTuple() { List<Object> tuple; - if(curr < strCount) { + if (curr < strCount) { tuple = Collections.singletonList((Object) records.get(curr)); ++curr; collector.emit(tuple, ++count); http://git-wip-us.apache.org/repos/asf/storm/blob/0e409ecd/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/WordGenSpout.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/WordGenSpout.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/WordGenSpout.java index b3195c1..f46d6f3 100644 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/WordGenSpout.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/WordGenSpout.java @@ -24,7 +24,7 @@ import java.io.IOException; import java.io.InputStreamReader; import java.util.ArrayList; import java.util.Map; - +import org.apache.storm.perf.ThroughputMeter; import org.apache.storm.perf.utils.Helper; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; @@ -32,7 +32,6 @@ import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; -import org.apache.storm.perf.ThroughputMeter; public class WordGenSpout extends BaseRichSpout { public static final String FIELDS = "word"; @@ -51,10 +50,37 @@ public class WordGenSpout extends BaseRichSpout { this.file = file; } + /** + * Reads text file and extracts words from each line. + * + * @return a list of all (non-unique) words + */ + public static ArrayList<String> readWords(String file) { + ArrayList<String> lines = new ArrayList<>(); + try { + FileInputStream input = new FileInputStream(file); + BufferedReader reader = new BufferedReader(new InputStreamReader(input)); + try { + String line; + while ((line = reader.readLine()) != null) { + for (String word : line.split("\\s+")) + lines.add(word); + } + } catch (IOException e) { + throw new RuntimeException("Reading file failed", e); + } finally { + reader.close(); + } + } catch (IOException e) { + throw new RuntimeException("Error closing reader", e); + } + return lines; + } + @Override public void open(Map<String, Object> conf, - TopologyContext context, - SpoutOutputCollector collector) { + TopologyContext context, + SpoutOutputCollector collector) { this.collector = collector; Integer ackers = Helper.getInt(conf, "topology.acker.executors", 0); if (ackers.equals(0)) { @@ -67,7 +93,7 @@ public class WordGenSpout extends BaseRichSpout { @Override public void nextTuple() { - index = (index < words.size()-1) ? index+1 : 0; + index = (index < words.size() - 1) ? index + 1 : 0; String word = words.get(index); if (ackEnabled) { collector.emit(new Values(word), count); @@ -84,30 +110,4 @@ public class WordGenSpout extends BaseRichSpout { declarer.declare(new Fields(FIELDS)); } - /** - * Reads text file and extracts words from each line. - * @return a list of all (non-unique) words - */ - public static ArrayList<String> readWords(String file) { - ArrayList<String> lines = new ArrayList<>(); - try { - FileInputStream input = new FileInputStream(file); - BufferedReader reader = new BufferedReader(new InputStreamReader(input)); - try { - String line; - while ((line = reader.readLine()) != null) { - for (String word : line.split("\\s+")) - lines.add(word); - } - } catch (IOException e) { - throw new RuntimeException("Reading file failed", e); - } finally { - reader.close(); - } - } catch (IOException e) { - throw new RuntimeException("Error closing reader", e); - } - return lines; - } - } http://git-wip-us.apache.org/repos/asf/storm/blob/0e409ecd/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java index 7cfd354..735abe0 100755 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java @@ -26,7 +26,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; - import org.apache.log4j.Logger; import org.apache.storm.generated.Nimbus; import org.apache.storm.utils.Utils; @@ -160,9 +159,9 @@ public class BasicMetricsCollector implements AutoCloseable { this.maxLatency = latency; } metrics.put(SPOUT_AVG_COMPLETE_LATENCY, - String.format(SPOUT_AVG_LATENCY_FORMAT, latency)); + String.format(SPOUT_AVG_LATENCY_FORMAT, latency)); metrics.put(SPOUT_MAX_COMPLETE_LATENCY, - String.format(SPOUT_MAX_LATENCY_FORMAT, this.maxLatency)); + String.format(SPOUT_MAX_LATENCY_FORMAT, this.maxLatency)); } } @@ -192,10 +191,12 @@ public class BasicMetricsCollector implements AutoCloseable { header.add(SPOUT_MAX_COMPLETE_LATENCY); } - writer.println("\n------------------------------------------------------------------------------------------------------------------"); + writer.println( + "\n------------------------------------------------------------------------------------------------------------------"); String str = Utils.join(header, ","); writer.println(str); - writer.println("------------------------------------------------------------------------------------------------------------------"); + writer + .println("------------------------------------------------------------------------------------------------------------------"); writer.flush(); } @@ -211,28 +212,28 @@ public class BasicMetricsCollector implements AutoCloseable { boolean collectTopologyStats(Set<MetricsItem> items) { return items.contains(MetricsItem.ALL) - || items.contains(MetricsItem.TOPOLOGY_STATS); + || items.contains(MetricsItem.TOPOLOGY_STATS); } boolean collectExecutorStats(Set<MetricsItem> items) { return items.contains(MetricsItem.ALL) - || items.contains(MetricsItem.XSFER_RATE) - || items.contains(MetricsItem.SPOUT_LATENCY); + || items.contains(MetricsItem.XSFER_RATE) + || items.contains(MetricsItem.SPOUT_LATENCY); } boolean collectThroughput(Set<MetricsItem> items) { return items.contains(MetricsItem.ALL) - || items.contains(MetricsItem.XSFER_RATE); + || items.contains(MetricsItem.XSFER_RATE); } boolean collectSpoutThroughput(Set<MetricsItem> items) { return items.contains(MetricsItem.ALL) - || items.contains(MetricsItem.SPOUT_THROUGHPUT); + || items.contains(MetricsItem.SPOUT_THROUGHPUT); } boolean collectSpoutLatency(Set<MetricsItem> items) { return items.contains(MetricsItem.ALL) - || items.contains(MetricsItem.SPOUT_LATENCY); + || items.contains(MetricsItem.SPOUT_LATENCY); } public enum MetricsItem { http://git-wip-us.apache.org/repos/asf/storm/blob/0e409ecd/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java index 4429f4c..a0f838e 100755 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java @@ -19,7 +19,6 @@ package org.apache.storm.perf.utils; import java.util.Map; - import org.apache.storm.StormSubmitter; import org.apache.storm.generated.KillOptions; import org.apache.storm.generated.Nimbus; @@ -68,7 +67,9 @@ public class Helper { } } - /** Kill topo on Ctrl-C */ + /** + * Kill topo on Ctrl-C + */ public static void setupShutdownHook(final String topoName) { Map<String, Object> clusterConf = Utils.readStormConfig(); final Nimbus.Iface client = NimbusClient.getConfiguredClient(clusterConf).getClient(); @@ -86,7 +87,7 @@ public class Helper { } public static void runOnClusterAndPrintMetrics(int durationSec, String topoName, Map<String, Object> topoConf, StormTopology topology) - throws Exception { + throws Exception { // submit topology StormSubmitter.submitTopologyWithProgressBar(topoName, topoConf, topology); setupShutdownHook(topoName); // handle Ctrl-C http://git-wip-us.apache.org/repos/asf/storm/blob/0e409ecd/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/IdentityBolt.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/IdentityBolt.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/IdentityBolt.java index f950203..df27263 100755 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/IdentityBolt.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/IdentityBolt.java @@ -19,7 +19,6 @@ package org.apache.storm.perf.utils; import java.util.Map; - import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -36,7 +35,7 @@ public class IdentityBolt extends BaseRichBolt { @Override public void execute(Tuple tuple) { - collector.emit(tuple, tuple.getValues() ); + collector.emit(tuple, tuple.getValues()); collector.ack(tuple); } http://git-wip-us.apache.org/repos/asf/storm/blob/0e409ecd/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java index f462bc7..b70d06a 100755 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java @@ -20,7 +20,6 @@ package org.apache.storm.perf.utils; import java.util.List; import java.util.Map; - import org.apache.storm.generated.ClusterSummary; import org.apache.storm.generated.ExecutorSpecificStats; import org.apache.storm.generated.ExecutorStats; @@ -160,7 +159,7 @@ public class MetricsSample { ret.spoutEmitted = spoutEmitted; ret.spoutTransferred = spoutTransferred; ret.sampleTime = System.currentTimeMillis(); -// ret.numSupervisors = clusterSummary.get_supervisors_size(); + // ret.numSupervisors = clusterSummary.get_supervisors_size(); ret.numWorkers = 0; ret.numExecutors = 0; ret.numTasks = 0;
