http://git-wip-us.apache.org/repos/asf/storm/blob/97f0558e/docs/storm-hdfs.md
----------------------------------------------------------------------
diff --git a/docs/storm-hdfs.md b/docs/storm-hdfs.md
new file mode 100644
index 0000000..b5bf64d
--- /dev/null
+++ b/docs/storm-hdfs.md
@@ -0,0 +1,368 @@
+---
+title: Storm HDFS Integration
+layout: documentation
+documentation: true
+---
+
+Storm components for interacting with HDFS file systems
+
+
+## Usage
+The following example will write pipe("|")-delimited files to the HDFS path 
hdfs://localhost:54310/foo. After every
+1,000 tuples it will sync filesystem, making that data visible to other HDFS 
clients. It will rotate files when they
+reach 5 megabytes in size.
+
+```java
+// use "|" instead of "," for field delimiter
+RecordFormat format = new DelimitedRecordFormat()
+        .withFieldDelimiter("|");
+
+// sync the filesystem after every 1k tuples
+SyncPolicy syncPolicy = new CountSyncPolicy(1000);
+
+// rotate files when they reach 5MB
+FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
+
+FileNameFormat fileNameFormat = new DefaultFileNameFormat()
+        .withPath("/foo/");
+
+HdfsBolt bolt = new HdfsBolt()
+        .withFsUrl("hdfs://localhost:54310")
+        .withFileNameFormat(fileNameFormat)
+        .withRecordFormat(format)
+        .withRotationPolicy(rotationPolicy)
+        .withSyncPolicy(syncPolicy);
+```
+
+### Packaging a Topology
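+When packaging your topology, it's important that you use the 
[maven-shade-plugin](https://maven.apache.org/plugins/maven-shade-plugin/) as opposed to the
+[maven-assembly-plugin](https://maven.apache.org/plugins/maven-assembly-plugin/).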
+
+The shade plugin provides facilities for merging JAR manifest entries, which 
the hadoop client leverages for URL scheme
+resolution.
+
+If you experience errors such as the following:
+
+```
+java.lang.RuntimeException: Error preparing HdfsBolt: No FileSystem for 
scheme: hdfs
+```
+
+it's an indication that your topology jar file isn't packaged properly.
+
+If you are using maven to create your topology jar, you should use the 
following `maven-shade-plugin` configuration to
+create your topology jar:
+
+```xml
+<plugin>
+    <groupId>org.apache.maven.plugins</groupId>
+    <artifactId>maven-shade-plugin</artifactId>
+    <version>1.4</version>
+    <configuration>
+        <createDependencyReducedPom>true</createDependencyReducedPom>
+    </configuration>
+    <executions>
+        <execution>
+            <phase>package</phase>
+            <goals>
+                <goal>shade</goal>
+            </goals>
+            <configuration>
+                <transformers>
+                    <transformer
+                            
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                    <transformer
+                            
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                        <mainClass></mainClass>
+                    </transformer>
+                </transformers>
+            </configuration>
+        </execution>
+    </executions>
+</plugin>
+
+```
+
+### Specifying a Hadoop Version
+By default, storm-hdfs uses the following Hadoop dependencies:
+
+```xml
+<dependency>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-client</artifactId>
+    <version>2.2.0</version>
+    <exclusions>
+        <exclusion>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+    </exclusions>
+</dependency>
+<dependency>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-hdfs</artifactId>
+    <version>2.2.0</version>
+    <exclusions>
+        <exclusion>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+    </exclusions>
+</dependency>
+```
+
+If you are using a different version of Hadoop, you should exclude the Hadoop 
libraries from the storm-hdfs dependency
+and add the dependencies for your preferred version in your pom.
+
+Hadoop client version incompatibilities can manifest as errors like:
+
+```
+com.google.protobuf.InvalidProtocolBufferException: Protocol message contained 
an invalid tag (zero)
+```
+
+## Customization
+
+### Record Formats
+Record format can be controlled by providing an implementation of the 
`org.apache.storm.hdfs.format.RecordFormat`
+interface:
+
+```java
+public interface RecordFormat extends Serializable {
+    byte[] format(Tuple tuple);
+}
+```
+
+The provided `org.apache.storm.hdfs.format.DelimitedRecordFormat` is capable 
of producing formats such as CSV and
+tab-delimited files.
+
+
+### File Naming
+File naming can be controlled by providing an implementation of the 
`org.apache.storm.hdfs.format.FileNameFormat`
+interface:
+
+```java
+public interface FileNameFormat extends Serializable {
+    void prepare(Map conf, TopologyContext topologyContext);
+    String getName(long rotation, long timeStamp);
+    String getPath();
+}
+```
+
+The provided `org.apache.storm.hdfs.format.DefaultFileNameFormat`  will create 
file names with the following format:
+
+     {prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension}
+
+For example:
+
+     MyBolt-5-7-1390579837830.txt
+
+By default, prefix is empty and extension is ".txt".
+
+
+
+### Sync Policies
+Sync policies allow you to control when buffered data is flushed to the 
underlying filesystem (thus making it available
+to clients reading the data) by implementing the 
`org.apache.storm.hdfs.sync.SyncPolicy` interface:
+
+```java
+public interface SyncPolicy extends Serializable {
+    boolean mark(Tuple tuple, long offset);
+    void reset();
+}
+```
+The `HdfsBolt` will call the `mark()` method for every tuple it processes. 
Returning `true` will trigger the `HdfsBolt`
+to perform a sync/flush, after which it will call the `reset()` method.
+
+The `org.apache.storm.hdfs.sync.CountSyncPolicy` class simply triggers a sync 
after the specified number of tuples have
+been processed.
+
+### File Rotation Policies
+Similar to sync policies, file rotation policies allow you to control when 
data files are rotated by providing an
+implementation of the `org.apache.storm.hdfs.rotation.FileRotationPolicy` interface:
+
+```java
+public interface FileRotationPolicy extends Serializable {
+    boolean mark(Tuple tuple, long offset);
+    void reset();
+}
+``` 
+
+The `org.apache.storm.hdfs.rotation.FileSizeRotationPolicy` implementation 
allows you to trigger file rotation when
+data files reach a specific file size:
+
+```java
+FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
+```
+
+### File Rotation Actions
+Both the HDFS bolt and Trident State implementation allow you to register any 
number of `RotationAction`s.
+What `RotationAction`s do is provide a hook to allow you to perform some 
action right after a file is rotated. For
+example, moving a file to a different location or renaming it.
+
+
+```java
+public interface RotationAction extends Serializable {
+    void execute(FileSystem fileSystem, Path filePath) throws IOException;
+}
+```
+
+Storm-HDFS includes a simple action that will move a file after rotation:
+
+```java
+public class MoveFileAction implements RotationAction {
+    private static final Logger LOG = 
LoggerFactory.getLogger(MoveFileAction.class);
+
+    private String destination;
+
+    public MoveFileAction withDestination(String destDir){
+        destination = destDir;
+        return this;
+    }
+
+    @Override
+    public void execute(FileSystem fileSystem, Path filePath) throws 
IOException {
+        Path destPath = new Path(destination, filePath.getName());
+        LOG.info("Moving file {} to {}", filePath, destPath);
+        boolean success = fileSystem.rename(filePath, destPath);
+        return;
+    }
+}
+```
+
+If you are using Trident and sequence files you can do something like this:
+
+```java
+        HdfsState.Options seqOpts = new HdfsState.SequenceFileOptions()
+                .withFileNameFormat(fileNameFormat)
+                .withSequenceFormat(new DefaultSequenceFormat("key", "data"))
+                .withRotationPolicy(rotationPolicy)
+                .withFsUrl("hdfs://localhost:54310")
+                .addRotationAction(new 
MoveFileAction().withDestination("/dest2/"));
+```
+
+
+## Support for HDFS Sequence Files
+
+The `org.apache.storm.hdfs.bolt.SequenceFileBolt` class allows you to write 
storm data to HDFS sequence files:
+
+```java
+        // sync the filesystem after every 1k tuples
+        SyncPolicy syncPolicy = new CountSyncPolicy(1000);
+
+        // rotate files when they reach 5MB
+        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, 
Units.MB);
+
+        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
+                .withExtension(".seq")
+                .withPath("/data/");
+
+        // create sequence format instance.
+        DefaultSequenceFormat format = new DefaultSequenceFormat("timestamp", 
"sentence");
+
+        SequenceFileBolt bolt = new SequenceFileBolt()
+                .withFsUrl("hdfs://localhost:54310")
+                .withFileNameFormat(fileNameFormat)
+                .withSequenceFormat(format)
+                .withRotationPolicy(rotationPolicy)
+                .withSyncPolicy(syncPolicy)
+                .withCompressionType(SequenceFile.CompressionType.RECORD)
+                .withCompressionCodec("deflate");
+```
+
+The `SequenceFileBolt` requires that you provide a 
`org.apache.storm.hdfs.bolt.format.SequenceFormat` that maps tuples to
+key/value pairs:
+
+```java
+public interface SequenceFormat extends Serializable {
+    Class keyClass();
+    Class valueClass();
+
+    Writable key(Tuple tuple);
+    Writable value(Tuple tuple);
+}
+```
+
+## Trident API
+storm-hdfs also includes a Trident `state` implementation for writing data to 
HDFS, with an API that closely mirrors
+that of the bolts.
+
+ ```java
+         Fields hdfsFields = new Fields("field1", "field2");
+
+         FileNameFormat fileNameFormat = new DefaultFileNameFormat()
+                 .withPath("/trident")
+                 .withPrefix("trident")
+                 .withExtension(".txt");
+
+         RecordFormat recordFormat = new DelimitedRecordFormat()
+                 .withFields(hdfsFields);
+
+         FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, 
FileSizeRotationPolicy.Units.MB);
+
+        HdfsState.Options options = new HdfsState.HdfsFileOptions()
+                .withFileNameFormat(fileNameFormat)
+                .withRecordFormat(recordFormat)
+                .withRotationPolicy(rotationPolicy)
+                .withFsUrl("hdfs://localhost:54310");
+
+         StateFactory factory = new HdfsStateFactory().withOptions(options);
+
+         TridentState state = stream
+                 .partitionPersist(factory, hdfsFields, new HdfsUpdater(), new 
Fields());
+ ```
+
+ To use the sequence file `State` implementation, use the 
`HdfsState.SequenceFileOptions`:
+
+ ```java
+        HdfsState.Options seqOpts = new HdfsState.SequenceFileOptions()
+                .withFileNameFormat(fileNameFormat)
+                .withSequenceFormat(new DefaultSequenceFormat("key", "data"))
+                .withRotationPolicy(rotationPolicy)
+                .withFsUrl("hdfs://localhost:54310")
+                .addRotationAction(new 
MoveFileAction().toDestination("/dest2/"));
+```
+
+## Working with Secure HDFS
+If your topology is going to interact with secure HDFS, your bolts/states 
need to be authenticated by the NameNode. We 
+currently have 2 options to support this:
+
+### Using HDFS delegation tokens 
+Your administrator can configure nimbus to automatically get delegation tokens 
on behalf of the topology submitter user.
+Nimbus needs to be started with the following configuration:
+
+nimbus.autocredential.plugins.classes : 
["org.apache.storm.hdfs.common.security.AutoHDFS"] 
+nimbus.credential.renewers.classes : 
["org.apache.storm.hdfs.common.security.AutoHDFS"] 
+hdfs.keytab.file: "/path/to/keytab/on/nimbus" (This is the keytab of hdfs 
super user that can impersonate other users.)
+hdfs.kerberos.principal: "[email protected]" 
+nimbus.credential.renewers.freq.secs : 82800 (23 hours, hdfs tokens need to 
be renewed every 24 hours so this value should be
+less than 24 hours.)
+topology.hdfs.uri:"hdfs://host:port" (This is an optional config, by default 
we will use value of "fs.defaultFS" property
+specified in hadoop's core-site.xml)
+
+Your topology configuration should have:
+topology.auto-credentials :["org.apache.storm.hdfs.common.security.AutoHDFS"] 
+
+If nimbus did not have the above configuration you need to add it and then 
restart it. Ensure the hadoop configuration 
+files (core-site.xml and hdfs-site.xml) and the storm-hdfs jar with all the 
dependencies are present in nimbus's classpath. 
+Nimbus will use the keytab and principal specified in the config to 
authenticate with Namenode. From then on for every
+topology submission, nimbus will impersonate the topology submitter user and 
acquire delegation tokens on behalf of the
+topology submitter user. If topology was started with 
topology.auto-credentials set to AutoHDFS, nimbus will push the
+delegation tokens to all the workers for your topology and the hdfs bolt/state 
will authenticate with namenode using 
+these tokens.
+
+As nimbus is impersonating topology submitter user, you need to ensure the 
user specified in hdfs.kerberos.principal 
+has permissions to acquire tokens on behalf of other users. To achieve this 
you need to follow configuration directions 
+listed on this link
+http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html
+
+You can read about setting up secure HDFS here: 
http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SecureMode.html.
+
+### Using keytabs on all worker hosts
+If you have distributed the keytab files for hdfs user on all potential worker 
hosts then you can use this method. You should specify an 
+hdfs config key using the method HdfsBolt/State.withConfigKey("somekey") and 
the value map of this key should have the following 2 properties:
+
+hdfs.keytab.file: "/path/to/keytab/"
+hdfs.kerberos.principal: "[email protected]"
+
+On worker hosts the bolt/trident-state code will use the keytab file with 
principal provided in the config to authenticate with 
+Namenode. This method is a little dangerous as you need to ensure all workers 
have the keytab file at the same location and you need
+to remember this as you bring up new hosts in the cluster.

http://git-wip-us.apache.org/repos/asf/storm/blob/97f0558e/docs/storm-hive.md
----------------------------------------------------------------------
diff --git a/docs/storm-hive.md b/docs/storm-hive.md
new file mode 100644
index 0000000..e2dd657
--- /dev/null
+++ b/docs/storm-hive.md
@@ -0,0 +1,111 @@
+---
+title: Storm Hive Integration
+layout: documentation
+documentation: true
+---
+
+  Hive offers a streaming API that allows data to be written continuously into 
Hive. The incoming data 
+  can be continuously committed in small batches of records into an existing Hive 
partition or table. Once the data
+  is committed it's immediately visible to all Hive queries. More info on the Hive 
Streaming API: 
+  https://cwiki.apache.org/confluence/display/Hive/Streaming+Data+Ingest
+  
+  With the help of the Hive Streaming API, HiveBolt and HiveState allow users to 
stream data from Storm into Hive directly.
+  To use the Hive streaming API, users need to create a bucketed table with ORC 
format. Example below:
+  
+  ```sql
+  create table test_table ( id INT, name STRING, phone STRING, street STRING) 
partitioned by (city STRING, state STRING) stored as orc tblproperties 
("orc.compress"="NONE");
+  ```
+  
+
+## HiveBolt (org.apache.storm.hive.bolt.HiveBolt)
+
+HiveBolt streams tuples directly into Hive. Tuples are written using Hive 
Transactions. 
+Partitions to which HiveBolt will stream can either be pre-created or, 
optionally,
+HiveBolt can create them if they are missing. Fields from Tuples are mapped to 
table columns.
+User should make sure that Tuple field names are matched to the table column 
names.
+
+```java
+DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames));
+HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper);
+HiveBolt hiveBolt = new HiveBolt(hiveOptions);
+```
+
+### RecordHiveMapper
+   This class maps Tuple field names to Hive table column names.
+   There are two implementations available
+ 
+   
+   + DelimitedRecordHiveMapper 
(org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper)
+   + JsonRecordHiveMapper 
(org.apache.storm.hive.bolt.mapper.JsonRecordHiveMapper)
+   
+   ```java
+   DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withPartitionFields(new Fields(partNames));
+    or
+   DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withTimeAsPartitionField("YYYY/MM/DD");
+   ```
+
+|Arg | Description | Type
+|--- |--- |---
+|withColumnFields| field names in a tuple to be mapped to table column names | 
Fields (required) |
+|withPartitionFields| field names in a tuple can be mapped to hive table 
partitions | Fields |
+|withTimeAsPartitionField| users can select system time as partition in hive 
table| String . Date format|
+
+### HiveOptions (org.apache.storm.hive.common.HiveOptions)
+  
+HiveBolt takes in HiveOptions as a constructor arg.
+
+  ```java
+  HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+                                .withTxnsPerBatch(10)
+                                               .withBatchSize(1000)
+                                       .withIdleTimeout(10);
+  ```
+
+
+HiveOptions params
+
+|Arg  |Description | Type
+|---   |--- |---
+|metaStoreURI | hive meta store URI (can be found in hive-site.xml) | String 
(required) |
+|dbName | database name | String (required) |
+|tblName | table name | String (required) |
+|mapper| Mapper class to map Tuple field names to Table column names | 
DelimitedRecordHiveMapper or JsonRecordHiveMapper (required) |
+|withTxnsPerBatch | Hive grants a *batch of transactions* instead of single 
transactions to streaming clients like HiveBolt. This setting configures the 
number of desired transactions per Transaction Batch. Data from all 
transactions in a single batch end up in a single file. HiveBolt will write a 
maximum of batchSize events in each transaction in the batch. This setting in 
conjunction with batchSize provides control over the size of each file. Note 
that eventually Hive will transparently compact these files into larger files.| 
Integer . default 100 |
+|withMaxOpenConnections| Allow only this number of open connections. If this 
number is exceeded, the least recently used connection is closed.| Integer . 
default 100|
+|withBatchSize| Max number of events written to Hive in a single Hive 
transaction| Integer. default 15000|
+|withCallTimeout| (In milliseconds) Timeout for Hive & HDFS I/O operations, 
such as openTxn, write, commit, abort. | Integer. default 10000|
+|withHeartBeatInterval| (In seconds) Interval between consecutive heartbeats 
sent to Hive to keep unused transactions from expiring. Set this value to 0 to 
disable heartbeats.| Integer. default 240 |
+|withAutoCreatePartitions| HiveBolt will automatically create the necessary 
Hive partitions to stream to. |Boolean. default true |
+|withKerberosPrincipal| Kerberos user principal for accessing secure Hive | 
String|
+|withKerberosKeytab| Kerberos keytab for accessing secure Hive | String |
+|withTickTupleInterval| (In seconds) If > 0 then the Hive Bolt will 
periodically flush transaction batches. Enabling this is recommended to avoid 
tuple timeouts while waiting for a batch to fill up.| Integer. default 0|
+
+
+ 
+## HiveState (org.apache.storm.hive.trident.HiveState)
+
+Hive Trident state follows a similar pattern to HiveBolt: it takes in 
HiveOptions as an arg.
+
+```java
+   DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withTimeAsPartitionField("YYYY/MM/DD");
+            
+   HiveOptions hiveOptions = new 
HiveOptions(metaStoreURI,dbName,tblName,mapper)
+                                .withTxnsPerBatch(10)
+                                               .withBatchSize(1000)
+                                       .withIdleTimeout(10);
+                                       
+   StateFactory factory = new HiveStateFactory().withOptions(hiveOptions);
+   TridentState state = stream.partitionPersist(factory, hiveFields, new 
HiveUpdater(), new Fields());
+ ```
+   
+
+
+
+ 

http://git-wip-us.apache.org/repos/asf/storm/blob/97f0558e/docs/storm-jdbc.md
----------------------------------------------------------------------
diff --git a/docs/storm-jdbc.md b/docs/storm-jdbc.md
new file mode 100644
index 0000000..15aa2a3
--- /dev/null
+++ b/docs/storm-jdbc.md
@@ -0,0 +1,285 @@
+---
+title: Storm JDBC Integration
+layout: documentation
+documentation: true
+---
+
+Storm/Trident integration for JDBC. This package includes the core bolts and 
trident states that allow a storm topology
+to either insert storm tuples in a database table or to execute select queries 
against a database and enrich tuples 
+in a storm topology.
+
+**Note**: Throughout the examples below, we make use of 
com.google.common.collect.Lists and com.google.common.collect.Maps.
+
+## Inserting into a database.
+The bolt and trident state included in this package for inserting data into a 
database table are tied to a single table.
+
+### ConnectionProvider
+An interface that should be implemented by different connection pooling 
mechanism `org.apache.storm.jdbc.common.ConnectionProvider`
+
+```java
+public interface ConnectionProvider extends Serializable {
+    /**
+     * method must be idempotent.
+     */
+    void prepare();
+
+    /**
+     *
+     * @return a DB connection over which the queries can be executed.
+     */
+    Connection getConnection();
+
+    /**
+     * called once when the system is shutting down, should be idempotent.
+     */
+    void cleanup();
+}
+```
+
+Out of the box we support 
`org.apache.storm.jdbc.common.HikariCPConnectionProvider` which is an 
implementation that uses HikariCP.
+
+### JdbcMapper
+The main API for inserting data in a table using JDBC is the 
`org.apache.storm.jdbc.mapper.JdbcMapper` interface:
+
+```java
+public interface JdbcMapper  extends Serializable {
+    List<Column> getColumns(ITuple tuple);
+}
+```
+
+The `getColumns()` method defines how a storm tuple maps to a list of columns 
representing a row in a database. 
+**The order of the returned list is important. The place holders in the 
supplied queries are resolved in the same order as returned list.**
+For example if the user supplied insert query is `insert into user(user_id, 
user_name, create_date) values (?,?, now())` the 1st item 
+of the returned list of `getColumns` method will map to the 1st place holder 
and the 2nd to the 2nd and so on. We do not parse
+the supplied queries to try and resolve place holder by column names. Not 
making any assumptions about the query syntax allows this connector
+to be used by some non-standard sql frameworks like Phoenix which only 
supports upsert into.
+
+### JdbcInsertBolt
+To use the `JdbcInsertBolt`, you construct an instance of it by specifying a 
`ConnectionProvider` implementation
+and a `JdbcMapper` implementation that converts storm tuple to DB row. In 
addition, you must either supply
+a table name  using `withTableName` method or an insert query using 
`withInsertQuery`. 
+If you specify an insert query you should ensure that your `JdbcMapper` 
implementation will return a list of columns in the same order as in your 
insert query.
+You can optionally specify a query timeout seconds param that specifies max 
seconds an insert query can take. 
+The default is set to value of topology.message.timeout.secs and a value of -1 
will indicate not to set any query timeout.
+You should set the query timeout value to be <= topology.message.timeout.secs.
+
+ ```java
+Map hikariConfigMap = Maps.newHashMap();
+hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
+hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
+hikariConfigMap.put("dataSource.user","root");
+hikariConfigMap.put("dataSource.password","password");
+ConnectionProvider connectionProvider = new 
HikariCPConnectionProvider(hikariConfigMap);
+
+String tableName = "user_details";
+JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, 
connectionProvider);
+
+JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, 
simpleJdbcMapper)
+                                    .withTableName("user")
+                                    .withQueryTimeoutSecs(30);
+                                    Or
+JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, 
simpleJdbcMapper)
+                                    .withInsertQuery("insert into user values 
(?,?)")
+                                    .withQueryTimeoutSecs(30);                 
                   
+ ```
+
+### SimpleJdbcMapper
+`storm-jdbc` includes a general purpose `JdbcMapper` implementation called 
`SimpleJdbcMapper` that can map Storm
+tuple to a Database row. `SimpleJdbcMapper` assumes that the storm tuple has 
fields with same name as the column name in 
+the database table that you intend to write to.
+
+To use `SimpleJdbcMapper`, you simply tell it the tableName that you want to 
write to and provide a connectionProvider instance.
+
+The following code creates a `SimpleJdbcMapper` instance that:
+
+1. Will allow the mapper to transform a storm tuple to a list of columns 
mapping to a row in table test.user_details.
+2. Will use the provided HikariCP configuration to establish a connection pool 
with specified Database configuration and
+automatically figure out the column names and corresponding data types of the 
table that you intend to write to. 
+Please see 
https://github.com/brettwooldridge/HikariCP#configuration-knobs-baby to learn 
more about hikari configuration properties.
+
+```java
+Map hikariConfigMap = Maps.newHashMap();
+hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
+hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
+hikariConfigMap.put("dataSource.user","root");
+hikariConfigMap.put("dataSource.password","password");
+ConnectionProvider connectionProvider = new 
HikariCPConnectionProvider(hikariConfigMap);
+String tableName = "user_details";
+JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, 
connectionProvider);
+```
+The mapper initialized in the example above assumes a storm tuple has a value 
for all the columns of the table you intend to insert data into and its 
`getColumns`
+method will return the columns in the order in which the Jdbc connection 
instance's `connection.getMetaData().getColumns();` method returns them.
+
+**If you specified your own insert query to `JdbcInsertBolt` you must 
initialize `SimpleJdbcMapper` with explicit columnschema such that the schema 
has columns in the same order as your insert queries.**
+For example if your insert query is `Insert into user (user_id, user_name) 
values (?,?)` then your `SimpleJdbcMapper` should be initialized with the 
following statements:
+```java
+List<Column> columnSchema = Lists.newArrayList(
+    new Column("user_id", java.sql.Types.INTEGER),
+    new Column("user_name", java.sql.Types.VARCHAR));
+JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);
+```
+
+If your storm tuple only has fields for a subset of columns i.e. if some of 
the columns in your table have default values and you want to only insert 
values for columns with no default values you can enforce the behavior by 
initializing the 
+`SimpleJdbcMapper` with explicit columnschema. For example, if you have a 
user_details table `create table if not exists user_details (user_id integer, 
user_name varchar(100), dept_name varchar(100), create_time TIMESTAMP DEFAULT 
CURRENT_TIMESTAMP);`
+In this table the create_time column has a default value. To ensure only the 
columns with no default values are inserted 
+you can initialize the `jdbcMapper` as below:
+
+```java
+List<Column> columnSchema = Lists.newArrayList(
+    new Column("user_id", java.sql.Types.INTEGER),
+    new Column("user_name", java.sql.Types.VARCHAR),
+    new Column("dept_name", java.sql.Types.VARCHAR));
+JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);
+```
+### JdbcTridentState
+We also support a trident persistent state that can be used with trident 
topologies. To create a jdbc persistent trident
+state you need to initialize it with the table name or an insert query, the 
JdbcMapper instance and connection provider instance.
+See the example below:
+
+```java
+JdbcState.Options options = new JdbcState.Options()
+        .withConnectionProvider(connectionProvider)
+        .withMapper(jdbcMapper)
+        .withTableName("user_details")
+        .withQueryTimeoutSecs(30);
+JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);
+```
+Similar to `JdbcInsertBolt`, you can specify a custom insert query using 
`withInsertQuery` instead of specifying a table name.
+
+## Lookup from Database
+We support `select` queries from databases to allow enrichment of storm tuples 
in a topology. The main API for 
+executing select queries against a database using JDBC is the 
`org.apache.storm.jdbc.mapper.JdbcLookupMapper` interface:
+
+```java
+    void declareOutputFields(OutputFieldsDeclarer declarer);
+    List<Column> getColumns(ITuple tuple);
+    List<Values> toTuple(ITuple input, List<Column> columns);
+```
+
+The `declareOutputFields` method is used to indicate what fields will be 
emitted as part of output tuple of processing a storm 
+tuple. 
+
+The `getColumns` method specifies the place holder columns in a select query 
and their SQL type and the value to use.
+For example in the user_details table mentioned above if you were executing a 
query `select user_name from user_details where
+user_id = ? and create_time > ?` the `getColumns` method would take a storm 
input tuple and return a List containing two items.
+The first instance of `Column` type's `getValue()` method will be used as the 
value of `user_id` to lookup for and the
+second instance of `Column` type's `getValue()` method will be used as the 
value of `create_time`.
+**Note: the order in the returned list determines the place holder's value. In 
other words the first item in the list maps 
+to first `?` in select query, the second item to second `?` in query and so 
on.** 
+
+The `toTuple` method takes in the input tuple and a list of columns 
representing a DB row as a result of the select query
+and returns a list of values to be emitted. 
+**Please note that it returns a list of `Values` and not just a single 
instance of `Values`.** 
+This allows for a single DB row to be mapped to multiple output storm tuples.
+
+### SimpleJdbcLookupMapper
+`storm-jdbc` includes a general purpose `JdbcLookupMapper` implementation 
called `SimpleJdbcLookupMapper`. 
+
+To use `SimpleJdbcLookupMapper`, you have to initialize it with the fields that will 
be outputted by your bolt and the list of
+columns that are used in your select query as place holder. The following 
example shows initialization of a `SimpleJdbcLookupMapper`
+that declares `user_id,user_name,create_date` as output fields and `user_id` 
as the place holder column in select query.
+`SimpleJdbcLookupMapper` assumes the field name in your tuple is equal to the place 
holder column name, i.e. in our example 
+`SimpleJdbcLookupMapper` will look for a field `user_id` in the input tuple and use 
its value as the place holder's value in the
+select query. For constructing output tuples, it looks for fields specified in 
`outputFields` in the input tuple first, 
+and if it is not found in input tuple then it looks at select query's output 
row for a column with same name as field name. 
+So in the example below if the input tuple had fields `user_id, create_date` 
and the select query was 
+`select user_name from user_details where user_id = ?`, then for each input tuple 
`SimpleJdbcLookupMapper.getColumns(tuple)` 
+will return the value of `tuple.getValueByField("user_id")`, which will be used 
as the value for the `?` in the select query. 
+For each output row from the DB, `SimpleJdbcLookupMapper.toTuple()` will use the 
`user_id, create_date` from the input tuple as 
+is, adding only `user_name` from the resulting row, and return these 3 fields 
as a single output tuple.
+
+```java
+Fields outputFields = new Fields("user_id", "user_name", "create_date");
+List<Column> queryParamColumns = Lists.newArrayList(new Column("user_id", 
Types.INTEGER));
+this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, 
queryParamColumns);
+```
+
+### JdbcLookupBolt
+To use the `JdbcLookupBolt`, construct an instance of it using a 
`ConnectionProvider` instance, `JdbcLookupMapper` instance and the select query 
to execute.
+You can optionally specify a query timeout seconds param that specifies max 
seconds the select query can take. 
+The default is set to value of topology.message.timeout.secs. You should set 
this value to be <= topology.message.timeout.secs.
+
+```java
+String selectSql = "select user_name from user_details where user_id = ?";
+SimpleJdbcLookupMapper lookupMapper = new SimpleJdbcLookupMapper(outputFields, 
queryParamColumns);
+JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt(connectionProvider, 
selectSql, lookupMapper)
+        .withQueryTimeoutSecs(30);
+```
+
+### JdbcTridentState for lookup
+We also support a trident query state that can be used with trident 
topologies. 
+
+```java
+JdbcState.Options options = new JdbcState.Options()
+        .withConnectionProvider(connectionProvider)
+        .withJdbcLookupMapper(new SimpleJdbcLookupMapper(new 
Fields("user_name"), Lists.newArrayList(new Column("user_id", Types.INTEGER))))
+        .withSelectQuery("select user_name from user_details where user_id = 
?")
+        .withQueryTimeoutSecs(30);
+```
+
+## Example:
+A runnable example can be found in the `src/test/java/topology` directory.
+
+### Setup
+* Ensure you have included JDBC implementation dependency for your chosen 
database as part of your build configuration.
+* The test topologies execute the following queries, so your intended DB must 
support these queries for the test topologies
+to work. 
+```SQL
+create table if not exists user (user_id integer, user_name varchar(100), 
dept_name varchar(100), create_date date);
+create table if not exists department (dept_id integer, dept_name 
varchar(100));
+create table if not exists user_department (user_id integer, dept_id integer);
+insert into department values (1, 'R&D');
+insert into department values (2, 'Finance');
+insert into department values (3, 'HR');
+insert into department values (4, 'Sales');
+insert into user_department values (1, 1);
+insert into user_department values (2, 2);
+insert into user_department values (3, 3);
+insert into user_department values (4, 4);
+select dept_name from department, user_department where department.dept_id = 
user_department.dept_id and user_department.user_id = ?;
+```
+### Execution
+Run the `org.apache.storm.jdbc.topology.UserPersistanceTopology` class using 
the `storm jar` command. The class expects 5 args:
+`storm jar org.apache.storm.jdbc.topology.UserPersistanceTopology 
<dataSourceClassName> <dataSource.url> <user> <password> [topology name]`
+
+To make it work with Mysql, you can add the following to the pom.xml
+
+```
+<dependency>
+    <groupId>mysql</groupId>
+    <artifactId>mysql-connector-java</artifactId>
+    <version>5.1.31</version>
+</dependency>
+```
+
+You can generate a single jar with dependencies using mvn assembly plugin. To 
use the plugin add the following to your pom.xml and execute 
+`mvn clean compile assembly:single`
+
+```
+<plugin>
+    <artifactId>maven-assembly-plugin</artifactId>
+    <configuration>
+        <archive>
+            <manifest>
+                <mainClass>fully.qualified.MainClass</mainClass>
+            </manifest>
+        </archive>
+        <descriptorRefs>
+            <descriptorRef>jar-with-dependencies</descriptorRef>
+        </descriptorRefs>
+    </configuration>
+</plugin>
+```
+
+Mysql Example:
+```
+storm jar 
~/repo/incubator-storm/external/storm-jdbc/target/storm-jdbc-0.10.0-SNAPSHOT-jar-with-dependencies.jar
 org.apache.storm.jdbc.topology.UserPersistanceTopology  
com.mysql.jdbc.jdbc2.optional.MysqlDataSource jdbc:mysql://localhost/test root 
password UserPersistenceTopology
+```
+
+You can execute a select query against the user table which should show newly 
inserted rows:
+
+```
+select * from user;
+```
+
+For trident you can view 
`org.apache.storm.jdbc.topology.UserPersistanceTridentTopology`.

http://git-wip-us.apache.org/repos/asf/storm/blob/97f0558e/docs/storm-kafka.md
----------------------------------------------------------------------
diff --git a/docs/storm-kafka.md b/docs/storm-kafka.md
new file mode 100644
index 0000000..46a2b89
--- /dev/null
+++ b/docs/storm-kafka.md
@@ -0,0 +1,287 @@
+---
+title: Storm Kafka Integration
+layout: documentation
+documentation: true
+---
+
+Provides core Storm and Trident spout implementations for consuming data from 
Apache Kafka 0.8.x.
+
+##Spouts
+We support both Trident and core Storm spouts. For both spout implementations, 
we use a `BrokerHosts` interface that
+tracks Kafka broker host to partition mapping and a `KafkaConfig` that controls 
some Kafka related parameters.
+ 
+###BrokerHosts
+In order to initialize your Kafka spout/emitter you need to construct an 
instance of the marker interface BrokerHosts. 
+Currently, we support the following two implementations:
+
+####ZkHosts
+ZkHosts is what you should use if you want to dynamically track Kafka broker 
to partition mapping. This class uses 
+Kafka's ZooKeeper entries to track brokerHost -> partition mapping. You can 
instantiate an object by calling
+```java
+    public ZkHosts(String brokerZkStr, String brokerZkPath) 
+    public ZkHosts(String brokerZkStr)
+```
+Where brokerZkStr is just ip:port (e.g. localhost:2181). brokerZkPath is the 
root directory under which all the topics and
+partition information is stored. By default this is /brokers which is what the 
default Kafka implementation uses.
+
+By default, the broker-partition mapping is refreshed every 60 seconds from 
ZooKeeper. If you want to change it, you
+should set host.refreshFreqSecs to your chosen value.
+
+####StaticHosts
+This is an alternative implementation where broker -> partition information is 
static. In order to construct an instance
+of this class, you need to first construct an instance of 
GlobalPartitionInformation.
+
+```java
+    Broker brokerForPartition0 = new Broker("localhost");//localhost:9092
+    Broker brokerForPartition1 = new Broker("localhost", 
9092);//localhost:9092 but we specified the port explicitly
+    Broker brokerForPartition2 = new Broker("localhost:9092");//localhost:9092 
specified as one string.
+    GlobalPartitionInformation partitionInfo = new 
GlobalPartitionInformation();
+    partitionInfo.addPartition(0, brokerForPartition0);//mapping from 
partition 0 to brokerForPartition0
+    partitionInfo.addPartition(1, brokerForPartition1);//mapping from 
partition 1 to brokerForPartition1
+    partitionInfo.addPartition(2, brokerForPartition2);//mapping from 
partition 2 to brokerForPartition2
+    StaticHosts hosts = new StaticHosts(partitionInfo);
+```
+
+###KafkaConfig
+The second thing needed for constructing a kafkaSpout is an instance of 
KafkaConfig. 
+```java
+    public KafkaConfig(BrokerHosts hosts, String topic)
+    public KafkaConfig(BrokerHosts hosts, String topic, String clientId)
+```
+
+The BrokerHosts can be any implementation of BrokerHosts interface as 
described above. The topic is name of Kafka topic.
+The optional ClientId is used as a part of the ZooKeeper path where the 
spout's current consumption offset is stored.
+
+There are 2 extensions of KafkaConfig currently in use.
+
+SpoutConfig is an extension of KafkaConfig that supports additional fields 
with ZooKeeper connection info and for controlling
+behavior specific to KafkaSpout. The zkRoot will be used as root to store your 
consumer's offset. The id should uniquely
+identify your spout.
+```java
+public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id);
+public SpoutConfig(BrokerHosts hosts, String topic, String id);
+```
+In addition to these parameters, SpoutConfig contains the following fields 
that control how KafkaSpout behaves:
+```java
+    // setting for how often to save the current Kafka offset to ZooKeeper
+    public long stateUpdateIntervalMs = 2000;
+
+    // Exponential back-off retry settings.  These are used when retrying 
messages after a bolt
+    // calls OutputCollector.fail().
+    // Note: be sure to set backtype.storm.Config.MESSAGE_TIMEOUT_SECS 
appropriately to prevent
+    // resubmitting the message while still retrying.
+    public long retryInitialDelayMs = 0;
+    public double retryDelayMultiplier = 1.0;
+    public long retryDelayMaxMs = 60 * 1000;
+
+    // if set to true, spout will set Kafka topic as the emitted Stream ID
+    public boolean topicAsStreamId = false;
+```
+Core KafkaSpout only accepts an instance of SpoutConfig.
+
+TridentKafkaConfig is another extension of KafkaConfig.
+TridentKafkaEmitter only accepts TridentKafkaConfig.
+
+The KafkaConfig class also has a bunch of public variables that control your 
application's behavior. Here are the defaults:
+```java
+    public int fetchSizeBytes = 1024 * 1024;
+    public int socketTimeoutMs = 10000;
+    public int fetchMaxWait = 10000;
+    public int bufferSizeBytes = 1024 * 1024;
+    public MultiScheme scheme = new RawMultiScheme();
+    public boolean ignoreZkOffsets = false;
+    public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
+    public long maxOffsetBehind = Long.MAX_VALUE;
+    public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
+    public int metricsTimeBucketSizeInSecs = 60;
+```
+
+Most of them are self explanatory except MultiScheme.
+###MultiScheme
+MultiScheme is an interface that dictates how the byte[] consumed from Kafka 
gets transformed into a storm tuple. It
+also controls the naming of your output field.
+
+```java
+  public Iterable<List<Object>> deserialize(byte[] ser);
+  public Fields getOutputFields();
+```
+
+The default `RawMultiScheme` just takes the `byte[]` and returns a tuple with 
`byte[]` as is. The name of the
+outputField is "bytes".  There are alternative implementations like 
`SchemeAsMultiScheme` and
+`KeyValueSchemeAsMultiScheme` which can convert the `byte[]` to `String`.
+
+
+### Examples
+
+#### Core Spout
+
+```java
+BrokerHosts hosts = new ZkHosts(zkConnString);
+SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, 
UUID.randomUUID().toString());
+spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
+KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
+```
+
+#### Trident Spout
+```java
+TridentTopology topology = new TridentTopology();
+BrokerHosts zk = new ZkHosts("localhost");
+TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test-topic");
+spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
+OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
+```
+
+
+### How KafkaSpout stores offsets of a Kafka topic and recovers in case of 
failures
+
+As shown in the above KafkaConfig properties, you can control from where in 
the Kafka topic the spout begins to read by
+setting `KafkaConfig.startOffsetTime` as follows:
+
+1. `kafka.api.OffsetRequest.EarliestTime()`:  read from the beginning of the 
topic (i.e. from the oldest messages onwards)
+2. `kafka.api.OffsetRequest.LatestTime()`: read from the end of the topic 
(i.e. any new messages that are being written to the topic)
+3. A Unix timestamp in milliseconds since the epoch (e.g. via 
`System.currentTimeMillis()`):
+   see [How do I accurately get offsets of messages for a certain timestamp 
using 
OffsetRequest?](https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdoIaccuratelygetoffsetsofmessagesforacertaintimestampusingOffsetRequest?)
 in the Kafka FAQ
+
+As the topology runs the Kafka spout keeps track of the offsets it has read 
and emitted by storing state information
+under the ZooKeeper path `SpoutConfig.zkRoot+ "/" + SpoutConfig.id`.  In the 
case of failures it recovers from the last
+written offset in ZooKeeper.
+
+> **Important:**  When re-deploying a topology make sure that the settings for 
`SpoutConfig.zkRoot` and `SpoutConfig.id`
+> were not modified, otherwise the spout will not be able to read its previous 
consumer state information (i.e. the
+> offsets) from ZooKeeper -- which may lead to unexpected behavior and/or to 
data loss, depending on your use case.
+
+This means that when a topology has run once the setting 
`KafkaConfig.startOffsetTime` will not have an effect for
+subsequent runs of the topology because now the topology will rely on the 
consumer state information (offsets) in
+ZooKeeper to determine from where it should begin (more precisely: resume) 
reading.
+If you want to force the spout to ignore any consumer state information stored 
in ZooKeeper, then you should
+set the parameter `KafkaConfig.ignoreZkOffsets` to `true`.  If `true`, the 
spout will always begin reading from the
+offset defined by `KafkaConfig.startOffsetTime` as described above.
+
+
+## Using storm-kafka with different versions of Scala
+
+Storm-kafka's Kafka dependency is defined as `provided` scope in maven, 
meaning it will not be pulled in
+as a transitive dependency. This allows you to use a version of Kafka built 
against a specific Scala version.
+
+When building a project with storm-kafka, you must explicitly add the Kafka 
dependency. For example, to
+use Kafka 0.8.1.1 built against Scala 2.10, you would use the following 
dependency in your `pom.xml`:
+
+```xml
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.10</artifactId>
+            <version>0.8.1.1</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+```
+
+Note that the ZooKeeper and log4j dependencies are excluded to prevent version 
conflicts with Storm's dependencies.
+
+##Writing to Kafka as part of your topology
+You can create an instance of storm.kafka.bolt.KafkaBolt and attach it as a 
component to your topology or if you 
+are using trident you can use storm.kafka.trident.TridentState, 
storm.kafka.trident.TridentStateFactory and
+storm.kafka.trident.TridentKafkaUpdater.
+
+You need to provide implementation of following 2 interfaces
+
+###TupleToKafkaMapper and TridentTupleToKafkaMapper
+These interfaces have 2 methods defined:
+
+```java
+    K getKeyFromTuple(Tuple/TridentTuple tuple);
+    V getMessageFromTuple(Tuple/TridentTuple tuple);
+```
+
+As the name suggests, these methods are called to map a tuple to Kafka key and 
Kafka message. If you just want one field
+as key and one field as value, then you can use the provided 
FieldNameBasedTupleToKafkaMapper.java 
+implementation. In the KafkaBolt, the implementation always looks for a field 
with field name "key" and "message" if you 
+use the default constructor to construct FieldNameBasedTupleToKafkaMapper for 
backward compatibility 
+reasons. Alternatively you could also specify a different key and message 
field by using the non default constructor.
+In the TridentKafkaState you must specify what is the field name for key and 
message as there is no default constructor.
+These should be specified while constructing an instance of 
FieldNameBasedTupleToKafkaMapper.
+
+###KafkaTopicSelector and trident KafkaTopicSelector
+This interface has only one method
+```java
+public interface KafkaTopicSelector {
+    String getTopics(Tuple/TridentTuple tuple);
+}
+```
+The implementation of this interface should return the topic to which the 
tuple's key/message mapping needs to be published. 
+You can return null and the message will be ignored. If you have one static 
topic name then you can use 
+DefaultTopicSelector.java and set the name of the topic in the constructor.
+
+### Specifying Kafka producer properties
+You can provide all the producer properties (see 
http://kafka.apache.org/documentation.html#producerconfigs 
+section "Important configuration properties for the producer") in your Storm 
topology config by setting the properties
+map with key kafka.broker.properties.
+
+###Putting it all together
+
+For the bolt :
+```java
+        TopologyBuilder builder = new TopologyBuilder();
+    
+        Fields fields = new Fields("key", "message");
+        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+                    new Values("storm", "1"),
+                    new Values("trident", "1"),
+                    new Values("needs", "1"),
+                    new Values("javadoc", "1")
+        );
+        spout.setCycle(true);
+        builder.setSpout("spout", spout, 5);
+        KafkaBolt bolt = new KafkaBolt()
+                .withTopicSelector(new DefaultTopicSelector("test"))
+                .withTupleToKafkaMapper(new 
FieldNameBasedTupleToKafkaMapper());
+        builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");
+        
+        Config conf = new Config();
+        //set producer properties.
+        Properties props = new Properties();
+        props.put("metadata.broker.list", "localhost:9092");
+        props.put("request.required.acks", "1");
+        props.put("serializer.class", "kafka.serializer.StringEncoder");
+        conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
+        
+        StormSubmitter.submitTopology("kafkaboltTest", conf, 
builder.createTopology());
+```
+
+For Trident:
+
+```java
+        Fields fields = new Fields("word", "count");
+        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+                new Values("storm", "1"),
+                new Values("trident", "1"),
+                new Values("needs", "1"),
+                new Values("javadoc", "1")
+        );
+        spout.setCycle(true);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout1", spout);
+
+        TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
+                .withKafkaTopicSelector(new DefaultTopicSelector("test"))
+                .withTridentTupleToKafkaMapper(new 
FieldNameBasedTupleToKafkaMapper("word", "count"));
+        stream.partitionPersist(stateFactory, fields, new 
TridentKafkaUpdater(), new Fields());
+
+        Config conf = new Config();
+        //set producer properties.
+        Properties props = new Properties();
+        props.put("metadata.broker.list", "localhost:9092");
+        props.put("request.required.acks", "1");
+        props.put("serializer.class", "kafka.serializer.StringEncoder");
+        conf.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props);
+        StormSubmitter.submitTopology("kafkaTridentTest", conf, 
topology.build());
+```

http://git-wip-us.apache.org/repos/asf/storm/blob/97f0558e/docs/storm-redis.md
----------------------------------------------------------------------
diff --git a/docs/storm-redis.md b/docs/storm-redis.md
new file mode 100644
index 0000000..adbac68
--- /dev/null
+++ b/docs/storm-redis.md
@@ -0,0 +1,258 @@
+---
+title: Storm Redis Integration
+layout: documentation
+documentation: true
+---
+
+Storm/Trident integration for [Redis](http://redis.io/)
+
+Storm-redis uses Jedis for Redis client.
+
+## Usage
+
+### How do I use it?
+
+use it as a maven dependency:
+
+```xml
+<dependency>
+    <groupId>org.apache.storm</groupId>
+    <artifactId>storm-redis</artifactId>
+    <version>${storm.version}</version>
+    <type>jar</type>
+</dependency>
+```
+
+### For normal Bolt
+
+Storm-redis provides basic Bolt implementations, ```RedisLookupBolt``` and 
```RedisStoreBolt```.
+
+As the names suggest, ```RedisLookupBolt``` retrieves a value from Redis 
using a key, and ```RedisStoreBolt``` stores a key / value pair to Redis. One tuple will 
be matched to one key / value pair, and you can define the matching pattern in a 
```TupleMapper```.
+
+You can also choose the data type to use from ```RedisDataTypeDescription```. 
Please refer to ```RedisDataTypeDescription.RedisDataType``` to see what data 
types are supported. Some data types (hash and sorted set) require an 
additional key, and the key converted from the tuple becomes the element.
+
+These interfaces are combined with ```RedisLookupMapper``` and 
```RedisStoreMapper``` which fit ```RedisLookupBolt``` and ```RedisStoreBolt``` 
respectively.
+
+#### RedisLookupBolt example
+
+```java
+
+class WordCountRedisLookupMapper implements RedisLookupMapper {
+    private RedisDataTypeDescription description;
+    private final String hashKey = "wordCount";
+
+    public WordCountRedisLookupMapper() {
+        description = new RedisDataTypeDescription(
+                RedisDataTypeDescription.RedisDataType.HASH, hashKey);
+    }
+
+    @Override
+    public List<Values> toTuple(ITuple input, Object value) {
+        String member = getKeyFromTuple(input);
+        List<Values> values = Lists.newArrayList();
+        values.add(new Values(member, value));
+        return values;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("wordName", "count"));
+    }
+
+    @Override
+    public RedisDataTypeDescription getDataTypeDescription() {
+        return description;
+    }
+
+    @Override
+    public String getKeyFromTuple(ITuple tuple) {
+        return tuple.getStringByField("word");
+    }
+
+    @Override
+    public String getValueFromTuple(ITuple tuple) {
+        return null;
+    }
+}
+
+```
+
+```java
+
+JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
+        .setHost(host).setPort(port).build();
+RedisLookupMapper lookupMapper = new WordCountRedisLookupMapper();
+RedisLookupBolt lookupBolt = new RedisLookupBolt(poolConfig, lookupMapper);
+```
+
+#### RedisStoreBolt example
+
+```java
+
+class WordCountStoreMapper implements RedisStoreMapper {
+    private RedisDataTypeDescription description;
+    private final String hashKey = "wordCount";
+
+    public WordCountStoreMapper() {
+        description = new RedisDataTypeDescription(
+            RedisDataTypeDescription.RedisDataType.HASH, hashKey);
+    }
+
+    @Override
+    public RedisDataTypeDescription getDataTypeDescription() {
+        return description;
+    }
+
+    @Override
+    public String getKeyFromTuple(ITuple tuple) {
+        return tuple.getStringByField("word");
+    }
+
+    @Override
+    public String getValueFromTuple(ITuple tuple) {
+        return tuple.getStringByField("count");
+    }
+}
+```
+
+```java
+
+JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
+                .setHost(host).setPort(port).build();
+RedisStoreMapper storeMapper = new WordCountStoreMapper();
+RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
+```
+
+### For non-simple Bolt
+
+If your scenario doesn't fit ```RedisStoreBolt``` and ```RedisLookupBolt```, 
storm-redis also provides ```AbstractRedisBolt``` to let you extend and apply 
your business logic.
+
+```java
+
+    public static class LookupWordTotalCountBolt extends AbstractRedisBolt {
+        private static final Logger LOG = 
LoggerFactory.getLogger(LookupWordTotalCountBolt.class);
+        private static final Random RANDOM = new Random();
+
+        public LookupWordTotalCountBolt(JedisPoolConfig config) {
+            super(config);
+        }
+
+        public LookupWordTotalCountBolt(JedisClusterConfig config) {
+            super(config);
+        }
+
+        @Override
+        public void execute(Tuple input) {
+            JedisCommands jedisCommands = null;
+            try {
+                jedisCommands = getInstance();
+                String wordName = input.getStringByField("word");
+                String countStr = jedisCommands.get(wordName);
+                if (countStr != null) {
+                    int count = Integer.parseInt(countStr);
+                    this.collector.emit(new Values(wordName, count));
+
+                    // print lookup result with low probability
+                    if(RANDOM.nextInt(1000) > 995) {
+                        LOG.info("Lookup result - word : " + wordName + " / 
count : " + count);
+                    }
+                } else {
+                    // skip
+                    LOG.warn("Word not found in Redis - word : " + wordName);
+                }
+            } finally {
+                if (jedisCommands != null) {
+                    returnInstance(jedisCommands);
+                }
+                this.collector.ack(input);
+            }
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            // wordName, count
+            declarer.declare(new Fields("wordName", "count"));
+        }
+    }
+
+```
+
+### Trident State usage
+
+1. RedisState and RedisMapState, which provide the Jedis interface for a single 
Redis instance.
+
+2. RedisClusterState and RedisClusterMapState, which provide the JedisCluster 
interface for a Redis cluster.
+
+RedisState
+```java
+        JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
+                                        .setHost(redisHost).setPort(redisPort)
+                                        .build();
+        RedisStoreMapper storeMapper = new WordCountStoreMapper();
+        RedisLookupMapper lookupMapper = new WordCountLookupMapper();
+        RedisState.Factory factory = new RedisState.Factory(poolConfig);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout1", spout);
+
+        stream.partitionPersist(factory,
+                                fields,
+                                new 
RedisStateUpdater(storeMapper).withExpire(86400000),
+                                new Fields());
+
+        TridentState state = topology.newStaticState(factory);
+        stream = stream.stateQuery(state, new Fields("word"),
+                                new RedisStateQuerier(lookupMapper),
+                                new Fields("columnName","columnValue"));
+```
+
+RedisClusterState
+```java
+        Set<InetSocketAddress> nodes = new HashSet<InetSocketAddress>();
+        for (String hostPort : redisHostPort.split(",")) {
+            String[] host_port = hostPort.split(":");
+            nodes.add(new InetSocketAddress(host_port[0], 
Integer.valueOf(host_port[1])));
+        }
+        JedisClusterConfig clusterConfig = new 
JedisClusterConfig.Builder().setNodes(nodes)
+                                        .build();
+        RedisStoreMapper storeMapper = new WordCountStoreMapper();
+        RedisLookupMapper lookupMapper = new WordCountLookupMapper();
+        RedisClusterState.Factory factory = new 
RedisClusterState.Factory(clusterConfig);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout1", spout);
+
+        stream.partitionPersist(factory,
+                                fields,
+                                new 
RedisClusterStateUpdater(storeMapper).withExpire(86400000),
+                                new Fields());
+
+        TridentState state = topology.newStaticState(factory);
+        stream = stream.stateQuery(state, new Fields("word"),
+                                new RedisClusterStateQuerier(lookupMapper),
+                                new Fields("columnName","columnValue"));
+```
+
+## License
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+## Committer Sponsors
+
+ * Robert Evans ([@revans2](https://github.com/revans2))
+ * Jungtaek Lim ([@HeartSaVioR](https://github.com/HeartSaVioR))

Reply via email to