http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/tools/RankableObjectWithFields.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/tools/RankableObjectWithFields.java b/examples/storm-starter/src/jvm/storm/starter/tools/RankableObjectWithFields.java
deleted file mode 100644
index 9a0ecae..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/tools/RankableObjectWithFields.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.tools;
-
-import backtype.storm.tuple.Tuple;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-import java.io.Serializable;
-import java.util.List;
-
-/**
- * This class wraps an objects and its associated count, including any additional data fields.
- * <p/>
- * This class can be used, for instance, to track the number of occurrences of an object in a Storm topology.
- */
-public class RankableObjectWithFields implements Rankable, Serializable {
-
-  private static final long serialVersionUID = -9102878650001058090L;
-  private static final String toStringSeparator = "|";
-
-  private final Object obj;
-  private final long count;
-  private final ImmutableList<Object> fields;
-
-  public RankableObjectWithFields(Object obj, long count, Object... otherFields) {
-    if (obj == null) {
-      throw new IllegalArgumentException("The object must not be null");
-    }
-    if (count < 0) {
-      throw new IllegalArgumentException("The count must be >= 0");
-    }
-    this.obj = obj;
-    this.count = count;
-    fields = ImmutableList.copyOf(otherFields);
-
-  }
-
-  /**
-   * Construct a new instance based on the provided {@link Tuple}.
-   * <p/>
-   * This method expects the object to be ranked in the first field (index 0) of the provided tuple, and the number of
-   * occurrences of the object (its count) in the second field (index 1). Any further fields in the tuple will be
-   * extracted and tracked, too. These fields can be accessed via {@link RankableObjectWithFields#getFields()}.
-   *
-   * @param tuple
-   *
-   * @return new instance based on the provided tuple
-   */
-  public static RankableObjectWithFields from(Tuple tuple) {
-    List<Object> otherFields = Lists.newArrayList(tuple.getValues());
-    Object obj = otherFields.remove(0);
-    Long count = (Long) otherFields.remove(0);
-    return new RankableObjectWithFields(obj, count, otherFields.toArray());
-  }
-
-  public Object getObject() {
-    return obj;
-  }
-
-  public long getCount() {
-    return count;
-  }
-
-  /**
-   * @return an immutable list of any additional data fields of the object (may be empty but will never be null)
-   */
-  public List<Object> getFields() {
-    return fields;
-  }
-
-  @Override
-  public int compareTo(Rankable other) {
-    long delta = this.getCount() - other.getCount();
-    if (delta > 0) {
-      return 1;
-    }
-    else if (delta < 0) {
-      return -1;
-    }
-    else {
-      return 0;
-    }
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (!(o instanceof RankableObjectWithFields)) {
-      return false;
-    }
-    RankableObjectWithFields other = (RankableObjectWithFields) o;
-    return obj.equals(other.obj) && count == other.count;
-  }
-
-  @Override
-  public int hashCode() {
-    int result = 17;
-    int countHash = (int) (count ^ (count >>> 32));
-    result = 31 * result + countHash;
-    result = 31 * result + obj.hashCode();
-    return result;
-  }
-
-  public String toString() {
-    StringBuffer buf = new StringBuffer();
-    buf.append("[");
-    buf.append(obj);
-    buf.append(toStringSeparator);
-    buf.append(count);
-    for (Object field : fields) {
-      buf.append(toStringSeparator);
-      buf.append(field);
-    }
-    buf.append("]");
-    return buf.toString();
-  }
-
-  /**
-   * Note: We do not defensively copy the wrapped object and any accompanying fields.  We do guarantee, however,
-   * do return a defensive (shallow) copy of the List object that is wrapping any accompanying fields.
-   *
-   * @return
-   */
-  @Override
-  public Rankable copy() {
-    List<Object> shallowCopyOfFields = ImmutableList.copyOf(getFields());
-    return new RankableObjectWithFields(getObject(), getCount(), shallowCopyOfFields);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/tools/Rankings.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/tools/Rankings.java b/examples/storm-starter/src/jvm/storm/starter/tools/Rankings.java
deleted file mode 100644
index 551ebfb..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/tools/Rankings.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.tools;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.List;
-
-public class Rankings implements Serializable {
-
-  private static final long serialVersionUID = -1549827195410578903L;
-  private static final int DEFAULT_COUNT = 10;
-
-  private final int maxSize;
-  private final List<Rankable> rankedItems = Lists.newArrayList();
-
-  public Rankings() {
-    this(DEFAULT_COUNT);
-  }
-
-  public Rankings(int topN) {
-    if (topN < 1) {
-      throw new IllegalArgumentException("topN must be >= 1");
-    }
-    maxSize = topN;
-  }
-
-  /**
-   * Copy constructor.
-   * @param other
-   */
-  public Rankings(Rankings other) {
-    this(other.maxSize());
-    updateWith(other);
-  }
-
-  /**
-   * @return the maximum possible number (size) of ranked objects this instance can hold
-   */
-  public int maxSize() {
-    return maxSize;
-  }
-
-  /**
-   * @return the number (size) of ranked objects this instance is currently holding
-   */
-  public int size() {
-    return rankedItems.size();
-  }
-
-  /**
-   * The returned defensive copy is only "somewhat" defensive.  We do, for instance, return a defensive copy of the
-   * enclosing List instance, and we do try to defensively copy any contained Rankable objects, too.  However, the
-   * contract of {@link storm.starter.tools.Rankable#copy()} does not guarantee that any Object's embedded within
-   * a Rankable will be defensively copied, too.
-   *
-   * @return a somewhat defensive copy of ranked items
-   */
-  public List<Rankable> getRankings() {
-    List<Rankable> copy = Lists.newLinkedList();
-    for (Rankable r: rankedItems) {
-      copy.add(r.copy());
-    }
-    return ImmutableList.copyOf(copy);
-  }
-
-  public void updateWith(Rankings other) {
-    for (Rankable r : other.getRankings()) {
-      updateWith(r);
-    }
-  }
-
-  public void updateWith(Rankable r) {
-    synchronized(rankedItems) {
-      addOrReplace(r);
-      rerank();
-      shrinkRankingsIfNeeded();
-    }
-  }
-
-  private void addOrReplace(Rankable r) {
-    Integer rank = findRankOf(r);
-    if (rank != null) {
-      rankedItems.set(rank, r);
-    }
-    else {
-      rankedItems.add(r);
-    }
-  }
-
-  private Integer findRankOf(Rankable r) {
-    Object tag = r.getObject();
-    for (int rank = 0; rank < rankedItems.size(); rank++) {
-      Object cur = rankedItems.get(rank).getObject();
-      if (cur.equals(tag)) {
-        return rank;
-      }
-    }
-    return null;
-  }
-
-  private void rerank() {
-    Collections.sort(rankedItems);
-    Collections.reverse(rankedItems);
-  }
-
-  private void shrinkRankingsIfNeeded() {
-    if (rankedItems.size() > maxSize) {
-      rankedItems.remove(maxSize);
-    }
-  }
-
-  /**
-   * Removes ranking entries that have a count of zero.
-   */
-  public void pruneZeroCounts() {
-    int i = 0;
-    while (i < rankedItems.size()) {
-      if (rankedItems.get(i).getCount() == 0) {
-        rankedItems.remove(i);
-      }
-      else {
-        i++;
-      }
-    }
-  }
-
-  public String toString() {
-    return rankedItems.toString();
-  }
-
-  /**
-   * Creates a (defensive) copy of itself.
-   */
-  public Rankings copy() {
-    return new Rankings(this);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/tools/SlidingWindowCounter.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/tools/SlidingWindowCounter.java b/examples/storm-starter/src/jvm/storm/starter/tools/SlidingWindowCounter.java
deleted file mode 100644
index 1199c40..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/tools/SlidingWindowCounter.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.tools;
-
-import java.io.Serializable;
-import java.util.Map;
-
-/**
- * This class counts objects in a sliding window fashion.
- * <p/>
- * It is designed 1) to give multiple "producer" threads write access to the counter, i.e. being able to increment
- * counts of objects, and 2) to give a single "consumer" thread (e.g. {@link PeriodicSlidingWindowCounter}) read access
- * to the counter. Whenever the consumer thread performs a read operation, this class will advance the head slot of the
- * sliding window counter. This means that the consumer thread indirectly controls where writes of the producer threads
- * will go to. Also, by itself this class will not advance the head slot.
- * <p/>
- * A note for analyzing data based on a sliding window count: During the initial <code>windowLengthInSlots</code>
- * iterations, this sliding window counter will always return object counts that are equal or greater than in the
- * previous iteration. This is the effect of the counter "loading up" at the very start of its existence. Conceptually,
- * this is the desired behavior.
- * <p/>
- * To give an example, using a counter with 5 slots which for the sake of this example represent 1 minute of time each:
- * <p/>
- * <pre>
- * {@code
- * Sliding window counts of an object X over time
- *
- * Minute (timeline):
- * 1    2   3   4   5   6   7   8
- *
- * Observed counts per minute:
- * 1    1   1   1   0   0   0   0
- *
- * Counts returned by counter:
- * 1    2   3   4   4   3   2   1
- * }
- * </pre>
- * <p/>
- * As you can see in this example, for the first <code>windowLengthInSlots</code> (here: the first five minutes) the
- * counter will always return counts equal or greater than in the previous iteration (1, 2, 3, 4, 4). This initial load
- * effect needs to be accounted for whenever you want to perform analyses such as trending topics; otherwise your
- * analysis algorithm might falsely identify the object to be trending as the counter seems to observe continuously
- * increasing counts. Also, note that during the initial load phase <em>every object</em> will exhibit increasing
- * counts.
- * <p/>
- * On a high-level, the counter exhibits the following behavior: If you asked the example counter after two minutes,
- * "how often did you count the object during the past five minutes?", then it should reply "I have counted it 2 times
- * in the past five minutes", implying that it can only account for the last two of those five minutes because the
- * counter was not running before that time.
- *
- * @param <T> The type of those objects we want to count.
- */
-public final class SlidingWindowCounter<T> implements Serializable {
-
-  private static final long serialVersionUID = -2645063988768785810L;
-
-  private SlotBasedCounter<T> objCounter;
-  private int headSlot;
-  private int tailSlot;
-  private int windowLengthInSlots;
-
-  public SlidingWindowCounter(int windowLengthInSlots) {
-    if (windowLengthInSlots < 2) {
-      throw new IllegalArgumentException(
-          "Window length in slots must be at least two (you requested " + windowLengthInSlots + ")");
-    }
-    this.windowLengthInSlots = windowLengthInSlots;
-    this.objCounter = new SlotBasedCounter<T>(this.windowLengthInSlots);
-
-    this.headSlot = 0;
-    this.tailSlot = slotAfter(headSlot);
-  }
-
-  public void incrementCount(T obj) {
-    objCounter.incrementCount(obj, headSlot);
-  }
-
-  /**
-   * Return the current (total) counts of all tracked objects, then advance the window.
-   * <p/>
-   * Whenever this method is called, we consider the counts of the current sliding window to be available to and
-   * successfully processed "upstream" (i.e. by the caller). Knowing this we will start counting any subsequent
-   * objects within the next "chunk" of the sliding window.
-   *
-   * @return The current (total) counts of all tracked objects.
-   */
-  public Map<T, Long> getCountsThenAdvanceWindow() {
-    Map<T, Long> counts = objCounter.getCounts();
-    objCounter.wipeZeros();
-    objCounter.wipeSlot(tailSlot);
-    advanceHead();
-    return counts;
-  }
-
-  private void advanceHead() {
-    headSlot = tailSlot;
-    tailSlot = slotAfter(tailSlot);
-  }
-
-  private int slotAfter(int slot) {
-    return (slot + 1) % windowLengthInSlots;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/tools/SlotBasedCounter.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/tools/SlotBasedCounter.java b/examples/storm-starter/src/jvm/storm/starter/tools/SlotBasedCounter.java
deleted file mode 100644
index 4b2d472..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/tools/SlotBasedCounter.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.tools;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * This class provides per-slot counts of the occurrences of objects.
- * <p/>
- * It can be used, for instance, as a building block for implementing sliding window counting of objects.
- *
- * @param <T> The type of those objects we want to count.
- */
-public final class SlotBasedCounter<T> implements Serializable {
-
-  private static final long serialVersionUID = 4858185737378394432L;
-
-  private final Map<T, long[]> objToCounts = new HashMap<T, long[]>();
-  private final int numSlots;
-
-  public SlotBasedCounter(int numSlots) {
-    if (numSlots <= 0) {
-      throw new IllegalArgumentException("Number of slots must be greater than zero (you requested " + numSlots + ")");
-    }
-    this.numSlots = numSlots;
-  }
-
-  public void incrementCount(T obj, int slot) {
-    long[] counts = objToCounts.get(obj);
-    if (counts == null) {
-      counts = new long[this.numSlots];
-      objToCounts.put(obj, counts);
-    }
-    counts[slot]++;
-  }
-
-  public long getCount(T obj, int slot) {
-    long[] counts = objToCounts.get(obj);
-    if (counts == null) {
-      return 0;
-    }
-    else {
-      return counts[slot];
-    }
-  }
-
-  public Map<T, Long> getCounts() {
-    Map<T, Long> result = new HashMap<T, Long>();
-    for (T obj : objToCounts.keySet()) {
-      result.put(obj, computeTotalCount(obj));
-    }
-    return result;
-  }
-
-  private long computeTotalCount(T obj) {
-    long[] curr = objToCounts.get(obj);
-    long total = 0;
-    for (long l : curr) {
-      total += l;
-    }
-    return total;
-  }
-
-  /**
-   * Reset the slot count of any tracked objects to zero for the given slot.
-   *
-   * @param slot
-   */
-  public void wipeSlot(int slot) {
-    for (T obj : objToCounts.keySet()) {
-      resetSlotCountToZero(obj, slot);
-    }
-  }
-
-  private void resetSlotCountToZero(T obj, int slot) {
-    long[] counts = objToCounts.get(obj);
-    counts[slot] = 0;
-  }
-
-  private boolean shouldBeRemovedFromCounter(T obj) {
-    return computeTotalCount(obj) == 0;
-  }
-
-  /**
-   * Remove any object from the counter whose total count is zero (to free up memory).
-   */
-  public void wipeZeros() {
-    Set<T> objToBeRemoved = new HashSet<T>();
-    for (T obj : objToCounts.keySet()) {
-      if (shouldBeRemovedFromCounter(obj)) {
-        objToBeRemoved.add(obj);
-      }
-    }
-    for (T obj : objToBeRemoved) {
-      objToCounts.remove(obj);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/trident/TridentKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/trident/TridentKafkaWordCount.java b/examples/storm-starter/src/jvm/storm/starter/trident/TridentKafkaWordCount.java
deleted file mode 100644
index bd8ecba..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/trident/TridentKafkaWordCount.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- * Contains some contributions under the Thrift Software License.
- * Please see doc/old-thrift-license.txt in the Thrift distribution for
- * details.
- */
-package storm.starter.trident;
-
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.LocalDRPC;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.spout.SchemeAsMultiScheme;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import storm.kafka.StringScheme;
-import storm.kafka.ZkHosts;
-import storm.kafka.bolt.KafkaBolt;
-import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
-import storm.kafka.bolt.selector.DefaultTopicSelector;
-import storm.kafka.trident.TransactionalTridentKafkaSpout;
-import storm.kafka.trident.TridentKafkaConfig;
-import storm.starter.spout.RandomSentenceSpout;
-import storm.trident.Stream;
-import storm.trident.TridentState;
-import storm.trident.TridentTopology;
-import storm.trident.operation.builtin.Count;
-import storm.trident.operation.builtin.FilterNull;
-import storm.trident.operation.builtin.MapGet;
-import storm.trident.testing.MemoryMapState;
-import storm.trident.testing.Split;
-
-import java.util.Properties;
-
-/**
- * A sample word count trident topology using transactional kafka spout that has the following components.
- * <ol>
- * <li> {@link KafkaBolt}
- * that receives random sentences from {@link RandomSentenceSpout} and
- * publishes the sentences to a kafka "test" topic.
- * </li>
- * <li> {@link TransactionalTridentKafkaSpout}
- * that consumes sentences from the "test" topic, splits it into words, aggregates
- * and stores the word count in a {@link MemoryMapState}.
- * </li>
- * <li> DRPC query
- * that returns the word counts by querying the trident state (MemoryMapState).
- * </li>
- * </ol>
- * <p>
- *     For more background read the <a href="https://storm.apache.org/documentation/Trident-tutorial.html">trident tutorial</a>,
- *     <a href="https://storm.apache.org/documentation/Trident-state">trident state</a> and
- *     <a href="https://github.com/apache/storm/tree/master/external/storm-kafka"> Storm Kafka </a>.
- * </p>
- */
-public class TridentKafkaWordCount {
-
-    private String zkUrl;
-    private String brokerUrl;
-
-    TridentKafkaWordCount(String zkUrl, String brokerUrl) {
-        this.zkUrl = zkUrl;
-        this.brokerUrl = brokerUrl;
-    }
-
-    /**
-     * Creates a transactional kafka spout that consumes any new data published to "test" topic.
-     * <p/>
-     * For more info on transactional spouts
-     * see "Transactional spouts" section in
-     * <a href="https://storm.apache.org/documentation/Trident-state"> Trident state</a> doc.
-     *
-     * @return a transactional trident kafka spout.
-     */
-    private TransactionalTridentKafkaSpout createKafkaSpout() {
-        ZkHosts hosts = new ZkHosts(zkUrl);
-        TridentKafkaConfig config = new TridentKafkaConfig(hosts, "test");
-        config.scheme = new SchemeAsMultiScheme(new StringScheme());
-
-        // Consume new data from the topic
-        config.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
-        return new TransactionalTridentKafkaSpout(config);
-    }
-
-
-    private Stream addDRPCStream(TridentTopology tridentTopology, TridentState state, LocalDRPC drpc) {
-        return tridentTopology.newDRPCStream("words", drpc)
-                .each(new Fields("args"), new Split(), new Fields("word"))
-                .groupBy(new Fields("word"))
-                .stateQuery(state, new Fields("word"), new MapGet(), new Fields("count"))
-                .each(new Fields("count"), new FilterNull())
-                .project(new Fields("word", "count"));
-    }
-
-    private TridentState addTridentState(TridentTopology tridentTopology) {
-        return tridentTopology.newStream("spout1", createKafkaSpout()).parallelismHint(1)
-                .each(new Fields("str"), new Split(), new Fields("word"))
-                .groupBy(new Fields("word"))
-                .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
-                .parallelismHint(1);
-    }
-
-    /**
-     * Creates a trident topology that consumes sentences from the kafka "test" topic using a
-     * {@link TransactionalTridentKafkaSpout} computes the word count and stores it in a {@link MemoryMapState}.
-     * A DRPC stream is then created to query the word counts.
-     * @param drpc
-     * @return
-     */
-    public StormTopology buildConsumerTopology(LocalDRPC drpc) {
-        TridentTopology tridentTopology = new TridentTopology();
-        addDRPCStream(tridentTopology, addTridentState(tridentTopology), drpc);
-        return tridentTopology.build();
-    }
-
-    /**
-     * Return the consumer topology config.
-     *
-     * @return the topology config
-     */
-    public Config getConsumerConfig() {
-        Config conf = new Config();
-        conf.setMaxSpoutPending(20);
-        //  conf.setDebug(true);
-        return conf;
-    }
-
-    /**
-     * A topology that produces random sentences using {@link RandomSentenceSpout} and
-     * publishes the sentences using a KafkaBolt to kafka "test" topic.
-     *
-     * @return the storm topology
-     */
-    public StormTopology buildProducerTopology(Properties prop) {
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.setSpout("spout", new RandomSentenceSpout(), 2);
-        /**
-         * The output field of the RandomSentenceSpout ("word") is provided as the boltMessageField
-         * so that this gets written out as the message in the kafka topic.
-         */
-        KafkaBolt bolt = new KafkaBolt().withProducerProperties(prop)
-                .withTopicSelector(new DefaultTopicSelector("test"))
-                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("key", "word"));
-        builder.setBolt("forwardToKafka", bolt, 1).shuffleGrouping("spout");
-        return builder.createTopology();
-    }
-
-    /**
-     * Returns the storm config for the topology that publishes sentences to kafka "test" topic using a kafka bolt.
-     * The KAFKA_BROKER_PROPERTIES is needed for the KafkaBolt.
-     *
-     * @return the topology config
-     */
-    public Properties getProducerConfig() {
-        Properties props = new Properties();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
-        props.put(ProducerConfig.CLIENT_ID_CONFIG, "storm-kafka-producer");
-        return props;
-    }
-
-    /**
-     * <p>
-     * To run this topology ensure you have a kafka broker running.
-     * </p>
-     * Create a topic test with command line,
-     * kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partition 1 --topic test
-     */
-    public static void main(String[] args) throws Exception {
-
-        String zkUrl = "localhost:2181";        // the defaults.
-        String brokerUrl = "localhost:9092";
-
-        if (args.length > 2 || (args.length == 1 && args[0].matches("^-h|--help$"))) {
-            System.out.println("Usage: TridentKafkaWordCount [kafka zookeeper url] [kafka broker url]");
-            System.out.println("   E.g TridentKafkaWordCount [" + zkUrl + "]" + " [" + brokerUrl + "]");
-            System.exit(1);
-        } else if (args.length == 1) {
-            zkUrl = args[0];
-        } else if (args.length == 2) {
-            zkUrl = args[0];
-            brokerUrl = args[1];
-        }
-
-        System.out.println("Using Kafka zookeeper url: " + zkUrl + " broker url: " + brokerUrl);
-
-        TridentKafkaWordCount wordCount = new TridentKafkaWordCount(zkUrl, brokerUrl);
-
-        LocalDRPC drpc = new LocalDRPC();
-        LocalCluster cluster = new LocalCluster();
-
-        // submit the consumer topology.
-        cluster.submitTopology("wordCounter", wordCount.getConsumerConfig(), wordCount.buildConsumerTopology(drpc));
-
-        Config conf = new Config();
-        conf.setMaxSpoutPending(20);
-        // submit the producer topology.
-        cluster.submitTopology("kafkaBolt", conf, wordCount.buildProducerTopology(wordCount.getProducerConfig()));
-
-        // keep querying the word counts for a minute.
-        for (int i = 0; i < 60; i++) {
-            System.out.println("DRPC RESULT: " + drpc.execute("words", "the and apple snow jumped"));
-            Thread.sleep(1000);
-        }
-
-        cluster.killTopology("kafkaBolt");
-        cluster.killTopology("wordCounter");
-        cluster.shutdown();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/trident/TridentReach.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/trident/TridentReach.java b/examples/storm-starter/src/jvm/storm/starter/trident/TridentReach.java
deleted file mode 100644
index 2d87c47..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/trident/TridentReach.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.trident;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.LocalDRPC;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.task.IMetricsContext;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import storm.trident.TridentState;
-import storm.trident.TridentTopology;
-import storm.trident.operation.BaseFunction;
-import storm.trident.operation.CombinerAggregator;
-import storm.trident.operation.TridentCollector;
-import storm.trident.operation.builtin.MapGet;
-import storm.trident.operation.builtin.Sum;
-import storm.trident.state.ReadOnlyState;
-import storm.trident.state.State;
-import storm.trident.state.StateFactory;
-import storm.trident.state.map.ReadOnlyMapState;
-import storm.trident.tuple.TridentTuple;
-
-import java.util.*;
-
-public class TridentReach {
-  public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, 
List<String>>() {{
-    put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", 
"nathan"));
-    put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", 
"sally", "nathan"));
-    put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john"));
-  }};
-
-  public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, 
List<String>>() {{
-    put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", 
"jai"));
-    put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", 
"vivian"));
-    put("tim", Arrays.asList("alex"));
-    put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", 
"vivian", "emily", "jordan"));
-    put("adam", Arrays.asList("david", "carissa"));
-    put("mike", Arrays.asList("john", "bob"));
-    put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));
-  }};
-
-  public static class StaticSingleKeyMapState extends ReadOnlyState implements 
ReadOnlyMapState<Object> {
-    public static class Factory implements StateFactory {
-      Map _map;
-
-      public Factory(Map map) {
-        _map = map;
-      }
-
-      @Override
-      public State makeState(Map conf, IMetricsContext metrics, int 
partitionIndex, int numPartitions) {
-        return new StaticSingleKeyMapState(_map);
-      }
-
-    }
-
-    Map _map;
-
-    public StaticSingleKeyMapState(Map map) {
-      _map = map;
-    }
-
-
-    @Override
-    public List<Object> multiGet(List<List<Object>> keys) {
-      List<Object> ret = new ArrayList();
-      for (List<Object> key : keys) {
-        Object singleKey = key.get(0);
-        ret.add(_map.get(singleKey));
-      }
-      return ret;
-    }
-
-  }
-
-  public static class One implements CombinerAggregator<Integer> {
-    @Override
-    public Integer init(TridentTuple tuple) {
-      return 1;
-    }
-
-    @Override
-    public Integer combine(Integer val1, Integer val2) {
-      return 1;
-    }
-
-    @Override
-    public Integer zero() {
-      return 1;
-    }
-  }
-
-  public static class ExpandList extends BaseFunction {
-
-    @Override
-    public void execute(TridentTuple tuple, TridentCollector collector) {
-      List l = (List) tuple.getValue(0);
-      if (l != null) {
-        for (Object o : l) {
-          collector.emit(new Values(o));
-        }
-      }
-    }
-
-  }
-
-  public static StormTopology buildTopology(LocalDRPC drpc) {
-    TridentTopology topology = new TridentTopology();
-    TridentState urlToTweeters = topology.newStaticState(new 
StaticSingleKeyMapState.Factory(TWEETERS_DB));
-    TridentState tweetersToFollowers = topology.newStaticState(new 
StaticSingleKeyMapState.Factory(FOLLOWERS_DB));
-
-
-    topology.newDRPCStream("reach", drpc).stateQuery(urlToTweeters, new 
Fields("args"), new MapGet(), new Fields(
-        "tweeters")).each(new Fields("tweeters"), new ExpandList(), new 
Fields("tweeter")).shuffle().stateQuery(
-        tweetersToFollowers, new Fields("tweeter"), new MapGet(), new 
Fields("followers")).each(new Fields("followers"),
-        new ExpandList(), new Fields("follower")).groupBy(new 
Fields("follower")).aggregate(new One(), new Fields(
-        "one")).aggregate(new Fields("one"), new Sum(), new Fields("reach"));
-    return topology.build();
-  }
-
-  public static void main(String[] args) throws Exception {
-    LocalDRPC drpc = new LocalDRPC();
-
-    Config conf = new Config();
-    LocalCluster cluster = new LocalCluster();
-
-    cluster.submitTopology("reach", conf, buildTopology(drpc));
-
-    Thread.sleep(2000);
-
-    System.out.println("REACH: " + drpc.execute("reach", "aaa"));
-    System.out.println("REACH: " + drpc.execute("reach", "foo.com/blog/1"));
-    System.out.println("REACH: " + drpc.execute("reach", 
"engineering.twitter.com/blog/5"));
-
-
-    cluster.shutdown();
-    drpc.shutdown();
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/trident/TridentWordCount.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/storm/starter/trident/TridentWordCount.java 
b/examples/storm-starter/src/jvm/storm/starter/trident/TridentWordCount.java
deleted file mode 100644
index e4a2d2e..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/trident/TridentWordCount.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.trident;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.LocalDRPC;
-import backtype.storm.StormSubmitter;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import storm.trident.TridentState;
-import storm.trident.TridentTopology;
-import storm.trident.operation.BaseFunction;
-import storm.trident.operation.TridentCollector;
-import storm.trident.operation.builtin.Count;
-import storm.trident.operation.builtin.FilterNull;
-import storm.trident.operation.builtin.MapGet;
-import storm.trident.operation.builtin.Sum;
-import storm.trident.testing.FixedBatchSpout;
-import storm.trident.testing.MemoryMapState;
-import storm.trident.tuple.TridentTuple;
-
-
-public class TridentWordCount {
-  public static class Split extends BaseFunction {
-    @Override
-    public void execute(TridentTuple tuple, TridentCollector collector) {
-      String sentence = tuple.getString(0);
-      for (String word : sentence.split(" ")) {
-        collector.emit(new Values(word));
-      }
-    }
-  }
-
-  public static StormTopology buildTopology(LocalDRPC drpc) {
-    FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new 
Values("the cow jumped over the moon"),
-        new Values("the man went to the store and bought some candy"), new 
Values("four score and seven years ago"),
-        new Values("how many apples can you eat"), new Values("to be or not to 
be the person"));
-    spout.setCycle(true);
-
-    TridentTopology topology = new TridentTopology();
-    TridentState wordCounts = topology.newStream("spout1", 
spout).parallelismHint(16).each(new Fields("sentence"),
-        new Split(), new Fields("word")).groupBy(new 
Fields("word")).persistentAggregate(new MemoryMapState.Factory(),
-        new Count(), new Fields("count")).parallelismHint(16);
-
-    topology.newDRPCStream("words", drpc).each(new Fields("args"), new 
Split(), new Fields("word")).groupBy(new Fields(
-        "word")).stateQuery(wordCounts, new Fields("word"), new MapGet(), new 
Fields("count")).each(new Fields("count"),
-        new FilterNull()).aggregate(new Fields("count"), new Sum(), new 
Fields("sum"));
-    return topology.build();
-  }
-
-  public static void main(String[] args) throws Exception {
-    Config conf = new Config();
-    conf.setMaxSpoutPending(20);
-    if (args.length == 0) {
-      LocalDRPC drpc = new LocalDRPC();
-      LocalCluster cluster = new LocalCluster();
-      cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
-      for (int i = 0; i < 100; i++) {
-        System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the 
dog jumped"));
-        Thread.sleep(1000);
-      }
-    }
-    else {
-      conf.setNumWorkers(3);
-      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, 
buildTopology(null));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/util/StormRunner.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/util/StormRunner.java 
b/examples/storm-starter/src/jvm/storm/starter/util/StormRunner.java
deleted file mode 100644
index eb25a86..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/util/StormRunner.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.util;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.AuthorizationException;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.StormTopology;
-
-public final class StormRunner {
-
-  private static final int MILLIS_IN_SEC = 1000;
-
-  private StormRunner() {
-  }
-
-  public static void runTopologyLocally(StormTopology topology, String 
topologyName, Config conf, int runtimeInSeconds)
-      throws InterruptedException {
-    LocalCluster cluster = new LocalCluster();
-    cluster.submitTopology(topologyName, conf, topology);
-    Thread.sleep((long) runtimeInSeconds * MILLIS_IN_SEC);
-    cluster.killTopology(topologyName);
-    cluster.shutdown();
-  }
-
-  public static void runTopologyRemotely(StormTopology topology, String 
topologyName, Config conf)
-      throws AlreadyAliveException, InvalidTopologyException, 
AuthorizationException {
-    StormSubmitter.submitTopology(topologyName, conf, topology);
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/test/jvm/org/apache/storm/starter/bolt/IntermediateRankingsBoltTest.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/test/jvm/org/apache/storm/starter/bolt/IntermediateRankingsBoltTest.java
 
b/examples/storm-starter/test/jvm/org/apache/storm/starter/bolt/IntermediateRankingsBoltTest.java
new file mode 100644
index 0000000..23326c5
--- /dev/null
+++ 
b/examples/storm-starter/test/jvm/org/apache/storm/starter/bolt/IntermediateRankingsBoltTest.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.bolt;
+
+import org.apache.storm.Config;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.MockTupleHelpers;
+import com.google.common.collect.Lists;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.util.Map;
+
+import static org.fest.assertions.api.Assertions.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+public class IntermediateRankingsBoltTest {
+
+  private static final String ANY_NON_SYSTEM_COMPONENT_ID = 
"irrelevant_component_id";
+  private static final String ANY_NON_SYSTEM_STREAM_ID = 
"irrelevant_stream_id";
+  private static final Object ANY_OBJECT = new Object();
+  private static final int ANY_TOPN = 10;
+  private static final long ANY_COUNT = 42;
+
+  private Tuple mockRankableTuple(Object obj, long count) {
+    Tuple tuple = MockTupleHelpers.mockTuple(ANY_NON_SYSTEM_COMPONENT_ID, 
ANY_NON_SYSTEM_STREAM_ID);
+    when(tuple.getValues()).thenReturn(Lists.newArrayList(ANY_OBJECT, 
ANY_COUNT));
+    return tuple;
+  }
+
+  @DataProvider
+  public Object[][] illegalTopN() {
+    return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = 
"illegalTopN")
+  public void negativeOrZeroTopNShouldThrowIAE(int topN) {
+    new IntermediateRankingsBolt(topN);
+  }
+
+  @DataProvider
+  public Object[][] illegalEmitFrequency() {
+    return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = 
"illegalEmitFrequency")
+  public void negativeOrZeroEmitFrequencyShouldThrowIAE(int 
emitFrequencyInSeconds) {
+    new IntermediateRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
+  }
+
+  @DataProvider
+  public Object[][] legalTopN() {
+    return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } };
+  }
+
+  @Test(dataProvider = "legalTopN")
+  public void positiveTopNShouldBeOk(int topN) {
+    new IntermediateRankingsBolt(topN);
+  }
+
+  @DataProvider
+  public Object[][] legalEmitFrequency() {
+    return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } };
+  }
+
+  @Test(dataProvider = "legalEmitFrequency")
+  public void positiveEmitFrequencyShouldBeOk(int emitFrequencyInSeconds) {
+    new IntermediateRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
+  }
+
+  @Test
+  public void shouldEmitSomethingIfTickTupleIsReceived() {
+    // given
+    Tuple tickTuple = MockTupleHelpers.mockTickTuple();
+    BasicOutputCollector collector = mock(BasicOutputCollector.class);
+    IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
+
+    // when
+    bolt.execute(tickTuple, collector);
+
+    // then
+    // verifyZeroInteractions(collector);
+    verify(collector).emit(any(Values.class));
+  }
+
+  @Test
+  public void shouldEmitNothingIfNormalTupleIsReceived() {
+    // given
+    Tuple normalTuple = mockRankableTuple(ANY_OBJECT, ANY_COUNT);
+    BasicOutputCollector collector = mock(BasicOutputCollector.class);
+    IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
+
+    // when
+    bolt.execute(normalTuple, collector);
+
+    // then
+    verifyZeroInteractions(collector);
+  }
+
+  @Test
+  public void shouldDeclareOutputFields() {
+    // given
+    OutputFieldsDeclarer declarer = mock(OutputFieldsDeclarer.class);
+    IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
+
+    // when
+    bolt.declareOutputFields(declarer);
+
+    // then
+    verify(declarer, times(1)).declare(any(Fields.class));
+  }
+
+  @Test
+  public void 
shouldSetTickTupleFrequencyInComponentConfigurationToNonZeroValue() {
+    // given
+    IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
+
+    // when
+    Map<String, Object> componentConfig = bolt.getComponentConfiguration();
+
+    // then
+    
assertThat(componentConfig).containsKey(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
+    Integer emitFrequencyInSeconds = (Integer) 
componentConfig.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
+    assertThat(emitFrequencyInSeconds).isGreaterThan(0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/test/jvm/org/apache/storm/starter/bolt/RollingCountBoltTest.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/test/jvm/org/apache/storm/starter/bolt/RollingCountBoltTest.java
 
b/examples/storm-starter/test/jvm/org/apache/storm/starter/bolt/RollingCountBoltTest.java
new file mode 100644
index 0000000..d068e59
--- /dev/null
+++ 
b/examples/storm-starter/test/jvm/org/apache/storm/starter/bolt/RollingCountBoltTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.bolt;
+
+import org.apache.storm.Config;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.MockTupleHelpers;
+import org.testng.annotations.Test;
+
+import java.util.Map;
+
+import static org.fest.assertions.api.Assertions.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+public class RollingCountBoltTest {
+
+  private static final String ANY_NON_SYSTEM_COMPONENT_ID = 
"irrelevant_component_id";
+  private static final String ANY_NON_SYSTEM_STREAM_ID = 
"irrelevant_stream_id";
+
+  private Tuple mockNormalTuple(Object obj) {
+    Tuple tuple = MockTupleHelpers.mockTuple(ANY_NON_SYSTEM_COMPONENT_ID, 
ANY_NON_SYSTEM_STREAM_ID);
+    when(tuple.getValue(0)).thenReturn(obj);
+    return tuple;
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void 
shouldEmitNothingIfNoObjectHasBeenCountedYetAndTickTupleIsReceived() {
+    // given
+    Tuple tickTuple = MockTupleHelpers.mockTickTuple();
+    RollingCountBolt bolt = new RollingCountBolt();
+    Map conf = mock(Map.class);
+    TopologyContext context = mock(TopologyContext.class);
+    OutputCollector collector = mock(OutputCollector.class);
+    bolt.prepare(conf, context, collector);
+
+    // when
+    bolt.execute(tickTuple);
+
+    // then
+    verifyZeroInteractions(collector);
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void 
shouldEmitSomethingIfAtLeastOneObjectWasCountedAndTickTupleIsReceived() {
+    // given
+    Tuple normalTuple = mockNormalTuple(new Object());
+    Tuple tickTuple = MockTupleHelpers.mockTickTuple();
+
+    RollingCountBolt bolt = new RollingCountBolt();
+    Map conf = mock(Map.class);
+    TopologyContext context = mock(TopologyContext.class);
+    OutputCollector collector = mock(OutputCollector.class);
+    bolt.prepare(conf, context, collector);
+
+    // when
+    bolt.execute(normalTuple);
+    bolt.execute(tickTuple);
+
+    // then
+    verify(collector).emit(any(Values.class));
+  }
+
+  @Test
+  public void shouldDeclareOutputFields() {
+    // given
+    OutputFieldsDeclarer declarer = mock(OutputFieldsDeclarer.class);
+    RollingCountBolt bolt = new RollingCountBolt();
+
+    // when
+    bolt.declareOutputFields(declarer);
+
+    // then
+    verify(declarer, times(1)).declare(any(Fields.class));
+
+  }
+
+  @Test
+  public void 
shouldSetTickTupleFrequencyInComponentConfigurationToNonZeroValue() {
+    // given
+    RollingCountBolt bolt = new RollingCountBolt();
+
+    // when
+    Map<String, Object> componentConfig = bolt.getComponentConfiguration();
+
+    // then
+    
assertThat(componentConfig).containsKey(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
+    Integer emitFrequencyInSeconds = (Integer) 
componentConfig.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
+    assertThat(emitFrequencyInSeconds).isGreaterThan(0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/test/jvm/org/apache/storm/starter/bolt/TotalRankingsBoltTest.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/test/jvm/org/apache/storm/starter/bolt/TotalRankingsBoltTest.java
 
b/examples/storm-starter/test/jvm/org/apache/storm/starter/bolt/TotalRankingsBoltTest.java
new file mode 100644
index 0000000..c3582d5
--- /dev/null
+++ 
b/examples/storm-starter/test/jvm/org/apache/storm/starter/bolt/TotalRankingsBoltTest.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.bolt;
+
+import org.apache.storm.Config;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.MockTupleHelpers;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+import org.apache.storm.starter.tools.Rankings;
+
+import java.util.Map;
+
+import static org.fest.assertions.api.Assertions.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+public class TotalRankingsBoltTest {
+
+  private static final String ANY_NON_SYSTEM_COMPONENT_ID = 
"irrelevant_component_id";
+  private static final String ANY_NON_SYSTEM_STREAM_ID = 
"irrelevant_stream_id";
+  private static final Object ANY_OBJECT = new Object();
+  private static final int ANY_TOPN = 10;
+  private static final long ANY_COUNT = 42;
+
+  private Tuple mockRankingsTuple(Object obj, long count) {
+    Tuple tuple = MockTupleHelpers.mockTuple(ANY_NON_SYSTEM_COMPONENT_ID, 
ANY_NON_SYSTEM_STREAM_ID);
+    Rankings rankings = mock(Rankings.class);
+    when(tuple.getValue(0)).thenReturn(rankings);
+    return tuple;
+  }
+
+  @DataProvider
+  public Object[][] illegalTopN() {
+    return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = 
"illegalTopN")
+  public void negativeOrZeroTopNShouldThrowIAE(int topN) {
+    new TotalRankingsBolt(topN);
+  }
+
+  @DataProvider
+  public Object[][] illegalEmitFrequency() {
+    return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = 
"illegalEmitFrequency")
+  public void negativeOrZeroEmitFrequencyShouldThrowIAE(int 
emitFrequencyInSeconds) {
+    new TotalRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
+  }
+
+  @DataProvider
+  public Object[][] legalTopN() {
+    return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } };
+  }
+
+  @Test(dataProvider = "legalTopN")
+  public void positiveTopNShouldBeOk(int topN) {
+    new TotalRankingsBolt(topN);
+  }
+
+  @DataProvider
+  public Object[][] legalEmitFrequency() {
+    return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } };
+  }
+
+  @Test(dataProvider = "legalEmitFrequency")
+  public void positiveEmitFrequencyShouldBeOk(int emitFrequencyInSeconds) {
+    new TotalRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
+  }
+
+  @Test
+  public void shouldEmitSomethingIfTickTupleIsReceived() {
+    // given
+    Tuple tickTuple = MockTupleHelpers.mockTickTuple();
+    BasicOutputCollector collector = mock(BasicOutputCollector.class);
+    TotalRankingsBolt bolt = new TotalRankingsBolt();
+
+    // when
+    bolt.execute(tickTuple, collector);
+
+    // then
+    // verifyZeroInteractions(collector);
+    verify(collector).emit(any(Values.class));
+  }
+
+  @Test
+  public void shouldEmitNothingIfNormalTupleIsReceived() {
+    // given
+    Tuple normalTuple = mockRankingsTuple(ANY_OBJECT, ANY_COUNT);
+    BasicOutputCollector collector = mock(BasicOutputCollector.class);
+    TotalRankingsBolt bolt = new TotalRankingsBolt();
+
+    // when
+    bolt.execute(normalTuple, collector);
+
+    // then
+    verifyZeroInteractions(collector);
+  }
+
+  @Test
+  public void shouldDeclareOutputFields() {
+    // given
+    OutputFieldsDeclarer declarer = mock(OutputFieldsDeclarer.class);
+    TotalRankingsBolt bolt = new TotalRankingsBolt();
+
+    // when
+    bolt.declareOutputFields(declarer);
+
+    // then
+    verify(declarer, times(1)).declare(any(Fields.class));
+  }
+
+  @Test
+  public void 
shouldSetTickTupleFrequencyInComponentConfigurationToNonZeroValue() {
+    // given
+    TotalRankingsBolt bolt = new TotalRankingsBolt();
+
+    // when
+    Map<String, Object> componentConfig = bolt.getComponentConfiguration();
+
+    // then
+    
assertThat(componentConfig).containsKey(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
+    Integer emitFrequencyInSeconds = (Integer) 
componentConfig.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
+    assertThat(emitFrequencyInSeconds).isGreaterThan(0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/NthLastModifiedTimeTrackerTest.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/NthLastModifiedTimeTrackerTest.java
 
b/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/NthLastModifiedTimeTrackerTest.java
new file mode 100644
index 0000000..a28ea38
--- /dev/null
+++ 
b/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/NthLastModifiedTimeTrackerTest.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.tools;
+
+import org.apache.storm.utils.Time;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.fest.assertions.api.Assertions.assertThat;
+
+public class NthLastModifiedTimeTrackerTest {
+
+  private static final int ANY_NUM_TIMES_TO_TRACK = 3;
+  private static final int MILLIS_IN_SEC = 1000;
+
+  @DataProvider
+  public Object[][] illegalNumTimesData() {
+    return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = 
"illegalNumTimesData")
+  public void negativeOrZeroNumTimesToTrackShouldThrowIAE(int numTimesToTrack) 
{
+    new NthLastModifiedTimeTracker(numTimesToTrack);
+  }
+
+  @DataProvider
+  public Object[][] legalNumTimesData() {
+    return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } };
+  }
+
+  @Test(dataProvider = "legalNumTimesData")
+  public void positiveNumTimesToTrackShouldBeOk(int numTimesToTrack) {
+    new NthLastModifiedTimeTracker(numTimesToTrack);
+  }
+
+  @DataProvider
+  public Object[][] whenNotYetMarkedAsModifiedData() {
+    return new Object[][]{ { 0 }, { 1 }, { 2 }, { 3 }, { 4 }, { 5 }, { 8 }, { 
10 } };
+  }
+
+  @Test(dataProvider = "whenNotYetMarkedAsModifiedData")
+  public void 
shouldReturnCorrectModifiedTimeEvenWhenNotYetMarkedAsModified(int 
secondsToAdvance) {
+    // given
+    Time.startSimulating();
+    NthLastModifiedTimeTracker tracker = new 
NthLastModifiedTimeTracker(ANY_NUM_TIMES_TO_TRACK);
+
+    // when
+    advanceSimulatedTimeBy(secondsToAdvance);
+    int seconds = tracker.secondsSinceOldestModification();
+
+    // then
+    assertThat(seconds).isEqualTo(secondsToAdvance);
+
+    // cleanup
+    Time.stopSimulating();
+  }
+
+  @DataProvider
+  public Object[][] simulatedTrackerIterations() {
+    return new Object[][]{ { 1, new int[]{ 0, 1 }, new int[]{ 0, 0 } }, { 1, 
new int[]{ 0, 2 }, new int[]{ 0, 0 } },
+        { 2, new int[]{ 2, 2 }, new int[]{ 2, 2 } }, { 2, new int[]{ 0, 4 }, 
new int[]{ 0, 4 } },
+        { 1, new int[]{ 1, 1, 1, 1, 1, 1, 1 }, new int[]{ 0, 0, 0, 0, 0, 0, 0 
} },
+        { 1, new int[]{ 1, 2, 3, 4, 5, 6, 7 }, new int[]{ 0, 0, 0, 0, 0, 0, 0 
} },
+        { 2, new int[]{ 1, 1, 1, 1, 1, 1, 1 }, new int[]{ 1, 1, 1, 1, 1, 1, 1 
} },
+        { 2, new int[]{ 2, 2, 2, 2, 2, 2, 2 }, new int[]{ 2, 2, 2, 2, 2, 2, 2 
} },
+        { 2, new int[]{ 1, 2, 3, 4, 5, 6, 7 }, new int[]{ 1, 2, 3, 4, 5, 6, 7 
} },
+        { 3, new int[]{ 1, 1, 1, 1, 1, 1, 1 }, new int[]{ 1, 2, 2, 2, 2, 2, 2 
} },
+        { 3, new int[]{ 1, 2, 3, 4, 5, 6, 7 }, new int[]{ 1, 3, 5, 7, 9, 11, 
13 } },
+        { 3, new int[]{ 2, 2, 2, 2, 2, 2, 2 }, new int[]{ 2, 4, 4, 4, 4, 4, 4 
} },
+        { 4, new int[]{ 1, 1, 1, 1, 1, 1, 1 }, new int[]{ 1, 2, 3, 3, 3, 3, 3 
} },
+        { 4, new int[]{ 1, 2, 3, 4, 5, 6, 7 }, new int[]{ 1, 3, 6, 9, 12, 15, 
18 } },
+        { 4, new int[]{ 2, 2, 2, 2, 2, 2, 2 }, new int[]{ 2, 4, 6, 6, 6, 6, 6 
} },
+        { 5, new int[]{ 1, 1, 1, 1, 1, 1, 1 }, new int[]{ 1, 2, 3, 4, 4, 4, 4 
} },
+        { 5, new int[]{ 1, 2, 3, 4, 5, 6, 7 }, new int[]{ 1, 3, 6, 10, 14, 18, 
22 } },
+        { 5, new int[]{ 2, 2, 2, 2, 2, 2, 2 }, new int[]{ 2, 4, 6, 8, 8, 8, 8 
} },
+        { 6, new int[]{ 1, 1, 1, 1, 1, 1, 1 }, new int[]{ 1, 2, 3, 4, 5, 5, 5 
} },
+        { 6, new int[]{ 1, 2, 3, 4, 5, 6, 7 }, new int[]{ 1, 3, 6, 10, 15, 20, 
25 } },
+        { 6, new int[]{ 2, 2, 2, 2, 2, 2, 2 }, new int[]{ 2, 4, 6, 8, 10, 10, 
10 } },
+        { 3, new int[]{ 1, 2, 3 }, new int[]{ 1, 3, 5 } } };
+  }
+
+  @Test(dataProvider = "simulatedTrackerIterations")
+  public void shouldReturnCorrectModifiedTimeWhenMarkedAsModified(int 
numTimesToTrack,
+      int[] secondsToAdvancePerIteration, int[] expLastModifiedTimes) {
+    // given
+    Time.startSimulating();
+    NthLastModifiedTimeTracker tracker = new 
NthLastModifiedTimeTracker(numTimesToTrack);
+
+    int[] modifiedTimes = new int[expLastModifiedTimes.length];
+
+    // when
+    int i = 0;
+    for (int secondsToAdvance : secondsToAdvancePerIteration) {
+      advanceSimulatedTimeBy(secondsToAdvance);
+      tracker.markAsModified();
+      modifiedTimes[i] = tracker.secondsSinceOldestModification();
+      i++;
+    }
+
+    // then
+    assertThat(modifiedTimes).isEqualTo(expLastModifiedTimes);
+
+    // cleanup
+    Time.stopSimulating();
+  }
+
+  private void advanceSimulatedTimeBy(int seconds) {
+    Time.advanceTime(seconds * MILLIS_IN_SEC);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/RankableObjectWithFieldsTest.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/RankableObjectWithFieldsTest.java
 
b/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/RankableObjectWithFieldsTest.java
new file mode 100644
index 0000000..9837569
--- /dev/null
+++ 
b/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/RankableObjectWithFieldsTest.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.tools;
+
+import org.apache.storm.tuple.Tuple;
+import com.google.common.collect.Lists;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.fest.assertions.api.Assertions.assertThat;
+import static org.mockito.Mockito.*;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link RankableObjectWithFields}.
+ * <p/>
+ * Covers constructor argument validation, {@code equals}/{@code hashCode}, {@code compareTo}, {@code toString},
+ * the accessors, creation from a {@link Tuple}, and {@code copy()}.
+ */
+public class RankableObjectWithFieldsTest {
+
+  private static final Object ANY_OBJECT = new Object();
+  private static final long ANY_COUNT = 271;
+  private static final String ANY_FIELD = "someAdditionalField";
+
+  // Expected results of compareTo(), used by compareToData().
+  private static final int GREATER_THAN = 1;
+  private static final int EQUAL_TO = 0;
+  private static final int SMALLER_THAN = -1;
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void constructorWithNullObjectAndNoFieldsShouldThrowIAE() {
+    new RankableObjectWithFields(null, ANY_COUNT);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void constructorWithNullObjectAndFieldsShouldThrowIAE() {
+    Object someAdditionalField = new Object();
+    new RankableObjectWithFields(null, ANY_COUNT, someAdditionalField);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void constructorWithNegativeCountAndNoFieldsShouldThrowIAE() {
+    new RankableObjectWithFields(ANY_OBJECT, -1);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void constructorWithNegativeCountAndFieldsShouldThrowIAE() {
+    Object someAdditionalField = new Object();
+    new RankableObjectWithFields(ANY_OBJECT, -1, someAdditionalField);
+  }
+
+  @Test
+  public void shouldBeEqualToItself() {
+    RankableObjectWithFields r = new RankableObjectWithFields(ANY_OBJECT, ANY_COUNT);
+    assertThat(r).isEqualTo(r);
+  }
+
+  /**
+   * Supplies instances of classes other than {@link RankableObjectWithFields}.
+   */
+  @DataProvider
+  public Object[][] otherClassesData() {
+    return new Object[][]{
+        { new String("foo") },
+        { new Object() },
+        { Integer.valueOf(4) },
+        { Lists.newArrayList(7, 8, 9) } };
+  }
+
+  @Test(dataProvider = "otherClassesData")
+  public void shouldNotBeEqualToInstancesOfOtherClasses(Object notARankable) {
+    RankableObjectWithFields r = new RankableObjectWithFields(ANY_OBJECT, ANY_COUNT);
+    assertFalse(r.equals(notARankable), r + " is equal to " + notARankable + " but it should not be");
+  }
+
+  /**
+   * Supplies pairs that differ in the wrapped object (including differences in letter case only), in the count,
+   * or in both, and therefore must not be considered equal.
+   */
+  @DataProvider
+  public Object[][] falseDuplicatesData() {
+    return new Object[][]{
+        { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("foo", 1) },
+        { new RankableObjectWithFields("foo", 1), new RankableObjectWithFields("Foo", 1) },
+        { new RankableObjectWithFields("foo", 1), new RankableObjectWithFields("FOO", 1) },
+        { new RankableObjectWithFields("foo", 1), new RankableObjectWithFields("bar", 1) },
+        { new RankableObjectWithFields("", 0), new RankableObjectWithFields("", 1) },
+        { new RankableObjectWithFields("", 1), new RankableObjectWithFields("bar", 1) } };
+  }
+
+  @Test(dataProvider = "falseDuplicatesData")
+  public void shouldNotBeEqualToFalseDuplicates(RankableObjectWithFields r, RankableObjectWithFields falseDuplicate) {
+    assertFalse(r.equals(falseDuplicate), r + " is equal to " + falseDuplicate + " but it should not be");
+  }
+
+  @Test(dataProvider = "falseDuplicatesData")
+  public void shouldHaveDifferentHashCodeThanFalseDuplicates(RankableObjectWithFields r,
+      RankableObjectWithFields falseDuplicate) {
+    assertThat(r.hashCode()).isNotEqualTo(falseDuplicate.hashCode());
+  }
+
+  /**
+   * Supplies pairs with the same wrapped object and count; the pairs differ at most in their additional fields
+   * and are expected to be equal nonetheless.
+   */
+  @DataProvider
+  public Object[][] trueDuplicatesData() {
+    return new Object[][]{
+        { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("foo", 0) },
+        { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("foo", 0, "someOtherField") },
+        { new RankableObjectWithFields("foo", 0, "someField"),
+            new RankableObjectWithFields("foo", 0, "someOtherField") } };
+  }
+
+  @Test(dataProvider = "trueDuplicatesData")
+  public void shouldBeEqualToTrueDuplicates(RankableObjectWithFields r, RankableObjectWithFields trueDuplicate) {
+    assertTrue(r.equals(trueDuplicate), r + " is not equal to " + trueDuplicate + " but it should be");
+  }
+
+  @Test(dataProvider = "trueDuplicatesData")
+  public void shouldHaveSameHashCodeAsTrueDuplicates(RankableObjectWithFields r,
+      RankableObjectWithFields trueDuplicate) {
+    assertThat(r.hashCode()).isEqualTo(trueDuplicate.hashCode());
+  }
+
+  /**
+   * Supplies (first, second, expected compareTo result) triples. In every row the expected result follows from
+   * the two counts alone; the wrapped objects vary independently of it.
+   */
+  @DataProvider
+  public Object[][] compareToData() {
+    return new Object[][]{
+        { new RankableObjectWithFields("foo", 1000), new RankableObjectWithFields("foo", 0), GREATER_THAN },
+        { new RankableObjectWithFields("foo", 1), new RankableObjectWithFields("foo", 0), GREATER_THAN },
+        { new RankableObjectWithFields("foo", 1000), new RankableObjectWithFields("bar", 0), GREATER_THAN },
+        { new RankableObjectWithFields("foo", 1), new RankableObjectWithFields("bar", 0), GREATER_THAN },
+        { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("foo", 0), EQUAL_TO },
+        { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("bar", 0), EQUAL_TO },
+        { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("foo", 1000), SMALLER_THAN },
+        { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("foo", 1), SMALLER_THAN },
+        { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("bar", 1), SMALLER_THAN },
+        { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("bar", 1000), SMALLER_THAN }, };
+  }
+
+  @Test(dataProvider = "compareToData")
+  public void verifyCompareTo(RankableObjectWithFields first, RankableObjectWithFields second, int expCompareToValue) {
+    assertThat(first.compareTo(second)).isEqualTo(expCompareToValue);
+  }
+
+  @DataProvider
+  public Object[][] toStringData() {
+    return new Object[][]{ { new String("foo"), 0L }, { new String("BAR"), 8L } };
+  }
+
+  @Test(dataProvider = "toStringData")
+  public void toStringShouldContainStringRepresentationsOfObjectAndCount(Object obj, long count) {
+    // given
+    RankableObjectWithFields r = new RankableObjectWithFields(obj, count);
+
+    // when
+    String strRepresentation = r.toString();
+
+    // then
+    assertThat(strRepresentation).contains(obj.toString()).contains("" + count);
+  }
+
+  @Test
+  public void shouldReturnTheObject() {
+    // given
+    RankableObjectWithFields r = new RankableObjectWithFields(ANY_OBJECT, ANY_COUNT, ANY_FIELD);
+
+    // when
+    Object obj = r.getObject();
+
+    // then
+    assertThat(obj).isEqualTo(ANY_OBJECT);
+  }
+
+  @Test
+  public void shouldReturnTheCount() {
+    // given
+    RankableObjectWithFields r = new RankableObjectWithFields(ANY_OBJECT, ANY_COUNT, ANY_FIELD);
+
+    // when
+    long count = r.getCount();
+
+    // then
+    assertThat(count).isEqualTo(ANY_COUNT);
+  }
+
+  /**
+   * Supplies (object, count, additional fields) triples for {@link #shouldReturnTheFields}.
+   */
+  @DataProvider
+  public Object[][] fieldsData() {
+    return new Object[][]{
+        { ANY_OBJECT, ANY_COUNT, new Object[]{ ANY_FIELD } },
+        { "quux", 42L, new Object[]{ "one", "two", "three" } } };
+  }
+
+  @Test(dataProvider = "fieldsData")
+  public void shouldReturnTheFields(Object obj, long count, Object[] fields) {
+    // given
+    RankableObjectWithFields r = new RankableObjectWithFields(obj, count, fields);
+
+    // when
+    List<Object> actualFields = r.getFields();
+
+    // then
+    assertThat(actualFields).isEqualTo(Lists.newArrayList(fields));
+  }
+
+  @Test(expectedExceptions = UnsupportedOperationException.class)
+  public void fieldsShouldBeImmutable() {
+    // given
+    RankableObjectWithFields r = new RankableObjectWithFields(ANY_OBJECT, ANY_COUNT, ANY_FIELD);
+
+    // when
+    List<Object> fields = r.getFields();
+    // try to modify the list, which should fail
+    fields.remove(0);
+
+    // then (exception)
+  }
+
+  /**
+   * The mocked tuple returns the values (object, count, field); the resulting rankable is expected to expose the
+   * first value as its object, the second as its count, and the remaining value as its only field.
+   */
+  @Test
+  public void shouldCreateRankableObjectFromTuple() {
+    // given
+    Tuple tuple = mock(Tuple.class);
+    List<Object> tupleValues = Lists.newArrayList(ANY_OBJECT, ANY_COUNT, ANY_FIELD);
+    when(tuple.getValues()).thenReturn(tupleValues);
+
+    // when
+    RankableObjectWithFields r = RankableObjectWithFields.from(tuple);
+
+    // then
+    assertThat(r.getObject()).isEqualTo(ANY_OBJECT);
+    assertThat(r.getCount()).isEqualTo(ANY_COUNT);
+    List<Object> fields = new ArrayList<Object>();
+    fields.add(ANY_FIELD);
+    assertThat(r.getFields()).isEqualTo(fields);
+
+  }
+
+  @DataProvider
+  public Object[][] copyData() {
+    return new Object[][]{
+        { new RankableObjectWithFields("foo", 0) },
+        { new RankableObjectWithFields("foo", 3, "someOtherField") },
+        { new RankableObjectWithFields("foo", 0, "someField") } };
+  }
+
+  // TODO: What would be a good test to ensure that RankableObjectWithFields is at least somewhat defensively copied?
+  //       The contract of Rankable#copy() returns a Rankable value, not a RankableObjectWithFields.
+  @Test(dataProvider = "copyData")
+  public void copyShouldReturnCopy(RankableObjectWithFields original) {
+    // given
+
+    // when
+    Rankable copy = original.copy();
+
+    // then
+    assertThat(copy.getObject()).isEqualTo(original.getObject());
+    assertThat(copy.getCount()).isEqualTo(original.getCount());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/RankingsTest.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/RankingsTest.java b/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/RankingsTest.java
new file mode 100644
index 0000000..245c552
--- /dev/null
+++ b/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/RankingsTest.java
@@ -0,0 +1,368 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.tools;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.jmock.lib.concurrent.Blitzer;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.util.List;
+
+import static org.fest.assertions.api.Assertions.assertThat;
+
+/**
+ * Unit tests for {@link Rankings}.
+ * <p/>
+ * Covers constructor validation, the copy constructor and {@code copy()}, the size limit, ranking order after
+ * updates with single rankables and with other rankings, handling of duplicates, pruning of zero counts, and
+ * concurrent updates.
+ */
+public class RankingsTest {
+
+  private static final int ANY_TOPN = 42;
+  private static final Rankable ANY_RANKABLE = new RankableObjectWithFields("someObject", ANY_TOPN);
+  private static final Rankable ZERO = new RankableObjectWithFields("ZERO_COUNT", 0);
+
+  // Fixtures A..H carry the counts 1..8, so the expected ranking order of any subset is reverse alphabetical.
+  private static final Rankable A = new RankableObjectWithFields("A", 1);
+  private static final Rankable B = new RankableObjectWithFields("B", 2);
+  private static final Rankable C = new RankableObjectWithFields("C", 3);
+  private static final Rankable D = new RankableObjectWithFields("D", 4);
+  private static final Rankable E = new RankableObjectWithFields("E", 5);
+  private static final Rankable F = new RankableObjectWithFields("F", 6);
+  private static final Rankable G = new RankableObjectWithFields("G", 7);
+  private static final Rankable H = new RankableObjectWithFields("H", 8);
+
+  @DataProvider
+  public Object[][] illegalTopNData() {
+    return new Object[][]{ { 0 }, { -1 }, { -2 }, { -10 } };
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalTopNData")
+  public void constructorWithNegativeOrZeroTopNShouldThrowIAE(int topN) {
+    new Rankings(topN);
+  }
+
+  /**
+   * Supplies (topN, rankables) pairs, including an empty input and inputs that hold more rankables than topN.
+   */
+  @DataProvider
+  public Object[][] copyRankingsData() {
+    return new Object[][]{
+        { 5, Lists.newArrayList(A, B, C) },
+        { 2, Lists.newArrayList(A, B, C, D) },
+        { 1, Lists.newArrayList() },
+        { 1, Lists.newArrayList(A) },
+        { 1, Lists.newArrayList(A, B) } };
+  }
+
+  @Test(dataProvider = "copyRankingsData")
+  public void copyConstructorShouldReturnCopy(int topN, List<Rankable> rankables) {
+    // given
+    Rankings rankings = new Rankings(topN);
+    for (Rankable r : rankables) {
+      rankings.updateWith(r);
+    }
+
+    // when
+    Rankings copy = new Rankings(rankings);
+
+    // then
+    assertThat(copy.maxSize()).isEqualTo(rankings.maxSize());
+    assertThat(copy.getRankings()).isEqualTo(rankings.getRankings());
+  }
+
+  /**
+   * Supplies (topN, initial rankables, changes) triples; the changes are applied to a copy only.
+   */
+  @DataProvider
+  public Object[][] defensiveCopyRankingsData() {
+    return new Object[][]{
+        { 5, Lists.newArrayList(A, B, C), Lists.newArrayList(D) },
+        { 2, Lists.newArrayList(A, B, C, D), Lists.newArrayList(E, F) },
+        { 1, Lists.newArrayList(), Lists.newArrayList(A) },
+        { 1, Lists.newArrayList(A), Lists.newArrayList(B) },
+        { 1, Lists.newArrayList(ZERO), Lists.newArrayList(B) },
+        { 1, Lists.newArrayList(ZERO), Lists.newArrayList() } };
+  }
+
+  @Test(dataProvider = "defensiveCopyRankingsData")
+  public void copyConstructorShouldReturnDefensiveCopy(int topN, List<Rankable> rankables, List<Rankable> changes) {
+    // given
+    Rankings original = new Rankings(topN);
+    for (Rankable r : rankables) {
+      original.updateWith(r);
+    }
+    int expSize = original.size();
+    List<Rankable> expRankings = original.getRankings();
+
+    // when
+    Rankings copy = new Rankings(original);
+    for (Rankable r : changes) {
+      copy.updateWith(r);
+    }
+
+    // then
+    assertThat(original.size()).isEqualTo(expSize);
+    assertThat(original.getRankings()).isEqualTo(expRankings);
+  }
+
+  @DataProvider
+  public Object[][] legalTopNData() {
+    return new Object[][]{ { 1 }, { 2 }, { 1000 }, { 1000000 } };
+  }
+
+  @Test(dataProvider = "legalTopNData")
+  public void constructorWithPositiveTopNShouldBeOk(int topN) {
+    // given/when
+    Rankings rankings = new Rankings(topN);
+
+    // then
+    assertThat(rankings.maxSize()).isEqualTo(topN);
+  }
+
+  @Test
+  public void shouldHaveDefaultConstructor() {
+    new Rankings();
+  }
+
+  @Test
+  public void defaultConstructorShouldSetPositiveTopN() {
+    // given/when
+    Rankings rankings = new Rankings();
+
+    // then
+    assertThat(rankings.maxSize()).isGreaterThan(0);
+  }
+
+  /**
+   * Supplies (topN, rankables) pairs in which the number of rankables exceeds topN.
+   */
+  @DataProvider
+  public Object[][] rankingsGrowData() {
+    return new Object[][]{
+        { 2, Lists.newArrayList(new RankableObjectWithFields("A", 1), new RankableObjectWithFields("B", 2),
+            new RankableObjectWithFields("C", 3)) },
+        { 2, Lists.newArrayList(new RankableObjectWithFields("A", 1), new RankableObjectWithFields("B", 2),
+            new RankableObjectWithFields("C", 3), new RankableObjectWithFields("D", 4)) } };
+  }
+
+  @Test(dataProvider = "rankingsGrowData")
+  public void sizeOfRankingsShouldNotGrowBeyondTopN(int topN, List<Rankable> rankables) {
+    // sanity check of the provided test data
+    assertThat(rankables.size()).overridingErrorMessage(
+        "The supplied test data is not correct: the number of rankables <%d> should be greater than <%d>",
+        rankables.size(), topN).isGreaterThan(topN);
+
+    // given
+    Rankings rankings = new Rankings(topN);
+
+    // when
+    for (Rankable r : rankables) {
+      rankings.updateWith(r);
+    }
+
+    // then
+    assertThat(rankings.size()).isLessThanOrEqualTo(rankings.maxSize());
+  }
+
+  /**
+   * Supplies (unsorted input, expected rankings) pairs; the expected lists are ordered by descending count.
+   */
+  @DataProvider
+  public Object[][] simulatedRankingsData() {
+    return new Object[][]{
+        { Lists.newArrayList(A), Lists.newArrayList(A) },
+        { Lists.newArrayList(B, D, A, C), Lists.newArrayList(D, C, B, A) },
+        { Lists.newArrayList(B, F, A, C, D, E), Lists.newArrayList(F, E, D, C, B, A) },
+        { Lists.newArrayList(G, B, F, A, C, D, E, H), Lists.newArrayList(H, G, F, E, D, C, B, A) } };
+  }
+
+  @Test(dataProvider = "simulatedRankingsData")
+  public void shouldCorrectlyRankWhenUpdatedWithRankables(List<Rankable> unsorted, List<Rankable> expSorted) {
+    // given
+    Rankings rankings = new Rankings(unsorted.size());
+
+    // when
+    for (Rankable r : unsorted) {
+      rankings.updateWith(r);
+    }
+
+    // then
+    assertThat(rankings.getRankings()).isEqualTo(expSorted);
+  }
+
+  @Test(dataProvider = "simulatedRankingsData")
+  public void shouldCorrectlyRankWhenEmptyAndUpdatedWithOtherRankings(List<Rankable> unsorted,
+      List<Rankable> expSorted) {
+    // given
+    Rankings rankings = new Rankings(unsorted.size());
+    Rankings otherRankings = new Rankings(rankings.maxSize());
+    for (Rankable r : unsorted) {
+      otherRankings.updateWith(r);
+    }
+
+    // when
+    rankings.updateWith(otherRankings);
+
+    // then
+    assertThat(rankings.getRankings()).isEqualTo(expSorted);
+  }
+
+  @Test(dataProvider = "simulatedRankingsData")
+  public void shouldCorrectlyRankWhenUpdatedWithEmptyOtherRankings(List<Rankable> unsorted, List<Rankable> expSorted) {
+    // given
+    Rankings rankings = new Rankings(unsorted.size());
+    for (Rankable r : unsorted) {
+      rankings.updateWith(r);
+    }
+    Rankings emptyRankings = new Rankings(ANY_TOPN);
+
+    // when
+    rankings.updateWith(emptyRankings);
+
+    // then
+    assertThat(rankings.getRankings()).isEqualTo(expSorted);
+  }
+
+  /**
+   * Supplies (input for the rankings under test, input for the other rankings, expected merged rankings) triples.
+   */
+  @DataProvider
+  public Object[][] simulatedRankingsAndOtherRankingsData() {
+    return new Object[][]{
+        { Lists.newArrayList(A), Lists.newArrayList(A), Lists.newArrayList(A) },
+        { Lists.newArrayList(A, C), Lists.newArrayList(B, D), Lists.newArrayList(D, C, B, A) },
+        { Lists.newArrayList(B, F, A), Lists.newArrayList(C, D, E), Lists.newArrayList(F, E, D, C, B, A) },
+        { Lists.newArrayList(G, B, F, A, C), Lists.newArrayList(D, E, H),
+            Lists.newArrayList(H, G, F, E, D, C, B, A) } };
+  }
+
+  @Test(dataProvider = "simulatedRankingsAndOtherRankingsData")
+  public void shouldCorrectlyRankWhenNotEmptyAndUpdatedWithOtherRankings(List<Rankable> unsorted,
+      List<Rankable> unsortedForOtherRankings, List<Rankable> expSorted) {
+    // given
+    Rankings rankings = new Rankings(expSorted.size());
+    for (Rankable r : unsorted) {
+      rankings.updateWith(r);
+    }
+    Rankings otherRankings = new Rankings(unsortedForOtherRankings.size());
+    for (Rankable r : unsortedForOtherRankings) {
+      otherRankings.updateWith(r);
+    }
+
+    // when
+    rankings.updateWith(otherRankings);
+
+    // then
+    assertThat(rankings.getRankings()).isEqualTo(expSorted);
+  }
+
+  /**
+   * Supplies lists whose elements all wrap the same object: either the very same instance repeated, or distinct
+   * instances for the object "A" with different counts.
+   */
+  @DataProvider
+  public Object[][] duplicatesData() {
+    Rankable A1 = new RankableObjectWithFields("A", 1);
+    Rankable A2 = new RankableObjectWithFields("A", 2);
+    Rankable A3 = new RankableObjectWithFields("A", 3);
+    return new Object[][]{
+        { Lists.newArrayList(ANY_RANKABLE, ANY_RANKABLE, ANY_RANKABLE) },
+        { Lists.newArrayList(A1, A2, A3) }, };
+  }
+
+  @Test(dataProvider = "duplicatesData")
+  public void shouldNotRankDuplicateObjectsMoreThanOnce(List<Rankable> duplicates) {
+    // given
+    Rankings rankings = new Rankings(duplicates.size());
+
+    // when
+    for (Rankable r : duplicates) {
+      rankings.updateWith(r);
+    }
+
+    // then
+    assertThat(rankings.size()).isEqualTo(1);
+  }
+
+  /**
+   * Supplies (input, expected rankings after pruning) pairs; the expected lists contain no zero-count entries.
+   */
+  @DataProvider
+  public Object[][] removeZeroRankingsData() {
+    return new Object[][]{
+        { Lists.newArrayList(A, ZERO), Lists.newArrayList(A) },
+        { Lists.newArrayList(A), Lists.newArrayList(A) },
+        { Lists.newArrayList(ZERO, A), Lists.newArrayList(A) },
+        { Lists.newArrayList(ZERO), Lists.newArrayList() },
+        { Lists.newArrayList(ZERO, new RankableObjectWithFields("ZERO2", 0)), Lists.newArrayList() },
+        { Lists.newArrayList(B, ZERO, new RankableObjectWithFields("ZERO2", 0), D,
+            new RankableObjectWithFields("ZERO3", 0), new RankableObjectWithFields("ZERO4", 0), C),
+            Lists.newArrayList(D, C, B) },
+        { Lists.newArrayList(A, ZERO, B), Lists.newArrayList(B, A) } };
+  }
+
+  @Test(dataProvider = "removeZeroRankingsData")
+  public void shouldRemoveZeroCounts(List<Rankable> unsorted, List<Rankable> expSorted) {
+    // given
+    Rankings rankings = new Rankings(unsorted.size());
+    for (Rankable r : unsorted) {
+      rankings.updateWith(r);
+    }
+
+    // when
+    rankings.pruneZeroCounts();
+
+    // then
+    assertThat(rankings.getRankings()).isEqualTo(expSorted);
+  }
+
+  /**
+   * Runs the same update task through a {@link Blitzer} and fails if any invocation threw a
+   * {@link RuntimeException}.
+   */
+  @Test
+  public void updatingWithNewRankablesShouldBeThreadSafe() throws InterruptedException {
+    // given
+    final List<Rankable> entries = ImmutableList.of(A, B, C, D);
+    final Rankings rankings = new Rankings(entries.size());
+
+    // We are capturing exceptions thrown in Blitzer's child threads into this data structure so that we can properly
+    // pass/fail this test.  The reason is that Blitzer doesn't report exceptions, which is a known bug in Blitzer
+    // (JMOCK-263).  See https://github.com/jmock-developers/jmock-library/issues/22 for more information.
+    final List<Exception> exceptions = Lists.newArrayList();
+    Blitzer blitzer = new Blitzer(1000);
+
+    // when
+    try {
+      blitzer.blitz(new Runnable() {
+        @Override
+        public void run() {
+          for (Rankable r : entries) {
+            try {
+              rankings.updateWith(r);
+            }
+            catch (RuntimeException e) {
+              // the plain list is shared between the blitzing threads, hence the explicit lock
+              synchronized(exceptions) {
+                exceptions.add(e);
+              }
+            }
+          }
+        }
+      });
+    }
+    finally {
+      // shut the Blitzer down even when blitz() itself throws (e.g. InterruptedException)
+      blitzer.shutdown();
+    }
+
+    // then
+    // print every captured stack trace first so that a failure below can be diagnosed
+    for (Exception e : exceptions) {
+      System.err.println(Throwables.getStackTraceAsString(e));
+    }
+    assertThat(exceptions).isEmpty();
+  }
+
+  @Test(dataProvider = "copyRankingsData")
+  public void copyShouldReturnCopy(int topN, List<Rankable> rankables) {
+    // given
+    Rankings rankings = new Rankings(topN);
+    for (Rankable r : rankables) {
+      rankings.updateWith(r);
+    }
+
+    // when
+    Rankings copy = rankings.copy();
+
+    // then
+    assertThat(copy.maxSize()).isEqualTo(rankings.maxSize());
+    assertThat(copy.getRankings()).isEqualTo(rankings.getRankings());
+  }
+
+  @Test(dataProvider = "defensiveCopyRankingsData")
+  public void copyShouldReturnDefensiveCopy(int topN, List<Rankable> rankables, List<Rankable> changes) {
+    // given
+    Rankings original = new Rankings(topN);
+    for (Rankable r : rankables) {
+      original.updateWith(r);
+    }
+    int expSize = original.size();
+    List<Rankable> expRankings = original.getRankings();
+
+    // when
+    Rankings copy = original.copy();
+    for (Rankable r : changes) {
+      copy.updateWith(r);
+    }
+    copy.pruneZeroCounts();
+
+    // then
+    assertThat(original.size()).isEqualTo(expSize);
+    assertThat(original.getRankings()).isEqualTo(expRankings);
+  }
+
+}

Reply via email to