Repository: storm
Updated Branches:
  refs/heads/master 4c9a864f3 -> b48a8a1a6


STORM-2028: Fix for uprooting the JDBC client exceptions in case of subsequent 
connection closure issues


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/921635ad
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/921635ad
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/921635ad

Branch: refs/heads/master
Commit: 921635ad1e3b04d5ee1555178e064797ade5d3d6
Parents: 3c53dd5
Author: Rahul Jain <rahu...@adobe.com>
Authored: Thu Aug 3 12:13:34 2017 +0530
Committer: Rahul Jain <rahu...@adobe.com>
Committed: Thu Aug 3 12:13:34 2017 +0530

----------------------------------------------------------------------
 .../apache/storm/jdbc/common/JdbcClient.java    | 42 ++++++++++++++-----
 .../storm/jdbc/common/JdbcClientTest.java       | 43 ++++++++++++++++++++
 2 files changed, 75 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/921635ad/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
----------------------------------------------------------------------
diff --git 
a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
 
b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
index 228babe..e29edd5 100644
--- 
a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
+++ 
b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
@@ -46,6 +46,7 @@ public class JdbcClient {
     }
 
     public void executeInsertQuery(String query, List<List<Column>> 
columnLists) {
+        Exception insertException = null;
         Connection connection = null;
         try {
             connection = connectionProvider.getConnection();
@@ -78,9 +79,11 @@ public class JdbcClient {
                 }
             }
         } catch (SQLException e) {
-            throw new RuntimeException("Failed to execute insert query " + 
query, e);
+            insertException = new RuntimeException("Failed to execute insert 
query " + query, e);
+        } catch (RuntimeException e) {
+            insertException = e;
         } finally {
-            closeConnection(connection);
+            closeConnection(connection, insertException);
         }
     }
 
@@ -103,6 +106,7 @@ public class JdbcClient {
     }
 
     public List<List<Column>> select(String sqlQuery, List<Column> 
queryParams) {
+        Exception selectException = null;
         Connection connection = null;
         try {
             connection = connectionProvider.getConnection();
@@ -151,13 +155,17 @@ public class JdbcClient {
             }
             return rows;
         } catch (SQLException e) {
-            throw new RuntimeException("Failed to execute select query " + 
sqlQuery, e);
+            selectException = new RuntimeException("Failed to execute select 
query " + sqlQuery, e);
+        } catch (RuntimeException e) {
+            selectException = e;
         } finally {
-            closeConnection(connection);
+            closeConnection(connection, selectException);
         }
+        return null;
     }
 
     public List<Column> getColumnSchema(String tableName) {
+        Exception getSchemaException = null;
         Connection connection = null;
         List<Column> columns = new ArrayList<Column>();
         try {
@@ -169,22 +177,28 @@ public class JdbcClient {
             }
             return columns;
         } catch (SQLException e) {
-            throw new RuntimeException("Failed to get schema for table " + 
tableName, e);
+            getSchemaException = new RuntimeException("Failed to get schema 
for table " + tableName, e);
+        } catch (RuntimeException e) {
+            getSchemaException = e;
         } finally {
-            closeConnection(connection);
+            closeConnection(connection, getSchemaException);
         }
+        return null;
     }
 
     public void executeSql(String sql) {
+        Exception execException = null;
         Connection connection = null;
         try {
             connection = connectionProvider.getConnection();
             Statement statement = connection.createStatement();
             statement.execute(sql);
         } catch (SQLException e) {
-            throw new RuntimeException("Failed to execute SQL", e);
+            execException = new RuntimeException("Failed to execute SQL", e);
+        } catch (RuntimeException e) {
+            execException = e;
         } finally {
-            closeConnection(connection);
+            closeConnection(connection, execException);
         }
     }
 
@@ -223,13 +237,21 @@ public class JdbcClient {
         }
     }
 
-    private void closeConnection(Connection connection) {
+    private void closeConnection(Connection connection, Exception 
finalException) {
         if (connection != null) {
             try {
                 connection.close();
             } catch (SQLException e) {
-                throw new RuntimeException("Failed to close connection", e);
+                if (finalException != null) {
+                    LOG.error("Failed to close connection: " + e.getMessage());
+                } else {
+                    finalException = new RuntimeException("Failed to close 
connection", e);
+                }
             }
         }
+
+        if (finalException != null) {
+            throw new IllegalStateException(finalException);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/921635ad/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
 
b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
index 551cd72..e55700c 100644
--- 
a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
+++ 
b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
@@ -22,8 +22,13 @@ import com.google.common.collect.Maps;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
+import org.junit.runners.model.MultipleFailureException;
 import org.junit.Test;
 
+import java.sql.Connection;
+import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.sql.Types;
 import java.util.ArrayList;
@@ -80,6 +85,24 @@ public class JdbcClientTest {
         Assert.assertEquals(rows, selectedRows);
     }
 
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
+
+    @Test
+    public void testInsertConnectionError() {
+
+        ConnectionProvider connectionProvider = new 
ThrowingConnectionProvider(null);
+        this.client = new JdbcClient(connectionProvider, 60);
+
+        List<Column> row = createRow(1, "frank");
+        List<List<Column>> rows  = new ArrayList<List<Column>>();
+        rows.add(row);
+        String query  = "insert into user_details values(?,?,?)";
+
+        thrown.expect(MultipleFailureException.class);
+        client.executeInsertQuery(query, rows);
+    }
+
     private List<Column> createRow(int id, String name) {
         return Lists.newArrayList(
                 new Column("ID", id, Types.INTEGER),
@@ -92,3 +115,23 @@ public class JdbcClientTest {
         client.executeSql("drop table " + tableName);
     }
 }
+
+class ThrowingConnectionProvider implements ConnectionProvider {
+
+    private Map<String, Object> configMap;
+
+    public ThrowingConnectionProvider(Map<String, Object> mockCPConfigMap) {
+        this.configMap = mockCPConfigMap;
+    }
+
+    @Override
+    public synchronized void prepare() {}
+
+    @Override
+    public Connection getConnection() {
+        throw new RuntimeException("connection error");
+    }
+
+    @Override
+    public void cleanup() {}
+}

Reply via email to