Added: storm/branches/bobby-versioned-site/releases/0.10.0/Trident-state.md
URL: 
http://svn.apache.org/viewvc/storm/branches/bobby-versioned-site/releases/0.10.0/Trident-state.md?rev=1735297&view=auto
==============================================================================
--- storm/branches/bobby-versioned-site/releases/0.10.0/Trident-state.md (added)
+++ storm/branches/bobby-versioned-site/releases/0.10.0/Trident-state.md Wed 
Mar 16 21:01:12 2016
@@ -0,0 +1,331 @@
+---
+title: Trident State
+layout: documentation
+---
+
+
+Trident has first-class abstractions for reading from and writing to stateful 
sources. The state can either be internal to the topology – e.g., kept 
in-memory and backed by HDFS – or externally stored in a database like 
Memcached or Cassandra. There's no difference in the Trident API for either 
case.
+
+Trident manages state in a fault-tolerant way so that state updates are 
idempotent in the face of retries and failures. This lets you reason about 
Trident topologies as if each message were processed exactly-once.
+
+There are various levels of fault-tolerance possible when doing state updates. 
Before getting to those, let's look at an example that illustrates the tricks 
necessary to achieve exactly-once semantics. Suppose that you're doing a count 
aggregation of your stream and want to store the running count in a database. 
Now suppose you store in the database a single value representing the count, 
and every time you process a new tuple you increment the count.
+
+When failures occur, tuples will be replayed. This brings up a problem when 
doing state updates (or anything with side effects) – you have no idea if 
you've ever successfully updated the state based on this tuple before. Perhaps 
you never processed the tuple before, in which case you should increment the 
count. Perhaps you've processed the tuple and successfully incremented the 
count, but the tuple failed processing in another step. In this case, you 
should not increment the count. Or perhaps you saw the tuple before but got an 
error when updating the database. In this case, you *should* update the 
database.
+
+By just storing the count in the database, you have no idea whether or not 
this tuple has been processed before. So you need more information in order to 
make the right decision. Trident provides the following semantics which are 
sufficient for achieving exactly-once processing semantics:
+
+1. Tuples are processed as small batches (see [the 
tutorial](Trident-tutorial.html))
+2. Each batch of tuples is given a unique id called the "transaction id" 
(txid). If the batch is replayed, it is given the exact same txid.
+3. State updates are ordered among batches. That is, the state updates for 
batch 3 won't be applied until the state updates for batch 2 have succeeded.
+
+With these primitives, your State implementation can detect whether or not the 
batch of tuples has been processed before and take the appropriate action to 
update the state in a consistent way. The action you take depends on the exact 
semantics provided by your input spouts as to what's in each batch. There are 
three kinds of spouts possible with respect to fault-tolerance: 
"non-transactional", "transactional", and "opaque transactional". Likewise, 
there are three kinds of state possible with respect to fault-tolerance: 
"non-transactional", "transactional", and "opaque transactional". Let's take a 
look at each spout type and see what kind of fault-tolerance you can achieve 
with each.
+
+## Transactional spouts
+
+Remember, Trident processes tuples as small batches with each batch being 
given a unique transaction id. The properties of spouts vary according to the 
guarantees they can provide as to what's in each batch. A transactional spout 
has the following properties:
+
+1. Batches for a given txid are always the same. Replays of batches for a txid 
will contain the exact same set of tuples as the first time that batch was 
emitted for that txid.
+2. There's no overlap between batches of tuples (tuples are in one batch or 
another, never multiple).
+3. Every tuple is in a batch (no tuples are skipped)
+
+This is a pretty easy type of spout to understand: the stream is divided into 
fixed batches that never change. storm-contrib has [an implementation of a 
transactional 
spout](https://github.com/apache/storm/tree/master/external/storm-kafka/src/jvm/storm/kafka/trident/TransactionalTridentKafkaSpout.java)
 for Kafka.
+
+You might be wondering – why wouldn't you just always use a transactional 
spout? They're simple and easy to understand. One reason you might not use one 
is because they're not necessarily very fault-tolerant. For example, the way 
TransactionalTridentKafkaSpout works is the batch for a txid will contain 
tuples from all the Kafka partitions for a topic. Once a batch has been 
emitted, any time that batch is re-emitted in the future the exact same set of 
tuples must be emitted to meet the semantics of transactional spouts. Now 
suppose a batch is emitted from TransactionalTridentKafkaSpout, the batch fails 
to process, and at the same time one of the Kafka nodes goes down. You're now 
incapable of replaying the same batch as you did before (since the node is down 
and some partitions for the topic are unavailable), and processing will 
halt. 
+
+This is why "opaque transactional" spouts exist – they are fault-tolerant to 
losing source nodes while still allowing you to achieve exactly-once processing 
semantics. We'll cover those spouts in the next section though.
+
+(One side note – once Kafka supports replication, it will be possible to 
have transactional spouts that are fault-tolerant to node failure, but that 
feature does not exist yet.)
+
+Before we get to "opaque transactional" spouts, let's look at how you would 
design a State implementation that has exactly-once semantics for transactional 
spouts. This State type is called a "transactional state" and takes advantage 
of the fact that any given txid is always associated with the exact same set of 
tuples.
+
+Suppose your topology computes word count and you want to store the word 
counts in a key/value database. The key will be the word, and the value will 
contain the count. You've already seen that storing just the count as the value 
isn't sufficient to know whether you've processed a batch of tuples before. 
Instead, what you can do is store the transaction id with the count in the 
database as an atomic value. Then, when updating the count, you can just 
compare the transaction id in the database with the transaction id for the 
current batch. If they're the same, you skip the update – because of the 
strong ordering, you know for sure that the value in the database incorporates 
the current batch. If they're different, you increment the count. This logic 
works because the batch for a txid never changes, and Trident ensures that 
state updates are ordered among batches.
+
+Consider this example of why it works. Suppose you are processing txid 3 which 
consists of the following batch of tuples:
+
+```
+["man"]
+["man"]
+["dog"]
+```
+
+Suppose the database currently holds the following key/value pairs:
+
+```
+man => [count=3, txid=1]
+dog => [count=4, txid=3]
+apple => [count=10, txid=2]
+```
+
+The txid associated with "man" is txid 1. Since the current txid is 3, you 
know for sure that this batch of tuples is not represented in that count. So 
you can go ahead and increment the count by 2 and update the txid. On the other 
hand, the txid for "dog" is the same as the current txid. So you know for sure 
that the increment from the current batch is already represented in the 
database for the "dog" key. So you can skip the update. After completing 
updates, the database looks like this:
+
+```
+man => [count=5, txid=3]
+dog => [count=4, txid=3]
+apple => [count=10, txid=2]
+```
+
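The skip-or-increment rule from the example above can be sketched in plain Java. This is only an illustration under stated assumptions: `TxCount` and `TransactionalCountSketch` are hypothetical names that are not part of Storm, and a `HashMap` stands in for the key/value database.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the [count, txid] pair stored per key; not a Storm class.
final class TxCount {
    final long count;
    final long txid;

    TxCount(long count, long txid) {
        this.count = count;
        this.txid = txid;
    }
}

final class TransactionalCountSketch {
    // Applies one batch's partial count for a key. If the stored txid equals the
    // batch txid, the batch was already applied, so the replay is a no-op.
    static TxCount apply(TxCount stored, long batchTxid, long partialCount) {
        if (stored != null && stored.txid == batchTxid) {
            return stored;
        }
        long base = (stored == null) ? 0L : stored.count;
        return new TxCount(base + partialCount, batchTxid);
    }

    public static void main(String[] args) {
        // In-memory map standing in for the database in the example.
        Map<String, TxCount> db = new HashMap<>();
        db.put("man", new TxCount(3, 1));
        db.put("dog", new TxCount(4, 3));
        db.put("apple", new TxCount(10, 2));

        // The batch for txid 3 contains "man" twice and "dog" once.
        db.put("man", apply(db.get("man"), 3, 2));
        db.put("dog", apply(db.get("dog"), 3, 1));

        for (String key : new String[] {"man", "dog", "apple"}) {
            TxCount v = db.get(key);
            System.out.println(key + " => [count=" + v.count + ", txid=" + v.txid + "]");
        }
    }
}
```

The sketch relies on the same two guarantees as the prose: the batch for a txid never changes, and batches are applied in order, so an equal txid can only mean "already applied".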
+Let's now look at opaque transactional spouts and how to design states for 
that type of spout.
+
+## Opaque transactional spouts
+
+As described before, an opaque transactional spout cannot guarantee that the 
batch of tuples for a txid remains constant. An opaque transactional spout has 
the following property:
+
+1. Every tuple is *successfully* processed in exactly one batch. However, it's 
possible for a tuple to fail processing in one batch and then be processed 
successfully in a later batch.
+
+[OpaqueTridentKafkaSpout](https://github.com/apache/storm/tree/master/external/storm-kafka/src/jvm/storm/kafka/trident/OpaqueTridentKafkaSpout.java)
 is a spout that has this property and is fault-tolerant to losing Kafka nodes. 
Whenever it's time for OpaqueTridentKafkaSpout to emit a batch, it emits tuples 
starting from where the last batch finished emitting. This ensures that no 
tuple is ever skipped or successfully processed by multiple batches.
+
+With opaque transactional spouts, it's no longer possible to use the trick of 
skipping state updates if the transaction id in the database is the same as the 
transaction id for the current batch. This is because the batch may have 
changed between state updates.
+
+What you can do is store more state in the database. Rather than store a value 
and transaction id in the database, you instead store a value, transaction id, 
and the previous value in the database. Let's again use the example of storing 
a count in the database. Suppose the partial count for your batch is "2" and 
it's time to apply a state update. Suppose the value in the database looks like 
this:
+
+```
+{ value = 4,
+  prevValue = 1,
+  txid = 2
+}
+```
+
+Suppose your current txid is 3, different than what's in the database. In this 
case, you set "prevValue" equal to "value", increment "value" by your partial 
count, and update the txid. The new database value will look like this:
+
+```
+{ value = 6,
+  prevValue = 4,
+  txid = 3
+}
+```
+
+Now suppose your current txid is 2, equal to what's in the database. Now you 
know that the "value" in the database contains an update from a previous batch 
for your current txid, but that batch may have been different so you have to 
ignore it. What you do in this case is increment "prevValue" by your partial 
count to compute the new "value". You then set the value in the database to 
this:
+
+```
+{ value = 3,
+  prevValue = 1,
+  txid = 2
+}
+```
+
+This works because of the strong ordering of batches provided by Trident. Once 
Trident moves onto a new batch for state updates, it will never go back to a 
previous batch. And since opaque transactional spouts guarantee no overlap 
between batches – that each tuple is successfully processed by one batch – 
you can safely update based on the previous value.
+
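The two cases above (new txid versus repeated txid) can be sketched in plain Java. This is a minimal illustration, not Storm's implementation: `OpaqueCount` and `OpaqueCountSketch` are hypothetical names, and treating a missing record as a count of zero is an assumption of the sketch.

```java
// Hypothetical stand-in for the {value, prevValue, txid} record; not Storm's OpaqueValue.
final class OpaqueCount {
    final long value;
    final long prevValue;
    final long txid;

    OpaqueCount(long value, long prevValue, long txid) {
        this.value = value;
        this.prevValue = prevValue;
        this.txid = txid;
    }
}

final class OpaqueCountSketch {
    // Applies one batch's partial count under opaque transactional semantics.
    static OpaqueCount apply(OpaqueCount stored, long batchTxid, long partialCount) {
        if (stored == null) {
            // Assumption of this sketch: a missing record starts from zero.
            return new OpaqueCount(partialCount, 0L, batchTxid);
        }
        if (stored.txid == batchTxid) {
            // Same txid: "value" came from an earlier attempt whose batch may have
            // differed, so discard it and rebuild from prevValue.
            return new OpaqueCount(stored.prevValue + partialCount, stored.prevValue, batchTxid);
        }
        // New txid: the current value becomes the previous value.
        return new OpaqueCount(stored.value + partialCount, stored.value, batchTxid);
    }

    public static void main(String[] args) {
        OpaqueCount inDb = new OpaqueCount(4, 1, 2);
        OpaqueCount next = apply(inDb, 3, 2);   // current txid differs from the stored txid
        OpaqueCount retry = apply(inDb, 2, 2);  // current txid equals the stored txid
        System.out.println(next.value + " " + next.prevValue + " " + next.txid);
        System.out.println(retry.value + " " + retry.prevValue + " " + retry.txid);
    }
}
```

Note that a repeated txid never touches `prevValue`, which is why the record can be rebuilt any number of times until Trident moves on to the next batch.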
+## Non-transactional spouts
+
+Non-transactional spouts don't provide any guarantees about what's in each 
batch. So it might have at-most-once processing, in which case tuples are not 
retried after failed batches. Or it might have at-least-once processing, where 
tuples can be processed successfully by multiple batches. There's no way to 
achieve exactly-once semantics for this kind of spout.
+
+## Summary of spout and state types
+
+This diagram shows which combinations of spouts / states enable exactly-once 
messaging semantics:
+
+![Spouts vs States](images/spout-vs-state.png)
+
+Opaque transactional states have the strongest fault-tolerance, but this comes 
at the cost of needing to store the txid and two values in the database. 
Transactional states require less state in the database, but only work with 
transactional spouts. Finally, non-transactional states require the least state 
in the database but cannot achieve exactly-once semantics.
+
+The state and spout types you choose are a tradeoff between fault-tolerance 
and storage costs, and ultimately your application requirements will determine 
which combination is right for you.
+
+## State APIs
+
+You've seen the intricacies of what it takes to achieve exactly-once 
semantics. The nice thing about Trident is that it internalizes all the 
fault-tolerance logic within the State – as a user you don't have to deal 
with comparing txids, storing multiple values in the database, or anything like 
that. You can write code like this:
+
+```java
+TridentTopology topology = new TridentTopology();        
+TridentState wordCounts =
+      topology.newStream("spout1", spout)
+        .each(new Fields("sentence"), new Split(), new Fields("word"))
+        .groupBy(new Fields("word"))
+        .persistentAggregate(MemcachedState.opaque(serverLocations), new 
Count(), new Fields("count"))                
+        .parallelismHint(6);
+```
+
+All the logic necessary to manage opaque transactional state is 
internalized in the MemcachedState.opaque call. Additionally, updates are 
automatically batched to minimize roundtrips to the database.
+
+The base State interface just has two methods:
+
+```java
+public interface State {
+    void beginCommit(Long txid); // can be null for things like 
partitionPersist occurring off a DRPC stream
+    void commit(Long txid);
+}
+```
+
+You're told when a state update is beginning, when a state update is ending, 
and you're given the txid in each case. Trident assumes nothing about how your 
state works, what kind of methods there are to update it, and what kind of 
methods there are to read from it.
+
+Suppose you have a home-grown database that contains user location information 
and you want to be able to access it from Trident. Your State implementation 
would have methods for getting and setting user information:
+
+```java
+public class LocationDB implements State {
+    public void beginCommit(Long txid) {    
+    }
+    
+    public void commit(Long txid) {    
+    }
+    
+    public void setLocation(long userId, String location) {
+      // code to access database and set location
+    }
+    
+    public String getLocation(long userId) {
+      // code to get location from database
+    }
+}
+```
+
+You then provide Trident a StateFactory that can create instances of your 
State object within Trident tasks. The StateFactory for your LocationDB might 
look something like this:
+
+```java
+public class LocationDBFactory implements StateFactory {
+   public State makeState(Map conf, int partitionIndex, int numPartitions) {
+      return new LocationDB();
+   } 
+}
+```
+
+Trident provides the QueryFunction interface for writing Trident operations 
that query a source of state, and the StateUpdater interface for writing 
Trident operations that update a source of state. For example, let's write an 
operation "QueryLocation" that queries the LocationDB for the locations of 
users. Let's start off with how you would use it in a topology. Let's say this 
topology consumes an input stream of userids:
+
+```java
+TridentTopology topology = new TridentTopology();
+TridentState locations = topology.newStaticState(new LocationDBFactory());
+topology.newStream("myspout", spout)
+        .stateQuery(locations, new Fields("userid"), new QueryLocation(), new 
Fields("location"));
+```
+
+Now let's take a look at what the implementation of QueryLocation would look 
like:
+
+```java
+public class QueryLocation extends BaseQueryFunction<LocationDB, String> {
+    public List<String> batchRetrieve(LocationDB state, List<TridentTuple> 
inputs) {
+        List<String> ret = new ArrayList<String>();
+        for(TridentTuple input: inputs) {
+            ret.add(state.getLocation(input.getLong(0)));
+        }
+        return ret;
+    }
+
+    public void execute(TridentTuple tuple, String location, TridentCollector 
collector) {
+        collector.emit(new Values(location));
+    }    
+}
+```
+
+QueryFunctions execute in two steps. First, Trident collects a batch of reads 
together and passes them to batchRetrieve. In this case, batchRetrieve will 
receive multiple user ids. batchRetrieve is expected to return a list of 
results that's the same size as the list of input tuples. The first element of 
the result list corresponds to the result for the first input tuple, the second 
is the result for the second input tuple, and so on. Second, Trident calls 
execute once for each input tuple together with its corresponding result, and 
execute emits the output tuples.
+
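The positional contract between batchRetrieve and execute can be illustrated without Storm on the classpath. The sketch below is a simplified model, not Trident's internals: `TwoStepQuerySketch`, `runBatch`, and the `Map` used as the location store are hypothetical, and the formatted string stands in for an emitted tuple.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class TwoStepQuerySketch {
    // Step 1: one lookup pass for the whole batch; result i belongs to input i.
    static List<String> batchRetrieve(Map<Long, String> db, List<Long> userIds) {
        List<String> results = new ArrayList<>(userIds.size());
        for (Long id : userIds) {
            results.add(db.get(id)); // null keeps the position aligned for unknown users
        }
        return results;
    }

    // Step 2: called once per input with the result found at the same index.
    static String execute(Long userId, String location) {
        return userId + " -> " + location;
    }

    // Simplified model of the driver: retrieve for the batch, then execute per input.
    static List<String> runBatch(Map<Long, String> db, List<Long> userIds) {
        List<String> results = batchRetrieve(db, userIds);
        if (results.size() != userIds.size()) {
            throw new IllegalStateException("batchRetrieve must return one result per input");
        }
        List<String> emitted = new ArrayList<>(userIds.size());
        for (int i = 0; i < userIds.size(); i++) {
            emitted.add(execute(userIds.get(i), results.get(i)));
        }
        return emitted;
    }
}
```

The size check makes the contract explicit: if the result list is shorter or longer than the input list, results can no longer be matched to inputs by index.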
+You can see that this code doesn't take advantage of the batching that Trident 
does, since it just queries the LocationDB one user at a time. So a better way 
to write the LocationDB would be like this:
+
+```java
+public class LocationDB implements State {
+    public void beginCommit(Long txid) {    
+    }
+    
+    public void commit(Long txid) {    
+    }
+    
+    public void setLocationsBulk(List<Long> userIds, List<String> locations) {
+      // set locations in bulk
+    }
+    
+    public List<String> bulkGetLocations(List<Long> userIds) {
+      // get locations in bulk
+    }
+}
+```
+
+Then, you can write the QueryLocation function like this:
+
+```java
+public class QueryLocation extends BaseQueryFunction<LocationDB, String> {
+    public List<String> batchRetrieve(LocationDB state, List<TridentTuple> 
inputs) {
+        List<Long> userIds = new ArrayList<Long>();
+        for(TridentTuple input: inputs) {
+            userIds.add(input.getLong(0));
+        }
+        return state.bulkGetLocations(userIds);
+    }
+
+    public void execute(TridentTuple tuple, String location, TridentCollector 
collector) {
+        collector.emit(new Values(location));
+    }    
+}
+```
+
+This code will be much more efficient by reducing roundtrips to the database. 
+
+To update state, you make use of the StateUpdater interface. Here's a 
StateUpdater that updates a LocationDB with new location information:
+
+```java
+public class LocationUpdater extends BaseStateUpdater<LocationDB> {
+    public void updateState(LocationDB state, List<TridentTuple> tuples, 
TridentCollector collector) {
+        List<Long> ids = new ArrayList<Long>();
+        List<String> locations = new ArrayList<String>();
+        for(TridentTuple t: tuples) {
+            ids.add(t.getLong(0));
+            locations.add(t.getString(1));
+        }
+        state.setLocationsBulk(ids, locations);
+    }
+}
+```
+
+Here's how you would use this operation in a Trident topology:
+
+```java
+TridentTopology topology = new TridentTopology();
+TridentState locations = 
+    topology.newStream("locations", locationsSpout)
+        .partitionPersist(new LocationDBFactory(), new Fields("userid", 
"location"), new LocationUpdater());
+```
+
+The partitionPersist operation updates a source of state. The StateUpdater 
receives the State and a batch of tuples with updates to that State. This code 
just grabs the userids and locations from the input tuples and does a bulk set 
into the State. 
+
+partitionPersist returns a TridentState object representing the location db 
being updated by the Trident topology. You could then use this state in 
stateQuery operations elsewhere in the topology. 
+
+You can also see that StateUpdaters are given a TridentCollector. Tuples 
emitted to this collector go to the "new values stream". In this case, there's 
nothing interesting to emit to that stream, but if you were doing something 
like updating counts in a database, you could emit the updated counts to that 
stream. You can then get access to the new values stream for further processing 
via the TridentState#newValuesStream method.
+
+## persistentAggregate
+
+Trident has another method for updating States called persistentAggregate. 
You've seen this used in the streaming word count example, shown again below:
+
+```java
+TridentTopology topology = new TridentTopology();        
+TridentState wordCounts =
+      topology.newStream("spout1", spout)
+        .each(new Fields("sentence"), new Split(), new Fields("word"))
+        .groupBy(new Fields("word"))
+        .persistentAggregate(new MemoryMapState.Factory(), new Count(), new 
Fields("count"));
+```
+
+persistentAggregate is an additional abstraction built on top of 
partitionPersist that knows how to take a Trident aggregator and use it to 
apply updates to the source of state. In this case, since this is a grouped 
stream, Trident expects the state you provide to implement the "MapState" 
interface. The grouping fields will be the keys in the state, and the 
aggregation result will be the values in the state. The "MapState" interface 
looks like this:
+
+```java
+public interface MapState<T> extends State {
+    List<T> multiGet(List<List<Object>> keys);
+    List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters);
+    void multiPut(List<List<Object>> keys, List<T> vals);
+}
+```
+
+When you do aggregations on non-grouped streams (a global aggregation), 
Trident expects your State object to implement the "Snapshottable" interface:
+
+```java
+public interface Snapshottable<T> extends State {
+    T get();
+    T update(ValueUpdater updater);
+    void set(T o);
+}
+```
+
+[MemoryMapState](https://github.com/apache/storm/blob/master/storm-core/src/jvm/storm/trident/testing/MemoryMapState.java)
 and 
[MemcachedState](https://github.com/nathanmarz/trident-memcached/blob/master/src/jvm/trident/memcached/MemcachedState.java)
 each implement both of these interfaces.
+
+## Implementing Map States
+
+Trident makes it easy to implement MapStates, doing almost all the work for 
you. The OpaqueMap, TransactionalMap, and NonTransactionalMap classes implement 
the respective fault-tolerance logic. You simply 
provide these classes with an IBackingMap implementation that knows how to do 
multiGets and multiPuts of the respective key/values. IBackingMap looks like 
this:
+
+```java
+public interface IBackingMap<T> {
+    List<T> multiGet(List<List<Object>> keys); 
+    void multiPut(List<List<Object>> keys, List<T> vals); 
+}
+```
+
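As a rough illustration of what an IBackingMap implementation involves, here is a minimal in-memory version. It is a sketch under stated assumptions: the interface is redeclared locally (copied from the listing above) so the example compiles without Storm, `InMemoryBackingMap` is a hypothetical class, and returning `null` for missing keys is a choice made by this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Redeclared locally, copied from the listing above, so the sketch is self-contained.
interface IBackingMap<T> {
    List<T> multiGet(List<List<Object>> keys);
    void multiPut(List<List<Object>> keys, List<T> vals);
}

// Hypothetical backing map that keeps everything in a HashMap.
final class InMemoryBackingMap<T> implements IBackingMap<T> {
    private final Map<List<Object>, T> store = new HashMap<>();

    @Override
    public List<T> multiGet(List<List<Object>> keys) {
        // One result per key, in key order; null marks a key with no stored value.
        List<T> ret = new ArrayList<>(keys.size());
        for (List<Object> key : keys) {
            ret.add(store.get(key));
        }
        return ret;
    }

    @Override
    public void multiPut(List<List<Object>> keys, List<T> vals) {
        if (keys.size() != vals.size()) {
            throw new IllegalArgumentException("keys and vals must have the same size");
        }
        for (int i = 0; i < keys.size(); i++) {
            // Copy the key list so later changes by the caller cannot corrupt the map.
            store.put(new ArrayList<>(keys.get(i)), vals.get(i));
        }
    }
}
```

A real implementation would replace the `HashMap` with bulk reads and writes against the external store, which is where the batching pays off.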
+OpaqueMaps will call multiPut with 
[OpaqueValue](https://github.com/apache/storm/blob/master/storm-core/src/jvm/storm/trident/state/OpaqueValue.java)
 objects for the vals, TransactionalMaps will give 
[TransactionalValue](https://github.com/apache/storm/blob/master/storm-core/src/jvm/storm/trident/state/TransactionalValue.java)
 objects for the vals, and NonTransactionalMaps will just pass the objects from 
the topology through.
+
+Trident also provides the 
[CachedMap](https://github.com/apache/storm/blob/master/storm-core/src/jvm/storm/trident/state/map/CachedMap.java)
 class to do automatic LRU caching of map key/vals.
+
+Finally, Trident provides the 
[SnapshottableMap](https://github.com/apache/storm/blob/master/storm-core/src/jvm/storm/trident/state/map/SnapshottableMap.java)
 class that turns a MapState into a Snapshottable object, by storing global 
aggregations into a fixed key.
+
+Take a look at the implementation of 
[MemcachedState](https://github.com/nathanmarz/trident-memcached/blob/master/src/jvm/trident/memcached/MemcachedState.java)
 to see how all these utilities can be put together to make a high performance 
MapState implementation. MemcachedState allows you to choose between opaque 
transactional, transactional, and non-transactional semantics.

Added: storm/branches/bobby-versioned-site/releases/0.10.0/Trident-tutorial.md
URL: 
http://svn.apache.org/viewvc/storm/branches/bobby-versioned-site/releases/0.10.0/Trident-tutorial.md?rev=1735297&view=auto
==============================================================================
--- storm/branches/bobby-versioned-site/releases/0.10.0/Trident-tutorial.md 
(added)
+++ storm/branches/bobby-versioned-site/releases/0.10.0/Trident-tutorial.md Wed 
Mar 16 21:01:12 2016
@@ -0,0 +1,254 @@
+---
+title: Trident Tutorial
+layout: documentation
+documentation: true
+---
+
+Trident is a high-level abstraction for doing realtime computing on top of 
Storm. It allows you to seamlessly intermix high throughput (millions of 
messages per second), stateful stream processing with low latency distributed 
querying. If you're familiar with high level batch processing tools like Pig or 
Cascading, the concepts of Trident will be very familiar – Trident has joins, 
aggregations, grouping, functions, and filters. In addition to these, Trident 
adds primitives for doing stateful, incremental processing on top of any 
database or persistence store. Trident has consistent, exactly-once semantics, 
so it is easy to reason about Trident topologies.
+
+## Illustrative example
+
+Let's look at an illustrative example of Trident. This example will do two 
things:
+
+1. Compute streaming word count from an input stream of sentences
+2. Implement queries to get the sum of the counts for a list of words
+
+For the purposes of illustration, this example will read an infinite stream of 
sentences from the following source:
+
+```java
+FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
+               new Values("the cow jumped over the moon"),
+               new Values("the man went to the store and bought some candy"),
+               new Values("four score and seven years ago"),
+               new Values("how many apples can you eat"));
+spout.setCycle(true);
+```
+
+This spout cycles through that set of sentences over and over to produce the 
sentence stream. Here's the code to do the streaming word count part of the 
computation:
+
+```java
+TridentTopology topology = new TridentTopology();        
+TridentState wordCounts =
+     topology.newStream("spout1", spout)
+       .each(new Fields("sentence"), new Split(), new Fields("word"))
+       .groupBy(new Fields("word"))
+       .persistentAggregate(new MemoryMapState.Factory(), new Count(), new 
Fields("count"))                
+       .parallelismHint(6);
+```
+
+Let's go through the code line by line. First a TridentTopology object is 
created, which exposes the interface for constructing Trident computations. 
TridentTopology has a method called newStream that creates a new stream of data 
in the topology reading from an input source. In this case, the input source is 
just the FixedBatchSpout defined from before. Input sources can also be queue 
brokers like Kestrel or Kafka. Trident keeps track of a small amount of state 
for each input source (metadata about what it has consumed) in Zookeeper, and 
the "spout1" string here specifies the node in Zookeeper where Trident should 
keep that metadata.
+
+Trident processes the stream as small batches of tuples. For example, the 
incoming stream of sentences might be divided into batches like so:
+
+![Batched stream](images/batched-stream.png)
+
+Generally the size of those small batches will be on the order of thousands or 
millions of tuples, depending on your incoming throughput.
+
+Trident provides a fully fledged batch processing API to process those small 
batches. The API is very similar to what you see in high level abstractions for 
Hadoop like Pig or Cascading: you can do group by's, joins, aggregations, run 
functions, run filters, and so on. Of course, processing each small batch in 
isolation isn't that interesting, so Trident provides functions for doing 
aggregations across batches and persistently storing those aggregations – 
whether in memory, in Memcached, in Cassandra, or some other store. Finally, 
Trident has first-class functions for querying sources of realtime state. That 
state could be updated by Trident (like in this example), or it could be an 
independent source of state.
+
+Back to the example, the spout emits a stream containing one field called 
"sentence". The next line of the topology definition applies the Split function 
to each tuple in the stream, taking the "sentence" field and splitting it into 
words. Each sentence tuple creates potentially many word tuples – for 
instance, the sentence "the cow jumped over the moon" creates six "word" 
tuples. Here's the definition of Split:
+
+```java
+public class Split extends BaseFunction {
+   public void execute(TridentTuple tuple, TridentCollector collector) {
+       String sentence = tuple.getString(0);
+       for(String word: sentence.split(" ")) {
+           collector.emit(new Values(word));                
+       }
+   }
+}
+```
+
+As you can see, it's really simple. It grabs the sentence, splits it on 
single spaces, and emits a tuple for each word.
+
+The rest of the topology computes word count and keeps the results 
persistently stored. First the stream is grouped by the "word" field. Then, 
each group is persistently aggregated using the Count aggregator. The 
persistentAggregate function knows how to store and update the results of the 
aggregation in a source of state. In this example, the word counts are kept in 
memory, but this can be trivially swapped to use Memcached, Cassandra, or any 
other persistent store. Swapping this topology to store counts in Memcached is 
as simple as replacing the persistentAggregate line with this (using 
[trident-memcached](https://github.com/nathanmarz/trident-memcached)), where 
the "serverLocations" is a list of host/ports for the Memcached cluster:
+
+```java
+.persistentAggregate(MemcachedState.transactional(serverLocations), new 
Count(), new Fields("count"))
+```
+
+The values stored by persistentAggregate represent the aggregation of all 
batches ever emitted by the stream.
+
+One of the cool things about Trident is that it has fully fault-tolerant, 
exactly-once processing semantics. This makes it easy to reason about your 
realtime processing. Trident persists state in a way so that if failures occur 
and retries are necessary, it won't perform multiple updates to the database 
for the same source data.
+
+The persistentAggregate method transforms a Stream into a TridentState object. 
In this case the TridentState object represents all the word counts. We will 
use this TridentState object to implement the distributed query portion of the 
computation.
+
+The next part of the topology implements a low latency distributed query on 
the word counts. The query takes as input a whitespace separated list of words 
and returns the sum of the counts for those words. These queries are executed 
just like normal RPC calls, except they are parallelized in the background. 
Here's an example of how you might invoke one of these queries:
+
+```java
+DRPCClient client = new DRPCClient("drpc.server.location", 3772);
+System.out.println(client.execute("words", "cat dog the man"));
+// prints the JSON-encoded result, e.g.: "[[5078]]"
+```
+
+As you can see, it looks just like a regular remote procedure call (RPC), 
except it's executing in parallel across a Storm cluster. The latency for small 
queries like this is typically around 10ms. More intense DRPC queries can take 
longer of course, although the latency largely depends on how many resources 
you have allocated for the computation.
+
+The implementation of the distributed query portion of the topology looks like 
this:
+
+```java
+topology.newDRPCStream("words")
+       .each(new Fields("args"), new Split(), new Fields("word"))
+       .groupBy(new Fields("word"))
+       .stateQuery(wordCounts, new Fields("word"), new MapGet(), new 
Fields("count"))
+       .each(new Fields("count"), new FilterNull())
+       .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
+```
+
+The same TridentTopology object is used to create the DRPC stream, and the 
function is named "words". The function name corresponds to the function name 
given in the first argument of execute when using a DRPCClient.
+
+Each DRPC request is treated as its own little batch processing job that takes 
as input a single tuple representing the request. The tuple contains one field 
called "args" that contains the argument provided by the client. In this case, 
the argument is a whitespace separated list of words.
+
+First, the Split function is used to split the arguments for the request into 
its constituent words. The stream is grouped by "word", and the stateQuery 
operator is used to query the TridentState object that the first part of the 
topology generated. stateQuery takes in a source of state – in this case, the 
word counts computed by the other portion of the topology – and a function 
for querying that state. In this case, the MapGet function is invoked, which 
gets the count for each word. Since the DRPC stream is grouped the exact same 
way as the TridentState was (by the "word" field), each word query is routed to 
the exact partition of the TridentState object that manages updates for that 
word.
+
+Next, words that didn't have a count are filtered out via the FilterNull 
filter and the counts are summed using the Sum aggregator to get the result. 
Then, Trident automatically sends the result back to the waiting client.
+
+Trident is intelligent about how it executes a topology to maximize 
performance. There are two interesting things happening automatically in this 
topology:
+
+1. Operations that read from or write to state (like persistentAggregate and 
stateQuery) automatically batch operations to that state. So if there are 20 
updates that need to be made to the database for the current batch of 
processing, rather than do 20 read requests and 20 write requests to the 
database, Trident will automatically batch up the reads and writes, doing only 
1 read request and 1 write request (and in many cases, you can use caching in 
your State implementation to eliminate the read request). So you get the best 
of both worlds: convenience – being able to express your computation in 
terms of what should be done with each tuple – and performance.
+2. Trident aggregators are heavily optimized. Rather than transfer all tuples 
for a group to the same machine and then run the aggregator, Trident will do 
partial aggregations when possible before sending tuples over the network. For 
example, the Count aggregator computes the count on each partition, sends the 
partial count over the network, and then sums together all the partial counts 
to get the total count. This technique is similar to the use of combiners in 
MapReduce.
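+
+The partial aggregation described in the second point can be sketched in plain Java. This is only an illustration under simplifying assumptions: the class and method names are hypothetical, a partition is modeled as an in-memory list, and no Trident API is involved.
+
```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-alone sketch; this is not Trident's implementation.
final class PartialCountSketch {
    // Count the tuples held by one partition. In a cluster this step would
    // run locally, before anything is sent over the network.
    static long partialCount(List<String> partition) {
        return partition.size();
    }

    // Sum the per-partition partial counts to get the total count.
    static long totalCount(List<List<String>> partitions) {
        long total = 0;
        for (List<String> partition : partitions) {
            total += partialCount(partition);
        }
        return total;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
            Arrays.asList("cat", "dog"),
            Arrays.asList("the"),
            Arrays.asList("man", "cat", "dog"));
        System.out.println(totalCount(partitions)); // prints 6
    }
}
```
+
+Only one number per partition crosses the (imagined) network boundary here, rather than every tuple.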
+
+Let's look at another example of Trident.
+
+## Reach
+
+The next example is a pure DRPC topology that computes the reach of a URL on 
demand. Reach is the number of unique people exposed to a URL on Twitter. To 
compute reach, you need to fetch all the people who ever tweeted a URL, fetch 
all the followers of all those people, unique that set of followers, and then 
count that uniqued set. Computing reach is too intense for a single machine – 
it can require thousands of database calls and tens of millions of tuples. With 
Storm and Trident, you can parallelize the computation of each step across a 
cluster.
+
+This topology will read from two sources of state. One database maps URLs to a 
list of people who tweeted that URL. The other database maps a person to a list 
of followers for that person. The topology definition looks like this:
+
+```java
+TridentState urlToTweeters =
+       topology.newStaticState(getUrlToTweetersState());
+TridentState tweetersToFollowers =
+       topology.newStaticState(getTweeterToFollowersState());
+
+topology.newDRPCStream("reach")
+       .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters"))
+       .each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter"))
+       .shuffle()
+       .stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers"))
+       .parallelismHint(200)
+       .each(new Fields("followers"), new ExpandList(), new Fields("follower"))
+       .groupBy(new Fields("follower"))
+       .aggregate(new One(), new Fields("one"))
+       .parallelismHint(20)
+       .aggregate(new Count(), new Fields("reach"));
+```
+
+The topology creates TridentState objects representing each external database 
using the newStaticState method. These can then be queried in the topology. 
Like all sources of state, queries to these databases will be automatically 
batched for maximum efficiency.
+
+The topology definition is straightforward – it's just a simple batch 
processing job. First, the urlToTweeters database is queried to get the list of 
people who tweeted the URL for this request. That returns a list, so the 
ExpandList function is invoked to create a tuple for each tweeter.
+
+Next, the followers for each tweeter must be fetched. It's important that this 
step be parallelized, so shuffle is invoked to evenly distribute the tweeters 
among all workers for the topology. Then, the followers database is queried to 
get the list of followers for each tweeter. You can see that this portion of 
the topology is given a large parallelism since this is the most intense 
portion of the computation.
+
+Next, the set of followers is uniqued and counted. This is done in two steps. 
First a "group by" is done on the batch by "follower", running the "One" 
aggregator on each group. The "One" aggregator simply emits a single tuple 
containing the number one for each group. Then, those tuples are counted (one 
per unique follower) to get the unique count of the followers set. Here's the definition of the "One" 
aggregator:
+
+```java
+public class One implements CombinerAggregator<Integer> {
+   public Integer init(TridentTuple tuple) {
+       return 1;
+   }
+
+   public Integer combine(Integer val1, Integer val2) {
+       return 1;
+   }
+
+   public Integer zero() {
+       return 1;
+   }        
+}
+```
+
+This is a "combiner aggregator", which knows how to do partial aggregations 
before transferring tuples over the network to maximize efficiency. Count, used 
for the final aggregation above, is also defined as a combiner aggregator, so 
the global count done at the end of the topology will be very efficient.
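+
+To see why grouping by "follower" and running "One" yields a unique count, here is a small plain-Java simulation. It is a sketch only: the `Combiner` interface is a simplified, hypothetical stand-in for Trident's combiner aggregator (it takes a `String` rather than a `TridentTuple`), and the grouping is done with an in-memory map.
+
```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation; none of these types come from Storm.
final class UniqueCountSketch {
    // Simplified stand-in for a combiner aggregator.
    interface Combiner<T> {
        T init(String tuple);
        T combine(T val1, T val2);
        T zero();
    }

    // Same logic as the "One" aggregator: every group collapses to 1.
    static final class One implements Combiner<Integer> {
        public Integer init(String tuple) { return 1; }
        public Integer combine(Integer val1, Integer val2) { return 1; }
        public Integer zero() { return 1; }
    }

    // Group by follower, run One per group, then add up one value per group.
    static int uniqueCount(List<String> followers) {
        Combiner<Integer> one = new One();
        Map<String, Integer> groups = new LinkedHashMap<>();
        for (String follower : followers) {
            Integer current = groups.get(follower);
            Integer next = one.init(follower);
            groups.put(follower, current == null ? next : one.combine(current, next));
        }
        int total = 0;
        for (int value : groups.values()) {
            total += value;
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> followers = Arrays.asList("alice", "bob", "alice", "carol", "bob");
        System.out.println(uniqueCount(followers)); // prints 3
    }
}
```
+
+Because `combine` always returns 1, a group holds exactly one value no matter how many duplicate followers it absorbed, so the final total equals the number of distinct followers.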
+
+Let's now look at Trident in more detail.
+
+## Fields and tuples
+
+The Trident data model is the TridentTuple which is a named list of values. 
During a topology, tuples are incrementally built up through a sequence of 
operations. Operations generally take in a set of input fields and emit a set 
of "function fields". The input fields are used to select a subset of the tuple 
as input to the operation, while the "function fields" name the fields the 
operation emits.
+
+Consider this example. Suppose you have a stream called "stream" that contains 
the fields "x", "y", and "z". To run a filter MyFilter that takes in "y" as 
input, you would say:
+
+```java
+stream.each(new Fields("y"), new MyFilter())
+```
+
+Suppose the implementation of MyFilter is this:
+
+```java
+public class MyFilter extends BaseFilter {
+   public boolean isKeep(TridentTuple tuple) {
+       return tuple.getInteger(0) < 10;
+   }
+}
+```
+
+This will keep all tuples whose "y" field is less than 10. The TridentTuple 
given as input to MyFilter will only contain the "y" field. Note that Trident 
is able to project a subset of a tuple extremely efficiently when selecting the 
input fields: the projection is essentially free.
+
+Let's now look at how "function fields" work. Suppose you had this function:
+
+```java
+public class AddAndMultiply extends BaseFunction {
+   public void execute(TridentTuple tuple, TridentCollector collector) {
+       int i1 = tuple.getInteger(0);
+       int i2 = tuple.getInteger(1);
+       collector.emit(new Values(i1 + i2, i1 * i2));
+   }
+}
+```
+
+This function takes two numbers as input and emits two new values: the 
addition of the numbers and the multiplication of the numbers. Suppose you had 
a stream with the fields "x", "y", and "z". You would use this function like 
this:
+
+```java
+stream.each(new Fields("x", "y"), new AddAndMultiply(), new Fields("added", "multiplied"));
+```
+
+The output of functions is additive: the fields are added to the input tuple. 
So the output of this each call would contain tuples with the five fields "x", 
"y", "z", "added", and "multiplied". "added" corresponds to the first value 
emitted by AddAndMultiply, while "multiplied" corresponds to the second value.
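+
+The additive behaviour can be mimicked in plain Java by modeling a tuple as an ordered map from field name to value. This is a hypothetical model for illustration only; Trident does not represent tuples this way, and the class name is made up.
+
```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of "function fields are appended to the input tuple".
final class FunctionFieldsSketch {
    // Reads the input fields "x" and "y" and appends "added" and "multiplied".
    static Map<String, Integer> addAndMultiply(Map<String, Integer> tuple) {
        int i1 = tuple.get("x");
        int i2 = tuple.get("y");
        // Copy the input so every original field (including "z") is kept.
        Map<String, Integer> output = new LinkedHashMap<>(tuple);
        output.put("added", i1 + i2);
        output.put("multiplied", i1 * i2);
        return output;
    }

    public static void main(String[] args) {
        Map<String, Integer> tuple = new LinkedHashMap<>();
        tuple.put("x", 2);
        tuple.put("y", 3);
        tuple.put("z", 10);
        // prints {x=2, y=3, z=10, added=5, multiplied=6}
        System.out.println(addAndMultiply(tuple));
    }
}
```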
+
+With aggregators, on the other hand, the function fields replace the input 
tuples. So if you had a stream containing the fields "val1" and "val2", and you 
did this:
+
+```java
+stream.aggregate(new Fields("val2"), new Sum(), new Fields("sum"))
+```
+
+The output stream would only contain a single tuple with a single field called 
"sum", representing the sum of all "val2" fields in that batch.
+
+With grouped streams, the output will contain the grouping fields followed by 
the fields emitted by the aggregator. For example:
+
+```java
+stream.groupBy(new Fields("val1"))
+     .aggregate(new Fields("val2"), new Sum(), new Fields("sum"))
+```
+
+In this example, the output will contain the fields "val1" and "sum".
+
+## State
+
+A key problem to solve with realtime computation is how to manage state so 
that updates are idempotent in the face of failures and retries. It's 
impossible to eliminate failures, so when a node dies or something else goes 
wrong, batches need to be retried. The question is – how do you do state 
updates (whether external databases or state internal to the topology) so that 
it's as if each message were processed exactly once?
+
+This is a tricky problem, and can be illustrated with the following example. 
Suppose that you're doing a count aggregation of your stream and want to store 
the running count in a database. If you store only the count in the database 
and it's time to apply a state update for a batch, there's no way to know if 
you applied that state update before. The batch could have been attempted 
before, succeeded in updating the database, and then failed at a later step. Or 
the batch could have been attempted before and failed to update the database. 
You just don't know.
+
+Trident solves this problem by doing two things:
+
+1. Each batch is given a unique id called the "transaction id". If a batch is 
retried it will have the exact same transaction id.
+2. State updates are ordered among batches. That is, the state updates for 
batch 3 won't be applied until the state updates for batch 2 have succeeded.
+
+With these two primitives, you can achieve exactly-once semantics with your 
state updates. Rather than store just the count in the database, what you can 
do instead is store the transaction id with the count in the database as an 
atomic value. Then, when updating the count, you can just compare the 
transaction id in the database with the transaction id for the current batch. 
If they're the same, you skip the update – because of the strong ordering, 
you know for sure that the value in the database incorporates the current 
batch. If they're different, you increment the count.
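+
+The comparison described above can be sketched as follows. This is a minimal in-memory illustration, not the State implementation: the class is hypothetical, and a real store would have to write the transaction id and the count together as one atomic value.
+
```java
// Hypothetical in-memory stand-in for a database row holding (txid, count).
final class TransactionalCountSketch {
    private long lastTxId = -1; // assumption: -1 means no batch applied yet
    private long count = 0;

    // Applies a batch's increment unless this exact batch was already applied.
    // Relies on batches being applied in order, as described in the text.
    void applyBatch(long txId, long increment) {
        if (txId == lastTxId) {
            return; // replay of a batch whose update already succeeded
        }
        count += increment;
        lastTxId = txId;
    }

    long count() {
        return count;
    }

    public static void main(String[] args) {
        TransactionalCountSketch state = new TransactionalCountSketch();
        state.applyBatch(1, 5);
        state.applyBatch(1, 5); // retry of batch 1: skipped
        state.applyBatch(2, 3);
        System.out.println(state.count()); // prints 8
    }
}
```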
+
+Of course, you don't have to do this logic manually in your topologies. This 
logic is wrapped by the State abstraction and done automatically. Nor is your 
State object required to implement the transaction id trick: if you don't want 
to pay the cost of storing the transaction id in the database, you don't have 
to. In that case the State will have at-least-once-processing semantics in the 
case of failures (which may be fine for your application). You can read more 
about how to implement a State and the various fault-tolerance tradeoffs 
possible [in this doc](/documentation/Trident-state).
+
+A State is allowed to use whatever strategy it wants to store state. So it 
could store state in an external database or it could keep the state in-memory 
but backed by HDFS (like how HBase works). States are not required to hold 
onto state forever. For example, you could have an in-memory State 
implementation that only keeps the last X hours of data available and drops 
anything older. Take a look at the implementation of the [Memcached integration](https://github.com/nathanmarz/trident-memcached/blob/master/src/jvm/trident/memcached/MemcachedState.java) for an example State implementation.
+
+## Execution of Trident topologies
+
+Trident topologies compile down into as efficient a Storm topology as 
possible. Tuples are only sent over the network when a repartitioning of the 
data is required, such as if you do a groupBy or a shuffle. So if you had this 
Trident topology:
+
+![Compiling Trident to Storm 1](images/trident-to-storm1.png)
+
+It would compile into Storm spouts/bolts like this:
+
+![Compiling Trident to Storm 2](images/trident-to-storm2.png)
+
+## Conclusion
+
+Trident makes realtime computation elegant. You've seen how high throughput 
stream processing, state manipulation, and low-latency querying can be 
seamlessly intermixed via Trident's API. Trident lets you express your realtime 
computations in a natural way while still getting maximal performance.
\ No newline at end of file

Added: storm/branches/bobby-versioned-site/releases/0.10.0/Troubleshooting.md
URL: 
http://svn.apache.org/viewvc/storm/branches/bobby-versioned-site/releases/0.10.0/Troubleshooting.md?rev=1735297&view=auto
==============================================================================
--- storm/branches/bobby-versioned-site/releases/0.10.0/Troubleshooting.md 
(added)
+++ storm/branches/bobby-versioned-site/releases/0.10.0/Troubleshooting.md Wed 
Mar 16 21:01:12 2016
@@ -0,0 +1,145 @@
+---
+title: Troubleshooting
+layout: documentation
+documentation: true
+---
+
+This page lists issues people have run into when using Storm along with their 
solutions.
+
+### Worker processes are crashing on startup with no stack trace
+
+Possible symptoms:
+ 
+ * Topologies work with one node, but workers crash with multiple nodes
+
+Solutions:
+
+ * You may have a misconfigured subnet, where nodes can't locate other nodes 
based on their hostname. ZeroMQ sometimes crashes the process when it can't 
resolve a host. There are two solutions:
+  * Make a mapping from hostname to IP address in /etc/hosts
+  * Set up an internal DNS so that nodes can locate each other based on 
hostname.
+  
+### Nodes are unable to communicate with each other
+
+Possible symptoms:
+
+ * Every spout tuple is failing
+ * Processing is not working
+
+Solutions:
+
+ * Storm doesn't work with IPv6. You can force IPv4 by adding 
`-Djava.net.preferIPv4Stack=true` to the supervisor child options and 
restarting the supervisor. 
+ * You may have a misconfigured subnet. See the solutions for `Worker 
processes are crashing on startup with no stack trace`
+
+### Topology stops processing tuples after a while
+
+Symptoms:
+
+ * Processing works fine for a while, and then suddenly stops and spout tuples 
start failing en masse. 
+ 
+Solutions:
+
+ * This is a known issue with ZeroMQ 2.1.10. Downgrade to ZeroMQ 2.1.7.
+ 
+### Not all supervisors appear in Storm UI
+
+Symptoms:
+ 
+ * Some supervisor processes are missing from the Storm UI
+ * List of supervisors in Storm UI changes on refreshes
+
+Solutions:
+
+ * Make sure the supervisor local dirs are independent (e.g., not sharing a 
local dir over NFS)
+ * Try deleting the local dirs for the supervisors and restarting the daemons. 
Supervisors create a unique id for themselves and store it locally. When that 
id is copied to other nodes, Storm gets confused. 
+
+### "Multiple defaults.yaml found" error
+
+Symptoms:
+
+ * When deploying a topology with "storm jar", you get this error
+
+Solution:
+
+ * You're most likely including the Storm jars inside your topology jar. When 
packaging your topology jar, don't include the Storm jars as Storm will put 
those on the classpath for you.
+
+### "NoSuchMethodError" when running storm jar
+
+Symptoms:
+
+ * When running storm jar, you get a cryptic "NoSuchMethodError"
+
+Solution:
+
+ * You're deploying your topology with a different version of Storm than you 
built your topology against. Make sure the storm client you use comes from the 
same version as the version you compiled your topology against.
+
+
+### Kryo ConcurrentModificationException
+
+Symptoms:
+
+ * At runtime, you get a stack trace like the following:
+
+```
+java.lang.RuntimeException: java.util.ConcurrentModificationException
+       at 
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:84)
+       at 
backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:55)
+       at 
backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:56)
+       at 
backtype.storm.disruptor$consume_loop_STAR_$fn__1597.invoke(disruptor.clj:67)
+       at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
+       at clojure.lang.AFn.run(AFn.java:24)
+       at java.lang.Thread.run(Thread.java:679)
+Caused by: java.util.ConcurrentModificationException
+       at 
java.util.LinkedHashMap$LinkedHashIterator.nextEntry(LinkedHashMap.java:390)
+       at java.util.LinkedHashMap$EntryIterator.next(LinkedHashMap.java:409)
+       at java.util.LinkedHashMap$EntryIterator.next(LinkedHashMap.java:408)
+       at java.util.HashMap.writeObject(HashMap.java:1016)
+       at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)
+       at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
+       at java.lang.reflect.Method.invoke(Method.java:616)
+       at 
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:959)
+       at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1480)
+       at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
+       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
+       at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:346)
+       at 
backtype.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:21)
+       at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:554)
+       at 
com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:77)
+       at 
com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
+       at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:472)
+       at 
backtype.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:27)
+```
+
+Solution: 
+
+ * This means that you're emitting a mutable object as an output tuple. 
Everything you emit into the output collector must be immutable. What's 
happening is that your bolt is modifying the object while it is being 
serialized to be sent over the network.
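+
+One common way to avoid this is to emit a snapshot of the data rather than the live object. The sketch below shows only the copying step in plain Java; the class and method names are hypothetical, and the actual `emit` call is left out.
+
```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: copy a mutable buffer before handing it to a collector.
final class SnapshotBeforeEmitSketch {
    // Returns an unmodifiable copy that later changes to `buffer` cannot affect.
    static List<String> snapshot(List<String> buffer) {
        return Collections.unmodifiableList(new ArrayList<>(buffer));
    }

    public static void main(String[] args) {
        List<String> buffer = new ArrayList<>();
        buffer.add("a");
        buffer.add("b");
        List<String> toEmit = snapshot(buffer); // this is what would be emitted
        buffer.add("c");                        // the bolt keeps mutating its buffer
        System.out.println(toEmit);             // prints [a, b]
    }
}
```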
+
+
+### NullPointerException from deep inside Storm
+
+Symptoms:
+
+ * You get a NullPointerException that looks something like:
+
+```
+java.lang.RuntimeException: java.lang.NullPointerException
+    at 
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:84)
+    at 
backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:55)
+    at 
backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:56)
+    at 
backtype.storm.disruptor$consume_loop_STAR_$fn__1596.invoke(disruptor.clj:67)
+    at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
+    at clojure.lang.AFn.run(AFn.java:24)
+    at java.lang.Thread.run(Thread.java:662)
+Caused by: java.lang.NullPointerException
+    at 
backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:24)
+    at 
backtype.storm.daemon.worker$mk_transfer_fn$fn__4126$fn__4130.invoke(worker.clj:99)
+    at backtype.storm.util$fast_list_map.invoke(util.clj:771)
+    at 
backtype.storm.daemon.worker$mk_transfer_fn$fn__4126.invoke(worker.clj:99)
+    at 
backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3904.invoke(executor.clj:205)
+    at 
backtype.storm.disruptor$clojure_handler$reify__1584.onEvent(disruptor.clj:43)
+    at 
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:81)
+    ... 6 more
+```
+
+Solution:
+
+ * This is caused by having multiple threads call methods on the 
`OutputCollector`. All emits, acks, and fails must happen on the same thread. 
One subtle way this can happen is if you make an `IBasicBolt` that emits on a 
separate thread. `IBasicBolt`s automatically ack after execute is called, so 
this would cause multiple threads to use the `OutputCollector`, leading to this 
exception. When using a basic bolt, all emits must happen in the same thread 
that runs `execute`.
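+
+If a bolt really does need background work, one possible pattern is to have the background thread put results on a thread-safe queue and let the thread that runs `execute` drain it. The following is a plain-Java sketch of that hand-off under stated assumptions: all names are hypothetical, and a simple list stands in for the `OutputCollector`.
+
```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch: only one thread ever touches the "collector".
final class SingleThreadEmitSketch {
    private final ConcurrentLinkedQueue<String> pending = new ConcurrentLinkedQueue<>();
    // Stand-in for the OutputCollector; accessed only from drainAndEmit().
    private final List<String> emitted = new ArrayList<>();

    // Safe to call from any thread: it never touches the collector.
    void submitResult(String result) {
        pending.add(result);
    }

    // Call only from the thread that runs execute.
    void drainAndEmit() {
        String result;
        while ((result = pending.poll()) != null) {
            emitted.add(result);
        }
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) throws InterruptedException {
        SingleThreadEmitSketch bolt = new SingleThreadEmitSketch();
        Thread worker = new Thread(() -> bolt.submitResult("async-result"));
        worker.start();
        worker.join();
        bolt.drainAndEmit();
        System.out.println(bolt.emitted()); // prints [async-result]
    }
}
```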
\ No newline at end of file

Added: storm/branches/bobby-versioned-site/releases/0.10.0/Tutorial.md
URL: 
http://svn.apache.org/viewvc/storm/branches/bobby-versioned-site/releases/0.10.0/Tutorial.md?rev=1735297&view=auto
==============================================================================
--- storm/branches/bobby-versioned-site/releases/0.10.0/Tutorial.md (added)
+++ storm/branches/bobby-versioned-site/releases/0.10.0/Tutorial.md Wed Mar 16 
21:01:12 2016
@@ -0,0 +1,312 @@
+---
+title: Tutorial
+layout: documentation
+documentation: true
+---
+In this tutorial, you'll learn how to create Storm topologies and deploy them 
to a Storm cluster. Java will be the main language used, but a few examples 
will use Python to illustrate Storm's multi-language capabilities.
+
+## Preliminaries
+
+This tutorial uses examples from the 
[storm-starter](https://github.com/apache/storm/blob/master/examples/storm-starter)
 project. It's recommended that you clone the project and follow along with the 
examples. Read [Setting up a development 
environment](Setting-up-development-environment.html) and [Creating a new Storm 
project](Creating-a-new-Storm-project.html) to get your machine set up.
+
+## Components of a Storm cluster
+
+A Storm cluster is superficially similar to a Hadoop cluster. Whereas on 
Hadoop you run "MapReduce jobs", on Storm you run "topologies". "Jobs" and 
"topologies" themselves are very different -- one key difference is that a 
MapReduce job eventually finishes, whereas a topology processes messages 
forever (or until you kill it).
+
+There are two kinds of nodes on a Storm cluster: the master node and the 
worker nodes. The master node runs a daemon called "Nimbus" that is similar to 
Hadoop's "JobTracker". Nimbus is responsible for distributing code around the 
cluster, assigning tasks to machines, and monitoring for failures.
+
+Each worker node runs a daemon called the "Supervisor". The supervisor listens 
for work assigned to its machine and starts and stops worker processes as 
necessary based on what Nimbus has assigned to it. Each worker process executes 
a subset of a topology; a running topology consists of many worker processes 
spread across many machines.
+
+![Storm cluster](images/storm-cluster.png)
+
+All coordination between Nimbus and the Supervisors is done through a 
[Zookeeper](http://zookeeper.apache.org/) cluster. Additionally, the Nimbus 
daemon and Supervisor daemons are fail-fast and stateless; all state is kept in 
Zookeeper or on local disk. This means you can kill -9 Nimbus or the 
Supervisors and they'll start back up like nothing happened. This design leads 
to Storm clusters being incredibly stable.
+
+## Topologies
+
+To do realtime computation on Storm, you create what are called "topologies". 
A topology is a graph of computation. Each node in a topology contains 
processing logic, and links between nodes indicate how data should be passed 
around between nodes.
+
+Running a topology is straightforward. First, you package all your code and 
dependencies into a single jar. Then, you run a command like the following:
+
+```
+storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2
+```
+
+This runs the class `backtype.storm.MyTopology` with the arguments `arg1` and 
`arg2`. The main function of the class defines the topology and submits it to 
Nimbus. The `storm jar` part takes care of connecting to Nimbus and uploading 
the jar.
+
+Since topology definitions are just Thrift structs, and Nimbus is a Thrift 
service, you can create and submit topologies using any programming language. 
The above example is the easiest way to do it from a JVM-based language. See 
[Running topologies on a production 
cluster](Running-topologies-on-a-production-cluster.html) for more information 
on starting and stopping topologies.
+
+## Streams
+
+The core abstraction in Storm is the "stream". A stream is an unbounded 
sequence of tuples. Storm provides the primitives for transforming a stream 
into a new stream in a distributed and reliable way. For example, you may 
transform a stream of tweets into a stream of trending topics.
+
+The basic primitives Storm provides for doing stream transformations are 
"spouts" and "bolts". Spouts and bolts have interfaces that you implement to 
run your application-specific logic.
+
+A spout is a source of streams. For example, a spout may read tuples off of a 
[Kestrel](http://github.com/nathanmarz/storm-kestrel) queue and emit them as a 
stream. Or a spout may connect to the Twitter API and emit a stream of tweets.
+
+A bolt consumes any number of input streams, does some processing, and 
possibly emits new streams. Complex stream transformations, like computing a 
stream of trending topics from a stream of tweets, require multiple steps and 
thus multiple bolts. Bolts can do anything: run functions, filter tuples, 
do streaming aggregations, do streaming joins, talk to databases, and more.
+
+Networks of spouts and bolts are packaged into a "topology" which is the 
top-level abstraction that you submit to Storm clusters for execution. A 
topology is a graph of stream transformations where each node is a spout or 
bolt. Edges in the graph indicate which bolts are subscribing to which streams. 
When a spout or bolt emits a tuple to a stream, it sends the tuple to every 
bolt that subscribed to that stream.
+
+![A Storm topology](images/topology.png)
+
+Links between nodes in your topology indicate how tuples should be passed 
around. For example, if there is a link between Spout A and Bolt B, a link from 
Spout A to Bolt C, and a link from Bolt B to Bolt C, then every time Spout A 
emits a tuple, it will send the tuple to both Bolt B and Bolt C. All of Bolt 
B's output tuples will go to Bolt C as well.
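+
+The delivery rule can be traced with a small plain-Java model. It is a sketch under two assumptions that are not part of Storm: every bolt emits exactly one output tuple per input tuple, and the graph has no cycles. The class and method names are hypothetical.
+
```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of tuple fan-out along topology links.
final class TopologyFanOutSketch {
    // subscribers.get(x) lists the bolts subscribed to component x's stream.
    // Returns how many tuples each bolt receives when `source` emits one tuple.
    static Map<String, Integer> deliver(Map<String, List<String>> subscribers, String source) {
        Map<String, Integer> received = new TreeMap<>();
        Deque<String> emitters = new ArrayDeque<>();
        emitters.add(source);
        while (!emitters.isEmpty()) {
            String emitter = emitters.poll();
            List<String> targets =
                subscribers.getOrDefault(emitter, Collections.<String>emptyList());
            for (String bolt : targets) {
                received.merge(bolt, 1, Integer::sum);
                emitters.add(bolt); // assumption: one output tuple per input tuple
            }
        }
        return received;
    }

    public static void main(String[] args) {
        Map<String, List<String>> subscribers = new HashMap<>();
        subscribers.put("SpoutA", Arrays.asList("BoltB", "BoltC"));
        subscribers.put("BoltB", Arrays.asList("BoltC"));
        System.out.println(deliver(subscribers, "SpoutA")); // prints {BoltB=1, BoltC=2}
    }
}
```
+
+Bolt C receives two tuples for each tuple Spout A emits: one directly from Spout A and one by way of Bolt B.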
+
+Each node in a Storm topology executes in parallel. In your topology, you can 
specify how much parallelism you want for each node, and then Storm will spawn 
that number of threads across the cluster to do the execution.
+
+A topology runs forever, or until you kill it. Storm will automatically 
reassign any failed tasks. Additionally, Storm guarantees that there will be no 
data loss, even if machines go down and messages are dropped.
+
+## Data model
+
+Storm uses tuples as its data model. A tuple is a named list of values, and a 
field in a tuple can be an object of any type. Out of the box, Storm supports 
all the primitive types, strings, and byte arrays as tuple field values. To use 
an object of another type, you just need to implement [a 
serializer](Serialization.html) for the type.
+
+Every node in a topology must declare the output fields for the tuples it 
emits. For example, this bolt declares that it emits 2-tuples with the fields 
"double" and "triple":
+
+```java
+public class DoubleAndTripleBolt extends BaseRichBolt {
+    private OutputCollector _collector;
+
+    @Override
+    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
+        _collector = collector;
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        int val = input.getInteger(0);        
+        _collector.emit(input, new Values(val*2, val*3));
+        _collector.ack(input);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("double", "triple"));
+    }    
+}
+```
+
+The `declareOutputFields` function declares the output fields `["double", 
"triple"]` for the component. The rest of the bolt will be explained in the 
upcoming sections.
+
+## A simple topology
+
+Let's take a look at a simple topology to explore the concepts more and see 
how the code shapes up. Let's look at the `ExclamationTopology` definition from 
storm-starter:
+
+```java
+TopologyBuilder builder = new TopologyBuilder();        
+builder.setSpout("words", new TestWordSpout(), 10);        
+builder.setBolt("exclaim1", new ExclamationBolt(), 3)
+        .shuffleGrouping("words");
+builder.setBolt("exclaim2", new ExclamationBolt(), 2)
+        .shuffleGrouping("exclaim1");
+```
+
+This topology contains a spout and two bolts. The spout emits words, and each 
bolt appends the string "!!!" to its input. The nodes are arranged in a line: 
the spout emits to the first bolt which then emits to the second bolt. If the 
spout emits the tuples ["bob"] and ["john"], then the second bolt will emit the 
words ["bob!!!!!!"] and ["john!!!!!!"].
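+
+Stripped of Storm entirely, the data flow through the two bolts amounts to applying the same function twice. The sketch below is plain Java with hypothetical names and ignores parallelism, groupings, and acking.
+
```java
// Hypothetical model of the spout -> exclaim1 -> exclaim2 line.
final class ExclamationPipelineSketch {
    // What each ExclamationBolt does to the first field of its input.
    static String exclaim(String word) {
        return word + "!!!";
    }

    // The spout's word passes through "exclaim1" and then "exclaim2".
    static String runPipeline(String word) {
        return exclaim(exclaim(word));
    }

    public static void main(String[] args) {
        System.out.println(runPipeline("bob"));  // prints bob!!!!!!
        System.out.println(runPipeline("john")); // prints john!!!!!!
    }
}
```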
+
+This code defines the nodes using the `setSpout` and `setBolt` methods. These 
methods take as input a user-specified id, an object containing the processing 
logic, and the amount of parallelism you want for the node. In this example, 
the spout is given id "words" and the bolts are given ids "exclaim1" and 
"exclaim2". 
+
+The object containing the processing logic implements the 
[IRichSpout](/javadoc/apidocs/backtype/storm/topology/IRichSpout.html) 
interface for spouts and the 
[IRichBolt](/javadoc/apidocs/backtype/storm/topology/IRichBolt.html) interface 
for bolts.
+
+The last parameter, how much parallelism you want for the node, is optional. 
It indicates how many threads should execute that component across the cluster. 
If you omit it, Storm will only allocate one thread for that node.
+
+`setBolt` returns an 
[InputDeclarer](/javadoc/apidocs/backtype/storm/topology/InputDeclarer.html) 
object that is used to define the inputs to the Bolt. Here, component 
"exclaim1" declares that it wants to read all the tuples emitted by component 
"words" using a shuffle grouping, and component "exclaim2" declares that it 
wants to read all the tuples emitted by component "exclaim1" using a shuffle 
grouping. "shuffle grouping" means that tuples should be randomly distributed 
from the input tasks to the bolt's tasks. There are many ways to group data 
between components. These will be explained in a few sections.
+
+If you wanted component "exclaim2" to read all the tuples emitted by both 
component "words" and component "exclaim1", you would write component 
"exclaim2"'s definition like this:
+
+```java
+builder.setBolt("exclaim2", new ExclamationBolt(), 5)
+            .shuffleGrouping("words")
+            .shuffleGrouping("exclaim1");
+```
+
+As you can see, input declarations can be chained to specify multiple sources 
for the Bolt.
+
+Let's dig into the implementations of the spouts and bolts in this topology. 
Spouts are responsible for emitting new messages into the topology. 
`TestWordSpout` in this topology emits a random word from the list ["nathan", 
"mike", "jackson", "golda", "bertels"] as a 1-tuple every 100ms. The 
implementation of `nextTuple()` in TestWordSpout looks like this:
+
+```java
+public void nextTuple() {
+    Utils.sleep(100);
+    final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
+    final Random rand = new Random();
+    final String word = words[rand.nextInt(words.length)];
+    _collector.emit(new Values(word));
+}
+```
+
+As you can see, the implementation is very straightforward.
+
+`ExclamationBolt` appends the string "!!!" to its input. Let's take a look at 
the full implementation for `ExclamationBolt`:
+
+```java
+public static class ExclamationBolt implements IRichBolt {
+    OutputCollector _collector;
+
+    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
+        _collector = collector;
+    }
+
+    public void execute(Tuple tuple) {
+        _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
+        _collector.ack(tuple);
+    }
+
+    public void cleanup() {
+    }
+
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("word"));
+    }
+    
+    public Map getComponentConfiguration() {
+        return null;
+    }
+}
+```
+
+The `prepare` method provides the bolt with an `OutputCollector` that is used 
for emitting tuples from this bolt. Tuples can be emitted at any time from the 
bolt -- in the `prepare`, `execute`, or `cleanup` methods, or even 
asynchronously in another thread. This `prepare` implementation simply saves 
the `OutputCollector` as an instance variable to be used later on in the 
`execute` method.
+
+The `execute` method receives a tuple from one of the bolt's inputs. The 
`ExclamationBolt` grabs the first field from the tuple and emits a new tuple 
with the string "!!!" appended to it. If you implement a bolt that subscribes 
to multiple input sources, you can find out which component the 
[Tuple](/javadoc/apidocs/backtype/storm/tuple/Tuple.html) came from by using 
the `Tuple#getSourceComponent` method.
+
+There are a few other things going on in the `execute` method, namely that the 
input tuple is passed as the first argument to `emit` and the input tuple is 
acked on the final line. These are part of Storm's reliability API for 
guaranteeing no data loss and will be explained later in this tutorial. 
+
+The `cleanup` method is called when a bolt is being shut down and should 
clean up any resources that were opened. There's no guarantee that this method 
will be called on the cluster: for example, if the machine the task is running 
on blows up, there's no way to invoke the method. The `cleanup` method is 
intended for when you run topologies in [local mode](Local-mode.html) (where a 
Storm cluster is simulated in process), and you want to be able to run and kill 
many topologies without suffering any resource leaks.
+
+The `declareOutputFields` method declares that the `ExclamationBolt` emits 
1-tuples with one field called "word".
+
+The `getComponentConfiguration` method allows you to configure various aspects 
of how this component runs. This is a more advanced topic that is explained 
further on [Configuration](Configuration.html).
+
+Methods like `cleanup` and `getComponentConfiguration` are often not needed in 
a bolt implementation. You can define bolts more succinctly by using a base 
class that provides default implementations where appropriate. 
`ExclamationBolt` can be written more succinctly by extending `BaseRichBolt`, 
like so:
+
+```java
+public static class ExclamationBolt extends BaseRichBolt {
+    OutputCollector _collector;
+
+    public void prepare(Map conf, TopologyContext context,
+                        OutputCollector collector) {
+        _collector = collector;
+    }
+
+    public void execute(Tuple tuple) {
+        _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
+        _collector.ack(tuple);
+    }
+
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("word"));
+    }    
+}
+```
+
+## Running ExclamationTopology in local mode
+
+Let's see how to run the `ExclamationTopology` in local mode and see that it's 
working.
+
+Storm has two modes of operation: local mode and distributed mode. In local 
mode, Storm executes completely in process by simulating worker nodes with 
threads. Local mode is useful for testing and development of topologies. When 
you run the topologies in storm-starter, they'll run in local mode and you'll 
be able to see what messages each component is emitting. You can read more 
about running topologies in local mode on [Local mode](Local-mode.html).
+
+In distributed mode, Storm operates as a cluster of machines. When you submit 
a topology to the master, you also submit all the code necessary to run the 
topology. The master will take care of distributing your code and allocating 
workers to run your topology. If workers go down, the master will reassign them 
somewhere else. You can read more about running topologies on a cluster on 
[Running topologies on a production 
cluster](Running-topologies-on-a-production-cluster.html). 
+
+Here's the code that runs `ExclamationTopology` in local mode:
+
+```java
+Config conf = new Config();
+conf.setDebug(true);
+conf.setNumWorkers(2);
+
+LocalCluster cluster = new LocalCluster();
+cluster.submitTopology("test", conf, builder.createTopology());
+Utils.sleep(10000);
+cluster.killTopology("test");
+cluster.shutdown();
+```
+
+First, the code defines an in-process cluster by creating a `LocalCluster` 
object. Submitting topologies to this virtual cluster is identical to 
submitting topologies to distributed clusters. It submits a topology to the 
`LocalCluster` by calling `submitTopology`, which takes as arguments a name for 
the running topology, a configuration for the topology, and then the topology 
itself.
+
+The name is used to identify the topology so that you can kill it later on. A 
topology will run indefinitely until you kill it.
+
+The configuration is used to tune various aspects of the running topology. The 
two configurations specified here are very common:
+
+1. **TOPOLOGY_WORKERS** (set with `setNumWorkers`) specifies how many 
_processes_ you want allocated around the cluster to execute the topology. Each 
component in the topology will execute as many _threads_. The number of threads 
allocated to a given component is configured through the `setBolt` and 
`setSpout` methods. Those _threads_ exist within worker _processes_. Each 
worker _process_ contains within it some number of _threads_ for some number of 
components. For instance, you may have 300 threads specified across all your 
components and 50 worker processes specified in your config. Each worker 
process will execute 6 threads, each of which could belong to a different 
component. You tune the performance of Storm topologies by tweaking the 
parallelism for each component and the number of worker processes those threads 
should run within.
+2. **TOPOLOGY_DEBUG** (set with `setDebug`), when set to true, tells Storm to 
log every message emitted by a component. This is useful in local mode 
when testing topologies, but you probably want to keep this turned off when 
running topologies on the cluster.
+
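The arithmetic in the first list item above can be checked with a small sketch. It is plain Java with no Storm dependency; the class and method names (`WorkerMath`, `threadsPerWorker`) are hypothetical, and the even spread of threads over workers is an assumption taken from the example, not a statement about how the scheduler places threads.

```java
final class WorkerMath {
    // Average number of executor threads per worker process, rounded up.
    // Assumption: threads are spread evenly across the workers.
    static int threadsPerWorker(int totalThreads, int workers) {
        if (workers <= 0) {
            throw new IllegalArgumentException("workers must be positive");
        }
        return (totalThreads + workers - 1) / workers;
    }

    public static void main(String[] args) {
        // 300 threads across 50 worker processes, as in the example above.
        System.out.println(threadsPerWorker(300, 50)); // prints 6
    }
}
```

Raising the number of workers lowers the thread count per process, and raising a component's parallelism raises it; those are the two knobs the list describes.
+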
+There are many other configurations you can set for the topology. The various 
configurations are detailed on [the Javadoc for 
Config](/javadoc/apidocs/backtype/storm/Config.html).
+
+To learn about how to set up your development environment so that you can run 
topologies in local mode (such as in Eclipse), see [Creating a new Storm 
project](Creating-a-new-Storm-project.html).
+
+## Stream groupings
+
+A stream grouping tells a topology how to send tuples between two components. 
Remember, spouts and bolts execute in parallel as many tasks across the 
cluster. If you look at how a topology is executing at the task level, it looks 
something like this:
+
+![Tasks in a topology](images/topology-tasks.png)
+
+When a task for Bolt A emits a tuple to Bolt B, which task should it send the 
tuple to?
+
+A "stream grouping" answers this question by telling Storm how to send tuples 
between sets of tasks. Before we dig into the different kinds of stream 
groupings, let's take a look at another topology from 
[storm-starter](http://github.com/apache/storm/blob/master/examples/storm-starter).
 This 
[WordCountTopology](https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java)
 reads sentences off of a spout and streams out of `WordCount` the total 
number of times it has seen each word:
+
+```java
+TopologyBuilder builder = new TopologyBuilder();
+        
+builder.setSpout("sentences", new RandomSentenceSpout(), 5);        
+builder.setBolt("split", new SplitSentence(), 8)
+        .shuffleGrouping("sentences");
+builder.setBolt("count", new WordCount(), 12)
+        .fieldsGrouping("split", new Fields("word"));
+```
+
+`SplitSentence` emits a tuple for each word in each sentence it receives, and 
`WordCount` keeps a map in memory from word to count. Each time `WordCount` 
receives a word, it updates its state and emits the new word count.
+
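The state that `WordCount` keeps can be pictured with the following minimal sketch. It is not the storm-starter source: `WordCounter` and `add` are hypothetical names, and the Storm bolt plumbing (`execute`, the collector, output fields) is left out so that only the word-to-count map remains.

```java
import java.util.HashMap;
import java.util.Map;

final class WordCounter {
    // In-memory state: word -> number of times seen so far.
    private final Map<String, Integer> counts = new HashMap<>();

    // Record one occurrence and return the updated count, which is the
    // value a counting bolt would emit alongside the word.
    int add(String word) {
        return counts.merge(word, 1, Integer::sum);
    }

    public static void main(String[] args) {
        WordCounter counter = new WordCounter();
        for (String word : "the cow jumped over the moon".split(" ")) {
            System.out.println(word + " " + counter.add(word));
        }
    }
}
```

Because the map lives in one task's memory, the count for a word is only correct if every occurrence of that word reaches the same task.
+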
+There are a few different kinds of stream groupings.
+
+The simplest kind of grouping is called a "shuffle grouping", which sends the 
tuple to a random task. A shuffle grouping is used in the `WordCountTopology` 
to send tuples from `RandomSentenceSpout` to the `SplitSentence` bolt. It has 
the effect of evenly distributing the work of processing the tuples across all 
of the `SplitSentence` bolt's tasks.
+
+A more interesting kind of grouping is the "fields grouping". A fields 
grouping is used between the `SplitSentence` bolt and the `WordCount` bolt. It 
is critical for the functioning of the `WordCount` bolt that the same word 
always go to the same task. Otherwise, more than one task will see the same 
word, and they'll each emit incorrect values for the count since each has 
incomplete information. A fields grouping lets you group a stream by a subset 
of its fields. This causes equal values for that subset of fields to go to the 
same task. Since `WordCount` subscribes to `SplitSentence`'s output stream 
using a fields grouping on the "word" field, the same word always goes to the 
same task and the bolt produces the correct output.
+
+Fields groupings are the basis of implementing streaming joins and streaming 
aggregations as well as a plethora of other use cases. Under the hood, 
fields groupings are implemented using mod hashing.
+
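Mod hashing can be sketched in a few lines of plain Java. This illustrates the idea only: the class and method names are hypothetical, and using `List.hashCode()` as the hash function is an assumption, since the text does not say which hash Storm applies to the grouping fields.

```java
import java.util.Arrays;
import java.util.List;

final class FieldsGroupingSketch {
    // Pick a target task by hashing the grouping-field values and taking the
    // result modulo the number of tasks. floorMod keeps the index
    // non-negative even when hashCode() is negative.
    static int chooseTask(List<Object> groupingValues, int numTasks) {
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 12; // parallelism given to the "count" bolt above
        for (String word : Arrays.asList("storm", "trident", "storm")) {
            List<Object> key = Arrays.<Object>asList(word);
            System.out.println(word + " -> task " + chooseTask(key, numTasks));
        }
    }
}
```

Equal field values always hash to the same index, which is the property the `WordCount` bolt relies on. Different words may still share a task.
+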
+There are a few other kinds of stream groupings. You can read more about them on 
[Concepts](Concepts.html). 
+
+## Defining Bolts in other languages
+
+Bolts can be defined in any language. Bolts written in another language are 
executed as subprocesses, and Storm communicates with those subprocesses with 
JSON messages over stdin/stdout. The communication protocol requires only a 
~100-line adapter library, and Storm ships with adapter libraries for Ruby, 
Python, and Fancy. 
+
+Here's the definition of the `SplitSentence` bolt from `WordCountTopology`:
+
+```java
+public static class SplitSentence extends ShellBolt implements IRichBolt {
+    public SplitSentence() {
+        super("python", "splitsentence.py");
+    }
+
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("word"));
+    }
+}
+```
+
+`SplitSentence` extends `ShellBolt` and declares that it runs using 
`python` with the argument `splitsentence.py`. Here's the implementation of 
`splitsentence.py`:
+
+```python
+import storm
+
+class SplitSentenceBolt(storm.BasicBolt):
+    def process(self, tup):
+        words = tup.values[0].split(" ")
+        for word in words:
+            storm.emit([word])
+
+SplitSentenceBolt().run()
+```
+
+For more information on writing spouts and bolts in other languages, and to 
learn about how to create topologies in other languages (and avoid the JVM 
completely), see [Using non-JVM languages with 
Storm](Using-non-JVM-languages-with-Storm.html).
+
+## Guaranteeing message processing
+
+Earlier on in this tutorial, we skipped over a few aspects of how tuples are 
emitted. Those aspects were part of Storm's reliability API: how Storm 
guarantees that every message coming off a spout will be fully processed. See 
[Guaranteeing message processing](Guaranteeing-message-processing.html) for 
information on how this works and what you have to do as a user to take 
advantage of Storm's reliability capabilities.
+
+## Transactional topologies
+
+Storm guarantees that every message will be played through the topology at 
least once. A common question asked is "how do you do things like counting on 
top of Storm? Won't you overcount?" Storm has a feature called transactional 
topologies that lets you achieve exactly-once messaging semantics for most 
computations. Read more about transactional topologies 
[here](Transactional-topologies.html). 
+
+## Distributed RPC
+
+This tutorial showed how to do basic stream processing on top of Storm. 
There are many more things you can do with Storm's primitives. One of the most 
interesting applications of Storm is Distributed RPC, where you parallelize the 
computation of intense functions on the fly. Read more about Distributed RPC 
[here](Distributed-RPC.html). 
+
+## Conclusion
+
+This tutorial gave a broad overview of developing, testing, and deploying 
Storm topologies. The rest of the documentation dives deeper into all the 
aspects of using Storm.
\ No newline at end of file

Added: 
storm/branches/bobby-versioned-site/releases/0.10.0/Understanding-the-parallelism-of-a-Storm-topology.md
URL: 
http://svn.apache.org/viewvc/storm/branches/bobby-versioned-site/releases/0.10.0/Understanding-the-parallelism-of-a-Storm-topology.md?rev=1735297&view=auto
==============================================================================
--- 
storm/branches/bobby-versioned-site/releases/0.10.0/Understanding-the-parallelism-of-a-Storm-topology.md
 (added)
+++ 
storm/branches/bobby-versioned-site/releases/0.10.0/Understanding-the-parallelism-of-a-Storm-topology.md
 Wed Mar 16 21:01:12 2016
@@ -0,0 +1,123 @@
+---
+title: Understanding the Parallelism of a Storm Topology
+layout: documentation
+documentation: true
+---
+## What makes a running topology: worker processes, executors and tasks
+
+Storm distinguishes between the following three main entities that are used to 
actually run a topology in a Storm cluster:
+
+1. Worker processes
+2. Executors (threads)
+3. Tasks
+
+Here is a simple illustration of their relationships:
+
+![The relationships of worker processes, executors (threads) and tasks in 
Storm](images/relationships-worker-processes-executors-tasks.png)
+
+A _worker process_ executes a subset of a topology. A worker process belongs 
to a specific topology and may run one or more executors for one or more 
components (spouts or bolts) of this topology. A running topology consists of 
many such processes running on many machines within a Storm cluster.
+
+An _executor_ is a thread that is spawned by a worker process. It may run one 
or more tasks for the same component (spout or bolt).
+
+A _task_ performs the actual data processing — each spout or bolt that you 
implement in your code executes as many tasks across the cluster. The number of 
tasks for a component is always the same throughout the lifetime of a topology, 
but the number of executors (threads) for a component can change over time. 
This means that the following condition holds true: ``#threads ≤ #tasks``. By 
default, the number of tasks is set to be the same as the number of executors, 
i.e. Storm will run one task per thread.
+
+## Configuring the parallelism of a topology
+
+Note that in Storm’s terminology "parallelism" is specifically used to 
describe the so-called _parallelism hint_, which means the initial number of 
executors (threads) of a component. In this document though we use the term 
"parallelism" in a more general sense to describe how you can configure not 
only the number of executors but also the number of worker processes and the 
number of tasks of a Storm topology. We will specifically call out when 
"parallelism" is used in the normal, narrow definition of Storm.
+
+The following sections give an overview of the various configuration options 
and how to set them in your code. There is more than one way of setting these 
options though, and the sections below list only some of them. Storm currently has the 
following [order of precedence for configuration settings](Configuration.html): 
``defaults.yaml`` < ``storm.yaml`` < topology-specific configuration < internal 
component-specific configuration < external component-specific configuration.
+
+### Number of worker processes
+
+* Description: How many worker processes to create _for the topology_ across 
machines in the cluster.
+* Configuration option: 
[TOPOLOGY_WORKERS](/javadoc/apidocs/backtype/storm/Config.html#TOPOLOGY_WORKERS)
+* How to set in your code (examples):
+    * [Config#setNumWorkers](/javadoc/apidocs/backtype/storm/Config.html)
+
+### Number of executors (threads)
+
+* Description: How many executors to spawn _per component_.
+* Configuration option: ?
+* How to set in your code (examples):
+    * 
[TopologyBuilder#setSpout()](/javadoc/apidocs/backtype/storm/topology/TopologyBuilder.html)
+    * 
[TopologyBuilder#setBolt()](/javadoc/apidocs/backtype/storm/topology/TopologyBuilder.html)
+    * Note that as of Storm 0.8 the ``parallelism_hint`` parameter now 
specifies the initial number of executors (not tasks!) for that bolt.
+
+### Number of tasks
+
+* Description: How many tasks to create _per component_.
+* Configuration option: 
[TOPOLOGY_TASKS](/javadoc/apidocs/backtype/storm/Config.html#TOPOLOGY_TASKS)
+* How to set in your code (examples):
+    * 
[ComponentConfigurationDeclarer#setNumTasks()](/javadoc/apidocs/backtype/storm/topology/ComponentConfigurationDeclarer.html)
+
+
+Here is an example code snippet to show these settings in practice:
+
+```java
+topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
+               .setNumTasks(4)
+               .shuffleGrouping("blue-spout");
+```
+
+In the above code we configured Storm to run the bolt ``GreenBolt`` with an 
initial number of two executors and four associated tasks. Storm will run two 
tasks per executor (thread). If you do not explicitly configure the number of 
tasks, Storm will run by default one task per executor.
+
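The "two tasks per executor" figure follows from dividing the four tasks over the two executors. The sketch below spreads task indices over executors as evenly as possible; it is plain Java with hypothetical names, and which task index lands on which executor is an assumption made for illustration (only the per-executor counts come from the text).

```java
import java.util.ArrayList;
import java.util.List;

final class TaskAssignmentSketch {
    // Spread task indices 0..numTasks-1 over the executors as evenly as
    // possible; earlier executors take one extra task when the division
    // leaves a remainder.
    static List<List<Integer>> assign(int numTasks, int numExecutors) {
        if (numExecutors <= 0 || numExecutors > numTasks) {
            throw new IllegalArgumentException("need 1 <= executors <= tasks");
        }
        List<List<Integer>> executors = new ArrayList<>();
        int next = 0;
        for (int e = 0; e < numExecutors; e++) {
            int size = numTasks / numExecutors
                    + (e < numTasks % numExecutors ? 1 : 0);
            List<Integer> tasks = new ArrayList<>();
            for (int i = 0; i < size; i++) {
                tasks.add(next++);
            }
            executors.add(tasks);
        }
        return executors;
    }

    public static void main(String[] args) {
        // green-bolt: 4 tasks on 2 executors
        System.out.println(assign(4, 2)); // prints [[0, 1], [2, 3]]
    }
}
```

The argument check mirrors the `#threads ≤ #tasks` condition stated earlier: an executor with no task to run would have nothing to do.
+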
+## Example of a running topology
+
+The following illustration shows what a simple topology would look like in 
operation. The topology consists of three components: one spout called 
``BlueSpout`` and two bolts called ``GreenBolt`` and ``YellowBolt``. The 
components are linked such that ``BlueSpout`` sends its output to 
``GreenBolt``, which in turn sends its own output to ``YellowBolt``.
+
+![Example of a running topology in 
Storm](images/example-of-a-running-topology.png)
+
+The ``GreenBolt`` was configured as per the code snippet above whereas 
``BlueSpout`` and ``YellowBolt`` only set the parallelism hint (number of 
executors). Here is the relevant code:
+
+```java
+Config conf = new Config();
+conf.setNumWorkers(2); // use two worker processes
+
+// set parallelism hint to 2
+topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2);
+
+topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
+               .setNumTasks(4)
+               .shuffleGrouping("blue-spout");
+
+topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6)
+               .shuffleGrouping("green-bolt");
+
+StormSubmitter.submitTopology(
+        "mytopology",
+        conf,
+        topologyBuilder.createTopology()
+    );
+```
+
+And of course Storm comes with additional configuration settings to control 
the parallelism of a topology, including:
+
+* 
[TOPOLOGY_MAX_TASK_PARALLELISM](/javadoc/apidocs/backtype/storm/Config.html#TOPOLOGY_MAX_TASK_PARALLELISM):
 This setting puts a ceiling on the number of executors that can be spawned for 
a single component. It is typically used during testing to limit the number of 
threads spawned when running a topology in local mode. You can set this option 
via e.g. 
[Config#setMaxTaskParallelism()](/javadoc/apidocs/backtype/storm/Config.html#setMaxTaskParallelism(int)).
+
+## How to change the parallelism of a running topology
+
+A nifty feature of Storm is that you can increase or decrease the number of 
worker processes and/or executors without being required to restart the cluster 
or the topology. The act of doing so is called rebalancing.
+
+You have two options to rebalance a topology:
+
+1. Use the Storm web UI to rebalance the topology.
+2. Use the CLI tool `storm rebalance` as described below.
+
+Here is an example of using the CLI tool:
+
+```
+## Reconfigure the topology "mytopology" to use 5 worker processes,
+## the spout "blue-spout" to use 3 executors and
+## the bolt "yellow-bolt" to use 10 executors.
+
+$ storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10
+```
+
+## References
+
+* [Concepts](Concepts.html)
+* [Configuration](Configuration.html)
+* [Running topologies on a production 
cluster](Running-topologies-on-a-production-cluster.html)
+* [Local mode](Local-mode.html)
+* [Tutorial](Tutorial.html)
+* [Storm API documentation](/javadoc/apidocs/), most notably the class 
``Config``
+

Added: 
storm/branches/bobby-versioned-site/releases/0.10.0/Using-non-JVM-languages-with-Storm.md
URL: 
http://svn.apache.org/viewvc/storm/branches/bobby-versioned-site/releases/0.10.0/Using-non-JVM-languages-with-Storm.md?rev=1735297&view=auto
==============================================================================
--- 
storm/branches/bobby-versioned-site/releases/0.10.0/Using-non-JVM-languages-with-Storm.md
 (added)
+++ 
storm/branches/bobby-versioned-site/releases/0.10.0/Using-non-JVM-languages-with-Storm.md
 Wed Mar 16 21:01:12 2016
@@ -0,0 +1,52 @@
+---
+layout: documentation
+---
+- two pieces: creating topologies and implementing spouts and bolts in other 
languages
+- creating topologies in another language is easy since topologies are just 
thrift structures (link to storm.thrift)
+- implementing spouts and bolts in another language is called "multilang 
components" or "shelling"
+   - Here's a specification of the protocol: [Multilang 
protocol](Multilang-protocol.html)
+   - the thrift structure lets you define multilang components explicitly as a 
program and a script (e.g., python and the file implementing your bolt)
+   - In Java, you override ShellBolt or ShellSpout to create multilang 
components
+       - note that output field declarations happen in the thrift structure, 
so in Java you create multilang components like the following:
+            - declare fields in java, processing code in the other language by 
specifying it in constructor of shellbolt
+   - multilang uses json messages over stdin/stdout to communicate with the 
subprocess
+   - storm comes with ruby, python, and fancy adapters that implement the 
protocol. show an example of python
+      - python supports emitting, anchoring, acking, and logging
+- "storm shell" command makes constructing jar and uploading to nimbus easy
+  - makes jar and uploads it
+  - calls your program with host/port of nimbus and the jarfile id
+
+## Notes on implementing a DSL in a non-JVM language
+
+The right place to start is src/storm.thrift. Since Storm topologies are just 
Thrift structures, and Nimbus is a Thrift daemon, you can create and submit 
topologies in any language.
+
+When you create the Thrift structs for spouts and bolts, the code for the 
spout or bolt is specified in the ComponentObject struct:
+
+```
+union ComponentObject {
+  1: binary serialized_java;
+  2: ShellComponent shell;
+  3: JavaObject java_object;
+}
+```
+
+For a non-JVM DSL, you would want to make use of "2" and "3". ShellComponent 
lets you specify a script to run that component (e.g., your python code). And 
JavaObject lets you specify native java spouts and bolts for the component (and 
Storm will use reflection to create that spout or bolt).
+
+There's a "storm shell" command that will help with submitting a topology. Its 
usage is like this:
+
+```
+storm shell resources/ python topology.py arg1 arg2
+```
+
+storm shell will then package resources/ into a jar, upload the jar to Nimbus, 
and call your topology.py script like this:
+
+```
+python topology.py arg1 arg2 {nimbus-host} {nimbus-port} 
{uploaded-jar-location}
+```
+
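The called program therefore finds three extra values at the end of its argument list. As a rough sketch of picking them apart, assuming the program were written in Java rather than Python: `ShellArgs` and its field names are hypothetical, and the host, port, and jar path used in `main` are placeholder values, not output captured from `storm shell`.

```java
import java.util.Arrays;

final class ShellArgs {
    final String[] userArgs;
    final String nimbusHost;
    final int nimbusPort;
    final String uploadedJarLocation;

    // The last three arguments are the ones appended by storm shell;
    // everything before them is the user's own argument list.
    ShellArgs(String[] args) {
        if (args.length < 3) {
            throw new IllegalArgumentException(
                    "expected nimbus host, nimbus port and jar location");
        }
        int n = args.length;
        userArgs = Arrays.copyOfRange(args, 0, n - 3);
        nimbusHost = args[n - 3];
        nimbusPort = Integer.parseInt(args[n - 2]);
        uploadedJarLocation = args[n - 1];
    }

    public static void main(String[] args) {
        // Placeholder values standing in for what storm shell would pass.
        ShellArgs parsed = new ShellArgs(new String[] {
            "arg1", "arg2", "nimbus.example.com", "6627", "/tmp/stormjar.jar"});
        System.out.println(parsed.nimbusHost + ":" + parsed.nimbusPort
                + " " + parsed.uploadedJarLocation);
    }
}
```
+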
+Then you can connect to Nimbus using the Thrift API and submit the topology, 
passing {uploaded-jar-location} into the submitTopology method. For reference, 
here's the submitTopology definition:
+
+```
+void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string 
jsonConf, 4: StormTopology topology)
+    throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite);
+```
\ No newline at end of file

