Repository: storm Updated Branches: refs/heads/master d7334849b -> 64d7ac6b2
STORM-616: Jdbc connector for storm. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5b160168 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5b160168 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5b160168 Branch: refs/heads/master Commit: 5b160168c75c0e8c4c402a5e24f606dab697fbef Parents: 65e9f0c Author: Parth Brahmbhatt <[email protected]> Authored: Mon Jan 5 22:14:18 2015 -0500 Committer: Parth Brahmbhatt <[email protected]> Committed: Mon Jan 5 22:22:46 2015 -0500 ---------------------------------------------------------------------- external/storm-jdbc/LICENSE | 202 ++++++++++++++++++ external/storm-jdbc/README.md | 117 ++++++++++ external/storm-jdbc/pom.xml | 120 +++++++++++ .../storm/jdbc/bolt/AbstractJdbcBolt.java | 57 +++++ .../org/apache/storm/jdbc/bolt/JdbcBolt.java | 85 ++++++++ .../org/apache/storm/jdbc/common/Column.java | 81 +++++++ .../apache/storm/jdbc/common/JDBCClient.java | 211 +++++++++++++++++++ .../java/org/apache/storm/jdbc/common/Util.java | 74 +++++++ .../apache/storm/jdbc/mapper/JdbcMapper.java | 33 +++ .../storm/jdbc/mapper/SimpleJdbcMapper.java | 87 ++++++++ .../storm/jdbc/trident/state/JdbcState.java | 101 +++++++++ .../jdbc/trident/state/JdbcStateFactory.java | 40 ++++ .../storm/jdbc/trident/state/JdbcUpdater.java | 32 +++ .../storm/jdbc/common/JdbcClientTest.java | 86 ++++++++ .../org/apache/storm/jdbc/spout/UserSpout.java | 90 ++++++++ .../jdbc/topology/UserPersistanceTopology.java | 78 +++++++ .../UserPersistanceTridentTopology.java | 76 +++++++ external/storm-jdbc/src/test/sql/test.sql | 1 + 18 files changed, 1571 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/LICENSE ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/LICENSE b/external/storm-jdbc/LICENSE new file mode 100644 index 0000000..e06d208 --- /dev/null +++ b/external/storm-jdbc/LICENSE @@ -0,0 +1,202 @@ +Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/README.md ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/README.md b/external/storm-jdbc/README.md new file mode 100644 index 0000000..36db3ef --- /dev/null +++ b/external/storm-jdbc/README.md @@ -0,0 +1,117 @@ +#Storm HBase + +Storm/Trident integration for JDBC. + +## Usage +The main API for interacting with JDBC is the `org.apache.storm.jdbc.mapper.TupleToColumnMapper` +interface: + +```java +public interface JdbcMapper extends Serializable { + List<Column> getColumns(ITuple tuple); +} +``` + +The `getColumns()` method defines how a storm tuple maps to a list of columns representing a row in a database. + +### SimpleJdbcMapper +`storm-jdbc` includes a general purpose `JdbcMapper` implementation called `SimpleJdbcMapper` that can map Storm +tuple to a Database row. `SimpleJdbcMapper` assumes that the tuple has fields with same name as the column name in +the database table that you intend to write to. + +To use `SimpleJdbcMapper`, you simply tell it the tableName that you want to write to and provide a hikari configuration map. + +The following code creates a `SimpleJdbcMapper` instance that: + +1. Will allow the mapper to transform a storm tuple to a list of columns mapping to a row in table test.user_details. +2. Will use the provided HikariCP configuration to establish a connection pool with specified Database configuration and +automatically figure out the column names of the table that you intend to write to. + +```java +Map hikariConfigMap = Maps.newHashMap(); +hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource"); +hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test"); +hikariConfigMap.put("dataSource.user","root"); +hikariConfigMap.put("dataSource.password","password"); +String tableName = "user_details"; +JdbcMapper jdbcMapper = new SimpleJdbcMapper(tableName, map); +``` +### JdbcBolt +To use the `JdbcBolt`, construct it with the name of the table to write to, and a `JdbcMapper` implementation. In addition +you must specify a configuration key that hold the hikari configuration map. + + ```java +Config config = new Config(); +config.put("jdbc.conf", hikariConfigMap); + +JdbcBolt bolt = new JdbcBolt("user_details", jdbcMapper) + .withConfigKey("jdbc.conf"); + ``` +### JdbcTridentState +We also support a trident persistent state that can be used with trident topologies. To create a jdbc persistent trident +state you need to initialize it with the table name, the JdbcMapper instance and hikari configuration. See the example +below: + +```java +JdbcState.Options options = new JdbcState.Options() + .withConfigKey("jdbc.conf") + .withMapper(jdbcMapper) + .withTableName("user"); + +JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options); +``` + +## Example: Persistent User details +A runnable example can be found in the `src/test/java/topology` directory. + +### Setup +* Ensure you have included JDBC implementation dependency for your chosen database as part of your build configuration. +* Start the database and login to the database. +* Create table user using the following query: + +``` +> use test; +> create table user (id integer, user_name varchar(100), create_date date); +``` + +### Execution +Run the `org.apache.storm.jdbc.topology.UserPersistanceTopology` class using storm jar command. The class expects 5 args +storm jar org.apache.storm.jdbc.topology.UserPersistanceTopology <dataSourceClassName> <dataSource.url> <user> <password> <tableName> [topology name] + +Mysql Example: +``` +storm jar ~/repo/incubator-storm/external/storm-jdbc/target/storm-jdbc-0.10.0-SNAPSHOT-jar-with-dependencies.jar +org.apache.storm.jdbc.topology.UserPersistanceTridentTopology com.mysql.jdbc.jdbc2.optional.MysqlDataSource +jdbc:mysql://localhost/test root password user UserPersistenceTopology +``` + +You can execute a select query against the user table which shoule show newly inserted rows: + +``` +select * from user; +``` + +For trident you can view `org.apache.storm.jdbc.topology.UserPersistanceTridentTopology`. +## License + +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. + +## Committer + +* Parth Brahmbhatt ([[email protected]](mailto:[email protected])) + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/pom.xml ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/pom.xml b/external/storm-jdbc/pom.xml new file mode 100644 index 0000000..9130908 --- /dev/null +++ b/external/storm-jdbc/pom.xml @@ -0,0 +1,120 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>storm</artifactId> + <groupId>org.apache.storm</groupId> + <version>0.10.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <artifactId>storm-jdbc</artifactId> + + <developers> + <developer> + <id>Parth-Brahmbhatt</id> + <name>Parth Brahmbhatt</name> + <email>[email protected]</email> + </developer> + </developers> + + <properties> + <hikari.version>2.2.5</hikari.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>3.3</version> + </dependency> + <dependency> + <groupId>com.zaxxer</groupId> + <artifactId>HikariCP-java6</artifactId> + <version>${hikari.version}</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.11</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.hsqldb</groupId> + <artifactId>hsqldb</artifactId> + <version>2.3.1</version> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>2.5</version> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>sql-maven-plugin</artifactId> + <version>1.5</version> + <dependencies> + <dependency> + <groupId>org.hsqldb</groupId> + <artifactId>hsqldb</artifactId> + <version>2.3.2</version> + </dependency> + </dependencies> + <executions> + <execution> + <id>create-db</id> + <phase>process-test-resources</phase> + <goals> + <goal>execute</goal> + </goals> + <configuration> + <driver>org.hsqldb.jdbcDriver</driver> + <url>jdbc:hsqldb:mem:test;shutdown=false</url> + <username>SA</username> + <password></password> + <autocommit>true</autocommit> + <srcFiles> + <srcFile>src/test/sql/test.sql</srcFile> + </srcFiles> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java new file mode 100644 index 0000000..8dacc2d --- /dev/null +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.jdbc.bolt; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.base.BaseRichBolt; +import org.apache.commons.lang.Validate; +import org.apache.storm.jdbc.common.JDBCClient; +import org.apache.storm.jdbc.mapper.JdbcMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +public abstract class AbstractJdbcBolt extends BaseRichBolt { + private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcBolt.class); + + protected OutputCollector collector; + + protected transient JDBCClient jdbcClient; + protected String tableName; + protected JdbcMapper mapper; + protected String configKey; + + public AbstractJdbcBolt(String tableName, JdbcMapper mapper) { + Validate.notEmpty(tableName, "Table name can not be blank or null"); + Validate.notNull(mapper, "mapper can not be null"); + this.tableName = tableName; + this.mapper = mapper; + } + + @Override + public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) { + this.collector = collector; + + Map<String, Object> conf = (Map<String, Object>)map.get(this.configKey); + Validate.notEmpty(conf, "Hikari configuration not found using key '" + this.configKey + "'"); + + this.jdbcClient = new JDBCClient(conf); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java new file mode 100644 index 0000000..e5df1ae --- /dev/null +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.jdbc.bolt; +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Tuple; +import org.apache.storm.jdbc.common.Column; +import org.apache.storm.jdbc.mapper.JdbcMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +/** + * Basic bolt for writing to any Database table. + * <p/> + * Note: Each JdbcBolt defined in a topology is tied to a specific table. + */ +public class JdbcBolt extends AbstractJdbcBolt { + private static final Logger LOG = LoggerFactory.getLogger(JdbcBolt.class); + + boolean writeToWAL = true; + + public JdbcBolt(String tableName, JdbcMapper mapper) { + super(tableName, mapper); + } + + public JdbcBolt withConfigKey(String configKey) { + this.configKey = configKey; + return this; + } + + @Override + public void execute(Tuple tuple) { + try { + List<Column> columns = mapper.getColumns(tuple); + List<List<Column>> columnLists = new ArrayList<List<Column>>(); + columnLists.add(columns); + this.jdbcClient.insert(this.tableName, columnLists); + } catch (Exception e) { + LOG.warn("Failing tuple.", e); + this.collector.fail(tuple); + return; + } + + this.collector.ack(tuple); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java new file mode 100644 index 0000000..0346bf7 --- /dev/null +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.jdbc.common; + + +import java.lang.reflect.Field; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.sql.Types; + +public class Column<T> { + + private String columnName; + private T val; + private int sqlType; + + public Column(String columnName, T val, int sqlType) { + this.columnName = columnName; + this.val = val; + this.sqlType = sqlType; + } + + public String getColumnName() { + return columnName; + } + + public T getVal() { + return val; + } + + public int getSqlType() { + return sqlType; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof Column)) return false; + + Column column = (Column) o; + + if (sqlType != column.sqlType) return false; + if (!columnName.equals(column.columnName)) return false; + if (!val.equals(column.val)) return false; + + return true; + } + + @Override + public int hashCode() { + int result = columnName.hashCode(); + result = 31 * result + val.hashCode(); + result = 31 * result + sqlType; + return result; + } + + @Override + public String toString() { + return "Column{" + + "columnName='" + columnName + '\'' + + ", val=" + val + + ", sqlType=" + sqlType + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java new file mode 100644 index 0000000..5b63d2d --- /dev/null +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.jdbc.common; + +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.collect.Collections2; +import com.google.common.collect.Lists; +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.*; +import java.sql.Date; +import java.util.*; + +public class JDBCClient { + private static final Logger LOG = LoggerFactory.getLogger(JDBCClient.class); + + private HikariDataSource dataSource; + + public JDBCClient(Map<String, Object> map) { + Properties properties = new Properties(); + properties.putAll(map); + HikariConfig config = new HikariConfig(properties); + this.dataSource = new HikariDataSource(config); + } + + public int insert(String tableName, List<List<Column>> columnLists) { + Connection connection = null; + try { + connection = this.dataSource.getConnection(); + StringBuilder sb = new StringBuilder(); + sb.append("Insert into ").append(tableName).append(" ("); + Collection<String> columnNames = Collections2.transform(columnLists.get(0), new Function<Column, String>() { + @Override + public String apply(Column input) { + return input.getColumnName(); + } + }); + String columns = Joiner.on(",").join(columnNames); + sb.append(columns).append(") values ( "); + + String placeHolders = StringUtils.chop(StringUtils.repeat("?,", columnNames.size())); + sb.append(placeHolders).append(")"); + + String query = sb.toString(); + if(LOG.isDebugEnabled()) { + LOG.debug("Executing query " + query); + } + + PreparedStatement preparedStatement = connection.prepareStatement(query); + for(List<Column> columnList : columnLists) { + setPreparedStatementParams(preparedStatement, columnList); + } + + return preparedStatement.executeUpdate(); + } catch (SQLException e) { + throw new RuntimeException("Failed to insert in table " + tableName, e); + } finally { + closeConnection(connection); + } + } + + public List<List<Column>> select(String sqlQuery, List<Column> queryParams) { + Connection connection = null; + Map<String, Integer> columnSchemaMap = new HashMap<String, Integer>(); + try { + connection = this.dataSource.getConnection(); + PreparedStatement preparedStatement = connection.prepareStatement(sqlQuery); + setPreparedStatementParams(preparedStatement, queryParams); + ResultSet resultSet = preparedStatement.executeQuery(); + List<List<Column>> rows = Lists.newArrayList(); + while(resultSet.next()){ + ResultSetMetaData metaData = resultSet.getMetaData(); + int columnCount = metaData.getColumnCount(); + List<Column> row = Lists.newArrayList(); + for(int i=1 ; i <= columnCount; i++) { + String columnLabel = metaData.getColumnLabel(i); + int columnType = metaData.getColumnType(i); + Object val = null; + Class columnJavaType = Util.getJavaType(columnType); + if (columnJavaType == String.class) { + row.add(new Column<String>(columnLabel, resultSet.getString(columnLabel), columnType)); + } else if (columnJavaType == Integer.class) { + row.add(new Column<Integer>(columnLabel, resultSet.getInt(columnLabel), columnType)); + } else if (columnJavaType == Double.class) { + row.add(new Column<Double>(columnLabel, resultSet.getDouble(columnLabel), columnType)); + } else if (columnJavaType == Float.class) { + row.add(new Column<Float>(columnLabel, resultSet.getFloat(columnLabel), columnType)); + } else if (columnJavaType == Short.class) { + row.add(new Column<Short>(columnLabel, resultSet.getShort(columnLabel), columnType)); + } else if (columnJavaType == Boolean.class) { + row.add(new Column<Boolean>(columnLabel, resultSet.getBoolean(columnLabel), columnType)); + } else if (columnJavaType == byte[].class) { + row.add(new Column<byte[]>(columnLabel, resultSet.getBytes(columnLabel), columnType)); + } else if (columnJavaType == Long.class) { + row.add(new Column<Long>(columnLabel, resultSet.getLong(columnLabel), columnType)); + } else if (columnJavaType == Date.class) { + row.add(new Column<Date>(columnLabel, resultSet.getDate(columnLabel), columnType)); + } else if (columnJavaType == Time.class) { + row.add(new Column<Time>(columnLabel, resultSet.getTime(columnLabel), columnType)); + } else if (columnJavaType == Timestamp.class) { + row.add(new Column<Timestamp>(columnLabel, resultSet.getTimestamp(columnLabel), columnType)); + } else { + throw new RuntimeException("type = " + columnType + " for column " + columnLabel + " not supported."); + } + } + rows.add(row); + } + return rows; + } catch (SQLException e) { + throw new RuntimeException("Failed to execute select query " + sqlQuery, e); + } finally { + closeConnection(connection); + } + } + + public Map<String, Integer> getColumnSchema(String tableName) { + Connection connection = null; + Map<String, Integer> columnSchemaMap = new HashMap<String, Integer>(); + try { + connection = this.dataSource.getConnection(); + DatabaseMetaData metaData = connection.getMetaData(); + ResultSet resultSet = metaData.getColumns(null, null, tableName, null); + while (resultSet.next()) { + columnSchemaMap.put(resultSet.getString("COLUMN_NAME"), resultSet.getInt("DATA_TYPE")); + } + return columnSchemaMap; + } catch (SQLException e) { + throw new RuntimeException("Failed to get schema for table " + tableName, e); + } finally { + closeConnection(connection); + } + } + + public void executeSql(String sql) { + Connection connection = null; + try { + connection = this.dataSource.getConnection(); + Statement statement = connection.createStatement(); + statement.execute(sql); + } catch (SQLException e) { + throw new RuntimeException("Failed to execute SQL", e); + } finally { + closeConnection(connection); + } + } + + private void setPreparedStatementParams(PreparedStatement preparedStatement, List<Column> columnList) throws SQLException { + int index = 1; + for (Column column : columnList) { + Class columnJavaType = Util.getJavaType(column.getSqlType()); + if (column.getVal() == null) { + preparedStatement.setNull(index, column.getSqlType()); + } else if (columnJavaType == String.class) { + preparedStatement.setString(index, (String) column.getVal()); + } else if (columnJavaType == Integer.class) { + preparedStatement.setInt(index, (Integer) column.getVal()); + } else if (columnJavaType == Double.class) { + preparedStatement.setDouble(index, (Double) column.getVal()); + } else if (columnJavaType == Float.class) { + preparedStatement.setFloat(index, (Float) column.getVal()); + } else if (columnJavaType == Short.class) { + preparedStatement.setShort(index, (Short) column.getVal()); + } else if (columnJavaType == Boolean.class) { + preparedStatement.setBoolean(index, (Boolean) column.getVal()); + } else if (columnJavaType == byte[].class) { + preparedStatement.setBytes(index, (byte[]) column.getVal()); + } else if (columnJavaType == Long.class) { + preparedStatement.setLong(index, (Long) column.getVal()); + } else if (columnJavaType == Date.class) { + preparedStatement.setDate(index, (Date) column.getVal()); + } else if (columnJavaType == Time.class) { + preparedStatement.setTime(index, (Time) column.getVal()); + } else if (columnJavaType == Timestamp.class) { + preparedStatement.setTimestamp(index, (Timestamp) column.getVal()); + } else { + throw new RuntimeException("Unknown type of value " + column.getVal() + " for column " + column.getColumnName()); + } + ++index; + } + } + + private void closeConnection(Connection connection) { + if (connection != null) { + try { + connection.close(); + } catch (SQLException e) { + throw new RuntimeException("Failed to close connection", e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Util.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Util.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Util.java new file mode 100644 index 0000000..cc723c3 --- /dev/null +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Util.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.jdbc.common; + +import java.lang.reflect.Field; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.sql.Types; + +public class Util { + public static String getSqlTypeName(int sqlType) { + try { + for (Field field : Types.class.getFields()) { + if (sqlType == field.get(null)) { + return field.getName(); + } + } + } catch (IllegalAccessException e) { + throw new RuntimeException("Could not get sqlTypeName ", e); + } + throw new RuntimeException("Unknown sqlType " + sqlType); + } + + public static Class getJavaType(int sqlType) { + switch (sqlType) { + case Types.CHAR: + case Types.VARCHAR: + case Types.LONGVARCHAR: + return String.class; + case Types.BINARY: + case Types.VARBINARY: + case Types.LONGVARBINARY: + return byte[].class; + case Types.BIT: + return Boolean.class; + case Types.TINYINT: + case Types.SMALLINT: + return Short.class; + case Types.INTEGER: + return Integer.class; + case Types.BIGINT: + return Long.class; + case Types.REAL: + return Float.class; + case Types.DOUBLE: + case Types.FLOAT: + return Double.class; + case Types.DATE: + return Date.class; + case Types.TIME: + return Time.class; + case Types.TIMESTAMP: + return Timestamp.class; + default: + throw new RuntimeException("We do not support tables with SqlType: " + getSqlTypeName(sqlType)); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcMapper.java new file mode 100644 index 0000000..c8c80bc --- /dev/null +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcMapper.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.jdbc.mapper; + +import backtype.storm.tuple.ITuple; +import org.apache.storm.jdbc.common.Column; + +import java.io.Serializable; +import java.util.List; + +public interface JdbcMapper extends Serializable { + /** + * + * @param tuple + * @return list of columns that represents one row in a DB table. + */ + List<Column> getColumns(ITuple tuple); +} http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java new file mode 100644 index 0000000..7011a72 --- /dev/null +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.jdbc.mapper; + +import backtype.storm.tuple.ITuple; +import org.apache.storm.jdbc.common.Column; +import org.apache.storm.jdbc.common.JDBCClient; +import org.apache.storm.jdbc.common.Util; + +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class SimpleJdbcMapper implements JdbcMapper { + + private Map<String, Integer> columnNameToType; + + public SimpleJdbcMapper(String tableName, Map map) { + JDBCClient client = new JDBCClient(map); + this.columnNameToType = client.getColumnSchema(tableName); + } + + @Override + public List<Column> getColumns(ITuple tuple) { + List<Column> columns = new ArrayList<Column>(); + for(Map.Entry<String, Integer> entry: columnNameToType.entrySet()) { + String columnName = entry.getKey(); + Integer columnSqlType = entry.getValue(); + + if(Util.getJavaType(columnSqlType).equals(String.class)) { + String value = tuple.getStringByField(columnName); + columns.add(new Column(columnName, value, columnSqlType)); + } else if(Util.getJavaType(columnSqlType).equals(Short.class)) { + Short value = tuple.getShortByField(columnName); + columns.add(new Column(columnName, value, columnSqlType)); + } else if(Util.getJavaType(columnSqlType).equals(Integer.class)) { + Integer value = tuple.getIntegerByField(columnName); + columns.add(new Column(columnName, value, columnSqlType)); + } else if(Util.getJavaType(columnSqlType).equals(Long.class)) { + Long value = tuple.getLongByField(columnName); + columns.add(new Column(columnName, value, columnSqlType)); + } else if(Util.getJavaType(columnSqlType).equals(Double.class)) { + Double value = tuple.getDoubleByField(columnName); + columns.add(new Column(columnName, value, columnSqlType)); + } else if(Util.getJavaType(columnSqlType).equals(Float.class)) { + Float value = tuple.getFloatByField(columnName); + columns.add(new Column(columnName, value, columnSqlType)); + } else if(Util.getJavaType(columnSqlType).equals(Boolean.class)) { + Boolean value = tuple.getBooleanByField(columnName); + columns.add(new Column(columnName, value, columnSqlType)); + } else if(Util.getJavaType(columnSqlType).equals(byte[].class)) { + byte[] value = tuple.getBinaryByField(columnName); + columns.add(new Column(columnName, value, columnSqlType)); + } else if(Util.getJavaType(columnSqlType).equals(Date.class)) { + Long value = tuple.getLongByField(columnName); + columns.add(new Column(columnName, new Date(value), columnSqlType)); + } else if(Util.getJavaType(columnSqlType).equals(Time.class)) { + Long value = tuple.getLongByField(columnName); + columns.add(new Column(columnName, new Time(value), columnSqlType)); + } else if(Util.getJavaType(columnSqlType).equals(Timestamp.class)) { + Long value = tuple.getLongByField(columnName); + columns.add(new Column(columnName, new Timestamp(value), columnSqlType)); + } else { + throw new RuntimeException("Unsupported java type in tuple " + Util.getJavaType(columnSqlType)); + } + } + return columns; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java new file mode 100644 index 0000000..fec2ee4 --- /dev/null +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.jdbc.trident.state; + +import backtype.storm.topology.FailedException; +import org.apache.commons.lang.Validate; +import org.apache.storm.jdbc.common.Column; +import org.apache.storm.jdbc.common.JDBCClient; +import org.apache.storm.jdbc.mapper.JdbcMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import storm.trident.operation.TridentCollector; +import storm.trident.state.State; +import storm.trident.tuple.TridentTuple; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class JdbcState implements State { + + private static final Logger LOG = LoggerFactory.getLogger(JdbcState.class); + + private Options options; + private JDBCClient jdbcClient; + private Map map; + + protected JdbcState(Map map, int partitionIndex, int numPartitions, Options options) { + this.options = options; + this.map = map; + } + + public static class Options implements Serializable { + private JdbcMapper mapper; + private String configKey; + private String tableName; + + public Options withConfigKey(String configKey) { + this.configKey = configKey; + return this; + } + + public Options withTableName(String tableName) { + this.tableName = tableName; + return this; + } + + public Options withMapper(JdbcMapper mapper) { + this.mapper = mapper; + return this; + } + } + + protected void prepare() { + Map<String, Object> conf = (Map<String, Object>) map.get(options.configKey); + Validate.notEmpty(conf, "Hikari configuration not found using key '" + options.configKey + "'"); + + this.jdbcClient = new JDBCClient(conf); + } + + @Override + public void beginCommit(Long aLong) { + LOG.debug("beginCommit is noop."); + } + + @Override + public void commit(Long aLong) { + LOG.debug("commit is noop."); + } + + public void updateState(List<TridentTuple> tuples, TridentCollector collector) { + List<List<Column>> columnsLists = new ArrayList<List<Column>>(); + + for (TridentTuple tuple : tuples) { + columnsLists.add(options.mapper.getColumns(tuple)); + } + + try { + jdbcClient.insert(options.tableName, columnsLists); + } catch (Exception e) { + LOG.warn("Batch write failed but some requests might have succeeded. Triggering replay.", e); + throw new FailedException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcStateFactory.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcStateFactory.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcStateFactory.java new file mode 100644 index 0000000..a1bbdef --- /dev/null +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcStateFactory.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.jdbc.trident.state; + +import backtype.storm.task.IMetricsContext; +import storm.trident.state.State; +import storm.trident.state.StateFactory; + +import java.util.Map; + +public class JdbcStateFactory implements StateFactory { + + private JdbcState.Options options; + + public JdbcStateFactory(JdbcState.Options options) { + this.options = options; + } + + @Override + public State makeState(Map map, IMetricsContext iMetricsContext, int partitionIndex, int numPartitions) { + JdbcState state = new JdbcState(map , partitionIndex, numPartitions, options); + state.prepare(); + return state; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcUpdater.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcUpdater.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcUpdater.java new file mode 100644 index 0000000..b76e230 --- /dev/null +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcUpdater.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.jdbc.trident.state; + +import storm.trident.operation.TridentCollector; +import storm.trident.state.BaseStateUpdater; +import storm.trident.tuple.TridentTuple; + +import java.util.List; + +public class JdbcUpdater extends BaseStateUpdater<JdbcState> { + + @Override + public void updateState(JdbcState jdbcState, List<TridentTuple> tuples, TridentCollector collector) { + jdbcState.updateState(tuples, collector); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java new file mode 100644 index 0000000..432d9f8 --- /dev/null +++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.jdbc.common; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.sql.Date; +import java.sql.Types; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class JdbcClientTest { + + private JDBCClient client; + + private static final String tableName = "user_details"; + @Before + public void setup() { + Map map = Maps.newHashMap(); + map.put("dataSourceClassName","org.hsqldb.jdbc.JDBCDataSource");//com.mysql.jdbc.jdbc2.optional.MysqlDataSource + map.put("dataSource.url", "jdbc:hsqldb:mem:test");//jdbc:mysql://localhost/test + map.put("dataSource.user","SA");//root + map.put("dataSource.password","");//password + + this.client = new JDBCClient(map); + client.executeSql("create table user_details (id integer, user_name varchar(100), create_date date)"); + } + + @Test + public void testInsertAndSelect() { + int id = 1; + String name = "bob"; + Date createDate = new Date(System.currentTimeMillis()); + + List<Column> columns = Lists.newArrayList( + new Column("id",id, Types.INTEGER), + new Column("user_name",name, Types.VARCHAR), + new Column("create_date", createDate , Types.DATE) + ); + + List<List<Column>> columnList = new ArrayList<List<Column>>(); + columnList.add(columns); + client.insert(tableName, columnList); + + List<List<Column>> rows = client.select("select * from user_details where id = ?", Lists.newArrayList(new Column("id", id, Types.INTEGER))); + for(List<Column> row : rows) { + for(Column column : row) { + if(column.getColumnName().equalsIgnoreCase("id")) { + Assert.assertEquals(id, column.getVal()); + } else if(column.getColumnName().equalsIgnoreCase("user_name")) { + Assert.assertEquals(name, column.getVal()); + } else if(column.getColumnName().equalsIgnoreCase("create_date")) { + Assert.assertEquals(createDate.toString(), column.getVal().toString()); + } else { + throw new AssertionError("Unknown column" + column); + } + } + } + } + + @After + public void cleanup() { + client.executeSql("drop table " + tableName); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java new file mode 100644 index 0000000..39fde59 --- /dev/null +++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.jdbc.spout; + +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.IRichSpout; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Values; +import com.google.common.collect.Lists; + +import java.util.*; + +public class UserSpout implements IRichSpout { + boolean isDistributed; + SpoutOutputCollector collector; + public static final List<Values> rows = Lists.newArrayList( + new Values(1,"peter",System.currentTimeMillis()), + new Values(2,"bob",System.currentTimeMillis()), + new Values(3,"alice",System.currentTimeMillis())); + + public UserSpout() { + this(true); + } + + public UserSpout(boolean isDistributed) { + this.isDistributed = isDistributed; + } + + public boolean isDistributed() { + return this.isDistributed; + } + + @SuppressWarnings("rawtypes") + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + this.collector = collector; + } + + public void close() { + + } + + public void nextTuple() { + final Random rand = new Random(); + final Values row = rows.get(rand.nextInt(rows.size() - 1)); + this.collector.emit(row); + Thread.yield(); + } + + public void ack(Object msgId) { + + } + + public void fail(Object msgId) { + + } + + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("id","user_name","create_date")); + } + + @Override + public void activate() { + } + + @Override + public void deactivate() { + } + + @Override + public Map<String, Object> getComponentConfiguration() { + return null; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java new file mode 100644 index 0000000..21e4639 --- /dev/null +++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.jdbc.topology; + +import backtype.storm.Config; +import backtype.storm.LocalCluster; +import backtype.storm.StormSubmitter; +import backtype.storm.topology.TopologyBuilder; +import com.google.common.collect.Maps; +import org.apache.storm.jdbc.bolt.JdbcBolt; +import org.apache.storm.jdbc.mapper.JdbcMapper; +import org.apache.storm.jdbc.mapper.SimpleJdbcMapper; +import org.apache.storm.jdbc.spout.UserSpout; + +import java.util.Map; + + +public class UserPersistanceTopology { + private static final String USER_SPOUT = "USER_SPOUT"; + private static final String USER_BOLT = "USER_BOLT"; + + public static void main(String[] args) throws Exception { + if(args.length < 5) { + System.out.println("Usage: UserPersistanceTopology <dataSourceClassName> <dataSource.url> " + + "<user> <password> <tableName> [topology name]"); + } + Map map = Maps.newHashMap(); + map.put("dataSourceClassName",args[0]);//com.mysql.jdbc.jdbc2.optional.MysqlDataSource + map.put("dataSource.url", args[1]);//jdbc:mysql://localhost/test + map.put("dataSource.user",args[2]);//root + map.put("dataSource.password",args[3]);//password + String tableName = args[4];//database table name + JdbcMapper jdbcMapper = new SimpleJdbcMapper(tableName, map); + + Config config = new Config(); + + config.put("jdbc.conf", map); + + UserSpout spout = new UserSpout(); + JdbcBolt bolt = new JdbcBolt(tableName, jdbcMapper) + .withConfigKey("jdbc.conf"); + + // userSpout ==> jdbcBolt + TopologyBuilder builder = new TopologyBuilder(); + + builder.setSpout(USER_SPOUT, spout, 1); + builder.setBolt(USER_BOLT, bolt, 1).shuffleGrouping(USER_SPOUT); + + if (args.length == 5) { + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("test", config, builder.createTopology()); + Thread.sleep(30000); + cluster.killTopology("test"); + cluster.shutdown(); + System.exit(0); + } else if (args.length == 6) { + StormSubmitter.submitTopology(args[6], config, builder.createTopology()); + } else { + System.out.println("Usage: UserPersistanceTopology <dataSourceClassName> <dataSource.url> " + + "<user> <password> <tableName> [topology name]"); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java new file mode 100644 index 0000000..3b2ee66 --- /dev/null +++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.jdbc.topology; + +import backtype.storm.Config; +import backtype.storm.LocalCluster; +import backtype.storm.StormSubmitter; +import backtype.storm.tuple.Fields; +import com.google.common.collect.Maps; +import org.apache.storm.jdbc.mapper.JdbcMapper; +import org.apache.storm.jdbc.mapper.SimpleJdbcMapper; +import org.apache.storm.jdbc.spout.UserSpout; +import org.apache.storm.jdbc.trident.state.JdbcState; +import org.apache.storm.jdbc.trident.state.JdbcStateFactory; +import org.apache.storm.jdbc.trident.state.JdbcUpdater; +import storm.trident.Stream; +import storm.trident.TridentTopology; + +import java.util.Map; + +public class UserPersistanceTridentTopology { + + public static void main(String[] args) throws Exception { + Map map = Maps.newHashMap(); + map.put("dataSourceClassName", args[0]);//com.mysql.jdbc.jdbc2.optional.MysqlDataSource + map.put("dataSource.url", args[1]);//jdbc:mysql://localhost/test + map.put("dataSource.user",args[2]);//root + map.put("dataSource.password",args[3]);//password + String tableName = args[4];//database table name + JdbcMapper jdbcMapper = new SimpleJdbcMapper(tableName, map); + + Config config = new Config(); + + config.put("jdbc.conf", map); + + TridentTopology topology = new TridentTopology(); + Stream stream = topology.newStream("userSpout", new UserSpout()); + + JdbcState.Options options = new JdbcState.Options() + .withConfigKey("jdbc.conf") + .withMapper(jdbcMapper) + .withTableName("user"); + + JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options); + stream.partitionPersist(jdbcStateFactory, new Fields("id","user_name","create_date"), new JdbcUpdater(), new Fields()); + if (args.length == 5) { + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("test", config, topology.build()); + Thread.sleep(30000); + cluster.killTopology("test"); + cluster.shutdown(); + System.exit(0); + } else if (args.length == 6) { + StormSubmitter.submitTopology(args[6], config, topology.build()); + } else { + System.out.println("Usage: UserPersistanceTopology <dataSourceClassName> <dataSource.url> " + + "<user> <password> <tableName> [topology name]"); + } + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/test/sql/test.sql ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/test/sql/test.sql b/external/storm-jdbc/src/test/sql/test.sql new file mode 100644 index 0000000..a402a68 --- /dev/null +++ b/external/storm-jdbc/src/test/sql/test.sql @@ -0,0 +1 @@ +create table user_details (id integer, user_name varchar(100), create_date date); \ No newline at end of file
