Repository: hive Updated Branches: refs/heads/master 98699b3b7 -> 418f936cf
HIVE-13400: Following up HIVE-12481, add retry for Zookeeper service discovery (Reviewed by Chaoyu Tang) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/418f936c Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/418f936c Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/418f936c Branch: refs/heads/master Commit: 418f936cff5c344587ea4f58ce08a5cdda931164 Parents: 98699b3 Author: Aihua Xu <aihu...@apache.org> Authored: Fri Apr 1 15:46:22 2016 -0400 Committer: Aihua Xu <aihu...@apache.org> Committed: Thu Apr 14 17:21:12 2016 -0400 ---------------------------------------------------------------------- .../org/apache/hive/jdbc/HiveConnection.java | 84 +++++++++----------- jdbc/src/java/org/apache/hive/jdbc/Utils.java | 20 +++-- 2 files changed, 51 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/418f936c/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java ---------------------------------------------------------------------- diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java index 352744f..40ad3b2 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java @@ -18,6 +18,7 @@ package org.apache.hive.jdbc; +import org.apache.commons.lang.StringUtils; import org.apache.hive.jdbc.Utils.JdbcConnectionParams; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.auth.KerberosSaslHelper; @@ -109,8 +110,6 @@ public class HiveConnection implements java.sql.Connection { private String host; private int port; private final Map<String, String> sessConfMap; - private final Map<String, String> hiveConfMap; - private final Map<String, String> hiveVarMap; private JdbcConnectionParams connParams; private final boolean isEmbeddedMode; private TTransport transport; @@ -141,8 +140,6 @@ public class HiveConnection implements java.sql.Connection { host = connParams.getHost(); port = connParams.getPort(); sessConfMap = connParams.getSessionVars(); - hiveConfMap = connParams.getHiveConfs(); - hiveVarMap = connParams.getHiveVars(); isEmbeddedMode = connParams.isEmbeddedMode(); if (sessConfMap.containsKey(JdbcConnectionParams.FETCH_SIZE)) { @@ -177,56 +174,51 @@ public class HiveConnection implements java.sql.Connection { } private void openTransport() throws SQLException { - int numRetries = 0; int maxRetries = 1; try { - maxRetries = Integer.parseInt(sessConfMap.get(JdbcConnectionParams.RETRIES)); - } catch(NumberFormatException e) { + String strRetries = sessConfMap.get(JdbcConnectionParams.RETRIES); + if (StringUtils.isNotBlank(strRetries)) { + maxRetries = Integer.parseInt(strRetries); + } + } catch(NumberFormatException e) { // Ignore the exception } - while (true) { - try { - assumeSubject = - JdbcConnectionParams.AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT.equals(sessConfMap - .get(JdbcConnectionParams.AUTH_KERBEROS_AUTH_TYPE)); - transport = isHttpTransportMode() ? createHttpTransport() : createBinaryTransport(); - if (!transport.isOpen()) { - transport.open(); - logZkDiscoveryMessage("Connected to " + connParams.getHost() + ":" + connParams.getPort()); - } - break; - } catch (TTransportException e) { - // We'll retry till we exhaust all HiveServer2 nodes from ZooKeeper - if (isZkDynamicDiscoveryMode()) { - LOG.info("Failed to connect to " + connParams.getHost() + ":" + connParams.getPort()); - try { - // Update jdbcUriString, host & port variables in connParams - // Throw an exception if all HiveServer2 nodes have been exhausted, - // or if we're unable to connect to ZooKeeper. - Utils.updateConnParamsFromZooKeeper(connParams); - } catch (ZooKeeperHiveClientException ze) { - throw new SQLException( - "Could not open client transport for any of the Server URI's in ZooKeeper: " - + ze.getMessage(), " 08S01", ze); + for (int numRetries = 0;;) { + try { + assumeSubject = + JdbcConnectionParams.AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT.equals(sessConfMap + .get(JdbcConnectionParams.AUTH_KERBEROS_AUTH_TYPE)); + transport = isHttpTransportMode() ? createHttpTransport() : createBinaryTransport(); + if (!transport.isOpen()) { + transport.open(); + logZkDiscoveryMessage("Connected to " + connParams.getHost() + ":" + connParams.getPort()); } - // Update with new values - jdbcUriString = connParams.getJdbcUriString(); - host = connParams.getHost(); - port = connParams.getPort(); - } else { - LOG.info("Transport Used for JDBC connection: " + - sessConfMap.get(JdbcConnectionParams.TRANSPORT_MODE)); - - // Retry maxRetries times - String errMsg = "Could not open client transport with JDBC Uri: " + - jdbcUriString + ": " + e.getMessage(); - if (++numRetries >= maxRetries) { - throw new SQLException(errMsg, " 08S01", e); + break; + } catch (TTransportException e) { + LOG.warn("Failed to connect to " + connParams.getHost() + ":" + connParams.getPort()); + String errMsg = null; + String warnMsg = "Could not open client transport with JDBC Uri: " + jdbcUriString + ": "; + if (isZkDynamicDiscoveryMode()) { + errMsg = "Could not open client transport for any of the Server URI's in ZooKeeper: "; + // Try next available server in zookeeper, or retry all the servers again if retry is enabled + while(!Utils.updateConnParamsFromZooKeeper(connParams) && ++numRetries < maxRetries) { + connParams.getRejectedHostZnodePaths().clear(); + } + // Update with new values + jdbcUriString = connParams.getJdbcUriString(); + host = connParams.getHost(); + port = connParams.getPort(); } else { - LOG.warn(errMsg + " Retrying " + numRetries + " of " + maxRetries); + errMsg = warnMsg; + ++numRetries; + } + + if (numRetries >= maxRetries) { + throw new SQLException(errMsg + e.getMessage(), " 08S01", e); + } else { + LOG.warn(warnMsg + e.getMessage() + " Retrying " + numRetries + " of " + maxRetries); } } - } } } http://git-wip-us.apache.org/repos/asf/hive/blob/418f936c/jdbc/src/java/org/apache/hive/jdbc/Utils.java ---------------------------------------------------------------------- diff --git a/jdbc/src/java/org/apache/hive/jdbc/Utils.java b/jdbc/src/java/org/apache/hive/jdbc/Utils.java index 754f89f..42181d7 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/Utils.java +++ b/jdbc/src/java/org/apache/hive/jdbc/Utils.java @@ -543,19 +543,25 @@ class Utils { * explored. Also update the host, port, jdbcUriString and other configs published by the server. * * @param connParams - * @throws ZooKeeperHiveClientException + * @return true if new server info is retrieved successfully */ - static void updateConnParamsFromZooKeeper(JdbcConnectionParams connParams) - throws ZooKeeperHiveClientException { + static boolean updateConnParamsFromZooKeeper(JdbcConnectionParams connParams) { // Add current host to the rejected list connParams.getRejectedHostZnodePaths().add(connParams.getCurrentHostZnodePath()); String oldServerHost = connParams.getHost(); int oldServerPort = connParams.getPort(); // Update connection params (including host, port) from ZooKeeper - ZooKeeperHiveClientHelper.configureConnParams(connParams); - connParams.setJdbcUriString(connParams.getJdbcUriString().replace( - oldServerHost + ":" + oldServerPort, connParams.getHost() + ":" + connParams.getPort())); - LOG.info("Selected HiveServer2 instance with uri: " + connParams.getJdbcUriString()); + try { + ZooKeeperHiveClientHelper.configureConnParams(connParams); + connParams.setJdbcUriString(connParams.getJdbcUriString().replace( + oldServerHost + ":" + oldServerPort, connParams.getHost() + ":" + connParams.getPort())); + LOG.info("Selected HiveServer2 instance with uri: " + connParams.getJdbcUriString()); + } catch(ZooKeeperHiveClientException e) { + LOG.error(e.getMessage()); + return false; + } + + return true; } private static String joinStringArray(String[] stringArray, String seperator) {