Repository: storm Updated Branches: refs/heads/master 4c9a864f3 -> b48a8a1a6
STORM-2028: Fix for uprooting the JDBC client exceptions in case of subsequent connection closure issues Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/921635ad Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/921635ad Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/921635ad Branch: refs/heads/master Commit: 921635ad1e3b04d5ee1555178e064797ade5d3d6 Parents: 3c53dd5 Author: Rahul Jain <rahu...@adobe.com> Authored: Thu Aug 3 12:13:34 2017 +0530 Committer: Rahul Jain <rahu...@adobe.com> Committed: Thu Aug 3 12:13:34 2017 +0530 ---------------------------------------------------------------------- .../apache/storm/jdbc/common/JdbcClient.java | 42 ++++++++++++++----- .../storm/jdbc/common/JdbcClientTest.java | 43 ++++++++++++++++++++ 2 files changed, 75 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/921635ad/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java index 228babe..e29edd5 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java @@ -46,6 +46,7 @@ public class JdbcClient { } public void executeInsertQuery(String query, List<List<Column>> columnLists) { + Exception insertException = null; Connection connection = null; try { connection = connectionProvider.getConnection(); @@ -78,9 +79,11 @@ public class JdbcClient { } } } catch (SQLException e) { - throw new RuntimeException("Failed to execute insert query " + query, e); + insertException = new RuntimeException("Failed to execute insert query " + query, e); + } catch (RuntimeException e) { + insertException = e; } finally { - closeConnection(connection); + closeConnection(connection, insertException); } } @@ -103,6 +106,7 @@ public class JdbcClient { } public List<List<Column>> select(String sqlQuery, List<Column> queryParams) { + Exception selectException = null; Connection connection = null; try { connection = connectionProvider.getConnection(); @@ -151,13 +155,17 @@ public class JdbcClient { } return rows; } catch (SQLException e) { - throw new RuntimeException("Failed to execute select query " + sqlQuery, e); + selectException = new RuntimeException("Failed to execute select query " + sqlQuery, e); + } catch (RuntimeException e) { + selectException = e; } finally { - closeConnection(connection); + closeConnection(connection, selectException); } + return null; } public List<Column> getColumnSchema(String tableName) { + Exception getSchemaException = null; Connection connection = null; List<Column> columns = new ArrayList<Column>(); try { @@ -169,22 +177,28 @@ public class JdbcClient { } return columns; } catch (SQLException e) { - throw new RuntimeException("Failed to get schema for table " + tableName, e); + getSchemaException = new RuntimeException("Failed to get schema for table " + tableName, e); + } catch (RuntimeException e) { + getSchemaException = e; } finally { - closeConnection(connection); + closeConnection(connection, getSchemaException); } + return null; } public void executeSql(String sql) { + Exception execException = null; Connection connection = null; try { connection = connectionProvider.getConnection(); Statement statement = connection.createStatement(); statement.execute(sql); } catch (SQLException e) { - throw new RuntimeException("Failed to execute SQL", e); + execException = new RuntimeException("Failed to execute SQL", e); + } catch (RuntimeException e) { + execException = e; } finally { - closeConnection(connection); + closeConnection(connection, execException); } } @@ -223,13 +237,21 @@ public class JdbcClient { } } - private void closeConnection(Connection connection) { + private void closeConnection(Connection connection, Exception finalException) { if (connection != null) { try { connection.close(); } catch (SQLException e) { - throw new RuntimeException("Failed to close connection", e); + if (finalException != null) { + LOG.error("Failed to close connection: " + e.getMessage()); + } else { + finalException = new RuntimeException("Failed to close connection", e); + } } } + + if (finalException != null) { + throw new IllegalStateException(finalException); + } } } http://git-wip-us.apache.org/repos/asf/storm/blob/921635ad/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java index 551cd72..e55700c 100644 --- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java +++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java @@ -22,8 +22,13 @@ import com.google.common.collect.Maps; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; +import org.junit.rules.ExpectedException; +import org.junit.runners.model.MultipleFailureException; import org.junit.Test; +import java.sql.Connection; +import java.sql.SQLException; import java.sql.Timestamp; import java.sql.Types; import java.util.ArrayList; @@ -80,6 +85,24 @@ public class JdbcClientTest { Assert.assertEquals(rows, selectedRows); } + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testInsertConnectionError() { + + ConnectionProvider connectionProvider = new ThrowingConnectionProvider(null); + this.client = new JdbcClient(connectionProvider, 60); + + List<Column> row = createRow(1, "frank"); + List<List<Column>> rows = new ArrayList<List<Column>>(); + rows.add(row); + String query = "insert into user_details values(?,?,?)"; + + thrown.expect(MultipleFailureException.class); + client.executeInsertQuery(query, rows); + } + private List<Column> createRow(int id, String name) { return Lists.newArrayList( new Column("ID", id, Types.INTEGER), @@ -92,3 +115,23 @@ public class JdbcClientTest { client.executeSql("drop table " + tableName); } } + +class ThrowingConnectionProvider implements ConnectionProvider { + + private Map<String, Object> configMap; + + public ThrowingConnectionProvider(Map<String, Object> mockCPConfigMap) { + this.configMap = mockCPConfigMap; + } + + @Override + public synchronized void prepare() {} + + @Override + public Connection getConnection() { + throw new RuntimeException("connection error"); + } + + @Override + public void cleanup() {} +}