STORM-616: Making all the required params part of constructor args, changing executeUpdate to executeBatch, and adding a test case.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/04fccb1b Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/04fccb1b Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/04fccb1b Branch: refs/heads/master Commit: 04fccb1b152bdd3454adcfda8c4d71502ad2c6db Parents: 017360b Author: Parth Brahmbhatt <[email protected]> Authored: Tue Feb 10 15:17:39 2015 -0800 Committer: Parth Brahmbhatt <[email protected]> Committed: Wed Feb 18 12:21:21 2015 -0800 ---------------------------------------------------------------------- external/storm-jdbc/README.md | 13 ++--- .../apache/storm/jdbc/bolt/JdbcInsertBolt.java | 10 +--- .../apache/storm/jdbc/bolt/JdbcLookupBolt.java | 12 +---- .../apache/storm/jdbc/common/JdbcClient.java | 25 +++++++-- .../storm/jdbc/mapper/JdbcLookupMapper.java | 2 +- .../storm/jdbc/mapper/SimpleJdbcMapper.java | 4 +- .../storm/jdbc/common/JdbcClientTest.java | 55 ++++++++++---------- .../jdbc/topology/AbstractUserTopology.java | 3 ++ .../jdbc/topology/UserPersistanceTopology.java | 8 +-- 9 files changed, 64 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/04fccb1b/external/storm-jdbc/README.md ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/README.md b/external/storm-jdbc/README.md index cfe449d..6fb1d41 100644 --- a/external/storm-jdbc/README.md +++ b/external/storm-jdbc/README.md @@ -63,9 +63,7 @@ to be <= topology.message.timeout.secs. 
```java Config config = new Config(); config.put("jdbc.conf", hikariConfigMap); -JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt("jdbc.conf") - .withTableName("user_details") - .withJdbcMapper(simpleJdbcMapper) +JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt("jdbc.conf","user_details",simpleJdbcMapper) .withQueryTimeoutSecs(30); ``` ### JdbcTridentState @@ -135,9 +133,9 @@ You can optionally specify a query timeout seconds param that specifies max seco The default is set to value of topology.message.timeout.secs. You should set this value to be <= topology.message.timeout.secs. ```java -JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt("jdbc.conf") - .withJdbcLookupMapper(new SimpleJdbcLookupMapper(outputFields, queryParamColumns)) - .withSelectSql("select user_name from user_details where user_id = ?") +String selectSql = "select user_name from user_details where user_id = ?"; +SimpleJdbcLookupMapper lookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns) +JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt("jdbc.conf", selectSql, lookupMapper) .withQueryTimeoutSecs(30); ``` @@ -208,8 +206,7 @@ mvn clean compile assembly:single. 
Mysql Example: ``` -storm jar ~/repo/incubator-storm/external/storm-jdbc/target/storm-jdbc-0.10.0-SNAPSHOT-jar-with-dependencies.jar -org.apache.storm.jdbc.topology.UserPersistanceTopology com.mysql.jdbc.jdbc2.optional.MysqlDataSource jdbc:mysql://localhost/test root password UserPersistenceTopology +storm jar ~/repo/incubator-storm/external/storm-jdbc/target/storm-jdbc-0.10.0-SNAPSHOT-jar-with-dependencies.jar org.apache.storm.jdbc.topology.UserPersistanceTopology com.mysql.jdbc.jdbc2.optional.MysqlDataSource jdbc:mysql://localhost/test root password UserPersistenceTopology ``` You can execute a select query against the user table which should show newly inserted rows: http://git-wip-us.apache.org/repos/asf/storm/blob/04fccb1b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java index 9abd553..f7be7ad 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java @@ -37,18 +37,10 @@ public class JdbcInsertBolt extends AbstractJdbcBolt { private String tableName; private JdbcMapper jdbcMapper; - public JdbcInsertBolt(String configKey) { + public JdbcInsertBolt(String configKey, String tableName, JdbcMapper jdbcMapper) { super(configKey); - } - - public JdbcInsertBolt withTableName(String tableName) { this.tableName = tableName; - return this; - } - - public JdbcInsertBolt withJdbcMapper(JdbcMapper jdbcMapper) { this.jdbcMapper = jdbcMapper; - return this; } public JdbcInsertBolt withQueryTimeoutSecs(int queryTimeoutSecs) { http://git-wip-us.apache.org/repos/asf/storm/blob/04fccb1b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java 
---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java index 8232c2f..e1b1553 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java @@ -37,18 +37,10 @@ public class JdbcLookupBolt extends AbstractJdbcBolt { private JdbcLookupMapper jdbcLookupMapper; - public JdbcLookupBolt(String configKey) { + public JdbcLookupBolt(String configKey, String selectQuery, JdbcLookupMapper jdbcLookupMapper) { super(configKey); - } - - public JdbcLookupBolt withJdbcLookupMapper(JdbcLookupMapper jdbcLookupMapper) { - this.jdbcLookupMapper = jdbcLookupMapper; - return this; - } - - public JdbcLookupBolt withSelectSql(String selectQuery) { this.selectQuery = selectQuery; - return this; + this.jdbcLookupMapper = jdbcLookupMapper; } public JdbcLookupBolt withQueryTimeoutSecs(int queryTimeoutSecs) { http://git-wip-us.apache.org/repos/asf/storm/blob/04fccb1b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java index 4992ed7..4ad108c 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java @@ -37,18 +37,23 @@ public class JdbcClient { private HikariDataSource dataSource; private int queryTimeoutSecs; - public JdbcClient(Map<String, Object> map, int queryTimeoutSecs) { + public JdbcClient(Map<String, Object> hikariConfigMap, int queryTimeoutSecs) { Properties properties = new Properties(); - 
properties.putAll(map); + properties.putAll(hikariConfigMap); HikariConfig config = new HikariConfig(properties); this.dataSource = new HikariDataSource(config); this.queryTimeoutSecs = queryTimeoutSecs; } - public int insert(String tableName, List<List<Column>> columnLists) { + public void insert(String tableName, List<List<Column>> columnLists) { Connection connection = null; try { connection = this.dataSource.getConnection(); + boolean autoCommit = connection.getAutoCommit(); + if(autoCommit) { + connection.setAutoCommit(false); + } + StringBuilder sb = new StringBuilder(); sb.append("Insert into ").append(tableName).append(" ("); Collection<String> columnNames = Collections2.transform(columnLists.get(0), new Function<Column, String>() { @@ -67,14 +72,24 @@ public class JdbcClient { LOG.debug("Executing query {}", query); - PreparedStatement preparedStatement = connection.prepareStatement(query); preparedStatement.setQueryTimeout(queryTimeoutSecs); for(List<Column> columnList : columnLists) { setPreparedStatementParams(preparedStatement, columnList); + preparedStatement.addBatch(); } - return preparedStatement.executeUpdate(); + int[] results = preparedStatement.executeBatch(); + if(Arrays.asList(results).contains(Statement.EXECUTE_FAILED)) { + connection.rollback(); + throw new RuntimeException("failed at least one sql statement in the batch, operation rolled back."); + } else { + try { + connection.commit(); + } catch (SQLException e) { + throw new RuntimeException("Failed to commit inserts in table " + tableName, e); + } + } } catch (SQLException e) { throw new RuntimeException("Failed to insert in table " + tableName, e); } finally { http://git-wip-us.apache.org/repos/asf/storm/blob/04fccb1b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java 
b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java index 77852f4..f8c79a3 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java @@ -10,7 +10,7 @@ import java.util.List; public interface JdbcLookupMapper extends JdbcMapper { /** - * Covers a DB row to a list of storm values that can be emitted. This is done to allow a single + * Converts a DB row to a list of storm values that can be emitted. This is done to allow a single * storm input tuple and a single DB row to result in multiple output values. * @param input the input tuple. * @param columns list of columns that represents a row http://git-wip-us.apache.org/repos/asf/storm/blob/04fccb1b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java index ad7f1c0..841d5d6 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java @@ -33,9 +33,9 @@ public class SimpleJdbcMapper implements JdbcMapper { private List<Column> schemaColumns; - public SimpleJdbcMapper(String tableName, Map map) { + public SimpleJdbcMapper(String tableName, Map hikariConfigurationMap) { int queryTimeoutSecs = 30; - JdbcClient client = new JdbcClient(map, queryTimeoutSecs); + JdbcClient client = new JdbcClient(hikariConfigurationMap, queryTimeoutSecs); this.schemaColumns = client.getColumnSchema(tableName); } http://git-wip-us.apache.org/repos/asf/storm/blob/04fccb1b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java 
---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java index 6423e8f..787b887 100644 --- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java +++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java @@ -24,7 +24,8 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import java.sql.Date; +import java.sql.Timestamp; +import java.util.Date; import java.sql.Types; import java.util.ArrayList; import java.util.List; @@ -45,39 +46,39 @@ public class JdbcClientTest { int queryTimeoutSecs = 60; this.client = new JdbcClient(map, queryTimeoutSecs); - client.executeSql("create table user_details (id integer, user_name varchar(100), create_date date)"); + client.executeSql("create table user_details (id integer, user_name varchar(100), created_timestamp TIMESTAMP)"); } @Test public void testInsertAndSelect() { - int id = 1; - String name = "bob"; - Date createDate = new Date(System.currentTimeMillis()); + int id1 = 1; + String name1 = "bob"; + Timestamp createDate1 = new Timestamp(System.currentTimeMillis()); - List<Column> columns = Lists.newArrayList( - new Column("id",id, Types.INTEGER), - new Column("user_name",name, Types.VARCHAR), - new Column("create_date", createDate , Types.DATE) - ); + List<Column> row1 = Lists.newArrayList( + new Column("ID",id1, Types.INTEGER), + new Column("USER_NAME",name1, Types.VARCHAR), + new Column("CREATED_TIMESTAMP", createDate1 , Types.TIMESTAMP)); - List<List<Column>> columnList = new ArrayList<List<Column>>(); - columnList.add(columns); - client.insert(tableName, columnList); + int id2 = 2; + String name2 = "alice"; + Timestamp createDate2 = new Timestamp(System.currentTimeMillis()); + List<Column> row2 = Lists.newArrayList( + new Column("ID",id2, 
Types.INTEGER), + new Column("USER_NAME",name2, Types.VARCHAR), + new Column("CREATED_TIMESTAMP", createDate2 , Types.TIMESTAMP)); - List<List<Column>> rows = client.select("select * from user_details where id = ?", Lists.newArrayList(new Column("id", id, Types.INTEGER))); - for(List<Column> row : rows) { - for(Column column : row) { - if(column.getColumnName().equalsIgnoreCase("id")) { - Assert.assertEquals(id, column.getVal()); - } else if(column.getColumnName().equalsIgnoreCase("user_name")) { - Assert.assertEquals(name, column.getVal()); - } else if(column.getColumnName().equalsIgnoreCase("create_date")) { - Assert.assertEquals(createDate.toString(), column.getVal().toString()); - } else { - throw new AssertionError("Unknown column" + column); - } - } - } + List<List<Column>> rows = Lists.newArrayList(row1, row2); + client.insert(tableName, rows); + + List<List<Column>> selectedRows = client.select("select * from user_details where id = ?", Lists.newArrayList(new Column("id", id1, Types.INTEGER))); + List<List<Column>> expectedRows = Lists.newArrayList(); + expectedRows.add(row1); + + Assert.assertEquals(expectedRows, selectedRows); + + selectedRows = client.select("select * from user_details order by id", Lists.<Column>newArrayList()); + Assert.assertEquals(rows, selectedRows); } @After http://git-wip-us.apache.org/repos/asf/storm/blob/04fccb1b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java index 9cb0bfa..e94aca2 100644 --- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java +++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java @@ -38,6 +38,9 @@ import java.util.Map; 
public abstract class AbstractUserTopology { private static final List<String> setupSqls = Lists.newArrayList( + "drop table if exists user", + "drop table if exists department", + "drop table if exists user_department", "create table if not exists user (user_id integer, user_name varchar(100), dept_name varchar(100), create_date date)", "create table if not exists department (dept_id integer, dept_name varchar(100))", "create table if not exists user_department (user_id integer, dept_id integer)", http://git-wip-us.apache.org/repos/asf/storm/blob/04fccb1b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java index 32c012e..0b96f4d 100644 --- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java +++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java @@ -34,12 +34,8 @@ public class UserPersistanceTopology extends AbstractUserTopology { @Override public StormTopology getTopology() { - JdbcLookupBolt departmentLookupBolt = new JdbcLookupBolt(JDBC_CONF) - .withJdbcLookupMapper(this.jdbcLookupMapper) - .withSelectSql(SELECT_QUERY); - JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(JDBC_CONF) - .withTableName(TABLE_NAME) - .withJdbcMapper(this.jdbcMapper); + JdbcLookupBolt departmentLookupBolt = new JdbcLookupBolt(JDBC_CONF, SELECT_QUERY, this.jdbcLookupMapper); + JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(JDBC_CONF, TABLE_NAME, this.jdbcMapper); // userSpout ==> jdbcBolt TopologyBuilder builder = new TopologyBuilder();
