http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/RollingCountBolt.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/RollingCountBolt.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/RollingCountBolt.java
new file mode 100644
index 0000000..730f156
--- /dev/null
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/RollingCountBolt.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.bolt;
+
+import org.apache.storm.Config;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.log4j.Logger;
+import org.apache.storm.starter.tools.NthLastModifiedTimeTracker;
+import org.apache.storm.starter.tools.SlidingWindowCounter;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * This bolt performs rolling counts of incoming objects, i.e. sliding window 
based counting.
+ * <p/>
+ * The bolt is configured by two parameters, the length of the sliding window 
in seconds (which influences the output
+ * data of the bolt, i.e. how it will count objects) and the emit frequency in 
seconds (which influences how often the
+ * bolt will output the latest window counts). For instance, if the window 
length is set to an equivalent of five
+ * minutes and the emit frequency to one minute, then the bolt will output the 
latest five-minute sliding window every
+ * minute.
+ * <p/>
+ * The bolt emits a rolling count tuple per object, consisting of the object 
itself, its latest rolling count, and the
+ * actual duration of the sliding window. The latter is included in case the 
expected sliding window length (as
+ * configured by the user) is different from the actual length, e.g. due to 
high system load. Note that the actual
+ * window length is tracked and calculated for the window, and not 
individually for each object within a window.
+ * <p/>
+ * Note: During the startup phase you will usually observe that the bolt warns 
you about the actual sliding window
+ * length being smaller than the expected length. This behavior is expected 
and is caused by the way the sliding window
+ * counts are initially "loaded up". You can safely ignore this warning during 
startup (e.g. you will see this warning
+ * during the first ~ five minutes of startup time if the window length is set 
to five minutes).
+ */
+public class RollingCountBolt extends BaseRichBolt {
+
+  private static final long serialVersionUID = 5537727428628598519L;
+  private static final Logger LOG = Logger.getLogger(RollingCountBolt.class);
+  private static final int NUM_WINDOW_CHUNKS = 5;
+  private static final int DEFAULT_SLIDING_WINDOW_IN_SECONDS = 
NUM_WINDOW_CHUNKS * 60;
+  private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = 
DEFAULT_SLIDING_WINDOW_IN_SECONDS / NUM_WINDOW_CHUNKS;
+  private static final String WINDOW_LENGTH_WARNING_TEMPLATE =
+      "Actual window length is %d seconds when it should be %d seconds"
+          + " (you can safely ignore this warning during the startup phase)";
+
+  private final SlidingWindowCounter<Object> counter;
+  private final int windowLengthInSeconds;
+  private final int emitFrequencyInSeconds;
+  private OutputCollector collector;
+  private NthLastModifiedTimeTracker lastModifiedTracker;
+
+  public RollingCountBolt() {
+    this(DEFAULT_SLIDING_WINDOW_IN_SECONDS, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
+  }
+
+  public RollingCountBolt(int windowLengthInSeconds, int 
emitFrequencyInSeconds) {
+    this.windowLengthInSeconds = windowLengthInSeconds;
+    this.emitFrequencyInSeconds = emitFrequencyInSeconds;
+    counter = new 
SlidingWindowCounter<Object>(deriveNumWindowChunksFrom(this.windowLengthInSeconds,
+        this.emitFrequencyInSeconds));
+  }
+
+  private int deriveNumWindowChunksFrom(int windowLengthInSeconds, int 
windowUpdateFrequencyInSeconds) {
+    return windowLengthInSeconds / windowUpdateFrequencyInSeconds;
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public void prepare(Map stormConf, TopologyContext context, OutputCollector 
collector) {
+    this.collector = collector;
+    lastModifiedTracker = new 
NthLastModifiedTimeTracker(deriveNumWindowChunksFrom(this.windowLengthInSeconds,
+        this.emitFrequencyInSeconds));
+  }
+
+  @Override
+  public void execute(Tuple tuple) {
+    if (TupleUtils.isTick(tuple)) {
+      LOG.debug("Received tick tuple, triggering emit of current window 
counts");
+      emitCurrentWindowCounts();
+    }
+    else {
+      countObjAndAck(tuple);
+    }
+  }
+
+  private void emitCurrentWindowCounts() {
+    Map<Object, Long> counts = counter.getCountsThenAdvanceWindow();
+    int actualWindowLengthInSeconds = 
lastModifiedTracker.secondsSinceOldestModification();
+    lastModifiedTracker.markAsModified();
+    if (actualWindowLengthInSeconds != windowLengthInSeconds) {
+      LOG.warn(String.format(WINDOW_LENGTH_WARNING_TEMPLATE, 
actualWindowLengthInSeconds, windowLengthInSeconds));
+    }
+    emit(counts, actualWindowLengthInSeconds);
+  }
+
+  private void emit(Map<Object, Long> counts, int actualWindowLengthInSeconds) 
{
+    for (Entry<Object, Long> entry : counts.entrySet()) {
+      Object obj = entry.getKey();
+      Long count = entry.getValue();
+      collector.emit(new Values(obj, count, actualWindowLengthInSeconds));
+    }
+  }
+
+  private void countObjAndAck(Tuple tuple) {
+    Object obj = tuple.getValue(0);
+    counter.incrementCount(obj);
+    collector.ack(tuple);
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declare(new Fields("obj", "count", 
"actualWindowLengthInSeconds"));
+  }
+
+  @Override
+  public Map<String, Object> getComponentConfiguration() {
+    Map<String, Object> conf = new HashMap<String, Object>();
+    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
+    return conf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/SingleJoinBolt.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/SingleJoinBolt.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/SingleJoinBolt.java
new file mode 100644
index 0000000..163c0f2
--- /dev/null
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/SingleJoinBolt.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.bolt;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TimeCacheMap;
+
+import java.util.*;
+
+public class SingleJoinBolt extends BaseRichBolt {
+  OutputCollector _collector;
+  Fields _idFields;
+  Fields _outFields;
+  int _numSources;
+  TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>> _pending;
+  Map<String, GlobalStreamId> _fieldLocations;
+
+  public SingleJoinBolt(Fields outFields) {
+    _outFields = outFields;
+  }
+
+  @Override
+  public void prepare(Map conf, TopologyContext context, OutputCollector 
collector) {
+    _fieldLocations = new HashMap<String, GlobalStreamId>();
+    _collector = collector;
+    int timeout = ((Number) 
conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
+    _pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, 
Tuple>>(timeout, new ExpireCallback());
+    _numSources = context.getThisSources().size();
+    Set<String> idFields = null;
+    for (GlobalStreamId source : context.getThisSources().keySet()) {
+      Fields fields = 
context.getComponentOutputFields(source.get_componentId(), 
source.get_streamId());
+      Set<String> setFields = new HashSet<String>(fields.toList());
+      if (idFields == null)
+        idFields = setFields;
+      else
+        idFields.retainAll(setFields);
+
+      for (String outfield : _outFields) {
+        for (String sourcefield : fields) {
+          if (outfield.equals(sourcefield)) {
+            _fieldLocations.put(outfield, source);
+          }
+        }
+      }
+    }
+    _idFields = new Fields(new ArrayList<String>(idFields));
+
+    if (_fieldLocations.size() != _outFields.size()) {
+      throw new RuntimeException("Cannot find all outfields among sources");
+    }
+  }
+
+  @Override
+  public void execute(Tuple tuple) {
+    List<Object> id = tuple.select(_idFields);
+    GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(), 
tuple.getSourceStreamId());
+    if (!_pending.containsKey(id)) {
+      _pending.put(id, new HashMap<GlobalStreamId, Tuple>());
+    }
+    Map<GlobalStreamId, Tuple> parts = _pending.get(id);
+    if (parts.containsKey(streamId))
+      throw new RuntimeException("Received same side of single join twice");
+    parts.put(streamId, tuple);
+    if (parts.size() == _numSources) {
+      _pending.remove(id);
+      List<Object> joinResult = new ArrayList<Object>();
+      for (String outField : _outFields) {
+        GlobalStreamId loc = _fieldLocations.get(outField);
+        joinResult.add(parts.get(loc).getValueByField(outField));
+      }
+      _collector.emit(new ArrayList<Tuple>(parts.values()), joinResult);
+
+      for (Tuple part : parts.values()) {
+        _collector.ack(part);
+      }
+    }
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declare(_outFields);
+  }
+
+  private class ExpireCallback implements 
TimeCacheMap.ExpiredCallback<List<Object>, Map<GlobalStreamId, Tuple>> {
+    @Override
+    public void expire(List<Object> id, Map<GlobalStreamId, Tuple> tuples) {
+      for (Tuple tuple : tuples.values()) {
+        _collector.fail(tuple);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/SlidingWindowSumBolt.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/SlidingWindowSumBolt.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/SlidingWindowSumBolt.java
new file mode 100644
index 0000000..cd58380
--- /dev/null
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/SlidingWindowSumBolt.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.bolt;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.windowing.TupleWindow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Computes sliding window sum
+ */
+public class SlidingWindowSumBolt extends BaseWindowedBolt {
+    private static final Logger LOG = 
LoggerFactory.getLogger(SlidingWindowSumBolt.class);
+
+    private int sum = 0;
+    private OutputCollector collector;
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, 
OutputCollector collector) {
+        this.collector = collector;
+    }
+
+    @Override
+    public void execute(TupleWindow inputWindow) {
+            /*
+             * The inputWindow gives a view of
+             * (a) all the events in the window
+             * (b) events that expired since last activation of the window
+             * (c) events that newly arrived since last activation of the 
window
+             */
+        List<Tuple> tuplesInWindow = inputWindow.get();
+        List<Tuple> newTuples = inputWindow.getNew();
+        List<Tuple> expiredTuples = inputWindow.getExpired();
+
+        LOG.debug("Events in current window: " + tuplesInWindow.size());
+            /*
+             * Instead of iterating over all the tuples in the window to 
compute
+             * the sum, the values for the new events are added and old events 
are
+             * subtracted. Similar optimizations might be possible in other
+             * windowing computations.
+             */
+        for (Tuple tuple : newTuples) {
+            sum += (int) tuple.getValue(0);
+        }
+        for (Tuple tuple : expiredTuples) {
+            sum -= (int) tuple.getValue(0);
+        }
+        collector.emit(new Values(sum));
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("sum"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/TotalRankingsBolt.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/TotalRankingsBolt.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/TotalRankingsBolt.java
new file mode 100644
index 0000000..bfed34e
--- /dev/null
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/TotalRankingsBolt.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.bolt;
+
+import org.apache.storm.tuple.Tuple;
+import org.apache.log4j.Logger;
+import org.apache.storm.starter.tools.Rankings;
+
+/**
+ * This bolt merges incoming {@link Rankings}.
+ * <p/>
+ * It can be used to merge intermediate rankings generated by {@link 
IntermediateRankingsBolt} into a final,
+ * consolidated ranking. To do so, configure this bolt with a globalGrouping 
on {@link IntermediateRankingsBolt}.
+ */
+public final class TotalRankingsBolt extends AbstractRankerBolt {
+
+  private static final long serialVersionUID = -8447525895532302198L;
+  private static final Logger LOG = Logger.getLogger(TotalRankingsBolt.class);
+
+  public TotalRankingsBolt() {
+    super();
+  }
+
+  public TotalRankingsBolt(int topN) {
+    super(topN);
+  }
+
+  public TotalRankingsBolt(int topN, int emitFrequencyInSeconds) {
+    super(topN, emitFrequencyInSeconds);
+  }
+
+  @Override
+  void updateRankingsWithTuple(Tuple tuple) {
+    Rankings rankingsToBeMerged = (Rankings) tuple.getValue(0);
+    super.getRankings().updateWith(rankingsToBeMerged);
+    super.getRankings().pruneZeroCounts();
+  }
+
+  @Override
+  Logger getLogger() {
+    return LOG;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomIntegerSpout.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomIntegerSpout.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomIntegerSpout.java
new file mode 100644
index 0000000..e81ca40
--- /dev/null
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomIntegerSpout.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.spout;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * Emits a random integer and a timestamp value (offset by one day),
+ * every 100 ms. The ts field can be used in tuple time based windowing.
+ */
+public class RandomIntegerSpout extends BaseRichSpout {
+    private SpoutOutputCollector collector;
+    private Random rand;
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("value", "ts"));
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector 
collector) {
+        this.collector = collector;
+        this.rand = new Random();
+    }
+
+    @Override
+    public void nextTuple() {
+        Utils.sleep(100);
+        collector.emit(new Values(rand.nextInt(1000), 
System.currentTimeMillis() - (24 * 60 * 60 * 1000)));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomSentenceSpout.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomSentenceSpout.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomSentenceSpout.java
new file mode 100644
index 0000000..49bec2e
--- /dev/null
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomSentenceSpout.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.spout;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+
+import java.util.Map;
+import java.util.Random;
+
+public class RandomSentenceSpout extends BaseRichSpout {
+  SpoutOutputCollector _collector;
+  Random _rand;
+
+
+  @Override
+  public void open(Map conf, TopologyContext context, SpoutOutputCollector 
collector) {
+    _collector = collector;
+    _rand = new Random();
+  }
+
+  @Override
+  public void nextTuple() {
+    Utils.sleep(100);
+    String[] sentences = new String[]{ "the cow jumped over the moon", "an 
apple a day keeps the doctor away",
+        "four score and seven years ago", "snow white and the seven dwarfs", 
"i am at two with nature" };
+    String sentence = sentences[_rand.nextInt(sentences.length)];
+    _collector.emit(new Values(sentence));
+  }
+
+  @Override
+  public void ack(Object id) {
+  }
+
+  @Override
+  public void fail(Object id) {
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declare(new Fields("word"));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/TwitterSampleSpout.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/TwitterSampleSpout.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/TwitterSampleSpout.java
new file mode 100644
index 0000000..df26d25
--- /dev/null
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/TwitterSampleSpout.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.starter.spout;
+
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import twitter4j.FilterQuery;
+import twitter4j.StallWarning;
+import twitter4j.Status;
+import twitter4j.StatusDeletionNotice;
+import twitter4j.StatusListener;
+import twitter4j.TwitterStream;
+import twitter4j.TwitterStreamFactory;
+import twitter4j.auth.AccessToken;
+import twitter4j.conf.ConfigurationBuilder;
+
+import org.apache.storm.Config;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+
+@SuppressWarnings("serial")
+public class TwitterSampleSpout extends BaseRichSpout {
+
+       SpoutOutputCollector _collector;
+       LinkedBlockingQueue<Status> queue = null;
+       TwitterStream _twitterStream;
+       String consumerKey;
+       String consumerSecret;
+       String accessToken;
+       String accessTokenSecret;
+       String[] keyWords;
+
+       public TwitterSampleSpout(String consumerKey, String consumerSecret,
+                       String accessToken, String accessTokenSecret, String[] 
keyWords) {
+               this.consumerKey = consumerKey;
+               this.consumerSecret = consumerSecret;
+               this.accessToken = accessToken;
+               this.accessTokenSecret = accessTokenSecret;
+               this.keyWords = keyWords;
+       }
+
+       public TwitterSampleSpout() {
+               // TODO Auto-generated constructor stub
+       }
+
+       @Override
+       public void open(Map conf, TopologyContext context,
+                       SpoutOutputCollector collector) {
+               queue = new LinkedBlockingQueue<Status>(1000);
+               _collector = collector;
+
+               StatusListener listener = new StatusListener() {
+
+                       @Override
+                       public void onStatus(Status status) {
+                       
+                               queue.offer(status);
+                       }
+
+                       @Override
+                       public void onDeletionNotice(StatusDeletionNotice sdn) {
+                       }
+
+                       @Override
+                       public void onTrackLimitationNotice(int i) {
+                       }
+
+                       @Override
+                       public void onScrubGeo(long l, long l1) {
+                       }
+
+                       @Override
+                       public void onException(Exception ex) {
+                       }
+
+                       @Override
+                       public void onStallWarning(StallWarning arg0) {
+                               // TODO Auto-generated method stub
+
+                       }
+
+               };
+
+               TwitterStream twitterStream = new TwitterStreamFactory(
+                               new 
ConfigurationBuilder().setJSONStoreEnabled(true).build())
+                               .getInstance();
+
+               twitterStream.addListener(listener);
+               twitterStream.setOAuthConsumer(consumerKey, consumerSecret);
+               AccessToken token = new AccessToken(accessToken, 
accessTokenSecret);
+               twitterStream.setOAuthAccessToken(token);
+               
+               if (keyWords.length == 0) {
+
+                       twitterStream.sample();
+               }
+
+               else {
+
+                       FilterQuery query = new FilterQuery().track(keyWords);
+                       twitterStream.filter(query);
+               }
+
+       }
+
+       @Override
+       public void nextTuple() {
+               Status ret = queue.poll();
+               if (ret == null) {
+                       Utils.sleep(50);
+               } else {
+                       _collector.emit(new Values(ret));
+
+               }
+       }
+
+       @Override
+       public void close() {
+               _twitterStream.shutdown();
+       }
+
+       @Override
+       public Map<String, Object> getComponentConfiguration() {
+               Config ret = new Config();
+               ret.setMaxTaskParallelism(1);
+               return ret;
+       }
+
+       @Override
+       public void ack(Object id) {
+       }
+
+       @Override
+       public void fail(Object id) {
+       }
+
+       @Override
+       public void declareOutputFields(OutputFieldsDeclarer declarer) {
+               declarer.declare(new Fields("tweet"));
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/NthLastModifiedTimeTracker.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/NthLastModifiedTimeTracker.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/NthLastModifiedTimeTracker.java
new file mode 100644
index 0000000..faa4e32
--- /dev/null
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/NthLastModifiedTimeTracker.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.tools;
+
+import org.apache.storm.utils.Time;
+import org.apache.commons.collections.buffer.CircularFifoBuffer;
+
+/**
+ * This class tracks the time-since-last-modify of a "thing" in a rolling 
fashion.
+ * <p/>
+ * For example, create a 5-slot tracker to track the five most recent 
time-since-last-modify.
+ * <p/>
+ * You must manually "mark" that the "something" that you want to track -- in 
terms of modification times -- has just
+ * been modified.
+ */
+public class NthLastModifiedTimeTracker {
+
+  private static final int MILLIS_IN_SEC = 1000;
+
+  private final CircularFifoBuffer lastModifiedTimesMillis;
+
+  public NthLastModifiedTimeTracker(int numTimesToTrack) {
+    if (numTimesToTrack < 1) {
+      throw new IllegalArgumentException(
+          "numTimesToTrack must be greater than zero (you requested " + 
numTimesToTrack + ")");
+    }
+    lastModifiedTimesMillis = new CircularFifoBuffer(numTimesToTrack);
+    initLastModifiedTimesMillis();
+  }
+
+  private void initLastModifiedTimesMillis() {
+    long nowCached = now();
+    for (int i = 0; i < lastModifiedTimesMillis.maxSize(); i++) {
+      lastModifiedTimesMillis.add(Long.valueOf(nowCached));
+    }
+  }
+
+  private long now() {
+    return Time.currentTimeMillis();
+  }
+
+  public int secondsSinceOldestModification() {
+    long modifiedTimeMillis = ((Long) 
lastModifiedTimesMillis.get()).longValue();
+    return (int) ((now() - modifiedTimeMillis) / MILLIS_IN_SEC);
+  }
+
+  public void markAsModified() {
+    updateLastModifiedTime();
+  }
+
+  private void updateLastModifiedTime() {
+    lastModifiedTimesMillis.add(now());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankable.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankable.java 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankable.java
new file mode 100644
index 0000000..85f2b62
--- /dev/null
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankable.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.tools;
+
+public interface Rankable extends Comparable<Rankable> {
+
+  Object getObject();
+
+  long getCount();
+
+  /**
+   * Note: We do not defensively copy the object wrapped by the Rankable.  It 
is passed as is.
+   *
+   * @return a defensive copy
+   */
+  Rankable copy();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/RankableObjectWithFields.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/RankableObjectWithFields.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/RankableObjectWithFields.java
new file mode 100644
index 0000000..b1a9dca
--- /dev/null
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/RankableObjectWithFields.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.tools;
+
+import org.apache.storm.tuple.Tuple;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * This class wraps an objects and its associated count, including any 
additional data fields.
+ * <p/>
+ * This class can be used, for instance, to track the number of occurrences of 
an object in a Storm topology.
+ */
+public class RankableObjectWithFields implements Rankable, Serializable {
+
+  private static final long serialVersionUID = -9102878650001058090L;
+  private static final String toStringSeparator = "|";
+
+  private final Object obj;
+  private final long count;
+  private final ImmutableList<Object> fields;
+
+  public RankableObjectWithFields(Object obj, long count, Object... 
otherFields) {
+    if (obj == null) {
+      throw new IllegalArgumentException("The object must not be null");
+    }
+    if (count < 0) {
+      throw new IllegalArgumentException("The count must be >= 0");
+    }
+    this.obj = obj;
+    this.count = count;
+    fields = ImmutableList.copyOf(otherFields);
+
+  }
+
+  /**
+   * Construct a new instance based on the provided {@link Tuple}.
+   * <p/>
+   * This method expects the object to be ranked in the first field (index 0) 
of the provided tuple, and the number of
+   * occurrences of the object (its count) in the second field (index 1). Any 
further fields in the tuple will be
+   * extracted and tracked, too. These fields can be accessed via {@link 
RankableObjectWithFields#getFields()}.
+   *
+   * @param tuple
+   *
+   * @return new instance based on the provided tuple
+   */
+  public static RankableObjectWithFields from(Tuple tuple) {
+    List<Object> otherFields = Lists.newArrayList(tuple.getValues());
+    Object obj = otherFields.remove(0);
+    Long count = (Long) otherFields.remove(0);
+    return new RankableObjectWithFields(obj, count, otherFields.toArray());
+  }
+
+  public Object getObject() {
+    return obj;
+  }
+
+  public long getCount() {
+    return count;
+  }
+
+  /**
+   * @return an immutable list of any additional data fields of the object 
(may be empty but will never be null)
+   */
+  public List<Object> getFields() {
+    return fields;
+  }
+
+  @Override
+  public int compareTo(Rankable other) {
+    long delta = this.getCount() - other.getCount();
+    if (delta > 0) {
+      return 1;
+    }
+    else if (delta < 0) {
+      return -1;
+    }
+    else {
+      return 0;
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof RankableObjectWithFields)) {
+      return false;
+    }
+    RankableObjectWithFields other = (RankableObjectWithFields) o;
+    return obj.equals(other.obj) && count == other.count;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = 17;
+    int countHash = (int) (count ^ (count >>> 32));
+    result = 31 * result + countHash;
+    result = 31 * result + obj.hashCode();
+    return result;
+  }
+
+  public String toString() {
+    StringBuffer buf = new StringBuffer();
+    buf.append("[");
+    buf.append(obj);
+    buf.append(toStringSeparator);
+    buf.append(count);
+    for (Object field : fields) {
+      buf.append(toStringSeparator);
+      buf.append(field);
+    }
+    buf.append("]");
+    return buf.toString();
+  }
+
+  /**
+   * Note: We do not defensively copy the wrapped object and any accompanying 
fields.  We do guarantee, however,
+   * do return a defensive (shallow) copy of the List object that is wrapping 
any accompanying fields.
+   *
+   * @return
+   */
+  @Override
+  public Rankable copy() {
+    List<Object> shallowCopyOfFields = ImmutableList.copyOf(getFields());
+    return new RankableObjectWithFields(getObject(), getCount(), 
shallowCopyOfFields);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankings.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankings.java 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankings.java
new file mode 100644
index 0000000..17174b3
--- /dev/null
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankings.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.tools;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
+public class Rankings implements Serializable {
+
+  private static final long serialVersionUID = -1549827195410578903L;
+  private static final int DEFAULT_COUNT = 10;
+
+  private final int maxSize;
+  private final List<Rankable> rankedItems = Lists.newArrayList();
+
+  public Rankings() {
+    this(DEFAULT_COUNT);
+  }
+
+  public Rankings(int topN) {
+    if (topN < 1) {
+      throw new IllegalArgumentException("topN must be >= 1");
+    }
+    maxSize = topN;
+  }
+
+  /**
+   * Copy constructor.
+   * @param other
+   */
+  public Rankings(Rankings other) {
+    this(other.maxSize());
+    updateWith(other);
+  }
+
+  /**
+   * @return the maximum possible number (size) of ranked objects this 
instance can hold
+   */
+  public int maxSize() {
+    return maxSize;
+  }
+
+  /**
+   * @return the number (size) of ranked objects this instance is currently 
holding
+   */
+  public int size() {
+    return rankedItems.size();
+  }
+
+  /**
+   * The returned defensive copy is only "somewhat" defensive.  We do, for 
instance, return a defensive copy of the
+   * enclosing List instance, and we do try to defensively copy any contained 
Rankable objects, too.  However, the
+   * contract of {@link org.apache.storm.starter.tools.Rankable#copy()} does 
not guarantee that any Object's embedded within
+   * a Rankable will be defensively copied, too.
+   *
+   * @return a somewhat defensive copy of ranked items
+   */
+  public List<Rankable> getRankings() {
+    List<Rankable> copy = Lists.newLinkedList();
+    for (Rankable r: rankedItems) {
+      copy.add(r.copy());
+    }
+    return ImmutableList.copyOf(copy);
+  }
+
+  public void updateWith(Rankings other) {
+    for (Rankable r : other.getRankings()) {
+      updateWith(r);
+    }
+  }
+
+  public void updateWith(Rankable r) {
+    synchronized(rankedItems) {
+      addOrReplace(r);
+      rerank();
+      shrinkRankingsIfNeeded();
+    }
+  }
+
+  private void addOrReplace(Rankable r) {
+    Integer rank = findRankOf(r);
+    if (rank != null) {
+      rankedItems.set(rank, r);
+    }
+    else {
+      rankedItems.add(r);
+    }
+  }
+
+  private Integer findRankOf(Rankable r) {
+    Object tag = r.getObject();
+    for (int rank = 0; rank < rankedItems.size(); rank++) {
+      Object cur = rankedItems.get(rank).getObject();
+      if (cur.equals(tag)) {
+        return rank;
+      }
+    }
+    return null;
+  }
+
+  private void rerank() {
+    Collections.sort(rankedItems);
+    Collections.reverse(rankedItems);
+  }
+
+  private void shrinkRankingsIfNeeded() {
+    if (rankedItems.size() > maxSize) {
+      rankedItems.remove(maxSize);
+    }
+  }
+
+  /**
+   * Removes ranking entries that have a count of zero.
+   */
+  public void pruneZeroCounts() {
+    int i = 0;
+    while (i < rankedItems.size()) {
+      if (rankedItems.get(i).getCount() == 0) {
+        rankedItems.remove(i);
+      }
+      else {
+        i++;
+      }
+    }
+  }
+
+  public String toString() {
+    return rankedItems.toString();
+  }
+
+  /**
+   * Creates a (defensive) copy of itself.
+   */
+  public Rankings copy() {
+    return new Rankings(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlidingWindowCounter.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlidingWindowCounter.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlidingWindowCounter.java
new file mode 100644
index 0000000..b95a6a9
--- /dev/null
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlidingWindowCounter.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.tools;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * This class counts objects in a sliding window fashion.
+ * <p/>
+ * It is designed 1) to give multiple "producer" threads write access to the 
counter, i.e. being able to increment
+ * counts of objects, and 2) to give a single "consumer" thread (e.g. {@link 
PeriodicSlidingWindowCounter}) read access
+ * to the counter. Whenever the consumer thread performs a read operation, 
this class will advance the head slot of the
+ * sliding window counter. This means that the consumer thread indirectly 
controls where writes of the producer threads
+ * will go to. Also, by itself this class will not advance the head slot.
+ * <p/>
+ * A note for analyzing data based on a sliding window count: During the 
initial <code>windowLengthInSlots</code>
+ * iterations, this sliding window counter will always return object counts 
that are equal or greater than in the
+ * previous iteration. This is the effect of the counter "loading up" at the 
very start of its existence. Conceptually,
+ * this is the desired behavior.
+ * <p/>
+ * To give an example, using a counter with 5 slots which for the sake of this 
example represent 1 minute of time each:
+ * <p/>
+ * <pre>
+ * {@code
+ * Sliding window counts of an object X over time
+ *
+ * Minute (timeline):
+ * 1    2   3   4   5   6   7   8
+ *
+ * Observed counts per minute:
+ * 1    1   1   1   0   0   0   0
+ *
+ * Counts returned by counter:
+ * 1    2   3   4   4   3   2   1
+ * }
+ * </pre>
+ * <p/>
+ * As you can see in this example, for the first 
<code>windowLengthInSlots</code> (here: the first five minutes) the
+ * counter will always return counts equal or greater than in the previous 
iteration (1, 2, 3, 4, 4). This initial load
+ * effect needs to be accounted for whenever you want to perform analyses such 
as trending topics; otherwise your
+ * analysis algorithm might falsely identify the object to be trending as the 
counter seems to observe continuously
+ * increasing counts. Also, note that during the initial load phase <em>every 
object</em> will exhibit increasing
+ * counts.
+ * <p/>
+ * On a high-level, the counter exhibits the following behavior: If you asked 
the example counter after two minutes,
+ * "how often did you count the object during the past five minutes?", then it 
should reply "I have counted it 2 times
+ * in the past five minutes", implying that it can only account for the last 
two of those five minutes because the
+ * counter was not running before that time.
+ *
+ * @param <T> The type of those objects we want to count.
+ */
+public final class SlidingWindowCounter<T> implements Serializable {
+
+  private static final long serialVersionUID = -2645063988768785810L;
+
+  private SlotBasedCounter<T> objCounter;
+  private int headSlot;
+  private int tailSlot;
+  private int windowLengthInSlots;
+
+  public SlidingWindowCounter(int windowLengthInSlots) {
+    if (windowLengthInSlots < 2) {
+      throw new IllegalArgumentException(
+          "Window length in slots must be at least two (you requested " + 
windowLengthInSlots + ")");
+    }
+    this.windowLengthInSlots = windowLengthInSlots;
+    this.objCounter = new SlotBasedCounter<T>(this.windowLengthInSlots);
+
+    this.headSlot = 0;
+    this.tailSlot = slotAfter(headSlot);
+  }
+
+  public void incrementCount(T obj) {
+    objCounter.incrementCount(obj, headSlot);
+  }
+
+  /**
+   * Return the current (total) counts of all tracked objects, then advance 
the window.
+   * <p/>
+   * Whenever this method is called, we consider the counts of the current 
sliding window to be available to and
+   * successfully processed "upstream" (i.e. by the caller). Knowing this we 
will start counting any subsequent
+   * objects within the next "chunk" of the sliding window.
+   *
+   * @return The current (total) counts of all tracked objects.
+   */
+  public Map<T, Long> getCountsThenAdvanceWindow() {
+    Map<T, Long> counts = objCounter.getCounts();
+    objCounter.wipeZeros();
+    objCounter.wipeSlot(tailSlot);
+    advanceHead();
+    return counts;
+  }
+
+  private void advanceHead() {
+    headSlot = tailSlot;
+    tailSlot = slotAfter(tailSlot);
+  }
+
+  private int slotAfter(int slot) {
+    return (slot + 1) % windowLengthInSlots;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlotBasedCounter.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlotBasedCounter.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlotBasedCounter.java
new file mode 100644
index 0000000..b8ca15b
--- /dev/null
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlotBasedCounter.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.tools;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This class provides per-slot counts of the occurrences of objects.
+ * <p/>
+ * It can be used, for instance, as a building block for implementing sliding 
window counting of objects.
+ *
+ * @param <T> The type of those objects we want to count.
+ */
+public final class SlotBasedCounter<T> implements Serializable {
+
+  private static final long serialVersionUID = 4858185737378394432L;
+
+  private final Map<T, long[]> objToCounts = new HashMap<T, long[]>();
+  private final int numSlots;
+
+  public SlotBasedCounter(int numSlots) {
+    if (numSlots <= 0) {
+      throw new IllegalArgumentException("Number of slots must be greater than 
zero (you requested " + numSlots + ")");
+    }
+    this.numSlots = numSlots;
+  }
+
+  public void incrementCount(T obj, int slot) {
+    long[] counts = objToCounts.get(obj);
+    if (counts == null) {
+      counts = new long[this.numSlots];
+      objToCounts.put(obj, counts);
+    }
+    counts[slot]++;
+  }
+
+  public long getCount(T obj, int slot) {
+    long[] counts = objToCounts.get(obj);
+    if (counts == null) {
+      return 0;
+    }
+    else {
+      return counts[slot];
+    }
+  }
+
+  public Map<T, Long> getCounts() {
+    Map<T, Long> result = new HashMap<T, Long>();
+    for (T obj : objToCounts.keySet()) {
+      result.put(obj, computeTotalCount(obj));
+    }
+    return result;
+  }
+
+  private long computeTotalCount(T obj) {
+    long[] curr = objToCounts.get(obj);
+    long total = 0;
+    for (long l : curr) {
+      total += l;
+    }
+    return total;
+  }
+
+  /**
+   * Reset the slot count of any tracked objects to zero for the given slot.
+   *
+   * @param slot
+   */
+  public void wipeSlot(int slot) {
+    for (T obj : objToCounts.keySet()) {
+      resetSlotCountToZero(obj, slot);
+    }
+  }
+
+  private void resetSlotCountToZero(T obj, int slot) {
+    long[] counts = objToCounts.get(obj);
+    counts[slot] = 0;
+  }
+
+  private boolean shouldBeRemovedFromCounter(T obj) {
+    return computeTotalCount(obj) == 0;
+  }
+
+  /**
+   * Remove any object from the counter whose total count is zero (to free up 
memory).
+   */
+  public void wipeZeros() {
+    Set<T> objToBeRemoved = new HashSet<T>();
+    for (T obj : objToCounts.keySet()) {
+      if (shouldBeRemovedFromCounter(obj)) {
+        objToBeRemoved.add(obj);
+      }
+    }
+    for (T obj : objToBeRemoved) {
+      objToCounts.remove(obj);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java
new file mode 100644
index 0000000..dc4cb4b
--- /dev/null
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ * Contains some contributions under the Thrift Software License.
+ * Please see doc/old-thrift-license.txt in the Thrift distribution for
+ * details.
+ */
+package org.apache.storm.starter.trident;
+
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalDRPC;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.spout.SchemeAsMultiScheme;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.storm.kafka.StringScheme;
+import org.apache.storm.kafka.ZkHosts;
+import org.apache.storm.kafka.bolt.KafkaBolt;
+import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
+import org.apache.storm.kafka.trident.TransactionalTridentKafkaSpout;
+import org.apache.storm.kafka.trident.TridentKafkaConfig;
+import org.apache.storm.starter.spout.RandomSentenceSpout;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.builtin.Count;
+import org.apache.storm.trident.operation.builtin.FilterNull;
+import org.apache.storm.trident.operation.builtin.MapGet;
+import org.apache.storm.trident.testing.MemoryMapState;
+import org.apache.storm.trident.testing.Split;
+
+import java.util.Properties;
+
+/**
+ * A sample word count trident topology using transactional kafka spout that 
has the following components.
+ * <ol>
+ * <li> {@link KafkaBolt}
+ * that receives random sentences from {@link RandomSentenceSpout} and
+ * publishes the sentences to a kafka "test" topic.
+ * </li>
+ * <li> {@link TransactionalTridentKafkaSpout}
+ * that consumes sentences from the "test" topic, splits it into words, 
aggregates
+ * and stores the word count in a {@link MemoryMapState}.
+ * </li>
+ * <li> DRPC query
+ * that returns the word counts by querying the trident state (MemoryMapState).
+ * </li>
+ * </ol>
+ * <p>
+ *     For more background read the <a 
href="https://storm.apache.org/documentation/Trident-tutorial.html";>trident 
tutorial</a>,
+ *     <a href="https://storm.apache.org/documentation/Trident-state";>trident 
state</a> and
+ *     <a 
href="https://github.com/apache/storm/tree/master/external/storm-kafka";> Storm 
Kafka </a>.
+ * </p>
+ */
+public class TridentKafkaWordCount {
+
+    private String zkUrl;
+    private String brokerUrl;
+
+    TridentKafkaWordCount(String zkUrl, String brokerUrl) {
+        this.zkUrl = zkUrl;
+        this.brokerUrl = brokerUrl;
+    }
+
+    /**
+     * Creates a transactional kafka spout that consumes any new data 
published to "test" topic.
+     * <p/>
+     * For more info on transactional spouts
+     * see "Transactional spouts" section in
+     * <a href="https://storm.apache.org/documentation/Trident-state";> Trident 
state</a> doc.
+     *
+     * @return a transactional trident kafka spout.
+     */
+    private TransactionalTridentKafkaSpout createKafkaSpout() {
+        ZkHosts hosts = new ZkHosts(zkUrl);
+        TridentKafkaConfig config = new TridentKafkaConfig(hosts, "test");
+        config.scheme = new SchemeAsMultiScheme(new StringScheme());
+
+        // Consume new data from the topic
+        config.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
+        return new TransactionalTridentKafkaSpout(config);
+    }
+
+
+    private Stream addDRPCStream(TridentTopology tridentTopology, TridentState 
state, LocalDRPC drpc) {
+        return tridentTopology.newDRPCStream("words", drpc)
+                .each(new Fields("args"), new Split(), new Fields("word"))
+                .groupBy(new Fields("word"))
+                .stateQuery(state, new Fields("word"), new MapGet(), new 
Fields("count"))
+                .each(new Fields("count"), new FilterNull())
+                .project(new Fields("word", "count"));
+    }
+
+    private TridentState addTridentState(TridentTopology tridentTopology) {
+        return tridentTopology.newStream("spout1", 
createKafkaSpout()).parallelismHint(1)
+                .each(new Fields("str"), new Split(), new Fields("word"))
+                .groupBy(new Fields("word"))
+                .persistentAggregate(new MemoryMapState.Factory(), new 
Count(), new Fields("count"))
+                .parallelismHint(1);
+    }
+
+    /**
+     * Creates a trident topology that consumes sentences from the kafka 
"test" topic using a
+     * {@link TransactionalTridentKafkaSpout} computes the word count and 
stores it in a {@link MemoryMapState}.
+     * A DRPC stream is then created to query the word counts.
+     * @param drpc
+     * @return
+     */
+    public StormTopology buildConsumerTopology(LocalDRPC drpc) {
+        TridentTopology tridentTopology = new TridentTopology();
+        addDRPCStream(tridentTopology, addTridentState(tridentTopology), drpc);
+        return tridentTopology.build();
+    }
+
+    /**
+     * Return the consumer topology config.
+     *
+     * @return the topology config
+     */
+    public Config getConsumerConfig() {
+        Config conf = new Config();
+        conf.setMaxSpoutPending(20);
+        //  conf.setDebug(true);
+        return conf;
+    }
+
+    /**
+     * A topology that produces random sentences using {@link 
RandomSentenceSpout} and
+     * publishes the sentences using a KafkaBolt to kafka "test" topic.
+     *
+     * @return the storm topology
+     */
+    public StormTopology buildProducerTopology(Properties prop) {
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout("spout", new RandomSentenceSpout(), 2);
+        /**
+         * The output field of the RandomSentenceSpout ("word") is provided as 
the boltMessageField
+         * so that this gets written out as the message in the kafka topic.
+         */
+        KafkaBolt bolt = new KafkaBolt().withProducerProperties(prop)
+                .withTopicSelector(new DefaultTopicSelector("test"))
+                .withTupleToKafkaMapper(new 
FieldNameBasedTupleToKafkaMapper("key", "word"));
+        builder.setBolt("forwardToKafka", bolt, 1).shuffleGrouping("spout");
+        return builder.createTopology();
+    }
+
+    /**
+     * Returns the storm config for the topology that publishes sentences to 
kafka "test" topic using a kafka bolt.
+     * The KAFKA_BROKER_PROPERTIES is needed for the KafkaBolt.
+     *
+     * @return the topology config
+     */
+    public Properties getProducerConfig() {
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer");
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer");
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, "storm-kafka-producer");
+        return props;
+    }
+
+    /**
+     * <p>
+     * To run this topology ensure you have a kafka broker running.
+     * </p>
+     * Create a topic test with command line,
+     * kafka-topics.sh --create --zookeeper localhost:2181 
--replication-factor 1 --partition 1 --topic test
+     */
+    public static void main(String[] args) throws Exception {
+
+        String zkUrl = "localhost:2181";        // the defaults.
+        String brokerUrl = "localhost:9092";
+
+        if (args.length > 2 || (args.length == 1 && 
args[0].matches("^-h|--help$"))) {
+            System.out.println("Usage: TridentKafkaWordCount [kafka zookeeper 
url] [kafka broker url]");
+            System.out.println("   E.g TridentKafkaWordCount [" + zkUrl + "]" 
+ " [" + brokerUrl + "]");
+            System.exit(1);
+        } else if (args.length == 1) {
+            zkUrl = args[0];
+        } else if (args.length == 2) {
+            zkUrl = args[0];
+            brokerUrl = args[1];
+        }
+
+        System.out.println("Using Kafka zookeeper url: " + zkUrl + " broker 
url: " + brokerUrl);
+
+        TridentKafkaWordCount wordCount = new TridentKafkaWordCount(zkUrl, 
brokerUrl);
+
+        LocalDRPC drpc = new LocalDRPC();
+        LocalCluster cluster = new LocalCluster();
+
+        // submit the consumer topology.
+        cluster.submitTopology("wordCounter", wordCount.getConsumerConfig(), 
wordCount.buildConsumerTopology(drpc));
+
+        Config conf = new Config();
+        conf.setMaxSpoutPending(20);
+        // submit the producer topology.
+        cluster.submitTopology("kafkaBolt", conf, 
wordCount.buildProducerTopology(wordCount.getProducerConfig()));
+
+        // keep querying the word counts for a minute.
+        for (int i = 0; i < 60; i++) {
+            System.out.println("DRPC RESULT: " + drpc.execute("words", "the 
and apple snow jumped"));
+            Thread.sleep(1000);
+        }
+
+        cluster.killTopology("kafkaBolt");
+        cluster.killTopology("wordCounter");
+        cluster.shutdown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java
new file mode 100644
index 0000000..056b2b6
--- /dev/null
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalDRPC;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.task.IMetricsContext;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.CombinerAggregator;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.builtin.MapGet;
+import org.apache.storm.trident.operation.builtin.Sum;
+import org.apache.storm.trident.state.ReadOnlyState;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.state.map.ReadOnlyMapState;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.util.*;
+
+public class TridentReach {
+  public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, 
List<String>>() {{
+    put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", 
"nathan"));
+    put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", 
"sally", "nathan"));
+    put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john"));
+  }};
+
+  public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, 
List<String>>() {{
+    put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", 
"jai"));
+    put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", 
"vivian"));
+    put("tim", Arrays.asList("alex"));
+    put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", 
"vivian", "emily", "jordan"));
+    put("adam", Arrays.asList("david", "carissa"));
+    put("mike", Arrays.asList("john", "bob"));
+    put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));
+  }};
+
+  public static class StaticSingleKeyMapState extends ReadOnlyState implements 
ReadOnlyMapState<Object> {
+    public static class Factory implements StateFactory {
+      Map _map;
+
+      public Factory(Map map) {
+        _map = map;
+      }
+
+      @Override
+      public State makeState(Map conf, IMetricsContext metrics, int 
partitionIndex, int numPartitions) {
+        return new StaticSingleKeyMapState(_map);
+      }
+
+    }
+
+    Map _map;
+
+    public StaticSingleKeyMapState(Map map) {
+      _map = map;
+    }
+
+
+    @Override
+    public List<Object> multiGet(List<List<Object>> keys) {
+      List<Object> ret = new ArrayList();
+      for (List<Object> key : keys) {
+        Object singleKey = key.get(0);
+        ret.add(_map.get(singleKey));
+      }
+      return ret;
+    }
+
+  }
+
+  public static class One implements CombinerAggregator<Integer> {
+    @Override
+    public Integer init(TridentTuple tuple) {
+      return 1;
+    }
+
+    @Override
+    public Integer combine(Integer val1, Integer val2) {
+      return 1;
+    }
+
+    @Override
+    public Integer zero() {
+      return 1;
+    }
+  }
+
+  public static class ExpandList extends BaseFunction {
+
+    @Override
+    public void execute(TridentTuple tuple, TridentCollector collector) {
+      List l = (List) tuple.getValue(0);
+      if (l != null) {
+        for (Object o : l) {
+          collector.emit(new Values(o));
+        }
+      }
+    }
+
+  }
+
+  public static StormTopology buildTopology(LocalDRPC drpc) {
+    TridentTopology topology = new TridentTopology();
+    TridentState urlToTweeters = topology.newStaticState(new 
StaticSingleKeyMapState.Factory(TWEETERS_DB));
+    TridentState tweetersToFollowers = topology.newStaticState(new 
StaticSingleKeyMapState.Factory(FOLLOWERS_DB));
+
+
+    topology.newDRPCStream("reach", drpc).stateQuery(urlToTweeters, new 
Fields("args"), new MapGet(), new Fields(
+        "tweeters")).each(new Fields("tweeters"), new ExpandList(), new 
Fields("tweeter")).shuffle().stateQuery(
+        tweetersToFollowers, new Fields("tweeter"), new MapGet(), new 
Fields("followers")).each(new Fields("followers"),
+        new ExpandList(), new Fields("follower")).groupBy(new 
Fields("follower")).aggregate(new One(), new Fields(
+        "one")).aggregate(new Fields("one"), new Sum(), new Fields("reach"));
+    return topology.build();
+  }
+
+  public static void main(String[] args) throws Exception {
+    LocalDRPC drpc = new LocalDRPC();
+
+    Config conf = new Config();
+    LocalCluster cluster = new LocalCluster();
+
+    cluster.submitTopology("reach", conf, buildTopology(drpc));
+
+    Thread.sleep(2000);
+
+    System.out.println("REACH: " + drpc.execute("reach", "aaa"));
+    System.out.println("REACH: " + drpc.execute("reach", "foo.com/blog/1"));
+    System.out.println("REACH: " + drpc.execute("reach", 
"engineering.twitter.com/blog/5"));
+
+
+    cluster.shutdown();
+    drpc.shutdown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java
new file mode 100644
index 0000000..93ccf18
--- /dev/null
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalDRPC;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.builtin.Count;
+import org.apache.storm.trident.operation.builtin.FilterNull;
+import org.apache.storm.trident.operation.builtin.MapGet;
+import org.apache.storm.trident.operation.builtin.Sum;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.testing.MemoryMapState;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+
+public class TridentWordCount {
+  public static class Split extends BaseFunction {
+    @Override
+    public void execute(TridentTuple tuple, TridentCollector collector) {
+      String sentence = tuple.getString(0);
+      for (String word : sentence.split(" ")) {
+        collector.emit(new Values(word));
+      }
+    }
+  }
+
+  public static StormTopology buildTopology(LocalDRPC drpc) {
+    FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new 
Values("the cow jumped over the moon"),
+        new Values("the man went to the store and bought some candy"), new 
Values("four score and seven years ago"),
+        new Values("how many apples can you eat"), new Values("to be or not to 
be the person"));
+    spout.setCycle(true);
+
+    TridentTopology topology = new TridentTopology();
+    TridentState wordCounts = topology.newStream("spout1", 
spout).parallelismHint(16).each(new Fields("sentence"),
+        new Split(), new Fields("word")).groupBy(new 
Fields("word")).persistentAggregate(new MemoryMapState.Factory(),
+        new Count(), new Fields("count")).parallelismHint(16);
+
+    topology.newDRPCStream("words", drpc).each(new Fields("args"), new 
Split(), new Fields("word")).groupBy(new Fields(
+        "word")).stateQuery(wordCounts, new Fields("word"), new MapGet(), new 
Fields("count")).each(new Fields("count"),
+        new FilterNull()).aggregate(new Fields("count"), new Sum(), new 
Fields("sum"));
+    return topology.build();
+  }
+
+  public static void main(String[] args) throws Exception {
+    Config conf = new Config();
+    conf.setMaxSpoutPending(20);
+    if (args.length == 0) {
+      LocalDRPC drpc = new LocalDRPC();
+      LocalCluster cluster = new LocalCluster();
+      cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
+      for (int i = 0; i < 100; i++) {
+        System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the 
dog jumped"));
+        Thread.sleep(1000);
+      }
+    }
+    else {
+      conf.setNumWorkers(3);
+      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, 
buildTopology(null));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/util/StormRunner.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/util/StormRunner.java 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/util/StormRunner.java
new file mode 100644
index 0000000..d7f2bf4
--- /dev/null
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/util/StormRunner.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.util;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.StormTopology;
+
+public final class StormRunner {
+
+  private static final int MILLIS_IN_SEC = 1000;
+
+  private StormRunner() {
+  }
+
+  public static void runTopologyLocally(StormTopology topology, String 
topologyName, Config conf, int runtimeInSeconds)
+      throws InterruptedException {
+    LocalCluster cluster = new LocalCluster();
+    cluster.submitTopology(topologyName, conf, topology);
+    Thread.sleep((long) runtimeInSeconds * MILLIS_IN_SEC);
+    cluster.killTopology(topologyName);
+    cluster.shutdown();
+  }
+
+  public static void runTopologyRemotely(StormTopology topology, String 
topologyName, Config conf)
+      throws AlreadyAliveException, InvalidTopologyException, 
AuthorizationException {
+    StormSubmitter.submitTopology(topologyName, conf, topology);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/BasicDRPCTopology.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/storm/starter/BasicDRPCTopology.java 
b/examples/storm-starter/src/jvm/storm/starter/BasicDRPCTopology.java
deleted file mode 100644
index 3ea83a1..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/BasicDRPCTopology.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.LocalDRPC;
-import backtype.storm.StormSubmitter;
-import backtype.storm.drpc.LinearDRPCTopologyBuilder;
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-/**
- * This topology is a basic example of doing distributed RPC on top of Storm. 
It implements a function that appends a
- * "!" to any string you send the DRPC function.
- *
- * @see <a 
href="http://storm.apache.org/documentation/Distributed-RPC.html";>Distributed 
RPC</a>
- */
-public class BasicDRPCTopology {
-  public static class ExclaimBolt extends BaseBasicBolt {
-    @Override
-    public void execute(Tuple tuple, BasicOutputCollector collector) {
-      String input = tuple.getString(1);
-      collector.emit(new Values(tuple.getValue(0), input + "!"));
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("id", "result"));
-    }
-
-  }
-
-  public static void main(String[] args) throws Exception {
-    LinearDRPCTopologyBuilder builder = new 
LinearDRPCTopologyBuilder("exclamation");
-    builder.addBolt(new ExclaimBolt(), 3);
-
-    Config conf = new Config();
-
-    if (args == null || args.length == 0) {
-      LocalDRPC drpc = new LocalDRPC();
-      LocalCluster cluster = new LocalCluster();
-
-      cluster.submitTopology("drpc-demo", conf, 
builder.createLocalTopology(drpc));
-
-      for (String word : new String[]{ "hello", "goodbye" }) {
-        System.out.println("Result for \"" + word + "\": " + 
drpc.execute("exclamation", word));
-      }
-
-      Thread.sleep(10000);
-      drpc.shutdown();
-      cluster.shutdown();
-    }
-    else {
-      conf.setNumWorkers(3);
-      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, 
builder.createRemoteTopology());
-    }
-  }
-}

Reply via email to