Repository: storm
Updated Branches:
  refs/heads/master d7334849b -> 64d7ac6b2


STORM-616: Jdbc connector for storm.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5b160168
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5b160168
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5b160168

Branch: refs/heads/master
Commit: 5b160168c75c0e8c4c402a5e24f606dab697fbef
Parents: 65e9f0c
Author: Parth Brahmbhatt <[email protected]>
Authored: Mon Jan 5 22:14:18 2015 -0500
Committer: Parth Brahmbhatt <[email protected]>
Committed: Mon Jan 5 22:22:46 2015 -0500

----------------------------------------------------------------------
 external/storm-jdbc/LICENSE                     | 202 ++++++++++++++++++
 external/storm-jdbc/README.md                   | 117 ++++++++++
 external/storm-jdbc/pom.xml                     | 120 +++++++++++
 .../storm/jdbc/bolt/AbstractJdbcBolt.java       |  57 +++++
 .../org/apache/storm/jdbc/bolt/JdbcBolt.java    |  85 ++++++++
 .../org/apache/storm/jdbc/common/Column.java    |  81 +++++++
 .../apache/storm/jdbc/common/JDBCClient.java    | 211 +++++++++++++++++++
 .../java/org/apache/storm/jdbc/common/Util.java |  74 +++++++
 .../apache/storm/jdbc/mapper/JdbcMapper.java    |  33 +++
 .../storm/jdbc/mapper/SimpleJdbcMapper.java     |  87 ++++++++
 .../storm/jdbc/trident/state/JdbcState.java     | 101 +++++++++
 .../jdbc/trident/state/JdbcStateFactory.java    |  40 ++++
 .../storm/jdbc/trident/state/JdbcUpdater.java   |  32 +++
 .../storm/jdbc/common/JdbcClientTest.java       |  86 ++++++++
 .../org/apache/storm/jdbc/spout/UserSpout.java  |  90 ++++++++
 .../jdbc/topology/UserPersistanceTopology.java  |  78 +++++++
 .../UserPersistanceTridentTopology.java         |  76 +++++++
 external/storm-jdbc/src/test/sql/test.sql       |   1 +
 18 files changed, 1571 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/LICENSE
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/LICENSE b/external/storm-jdbc/LICENSE
new file mode 100644
index 0000000..e06d208
--- /dev/null
+++ b/external/storm-jdbc/LICENSE
@@ -0,0 +1,202 @@
+Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "{}"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright {yyyy} {name of copyright owner}
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+

