STORM-2416: normalize provided.scope across poms; move storm-perf to examples; update packaging
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ff93e07f Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ff93e07f Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ff93e07f Branch: refs/heads/1.x-branch Commit: ff93e07f0d63c4942ee398cd795006984ff8e4c7 Parents: 1b514cb Author: P. Taylor Goetz <[email protected]> Authored: Fri Mar 17 16:04:53 2017 -0400 Committer: P. Taylor Goetz <[email protected]> Committed: Fri Mar 17 16:05:33 2017 -0400 ---------------------------------------------------------------------- examples/flux-examples/pom.xml | 2 +- examples/storm-elasticsearch-examples/pom.xml | 15 - examples/storm-hbase-examples/pom.xml | 14 - examples/storm-hdfs-examples/pom.xml | 14 - examples/storm-hive-examples/pom.xml | 14 - examples/storm-jdbc-examples/pom.xml | 14 - examples/storm-jms-examples/pom.xml | 2 +- examples/storm-kafka-client-examples/pom.xml | 14 - examples/storm-kafka-examples/pom.xml | 14 - examples/storm-mongodb-examples/pom.xml | 14 - examples/storm-mqtt-examples/pom.xml | 14 - examples/storm-opentsdb-examples/pom.xml | 14 - examples/storm-perf/README.markdown | 50 + examples/storm-perf/pom.xml | 107 + .../main/conf/ConstSpoutIdBoltNullBoltTopo.yaml | 22 + .../src/main/conf/ConstSpoutNullBoltTopo.yaml | 22 + .../src/main/conf/FileReadWordCountTopo.yaml | 23 + .../src/main/conf/HdfsSpoutNullBoltTopo.yaml | 25 + .../storm-perf/src/main/conf/KafkaHdfsTopo.yaml | 26 + .../src/main/conf/KafkaSpoutNullBoltTopo.yaml | 23 + .../src/main/conf/StrGenSpoutHdfsBoltTopo.yaml | 25 + .../perf/ConstSpoutIdBoltNullBoltTopo.java | 101 + .../storm/perf/ConstSpoutNullBoltTopo.java | 107 + .../apache/storm/perf/ConstSpoutOnlyTopo.java | 74 + .../storm/perf/FileReadWordCountTopo.java | 96 + .../storm/perf/HdfsSpoutNullBoltTopo.java | 101 + .../org/apache/storm/perf/KafkaHdfsTopo.java | 168 + .../storm/perf/KafkaSpoutNullBoltTopo.java | 114 + .../storm/perf/StrGenSpoutHdfsBoltTopo.java | 154 + .../org/apache/storm/perf/bolt/CountBolt.java | 58 + .../org/apache/storm/perf/bolt/DevNullBolt.java | 47 + .../java/org/apache/storm/perf/bolt/IdBolt.java | 49 + .../storm/perf/bolt/SplitSentenceBolt.java | 58 + .../org/apache/storm/perf/spout/ConstSpout.java | 70 + .../apache/storm/perf/spout/FileReadSpout.java | 141 + .../apache/storm/perf/spout/StringGenSpout.java | 93 + .../storm/perf/utils/BasicMetricsCollector.java | 309 + .../org/apache/storm/perf/utils/Helper.java | 133 + .../apache/storm/perf/utils/IdentityBolt.java | 51 + .../apache/storm/perf/utils/MetricsSample.java | 248 + .../src/main/sampledata/randomwords.txt | 14049 +++++++++++++++++ examples/storm-pmml-examples/pom.xml | 14 - examples/storm-redis-examples/pom.xml | 14 - examples/storm-solr-examples/pom.xml | 14 - examples/storm-starter/pom.xml | 11 - external/flux/pom.xml | 20 - external/sql/storm-sql-core/pom.xml | 4 +- .../storm-sql-external/storm-sql-hdfs/pom.xml | 6 +- .../storm-sql-external/storm-sql-kafka/pom.xml | 6 +- .../storm-sql-mongodb/pom.xml | 6 +- .../storm-sql-external/storm-sql-redis/pom.xml | 6 +- external/sql/storm-sql-runtime/pom.xml | 2 +- external/storm-cassandra/pom.xml | 2 +- external/storm-druid/pom.xml | 2 +- external/storm-elasticsearch/pom.xml | 2 +- external/storm-eventhubs/pom.xml | 42 +- external/storm-hbase/pom.xml | 2 +- external/storm-hdfs/pom.xml | 2 +- external/storm-hive/pom.xml | 2 +- external/storm-jdbc/pom.xml | 2 +- external/storm-jms/pom.xml | 2 +- external/storm-kafka-client/pom.xml | 2 +- external/storm-kafka/pom.xml | 2 +- external/storm-kinesis/pom.xml | 2 +- external/storm-metrics/pom.xml | 2 +- external/storm-mongodb/pom.xml | 2 +- external/storm-opentsdb/pom.xml | 2 +- external/storm-pmml/pom.xml | 2 +- external/storm-redis/pom.xml | 2 +- external/storm-solr/pom.xml | 2 +- pom.xml | 2 +- storm-dist/binary/src/main/assembly/binary.xml | 236 +- storm-perf/README.markdown | 50 - storm-perf/pom.xml | 107 - .../main/conf/ConstSpoutIdBoltNullBoltTopo.yaml | 22 - .../src/main/conf/ConstSpoutNullBoltTopo.yaml | 22 - .../src/main/conf/FileReadWordCountTopo.yaml | 23 - .../src/main/conf/HdfsSpoutNullBoltTopo.yaml | 25 - storm-perf/src/main/conf/KafkaHdfsTopo.yaml | 26 - .../src/main/conf/KafkaSpoutNullBoltTopo.yaml | 23 - .../src/main/conf/StrGenSpoutHdfsBoltTopo.yaml | 25 - .../perf/ConstSpoutIdBoltNullBoltTopo.java | 101 - .../storm/perf/ConstSpoutNullBoltTopo.java | 107 - .../apache/storm/perf/ConstSpoutOnlyTopo.java | 74 - .../storm/perf/FileReadWordCountTopo.java | 96 - .../storm/perf/HdfsSpoutNullBoltTopo.java | 101 - .../org/apache/storm/perf/KafkaHdfsTopo.java | 168 - .../storm/perf/KafkaSpoutNullBoltTopo.java | 114 - .../storm/perf/StrGenSpoutHdfsBoltTopo.java | 154 - .../org/apache/storm/perf/bolt/CountBolt.java | 58 - .../org/apache/storm/perf/bolt/DevNullBolt.java | 47 - .../java/org/apache/storm/perf/bolt/IdBolt.java | 49 - .../storm/perf/bolt/SplitSentenceBolt.java | 58 - .../org/apache/storm/perf/spout/ConstSpout.java | 70 - .../apache/storm/perf/spout/FileReadSpout.java | 141 - .../apache/storm/perf/spout/StringGenSpout.java | 93 - .../storm/perf/utils/BasicMetricsCollector.java | 309 - .../org/apache/storm/perf/utils/Helper.java | 133 - .../apache/storm/perf/utils/IdentityBolt.java | 51 - .../apache/storm/perf/utils/MetricsSample.java | 248 - storm-perf/src/main/sampledata/randomwords.txt | 14049 ----------------- 101 files changed, 16587 insertions(+), 17063 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/flux-examples/pom.xml ---------------------------------------------------------------------- diff --git a/examples/flux-examples/pom.xml b/examples/flux-examples/pom.xml index 7fff5db..fb9a013 100644 --- a/examples/flux-examples/pom.xml +++ b/examples/flux-examples/pom.xml @@ -35,7 +35,7 @@ <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>${project.version}</version> - <scope>provided</scope> + <scope>${provided.scope}</scope> </dependency> <dependency> <groupId>org.apache.storm</groupId> http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-elasticsearch-examples/pom.xml ---------------------------------------------------------------------- diff --git a/examples/storm-elasticsearch-examples/pom.xml b/examples/storm-elasticsearch-examples/pom.xml index c36a4cc..1fd2d9d 100644 --- a/examples/storm-elasticsearch-examples/pom.xml +++ b/examples/storm-elasticsearch-examples/pom.xml @@ -26,21 +26,6 @@ </parent> <artifactId>storm-elasticsearch-examples</artifactId> - - <properties> - <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <provided.scope>provided</provided.scope> - </properties> - - <profiles> - <profile> - <id>intellij</id> - <properties> - <provided.scope>compile</provided.scope> - </properties> - </profile> - </profiles> - <dependencies> <dependency> <groupId>org.apache.storm</groupId> http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-hbase-examples/pom.xml ---------------------------------------------------------------------- diff --git a/examples/storm-hbase-examples/pom.xml b/examples/storm-hbase-examples/pom.xml index b802c0c..8c4d66b 100644 --- a/examples/storm-hbase-examples/pom.xml +++ b/examples/storm-hbase-examples/pom.xml @@ -27,20 +27,6 @@ <artifactId>storm-hbase-examples</artifactId> - <properties> - <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <provided.scope>provided</provided.scope> - </properties> - - <profiles> - <profile> - <id>intellij</id> - <properties> - <provided.scope>compile</provided.scope> - </properties> - </profile> - </profiles> - <dependencies> <dependency> <groupId>org.apache.storm</groupId> http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-hdfs-examples/pom.xml ---------------------------------------------------------------------- diff --git a/examples/storm-hdfs-examples/pom.xml b/examples/storm-hdfs-examples/pom.xml index 80324b9..390630d 100644 --- a/examples/storm-hdfs-examples/pom.xml +++ b/examples/storm-hdfs-examples/pom.xml @@ -27,20 +27,6 @@ <artifactId>storm-hdfs-examples</artifactId> - <properties> - <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <provided.scope>provided</provided.scope> - </properties> - - <profiles> - <profile> - <id>intellij</id> - <properties> - <provided.scope>compile</provided.scope> - </properties> - </profile> - </profiles> - <dependencies> <dependency> <groupId>org.apache.storm</groupId> http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-hive-examples/pom.xml ---------------------------------------------------------------------- diff --git a/examples/storm-hive-examples/pom.xml b/examples/storm-hive-examples/pom.xml index d755bde..69e8abe 100644 --- a/examples/storm-hive-examples/pom.xml +++ b/examples/storm-hive-examples/pom.xml @@ -27,20 +27,6 @@ <artifactId>storm-hive-examples</artifactId> - <properties> - <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <provided.scope>provided</provided.scope> - </properties> - - <profiles> - <profile> - <id>intellij</id> - <properties> - <provided.scope>compile</provided.scope> - </properties> - </profile> - </profiles> - <dependencies> <dependency> <groupId>org.apache.storm</groupId> http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-jdbc-examples/pom.xml ---------------------------------------------------------------------- diff --git a/examples/storm-jdbc-examples/pom.xml b/examples/storm-jdbc-examples/pom.xml index 7fc2582..989ebdc 100644 --- a/examples/storm-jdbc-examples/pom.xml +++ b/examples/storm-jdbc-examples/pom.xml @@ -27,20 +27,6 @@ <artifactId>storm-jdbc-examples</artifactId> - <properties> - <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <provided.scope>provided</provided.scope> - </properties> - - <profiles> - <profile> - <id>intellij</id> - <properties> - <provided.scope>compile</provided.scope> - </properties> - </profile> - </profiles> - <dependencies> <dependency> <groupId>org.apache.storm</groupId> http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-jms-examples/pom.xml ---------------------------------------------------------------------- diff --git a/examples/storm-jms-examples/pom.xml b/examples/storm-jms-examples/pom.xml index 6451283..7e26c6a 100644 --- a/examples/storm-jms-examples/pom.xml +++ b/examples/storm-jms-examples/pom.xml @@ -63,7 +63,7 @@ <artifactId>storm-core</artifactId> <version>${project.version}</version> <!-- keep storm out of the jar-with-dependencies --> - <scope>provided</scope> + <scope>${provided.scope</scope> </dependency> <dependency> <groupId>org.apache.storm</groupId> http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-kafka-client-examples/pom.xml ---------------------------------------------------------------------- diff --git a/examples/storm-kafka-client-examples/pom.xml b/examples/storm-kafka-client-examples/pom.xml index d118313..2bf16c8 100644 --- a/examples/storm-kafka-client-examples/pom.xml +++ b/examples/storm-kafka-client-examples/pom.xml @@ -31,20 +31,6 @@ <artifactId>storm-kafka-client-examples</artifactId> - <properties> - <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <provided.scope>provided</provided.scope> - </properties> - - <profiles> - <profile> - <id>intellij</id> - <properties> - <provided.scope>compile</provided.scope> - </properties> - </profile> - </profiles> - <dependencies> <dependency> <groupId>org.apache.storm</groupId> http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-kafka-examples/pom.xml ---------------------------------------------------------------------- diff --git a/examples/storm-kafka-examples/pom.xml b/examples/storm-kafka-examples/pom.xml index 0674534..ea844b0 100644 --- a/examples/storm-kafka-examples/pom.xml +++ b/examples/storm-kafka-examples/pom.xml @@ -27,20 +27,6 @@ <artifactId>storm-kafka-examples</artifactId> - <properties> - <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <provided.scope>provided</provided.scope> - </properties> - - <profiles> - <profile> - <id>intellij</id> - <properties> - <provided.scope>compile</provided.scope> - </properties> - </profile> - </profiles> - <dependencies> <dependency> <groupId>org.apache.storm</groupId> http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-mongodb-examples/pom.xml ---------------------------------------------------------------------- diff --git a/examples/storm-mongodb-examples/pom.xml b/examples/storm-mongodb-examples/pom.xml index a5a34f4..1e00f2a 100644 --- a/examples/storm-mongodb-examples/pom.xml +++ b/examples/storm-mongodb-examples/pom.xml @@ -27,20 +27,6 @@ <artifactId>storm-mongodb-examples</artifactId> - <properties> - <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <provided.scope>provided</provided.scope> - </properties> - - <profiles> - <profile> - <id>intellij</id> - <properties> - <provided.scope>compile</provided.scope> - </properties> - </profile> - </profiles> - <dependencies> <dependency> <groupId>org.apache.storm</groupId> http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-mqtt-examples/pom.xml ---------------------------------------------------------------------- diff --git a/examples/storm-mqtt-examples/pom.xml b/examples/storm-mqtt-examples/pom.xml index b5cc8de..c8bf839 100644 --- a/examples/storm-mqtt-examples/pom.xml +++ b/examples/storm-mqtt-examples/pom.xml @@ -30,20 +30,6 @@ <relativePath>../../pom.xml</relativePath> </parent> - <properties> - <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <provided.scope>provided</provided.scope> - </properties> - - <profiles> - <profile> - <id>intellij</id> - <properties> - <provided.scope>compile</provided.scope> - </properties> - </profile> - </profiles> - <dependencies> <dependency> <groupId>org.apache.storm</groupId> http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-opentsdb-examples/pom.xml ---------------------------------------------------------------------- diff --git a/examples/storm-opentsdb-examples/pom.xml b/examples/storm-opentsdb-examples/pom.xml index a87eb07..a6bbcd3 100644 --- a/examples/storm-opentsdb-examples/pom.xml +++ b/examples/storm-opentsdb-examples/pom.xml @@ -27,20 +27,6 @@ <artifactId>storm-opentsdb-examples</artifactId> - <properties> - <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <provided.scope>provided</provided.scope> - </properties> - - <profiles> - <profile> - <id>intellij</id> - <properties> - <provided.scope>compile</provided.scope> - </properties> - </profile> - </profiles> - <dependencies> <dependency> <groupId>org.apache.storm</groupId> http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-perf/README.markdown ---------------------------------------------------------------------- diff --git a/examples/storm-perf/README.markdown b/examples/storm-perf/README.markdown new file mode 100644 index 0000000..946ab21 --- /dev/null +++ b/examples/storm-perf/README.markdown @@ -0,0 +1,50 @@ +# Topologies for measuring Storm performance + +This module includes topologies designed for measuring Storm performance. + +## Overview +There are two basic modes for running these topologies + +- **Cluster mode:** Submits the topology to a storm cluster. This mode is useful for benchmarking. It calculates throughput and latency numbers every minute and prints them on the console. +- **In-process mode:** Uses LocalCluster to run topology. This mode helps identify bottlenecks using profilers like JProfiler from within a IDE. This mode does not print metrics. + +In both the modes, a shutdown hook is setup to terminate the topology when the program that is submitting the topology is terminated. + +The bundled topologies can be classified into two types. + +- Topologies that measure purely the internal functioning of Storm. Such topologies do not interact with external systems like Kafka or Hdfs. +- Topologies that measure speed of I/O with external systems like Kafka and Hdfs. + +Topologies that measure internal performance can be run in either in-proc or cluster modes. +Topologies that measure I/O with external systems are designed to run in cluster mode only. + +## Topologies List + +1. **ConstSpoutOnlyTopo:** Helps measure how fast spout can emit. This topology has a spout and is not connected to any bolts. Supports in-proc and cluster mode. +2. **ConstSpoutNullBoltTopo:** Helps measure how fast spout can send data to a bolt. Spout emits a stream of constant values to a DevNull bolt which discards the incoming tuples. Supports in-proc and cluster mode. +3. **ConstSpoutIdBoltNullBoltTopo:** Helps measure speed of messaging between spouts and bolts. Spout emits a stream of constant values to an ID bolt which clones the tuple and emits it downstream to a DevNull bolt. Supports in-proc and cluster mode. +4. **FileReadWordCount:** Measures speed of word counting. The spout loads a file into memory and emits these lines in an infinite loop. Supports in-proc and cluster mode. +5. **HdfsSpoutNullBolt:** Measures speed at which HdfsSpout can read from HDFS. Supports cluster mode only. +6. **StrGenSpoutHdfsBoltTopo:** Measures speed at which HdfsBolt can write to HDFS. Supports cluster mode only. +7. **KafkaSpoutNullBolt:** Measures speed at which KafkaSpout can read from Kafka. Supports cluster mode only. +8. **KafkaHdfsTopo:** Measures how fast Storm can read from Kafka and write to HDFS. + + +## How to run ? + +### In-process mode: +This mode is intended for running the topology quickly and easily from within the IDE and does not expect any command line arguments. +Simply running the Topology's main() method without any arguments will get it running. The topology runs indefinitely till the program is terminated. + + +### Cluster mode: +When the topology is run with one or more than one cmd line arguments, the topology is submitted to the cluster. +The first argument indicates how long the topology should be run. Often the second argument refers to a yaml config +file which contains topology configuration settings. The conf/ directory in this module contains sample config files +with names matching the corresponding topology. + +These topologies can be run using the standard storm jar command. + +``` +bin/storm jar /path/storm-perf-1.1.0-jar-with-dependencies.jar org.apache.storm.perf.ConstSpoutNullBoltTopo 200 conf/ConstSpoutIdBoltNullBoltTopo.yaml +``` \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-perf/pom.xml ---------------------------------------------------------------------- diff --git a/examples/storm-perf/pom.xml b/examples/storm-perf/pom.xml new file mode 100644 index 0000000..f2e62f0 --- /dev/null +++ b/examples/storm-perf/pom.xml @@ -0,0 +1,107 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>storm</artifactId> + <groupId>org.apache.storm</groupId> + <version>1.1.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <groupId>org.apache.storm</groupId> + <artifactId>storm-perf</artifactId> + <packaging>jar</packaging> + <name>Storm Perf</name> + <description>Topologies and tools to measure performance.</description> + + <build> + <plugins> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + <archive> + <manifest> + <mainClass /> + </manifest> + </archive> + </configuration> + <executions> + <execution> + <id>make-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + + </plugin> + + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>exec-maven-plugin</artifactId> + <version>1.2.1</version> + <executions> + <execution> + <goals> + <goal>exec</goal> + </goals> + </execution> + </executions> + <configuration> + <executable>java</executable> + <includeProjectDependencies>true</includeProjectDependencies> + <includePluginDependencies>false</includePluginDependencies> + <classpathScope>compile</classpathScope> + <mainClass>${storm.topology}</mainClass> + </configuration> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-core</artifactId> + <version>${project.version}</version> + <!-- + Use "provided" scope to keep storm out of the jar-with-dependencies + For IntelliJ dev, intellij will load properly. + --> + <scope>${provided.scope}</scope> + </dependency> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-kafka</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-hdfs</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + + +</project> http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-perf/src/main/conf/ConstSpoutIdBoltNullBoltTopo.yaml ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/conf/ConstSpoutIdBoltNullBoltTopo.yaml b/examples/storm-perf/src/main/conf/ConstSpoutIdBoltNullBoltTopo.yaml new file mode 100644 index 0000000..9f74aee --- /dev/null +++ b/examples/storm-perf/src/main/conf/ConstSpoutIdBoltNullBoltTopo.yaml @@ -0,0 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +spout.count : 1 +bolt1.count : 1 # IdBolt instances +bolt2.count : 1 # DevNullBolt instances + +# storm config overrides +topology.workers : 1 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-perf/src/main/conf/ConstSpoutNullBoltTopo.yaml ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/conf/ConstSpoutNullBoltTopo.yaml b/examples/storm-perf/src/main/conf/ConstSpoutNullBoltTopo.yaml new file mode 100644 index 0000000..51f2dd7 --- /dev/null +++ b/examples/storm-perf/src/main/conf/ConstSpoutNullBoltTopo.yaml @@ -0,0 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +spout.count : 1 +bolt.count : 1 +grouping : "local" # either "shuffle" or "local" + +# storm config overrides +topology.workers : 1 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-perf/src/main/conf/FileReadWordCountTopo.yaml ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/conf/FileReadWordCountTopo.yaml b/examples/storm-perf/src/main/conf/FileReadWordCountTopo.yaml new file mode 100644 index 0000000..61abe8f --- /dev/null +++ b/examples/storm-perf/src/main/conf/FileReadWordCountTopo.yaml @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +spout.count : 1 +splitter.count : 1 +counter.count : 1 +input.file : "/Users/roshan/Projects/idea/storm/storm-perf/src/main/resources/randomwords.txt" + +# storm config overrides +topology.workers : 1 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-perf/src/main/conf/HdfsSpoutNullBoltTopo.yaml ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/conf/HdfsSpoutNullBoltTopo.yaml b/examples/storm-perf/src/main/conf/HdfsSpoutNullBoltTopo.yaml new file mode 100644 index 0000000..a06ad6e --- /dev/null +++ b/examples/storm-perf/src/main/conf/HdfsSpoutNullBoltTopo.yaml @@ -0,0 +1,25 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +spout.count : 1 +bolt.count : 1 +hdfs.uri : "hdfs://hdfs.namenode:8020" +hdfs.source.dir : "/tmp/storm/in" +hdfs.archive.dir : "/tmp/storm/done" +hdfs.bad.dir : "/tmp/storm/bad" + +# storm config overrides +topology.workers : 1 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-perf/src/main/conf/KafkaHdfsTopo.yaml ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/conf/KafkaHdfsTopo.yaml b/examples/storm-perf/src/main/conf/KafkaHdfsTopo.yaml new file mode 100755 index 0000000..a8ed2f2 --- /dev/null +++ b/examples/storm-perf/src/main/conf/KafkaHdfsTopo.yaml @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +spout.count : 1 +bolt.count : 1 +kafka.topic : "kafka_topic" +zk.uri : "zkhostname:2181" +hdfs.uri : "hdfs://hdfs.namenode:8020" +hdfs.dir : "/tmp/storm" +hdfs.batch : 1000 + +# storm config overrides +topology.workers : 1 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-perf/src/main/conf/KafkaSpoutNullBoltTopo.yaml ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/conf/KafkaSpoutNullBoltTopo.yaml b/examples/storm-perf/src/main/conf/KafkaSpoutNullBoltTopo.yaml new file mode 100644 index 0000000..cde4c2e --- /dev/null +++ b/examples/storm-perf/src/main/conf/KafkaSpoutNullBoltTopo.yaml @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +spout.count : 1 +bolt.count : 1 +kafka.topic : "kafka_topic" +zk.uri : "zkhostname:2181" + +# storm config overrides +topology.workers : 1 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-perf/src/main/conf/StrGenSpoutHdfsBoltTopo.yaml ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/conf/StrGenSpoutHdfsBoltTopo.yaml b/examples/storm-perf/src/main/conf/StrGenSpoutHdfsBoltTopo.yaml new file mode 100644 index 0000000..d16431b --- /dev/null +++ b/examples/storm-perf/src/main/conf/StrGenSpoutHdfsBoltTopo.yaml @@ -0,0 +1,25 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +spout.count : 1 +bolt.count : 1 +hdfs.uri : "hdfs://hdfs.namenode:8020" +hdfs.dir : "/tmp/storm" +hdfs.batch : 1000 + + +# storm config overrides +topology.workers : 1 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java new file mode 100644 index 0000000..11c63d3 --- /dev/null +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.perf; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.perf.bolt.DevNullBolt; +import org.apache.storm.perf.bolt.IdBolt; +import org.apache.storm.perf.spout.ConstSpout; +import org.apache.storm.perf.utils.Helper; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.utils.Utils; + +import java.util.Map; + +/** + * ConstSpout -> IdBolt -> DevNullBolt + * This topology measures speed of messaging between spouts->bolt and bolt->bolt + * ConstSpout : Continuously emits a constant string + * IdBolt : clones and emits input tuples + * DevNullBolt : discards incoming tuples + */ +public class ConstSpoutIdBoltNullBoltTopo { + + public static final String TOPOLOGY_NAME = "ConstSpoutIdBoltNullBoltTopo"; + public static final String SPOUT_ID = "constSpout"; + public static final String BOLT1_ID = "idBolt"; + public static final String BOLT2_ID = "nullBolt"; + + // Configs + public static final String BOLT1_COUNT = "bolt1.count"; + public static final String BOLT2_COUNT = "bolt2.count"; + public static final String SPOUT_COUNT = "spout.count"; + + public static StormTopology getTopology(Map conf) { + + // 1 - Setup Spout -------- + ConstSpout spout = new ConstSpout("some data").withOutputFields("str"); + + // 2 - Setup IdBolt & DevNullBolt -------- + IdBolt bolt1 = new IdBolt(); + DevNullBolt bolt2 = new DevNullBolt(); + + + // 3 - Setup Topology -------- + TopologyBuilder builder = new TopologyBuilder(); + + builder.setSpout(SPOUT_ID, spout, Helper.getInt(conf, SPOUT_COUNT, 1) ); + + builder.setBolt(BOLT1_ID, bolt1, Helper.getInt(conf, BOLT1_COUNT, 1)) + .localOrShuffleGrouping(SPOUT_ID); + + builder.setBolt(BOLT2_ID, bolt2, Helper.getInt(conf, BOLT2_COUNT, 1)) + .localOrShuffleGrouping(BOLT1_ID); + + return builder.createTopology(); + } + + + public static void main(String[] args) throws Exception { + + if (args.length <= 0) { + // submit to local cluster + Config conf = new Config(); + LocalCluster cluster = Helper.runOnLocalCluster(TOPOLOGY_NAME, getTopology(conf)); + + Helper.setupShutdownHook(cluster, TOPOLOGY_NAME); + while (true) {// run indefinitely till Ctrl-C + Thread.sleep(20_000_000); + } + } else { + // submit to real cluster + if (args.length >2) { + System.err.println("args: runDurationSec [optionalConfFile]"); + return; + } + Integer durationSec = Integer.parseInt(args[0]); + Map topoConf = (args.length==2) ? Utils.findAndReadConfigFile(args[1]) : new Config(); + + // Submit topology to storm cluster + Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf)); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java new file mode 100755 index 0000000..92c2787 --- /dev/null +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.perf; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.perf.bolt.DevNullBolt; +import org.apache.storm.perf.spout.ConstSpout; +import org.apache.storm.perf.utils.Helper; +import org.apache.storm.topology.BoltDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.utils.Utils; + +import java.util.Map; + +/*** + * This topo helps measure the messaging speed between a spout and a bolt. + * Spout generates a stream of a fixed string. + * Bolt will simply ack and discard the tuple received + */ + +public class ConstSpoutNullBoltTopo { + + public static final String TOPOLOGY_NAME = "ConstSpoutNullBoltTopo"; + public static final String SPOUT_ID = "constSpout"; + public static final String BOLT_ID = "nullBolt"; + + // Configs + public static final String BOLT_COUNT = "bolt.count"; + public static final String SPOUT_COUNT = "spout.count"; + public static final String GROUPING = "grouping"; // can be 'local' or 'shuffle' + + public static final String LOCAL_GROPING = "local"; + public static final String SHUFFLE_GROUPING = "shuffle"; + public static final String DEFAULT_GROUPING = LOCAL_GROPING; + + public static StormTopology getTopology(Map conf) { + + // 1 - Setup Spout -------- + ConstSpout spout = new ConstSpout("some data").withOutputFields("str"); + + // 2 - Setup DevNull Bolt -------- + DevNullBolt bolt = new DevNullBolt(); + + + // 3 - Setup Topology -------- + TopologyBuilder builder = new TopologyBuilder(); + + builder.setSpout(SPOUT_ID, spout, Helper.getInt(conf, SPOUT_COUNT, 1) ); + BoltDeclarer bd = builder.setBolt(BOLT_ID, bolt, Helper.getInt(conf, BOLT_COUNT, 1)); + + String groupingType = Helper.getStr(conf, GROUPING); + if(groupingType==null || groupingType.equalsIgnoreCase(DEFAULT_GROUPING) ) + bd.localOrShuffleGrouping(SPOUT_ID); + else if(groupingType.equalsIgnoreCase(SHUFFLE_GROUPING) ) + bd.shuffleGrouping(SPOUT_ID); + return builder.createTopology(); + } + + /** + * ConstSpout -> DevNullBolt with configurable grouping (default localOrShuffle) + */ + public static void main(String[] args) throws Exception { + + if(args.length <= 0) { + // For IDE based profiling ... submit topology to local cluster + Config conf = new Config(); + final LocalCluster cluster = Helper.runOnLocalCluster(TOPOLOGY_NAME, getTopology(conf)); + + Helper.setupShutdownHook(cluster, TOPOLOGY_NAME); + while (true) {// run indefinitely till Ctrl-C + Thread.sleep(20_000_000); + } + + } else { + // For measuring perf against a Storm cluster + if (args.length > 2) { + System.err.println("args: runDurationSec [optionalConfFile]"); + return; + } + Integer durationSec = Integer.parseInt(args[0]); + Map topoConf = (args.length==2) ? Utils.findAndReadConfigFile(args[1]) : new Config(); + + // Submit topology to storm cluster + Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf)); + } + } + +} + http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java new file mode 100755 index 0000000..721ae3d --- /dev/null +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.perf; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.perf.spout.ConstSpout; +import org.apache.storm.perf.utils.Helper; +import org.apache.storm.topology.TopologyBuilder; + + +/*** + * This topo helps measure how fast a spout can produce data (so no bolts are attached) + * Spout generates a stream of a fixed string. + */ + +public class ConstSpoutOnlyTopo { + + public static final String TOPOLOGY_NAME = "ConstSpoutOnlyTopo"; + public static final String SPOUT_ID = "constSpout"; + + + public static StormTopology getTopology() { + + // 1 - Setup Const Spout -------- + ConstSpout spout = new ConstSpout("some data").withOutputFields("str"); + + // 2 - Setup Topology -------- + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout(SPOUT_ID, spout, 1); + return builder.createTopology(); + } + + /** + * ConstSpout only topology (No bolts) + */ + public static void main(String[] args) throws Exception { + if(args.length <= 0) { + // For IDE based profiling ... submit topology to local cluster + LocalCluster cluster = Helper.runOnLocalCluster(TOPOLOGY_NAME, getTopology()); + + Helper.setupShutdownHook(cluster, TOPOLOGY_NAME); + while (true) {// run indefinitely till Ctrl-C + Thread.sleep(20_000_000); + } + } else { + // Submit topology to storm cluster + if (args.length != 1) { + System.err.println("args: runDurationSec"); + return; + } + Integer durationSec = Integer.parseInt(args[0]); + + Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, new Config(), getTopology()); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java new file mode 100644 index 0000000..d518c86 --- /dev/null +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java @@ -0,0 +1,96 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License +*/ +package org.apache.storm.perf; + + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.perf.bolt.CountBolt; +import org.apache.storm.perf.bolt.SplitSentenceBolt; +import org.apache.storm.perf.spout.FileReadSpout; +import org.apache.storm.perf.utils.Helper; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Fields; +import org.apache.storm.utils.Utils; + + +import java.util.Map; + +/*** + * This topo helps measure speed of word count. + * Spout loads a file into memory on initialization, then emits the lines in an endless loop. + */ + +public class FileReadWordCountTopo { + public static final String SPOUT_ID = "spout"; + public static final String COUNT_ID = "counter"; + public static final String SPLIT_ID = "splitter"; + public static final String TOPOLOGY_NAME = "FileReadWordCountTopo"; + + // Config settings + public static final String SPOUT_NUM = "spout.count"; + public static final String SPLIT_NUM = "splitter.count"; + public static final String COUNT_NUM = "counter.count"; + public static final String INPUT_FILE = "input.file"; + + public static final int DEFAULT_SPOUT_NUM = 1; + public static final int DEFAULT_SPLIT_BOLT_NUM = 2; + public static final int DEFAULT_COUNT_BOLT_NUM = 2; + + + public static StormTopology getTopology(Map config) { + + final int spoutNum = Helper.getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM); + final int spBoltNum = Helper.getInt(config, SPLIT_NUM, DEFAULT_SPLIT_BOLT_NUM); + final int cntBoltNum = Helper.getInt(config, COUNT_NUM, DEFAULT_COUNT_BOLT_NUM); + final String inputFile = Helper.getStr(config, INPUT_FILE); + + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout(SPOUT_ID, new FileReadSpout(inputFile), spoutNum); + builder.setBolt(SPLIT_ID, new SplitSentenceBolt(), spBoltNum).localOrShuffleGrouping(SPOUT_ID); + builder.setBolt(COUNT_ID, new CountBolt(), cntBoltNum).fieldsGrouping(SPLIT_ID, new Fields(SplitSentenceBolt.FIELDS)); + + return builder.createTopology(); + } + + public static void main(String[] args) throws Exception { + if(args.length <= 0) { + // For IDE based profiling ... submit topology to local cluster + Config conf = new Config(); + conf.put(INPUT_FILE, "resources/randomwords.txt"); + LocalCluster cluster = Helper.runOnLocalCluster(TOPOLOGY_NAME, getTopology(conf)); + + Helper.setupShutdownHook(cluster, TOPOLOGY_NAME); + while (true) {// run indefinitely till Ctrl-C + Thread.sleep(20_000_000); + } + } else { + // Submit to Storm cluster + if (args.length !=2) { + System.err.println("args: runDurationSec confFile"); + return; + } + Integer durationSec = Integer.parseInt(args[0]); + Map topoConf = Utils.findAndReadConfigFile(args[1]); + + Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf)); + + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java new file mode 100644 index 0000000..248b523 --- /dev/null +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.perf; + +import org.apache.storm.generated.StormTopology; +import org.apache.storm.hdfs.spout.HdfsSpout; +import org.apache.storm.hdfs.spout.TextFileReader; +import org.apache.storm.perf.bolt.DevNullBolt; +import org.apache.storm.perf.utils.Helper; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.utils.Utils; + +import java.util.Map; + +/*** + * This topo helps measure speed of reading from Hdfs. + * Spout Reads from Hdfs. + * Bolt acks and discards tuples + */ + + +public class HdfsSpoutNullBoltTopo { + // names + static final String TOPOLOGY_NAME = "HdfsSpoutNullBoltTopo"; + static final String SPOUT_ID = "hdfsSpout"; + static final String BOLT_ID = "devNullBolt"; + + // configs + static final String SPOUT_NUM = "spout.count"; + static final String BOLT_NUM = "bolt.count"; + + static final String HDFS_URI = "hdfs.uri"; + static final String SOURCE_DIR = "hdfs.source.dir"; + static final String ARCHIVE_DIR = "hdfs.archive.dir"; + static final String BAD_DIR = "hdfs.bad.dir"; + + public static final int DEFAULT_SPOUT_NUM = 1; + public static final int DEFAULT_BOLT_NUM = 1; + + + public static StormTopology getTopology(Map config) { + + final int spoutNum = Helper.getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM); + final int boltNum = Helper.getInt(config, BOLT_NUM, DEFAULT_BOLT_NUM); + final String fileFormat = Helper.getStr(config, "text"); + final String hdfsUri = Helper.getStr(config, HDFS_URI); + final String sourceDir = Helper.getStr(config, SOURCE_DIR); + final String archiveDir = Helper.getStr(config, ARCHIVE_DIR); + final String badDir = Helper.getStr(config, BAD_DIR); + + + // 1 - Setup Hdfs Spout -------- + HdfsSpout spout = new HdfsSpout() + .setReaderType(fileFormat) + .setHdfsUri(hdfsUri) + .setSourceDir(sourceDir) + .setArchiveDir(archiveDir) + .setBadFilesDir(badDir) + .withOutputFields(TextFileReader.defaultFields); + + // 2 - DevNull Bolt -------- + DevNullBolt bolt = new DevNullBolt(); + + // 3 - Setup Topology -------- + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout(SPOUT_ID, spout, spoutNum); + builder.setBolt(BOLT_ID, bolt, boltNum) + .localOrShuffleGrouping(SPOUT_ID); + + return builder.createTopology(); + } + + public static void main(String[] args) throws Exception { + if (args.length != 2) { + System.err.println("args: runDurationSec topConfFile"); + return; + } + + Integer durationSec = Integer.parseInt(args[0]); + Map topoConf = Utils.findAndReadConfigFile(args[1]); + + // Submit to Storm cluster + Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf)); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaHdfsTopo.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaHdfsTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaHdfsTopo.java new file mode 100755 index 0000000..4293aac --- /dev/null +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaHdfsTopo.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.perf; + +import org.apache.storm.generated.StormTopology; +import org.apache.storm.hdfs.bolt.HdfsBolt; +import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat; +import org.apache.storm.hdfs.bolt.format.FileNameFormat; +import org.apache.storm.hdfs.bolt.format.RecordFormat; +import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy; +import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy; +import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy; +import org.apache.storm.hdfs.bolt.sync.SyncPolicy; +import org.apache.storm.kafka.BrokerHosts; +import org.apache.storm.kafka.KafkaSpout; +import org.apache.storm.kafka.SpoutConfig; +import org.apache.storm.kafka.StringMultiSchemeWithTopic; +import org.apache.storm.kafka.ZkHosts; +import org.apache.storm.perf.utils.Helper; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.utils.Utils; + +import java.util.Map; +import java.util.UUID; + +/*** + * This topo helps measure speed of reading from Kafka and writing to Hdfs. + * Spout Reads from Kafka. + * Bolt writes to Hdfs + */ + +public class KafkaHdfsTopo { + + // configs - topo parallelism + public static final String SPOUT_NUM = "spout.count"; + public static final String BOLT_NUM = "bolt.count"; + // configs - kafka spout + public static final String KAFKA_TOPIC = "kafka.topic"; + public static final String ZOOKEEPER_URI = "zk.uri"; + // configs - hdfs bolt + public static final String HDFS_URI = "hdfs.uri"; + public static final String HDFS_PATH = "hdfs.dir"; + public static final String HDFS_BATCH = "hdfs.batch"; + + + public static final int DEFAULT_SPOUT_NUM = 1; + public static final int DEFAULT_BOLT_NUM = 1; + public static final int DEFAULT_HDFS_BATCH = 1000; + + // names + public static final String TOPOLOGY_NAME = "KafkaHdfsTopo"; + public static final String SPOUT_ID = "kafkaSpout"; + public static final String BOLT_ID = "hdfsBolt"; + + + + public static StormTopology getTopology(Map config) { + + final int spoutNum = getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM); + final int boltNum = getInt(config, BOLT_NUM, DEFAULT_BOLT_NUM); + + final int hdfsBatch = getInt(config, HDFS_BATCH, DEFAULT_HDFS_BATCH); + + // 1 - Setup Kafka Spout -------- + String zkConnString = getStr(config, ZOOKEEPER_URI); + String topicName = getStr(config, KAFKA_TOPIC); + + BrokerHosts brokerHosts = new ZkHosts(zkConnString); + SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topicName, "/" + topicName, UUID.randomUUID().toString()); + spoutConfig.scheme = new StringMultiSchemeWithTopic(); + spoutConfig.ignoreZkOffsets = true; + + KafkaSpout spout = new KafkaSpout(spoutConfig); + + // 2 - Setup HFS Bolt -------- + String Hdfs_url = getStr(config, HDFS_URI); + RecordFormat format = new LineWriter("str"); + SyncPolicy syncPolicy = new CountSyncPolicy(hdfsBatch); + FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, FileSizeRotationPolicy.Units.GB); + + FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath(getStr(config,HDFS_PATH) ); + + // Instantiate the HdfsBolt + HdfsBolt bolt = new HdfsBolt() + .withFsUrl(Hdfs_url) + .withFileNameFormat(fileNameFormat) + .withRecordFormat(format) + .withRotationPolicy(rotationPolicy) + .withSyncPolicy(syncPolicy); + + + // 3 - Setup Topology -------- + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout(SPOUT_ID, spout, spoutNum); + builder.setBolt(BOLT_ID, bolt, boltNum) + .localOrShuffleGrouping(SPOUT_ID); + + return builder.createTopology(); + } + + + public static int getInt(Map map, Object key, int def) { + return Utils.getInt(Utils.get(map, key, def)); + } + + public static String getStr(Map map, Object key) { + return (String) map.get(key); + } + + + /** Copies text file content from sourceDir to destinationDir. Moves source files into sourceDir after its done consuming */ + public static void main(String[] args) throws Exception { + + if (args.length != 2) { + System.err.println("args: runDurationSec topConfFile"); + return; + } + + Integer durationSec = Integer.parseInt(args[0]); + String confFile = args[1]; + Map topoConf = Utils.findAndReadConfigFile(confFile); + + // Submit topology to Storm cluster + Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf)); + } + + public static class LineWriter implements RecordFormat { + private String lineDelimiter = System.lineSeparator(); + private String fieldName; + + public LineWriter(String fieldName) { + this.fieldName = fieldName; + } + + /** + * Overrides the default record delimiter. + * + * @param delimiter + * @return + */ + public LineWriter withLineDelimiter(String delimiter){ + this.lineDelimiter = delimiter; + return this; + } + + @Override + public byte[] format(Tuple tuple) { + return (tuple.getValueByField(fieldName).toString() + this.lineDelimiter).getBytes(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaSpoutNullBoltTopo.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaSpoutNullBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaSpoutNullBoltTopo.java new file mode 100755 index 0000000..3512c65 --- /dev/null +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaSpoutNullBoltTopo.java @@ -0,0 +1,114 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License +*/ + +package org.apache.storm.perf; + +import org.apache.storm.generated.StormTopology; +import org.apache.storm.kafka.BrokerHosts; +import org.apache.storm.kafka.KafkaSpout; +import org.apache.storm.kafka.SpoutConfig; +import org.apache.storm.kafka.StringMultiSchemeWithTopic; +import org.apache.storm.kafka.ZkHosts; +import org.apache.storm.perf.bolt.DevNullBolt; +import org.apache.storm.perf.utils.Helper; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.utils.Utils; + +import java.util.Map; +import java.util.UUID; + + +/*** + * This topo helps measure speed of reading from Kafka + * Spout Reads from Kafka. + * Bolt acks and discards tuples + */ + +public class KafkaSpoutNullBoltTopo { + + // configs - topo parallelism + public static final String SPOUT_NUM = "spout.count"; + public static final String BOLT_NUM = "bolt.count"; + + // configs - kafka spout + public static final String KAFKA_TOPIC = "kafka.topic"; + public static final String ZOOKEEPER_URI = "zk.uri"; + + + public static final int DEFAULT_SPOUT_NUM = 1; + public static final int DEFAULT_BOLT_NUM = 1; + + // names + public static final String TOPOLOGY_NAME = "KafkaSpoutNullBoltTopo"; + public static final String SPOUT_ID = "kafkaSpout"; + public static final String BOLT_ID = "devNullBolt"; + + + public static StormTopology getTopology(Map config) { + + final int spoutNum = getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM); + final int boltNum = getInt(config, BOLT_NUM, DEFAULT_BOLT_NUM); + // 1 - Setup Kafka Spout -------- + + String zkConnString = getStr(config, ZOOKEEPER_URI); + String topicName = getStr(config, KAFKA_TOPIC); + + BrokerHosts brokerHosts = new ZkHosts(zkConnString); + SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topicName, "/" + topicName, UUID.randomUUID().toString()); + spoutConfig.scheme = new StringMultiSchemeWithTopic(); + spoutConfig.ignoreZkOffsets = true; + + KafkaSpout spout = new KafkaSpout(spoutConfig); + + // 2 - DevNull Bolt -------- + DevNullBolt bolt = new DevNullBolt(); + + // 3 - Setup Topology -------- + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout(SPOUT_ID, spout, spoutNum); + builder.setBolt(BOLT_ID, bolt, boltNum) + .localOrShuffleGrouping(SPOUT_ID); + + return builder.createTopology(); + } + + + public static int getInt(Map map, Object key, int def) { + return Utils.getInt(Utils.get(map, key, def)); + } + + public static String getStr(Map map, Object key) { + return (String) map.get(key); + } + + + /** + * Copies text file content from sourceDir to destinationDir. Moves source files into sourceDir after its done consuming + */ + public static void main(String[] args) throws Exception { + if (args.length !=2) { + System.err.println("args: runDurationSec confFile"); + return; + } + Integer durationSec = Integer.parseInt(args[0]); + Map topoConf = Utils.findAndReadConfigFile(args[1]); + + // Submit to Storm cluster + Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf)); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java new file mode 100755 index 0000000..5b97540 --- /dev/null +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + + +package org.apache.storm.perf; + +import org.apache.storm.LocalCluster; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.hdfs.bolt.HdfsBolt; +import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat; +import org.apache.storm.hdfs.bolt.format.FileNameFormat; +import org.apache.storm.hdfs.bolt.format.RecordFormat; +import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy; +import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy; +import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy; +import org.apache.storm.hdfs.bolt.sync.SyncPolicy; +import org.apache.storm.perf.spout.StringGenSpout; +import org.apache.storm.perf.utils.Helper; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.utils.Utils; + +import java.util.Map; + +/*** + * This topo helps measure speed of writing to Hdfs + * Spout generates fixed length random strings. + * Bolt writes to Hdfs + */ + +public class StrGenSpoutHdfsBoltTopo { + + // configs - topo parallelism + public static final String SPOUT_NUM = "spout.count"; + public static final String BOLT_NUM = "bolt.count"; + + // configs - hdfs bolt + public static final String HDFS_URI = "hdfs.uri"; + public static final String HDFS_PATH = "hdfs.dir"; + public static final String HDFS_BATCH = "hdfs.batch"; + + public static final int DEFAULT_SPOUT_NUM = 1; + public static final int DEFAULT_BOLT_NUM = 1; + public static final int DEFAULT_HDFS_BATCH = 1000; + + // names + public static final String TOPOLOGY_NAME = "StrGenSpoutHdfsBoltTopo"; + public static final String SPOUT_ID = "GenSpout"; + public static final String BOLT_ID = "hdfsBolt"; + + + public static StormTopology getTopology(Map topoConf) { + final int hdfsBatch = Helper.getInt(topoConf, HDFS_BATCH, DEFAULT_HDFS_BATCH); + + // 1 - Setup StringGen Spout -------- + StringGenSpout spout = new StringGenSpout(100).withFieldName("str"); + + + // 2 - Setup HFS Bolt -------- + String Hdfs_url = Helper.getStr(topoConf, HDFS_URI); + RecordFormat format = new LineWriter("str"); + SyncPolicy syncPolicy = new CountSyncPolicy(hdfsBatch); + FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, FileSizeRotationPolicy.Units.GB); + final int spoutNum = Helper.getInt(topoConf, SPOUT_NUM, DEFAULT_SPOUT_NUM); + final int boltNum = Helper.getInt(topoConf, BOLT_NUM, DEFAULT_BOLT_NUM); + + // Use default, Storm-generated file names + FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath(Helper.getStr(topoConf, HDFS_PATH) ); + + // Instantiate the HdfsBolt + HdfsBolt bolt = new HdfsBolt() + .withFsUrl(Hdfs_url) + .withFileNameFormat(fileNameFormat) + .withRecordFormat(format) + .withRotationPolicy(rotationPolicy) + .withSyncPolicy(syncPolicy); + + + // 3 - Setup Topology -------- + + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout(SPOUT_ID, spout, spoutNum); + builder.setBolt(BOLT_ID, bolt, boltNum) + .localOrShuffleGrouping(SPOUT_ID); + + return builder.createTopology(); + } + + + /** Spout generates random strings and HDFS bolt writes them to a text file */ + public static void main(String[] args) throws Exception { + if(args.length <= 0) { + // submit to local cluster + Map topoConf = Utils.findAndReadConfigFile("conf/HdfsSpoutTopo.yaml"); + LocalCluster cluster = Helper.runOnLocalCluster(TOPOLOGY_NAME, getTopology(topoConf)); + + Helper.setupShutdownHook(cluster, TOPOLOGY_NAME); + while (true) {// run indefinitely till Ctrl-C + Thread.sleep(20_000_000); + } + } else { + // Submit to Storm cluster + if (args.length !=2) { + System.err.println("args: runDurationSec confFile"); + return; + } + Integer durationSec = Integer.parseInt(args[0]); + Map topoConf = Utils.findAndReadConfigFile(args[1]); + + Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf)); + } + } + + + public static class LineWriter implements RecordFormat { + private String lineDelimiter = System.lineSeparator(); + private String fieldName; + + public LineWriter(String fieldName) { + this.fieldName = fieldName; + } + + /** + * Overrides the default record delimiter. + * + * @param delimiter + * @return + */ + public LineWriter withLineDelimiter(String delimiter){ + this.lineDelimiter = delimiter; + return this; + } + + public byte[] format(Tuple tuple) { + return (tuple.getValueByField(fieldName).toString() + this.lineDelimiter).getBytes(); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java new file mode 100644 index 0000000..b79a0ee --- /dev/null +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.perf.bolt; + + +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; + +import java.util.HashMap; +import java.util.Map; + +public class CountBolt extends BaseBasicBolt { + public static final String FIELDS_WORD = "word"; + public static final String FIELDS_COUNT = "count"; + + Map<String, Integer> counts = new HashMap<>(); + + @Override + public void prepare(Map stormConf, TopologyContext context) { + } + + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + String word = tuple.getString(0); + Integer count = counts.get(word); + if (count == null) + count = 0; + count++; + counts.put(word, count); + collector.emit(new Values(word, count)); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields(FIELDS_WORD, FIELDS_COUNT)); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java new file mode 100755 index 0000000..b85ce15 --- /dev/null +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.perf.bolt; + +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Tuple; + +import java.util.Map; + + +public class DevNullBolt extends BaseRichBolt { + private OutputCollector collector; + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector = collector; + } + + @Override + public void execute(Tuple tuple) { + collector.ack(tuple); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java new file mode 100644 index 0000000..116265e --- /dev/null +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.perf.bolt; + +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; + +import java.util.Map; + +public class IdBolt extends BaseRichBolt { + private OutputCollector collector; + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector = collector; + } + + @Override + public void execute(Tuple tuple) { + collector.emit(tuple, new Values( tuple.getValues() ) ); + collector.ack(tuple); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("field1")); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java new file mode 100644 index 0000000..96f9f73 --- /dev/null +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.perf.bolt; + +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; + +import java.util.Map; + + +public class SplitSentenceBolt extends BaseBasicBolt { + public static final String FIELDS = "word"; + + @Override + public void prepare(Map stormConf, TopologyContext context) { + } + + @Override + public void execute(Tuple input, BasicOutputCollector collector) { + for (String word : splitSentence(input.getString(0))) { + collector.emit(new Values(word)); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields(FIELDS)); + } + + + public static String[] splitSentence(String sentence) { + if (sentence != null) { + return sentence.split("\\s+"); + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java ---------------------------------------------------------------------- diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java new file mode 100755 index 0000000..b66e4f3 --- /dev/null +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.perf.spout; + + +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class ConstSpout extends BaseRichSpout { + + private static final String DEFAUT_FIELD_NAME = "str"; + private String value; + private String fieldName = DEFAUT_FIELD_NAME; + private SpoutOutputCollector collector = null; + private int count=0; + + public ConstSpout(String value) { + this.value = value; + } + + public ConstSpout withOutputFields(String fieldName) { + this.fieldName = fieldName; + return this; + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields(fieldName)); + } + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + this.collector = collector; + } + + @Override + public void nextTuple() { + List<Object> tuple = Collections.singletonList((Object) value); + collector.emit(tuple, count++); + } + + @Override + public void ack(Object msgId) { + super.ack(msgId); + } + +}
