STORM-616: adding jdbc Lookup bolt.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8bfa6028 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8bfa6028 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8bfa6028 Branch: refs/heads/master Commit: 8bfa602876f84985864f136ee578bd2d9edb9ba7 Parents: cd96dd0 Author: Parth Brahmbhatt <[email protected]> Authored: Wed Jan 7 17:57:35 2015 -0500 Committer: Parth Brahmbhatt <[email protected]> Committed: Wed Jan 7 18:11:49 2015 -0500 ---------------------------------------------------------------------- external/storm-jdbc/README.md | 148 +++++++++++++++---- .../storm/jdbc/bolt/AbstractJdbcBolt.java | 10 +- .../org/apache/storm/jdbc/bolt/JdbcBolt.java | 18 ++- .../org/apache/storm/jdbc/common/Column.java | 8 +- .../apache/storm/jdbc/common/JDBCClient.java | 54 ++++--- .../storm/jdbc/mapper/JdbcLookupMapper.java | 26 ++++ .../jdbc/mapper/SimpleJdbcLookupMapper.java | 46 ++++++ .../storm/jdbc/mapper/SimpleJdbcMapper.java | 14 +- .../storm/jdbc/trident/state/JdbcQuery.java | 40 +++++ .../storm/jdbc/trident/state/JdbcState.java | 33 +++++ .../org/apache/storm/jdbc/spout/UserSpout.java | 2 +- .../jdbc/topology/AbstractUserTopology.java | 102 +++++++++++++ .../jdbc/topology/UserPersistanceTopology.java | 64 +++----- .../UserPersistanceTridentTopology.java | 61 +++----- 14 files changed, 466 insertions(+), 160 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/README.md ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/README.md b/external/storm-jdbc/README.md index a0273f2..bb43687 100644 --- a/external/storm-jdbc/README.md +++ b/external/storm-jdbc/README.md @@ -1,10 +1,11 @@ -#Storm HBase +#Storm JDBC +Storm/Trident integration for JDBC. This package includes the core bolts and trident states that allow a storm topology +to either insert storm tuples in a database table or to execute select queries against a database and enrich tuples +in a storm topology. This code uses HikariCP for connection pooling. See http://brettwooldridge.github.io/HikariCP. -Storm/Trident integration for JDBC. - -## Usage -The main API for interacting with JDBC is the `org.apache.storm.jdbc.mapper.TupleToColumnMapper` -interface: +## Inserting into a database. +The bolt and trident state included in this package for inserting data into a database table are tied to a single table. +The main API for inserting data in a table using JDBC is the `org.apache.storm.jdbc.mapper.JdbcMapper` interface: ```java public interface JdbcMapper extends Serializable { @@ -16,7 +17,7 @@ The `getColumns()` method defines how a storm tuple maps to a list of columns re ### SimpleJdbcMapper `storm-jdbc` includes a general purpose `JdbcMapper` implementation called `SimpleJdbcMapper` that can map Storm -tuple to a Database row. `SimpleJdbcMapper` assumes that the tuple has fields with same name as the column name in +tuple to a Database row. `SimpleJdbcMapper` assumes that the storm tuple has fields with same name as the column name in the database table that you intend to write to. To use `SimpleJdbcMapper`, you simply tell it the tableName that you want to write to and provide a hikari configuration map. @@ -25,8 +26,8 @@ The following code creates a `SimpleJdbcMapper` instance that: 1.
Will allow the mapper to transform a storm tuple to a list of columns mapping to a row in table test.user_details. 2. Will use the provided HikariCP configuration to establish a connection pool with specified Database configuration and -automatically figure out the column names of the table that you intend to write to. -Please see https://github.com/brettwooldridge/HikariCP#configuration-knobs-baby to lear more about hikari configuration properties. +automatically figure out the column names and corresponding data types of the table that you intend to write to. +Please see https://github.com/brettwooldridge/HikariCP#configuration-knobs-baby to learn more about hikari configuration properties. ```java Map hikariConfigMap = Maps.newHashMap(); @@ -35,49 +36,138 @@ hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test"); hikariConfigMap.put("dataSource.user","root"); hikariConfigMap.put("dataSource.password","password"); String tableName = "user_details"; -JdbcMapper jdbcMapper = new SimpleJdbcMapper(tableName, map); +JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, map); +``` +The mapper initialized in the example above assumes a storm tuple has values for all the columns. +If your storm tuple only has fields for a subset of columns, i.e. if some of the columns in your table have default values +and you want to only insert values for columns with no default values, you can enforce the behavior by initializing the +`SimpleJdbcMapper` with an explicit column schema. For example, if you have a user_details table +`create table if not exists user_details (user_id integer, user_name varchar(100), dept_name varchar(100), create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP);` +In this table the create_time column has a default value. To ensure only the columns with no default values are inserted +you can initialize the `jdbcMapper` as below: + +```java +List<Column> columnSchema = Lists.newArrayList( + new Column("user_id", java.sql.Types.INTEGER), + new Column("user_name", java.sql.Types.VARCHAR)); + JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema); ``` + ### JdbcBolt -To use the `JdbcBolt`, construct it with the name of the table to write to, and a `JdbcMapper` implementation. In addition -you must specify a configuration key that hold the hikari configuration map. +To use the `JdbcBolt`, construct it with the configuration key in your storm config that holds the hikari configuration map. +In addition you must specify the JdbcMapper implementation to convert a storm tuple to a DB row and the table name in which +the rows will be inserted. ```java Config config = new Config(); config.put("jdbc.conf", hikariConfigMap); - -JdbcBolt bolt = new JdbcBolt("user_details", jdbcMapper) - .withConfigKey("jdbc.conf"); +JdbcBolt userPersistanceBolt = new JdbcBolt("jdbc.conf") + .withTableName("user_details") + .withJdbcMapper(simpleJdbcMapper); ``` ### JdbcTridentState We also support a trident persistent state that can be used with trident topologies. To create a jdbc persistent trident -state you need to initialize it with the table name, the JdbcMapper instance and hikari configuration. See the example -below: +state you need to initialize it with the table name, the JdbcMapper instance and the name of the storm config key that holds the +hikari configuration map.
See the example below: ```java JdbcState.Options options = new JdbcState.Options() .withConfigKey("jdbc.conf") .withMapper(jdbcMapper) - .withTableName("user"); + .withTableName("user_details"); JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options); ``` - -## Example: Persistent User details -A runnable example can be found in the `src/test/java/topology` directory. -### Setup -* Ensure you have included JDBC implementation dependency for your chosen database as part of your build configuration. -* Start the database and login to the database. -* Create table user using the following query: +## Lookup from Database +We support `select` queries from databases to allow enrichment of storm tuples in a topology. The main API for +executing select queries against a database using JDBC is the `org.apache.storm.jdbc.mapper.JdbcLookupMapper` interface: +```java + void declareOutputFields(OutputFieldsDeclarer declarer); + List<Column> getColumns(ITuple tuple); + public List<Values> toTuple(ITuple input, List<Column> columns); ``` -> use test; -> create table user (id integer, user_name varchar(100), create_date date); + +The `declareOutputFields` method is used to indicate what fields will be emitted as part of the output tuple when processing a storm +tuple. +The `getColumns` method specifies the place holder columns in a select query and their SQL type and the value to use. +For example, in the user_details table mentioned above, if you were executing a query `select user_name from user_details where +user_id = ? and create_time > ?` the `getColumns` method would take a storm input tuple and return a List containing two items. +The first instance of `Column` type's `getVal()` method will be used as the value of `user_id` to look up, and the +second instance of `Column` type's `getVal()` method will be used as the value of `create_time`. Note: the order in the +returned list determines the place holder values. In other words, the first item in the list maps to the first `?` in the select +query, the second item to the second `?` in the query, and so on. +The `toTuple` method takes in the input tuple and a list of columns representing a DB row as a result of the select query +and returns a list of values to be emitted. Please note that it returns a list of `Values` and not just a single instance +of `Values`. This allows a single DB row to be mapped to multiple output storm tuples. + +### SimpleJdbcLookupMapper +`storm-jdbc` includes a general purpose `JdbcLookupMapper` implementation called `SimpleJdbcLookupMapper`. + +To use `SimpleJdbcLookupMapper`, you have to initialize it with the fields that will be outputted by your bolt and the list of +columns that are used in your select query as place holders. The following example shows initialization of a `SimpleJdbcLookupMapper` +that declares `user_id,user_name,create_date` as output fields and `user_id` as the place holder column in the select query. +`SimpleJdbcLookupMapper` assumes the field name in your tuple is equal to the place holder column name, i.e. in our example +`SimpleJdbcLookupMapper` will look for a field `user_id` in the input tuple and use its value as the place holder's value in the +select query. For constructing output tuples, it looks for fields specified in `outputFields` in the input tuple first, +and if a field is not found in the input tuple then it looks at the select query's output row for a column with the same name as the field name.
+So in the example below if the input tuple had fields `user_id, create_date` and the select query was +`select user_name from user_details where user_id = ?`, then for each input tuple `SimpleJdbcLookupMapper.getColumns(tuple)` +will return the value of `tuple.getValueByField("user_id")` which will be used as the value of the `?` in the select query. +For each output row from DB, `SimpleJdbcLookupMapper.toTuple()` will use the `user_id, create_date` from the input tuple as +is, add only `user_name` from the resulting row, and return these 3 fields as a single output tuple. + +```java +Fields outputFields = new Fields("user_id", "user_name", "create_date"); +List<Column> queryParamColumns = Lists.newArrayList(new Column("user_id", Types.INTEGER)); +this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns); ``` +### JdbcLookupBolt +To use the `JdbcLookupBolt`, construct it with the configuration key in your storm config that holds the hikari configuration map. +In addition you must specify the `JdbcLookupMapper` and the select query to execute. + +```java +JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt("jdbc.conf") + .withJdbcLookupMapper(new SimpleJdbcLookupMapper(outputFields, queryParamColumns)) + .withSelectSql("select user_name from user_details where user_id = ?"); +``` + +### JdbcTridentState for lookup +We also support a trident query state that can be used with trident topologies. + +```java +JdbcState.Options options = new JdbcState.Options() + .withConfigKey("jdbc.conf") + .withJdbcLookupMapper(new SimpleJdbcLookupMapper(new Fields("user_name"), Lists.newArrayList(new Column("user_id", Types.INTEGER)))) + .withSelectQuery("select user_name from user_details where user_id = ?"); +``` + +## Example: +A runnable example can be found in the `src/test/java/topology` directory. + +### Setup +* Ensure you have included the JDBC implementation dependency for your chosen database as part of your build configuration. +* The test topologies execute the following queries, so your intended DB must support these queries for the test topologies +to work. +```SQL +create table if not exists user (user_id integer, user_name varchar(100), dept_name varchar(100), create_date date); +create table if not exists department (dept_id integer, dept_name varchar(100)); +create table if not exists user_department (user_id integer, dept_id integer); +insert into department values (1, 'R&D'); +insert into department values (2, 'Finance'); +insert into department values (3, 'HR'); +insert into department values (4, 'Sales'); +insert into user_department values (1, 1); +insert into user_department values (2, 2); +insert into user_department values (3, 3); +insert into user_department values (4, 4); +select dept_name from department, user_department where department.dept_id = user_department.dept_id and user_department.user_id = ?; +``` ### Execution Run the `org.apache.storm.jdbc.topology.UserPersistanceTopology` class using storm jar command.
The class expects 5 args -storm jar org.apache.storm.jdbc.topology.UserPersistanceTopology <dataSourceClassName> <dataSource.url> <user> <password> <tableName> [topology name] +storm jar org.apache.storm.jdbc.topology.UserPersistanceTopology <dataSourceClassName> <dataSource.url> <user> <password> [topology name] Mysql Example: ``` @@ -86,7 +176,7 @@ org.apache.storm.jdbc.topology.UserPersistanceTridentTopology com.mysql.jdbc.jd jdbc:mysql://localhost/test root password user UserPersistenceTopology ``` -You can execute a select query against the user table which shoule show newly inserted rows: +You can execute a select query against the user table which should show newly inserted rows: ``` select * from user; http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java index 8dacc2d..1e717eb 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java @@ -34,15 +34,11 @@ public abstract class AbstractJdbcBolt extends BaseRichBolt { protected OutputCollector collector; protected transient JDBCClient jdbcClient; - protected String tableName; - protected JdbcMapper mapper; protected String configKey; - public AbstractJdbcBolt(String tableName, JdbcMapper mapper) { - Validate.notEmpty(tableName, "Table name can not be blank or null"); - Validate.notNull(mapper, "mapper can not be null"); - this.tableName = tableName; - this.mapper = mapper; + public AbstractJdbcBolt(String configKey) { + Validate.notEmpty(configKey, "configKey can not be null"); + this.configKey = configKey; } @Override http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java index e5df1ae..d4ddfcb 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java @@ -51,21 +51,27 @@ import java.util.List; public class JdbcBolt extends AbstractJdbcBolt { private static final Logger LOG = LoggerFactory.getLogger(JdbcBolt.class); - boolean writeToWAL = true; + private String tableName; + private JdbcMapper jdbcMapper; - public JdbcBolt(String tableName, JdbcMapper mapper) { - super(tableName, mapper); + public JdbcBolt(String configKey) { + super(configKey); } - public JdbcBolt withConfigKey(String configKey) { - this.configKey = configKey; + public JdbcBolt withTableName(String tableName) { + this.tableName = tableName; + return this; + } + + public JdbcBolt withJdbcMapper(JdbcMapper jdbcMapper) { + this.jdbcMapper = jdbcMapper; return this; } @Override public void execute(Tuple tuple) { try { - List<Column> columns = mapper.getColumns(tuple); + List<Column> columns = jdbcMapper.getColumns(tuple); List<List<Column>> columnLists = new ArrayList<List<Column>>(); columnLists.add(columns); this.jdbcClient.insert(this.tableName, columnLists); 
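The reworked `execute()` above delegates the tuple-to-row conversion entirely to `jdbcMapper.getColumns(tuple)`, so any `JdbcMapper` can be plugged in, not just `SimpleJdbcMapper`. A minimal hand-written mapper might look like the sketch below; the class name `UserJdbcMapper` and the two field/column names are assumptions for illustration only, not part of this commit.

```java
import backtype.storm.tuple.ITuple;
import org.apache.storm.jdbc.common.Column;
import org.apache.storm.jdbc.mapper.JdbcMapper;

import java.sql.Types;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: maps the tuple fields user_id and user_name to the
// corresponding table columns using the Column(name, value, sqlType) constructor.
public class UserJdbcMapper implements JdbcMapper {

    @Override
    public List<Column> getColumns(ITuple tuple) {
        List<Column> columns = new ArrayList<Column>();
        columns.add(new Column<Integer>("user_id", tuple.getIntegerByField("user_id"), Types.INTEGER));
        columns.add(new Column<String>("user_name", tuple.getStringByField("user_name"), Types.VARCHAR));
        return columns;
    }
}
```

Such a mapper would be handed to the bolt via `withJdbcMapper(new UserJdbcMapper())` in place of `SimpleJdbcMapper`.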
http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java index 0346bf7..4c5b37d 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java @@ -18,13 +18,14 @@ package org.apache.storm.jdbc.common; +import java.io.Serializable; import java.lang.reflect.Field; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; import java.sql.Types; -public class Column<T> { +public class Column<T> implements Serializable { private String columnName; private T val; @@ -36,6 +37,11 @@ public class Column<T> { this.sqlType = sqlType; } + public Column(String columnName, int sqlType) { + this.columnName = columnName; + this.sqlType = sqlType; + } + public String getColumnName() { return columnName; } http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java index 5b63d2d..410c884 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java @@ -81,7 +81,6 @@ public class JDBCClient { public List<List<Column>> select(String sqlQuery, List<Column> queryParams) { Connection connection = null; - Map<String, Integer> columnSchemaMap = new HashMap<String, Integer>(); try { connection = this.dataSource.getConnection(); PreparedStatement preparedStatement = connection.prepareStatement(sqlQuery); @@ -95,29 +94,28 @@ public class JDBCClient { for(int i=1 ; i <= columnCount; i++) { String columnLabel = metaData.getColumnLabel(i); int columnType = metaData.getColumnType(i); - Object val = null; Class columnJavaType = Util.getJavaType(columnType); - if (columnJavaType == String.class) { + if (columnJavaType.equals(String.class)) { row.add(new Column<String>(columnLabel, resultSet.getString(columnLabel), columnType)); - } else if (columnJavaType == Integer.class) { + } else if (columnJavaType.equals(Integer.class)) { row.add(new Column<Integer>(columnLabel, resultSet.getInt(columnLabel), columnType)); - } else if (columnJavaType == Double.class) { + } else if (columnJavaType.equals(Double.class)) { row.add(new Column<Double>(columnLabel, resultSet.getDouble(columnLabel), columnType)); - } else if (columnJavaType == Float.class) { + } else if (columnJavaType.equals(Float.class)) { row.add(new Column<Float>(columnLabel, resultSet.getFloat(columnLabel), columnType)); - } else if (columnJavaType == Short.class) { + } else if (columnJavaType.equals(Short.class)) { row.add(new Column<Short>(columnLabel, resultSet.getShort(columnLabel), columnType)); - } else if (columnJavaType == Boolean.class) { + } else if (columnJavaType.equals(Boolean.class)) { row.add(new Column<Boolean>(columnLabel, resultSet.getBoolean(columnLabel), columnType)); - } else if (columnJavaType == byte[].class) { + } else if (columnJavaType.equals(byte[].class)) { row.add(new 
Column<byte[]>(columnLabel, resultSet.getBytes(columnLabel), columnType)); - } else if (columnJavaType == Long.class) { + } else if (columnJavaType.equals(Long.class)) { row.add(new Column<Long>(columnLabel, resultSet.getLong(columnLabel), columnType)); - } else if (columnJavaType == Date.class) { + } else if (columnJavaType.equals(Date.class)) { row.add(new Column<Date>(columnLabel, resultSet.getDate(columnLabel), columnType)); - } else if (columnJavaType == Time.class) { + } else if (columnJavaType.equals(Time.class)) { row.add(new Column<Time>(columnLabel, resultSet.getTime(columnLabel), columnType)); - } else if (columnJavaType == Timestamp.class) { + } else if (columnJavaType.equals(Timestamp.class)) { row.add(new Column<Timestamp>(columnLabel, resultSet.getTimestamp(columnLabel), columnType)); } else { throw new RuntimeException("type = " + columnType + " for column " + columnLabel + " not supported."); @@ -133,17 +131,17 @@ public class JDBCClient { } } - public Map<String, Integer> getColumnSchema(String tableName) { + public List<Column> getColumnSchema(String tableName) { Connection connection = null; - Map<String, Integer> columnSchemaMap = new HashMap<String, Integer>(); + List<Column> columns = new ArrayList<Column>(); try { connection = this.dataSource.getConnection(); DatabaseMetaData metaData = connection.getMetaData(); ResultSet resultSet = metaData.getColumns(null, null, tableName, null); while (resultSet.next()) { - columnSchemaMap.put(resultSet.getString("COLUMN_NAME"), resultSet.getInt("DATA_TYPE")); + columns.add(new Column(resultSet.getString("COLUMN_NAME"), resultSet.getInt("DATA_TYPE"))); } - return columnSchemaMap; + return columns; } catch (SQLException e) { throw new RuntimeException("Failed to get schema for table " + tableName, e); } finally { @@ -170,27 +168,27 @@ public class JDBCClient { Class columnJavaType = Util.getJavaType(column.getSqlType()); if (column.getVal() == null) { preparedStatement.setNull(index, column.getSqlType()); - } else if (columnJavaType == String.class) { + } else if (columnJavaType.equals(String.class)) { preparedStatement.setString(index, (String) column.getVal()); - } else if (columnJavaType == Integer.class) { + } else if (columnJavaType.equals(Integer.class)) { preparedStatement.setInt(index, (Integer) column.getVal()); - } else if (columnJavaType == Double.class) { + } else if (columnJavaType.equals(Double.class)) { preparedStatement.setDouble(index, (Double) column.getVal()); - } else if (columnJavaType == Float.class) { + } else if (columnJavaType.equals(Float.class)) { preparedStatement.setFloat(index, (Float) column.getVal()); - } else if (columnJavaType == Short.class) { + } else if (columnJavaType.equals(Short.class)) { preparedStatement.setShort(index, (Short) column.getVal()); - } else if (columnJavaType == Boolean.class) { + } else if (columnJavaType.equals(Boolean.class)) { preparedStatement.setBoolean(index, (Boolean) column.getVal()); - } else if (columnJavaType == byte[].class) { + } else if (columnJavaType.equals(byte[].class)) { preparedStatement.setBytes(index, (byte[]) column.getVal()); - } else if (columnJavaType == Long.class) { + } else if (columnJavaType.equals(Long.class)) { preparedStatement.setLong(index, (Long) column.getVal()); - } else if (columnJavaType == Date.class) { + } else if (columnJavaType.equals(Date.class)) { preparedStatement.setDate(index, (Date) column.getVal()); - } else if (columnJavaType == Time.class) { + } else if (columnJavaType.equals(Time.class)) { 
preparedStatement.setTime(index, (Time) column.getVal()); - } else if (columnJavaType == Timestamp.class) { + } else if (columnJavaType.equals(Timestamp.class)) { preparedStatement.setTimestamp(index, (Timestamp) column.getVal()); } else { throw new RuntimeException("Unknown type of value " + column.getVal() + " for column " + column.getColumnName()); http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java new file mode 100644 index 0000000..77852f4 --- /dev/null +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java @@ -0,0 +1,26 @@ +package org.apache.storm.jdbc.mapper; + +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.ITuple; +import backtype.storm.tuple.Values; +import org.apache.storm.jdbc.common.Column; + +import java.util.List; + +public interface JdbcLookupMapper extends JdbcMapper { + + /** + * Covers a DB row to a list of storm values that can be emitted. This is done to allow a single + * storm input tuple and a single DB row to result in multiple output values. + * @param input the input tuple. + * @param columns list of columns that represents a row + * @return a List of storm values that can be emitted. Each item in list is emitted as an output tuple. + */ + public List<Values> toTuple(ITuple input, List<Column> columns); + + /** + * declare what are the fields that this code will output. + * @param declarer + */ + void declareOutputFields(OutputFieldsDeclarer declarer); +} http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcLookupMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcLookupMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcLookupMapper.java new file mode 100644 index 0000000..e2a7e8c --- /dev/null +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcLookupMapper.java @@ -0,0 +1,46 @@ +package org.apache.storm.jdbc.mapper; + + +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.ITuple; +import backtype.storm.tuple.Values; +import org.apache.storm.jdbc.common.Column; + +import java.util.ArrayList; +import java.util.List; + +public class SimpleJdbcLookupMapper extends SimpleJdbcMapper implements JdbcLookupMapper { + + private Fields outputFields; + + public SimpleJdbcLookupMapper(Fields outputFields, List<Column> queryColumns) { + super(queryColumns); + this.outputFields = outputFields; + } + + @Override + public List<Values> toTuple(ITuple input, List<Column> columns) { + Values values = new Values(); + + for(String field : outputFields) { + if(input.contains(field)) { + values.add(input.getValueByField(field)); + } else { + for(Column column : columns) { + if(column.getColumnName().equals(field)) { + values.add(column.getVal()); + } + } + } + } + List<Values> result = new ArrayList<Values>(); + result.add(values); + return result; + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + 
declarer.declare(outputFields); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java index 7011a72..df25695 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java @@ -31,19 +31,23 @@ import java.util.Map; public class SimpleJdbcMapper implements JdbcMapper { - private Map<String, Integer> columnNameToType; + private List<Column> schemaColumns; public SimpleJdbcMapper(String tableName, Map map) { JDBCClient client = new JDBCClient(map); - this.columnNameToType = client.getColumnSchema(tableName); + this.schemaColumns = client.getColumnSchema(tableName); + } + + public SimpleJdbcMapper(List<Column> schemaColumns) { + this.schemaColumns = schemaColumns; } @Override public List<Column> getColumns(ITuple tuple) { List<Column> columns = new ArrayList<Column>(); - for(Map.Entry<String, Integer> entry: columnNameToType.entrySet()) { - String columnName = entry.getKey(); - Integer columnSqlType = entry.getValue(); + for(Column column : schemaColumns) { + String columnName = column.getColumnName(); + Integer columnSqlType = column.getSqlType(); if(Util.getJavaType(columnSqlType).equals(String.class)) { String value = tuple.getStringByField(columnName); http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcQuery.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcQuery.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcQuery.java new file mode 100644 index 0000000..ad39f4b --- /dev/null +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcQuery.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.jdbc.trident.state; + +import backtype.storm.tuple.Values; +import storm.trident.operation.TridentCollector; +import storm.trident.state.BaseQueryFunction; +import storm.trident.tuple.TridentTuple; + +import java.util.List; + +public class JdbcQuery extends BaseQueryFunction<JdbcState, List<Values>> { + + @Override + public List<List<Values>> batchRetrieve(JdbcState jdbcState, List<TridentTuple> tridentTuples) { + return jdbcState.batchRetrieve(tridentTuples); + } + + @Override + public void execute(TridentTuple tuples, List<Values> values, TridentCollector tridentCollector) { + for (Values value : values) { + tridentCollector.emit(value); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java index fec2ee4..6b4e79a 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java @@ -18,10 +18,13 @@ package org.apache.storm.jdbc.trident.state; import backtype.storm.topology.FailedException; +import backtype.storm.tuple.Values; +import com.google.common.collect.Lists; import org.apache.commons.lang.Validate; import org.apache.storm.jdbc.common.Column; import org.apache.storm.jdbc.common.JDBCClient; import org.apache.storm.jdbc.mapper.JdbcMapper; +import org.apache.storm.jdbc.mapper.JdbcLookupMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import storm.trident.operation.TridentCollector; @@ -48,8 +51,10 @@ public class JdbcState implements State { public static class Options implements Serializable { private JdbcMapper mapper; + private JdbcLookupMapper jdbcLookupMapper; private String configKey; private String tableName; + private String selectQuery; public Options withConfigKey(String configKey) { this.configKey = configKey; @@ -65,6 +70,16 @@ public class JdbcState implements State { this.mapper = mapper; return this; } + + public Options withJdbcLookupMapper(JdbcLookupMapper jdbcLookupMapper) { + this.jdbcLookupMapper = jdbcLookupMapper; + return this; + } + + public Options withSelectQuery(String selectQuery) { + this.selectQuery = selectQuery; + return this; + } } protected void prepare() { @@ -98,4 +113,22 @@ public class JdbcState implements State { throw new FailedException(e); } } + + public List<List<Values>> batchRetrieve(List<TridentTuple> tridentTuples) { + List<List<Values>> batchRetrieveResult = Lists.newArrayList(); + try { + for (TridentTuple tuple : tridentTuples) { + List<Column> columns = options.jdbcLookupMapper.getColumns(tuple); + List<List<Column>> rows = jdbcClient.select(options.selectQuery, columns); + for(List<Column> row : rows) { + List<Values> values = options.jdbcLookupMapper.toTuple(tuple, row); + batchRetrieveResult.add(values); + } + } + } catch (Exception e) { + LOG.warn("Batch get operation failed. 
Triggering replay.", e); + throw new FailedException(e); + } + return batchRetrieveResult; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java index 39fde59..718917a 100644 --- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java +++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java @@ -72,7 +72,7 @@ public class UserSpout implements IRichSpout { } public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("id","user_name","create_date")); + declarer.declare(new Fields("user_id","user_name","create_date")); } @Override http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java new file mode 100644 index 0000000..700f83e --- /dev/null +++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.jdbc.topology; + +import backtype.storm.Config; +import backtype.storm.StormSubmitter; +import backtype.storm.generated.StormTopology; +import backtype.storm.tuple.Fields; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.storm.jdbc.common.Column; +import org.apache.storm.jdbc.common.JDBCClient; +import org.apache.storm.jdbc.mapper.JdbcMapper; +import org.apache.storm.jdbc.mapper.JdbcLookupMapper; +import org.apache.storm.jdbc.mapper.SimpleJdbcMapper; +import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper; +import org.apache.storm.jdbc.spout.UserSpout; +import backtype.storm.LocalCluster; + +import java.sql.Types; +import java.util.List; +import java.util.Map; + +public abstract class AbstractUserTopology { + private static final List<String> setupSqls = Lists.newArrayList( + "create table if not exists user (user_id integer, user_name varchar(100), dept_name varchar(100), create_date date)", + "create table if not exists department (dept_id integer, dept_name varchar(100))", + "create table if not exists user_department (user_id integer, dept_id integer)", + "insert into department values (1, 'R&D')", + "insert into department values (2, 'Finance')", + "insert into department values (3, 'HR')", + "insert into department values (4, 'Sales')", + "insert into user_department values (1, 1)", + "insert into user_department values (2, 2)", + "insert into user_department values (3, 3)", + "insert into user_department values (4, 4)" + ); + protected UserSpout userSpout; + protected JdbcMapper jdbcMapper; + protected JdbcLookupMapper jdbcLookupMapper; + + protected static final String TABLE_NAME = "user"; + protected static final String JDBC_CONF = "jdbc.conf"; + protected static final String SELECT_QUERY = "select dept_name from department, user_department where department.dept_id = user_department.dept_id" + + " and user_department.user_id = ?"; + + public void execute(String[] args) throws Exception { + if (args.length != 4 && args.length != 5) { + System.out.println("Usage: " + this.getClass().getSimpleName() + " <dataSourceClassName> <dataSource.url> " + + "<user> <password> [topology name]"); + System.exit(-1); + } + Map map = Maps.newHashMap(); + map.put("dataSourceClassName", args[0]);//com.mysql.jdbc.jdbc2.optional.MysqlDataSource + map.put("dataSource.url", args[1]);//jdbc:mysql://localhost/test + map.put("dataSource.user", args[2]);//root + map.put("dataSource.password", args[3]);//password + + Config config = new Config(); + config.put(JDBC_CONF, map); + + JDBCClient jdbcClient = new JDBCClient(map); + for (String sql : setupSqls) { + jdbcClient.executeSql(sql); + } + + this.userSpout = new UserSpout(); + this.jdbcMapper = new SimpleJdbcMapper(TABLE_NAME, map); + Fields outputFields = new Fields("user_id", "user_name", "dept_name", "create_date"); + List<Column> queryParamColumns = Lists.newArrayList(new Column("user_id", Types.INTEGER)); + this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns); + + if (args.length == 4) { + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("test", config, getTopology()); + Thread.sleep(30000); + cluster.killTopology("test"); + cluster.shutdown(); + System.exit(0); + } else { + StormSubmitter.submitTopology(args[5], config, getTopology()); + } + } + + public abstract StormTopology getTopology(); + +} 
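Since `AbstractUserTopology` above drives all of its setup SQL through `JDBCClient`, the same client can be exercised on its own to sanity-check the tables and the lookup query before running a topology. A rough sketch under the same command-line argument assumptions (the class name `JdbcClientSmokeTest` is hypothetical and not part of this commit):

```java
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.storm.jdbc.common.Column;
import org.apache.storm.jdbc.common.JDBCClient;

import java.sql.Types;
import java.util.List;
import java.util.Map;

// Hypothetical smoke test: exercises JDBCClient directly against the tables
// created by the setup SQL above.
public class JdbcClientSmokeTest {
    public static void main(String[] args) {
        Map map = Maps.newHashMap();                 // same hikari keys as AbstractUserTopology
        map.put("dataSourceClassName", args[0]);
        map.put("dataSource.url", args[1]);
        map.put("dataSource.user", args[2]);
        map.put("dataSource.password", args[3]);

        JDBCClient client = new JDBCClient(map);
        // column name + java.sql.Types for every column of the user table
        List<Column> schema = client.getColumnSchema("user");
        System.out.println("user table has " + schema.size() + " columns");

        // run the same lookup query the topologies use, with user_id = 1 as the place holder value
        List<Column> queryParams = Lists.newArrayList(new Column("user_id", 1, Types.INTEGER));
        List<List<Column>> rows = client.select(
                "select dept_name from department, user_department "
                        + "where department.dept_id = user_department.dept_id and user_department.user_id = ?",
                queryParams);
        for (List<Column> row : rows) {
            System.out.println(row.get(0).getColumnName() + " = " + row.get(0).getVal());
        }
    }
}
```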
http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java index 21e4639..26a00aa 100644 --- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java +++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java @@ -17,62 +17,36 @@ */ package org.apache.storm.jdbc.topology; -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.StormSubmitter; +import backtype.storm.generated.StormTopology; import backtype.storm.topology.TopologyBuilder; -import com.google.common.collect.Maps; import org.apache.storm.jdbc.bolt.JdbcBolt; -import org.apache.storm.jdbc.mapper.JdbcMapper; -import org.apache.storm.jdbc.mapper.SimpleJdbcMapper; -import org.apache.storm.jdbc.spout.UserSpout; +import org.apache.storm.jdbc.bolt.JdbcLookupBolt; -import java.util.Map; - -public class UserPersistanceTopology { +public class UserPersistanceTopology extends AbstractUserTopology { private static final String USER_SPOUT = "USER_SPOUT"; - private static final String USER_BOLT = "USER_BOLT"; + private static final String LOOKUP_BOLT = "LOOKUP_BOLT"; + private static final String PERSISTANCE_BOLT = "PERSISTANCE_BOLT"; public static void main(String[] args) throws Exception { - if(args.length < 5) { - System.out.println("Usage: UserPersistanceTopology <dataSourceClassName> <dataSource.url> " + - "<user> <password> <tableName> [topology name]"); - } - Map map = Maps.newHashMap(); - map.put("dataSourceClassName",args[0]);//com.mysql.jdbc.jdbc2.optional.MysqlDataSource - map.put("dataSource.url", args[1]);//jdbc:mysql://localhost/test - map.put("dataSource.user",args[2]);//root - map.put("dataSource.password",args[3]);//password - String tableName = args[4];//database table name - JdbcMapper jdbcMapper = new SimpleJdbcMapper(tableName, map); - - Config config = new Config(); - - config.put("jdbc.conf", map); + new UserPersistanceTopology().execute(args); + } - UserSpout spout = new UserSpout(); - JdbcBolt bolt = new JdbcBolt(tableName, jdbcMapper) - .withConfigKey("jdbc.conf"); + @Override + public StormTopology getTopology() { + JdbcLookupBolt departmentLookupBolt = new JdbcLookupBolt(JDBC_CONF) + .withJdbcLookupMapper(this.jdbcLookupMapper) + .withSelectSql(SELECT_QUERY); + JdbcBolt userPersistanceBolt = new JdbcBolt(JDBC_CONF) + .withTableName(TABLE_NAME) + .withJdbcMapper(this.jdbcMapper); // userSpout ==> jdbcBolt TopologyBuilder builder = new TopologyBuilder(); - builder.setSpout(USER_SPOUT, spout, 1); - builder.setBolt(USER_BOLT, bolt, 1).shuffleGrouping(USER_SPOUT); - - if (args.length == 5) { - LocalCluster cluster = new LocalCluster(); - cluster.submitTopology("test", config, builder.createTopology()); - Thread.sleep(30000); - cluster.killTopology("test"); - cluster.shutdown(); - System.exit(0); - } else if (args.length == 6) { - StormSubmitter.submitTopology(args[6], config, builder.createTopology()); - } else { - System.out.println("Usage: UserPersistanceTopology <dataSourceClassName> <dataSource.url> " + - "<user> <password> <tableName> [topology name]"); - } + builder.setSpout(USER_SPOUT, this.userSpout, 1); + builder.setBolt(LOOKUP_BOLT, 
departmentLookupBolt, 1).shuffleGrouping(USER_SPOUT); + builder.setBolt(PERSISTANCE_BOLT, userPersistanceBolt, 1).shuffleGrouping(LOOKUP_BOLT); + return builder.createTopology(); } } http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java index 3b2ee66..2cf3403 100644 --- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java +++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java @@ -17,60 +17,45 @@ */ package org.apache.storm.jdbc.topology; -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.StormSubmitter; +import backtype.storm.generated.StormTopology; import backtype.storm.tuple.Fields; -import com.google.common.collect.Maps; -import org.apache.storm.jdbc.mapper.JdbcMapper; -import org.apache.storm.jdbc.mapper.SimpleJdbcMapper; +import com.google.common.collect.Lists; +import org.apache.storm.jdbc.common.Column; +import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper; import org.apache.storm.jdbc.spout.UserSpout; +import org.apache.storm.jdbc.trident.state.JdbcQuery; import org.apache.storm.jdbc.trident.state.JdbcState; import org.apache.storm.jdbc.trident.state.JdbcStateFactory; import org.apache.storm.jdbc.trident.state.JdbcUpdater; import storm.trident.Stream; +import storm.trident.TridentState; import storm.trident.TridentTopology; -import java.util.Map; +import java.sql.Types; -public class UserPersistanceTridentTopology { +public class UserPersistanceTridentTopology extends AbstractUserTopology { public static void main(String[] args) throws Exception { - Map map = Maps.newHashMap(); - map.put("dataSourceClassName", args[0]);//com.mysql.jdbc.jdbc2.optional.MysqlDataSource - map.put("dataSource.url", args[1]);//jdbc:mysql://localhost/test - map.put("dataSource.user",args[2]);//root - map.put("dataSource.password",args[3]);//password - String tableName = args[4];//database table name - JdbcMapper jdbcMapper = new SimpleJdbcMapper(tableName, map); - - Config config = new Config(); - - config.put("jdbc.conf", map); + new UserPersistanceTridentTopology().execute(args); + } + @Override + public StormTopology getTopology() { TridentTopology topology = new TridentTopology(); - Stream stream = topology.newStream("userSpout", new UserSpout()); JdbcState.Options options = new JdbcState.Options() - .withConfigKey("jdbc.conf") - .withMapper(jdbcMapper) - .withTableName("user"); + .withConfigKey(JDBC_CONF) + .withMapper(this.jdbcMapper) + .withJdbcLookupMapper(new SimpleJdbcLookupMapper(new Fields("dept_name"), Lists.newArrayList(new Column("user_id", Types.INTEGER)))) + .withTableName(TABLE_NAME) + .withSelectQuery(SELECT_QUERY); JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options); - stream.partitionPersist(jdbcStateFactory, new Fields("id","user_name","create_date"), new JdbcUpdater(), new Fields()); - if (args.length == 5) { - LocalCluster cluster = new LocalCluster(); - cluster.submitTopology("test", config, topology.build()); - Thread.sleep(30000); - cluster.killTopology("test"); - cluster.shutdown(); - System.exit(0); - } else if (args.length == 6) 
{ - StormSubmitter.submitTopology(args[6], config, topology.build()); - } else { - System.out.println("Usage: UserPersistanceTopology <dataSourceClassName> <dataSource.url> " + - "<user> <password> <tableName> [topology name]"); - } - } + Stream stream = topology.newStream("userSpout", new UserSpout()); + TridentState state = topology.newStaticState(jdbcStateFactory); + stream = stream.stateQuery(state, new Fields("user_id","user_name","create_date"), new JdbcQuery(), new Fields("dept_name")); + stream.partitionPersist(jdbcStateFactory, new Fields("user_id","user_name","dept_name","create_date"), new JdbcUpdater(), new Fields()); + return topology.build(); + } }
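One more note on the new lookup contract: because `JdbcLookupMapper.toTuple()` returns a `List<Values>`, a single DB row may fan out into several output tuples, something `SimpleJdbcLookupMapper` does not exercise (it always emits exactly one). A hypothetical mapper illustrating the fan-out, with assumed field names and not part of this commit, could look like:

```java
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.ITuple;
import backtype.storm.tuple.Values;
import org.apache.storm.jdbc.common.Column;
import org.apache.storm.jdbc.mapper.JdbcLookupMapper;

import java.sql.Types;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: emits one output tuple per column of the selected row,
// so a single DB row fans out into several output tuples.
public class FanOutLookupMapper implements JdbcLookupMapper {

    @Override
    public List<Column> getColumns(ITuple tuple) {
        // single place holder: the user_id of the incoming tuple
        List<Column> columns = new ArrayList<Column>();
        columns.add(new Column<Integer>("user_id", tuple.getIntegerByField("user_id"), Types.INTEGER));
        return columns;
    }

    @Override
    public List<Values> toTuple(ITuple input, List<Column> row) {
        List<Values> result = new ArrayList<Values>();
        for (Column column : row) {
            result.add(new Values(input.getValueByField("user_id"), column.getColumnName(), column.getVal()));
        }
        return result;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("user_id", "column_name", "column_value"));
    }
}
```

Wired in through `JdbcLookupBolt.withJdbcLookupMapper(...)` or `JdbcState.Options.withJdbcLookupMapper(...)`, each row returned by the select query would then produce one emitted tuple per selected column.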
