Fixing stylecheck problems with storm-starter

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/81ec15d1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/81ec15d1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/81ec15d1

Branch: refs/heads/master
Commit: 81ec15d1096cd526b94313661e7b5de7ed1791d0
Parents: 7da98cf
Author: Kishor Patil <[email protected]>
Authored: Sun Apr 22 22:36:19 2018 -0400
Committer: Kishor Patil <[email protected]>
Committed: Mon Apr 23 00:22:36 2018 -0400

----------------------------------------------------------------------
 examples/storm-starter/pom.xml                  |   2 +-
 .../apache/storm/starter/AnchoredWordCount.java |  72 +-
 .../apache/storm/starter/BasicDRPCTopology.java |  49 +-
 .../starter/BlobStoreAPIWordCountTopology.java  | 276 ++++----
 .../storm/starter/ExclamationTopology.java      |  84 ++-
 .../storm/starter/FastWordCountTopology.java    | 300 ++++-----
 .../storm/starter/InOrderDeliveryTest.java      | 261 ++++----
 .../apache/storm/starter/JoinBoltExample.java   |  39 +-
 .../apache/storm/starter/LambdaTopology.java    |  24 +-
 .../org/apache/storm/starter/ManualDRPC.java    |  51 +-
 .../storm/starter/MultipleLoggerTopology.java   | 112 ++--
 .../starter/PersistentWindowingTopology.java    |  99 ++-
 .../org/apache/storm/starter/ReachTopology.java |  76 +--
 .../starter/ResourceAwareExampleTopology.java   | 113 ++--
 .../apache/storm/starter/RollingTopWords.java   | 111 ++-
 .../apache/storm/starter/SingleJoinExample.java |  26 +-
 .../storm/starter/SkewedRollingTopWords.java    | 113 ++--
 .../storm/starter/SlidingTupleTsTopology.java   |  30 +-
 .../storm/starter/SlidingWindowTopology.java    |  62 +-
 .../apache/storm/starter/StatefulTopology.java  |  44 +-
 .../starter/StatefulWindowingTopology.java      |  46 +-
 .../storm/starter/TransactionalGlobalCount.java |  65 +-
 .../storm/starter/TransactionalWords.java       | 102 ++-
 .../apache/storm/starter/WordCountTopology.java | 115 ++--
 .../storm/starter/WordCountTopologyNode.java    |  61 +-
 .../storm/starter/bolt/AbstractRankerBolt.java  | 135 ++--
 .../starter/bolt/IntermediateRankingsBolt.java  |  61 +-
 .../apache/storm/starter/bolt/PrinterBolt.java  |  33 +-
 .../storm/starter/bolt/RollingCountAggBolt.java |  92 ++-
 .../storm/starter/bolt/RollingCountBolt.java    | 169 +++--
 .../storm/starter/bolt/SingleJoinBolt.java      | 167 ++---
 .../starter/bolt/SlidingWindowSumBolt.java      |  48 +-
 .../storm/starter/bolt/TotalRankingsBolt.java   |  63 +-
 .../storm/starter/spout/RandomIntegerSpout.java |  24 +-
 .../spout/RandomNumberGeneratorSpout.java       |  35 +-
 .../starter/spout/RandomSentenceSpout.java      | 120 ++--
 .../storm/starter/streams/AggregateExample.java |  33 +-
 .../storm/starter/streams/BranchExample.java    |  45 +-
 .../streams/GroupByKeyAndWindowExample.java     |  62 +-
 .../storm/starter/streams/JoinExample.java      |  48 +-
 .../starter/streams/StateQueryExample.java      |  53 +-
 .../starter/streams/StatefulWordCount.java      |  59 +-
 .../starter/streams/TypedTupleExample.java      |  19 +-
 .../starter/streams/WindowedWordCount.java      |  70 +-
 .../storm/starter/streams/WordCountToBolt.java  |  51 +-
 .../tools/NthLastModifiedTimeTracker.java       |  75 +--
 .../apache/storm/starter/tools/Rankable.java    |  35 +-
 .../starter/tools/RankableObjectWithFields.java | 224 +++----
 .../apache/storm/starter/tools/Rankings.java    | 238 ++++---
 .../starter/tools/SlidingWindowCounter.java     | 101 ++-
 .../storm/starter/tools/SlotBasedCounter.java   | 144 ++--
 .../starter/trident/DebugMemoryMapState.java    |   7 +-
 .../TridentHBaseWindowingStoreTopology.java     |  46 +-
 .../starter/trident/TridentMapExample.java      |  50 +-
 .../trident/TridentMinMaxOfDevicesTopology.java |  85 ++-
 .../TridentMinMaxOfVehiclesTopology.java        |  83 ++-
 .../storm/starter/trident/TridentReach.java     |  95 ++-
 .../TridentWindowingInmemoryStoreTopology.java  |  43 +-
 .../storm/starter/trident/TridentWordCount.java |  58 +-
 .../bolt/IntermediateRankingsBoltTest.java      | 244 ++++---
 .../starter/bolt/RollingCountBoltTest.java      | 178 +++--
 .../starter/bolt/TotalRankingsBoltTest.java     | 246 ++++---
 .../tools/NthLastModifiedTimeTrackerTest.java   | 158 +++--
 .../tools/RankableObjectWithFieldsTest.java     | 489 +++++++-------
 .../storm/starter/tools/RankingsTest.java       | 668 ++++++++++---------
 .../starter/tools/SlidingWindowCounterTest.java | 175 +++--
 .../starter/tools/SlotBasedCounterTest.java     | 320 +++++----
 67 files changed, 3613 insertions(+), 3869 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index d12062d..569daf5 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -249,7 +249,7 @@
             <artifactId>maven-checkstyle-plugin</artifactId>
             <!--Note - the version would be inherited-->
             <configuration>
