Repository: storm
Updated Branches:
  refs/heads/master f118060dc -> a48fae243


STORM-1659:Add documents for external projects


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0a00c671
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0a00c671
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0a00c671

Branch: refs/heads/master
Commit: 0a00c67170db83da3f5c29dcecf6f185fd066b9b
Parents: f118060
Author: Xin Wang <[email protected]>
Authored: Sun Mar 27 16:04:38 2016 +0800
Committer: Xin Wang <[email protected]>
Committed: Sun Mar 27 16:10:53 2016 +0800

----------------------------------------------------------------------
 docs/Kestrel-and-Storm.md   |   2 +-
 docs/index.md               |  14 +-
 docs/storm-cassandra.md     | 255 ++++++++++++++++++++++++++
 docs/storm-elasticsearch.md | 105 +++++++++++
 docs/storm-mongodb.md       | 199 ++++++++++++++++++++
 docs/storm-mqtt.md          | 379 +++++++++++++++++++++++++++++++++++++++
 6 files changed, 948 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0a00c671/docs/Kestrel-and-Storm.md
----------------------------------------------------------------------
diff --git a/docs/Kestrel-and-Storm.md b/docs/Kestrel-and-Storm.md
index cd584ff..ff48995 100644
--- a/docs/Kestrel-and-Storm.md
+++ b/docs/Kestrel-and-Storm.md
@@ -3,7 +3,7 @@ title: Storm and Kestrel
 layout: documentation
 documentation: true
 ---
-This page explains how to use to Storm to consume items from a Kestrel cluster.
+This page explains how to use Storm to consume items from a Kestrel cluster.
 
 ## Preliminaries
 ### Storm

http://git-wip-us.apache.org/repos/asf/storm/blob/0a00c671/docs/index.md
----------------------------------------------------------------------
diff --git a/docs/index.md b/docs/index.md
index 8c6859e..bfd68db 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -64,15 +64,19 @@ Trident is an alternative interface to Storm. It provides 
exactly-once processin
 * [Worker Profiling](dynamic-worker-profiling.html)
 
 ### Integration With External Systems, and Other Libraries
-* [Event Hubs Intergration](storm-eventhubs.html)
+* [Apache Kafka Integration](storm-kafka.html)
 * [Apache HBase Integration](storm-hbase.html)
 * [Apache HDFS Integration](storm-hdfs.html)
 * [Apache Hive Integration](storm-hive.html)
+* [Apache Solr Integration](storm-solr.html)
 * [JDBC Integration](storm-jdbc.html)
-* [Apache Kafka Integration](storm-kafka.html)
-* [REDIS Integration](storm-redis.html) 
-* [Kestrel and Storm](Kestrel-and-Storm.html)
-* [Solr Integration](storm-solr.html)
+* [Redis Integration](storm-redis.html) 
+* [Cassandra Integration](storm-cassandra.html)
+* [Event Hubs Intergration](storm-eventhubs.html)
+* [Elasticsearch Integration](storm-elasticsearch.html)
+* [MQTT Integration](storm-mqtt.html)
+* [Mongodb Integration](storm-mongodb.html)
+* [Kestrel Integration](Kestrel-and-Storm.html)
 
 ### Advanced
 

http://git-wip-us.apache.org/repos/asf/storm/blob/0a00c671/docs/storm-cassandra.md
----------------------------------------------------------------------
diff --git a/docs/storm-cassandra.md b/docs/storm-cassandra.md
new file mode 100644
index 0000000..c674fbc
--- /dev/null
+++ b/docs/storm-cassandra.md
@@ -0,0 +1,255 @@
+---
+title: Storm Cassandra Integration
+layout: documentation
+documentation: true
+---
+
+### Bolt API implementation for Apache Cassandra
+
+This library provides core storm bolt on top of Apache Cassandra.
+Provides simple DSL to map storm *Tuple* to Cassandra Query Language 
*Statement*.
+
+
+### Configuration
+The following properties may be passed to storm configuration.
+
+| **Property name**                            | **Description** | **Default** 
        |
+| ---------------------------------------------| ----------------| 
--------------------|
+| **cassandra.keyspace**                       | -               |             
        |
+| **cassandra.nodes**                          | -               | 
{"localhost"}       |
+| **cassandra.username**                       | -               | -           
        |
+| **cassandra.password**                       | -               | -           
        |
+| **cassandra.port**                           | -               | 9092        
        |
+| **cassandra.output.consistencyLevel**        | -               | ONE         
        |
+| **cassandra.batch.size.rows**                | -               | 100         
        |
+| **cassandra.retryPolicy**                    | -               | 
DefaultRetryPolicy  |
+| **cassandra.reconnectionPolicy.baseDelayMs** | -               | 100 (ms)    
        |
+| **cassandra.reconnectionPolicy.maxDelayMs**  | -               | 60000 (ms)  
        |
