http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/storm-perf/src/main/java/org/apache/storm/perf/KafkaSpoutNullBoltTopo.java ---------------------------------------------------------------------- diff --git a/storm-perf/src/main/java/org/apache/storm/perf/KafkaSpoutNullBoltTopo.java b/storm-perf/src/main/java/org/apache/storm/perf/KafkaSpoutNullBoltTopo.java deleted file mode 100755 index 3512c65..0000000 --- a/storm-perf/src/main/java/org/apache/storm/perf/KafkaSpoutNullBoltTopo.java +++ /dev/null @@ -1,114 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License -*/ - -package org.apache.storm.perf; - -import org.apache.storm.generated.StormTopology; -import org.apache.storm.kafka.BrokerHosts; -import org.apache.storm.kafka.KafkaSpout; -import org.apache.storm.kafka.SpoutConfig; -import org.apache.storm.kafka.StringMultiSchemeWithTopic; -import org.apache.storm.kafka.ZkHosts; -import org.apache.storm.perf.bolt.DevNullBolt; -import org.apache.storm.perf.utils.Helper; -import org.apache.storm.topology.TopologyBuilder; -import org.apache.storm.utils.Utils; - -import java.util.Map; -import java.util.UUID; - - -/*** - * This topo helps measure speed of reading from Kafka - * Spout Reads from Kafka. - * Bolt acks and discards tuples - */ - -public class KafkaSpoutNullBoltTopo { - - // configs - topo parallelism - public static final String SPOUT_NUM = "spout.count"; - public static final String BOLT_NUM = "bolt.count"; - - // configs - kafka spout - public static final String KAFKA_TOPIC = "kafka.topic"; - public static final String ZOOKEEPER_URI = "zk.uri"; - - - public static final int DEFAULT_SPOUT_NUM = 1; - public static final int DEFAULT_BOLT_NUM = 1; - - // names - public static final String TOPOLOGY_NAME = "KafkaSpoutNullBoltTopo"; - public static final String SPOUT_ID = "kafkaSpout"; - public static final String BOLT_ID = "devNullBolt"; - - - public static StormTopology getTopology(Map config) { - - final int spoutNum = getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM); - final int boltNum = getInt(config, BOLT_NUM, DEFAULT_BOLT_NUM); - // 1 - Setup Kafka Spout -------- - - String zkConnString = getStr(config, ZOOKEEPER_URI); - String topicName = getStr(config, KAFKA_TOPIC); - - BrokerHosts brokerHosts = new ZkHosts(zkConnString); - SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topicName, "/" + topicName, UUID.randomUUID().toString()); - spoutConfig.scheme = new StringMultiSchemeWithTopic(); - spoutConfig.ignoreZkOffsets = true; - - KafkaSpout spout = new KafkaSpout(spoutConfig); - - // 2 - DevNull Bolt -------- - DevNullBolt bolt = new DevNullBolt(); - - // 3 - Setup Topology -------- - TopologyBuilder builder = new TopologyBuilder(); - builder.setSpout(SPOUT_ID, spout, spoutNum); - builder.setBolt(BOLT_ID, bolt, boltNum) - .localOrShuffleGrouping(SPOUT_ID); - - return builder.createTopology(); - } - - - public static int getInt(Map map, Object key, int def) { - return Utils.getInt(Utils.get(map, key, def)); - } - - public static String getStr(Map map, Object key) { - return (String) map.get(key); - } - - - /** - * Copies text file content from sourceDir to destinationDir. Moves source files into sourceDir after its done consuming - */ - public static void main(String[] args) throws Exception { - if (args.length !=2) { - System.err.println("args: runDurationSec confFile"); - return; - } - Integer durationSec = Integer.parseInt(args[0]); - Map topoConf = Utils.findAndReadConfigFile(args[1]); - - // Submit to Storm cluster - Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf)); - } -}
http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java ---------------------------------------------------------------------- diff --git a/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java b/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java deleted file mode 100755 index 5b97540..0000000 --- a/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License - */ - - -package org.apache.storm.perf; - -import org.apache.storm.LocalCluster; -import org.apache.storm.generated.StormTopology; -import org.apache.storm.hdfs.bolt.HdfsBolt; -import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat; -import org.apache.storm.hdfs.bolt.format.FileNameFormat; -import org.apache.storm.hdfs.bolt.format.RecordFormat; -import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy; -import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy; -import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy; -import org.apache.storm.hdfs.bolt.sync.SyncPolicy; -import org.apache.storm.perf.spout.StringGenSpout; -import org.apache.storm.perf.utils.Helper; -import org.apache.storm.topology.TopologyBuilder; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.utils.Utils; - -import java.util.Map; - -/*** - * This topo helps measure speed of writing to Hdfs - * Spout generates fixed length random strings. - * Bolt writes to Hdfs - */ - -public class StrGenSpoutHdfsBoltTopo { - - // configs - topo parallelism - public static final String SPOUT_NUM = "spout.count"; - public static final String BOLT_NUM = "bolt.count"; - - // configs - hdfs bolt - public static final String HDFS_URI = "hdfs.uri"; - public static final String HDFS_PATH = "hdfs.dir"; - public static final String HDFS_BATCH = "hdfs.batch"; - - public static final int DEFAULT_SPOUT_NUM = 1; - public static final int DEFAULT_BOLT_NUM = 1; - public static final int DEFAULT_HDFS_BATCH = 1000; - - // names - public static final String TOPOLOGY_NAME = "StrGenSpoutHdfsBoltTopo"; - public static final String SPOUT_ID = "GenSpout"; - public static final String BOLT_ID = "hdfsBolt"; - - - public static StormTopology getTopology(Map topoConf) { - final int hdfsBatch = Helper.getInt(topoConf, HDFS_BATCH, DEFAULT_HDFS_BATCH); - - // 1 - Setup StringGen Spout -------- - StringGenSpout spout = new StringGenSpout(100).withFieldName("str"); - - - // 2 - Setup HFS Bolt -------- - String Hdfs_url = Helper.getStr(topoConf, HDFS_URI); - RecordFormat format = new LineWriter("str"); - SyncPolicy syncPolicy = new CountSyncPolicy(hdfsBatch); - FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, FileSizeRotationPolicy.Units.GB); - final int spoutNum = Helper.getInt(topoConf, SPOUT_NUM, DEFAULT_SPOUT_NUM); - final int boltNum = Helper.getInt(topoConf, BOLT_NUM, DEFAULT_BOLT_NUM); - - // Use default, Storm-generated file names - FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath(Helper.getStr(topoConf, HDFS_PATH) ); - - // Instantiate the HdfsBolt - HdfsBolt bolt = new HdfsBolt() - .withFsUrl(Hdfs_url) - .withFileNameFormat(fileNameFormat) - .withRecordFormat(format) - .withRotationPolicy(rotationPolicy) - .withSyncPolicy(syncPolicy); - - - // 3 - Setup Topology -------- - - TopologyBuilder builder = new TopologyBuilder(); - builder.setSpout(SPOUT_ID, spout, spoutNum); - builder.setBolt(BOLT_ID, bolt, boltNum) - .localOrShuffleGrouping(SPOUT_ID); - - return builder.createTopology(); - } - - - /** Spout generates random strings and HDFS bolt writes them to a text file */ - public static void main(String[] args) throws Exception { - if(args.length <= 0) { - // submit to local cluster - Map topoConf = Utils.findAndReadConfigFile("conf/HdfsSpoutTopo.yaml"); - LocalCluster cluster = Helper.runOnLocalCluster(TOPOLOGY_NAME, getTopology(topoConf)); - - Helper.setupShutdownHook(cluster, TOPOLOGY_NAME); - while (true) {// run indefinitely till Ctrl-C - Thread.sleep(20_000_000); - } - } else { - // Submit to Storm cluster - if (args.length !=2) { - System.err.println("args: runDurationSec confFile"); - return; - } - Integer durationSec = Integer.parseInt(args[0]); - Map topoConf = Utils.findAndReadConfigFile(args[1]); - - Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf)); - } - } - - - public static class LineWriter implements RecordFormat { - private String lineDelimiter = System.lineSeparator(); - private String fieldName; - - public LineWriter(String fieldName) { - this.fieldName = fieldName; - } - - /** - * Overrides the default record delimiter. - * - * @param delimiter - * @return - */ - public LineWriter withLineDelimiter(String delimiter){ - this.lineDelimiter = delimiter; - return this; - } - - public byte[] format(Tuple tuple) { - return (tuple.getValueByField(fieldName).toString() + this.lineDelimiter).getBytes(); - } - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java ---------------------------------------------------------------------- diff --git a/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java b/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java deleted file mode 100644 index b79a0ee..0000000 --- a/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License - */ - -package org.apache.storm.perf.bolt; - - -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.BasicOutputCollector; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseBasicBolt; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.tuple.Values; - -import java.util.HashMap; -import java.util.Map; - -public class CountBolt extends BaseBasicBolt { - public static final String FIELDS_WORD = "word"; - public static final String FIELDS_COUNT = "count"; - - Map<String, Integer> counts = new HashMap<>(); - - @Override - public void prepare(Map stormConf, TopologyContext context) { - } - - @Override - public void execute(Tuple tuple, BasicOutputCollector collector) { - String word = tuple.getString(0); - Integer count = counts.get(word); - if (count == null) - count = 0; - count++; - counts.put(word, count); - collector.emit(new Values(word, count)); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields(FIELDS_WORD, FIELDS_COUNT)); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java ---------------------------------------------------------------------- diff --git a/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java b/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java deleted file mode 100755 index b85ce15..0000000 --- a/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License - */ - -package org.apache.storm.perf.bolt; - -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseRichBolt; -import org.apache.storm.tuple.Tuple; - -import java.util.Map; - - -public class DevNullBolt extends BaseRichBolt { - private OutputCollector collector; - - @Override - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - this.collector = collector; - } - - @Override - public void execute(Tuple tuple) { - collector.ack(tuple); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java ---------------------------------------------------------------------- diff --git a/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java b/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java deleted file mode 100644 index 116265e..0000000 --- a/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License - */ - -package org.apache.storm.perf.bolt; - -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseRichBolt; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.tuple.Values; - -import java.util.Map; - -public class IdBolt extends BaseRichBolt { - private OutputCollector collector; - - @Override - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - this.collector = collector; - } - - @Override - public void execute(Tuple tuple) { - collector.emit(tuple, new Values( tuple.getValues() ) ); - collector.ack(tuple); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("field1")); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java ---------------------------------------------------------------------- diff --git a/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java b/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java deleted file mode 100644 index 96f9f73..0000000 --- a/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License - */ - -package org.apache.storm.perf.bolt; - -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.BasicOutputCollector; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseBasicBolt; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.tuple.Values; - -import java.util.Map; - - -public class SplitSentenceBolt extends BaseBasicBolt { - public static final String FIELDS = "word"; - - @Override - public void prepare(Map stormConf, TopologyContext context) { - } - - @Override - public void execute(Tuple input, BasicOutputCollector collector) { - for (String word : splitSentence(input.getString(0))) { - collector.emit(new Values(word)); - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields(FIELDS)); - } - - - public static String[] splitSentence(String sentence) { - if (sentence != null) { - return sentence.split("\\s+"); - } - return null; - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java ---------------------------------------------------------------------- diff --git a/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java b/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java deleted file mode 100755 index b66e4f3..0000000 --- a/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License - */ - -package org.apache.storm.perf.spout; - - -import org.apache.storm.spout.SpoutOutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseRichSpout; -import org.apache.storm.tuple.Fields; - -import java.util.Collections; -import java.util.List; -import java.util.Map; - -public class ConstSpout extends BaseRichSpout { - - private static final String DEFAUT_FIELD_NAME = "str"; - private String value; - private String fieldName = DEFAUT_FIELD_NAME; - private SpoutOutputCollector collector = null; - private int count=0; - - public ConstSpout(String value) { - this.value = value; - } - - public ConstSpout withOutputFields(String fieldName) { - this.fieldName = fieldName; - return this; - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields(fieldName)); - } - - @Override - public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { - this.collector = collector; - } - - @Override - public void nextTuple() { - List<Object> tuple = Collections.singletonList((Object) value); - collector.emit(tuple, count++); - } - - @Override - public void ack(Object msgId) { - super.ack(msgId); - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java ---------------------------------------------------------------------- diff --git a/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java b/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java deleted file mode 100644 index 959e7c6..0000000 --- a/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License - */ - -package org.apache.storm.perf.spout; - -import org.apache.storm.spout.SpoutOutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseRichSpout; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Values; - -import java.io.BufferedReader; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -public class FileReadSpout extends BaseRichSpout { - public static final String FIELDS = "sentence"; - private static final long serialVersionUID = -2582705611472467172L; - private transient FileReader reader; - private String file; - private boolean ackEnabled = true; - private SpoutOutputCollector collector; - - private long count = 0; - - - public FileReadSpout(String file) { - this.file = file; - } - - // For testing - FileReadSpout(FileReader reader) { - this.reader = reader; - } - - @Override - public void open(Map conf, TopologyContext context, - SpoutOutputCollector collector) { - this.collector = collector; - Object ackObj = conf.get("topology.acker.executors"); - if (ackObj != null && ackObj.equals(0)) { - this.ackEnabled = false; - } - // for tests, reader will not be null - if (this.reader == null) { - this.reader = new FileReader(this.file); - } - } - - @Override - public void nextTuple() { - if (ackEnabled) { - collector.emit(new Values(reader.nextLine()), count); - count++; - } else { - collector.emit(new Values(reader.nextLine())); - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields(FIELDS)); - } - - public static List<String> readLines(InputStream input) { - List<String> lines = new ArrayList<>(); - try { - BufferedReader reader = new BufferedReader(new InputStreamReader(input)); - try { - String line; - while ((line = reader.readLine()) != null) { - lines.add(line); - } - } catch (IOException e) { - throw new RuntimeException("Reading file failed", e); - } finally { - reader.close(); - } - } catch (IOException e) { - throw new RuntimeException("Error closing reader", e); - } - return lines; - } - - public static class FileReader implements Serializable { - - private static final long serialVersionUID = -7012334600647556267L; - - public final String file; - private List<String> contents = null; - private int index = 0; - private int limit = 0; - - public FileReader(String file) { - this.file = file; - if (this.file != null) { - try { - this.contents = readLines(new FileInputStream(this.file)); - } catch (IOException e) { - e.printStackTrace(); - throw new IllegalArgumentException("Cannot open file " + file, e); - } - this.limit = contents.size(); - } else { - throw new IllegalArgumentException("file name cannot be null"); - } - } - - public String nextLine() { - if (index >= limit) { - index = 0; - } - String line = contents.get(index); - index++; - return line; - } - - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/storm-perf/src/main/java/org/apache/storm/perf/spout/StringGenSpout.java ---------------------------------------------------------------------- diff --git a/storm-perf/src/main/java/org/apache/storm/perf/spout/StringGenSpout.java b/storm-perf/src/main/java/org/apache/storm/perf/spout/StringGenSpout.java deleted file mode 100755 index f9c665b..0000000 --- a/storm-perf/src/main/java/org/apache/storm/perf/spout/StringGenSpout.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License - */ - -package org.apache.storm.perf.spout; - - -import org.apache.commons.lang.RandomStringUtils; -import org.apache.storm.spout.SpoutOutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseRichSpout; -import org.apache.storm.tuple.Fields; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -/** Spout pre-computes a list with 30k fixed length random strings. - * Emits sequentially from this list, over and over again. - */ - -public class StringGenSpout extends BaseRichSpout { - - private static final String DEFAULT_FIELD_NAME = "str"; - private int strLen; - private final int strCount = 30_000; - private String fieldName = DEFAULT_FIELD_NAME; - private SpoutOutputCollector collector = null; - ArrayList<String> records; - private int curr=0; - private int count=0; - - public StringGenSpout(int strLen) { - this.strLen = strLen; - } - - public StringGenSpout withFieldName(String fieldName) { - this.fieldName = fieldName; - return this; - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare( new Fields(fieldName) ); - } - - @Override - public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { - this.records = genStringList(strLen, strCount); - - this.collector = collector; - } - - private static ArrayList<String> genStringList(int strLen, int count) { - ArrayList<String> result = new ArrayList<String>(count); - for (int i = 0; i < count; i++) { - result.add( RandomStringUtils.random(strLen) ); - } - return result; - } - - @Override - public void nextTuple() { - List<Object> tuple; - if( curr < strCount ) { - tuple = Collections.singletonList((Object) records.get(curr)); - ++curr; - collector.emit(tuple, ++count); - } - } - - - @Override - public void ack(Object msgId) { - super.ack(msgId); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java ---------------------------------------------------------------------- diff --git a/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java b/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java deleted file mode 100755 index 686f9da..0000000 --- a/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java +++ /dev/null @@ -1,309 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License - */ - -package org.apache.storm.perf.utils; - -import org.apache.storm.LocalCluster; -import org.apache.storm.generated.Nimbus; -import org.apache.storm.utils.NimbusClient; -import org.apache.storm.utils.Utils; -import org.apache.log4j.Logger; - -import java.io.PrintWriter; -import java.util.*; - - -public class BasicMetricsCollector { - - private LocalCluster localCluster = null; - private Nimbus.Client client = null; - private PrintWriter dataWriter; - private long startTime=0; - - public enum MetricsItem { - TOPOLOGY_STATS, - XSFER_RATE, - SPOUT_THROUGHPUT, - SPOUT_LATENCY, - ALL - } - - - /* headers */ - public static final String TIME = "elapsed (sec)"; - public static final String TIME_FORMAT = "%d"; - public static final String TOTAL_SLOTS = "total_slots"; - public static final String USED_SLOTS = "used_slots"; - public static final String WORKERS = "workers"; - public static final String TASKS = "tasks"; - public static final String EXECUTORS = "executors"; - public static final String TRANSFERRED = "transferred (messages)"; - public static final String XSFER_RATE = "transfer rate (messages/s)"; - public static final String SPOUT_EXECUTORS = "spout_executors"; - public static final String SPOUT_TRANSFERRED = "spout_transferred (messages)"; - public static final String SPOUT_ACKED = "spout_acks"; - public static final String SPOUT_THROUGHPUT = "spout_throughput (acks/s)"; - public static final String SPOUT_AVG_COMPLETE_LATENCY = "spout_avg_complete_latency(ms)"; - public static final String SPOUT_AVG_LATENCY_FORMAT = "%.1f"; - public static final String SPOUT_MAX_COMPLETE_LATENCY = "spout_max_complete_latency(ms)"; - public static final String SPOUT_MAX_LATENCY_FORMAT = "%.1f"; - private static final Logger LOG = Logger.getLogger(BasicMetricsCollector.class); - final MetricsCollectorConfig config; - // final StormTopology topology; - final Set<String> header = new LinkedHashSet<String>(); - final Map<String, String> metrics = new HashMap<String, String>(); - int lineNumber = 0; - - final boolean collectTopologyStats; - final boolean collectExecutorStats; - final boolean collectThroughput; - - final boolean collectSpoutThroughput; - final boolean collectSpoutLatency; - - private MetricsSample lastSample; - private MetricsSample curSample; - private double maxLatency = 0; - - boolean first = true; - - public BasicMetricsCollector(Nimbus.Client client, String topoName, Map stormConfig) { - this(topoName, stormConfig); - this.client = client; - this.localCluster = null; - } - - public BasicMetricsCollector(LocalCluster localCluster, String topoName, Map stormConfig) { - this(topoName, stormConfig); - this.client = null; - this.localCluster = localCluster; - } - - private BasicMetricsCollector(String topoName, Map stormConfig) { - Set<MetricsItem> items = getMetricsToCollect(); - this.config = new MetricsCollectorConfig(topoName, stormConfig); - collectTopologyStats = collectTopologyStats(items); - collectExecutorStats = collectExecutorStats(items); - collectThroughput = collectThroughput(items); - collectSpoutThroughput = collectSpoutThroughput(items); - collectSpoutLatency = collectSpoutLatency(items); - dataWriter = new PrintWriter(System.err); - } - - - private Set<MetricsItem> getMetricsToCollect() { - Set<MetricsItem> result = new HashSet<>(); - result.add(MetricsItem.ALL); - return result; - } - - public void collect(Nimbus.Client client) { - try { - if (!first) { - this.lastSample = this.curSample; - this.curSample = MetricsSample.factory(client, config.name); - updateStats(dataWriter); - writeLine(dataWriter); - } else { - LOG.info("Getting baseline metrics sample."); - writeHeader(dataWriter); - this.curSample = MetricsSample.factory(client, config.name); - first = false; - startTime = System.currentTimeMillis(); - } - } catch (Exception e) { - LOG.error("storm metrics failed! ", e); - } - } - - public void collect(LocalCluster localCluster) { - try { - if (!first) { - this.lastSample = this.curSample; - this.curSample = MetricsSample.factory(localCluster, config.name); - updateStats(dataWriter); - writeLine(dataWriter); - } else { - LOG.info("Getting baseline metrics sample."); - writeHeader(dataWriter); - this.curSample = MetricsSample.factory(localCluster, config.name); - first = false; - startTime = System.currentTimeMillis(); - } - } catch (Exception e) { - LOG.error("storm metrics failed! ", e); - } - } - - public void close() { - dataWriter.close(); - } - - boolean updateStats(PrintWriter writer) - throws Exception { - if (collectTopologyStats) { - updateTopologyStats(); - } - if (collectExecutorStats) { - updateExecutorStats(); - } - return true; - } - - void updateTopologyStats() { - long timeTotal = System.currentTimeMillis() - startTime; - int numWorkers = this.curSample.getNumWorkers(); - int numExecutors = this.curSample.getNumExecutors(); - int numTasks = this.curSample.getNumTasks(); - metrics.put(TIME, String.format(TIME_FORMAT, timeTotal / 1000)); - metrics.put(WORKERS, Integer.toString(numWorkers)); - metrics.put(EXECUTORS, Integer.toString(numExecutors)); - metrics.put(TASKS, Integer.toString(numTasks)); - } - - void updateExecutorStats() { - long timeDiff = this.curSample.getSampleTime() - this.lastSample.getSampleTime(); - long transferredDiff = this.curSample.getTotalTransferred() - this.lastSample.getTotalTransferred(); - long throughput = transferredDiff / (timeDiff / 1000); - - long spoutDiff = this.curSample.getSpoutTransferred() - this.lastSample.getSpoutTransferred(); - long spoutAckedDiff = this.curSample.getTotalAcked() - this.lastSample.getTotalAcked(); - long spoutThroughput = spoutDiff / (timeDiff / 1000); - - if (collectThroughput) { - metrics.put(TRANSFERRED, Long.toString(transferredDiff)); - metrics.put(XSFER_RATE, Long.toString(throughput)); - } - - if (collectSpoutThroughput) { - - metrics.put(SPOUT_EXECUTORS, Integer.toString(this.curSample.getSpoutExecutors())); - metrics.put(SPOUT_TRANSFERRED, Long.toString(spoutDiff)); - metrics.put(SPOUT_ACKED, Long.toString(spoutAckedDiff)); - metrics.put(SPOUT_THROUGHPUT, Long.toString(spoutThroughput)); - } - - - if (collectSpoutLatency) { - double latency = this.curSample.getTotalLatency(); - if (latency > this.maxLatency) { - this.maxLatency = latency; - } - metrics.put(SPOUT_AVG_COMPLETE_LATENCY, - String.format(SPOUT_AVG_LATENCY_FORMAT, latency)); - metrics.put(SPOUT_MAX_COMPLETE_LATENCY, - String.format(SPOUT_MAX_LATENCY_FORMAT, this.maxLatency)); - - } - } - - - void writeHeader(PrintWriter writer) { - header.add(TIME); - if (collectTopologyStats) { - header.add(WORKERS); - header.add(TASKS); - header.add(EXECUTORS); - } - - if (collectThroughput) { - header.add(TRANSFERRED); - header.add(XSFER_RATE); - } - - if (collectSpoutThroughput) { - header.add(SPOUT_EXECUTORS); - header.add(SPOUT_TRANSFERRED); - header.add(SPOUT_ACKED); - header.add(SPOUT_THROUGHPUT); - } - - if (collectSpoutLatency) { - header.add(SPOUT_AVG_COMPLETE_LATENCY); - header.add(SPOUT_MAX_COMPLETE_LATENCY); - } - - writer.println("\n------------------------------------------------------------------------------------------------------------------"); - String str = Utils.join(header, ","); - writer.println(str); - writer.println("------------------------------------------------------------------------------------------------------------------"); - writer.flush(); - } - - void writeLine(PrintWriter writer) { - List<String> line = new LinkedList<String>(); - for (String h : header) { - line.add(metrics.get(h)); - } - String str = Utils.join(line, ","); - writer.println(str); - writer.flush(); - } - - - boolean collectTopologyStats(Set<MetricsItem> items) { - return items.contains(MetricsItem.ALL) || - items.contains(MetricsItem.TOPOLOGY_STATS); - } - - boolean collectExecutorStats(Set<MetricsItem> items) { - return items.contains(MetricsItem.ALL) || - items.contains(MetricsItem.XSFER_RATE) || - items.contains(MetricsItem.SPOUT_LATENCY); - } - - boolean collectThroughput(Set<MetricsItem> items) { - return items.contains(MetricsItem.ALL) || - items.contains(MetricsItem.XSFER_RATE); - } - - boolean collectSpoutThroughput(Set<MetricsItem> items) { - return items.contains(MetricsItem.ALL) || - items.contains(MetricsItem.SPOUT_THROUGHPUT); - } - - boolean collectSpoutLatency(Set<MetricsItem> items) { - return items.contains(MetricsItem.ALL) || - items.contains(MetricsItem.SPOUT_LATENCY); - } - - - - public static class MetricsCollectorConfig { - private static final Logger LOG = Logger.getLogger(MetricsCollectorConfig.class); - - // storm configuration - public final Map stormConfig; - // storm topology name - public final String name; - // benchmark label - public final String label; - - public MetricsCollectorConfig(String topoName, Map stormConfig) { - this.stormConfig = stormConfig; - String labelStr = (String) stormConfig.get("benchmark.label"); - this.name = topoName; - if (labelStr == null) { - LOG.warn("'benchmark.label' not found in config. Defaulting to topology name"); - labelStr = this.name; - } - this.label = labelStr; - } - } // MetricsCollectorConfig - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java ---------------------------------------------------------------------- diff --git a/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java b/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java deleted file mode 100755 index f429699..0000000 --- a/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License - */ - -package org.apache.storm.perf.utils; - -import org.apache.storm.Config; -import org.apache.storm.LocalCluster; -import org.apache.storm.StormSubmitter; -import org.apache.storm.generated.ClusterSummary; -import org.apache.storm.generated.ExecutorSummary; -import org.apache.storm.generated.KillOptions; -import org.apache.storm.generated.Nimbus; -import org.apache.storm.generated.SpoutStats; -import org.apache.storm.generated.StormTopology; -import org.apache.storm.generated.TopologyInfo; -import org.apache.storm.generated.TopologySummary; -import org.apache.storm.perf.KafkaHdfsTopo; -import org.apache.storm.utils.NimbusClient; -import org.apache.storm.utils.Utils; - -import java.util.Map; - - -public class Helper { - - public static void kill(Nimbus.Client client, String topoName) throws Exception { - KillOptions opts = new KillOptions(); - opts.set_wait_secs(0); - client.killTopologyWithOpts(topoName, opts); - } - - public static void killAndShutdownCluster(LocalCluster cluster, String topoName) throws Exception { - KillOptions opts = new KillOptions(); - opts.set_wait_secs(0); - cluster.killTopologyWithOpts(topoName, opts); - cluster.shutdown(); - } - - - public static LocalCluster runOnLocalCluster(String topoName, StormTopology topology) { - LocalCluster cluster = new LocalCluster(); - cluster.submitTopology(topoName, new Config(), topology); - return cluster; - } - - public static int getInt(Map map, Object key, int def) { - return Utils.getInt(Utils.get(map, key, def)); - } - - public static String getStr(Map map, Object key) { - return (String) map.get(key); - } - - public static void collectMetricsAndKill(String topologyName, Integer pollInterval, Integer duration) throws Exception { - Map clusterConf = Utils.readStormConfig(); - Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient(); - BasicMetricsCollector metricsCollector = new BasicMetricsCollector(client, topologyName, clusterConf); - - int times = duration / pollInterval; - metricsCollector.collect(client); - for (int i = 0; i < times; i++) { - Thread.sleep(pollInterval * 1000); - metricsCollector.collect(client); - } - metricsCollector.close(); - kill(client, topologyName); - } - - public static void collectLocalMetricsAndKill(LocalCluster localCluster, String topologyName, Integer pollInterval, Integer duration, Map clusterConf) throws Exception { - BasicMetricsCollector metricsCollector = new BasicMetricsCollector(localCluster, topologyName, clusterConf); - - int times = duration / pollInterval; - metricsCollector.collect(localCluster); - for (int i = 0; i < times; i++) { - Thread.sleep(pollInterval * 1000); - metricsCollector.collect(localCluster); - } - metricsCollector.close(); - killAndShutdownCluster(localCluster, topologyName); - } - - /** Kill topo and Shutdown local cluster on Ctrl-C */ - public static void setupShutdownHook(final LocalCluster cluster, final String topoName) { - Runtime.getRuntime().addShutdownHook(new Thread() { - public void run() { - cluster.killTopology(topoName); - System.out.println("Killed Topology"); - cluster.shutdown(); - } - }); - } - - /** Kill topo on Ctrl-C */ - public static void setupShutdownHook(final String topoName) { - Map clusterConf = Utils.readStormConfig(); - final Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient(); - Runtime.getRuntime().addShutdownHook(new Thread() { - public void run() { - try { - Helper.kill(client, topoName); - System.out.println("Killed Topology"); - } catch (Exception e) { - e.printStackTrace(); - } - } - }); - } - - public static void runOnClusterAndPrintMetrics(Integer durationSec, String topoName, Map topoConf, StormTopology topology) throws Exception { - // submit topology - StormSubmitter.submitTopologyWithProgressBar(topoName, topoConf, topology); - setupShutdownHook(topoName); // handle Ctrl-C - - // poll metrics every minute, then kill topology after specified duration - Integer pollIntervalSec = 60; - collectMetricsAndKill(topoName, pollIntervalSec, durationSec); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/storm-perf/src/main/java/org/apache/storm/perf/utils/IdentityBolt.java ---------------------------------------------------------------------- diff --git a/storm-perf/src/main/java/org/apache/storm/perf/utils/IdentityBolt.java b/storm-perf/src/main/java/org/apache/storm/perf/utils/IdentityBolt.java deleted file mode 100755 index 396ad53..0000000 --- a/storm-perf/src/main/java/org/apache/storm/perf/utils/IdentityBolt.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License - */ - -package org.apache.storm.perf.utils; - - -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseRichBolt; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.tuple.Values; - -import java.util.Map; - - -public class IdentityBolt extends BaseRichBolt { - private OutputCollector collector; - - @Override - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - this.collector = collector; - } - - @Override - public void execute(Tuple tuple) { - collector.emit(tuple, tuple.getValues() ); - collector.ack(tuple); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - - } -} - http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java ---------------------------------------------------------------------- diff --git a/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java b/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java deleted file mode 100755 index a934120..0000000 --- a/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java +++ /dev/null @@ -1,248 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License - */ - -package org.apache.storm.perf.utils; - -import org.apache.storm.LocalCluster; -import org.apache.storm.generated.ClusterSummary; -import org.apache.storm.generated.ExecutorSpecificStats; -import org.apache.storm.generated.ExecutorStats; -import org.apache.storm.generated.ExecutorSummary; -import org.apache.storm.generated.Nimbus; -import org.apache.storm.generated.SpoutStats; -import org.apache.storm.generated.TopologyInfo; -import org.apache.storm.generated.TopologySummary; -import org.apache.storm.utils.Utils; - -import java.util.List; -import java.util.Map; - -public class MetricsSample { - - private long sampleTime = -1; - private long totalTransferred = 0l; - private long totalEmitted = 0l; - private long totalAcked = 0l; - private long totalFailed = 0l; - - private double totalLatency; - - private long spoutEmitted = 0l; - private long spoutTransferred = 0l; - private int spoutExecutors = 0; - - private int numSupervisors = 0; - private int numWorkers = 0; - private int numTasks = 0; - private int numExecutors = 0; - - private int totalSlots = 0; - private int usedSlots = 0; - - public static MetricsSample factory(Nimbus.Client client, String topologyName) throws Exception { - // "************ Sampling Metrics ***************** - - ClusterSummary clusterSummary = client.getClusterInfo(); - // get topology info - TopologySummary topSummary = getTopologySummary(clusterSummary, topologyName); - int topologyExecutors = topSummary.get_num_executors(); - int topologyWorkers = topSummary.get_num_workers(); - int topologyTasks = topSummary.get_num_tasks(); - TopologyInfo topInfo = client.getTopologyInfo(topSummary.get_id()); - - MetricsSample sample = getMetricsSample( topInfo); - sample.numWorkers = topologyWorkers; - sample.numExecutors = topologyExecutors; - sample.numTasks = topologyTasks; - return sample; - } - - public static MetricsSample factory(LocalCluster localCluster, String topologyName) throws Exception { - TopologyInfo topologyInfo = localCluster.getTopologyInfo(topologyName);; - return getMetricsSample(topologyInfo); - } - - - private static MetricsSample getMetricsSample(TopologyInfo topInfo) { - List<ExecutorSummary> executorSummaries = topInfo.get_executors(); - - // totals - long totalTransferred = 0l; - long totalEmitted = 0l; - long totalAcked = 0l; - long totalFailed = 0l; - - // number of spout executors - int spoutExecCount = 0; - double spoutLatencySum = 0.0; - - long spoutEmitted = 0l; - long spoutTransferred = 0l; - - // Executor summaries - for(ExecutorSummary executorSummary : executorSummaries){ - ExecutorStats execuatorStats = executorSummary.get_stats(); - if(execuatorStats == null){ - continue; - } - - ExecutorSpecificStats executorSpecificStats = execuatorStats.get_specific(); - if(executorSpecificStats == null){ - // bail out - continue; - } - - // transferred totals - Map<String,Map<String,Long>> transferred = execuatorStats.get_transferred(); - Map<String, Long> txMap = transferred.get(":all-time"); - if(txMap == null){ - continue; - } - for(String key : txMap.keySet()){ - // todo, ignore the master batch coordinator ? - if(!Utils.isSystemId(key)){ - Long count = txMap.get(key); - totalTransferred += count; - if(executorSpecificStats.is_set_spout()){ - spoutTransferred += count; - } - } - } - - // we found a spout - if(executorSpecificStats.isSet(2)) { // spout - - SpoutStats spoutStats = executorSpecificStats.get_spout(); - Map<String, Long> acked = spoutStats.get_acked().get(":all-time"); - if(acked != null){ - for(String key : acked.keySet()) { - totalAcked += acked.get(key); - } - } - - Map<String, Long> failed = spoutStats.get_failed().get(":all-time"); - if(failed != null){ - for(String key : failed.keySet()) { - totalFailed += failed.get(key); - } - } - - Double total = 0d; - Map<String, Double> vals = spoutStats.get_complete_ms_avg().get(":all-time"); - for(String key : vals.keySet()){ - total += vals.get(key); - } - Double latency = total / vals.size(); - - spoutExecCount++; - spoutLatencySum += latency; - } - - - } // end executor summary - - MetricsSample ret = new MetricsSample(); - ret.totalEmitted = totalEmitted; - ret.totalTransferred = totalTransferred; - ret.totalAcked = totalAcked; - ret.totalFailed = totalFailed; - ret.totalLatency = spoutLatencySum/spoutExecCount; - ret.spoutEmitted = spoutEmitted; - ret.spoutTransferred = spoutTransferred; - ret.sampleTime = System.currentTimeMillis(); -// ret.numSupervisors = clusterSummary.get_supervisors_size(); - ret.numWorkers = 0; - ret.numExecutors = 0; - ret.numTasks = 0; - ret.spoutExecutors = spoutExecCount; - return ret; - } - - public static TopologySummary getTopologySummary(ClusterSummary cs, String name) { - for (TopologySummary ts : cs.get_topologies()) { - if (name.equals(ts.get_name())) { - return ts; - } - } - return null; - } - - - - // getters - public long getSampleTime() { - return sampleTime; - } - - public long getTotalTransferred() { - return totalTransferred; - } - - public long getTotalEmitted() { - return totalEmitted; - } - - public long getTotalAcked() { - return totalAcked; - } - - public long getTotalFailed() { - return totalFailed; - } - - public double getTotalLatency() { - return totalLatency; - } - - public long getSpoutEmitted() { - return spoutEmitted; - } - - public long getSpoutTransferred() { - return spoutTransferred; - } - - public int getNumSupervisors() { - return numSupervisors; - } - - public int getNumWorkers() { - return numWorkers; - } - - public int getNumTasks() { - return numTasks; - } - - public int getTotalSlots() { - return totalSlots; - } - - public int getSpoutExecutors(){ - return this.spoutExecutors; - } - - public int getNumExecutors() { - return this.numExecutors; - } - - public int getUsedSlots() { - return this.usedSlots; - } - -} \ No newline at end of file
