Repository: storm
Updated Branches:
  refs/heads/1.x-branch 139a8a3b2 -> 1783d7ae9
STORM-1659: Add documents for external projects

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ad1d7324
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ad1d7324
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ad1d7324

Branch: refs/heads/1.x-branch
Commit: ad1d732459d3e2bb3f67484f625af1fa589fd94f
Parents: 139a8a3
Author: Xin Wang <[email protected]>
Authored: Sun Mar 27 16:04:38 2016 +0800
Committer: Xin Wang <[email protected]>
Committed: Sun Mar 27 16:42:04 2016 +0800

----------------------------------------------------------------------
 docs/Kestrel-and-Storm.md   |   2 +-
 docs/index.md               |  14 +-
 docs/storm-cassandra.md     | 255 ++++++++++++++++++++++++++
 docs/storm-elasticsearch.md | 105 +++++++++++
 docs/storm-mongodb.md       | 199 ++++++++++++++++++++
 docs/storm-mqtt.md          | 379 +++++++++++++++++++++++++++++++++++++++
 6 files changed, 948 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ad1d7324/docs/Kestrel-and-Storm.md
----------------------------------------------------------------------
diff --git a/docs/Kestrel-and-Storm.md b/docs/Kestrel-and-Storm.md
index cd584ff..ff48995 100644
--- a/docs/Kestrel-and-Storm.md
+++ b/docs/Kestrel-and-Storm.md
@@ -3,7 +3,7 @@ title: Storm and Kestrel
 layout: documentation
 documentation: true
 ---
-This page explains how to use to Storm to consume items from a Kestrel cluster.
+This page explains how to use Storm to consume items from a Kestrel cluster.
 
 ## Preliminaries
 ### Storm

http://git-wip-us.apache.org/repos/asf/storm/blob/ad1d7324/docs/index.md
----------------------------------------------------------------------
diff --git a/docs/index.md b/docs/index.md
index 8c6859e..bfd68db 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -64,15 +64,19 @@ Trident is an alternative interface to Storm. It provides exactly-once processing
 * [Worker Profiling](dynamic-worker-profiling.html)
 
 ### Integration With External Systems, and Other Libraries
-* [Event Hubs Intergration](storm-eventhubs.html)
+* [Apache Kafka Integration](storm-kafka.html)
 * [Apache HBase Integration](storm-hbase.html)
 * [Apache HDFS Integration](storm-hdfs.html)
 * [Apache Hive Integration](storm-hive.html)
+* [Apache Solr Integration](storm-solr.html)
 * [JDBC Integration](storm-jdbc.html)
-* [Apache Kafka Integration](storm-kafka.html)
-* [REDIS Integration](storm-redis.html)
-* [Kestrel and Storm](Kestrel-and-Storm.html)
-* [Solr Integration](storm-solr.html)
+* [Redis Integration](storm-redis.html)
+* [Cassandra Integration](storm-cassandra.html)
+* [Event Hubs Integration](storm-eventhubs.html)
+* [Elasticsearch Integration](storm-elasticsearch.html)
+* [MQTT Integration](storm-mqtt.html)
+* [MongoDB Integration](storm-mongodb.html)
+* [Kestrel Integration](Kestrel-and-Storm.html)
 
 ### Advanced

http://git-wip-us.apache.org/repos/asf/storm/blob/ad1d7324/docs/storm-cassandra.md
----------------------------------------------------------------------
diff --git a/docs/storm-cassandra.md b/docs/storm-cassandra.md
new file mode 100644
index 0000000..c674fbc
--- /dev/null
+++ b/docs/storm-cassandra.md
@@ -0,0 +1,255 @@
---
title: Storm Cassandra Integration
layout: documentation
documentation: true
---

### Bolt API implementation for Apache Cassandra

This library provides a core Storm bolt on top of Apache Cassandra, along with a simple DSL for mapping a Storm *Tuple* to a Cassandra Query Language *Statement*.

### Configuration

The following properties may be passed in the Storm configuration.

| **Property name**                            | **Description** | **Default**        |
| ---------------------------------------------| ----------------| -------------------|
| **cassandra.keyspace**                       | -               |                    |
| **cassandra.nodes**                          | -               | {"localhost"}      |
| **cassandra.username**                       | -               | -                  |
| **cassandra.password**                       | -               | -                  |
| **cassandra.port**                           | -               | 9092               |
| **cassandra.output.consistencyLevel**        | -               | ONE                |
| **cassandra.batch.size.rows**                | -               | 100                |
| **cassandra.retryPolicy**                    | -               | DefaultRetryPolicy |
| **cassandra.reconnectionPolicy.baseDelayMs** | -               | 100 (ms)           |
| **cassandra.reconnectionPolicy.maxDelayMs**  | -               | 60000 (ms)         |
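For illustration, a minimal sketch of supplying a few of these properties when submitting a topology. The keyspace, node list, and consistency level values here are hypothetical, and `cassandra.nodes` is assumed to accept a comma-separated host list:

```java
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

TopologyBuilder builder = new TopologyBuilder();
// ... wire up spouts and the CassandraWriterBolt here ...

Config conf = new Config();
conf.put("cassandra.keyspace", "music");                  // hypothetical keyspace
conf.put("cassandra.nodes", "cassandra1,cassandra2");     // hypothetical contact points
conf.put("cassandra.output.consistencyLevel", "QUORUM");  // override the default (ONE)

// exception handling omitted for brevity
StormSubmitter.submitTopology("cassandra-topology", conf, builder.createTopology());
```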
### CassandraWriterBolt

#### Static import
```java
import static org.apache.storm.cassandra.DynamicStatementBuilder.*;
```

#### Insert Query Builder

##### Insert query including only the specified tuple fields.
```java
new CassandraWriterBolt(
    async(
        simpleQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
            .with(
                fields("title", "year", "performer", "genre", "tracks")
            )
    )
);
```

##### Insert query including all tuple fields.
```java
new CassandraWriterBolt(
    async(
        simpleQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
            .with( all() )
    )
);
```

##### Insert multiple queries from one input tuple.
```java
new CassandraWriterBolt(
    async(
        simpleQuery("INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all()),
        simpleQuery("INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all())
    )
);
```

##### Insert query using QueryBuilder
```java
new CassandraWriterBolt(
    async(
        simpleQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
            .with(all())
    )
);
```

##### Insert query with static bound query
```java
new CassandraWriterBolt(
    async(
        boundQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
            .bind(all())
    )
);
```

##### Insert query with static bound query using named setters and aliases
```java
new CassandraWriterBolt(
    async(
        boundQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (:ti, :ye, :pe, :ge, :tr);")
            .bind(
                field("ti").as("title"),
                field("ye").as("year"),
                field("pe").as("performer"),
                field("ge").as("genre"),
                field("tr").as("tracks")
            ).byNamedSetters()
    )
);
```

##### Insert query with bound statement loaded from the storm configuration
```java
new CassandraWriterBolt(
    boundQuery(named("insertIntoAlbum"))
        .bind(all())
);
```

##### Insert query with bound statement loaded from a tuple field
```java
new CassandraWriterBolt(
    boundQuery(namedByField("cql"))
        .bind(all())
);
```
##### Insert query with batch statement
```java
// Logged batch
new CassandraWriterBolt(
    loggedBatch(
        simpleQuery("INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all()),
        simpleQuery("INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all())
    )
);

// Unlogged batch
new CassandraWriterBolt(
    unLoggedBatch(
        simpleQuery("INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all()),
        simpleQuery("INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all())
    )
);
```

### How to handle query execution results

The *ExecutionResultHandler* interface can be used to customize how an execution result is handled.

```java
public interface ExecutionResultHandler extends Serializable {
    void onQueryValidationException(QueryValidationException e, OutputCollector collector, Tuple tuple);

    void onReadTimeoutException(ReadTimeoutException e, OutputCollector collector, Tuple tuple);

    void onWriteTimeoutException(WriteTimeoutException e, OutputCollector collector, Tuple tuple);

    void onUnavailableException(UnavailableException e, OutputCollector collector, Tuple tuple);

    void onQuerySuccess(OutputCollector collector, Tuple tuple);
}
```

By default, the CassandraBolt fails a tuple on any Cassandra exception (see [BaseExecutionResultHandler](https://github.com/apache/storm/tree/master/external/storm-cassandra/blob/master/src/main/java/org/apache/storm/cassandra/BaseExecutionResultHandler.java)).

```java
new CassandraWriterBolt(insertInto("album").values(with(all()).build()))
    .withResultHandler(new MyCustomResultHandler());
```

### Declare Output fields

A CassandraBolt can declare output fields / stream output fields. For instance, this may be used to emit a new tuple on error, or to chain queries.

```java
new CassandraWriterBolt(insertInto("album").values(with(all()).build()))
    .withResultHandler(new EmitOnDriverExceptionResultHandler())
    .withStreamOutputFields("stream_error", new Fields("message"));

public static class EmitOnDriverExceptionResultHandler extends BaseExecutionResultHandler {
    @Override
    protected void onDriverException(DriverException e, OutputCollector collector, Tuple tuple) {
        LOG.error("An error occurred while executing cassandra statement", e);
        collector.emit("stream_error", new Values(e.getMessage()));
        collector.ack(tuple);
    }
}
```

### Murmur3FieldGrouping

[Murmur3StreamGrouping](https://github.com/apache/storm/tree/master/external/storm-cassandra/blob/master/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java) can be used to optimise Cassandra writes. The stream is partitioned among the bolt's tasks based on the specified row partition keys.

```java
CassandraWriterBolt bolt = new CassandraWriterBolt(
    insertInto("album")
        .values(with(fields("title", "year", "performer", "genre", "tracks")))
        .build());

builder.setBolt("BOLT_WRITER", bolt, 4)
    .customGrouping("spout", new Murmur3StreamGrouping("title"));
```

### Trident API support

storm-cassandra supports the Trident `state` API for inserting data into Cassandra.
```java
CassandraState.Options options = new CassandraState.Options(new CassandraContext());
CQLStatementTupleMapper insertTemperatureValues = boundQuery(
    "INSERT INTO weather.temperature(weather_station_id, weather_station_name, event_time, temperature) VALUES(?, ?, ?, ?)")
    .bind(with(field("weather_station_id"), field("name").as("weather_station_name"), field("event_time").now(), field("temperature")));
options.withCQLStatementTupleMapper(insertTemperatureValues);
CassandraStateFactory insertValuesStateFactory = new CassandraStateFactory(options);
TridentState selectState = topology.newStaticState(selectWeatherStationStateFactory);
stream = stream.stateQuery(selectState, new Fields("weather_station_id"), new CassandraQuery(), new Fields("name"));
stream = stream.each(new Fields("name"), new PrintFunction(), new Fields("name_x"));
stream.partitionPersist(insertValuesStateFactory, new Fields("weather_station_id", "name", "event_time", "temperature"), new CassandraStateUpdater(), new Fields());
```

Below is the `state` API for querying data from Cassandra.

```java
CassandraState.Options options = new CassandraState.Options(new CassandraContext());
CQLStatementTupleMapper selectWeatherStationName = boundQuery("SELECT name FROM weather.station WHERE id = ?")
    .bind(with(field("weather_station_id").as("id")));
options.withCQLStatementTupleMapper(selectWeatherStationName);
options.withCQLResultSetValuesMapper(new TridentResultSetValuesMapper(new Fields("name")));
CassandraStateFactory selectWeatherStationStateFactory = new CassandraStateFactory(options);
TridentState selectState = topology.newStaticState(selectWeatherStationStateFactory);
stream = stream.stateQuery(selectState, new Fields("weather_station_id"), new CassandraQuery(), new Fields("name"));
```

## License

Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.

## Committer Sponsors

 * Sriharsha Chintalapani ([[email protected]](mailto:[email protected]))
 * P. Taylor Goetz ([[email protected]](mailto:[email protected]))

http://git-wip-us.apache.org/repos/asf/storm/blob/ad1d7324/docs/storm-elasticsearch.md
----------------------------------------------------------------------
diff --git a/docs/storm-elasticsearch.md b/docs/storm-elasticsearch.md
new file mode 100644
index 0000000..fff0f51
--- /dev/null
+++ b/docs/storm-elasticsearch.md
@@ -0,0 +1,105 @@
---
title: Storm Elasticsearch Integration
layout: documentation
documentation: true
---

# Storm Elasticsearch Bolt & Trident State

EsIndexBolt, EsPercolateBolt and EsState allow users to stream data from Storm into Elasticsearch directly. For a detailed description, please refer to the following sections.
## EsIndexBolt (org.apache.storm.elasticsearch.bolt.EsIndexBolt)

EsIndexBolt streams tuples directly into Elasticsearch. Tuples are indexed under the specified index and type combination. Users should make sure that the `EsTupleMapper` can extract "source", "index", "type", and "id" from the input tuple. "index" and "type" identify the target index and type. "source" is a document in JSON-format string that will be indexed in Elasticsearch.

```java
EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
EsIndexBolt indexBolt = new EsIndexBolt(esConfig, tupleMapper);
```

## EsPercolateBolt (org.apache.storm.elasticsearch.bolt.EsPercolateBolt)

EsPercolateBolt streams tuples directly into Elasticsearch. Tuples are used to send percolate requests to the specified index and type combination. Users should make sure that the `EsTupleMapper` can extract "source", "index", and "type" from the input tuple. "index" and "type" identify the target index and type. "source" is a document in JSON-format string that will be sent in the percolate request to Elasticsearch.

```java
EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
EsPercolateBolt percolateBolt = new EsPercolateBolt(esConfig, tupleMapper);
```

If a non-empty percolate response exists, EsPercolateBolt will emit a tuple with the original source and a `Percolate.Match` for each match in the `PercolateResponse`.

## EsState (org.apache.storm.elasticsearch.trident.EsState)

The Elasticsearch Trident state follows a similar pattern to the Es bolts: it takes an EsConfig and an EsTupleMapper as arguments.

```java
EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
EsTupleMapper tupleMapper = new DefaultEsTupleMapper();

StateFactory factory = new EsStateFactory(esConfig, tupleMapper);
TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());
```

## EsLookupBolt (org.apache.storm.elasticsearch.bolt.EsLookupBolt)

EsLookupBolt performs a get request against Elasticsearch. To do that, three dependencies need to be satisfied. Apart from the usual EsConfig, two other dependencies must be provided:

* `ElasticsearchGetRequest` converts the incoming tuple to the `GetRequest` that will be executed against Elasticsearch.
* `EsLookupResultOutput` declares the output fields and converts the `GetResponse` to the values that are emitted by the bolt.

The incoming tuple is passed to the provided `GetRequest` creator, and the result of that execution is passed to the Elasticsearch client. The bolt then uses the provided output adapter (`EsLookupResultOutput`) to convert the `GetResponse` to `Values` to emit. The output fields are also specified by the user of the bolt via the output adapter (`EsLookupResultOutput`).

```java
EsConfig esConfig = createEsConfig();
ElasticsearchGetRequest getRequestAdapter = createElasticsearchGetRequest();
EsLookupResultOutput output = createOutput();
EsLookupBolt lookupBolt = new EsLookupBolt(esConfig, getRequestAdapter, output);
```
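As a sketch of the output-adapter side, assuming (as the prose above suggests) that `EsLookupResultOutput` only has to supply the declared fields and a `GetResponse`-to-`Values` conversion, an implementation that emits the raw document source might look like this; the output field name is illustrative:

```java
import java.util.Collection;
import java.util.Collections;

import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.elasticsearch.action.get.GetResponse;

public class SourceAsStringOutput implements EsLookupResultOutput {

    // Convert the whole response into a single one-field tuple holding the JSON source.
    @Override
    public Collection<Values> toValues(GetResponse response) {
        return Collections.singleton(new Values(response.getSourceAsString()));
    }

    // Declare the single output field; "source" is an illustrative name.
    @Override
    public Fields fields() {
        return new Fields("source");
    }
}
```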
## EsConfig (org.apache.storm.elasticsearch.common.EsConfig)

The provided components (bolt, state) take an EsConfig as a constructor argument.

```java
EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
```

or

```java
Map<String, String> additionalParameters = new HashMap<>();
additionalParameters.put("client.transport.sniff", "true");
EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"}, additionalParameters);
```

### EsConfig params

|Arg |Description | Type |
|--- |--- |--- |
|clusterName | Elasticsearch cluster name | String (required) |
|nodes | Elasticsearch nodes in a String array; each element should follow the {host}:{port} pattern | String array (required) |
|additionalParameters | Additional Elasticsearch Transport Client configuration parameters | Map<String, String> (optional) |

## EsTupleMapper (org.apache.storm.elasticsearch.common.EsTupleMapper)

To store tuples in Elasticsearch, or to percolate tuples against it, we need to define which tuple fields are used for which purpose. Users define this mapping by implementing `EsTupleMapper`. storm-elasticsearch provides the default mapper `org.apache.storm.elasticsearch.common.DefaultEsTupleMapper`, which extracts the source, index, type, and id values from identically named tuple fields. You can refer to the implementation of DefaultEsTupleMapper to see how to implement your own.
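As a sketch, assuming the mapper interface exposes one getter per extracted value (mirroring what `DefaultEsTupleMapper` is described to do), a custom mapper that reads differently named tuple fields might look like the following; all field names below are hypothetical:

```java
import org.apache.storm.tuple.ITuple;

public class PayloadEsTupleMapper implements EsTupleMapper {

    // Take the JSON document body from a field named "payload" instead of "source".
    @Override
    public String getSource(ITuple tuple) {
        return tuple.getStringByField("payload");
    }

    @Override
    public String getIndex(ITuple tuple) {
        return tuple.getStringByField("targetIndex");
    }

    @Override
    public String getType(ITuple tuple) {
        return tuple.getStringByField("docType");
    }

    @Override
    public String getId(ITuple tuple) {
        return tuple.getStringByField("docId");
    }
}
```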
## Committer Sponsors

 * Sriharsha Chintalapani ([@harshach](https://github.com/harshach))
 * Jungtaek Lim ([@HeartSaVioR](https://github.com/HeartSaVioR))

http://git-wip-us.apache.org/repos/asf/storm/blob/ad1d7324/docs/storm-mongodb.md
----------------------------------------------------------------------
diff --git a/docs/storm-mongodb.md b/docs/storm-mongodb.md
new file mode 100644
index 0000000..90994bd
--- /dev/null
+++ b/docs/storm-mongodb.md
@@ -0,0 +1,199 @@
---
title: Storm MongoDB Integration
layout: documentation
documentation: true
---

Storm/Trident integration for [MongoDB](https://www.mongodb.org/). This package includes the core bolts and trident states that allow a storm topology to either insert storm tuples into a database collection or execute update queries against a database collection.

## Insert into Database
This package includes a bolt and a trident state for inserting data into a database collection.

### MongoMapper
The main API for inserting data into a collection is the `org.apache.storm.mongodb.common.mapper.MongoMapper` interface:

```java
public interface MongoMapper extends Serializable {
    Document toDocument(ITuple tuple);
}
```

### SimpleMongoMapper
`storm-mongodb` includes a general purpose `MongoMapper` implementation called `SimpleMongoMapper` that maps a Storm tuple to a database document. `SimpleMongoMapper` assumes that the storm tuple has fields with the same names as the document fields in the database collection that you intend to write to.

```java
public class SimpleMongoMapper implements MongoMapper {
    private String[] fields;

    @Override
    public Document toDocument(ITuple tuple) {
        Document document = new Document();
        for(String field : fields){
            document.append(field, tuple.getValueByField(field));
        }
        return document;
    }

    public SimpleMongoMapper withFields(String... fields) {
        this.fields = fields;
        return this;
    }
}
```

### MongoInsertBolt
To use the `MongoInsertBolt`, you construct an instance of it by specifying the url, the collectionName, and a `MongoMapper` implementation that converts a storm tuple to a DB document. The following is the standard URI connection scheme:

`mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]`

More connection options for the Mongo URI (e.g. write concern options) are described at https://docs.mongodb.org/manual/reference/connection-string/#connections-connection-options

```java
String url = "mongodb://127.0.0.1:27017/test";
String collectionName = "wordcount";

MongoMapper mapper = new SimpleMongoMapper()
        .withFields("word", "count");

MongoInsertBolt insertBolt = new MongoInsertBolt(url, collectionName, mapper);
```

### MongoTridentState
We also support a trident persistent state that can be used with trident topologies. To create a Mongo persistent trident state you need to initialize it with the url, the collectionName, and the `MongoMapper` instance. See the example below:

```java
MongoMapper mapper = new SimpleMongoMapper()
        .withFields("word", "count");

MongoState.Options options = new MongoState.Options()
        .withUrl(url)
        .withCollectionName(collectionName)
        .withMapper(mapper);

StateFactory factory = new MongoStateFactory(options);

TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);

stream.partitionPersist(factory, fields, new MongoStateUpdater(), new Fields());
```

**NOTE**:
> If there is no unique index provided, trident state inserts may result in duplicate documents in the case of failures.

## Update from Database
This package also includes a bolt for updating data in a database collection.

### SimpleMongoUpdateMapper
`storm-mongodb` includes a general purpose `MongoMapper` implementation called `SimpleMongoUpdateMapper` that maps a Storm tuple to a database document. `SimpleMongoUpdateMapper` assumes that the storm tuple has fields with the same names as the document fields in the database collection that you intend to write to. `SimpleMongoUpdateMapper` uses the `$set` operator to set the value of a field in a document. More information about update operators is available at https://docs.mongodb.org/manual/reference/operator/update/

```java
public class SimpleMongoUpdateMapper implements MongoMapper {
    private String[] fields;

    @Override
    public Document toDocument(ITuple tuple) {
        Document document = new Document();
        for(String field : fields){
            document.append(field, tuple.getValueByField(field));
        }
        return new Document("$set", document);
    }

    public SimpleMongoUpdateMapper withFields(String... fields) {
        this.fields = fields;
        return this;
    }
}
```

### QueryFilterCreator
The main API for creating a MongoDB query filter is the `org.apache.storm.mongodb.common.QueryFilterCreator` interface:

```java
public interface QueryFilterCreator extends Serializable {
    Bson createFilter(ITuple tuple);
}
```

### SimpleQueryFilterCreator
`storm-mongodb` includes a general purpose `QueryFilterCreator` implementation called `SimpleQueryFilterCreator` that creates a MongoDB query filter from a given tuple. `SimpleQueryFilterCreator` uses the `$eq` operator to match values that are equal to a specified value.
More information about query operators is available at https://docs.mongodb.org/manual/reference/operator/query/

```java
public class SimpleQueryFilterCreator implements QueryFilterCreator {
    private String field;

    @Override
    public Bson createFilter(ITuple tuple) {
        return Filters.eq(field, tuple.getValueByField(field));
    }

    public SimpleQueryFilterCreator withField(String field) {
        this.field = field;
        return this;
    }

}
```

### MongoUpdateBolt
To use the `MongoUpdateBolt`, you construct an instance of it by specifying the Mongo url, the collectionName, a `QueryFilterCreator` implementation, and a `MongoMapper` implementation that converts a storm tuple to a DB document.

```java
MongoMapper mapper = new SimpleMongoUpdateMapper()
        .withFields("word", "count");

QueryFilterCreator updateQueryCreator = new SimpleQueryFilterCreator()
        .withField("word");

MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper);

//if a new document should be inserted if there are no matches to the query filter
//updateBolt.withUpsert(true);
```

Or use an anonymous inner class implementation of `QueryFilterCreator`:

```java
MongoMapper mapper = new SimpleMongoUpdateMapper()
        .withFields("word", "count");

QueryFilterCreator updateQueryCreator = new QueryFilterCreator() {
    @Override
    public Bson createFilter(ITuple tuple) {
        return Filters.gt("count", 3);
    }
};

MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper);

//if a new document should be inserted if there are no matches to the query filter
//updateBolt.withUpsert(true);
```

## License

Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.

## Committer Sponsors

 * Sriharsha Chintalapani ([[email protected]](mailto:[email protected]))

http://git-wip-us.apache.org/repos/asf/storm/blob/ad1d7324/docs/storm-mqtt.md
----------------------------------------------------------------------
diff --git a/docs/storm-mqtt.md b/docs/storm-mqtt.md
new file mode 100644
index 0000000..b730242
--- /dev/null
+++ b/docs/storm-mqtt.md
@@ -0,0 +1,379 @@
---
title: Storm MQTT Integration
layout: documentation
documentation: true
---

## About

MQTT is a lightweight publish/subscribe protocol frequently used in IoT applications.

Further information can be found at http://mqtt.org. The HiveMQ website has a great series on [MQTT Essentials](http://www.hivemq.com/mqtt-essentials/).

Features include:

* Full MQTT support (e.g. last will, QoS 0-2, retain, etc.)
* Spout implementation(s) for subscribing to MQTT topics
* A bolt implementation for publishing MQTT messages
* A trident function implementation for publishing MQTT messages
* Authentication and TLS/SSL support
* User-defined "mappers" for converting MQTT messages to tuples (subscribers)
* User-defined "mappers" for converting tuples to MQTT messages (publishers)


## Quick Start
To quickly see MQTT integration in action, follow the instructions below.

**Start an MQTT broker and publisher**

The command below will create an MQTT broker on port 1883 and start a publisher that will publish random temperature/humidity values to an MQTT topic.

Open a terminal and execute the following command (change the path as necessary):

```bash
java -cp examples/target/storm-mqtt-examples-*-SNAPSHOT.jar org.apache.storm.mqtt.examples.MqttBrokerPublisher
```

**Run the example topology**

Run the sample topology using Flux. This will start a local-mode cluster and a topology that consists of the MQTT spout publishing to a bolt that simply logs the information it receives.

In a separate terminal, run the following command (note that the `storm` executable must be on your PATH):

```bash
storm jar ./examples/target/storm-mqtt-examples-*-SNAPSHOT.jar org.apache.storm.flux.Flux ./examples/src/main/flux/sample.yaml --local
```

You should see data from MQTT being logged by the bolt:

```
27020 [Thread-17-log-executor[3 3]] INFO  o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=67.0, humidity=65.0}
27030 [Thread-17-log-executor[3 3]] INFO  o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=47.0, humidity=85.0}
27040 [Thread-17-log-executor[3 3]] INFO  o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=69.0, humidity=94.0}
27049 [Thread-17-log-executor[3 3]] INFO  o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=4.0, humidity=98.0}
27059 [Thread-17-log-executor[3 3]] INFO  o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=51.0, humidity=12.0}
27069 [Thread-17-log-executor[3 3]] INFO  o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=27.0, humidity=65.0}
```

Either allow the local cluster to exit, or stop it by typing Ctrl-C.

**MQTT Fault Tolerance In Action**

After the topology has been shut down, the MQTT subscription created by the MQTT spout will persist with the broker, and the broker will continue to receive and queue messages (as long as it is running).

If you run the topology again (while the broker is still running), when the spout initially connects to the MQTT broker it will receive all the messages it missed while it was down. You should see this as a burst of messages, followed by a rate of about two messages per second.

This happens because, by default, the MQTT spout creates a *session* when it subscribes -- that means it requests that the broker hold onto and redeliver any messages it missed while offline. Another important factor is that `MqttBrokerPublisher` publishes messages with an MQTT QoS of `1`, meaning *at least once delivery*.

For more information about MQTT fault tolerance, see the **Delivery Guarantees** section below.


## Delivery Guarantees
In Storm terms, ***the MQTT spout provides at least once delivery***, depending on the configuration of the publisher as well as the MQTT spout.

The MQTT protocol defines the following QoS levels:

* `0` - At Most Once (AKA "Fire and Forget")
* `1` - At Least Once
* `2` - Exactly Once

This can be a little confusing, as the MQTT protocol specification does not really address the concept of a node being completely incinerated by a catastrophic event. This is in stark contrast with Storm's reliability model, which expects and embraces the concept of node failure.

So resiliency is ultimately dependent on the underlying MQTT implementation and infrastructure.

### Recommendations

*You will never get exactly once processing with this spout. It can be used with Trident, but it won't provide transactional semantics. You will only get at least once guarantees.*

If you need reliability guarantees (i.e. *at least once processing*):

1. For MQTT publishers (outside of Storm), publish messages with a QoS of `1` so the broker saves messages if/when the spout is offline (see the sketch after this list).
2. Use the spout defaults (`cleanSession = false` and `qos = 1`).
3. If you can, make sure any result of receiving an MQTT message is idempotent.
4. Make sure your MQTT brokers don't die or get isolated due to a network partition. Be prepared for natural and man-made disasters and network partitions. Incineration and destruction happens.
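To illustrate the first recommendation, here is a minimal publisher sketch using the Eclipse Paho Java client -- a library that is not part of storm-mqtt and is shown purely as an example. The broker URL, client id, topic, and payload are all hypothetical (they mirror the Quick Start's topic/payload scheme):

```java
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;

// Publish one reading at QoS 1 so the broker queues it for subscribers with
// persistent sessions, such as the MQTT spout while its topology is down.
// (Exception handling omitted for brevity.)
MqttClient client = new MqttClient("tcp://localhost:1883", "example-publisher");
client.connect();

MqttMessage message = new MqttMessage("67.0/65.0".getBytes()); // "{temperature}/{humidity}"
message.setQos(1); // at least once

client.publish("/users/tgoetz/office/1234", message);
client.disconnect();
```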
## Configuration
For the full range of configuration options, see the JavaDoc for `org.apache.storm.mqtt.common.MqttOptions`.

### Message Mappers
To define how MQTT messages are mapped to Storm tuples, you configure the MQTT spout with an implementation of the `org.apache.storm.mqtt.MqttMessageMapper` interface, which looks like this:

```java
public interface MqttMessageMapper extends Serializable {

    Values toValues(MqttMessage message);

    Fields outputFields();
}
```

The `MqttMessage` class contains the topic to which the message was published (`String`) and the message payload (`byte[]`). For example, here is a `MqttMessageMapper` implementation that produces tuples based on the content of both the message topic and payload:

```java
/**
 * Given a topic name: "users/{user}/{location}/{deviceId}"
 * and a payload of "{temperature}/{humidity}"
 * emits a tuple containing user(String), deviceId(String), location(String), temperature(float), humidity(float)
 */
public class CustomMessageMapper implements MqttMessageMapper {
    private static final Logger LOG = LoggerFactory.getLogger(CustomMessageMapper.class);

    public Values toValues(MqttMessage message) {
        String topic = message.getTopic();
        // Topics are assumed to have a leading slash (e.g. "/users/tgoetz/office/1234"),
        // so element [0] is empty: [1]="users", [2]=user, [3]=location, [4]=deviceId.
        String[] topicElements = topic.split("/");
        String[] payloadElements = new String(message.getMessage()).split("/");

        return new Values(topicElements[2], topicElements[4], topicElements[3], Float.parseFloat(payloadElements[0]),
                Float.parseFloat(payloadElements[1]));
    }

    public Fields outputFields() {
        return new Fields("user", "deviceId", "location", "temperature", "humidity");
    }
}
```

### Tuple Mappers
When publishing MQTT messages with the MQTT bolt or Trident function, you need to map Storm tuple data to MQTT messages (topic/payload). This is done by implementing the `org.apache.storm.mqtt.MqttTupleMapper` interface:

```java
public interface MqttTupleMapper extends Serializable{

    MqttMessage toMessage(ITuple tuple);

}
```

For example, a simple `MqttTupleMapper` implementation might look like this:

```java
public class MyTupleMapper implements MqttTupleMapper {
    public MqttMessage toMessage(ITuple tuple) {
        String topic = "users/" + tuple.getStringByField("userId") + "/" + tuple.getStringByField("device");
        byte[] payload = tuple.getStringByField("message").getBytes();
        return new MqttMessage(topic, payload);
    }
}
```

### MQTT Spout Parallelism
It's recommended that you use a parallelism of 1 for the MQTT spout; otherwise you will end up with multiple instances of the spout subscribed to the same topic(s), resulting in duplicate messages.

If you want to parallelize the spout, it's recommended that you use multiple instances of the spout in your topology and use MQTT topic selectors to partition the data. How you implement the partitioning strategy is ultimately determined by your MQTT topic structure. As an example, if you had topics partitioned by region (e.g. east/west) you could do something like the following:

```java
String spout1Topic = "users/east/#";
String spout2Topic = "users/west/#";
```

and then join the resulting streams together by subscribing a bolt to each stream, as sketched below.
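A minimal sketch of that layout, reusing classes that appear elsewhere on this page (`StringMessageMapper`, `LogInfoBolt`); the component ids are illustrative, and connection options (URL, etc.) are omitted as in the Java example below:

```java
// Each spout instance subscribes to one region's topics via its own options.
MqttOptions eastOptions = new MqttOptions();
eastOptions.setTopics(Arrays.asList("users/east/#"));

MqttOptions westOptions = new MqttOptions();
westOptions.setTopics(Arrays.asList("users/west/#"));

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("mqtt-east-spout", new MqttSpout(new StringMessageMapper(), eastOptions));
builder.setSpout("mqtt-west-spout", new MqttSpout(new StringMessageMapper(), westOptions));

// A single bolt subscribes to both streams, joining the partitions back together.
builder.setBolt("log-bolt", new LogInfoBolt())
    .shuffleGrouping("mqtt-east-spout")
    .shuffleGrouping("mqtt-west-spout");
```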
### Using Flux

The following Flux YAML configuration creates the topology used in the example:

```yaml
name: "mqtt-topology"

components:
   ########## MQTT Spout Config ############
  - id: "mqtt-type"
    className: "org.apache.storm.mqtt.examples.CustomMessageMapper"

  - id: "mqtt-options"
    className: "org.apache.storm.mqtt.common.MqttOptions"
    properties:
      - name: "url"
        value: "tcp://localhost:1883"
      - name: "topics"
        value:
          - "/users/tgoetz/#"

# topology configuration
config:
  topology.workers: 1
  topology.max.spout.pending: 1000

# spout definitions
spouts:
  - id: "mqtt-spout"
    className: "org.apache.storm.mqtt.spout.MqttSpout"
    constructorArgs:
      - ref: "mqtt-type"
      - ref: "mqtt-options"
    parallelism: 1

# bolt definitions
bolts:
  - id: "log"
    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
    parallelism: 1


streams:
  - from: "mqtt-spout"
    to: "log"
    grouping:
      type: SHUFFLE
```


### Using Java

Similarly, you can create the same topology using the Storm Core Java API:

```java
TopologyBuilder builder = new TopologyBuilder();

MqttOptions options = new MqttOptions();
options.setTopics(Arrays.asList("/users/tgoetz/#"));
options.setCleanConnection(false);
MqttSpout spout = new MqttSpout(new StringMessageMapper(), options);

LogInfoBolt bolt = new LogInfoBolt();

builder.setSpout("mqtt-spout", spout);
builder.setBolt("log-bolt", bolt).shuffleGrouping("mqtt-spout");

return builder.createTopology();
```

## SSL/TLS
If the MQTT broker you are connecting to requires SSL or SSL client authentication, you need to configure the spout with an appropriate URI, and the location of keystore/truststore files containing the necessary certificates.

### SSL/TLS URIs
To connect over SSL/TLS, use a URI with a prefix of `ssl://` or `tls://` instead of `tcp://`. For further control over the algorithm, you can specify a specific protocol version:

* `ssl://` Use the JVM default version of the SSL protocol.
* `sslv*://` Use a specific version of the SSL protocol, where `*` is replaced by the version (e.g. `sslv3://`)
* `tls://` Use the JVM default version of the TLS protocol.
* `tlsv*://` Use a specific version of the TLS protocol, where `*` is replaced by the version (e.g. `tlsv1.1://`)


### Specifying Keystore/Truststore Locations

The `MqttSpout`, `MqttBolt` and `MqttPublishFunction` all have constructors that take a `KeyStoreLoader` instance that is used to load the certificates required for TLS/SSL connections. For example:

```java
public MqttSpout(MqttMessageMapper type, MqttOptions options, KeyStoreLoader keyStoreLoader)
```

The `DefaultKeyStoreLoader` class can be used to load certificates from the local filesystem. Note that the keystore/truststore need to be available on all worker nodes where the spout/bolt might be executed. To use `DefaultKeyStoreLoader`, you specify the location of the keystore/truststore file(s) and set the necessary passwords:

```java
DefaultKeyStoreLoader ksl = new DefaultKeyStoreLoader("/path/to/keystore.jks", "/path/to/truststore.jks");
ksl.setKeyStorePassword("password");
ksl.setTrustStorePassword("password");
//...
```

If your keystore/truststore certificates are stored in a single file, you can use the one-argument constructor:

```java
DefaultKeyStoreLoader ksl = new DefaultKeyStoreLoader("/path/to/keystore.jks");
ksl.setKeyStorePassword("password");
//...
```

SSL/TLS can also be configured using Flux:

```yaml
name: "mqtt-topology"

components:
   ########## MQTT Spout Config ############
  - id: "mqtt-type"
    className: "org.apache.storm.mqtt.examples.CustomMessageMapper"

  - id: "keystore-loader"
    className: "org.apache.storm.mqtt.ssl.DefaultKeyStoreLoader"
    constructorArgs:
      - "keystore.jks"
      - "truststore.jks"
    properties:
      - name: "keyPassword"
        value: "password"
      - name: "keyStorePassword"
        value: "password"
      - name: "trustStorePassword"
        value: "password"

  - id: "mqtt-options"
    className: "org.apache.storm.mqtt.common.MqttOptions"
    properties:
      - name: "url"
        value: "ssl://raspberrypi.local:8883"
      - name: "topics"
        value:
          - "/users/tgoetz/#"

# topology configuration
config:
  topology.workers: 1
  topology.max.spout.pending: 1000

# spout definitions
spouts:
  - id: "mqtt-spout"
    className: "org.apache.storm.mqtt.spout.MqttSpout"
    constructorArgs:
      - ref: "mqtt-type"
      - ref: "mqtt-options"
      - ref: "keystore-loader"
    parallelism: 1

# bolt definitions
bolts:
  - id: "log"
    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
    parallelism: 1


streams:
  - from: "mqtt-spout"
    to: "log"
    grouping:
      type: SHUFFLE
```

## Committer Sponsors

 * P. Taylor Goetz ([[email protected]](mailto:[email protected]))
\ No newline at end of file