Repository: incubator-zeppelin
Updated Branches:
  refs/heads/master 22859f659 -> 82af1e79b


ZEPPELIN-487 Change supporting multiple statements to multiple connections

Changed multiple statements to multiple connections. Some JDBC drivers don't
support parallel execution over a single connection.
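
For context only, a minimal, hypothetical sketch of the per-paragraph
connection reuse this change moves to; the class and method names below are
illustrative assumptions, not the HiveInterpreter API:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Simplified, single-datasource illustration of the pattern this patch adopts:
// each paragraph borrows its own Connection (so queries can run in parallel),
// and a finished paragraph's Connection is parked for later reuse instead of
// being shared through one Connection's statements.
public class PerParagraphConnectionPool {

  private final String url;
  private final Map<String, Connection> paragraphIdConnectionMap = new HashMap<>();
  private final Deque<Connection> unusedConnections = new ArrayDeque<>();

  public PerParagraphConnectionPool(String url) {
    this.url = url;
  }

  // Borrow a connection for a paragraph: reuse an idle one when possible,
  // otherwise open a new one.
  public synchronized Connection acquire(String paragraphId) throws SQLException {
    Connection connection = unusedConnections.pollFirst();
    if (connection == null || connection.isClosed()) {
      connection = DriverManager.getConnection(url);
    }
    paragraphIdConnectionMap.put(paragraphId, connection);
    return connection;
  }

  // Hand the paragraph's connection back to the idle pool once its query is done.
  public synchronized void release(String paragraphId) {
    Connection connection = paragraphIdConnectionMap.remove(paragraphId);
    if (connection != null) {
      unusedConnections.addLast(connection);
    }
  }
}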

Author: Jongyoul Lee <[email protected]>

Closes #517 from jongyoul/ZEPPELIN-487 and squashes the following commits:

fa54749 [Jongyoul Lee] ZEPPELIN-487 HiveInterpreter Multiple Connections - Fix the error when statement doesn't support isClosed method
113a583 [Jongyoul Lee] ZEPPELIN-487 HiveInterpreter Multiple Connections - Multiple Statements -> multiple connections
b23dcb4 [Jongyoul Lee] ZEPPELIN-487 HiveInterpreter Multiple Connections - keyConnectionMap -> propertyKeyConnectionMap


Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/82af1e79
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/82af1e79
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/82af1e79

Branch: refs/heads/master
Commit: 82af1e79b52a3e11f01e079a691ea1575bca16ce
Parents: 22859f6
Author: Jongyoul Lee <[email protected]>
Authored: Mon Dec 7 12:04:18 2015 +0900
Committer: Jongyoul Lee <[email protected]>
Committed: Mon Dec 14 13:24:28 2015 +0900

----------------------------------------------------------------------
 .../apache/zeppelin/hive/HiveInterpreter.java   | 84 ++++++++++++++------
 1 file changed, 60 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/82af1e79/hive/src/main/java/org/apache/zeppelin/hive/HiveInterpreter.java
----------------------------------------------------------------------
diff --git a/hive/src/main/java/org/apache/zeppelin/hive/HiveInterpreter.java b/hive/src/main/java/org/apache/zeppelin/hive/HiveInterpreter.java
index b1f3339..912b55e 100644
--- a/hive/src/main/java/org/apache/zeppelin/hive/HiveInterpreter.java
+++ b/hive/src/main/java/org/apache/zeppelin/hive/HiveInterpreter.java
@@ -23,6 +23,7 @@ import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -73,9 +74,11 @@ public class HiveInterpreter extends Interpreter {
   static final String DEFAULT_PASSWORD = DEFAULT_KEY + DOT + PASSWORD_KEY;
 
   private final HashMap<String, Properties> propertiesMap;
-  private final Map<String, Connection> keyConnectionMap;
   private final Map<String, Statement> paragraphIdStatementMap;
 
+  private final Map<String, ArrayList<Connection>> propertyKeyUnusedConnectionListMap;
+  private final Map<String, Connection> paragraphIdConnectionMap;
+
   static {
     Interpreter.register(
         "hql",
@@ -92,8 +95,9 @@ public class HiveInterpreter extends Interpreter {
   public HiveInterpreter(Properties property) {
     super(property);
     propertiesMap = new HashMap<>();
-    keyConnectionMap = new HashMap<>();
+    propertyKeyUnusedConnectionListMap = new HashMap<>();
     paragraphIdStatementMap = new HashMap<>();
+    paragraphIdConnectionMap = new HashMap<>();
   }
 
   public HashMap<String, Properties> getPropertiesMap() {
@@ -142,15 +146,22 @@ public class HiveInterpreter extends Interpreter {
   @Override
   public void close() {
     try {
+      for (List<Connection> connectionList : propertyKeyUnusedConnectionListMap.values()) {
+        for (Connection c : connectionList) {
+          c.close();
+        }
+      }
+
       for (Statement statement : paragraphIdStatementMap.values()) {
         statement.close();
       }
       paragraphIdStatementMap.clear();
 
-      for (Connection connection : keyConnectionMap.values()) {
+      for (Connection connection : paragraphIdConnectionMap.values()) {
         connection.close();
       }
-      keyConnectionMap.clear();
+      paragraphIdConnectionMap.clear();
+
     } catch (SQLException e) {
       logger.error("Error while closing...", e);
     }
@@ -158,12 +169,14 @@ public class HiveInterpreter extends Interpreter {
 
   public Connection getConnection(String propertyKey) throws ClassNotFoundException, SQLException {
     Connection connection = null;
-    if (keyConnectionMap.containsKey(propertyKey)) {
-      connection = keyConnectionMap.get(propertyKey);
-      if (connection.isClosed() || !connection.isValid(10)) {
-        connection.close();
-        connection = null;
-        keyConnectionMap.remove(propertyKey);
+    if (propertyKeyUnusedConnectionListMap.containsKey(propertyKey)) {
+      ArrayList<Connection> connectionList = propertyKeyUnusedConnectionListMap.get(propertyKey);
+      if (0 != connectionList.size()) {
+        connection = propertyKeyUnusedConnectionListMap.get(propertyKey).remove(0);
+        if (null != connection && connection.isClosed()) {
+          connection.close();
+          connection = null;
+        }
       }
     }
     if (null == connection) {
@@ -177,28 +190,40 @@ public class HiveInterpreter extends Interpreter {
       } else {
         connection = DriverManager.getConnection(url, properties);
       }
-      keyConnectionMap.put(propertyKey, connection);
     }
     return connection;
   }
 
   public Statement getStatement(String propertyKey, String paragraphId)
       throws SQLException, ClassNotFoundException {
-    Statement statement = null;
-    if (paragraphIdStatementMap.containsKey(paragraphId)) {
-      statement = paragraphIdStatementMap.get(paragraphId);
-      if (statement.isClosed()) {
-        statement = null;
-        paragraphIdStatementMap.remove(paragraphId);
-      }
+    Connection connection;
+    if (paragraphIdConnectionMap.containsKey(paragraphId)) {
+      // Never enter for now.
+      connection = paragraphIdConnectionMap.get(paragraphId);
+    } else {
+      connection = getConnection(propertyKey);
     }
-    if (null == statement) {
-      statement = getConnection(propertyKey).createStatement();
-      paragraphIdStatementMap.put(paragraphId, statement);
+
+    Statement statement = connection.createStatement();
+    if (isStatementClosed(statement)) {
+      connection = getConnection(propertyKey);
+      statement = connection.createStatement();
     }
+    paragraphIdConnectionMap.put(paragraphId, connection);
+    paragraphIdStatementMap.put(paragraphId, statement);
+
     return statement;
   }
 
+  private boolean isStatementClosed(Statement statement) {
+    try {
+      return statement.isClosed();
+    } catch (Throwable t) {
+      logger.debug("{} doesn't support isClosed method", statement);
+      return false;
+    }
+  }
+
   public InterpreterResult executeSql(String propertyKey, String sql,
                                       InterpreterContext interpreterContext) {
     String paragraphId = interpreterContext.getParagraphId();
@@ -259,7 +284,7 @@ public class HiveInterpreter extends Interpreter {
           }
           statement.close();
         } finally {
-          removeStatement(paragraphId);
+          moveConnectionToUnused(propertyKey, paragraphId);
         }
       }
 
@@ -271,8 +296,19 @@ public class HiveInterpreter extends Interpreter {
     }
   }
 
-  private void removeStatement(String paragraphId) {
-    paragraphIdStatementMap.remove(paragraphId);
+  private void moveConnectionToUnused(String propertyKey, String paragraphId) {
+    if (paragraphIdConnectionMap.containsKey(paragraphId)) {
+      Connection connection = paragraphIdConnectionMap.remove(paragraphId);
+      if (null != connection) {
+        if (propertyKeyUnusedConnectionListMap.containsKey(propertyKey)) {
+          propertyKeyUnusedConnectionListMap.get(propertyKey).add(connection);
+        } else {
+          ArrayList<Connection> connectionList = new ArrayList<>();
+          connectionList.add(connection);
+          propertyKeyUnusedConnectionListMap.put(propertyKey, connectionList);
+        }
+      }
+    }
   }
 
   @Override
