Repository: incubator-zeppelin Updated Branches: refs/heads/master 22859f659 -> 82af1e79b
ZEPPELIN-487 Change supporting multiple statements to multiple connections Changed multiple statements to multiple connection. Some JDBC don't support parallel executions with one connection. Author: Jongyoul Lee <[email protected]> Closes #517 from jongyoul/ZEPPELIN-487 and squashes the following commits: fa54749 [Jongyoul Lee] ZEPPELIN-487 HiveInterpreter Multiple Connections - Fix the error when statement doesn't support isClosed method 113a583 [Jongyoul Lee] ZEPPELIN-487 HiveInterpreter Multiple Connections - Multiple Statements -> multiple connections b23dcb4 [Jongyoul Lee] ZEPPELIN-487 HiveInterpreter Multiple Connections - keyConnectionMap -> propertyKeyConnectionMap Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/82af1e79 Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/82af1e79 Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/82af1e79 Branch: refs/heads/master Commit: 82af1e79b52a3e11f01e079a691ea1575bca16ce Parents: 22859f6 Author: Jongyoul Lee <[email protected]> Authored: Mon Dec 7 12:04:18 2015 +0900 Committer: Jongyoul Lee <[email protected]> Committed: Mon Dec 14 13:24:28 2015 +0900 ---------------------------------------------------------------------- .../apache/zeppelin/hive/HiveInterpreter.java | 84 ++++++++++++++------ 1 file changed, 60 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/82af1e79/hive/src/main/java/org/apache/zeppelin/hive/HiveInterpreter.java ---------------------------------------------------------------------- diff --git a/hive/src/main/java/org/apache/zeppelin/hive/HiveInterpreter.java b/hive/src/main/java/org/apache/zeppelin/hive/HiveInterpreter.java index b1f3339..912b55e 100644 --- a/hive/src/main/java/org/apache/zeppelin/hive/HiveInterpreter.java +++ b/hive/src/main/java/org/apache/zeppelin/hive/HiveInterpreter.java @@ -23,6 +23,7 @@ import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -73,9 +74,11 @@ public class HiveInterpreter extends Interpreter { static final String DEFAULT_PASSWORD = DEFAULT_KEY + DOT + PASSWORD_KEY; private final HashMap<String, Properties> propertiesMap; - private final Map<String, Connection> keyConnectionMap; private final Map<String, Statement> paragraphIdStatementMap; + private final Map<String, ArrayList<Connection>> propertyKeyUnusedConnectionListMap; + private final Map<String, Connection> paragraphIdConnectionMap; + static { Interpreter.register( "hql", @@ -92,8 +95,9 @@ public class HiveInterpreter extends Interpreter { public HiveInterpreter(Properties property) { super(property); propertiesMap = new HashMap<>(); - keyConnectionMap = new HashMap<>(); + propertyKeyUnusedConnectionListMap = new HashMap<>(); paragraphIdStatementMap = new HashMap<>(); + paragraphIdConnectionMap = new HashMap<>(); } public HashMap<String, Properties> getPropertiesMap() { @@ -142,15 +146,22 @@ public class HiveInterpreter extends Interpreter { @Override public void close() { try { + for (List<Connection> connectionList : propertyKeyUnusedConnectionListMap.values()) { + for (Connection c : connectionList) { + c.close(); + } + } + for (Statement statement : paragraphIdStatementMap.values()) { statement.close(); } paragraphIdStatementMap.clear(); - for (Connection connection : keyConnectionMap.values()) { + for (Connection connection : paragraphIdConnectionMap.values()) { connection.close(); } - keyConnectionMap.clear(); + paragraphIdConnectionMap.clear(); + } catch (SQLException e) { logger.error("Error while closing...", e); } @@ -158,12 +169,14 @@ public class HiveInterpreter extends Interpreter { public Connection getConnection(String propertyKey) throws ClassNotFoundException, SQLException { Connection connection = null; - if (keyConnectionMap.containsKey(propertyKey)) { - connection = keyConnectionMap.get(propertyKey); - if (connection.isClosed() || !connection.isValid(10)) { - connection.close(); - connection = null; - keyConnectionMap.remove(propertyKey); + if (propertyKeyUnusedConnectionListMap.containsKey(propertyKey)) { + ArrayList<Connection> connectionList = propertyKeyUnusedConnectionListMap.get(propertyKey); + if (0 != connectionList.size()) { + connection = propertyKeyUnusedConnectionListMap.get(propertyKey).remove(0); + if (null != connection && connection.isClosed()) { + connection.close(); + connection = null; + } } } if (null == connection) { @@ -177,28 +190,40 @@ public class HiveInterpreter extends Interpreter { } else { connection = DriverManager.getConnection(url, properties); } - keyConnectionMap.put(propertyKey, connection); } return connection; } public Statement getStatement(String propertyKey, String paragraphId) throws SQLException, ClassNotFoundException { - Statement statement = null; - if (paragraphIdStatementMap.containsKey(paragraphId)) { - statement = paragraphIdStatementMap.get(paragraphId); - if (statement.isClosed()) { - statement = null; - paragraphIdStatementMap.remove(paragraphId); - } + Connection connection; + if (paragraphIdConnectionMap.containsKey(paragraphId)) { + // Never enter for now. + connection = paragraphIdConnectionMap.get(paragraphId); + } else { + connection = getConnection(propertyKey); } - if (null == statement) { - statement = getConnection(propertyKey).createStatement(); - paragraphIdStatementMap.put(paragraphId, statement); + + Statement statement = connection.createStatement(); + if (isStatementClosed(statement)) { + connection = getConnection(propertyKey); + statement = connection.createStatement(); } + paragraphIdConnectionMap.put(paragraphId, connection); + paragraphIdStatementMap.put(paragraphId, statement); + return statement; } + private boolean isStatementClosed(Statement statement) { + try { + return statement.isClosed(); + } catch (Throwable t) { + logger.debug("{} doesn't support isClosed method", statement); + return false; + } + } + public InterpreterResult executeSql(String propertyKey, String sql, InterpreterContext interpreterContext) { String paragraphId = interpreterContext.getParagraphId(); @@ -259,7 +284,7 @@ public class HiveInterpreter extends Interpreter { } statement.close(); } finally { - removeStatement(paragraphId); + moveConnectionToUnused(propertyKey, paragraphId); } } @@ -271,8 +296,19 @@ public class HiveInterpreter extends Interpreter { } } - private void removeStatement(String paragraphId) { - paragraphIdStatementMap.remove(paragraphId); + private void moveConnectionToUnused(String propertyKey, String paragraphId) { + if (paragraphIdConnectionMap.containsKey(paragraphId)) { + Connection connection = paragraphIdConnectionMap.remove(paragraphId); + if (null != connection) { + if (propertyKeyUnusedConnectionListMap.containsKey(propertyKey)) { + propertyKeyUnusedConnectionListMap.get(propertyKey).add(connection); + } else { + ArrayList<Connection> connectionList = new ArrayList<>(); + connectionList.add(connection); + propertyKeyUnusedConnectionListMap.put(propertyKey, connectionList); + } + } + } } @Override
