http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/storm-perf/src/main/java/org/apache/storm/perf/KafkaSpoutNullBoltTopo.java
----------------------------------------------------------------------
diff --git 
a/storm-perf/src/main/java/org/apache/storm/perf/KafkaSpoutNullBoltTopo.java 
b/storm-perf/src/main/java/org/apache/storm/perf/KafkaSpoutNullBoltTopo.java
deleted file mode 100755
index 3512c65..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/KafkaSpoutNullBoltTopo.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License
-*/
-
-package org.apache.storm.perf;
-
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.kafka.BrokerHosts;
-import org.apache.storm.kafka.KafkaSpout;
-import org.apache.storm.kafka.SpoutConfig;
-import org.apache.storm.kafka.StringMultiSchemeWithTopic;
-import org.apache.storm.kafka.ZkHosts;
-import org.apache.storm.perf.bolt.DevNullBolt;
-import org.apache.storm.perf.utils.Helper;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.utils.Utils;
-
-import java.util.Map;
-import java.util.UUID;
-
-
-/***
- * This topo helps measure speed of reading from Kafka
- *   Spout Reads from Kafka.
- *   Bolt acks and discards tuples
- */
-
-public class KafkaSpoutNullBoltTopo {
-
-    // configs - topo parallelism
-    public static final String SPOUT_NUM = "spout.count";
-    public static final String BOLT_NUM = "bolt.count";
-
-    // configs - kafka spout
-    public static final String KAFKA_TOPIC = "kafka.topic";
-    public static final String ZOOKEEPER_URI = "zk.uri";
-
-
-    public static final int DEFAULT_SPOUT_NUM = 1;
-    public static final int DEFAULT_BOLT_NUM = 1;
-
-    // names
-    public static final String TOPOLOGY_NAME = "KafkaSpoutNullBoltTopo";
-    public static final String SPOUT_ID = "kafkaSpout";
-    public static final String BOLT_ID = "devNullBolt";
-
-
-    public static StormTopology getTopology(Map config) {
-
-        final int spoutNum = getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
-        final int boltNum = getInt(config, BOLT_NUM, DEFAULT_BOLT_NUM);
-        // 1 -  Setup Kafka Spout   --------
-
-        String zkConnString = getStr(config, ZOOKEEPER_URI);
-        String topicName = getStr(config, KAFKA_TOPIC);
-
-        BrokerHosts brokerHosts = new ZkHosts(zkConnString);
-        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topicName, "/" 
+ topicName, UUID.randomUUID().toString());
-        spoutConfig.scheme = new StringMultiSchemeWithTopic();
-        spoutConfig.ignoreZkOffsets = true;
-
-        KafkaSpout spout = new KafkaSpout(spoutConfig);
-
-        // 2 -   DevNull Bolt   --------
-        DevNullBolt bolt = new DevNullBolt();
-
-        // 3 - Setup Topology  --------
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.setSpout(SPOUT_ID, spout, spoutNum);
-        builder.setBolt(BOLT_ID, bolt, boltNum)
-                .localOrShuffleGrouping(SPOUT_ID);
-
-        return builder.createTopology();
-    }
-
-
-    public static int getInt(Map map, Object key, int def) {
-        return Utils.getInt(Utils.get(map, key, def));
-    }
-
-    public static String getStr(Map map, Object key) {
-        return (String) map.get(key);
-    }
-
-
-    /**
-     * Copies text file content from sourceDir to destinationDir. Moves source 
files into sourceDir after its done consuming
-     */
-    public static void main(String[] args) throws Exception {
-        if (args.length !=2) {
-            System.err.println("args: runDurationSec confFile");
-            return;
-        }
-        Integer durationSec = Integer.parseInt(args[0]);
-        Map topoConf = Utils.findAndReadConfigFile(args[1]);
-
-        //  Submit to Storm cluster
-        Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, 
topoConf, getTopology(topoConf));
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
----------------------------------------------------------------------
diff --git 
a/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java 
b/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
deleted file mode 100755
index 5b97540..0000000
--- 
a/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-
-package org.apache.storm.perf;
-
-import org.apache.storm.LocalCluster;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.hdfs.bolt.HdfsBolt;
-import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
-import org.apache.storm.hdfs.bolt.format.FileNameFormat;
-import org.apache.storm.hdfs.bolt.format.RecordFormat;
-import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
-import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
-import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
-import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
-import org.apache.storm.perf.spout.StringGenSpout;
-import org.apache.storm.perf.utils.Helper;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.Utils;
-
-import java.util.Map;
-
-/***
- * This topo helps measure speed of writing to Hdfs
- *  Spout generates fixed length random strings.
- *  Bolt writes to Hdfs
- */
-
-public class StrGenSpoutHdfsBoltTopo {
-
-    // configs - topo parallelism
-    public static final String SPOUT_NUM = "spout.count";
-    public static final String BOLT_NUM =  "bolt.count";
-
-    // configs - hdfs bolt
-    public static final String HDFS_URI   = "hdfs.uri";
-    public static final String HDFS_PATH  = "hdfs.dir";
-    public static final String HDFS_BATCH = "hdfs.batch";
-
-    public static final int DEFAULT_SPOUT_NUM = 1;
-    public static final int DEFAULT_BOLT_NUM = 1;
-    public static final int DEFAULT_HDFS_BATCH = 1000;
-
-    // names
-    public static final String TOPOLOGY_NAME = "StrGenSpoutHdfsBoltTopo";
-    public static final String SPOUT_ID = "GenSpout";
-    public static final String BOLT_ID = "hdfsBolt";
-
-
-    public static StormTopology getTopology(Map topoConf) {
-        final int hdfsBatch = Helper.getInt(topoConf, HDFS_BATCH, 
DEFAULT_HDFS_BATCH);
-
-        // 1 -  Setup StringGen Spout   --------
-        StringGenSpout spout = new StringGenSpout(100).withFieldName("str");
-
-
-        // 2 -  Setup HFS Bolt   --------
-        String Hdfs_url = Helper.getStr(topoConf, HDFS_URI);
-        RecordFormat format = new LineWriter("str");
-        SyncPolicy syncPolicy = new CountSyncPolicy(hdfsBatch);
-        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, 
FileSizeRotationPolicy.Units.GB);
-        final int spoutNum = Helper.getInt(topoConf, SPOUT_NUM, 
DEFAULT_SPOUT_NUM);
-        final int boltNum = Helper.getInt(topoConf, BOLT_NUM, 
DEFAULT_BOLT_NUM);
-
-        // Use default, Storm-generated file names
-        FileNameFormat fileNameFormat = new 
DefaultFileNameFormat().withPath(Helper.getStr(topoConf, HDFS_PATH) );
-
-        // Instantiate the HdfsBolt
-        HdfsBolt bolt = new HdfsBolt()
-                .withFsUrl(Hdfs_url)
-                .withFileNameFormat(fileNameFormat)
-                .withRecordFormat(format)
-                .withRotationPolicy(rotationPolicy)
-                .withSyncPolicy(syncPolicy);
-
-
-        // 3 - Setup Topology  --------
-
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.setSpout(SPOUT_ID, spout, spoutNum);
-        builder.setBolt(BOLT_ID, bolt, boltNum)
-                .localOrShuffleGrouping(SPOUT_ID);
-
-        return builder.createTopology();
-    }
-
-
-    /** Spout generates random strings and HDFS bolt writes them to a text 
file */
-    public static void main(String[] args) throws Exception {
-        if(args.length <= 0) {
-            // submit to local cluster
-            Map topoConf = 
Utils.findAndReadConfigFile("conf/HdfsSpoutTopo.yaml");
-            LocalCluster cluster = Helper.runOnLocalCluster(TOPOLOGY_NAME, 
getTopology(topoConf));
-
-            Helper.setupShutdownHook(cluster, TOPOLOGY_NAME);
-            while (true) {//  run indefinitely till Ctrl-C
-                Thread.sleep(20_000_000);
-            }
-        } else {
-            //  Submit to Storm cluster
-            if (args.length !=2) {
-                System.err.println("args: runDurationSec confFile");
-                return;
-            }
-            Integer durationSec = Integer.parseInt(args[0]);
-            Map topoConf = Utils.findAndReadConfigFile(args[1]);
-
-            Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, 
topoConf, getTopology(topoConf));
-        }
-    }
-
-
-    public static class LineWriter implements RecordFormat {
-        private String lineDelimiter = System.lineSeparator();
-        private String fieldName;
-
-        public LineWriter(String fieldName) {
-            this.fieldName = fieldName;
-        }
-
-        /**
-         * Overrides the default record delimiter.
-         *
-         * @param delimiter
-         * @return
-         */
-        public LineWriter withLineDelimiter(String delimiter){
-            this.lineDelimiter = delimiter;
-            return this;
-        }
-
-        public byte[] format(Tuple tuple) {
-            return (tuple.getValueByField(fieldName).toString() +  
this.lineDelimiter).getBytes();
-        }
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java 
b/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java
deleted file mode 100644
index b79a0ee..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf.bolt;
-
-
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class CountBolt extends BaseBasicBolt {
-    public static final String FIELDS_WORD = "word";
-    public static final String FIELDS_COUNT = "count";
-
-    Map<String, Integer> counts = new HashMap<>();
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context) {
-    }
-
-    @Override
-    public void execute(Tuple tuple, BasicOutputCollector collector) {
-        String word = tuple.getString(0);
-        Integer count = counts.get(word);
-        if (count == null)
-            count = 0;
-        count++;
-        counts.put(word, count);
-        collector.emit(new Values(word, count));
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields(FIELDS_WORD, FIELDS_COUNT));
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java
----------------------------------------------------------------------
diff --git 
a/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java 
b/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java
deleted file mode 100755
index b85ce15..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf.bolt;
-
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.tuple.Tuple;
-
-import java.util.Map;
-
-
-public class DevNullBolt extends BaseRichBolt {
-    private OutputCollector collector;
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, 
OutputCollector collector) {
-        this.collector = collector;
-    }
-
-    @Override
-    public void execute(Tuple tuple) {
-        collector.ack(tuple);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java 
b/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java
deleted file mode 100644
index 116265e..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf.bolt;
-
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-
-import java.util.Map;
-
-public class IdBolt extends BaseRichBolt {
-    private OutputCollector collector;
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, 
OutputCollector collector) {
-        this.collector = collector;
-    }
-
-    @Override
-    public void execute(Tuple tuple) {
-        collector.emit(tuple, new Values( tuple.getValues() ) );
-        collector.ack(tuple);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("field1"));
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java
----------------------------------------------------------------------
diff --git 
a/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java 
b/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java
deleted file mode 100644
index 96f9f73..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf.bolt;
-
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-
-import java.util.Map;
-
-
-public class SplitSentenceBolt extends BaseBasicBolt {
-    public static final String FIELDS = "word";
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context) {
-    }
-
-    @Override
-    public void execute(Tuple input, BasicOutputCollector collector) {
-        for (String word : splitSentence(input.getString(0))) {
-            collector.emit(new Values(word));
-        }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields(FIELDS));
-    }
-
-
-    public static String[] splitSentence(String sentence) {
-        if (sentence != null) {
-            return sentence.split("\\s+");
-        }
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java
----------------------------------------------------------------------
diff --git 
a/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java 
b/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java
deleted file mode 100755
index b66e4f3..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf.spout;
-
-
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichSpout;
-import org.apache.storm.tuple.Fields;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-public class ConstSpout extends BaseRichSpout {
-
-    private static final String DEFAUT_FIELD_NAME = "str";
-    private String value;
-    private String fieldName = DEFAUT_FIELD_NAME;
-    private SpoutOutputCollector collector = null;
-    private int count=0;
-
-    public ConstSpout(String value) {
-        this.value = value;
-    }
-
-    public ConstSpout withOutputFields(String fieldName) {
-        this.fieldName = fieldName;
-        return this;
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields(fieldName));
-    }
-
-    @Override
-    public void open(Map conf, TopologyContext context, SpoutOutputCollector 
collector) {
-        this.collector = collector;
-    }
-
-    @Override
-    public void nextTuple() {
-        List<Object> tuple = Collections.singletonList((Object) value);
-        collector.emit(tuple, count++);
-    }
-
-    @Override
-    public void ack(Object msgId) {
-        super.ack(msgId);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java
----------------------------------------------------------------------
diff --git 
a/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java 
b/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java
deleted file mode 100644
index 959e7c6..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf.spout;
-
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichSpout;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
-import java.io.BufferedReader;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class FileReadSpout extends BaseRichSpout {
-    public static final String FIELDS = "sentence";
-    private static final long serialVersionUID = -2582705611472467172L;
-    private transient FileReader reader;
-    private String file;
-    private boolean ackEnabled = true;
-    private SpoutOutputCollector collector;
-
-    private long count = 0;
-
-
-    public FileReadSpout(String file) {
-        this.file = file;
-    }
-
-    // For testing
-    FileReadSpout(FileReader reader) {
-        this.reader = reader;
-    }
-
-    @Override
-    public void open(Map conf, TopologyContext context,
-                     SpoutOutputCollector collector) {
-        this.collector = collector;
-        Object ackObj = conf.get("topology.acker.executors");
-        if (ackObj != null && ackObj.equals(0)) {
-            this.ackEnabled = false;
-        }
-        // for tests, reader will not be null
-        if (this.reader == null) {
-            this.reader = new FileReader(this.file);
-        }
-    }
-
-    @Override
-    public void nextTuple() {
-        if (ackEnabled) {
-            collector.emit(new Values(reader.nextLine()), count);
-            count++;
-        } else {
-            collector.emit(new Values(reader.nextLine()));
-        }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields(FIELDS));
-    }
-
-    public static List<String> readLines(InputStream input) {
-        List<String> lines = new ArrayList<>();
-        try {
-            BufferedReader reader = new BufferedReader(new 
InputStreamReader(input));
-            try {
-                String line;
-                while ((line = reader.readLine()) != null) {
-                    lines.add(line);
-                }
-            } catch (IOException e) {
-                throw new RuntimeException("Reading file failed", e);
-            } finally {
-                reader.close();
-            }
-        } catch (IOException e) {
-            throw new RuntimeException("Error closing reader", e);
-        }
-        return lines;
-    }
-
-    public static class FileReader implements Serializable {
-
-        private static final long serialVersionUID = -7012334600647556267L;
-
-        public final String file;
-        private List<String> contents = null;
-        private int index = 0;
-        private int limit = 0;
-
-        public FileReader(String file) {
-            this.file = file;
-            if (this.file != null) {
-                try {
-                    this.contents = readLines(new FileInputStream(this.file));
-                } catch (IOException e) {
-                    e.printStackTrace();
-                    throw new IllegalArgumentException("Cannot open file " + 
file, e);
-                }
-                this.limit = contents.size();
-            } else {
-                throw new IllegalArgumentException("file name cannot be null");
-            }
-        }
-
-        public String nextLine() {
-            if (index >= limit) {
-                index = 0;
-            }
-            String line = contents.get(index);
-            index++;
-            return line;
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/storm-perf/src/main/java/org/apache/storm/perf/spout/StringGenSpout.java
----------------------------------------------------------------------
diff --git 
a/storm-perf/src/main/java/org/apache/storm/perf/spout/StringGenSpout.java 
b/storm-perf/src/main/java/org/apache/storm/perf/spout/StringGenSpout.java
deleted file mode 100755
index f9c665b..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/spout/StringGenSpout.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf.spout;
-
-
-import org.apache.commons.lang.RandomStringUtils;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichSpout;
-import org.apache.storm.tuple.Fields;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-/** Spout pre-computes a list with 30k fixed length random strings.
- *  Emits sequentially from this list, over and over again.
- */
-
-public class StringGenSpout extends BaseRichSpout {
-
-    private static final String DEFAULT_FIELD_NAME = "str";
-    private int strLen;
-    private final int strCount = 30_000;
-    private String fieldName = DEFAULT_FIELD_NAME;
-    private SpoutOutputCollector collector = null;
-    ArrayList<String> records;
-    private int curr=0;
-    private int count=0;
-
-    public StringGenSpout(int strLen) {
-        this.strLen = strLen;
-    }
-
-    public StringGenSpout withFieldName(String fieldName) {
-        this.fieldName = fieldName;
-        return this;
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare( new Fields(fieldName) );
-    }
-
-    @Override
-    public void open(Map conf, TopologyContext context, SpoutOutputCollector 
collector) {
-        this.records = genStringList(strLen, strCount);
-
-        this.collector = collector;
-    }
-
-    private static ArrayList<String> genStringList(int strLen, int count) {
-        ArrayList<String> result = new ArrayList<String>(count);
-        for (int i = 0; i < count; i++) {
-            result.add( RandomStringUtils.random(strLen) );
-        }
-        return result;
-    }
-
-    @Override
-    public void nextTuple() {
-        List<Object> tuple;
-        if( curr < strCount ) {
-            tuple = Collections.singletonList((Object) records.get(curr));
-            ++curr;
-            collector.emit(tuple, ++count);
-        }
-    }
-
-
-    @Override
-    public void ack(Object msgId) {
-        super.ack(msgId);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java
----------------------------------------------------------------------
diff --git 
a/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java
 
b/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java
deleted file mode 100755
index 686f9da..0000000
--- 
a/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java
+++ /dev/null
@@ -1,309 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf.utils;
-
-import org.apache.storm.LocalCluster;
-import org.apache.storm.generated.Nimbus;
-import org.apache.storm.utils.NimbusClient;
-import org.apache.storm.utils.Utils;
-import org.apache.log4j.Logger;
-
-import java.io.PrintWriter;
-import java.util.*;
-
-
-public class BasicMetricsCollector  {
-
-    private LocalCluster localCluster = null;
-    private Nimbus.Client client = null;
-    private PrintWriter dataWriter;
-    private long startTime=0;
-
-    public enum MetricsItem {
-        TOPOLOGY_STATS,
-        XSFER_RATE,
-        SPOUT_THROUGHPUT,
-        SPOUT_LATENCY,
-        ALL
-    }
-
-
-    /* headers */
-    public static final String TIME = "elapsed (sec)";
-    public static final String TIME_FORMAT = "%d";
-    public static final String TOTAL_SLOTS = "total_slots";
-    public static final String USED_SLOTS = "used_slots";
-    public static final String WORKERS = "workers";
-    public static final String TASKS = "tasks";
-    public static final String EXECUTORS = "executors";
-    public static final String TRANSFERRED = "transferred (messages)";
-    public static final String XSFER_RATE = "transfer rate (messages/s)";
-    public static final String SPOUT_EXECUTORS = "spout_executors";
-    public static final String SPOUT_TRANSFERRED = "spout_transferred 
(messages)";
-    public static final String SPOUT_ACKED = "spout_acks";
-    public static final String SPOUT_THROUGHPUT = "spout_throughput (acks/s)";
-    public static final String SPOUT_AVG_COMPLETE_LATENCY = 
"spout_avg_complete_latency(ms)";
-    public static final String SPOUT_AVG_LATENCY_FORMAT = "%.1f";
-    public static final String SPOUT_MAX_COMPLETE_LATENCY = 
"spout_max_complete_latency(ms)";
-    public static final String SPOUT_MAX_LATENCY_FORMAT = "%.1f";
-    private static final Logger LOG = 
Logger.getLogger(BasicMetricsCollector.class);
-    final MetricsCollectorConfig config;
-    //    final StormTopology topology;
-    final Set<String> header = new LinkedHashSet<String>();
-    final Map<String, String> metrics = new HashMap<String, String>();
-    int lineNumber = 0;
-
-    final boolean collectTopologyStats;
-    final boolean collectExecutorStats;
-    final boolean collectThroughput;
-
-    final boolean collectSpoutThroughput;
-    final boolean collectSpoutLatency;
-
-    private MetricsSample lastSample;
-    private MetricsSample curSample;
-    private double maxLatency = 0;
-
-    boolean first = true;
-
-    public BasicMetricsCollector(Nimbus.Client client, String topoName, Map 
stormConfig) {
-        this(topoName, stormConfig);
-        this.client = client;
-        this.localCluster = null;
-    }
-
-    public BasicMetricsCollector(LocalCluster localCluster, String topoName, 
Map stormConfig) {
-        this(topoName, stormConfig);
-        this.client = null;
-        this.localCluster = localCluster;
-    }
-
-    private BasicMetricsCollector(String topoName, Map stormConfig) {
-        Set<MetricsItem> items = getMetricsToCollect();
-        this.config = new MetricsCollectorConfig(topoName, stormConfig);
-        collectTopologyStats = collectTopologyStats(items);
-        collectExecutorStats = collectExecutorStats(items);
-        collectThroughput = collectThroughput(items);
-        collectSpoutThroughput = collectSpoutThroughput(items);
-        collectSpoutLatency = collectSpoutLatency(items);
-        dataWriter = new PrintWriter(System.err);
-    }
-
-
-    private Set<MetricsItem>  getMetricsToCollect() {
-        Set<MetricsItem> result = new HashSet<>();
-        result.add(MetricsItem.ALL);
-        return result;
-    }
-
-    public void collect(Nimbus.Client client) {
-        try {
-            if (!first) {
-                this.lastSample = this.curSample;
-                this.curSample = MetricsSample.factory(client, config.name);
-                updateStats(dataWriter);
-                writeLine(dataWriter);
-            } else {
-                LOG.info("Getting baseline metrics sample.");
-                writeHeader(dataWriter);
-                this.curSample = MetricsSample.factory(client, config.name);
-                first = false;
-                startTime = System.currentTimeMillis();
-            }
-        } catch (Exception e) {
-            LOG.error("storm metrics failed! ", e);
-        }
-    }
-
-    public void collect(LocalCluster localCluster) {
-        try {
-            if (!first) {
-                this.lastSample = this.curSample;
-                this.curSample = MetricsSample.factory(localCluster, 
config.name);
-                updateStats(dataWriter);
-                writeLine(dataWriter);
-            } else {
-                LOG.info("Getting baseline metrics sample.");
-                writeHeader(dataWriter);
-                this.curSample = MetricsSample.factory(localCluster, 
config.name);
-                first = false;
-                startTime = System.currentTimeMillis();
-            }
-        } catch (Exception e) {
-            LOG.error("storm metrics failed! ", e);
-        }
-    }
-
-    public void close() {
-        dataWriter.close();
-    }
-
-    boolean updateStats(PrintWriter writer)
-            throws Exception {
-        if (collectTopologyStats) {
-            updateTopologyStats();
-        }
-        if (collectExecutorStats) {
-            updateExecutorStats();
-        }
-        return true;
-    }
-
-    void updateTopologyStats() {
-        long timeTotal = System.currentTimeMillis() - startTime;
-        int numWorkers = this.curSample.getNumWorkers();
-        int numExecutors = this.curSample.getNumExecutors();
-        int numTasks = this.curSample.getNumTasks();
-        metrics.put(TIME, String.format(TIME_FORMAT, timeTotal / 1000));
-        metrics.put(WORKERS, Integer.toString(numWorkers));
-        metrics.put(EXECUTORS, Integer.toString(numExecutors));
-        metrics.put(TASKS, Integer.toString(numTasks));
-    }
-
-    void updateExecutorStats() {
-        long timeDiff = this.curSample.getSampleTime() - 
this.lastSample.getSampleTime();
-        long transferredDiff = this.curSample.getTotalTransferred() - 
this.lastSample.getTotalTransferred();
-        long throughput = transferredDiff / (timeDiff / 1000);
-
-        long spoutDiff = this.curSample.getSpoutTransferred() - 
this.lastSample.getSpoutTransferred();
-        long spoutAckedDiff = this.curSample.getTotalAcked() - 
this.lastSample.getTotalAcked();
-        long spoutThroughput = spoutDiff / (timeDiff / 1000);
-
-        if (collectThroughput) {
-            metrics.put(TRANSFERRED, Long.toString(transferredDiff));
-            metrics.put(XSFER_RATE, Long.toString(throughput));
-        }
-
-        if (collectSpoutThroughput) {
-
-            metrics.put(SPOUT_EXECUTORS, 
Integer.toString(this.curSample.getSpoutExecutors()));
-            metrics.put(SPOUT_TRANSFERRED, Long.toString(spoutDiff));
-            metrics.put(SPOUT_ACKED, Long.toString(spoutAckedDiff));
-            metrics.put(SPOUT_THROUGHPUT, Long.toString(spoutThroughput));
-        }
-
-
-        if (collectSpoutLatency) {
-            double latency = this.curSample.getTotalLatency();
-            if (latency > this.maxLatency) {
-                this.maxLatency = latency;
-            }
-            metrics.put(SPOUT_AVG_COMPLETE_LATENCY,
-                    String.format(SPOUT_AVG_LATENCY_FORMAT, latency));
-            metrics.put(SPOUT_MAX_COMPLETE_LATENCY,
-                    String.format(SPOUT_MAX_LATENCY_FORMAT, this.maxLatency));
-
-        }
-    }
-
-
-    void writeHeader(PrintWriter writer) {
-        header.add(TIME);
-        if (collectTopologyStats) {
-            header.add(WORKERS);
-            header.add(TASKS);
-            header.add(EXECUTORS);
-        }
-
-        if (collectThroughput) {
-            header.add(TRANSFERRED);
-            header.add(XSFER_RATE);
-        }
-
-        if (collectSpoutThroughput) {
-            header.add(SPOUT_EXECUTORS);
-            header.add(SPOUT_TRANSFERRED);
-            header.add(SPOUT_ACKED);
-            header.add(SPOUT_THROUGHPUT);
-        }
-
-        if (collectSpoutLatency) {
-            header.add(SPOUT_AVG_COMPLETE_LATENCY);
-            header.add(SPOUT_MAX_COMPLETE_LATENCY);
-        }
-
-        
writer.println("\n------------------------------------------------------------------------------------------------------------------");
-        String str = Utils.join(header, ",");
-        writer.println(str);
-        
writer.println("------------------------------------------------------------------------------------------------------------------");
-        writer.flush();
-    }
-
-    void writeLine(PrintWriter writer) {
-        List<String> line = new LinkedList<String>();
-        for (String h : header) {
-            line.add(metrics.get(h));
-        }
-        String str = Utils.join(line, ",");
-        writer.println(str);
-        writer.flush();
-    }
-
-
-    boolean collectTopologyStats(Set<MetricsItem> items) {
-        return items.contains(MetricsItem.ALL) ||
-                items.contains(MetricsItem.TOPOLOGY_STATS);
-    }
-
-    boolean collectExecutorStats(Set<MetricsItem> items) {
-        return items.contains(MetricsItem.ALL) ||
-                items.contains(MetricsItem.XSFER_RATE) ||
-                items.contains(MetricsItem.SPOUT_LATENCY);
-    }
-
-    boolean collectThroughput(Set<MetricsItem> items) {
-        return items.contains(MetricsItem.ALL) ||
-                items.contains(MetricsItem.XSFER_RATE);
-    }
-
-    boolean collectSpoutThroughput(Set<MetricsItem> items) {
-        return items.contains(MetricsItem.ALL) ||
-                items.contains(MetricsItem.SPOUT_THROUGHPUT);
-    }
-
-    boolean collectSpoutLatency(Set<MetricsItem> items) {
-        return items.contains(MetricsItem.ALL) ||
-                items.contains(MetricsItem.SPOUT_LATENCY);
-    }
-
-
-
-    public static class MetricsCollectorConfig {
-        private static final Logger LOG = 
Logger.getLogger(MetricsCollectorConfig.class);
-
-        // storm configuration
-        public final Map stormConfig;
-        // storm topology name
-        public final String name;
-        // benchmark label
-        public final String label;
-
-        public MetricsCollectorConfig(String topoName, Map stormConfig) {
-            this.stormConfig = stormConfig;
-            String labelStr = (String) stormConfig.get("benchmark.label");
-            this.name = topoName;
-            if (labelStr == null) {
-                LOG.warn("'benchmark.label' not found in config. Defaulting to 
topology name");
-                labelStr = this.name;
-            }
-            this.label = labelStr;
-        }
-    } // MetricsCollectorConfig
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java 
b/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java
deleted file mode 100755
index f429699..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf.utils;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.ClusterSummary;
-import org.apache.storm.generated.ExecutorSummary;
-import org.apache.storm.generated.KillOptions;
-import org.apache.storm.generated.Nimbus;
-import org.apache.storm.generated.SpoutStats;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.generated.TopologyInfo;
-import org.apache.storm.generated.TopologySummary;
-import org.apache.storm.perf.KafkaHdfsTopo;
-import org.apache.storm.utils.NimbusClient;
-import org.apache.storm.utils.Utils;
-
-import java.util.Map;
-
-
-public class Helper {
-
-  public static void kill(Nimbus.Client client, String topoName) throws 
Exception {
-    KillOptions opts = new KillOptions();
-    opts.set_wait_secs(0);
-    client.killTopologyWithOpts(topoName, opts);
-  }
-
-  public static void killAndShutdownCluster(LocalCluster cluster, String 
topoName) throws Exception {
-    KillOptions opts = new KillOptions();
-    opts.set_wait_secs(0);
-    cluster.killTopologyWithOpts(topoName, opts);
-    cluster.shutdown();
-  }
-
-
-    public static LocalCluster runOnLocalCluster(String topoName, 
StormTopology topology) {
-        LocalCluster cluster = new LocalCluster();
-        cluster.submitTopology(topoName, new Config(), topology);
-        return cluster;
-    }
-
-    public static int getInt(Map map, Object key, int def) {
-        return Utils.getInt(Utils.get(map, key, def));
-    }
-
-    public static String getStr(Map map, Object key) {
-        return (String) map.get(key);
-    }
-
-    public static void collectMetricsAndKill(String topologyName, Integer 
pollInterval, Integer duration) throws Exception {
-        Map clusterConf = Utils.readStormConfig();
-        Nimbus.Client client = 
NimbusClient.getConfiguredClient(clusterConf).getClient();
-        BasicMetricsCollector metricsCollector = new 
BasicMetricsCollector(client, topologyName, clusterConf);
-
-        int times = duration / pollInterval;
-        metricsCollector.collect(client);
-        for (int i = 0; i < times; i++) {
-            Thread.sleep(pollInterval * 1000);
-            metricsCollector.collect(client);
-        }
-        metricsCollector.close();
-        kill(client, topologyName);
-    }
-
-    public static void collectLocalMetricsAndKill(LocalCluster localCluster, 
String topologyName, Integer pollInterval, Integer duration, Map clusterConf) 
throws Exception {
-        BasicMetricsCollector metricsCollector = new 
BasicMetricsCollector(localCluster, topologyName, clusterConf);
-
-        int times = duration / pollInterval;
-        metricsCollector.collect(localCluster);
-        for (int i = 0; i < times; i++) {
-            Thread.sleep(pollInterval * 1000);
-            metricsCollector.collect(localCluster);
-        }
-        metricsCollector.close();
-        killAndShutdownCluster(localCluster, topologyName);
-    }
-
-    /** Kill topo and Shutdown local cluster on Ctrl-C */
-  public static void setupShutdownHook(final LocalCluster cluster, final 
String topoName) {
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      public void run() {
-        cluster.killTopology(topoName);
-        System.out.println("Killed Topology");
-        cluster.shutdown();
-      }
-    });
-  }
-
-  /** Kill topo on Ctrl-C */
-  public static void setupShutdownHook(final String topoName) {
-    Map clusterConf = Utils.readStormConfig();
-    final Nimbus.Client client = 
NimbusClient.getConfiguredClient(clusterConf).getClient();
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      public void run() {
-        try {
-          Helper.kill(client, topoName);
-          System.out.println("Killed Topology");
-        } catch (Exception e) {
-          e.printStackTrace();
-        }
-      }
-    });
-  }
-
-    public static void runOnClusterAndPrintMetrics(Integer durationSec, String 
topoName, Map topoConf, StormTopology topology) throws Exception {
-      // submit topology
-      StormSubmitter.submitTopologyWithProgressBar(topoName, topoConf, 
topology);
-      setupShutdownHook(topoName); // handle Ctrl-C
-
-      // poll metrics every minute, then kill topology after specified duration
-      Integer pollIntervalSec = 60;
-      collectMetricsAndKill(topoName, pollIntervalSec, durationSec);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/storm-perf/src/main/java/org/apache/storm/perf/utils/IdentityBolt.java
----------------------------------------------------------------------
diff --git 
a/storm-perf/src/main/java/org/apache/storm/perf/utils/IdentityBolt.java 
b/storm-perf/src/main/java/org/apache/storm/perf/utils/IdentityBolt.java
deleted file mode 100755
index 396ad53..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/utils/IdentityBolt.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf.utils;
-
-
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-
-import java.util.Map;
-
-
-public class IdentityBolt extends BaseRichBolt {
-    private OutputCollector collector;
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, 
OutputCollector collector) {
-        this.collector = collector;
-    }
-
-    @Override
-    public void execute(Tuple tuple) {
-        collector.emit(tuple, tuple.getValues() );
-        collector.ack(tuple);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/storm/blob/ff93e07f/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java
----------------------------------------------------------------------
diff --git 
a/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java 
b/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java
deleted file mode 100755
index a934120..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf.utils;
-
-import org.apache.storm.LocalCluster;
-import org.apache.storm.generated.ClusterSummary;
-import org.apache.storm.generated.ExecutorSpecificStats;
-import org.apache.storm.generated.ExecutorStats;
-import org.apache.storm.generated.ExecutorSummary;
-import org.apache.storm.generated.Nimbus;
-import org.apache.storm.generated.SpoutStats;
-import org.apache.storm.generated.TopologyInfo;
-import org.apache.storm.generated.TopologySummary;
-import org.apache.storm.utils.Utils;
-
-import java.util.List;
-import java.util.Map;
-
-public class MetricsSample {
-
-    private long sampleTime = -1;
-    private long totalTransferred = 0l;
-    private long totalEmitted = 0l;
-    private long totalAcked = 0l;
-    private long totalFailed = 0l;
-
-    private double totalLatency;
-
-    private long spoutEmitted = 0l;
-    private long spoutTransferred = 0l;
-    private int spoutExecutors = 0;
-
-    private int numSupervisors = 0;
-    private int numWorkers = 0;
-    private int numTasks = 0;
-    private int numExecutors = 0;
-
-    private int totalSlots = 0;
-    private int usedSlots = 0;
-
-    public static MetricsSample factory(Nimbus.Client client, String 
topologyName) throws Exception {
-        // "************ Sampling Metrics *****************
-
-        ClusterSummary clusterSummary = client.getClusterInfo();
-        // get topology info
-        TopologySummary topSummary = getTopologySummary(clusterSummary, 
topologyName);
-        int topologyExecutors = topSummary.get_num_executors();
-        int topologyWorkers = topSummary.get_num_workers();
-        int topologyTasks = topSummary.get_num_tasks();
-        TopologyInfo topInfo = client.getTopologyInfo(topSummary.get_id());
-
-        MetricsSample sample =  getMetricsSample( topInfo);
-        sample.numWorkers = topologyWorkers;
-        sample.numExecutors = topologyExecutors;
-        sample.numTasks = topologyTasks;
-        return sample;
-    }
-
-    public static MetricsSample factory(LocalCluster localCluster, String 
topologyName) throws Exception {
-        TopologyInfo topologyInfo = 
localCluster.getTopologyInfo(topologyName);;
-        return getMetricsSample(topologyInfo);
-    }
-
-
-    private static MetricsSample getMetricsSample(TopologyInfo topInfo) {
-        List<ExecutorSummary> executorSummaries = topInfo.get_executors();
-
-        // totals
-        long totalTransferred = 0l;
-        long totalEmitted = 0l;
-        long totalAcked = 0l;
-        long totalFailed = 0l;
-
-        // number of spout executors
-        int spoutExecCount = 0;
-        double spoutLatencySum = 0.0;
-
-        long spoutEmitted = 0l;
-        long spoutTransferred = 0l;
-
-        // Executor summaries
-        for(ExecutorSummary executorSummary : executorSummaries){
-            ExecutorStats execuatorStats = executorSummary.get_stats();
-            if(execuatorStats == null){
-                continue;
-            }
-
-            ExecutorSpecificStats executorSpecificStats = 
execuatorStats.get_specific();
-            if(executorSpecificStats == null){
-                // bail out
-                continue;
-            }
-
-            // transferred totals
-            Map<String,Map<String,Long>> transferred = 
execuatorStats.get_transferred();
-            Map<String, Long> txMap = transferred.get(":all-time");
-            if(txMap == null){
-                continue;
-            }
-            for(String key : txMap.keySet()){
-                // todo, ignore the master batch coordinator ?
-                if(!Utils.isSystemId(key)){
-                    Long count = txMap.get(key);
-                    totalTransferred += count;
-                    if(executorSpecificStats.is_set_spout()){
-                        spoutTransferred += count;
-                    }
-                }
-            }
-
-            // we found a spout
-            if(executorSpecificStats.isSet(2)) { // spout
-
-                SpoutStats spoutStats = executorSpecificStats.get_spout();
-                Map<String, Long> acked = 
spoutStats.get_acked().get(":all-time");
-                if(acked != null){
-                    for(String key : acked.keySet()) {
-                        totalAcked += acked.get(key);
-                    }
-                }
-
-                Map<String, Long> failed = 
spoutStats.get_failed().get(":all-time");
-                if(failed != null){
-                    for(String key : failed.keySet()) {
-                        totalFailed += failed.get(key);
-                    }
-                }
-
-                Double total = 0d;
-                Map<String, Double> vals = 
spoutStats.get_complete_ms_avg().get(":all-time");
-                for(String key : vals.keySet()){
-                    total += vals.get(key);
-                }
-                Double latency = total / vals.size();
-
-                spoutExecCount++;
-                spoutLatencySum += latency;
-            }
-
-
-        } // end executor summary
-
-        MetricsSample ret = new MetricsSample();
-        ret.totalEmitted = totalEmitted;
-        ret.totalTransferred = totalTransferred;
-        ret.totalAcked  = totalAcked;
-        ret.totalFailed = totalFailed;
-        ret.totalLatency = spoutLatencySum/spoutExecCount;
-        ret.spoutEmitted = spoutEmitted;
-        ret.spoutTransferred = spoutTransferred;
-        ret.sampleTime = System.currentTimeMillis();
-//        ret.numSupervisors = clusterSummary.get_supervisors_size();
-        ret.numWorkers = 0;
-        ret.numExecutors = 0;
-        ret.numTasks = 0;
-        ret.spoutExecutors = spoutExecCount;
-        return ret;
-    }
-
-    public static TopologySummary getTopologySummary(ClusterSummary cs, String 
name) {
-        for (TopologySummary ts : cs.get_topologies()) {
-            if (name.equals(ts.get_name())) {
-                return ts;
-            }
-        }
-        return null;
-    }
-
-
-
-    // getters
-    public long getSampleTime() {
-        return sampleTime;
-    }
-
-    public long getTotalTransferred() {
-        return totalTransferred;
-    }
-
-    public long getTotalEmitted() {
-        return totalEmitted;
-    }
-
-    public long getTotalAcked() {
-        return totalAcked;
-    }
-
-    public long getTotalFailed() {
-        return totalFailed;
-    }
-
-    public double getTotalLatency() {
-        return totalLatency;
-    }
-
-    public long getSpoutEmitted() {
-        return spoutEmitted;
-    }
-
-    public long getSpoutTransferred() {
-        return spoutTransferred;
-    }
-
-    public int getNumSupervisors() {
-        return numSupervisors;
-    }
-
-    public int getNumWorkers() {
-        return numWorkers;
-    }
-
-    public int getNumTasks() {
-        return numTasks;
-    }
-
-    public int getTotalSlots() {
-        return totalSlots;
-    }
-
-    public int getSpoutExecutors(){
-        return this.spoutExecutors;
-    }
-
-    public int getNumExecutors() {
-        return this.numExecutors;
-    }
-
-    public int getUsedSlots() {
-        return this.usedSlots;
-    }
-
-}
\ No newline at end of file

Reply via email to