Repository: zeppelin Updated Branches: refs/heads/master 46ecf77b7 -> b24491baf
[ZEPPELIN-1405] ConnectionPool for JDBCInterpreter. ### What is this PR for? This PR refactors the JDBCInterpreter code. The original code never puts a 'Connection' into 'propertyKeyUnusedConnectionListMap', so that map is removed and connections are now obtained from a commons-dbcp2 connection pool. ### What type of PR is it? Improvement ### What is the Jira issue? https://issues.apache.org/jira/browse/ZEPPELIN-1405 ### Questions: * Do the license files need an update? no * Are there breaking changes for older versions? no * Does this need documentation? no Author: astroshim <[email protected]> Closes #1396 from astroshim/ZEPPELIN-1405 and squashes the following commits: b07e162 [astroshim] add checking connection is null f6998c2 [astroshim] Merge branch 'master' into ZEPPELIN-1405 1862ae6 [astroshim] Merge branch 'master' into ZEPPELIN-1405 efc2bfc [astroshim] rebase 21217a7 [astroshim] fix indentation. 4d4f85c [astroshim] refactoring code of close() 9f1e368 [astroshim] replace ConnectionPool 4dabbcc [astroshim] wip) changing to use dbcp 12dd7cb [astroshim] remove propertyKeyUnusedConnectionListMap map Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/b24491ba Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/b24491ba Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/b24491ba Branch: refs/heads/master Commit: b24491bafa78693457687dd5da460d5e387e9ddb Parents: 46ecf77 Author: astroshim <[email protected]> Authored: Fri Sep 23 16:31:19 2016 +0900 Committer: Jongyoul Lee <[email protected]> Committed: Wed Sep 28 23:10:37 2016 +0900 ---------------------------------------------------------------------- jdbc/pom.xml | 6 + .../apache/zeppelin/jdbc/JDBCInterpreter.java | 198 +++++++++---------- 2 files changed, 105 insertions(+), 99 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/b24491ba/jdbc/pom.xml 
---------------------------------------------------------------------- diff --git a/jdbc/pom.xml b/jdbc/pom.xml index f4e97c9..73c66c0 100644 --- a/jdbc/pom.xml +++ b/jdbc/pom.xml @@ -104,6 +104,12 @@ <version>1.0.8</version> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-dbcp2</artifactId> + <version>2.0.1</version> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/b24491ba/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java ---------------------------------------------------------------------- diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 0655f3a..5f784d7 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -19,15 +19,16 @@ import java.io.*; import java.nio.charset.StandardCharsets; import java.io.IOException; import java.security.PrivilegedExceptionAction; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.sql.SQLException; -import java.sql.Statement; +import java.sql.*; import java.util.*; -import org.apache.commons.lang.StringUtils; +import org.apache.commons.dbcp2.ConnectionFactory; +import org.apache.commons.dbcp2.DriverManagerConnectionFactory; +import org.apache.commons.dbcp2.PoolableConnectionFactory; +import org.apache.commons.dbcp2.PoolingDriver; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.pool2.ObjectPool; +import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.hadoop.security.UserGroupInformation; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; @@ -99,14 +100,15 @@ public class JDBCInterpreter extends Interpreter { static final String 
EMPTY_COLUMN_VALUE = ""; + private final String CONCURRENT_EXECUTION_KEY = "zeppelin.jdbc.concurrent.use"; private final String CONCURRENT_EXECUTION_COUNT = "zeppelin.jdbc.concurrent.max_connection"; + private final String DBCP_STRING = "jdbc:apache:commons:dbcp:"; + private final HashMap<String, Properties> propertiesMap; private final Map<String, Statement> paragraphIdStatementMap; - - private final Map<String, ArrayList<Connection>> propertyKeyUnusedConnectionListMap; - private final Map<String, Connection> paragraphIdConnectionMap; + private final Map<String, PoolingDriver> poolingDriverMap; private final Map<String, SqlCompleter> propertyKeySqlCompleterMap; @@ -122,9 +124,8 @@ public class JDBCInterpreter extends Interpreter { public JDBCInterpreter(Properties property) { super(property); propertiesMap = new HashMap<>(); - propertyKeyUnusedConnectionListMap = new HashMap<>(); paragraphIdStatementMap = new HashMap<>(); - paragraphIdConnectionMap = new HashMap<>(); + poolingDriverMap = new HashMap<>(); propertyKeySqlCompleterMap = new HashMap<>(); } @@ -193,22 +194,41 @@ public class JDBCInterpreter extends Interpreter { return completer; } + private boolean isConnectionInPool(String driverName) { + if (poolingDriverMap.containsKey(driverName)) return true; + return false; + } + + private void createConnectionPool(String url, String propertyKey, Properties properties) { + ConnectionFactory connectionFactory = + new DriverManagerConnectionFactory(url, properties); + + PoolableConnectionFactory poolableConnectionFactory = new PoolableConnectionFactory( + connectionFactory, null); + ObjectPool connectionPool = new GenericObjectPool(poolableConnectionFactory); + + poolableConnectionFactory.setPool(connectionPool); + PoolingDriver driver = new PoolingDriver(); + driver.registerPool(propertyKey, connectionPool); + + poolingDriverMap.put(propertyKey, driver); + } + + private Connection getConnectionFromPool(String url, String propertyKey, Properties properties) + 
throws SQLException { + if (!isConnectionInPool(propertyKey)) { + createConnectionPool(url, propertyKey, properties); + } + + return DriverManager.getConnection(DBCP_STRING + propertyKey); + } + public Connection getConnection(String propertyKey, String user) throws ClassNotFoundException, SQLException, InterpreterException { Connection connection = null; if (propertyKey == null || propertiesMap.get(propertyKey) == null) { return null; } - if (propertyKeyUnusedConnectionListMap.containsKey(propertyKey)) { - ArrayList<Connection> connectionList = propertyKeyUnusedConnectionListMap.get(propertyKey); - if (0 != connectionList.size()) { - connection = propertyKeyUnusedConnectionListMap.get(propertyKey).remove(0); - if (null != connection && connection.isClosed()) { - connection.close(); - connection = null; - } - } - } if (null == connection) { final Properties properties = (Properties) propertiesMap.get(propertyKey).clone(); logger.info(properties.getProperty(DRIVER_KEY)); @@ -222,16 +242,16 @@ public class JDBCInterpreter extends Interpreter { switch (authType) { case KERBEROS: if (user == null) { - connection = DriverManager.getConnection(url, properties); + connection = getConnectionFromPool(url, propertyKey, properties); } else { if ("hive".equalsIgnoreCase(propertyKey)) { - connection = DriverManager.getConnection(url + ";hive.server2.proxy.user=" + user, - properties); + connection = getConnectionFromPool(url + ";hive.server2.proxy.user=" + user, + propertyKey, properties); } else { UserGroupInformation ugi = null; try { ugi = UserGroupInformation.createProxyUser(user, - UserGroupInformation.getCurrentUser()); + UserGroupInformation.getCurrentUser()); } catch (Exception e) { logger.error("Error in createProxyUser", e); StringBuilder stringBuilder = new StringBuilder(); @@ -239,11 +259,13 @@ public class JDBCInterpreter extends Interpreter { stringBuilder.append(e.getCause()); throw new InterpreterException(stringBuilder.toString()); } + + final String poolKey = 
propertyKey; try { connection = ugi.doAs(new PrivilegedExceptionAction<Connection>() { @Override public Connection run() throws Exception { - return DriverManager.getConnection(url, properties); + return getConnectionFromPool(url, poolKey, properties); } }); } catch (Exception e) { @@ -258,7 +280,7 @@ public class JDBCInterpreter extends Interpreter { break; default: - connection = DriverManager.getConnection(url, properties); + connection = getConnectionFromPool(url, propertyKey, properties); } } } @@ -266,75 +288,41 @@ public class JDBCInterpreter extends Interpreter { return connection; } - public Statement getStatement(String propertyKey, String paragraphId, - InterpreterContext interpreterContext) - throws SQLException, ClassNotFoundException, InterpreterException { - Connection connection; - - if (paragraphIdConnectionMap.containsKey(paragraphId + - interpreterContext.getAuthenticationInfo().getUser())) { - connection = paragraphIdConnectionMap.get(paragraphId + - interpreterContext.getAuthenticationInfo().getUser()); - } else { - connection = getConnection(propertyKey, interpreterContext.getAuthenticationInfo().getUser()); - } - - if (connection == null) { - return null; + private void initStatementMap() { + for (Statement statement : paragraphIdStatementMap.values()) { + try { + statement.close(); + } catch (Exception e) { + logger.error("Error while closing paragraphIdStatementMap statement...", e); + } } + paragraphIdStatementMap.clear(); + } - Statement statement = connection.createStatement(); - if (isStatementClosed(statement)) { - connection = getConnection(propertyKey, interpreterContext.getAuthenticationInfo().getUser()); - statement = connection.createStatement(); + private void initConnectionPoolMap() throws SQLException { + Iterator<String> it = poolingDriverMap.keySet().iterator(); + while (it.hasNext()) { + String driverName = it.next(); + poolingDriverMap.get(driverName).closePool(driverName); + it.remove(); } - 
paragraphIdConnectionMap.put(paragraphId + interpreterContext.getAuthenticationInfo().getUser(), - connection); - paragraphIdStatementMap.put(paragraphId + interpreterContext.getAuthenticationInfo().getUser(), - statement); + poolingDriverMap.clear(); + } - return statement; + private void saveStatement(String key, Statement statement) throws SQLException { + paragraphIdStatementMap.put(key, statement); + statement.setMaxRows(getMaxResult()); } - private boolean isStatementClosed(Statement statement) { - try { - return statement.isClosed(); - } catch (Throwable t) { - logger.debug("{} doesn't support isClosed method", statement); - return false; - } + private void removeStatement(String key) { + paragraphIdStatementMap.remove(key); } @Override public void close() { try { - for (List<Connection> connectionList : propertyKeyUnusedConnectionListMap.values()) { - for (Connection c : connectionList) { - try { - c.close(); - } catch (Exception e) { - logger.error("Error while closing propertyKeyUnusedConnectionListMap connection...", e); - } - } - } - - for (Statement statement : paragraphIdStatementMap.values()) { - try { - statement.close(); - } catch (Exception e) { - logger.error("Error while closing paragraphIdStatementMap statement...", e); - } - } - paragraphIdStatementMap.clear(); - - for (Connection connection : paragraphIdConnectionMap.values()) { - try { - connection.close(); - } catch (Exception e) { - logger.error("Error while closing paragraphIdConnectionMap connection...", e); - } - } - paragraphIdConnectionMap.clear(); + initStatementMap(); + initConnectionPoolMap(); } catch (Exception e) { logger.error("Error while closing...", e); } @@ -342,17 +330,21 @@ public class JDBCInterpreter extends Interpreter { private InterpreterResult executeSql(String propertyKey, String sql, InterpreterContext interpreterContext) { - String paragraphId = interpreterContext.getParagraphId(); + Connection connection; + Statement statement; + ResultSet resultSet = null; try { 
+ connection = getConnection(propertyKey, interpreterContext.getAuthenticationInfo().getUser()); + if (connection == null) { + return new InterpreterResult(Code.ERROR, "Prefix not found."); + } - Statement statement = getStatement(propertyKey, paragraphId, interpreterContext); - + statement = connection.createStatement(); if (statement == null) { return new InterpreterResult(Code.ERROR, "Prefix not found."); } - statement.setMaxRows(getMaxResult()); StringBuilder msg = null; boolean isTableType = false; @@ -364,8 +356,9 @@ public class JDBCInterpreter extends Interpreter { isTableType = true; } - ResultSet resultSet = null; try { + saveStatement(paragraphId + + interpreterContext.getAuthenticationInfo().getUser(), statement); boolean isResultSetAvailable = statement.execute(sql); @@ -408,16 +401,24 @@ public class JDBCInterpreter extends Interpreter { msg.append(updateCount).append(NEWLINE); } } finally { - try { - if (resultSet != null) { + if (resultSet != null) { + try { resultSet.close(); - } - statement.close(); - } finally { - statement = null; + } catch (SQLException e) { /*ignored*/ } } + if (statement != null) { + try { + statement.close(); + } catch (SQLException e) { /*ignored*/ } + } + if (connection != null) { + try { + connection.close(); + } catch (SQLException e) { /*ignored*/ } + } + removeStatement(paragraphId + + interpreterContext.getAuthenticationInfo().getUser()); } - return new InterpreterResult(Code.SUCCESS, msg.toString()); } catch (Exception e) { @@ -452,7 +453,6 @@ public class JDBCInterpreter extends Interpreter { cmd = cmd.trim(); logger.info("PropertyKey: {}, SQL command: '{}'", propertyKey, cmd); - return executeSql(propertyKey, cmd, contextInterpreter); }
