Fixing checkstyle violations in storm-jdbc
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6d20c4af Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6d20c4af Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6d20c4af Branch: refs/heads/master Commit: 6d20c4af585611c6d317ad817b0a0b4b172a40ce Parents: 53adebc Author: Kishor Patil <[email protected]> Authored: Sun Apr 22 23:26:15 2018 -0400 Committer: Kishor Patil <[email protected]> Committed: Mon Apr 23 02:32:42 2018 -0400 ---------------------------------------------------------------------- external/storm-jdbc/pom.xml | 2 +- .../storm/jdbc/bolt/AbstractJdbcBolt.java | 55 ++++---- .../apache/storm/jdbc/bolt/JdbcInsertBolt.java | 42 +++--- .../apache/storm/jdbc/bolt/JdbcLookupBolt.java | 29 ++-- .../org/apache/storm/jdbc/common/Column.java | 27 ++-- .../storm/jdbc/common/ConnectionProvider.java | 19 +-- .../jdbc/common/HikariCPConnectionProvider.java | 30 ++-- .../apache/storm/jdbc/common/JdbcClient.java | 39 ++--- .../java/org/apache/storm/jdbc/common/Util.java | 19 +-- .../storm/jdbc/mapper/JdbcLookupMapper.java | 24 ++-- .../apache/storm/jdbc/mapper/JdbcMapper.java | 24 ++-- .../jdbc/mapper/SimpleJdbcLookupMapper.java | 36 ++--- .../storm/jdbc/mapper/SimpleJdbcMapper.java | 57 ++++---- .../storm/jdbc/trident/state/JdbcQuery.java | 24 ++-- .../storm/jdbc/trident/state/JdbcState.java | 141 +++++++++---------- .../jdbc/trident/state/JdbcStateFactory.java | 24 ++-- .../storm/jdbc/trident/state/JdbcUpdater.java | 24 ++-- .../storm/jdbc/bolt/JdbcInsertBoltTest.java | 28 ++-- .../storm/jdbc/bolt/JdbcLookupBoltTest.java | 29 ++-- .../storm/jdbc/common/JdbcClientTest.java | 63 ++++----- .../org/apache/storm/jdbc/common/UtilTest.java | 22 ++- 21 files changed, 323 insertions(+), 435 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/6d20c4af/external/storm-jdbc/pom.xml 
---------------------------------------------------------------------- diff --git a/external/storm-jdbc/pom.xml b/external/storm-jdbc/pom.xml index 759ddae..3a09b1b 100644 --- a/external/storm-jdbc/pom.xml +++ b/external/storm-jdbc/pom.xml @@ -92,7 +92,7 @@ <artifactId>maven-checkstyle-plugin</artifactId> <!--Note - the version would be inherited--> <configuration> - <maxAllowedViolations>149</maxAllowedViolations> + <maxAllowedViolations>36</maxAllowedViolations> </configuration> </plugin> </plugins> http://git-wip-us.apache.org/repos/asf/storm/blob/6d20c4af/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java index 9ae455c..8aa262c 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.jdbc.bolt; @@ -32,14 +26,7 @@ import org.slf4j.LoggerFactory; public abstract class AbstractJdbcBolt extends BaseTickTupleAwareRichBolt { private static final Logger LOG = LoggerFactory.getLogger( - AbstractJdbcBolt.class); - - protected OutputCollector collector; - - protected transient JdbcClient jdbcClient; - protected String configKey; - protected Integer queryTimeoutSecs; - protected ConnectionProvider connectionProvider; + AbstractJdbcBolt.class); static { /* @@ -58,6 +45,22 @@ public abstract class AbstractJdbcBolt extends BaseTickTupleAwareRichBolt { DriverManager.getDrivers(); } + protected OutputCollector collector; + protected transient JdbcClient jdbcClient; + protected String configKey; + protected Integer queryTimeoutSecs; + protected ConnectionProvider connectionProvider; + + /** + * Constructor. + * <p/> + * @param connectionProviderParam database connection provider + */ + public AbstractJdbcBolt(final ConnectionProvider connectionProviderParam) { + Validate.notNull(connectionProviderParam); + this.connectionProvider = connectionProviderParam; + } + /** * Subclasses should call this to ensure output collector and connection * provider are set up, and finally jdbcClient is initialized properly. 
@@ -73,7 +76,7 @@ public abstract class AbstractJdbcBolt extends BaseTickTupleAwareRichBolt { if (queryTimeoutSecs == null) { String msgTimeout = map.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS) - .toString(); + .toString(); queryTimeoutSecs = Integer.parseInt(msgTimeout); } @@ -81,16 +84,6 @@ public abstract class AbstractJdbcBolt extends BaseTickTupleAwareRichBolt { } /** - * Constructor. - * <p/> - * @param connectionProviderParam database connection provider - */ - public AbstractJdbcBolt(final ConnectionProvider connectionProviderParam) { - Validate.notNull(connectionProviderParam); - this.connectionProvider = connectionProviderParam; - } - - /** * Cleanup. * <p/> * Subclasses should call this to ensure connection provider can be http://git-wip-us.apache.org/repos/asf/storm/blob/6d20c4af/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java index 897ece0..77f3c6b 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java @@ -1,38 +1,32 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.jdbc.bolt; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.tuple.Tuple; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import org.apache.commons.lang.Validate; import org.apache.commons.lang3.StringUtils; import org.apache.storm.jdbc.common.Column; import org.apache.storm.jdbc.common.ConnectionProvider; import org.apache.storm.jdbc.mapper.JdbcMapper; -import org.apache.storm.utils.TupleUtils; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Tuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - /** * Basic bolt for writing to any Database table. 
* <p/> @@ -45,7 +39,7 @@ public class JdbcInsertBolt extends AbstractJdbcBolt { private String insertQuery; private JdbcMapper jdbcMapper; - public JdbcInsertBolt(ConnectionProvider connectionProvider, JdbcMapper jdbcMapper) { + public JdbcInsertBolt(ConnectionProvider connectionProvider, JdbcMapper jdbcMapper) { super(connectionProvider); Validate.notNull(jdbcMapper); @@ -76,7 +70,7 @@ public class JdbcInsertBolt extends AbstractJdbcBolt { @Override public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector collector) { super.prepare(map, topologyContext, collector); - if(StringUtils.isBlank(tableName) && StringUtils.isBlank(insertQuery)) { + if (StringUtils.isBlank(tableName) && StringUtils.isBlank(insertQuery)) { throw new IllegalArgumentException("You must supply either a tableName or an insert Query."); } } @@ -87,7 +81,7 @@ public class JdbcInsertBolt extends AbstractJdbcBolt { List<Column> columns = jdbcMapper.getColumns(tuple); List<List<Column>> columnLists = new ArrayList<List<Column>>(); columnLists.add(columns); - if(!StringUtils.isBlank(tableName)) { + if (!StringUtils.isBlank(tableName)) { this.jdbcClient.insert(this.tableName, columnLists); } else { this.jdbcClient.executeInsertQuery(this.insertQuery, columnLists); http://git-wip-us.apache.org/repos/asf/storm/blob/6d20c4af/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java index 9589edd..ace4089 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java @@ -1,35 +1,28 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license 
agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. 
*/ + package org.apache.storm.jdbc.bolt; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.tuple.Values; +import java.util.List; import org.apache.commons.lang.Validate; import org.apache.storm.jdbc.common.Column; import org.apache.storm.jdbc.common.ConnectionProvider; import org.apache.storm.jdbc.mapper.JdbcLookupMapper; -import org.apache.storm.utils.TupleUtils; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; - /** * Basic bolt for querying from any database. */ http://git-wip-us.apache.org/repos/asf/storm/blob/6d20c4af/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java index 73bc0fd..c796d3b 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.jdbc.common; @@ -103,9 +98,9 @@ public class Column<T> implements Serializable { @Override public String toString() { return "Column{" + - "columnName='" + columnName + '\'' + - ", val=" + val + - ", sqlType=" + sqlType + - '}'; + "columnName='" + columnName + '\'' + + ", val=" + val + + ", sqlType=" + sqlType + + '}'; } } http://git-wip-us.apache.org/repos/asf/storm/blob/6d20c4af/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/ConnectionProvider.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/ConnectionProvider.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/ConnectionProvider.java index 0232ba2..6804b5d 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/ConnectionProvider.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/ConnectionProvider.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. 
*/ + package org.apache.storm.jdbc.common; import java.io.Serializable; http://git-wip-us.apache.org/repos/asf/storm/blob/6d20c4af/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/HikariCPConnectionProvider.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/HikariCPConnectionProvider.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/HikariCPConnectionProvider.java index 3f3151d..dbb0009 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/HikariCPConnectionProvider.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/HikariCPConnectionProvider.java @@ -1,30 +1,23 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.jdbc.common; import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; - import java.sql.Connection; import java.sql.SQLException; import java.util.Map; import java.util.Properties; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,14 +33,13 @@ public class HikariCPConnectionProvider implements ConnectionProvider { @Override public synchronized void prepare() { - if(dataSource == null) { + if (dataSource == null) { Properties properties = new Properties(); properties.putAll(configMap); HikariConfig config = new HikariConfig(properties); - if(properties.containsKey("dataSource.url")) { + if (properties.containsKey("dataSource.url")) { LOG.info("DataSource Url: " + properties.getProperty("dataSource.url")); - } - else if (config.getJdbcUrl() != null) { + } else if (config.getJdbcUrl() != null) { LOG.info("JDBC Url: " + config.getJdbcUrl()); } this.dataSource = new HikariDataSource(config); @@ -66,7 +58,7 @@ public class HikariCPConnectionProvider implements ConnectionProvider { @Override public void cleanup() { - if(dataSource != null) { + if (dataSource != null) { dataSource.close(); } } http://git-wip-us.apache.org/repos/asf/storm/blob/6d20c4af/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java index f591236..76233cc 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java +++ 
b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java @@ -1,34 +1,39 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. 
*/ + package org.apache.storm.jdbc.common; import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.collect.Collections2; import com.google.common.collect.Lists; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.Date; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.sql.*; -import java.sql.Date; -import java.util.*; - public class JdbcClient { private static final Logger LOG = LoggerFactory.getLogger(JdbcClient.class); @@ -51,7 +56,7 @@ public class JdbcClient { try { connection = connectionProvider.getConnection(); boolean autoCommit = connection.getAutoCommit(); - if(autoCommit) { + if (autoCommit) { connection.setAutoCommit(false); } http://git-wip-us.apache.org/repos/asf/storm/blob/6d20c4af/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Util.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Util.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Util.java index 7bac4c8..5b83ef6 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Util.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Util.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. 
*/ + package org.apache.storm.jdbc.common; import java.lang.reflect.Field; http://git-wip-us.apache.org/repos/asf/storm/blob/6d20c4af/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java index 462bee9..341dedf 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java @@ -1,28 +1,22 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.jdbc.mapper; +import java.util.List; +import org.apache.storm.jdbc.common.Column; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.ITuple; import org.apache.storm.tuple.Values; -import org.apache.storm.jdbc.common.Column; - -import java.util.List; public interface JdbcLookupMapper extends JdbcMapper { http://git-wip-us.apache.org/repos/asf/storm/blob/6d20c4af/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcMapper.java index b18af32..8cfa2a4 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcMapper.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcMapper.java @@ -1,27 +1,21 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.jdbc.mapper; -import org.apache.storm.tuple.ITuple; -import org.apache.storm.jdbc.common.Column; +package org.apache.storm.jdbc.mapper; import java.io.Serializable; import java.util.List; +import org.apache.storm.jdbc.common.Column; +import org.apache.storm.tuple.ITuple; public interface JdbcMapper extends Serializable { /** http://git-wip-us.apache.org/repos/asf/storm/blob/6d20c4af/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcLookupMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcLookupMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcLookupMapper.java index 7fadbe9..ef141e7 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcLookupMapper.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcLookupMapper.java @@ -1,32 +1,26 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor 
license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. 
*/ + package org.apache.storm.jdbc.mapper; +import java.util.ArrayList; +import java.util.List; +import org.apache.commons.lang.Validate; +import org.apache.storm.jdbc.common.Column; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.ITuple; import org.apache.storm.tuple.Values; -import org.apache.commons.lang.Validate; -import org.apache.storm.jdbc.common.Column; - -import java.util.ArrayList; -import java.util.List; public class SimpleJdbcLookupMapper extends SimpleJdbcMapper implements JdbcLookupMapper { @@ -43,12 +37,12 @@ public class SimpleJdbcLookupMapper extends SimpleJdbcMapper implements JdbcLook public List<Values> toTuple(ITuple input, List<Column> columns) { Values values = new Values(); - for(String field : outputFields) { - if(input.contains(field)) { + for (String field : outputFields) { + if (input.contains(field)) { values.add(input.getValueByField(field)); } else { - for(Column column : columns) { - if(column.getColumnName().equalsIgnoreCase(field)) { + for (Column column : columns) { + if (column.getColumnName().equalsIgnoreCase(field)) { values.add(column.getVal()); } } http://git-wip-us.apache.org/repos/asf/storm/blob/6d20c4af/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java index d93c29f..e4b659c 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java @@ -1,35 +1,28 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. 
*/ -package org.apache.storm.jdbc.mapper; -import org.apache.storm.tuple.ITuple; -import org.apache.commons.lang.Validate; -import org.apache.storm.jdbc.common.Column; -import org.apache.storm.jdbc.common.ConnectionProvider; -import org.apache.storm.jdbc.common.JdbcClient; -import org.apache.storm.jdbc.common.Util; +package org.apache.storm.jdbc.mapper; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; import java.util.ArrayList; import java.util.List; -import java.util.Map; +import org.apache.commons.lang.Validate; +import org.apache.storm.jdbc.common.Column; +import org.apache.storm.jdbc.common.ConnectionProvider; +import org.apache.storm.jdbc.common.JdbcClient; +import org.apache.storm.jdbc.common.Util; +import org.apache.storm.tuple.ITuple; public class SimpleJdbcMapper implements JdbcMapper { @@ -53,41 +46,41 @@ public class SimpleJdbcMapper implements JdbcMapper { @Override public List<Column> getColumns(ITuple tuple) { List<Column> columns = new ArrayList<Column>(); - for(Column column : schemaColumns) { + for (Column column : schemaColumns) { String columnName = column.getColumnName(); Integer columnSqlType = column.getSqlType(); - if(Util.getJavaType(columnSqlType).equals(String.class)) { + if (Util.getJavaType(columnSqlType).equals(String.class)) { String value = tuple.getStringByField(columnName); columns.add(new Column(columnName, value, columnSqlType)); - } else if(Util.getJavaType(columnSqlType).equals(Short.class)) { + } else if (Util.getJavaType(columnSqlType).equals(Short.class)) { Short value = tuple.getShortByField(columnName); columns.add(new Column(columnName, value, columnSqlType)); - } else if(Util.getJavaType(columnSqlType).equals(Integer.class)) { + } else if (Util.getJavaType(columnSqlType).equals(Integer.class)) { Integer value = tuple.getIntegerByField(columnName); columns.add(new Column(columnName, value, columnSqlType)); - } else if(Util.getJavaType(columnSqlType).equals(Long.class)) { + } else if 
(Util.getJavaType(columnSqlType).equals(Long.class)) { Long value = tuple.getLongByField(columnName); columns.add(new Column(columnName, value, columnSqlType)); - } else if(Util.getJavaType(columnSqlType).equals(Double.class)) { + } else if (Util.getJavaType(columnSqlType).equals(Double.class)) { Double value = tuple.getDoubleByField(columnName); columns.add(new Column(columnName, value, columnSqlType)); - } else if(Util.getJavaType(columnSqlType).equals(Float.class)) { + } else if (Util.getJavaType(columnSqlType).equals(Float.class)) { Float value = tuple.getFloatByField(columnName); columns.add(new Column(columnName, value, columnSqlType)); - } else if(Util.getJavaType(columnSqlType).equals(Boolean.class)) { + } else if (Util.getJavaType(columnSqlType).equals(Boolean.class)) { Boolean value = tuple.getBooleanByField(columnName); columns.add(new Column(columnName, value, columnSqlType)); - } else if(Util.getJavaType(columnSqlType).equals(byte[].class)) { + } else if (Util.getJavaType(columnSqlType).equals(byte[].class)) { byte[] value = tuple.getBinaryByField(columnName); columns.add(new Column(columnName, value, columnSqlType)); - } else if(Util.getJavaType(columnSqlType).equals(Date.class)) { + } else if (Util.getJavaType(columnSqlType).equals(Date.class)) { Long value = tuple.getLongByField(columnName); columns.add(new Column(columnName, new Date(value), columnSqlType)); - } else if(Util.getJavaType(columnSqlType).equals(Time.class)) { + } else if (Util.getJavaType(columnSqlType).equals(Time.class)) { Long value = tuple.getLongByField(columnName); columns.add(new Column(columnName, new Time(value), columnSqlType)); - } else if(Util.getJavaType(columnSqlType).equals(Timestamp.class)) { + } else if (Util.getJavaType(columnSqlType).equals(Timestamp.class)) { Long value = tuple.getLongByField(columnName); columns.add(new Column(columnName, new Timestamp(value), columnSqlType)); } else { 
http://git-wip-us.apache.org/repos/asf/storm/blob/6d20c4af/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcQuery.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcQuery.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcQuery.java index 8d014cd..f6623a0 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcQuery.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcQuery.java @@ -1,28 +1,22 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.jdbc.trident.state; -import org.apache.storm.tuple.Values; +import java.util.List; import org.apache.storm.trident.operation.TridentCollector; import org.apache.storm.trident.state.BaseQueryFunction; import org.apache.storm.trident.tuple.TridentTuple; - -import java.util.List; +import org.apache.storm.tuple.Values; public class JdbcQuery extends BaseQueryFunction<JdbcState, List<Values>> { http://git-wip-us.apache.org/repos/asf/storm/blob/6d20c4af/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java index eabf011..1954ef3 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java @@ -1,42 +1,36 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.jdbc.trident.state; -import org.apache.storm.Config; -import org.apache.storm.topology.FailedException; -import org.apache.storm.tuple.Values; import com.google.common.collect.Lists; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import org.apache.commons.lang3.StringUtils; +import org.apache.storm.Config; import org.apache.storm.jdbc.common.Column; import org.apache.storm.jdbc.common.ConnectionProvider; import org.apache.storm.jdbc.common.JdbcClient; -import org.apache.storm.jdbc.mapper.JdbcMapper; import org.apache.storm.jdbc.mapper.JdbcLookupMapper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.storm.jdbc.mapper.JdbcMapper; +import org.apache.storm.topology.FailedException; import org.apache.storm.trident.operation.TridentCollector; import org.apache.storm.trident.state.State; import org.apache.storm.trident.tuple.TridentTuple; - -import java.io.Serializable; 
-import java.util.ArrayList; -import java.util.List; -import java.util.Map; +import org.apache.storm.tuple.Values; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class JdbcState implements State { @@ -51,60 +45,16 @@ public class JdbcState implements State { this.map = map; } - public static class Options implements Serializable { - private JdbcMapper mapper; - private JdbcLookupMapper jdbcLookupMapper; - private ConnectionProvider connectionProvider; - private String tableName; - private String insertQuery; - private String selectQuery; - private Integer queryTimeoutSecs; - - public Options withConnectionProvider(ConnectionProvider connectionProvider) { - this.connectionProvider = connectionProvider; - return this; - } - - public Options withTableName(String tableName) { - this.tableName = tableName; - return this; - } - - public Options withInsertQuery(String insertQuery) { - this.insertQuery = insertQuery; - return this; - } - - public Options withMapper(JdbcMapper mapper) { - this.mapper = mapper; - return this; - } - - public Options withJdbcLookupMapper(JdbcLookupMapper jdbcLookupMapper) { - this.jdbcLookupMapper = jdbcLookupMapper; - return this; - } - - public Options withSelectQuery(String selectQuery) { - this.selectQuery = selectQuery; - return this; - } - - public Options withQueryTimeoutSecs(int queryTimeoutSecs) { - this.queryTimeoutSecs = queryTimeoutSecs; - return this; - } - } - protected void prepare() { options.connectionProvider.prepare(); - if(StringUtils.isBlank(options.insertQuery) && StringUtils.isBlank(options.tableName) && StringUtils.isBlank(options.selectQuery)) { + if (StringUtils.isBlank(options.insertQuery) && StringUtils.isBlank(options.tableName) && + StringUtils.isBlank(options.selectQuery)) { throw new IllegalArgumentException("If you are trying to insert into DB you must supply either insertQuery or tableName." 
+ - "If you are attempting to user a query state you must supply a select query."); + "If you are attempting to user a query state you must supply a select query."); } - if(options.queryTimeoutSecs == null) { + if (options.queryTimeoutSecs == null) { options.queryTimeoutSecs = Integer.parseInt(map.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS).toString()); } @@ -129,7 +79,7 @@ public class JdbcState implements State { } try { - if(!StringUtils.isBlank(options.tableName)) { + if (!StringUtils.isBlank(options.tableName)) { jdbcClient.insert(options.tableName, columnsLists); } else { jdbcClient.executeInsertQuery(options.insertQuery, columnsLists); @@ -146,7 +96,7 @@ public class JdbcState implements State { for (TridentTuple tuple : tridentTuples) { List<Column> columns = options.jdbcLookupMapper.getColumns(tuple); List<List<Column>> rows = jdbcClient.select(options.selectQuery, columns); - for(List<Column> row : rows) { + for (List<Column> row : rows) { List<Values> values = options.jdbcLookupMapper.toTuple(tuple, row); batchRetrieveResult.add(values); } @@ -157,4 +107,49 @@ public class JdbcState implements State { } return batchRetrieveResult; } + + public static class Options implements Serializable { + private JdbcMapper mapper; + private JdbcLookupMapper jdbcLookupMapper; + private ConnectionProvider connectionProvider; + private String tableName; + private String insertQuery; + private String selectQuery; + private Integer queryTimeoutSecs; + + public Options withConnectionProvider(ConnectionProvider connectionProvider) { + this.connectionProvider = connectionProvider; + return this; + } + + public Options withTableName(String tableName) { + this.tableName = tableName; + return this; + } + + public Options withInsertQuery(String insertQuery) { + this.insertQuery = insertQuery; + return this; + } + + public Options withMapper(JdbcMapper mapper) { + this.mapper = mapper; + return this; + } + + public Options withJdbcLookupMapper(JdbcLookupMapper jdbcLookupMapper) { + 
this.jdbcLookupMapper = jdbcLookupMapper; + return this; + } + + public Options withSelectQuery(String selectQuery) { + this.selectQuery = selectQuery; + return this; + } + + public Options withQueryTimeoutSecs(int queryTimeoutSecs) { + this.queryTimeoutSecs = queryTimeoutSecs; + return this; + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/6d20c4af/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcStateFactory.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcStateFactory.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcStateFactory.java index abc169f..6e15786 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcStateFactory.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcStateFactory.java @@ -1,28 +1,22 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.jdbc.trident.state; +import java.util.Map; import org.apache.storm.task.IMetricsContext; import org.apache.storm.trident.state.State; import org.apache.storm.trident.state.StateFactory; -import java.util.Map; - public class JdbcStateFactory implements StateFactory { private JdbcState.Options options; @@ -33,7 +27,7 @@ public class JdbcStateFactory implements StateFactory { @Override public State makeState(Map<String, Object> map, IMetricsContext iMetricsContext, int partitionIndex, int numPartitions) { - JdbcState state = new JdbcState(map , partitionIndex, numPartitions, options); + JdbcState state = new JdbcState(map, partitionIndex, numPartitions, options); state.prepare(); return state; } http://git-wip-us.apache.org/repos/asf/storm/blob/6d20c4af/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcUpdater.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcUpdater.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcUpdater.java index d2ca5b8..47c01ba 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcUpdater.java +++ 
b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcUpdater.java @@ -1,29 +1,23 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. 
*/ + package org.apache.storm.jdbc.trident.state; +import java.util.List; import org.apache.storm.trident.operation.TridentCollector; import org.apache.storm.trident.state.BaseStateUpdater; import org.apache.storm.trident.tuple.TridentTuple; -import java.util.List; - -public class JdbcUpdater extends BaseStateUpdater<JdbcState> { +public class JdbcUpdater extends BaseStateUpdater<JdbcState> { @Override public void updateState(JdbcState jdbcState, List<TridentTuple> tuples, TridentCollector collector) { http://git-wip-us.apache.org/repos/asf/storm/blob/6d20c4af/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/bolt/JdbcInsertBoltTest.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/bolt/JdbcInsertBoltTest.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/bolt/JdbcInsertBoltTest.java index 1b393e9..df0a457 100644 --- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/bolt/JdbcInsertBoltTest.java +++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/bolt/JdbcInsertBoltTest.java @@ -1,23 +1,19 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.jdbc.bolt; import com.google.common.collect.Lists; +import java.util.HashMap; import org.apache.storm.jdbc.common.Column; import org.apache.storm.jdbc.common.ConnectionProvider; import org.apache.storm.jdbc.common.HikariCPConnectionProvider; @@ -26,8 +22,6 @@ import org.apache.storm.jdbc.mapper.SimpleJdbcMapper; import org.junit.Assert; import org.junit.Test; -import java.util.HashMap; - /** * Created by pbrahmbhatt on 10/29/15. 
*/ @@ -45,7 +39,7 @@ public class JdbcInsertBoltTest { bolt.withInsertQuery("test"); bolt.withTableName("test"); Assert.fail("Should have thrown IllegalArgumentException."); - } catch(IllegalArgumentException ne) { + } catch (IllegalArgumentException ne) { //expected } @@ -54,7 +48,7 @@ public class JdbcInsertBoltTest { bolt.withTableName("test"); bolt.withInsertQuery("test"); Assert.fail("Should have thrown IllegalArgumentException."); - } catch(IllegalArgumentException ne) { + } catch (IllegalArgumentException ne) { //expected } } @@ -63,7 +57,7 @@ public class JdbcInsertBoltTest { try { JdbcInsertBolt bolt = new JdbcInsertBolt(provider, mapper); Assert.fail("Should have thrown IllegalArgumentException."); - } catch(IllegalArgumentException ne) { + } catch (IllegalArgumentException ne) { //expected } } http://git-wip-us.apache.org/repos/asf/storm/blob/6d20c4af/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/bolt/JdbcLookupBoltTest.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/bolt/JdbcLookupBoltTest.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/bolt/JdbcLookupBoltTest.java index 9a5ec09..a26fc4a 100644 --- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/bolt/JdbcLookupBoltTest.java +++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/bolt/JdbcLookupBoltTest.java @@ -1,37 +1,28 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.jdbc.bolt; -import org.apache.storm.tuple.Fields; import com.google.common.collect.Lists; +import java.util.HashMap; import org.apache.storm.jdbc.common.Column; import org.apache.storm.jdbc.common.ConnectionProvider; import org.apache.storm.jdbc.common.HikariCPConnectionProvider; import org.apache.storm.jdbc.mapper.JdbcLookupMapper; -import org.apache.storm.jdbc.mapper.JdbcMapper; import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper; -import org.apache.storm.jdbc.mapper.SimpleJdbcMapper; +import org.apache.storm.tuple.Fields; import org.junit.Assert; import org.junit.Test; -import java.util.ArrayList; -import java.util.HashMap; - /** * Created by pbrahmbhatt on 10/29/15. 
*/ @@ -51,7 +42,7 @@ public class JdbcLookupBoltTest { try { JdbcLookupBolt bolt = new JdbcLookupBolt(provider, selectQuery, mapper); Assert.fail("Should have thrown IllegalArgumentException."); - } catch(IllegalArgumentException ne) { + } catch (IllegalArgumentException ne) { //expected } } http://git-wip-us.apache.org/repos/asf/storm/blob/6d20c4af/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java index 05f4f31..d77e8be 100644 --- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java +++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java @@ -1,52 +1,47 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.jdbc.common; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.rules.ExpectedException; -import org.junit.runners.model.MultipleFailureException; -import org.junit.Test; - import java.sql.Connection; -import java.sql.SQLException; import java.sql.Timestamp; import java.sql.Types; import java.util.ArrayList; import java.util.List; import java.util.Map; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runners.model.MultipleFailureException; public class JdbcClientTest { + private static final String tableName = "user_details"; + @Rule + public ExpectedException thrown = ExpectedException.none(); private JdbcClient client; - private static final String tableName = "user_details"; @Before public void setup() { Map<String, Object> map = Maps.newHashMap(); - map.put("dataSourceClassName","org.hsqldb.jdbc.JDBCDataSource");//com.mysql.jdbc.jdbc2.optional.MysqlDataSource + map.put("dataSourceClassName", 
"org.hsqldb.jdbc.JDBCDataSource");//com.mysql.jdbc.jdbc2.optional.MysqlDataSource map.put("dataSource.url", "jdbc:hsqldb:mem:test");//jdbc:mysql://localhost/test - map.put("dataSource.user","SA");//root - map.put("dataSource.password","");//password + map.put("dataSource.user", "SA");//root + map.put("dataSource.password", "");//password ConnectionProvider connectionProvider = new HikariCPConnectionProvider(map); connectionProvider.prepare(); @@ -64,13 +59,14 @@ public class JdbcClientTest { List<List<Column>> rows = Lists.newArrayList(row1, row2); client.insert(tableName, rows); - List<List<Column>> selectedRows = client.select("select * from user_details where id = ?", Lists.newArrayList(new Column("id", 1, Types.INTEGER))); + List<List<Column>> selectedRows = + client.select("select * from user_details where id = ?", Lists.newArrayList(new Column("id", 1, Types.INTEGER))); List<List<Column>> expectedRows = Lists.newArrayList(); expectedRows.add(row1); Assert.assertEquals(expectedRows, selectedRows); List<Column> row3 = createRow(3, "frank"); - List<List<Column>> moreRows = new ArrayList<List<Column>>(); + List<List<Column>> moreRows = new ArrayList<List<Column>>(); moreRows.add(row3); client.executeInsertQuery("insert into user_details values(?,?,?)", moreRows); @@ -85,9 +81,6 @@ public class JdbcClientTest { Assert.assertEquals(rows, selectedRows); } - @Rule - public ExpectedException thrown = ExpectedException.none(); - @Test public void testInsertConnectionError() { @@ -95,9 +88,9 @@ public class JdbcClientTest { this.client = new JdbcClient(connectionProvider, 60); List<Column> row = createRow(1, "frank"); - List<List<Column>> rows = new ArrayList<List<Column>>(); + List<List<Column>> rows = new ArrayList<List<Column>>(); rows.add(row); - String query = "insert into user_details values(?,?,?)"; + String query = "insert into user_details values(?,?,?)"; thrown.expect(MultipleFailureException.class); client.executeInsertQuery(query, rows); @@ -105,9 +98,9 @@ 
public class JdbcClientTest { private List<Column> createRow(int id, String name) { return Lists.newArrayList( - new Column("ID", id, Types.INTEGER), - new Column("USER_NAME", name, Types.VARCHAR), - new Column("CREATED_TIMESTAMP", new Timestamp(System.currentTimeMillis()) , Types.TIMESTAMP)); + new Column("ID", id, Types.INTEGER), + new Column("USER_NAME", name, Types.VARCHAR), + new Column("CREATED_TIMESTAMP", new Timestamp(System.currentTimeMillis()), Types.TIMESTAMP)); } @After http://git-wip-us.apache.org/repos/asf/storm/blob/6d20c4af/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/UtilTest.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/UtilTest.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/UtilTest.java index 8b6e8b3..76d5f6a 100644 --- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/UtilTest.java +++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/UtilTest.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.jdbc.common; import java.sql.Date; @@ -23,7 +18,8 @@ import java.sql.Timestamp; import java.sql.Types; import org.junit.Test; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; public class UtilTest {