+
+### CassandraWriterBolt
+
+####Static import
+```java
+
+import static org.apache.storm.cassandra.DynamicStatementBuilder.*
+
+```
+
+#### Insert Query Builder
+##### Insert query including only the specified tuple fields.
+```java
+
+    new CassandraWriterBolt(
+        async(
+            simpleQuery("INSERT INTO album (title,year,performer,genre,tracks) 
VALUES (?, ?, ?, ?, ?);")
+                .with(
+                    fields("title", "year", "performer", "genre", "tracks")
+                 )
+            )
+    );
+```
+
+##### Insert query including all tuple fields.
+```java
+
+    new CassandraWriterBolt(
+        async(
+            simpleQuery("INSERT INTO album (title,year,performer,genre,tracks) 
VALUES (?, ?, ?, ?, ?);")
+                .with( all() )
+            )
+    );
+```
+
+##### Insert multiple queries from one input tuple.
+```java
+
+    new CassandraWriterBolt(
+        async(
+            simpleQuery("INSERT INTO titles_per_album 
(title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all())),
+            simpleQuery("INSERT INTO titles_per_performer 
(title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all()))
+        )
+    );
+```
+
+##### Insert query using QueryBuilder
+```java
+
+    new CassandraWriterBolt(
+        async(
+            simpleQuery("INSERT INTO album (title,year,perfomer,genre,tracks) 
VALUES (?, ?, ?, ?, ?);")
+                .with(all()))
+            )
+    )
+```
+
+##### Insert query with static bound query
+```java
+
+    new CassandraWriterBolt(
+         async(
+            boundQuery("INSERT INTO album (title,year,performer,genre,tracks) 
VALUES (?, ?, ?, ?, ?);")
+                .bind(all());
+         )
+    );
+```
+
+##### Insert query with static bound query using named setters and aliases
+```java
+
+    new CassandraWriterBolt(
+         async(
+            boundQuery("INSERT INTO album (title,year,performer,genre,tracks) 
VALUES (:ti, :ye, :pe, :ge, :tr);")
+                .bind(
+                    field("ti"),as("title"),
+                    field("ye").as("year")),
+                    field("pe").as("performer")),
+                    field("ge").as("genre")),
+                    field("tr").as("tracks"))
+                ).byNamedSetters()
+         )
+    );
+```
+
+##### Insert query with bound statement load from storm configuration
+```java
+
+    new CassandraWriterBolt(
+         boundQuery(named("insertIntoAlbum"))
+            .bind(all());
+```
+
+##### Insert query with bound statement load from tuple field
+```java
+
+    new CassandraWriterBolt(
+         boundQuery(namedByField("cql"))
+            .bind(all());
+```
+
+##### Insert query with batch statement
+```java
+
+    // Logged
+    new CassandraWriterBolt(loggedBatch(
+            simpleQuery("INSERT INTO titles_per_album 
(title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all())),
+            simpleQuery("INSERT INTO titles_per_performer 
(title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all()))
+        )
+    );
+// UnLogged
+    new CassandraWriterBolt(unLoggedBatch(
+            simpleQuery("INSERT INTO titles_per_album 
(title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all())),
+            simpleQuery("INSERT INTO titles_per_performer 
(title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all()))
+        )
+    );
+```
+
+### How to handle query execution results
+
+The interface *ExecutionResultHandler* can be used to custom how an execution 
result should be handle.
+
+```java
+public interface ExecutionResultHandler extends Serializable {
+    void onQueryValidationException(QueryValidationException e, 
OutputCollector collector, Tuple tuple);
+
+    void onReadTimeoutException(ReadTimeoutException e, OutputCollector 
collector, Tuple tuple);
+
+    void onWriteTimeoutException(WriteTimeoutException e, OutputCollector 
collector, Tuple tuple);
+
+    void onUnavailableException(UnavailableException e, OutputCollector 
collector, Tuple tuple);
+
+    void onQuerySuccess(OutputCollector collector, Tuple tuple);
+}
+```
+
+By default, the CassandraBolt fails a tuple on all Cassandra Exception (see 
[BaseExecutionResultHandler](https://github.com/apache/storm/tree/master/external/storm-cassandra/blob/master/src/main/java/org/apache/storm/cassandra/BaseExecutionResultHandler.java))
 .
+
+```java
+    new CassandraWriterBolt(insertInto("album").values(with(all()).build())
+            .withResultHandler(new MyCustomResultHandler());
+```
+
+### Declare Output fields
+
+A CassandraBolt can declare output fields / stream output fields.
+For instance, this may be used to remit a new tuple on error, or to chain 
queries.
+
+```java
+    new 
CassandraWriterBolt(insertInto("album").values(withFields(all()).build())
+            .withResultHandler(new EmitOnDriverExceptionResultHandler());
+            .withStreamOutputFields("stream_error", new Fields("message");
+
+    public static class EmitOnDriverExceptionResultHandler extends 
BaseExecutionResultHandler {
+        @Override
+        protected void onDriverException(DriverException e, OutputCollector 
collector, Tuple tuple) {
+            LOG.error("An error occurred while executing cassandra statement", 
e);
+            collector.emit("stream_error", new Values(e.getMessage()));
+            collector.ack(tuple);
+        }
+    }
+```
+
+### Murmur3FieldGrouping
+
+[Murmur3StreamGrouping](https://github.com/apache/storm/tree/master/external/storm-cassandra/blob/master/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java)
  can be used to optimise cassandra writes.
+The stream is partitioned among the bolt's tasks based on the specified row 
partition keys.
+
+```java
+CassandraWriterBolt bolt = new CassandraWriterBolt(
+    insertInto("album")
+        .values(
+            with(fields("title", "year", "performer", "genre", "tracks")
+            ).build());
+builder.setBolt("BOLT_WRITER", bolt, 4)
+        .customGrouping("spout", new Murmur3StreamGrouping("title"))
+```
+
+### Trident API support
+storm-cassandra support Trident `state` API for `inserting` data into 
Cassandra. 
+```java
+        CassandraState.Options options = new CassandraState.Options(new 
CassandraContext());
+        CQLStatementTupleMapper insertTemperatureValues = boundQuery(
+                "INSERT INTO weather.temperature(weather_station_id, 
weather_station_name, event_time, temperature) VALUES(?, ?, ?, ?)")
+                .bind(with(field("weather_station_id"), 
field("name").as("weather_station_name"), field("event_time").now(), 
field("temperature")));
+        options.withCQLStatementTupleMapper(insertTemperatureValues);
+        CassandraStateFactory insertValuesStateFactory =  new 
CassandraStateFactory(options);
+        TridentState selectState = 
topology.newStaticState(selectWeatherStationStateFactory);
+        stream = stream.stateQuery(selectState, new 
Fields("weather_station_id"), new CassandraQuery(), new Fields("name"));
+        stream = stream.each(new Fields("name"), new PrintFunction(), new 
Fields("name_x"));
+        stream.partitionPersist(insertValuesStateFactory, new 
Fields("weather_station_id", "name", "event_time", "temperature"), new 
CassandraStateUpdater(), new Fields());
+```
+
+Below `state` API for `querying` data from Cassandra.
+```java
+        CassandraState.Options options = new CassandraState.Options(new 
CassandraContext());
+        CQLStatementTupleMapper insertTemperatureValues = boundQuery("SELECT 
name FROM weather.station WHERE id = ?")
+                 .bind(with(field("weather_station_id").as("id")));
+        options.withCQLStatementTupleMapper(insertTemperatureValues);
+        options.withCQLResultSetValuesMapper(new 
TridentResultSetValuesMapper(new Fields("name")));
+        CassandraStateFactory selectWeatherStationStateFactory =  new 
CassandraStateFactory(options);
+        CassandraStateFactory selectWeatherStationStateFactory = 
getSelectWeatherStationStateFactory();
+        TridentState selectState = 
topology.newStaticState(selectWeatherStationStateFactory);
+        stream = stream.stateQuery(selectState, new 
Fields("weather_station_id"), new CassandraQuery(), new Fields("name"));        
 
+```
+
+## License
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+## Committer Sponsors
+ * Sriharha Chintalapani ([[email protected]](mailto:[email protected]))
+ * P. Taylor Goetz ([[email protected]](mailto:[email protected]))

http://git-wip-us.apache.org/repos/asf/storm/blob/0a00c671/docs/storm-elasticsearch.md
----------------------------------------------------------------------
diff --git a/docs/storm-elasticsearch.md b/docs/storm-elasticsearch.md
new file mode 100644
index 0000000..fff0f51
--- /dev/null
+++ b/docs/storm-elasticsearch.md
@@ -0,0 +1,105 @@
+---
+title: Storm Elasticsearch Integration
+layout: documentation
+documentation: true
+---
+
+# Storm Elasticsearch Bolt & Trident State
+
+  EsIndexBolt, EsPercolateBolt and EsState allows users to stream data from 
storm into Elasticsearch directly.
+  For detailed description, please refer to the following.
+
+## EsIndexBolt (org.apache.storm.elasticsearch.bolt.EsIndexBolt)
+
+EsIndexBolt streams tuples directly into Elasticsearch. Tuples are indexed in 
specified index & type combination. 
+Users should make sure that ```EsTupleMapper``` can extract "source", "index", 
"type", and "id" from input tuple.
+"index" and "type" are used for identifying target index and type.
+"source" is a document in JSON format string that will be indexed in 
Elasticsearch.
+
+```java
+EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
+EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
+EsIndexBolt indexBolt = new EsIndexBolt(esConfig, tupleMapper);
+```
+
+## EsPercolateBolt (org.apache.storm.elasticsearch.bolt.EsPercolateBolt)
+
+EsPercolateBolt streams tuples directly into Elasticsearch. Tuples are used to 
send percolate request to specified index & type combination. 
+User should make sure ```EsTupleMapper``` can extract "source", "index", 
"type" from input tuple.
+"index" and "type" are used for identifying target index and type.
+"source" is a document in JSON format string that will be sent in percolate 
request to Elasticsearch.
+
+```java
+EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
+EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
+EsPercolateBolt percolateBolt = new EsPercolateBolt(esConfig, tupleMapper);
+```
+
+If there exists non-empty percolate response, EsPercolateBolt will emit tuple 
with original source and Percolate.Match
+for each Percolate.Match in PercolateResponse.
+
+## EsState (org.apache.storm.elasticsearch.trident.EsState)
+
+Elasticsearch Trident state also follows similar pattern to EsBolts. It takes 
in EsConfig and EsTupleMapper as an arg.
+
+```java
+EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
+EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
+
+StateFactory factory = new EsStateFactory(esConfig, tupleMapper);
+TridentState state = stream.partitionPersist(factory, esFields, new 
EsUpdater(), new Fields());
+ ```
+
+## EsLookupBolt (org.apache.storm.elasticsearch.bolt.EsLookupBolt)
+
+EsLookupBolt performs a get request to Elasticsearch. 
+In order to do that, three dependencies need to be satisfied. Apart from usual 
EsConfig, two other dependencies must be provided:
+    ElasticsearchGetRequest is used to convert the incoming Tuple to the 
GetRequest that will be executed against Elasticsearch.
+    EsLookupResultOutput is used to declare the output fields and convert the 
GetResponse to values that are emited by the bolt.
+
+Incoming tuple is passed to provided GetRequest creator and the result of that 
execution is passed to Elasticsearch client.
+The bolt then uses the provider output adapter (EsLookupResultOutput) to 
convert the GetResponse to Values to emit.
+The output fields are also specified by the user of the bolt via the output 
adapter (EsLookupResultOutput).
+
+```java
+EsConfig esConfig = createEsConfig();
+ElasticsearchGetRequest getRequestAdapter = createElasticsearchGetRequest();
+EsLookupResultOutput output = createOutput();
+EsLookupBolt lookupBolt = new EsLookupBolt(esConfig, getRequestAdapter, 
output);
+```
+
+## EsConfig (org.apache.storm.elasticsearch.common.EsConfig)
+  
+Provided components (Bolt, State) takes in EsConfig as a constructor arg.
+
+```java
+EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
+```
+
+or
+
+```java
+Map<String, String> additionalParameters = new HashMap<>();
+additionalParameters.put("client.transport.sniff", "true");
+EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"}, 
additionalParameters);
+```
+
+### EsConfig params
+
+|Arg  |Description | Type
+|---   |--- |---
+|clusterName | Elasticsearch cluster name | String (required) |
+|nodes | Elasticsearch nodes in a String array, each element should follow 
{host}:{port} pattern | String array (required) |
+|additionalParameters | Additional Elasticsearch Transport Client 
configuration parameters | Map<String, String> (optional) |
+
+## EsTupleMapper (org.apache.storm.elasticsearch.common.EsTupleMapper)
+
+For storing tuple to Elasticsearch or percolating tuple from Elasticsearch, we 
need to define which fields are used for.
+Users need to define your own by implementing ```EsTupleMapper```.
+Storm-elasticsearch presents default mapper 
```org.apache.storm.elasticsearch.common.DefaultEsTupleMapper```, which 
extracts its source, index, type, id values from identical fields.
+You can refer implementation of DefaultEsTupleMapper to see how to implement 
your own.
+  
+## Committer Sponsors
+
+ * Sriharsha Chintalapani ([@harshach](https://github.com/harshach))
+ * Jungtaek Lim ([@HeartSaVioR](https://github.com/HeartSaVioR))

http://git-wip-us.apache.org/repos/asf/storm/blob/0a00c671/docs/storm-mongodb.md
----------------------------------------------------------------------
diff --git a/docs/storm-mongodb.md b/docs/storm-mongodb.md
new file mode 100644
index 0000000..90994bd
--- /dev/null
+++ b/docs/storm-mongodb.md
@@ -0,0 +1,199 @@
+---
+title: Storm MongoDB Integration
+layout: documentation
+documentation: true
+---
+
+Storm/Trident integration for [MongoDB](https://www.mongodb.org/). This 
package includes the core bolts and trident states that allows a storm topology 
to either insert storm tuples in a database collection or to execute update 
queries against a database collection in a storm topology.
+
+## Insert into Database
+The bolt and trident state included in this package for inserting data into a 
database collection.
+
+### MongoMapper
+The main API for inserting data in a collection using MongoDB is the 
`org.apache.storm.mongodb.common.mapper.MongoMapper` interface:
+
+```java
+public interface MongoMapper extends Serializable {
+    Document toDocument(ITuple tuple);
+}
+```
+
+### SimpleMongoMapper
+`storm-mongodb` includes a general purpose `MongoMapper` implementation called 
`SimpleMongoMapper` that can map Storm tuple to a Database document.  
`SimpleMongoMapper` assumes that the storm tuple has fields with same name as 
the document field name in the database collection that you intend to write to.
+
+```java
+public class SimpleMongoMapper implements MongoMapper {
+    private String[] fields;
+
+    @Override
+    public Document toDocument(ITuple tuple) {
+        Document document = new Document();
+        for(String field : fields){
+            document.append(field, tuple.getValueByField(field));
+        }
+        return document;
+    }
+
+    public SimpleMongoMapper withFields(String... fields) {
+        this.fields = fields;
+        return this;
+    }
+}
+```
+
+### MongoInsertBolt
+To use the `MongoInsertBolt`, you construct an instance of it by specifying 
url, collectionName and a `MongoMapper` implementation that converts storm 
tuple to DB document. The following is the standard URI connection scheme:
+ 
`mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]`
+
+More options information(eg: Write Concern Options) about Mongo URI, you can 
visit 
https://docs.mongodb.org/manual/reference/connection-string/#connections-connection-options
+
+ ```java
+String url = "mongodb://127.0.0.1:27017/test";
+String collectionName = "wordcount";
+
+MongoMapper mapper = new SimpleMongoMapper()
+        .withFields("word", "count");
+
+MongoInsertBolt insertBolt = new MongoInsertBolt(url, collectionName, mapper);
+ ```
+
+### MongoTridentState
+We also support a trident persistent state that can be used with trident 
topologies. To create a Mongo persistent trident state you need to initialize 
it with the url, collectionName, the `MongoMapper` instance. See the example 
below:
+
+ ```java
+        MongoMapper mapper = new SimpleMongoMapper()
+                .withFields("word", "count");
+
+        MongoState.Options options = new MongoState.Options()
+                .withUrl(url)
+                .withCollectionName(collectionName)
+                .withMapper(mapper);
+
+        StateFactory factory = new MongoStateFactory(options);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout1", spout);
+
+        stream.partitionPersist(factory, fields,  new MongoStateUpdater(), new 
Fields());
+ ```
+ **NOTE**:
+ >If there is no unique index provided, trident state inserts in the case of 
failures may result in duplicate documents.
+
+## Update from Database
+The bolt included in this package for updating data from a database collection.
+
+### SimpleMongoUpdateMapper
+`storm-mongodb` includes a general purpose `MongoMapper` implementation called 
`SimpleMongoUpdateMapper` that can map Storm tuple to a Database document. 
`SimpleMongoUpdateMapper` assumes that the storm tuple has fields with same 
name as the document field name in the database collection that you intend to 
write to.
+`SimpleMongoUpdateMapper` uses `$set` operator for setting the value of a 
field in a document. More information about update operator, you can visit 
+https://docs.mongodb.org/manual/reference/operator/update/
+
+```java
+public class SimpleMongoUpdateMapper implements MongoMapper {
+    private String[] fields;
+
+    @Override
+    public Document toDocument(ITuple tuple) {
+        Document document = new Document();
+        for(String field : fields){
+            document.append(field, tuple.getValueByField(field));
+        }
+        return new Document("$set", document);
+    }
+
+    public SimpleMongoUpdateMapper withFields(String... fields) {
+        this.fields = fields;
+        return this;
+    }
+}
+```
+
+
+ 
+### QueryFilterCreator
+The main API for creating a MongoDB query Filter is the 
`org.apache.storm.mongodb.common.QueryFilterCreator` interface:
+
+ ```java
+public interface QueryFilterCreator extends Serializable {
+    Bson createFilter(ITuple tuple);
+}
+ ```
+
+### SimpleQueryFilterCreator
+`storm-mongodb` includes a general purpose `QueryFilterCreator` implementation 
called `SimpleQueryFilterCreator` that can create a MongoDB query Filter by 
given Tuple.  `QueryFilterCreator` uses `$eq` operator for matching values that 
are equal to a specified value. More information about query operator, you can 
visit 
+https://docs.mongodb.org/manual/reference/operator/query/
+
+ ```java
+public class SimpleQueryFilterCreator implements QueryFilterCreator {
+    private String field;
+    
+    @Override
+    public Bson createFilter(ITuple tuple) {
+        return Filters.eq(field, tuple.getValueByField(field));
+    }
+
+    public SimpleQueryFilterCreator withField(String field) {
+        this.field = field;
+        return this;
+    }
+
+}
+ ```
+
+### MongoUpdateBolt
+To use the `MongoUpdateBolt`,  you construct an instance of it by specifying 
Mongo url, collectionName, a `QueryFilterCreator` implementation and a 
`MongoMapper` implementation that converts storm tuple to DB document.
+
+ ```java
+        MongoMapper mapper = new SimpleMongoUpdateMapper()
+                .withFields("word", "count");
+
+        QueryFilterCreator updateQueryCreator = new SimpleQueryFilterCreator()
+                .withField("word");
+        
+        MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, 
updateQueryCreator, mapper);
+
+        //if a new document should be inserted if there are no matches to the 
query filter
+        //updateBolt.withUpsert(true);
+ ```
+ 
+ Or use a anonymous inner class implementation for `QueryFilterCreator`:
+ 
+  ```java
+        MongoMapper mapper = new SimpleMongoUpdateMapper()
+                .withFields("word", "count");
+
+        QueryFilterCreator updateQueryCreator = new QueryFilterCreator() {
+            @Override
+            public Bson createFilter(ITuple tuple) {
+                return Filters.gt("count", 3);
+            }
+        };
+        
+        MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, 
updateQueryCreator, mapper);
+
+        //if a new document should be inserted if there are no matches to the 
query filter
+        //updateBolt.withUpsert(true);
+ ```
+
+## License
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+## Committer Sponsors
+
+ * Sriharsha Chintalapani ([[email protected]](mailto:[email protected]))
+ 

http://git-wip-us.apache.org/repos/asf/storm/blob/0a00c671/docs/storm-mqtt.md
----------------------------------------------------------------------
diff --git a/docs/storm-mqtt.md b/docs/storm-mqtt.md
new file mode 100644
index 0000000..b730242
--- /dev/null
+++ b/docs/storm-mqtt.md
@@ -0,0 +1,379 @@
+---
+title: Storm MQTT Integration
+layout: documentation
+documentation: true
+---
+
+## About
+
+MQTT is a lightweight publish/subscribe protocol frequently used in IoT 
applications.
+
+Further information can be found at http://mqtt.org. The HiveMQ website has a 
great series on 
+[MQTT Essentials](http://www.hivemq.com/mqtt-essentials/).
+
+Features include:
+
+* Full MQTT support (e.g. last will, QoS 0-2, retain, etc.)
+* Spout implementation(s) for subscribing to MQTT topics
+* A bolt implementation for publishing MQTT messages
+* A trident function implementation for publishing MQTT messages
+* Authentication and TLS/SSL support
+* User-defined "mappers" for converting MQTT messages to tuples (subscribers)
+* User-defined "mappers" for converting tuples to MQTT messages (publishers)
+
+
+## Quick Start
+To quickly see MQTT integration in action, follow the instructions below.
+
+**Start a MQTT broker and publisher**
+
+The command below will create an MQTT broker on port 1883, and start a 
publsher that will publish random 
+temperature/humidity values to an MQTT topic.
+
+Open a terminal and execute the following command (change the path as 
necessary):
+
+```bash
+java -cp examples/target/storm-mqtt-examples-*-SNAPSHOT.jar 
org.apache.storm.mqtt.examples.MqttBrokerPublisher
+```
+
+**Run the example toplogy**
+
+Run the sample topology using Flux. This will start a local mode cluster and 
topology that consists of the MQTT Spout
+publishing to a bolt that simply logs the information it receives.
+
+In a separate terminal, run the following command (Note that the `storm` 
executable must be on your PATH):
+
+```bash
+storm jar ./examples/target/storm-mqtt-examples-*-SNAPSHOT.jar 
org.apache.storm.flux.Flux ./examples/src/main/flux/sample.yaml --local
+```
+
+You should see data from MQTT being logged by the bolt:
+
+```
+27020 [Thread-17-log-executor[3 3]] INFO  o.a.s.f.w.b.LogInfoBolt - 
{user=tgoetz, deviceId=1234, location=office, temperature=67.0, humidity=65.0}
+27030 [Thread-17-log-executor[3 3]] INFO  o.a.s.f.w.b.LogInfoBolt - 
{user=tgoetz, deviceId=1234, location=office, temperature=47.0, humidity=85.0}
+27040 [Thread-17-log-executor[3 3]] INFO  o.a.s.f.w.b.LogInfoBolt - 
{user=tgoetz, deviceId=1234, location=office, temperature=69.0, humidity=94.0}
+27049 [Thread-17-log-executor[3 3]] INFO  o.a.s.f.w.b.LogInfoBolt - 
{user=tgoetz, deviceId=1234, location=office, temperature=4.0, humidity=98.0}
+27059 [Thread-17-log-executor[3 3]] INFO  o.a.s.f.w.b.LogInfoBolt - 
{user=tgoetz, deviceId=1234, location=office, temperature=51.0, humidity=12.0}
+27069 [Thread-17-log-executor[3 3]] INFO  o.a.s.f.w.b.LogInfoBolt - 
{user=tgoetz, deviceId=1234, location=office, temperature=27.0, humidity=65.0}
+```
+
+Either allow the local cluster to exit, or stop it by typing Cntrl-C.
+
+**MQTT Fault Tolerance In Action**
+
+After the toplogy has been shutdown, the MQTT subscription created by the MQTT 
spout will persist with the broker,
+and it will continue to receive and queue messages (as long as the broker is 
running).
+
+If you run the toplogy again (while the broker is still running), when the 
spout initially connects to the MQTT broker,
+it will receive all the messages it missed while it was down. You should see 
this as burst of messages, followed by a 
+rate of about two messages per second.
+
+This happens because, by default, the MQTT Spout creates a *session* when it 
subscribes -- that means it requests that
+the broker hold onto and redeliver any messages it missed while offline. 
Another important factor is the the 
+`MqttBrokerPublisher` publishes messages with a MQTT QoS of `1`, meaning *at 
least once delivery*.
+
+For more information about MQTT fault tolerance, see the **Delivery 
Guarantees** section below.
+
+
+
+## Delivery Guarantees
+In Storm terms, ***the MQTT Spout provides at least once delivery***, 
depending on the configuration of the publisher as
+well as the MQTT spout.
+
+The MQTT protocol defines the following QoS levels:
+
+* `0` - At Most Once (AKA "Fire and Forget")
+* `1` - At Least Once
+* `2` - Exactly Once
+
+This can be a little confusing as the MQTT protocol specification does not 
really address the concept of a node being 
+completely incinerated by a catasrophic event. This is in stark contrast with 
Storm's reliability model, which expects 
+and embraces the concept of node failure.
+
+So resiliancy is ultimately dependent on the underlying MQTT implementation 
and infrastructure.
+
+###Recommendations
+
+*You will never get at exactly once processing with this spout. It can be used 
with Trident, but it won't provide 
+transational semantics. You will only get at least once guarantees.*
+
+If you need reliability guarantees (i.e. *at least once processing*):
+
+1. For MQTT publishers (outside of Storm), publish messages with a QoS of `1` 
so the broker saves messages if/when the 
+spout is offline.
+2. Use the spout defaults (`cleanSession = false` and `qos = 1`)
+3. If you can, make sure any result of receiving and MQTT message is 
idempotent.
+4. Make sure your MQTT brokers don't die or get isolated due to a network 
partition. Be prepared for natural and 
+man-made disasters and network partitions. Incineration and destruction 
happens.
+
+
+
+
+
+## Configuration
+For the full range of configuration options, see the JavaDoc for 
`org.apache.storm.mqtt.common.MqttOptions`.
+
+### Message Mappers
+To define how MQTT messages are mapped to Storm tuples, you configure the MQTT 
spout with an implementation of the 
+`org.apache.storm.mqtt.MqttMessageMapper` interface, which looks like this:
+
+```java
+public interface MqttMessageMapper extends Serializable {
+
+    Values toValues(MqttMessage message);
+
+    Fields outputFields();
+}
+```
+
+The `MqttMessage` class contains the topic to which the message was published 
(`String`) and the message payload 
+(`byte[]`). For example, here is a `MqttMessageMapper` implementation that 
produces tuples based on the content of both
+the message topic and payload:
+
+```java
+/**
+ * Given a topic name: "users/{user}/{location}/{deviceId}"
+ * and a payload of "{temperature}/{humidity}"
+ * emits a tuple containing user(String), deviceId(String), location(String), 
temperature(float), humidity(float)
+ *
+ */
+public class CustomMessageMapper implements MqttMessageMapper {
+    private static final Logger LOG = 
LoggerFactory.getLogger(CustomMessageMapper.class);
+
+
+    public Values toValues(MqttMessage message) {
+        String topic = message.getTopic();
+        String[] topicElements = topic.split("/");
+        String[] payloadElements = new String(message.getMessage()).split("/");
+
+        return new Values(topicElements[2], topicElements[4], 
topicElements[3], Float.parseFloat(payloadElements[0]), 
+                Float.parseFloat(payloadElements[1]));
+    }
+
+    public Fields outputFields() {
+        return new Fields("user", "deviceId", "location", "temperature", 
"humidity");
+    }
+}
+```
+
+### Tuple Mappers
+When publishing MQTT messages with the MQTT bolt or Trident function, you need 
to map Storm tuple data to MQTT messages 
+(topic/payload). This is done by implementing the 
`org.apache.storm.mqtt.MqttTupleMapper` interface:
+
+```java
+public interface MqttTupleMapper extends Serializable{
+
+    MqttMessage toMessage(ITuple tuple);
+
+}
+```
+
+For example, a simple `MqttTupleMapper` implementation might look like this:
+
+```java
+public class MyTupleMapper implements MqttTupleMapper {
+    public MqttMessage toMessage(ITuple tuple) {
+        String topic = "users/" + tuple.getStringByField("userId") + "/" + 
tuple.getStringByField("device");
+        byte[] payload = tuple.getStringByField("message").getBytes();
+        return new MqttMessage(topic, payload);
+    }
+}
+```
+
+### MQTT Spout Parallelism
+It's recommended that you use a parallelism of 1 for the MQTT spout, otherwise 
you will end up with multiple instances
+of the spout subscribed to the same topic(s), resulting in duplicate messages.
+
+If you want to parallelize the spout, it's recommended that you use multiple 
instances of the spout in your topolgoy 
+and use MQTT topic selectors to parition the data. How you implement the 
partitioning strategy is ultimately determined 
+by your MQTT topic structure. As an example, if you had topics partitioned by 
region (e.g. east/west) you could do 
+something like the following:
+
+```java
+String spout1Topic = "users/east/#";
+String spout2Topic = "users/west/#";
+```
+
+and then join the resulting streams together by subscribing a bolt to each 
stream.
+
+
+### Using Flux
+
+The following Flux YAML configuration creates the toplolgy used in the example:
+
+```yaml
+name: "mqtt-topology"
+
+components:
+   ########## MQTT Spout Config ############
+  - id: "mqtt-type"
+    className: "org.apache.storm.mqtt.examples.CustomMessageMapper"
+
+  - id: "mqtt-options"
+    className: "org.apache.storm.mqtt.common.MqttOptions"
+    properties:
+      - name: "url"
+        value: "tcp://localhost:1883"
+      - name: "topics"
+        value:
+          - "/users/tgoetz/#"
+
+# topology configuration
+config:
+  topology.workers: 1
+  topology.max.spout.pending: 1000
+
+# spout definitions
+spouts:
+  - id: "mqtt-spout"
+    className: "org.apache.storm.mqtt.spout.MqttSpout"
+    constructorArgs:
+      - ref: "mqtt-type"
+      - ref: "mqtt-options"
+    parallelism: 1
+
+# bolt definitions
+bolts:
+  - id: "log"
+    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+    parallelism: 1
+
+
+streams:
+  - from: "mqtt-spout"
+    to: "log"
+    grouping:
+      type: SHUFFLE
+
+```
+
+
+### Using Java
+
+Similarly, you can create the same topology using the Storm Core Java API:
+
+```java
+TopologyBuilder builder = new TopologyBuilder();
+MqttOptions options = new MqttOptions();
+options.setTopics(Arrays.asList("/users/tgoetz/#"));
+options.setCleanConnection(false);
+MqttSpout spout = new MqttSpout(new StringMessageMapper(), options);
+
+MqttBolt bolt = new LogInfoBolt();
+
+builder.setSpout("mqtt-spout", spout);
+builder.setBolt("log-bolt", bolt).shuffleGrouping("mqtt-spout");
+
+return builder.createTopology();
+```
+
+## SSL/TLS
+If the MQTT broker you are connecting to requires SSL or SSL client 
authentication, you need to configure the spout 
+with an appropriate URI, and the location of keystore/truststore files 
containing the necessary certificates.
+
+### SSL/TLS URIs
+To connect over SSL/TLS use a URI with a prefix of `ssl://` or `tls://` 
instead of `tcp://`. For further control over
+the algorithm, you can specify a specific protocol:
+
+ * `ssl://` Use the JVM default version of the SSL protocol.
+ * `sslv*://` Use a specific version of the SSL protocol, where `*` is 
replaced by the version (e.g. `sslv3://`)
+ * `tls://` Use the JVM default version of the TLS protocol.
+ * `tlsv*://` Use a specific version of the TLS protocol, where `*` is 
replaced by the version (e.g. `tlsv1.1://`)
+ 
+ 
+### Specifying Keystore/Truststore Locations
+ 
+ The `MqttSpout`, `MqttBolt` and `MqttPublishFunction` all have constructors 
that take a `KeyStoreLoader` instance that
+ is used to load the certificates required for TLS/SSL connections. For 
example:
+ 
+```java
+ public MqttSpout(MqttMessageMapper type, MqttOptions options, KeyStoreLoader 
keyStoreLoader)
+```
+ 
+The `DefaultKeyStoreLoader` class can be used to load certificates from the 
local filesystem. Note that the 
+keystore/truststore need to be available on all worker nodes where the 
spout/bolt might be executed. To use 
+`DefaultKeyStoreLoader` you specify the location of the keystore/truststore 
file(s), and set the necessary passwords:
+
+```java
+DefaultKeyStoreLoader ksl = new DefaultKeyStoreLoader("/path/to/keystore.jks", 
"/path/to/truststore.jks");
+ksl.setKeyStorePassword("password");
+ksl.setTrustStorePassword("password");
+//...
+```
+
+If your keystore/truststore certificates are stored in a single file, you can 
use the one-argument constructor:
+
+```java
+DefaultKeyStoreLoader ksl = new DefaultKeyStoreLoader("/path/to/keystore.jks");
+ksl.setKeyStorePassword("password");
+//...
+```
+
+SSL/TLS can also be configured using Flux:
+
+```yaml
+name: "mqtt-topology"
+
+components:
+   ########## MQTT Spout Config ############
+  - id: "mqtt-type"
+    className: "org.apache.storm.mqtt.examples.CustomMessageMapper"
+
+  - id: "keystore-loader"
+    className: "org.apache.storm.mqtt.ssl.DefaultKeyStoreLoader"
+    constructorArgs:
+      - "keystore.jks"
+      - "truststore.jks"
+    properties:
+      - name: "keyPassword"
+        value: "password"
+      - name: "keyStorePassword"
+        value: "password"
+      - name: "trustStorePassword"
+        value: "password"
+
+  - id: "mqtt-options"
+    className: "org.apache.storm.mqtt.common.MqttOptions"
+    properties:
+      - name: "url"
+        value: "ssl://raspberrypi.local:8883"
+      - name: "topics"
+        value:
+          - "/users/tgoetz/#"
+
+# topology configuration
+config:
+  topology.workers: 1
+  topology.max.spout.pending: 1000
+
+# spout definitions
+spouts:
+  - id: "mqtt-spout"
+    className: "org.apache.storm.mqtt.spout.MqttSpout"
+    constructorArgs:
+      - ref: "mqtt-type"
+      - ref: "mqtt-options"
+      - ref: "keystore-loader"
+    parallelism: 1
+
+# bolt definitions
+bolts:
+
+  - id: "log"
+    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+    parallelism: 1
+
+
+streams:
+
+  - from: "mqtt-spout"
+    to: "log"
+    grouping:
+      type: SHUFFLE
+
+```
+
+## Committer Sponsors
+
+ * P. Taylor Goetz ([[email protected]](mailto:[email protected]))
\ No newline at end of file

Reply via email to