-                <maxAllowedViolations>1538</maxAllowedViolations>
+                <maxAllowedViolations>263</maxAllowedViolations>
             </configuration>
         </plugin>
     </plugins>

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java
index cb45024..022f6ad 100644
--- 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.starter;
@@ -22,7 +16,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
-
 import org.apache.storm.Config;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -39,6 +32,27 @@ import org.apache.storm.utils.Utils;
 
 public class AnchoredWordCount extends ConfigurableTopology {
 
+    protected int run(String[] args) throws Exception {
+        TopologyBuilder builder = new TopologyBuilder();
+
+        builder.setSpout("spout", new RandomSentenceSpout(), 4);
+
+        builder.setBolt("split", new SplitSentence(), 
4).shuffleGrouping("spout");
+        builder.setBolt("count", new WordCount(), 4).fieldsGrouping("split", 
new Fields("word"));
+
+        Config conf = new Config();
+        conf.setMaxTaskParallelism(3);
+
+        String topologyName = "word-count";
+
+        conf.setNumWorkers(3);
+
+        if (args != null && args.length > 0) {
+            topologyName = args[0];
+        }
+        return submit(topologyName, conf, builder);
+    }
+
     public static class RandomSentenceSpout extends BaseRichSpout {
         SpoutOutputCollector collector;
         Random random;
@@ -53,9 +67,11 @@ public class AnchoredWordCount extends ConfigurableTopology {
         @Override
         public void nextTuple() {
             Utils.sleep(10);
-            String[] sentences = new String[]{sentence("the cow jumped over 
the moon"), sentence("an apple a day keeps the doctor away"),
-                    sentence("four score and seven years ago"),
-                    sentence("snow white and the seven dwarfs"), sentence("i 
am at two with nature")};
+            String[] sentences = new String[]{
+                sentence("the cow jumped over the moon"), sentence("an apple a 
day keeps the doctor away"),
+                sentence("four score and seven years ago"),
+                sentence("snow white and the seven dwarfs"), sentence("i am at 
two with nature")
+            };
             final String sentence = 
sentences[random.nextInt(sentences.length)];
 
             this.collector.emit(new Values(sentence), UUID.randomUUID());
@@ -79,12 +95,11 @@ public class AnchoredWordCount extends ConfigurableTopology 
{
         }
     }
 
-
     public static class SplitSentence extends BaseBasicBolt {
         @Override
         public void execute(Tuple tuple, BasicOutputCollector collector) {
             String sentence = tuple.getString(0);
-            for (String word: sentence.split("\\s+")) {
+            for (String word : sentence.split("\\s+")) {
                 collector.emit(new Values(word, 1));
             }
         }
@@ -115,25 +130,4 @@ public class AnchoredWordCount extends 
ConfigurableTopology {
             declarer.declare(new Fields("word", "count"));
         }
     }
-
-    protected int run(String[] args) throws Exception {
-        TopologyBuilder builder = new TopologyBuilder();
-
-        builder.setSpout("spout", new RandomSentenceSpout(), 4);
-
-        builder.setBolt("split", new SplitSentence(), 
4).shuffleGrouping("spout");
-        builder.setBolt("count", new WordCount(), 4).fieldsGrouping("split", 
new Fields("word"));
-
-        Config conf = new Config();
-        conf.setMaxTaskParallelism(3);
-
-        String topologyName = "word-count";
-
-        conf.setNumWorkers(3);
-
-        if (args != null && args.length > 0) {
-            topologyName = args[0];
-        }
-        return submit(topologyName, conf, builder);
-    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/BasicDRPCTopology.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/BasicDRPCTopology.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/BasicDRPCTopology.java
index 325809c..83bad13 100644
--- 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/BasicDRPCTopology.java
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/BasicDRPCTopology.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.starter;
 
 import org.apache.storm.Config;
@@ -35,19 +30,6 @@ import org.apache.storm.utils.DRPCClient;
  * @see <a 
href="http://storm.apache.org/documentation/Distributed-RPC.html";>Distributed 
RPC</a>
  */
 public class BasicDRPCTopology {
-    public static class ExclaimBolt extends BaseBasicBolt {
-        @Override
-        public void execute(Tuple tuple, BasicOutputCollector collector) {
-            String input = tuple.getString(1);
-            collector.emit(new Values(tuple.getValue(0), input + "!"));
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("id", "result"));
-        }
-    }
-
     public static void main(String[] args) throws Exception {
         LinearDRPCTopologyBuilder builder = new 
LinearDRPCTopologyBuilder("exclamation");
         builder.addBolt(new ExclaimBolt(), 3);
@@ -56,12 +38,12 @@ public class BasicDRPCTopology {
         String topoName = "DRPCExample";
 
         if (args != null && args.length > 0) {
-            topoName = args[0]; 
+            topoName = args[0];
         }
 
         conf.setNumWorkers(3);
         StormSubmitter.submitTopologyWithProgressBar(topoName, conf, 
builder.createRemoteTopology());
-        
+
         if (args.length > 1) {
             try (DRPCClient drpc = DRPCClient.getConfiguredClient(conf)) {
                 for (int i = 1; i < args.length; i++) {
@@ -71,4 +53,17 @@ public class BasicDRPCTopology {
             }
         }
     }
+
+    public static class ExclaimBolt extends BaseBasicBolt {
+        @Override
+        public void execute(Tuple tuple, BasicOutputCollector collector) {
+            String input = tuple.getString(1);
+            collector.emit(new Values(tuple.getValue(0), input + "!"));
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("id", "result"));
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/BlobStoreAPIWordCountTopology.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/BlobStoreAPIWordCountTopology.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/BlobStoreAPIWordCountTopology.java
index 2fc28d2..caa751a 100644
--- 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/BlobStoreAPIWordCountTopology.java
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/BlobStoreAPIWordCountTopology.java
@@ -1,27 +1,36 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.starter;
 
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.StringTokenizer;
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.blobstore.AtomicOutputStream;
+import org.apache.storm.blobstore.BlobStoreAclHandler;
 import org.apache.storm.blobstore.ClientBlobStore;
-
 import org.apache.storm.generated.AccessControl;
 import org.apache.storm.generated.AlreadyAliveException;
 import org.apache.storm.generated.AuthorizationException;
@@ -38,7 +47,6 @@ import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.topology.base.BaseBasicBolt;
 import org.apache.storm.topology.base.BaseRichSpout;
-import org.apache.storm.blobstore.BlobStoreAclHandler;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
@@ -46,26 +54,11 @@ import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.StringTokenizer;
-
 public class BlobStoreAPIWordCountTopology {
+    private static final Logger LOG = 
LoggerFactory.getLogger(BlobStoreAPIWordCountTopology.class);
     private static ClientBlobStore store; // Client API to invoke blob store 
API functionality
     private static String key = "key";
     private static String fileName = "blacklist.txt";
-    private static final Logger LOG = 
LoggerFactory.getLogger(BlobStoreAPIWordCountTopology.class);
 
     public static void prepare() {
         Config conf = new Config();
@@ -73,114 +66,16 @@ public class BlobStoreAPIWordCountTopology {
         store = Utils.getClientBlobStore(conf);
     }
 
-    // Spout implementation
-    public static class RandomSentenceSpout extends BaseRichSpout {
-        SpoutOutputCollector _collector;
-
-        @Override
-        public void open(Map<String, Object> conf, TopologyContext context, 
SpoutOutputCollector collector) {
-            _collector = collector;
-        }
-
-        @Override
-        public void nextTuple() {
-            Utils.sleep(100);
-            _collector.emit(new Values(getRandomSentence()));
-        }
-
-        @Override
-        public void ack(Object id) {
-        }
-
-        @Override
-        public void fail(Object id) {
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("sentence"));
-        }
-
-    }
-
-    // Bolt implementation
-    public static class SplitSentence extends ShellBolt implements IRichBolt {
-
-        public SplitSentence() {
-            super("python", "splitsentence.py");
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("word"));
-        }
-
-        @Override
-        public Map<String, Object> getComponentConfiguration() {
-            return null;
-        }
-    }
-
-    public static class FilterWords extends BaseBasicBolt {
-        boolean poll = false;
-        long pollTime;
-        Set<String> wordSet;
-        @Override
-        public void execute(Tuple tuple, BasicOutputCollector collector) {
-            String word = tuple.getString(0);
-            // Thread Polling every 5 seconds to update the wordSet seconds 
which is
-            // used in FilterWords bolt to filter the words
-            try {
-                if (!poll) {
-                    wordSet = parseFile(fileName);
-                    pollTime = System.currentTimeMillis();
-                    poll = true;
-                } else {
-                    if ((System.currentTimeMillis() - pollTime) > 5000) {
-                        wordSet = parseFile(fileName);
-                        pollTime = System.currentTimeMillis();
-                    }
-                }
-            } catch (IOException exp) {
-                throw new RuntimeException(exp);
-            }
-            if (wordSet !=null && !wordSet.contains(word)) {
-                collector.emit(new Values(word));
-            }
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("word"));
-        }
-    }
-
-    public void buildAndLaunchWordCountTopology(String[] args) {
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.setSpout("spout", new RandomSentenceSpout(), 5);
-        builder.setBolt("split", new SplitSentence(), 
8).shuffleGrouping("spout");
-        builder.setBolt("filter", new FilterWords(), 
6).shuffleGrouping("split");
-
-        Config conf = new Config();
-        conf.setDebug(true);
-        try {
-            conf.setNumWorkers(3);
-            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, 
builder.createTopology());
-        } catch (InvalidTopologyException | AuthorizationException | 
AlreadyAliveException exp) {
-            throw new RuntimeException(exp);
-        }
-    }
-
     // Equivalent create command on command line
     // storm blobstore create --file blacklist.txt --acl o::rwa key
     private static void createBlobWithContent(String blobKey, ClientBlobStore 
clientBlobStore, File file)
-            throws AuthorizationException, KeyAlreadyExistsException, 
IOException,KeyNotFoundException {
+        throws AuthorizationException, KeyAlreadyExistsException, IOException, 
KeyNotFoundException {
         String stringBlobACL = "o::rwa";
         AccessControl blobACL = 
BlobStoreAclHandler.parseAccessControl(stringBlobACL);
         List<AccessControl> acls = new LinkedList<AccessControl>();
         acls.add(blobACL); // more ACLs can be added here
         SettableBlobMeta settableBlobMeta = new SettableBlobMeta(acls);
-        AtomicOutputStream blobStream = 
clientBlobStore.createBlob(blobKey,settableBlobMeta);
+        AtomicOutputStream blobStream = clientBlobStore.createBlob(blobKey, 
settableBlobMeta);
         blobStream.write(readFile(file).toString().getBytes());
         blobStream.close();
     }
@@ -188,15 +83,17 @@ public class BlobStoreAPIWordCountTopology {
     // Equivalent update command on command line
     // storm blobstore update --file blacklist.txt key
     private static void updateBlobWithContent(String blobKey, ClientBlobStore 
clientBlobStore, File file)
-            throws KeyNotFoundException, AuthorizationException, IOException {
+        throws KeyNotFoundException, AuthorizationException, IOException {
         AtomicOutputStream blobOutputStream = 
clientBlobStore.updateBlob(blobKey);
         blobOutputStream.write(readFile(file).toString().getBytes());
         blobOutputStream.close();
     }
 
     private static String getRandomSentence() {
-        String[] sentences = new String[]{ "the cow jumped over the moon", "an 
apple a day keeps the doctor away",
-                "four score and seven years ago", "snow white and the seven 
dwarfs", "i am at two with nature" };
+        String[] sentences = new String[]{
+            "the cow jumped over the moon", "an apple a day keeps the doctor 
away",
+            "four score and seven years ago", "snow white and the seven 
dwarfs", "i am at two with nature"
+        };
         String sentence = sentences[new Random().nextInt(sentences.length)];
         return sentence;
     }
@@ -204,10 +101,12 @@ public class BlobStoreAPIWordCountTopology {
     private static Set<String> getRandomWordSet() {
         Set<String> randomWordSet = new HashSet<>();
         Random random = new Random();
-        String[] words = new String[]{ "cow", "jumped", "over", "the", "moon", 
"apple", "day", "doctor", "away",
-                "four", "seven", "ago", "snow", "white", "seven", "dwarfs", 
"nature", "two" };
+        String[] words = new String[]{
+            "cow", "jumped", "over", "the", "moon", "apple", "day", "doctor", 
"away",
+            "four", "seven", "ago", "snow", "white", "seven", "dwarfs", 
"nature", "two"
+        };
         // Choosing atmost 5 words to update the blacklist file for filtering
-        for (int i=0; i<5; i++) {
+        for (int i = 0; i < 5; i++) {
             randomWordSet.add(words[random.nextInt(words.length)]);
         }
         return randomWordSet;
@@ -259,11 +158,11 @@ public class BlobStoreAPIWordCountTopology {
     }
 
     // Writing random words to be blacklisted
-    public static void writeToFile(File file, Set<String> content) throws 
IOException{
+    public static void writeToFile(File file, Set<String> content) throws 
IOException {
         FileWriter fw = new FileWriter(file, false);
         BufferedWriter bw = new BufferedWriter(fw);
         Iterator<String> iter = content.iterator();
-        while(iter.hasNext()) {
+        while (iter.hasNext()) {
             bw.write(iter.next());
             bw.write(System.lineSeparator());
         }
@@ -286,7 +185,7 @@ public class BlobStoreAPIWordCountTopology {
             wc.buildAndLaunchWordCountTopology(args);
 
             // Updating file few times every 5 seconds
-            for(int i=0; i<10; i++) {
+            for (int i = 0; i < 10; i++) {
                 updateBlobWithContent(key, store, updateFile(file));
                 Utils.sleep(5000);
             }
@@ -296,6 +195,105 @@ public class BlobStoreAPIWordCountTopology {
             throw new RuntimeException(exp);
         }
     }
+
+    public void buildAndLaunchWordCountTopology(String[] args) {
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout("spout", new RandomSentenceSpout(), 5);
+        builder.setBolt("split", new SplitSentence(), 
8).shuffleGrouping("spout");
+        builder.setBolt("filter", new FilterWords(), 
6).shuffleGrouping("split");
+
+        Config conf = new Config();
+        conf.setDebug(true);
+        try {
+            conf.setNumWorkers(3);
+            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, 
builder.createTopology());
+        } catch (InvalidTopologyException | AuthorizationException | 
AlreadyAliveException exp) {
+            throw new RuntimeException(exp);
+        }
+    }
+
+    // Spout implementation
+    public static class RandomSentenceSpout extends BaseRichSpout {
+        SpoutOutputCollector _collector;
+
+        @Override
+        public void open(Map<String, Object> conf, TopologyContext context, 
SpoutOutputCollector collector) {
+            _collector = collector;
+        }
+
+        @Override
+        public void nextTuple() {
+            Utils.sleep(100);
+            _collector.emit(new Values(getRandomSentence()));
+        }
+
+        @Override
+        public void ack(Object id) {
+        }
+
+        @Override
+        public void fail(Object id) {
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("sentence"));
+        }
+
+    }
+
+    // Bolt implementation
+    public static class SplitSentence extends ShellBolt implements IRichBolt {
+
+        public SplitSentence() {
+            super("python", "splitsentence.py");
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word"));
+        }
+
+        @Override
+        public Map<String, Object> getComponentConfiguration() {
+            return null;
+        }
+    }
+
+    public static class FilterWords extends BaseBasicBolt {
+        boolean poll = false;
+        long pollTime;
+        Set<String> wordSet;
+
+        @Override
+        public void execute(Tuple tuple, BasicOutputCollector collector) {
+            String word = tuple.getString(0);
+            // Thread Polling every 5 seconds to update the wordSet seconds 
which is
+            // used in FilterWords bolt to filter the words
+            try {
+                if (!poll) {
+                    wordSet = parseFile(fileName);
+                    pollTime = System.currentTimeMillis();
+                    poll = true;
+                } else {
+                    if ((System.currentTimeMillis() - pollTime) > 5000) {
+                        wordSet = parseFile(fileName);
+                        pollTime = System.currentTimeMillis();
+                    }
+                }
+            } catch (IOException exp) {
+                throw new RuntimeException(exp);
+            }
+            if (wordSet != null && !wordSet.contains(word)) {
+                collector.emit(new Values(word));
+            }
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word"));
+        }
+    }
 }
 
 

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java
index a691201..73f2067 100644
--- 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java
@@ -1,24 +1,18 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.starter;
 
 import java.util.Map;
-
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.testing.TestWordSpout;
@@ -35,48 +29,48 @@ import org.apache.storm.tuple.Values;
  */
 public class ExclamationTopology extends ConfigurableTopology {
 
-  public static class ExclamationBolt extends BaseRichBolt {
-    OutputCollector _collector;
-
-    @Override
-    public void prepare(Map<String, Object> conf, TopologyContext context, 
OutputCollector collector) {
-      _collector = collector;
+    public static void main(String[] args) throws Exception {
+        ConfigurableTopology.start(new ExclamationTopology(), args);
     }
 
-    @Override
-    public void execute(Tuple tuple) {
-      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
-      _collector.ack(tuple);
-    }
+    protected int run(String[] args) {
+        TopologyBuilder builder = new TopologyBuilder();
 
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("word"));
-    }
+        builder.setSpout("word", new TestWordSpout(), 10);
+        builder.setBolt("exclaim1", new ExclamationBolt(), 
3).shuffleGrouping("word");
+        builder.setBolt("exclaim2", new ExclamationBolt(), 
2).shuffleGrouping("exclaim1");
 
-  }
+        conf.setDebug(true);
 
-  public static void main(String[] args) throws Exception {
-    ConfigurableTopology.start(new ExclamationTopology(), args);
-  }
+        String topologyName = "test";
 
-  protected int run(String[] args) {
-    TopologyBuilder builder = new TopologyBuilder();
+        conf.setNumWorkers(3);
 
-    builder.setSpout("word", new TestWordSpout(), 10);
-    builder.setBolt("exclaim1", new ExclamationBolt(), 
3).shuffleGrouping("word");
-    builder.setBolt("exclaim2", new ExclamationBolt(), 
2).shuffleGrouping("exclaim1");
+        if (args != null && args.length > 0) {
+            topologyName = args[0];
+        }
 
-    conf.setDebug(true);
+        return submit(topologyName, conf, builder);
+    }
 
-    String topologyName = "test";
+    public static class ExclamationBolt extends BaseRichBolt {
+        OutputCollector _collector;
 
-    conf.setNumWorkers(3);
+        @Override
+        public void prepare(Map<String, Object> conf, TopologyContext context, 
OutputCollector collector) {
+            _collector = collector;
+        }
 
-    if (args != null && args.length > 0) {
-      topologyName = args[0];
-    }
+        @Override
+        public void execute(Tuple tuple) {
+            _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
+            _collector.ack(tuple);
+        }
 
-    return submit(topologyName, conf, builder);
-  }
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word"));
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/FastWordCountTopology.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/FastWordCountTopology.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/FastWordCountTopology.java
index e171557..f881a86 100644
--- 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/FastWordCountTopology.java
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/FastWordCountTopology.java
@@ -1,25 +1,30 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.starter;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.*;
+import org.apache.storm.generated.ClusterSummary;
+import org.apache.storm.generated.ExecutorSummary;
+import org.apache.storm.generated.KillOptions;
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.generated.SpoutStats;
+import org.apache.storm.generated.TopologyInfo;
+import org.apache.storm.generated.TopologySummary;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.BasicOutputCollector;
@@ -30,168 +35,165 @@ import org.apache.storm.topology.base.BaseRichSpout;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.NimbusClient;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ThreadLocalRandom;
+import org.apache.storm.utils.Utils;
 
 /**
  * WordCount but the spout does not stop, and the bolts are implemented in
  * java.  This can show how fast the word count can run.
  */
 public class FastWordCountTopology {
-  public static class FastRandomSentenceSpout extends BaseRichSpout {
-    SpoutOutputCollector _collector;
-    Random _rand;
-    private static final String[] CHOICES = {
-        "marry had a little lamb whos fleese was white as snow",
-        "and every where that marry went the lamb was sure to go",
-        "one two three four five six seven eight nine ten",
-        "this is a test of the emergency broadcast system this is only a test",
-        "peter piper picked a peck of pickeled peppers"
-    };
-
-    @Override
-    public void open(Map<String, Object> conf, TopologyContext context, 
SpoutOutputCollector collector) {
-      _collector = collector;
-      _rand = ThreadLocalRandom.current();
+    public static void printMetrics(Nimbus.Iface client, String name) throws 
Exception {
+        ClusterSummary summary = client.getClusterInfo();
+        String id = null;
+        for (TopologySummary ts : summary.get_topologies()) {
+            if (name.equals(ts.get_name())) {
+                id = ts.get_id();
+            }
+        }
+        if (id == null) {
+            throw new Exception("Could not find a topology named " + name);
+        }
+        TopologyInfo info = client.getTopologyInfo(id);
+        int uptime = info.get_uptime_secs();
+        long acked = 0;
+        long failed = 0;
+        double weightedAvgTotal = 0.0;
+        for (ExecutorSummary exec : info.get_executors()) {
+            if ("spout".equals(exec.get_component_id())) {
+                SpoutStats stats = exec.get_stats().get_specific().get_spout();
+                Map<String, Long> failedMap = 
stats.get_failed().get(":all-time");
+                Map<String, Long> ackedMap = 
stats.get_acked().get(":all-time");
+                Map<String, Double> avgLatMap = 
stats.get_complete_ms_avg().get(":all-time");
+                for (String key : ackedMap.keySet()) {
+                    if (failedMap != null) {
+                        Long tmp = failedMap.get(key);
+                        if (tmp != null) {
+                            failed += tmp;
+                        }
+                    }
+                    long ackVal = ackedMap.get(key);
+                    double latVal = avgLatMap.get(key) * ackVal;
+                    acked += ackVal;
+                    weightedAvgTotal += latVal;
+                }
+            }
+        }
+        double avgLatency = weightedAvgTotal / acked;
+        System.out.println("uptime: " + uptime + " acked: " + acked + " 
avgLatency: " + avgLatency + " acked/sec: " +
+                           (((double) acked) / uptime + " failed: " + failed));
     }
 
-    @Override
-    public void nextTuple() {
-      String sentence = CHOICES[_rand.nextInt(CHOICES.length)];
-      _collector.emit(new Values(sentence), sentence);
+    public static void kill(Nimbus.Iface client, String name) throws Exception 
{
+        KillOptions opts = new KillOptions();
+        opts.set_wait_secs(0);
+        client.killTopologyWithOpts(name, opts);
     }
 
-    @Override
-    public void ack(Object id) {
-        //Ignored
-    }
+    public static void main(String[] args) throws Exception {
 
-    @Override
-    public void fail(Object id) {
-      _collector.emit(new Values(id), id);
-    }
+        TopologyBuilder builder = new TopologyBuilder();
 
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("sentence"));
-    }
-  }
-
-  public static class SplitSentence extends BaseBasicBolt {
-    @Override
-    public void execute(Tuple tuple, BasicOutputCollector collector) {
-      String sentence = tuple.getString(0);
-      for (String word: sentence.split("\\s+")) {
-          collector.emit(new Values(word, 1));
-      }
-    }
+        builder.setSpout("spout", new FastRandomSentenceSpout(), 4);
 
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("word", "count"));
-    }
-  }
-
-  public static class WordCount extends BaseBasicBolt {
-    Map<String, Integer> counts = new HashMap<String, Integer>();
-
-    @Override
-    public void execute(Tuple tuple, BasicOutputCollector collector) {
-      String word = tuple.getString(0);
-      Integer count = counts.get(word);
-      if (count == null)
-        count = 0;
-      count++;
-      counts.put(word, count);
-      collector.emit(new Values(word, count));
-    }
+        builder.setBolt("split", new SplitSentence(), 
4).shuffleGrouping("spout");
+        builder.setBolt("count", new WordCount(), 4).fieldsGrouping("split", 
new Fields("word"));
 
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("word", "count"));
-    }
-  }
-
-  public static void printMetrics(Nimbus.Iface client, String name) throws 
Exception {
-    ClusterSummary summary = client.getClusterInfo();
-    String id = null;
-    for (TopologySummary ts: summary.get_topologies()) {
-      if (name.equals(ts.get_name())) {
-        id = ts.get_id();
-      }
-    }
-    if (id == null) {
-      throw new Exception("Could not find a topology named "+name);
-    }
-    TopologyInfo info = client.getTopologyInfo(id);
-    int uptime = info.get_uptime_secs();
-    long acked = 0;
-    long failed = 0;
-    double weightedAvgTotal = 0.0;
-    for (ExecutorSummary exec: info.get_executors()) {
-      if ("spout".equals(exec.get_component_id())) {
-        SpoutStats stats = exec.get_stats().get_specific().get_spout();
-        Map<String, Long> failedMap = stats.get_failed().get(":all-time");
-        Map<String, Long> ackedMap = stats.get_acked().get(":all-time");
-        Map<String, Double> avgLatMap = 
stats.get_complete_ms_avg().get(":all-time");
-        for (String key: ackedMap.keySet()) {
-          if (failedMap != null) {
-              Long tmp = failedMap.get(key);
-              if (tmp != null) {
-                  failed += tmp;
-              }
-          }
-          long ackVal = ackedMap.get(key);
-          double latVal = avgLatMap.get(key) * ackVal;
-          acked += ackVal;
-          weightedAvgTotal += latVal;
+        Config conf = new Config();
+        
conf.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class);
+
+        String name = "wc-test";
+        if (args != null && args.length > 0) {
+            name = args[0];
         }
-      }
-    }
-    double avgLatency = weightedAvgTotal/acked;
-    System.out.println("uptime: "+uptime+" acked: "+acked+" avgLatency: 
"+avgLatency+" acked/sec: "+(((double)acked)/uptime+" failed: "+failed));
-  } 
 
-  public static void kill(Nimbus.Iface client, String name) throws Exception {
-    KillOptions opts = new KillOptions();
-    opts.set_wait_secs(0);
-    client.killTopologyWithOpts(name, opts);
-  } 
+        conf.setNumWorkers(1);
+        StormSubmitter.submitTopologyWithProgressBar(name, conf, 
builder.createTopology());
 
-  public static void main(String[] args) throws Exception {
+        Map<String, Object> clusterConf = Utils.readStormConfig();
+        clusterConf.putAll(Utils.readCommandLineOpts());
+        Nimbus.Iface client = 
NimbusClient.getConfiguredClient(clusterConf).getClient();
 
-    TopologyBuilder builder = new TopologyBuilder();
+        //Sleep for 5 mins
+        for (int i = 0; i < 10; i++) {
+            Thread.sleep(30 * 1000);
+            printMetrics(client, name);
+        }
+        kill(client, name);
+    }
+
+    public static class FastRandomSentenceSpout extends BaseRichSpout {
+        private static final String[] CHOICES = {
+            "marry had a little lamb whos fleese was white as snow",
+            "and every where that marry went the lamb was sure to go",
+            "one two three four five six seven eight nine ten",
+            "this is a test of the emergency broadcast system this is only a 
test",
+            "peter piper picked a peck of pickeled peppers"
+        };
+        SpoutOutputCollector _collector;
+        Random _rand;
+
+        @Override
+        public void open(Map<String, Object> conf, TopologyContext context, 
SpoutOutputCollector collector) {
+            _collector = collector;
+            _rand = ThreadLocalRandom.current();
+        }
 
-    builder.setSpout("spout", new FastRandomSentenceSpout(), 4);
+        @Override
+        public void nextTuple() {
+            String sentence = CHOICES[_rand.nextInt(CHOICES.length)];
+            _collector.emit(new Values(sentence), sentence);
+        }
 
-    builder.setBolt("split", new SplitSentence(), 4).shuffleGrouping("spout");
-    builder.setBolt("count", new WordCount(), 4).fieldsGrouping("split", new 
Fields("word"));
+        @Override
+        public void ack(Object id) {
+            //Ignored
+        }
 
-    Config conf = new Config();
-    
conf.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class);
+        @Override
+        public void fail(Object id) {
+            _collector.emit(new Values(id), id);
+        }
 
-    String name = "wc-test";
-    if (args != null && args.length > 0) {
-        name = args[0];
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("sentence"));
+        }
     }
 
-    conf.setNumWorkers(1);
-    StormSubmitter.submitTopologyWithProgressBar(name, conf, 
builder.createTopology());
+    public static class SplitSentence extends BaseBasicBolt {
+        @Override
+        public void execute(Tuple tuple, BasicOutputCollector collector) {
+            String sentence = tuple.getString(0);
+            for (String word : sentence.split("\\s+")) {
+                collector.emit(new Values(word, 1));
+            }
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word", "count"));
+        }
+    }
 
-    Map<String, Object> clusterConf = Utils.readStormConfig();
-    clusterConf.putAll(Utils.readCommandLineOpts());
-    Nimbus.Iface client = 
NimbusClient.getConfiguredClient(clusterConf).getClient();
+    public static class WordCount extends BaseBasicBolt {
+        Map<String, Integer> counts = new HashMap<String, Integer>();
+
+        @Override
+        public void execute(Tuple tuple, BasicOutputCollector collector) {
+            String word = tuple.getString(0);
+            Integer count = counts.get(word);
+            if (count == null) {
+                count = 0;
+            }
+            count++;
+            counts.put(word, count);
+            collector.emit(new Values(word, count));
+        }
 
-    //Sleep for 5 mins
-    for (int i = 0; i < 10; i++) {
-        Thread.sleep(30 * 1000);
-        printMetrics(client, name);
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word", "count"));
+        }
     }
-    kill(client, name);
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/InOrderDeliveryTest.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/InOrderDeliveryTest.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/InOrderDeliveryTest.java
index e4a41e2..5d314e0 100644
--- 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/InOrderDeliveryTest.java
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/InOrderDeliveryTest.java
@@ -1,25 +1,28 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.starter;
 
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.*;
+import org.apache.storm.generated.ClusterSummary;
+import org.apache.storm.generated.ExecutorSummary;
+import org.apache.storm.generated.KillOptions;
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.generated.SpoutStats;
+import org.apache.storm.generated.TopologyInfo;
+import org.apache.storm.generated.TopologySummary;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.BasicOutputCollector;
@@ -31,143 +34,141 @@ import org.apache.storm.topology.base.BaseRichSpout;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.NimbusClient;
-
-import java.util.HashMap;
-import java.util.Map;
+import org.apache.storm.utils.Utils;
 
 public class InOrderDeliveryTest {
-  public static class InOrderSpout extends BaseRichSpout {
-    SpoutOutputCollector _collector;
-    int _base = 0;
-    int _i = 0;
-
-    @Override
-    public void open(Map<String, Object> conf, TopologyContext context, 
SpoutOutputCollector collector) {
-      _collector = collector;
-      _base = context.getThisTaskIndex();
+    public static void printMetrics(Nimbus.Iface client, String name) throws 
Exception {
+        ClusterSummary summary = client.getClusterInfo();
+        String id = null;
+        for (TopologySummary ts : summary.get_topologies()) {
+            if (name.equals(ts.get_name())) {
+                id = ts.get_id();
+            }
+        }
+        if (id == null) {
+            throw new Exception("Could not find a topology named " + name);
+        }
+        TopologyInfo info = client.getTopologyInfo(id);
+        int uptime = info.get_uptime_secs();
+        long acked = 0;
+        long failed = 0;
+        double weightedAvgTotal = 0.0;
+        for (ExecutorSummary exec : info.get_executors()) {
+            if ("spout".equals(exec.get_component_id())) {
+                SpoutStats stats = exec.get_stats().get_specific().get_spout();
+                Map<String, Long> failedMap = 
stats.get_failed().get(":all-time");
+                Map<String, Long> ackedMap = 
stats.get_acked().get(":all-time");
+                Map<String, Double> avgLatMap = 
stats.get_complete_ms_avg().get(":all-time");
+                for (String key : ackedMap.keySet()) {
+                    if (failedMap != null) {
+                        Long tmp = failedMap.get(key);
+                        if (tmp != null) {
+                            failed += tmp;
+                        }
+                    }
+                    long ackVal = ackedMap.get(key);
+                    double latVal = avgLatMap.get(key) * ackVal;
+                    acked += ackVal;
+                    weightedAvgTotal += latVal;
+                }
+            }
+        }
+        double avgLatency = weightedAvgTotal / acked;
+        System.out.println("uptime: " + uptime + " acked: " + acked + " 
avgLatency: " + avgLatency + " acked/sec: " +
+                           (((double) acked) / uptime + " failed: " + failed));
     }
 
-    @Override
-    public void nextTuple() {
-      Values v = new Values(_base, _i);
-      _collector.emit(v, "ACK");
-      _i++;
+    public static void kill(Nimbus.Iface client, String name) throws Exception 
{
+        KillOptions opts = new KillOptions();
+        opts.set_wait_secs(0);
+        client.killTopologyWithOpts(name, opts);
     }
 
-    @Override
-    public void ack(Object id) {
-      //Ignored
-    }
+    public static void main(String[] args) throws Exception {
 
-    @Override
-    public void fail(Object id) {
-      //Ignored
-    }
+        TopologyBuilder builder = new TopologyBuilder();
 
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("c1", "c2"));
-    }
-  }
-
-  public static class Check extends BaseBasicBolt {
-    Map<Integer, Integer> expected = new HashMap<Integer, Integer>();
-
-    @Override
-    public void execute(Tuple tuple, BasicOutputCollector collector) {
-      Integer c1 = tuple.getInteger(0);
-      Integer c2 = tuple.getInteger(1);
-      Integer exp = expected.get(c1);
-      if (exp == null) exp = 0;
-      if (c2.intValue() != exp.intValue()) {
-          System.out.println(c1+" "+c2+" != "+exp);
-          throw new FailedException(c1+" "+c2+" != "+exp);
-      }
-      exp = c2 + 1;
-      expected.put(c1, exp);
-    }
+        builder.setSpout("spout", new InOrderSpout(), 8);
+        builder.setBolt("count", new Check(), 8).fieldsGrouping("spout", new 
Fields("c1"));
 
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      //Empty
-    }
-  }
-
-  public static void printMetrics(Nimbus.Iface client, String name) throws 
Exception {
-    ClusterSummary summary = client.getClusterInfo();
-    String id = null;
-    for (TopologySummary ts: summary.get_topologies()) {
-      if (name.equals(ts.get_name())) {
-        id = ts.get_id();
-      }
-    }
-    if (id == null) {
-      throw new Exception("Could not find a topology named "+name);
-    }
-    TopologyInfo info = client.getTopologyInfo(id);
-    int uptime = info.get_uptime_secs();
-    long acked = 0;
-    long failed = 0;
-    double weightedAvgTotal = 0.0;
-    for (ExecutorSummary exec: info.get_executors()) {
-      if ("spout".equals(exec.get_component_id())) {
-        SpoutStats stats = exec.get_stats().get_specific().get_spout();
-        Map<String, Long> failedMap = stats.get_failed().get(":all-time");
-        Map<String, Long> ackedMap = stats.get_acked().get(":all-time");
-        Map<String, Double> avgLatMap = 
stats.get_complete_ms_avg().get(":all-time");
-        for (String key: ackedMap.keySet()) {
-          if (failedMap != null) {
-              Long tmp = failedMap.get(key);
-              if (tmp != null) {
-                  failed += tmp;
-              }
-          }
-          long ackVal = ackedMap.get(key);
-          double latVal = avgLatMap.get(key) * ackVal;
-          acked += ackVal;
-          weightedAvgTotal += latVal;
+        Config conf = new Config();
+        
conf.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class);
+
+        String name = "in-order-test";
+        if (args != null && args.length > 0) {
+            name = args[0];
+        }
+
+        conf.setNumWorkers(1);
+        StormSubmitter.submitTopologyWithProgressBar(name, conf, 
builder.createTopology());
+
+        Map<String, Object> clusterConf = Utils.readStormConfig();
+        clusterConf.putAll(Utils.readCommandLineOpts());
+        Nimbus.Iface client = 
NimbusClient.getConfiguredClient(clusterConf).getClient();
+
+        //Sleep for 25 mins (50 iterations x 30 seconds)
+        for (int i = 0; i < 50; i++) {
+            Thread.sleep(30 * 1000);
+            printMetrics(client, name);
         }
-      }
+        kill(client, name);
     }
-    double avgLatency = weightedAvgTotal/acked;
-    System.out.println("uptime: "+uptime+" acked: "+acked+" avgLatency: 
"+avgLatency+" acked/sec: "+(((double)acked)/uptime+" failed: "+failed));
-  } 
 
-  public static void kill(Nimbus.Iface client, String name) throws Exception {
-    KillOptions opts = new KillOptions();
-    opts.set_wait_secs(0);
-    client.killTopologyWithOpts(name, opts);
-  } 
+    public static class InOrderSpout extends BaseRichSpout {
+        SpoutOutputCollector _collector;
+        int _base = 0;
+        int _i = 0;
 
-  public static void main(String[] args) throws Exception {
+        @Override
+        public void open(Map<String, Object> conf, TopologyContext context, 
SpoutOutputCollector collector) {
+            _collector = collector;
+            _base = context.getThisTaskIndex();
+        }
 
-    TopologyBuilder builder = new TopologyBuilder();
+        @Override
+        public void nextTuple() {
+            Values v = new Values(_base, _i);
+            _collector.emit(v, "ACK");
+            _i++;
+        }
 
-    builder.setSpout("spout", new InOrderSpout(), 8);
-    builder.setBolt("count", new Check(), 8).fieldsGrouping("spout", new 
Fields("c1"));
+        @Override
+        public void ack(Object id) {
+            //Ignored
+        }
 
-    Config conf = new Config();
-    
conf.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class);
+        @Override
+        public void fail(Object id) {
+            //Ignored
+        }
 
-    String name = "in-order-test";
-    if (args != null && args.length > 0) {
-        name = args[0];
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("c1", "c2"));
+        }
     }
 
-    conf.setNumWorkers(1);
-    StormSubmitter.submitTopologyWithProgressBar(name, conf, 
builder.createTopology());
-
-    Map<String, Object> clusterConf = Utils.readStormConfig();
-    clusterConf.putAll(Utils.readCommandLineOpts());
-    Nimbus.Iface client = 
NimbusClient.getConfiguredClient(clusterConf).getClient();
+    public static class Check extends BaseBasicBolt {
+        Map<Integer, Integer> expected = new HashMap<Integer, Integer>();
+
+        @Override
+        public void execute(Tuple tuple, BasicOutputCollector collector) {
+            Integer c1 = tuple.getInteger(0);
+            Integer c2 = tuple.getInteger(1);
+            Integer exp = expected.get(c1);
+            if (exp == null) exp = 0;
+            if (c2.intValue() != exp.intValue()) {
+                System.out.println(c1 + " " + c2 + " != " + exp);
+                throw new FailedException(c1 + " " + c2 + " != " + exp);
+            }
+            exp = c2 + 1;
+            expected.put(c1, exp);
+        }
 
-    //Sleep for 50 mins
-    for (int i = 0; i < 50; i++) {
-        Thread.sleep(30 * 1000);
-        printMetrics(client, name);
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            //Empty
+        }
     }
-    kill(client, name);
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/JoinBoltExample.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/JoinBoltExample.java 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/JoinBoltExample.java
index b71b64a..6fc3739 100644
--- 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/JoinBoltExample.java
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/JoinBoltExample.java
@@ -1,24 +1,18 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.starter;
 
 import java.util.concurrent.TimeUnit;
-
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.bolt.JoinBolt;
@@ -33,8 +27,8 @@ import org.apache.storm.utils.NimbusClient;
 public class JoinBoltExample {
     public static void main(String[] args) throws Exception {
        if (!NimbusClient.isLocalOverride()) {
-            throw new IllegalStateException("This example only works in local mode.  " 
-                    + "Run with storm local not storm jar");
+            throw new IllegalStateException("This example only works in local mode.  "
+                                            + "Run with storm local not storm jar");
        }
         FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender"));
         FeederSpout ageSpout = new FeederSpout(new Fields("id", "age"));
@@ -45,15 +39,15 @@ public class JoinBoltExample {
 
         // inner join of 'age' and 'gender' records on 'id' field
         JoinBolt joiner = new JoinBolt("genderSpout", "id")
-                                 .join("ageSpout",    "id", "genderSpout")
-                                 .select ("genderSpout:id,ageSpout:id,gender,age")
-                .withTumblingWindow( new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS) );
+            .join("ageSpout", "id", "genderSpout")
+            .select("genderSpout:id,ageSpout:id,gender,age")
+            .withTumblingWindow(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS));
 
         builder.setBolt("joiner", joiner)
-                .fieldsGrouping("genderSpout", new Fields("id"))
-                .fieldsGrouping("ageSpout", new Fields("id"))         ;
+               .fieldsGrouping("genderSpout", new Fields("id"))
+               .fieldsGrouping("ageSpout", new Fields("id"));
 
-        builder.setBolt("printer", new PrinterBolt() ).shuffleGrouping("joiner");
+        builder.setBolt("printer", new PrinterBolt()).shuffleGrouping("joiner");
 
         Config conf = new Config();
         StormSubmitter.submitTopologyWithProgressBar("join-example", conf, builder.createTopology());
@@ -74,8 +68,7 @@ public class JoinBoltExample {
             String gender;
             if (i % 2 == 0) {
                 gender = "male";
-            }
-            else {
+            } else {
                 gender = "female";
             }
             genderSpout.feed(new Values(i, gender));

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java
index 61b02db..94b1c38 100644
--- 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java
@@ -1,30 +1,24 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.starter;
 
+import java.io.Serializable;
+import java.util.UUID;
 import org.apache.storm.Config;
 import org.apache.storm.topology.ConfigurableTopology;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.tuple.Values;
 
-import java.io.Serializable;
-import java.util.UUID;
-
 public class LambdaTopology extends ConfigurableTopology {
     public static void main(String[] args) {
         ConfigurableTopology.start(new LambdaTopology(), args);

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/ManualDRPC.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/ManualDRPC.java 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/ManualDRPC.java
index 110d0be..2038399 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/ManualDRPC.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ManualDRPC.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.starter;
 
 import org.apache.storm.Config;
@@ -31,22 +26,6 @@ import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.DRPCClient;
 
 public class ManualDRPC {
-    public static class ExclamationBolt extends BaseBasicBolt {
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("result", "return-info"));
-        }
-
-        @Override
-        public void execute(Tuple tuple, BasicOutputCollector collector) {
-            String arg = tuple.getString(0);
-            Object retInfo = tuple.getValue(1);
-            collector.emit(new Values(arg + "!!!", retInfo));
-        }
-
-    }
-
     public static void main(String[] args) throws Exception {
         TopologyBuilder builder = new TopologyBuilder();
 
@@ -62,4 +41,20 @@ public class ManualDRPC {
             System.out.println(drpc.execute("exclamation", "bbb"));
         }
     }
+
+    public static class ExclamationBolt extends BaseBasicBolt {
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("result", "return-info"));
+        }
+
+        @Override
+        public void execute(Tuple tuple, BasicOutputCollector collector) {
+            String arg = tuple.getString(0);
+            Object retInfo = tuple.getValue(1);
+            collector.emit(new Values(arg + "!!!", retInfo));
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/MultipleLoggerTopology.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/MultipleLoggerTopology.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/MultipleLoggerTopology.java
index aeedc78..954e195 100644
--- 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/MultipleLoggerTopology.java
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/MultipleLoggerTopology.java
@@ -1,24 +1,18 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.starter;
 
 import java.util.Map;
-
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.task.OutputCollector;
@@ -37,60 +31,60 @@ import org.slf4j.LoggerFactory;
  * This is a basic example of a Storm topology.
  */
 public class MultipleLoggerTopology {
-  public static class ExclamationLoggingBolt extends BaseRichBolt {
-    OutputCollector _collector;
-    Logger _rootLogger = LoggerFactory.getLogger (Logger.ROOT_LOGGER_NAME);
-    // ensure the loggers are configured in the worker.xml before
-    // trying to use them here
-    Logger _logger = LoggerFactory.getLogger ("com.myapp");
-    Logger _subLogger = LoggerFactory.getLogger ("com.myapp.sub");
+    public static void main(String[] args) throws Exception {
+        TopologyBuilder builder = new TopologyBuilder();
 
-    @Override
-    public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
-      _collector = collector;
-    }
+        builder.setSpout("word", new TestWordSpout(), 10);
+        builder.setBolt("exclaim1", new ExclamationLoggingBolt(), 3).shuffleGrouping("word");
+        builder.setBolt("exclaim2", new ExclamationLoggingBolt(), 2).shuffleGrouping("exclaim1");
 
-    @Override
-    public void execute(Tuple tuple) {
-      _rootLogger.debug ("root: This is a DEBUG message");
-      _rootLogger.info ("root: This is an INFO message");
-      _rootLogger.warn ("root: This is a WARN message");
-      _rootLogger.error ("root: This is an ERROR message");
+        Config conf = new Config();
+        conf.setDebug(true);
+        String topoName = MultipleLoggerTopology.class.getName();
+        if (args != null && args.length > 0) {
+            topoName = args[0];
+        }
+        conf.setNumWorkers(2);
+        StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology());
+    }
+    }
 
-      _logger.debug ("myapp: This is a DEBUG message");
-      _logger.info ("myapp: This is an INFO message");
-      _logger.warn ("myapp: This is a WARN message");
-      _logger.error ("myapp: This is an ERROR message");
+    public static class ExclamationLoggingBolt extends BaseRichBolt {
+        OutputCollector _collector;
+        Logger _rootLogger = LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
+        // ensure the loggers are configured in the worker.xml before
+        // trying to use them here
+        Logger _logger = LoggerFactory.getLogger("com.myapp");
+        Logger _subLogger = LoggerFactory.getLogger("com.myapp.sub");
 
-      _subLogger.debug ("myapp.sub: This is a DEBUG message");
-      _subLogger.info ("myapp.sub: This is an INFO message");
-      _subLogger.warn ("myapp.sub: This is a WARN message");
-      _subLogger.error ("myapp.sub: This is an ERROR message");
+        @Override
+        public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
+            _collector = collector;
+        }
 
-      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
-      _collector.ack(tuple);
-    }
+        @Override
+        public void execute(Tuple tuple) {
+            _rootLogger.debug("root: This is a DEBUG message");
+            _rootLogger.info("root: This is an INFO message");
+            _rootLogger.warn("root: This is a WARN message");
+            _rootLogger.error("root: This is an ERROR message");
 
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("word"));
-    }
-  }
+            _logger.debug("myapp: This is a DEBUG message");
+            _logger.info("myapp: This is an INFO message");
+            _logger.warn("myapp: This is a WARN message");
+            _logger.error("myapp: This is an ERROR message");
 
-  public static void main(String[] args) throws Exception {
-    TopologyBuilder builder = new TopologyBuilder();
+            _subLogger.debug("myapp.sub: This is a DEBUG message");
+            _subLogger.info("myapp.sub: This is an INFO message");
+            _subLogger.warn("myapp.sub: This is a WARN message");
+            _subLogger.error("myapp.sub: This is an ERROR message");
 
-    builder.setSpout("word", new TestWordSpout(), 10);
-    builder.setBolt("exclaim1", new ExclamationLoggingBolt(), 3).shuffleGrouping("word");
-    builder.setBolt("exclaim2", new ExclamationLoggingBolt(), 2).shuffleGrouping("exclaim1");
+            _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
+            _collector.ack(tuple);
+        }
 
-    Config conf = new Config();
-    conf.setDebug(true);
-    String topoName = MultipleLoggerTopology.class.getName();
-    if (args != null && args.length > 0) {
-      topoName = args[0];
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word"));
+        }
     }
-    conf.setNumWorkers(2);
-    StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology());
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/PersistentWindowingTopology.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/PersistentWindowingTopology.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/PersistentWindowingTopology.java
index 566ab69..46f29a0 100644
--- 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/PersistentWindowingTopology.java
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/PersistentWindowingTopology.java
@@ -18,8 +18,6 @@
 
 package org.apache.storm.starter;
 
-import static org.apache.storm.topology.base.BaseWindowedBolt.Duration;
-
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -40,18 +38,18 @@ import org.apache.storm.windowing.TupleWindow;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.storm.topology.base.BaseWindowedBolt.Duration;
+
 /**
- * An example that demonstrates the usage of {@link 
org.apache.storm.topology.IStatefulWindowedBolt} with window
- * persistence.
+ * An example that demonstrates the usage of {@link 
org.apache.storm.topology.IStatefulWindowedBolt} with window persistence.
  * <p>
- * The framework automatically checkpoints the tuples in the window along with 
the bolt's state and restores the same
- * during restarts.
+ * The framework automatically checkpoints the tuples in the window along with 
the bolt's state and restores the same during restarts.
  * </p>
  *
  * <p>
- * This topology uses 'redis' for state persistence, so you should also start 
a redis instance before deploying.
- * If you are running in local mode you can just start a redis server locally 
which will be used for storing the state. The default
- * RedisKeyValueStateProvider parameters can be overridden by setting {@link 
Config#TOPOLOGY_STATE_PROVIDER_CONFIG}, for e.g.
+ * This topology uses 'redis' for state persistence, so you should also start 
a redis instance before deploying. If you are running in local
+ * mode you can just start a redis server locally which will be used for 
storing the state. The default RedisKeyValueStateProvider
+ * parameters can be overridden by setting {@link 
Config#TOPOLOGY_STATE_PROVIDER_CONFIG}, for e.g.
  * <pre>
  * {
  *   "jedisPoolConfig": {
@@ -68,6 +66,47 @@ import org.slf4j.LoggerFactory;
 public class PersistentWindowingTopology {
     private static final Logger LOG = LoggerFactory.getLogger(PersistentWindowingTopology.class);
 
+    /**
+     * Create and deploy the topology.
+     *
+     * @param args args
+     * @throws Exception exception
+     */
+    public static void main(String[] args) throws Exception {
+        TopologyBuilder builder = new TopologyBuilder();
+
+        // generate random numbers
+        builder.setSpout("spout", new RandomIntegerSpout());
+
+        // emits sliding window and global averages
+        builder.setBolt("avgbolt", new AvgBolt()
+            .withWindow(new Duration(10, TimeUnit.SECONDS), new Duration(2, 
TimeUnit.SECONDS))
+            // persist the window in state
+            .withPersistence()
+            // max number of events to be cached in memory
+            .withMaxEventsInMemory(25000), 1)
+               .shuffleGrouping("spout");
+
+        // print the values to stdout
+        builder.setBolt("printer", (x, y) -> System.out.println(x.getValue(0)), 1).shuffleGrouping("avgbolt");
+
+        Config conf = new Config();
+        conf.setDebug(false);
+
+        // checkpoint the state every 5 seconds
+        conf.put(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL, 5000);
+
+        // use redis for state persistence
+        conf.put(Config.TOPOLOGY_STATE_PROVIDER, "org.apache.storm.redis.state.RedisKeyValueStateProvider");
+
+        String topoName = "test";
+        if (args != null && args.length > 0) {
+            topoName = args[0];
+        }
+        conf.setNumWorkers(1);
+        StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology());
+    }
+
     // wrapper to hold global and window averages
     private static class Averages {
         private final double global;
@@ -131,46 +170,4 @@ public class PersistentWindowingTopology {
         }
     }
 
-
-    /**
-     * Create and deploy the topology.
-     *
-     * @param args args
-     * @throws Exception exception
-     */
-    public static void main(String[] args) throws Exception {
-        TopologyBuilder builder = new TopologyBuilder();
-
-        // generate random numbers
-        builder.setSpout("spout", new RandomIntegerSpout());
-
-        // emits sliding window and global averages
-        builder.setBolt("avgbolt", new AvgBolt()
-            .withWindow(new Duration(10, TimeUnit.SECONDS), new Duration(2, 
TimeUnit.SECONDS))
-            // persist the window in state
-            .withPersistence()
-            // max number of events to be cached in memory
-            .withMaxEventsInMemory(25000), 1)
-            .shuffleGrouping("spout");
-
-        // print the values to stdout
-        builder.setBolt("printer", (x, y) -> System.out.println(x.getValue(0)), 1).shuffleGrouping("avgbolt");
-
-        Config conf = new Config();
-        conf.setDebug(false);
-
-        // checkpoint the state every 5 seconds
-        conf.put(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL, 5000);
-
-        // use redis for state persistence
-        conf.put(Config.TOPOLOGY_STATE_PROVIDER, "org.apache.storm.redis.state.RedisKeyValueStateProvider");
-
-        String topoName = "test";
-        if (args != null && args.length > 0) {
-            topoName = args[0];
-        }
-        conf.setNumWorkers(1);
-        StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology());
-    }
-
 }

Reply via email to