http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/NthLastModifiedTimeTracker.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/NthLastModifiedTimeTracker.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/NthLastModifiedTimeTracker.java
index faa4e32..d7ab065 100644
--- 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/NthLastModifiedTimeTracker.java
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/NthLastModifiedTimeTracker.java
@@ -1,24 +1,19 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.starter.tools;
 
-import org.apache.storm.utils.Time;
 import org.apache.commons.collections.buffer.CircularFifoBuffer;
+import org.apache.storm.utils.Time;
 
 /**
  * This class tracks the time-since-last-modify of a "thing" in a rolling 
fashion.
@@ -30,41 +25,41 @@ import 
org.apache.commons.collections.buffer.CircularFifoBuffer;
  */
 public class NthLastModifiedTimeTracker {
 
-  private static final int MILLIS_IN_SEC = 1000;
+    private static final int MILLIS_IN_SEC = 1000;
 
-  private final CircularFifoBuffer lastModifiedTimesMillis;
+    private final CircularFifoBuffer lastModifiedTimesMillis;
 
-  public NthLastModifiedTimeTracker(int numTimesToTrack) {
-    if (numTimesToTrack < 1) {
-      throw new IllegalArgumentException(
-          "numTimesToTrack must be greater than zero (you requested " + 
numTimesToTrack + ")");
+    public NthLastModifiedTimeTracker(int numTimesToTrack) {
+        if (numTimesToTrack < 1) {
+            throw new IllegalArgumentException(
+                "numTimesToTrack must be greater than zero (you requested " + 
numTimesToTrack + ")");
+        }
+        lastModifiedTimesMillis = new CircularFifoBuffer(numTimesToTrack);
+        initLastModifiedTimesMillis();
     }
-    lastModifiedTimesMillis = new CircularFifoBuffer(numTimesToTrack);
-    initLastModifiedTimesMillis();
-  }
 
-  private void initLastModifiedTimesMillis() {
-    long nowCached = now();
-    for (int i = 0; i < lastModifiedTimesMillis.maxSize(); i++) {
-      lastModifiedTimesMillis.add(Long.valueOf(nowCached));
+    private void initLastModifiedTimesMillis() {
+        long nowCached = now();
+        for (int i = 0; i < lastModifiedTimesMillis.maxSize(); i++) {
+            lastModifiedTimesMillis.add(Long.valueOf(nowCached));
+        }
     }
-  }
 
-  private long now() {
-    return Time.currentTimeMillis();
-  }
+    private long now() {
+        return Time.currentTimeMillis();
+    }
 
-  public int secondsSinceOldestModification() {
-    long modifiedTimeMillis = ((Long) 
lastModifiedTimesMillis.get()).longValue();
-    return (int) ((now() - modifiedTimeMillis) / MILLIS_IN_SEC);
-  }
+    public int secondsSinceOldestModification() {
+        long modifiedTimeMillis = ((Long) 
lastModifiedTimesMillis.get()).longValue();
+        return (int) ((now() - modifiedTimeMillis) / MILLIS_IN_SEC);
+    }
 
-  public void markAsModified() {
-    updateLastModifiedTime();
-  }
+    public void markAsModified() {
+        updateLastModifiedTime();
+    }
 
-  private void updateLastModifiedTime() {
-    lastModifiedTimesMillis.add(now());
-  }
+    private void updateLastModifiedTime() {
+        lastModifiedTimesMillis.add(now());
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankable.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankable.java 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankable.java
index 85f2b62..ea9e6d6 100644
--- 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankable.java
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankable.java
@@ -1,32 +1,27 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.starter.tools;
 
 public interface Rankable extends Comparable<Rankable> {
 
-  Object getObject();
+    Object getObject();
 
-  long getCount();
+    long getCount();
 
-  /**
-   * Note: We do not defensively copy the object wrapped by the Rankable.  It 
is passed as is.
-   *
-   * @return a defensive copy
-   */
-  Rankable copy();
+    /**
+     * Note: We do not defensively copy the object wrapped by the Rankable.  
It is passed as is.
+     *
+     * @return a defensive copy
+     */
+    Rankable copy();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/RankableObjectWithFields.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/RankableObjectWithFields.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/RankableObjectWithFields.java
index b1a9dca..0b3808e 100644
--- 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/RankableObjectWithFields.java
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/RankableObjectWithFields.java
@@ -1,28 +1,22 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.starter.tools;
 
-import org.apache.storm.tuple.Tuple;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
-
 import java.io.Serializable;
 import java.util.List;
+import org.apache.storm.tuple.Tuple;
 
 /**
 * This class wraps an object and its associated count, including any 
additional data fields.
@@ -31,118 +25,116 @@ import java.util.List;
  */
 public class RankableObjectWithFields implements Rankable, Serializable {
 
-  private static final long serialVersionUID = -9102878650001058090L;
-  private static final String toStringSeparator = "|";
+    private static final long serialVersionUID = -9102878650001058090L;
+    private static final String toStringSeparator = "|";
 
-  private final Object obj;
-  private final long count;
-  private final ImmutableList<Object> fields;
+    private final Object obj;
+    private final long count;
+    private final ImmutableList<Object> fields;
+
+    public RankableObjectWithFields(Object obj, long count, Object... 
otherFields) {
+        if (obj == null) {
+            throw new IllegalArgumentException("The object must not be null");
+        }
+        if (count < 0) {
+            throw new IllegalArgumentException("The count must be >= 0");
+        }
+        this.obj = obj;
+        this.count = count;
+        fields = ImmutableList.copyOf(otherFields);
 
-  public RankableObjectWithFields(Object obj, long count, Object... 
otherFields) {
-    if (obj == null) {
-      throw new IllegalArgumentException("The object must not be null");
     }
-    if (count < 0) {
-      throw new IllegalArgumentException("The count must be >= 0");
+
+    /**
+     * Construct a new instance based on the provided {@link Tuple}.
+     * <p/>
+     * This method expects the object to be ranked in the first field (index 
0) of the provided tuple, and the number of
+     * occurrences of the object (its count) in the second field (index 1). 
Any further fields in the tuple will be
+     * extracted and tracked, too. These fields can be accessed via {@link 
RankableObjectWithFields#getFields()}.
+     *
+     * @param tuple
+     *
+     * @return new instance based on the provided tuple
+     */
+    public static RankableObjectWithFields from(Tuple tuple) {
+        List<Object> otherFields = Lists.newArrayList(tuple.getValues());
+        Object obj = otherFields.remove(0);
+        Long count = (Long) otherFields.remove(0);
+        return new RankableObjectWithFields(obj, count, otherFields.toArray());
     }
-    this.obj = obj;
-    this.count = count;
-    fields = ImmutableList.copyOf(otherFields);
-
-  }
-
-  /**
-   * Construct a new instance based on the provided {@link Tuple}.
-   * <p/>
-   * This method expects the object to be ranked in the first field (index 0) 
of the provided tuple, and the number of
-   * occurrences of the object (its count) in the second field (index 1). Any 
further fields in the tuple will be
-   * extracted and tracked, too. These fields can be accessed via {@link 
RankableObjectWithFields#getFields()}.
-   *
-   * @param tuple
-   *
-   * @return new instance based on the provided tuple
-   */
-  public static RankableObjectWithFields from(Tuple tuple) {
-    List<Object> otherFields = Lists.newArrayList(tuple.getValues());
-    Object obj = otherFields.remove(0);
-    Long count = (Long) otherFields.remove(0);
-    return new RankableObjectWithFields(obj, count, otherFields.toArray());
-  }
-
-  public Object getObject() {
-    return obj;
-  }
-
-  public long getCount() {
-    return count;
-  }
-
-  /**
-   * @return an immutable list of any additional data fields of the object 
(may be empty but will never be null)
-   */
-  public List<Object> getFields() {
-    return fields;
-  }
-
-  @Override
-  public int compareTo(Rankable other) {
-    long delta = this.getCount() - other.getCount();
-    if (delta > 0) {
-      return 1;
+
+    public Object getObject() {
+        return obj;
     }
-    else if (delta < 0) {
-      return -1;
+
+    public long getCount() {
+        return count;
     }
-    else {
-      return 0;
+
+    /**
+     * @return an immutable list of any additional data fields of the object 
(may be empty but will never be null)
+     */
+    public List<Object> getFields() {
+        return fields;
+    }
+
+    @Override
+    public int compareTo(Rankable other) {
+        long delta = this.getCount() - other.getCount();
+        if (delta > 0) {
+            return 1;
+        } else if (delta < 0) {
+            return -1;
+        } else {
+            return 0;
+        }
     }
-  }
 
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof RankableObjectWithFields)) {
+            return false;
+        }
+        RankableObjectWithFields other = (RankableObjectWithFields) o;
+        return obj.equals(other.obj) && count == other.count;
     }
-    if (!(o instanceof RankableObjectWithFields)) {
-      return false;
+
+    @Override
+    public int hashCode() {
+        int result = 17;
+        int countHash = (int) (count ^ (count >>> 32));
+        result = 31 * result + countHash;
+        result = 31 * result + obj.hashCode();
+        return result;
     }
-    RankableObjectWithFields other = (RankableObjectWithFields) o;
-    return obj.equals(other.obj) && count == other.count;
-  }
-
-  @Override
-  public int hashCode() {
-    int result = 17;
-    int countHash = (int) (count ^ (count >>> 32));
-    result = 31 * result + countHash;
-    result = 31 * result + obj.hashCode();
-    return result;
-  }
-
-  public String toString() {
-    StringBuffer buf = new StringBuffer();
-    buf.append("[");
-    buf.append(obj);
-    buf.append(toStringSeparator);
-    buf.append(count);
-    for (Object field : fields) {
-      buf.append(toStringSeparator);
-      buf.append(field);
+
+    public String toString() {
+        StringBuffer buf = new StringBuffer();
+        buf.append("[");
+        buf.append(obj);
+        buf.append(toStringSeparator);
+        buf.append(count);
+        for (Object field : fields) {
+            buf.append(toStringSeparator);
+            buf.append(field);
+        }
+        buf.append("]");
+        return buf.toString();
+    }
+
+    /**
+     * Note: We do not defensively copy the wrapped object and any 
accompanying fields.  We do guarantee, however,
+     * to return a defensive (shallow) copy of the List object that is 
wrapping any accompanying fields.
+     *
+     * @return
+     */
+    @Override
+    public Rankable copy() {
+        List<Object> shallowCopyOfFields = ImmutableList.copyOf(getFields());
+        return new RankableObjectWithFields(getObject(), getCount(), 
shallowCopyOfFields);
     }
-    buf.append("]");
-    return buf.toString();
-  }
-
-  /**
-   * Note: We do not defensively copy the wrapped object and any accompanying 
fields.  We do guarantee, however,
-   * do return a defensive (shallow) copy of the List object that is wrapping 
any accompanying fields.
-   *
-   * @return
-   */
-  @Override
-  public Rankable copy() {
-    List<Object> shallowCopyOfFields = ImmutableList.copyOf(getFields());
-    return new RankableObjectWithFields(getObject(), getCount(), 
shallowCopyOfFields);
-  }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankings.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankings.java 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankings.java
index 17174b3..eae6ccd 100644
--- 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankings.java
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankings.java
@@ -1,156 +1,148 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.starter.tools;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
-
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
 
 public class Rankings implements Serializable {
 
-  private static final long serialVersionUID = -1549827195410578903L;
-  private static final int DEFAULT_COUNT = 10;
+    private static final long serialVersionUID = -1549827195410578903L;
+    private static final int DEFAULT_COUNT = 10;
+
+    private final int maxSize;
+    private final List<Rankable> rankedItems = Lists.newArrayList();
+
+    public Rankings() {
+        this(DEFAULT_COUNT);
+    }
+
+    public Rankings(int topN) {
+        if (topN < 1) {
+            throw new IllegalArgumentException("topN must be >= 1");
+        }
+        maxSize = topN;
+    }
+
+    /**
+     * Copy constructor.
+     * @param other
+     */
+    public Rankings(Rankings other) {
+        this(other.maxSize());
+        updateWith(other);
+    }
 
-  private final int maxSize;
-  private final List<Rankable> rankedItems = Lists.newArrayList();
+    /**
+     * @return the maximum possible number (size) of ranked objects this 
instance can hold
+     */
+    public int maxSize() {
+        return maxSize;
+    }
 
-  public Rankings() {
-    this(DEFAULT_COUNT);
-  }
+    /**
+     * @return the number (size) of ranked objects this instance is currently 
holding
+     */
+    public int size() {
+        return rankedItems.size();
+    }
 
-  public Rankings(int topN) {
-    if (topN < 1) {
-      throw new IllegalArgumentException("topN must be >= 1");
+    /**
+     * The returned defensive copy is only "somewhat" defensive.  We do, for 
instance, return a defensive copy of the
+     * enclosing List instance, and we do try to defensively copy any 
contained Rankable objects, too.  However, the
+     * contract of {@link org.apache.storm.starter.tools.Rankable#copy()} does 
not guarantee that any Objects embedded within
+     * a Rankable will be defensively copied, too.
+     *
+     * @return a somewhat defensive copy of ranked items
+     */
+    public List<Rankable> getRankings() {
+        List<Rankable> copy = Lists.newLinkedList();
+        for (Rankable r : rankedItems) {
+            copy.add(r.copy());
+        }
+        return ImmutableList.copyOf(copy);
     }
-    maxSize = topN;
-  }
-
-  /**
-   * Copy constructor.
-   * @param other
-   */
-  public Rankings(Rankings other) {
-    this(other.maxSize());
-    updateWith(other);
-  }
-
-  /**
-   * @return the maximum possible number (size) of ranked objects this 
instance can hold
-   */
-  public int maxSize() {
-    return maxSize;
-  }
-
-  /**
-   * @return the number (size) of ranked objects this instance is currently 
holding
-   */
-  public int size() {
-    return rankedItems.size();
-  }
-
-  /**
-   * The returned defensive copy is only "somewhat" defensive.  We do, for 
instance, return a defensive copy of the
-   * enclosing List instance, and we do try to defensively copy any contained 
Rankable objects, too.  However, the
-   * contract of {@link org.apache.storm.starter.tools.Rankable#copy()} does 
not guarantee that any Object's embedded within
-   * a Rankable will be defensively copied, too.
-   *
-   * @return a somewhat defensive copy of ranked items
-   */
-  public List<Rankable> getRankings() {
-    List<Rankable> copy = Lists.newLinkedList();
-    for (Rankable r: rankedItems) {
-      copy.add(r.copy());
+
+    public void updateWith(Rankings other) {
+        for (Rankable r : other.getRankings()) {
+            updateWith(r);
+        }
     }
-    return ImmutableList.copyOf(copy);
-  }
 
-  public void updateWith(Rankings other) {
-    for (Rankable r : other.getRankings()) {
-      updateWith(r);
+    public void updateWith(Rankable r) {
+        synchronized (rankedItems) {
+            addOrReplace(r);
+            rerank();
+            shrinkRankingsIfNeeded();
+        }
     }
-  }
 
-  public void updateWith(Rankable r) {
-    synchronized(rankedItems) {
-      addOrReplace(r);
-      rerank();
-      shrinkRankingsIfNeeded();
+    private void addOrReplace(Rankable r) {
+        Integer rank = findRankOf(r);
+        if (rank != null) {
+            rankedItems.set(rank, r);
+        } else {
+            rankedItems.add(r);
+        }
     }
-  }
 
-  private void addOrReplace(Rankable r) {
-    Integer rank = findRankOf(r);
-    if (rank != null) {
-      rankedItems.set(rank, r);
+    private Integer findRankOf(Rankable r) {
+        Object tag = r.getObject();
+        for (int rank = 0; rank < rankedItems.size(); rank++) {
+            Object cur = rankedItems.get(rank).getObject();
+            if (cur.equals(tag)) {
+                return rank;
+            }
+        }
+        return null;
     }
-    else {
-      rankedItems.add(r);
+
+    private void rerank() {
+        Collections.sort(rankedItems);
+        Collections.reverse(rankedItems);
     }
-  }
-
-  private Integer findRankOf(Rankable r) {
-    Object tag = r.getObject();
-    for (int rank = 0; rank < rankedItems.size(); rank++) {
-      Object cur = rankedItems.get(rank).getObject();
-      if (cur.equals(tag)) {
-        return rank;
-      }
+
+    private void shrinkRankingsIfNeeded() {
+        if (rankedItems.size() > maxSize) {
+            rankedItems.remove(maxSize);
+        }
     }
-    return null;
-  }
 
-  private void rerank() {
-    Collections.sort(rankedItems);
-    Collections.reverse(rankedItems);
-  }
+    /**
+     * Removes ranking entries that have a count of zero.
+     */
+    public void pruneZeroCounts() {
+        int i = 0;
+        while (i < rankedItems.size()) {
+            if (rankedItems.get(i).getCount() == 0) {
+                rankedItems.remove(i);
+            } else {
+                i++;
+            }
+        }
+    }
 
-  private void shrinkRankingsIfNeeded() {
-    if (rankedItems.size() > maxSize) {
-      rankedItems.remove(maxSize);
+    public String toString() {
+        return rankedItems.toString();
     }
-  }
-
-  /**
-   * Removes ranking entries that have a count of zero.
-   */
-  public void pruneZeroCounts() {
-    int i = 0;
-    while (i < rankedItems.size()) {
-      if (rankedItems.get(i).getCount() == 0) {
-        rankedItems.remove(i);
-      }
-      else {
-        i++;
-      }
+
+    /**
+     * Creates a (defensive) copy of itself.
+     */
+    public Rankings copy() {
+        return new Rankings(this);
     }
-  }
-
-  public String toString() {
-    return rankedItems.toString();
-  }
-
-  /**
-   * Creates a (defensive) copy of itself.
-   */
-  public Rankings copy() {
-    return new Rankings(this);
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlidingWindowCounter.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlidingWindowCounter.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlidingWindowCounter.java
index b95a6a9..6aedfc8 100644
--- 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlidingWindowCounter.java
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlidingWindowCounter.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.starter.tools;
 
 import java.io.Serializable;
@@ -67,53 +62,53 @@ import java.util.Map;
  */
 public final class SlidingWindowCounter<T> implements Serializable {
 
-  private static final long serialVersionUID = -2645063988768785810L;
+    private static final long serialVersionUID = -2645063988768785810L;
 
-  private SlotBasedCounter<T> objCounter;
-  private int headSlot;
-  private int tailSlot;
-  private int windowLengthInSlots;
+    private SlotBasedCounter<T> objCounter;
+    private int headSlot;
+    private int tailSlot;
+    private int windowLengthInSlots;
 
-  public SlidingWindowCounter(int windowLengthInSlots) {
-    if (windowLengthInSlots < 2) {
-      throw new IllegalArgumentException(
-          "Window length in slots must be at least two (you requested " + 
windowLengthInSlots + ")");
-    }
-    this.windowLengthInSlots = windowLengthInSlots;
-    this.objCounter = new SlotBasedCounter<T>(this.windowLengthInSlots);
+    public SlidingWindowCounter(int windowLengthInSlots) {
+        if (windowLengthInSlots < 2) {
+            throw new IllegalArgumentException(
+                "Window length in slots must be at least two (you requested " 
+ windowLengthInSlots + ")");
+        }
+        this.windowLengthInSlots = windowLengthInSlots;
+        this.objCounter = new SlotBasedCounter<T>(this.windowLengthInSlots);
 
-    this.headSlot = 0;
-    this.tailSlot = slotAfter(headSlot);
-  }
+        this.headSlot = 0;
+        this.tailSlot = slotAfter(headSlot);
+    }
 
-  public void incrementCount(T obj) {
-    objCounter.incrementCount(obj, headSlot);
-  }
+    public void incrementCount(T obj) {
+        objCounter.incrementCount(obj, headSlot);
+    }
 
-  /**
-   * Return the current (total) counts of all tracked objects, then advance 
the window.
-   * <p/>
-   * Whenever this method is called, we consider the counts of the current 
sliding window to be available to and
-   * successfully processed "upstream" (i.e. by the caller). Knowing this we 
will start counting any subsequent
-   * objects within the next "chunk" of the sliding window.
-   *
-   * @return The current (total) counts of all tracked objects.
-   */
-  public Map<T, Long> getCountsThenAdvanceWindow() {
-    Map<T, Long> counts = objCounter.getCounts();
-    objCounter.wipeZeros();
-    objCounter.wipeSlot(tailSlot);
-    advanceHead();
-    return counts;
-  }
+    /**
+     * Return the current (total) counts of all tracked objects, then advance 
the window.
+     * <p/>
+     * Whenever this method is called, we consider the counts of the current 
sliding window to be available to and
+     * successfully processed "upstream" (i.e. by the caller). Knowing this we 
will start counting any subsequent
+     * objects within the next "chunk" of the sliding window.
+     *
+     * @return The current (total) counts of all tracked objects.
+     */
+    public Map<T, Long> getCountsThenAdvanceWindow() {
+        Map<T, Long> counts = objCounter.getCounts();
+        objCounter.wipeZeros();
+        objCounter.wipeSlot(tailSlot);
+        advanceHead();
+        return counts;
+    }
 
-  private void advanceHead() {
-    headSlot = tailSlot;
-    tailSlot = slotAfter(tailSlot);
-  }
+    private void advanceHead() {
+        headSlot = tailSlot;
+        tailSlot = slotAfter(tailSlot);
+    }
 
-  private int slotAfter(int slot) {
-    return (slot + 1) % windowLengthInSlots;
-  }
+    private int slotAfter(int slot) {
+        return (slot + 1) % windowLengthInSlots;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlotBasedCounter.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlotBasedCounter.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlotBasedCounter.java
index 5bf66a5..51157b0 100644
--- 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlotBasedCounter.java
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlotBasedCounter.java
@@ -1,28 +1,21 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.starter.tools;
 
 import java.io.Serializable;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Set;
 
 /**
  * This class provides per-slot counts of the occurrences of objects.
@@ -33,84 +26,83 @@ import java.util.Set;
  */
 public final class SlotBasedCounter<T> implements Serializable {
 
-  private static final long serialVersionUID = 4858185737378394432L;
+    private static final long serialVersionUID = 4858185737378394432L;
 
-  private final Map<T, long[]> objToCounts = new HashMap<T, long[]>();
-  private final int numSlots;
+    private final Map<T, long[]> objToCounts = new HashMap<T, long[]>();
+    private final int numSlots;
 
-  public SlotBasedCounter(int numSlots) {
-    if (numSlots <= 0) {
-      throw new IllegalArgumentException("Number of slots must be greater than 
zero (you requested " + numSlots + ")");
+    public SlotBasedCounter(int numSlots) {
+        if (numSlots <= 0) {
+            throw new IllegalArgumentException("Number of slots must be 
greater than zero (you requested " + numSlots + ")");
+        }
+        this.numSlots = numSlots;
     }
-    this.numSlots = numSlots;
-  }
 
-  public void incrementCount(T obj, int slot) {
-    long[] counts = objToCounts.get(obj);
-    if (counts == null) {
-      counts = new long[this.numSlots];
-      objToCounts.put(obj, counts);
+    public void incrementCount(T obj, int slot) {
+        long[] counts = objToCounts.get(obj);
+        if (counts == null) {
+            counts = new long[this.numSlots];
+            objToCounts.put(obj, counts);
+        }
+        counts[slot]++;
     }
-    counts[slot]++;
-  }
 
-  public long getCount(T obj, int slot) {
-    long[] counts = objToCounts.get(obj);
-    if (counts == null) {
-      return 0;
-    }
-    else {
-      return counts[slot];
+    public long getCount(T obj, int slot) {
+        long[] counts = objToCounts.get(obj);
+        if (counts == null) {
+            return 0;
+        } else {
+            return counts[slot];
+        }
     }
-  }
 
-  public Map<T, Long> getCounts() {
-    Map<T, Long> result = new HashMap<T, Long>();
-    for (T obj : objToCounts.keySet()) {
-      result.put(obj, computeTotalCount(obj));
+    public Map<T, Long> getCounts() {
+        Map<T, Long> result = new HashMap<T, Long>();
+        for (T obj : objToCounts.keySet()) {
+            result.put(obj, computeTotalCount(obj));
+        }
+        return result;
     }
-    return result;
-  }
 
-  private long computeTotalCount(T obj) {
-    long[] curr = objToCounts.get(obj);
-    long total = 0;
-    for (long l : curr) {
-      total += l;
+    private long computeTotalCount(T obj) {
+        long[] curr = objToCounts.get(obj);
+        long total = 0;
+        for (long l : curr) {
+            total += l;
+        }
+        return total;
     }
-    return total;
-  }
 
-  /**
-   * Reset the slot count of any tracked objects to zero for the given slot.
-   *
-   * @param slot
-   */
-  public void wipeSlot(int slot) {
-    for (T obj : objToCounts.keySet()) {
-      resetSlotCountToZero(obj, slot);
+    /**
+     * Reset the slot count of any tracked objects to zero for the given slot.
+     *
+     * @param slot
+     */
+    public void wipeSlot(int slot) {
+        for (T obj : objToCounts.keySet()) {
+            resetSlotCountToZero(obj, slot);
+        }
     }
-  }
 
-  private void resetSlotCountToZero(T obj, int slot) {
-    long[] counts = objToCounts.get(obj);
-    counts[slot] = 0;
-  }
+    private void resetSlotCountToZero(T obj, int slot) {
+        long[] counts = objToCounts.get(obj);
+        counts[slot] = 0;
+    }
 
-  private boolean shouldBeRemovedFromCounter(T obj) {
-    return computeTotalCount(obj) == 0;
-  }
+    private boolean shouldBeRemovedFromCounter(T obj) {
+        return computeTotalCount(obj) == 0;
+    }
 
-  /**
-   * Remove any object from the counter whose total count is zero (to free up memory).
-   */
-  public void wipeZeros() {
-    for(Iterator<Map.Entry<T, long[]>> it = objToCounts.entrySet().iterator(); 
it.hasNext(); ) {
-      Map.Entry<T, long[]> entry = it.next();
-      if (shouldBeRemovedFromCounter(entry.getKey())) {
-        it.remove();
-      }
+    /**
+     * Remove any object from the counter whose total count is zero (to free up memory).
+     */
+    public void wipeZeros() {
+        for (Iterator<Map.Entry<T, long[]>> it = 
objToCounts.entrySet().iterator(); it.hasNext(); ) {
+            Map.Entry<T, long[]> entry = it.next();
+            if (shouldBeRemovedFromCounter(entry.getKey())) {
+                it.remove();
+            }
+        }
     }
-  }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/DebugMemoryMapState.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/DebugMemoryMapState.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/DebugMemoryMapState.java
index 5bb23f7..4849f72 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/DebugMemoryMapState.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/DebugMemoryMapState.java
@@ -18,6 +18,9 @@
 
 package org.apache.storm.starter.trident;
 
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import org.apache.storm.task.IMetricsContext;
 import org.apache.storm.topology.FailedException;
 import org.apache.storm.trident.state.CombinerValueUpdater;
@@ -28,10 +31,6 @@ import org.apache.storm.trident.testing.MemoryMapState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
 public class DebugMemoryMapState<T> extends MemoryMapState<T> {
     private static final Logger LOG = 
LoggerFactory.getLogger(DebugMemoryMapState.class);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
index fa9274d..6938b8a 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
@@ -1,25 +1,18 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.starter.trident;
 
 import java.util.HashMap;
-
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
@@ -47,21 +40,23 @@ public class TridentHBaseWindowingStoreTopology {
 
     public static StormTopology buildTopology(WindowsStoreFactory 
windowsStore) throws Exception {
         FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, 
new Values("the cow jumped over the moon"),
-                new Values("the man went to the store and bought some candy"), 
new Values("four score and seven years ago"),
-                new Values("how many apples can you eat"), new Values("to be 
or not to be the person"));
+                                                    new Values("the man went 
to the store and bought some candy"),
+                                                    new Values("four score and 
seven years ago"),
+                                                    new Values("how many 
apples can you eat"), new Values("to be or not to be the person"));
         spout.setCycle(true);
 
         TridentTopology topology = new TridentTopology();
 
         Stream stream = topology.newStream("spout1", 
spout).parallelismHint(16).each(new Fields("sentence"),
-                new Split(), new Fields("word"))
-                .window(TumblingCountWindow.of(1000), windowsStore, new 
Fields("word"), new CountAsAggregator(), new Fields("count"))
-                .peek(new Consumer() {
-                    @Override
-                    public void accept(TridentTuple input) {
-                        LOG.info("Received tuple: [{}]", input);
-                    }
-                });
+                                                                               
      new Split(), new Fields("word"))
+                                .window(TumblingCountWindow.of(1000), 
windowsStore, new Fields("word"), new CountAsAggregator(),
+                                        new Fields("count"))
+                                .peek(new Consumer() {
+                                    @Override
+                                    public void accept(TridentTuple input) {
+                                        LOG.info("Received tuple: [{}]", 
input);
+                                    }
+                                });
 
         return topology.build();
     }
@@ -72,7 +67,8 @@ public class TridentHBaseWindowingStoreTopology {
         conf.put(Config.TOPOLOGY_TRIDENT_WINDOWING_INMEMORY_CACHE_LIMIT, 100);
 
         // window-state table should already be created with cf:tuples column
-        HBaseWindowsStoreFactory windowStoreFactory = new 
HBaseWindowsStoreFactory(new HashMap<String, Object>(), "window-state", 
"cf".getBytes("UTF-8"), "tuples".getBytes("UTF-8"));
+        HBaseWindowsStoreFactory windowStoreFactory =
+            new HBaseWindowsStoreFactory(new HashMap<String, Object>(), 
"window-state", "cf".getBytes("UTF-8"), "tuples".getBytes("UTF-8"));
         String topoName = "wordCounterWithWindowing";
         if (args.length > 0) {
             topoName = args[0];

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
index c5f73ff..067eeff 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
@@ -1,25 +1,19 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.starter.trident;
 
 import java.util.ArrayList;
 import java.util.List;
-
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
@@ -74,25 +68,25 @@ public class TridentMapExample {
 
     public static StormTopology buildTopology() {
         FixedBatchSpout spout = new FixedBatchSpout(
-                new Fields("word"), 3, new Values("the cow jumped over the 
moon"),
-                new Values("the man went to the store and bought some candy"), 
new Values("four score and seven years ago"),
-                new Values("how many apples can you eat"), new Values("to be 
or not to be the person"));
+            new Fields("word"), 3, new Values("the cow jumped over the moon"),
+            new Values("the man went to the store and bought some candy"), new 
Values("four score and seven years ago"),
+            new Values("how many apples can you eat"), new Values("to be or 
not to be the person"));
         spout.setCycle(true);
 
         TridentTopology topology = new TridentTopology();
         TridentState wordCounts = topology.newStream("spout1", 
spout).parallelismHint(16)
-                .flatMap(split)
-                .map(toUpper, new Fields("uppercased"))
-                .filter(theFilter)
-                .peek(new Consumer() {
-                    @Override
-                    public void accept(TridentTuple input) {
-                        System.out.println(input.getString(0));
-                    }
-                })
-                .groupBy(new Fields("uppercased"))
-                .persistentAggregate(new MemoryMapState.Factory(), new 
Count(), new Fields("count"))
-                .parallelismHint(16);
+                                          .flatMap(split)
+                                          .map(toUpper, new 
Fields("uppercased"))
+                                          .filter(theFilter)
+                                          .peek(new Consumer() {
+                                              @Override
+                                              public void accept(TridentTuple 
input) {
+                                                  
System.out.println(input.getString(0));
+                                              }
+                                          })
+                                          .groupBy(new Fields("uppercased"))
+                                          .persistentAggregate(new 
MemoryMapState.Factory(), new Count(), new Fields("count"))
+                                          .parallelismHint(16);
 
         topology.newDRPCStream("words")
                 .flatMap(split, new Fields("word"))

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java
index 1cc33a9..bde5baa 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java
@@ -1,27 +1,21 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.starter.trident;
 
 import java.io.Serializable;
 import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
-
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
@@ -55,13 +49,13 @@ public class TridentMinMaxOfDevicesTopology {
 
         TridentTopology topology = new TridentTopology();
         Stream devicesStream = topology.newStream("devicegen-spout", spout).
-                each(allFields, new Debug("##### devices"));
+            each(allFields, new Debug("##### devices"));
 
         devicesStream.minBy(deviceID).
-                each(allFields, new Debug("#### device with min id"));
+            each(allFields, new Debug("#### device with min id"));
 
         devicesStream.maxBy(count).
-                each(allFields, new Debug("#### device with max count"));
+            each(allFields, new Debug("#### device with max count"));
 
         return topology.build();
     }
@@ -80,27 +74,27 @@ public class TridentMinMaxOfDevicesTopology {
 
         TridentTopology topology = new TridentTopology();
         Stream vehiclesStream = topology.newStream("spout1", spout).
-                each(allFields, new Debug("##### vehicles"));
+            each(allFields, new Debug("##### vehicles"));
 
         Stream slowVehiclesStream =
-                vehiclesStream
-                        .min(new SpeedComparator())
-                        .each(vehicleField, new Debug("#### slowest vehicle"));
+            vehiclesStream
+                .min(new SpeedComparator())
+                .each(vehicleField, new Debug("#### slowest vehicle"));
 
         Stream slowDriversStream =
-                slowVehiclesStream
-                        .project(driverField)
-                        .each(driverField, new Debug("##### slowest driver"));
+            slowVehiclesStream
+                .project(driverField)
+                .each(driverField, new Debug("##### slowest driver"));
 
         vehiclesStream
-                .max(new SpeedComparator())
-                .each(vehicleField, new Debug("#### fastest vehicle"))
-                .project(driverField)
-                .each(driverField, new Debug("##### fastest driver"));
+            .max(new SpeedComparator())
+            .each(vehicleField, new Debug("#### fastest vehicle"))
+            .project(driverField)
+            .each(driverField, new Debug("##### fastest driver"));
 
         vehiclesStream
-                .max(new EfficiencyComparator()).
-                each(vehicleField, new Debug("#### efficient vehicle"));
+            .max(new EfficiencyComparator()).
+            each(vehicleField, new Debug("#### efficient vehicle"));
 
         return topology.build();
     }
@@ -148,9 +142,9 @@ public class TridentMinMaxOfDevicesTopology {
         @Override
         public String toString() {
             return "Driver{" +
-                    "name='" + name + '\'' +
-                    ", id=" + id +
-                    '}';
+                   "name='" + name + '\'' +
+                   ", id=" + id +
+                   '}';
         }
     }
 
@@ -166,26 +160,27 @@ public class TridentMinMaxOfDevicesTopology {
             this.efficiency = efficiency;
         }
 
-        @Override
-        public String toString() {
-            return "Vehicle{" +
-                    "name='" + name + '\'' +
-                    ", maxSpeed=" + maxSpeed +
-                    ", efficiency=" + efficiency +
-                    '}';
-        }
-
         public static List<Object>[] generateVehicles(int count) {
             List<Object>[] vehicles = new List[count];
             for (int i = 0; i < count; i++) {
                 int id = i - 1;
                 vehicles[i] =
-                        (new Values(
-                                new Vehicle("Vehicle-" + id, 
ThreadLocalRandom.current().nextInt(0, 100), 
ThreadLocalRandom.current().nextDouble(1, 5)),
-                                new Driver("Driver-" + id, id)
-                        ));
+                    (new Values(
+                        new Vehicle("Vehicle-" + id, 
ThreadLocalRandom.current().nextInt(0, 100),
+                                    ThreadLocalRandom.current().nextDouble(1, 
5)),
+                        new Driver("Driver-" + id, id)
+                    ));
             }
             return vehicles;
         }
+
+        @Override
+        public String toString() {
+            return "Vehicle{" +
+                   "name='" + name + '\'' +
+                   ", maxSpeed=" + maxSpeed +
+                   ", efficiency=" + efficiency +
+                   '}';
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java
index 0d77914..f37442b 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java
@@ -1,27 +1,21 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.starter.trident;
 
 import java.io.Serializable;
 import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
-
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
@@ -57,31 +51,31 @@ public class TridentMinMaxOfVehiclesTopology {
 
         TridentTopology topology = new TridentTopology();
         Stream vehiclesStream = topology.newStream("spout1", spout).
-                each(allFields, new Debug("##### vehicles"));
+            each(allFields, new Debug("##### vehicles"));
 
         Stream slowVehiclesStream =
-                vehiclesStream
-                        .min(new SpeedComparator())
-                        .each(vehicleField, new Debug("#### slowest vehicle"));
+            vehiclesStream
+                .min(new SpeedComparator())
+                .each(vehicleField, new Debug("#### slowest vehicle"));
 
         Stream slowDriversStream =
-                slowVehiclesStream
-                        .project(driverField)
-                        .each(driverField, new Debug("##### slowest driver"));
+            slowVehiclesStream
+                .project(driverField)
+                .each(driverField, new Debug("##### slowest driver"));
 
         vehiclesStream
-                .max(new SpeedComparator())
-                .each(vehicleField, new Debug("#### fastest vehicle"))
-                .project(driverField)
-                .each(driverField, new Debug("##### fastest driver"));
+            .max(new SpeedComparator())
+            .each(vehicleField, new Debug("#### fastest vehicle"))
+            .project(driverField)
+            .each(driverField, new Debug("##### fastest driver"));
 
         vehiclesStream
-                .minBy(Vehicle.FIELD_NAME, new EfficiencyComparator()).
-                each(vehicleField, new Debug("#### least efficient vehicle"));
+            .minBy(Vehicle.FIELD_NAME, new EfficiencyComparator()).
+            each(vehicleField, new Debug("#### least efficient vehicle"));
 
         vehiclesStream
-                .maxBy(Vehicle.FIELD_NAME, new EfficiencyComparator()).
-                each(vehicleField, new Debug("#### most efficient vehicle"));
+            .maxBy(Vehicle.FIELD_NAME, new EfficiencyComparator()).
+            each(vehicleField, new Debug("#### most efficient vehicle"));
 
         return topology.build();
     }
@@ -127,9 +121,9 @@ public class TridentMinMaxOfVehiclesTopology {
         @Override
         public String toString() {
             return "Driver{" +
-                    "name='" + name + '\'' +
-                    ", id=" + id +
-                    '}';
+                   "name='" + name + '\'' +
+                   ", id=" + id +
+                   '}';
         }
     }
 
@@ -145,26 +139,27 @@ public class TridentMinMaxOfVehiclesTopology {
             this.efficiency = efficiency;
         }
 
-        @Override
-        public String toString() {
-            return "Vehicle{" +
-                    "name='" + name + '\'' +
-                    ", maxSpeed=" + maxSpeed +
-                    ", efficiency=" + efficiency +
-                    '}';
-        }
-
         public static List<Object>[] generateVehicles(int count) {
             List<Object>[] vehicles = new List[count];
             for (int i = 0; i < count; i++) {
                 int id = i - 1;
                 vehicles[i] =
-                        (new Values(
-                                new Vehicle("Vehicle-" + id, 
ThreadLocalRandom.current().nextInt(0, 100), 
ThreadLocalRandom.current().nextDouble(1, 5)),
-                                new Driver("Driver-" + id, id)
-                        ));
+                    (new Values(
+                        new Vehicle("Vehicle-" + id, 
ThreadLocalRandom.current().nextInt(0, 100),
+                                    ThreadLocalRandom.current().nextDouble(1, 
5)),
+                        new Driver("Driver-" + id, id)
+                    ));
             }
             return vehicles;
         }
+
+        @Override
+        public String toString() {
+            return "Vehicle{" +
+                   "name='" + name + '\'' +
+                   ", maxSpeed=" + maxSpeed +
+                   ", efficiency=" + efficiency +
+                   '}';
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java
index 8e5229d..19c0665 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.starter.trident;
 
 import java.util.ArrayList;
@@ -22,7 +17,6 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
@@ -60,28 +54,41 @@ public class TridentReach {
         put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));
     }};
 
-    public static class StaticSingleKeyMapState extends ReadOnlyState implements ReadOnlyMapState<Object> {
-        public static class Factory implements StateFactory {
-            Map _map;
+    public static StormTopology buildTopology() {
+        TridentTopology topology = new TridentTopology();
+        TridentState urlToTweeters = topology.newStaticState(new StaticSingleKeyMapState.Factory(TWEETERS_DB));
+        TridentState tweetersToFollowers = topology.newStaticState(new StaticSingleKeyMapState.Factory(FOLLOWERS_DB));
 
-            public Factory(Map map) {
-                _map = map;
-            }
 
-            @Override
-            public State makeState(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
-                return new StaticSingleKeyMapState(_map);
-            }
+        topology.newDRPCStream("reach").stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields(
+            "tweeters")).each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter")).shuffle().stateQuery(
+            tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers")).each(new Fields("followers"),
+                                                                                                    new ExpandList(),
+                                                                                                    new Fields("follower"))
+                .groupBy(new Fields("follower")).aggregate(new One(), new Fields(
+            "one")).aggregate(new Fields("one"), new Sum(), new Fields("reach"));
+        return topology.build();
+    }
 
+    public static void main(String[] args) throws Exception {
+        Config conf = new Config();
+        StormSubmitter.submitTopology("reach", conf, buildTopology());
+        try (DRPCClient drpc = DRPCClient.getConfiguredClient(conf)) {
+            Thread.sleep(2000);
+
+            System.out.println("REACH: " + drpc.execute("reach", "aaa"));
+            System.out.println("REACH: " + drpc.execute("reach", "foo.com/blog/1"));
+            System.out.println("REACH: " + drpc.execute("reach", "engineering.twitter.com/blog/5"));
         }
+    }
 
+    public static class StaticSingleKeyMapState extends ReadOnlyState implements ReadOnlyMapState<Object> {
         Map _map;
 
         public StaticSingleKeyMapState(Map map) {
             _map = map;
         }
 
-
         @Override
         public List<Object> multiGet(List<List<Object>> keys) {
             List<Object> ret = new ArrayList();
@@ -92,6 +99,20 @@ public class TridentReach {
             return ret;
         }
 
+        public static class Factory implements StateFactory {
+            Map _map;
+
+            public Factory(Map map) {
+                _map = map;
+            }
+
+            @Override
+            public State makeState(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+                return new StaticSingleKeyMapState(_map);
+            }
+
+        }
+
     }
 
     public static class One implements CombinerAggregator<Integer> {
@@ -124,30 +145,4 @@ public class TridentReach {
         }
 
     }
-
-    public static StormTopology buildTopology() {
-        TridentTopology topology = new TridentTopology();
-        TridentState urlToTweeters = topology.newStaticState(new StaticSingleKeyMapState.Factory(TWEETERS_DB));
-        TridentState tweetersToFollowers = topology.newStaticState(new StaticSingleKeyMapState.Factory(FOLLOWERS_DB));
-
-
-        topology.newDRPCStream("reach").stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields(
-                "tweeters")).each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter")).shuffle().stateQuery(
-                        tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers")).each(new Fields("followers"),
-                                new ExpandList(), new Fields("follower")).groupBy(new Fields("follower")).aggregate(new One(), new Fields(
-                                        "one")).aggregate(new Fields("one"), new Sum(), new Fields("reach"));
-        return topology.build();
-    }
-
-    public static void main(String[] args) throws Exception {
-        Config conf = new Config();
-        StormSubmitter.submitTopology("reach", conf, buildTopology());
-        try (DRPCClient drpc = DRPCClient.getConfiguredClient(conf)) {
-            Thread.sleep(2000);
-
-            System.out.println("REACH: " + drpc.execute("reach", "aaa"));
-            System.out.println("REACH: " + drpc.execute("reach", "foo.com/blog/1"));
-            System.out.println("REACH: " + drpc.execute("reach", "engineering.twitter.com/blog/5"));
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
index 2da29bf..862d09a 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
@@ -1,21 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.starter.trident;
 
 import org.apache.storm.Config;
@@ -45,21 +39,22 @@ public class TridentWindowingInmemoryStoreTopology {
 
     public static StormTopology buildTopology(WindowsStoreFactory windowStore, WindowConfig windowConfig) throws Exception {
         FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
-                new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
-                new Values("how many apples can you eat"), new Values("to be or not to be the person"));
+                                                    new Values("the man went to the store and bought some candy"),
+                                                    new Values("four score and seven years ago"),
+                                                    new Values("how many apples can you eat"), new Values("to be or not to be the person"));
         spout.setCycle(true);
 
         TridentTopology topology = new TridentTopology();
 
         Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
-                new Split(), new Fields("word"))
-                .window(windowConfig, windowStore, new Fields("word"), new CountAsAggregator(), new Fields("count"))
-                .peek(new Consumer() {
-                    @Override
-                    public void accept(TridentTuple input) {
-                        LOG.info("Received tuple: [{}]", input);
-                    }
-                });
+                                                                                     new Split(), new Fields("word"))
+                                .window(windowConfig, windowStore, new Fields("word"), new CountAsAggregator(), new Fields("count"))
+                                .peek(new Consumer() {
+                                    @Override
+                                    public void accept(TridentTuple input) {
+                                        LOG.info("Received tuple: [{}]", input);
+                                    }
+                                });
 
         return topology.build();
     }
@@ -71,7 +66,7 @@ public class TridentWindowingInmemoryStoreTopology {
         if (args.length > 0) {
             topoName = args[0];
         }
-        
+
         conf.setNumWorkers(3);
         StormSubmitter.submitTopologyWithProgressBar(topoName, conf, buildTopology(mapState, SlidingCountWindow.of(1000, 100)));
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java
index 0f86e1f..bafeba2 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.starter.trident;
 
 import org.apache.storm.Config;
@@ -36,32 +31,25 @@ import org.apache.storm.utils.DRPCClient;
 
 
 public class TridentWordCount {
-    public static class Split extends BaseFunction {
-        @Override
-        public void execute(TridentTuple tuple, TridentCollector collector) {
-            String sentence = tuple.getString(0);
-            for (String word : sentence.split(" ")) {
-                collector.emit(new Values(word));
-            }
-        }
-    }
-
     public static StormTopology buildTopology() {
         FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
-                new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
-                new Values("how many apples can you eat"), new Values("to be or not to be the person"));
+                                                    new Values("the man went to the store and bought some candy"),
+                                                    new Values("four score and seven years ago"),
+                                                    new Values("how many apples can you eat"), new Values("to be or not to be the person"));
         spout.setCycle(true);
 
         TridentTopology topology = new TridentTopology();
         TridentState wordCounts = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
-                new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(),
-                        new Count(), new Fields("count")).parallelismHint(16);
+                                                                                               new Split(), new Fields("word"))
+                                          .groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(),
+                                                                                           new Count(), new Fields("count"))
+                                          .parallelismHint(16);
 
         topology.newDRPCStream("words").each(new Fields("args"), new Split(), new Fields("word"))
-        .groupBy(new Fields("word"))
-        .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
-        .each(new Fields("count"), new FilterNull())
-        .project(new Fields("word", "count"));
+                .groupBy(new Fields("word"))
+                .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
+                .each(new Fields("count"), new FilterNull())
+                .project(new Fields("word", "count"));
         return topology.build();
     }
 
@@ -81,4 +69,14 @@ public class TridentWordCount {
             }
         }
     }
+
+    public static class Split extends BaseFunction {
+        @Override
+        public void execute(TridentTuple tuple, TridentCollector collector) {
+            String sentence = tuple.getString(0);
+            for (String word : sentence.split(" ")) {
+                collector.emit(new Values(word));
+            }
+        }
+    }
 }

Reply via email to