http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/README.md
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/README.md b/external/storm-jdbc/README.md
new file mode 100644
index 0000000..36db3ef
--- /dev/null
+++ b/external/storm-jdbc/README.md
@@ -0,0 +1,117 @@
+# Storm JDBC
+
+Storm/Trident integration for JDBC.
+
+## Usage
+The main API for interacting with JDBC is the 
`org.apache.storm.jdbc.mapper.JdbcMapper`
+interface:
+
+```java
+public interface JdbcMapper  extends Serializable {
+    List<Column> getColumns(ITuple tuple);
+}
+```
+
+The `getColumns()` method defines how a storm tuple maps to a list of columns 
representing a row in a database.
+
+### SimpleJdbcMapper
+`storm-jdbc` includes a general purpose `JdbcMapper` implementation called 
`SimpleJdbcMapper` that can map Storm
+tuple to a Database row. `SimpleJdbcMapper` assumes that the tuple has fields 
with same name as the column name in 
+the database table that you intend to write to.
+
+To use `SimpleJdbcMapper`, you simply tell it the tableName that you want to 
write to and provide a hikari configuration map.
+
+The following code creates a `SimpleJdbcMapper` instance that:
+
+1. Will allow the mapper to transform a storm tuple to a list of columns 
mapping to a row in table test.user_details.
+2. Will use the provided HikariCP configuration to establish a connection pool 
with specified Database configuration and
+automatically figure out the column names of the table that you intend to 
write to.
+
+```java
+Map hikariConfigMap = Maps.newHashMap();
+hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
+hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
+hikariConfigMap.put("dataSource.user","root");
+hikariConfigMap.put("dataSource.password","password");
+String tableName = "user_details";
+JdbcMapper jdbcMapper = new SimpleJdbcMapper(tableName, hikariConfigMap);
+```
+### JdbcBolt
+To use the `JdbcBolt`, construct it with the name of the table to write to, 
and a `JdbcMapper` implementation. In addition
+you must specify a configuration key that hold the hikari configuration map.
+
+ ```java
+Config config = new Config();
+config.put("jdbc.conf", hikariConfigMap);
+
+JdbcBolt bolt = new JdbcBolt("user_details", jdbcMapper)
+        .withConfigKey("jdbc.conf");
+ ```
+### JdbcTridentState
+We also support a trident persistent state that can be used with trident 
topologies. To create a jdbc persistent trident
+state you need to initialize it with the table name, the JdbcMapper instance 
and hikari configuration. See the example
+below:
+
+```java
+JdbcState.Options options = new JdbcState.Options()
+        .withConfigKey("jdbc.conf")
+        .withMapper(jdbcMapper)
+        .withTableName("user");
+
+JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);
+```
+ 
+## Example: Persistent User details
+A runnable example can be found in the `src/test/java/topology` directory.
+
+### Setup
+* Ensure you have included JDBC implementation dependency for your chosen 
database as part of your build configuration.
+* Start the database and login to the database.
+* Create table user using the following query:
+
+```
+> use test;
+> create table user (id integer, user_name varchar(100), create_date date);
+```
+
+### Execution
+Run the `org.apache.storm.jdbc.topology.UserPersistanceTopology` class using 
storm jar command. The class expects 5 args
+storm jar org.apache.storm.jdbc.topology.UserPersistanceTopology 
<dataSourceClassName> <dataSource.url> <user> <password> <tableName> [topology 
name]
+
+Mysql Example:
+```
+storm jar 
~/repo/incubator-storm/external/storm-jdbc/target/storm-jdbc-0.10.0-SNAPSHOT-jar-with-dependencies.jar
 
+org.apache.storm.jdbc.topology.UserPersistanceTridentTopology  
com.mysql.jdbc.jdbc2.optional.MysqlDataSource 
+jdbc:mysql://localhost/test root password user UserPersistenceTopology
+```
+
+You can execute a select query against the user table which should show newly 
inserted rows:
+
+```
+select * from user;
+```
+
+For trident you can view 
`org.apache.storm.jdbc.topology.UserPersistanceTridentTopology`.
+## License
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+## Committer
+
+* Parth Brahmbhatt 
([[email protected]](mailto:[email protected]))
+ 
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/pom.xml b/external/storm-jdbc/pom.xml
new file mode 100644
index 0000000..9130908
--- /dev/null
+++ b/external/storm-jdbc/pom.xml
@@ -0,0 +1,120 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>0.10.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>storm-jdbc</artifactId>
+
+    <developers>
+        <developer>
+            <id>Parth-Brahmbhatt</id>
+            <name>Parth Brahmbhatt</name>
+            <email>[email protected]</email>
+        </developer>
+    </developers>
+
+    <properties>
+        <hikari.version>2.2.5</hikari.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.3</version>
+        </dependency>
+        <dependency>
+            <groupId>com.zaxxer</groupId>
+            <artifactId>HikariCP-java6</artifactId>
+            <version>${hikari.version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.hsqldb</groupId>
+            <artifactId>hsqldb</artifactId>
+            <version>2.3.1</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>2.5</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>sql-maven-plugin</artifactId>
+                <version>1.5</version>
+                <dependencies>
+                    <dependency>
+                        <groupId>org.hsqldb</groupId>
+                        <artifactId>hsqldb</artifactId>
+                        <version>2.3.2</version>
+                    </dependency>
+                </dependencies>
+                <executions>
+                    <execution>
+                        <id>create-db</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>execute</goal>
+                        </goals>
+                        <configuration>
+                            <driver>org.hsqldb.jdbcDriver</driver>
+                            <url>jdbc:hsqldb:mem:test;shutdown=false</url>
+                            <username>SA</username>
+                            <password></password>
+                            <autocommit>true</autocommit>
+                            <srcFiles>
+                                <srcFile>src/test/sql/test.sql</srcFile>
+                            </srcFiles>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
----------------------------------------------------------------------
diff --git 
a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
 
b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
new file mode 100644
index 0000000..8dacc2d
--- /dev/null
+++ 
b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.base.BaseRichBolt;
+import org.apache.commons.lang.Validate;
+import org.apache.storm.jdbc.common.JDBCClient;
+import org.apache.storm.jdbc.mapper.JdbcMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public abstract class AbstractJdbcBolt extends BaseRichBolt {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractJdbcBolt.class);
+
+    protected OutputCollector collector;
+
+    protected transient JDBCClient jdbcClient;
+    protected String tableName;
+    protected JdbcMapper mapper;
+    protected String configKey;
+
+    public AbstractJdbcBolt(String tableName, JdbcMapper mapper) {
+        Validate.notEmpty(tableName, "Table name can not be blank or null");
+        Validate.notNull(mapper, "mapper can not be null");
+        this.tableName = tableName;
+        this.mapper = mapper;
+    }
+
+    @Override
+    public void prepare(Map map, TopologyContext topologyContext, 
OutputCollector collector) {
+        this.collector = collector;
+
+        Map<String, Object> conf = (Map<String, 
Object>)map.get(this.configKey);
+        Validate.notEmpty(conf, "Hikari configuration not found using key '" + 
this.configKey + "'");
+
+        this.jdbcClient = new JDBCClient(conf);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
----------------------------------------------------------------------
diff --git 
a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java 
b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
new file mode 100644
index 0000000..e5df1ae
--- /dev/null
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.bolt;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.mapper.JdbcMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Basic bolt for writing to any Database table.
+ * <p/>
+ * Note: Each JdbcBolt defined in a topology is tied to a specific table.
+ */
+public class JdbcBolt extends AbstractJdbcBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcBolt.class);
+
+    boolean writeToWAL = true;
+
+    public JdbcBolt(String tableName, JdbcMapper mapper) {
+        super(tableName, mapper);
+    }
+
+    public JdbcBolt withConfigKey(String configKey) {
+        this.configKey = configKey;
+        return this;
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        try {
+            List<Column> columns = mapper.getColumns(tuple);
+            List<List<Column>> columnLists = new ArrayList<List<Column>>();
+            columnLists.add(columns);
+            this.jdbcClient.insert(this.tableName, columnLists);
+        } catch (Exception e) {
+            LOG.warn("Failing tuple.", e);
+            this.collector.fail(tuple);
+            return;
+        }
+
+        this.collector.ack(tuple);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) 
{
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
----------------------------------------------------------------------
diff --git 
a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java 
b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
new file mode 100644
index 0000000..0346bf7
--- /dev/null
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.common;
+
+
+import java.lang.reflect.Field;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+
+public class Column<T> {
+
+    private String columnName;
+    private T val;
+    private int sqlType;
+
+    public Column(String columnName, T val, int sqlType) {
+        this.columnName = columnName;
+        this.val = val;
+        this.sqlType = sqlType;
+    }
+
+    public String getColumnName() {
+        return columnName;
+    }
+
+    public T getVal() {
+        return val;
+    }
+
+    public int getSqlType() {
+        return sqlType;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (!(o instanceof Column)) return false;
+
+        Column column = (Column) o;
+
+        if (sqlType != column.sqlType) return false;
+        if (!columnName.equals(column.columnName)) return false;
+        if (!val.equals(column.val)) return false;
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = columnName.hashCode();
+        result = 31 * result + val.hashCode();
+        result = 31 * result + sqlType;
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "Column{" +
+                "columnName='" + columnName + '\'' +
+                ", val=" + val +
+                ", sqlType=" + sqlType +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java
----------------------------------------------------------------------
diff --git 
a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java
 
b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java
new file mode 100644
index 0000000..5b63d2d
--- /dev/null
+++ 
b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.common;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.*;
+import java.sql.Date;
+import java.util.*;
+
+public class JDBCClient {
+    private static final Logger LOG = 
LoggerFactory.getLogger(JDBCClient.class);
+
+    private HikariDataSource dataSource;
+
+    public JDBCClient(Map<String, Object> map) {
+        Properties properties = new Properties();
+        properties.putAll(map);
+        HikariConfig config = new HikariConfig(properties);
+        this.dataSource = new HikariDataSource(config);
+    }
+
+    public int insert(String tableName, List<List<Column>> columnLists) {
+        Connection connection = null;
+        try {
+            connection = this.dataSource.getConnection();
+            StringBuilder sb = new StringBuilder();
+            sb.append("Insert into ").append(tableName).append(" (");
+            Collection<String> columnNames = 
Collections2.transform(columnLists.get(0), new Function<Column, String>() {
+                @Override
+                public String apply(Column input) {
+                    return input.getColumnName();
+                }
+            });
+            String columns = Joiner.on(",").join(columnNames);
+            sb.append(columns).append(") values ( ");
+
+            String placeHolders = StringUtils.chop(StringUtils.repeat("?,", 
columnNames.size()));
+            sb.append(placeHolders).append(")");
+
+            String query = sb.toString();
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Executing query " + query);
+            }
+
+            PreparedStatement preparedStatement = 
connection.prepareStatement(query);
+            for(List<Column> columnList : columnLists) {
+                setPreparedStatementParams(preparedStatement, columnList);
+            }
+
+            return preparedStatement.executeUpdate();
+        } catch (SQLException e) {
+            throw new RuntimeException("Failed to insert in table " + 
tableName, e);
+        } finally {
+            closeConnection(connection);
+        }
+    }
+
+    public List<List<Column>> select(String sqlQuery, List<Column> 
queryParams) {
+        Connection connection = null;
+        Map<String, Integer> columnSchemaMap = new HashMap<String, Integer>();
+        try {
+            connection = this.dataSource.getConnection();
+            PreparedStatement preparedStatement = 
connection.prepareStatement(sqlQuery);
+            setPreparedStatementParams(preparedStatement, queryParams);
+            ResultSet resultSet = preparedStatement.executeQuery();
+            List<List<Column>> rows = Lists.newArrayList();
+            while(resultSet.next()){
+                ResultSetMetaData metaData = resultSet.getMetaData();
+                int columnCount = metaData.getColumnCount();
+                List<Column> row = Lists.newArrayList();
+                for(int i=1 ; i <= columnCount; i++) {
+                    String columnLabel = metaData.getColumnLabel(i);
+                    int columnType = metaData.getColumnType(i);
+                    Object val = null;
+                    Class columnJavaType = Util.getJavaType(columnType);
+                    if (columnJavaType == String.class) {
+                        row.add(new Column<String>(columnLabel, 
resultSet.getString(columnLabel), columnType));
+                    } else if (columnJavaType == Integer.class) {
+                        row.add(new Column<Integer>(columnLabel, 
resultSet.getInt(columnLabel), columnType));
+                    } else if (columnJavaType == Double.class) {
+                        row.add(new Column<Double>(columnLabel, 
resultSet.getDouble(columnLabel), columnType));
+                    } else if (columnJavaType == Float.class) {
+                        row.add(new Column<Float>(columnLabel, 
resultSet.getFloat(columnLabel), columnType));
+                    } else if (columnJavaType == Short.class) {
+                        row.add(new Column<Short>(columnLabel, 
resultSet.getShort(columnLabel), columnType));
+                    } else if (columnJavaType == Boolean.class) {
+                        row.add(new Column<Boolean>(columnLabel, 
resultSet.getBoolean(columnLabel), columnType));
+                    } else if (columnJavaType == byte[].class) {
+                        row.add(new Column<byte[]>(columnLabel, 
resultSet.getBytes(columnLabel), columnType));
+                    } else if (columnJavaType == Long.class) {
+                        row.add(new Column<Long>(columnLabel, 
resultSet.getLong(columnLabel), columnType));
+                    } else if (columnJavaType == Date.class) {
+                        row.add(new Column<Date>(columnLabel, 
resultSet.getDate(columnLabel), columnType));
+                    } else if (columnJavaType == Time.class) {
+                        row.add(new Column<Time>(columnLabel, 
resultSet.getTime(columnLabel), columnType));
+                    } else if (columnJavaType == Timestamp.class) {
+                        row.add(new Column<Timestamp>(columnLabel, 
resultSet.getTimestamp(columnLabel), columnType));
+                    } else {
+                        throw new RuntimeException("type =  " + columnType + " 
for column " + columnLabel + " not supported.");
+                    }
+                }
+                rows.add(row);
+            }
+            return rows;
+        } catch (SQLException e) {
+            throw new RuntimeException("Failed to execute select query " + 
sqlQuery, e);
+        } finally {
+            closeConnection(connection);
+        }
+    }
+
+    public Map<String, Integer> getColumnSchema(String tableName) {
+        Connection connection = null;
+        Map<String, Integer> columnSchemaMap = new HashMap<String, Integer>();
+        try {
+            connection = this.dataSource.getConnection();
+            DatabaseMetaData metaData = connection.getMetaData();
+            ResultSet resultSet = metaData.getColumns(null, null, tableName, 
null);
+            while (resultSet.next()) {
+                columnSchemaMap.put(resultSet.getString("COLUMN_NAME"), 
resultSet.getInt("DATA_TYPE"));
+            }
+            return columnSchemaMap;
+        } catch (SQLException e) {
+            throw new RuntimeException("Failed to get schema for table " + 
tableName, e);
+        } finally {
+            closeConnection(connection);
+        }
+    }
+
+    public void executeSql(String sql) {
+        Connection connection = null;
+        try {
+            connection = this.dataSource.getConnection();
+            Statement statement = connection.createStatement();
+            statement.execute(sql);
+        } catch (SQLException e) {
+            throw new RuntimeException("Failed to execute SQL", e);
+        } finally {
+            closeConnection(connection);
+        }
+    }
+
+    private void setPreparedStatementParams(PreparedStatement 
preparedStatement, List<Column> columnList) throws SQLException {
+        int index = 1;
+        for (Column column : columnList) {
+            Class columnJavaType = Util.getJavaType(column.getSqlType());
+            if (column.getVal() == null) {
+                preparedStatement.setNull(index, column.getSqlType());
+            } else if (columnJavaType == String.class) {
+                preparedStatement.setString(index, (String) column.getVal());
+            } else if (columnJavaType == Integer.class) {
+                preparedStatement.setInt(index, (Integer) column.getVal());
+            } else if (columnJavaType == Double.class) {
+                preparedStatement.setDouble(index, (Double) column.getVal());
+            } else if (columnJavaType == Float.class) {
+                preparedStatement.setFloat(index, (Float) column.getVal());
+            } else if (columnJavaType == Short.class) {
+                preparedStatement.setShort(index, (Short) column.getVal());
+            } else if (columnJavaType == Boolean.class) {
+                preparedStatement.setBoolean(index, (Boolean) column.getVal());
+            } else if (columnJavaType == byte[].class) {
+                preparedStatement.setBytes(index, (byte[]) column.getVal());
+            } else if (columnJavaType == Long.class) {
+                preparedStatement.setLong(index, (Long) column.getVal());
+            } else if (columnJavaType == Date.class) {
+                preparedStatement.setDate(index, (Date) column.getVal());
+            } else if (columnJavaType == Time.class) {
+                preparedStatement.setTime(index, (Time) column.getVal());
+            } else if (columnJavaType == Timestamp.class) {
+                preparedStatement.setTimestamp(index, (Timestamp) 
column.getVal());
+            } else {
+                throw new RuntimeException("Unknown type of value " + 
column.getVal() + " for column " + column.getColumnName());
+            }
+            ++index;
+        }
+    }
+
+    private void closeConnection(Connection connection) {
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (SQLException e) {
+                throw new RuntimeException("Failed to close connection", e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Util.java
----------------------------------------------------------------------
diff --git 
a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Util.java 
b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Util.java
new file mode 100644
index 0000000..cc723c3
--- /dev/null
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Util.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.common;
+
+import java.lang.reflect.Field;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+
/** Static helpers for translating java.sql.Types constants. */
public class Util {

    private Util() {
        // utility class, not instantiable
    }

    /**
     * Returns the symbolic name (e.g. "VARCHAR") of a {@link Types} constant
     * by reflecting over the public fields of {@link Types}.
     * <p/>
     * The original compared {@code sqlType == field.get(null)} — an int
     * against a boxed Object — which is not a valid numeric comparison; the
     * reflective read must use {@code getInt} on int-typed fields only.
     *
     * @throws RuntimeException if no Types constant has this value
     */
    public static String getSqlTypeName(int sqlType) {
        try {
            for (Field field : Types.class.getFields()) {
                if (field.getType() == int.class && sqlType == field.getInt(null)) {
                    return field.getName();
                }
            }
        } catch (IllegalAccessException e) {
            throw new RuntimeException("Could not get sqlTypeName ", e);
        }
        throw new RuntimeException("Unknown sqlType " + sqlType);
    }

    /**
     * Maps a {@link Types} constant to the java class used to read/write
     * that column (mirrors the JDBC getter/setter chosen in JDBCClient).
     *
     * @throws RuntimeException for sql types this connector does not support
     */
    public static Class getJavaType(int sqlType) {
        switch (sqlType) {
            case Types.CHAR:
            case Types.VARCHAR:
            case Types.LONGVARCHAR:
                return String.class;
            case Types.BINARY:
            case Types.VARBINARY:
            case Types.LONGVARBINARY:
                return byte[].class;
            case Types.BIT:
                return Boolean.class;
            case Types.TINYINT:
            case Types.SMALLINT:
                return Short.class;
            case Types.INTEGER:
                return Integer.class;
            case Types.BIGINT:
                return Long.class;
            case Types.REAL:
                return Float.class;
            case Types.DOUBLE:
            case Types.FLOAT:
                return Double.class;
            case Types.DATE:
                return Date.class;
            case Types.TIME:
                return Time.class;
            case Types.TIMESTAMP:
                return Timestamp.class;
            default:
                throw new RuntimeException("We do not support tables with SqlType: " + getSqlTypeName(sqlType));
        }
    }
}

http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcMapper.java
----------------------------------------------------------------------
diff --git 
a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcMapper.java
 
b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcMapper.java
new file mode 100644
index 0000000..c8c80bc
--- /dev/null
+++ 
b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcMapper.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.mapper;
+
+import backtype.storm.tuple.ITuple;
+import org.apache.storm.jdbc.common.Column;
+
+import java.io.Serializable;
+import java.util.List;
+
/**
 * Maps a storm tuple to the database columns that make up one table row.
 * Implementations must be serializable, as they are shipped with the topology.
 */
public interface JdbcMapper extends Serializable {
    /**
     * Converts a tuple into an ordered list of (name, value, sql-type) columns.
     *
     * @param tuple the tuple to convert
     * @return list of columns that represents one row in a DB table.
     */
    List<Column> getColumns(ITuple tuple);
}

http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
----------------------------------------------------------------------
diff --git 
a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
 
b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
new file mode 100644
index 0000000..7011a72
--- /dev/null
+++ 
b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.mapper;
+
+import backtype.storm.tuple.ITuple;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.common.JDBCClient;
+import org.apache.storm.jdbc.common.Util;
+
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class SimpleJdbcMapper implements JdbcMapper {
+
+    private Map<String, Integer> columnNameToType;
+
+    public SimpleJdbcMapper(String tableName, Map map) {
+        JDBCClient client = new JDBCClient(map);
+        this.columnNameToType = client.getColumnSchema(tableName);
+    }
+
+    @Override
+    public List<Column> getColumns(ITuple tuple) {
+        List<Column> columns = new ArrayList<Column>();
+        for(Map.Entry<String, Integer> entry: columnNameToType.entrySet()) {
+            String columnName = entry.getKey();
+            Integer columnSqlType = entry.getValue();
+
+            if(Util.getJavaType(columnSqlType).equals(String.class)) {
+                String value = tuple.getStringByField(columnName);
+                columns.add(new Column(columnName, value, columnSqlType));
+            } else if(Util.getJavaType(columnSqlType).equals(Short.class)) {
+                Short value = tuple.getShortByField(columnName);
+                columns.add(new Column(columnName, value, columnSqlType));
+            } else if(Util.getJavaType(columnSqlType).equals(Integer.class)) {
+                Integer value = tuple.getIntegerByField(columnName);
+                columns.add(new Column(columnName, value, columnSqlType));
+            } else if(Util.getJavaType(columnSqlType).equals(Long.class)) {
+                Long value = tuple.getLongByField(columnName);
+                columns.add(new Column(columnName, value, columnSqlType));
+            } else if(Util.getJavaType(columnSqlType).equals(Double.class)) {
+                Double value = tuple.getDoubleByField(columnName);
+                columns.add(new Column(columnName, value, columnSqlType));
+            } else if(Util.getJavaType(columnSqlType).equals(Float.class)) {
+                Float value = tuple.getFloatByField(columnName);
+                columns.add(new Column(columnName, value, columnSqlType));
+            } else if(Util.getJavaType(columnSqlType).equals(Boolean.class)) {
+                Boolean value = tuple.getBooleanByField(columnName);
+                columns.add(new Column(columnName, value, columnSqlType));
+            } else if(Util.getJavaType(columnSqlType).equals(byte[].class)) {
+                byte[] value = tuple.getBinaryByField(columnName);
+                columns.add(new Column(columnName, value, columnSqlType));
+            } else if(Util.getJavaType(columnSqlType).equals(Date.class)) {
+                Long value = tuple.getLongByField(columnName);
+                columns.add(new Column(columnName, new Date(value), 
columnSqlType));
+            } else if(Util.getJavaType(columnSqlType).equals(Time.class)) {
+                Long value = tuple.getLongByField(columnName);
+                columns.add(new Column(columnName, new Time(value), 
columnSqlType));
+            } else if(Util.getJavaType(columnSqlType).equals(Timestamp.class)) 
{
+                Long value = tuple.getLongByField(columnName);
+                columns.add(new Column(columnName, new Timestamp(value), 
columnSqlType));
+            } else {
+                throw new RuntimeException("Unsupported java type in tuple " + 
Util.getJavaType(columnSqlType));
+            }
+        }
+        return columns;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
----------------------------------------------------------------------
diff --git 
a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
 
b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
new file mode 100644
index 0000000..fec2ee4
--- /dev/null
+++ 
b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.trident.state;
+
+import backtype.storm.topology.FailedException;
+import org.apache.commons.lang.Validate;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.common.JDBCClient;
+import org.apache.storm.jdbc.mapper.JdbcMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.State;
+import storm.trident.tuple.TridentTuple;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class JdbcState implements State {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcState.class);
+
+    private Options options;
+    private JDBCClient jdbcClient;
+    private Map map;
+
+    protected JdbcState(Map map, int partitionIndex, int numPartitions, 
Options options) {
+        this.options = options;
+        this.map = map;
+    }
+
+    public static class Options implements Serializable {
+        private JdbcMapper mapper;
+        private String configKey;
+        private String tableName;
+
+        public Options withConfigKey(String configKey) {
+            this.configKey = configKey;
+            return this;
+        }
+
+        public Options withTableName(String tableName) {
+            this.tableName = tableName;
+            return this;
+        }
+
+        public Options withMapper(JdbcMapper mapper) {
+            this.mapper = mapper;
+            return this;
+        }
+    }
+
+    protected void prepare() {
+        Map<String, Object> conf = (Map<String, Object>) 
map.get(options.configKey);
+        Validate.notEmpty(conf, "Hikari configuration not found using key '" + 
options.configKey + "'");
+
+        this.jdbcClient = new JDBCClient(conf);
+    }
+
+    @Override
+    public void beginCommit(Long aLong) {
+        LOG.debug("beginCommit is noop.");
+    }
+
+    @Override
+    public void commit(Long aLong) {
+        LOG.debug("commit is noop.");
+    }
+
+    public void updateState(List<TridentTuple> tuples, TridentCollector 
collector) {
+        List<List<Column>> columnsLists = new ArrayList<List<Column>>();
+
+        for (TridentTuple tuple : tuples) {
+            columnsLists.add(options.mapper.getColumns(tuple));
+        }
+
+        try {
+            jdbcClient.insert(options.tableName, columnsLists);
+        } catch (Exception e) {
+            LOG.warn("Batch write failed but some requests might have 
succeeded. Triggering replay.", e);
+            throw new FailedException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcStateFactory.java
----------------------------------------------------------------------
diff --git 
a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcStateFactory.java
 
b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcStateFactory.java
new file mode 100644
index 0000000..a1bbdef
--- /dev/null
+++ 
b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcStateFactory.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.trident.state;
+
+import backtype.storm.task.IMetricsContext;
+import storm.trident.state.State;
+import storm.trident.state.StateFactory;
+
+import java.util.Map;
+
+public class JdbcStateFactory implements StateFactory {
+
+    private JdbcState.Options options;
+
+    public JdbcStateFactory(JdbcState.Options options) {
+        this.options = options;
+    }
+
+    @Override
+    public State makeState(Map map, IMetricsContext iMetricsContext, int 
partitionIndex, int numPartitions) {
+        JdbcState state = new JdbcState(map , partitionIndex, numPartitions, 
options);
+        state.prepare();
+        return state;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcUpdater.java
----------------------------------------------------------------------
diff --git 
a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcUpdater.java
 
b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcUpdater.java
new file mode 100644
index 0000000..b76e230
--- /dev/null
+++ 
b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcUpdater.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.trident.state;
+
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.BaseStateUpdater;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+
+public class JdbcUpdater extends BaseStateUpdater<JdbcState>  {
+
+    @Override
+    public void updateState(JdbcState jdbcState, List<TridentTuple> tuples, 
TridentCollector collector) {
+        jdbcState.updateState(tuples, collector);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
 
b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
new file mode 100644
index 0000000..432d9f8
--- /dev/null
+++ 
b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.common;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Date;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class JdbcClientTest {
+
+    private JDBCClient client;
+
+    private static final String tableName = "user_details";
+    @Before
+    public void setup() {
+        Map map = Maps.newHashMap();
+        
map.put("dataSourceClassName","org.hsqldb.jdbc.JDBCDataSource");//com.mysql.jdbc.jdbc2.optional.MysqlDataSource
+        map.put("dataSource.url", 
"jdbc:hsqldb:mem:test");//jdbc:mysql://localhost/test
+        map.put("dataSource.user","SA");//root
+        map.put("dataSource.password","");//password
+
+        this.client = new JDBCClient(map);
+        client.executeSql("create table user_details (id integer, user_name 
varchar(100), create_date date)");
+    }
+
+    @Test
+    public void testInsertAndSelect() {
+        int id = 1;
+        String name = "bob";
+        Date createDate = new Date(System.currentTimeMillis());
+
+        List<Column> columns = Lists.newArrayList(
+                new Column("id",id, Types.INTEGER),
+                new Column("user_name",name, Types.VARCHAR),
+                new Column("create_date", createDate , Types.DATE)
+                );
+
+        List<List<Column>> columnList = new ArrayList<List<Column>>();
+        columnList.add(columns);
+        client.insert(tableName, columnList);
+
+        List<List<Column>> rows = client.select("select * from user_details 
where id = ?", Lists.newArrayList(new Column("id", id, Types.INTEGER)));
+        for(List<Column> row : rows) {
+            for(Column column : row) {
+                if(column.getColumnName().equalsIgnoreCase("id")) {
+                    Assert.assertEquals(id, column.getVal());
+                } else 
if(column.getColumnName().equalsIgnoreCase("user_name")) {
+                    Assert.assertEquals(name, column.getVal());
+                } else 
if(column.getColumnName().equalsIgnoreCase("create_date")) {
+                    Assert.assertEquals(createDate.toString(), 
column.getVal().toString());
+                } else {
+                    throw new AssertionError("Unknown column" + column);
+                }
+            }
+        }
+    }
+
+    @After
+    public void cleanup() {
+        client.executeSql("drop table " + tableName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java
----------------------------------------------------------------------
diff --git 
a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java 
b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java
new file mode 100644
index 0000000..39fde59
--- /dev/null
+++ 
b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.spout;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import com.google.common.collect.Lists;
+
+import java.util.*;
+
+public class UserSpout implements IRichSpout {
+    boolean isDistributed;
+    SpoutOutputCollector collector;
+    public static final List<Values> rows = Lists.newArrayList(
+            new Values(1,"peter",System.currentTimeMillis()),
+            new Values(2,"bob",System.currentTimeMillis()),
+            new Values(3,"alice",System.currentTimeMillis()));
+
+    public UserSpout() {
+        this(true);
+    }
+
+    public UserSpout(boolean isDistributed) {
+        this.isDistributed = isDistributed;
+    }
+
+    public boolean isDistributed() {
+        return this.isDistributed;
+    }
+
+    @SuppressWarnings("rawtypes")
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector 
collector) {
+        this.collector = collector;
+    }
+
+    public void close() {
+
+    }
+
+    public void nextTuple() {
+        final Random rand = new Random();
+        final Values row = rows.get(rand.nextInt(rows.size() - 1));
+        this.collector.emit(row);
+        Thread.yield();
+    }
+
+    public void ack(Object msgId) {
+
+    }
+
+    public void fail(Object msgId) {
+
+    }
+
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("id","user_name","create_date"));
+    }
+
+    @Override
+    public void activate() {
+    }
+
+    @Override
+    public void deactivate() {
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
----------------------------------------------------------------------
diff --git 
a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
 
b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
new file mode 100644
index 0000000..21e4639
--- /dev/null
+++ 
b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.topology;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.topology.TopologyBuilder;
+import com.google.common.collect.Maps;
+import org.apache.storm.jdbc.bolt.JdbcBolt;
+import org.apache.storm.jdbc.mapper.JdbcMapper;
+import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
+import org.apache.storm.jdbc.spout.UserSpout;
+
+import java.util.Map;
+
+
+public class UserPersistanceTopology {
+    private static final String USER_SPOUT = "USER_SPOUT";
+    private static final String USER_BOLT = "USER_BOLT";
+
+    public static void main(String[] args) throws Exception {
+        if(args.length < 5) {
+            System.out.println("Usage: UserPersistanceTopology 
<dataSourceClassName> <dataSource.url> " +
+                    "<user> <password> <tableName> [topology name]");
+        }
+        Map map = Maps.newHashMap();
+        
map.put("dataSourceClassName",args[0]);//com.mysql.jdbc.jdbc2.optional.MysqlDataSource
+        map.put("dataSource.url", args[1]);//jdbc:mysql://localhost/test
+        map.put("dataSource.user",args[2]);//root
+        map.put("dataSource.password",args[3]);//password
+        String tableName = args[4];//database table name
+        JdbcMapper jdbcMapper = new SimpleJdbcMapper(tableName, map);
+
+        Config config = new Config();
+
+        config.put("jdbc.conf", map);
+
+        UserSpout spout = new UserSpout();
+        JdbcBolt bolt = new JdbcBolt(tableName, jdbcMapper)
+                .withConfigKey("jdbc.conf");
+
+        // userSpout ==> jdbcBolt
+        TopologyBuilder builder = new TopologyBuilder();
+
+        builder.setSpout(USER_SPOUT, spout, 1);
+        builder.setBolt(USER_BOLT, bolt, 1).shuffleGrouping(USER_SPOUT);
+
+        if (args.length == 5) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("test", config, builder.createTopology());
+            Thread.sleep(30000);
+            cluster.killTopology("test");
+            cluster.shutdown();
+            System.exit(0);
+        } else if (args.length == 6) {
+            StormSubmitter.submitTopology(args[6], config, 
builder.createTopology());
+        } else {
+            System.out.println("Usage: UserPersistanceTopology 
<dataSourceClassName> <dataSource.url> " +
+                    "<user> <password> <tableName> [topology name]");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
----------------------------------------------------------------------
diff --git 
a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
 
b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
new file mode 100644
index 0000000..3b2ee66
--- /dev/null
+++ 
b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.topology;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.tuple.Fields;
+import com.google.common.collect.Maps;
+import org.apache.storm.jdbc.mapper.JdbcMapper;
+import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
+import org.apache.storm.jdbc.spout.UserSpout;
+import org.apache.storm.jdbc.trident.state.JdbcState;
+import org.apache.storm.jdbc.trident.state.JdbcStateFactory;
+import org.apache.storm.jdbc.trident.state.JdbcUpdater;
+import storm.trident.Stream;
+import storm.trident.TridentTopology;
+
+import java.util.Map;
+
+public class UserPersistanceTridentTopology {
+
+    public static void main(String[] args) throws Exception {
+        Map map = Maps.newHashMap();
+        map.put("dataSourceClassName", 
args[0]);//com.mysql.jdbc.jdbc2.optional.MysqlDataSource
+        map.put("dataSource.url", args[1]);//jdbc:mysql://localhost/test
+        map.put("dataSource.user",args[2]);//root
+        map.put("dataSource.password",args[3]);//password
+        String tableName = args[4];//database table name
+        JdbcMapper jdbcMapper = new SimpleJdbcMapper(tableName, map);
+
+        Config config = new Config();
+
+        config.put("jdbc.conf", map);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("userSpout", new UserSpout());
+
+        JdbcState.Options options = new JdbcState.Options()
+                .withConfigKey("jdbc.conf")
+                .withMapper(jdbcMapper)
+                .withTableName("user");
+
+        JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);
+        stream.partitionPersist(jdbcStateFactory, new 
Fields("id","user_name","create_date"),  new JdbcUpdater(), new Fields());
+        if (args.length == 5) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("test", config, topology.build());
+            Thread.sleep(30000);
+            cluster.killTopology("test");
+            cluster.shutdown();
+            System.exit(0);
+        } else if (args.length == 6) {
+            StormSubmitter.submitTopology(args[6], config, topology.build());
+        } else {
+            System.out.println("Usage: UserPersistanceTopology 
<dataSourceClassName> <dataSource.url> " +
+                    "<user> <password> <tableName> [topology name]");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/test/sql/test.sql
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/sql/test.sql 
b/external/storm-jdbc/src/test/sql/test.sql
new file mode 100644
index 0000000..a402a68
--- /dev/null
+++ b/external/storm-jdbc/src/test/sql/test.sql
@@ -0,0 +1 @@
-- Schema for the user_details table used by JdbcClientTest and the
-- example persistence topologies (id, user_name, create_date).
create table user_details (id integer, user_name varchar(100), create_date date);
\ No newline at end of file

Reply via email to