http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/ReachTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ReachTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ReachTopology.java
index 4da1c85..1e57a94 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/ReachTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ReachTopology.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.starter;
 
 import java.util.Arrays;
@@ -23,7 +18,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.coordination.BatchOutputCollector;
@@ -71,6 +65,34 @@ public class ReachTopology {
         put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));
     }};
 
+    public static LinearDRPCTopologyBuilder construct() {
+        LinearDRPCTopologyBuilder builder = new 
LinearDRPCTopologyBuilder("reach");
+        builder.addBolt(new GetTweeters(), 4);
+        builder.addBolt(new GetFollowers(), 12).shuffleGrouping();
+        builder.addBolt(new PartialUniquer(), 6).fieldsGrouping(new 
Fields("id", "follower"));
+        builder.addBolt(new CountAggregator(), 3).fieldsGrouping(new 
Fields("id"));
+        return builder;
+    }
+
+    public static void main(String[] args) throws Exception {
+        LinearDRPCTopologyBuilder builder = construct();
+
+        Config conf = new Config();
+        conf.setNumWorkers(6);
+        String topoName = "reach-drpc";
+        if (args.length > 0) {
+            topoName = args[0];
+        }
+        StormSubmitter.submitTopologyWithProgressBar(topoName, conf, 
builder.createRemoteTopology());
+
+        try (DRPCClient drpc = DRPCClient.getConfiguredClient(conf)) {
+            String[] urlsToTry = new String[]{ "foo.com/blog/1", 
"engineering.twitter.com/blog/5", "notaurl.com" };
+            for (String url : urlsToTry) {
+                System.out.println("Reach of " + url + ": " + 
drpc.execute("reach", url));
+            }
+        }
+    }
+
     public static class GetTweeters extends BaseBasicBolt {
         @Override
         public void execute(Tuple tuple, BasicOutputCollector collector) {
@@ -162,32 +184,4 @@ public class ReachTopology {
             declarer.declare(new Fields("id", "reach"));
         }
     }
-
-    public static LinearDRPCTopologyBuilder construct() {
-        LinearDRPCTopologyBuilder builder = new 
LinearDRPCTopologyBuilder("reach");
-        builder.addBolt(new GetTweeters(), 4);
-        builder.addBolt(new GetFollowers(), 12).shuffleGrouping();
-        builder.addBolt(new PartialUniquer(), 6).fieldsGrouping(new 
Fields("id", "follower"));
-        builder.addBolt(new CountAggregator(), 3).fieldsGrouping(new 
Fields("id"));
-        return builder;
-    }
-
-    public static void main(String[] args) throws Exception {
-        LinearDRPCTopologyBuilder builder = construct();
-
-        Config conf = new Config();
-        conf.setNumWorkers(6);
-        String topoName = "reach-drpc";
-        if (args.length > 0) {
-            topoName = args[0];
-        }
-        StormSubmitter.submitTopologyWithProgressBar(topoName, conf, 
builder.createRemoteTopology());
-
-        try (DRPCClient drpc = DRPCClient.getConfiguredClient(conf)) {
-            String[] urlsToTry = new String[]{ "foo.com/blog/1", 
"engineering.twitter.com/blog/5", "notaurl.com" };
-            for (String url : urlsToTry) {
-                System.out.println("Reach of " + url + ": " + 
drpc.execute("reach", url));
-            }
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java
index 07149e8..18af7ea 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.starter;
@@ -22,7 +16,6 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.task.OutputCollector;
@@ -39,53 +32,6 @@ import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
 
 public class ResourceAwareExampleTopology {
-    public static class ExclamationBolt extends BaseRichBolt {
-        //Have a crummy cache to show off shared memory accounting
-        private static final ConcurrentHashMap<String, String> myCrummyCache =
-            new ConcurrentHashMap<>();
-        private static final int CACHE_SIZE = 100_000;
-        OutputCollector _collector;
-
-        protected static String getFromCache(String key) {
-            return myCrummyCache.get(key);
-        }
-
-        protected static void addToCache(String key, String value) {
-            myCrummyCache.putIfAbsent(key, value);
-            int numToRemove = myCrummyCache.size() - CACHE_SIZE;
-            if (numToRemove > 0) {
-                //Remove something randomly...
-                Iterator<Entry<String, String>> it = 
myCrummyCache.entrySet().iterator();
-                for (; numToRemove > 0 && it.hasNext(); numToRemove--) {
-                    it.next();
-                    it.remove();
-                }
-            }
-        }
-
-        @Override
-        public void prepare(Map<String, Object> conf, TopologyContext context, 
OutputCollector collector) {
-            _collector = collector;
-        }
-
-        @Override
-        public void execute(Tuple tuple) {
-            String orig = tuple.getString(0);
-            String ret = getFromCache(orig);
-            if (ret == null) {
-                ret = orig + "!!!";
-                addToCache(orig, ret);
-            }
-            _collector.emit(tuple, new Values(ret));
-            _collector.ack(tuple);
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("word"));
-        }
-    }
-
     public static void main(String[] args) throws Exception {
         TopologyBuilder builder = new TopologyBuilder();
 
@@ -154,4 +100,51 @@ public class ResourceAwareExampleTopology {
 
         StormSubmitter.submitTopologyWithProgressBar(topoName, conf, 
builder.createTopology());
     }
+
+    public static class ExclamationBolt extends BaseRichBolt {
+        //Have a crummy cache to show off shared memory accounting
+        private static final ConcurrentHashMap<String, String> myCrummyCache =
+            new ConcurrentHashMap<>();
+        private static final int CACHE_SIZE = 100_000;
+        OutputCollector _collector;
+
+        protected static String getFromCache(String key) {
+            return myCrummyCache.get(key);
+        }
+
+        protected static void addToCache(String key, String value) {
+            myCrummyCache.putIfAbsent(key, value);
+            int numToRemove = myCrummyCache.size() - CACHE_SIZE;
+            if (numToRemove > 0) {
+                //Remove something randomly...
+                Iterator<Entry<String, String>> it = 
myCrummyCache.entrySet().iterator();
+                for (; numToRemove > 0 && it.hasNext(); numToRemove--) {
+                    it.next();
+                    it.remove();
+                }
+            }
+        }
+
+        @Override
+        public void prepare(Map<String, Object> conf, TopologyContext context, 
OutputCollector collector) {
+            _collector = collector;
+        }
+
+        @Override
+        public void execute(Tuple tuple) {
+            String orig = tuple.getString(0);
+            String ret = getFromCache(orig);
+            if (ret == null) {
+                ret = orig + "!!!";
+                addToCache(orig, ret);
+            }
+            _collector.emit(tuple, new Values(ret));
+            _collector.ack(tuple);
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word"));
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java
index 78b2baf..23f56c2 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.starter;
 
 import org.apache.storm.starter.bolt.IntermediateRankingsBolt;
@@ -35,54 +30,54 @@ import org.slf4j.LoggerFactory;
  */
 public class RollingTopWords extends ConfigurableTopology {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(RollingTopWords.class);
-  private static final int TOP_N = 5;
-
-  private RollingTopWords() {
-  }
+    private static final Logger LOG = 
LoggerFactory.getLogger(RollingTopWords.class);
+    private static final int TOP_N = 5;
 
-  public static void main(String[] args) throws Exception {
-    ConfigurableTopology.start(new RollingTopWords(), args);
-  }
+    private RollingTopWords() {
+    }
 
-  /**
-   * Submits (runs) the topology.
-   *
-   * Usage: "RollingTopWords [topology-name] [-local]"
-   *
-   * By default, the topology is run locally under the name
-   * "slidingWindowCounts".
-   *
-   * Examples:
-   *
-   * ```
-   *
-   * # Runs in remote/cluster mode, with topology name "production-topology"
-   * $ storm jar storm-starter-jar-with-dependencies.jar 
org.apache.storm.starter.RollingTopWords production-topology ```
-   *
-   * @param args
-   *          First positional argument (optional) is topology name, second
-   *          positional argument (optional) defines whether to run the 
topology
-   *          locally ("-local") or remotely, i.e. on a real cluster.
-   * @throws Exception
-   */
-  protected int run(String[] args) {
-    String topologyName = "slidingWindowCounts";
-    if (args.length >= 1) {
-      topologyName = args[0];
+    public static void main(String[] args) throws Exception {
+        ConfigurableTopology.start(new RollingTopWords(), args);
     }
-    TopologyBuilder builder = new TopologyBuilder();
-    String spoutId = "wordGenerator";
-    String counterId = "counter";
-    String intermediateRankerId = "intermediateRanker";
-    String totalRankerId = "finalRanker";
-    builder.setSpout(spoutId, new TestWordSpout(), 5);
-    builder.setBolt(counterId, new RollingCountBolt(9, 3), 
4).fieldsGrouping(spoutId, new Fields("word"));
-    builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 
4).fieldsGrouping(counterId,
-        new Fields("obj"));
-    builder.setBolt(totalRankerId, new 
TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
-    LOG.info("Topology name: " + topologyName);
 
-    return submit(topologyName, conf, builder);
-  }
+    /**
+     * Submits (runs) the topology.
+     *
+     * Usage: "RollingTopWords [topology-name] [-local]"
+     *
+     * By default, the topology is run locally under the name
+     * "slidingWindowCounts".
+     *
+     * Examples:
+     *
+     * ```
+     *
+     * # Runs in remote/cluster mode, with topology name "production-topology"
+     * $ storm jar storm-starter-jar-with-dependencies.jar 
org.apache.storm.starter.RollingTopWords production-topology ```
+     *
+     * @param args
+     *          First positional argument (optional) is topology name, second
+     *          positional argument (optional) defines whether to run the 
topology
+     *          locally ("-local") or remotely, i.e. on a real cluster.
+     * @throws Exception
+     */
+    protected int run(String[] args) {
+        String topologyName = "slidingWindowCounts";
+        if (args.length >= 1) {
+            topologyName = args[0];
+        }
+        TopologyBuilder builder = new TopologyBuilder();
+        String spoutId = "wordGenerator";
+        String counterId = "counter";
+        String intermediateRankerId = "intermediateRanker";
+        String totalRankerId = "finalRanker";
+        builder.setSpout(spoutId, new TestWordSpout(), 5);
+        builder.setBolt(counterId, new RollingCountBolt(9, 3), 
4).fieldsGrouping(spoutId, new Fields("word"));
+        builder.setBolt(intermediateRankerId, new 
IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId,
+                                                                               
                      new Fields("obj"));
+        builder.setBolt(totalRankerId, new 
TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
+        LOG.info("Topology name: " + topologyName);
+
+        return submit(topologyName, conf, builder);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/SingleJoinExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/SingleJoinExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/SingleJoinExample.java
index 2e1bb94..4f4b6b6 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/SingleJoinExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/SingleJoinExample.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.starter;
 
 import org.apache.storm.Config;
@@ -34,7 +29,7 @@ public class SingleJoinExample {
     public static void main(String[] args) throws Exception {
         if (!NimbusClient.isLocalOverride()) {
             throw new IllegalStateException("This example only works in local 
mode.  "
-                    + "Run with storm local not storm jar");
+                                            + "Run with storm local not storm 
jar");
         }
         FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender"));
         FeederSpout ageSpout = new FeederSpout(new Fields("id", "age"));
@@ -43,7 +38,7 @@ public class SingleJoinExample {
         builder.setSpout("gender", genderSpout);
         builder.setSpout("age", ageSpout);
         builder.setBolt("join", new SingleJoinBolt(new Fields("gender", 
"age"))).fieldsGrouping("gender", new Fields("id"))
-        .fieldsGrouping("age", new Fields("id"));
+               .fieldsGrouping("age", new Fields("id"));
 
         Config conf = new Config();
         conf.setDebug(true);
@@ -53,8 +48,7 @@ public class SingleJoinExample {
             String gender;
             if (i % 2 == 0) {
                 gender = "male";
-            }
-            else {
+            } else {
                 gender = "female";
             }
             genderSpout.feed(new Values(i, gender));

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java
index 4e18217..a3b8296 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.starter;
 
 import org.apache.storm.starter.bolt.IntermediateRankingsBolt;
@@ -38,55 +33,55 @@ import org.slf4j.LoggerFactory;
  */
 public class SkewedRollingTopWords extends ConfigurableTopology {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(SkewedRollingTopWords.class);
-  private static final int TOP_N = 5;
-
-  private SkewedRollingTopWords() {
-  }
+    private static final Logger LOG = 
LoggerFactory.getLogger(SkewedRollingTopWords.class);
+    private static final int TOP_N = 5;
 
-  public static void main(String[] args) throws Exception {
-    ConfigurableTopology.start(new SkewedRollingTopWords(), args);
-  }
+    private SkewedRollingTopWords() {
+    }
 
-  /**
-   * Submits (runs) the topology.
-   *
-   * Usage: "SkewedRollingTopWords [topology-name] [-local]"
-   *
-   * By default, the topology is run locally under the name
-   * "slidingWindowCounts".
-   *
-   * Examples:
-   *
-   * ```
-   *
-   * # Runs in remote/cluster mode, with topology name "production-topology"
-   * $ storm jar storm-starter-jar-with-dependencies.jar 
org.apache.storm.starter.SkewedRollingTopWords production-topology ```
-   *
-   * @param args
-   *          First positional argument (optional) is topology name, second
-   *          positional argument (optional) defines whether to run the 
topology
-   *          locally ("-local") or remotely, i.e. on a real cluster.
-   * @throws Exception
-   */
-  protected int run(String[] args) {
-    String topologyName = "slidingWindowCounts";
-    if (args.length >= 1) {
-      topologyName = args[0];
+    public static void main(String[] args) throws Exception {
+        ConfigurableTopology.start(new SkewedRollingTopWords(), args);
     }
-    TopologyBuilder builder = new TopologyBuilder();
-    String spoutId = "wordGenerator";
-    String counterId = "counter";
-    String aggId = "aggregator";
-    String intermediateRankerId = "intermediateRanker";
-    String totalRankerId = "finalRanker";
-    builder.setSpout(spoutId, new TestWordSpout(), 5);
-    builder.setBolt(counterId, new RollingCountBolt(9, 3), 
4).partialKeyGrouping(spoutId, new Fields("word"));
-    builder.setBolt(aggId, new RollingCountAggBolt(), 
4).fieldsGrouping(counterId, new Fields("obj"));
-    builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 
4).fieldsGrouping(aggId, new Fields("obj"));
-    builder.setBolt(totalRankerId, new 
TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
-    LOG.info("Topology name: " + topologyName);
 
-    return submit(topologyName, conf, builder);
-  }
+    /**
+     * Submits (runs) the topology.
+     *
+     * Usage: "SkewedRollingTopWords [topology-name] [-local]"
+     *
+     * By default, the topology is run locally under the name
+     * "slidingWindowCounts".
+     *
+     * Examples:
+     *
+     * ```
+     *
+     * # Runs in remote/cluster mode, with topology name "production-topology"
+     * $ storm jar storm-starter-jar-with-dependencies.jar 
org.apache.storm.starter.SkewedRollingTopWords production-topology ```
+     *
+     * @param args
+     *          First positional argument (optional) is topology name, second
+     *          positional argument (optional) defines whether to run the 
topology
+     *          locally ("-local") or remotely, i.e. on a real cluster.
+     * @throws Exception
+     */
+    protected int run(String[] args) {
+        String topologyName = "slidingWindowCounts";
+        if (args.length >= 1) {
+            topologyName = args[0];
+        }
+        TopologyBuilder builder = new TopologyBuilder();
+        String spoutId = "wordGenerator";
+        String counterId = "counter";
+        String aggId = "aggregator";
+        String intermediateRankerId = "intermediateRanker";
+        String totalRankerId = "finalRanker";
+        builder.setSpout(spoutId, new TestWordSpout(), 5);
+        builder.setBolt(counterId, new RollingCountBolt(9, 3), 
4).partialKeyGrouping(spoutId, new Fields("word"));
+        builder.setBolt(aggId, new RollingCountAggBolt(), 
4).fieldsGrouping(counterId, new Fields("obj"));
+        builder.setBolt(intermediateRankerId, new 
IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(aggId, new Fields("obj"));
+        builder.setBolt(totalRankerId, new 
TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
+        LOG.info("Topology name: " + topologyName);
+
+        return submit(topologyName, conf, builder);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingTupleTsTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingTupleTsTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingTupleTsTopology.java
index 6204f8c..d179899 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingTupleTsTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingTupleTsTopology.java
@@ -1,24 +1,18 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.starter;
 
 import java.util.concurrent.TimeUnit;
-
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.starter.bolt.PrinterBolt;
@@ -36,20 +30,20 @@ public class SlidingTupleTsTopology {
     public static void main(String[] args) throws Exception {
         TopologyBuilder builder = new TopologyBuilder();
         BaseWindowedBolt bolt = new SlidingWindowSumBolt()
-                .withWindow(new Duration(5, TimeUnit.SECONDS), new Duration(3, 
TimeUnit.SECONDS))
-                .withTimestampField("ts")
-                .withLag(new Duration(5, TimeUnit.SECONDS));
+            .withWindow(new Duration(5, TimeUnit.SECONDS), new Duration(3, 
TimeUnit.SECONDS))
+            .withTimestampField("ts")
+            .withLag(new Duration(5, TimeUnit.SECONDS));
         builder.setSpout("integer", new RandomIntegerSpout(), 1);
         builder.setBolt("slidingsum", bolt, 1).shuffleGrouping("integer");
         builder.setBolt("printer", new PrinterBolt(), 
1).shuffleGrouping("slidingsum");
         Config conf = new Config();
         conf.setDebug(true);
         String topoName = "test";
-        
+
         if (args != null && args.length > 0) {
             topoName = args[0];
         }
-        
+
         conf.setNumWorkers(1);
         StormSubmitter.submitTopologyWithProgressBar(topoName, conf, 
builder.createTopology());
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingWindowTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingWindowTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingWindowTopology.java
index fc7cf4a..a75f999 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingWindowTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingWindowTopology.java
@@ -1,26 +1,19 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.starter;
 
 import java.util.List;
 import java.util.Map;
-
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.starter.bolt.PrinterBolt;
@@ -47,6 +40,24 @@ public class SlidingWindowTopology {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(SlidingWindowTopology.class);
 
+    public static void main(String[] args) throws Exception {
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout("integer", new RandomIntegerSpout(), 1);
+        builder.setBolt("slidingsum", new 
SlidingWindowSumBolt().withWindow(Count.of(30), Count.of(10)), 1)
+               .shuffleGrouping("integer");
+        builder.setBolt("tumblingavg", new 
TumblingWindowAvgBolt().withTumblingWindow(Count.of(3)), 1)
+               .shuffleGrouping("slidingsum");
+        builder.setBolt("printer", new PrinterBolt(), 
1).shuffleGrouping("tumblingavg");
+        Config conf = new Config();
+        conf.setDebug(true);
+        String topoName = "test";
+        if (args != null && args.length > 0) {
+            topoName = args[0];
+        }
+        conf.setNumWorkers(1);
+        StormSubmitter.submitTopologyWithProgressBar(topoName, conf, 
builder.createTopology());
+    }
+
     /*
      * Computes tumbling window average
      */
@@ -65,9 +76,9 @@ public class SlidingWindowTopology {
             LOG.debug("Events in current window: " + tuplesInWindow.size());
             if (tuplesInWindow.size() > 0) {
                 /*
-                * Since this is a tumbling window calculation,
-                * we use all the tuples in the window to compute the avg.
-                */
+                 * Since this is a tumbling window calculation,
+                 * we use all the tuples in the window to compute the avg.
+                 */
                 for (Tuple tuple : tuplesInWindow) {
                     sum += (int) tuple.getValue(0);
                 }
@@ -80,23 +91,4 @@ public class SlidingWindowTopology {
             declarer.declare(new Fields("avg"));
         }
     }
-
-
-    public static void main(String[] args) throws Exception {
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.setSpout("integer", new RandomIntegerSpout(), 1);
-        builder.setBolt("slidingsum", new 
SlidingWindowSumBolt().withWindow(Count.of(30), Count.of(10)), 1)
-                .shuffleGrouping("integer");
-        builder.setBolt("tumblingavg", new 
TumblingWindowAvgBolt().withTumblingWindow(Count.of(3)), 1)
-                .shuffleGrouping("slidingsum");
-        builder.setBolt("printer", new PrinterBolt(), 
1).shuffleGrouping("tumblingavg");
-        Config conf = new Config();
-        conf.setDebug(true);
-        String topoName = "test";
-        if (args != null && args.length > 0) {
-            topoName = args[0];
-        }
-        conf.setNumWorkers(1);
-        StormSubmitter.submitTopologyWithProgressBar(topoName, conf, 
builder.createTopology());
-    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulTopology.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulTopology.java 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulTopology.java
index e407ce8..3f30f48 100644
--- 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulTopology.java
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulTopology.java
@@ -19,7 +19,6 @@
 package org.apache.storm.starter;
 
 import java.util.Map;
-
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.starter.spout.RandomIntegerSpout;
@@ -38,20 +37,19 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * An example topology that demonstrates the use of {@link 
org.apache.storm.topology.IStatefulBolt}
- * to manage state. To run the example,
+ * An example topology that demonstrates the use of {@link 
org.apache.storm.topology.IStatefulBolt} to manage state. To run the example,
  * <pre>
  * $ storm jar examples/storm-starter/storm-starter-topologies-*.jar 
storm.starter.StatefulTopology statetopology
  * </pre>
  * <p/>
- * The default state used is 'InMemoryKeyValueState' which does not persist 
the state across restarts. You could use
- * 'RedisKeyValueState' to test state persistence by setting below property in 
conf/storm.yaml
+ * The default state used is 'InMemoryKeyValueState' which does not persist 
the state across restarts. You could use 'RedisKeyValueState' to
+ * test state persistence by setting below property in conf/storm.yaml
  * <pre>
  * topology.state.provider: 
org.apache.storm.redis.state.RedisKeyValueStateProvider
  * </pre>
  * <p/>
- * You should also start a local redis instance before running the 'storm jar' 
command. The default
- * RedisKeyValueStateProvider parameters can be overridden in conf/storm.yaml, 
for e.g.
+ * You should also start a local redis instance before running the 'storm jar' 
command. The default RedisKeyValueStateProvider parameters
+ * can be overridden in conf/storm.yaml, for e.g.
  * <p/>
  * <pre>
  * topology.state.provider.config: '{"keyClass":"...", "valueClass":"...",
@@ -65,6 +63,22 @@ import org.slf4j.LoggerFactory;
 public class StatefulTopology {
     private static final Logger LOG = 
LoggerFactory.getLogger(StatefulTopology.class);
 
+    public static void main(String[] args) throws Exception {
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout("spout", new RandomIntegerSpout());
+        builder.setBolt("partialsum", new StatefulSumBolt("partial"), 
1).shuffleGrouping("spout");
+        builder.setBolt("printer", new PrinterBolt(), 
2).shuffleGrouping("partialsum");
+        builder.setBolt("total", new StatefulSumBolt("total"), 
1).shuffleGrouping("printer");
+        Config conf = new Config();
+        conf.setDebug(false);
+        String topoName = "test";
+        if (args != null && args.length > 0) {
+            topoName = args[0];
+        }
+        conf.setNumWorkers(1);
+        StormSubmitter.submitTopologyWithProgressBar(topoName, conf, 
builder.createTopology());
+    }
+
     /**
      * A bolt that uses {@link KeyValueState} to save its state.
      */
@@ -119,20 +133,4 @@ public class StatefulTopology {
         }
 
     }
-
-    public static void main(String[] args) throws Exception {
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.setSpout("spout", new RandomIntegerSpout());
-        builder.setBolt("partialsum", new StatefulSumBolt("partial"), 
1).shuffleGrouping("spout");
-        builder.setBolt("printer", new PrinterBolt(), 
2).shuffleGrouping("partialsum");
-        builder.setBolt("total", new StatefulSumBolt("total"), 
1).shuffleGrouping("printer");
-        Config conf = new Config();
-        conf.setDebug(false);
-        String topoName = "test";
-        if (args != null && args.length > 0) {
-            topoName = args[0];
-        }
-        conf.setNumWorkers(1);
-        StormSubmitter.submitTopologyWithProgressBar(topoName, conf, 
builder.createTopology());
-    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulWindowingTopology.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulWindowingTopology.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulWindowingTopology.java
index 30c1ab2..eb0132e 100644
--- 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulWindowingTopology.java
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulWindowingTopology.java
@@ -19,7 +19,6 @@
 package org.apache.storm.starter;
 
 import java.util.Map;
-
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.starter.bolt.PrinterBolt;
@@ -40,18 +39,37 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A simple example that demonstrates the usage of {@link 
org.apache.storm.topology.IStatefulWindowedBolt} to
- * save the state of the windowing operation to avoid re-computation in case 
of failures.
+ * A simple example that demonstrates the usage of {@link 
org.apache.storm.topology.IStatefulWindowedBolt} to save the state of the
+ * windowing operation to avoid re-computation in case of failures.
  * <p>
  * The framework internally manages the window boundaries and does not invoke
- * {@link org.apache.storm.topology.IWindowedBolt#execute(TupleWindow)} for 
the already evaluated windows in case of restarts
- * during failures. The {@link 
org.apache.storm.topology.IStatefulBolt#initState(State)}
+ * {@link org.apache.storm.topology.IWindowedBolt#execute(TupleWindow)}
+ * for the already evaluated windows in case of restarts during failures. The
+ * {@link org.apache.storm.topology.IStatefulBolt#initState(State)}
  * is invoked with the previously saved state of the bolt after prepare, 
before the execute() method is invoked.
  * </p>
  */
 public class StatefulWindowingTopology {
     private static final Logger LOG = 
LoggerFactory.getLogger(StatefulWindowingTopology.class);
 
+    public static void main(String[] args) throws Exception {
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout("spout", new RandomIntegerSpout());
+        builder.setBolt("sumbolt", new WindowSumBolt().withWindow(new 
Count(5), new Count(3))
+                                                      
.withMessageIdField("msgid"), 1).shuffleGrouping("spout");
+        builder.setBolt("printer", new PrinterBolt(), 
1).shuffleGrouping("sumbolt");
+        Config conf = new Config();
+        conf.setDebug(false);
+        //conf.put(Config.TOPOLOGY_STATE_PROVIDER, 
"org.apache.storm.redis.state.RedisKeyValueStateProvider");
+
+        String topoName = "test";
+        if (args != null && args.length > 0) {
+            topoName = args[0];
+        }
+        conf.setNumWorkers(1);
+        StormSubmitter.submitTopologyWithProgressBar(topoName, conf, 
builder.createTopology());
+    }
+
     private static class WindowSumBolt extends 
BaseStatefulWindowedBolt<KeyValueState<String, Long>> {
         private KeyValueState<String, Long> state;
         private long sum;
@@ -85,22 +103,4 @@ public class StatefulWindowingTopology {
         }
     }
 
-    public static void main(String[] args) throws Exception {
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.setSpout("spout", new RandomIntegerSpout());
-        builder.setBolt("sumbolt", new WindowSumBolt().withWindow(new 
Count(5), new Count(3))
-                .withMessageIdField("msgid"), 1).shuffleGrouping("spout");
-        builder.setBolt("printer", new PrinterBolt(), 
1).shuffleGrouping("sumbolt");
-        Config conf = new Config();
-        conf.setDebug(false);
-        //conf.put(Config.TOPOLOGY_STATE_PROVIDER, 
"org.apache.storm.redis.state.RedisKeyValueStateProvider");
-        
-        String topoName = "test";
-        if (args != null && args.length > 0) {
-            topoName = args[0];
-        }
-        conf.setNumWorkers(1);
-        StormSubmitter.submitTopologyWithProgressBar(topoName, conf, 
builder.createTopology());
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalGlobalCount.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalGlobalCount.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalGlobalCount.java
index 363fc2c..45cd68d 100644
--- 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalGlobalCount.java
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalGlobalCount.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.starter;
 
 import java.math.BigInteger;
@@ -22,7 +17,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.coordination.BatchOutputCollector;
@@ -74,15 +68,31 @@ public class TransactionalGlobalCount {
             add(new Values("dog"));
         }});
     }};
+    public static final String GLOBAL_COUNT_KEY = "GLOBAL-COUNT";
+    public static Map<String, Value> DATABASE = new HashMap<String, Value>();
+
+    public static void main(String[] args) throws Exception {
+        if (!NimbusClient.isLocalOverride()) {
+            throw new IllegalStateException("This example only works in local 
mode.  "
+                                            + "Run with storm local not storm 
jar");
+        }
+        MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, 
new Fields("word"), PARTITION_TAKE_PER_BATCH);
+        TransactionalTopologyBuilder builder = new 
TransactionalTopologyBuilder("global-count", "spout", spout, 3);
+        builder.setBolt("partial-count", new BatchCount(), 
5).noneGrouping("spout");
+        builder.setBolt("sum", new 
UpdateGlobalCount()).globalGrouping("partial-count");
+
+        Config config = new Config();
+        config.setDebug(true);
+        config.setMaxSpoutPending(3);
+
+        StormSubmitter.submitTopology("global-count-topology", config, 
builder.buildTopology());
+    }
 
     public static class Value {
         int count = 0;
         BigInteger txid;
     }
 
-    public static Map<String, Value> DATABASE = new HashMap<String, Value>();
-    public static final String GLOBAL_COUNT_KEY = "GLOBAL-COUNT";
-
     public static class BatchCount extends BaseBatchBolt<Object> {
         Object _id;
         BatchOutputCollector _collector;
@@ -137,13 +147,11 @@ public class TransactionalGlobalCount {
                 newval.txid = _attempt.getTransactionId();
                 if (val == null) {
                     newval.count = _sum;
-                }
-                else {
+                } else {
                     newval.count = _sum + val.count;
                 }
                 DATABASE.put(GLOBAL_COUNT_KEY, newval);
-            }
-            else {
+            } else {
                 newval = val;
             }
             _collector.emit(new Values(_attempt, newval.count));
@@ -154,21 +162,4 @@ public class TransactionalGlobalCount {
             declarer.declare(new Fields("id", "sum"));
         }
     }
-
-    public static void main(String[] args) throws Exception {
-        if (!NimbusClient.isLocalOverride()) {
-            throw new IllegalStateException("This example only works in local 
mode.  "
-                    + "Run with storm local not storm jar");
-        }
-        MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, 
new Fields("word"), PARTITION_TAKE_PER_BATCH);
-        TransactionalTopologyBuilder builder = new 
TransactionalTopologyBuilder("global-count", "spout", spout, 3);
-        builder.setBolt("partial-count", new BatchCount(), 
5).noneGrouping("spout");
-        builder.setBolt("sum", new 
UpdateGlobalCount()).globalGrouping("partial-count");
-
-        Config config = new Config();
-        config.setDebug(true);
-        config.setMaxSpoutPending(3);
-
-        StormSubmitter.submitTopology("global-count-topology", config, 
builder.buildTopology());
-    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalWords.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalWords.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalWords.java
index 7e343af..7875360 100644
--- 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalWords.java
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalWords.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.starter;
 
 import java.math.BigInteger;
@@ -22,7 +17,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.coordination.BatchOutputCollector;
@@ -51,25 +45,8 @@ import org.apache.storm.utils.NimbusClient;
  * between buckets as their counts accumulate.
  */
 public class TransactionalWords {
-    public static class CountValue {
-        Integer prev_count = null;
-        int count = 0;
-        BigInteger txid = null;
-    }
-
-    public static class BucketValue {
-        int count = 0;
-        BigInteger txid;
-    }
-
     public static final int BUCKET_SIZE = 10;
-
-    public static Map<String, CountValue> COUNT_DATABASE = new HashMap<String, 
CountValue>();
-    public static Map<Integer, BucketValue> BUCKET_DATABASE = new 
HashMap<Integer, BucketValue>();
-
-
     public static final int PARTITION_TAKE_PER_BATCH = 3;
-
     public static final Map<Integer, List<List<Object>>> DATA = new 
HashMap<Integer, List<List<Object>>>() {{
         put(0, new ArrayList<List<Object>>() {{
             add(new Values("cat"));
@@ -97,6 +74,36 @@ public class TransactionalWords {
             add(new Values("dog"));
         }});
     }};
+    public static Map<String, CountValue> COUNT_DATABASE = new HashMap<String, 
CountValue>();
+    public static Map<Integer, BucketValue> BUCKET_DATABASE = new 
HashMap<Integer, BucketValue>();
+
+    public static void main(String[] args) throws Exception {
+        if (!NimbusClient.isLocalOverride()) {
+            throw new IllegalStateException("This example only works in local 
mode.  "
+                                            + "Run with storm local not storm 
jar");
+        }
+        MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, 
new Fields("word"), PARTITION_TAKE_PER_BATCH);
+        TransactionalTopologyBuilder builder = new 
TransactionalTopologyBuilder("top-n-words", "spout", spout, 2);
+        builder.setBolt("count", new KeyedCountUpdater(), 
5).fieldsGrouping("spout", new Fields("word"));
+        builder.setBolt("bucketize", new Bucketize()).noneGrouping("count");
+        builder.setBolt("buckets", new BucketCountUpdater(), 
5).fieldsGrouping("bucketize", new Fields("bucket"));
+        Config config = new Config();
+        config.setDebug(true);
+        config.setMaxSpoutPending(3);
+
+        StormSubmitter.submitTopology("top-n-topology", config, 
builder.buildTopology());
+    }
+
+    public static class CountValue {
+        Integer prev_count = null;
+        int count = 0;
+        BigInteger txid = null;
+    }
+
+    public static class BucketValue {
+        int count = 0;
+        BigInteger txid;
+    }
 
     public static class KeyedCountUpdater extends BaseTransactionalBolt 
implements ICommitter {
         Map<String, Integer> _counts = new HashMap<String, Integer>();
@@ -115,8 +122,9 @@ public class TransactionalWords {
         public void execute(Tuple tuple) {
             String key = tuple.getString(1);
             Integer curr = _counts.get(key);
-            if (curr == null)
+            if (curr == null) {
                 curr = 0;
+            }
             _counts.put(key, curr + 1);
         }
 
@@ -134,8 +142,7 @@ public class TransactionalWords {
                     }
                     newVal.count = newVal.count + _counts.get(key);
                     COUNT_DATABASE.put(key, newVal);
-                }
-                else {
+                } else {
                     newVal = val;
                 }
                 _collector.emit(new Values(_id, key, newVal.count, 
newVal.prev_count));
@@ -163,8 +170,7 @@ public class TransactionalWords {
 
             if (prevBucket == null) {
                 collector.emit(new Values(attempt, currBucket, 1));
-            }
-            else if (currBucket != prevBucket) {
+            } else if (currBucket != prevBucket) {
                 collector.emit(new Values(attempt, currBucket, 1));
                 collector.emit(new Values(attempt, prevBucket, -1));
             }
@@ -194,8 +200,9 @@ public class TransactionalWords {
             Integer bucket = tuple.getInteger(1);
             Integer delta = tuple.getInteger(2);
             Integer curr = _accum.get(bucket);
-            if (curr == null)
+            if (curr == null) {
                 curr = 0;
+            }
             _accum.put(bucket, curr + delta);
         }
 
@@ -208,11 +215,11 @@ public class TransactionalWords {
                     newVal = new BucketValue();
                     newVal.txid = _attempt.getTransactionId();
                     newVal.count = _accum.get(bucket);
-                    if (currVal != null)
+                    if (currVal != null) {
                         newVal.count += currVal.count;
+                    }
                     BUCKET_DATABASE.put(bucket, newVal);
-                }
-                else {
+                } else {
                     newVal = currVal;
                 }
                 _collector.emit(new Values(_attempt, bucket, newVal.count));
@@ -224,21 +231,4 @@ public class TransactionalWords {
             declarer.declare(new Fields("id", "bucket", "count"));
         }
     }
-
-    public static void main(String[] args) throws Exception {
-        if (!NimbusClient.isLocalOverride()) {
-            throw new IllegalStateException("This example only works in local 
mode.  "
-                    + "Run with storm local not storm jar");
-        }
-        MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, 
new Fields("word"), PARTITION_TAKE_PER_BATCH);
-        TransactionalTopologyBuilder builder = new 
TransactionalTopologyBuilder("top-n-words", "spout", spout, 2);
-        builder.setBolt("count", new KeyedCountUpdater(), 
5).fieldsGrouping("spout", new Fields("word"));
-        builder.setBolt("bucketize", new Bucketize()).noneGrouping("count");
-        builder.setBolt("buckets", new BucketCountUpdater(), 
5).fieldsGrouping("bucketize", new Fields("bucket"));
-        Config config = new Config();
-        config.setDebug(true);
-        config.setMaxSpoutPending(3);
-
-        StormSubmitter.submitTopology("top-n-topology", config, 
builder.buildTopology());
-    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
index 59e3e4b..32ec822 100644
--- 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
@@ -1,25 +1,19 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.starter;
 
 import java.util.HashMap;
 import java.util.Map;
-
 import org.apache.storm.starter.spout.RandomSentenceSpout;
 import org.apache.storm.task.ShellBolt;
 import org.apache.storm.topology.BasicOutputCollector;
@@ -37,65 +31,66 @@ import org.apache.storm.tuple.Values;
  * capabilities.
  */
 public class WordCountTopology extends ConfigurableTopology {
-  public static class SplitSentence extends ShellBolt implements IRichBolt {
-
-    public SplitSentence() {
-      super("python", "splitsentence.py");
+    public static void main(String[] args) throws Exception {
+        ConfigurableTopology.start(new WordCountTopology(), args);
     }
 
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("word"));
-    }
+    protected int run(String[] args) throws Exception {
 
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-      return null;
-    }
-  }
-
-  public static class WordCount extends BaseBasicBolt {
-    Map<String, Integer> counts = new HashMap<String, Integer>();
-
-    @Override
-    public void execute(Tuple tuple, BasicOutputCollector collector) {
-      String word = tuple.getString(0);
-      Integer count = counts.get(word);
-      if (count == null)
-        count = 0;
-      count++;
-      counts.put(word, count);
-      collector.emit(new Values(word, count));
-    }
+        TopologyBuilder builder = new TopologyBuilder();
 
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("word", "count"));
-    }
-  }
+        builder.setSpout("spout", new RandomSentenceSpout(), 5);
 
-  public static void main(String[] args) throws Exception {
-    ConfigurableTopology.start(new WordCountTopology(), args);
-  }
+        builder.setBolt("split", new SplitSentence(), 
8).shuffleGrouping("spout");
+        builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", 
new Fields("word"));
 
-  protected int run(String[] args) throws Exception {
+        conf.setDebug(true);
 
-    TopologyBuilder builder = new TopologyBuilder();
+        String topologyName = "word-count";
 
-    builder.setSpout("spout", new RandomSentenceSpout(), 5);
+        conf.setNumWorkers(3);
 
-    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
-    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new 
Fields("word"));
+        if (args != null && args.length > 0) {
+            topologyName = args[0];
+        }
+        return submit(topologyName, conf, builder);
+    }
 
-    conf.setDebug(true);
+    public static class SplitSentence extends ShellBolt implements IRichBolt {
 
-    String topologyName = "word-count";
+        public SplitSentence() {
+            super("python", "splitsentence.py");
+        }
 
-    conf.setNumWorkers(3);
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word"));
+        }
+
+        @Override
+        public Map<String, Object> getComponentConfiguration() {
+            return null;
+        }
+    }
 
-    if (args != null && args.length > 0) {
-      topologyName = args[0];
+    public static class WordCount extends BaseBasicBolt {
+        Map<String, Integer> counts = new HashMap<String, Integer>();
+
+        @Override
+        public void execute(Tuple tuple, BasicOutputCollector collector) {
+            String word = tuple.getString(0);
+            Integer count = counts.get(word);
+            if (count == null) {
+                count = 0;
+            }
+            count++;
+            counts.put(word, count);
+            collector.emit(new Values(word, count));
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word", "count"));
+        }
     }
-    return submit(topologyName, conf, builder);
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopologyNode.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopologyNode.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopologyNode.java
index 39f5932..6df7678 100644
--- 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopologyNode.java
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopologyNode.java
@@ -1,25 +1,19 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.starter;
 
 import java.util.HashMap;
 import java.util.Map;
-
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.spout.ShellSpout;
@@ -38,6 +32,25 @@ import org.apache.storm.tuple.Values;
  * This topology demonstrates Storm's stream groupings and multilang capabilities.
  */
 public class WordCountTopologyNode {
+    public static void main(String[] args) throws Exception {
+
+        TopologyBuilder builder = new TopologyBuilder();
+
+        builder.setSpout("spout", new RandomSentence(), 5);
+
+        builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
+        builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
+
+        Config conf = new Config();
+        conf.setDebug(true);
+        String topoName = "word-count";
+        if (args != null && args.length > 0) {
+            topoName = args[0];
+        }
+        conf.setNumWorkers(3);
+        StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology());
+    }
+
     public static class SplitSentence extends ShellBolt implements IRichBolt {
 
         public SplitSentence() {
@@ -79,8 +92,9 @@ public class WordCountTopologyNode {
         public void execute(Tuple tuple, BasicOutputCollector collector) {
             String word = tuple.getString(0);
             Integer count = counts.get(word);
-            if (count == null)
+            if (count == null) {
                 count = 0;
+            }
             count++;
             counts.put(word, count);
             collector.emit(new Values(word, count));
@@ -91,23 +105,4 @@ public class WordCountTopologyNode {
             declarer.declare(new Fields("word", "count"));
         }
     }
-
-    public static void main(String[] args) throws Exception {
-
-        TopologyBuilder builder = new TopologyBuilder();
-
-        builder.setSpout("spout", new RandomSentence(), 5);
-
-        builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
-        builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
-
-        Config conf = new Config();
-        conf.setDebug(true);
-        String topoName = "word-count";
-        if (args != null && args.length > 0) {
-            topoName = args[0];
-        }
-        conf.setNumWorkers(3);
-        StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology());
-    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/AbstractRankerBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/AbstractRankerBolt.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/AbstractRankerBolt.java
index 9cf9e79..b04196c 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/AbstractRankerBolt.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/AbstractRankerBolt.java
@@ -1,23 +1,22 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.starter.bolt;
 
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.log4j.Logger;
 import org.apache.storm.Config;
+import org.apache.storm.starter.tools.Rankings;
 import org.apache.storm.topology.BasicOutputCollector;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseBasicBolt;
@@ -25,11 +24,6 @@ import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.TupleUtils;
-import org.apache.log4j.Logger;
-import org.apache.storm.starter.tools.Rankings;
-
-import java.util.HashMap;
-import java.util.Map;
 
 /**
  * This abstract bolt provides the basic behavior of bolts that rank objects according to their count.
@@ -40,71 +34,70 @@ import java.util.Map;
  */
 public abstract class AbstractRankerBolt extends BaseBasicBolt {
 
-  private static final long serialVersionUID = 4931640198501530202L;
-  private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = 2;
-  private static final int DEFAULT_COUNT = 10;
-
-  private final int emitFrequencyInSeconds;
-  private final int count;
-  private final Rankings rankings;
-
-  public AbstractRankerBolt() {
-    this(DEFAULT_COUNT, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
-  }
+    private static final long serialVersionUID = 4931640198501530202L;
+    private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = 2;
+    private static final int DEFAULT_COUNT = 10;
 
-  public AbstractRankerBolt(int topN) {
-    this(topN, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
-  }
+    private final int emitFrequencyInSeconds;
+    private final int count;
+    private final Rankings rankings;
 
-  public AbstractRankerBolt(int topN, int emitFrequencyInSeconds) {
-    if (topN < 1) {
-      throw new IllegalArgumentException("topN must be >= 1 (you requested " + topN + ")");
+    public AbstractRankerBolt() {
+        this(DEFAULT_COUNT, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
     }
-    if (emitFrequencyInSeconds < 1) {
-      throw new IllegalArgumentException(
-          "The emit frequency must be >= 1 seconds (you requested " + emitFrequencyInSeconds + " seconds)");
+
+    public AbstractRankerBolt(int topN) {
+        this(topN, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
     }
-    count = topN;
-    this.emitFrequencyInSeconds = emitFrequencyInSeconds;
-    rankings = new Rankings(count);
-  }
 
-  protected Rankings getRankings() {
-    return rankings;
-  }
+    public AbstractRankerBolt(int topN, int emitFrequencyInSeconds) {
+        if (topN < 1) {
+            throw new IllegalArgumentException("topN must be >= 1 (you requested " + topN + ")");
+        }
+        if (emitFrequencyInSeconds < 1) {
+            throw new IllegalArgumentException(
+                "The emit frequency must be >= 1 seconds (you requested " + emitFrequencyInSeconds + " seconds)");
+        }
+        count = topN;
+        this.emitFrequencyInSeconds = emitFrequencyInSeconds;
+        rankings = new Rankings(count);
+    }
 
-  /**
-   * This method functions as a template method (design pattern).
-   */
-  @Override
-  public final void execute(Tuple tuple, BasicOutputCollector collector) {
-    if (TupleUtils.isTick(tuple)) {
-      getLogger().debug("Received tick tuple, triggering emit of current rankings");
-      emitRankings(collector);
+    protected Rankings getRankings() {
+        return rankings;
     }
-    else {
-      updateRankingsWithTuple(tuple);
+
+    /**
+     * This method functions as a template method (design pattern).
+     */
+    @Override
+    public final void execute(Tuple tuple, BasicOutputCollector collector) {
+        if (TupleUtils.isTick(tuple)) {
+            getLogger().debug("Received tick tuple, triggering emit of current rankings");
+            emitRankings(collector);
+        } else {
+            updateRankingsWithTuple(tuple);
+        }
     }
-  }
 
-  abstract void updateRankingsWithTuple(Tuple tuple);
+    abstract void updateRankingsWithTuple(Tuple tuple);
 
-  private void emitRankings(BasicOutputCollector collector) {
-    collector.emit(new Values(rankings.copy()));
-    getLogger().debug("Rankings: " + rankings);
-  }
+    private void emitRankings(BasicOutputCollector collector) {
+        collector.emit(new Values(rankings.copy()));
+        getLogger().debug("Rankings: " + rankings);
+    }
 
-  @Override
-  public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    declarer.declare(new Fields("rankings"));
-  }
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("rankings"));
+    }
 
-  @Override
-  public Map<String, Object> getComponentConfiguration() {
-    Map<String, Object> conf = new HashMap<String, Object>();
-    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
-    return conf;
-  }
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        Map<String, Object> conf = new HashMap<String, Object>();
+        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
+        return conf;
+    }
 
-  abstract Logger getLogger();
+    abstract Logger getLogger();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/IntermediateRankingsBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/IntermediateRankingsBolt.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/IntermediateRankingsBolt.java
index 6950bfb..a6a8b49 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/IntermediateRankingsBolt.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/IntermediateRankingsBolt.java
@@ -1,26 +1,21 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.starter.bolt;
 
-import org.apache.storm.tuple.Tuple;
 import org.apache.log4j.Logger;
 import org.apache.storm.starter.tools.Rankable;
 import org.apache.storm.starter.tools.RankableObjectWithFields;
+import org.apache.storm.tuple.Tuple;
 
 /**
  * This bolt ranks incoming objects by their count.
@@ -30,29 +25,29 @@ import org.apache.storm.starter.tools.RankableObjectWithFields;
  */
 public final class IntermediateRankingsBolt extends AbstractRankerBolt {
 
-  private static final long serialVersionUID = -1369800530256637409L;
-  private static final Logger LOG = Logger.getLogger(IntermediateRankingsBolt.class);
+    private static final long serialVersionUID = -1369800530256637409L;
+    private static final Logger LOG = Logger.getLogger(IntermediateRankingsBolt.class);
 
-  public IntermediateRankingsBolt() {
-    super();
-  }
+    public IntermediateRankingsBolt() {
+        super();
+    }
 
-  public IntermediateRankingsBolt(int topN) {
-    super(topN);
-  }
+    public IntermediateRankingsBolt(int topN) {
+        super(topN);
+    }
 
-  public IntermediateRankingsBolt(int topN, int emitFrequencyInSeconds) {
-    super(topN, emitFrequencyInSeconds);
-  }
+    public IntermediateRankingsBolt(int topN, int emitFrequencyInSeconds) {
+        super(topN, emitFrequencyInSeconds);
+    }
 
-  @Override
-  void updateRankingsWithTuple(Tuple tuple) {
-    Rankable rankable = RankableObjectWithFields.from(tuple);
-    super.getRankings().updateWith(rankable);
-  }
+    @Override
+    void updateRankingsWithTuple(Tuple tuple) {
+        Rankable rankable = RankableObjectWithFields.from(tuple);
+        super.getRankings().updateWith(rankable);
+    }
 
-  @Override
-  Logger getLogger() {
-    return LOG;
-  }
+    @Override
+    Logger getLogger() {
+        return LOG;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/PrinterBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/PrinterBolt.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/PrinterBolt.java
index 993a937..8364222 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/PrinterBolt.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/PrinterBolt.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.starter.bolt;
 
 import org.apache.storm.topology.BasicOutputCollector;
@@ -25,13 +20,13 @@ import org.apache.storm.tuple.Tuple;
 
 public class PrinterBolt extends BaseBasicBolt {
 
-  @Override
-  public void execute(Tuple tuple, BasicOutputCollector collector) {
-    System.out.println(tuple);
-  }
+    @Override
+    public void execute(Tuple tuple, BasicOutputCollector collector) {
+        System.out.println(tuple);
+    }
 
-  @Override
-  public void declareOutputFields(OutputFieldsDeclarer ofd) {
-  }
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer ofd) {
+    }
 
 }

Reply via email to