http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java new file mode 100644 index 0000000..959e7c6 --- /dev/null +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.perf.spout; + +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class FileReadSpout extends BaseRichSpout { + public static final String FIELDS = "sentence"; + private static final long serialVersionUID = -2582705611472467172L; + private transient FileReader reader; + private String file; + private boolean ackEnabled = true; + private SpoutOutputCollector collector; + + private long count = 0; + + + public FileReadSpout(String file) { + this.file = file; + } + + // For testing + FileReadSpout(FileReader reader) { + this.reader = reader; + } + + @Override + public void open(Map conf, TopologyContext context, + SpoutOutputCollector collector) { + this.collector = collector; + Object ackObj = conf.get("topology.acker.executors"); + if (ackObj != null && ackObj.equals(0)) { + this.ackEnabled = false; + } + // for tests, reader will not be null + if (this.reader == null) { + this.reader = new FileReader(this.file); + } + } + + @Override + public void nextTuple() { + if (ackEnabled) { + collector.emit(new Values(reader.nextLine()), count); + count++; + } else { + collector.emit(new Values(reader.nextLine())); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields(FIELDS)); + } + + public static List<String> readLines(InputStream input) { + List<String> lines = new ArrayList<>(); + try { + BufferedReader reader = new BufferedReader(new InputStreamReader(input)); + try { + String line; + while ((line = reader.readLine()) != null) { + lines.add(line); + } + } catch (IOException e) { + throw new RuntimeException("Reading file failed", e); + } finally { + reader.close(); + } + } catch (IOException e) { + throw new RuntimeException("Error closing reader", e); + } + return lines; + } + + public static class FileReader implements Serializable { + + private static final long serialVersionUID = -7012334600647556267L; + + public final String file; + private List<String> contents = null; + private int index = 0; + private int limit = 0; + + public FileReader(String file) { + this.file = file; + if (this.file != null) { + try { + this.contents = readLines(new FileInputStream(this.file)); + } catch (IOException e) { + e.printStackTrace(); + throw new IllegalArgumentException("Cannot open file " + file, e); + } + this.limit = contents.size(); + } else { + throw new IllegalArgumentException("file name cannot be null"); + } + } + + public String nextLine() { + if (index >= limit) { + index = 0; + } + String line = contents.get(index); + index++; + return line; + } + + } +}
http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/StringGenSpout.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/StringGenSpout.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/StringGenSpout.java new file mode 100755 index 0000000..f9c665b --- /dev/null +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/StringGenSpout.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.perf.spout; + + +import org.apache.commons.lang.RandomStringUtils; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** Spout pre-computes a list with 30k fixed length random strings. + * Emits sequentially from this list, over and over again. + */ + +public class StringGenSpout extends BaseRichSpout { + + private static final String DEFAULT_FIELD_NAME = "str"; + private int strLen; + private final int strCount = 30_000; + private String fieldName = DEFAULT_FIELD_NAME; + private SpoutOutputCollector collector = null; + ArrayList<String> records; + private int curr=0; + private int count=0; + + public StringGenSpout(int strLen) { + this.strLen = strLen; + } + + public StringGenSpout withFieldName(String fieldName) { + this.fieldName = fieldName; + return this; + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare( new Fields(fieldName) ); + } + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + this.records = genStringList(strLen, strCount); + + this.collector = collector; + } + + private static ArrayList<String> genStringList(int strLen, int count) { + ArrayList<String> result = new ArrayList<String>(count); + for (int i = 0; i < count; i++) { + result.add( RandomStringUtils.random(strLen) ); + } + return result; + } + + @Override + public void nextTuple() { + List<Object> tuple; + if( curr < strCount ) { + tuple = Collections.singletonList((Object) records.get(curr)); + ++curr; + collector.emit(tuple, ++count); + } + } + + + @Override + public void ack(Object msgId) { + super.ack(msgId); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java new file mode 100755 index 0000000..686f9da --- /dev/null +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.perf.utils; + +import org.apache.storm.LocalCluster; +import org.apache.storm.generated.Nimbus; +import org.apache.storm.utils.NimbusClient; +import org.apache.storm.utils.Utils; +import org.apache.log4j.Logger; + +import java.io.PrintWriter; +import java.util.*; + + +public class BasicMetricsCollector { + + private LocalCluster localCluster = null; + private Nimbus.Client client = null; + private PrintWriter dataWriter; + private long startTime=0; + + public enum MetricsItem { + TOPOLOGY_STATS, + XSFER_RATE, + SPOUT_THROUGHPUT, + SPOUT_LATENCY, + ALL + } + + + /* headers */ + public static final String TIME = "elapsed (sec)"; + public static final String TIME_FORMAT = "%d"; + public static final String TOTAL_SLOTS = "total_slots"; + public static final String USED_SLOTS = "used_slots"; + public static final String WORKERS = "workers"; + public static final String TASKS = "tasks"; + public static final String EXECUTORS = "executors"; + public static final String TRANSFERRED = "transferred (messages)"; + public static final String XSFER_RATE = "transfer rate (messages/s)"; + public static final String SPOUT_EXECUTORS = "spout_executors"; + public static final String SPOUT_TRANSFERRED = "spout_transferred (messages)"; + public static final String SPOUT_ACKED = "spout_acks"; + public static final String SPOUT_THROUGHPUT = "spout_throughput (acks/s)"; + public static final String SPOUT_AVG_COMPLETE_LATENCY = "spout_avg_complete_latency(ms)"; + public static final String SPOUT_AVG_LATENCY_FORMAT = "%.1f"; + public static final String SPOUT_MAX_COMPLETE_LATENCY = "spout_max_complete_latency(ms)"; + public static final String SPOUT_MAX_LATENCY_FORMAT = "%.1f"; + private static final Logger LOG = Logger.getLogger(BasicMetricsCollector.class); + final MetricsCollectorConfig config; + // final StormTopology topology; + final Set<String> header = new LinkedHashSet<String>(); + final Map<String, String> metrics = new HashMap<String, String>(); + int lineNumber = 0; + + final boolean collectTopologyStats; + final boolean collectExecutorStats; + final boolean collectThroughput; + + final boolean collectSpoutThroughput; + final boolean collectSpoutLatency; + + private MetricsSample lastSample; + private MetricsSample curSample; + private double maxLatency = 0; + + boolean first = true; + + public BasicMetricsCollector(Nimbus.Client client, String topoName, Map stormConfig) { + this(topoName, stormConfig); + this.client = client; + this.localCluster = null; + } + + public BasicMetricsCollector(LocalCluster localCluster, String topoName, Map stormConfig) { + this(topoName, stormConfig); + this.client = null; + this.localCluster = localCluster; + } + + private BasicMetricsCollector(String topoName, Map stormConfig) { + Set<MetricsItem> items = getMetricsToCollect(); + this.config = new MetricsCollectorConfig(topoName, stormConfig); + collectTopologyStats = collectTopologyStats(items); + collectExecutorStats = collectExecutorStats(items); + collectThroughput = collectThroughput(items); + collectSpoutThroughput = collectSpoutThroughput(items); + collectSpoutLatency = collectSpoutLatency(items); + dataWriter = new PrintWriter(System.err); + } + + + private Set<MetricsItem> getMetricsToCollect() { + Set<MetricsItem> result = new HashSet<>(); + result.add(MetricsItem.ALL); + return result; + } + + public void collect(Nimbus.Client client) { + try { + if (!first) { + this.lastSample = this.curSample; + this.curSample = MetricsSample.factory(client, config.name); + updateStats(dataWriter); + writeLine(dataWriter); + } else { + LOG.info("Getting baseline metrics sample."); + writeHeader(dataWriter); + this.curSample = MetricsSample.factory(client, config.name); + first = false; + startTime = System.currentTimeMillis(); + } + } catch (Exception e) { + LOG.error("storm metrics failed! ", e); + } + } + + public void collect(LocalCluster localCluster) { + try { + if (!first) { + this.lastSample = this.curSample; + this.curSample = MetricsSample.factory(localCluster, config.name); + updateStats(dataWriter); + writeLine(dataWriter); + } else { + LOG.info("Getting baseline metrics sample."); + writeHeader(dataWriter); + this.curSample = MetricsSample.factory(localCluster, config.name); + first = false; + startTime = System.currentTimeMillis(); + } + } catch (Exception e) { + LOG.error("storm metrics failed! ", e); + } + } + + public void close() { + dataWriter.close(); + } + + boolean updateStats(PrintWriter writer) + throws Exception { + if (collectTopologyStats) { + updateTopologyStats(); + } + if (collectExecutorStats) { + updateExecutorStats(); + } + return true; + } + + void updateTopologyStats() { + long timeTotal = System.currentTimeMillis() - startTime; + int numWorkers = this.curSample.getNumWorkers(); + int numExecutors = this.curSample.getNumExecutors(); + int numTasks = this.curSample.getNumTasks(); + metrics.put(TIME, String.format(TIME_FORMAT, timeTotal / 1000)); + metrics.put(WORKERS, Integer.toString(numWorkers)); + metrics.put(EXECUTORS, Integer.toString(numExecutors)); + metrics.put(TASKS, Integer.toString(numTasks)); + } + + void updateExecutorStats() { + long timeDiff = this.curSample.getSampleTime() - this.lastSample.getSampleTime(); + long transferredDiff = this.curSample.getTotalTransferred() - this.lastSample.getTotalTransferred(); + long throughput = transferredDiff / (timeDiff / 1000); + + long spoutDiff = this.curSample.getSpoutTransferred() - this.lastSample.getSpoutTransferred(); + long spoutAckedDiff = this.curSample.getTotalAcked() - this.lastSample.getTotalAcked(); + long spoutThroughput = spoutDiff / (timeDiff / 1000); + + if (collectThroughput) { + metrics.put(TRANSFERRED, Long.toString(transferredDiff)); + metrics.put(XSFER_RATE, Long.toString(throughput)); + } + + if (collectSpoutThroughput) { + + metrics.put(SPOUT_EXECUTORS, Integer.toString(this.curSample.getSpoutExecutors())); + metrics.put(SPOUT_TRANSFERRED, Long.toString(spoutDiff)); + metrics.put(SPOUT_ACKED, Long.toString(spoutAckedDiff)); + metrics.put(SPOUT_THROUGHPUT, Long.toString(spoutThroughput)); + } + + + if (collectSpoutLatency) { + double latency = this.curSample.getTotalLatency(); + if (latency > this.maxLatency) { + this.maxLatency = latency; + } + metrics.put(SPOUT_AVG_COMPLETE_LATENCY, + String.format(SPOUT_AVG_LATENCY_FORMAT, latency)); + metrics.put(SPOUT_MAX_COMPLETE_LATENCY, + String.format(SPOUT_MAX_LATENCY_FORMAT, this.maxLatency)); + + } + } + + + void writeHeader(PrintWriter writer) { + header.add(TIME); + if (collectTopologyStats) { + header.add(WORKERS); + header.add(TASKS); + header.add(EXECUTORS); + } + + if (collectThroughput) { + header.add(TRANSFERRED); + header.add(XSFER_RATE); + } + + if (collectSpoutThroughput) { + header.add(SPOUT_EXECUTORS); + header.add(SPOUT_TRANSFERRED); + header.add(SPOUT_ACKED); + header.add(SPOUT_THROUGHPUT); + } + + if (collectSpoutLatency) { + header.add(SPOUT_AVG_COMPLETE_LATENCY); + header.add(SPOUT_MAX_COMPLETE_LATENCY); + } + + writer.println("\n------------------------------------------------------------------------------------------------------------------"); + String str = Utils.join(header, ","); + writer.println(str); + writer.println("------------------------------------------------------------------------------------------------------------------"); + writer.flush(); + } + + void writeLine(PrintWriter writer) { + List<String> line = new LinkedList<String>(); + for (String h : header) { + line.add(metrics.get(h)); + } + String str = Utils.join(line, ","); + writer.println(str); + writer.flush(); + } + + + boolean collectTopologyStats(Set<MetricsItem> items) { + return items.contains(MetricsItem.ALL) || + items.contains(MetricsItem.TOPOLOGY_STATS); + } + + boolean collectExecutorStats(Set<MetricsItem> items) { + return items.contains(MetricsItem.ALL) || + items.contains(MetricsItem.XSFER_RATE) || + items.contains(MetricsItem.SPOUT_LATENCY); + } + + boolean collectThroughput(Set<MetricsItem> items) { + return items.contains(MetricsItem.ALL) || + items.contains(MetricsItem.XSFER_RATE); + } + + boolean collectSpoutThroughput(Set<MetricsItem> items) { + return items.contains(MetricsItem.ALL) || + items.contains(MetricsItem.SPOUT_THROUGHPUT); + } + + boolean collectSpoutLatency(Set<MetricsItem> items) { + return items.contains(MetricsItem.ALL) || + items.contains(MetricsItem.SPOUT_LATENCY); + } + + + + public static class MetricsCollectorConfig { + private static final Logger LOG = Logger.getLogger(MetricsCollectorConfig.class); + + // storm configuration + public final Map stormConfig; + // storm topology name + public final String name; + // benchmark label + public final String label; + + public MetricsCollectorConfig(String topoName, Map stormConfig) { + this.stormConfig = stormConfig; + String labelStr = (String) stormConfig.get("benchmark.label"); + this.name = topoName; + if (labelStr == null) { + LOG.warn("'benchmark.label' not found in config. Defaulting to topology name"); + labelStr = this.name; + } + this.label = labelStr; + } + } // MetricsCollectorConfig + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java new file mode 100755 index 0000000..f429699 --- /dev/null +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.perf.utils; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.ClusterSummary; +import org.apache.storm.generated.ExecutorSummary; +import org.apache.storm.generated.KillOptions; +import org.apache.storm.generated.Nimbus; +import org.apache.storm.generated.SpoutStats; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.generated.TopologyInfo; +import org.apache.storm.generated.TopologySummary; +import org.apache.storm.perf.KafkaHdfsTopo; +import org.apache.storm.utils.NimbusClient; +import org.apache.storm.utils.Utils; + +import java.util.Map; + + +public class Helper { + + public static void kill(Nimbus.Client client, String topoName) throws Exception { + KillOptions opts = new KillOptions(); + opts.set_wait_secs(0); + client.killTopologyWithOpts(topoName, opts); + } + + public static void killAndShutdownCluster(LocalCluster cluster, String topoName) throws Exception { + KillOptions opts = new KillOptions(); + opts.set_wait_secs(0); + cluster.killTopologyWithOpts(topoName, opts); + cluster.shutdown(); + } + + + public static LocalCluster runOnLocalCluster(String topoName, StormTopology topology) { + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology(topoName, new Config(), topology); + return cluster; + } + + public static int getInt(Map map, Object key, int def) { + return Utils.getInt(Utils.get(map, key, def)); + } + + public static String getStr(Map map, Object key) { + return (String) map.get(key); + } + + public static void collectMetricsAndKill(String topologyName, Integer pollInterval, Integer duration) throws Exception { + Map clusterConf = Utils.readStormConfig(); + Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient(); + BasicMetricsCollector metricsCollector = new BasicMetricsCollector(client, topologyName, clusterConf); + + int times = duration / pollInterval; + metricsCollector.collect(client); + for (int i = 0; i < times; i++) { + Thread.sleep(pollInterval * 1000); + metricsCollector.collect(client); + } + metricsCollector.close(); + kill(client, topologyName); + } + + public static void collectLocalMetricsAndKill(LocalCluster localCluster, String topologyName, Integer pollInterval, Integer duration, Map clusterConf) throws Exception { + BasicMetricsCollector metricsCollector = new BasicMetricsCollector(localCluster, topologyName, clusterConf); + + int times = duration / pollInterval; + metricsCollector.collect(localCluster); + for (int i = 0; i < times; i++) { + Thread.sleep(pollInterval * 1000); + metricsCollector.collect(localCluster); + } + metricsCollector.close(); + killAndShutdownCluster(localCluster, topologyName); + } + + /** Kill topo and Shutdown local cluster on Ctrl-C */ + public static void setupShutdownHook(final LocalCluster cluster, final String topoName) { + Runtime.getRuntime().addShutdownHook(new Thread() { + public void run() { + cluster.killTopology(topoName); + System.out.println("Killed Topology"); + cluster.shutdown(); + } + }); + } + + /** Kill topo on Ctrl-C */ + public static void setupShutdownHook(final String topoName) { + Map clusterConf = Utils.readStormConfig(); + final Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient(); + Runtime.getRuntime().addShutdownHook(new Thread() { + public void run() { + try { + Helper.kill(client, topoName); + System.out.println("Killed Topology"); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + } + + public static void runOnClusterAndPrintMetrics(Integer durationSec, String topoName, Map topoConf, StormTopology topology) throws Exception { + // submit topology + StormSubmitter.submitTopologyWithProgressBar(topoName, topoConf, topology); + setupShutdownHook(topoName); // handle Ctrl-C + + // poll metrics every minute, then kill topology after specified duration + Integer pollIntervalSec = 60; + collectMetricsAndKill(topoName, pollIntervalSec, durationSec); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/IdentityBolt.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/IdentityBolt.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/IdentityBolt.java new file mode 100755 index 0000000..396ad53 --- /dev/null +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/IdentityBolt.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.perf.utils; + + +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; + +import java.util.Map; + + +public class IdentityBolt extends BaseRichBolt { + private OutputCollector collector; + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector = collector; + } + + @Override + public void execute(Tuple tuple) { + collector.emit(tuple, tuple.getValues() ); + collector.ack(tuple); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + + } +} + http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java new file mode 100755 index 0000000..a934120 --- /dev/null +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.perf.utils; + +import org.apache.storm.LocalCluster; +import org.apache.storm.generated.ClusterSummary; +import org.apache.storm.generated.ExecutorSpecificStats; +import org.apache.storm.generated.ExecutorStats; +import org.apache.storm.generated.ExecutorSummary; +import org.apache.storm.generated.Nimbus; +import org.apache.storm.generated.SpoutStats; +import org.apache.storm.generated.TopologyInfo; +import org.apache.storm.generated.TopologySummary; +import org.apache.storm.utils.Utils; + +import java.util.List; +import java.util.Map; + +public class MetricsSample { + + private long sampleTime = -1; + private long totalTransferred = 0l; + private long totalEmitted = 0l; + private long totalAcked = 0l; + private long totalFailed = 0l; + + private double totalLatency; + + private long spoutEmitted = 0l; + private long spoutTransferred = 0l; + private int spoutExecutors = 0; + + private int numSupervisors = 0; + private int numWorkers = 0; + private int numTasks = 0; + private int numExecutors = 0; + + private int totalSlots = 0; + private int usedSlots = 0; + + public static MetricsSample factory(Nimbus.Client client, String topologyName) throws Exception { + // "************ Sampling Metrics ***************** + + ClusterSummary clusterSummary = client.getClusterInfo(); + // get topology info + TopologySummary topSummary = getTopologySummary(clusterSummary, topologyName); + int topologyExecutors = topSummary.get_num_executors(); + int topologyWorkers = topSummary.get_num_workers(); + int topologyTasks = topSummary.get_num_tasks(); + TopologyInfo topInfo = client.getTopologyInfo(topSummary.get_id()); + + MetricsSample sample = getMetricsSample( topInfo); + sample.numWorkers = topologyWorkers; + sample.numExecutors = topologyExecutors; + sample.numTasks = topologyTasks; + return sample; + } + + public static MetricsSample factory(LocalCluster localCluster, String topologyName) throws Exception { + TopologyInfo topologyInfo = localCluster.getTopologyInfo(topologyName);; + return getMetricsSample(topologyInfo); + } + + + private static MetricsSample getMetricsSample(TopologyInfo topInfo) { + List<ExecutorSummary> executorSummaries = topInfo.get_executors(); + + // totals + long totalTransferred = 0l; + long totalEmitted = 0l; + long totalAcked = 0l; + long totalFailed = 0l; + + // number of spout executors + int spoutExecCount = 0; + double spoutLatencySum = 0.0; + + long spoutEmitted = 0l; + long spoutTransferred = 0l; + + // Executor summaries + for(ExecutorSummary executorSummary : executorSummaries){ + ExecutorStats execuatorStats = executorSummary.get_stats(); + if(execuatorStats == null){ + continue; + } + + ExecutorSpecificStats executorSpecificStats = execuatorStats.get_specific(); + if(executorSpecificStats == null){ + // bail out + continue; + } + + // transferred totals + Map<String,Map<String,Long>> transferred = execuatorStats.get_transferred(); + Map<String, Long> txMap = transferred.get(":all-time"); + if(txMap == null){ + continue; + } + for(String key : txMap.keySet()){ + // todo, ignore the master batch coordinator ? + if(!Utils.isSystemId(key)){ + Long count = txMap.get(key); + totalTransferred += count; + if(executorSpecificStats.is_set_spout()){ + spoutTransferred += count; + } + } + } + + // we found a spout + if(executorSpecificStats.isSet(2)) { // spout + + SpoutStats spoutStats = executorSpecificStats.get_spout(); + Map<String, Long> acked = spoutStats.get_acked().get(":all-time"); + if(acked != null){ + for(String key : acked.keySet()) { + totalAcked += acked.get(key); + } + } + + Map<String, Long> failed = spoutStats.get_failed().get(":all-time"); + if(failed != null){ + for(String key : failed.keySet()) { + totalFailed += failed.get(key); + } + } + + Double total = 0d; + Map<String, Double> vals = spoutStats.get_complete_ms_avg().get(":all-time"); + for(String key : vals.keySet()){ + total += vals.get(key); + } + Double latency = total / vals.size(); + + spoutExecCount++; + spoutLatencySum += latency; + } + + + } // end executor summary + + MetricsSample ret = new MetricsSample(); + ret.totalEmitted = totalEmitted; + ret.totalTransferred = totalTransferred; + ret.totalAcked = totalAcked; + ret.totalFailed = totalFailed; + ret.totalLatency = spoutLatencySum/spoutExecCount; + ret.spoutEmitted = spoutEmitted; + ret.spoutTransferred = spoutTransferred; + ret.sampleTime = System.currentTimeMillis(); +// ret.numSupervisors = clusterSummary.get_supervisors_size(); + ret.numWorkers = 0; + ret.numExecutors = 0; + ret.numTasks = 0; + ret.spoutExecutors = spoutExecCount; + return ret; + } + + public static TopologySummary getTopologySummary(ClusterSummary cs, String name) { + for (TopologySummary ts : cs.get_topologies()) { + if (name.equals(ts.get_name())) { + return ts; + } + } + return null; + } + + + + // getters + public long getSampleTime() { + return sampleTime; + } + + public long getTotalTransferred() { + return totalTransferred; + } + + public long getTotalEmitted() { + return totalEmitted; + } + + public long getTotalAcked() { + return totalAcked; + } + + public long getTotalFailed() { + return totalFailed; + } + + public double getTotalLatency() { + return totalLatency; + } + + public long getSpoutEmitted() { + return spoutEmitted; + } + + public long getSpoutTransferred() { + return spoutTransferred; + } + + public int getNumSupervisors() { + return numSupervisors; + } + + public int getNumWorkers() { + return numWorkers; + } + + public int getNumTasks() { + return numTasks; + } + + public int getTotalSlots() { + return totalSlots; + } + + public int getSpoutExecutors(){ + return this.spoutExecutors; + } + + public int getNumExecutors() { + return this.numExecutors; + } + + public int getUsedSlots() { + return this.usedSlots; + } + +} \ No newline at end of file
