storm-616: adding query timeout configuration.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2d6c5ed3 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2d6c5ed3 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2d6c5ed3 Branch: refs/heads/master Commit: 2d6c5ed338872cd464f3d42c27864f7530d2da9c Parents: 1e0f623 Author: Parth Brahmbhatt <[email protected]> Authored: Wed Jan 14 14:59:14 2015 -0800 Committer: Parth Brahmbhatt <[email protected]> Committed: Wed Jan 14 14:59:14 2015 -0800 ---------------------------------------------------------------------- external/storm-jdbc/README.md | 25 +++++++++++++------- .../storm/jdbc/bolt/AbstractJdbcBolt.java | 8 ++----- .../org/apache/storm/jdbc/bolt/JdbcBolt.java | 10 ++++++-- .../apache/storm/jdbc/bolt/JdbcLookupBolt.java | 9 +++++-- .../apache/storm/jdbc/common/JDBCClient.java | 6 ++++- .../storm/jdbc/mapper/SimpleJdbcMapper.java | 3 ++- .../storm/jdbc/trident/state/JdbcState.java | 8 ++++++- .../storm/jdbc/common/JdbcClientTest.java | 3 ++- .../jdbc/topology/AbstractUserTopology.java | 3 ++- .../jdbc/topology/UserPersistanceTopology.java | 6 +++-- 10 files changed, 56 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/README.md ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/README.md b/external/storm-jdbc/README.md index 4bb5e61..948ba23 100644 --- a/external/storm-jdbc/README.md +++ b/external/storm-jdbc/README.md @@ -54,16 +54,20 @@ List<Column> columnSchema = Lists.newArrayList( ``` ### JdbcBolt -To use the `JdbcBolt`, construct it with configuration key in your storm config that hold the hikari configuration map. -In addition you must specify the JdbcMapper implementation to covert storm tuple to DB row and the table name in which -the rows will be inserted. +To use the `JdbcBolt`, you construct an instance of it and specify a configuration key in your storm config that hold the +hikari configuration map. In addition you must specify the JdbcMapper implementation to covert storm tuple to DB row and +the table name in which the rows will be inserted. You can optionally specify a query timeout seconds param that specifies +max seconds an insert query can take. The default is set to 30 seconds which is equal to topology.message.timeout.secs. +You should set this value to be <= topology.message.timeout.secs. ```java Config config = new Config(); config.put("jdbc.conf", hikariConfigMap); -JdbcBolt userPersistanceBolt = new JdbcBolt("jdbc.conf") +JdbcBolt userPersistanceBolt = new JdbcBolt() + .withConfigKey("jdbc.conf") .withTableName("user_details") - .withJdbcMapper(simpleJdbcMapper); + .withJdbcMapper(simpleJdbcMapper) + .withQueryTimeoutSecs(30); ``` ### JdbcTridentState We also support a trident persistent state that can be used with trident topologies. To create a jdbc persistent trident @@ -74,7 +78,8 @@ hikari configuration map. See the example below: JdbcState.Options options = new JdbcState.Options() .withConfigKey("jdbc.conf") .withMapper(jdbcMapper) - .withTableName("user_details"); + .withTableName("user_details") + .withQueryTimeoutSecs(30); JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options); ``` @@ -125,13 +130,16 @@ this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColum ``` ### JdbcLookupBolt -To use the `JdbcLookupBolt`, construct it with configuration key in your storm config that hold the hikari configuration map. -In addition you must specify the `JdbcLookupMapper` and the select query to execute. +To use the `JdbcLookupBolt`, construct an instance of it and specify a configuration key in your storm config that hold the +hikari configuration map. In addition you must specify the `JdbcLookupMapper` and the select query to execute. +You can optionally specify a query timeout seconds param that specifies max seconds the select query can take. +The default is set to 30 seconds which is equal to topology.message.timeout.secs. You should set this value to be <= topology.message.timeout.secs. ```java JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt("jdbc.conf") .withJdbcLookupMapper(new SimpleJdbcLookupMapper(outputFields, queryParamColumns)) .withSelectSql("select user_name from user_details where user_id = ?") + .withQueryTimeoutSecs(30); ``` ### JdbcTridentState for lookup @@ -142,6 +150,7 @@ JdbcState.Options options = new JdbcState.Options() .withConfigKey("jdbc.conf") .withJdbcLookupMapper(new SimpleJdbcLookupMapper(new Fields("user_name"), Lists.newArrayList(new Column("user_id", Types.INTEGER)))) .withSelectQuery("select user_name from user_details where user_id = ?"); + .withQueryTimeoutSecs(30); ``` ## Example: http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java index ae5a249..4b93d4d 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java @@ -34,11 +34,7 @@ public abstract class AbstractJdbcBolt extends BaseRichBolt { protected transient JdbcClient jdbcClient; protected String configKey; - - public AbstractJdbcBolt(String configKey) { - Validate.notEmpty(configKey, "configKey can not be null"); - this.configKey = configKey; - } + protected int queryTimeoutSecs = 30; @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) { @@ -47,6 +43,6 @@ public abstract class AbstractJdbcBolt extends BaseRichBolt { Map<String, Object> conf = (Map<String, Object>)map.get(this.configKey); Validate.notEmpty(conf, "Hikari configuration not found using key '" + this.configKey + "'"); - this.jdbcClient = new JdbcClient(conf); + this.jdbcClient = new JdbcClient(conf, queryTimeoutSecs); } } http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java index fd27285..f4921f5 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java @@ -37,8 +37,9 @@ public class JdbcBolt extends AbstractJdbcBolt { private String tableName; private JdbcMapper jdbcMapper; - public JdbcBolt(String configKey) { - super(configKey); + public JdbcBolt withConfigKey(String configKey) { + this.configKey = configKey; + return this; } public JdbcBolt withTableName(String tableName) { @@ -51,6 +52,11 @@ public class JdbcBolt extends AbstractJdbcBolt { return this; } + public JdbcBolt withQueryTimeoutSecs(int queryTimeoutSecs) { + this.queryTimeoutSecs = queryTimeoutSecs; + return this; + } + @Override public void execute(Tuple tuple) { try { http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java index 7e548ff..041fbe8 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java @@ -37,8 +37,9 @@ public class JdbcLookupBolt extends AbstractJdbcBolt { private JdbcLookupMapper jdbcLookupMapper; - public JdbcLookupBolt(String configKey) { - super(configKey); + public JdbcLookupBolt withConfigKey(String configKey) { + this.configKey = configKey; + return this; } public JdbcLookupBolt withJdbcLookupMapper(JdbcLookupMapper jdbcLookupMapper) { @@ -51,6 +52,10 @@ public class JdbcLookupBolt extends AbstractJdbcBolt { return this; } + public JdbcLookupBolt withQueryTimeoutSecs(int queryTimeoutSecs) { + this.queryTimeoutSecs = queryTimeoutSecs; + return this; + } @Override public void execute(Tuple tuple) { http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java index ab3f8a7..d11d1b3 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java @@ -35,12 +35,14 @@ public class JdbcClient { private static final Logger LOG = LoggerFactory.getLogger(JdbcClient.class); private HikariDataSource dataSource; + private int queryTimeoutSecs; - public JdbcClient(Map<String, Object> map) { + public JdbcClient(Map<String, Object> map, int queryTimeoutSecs) { Properties properties = new Properties(); properties.putAll(map); HikariConfig config = new HikariConfig(properties); this.dataSource = new HikariDataSource(config); + this.queryTimeoutSecs = queryTimeoutSecs; } public int insert(String tableName, List<List<Column>> columnLists) { @@ -67,6 +69,7 @@ public class JdbcClient { } PreparedStatement preparedStatement = connection.prepareStatement(query); + preparedStatement.setQueryTimeout(queryTimeoutSecs); for(List<Column> columnList : columnLists) { setPreparedStatementParams(preparedStatement, columnList); } @@ -84,6 +87,7 @@ public class JdbcClient { try { connection = this.dataSource.getConnection(); PreparedStatement preparedStatement = connection.prepareStatement(sqlQuery); + preparedStatement.setQueryTimeout(queryTimeoutSecs); setPreparedStatementParams(preparedStatement, queryParams); ResultSet resultSet = preparedStatement.executeQuery(); List<List<Column>> rows = Lists.newArrayList(); http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java index 81fc207..ad7f1c0 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java @@ -34,7 +34,8 @@ public class SimpleJdbcMapper implements JdbcMapper { private List<Column> schemaColumns; public SimpleJdbcMapper(String tableName, Map map) { - JdbcClient client = new JdbcClient(map); + int queryTimeoutSecs = 30; + JdbcClient client = new JdbcClient(map, queryTimeoutSecs); this.schemaColumns = client.getColumnSchema(tableName); } http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java index 129191a..48fde4e 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java @@ -55,6 +55,7 @@ public class JdbcState implements State { private String configKey; private String tableName; private String selectQuery; + private int queryTimeoutSecs = 30; public Options withConfigKey(String configKey) { this.configKey = configKey; @@ -80,13 +81,18 @@ public class JdbcState implements State { this.selectQuery = selectQuery; return this; } + + public Options withQueryTimeoutSecs(int queryTimeoutSecs) { + this.queryTimeoutSecs = queryTimeoutSecs; + return this; + } } protected void prepare() { Map<String, Object> conf = (Map<String, Object>) map.get(options.configKey); Validate.notEmpty(conf, "Hikari configuration not found using key '" + options.configKey + "'"); - this.jdbcClient = new JdbcClient(conf); + this.jdbcClient = new JdbcClient(conf, options.queryTimeoutSecs); } @Override http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java index 3623b77..6423e8f 100644 --- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java +++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java @@ -43,7 +43,8 @@ public class JdbcClientTest { map.put("dataSource.user","SA");//root map.put("dataSource.password","");//password - this.client = new JdbcClient(map); + int queryTimeoutSecs = 60; + this.client = new JdbcClient(map, queryTimeoutSecs); client.executeSql("create table user_details (id integer, user_name varchar(100), create_date date)"); } http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java index dc04ac1..9cb0bfa 100644 --- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java +++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java @@ -74,7 +74,8 @@ public abstract class AbstractUserTopology { Config config = new Config(); config.put(JDBC_CONF, map); - JdbcClient jdbcClient = new JdbcClient(map); + int queryTimeoutSecs = 60; + JdbcClient jdbcClient = new JdbcClient(map, queryTimeoutSecs); for (String sql : setupSqls) { jdbcClient.executeSql(sql); } http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java index 26a00aa..fbb0b6c 100644 --- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java +++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java @@ -34,10 +34,12 @@ public class UserPersistanceTopology extends AbstractUserTopology { @Override public StormTopology getTopology() { - JdbcLookupBolt departmentLookupBolt = new JdbcLookupBolt(JDBC_CONF) + JdbcLookupBolt departmentLookupBolt = new JdbcLookupBolt() + .withConfigKey(JDBC_CONF) .withJdbcLookupMapper(this.jdbcLookupMapper) .withSelectSql(SELECT_QUERY); - JdbcBolt userPersistanceBolt = new JdbcBolt(JDBC_CONF) + JdbcBolt userPersistanceBolt = new JdbcBolt() + .withConfigKey(JDBC_CONF) .withTableName(TABLE_NAME) .withJdbcMapper(this.jdbcMapper);
