Repository: hive Updated Branches: refs/heads/master 3ea96eeb8 -> f1d4fcf64
HIVE-18982: Provide a CLI option to manually trigger failover (Prasanth Jayachandran reviewed by Sergey Shelukhin) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f1d4fcf6 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f1d4fcf6 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f1d4fcf6 Branch: refs/heads/master Commit: f1d4fcf64f54a4d447eca18e272f7223dd0b2c22 Parents: 3ea96ee Author: Prasanth Jayachandran <prasan...@apache.org> Authored: Fri Mar 23 18:46:07 2018 -0700 Committer: Prasanth Jayachandran <prasan...@apache.org> Committed: Fri Mar 23 18:46:07 2018 -0700 ---------------------------------------------------------------------- bin/ext/hiveserver2.sh | 2 +- .../java/org/apache/hive/http/HttpServer.java | 22 ++ .../apache/hive/jdbc/TestActivePassiveHA.java | 180 +++++++++++++---- .../org/apache/hive/jdbc/miniHS2/MiniHS2.java | 9 + .../server/HS2ActivePassiveHARegistry.java | 53 ++++- .../apache/hive/service/server/HiveServer2.java | 199 +++++++++++++++++-- .../service/servlet/HS2LeadershipStatus.java | 89 +++++++++ .../apache/hive/service/servlet/HS2Peers.java | 31 ++- .../service/server/TestHS2HttpServerPam.java | 2 +- 9 files changed, 523 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f1d4fcf6/bin/ext/hiveserver2.sh ---------------------------------------------------------------------- diff --git a/bin/ext/hiveserver2.sh b/bin/ext/hiveserver2.sh index 1e94542..95bc151 100644 --- a/bin/ext/hiveserver2.sh +++ b/bin/ext/hiveserver2.sh @@ -17,7 +17,7 @@ THISSERVICE=hiveserver2 export SERVICE_LIST="${SERVICE_LIST}${THISSERVICE} " hiveserver2() { - echo "$(timestamp): Starting HiveServer2" + >&2 echo "$(timestamp): Starting HiveServer2" CLASS=org.apache.hive.service.server.HiveServer2 if $cygwin; then HIVE_LIB=`cygpath -w "$HIVE_LIB"` 
http://git-wip-us.apache.org/repos/asf/hive/blob/f1d4fcf6/common/src/java/org/apache/hive/http/HttpServer.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hive/http/HttpServer.java b/common/src/java/org/apache/hive/http/HttpServer.java index 71b2668..93b11e3 100644 --- a/common/src/java/org/apache/hive/http/HttpServer.java +++ b/common/src/java/org/apache/hive/http/HttpServer.java @@ -273,6 +273,28 @@ public class HttpServer { } /** + * Same as {@link HttpServer#isInstrumentationAccessAllowed(ServletContext, HttpServletRequest, HttpServletResponse)} + * except that it returns true only if <code>hadoop.security.instrumentation.requires.admin</code> is set to true. + */ + @InterfaceAudience.LimitedPrivate("hive") + public static boolean isInstrumentationAccessAllowedStrict( + ServletContext servletContext, HttpServletRequest request, + HttpServletResponse response) throws IOException { + Configuration conf = + (Configuration) servletContext.getAttribute(CONF_CONTEXT_ATTRIBUTE); + + boolean access; + boolean adminAccess = conf.getBoolean( + CommonConfigurationKeys.HADOOP_SECURITY_INSTRUMENTATION_REQUIRES_ADMIN, false); + if (adminAccess) { + access = hasAdministratorAccess(servletContext, request, response); + } else { + return false; + } + return access; + } + + /** * Check if the remote user has access to an object (e.g. query history) that belongs to a user * * @param ctx the context containing the admin ACL. 
http://git-wip-us.apache.org/repos/asf/hive/blob/f1d4fcf6/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java index 72b2a8c..e53826d 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java @@ -21,33 +21,37 @@ package org.apache.hive.jdbc; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import java.io.BufferedReader; -import java.io.InputStreamReader; -import java.net.HttpURLConnection; -import java.net.URL; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; +import java.util.Base64; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; +import org.apache.commons.httpclient.HttpClient; +import org.apache.commons.httpclient.HttpMethodBase; +import org.apache.commons.httpclient.methods.DeleteMethod; +import org.apache.commons.httpclient.methods.GetMethod; import org.apache.curator.test.TestingServer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.registry.impl.ZkRegistryBase; +import org.apache.hive.http.security.PamAuthenticator; import org.apache.hive.jdbc.miniHS2.MiniHS2; import org.apache.hive.service.server.HS2ActivePassiveHARegistry; import org.apache.hive.service.server.HS2ActivePassiveHARegistryClient; import org.apache.hive.service.server.HiveServer2Instance; +import 
org.apache.hive.service.server.TestHS2HttpServerPam; import org.apache.hive.service.servlet.HS2Peers; +import org.apache.http.HttpHeaders; import org.codehaus.jackson.map.ObjectMapper; import org.junit.After; import org.junit.AfterClass; @@ -60,6 +64,8 @@ public class TestActivePassiveHA { private MiniHS2 miniHS2_2 = null; private static TestingServer zkServer; private Connection hs2Conn = null; + private static String ADMIN_USER = "user1"; // user from TestPamAuthenticator + private static String ADMIN_PASSWORD = "1"; private static String zkHANamespace = "hs2ActivePassiveHATest"; private HiveConf hiveConf1; private HiveConf hiveConf2; @@ -124,15 +130,8 @@ public class TestActivePassiveHA { public void testActivePassiveHA() throws Exception { String instanceId1 = UUID.randomUUID().toString(); miniHS2_1.start(getConfOverlay(instanceId1)); - while (!miniHS2_1.isStarted()) { - Thread.sleep(100); - } - String instanceId2 = UUID.randomUUID().toString(); miniHS2_2.start(getConfOverlay(instanceId2)); - while (!miniHS2_2.isStarted()) { - Thread.sleep(100); - } assertEquals(true, miniHS2_1.isLeader()); String url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader"; @@ -175,9 +174,6 @@ public class TestActivePassiveHA { miniHS2_1.stop(); - while (!miniHS2_2.isStarted()) { - Thread.sleep(100); - } assertEquals(true, miniHS2_2.isLeader()); url = "http://localhost:" + hiveConf2.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader"; assertEquals("true", sendGet(url)); @@ -219,9 +215,6 @@ public class TestActivePassiveHA { instanceId1 = UUID.randomUUID().toString(); miniHS2_1.start(getConfOverlay(instanceId1)); - while (!miniHS2_1.isStarted()) { - Thread.sleep(100); - } assertEquals(false, miniHS2_1.isLeader()); url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader"; assertEquals("false", sendGet(url)); @@ -264,17 +257,11 @@ public class TestActivePassiveHA { public void 
testConnectionActivePassiveHAServiceDiscovery() throws Exception { String instanceId1 = UUID.randomUUID().toString(); miniHS2_1.start(getConfOverlay(instanceId1)); - while (!miniHS2_1.isStarted()) { - Thread.sleep(100); - } String instanceId2 = UUID.randomUUID().toString(); Map<String, String> confOverlay = getConfOverlay(instanceId2); confOverlay.put(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname, "http"); confOverlay.put(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH.varname, "clidriverTest"); miniHS2_2.start(confOverlay); - while (!miniHS2_2.isStarted()) { - Thread.sleep(100); - } assertEquals(true, miniHS2_1.isLeader()); String url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader"; @@ -323,6 +310,92 @@ public class TestActivePassiveHA { openConnectionAndRunQuery(zkJdbcUrl); } + @Test(timeout = 60000) + public void testManualFailover() throws Exception { + setPamConfs(hiveConf1); + setPamConfs(hiveConf2); + PamAuthenticator pamAuthenticator1 = new TestHS2HttpServerPam.TestPamAuthenticator(hiveConf1); + PamAuthenticator pamAuthenticator2 = new TestHS2HttpServerPam.TestPamAuthenticator(hiveConf2); + try { + String instanceId1 = UUID.randomUUID().toString(); + miniHS2_1.setPamAuthenticator(pamAuthenticator1); + miniHS2_1.start(getSecureConfOverlay(instanceId1)); + String instanceId2 = UUID.randomUUID().toString(); + Map<String, String> confOverlay = getSecureConfOverlay(instanceId2); + confOverlay.put(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname, "http"); + confOverlay.put(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH.varname, "clidriverTest"); + miniHS2_2.setPamAuthenticator(pamAuthenticator2); + miniHS2_2.start(confOverlay); + String url1 = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader"; + String url2 = "http://localhost:" + hiveConf2.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader"; + + // when we start miniHS2_1 will be leader (sequential start) + assertEquals(true, 
miniHS2_1.isLeader()); + assertEquals("true", sendGet(url1, true)); + + // trigger failover on miniHS2_1 + String resp = sendDelete(url1, true); + assertTrue(resp.contains("Failover successful!")); + + // make sure miniHS2_1 is not leader + assertEquals(false, miniHS2_1.isLeader()); + assertEquals("false", sendGet(url1, true)); + + // make sure miniHS2_2 is the new leader + assertEquals(true, miniHS2_2.isLeader()); + assertEquals("true", sendGet(url2, true)); + + // send failover request again to miniHS2_1 and get a failure + resp = sendDelete(url1, true); + assertTrue(resp.contains("Cannot failover an instance that is not a leader")); + assertEquals(false, miniHS2_1.isLeader()); + + // send failover request to miniHS2_2 and make sure miniHS2_1 takes over (returning back to leader, test listeners) + resp = sendDelete(url2, true); + assertTrue(resp.contains("Failover successful!")); + assertEquals(true, miniHS2_1.isLeader()); + assertEquals("true", sendGet(url1, true)); + assertEquals("false", sendGet(url2, true)); + assertEquals(false, miniHS2_2.isLeader()); + } finally { + // revert configs to not affect other tests + unsetPamConfs(hiveConf1); + unsetPamConfs(hiveConf2); + } + } + + @Test(timeout = 60000) + public void testManualFailoverUnauthorized() throws Exception { + setPamConfs(hiveConf1); + PamAuthenticator pamAuthenticator1 = new TestHS2HttpServerPam.TestPamAuthenticator(hiveConf1); + try { + String instanceId1 = UUID.randomUUID().toString(); + miniHS2_1.setPamAuthenticator(pamAuthenticator1); + miniHS2_1.start(getSecureConfOverlay(instanceId1)); + + // dummy HS2 instance just to trigger failover + String instanceId2 = UUID.randomUUID().toString(); + Map<String, String> confOverlay = getSecureConfOverlay(instanceId2); + confOverlay.put(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname, "http"); + confOverlay.put(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH.varname, "clidriverTest"); + miniHS2_2.start(confOverlay); + + String url1 = "http://localhost:" + 
hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader"; + // when we start miniHS2_1 will be leader (sequential start) + assertEquals(true, miniHS2_1.isLeader()); + assertEquals("true", sendGet(url1, true)); + + // trigger failover on miniHS2_1 without authorization header + assertEquals("Unauthorized", sendDelete(url1, false)); + assertTrue(sendDelete(url1, true).contains("Failover successful!")); + assertEquals(false, miniHS2_1.isLeader()); + assertEquals(true, miniHS2_2.isLeader()); + } finally { + // revert configs to not affect other tests + unsetPamConfs(hiveConf1); + } + } + private Connection getConnection(String jdbcURL, String user) throws SQLException { return DriverManager.getConnection(jdbcURL, user, "bar"); } @@ -346,23 +419,62 @@ public class TestActivePassiveHA { } private String sendGet(String url) throws Exception { - URL obj = new URL(url); - HttpURLConnection con = (HttpURLConnection) obj.openConnection(); - con.setRequestMethod("GET"); - BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream())); - String inputLine; - StringBuilder response = new StringBuilder(); - while ((inputLine = in.readLine()) != null) { - response.append(inputLine); + return sendGet(url, false); + } + + private String sendGet(String url, boolean enableAuth) throws Exception { + return sendAuthMethod(new GetMethod(url), enableAuth); + } + + private String sendDelete(String url, boolean enableAuth) throws Exception { + return sendAuthMethod(new DeleteMethod(url), enableAuth); + } + + private String sendAuthMethod(HttpMethodBase method, boolean enableAuth) throws Exception { + HttpClient client = new HttpClient(); + try { + if (enableAuth) { + String userPass = ADMIN_USER + ":" + ADMIN_PASSWORD; + method.addRequestHeader(HttpHeaders.AUTHORIZATION, + "Basic " + new String(Base64.getEncoder().encode(userPass.getBytes()))); + } + int statusCode = client.executeMethod(method); + if (statusCode == 200) { + return 
method.getResponseBodyAsString(); + } else { + return method.getStatusLine().getReasonPhrase(); + } + } finally { + method.releaseConnection(); } - in.close(); - return response.toString(); } - private Map<String,String> getConfOverlay(final String instanceId) { + private Map<String, String> getConfOverlay(final String instanceId) { Map<String, String> confOverlay = new HashMap<>(); confOverlay.put("hive.server2.zookeeper.publish.configs", "true"); confOverlay.put(ZkRegistryBase.UNIQUE_IDENTIFIER, instanceId); return confOverlay; } + + private Map<String, String> getSecureConfOverlay(final String instanceId) { + Map<String, String> confOverlay = new HashMap<>(); + confOverlay.put("hive.server2.zookeeper.publish.configs", "true"); + confOverlay.put(ZkRegistryBase.UNIQUE_IDENTIFIER, instanceId); + confOverlay.put("hadoop.security.instrumentation.requires.admin", "true"); + confOverlay.put("hadoop.security.authorization", "true"); + confOverlay.put(ConfVars.USERS_IN_ADMIN_ROLE.varname, ADMIN_USER); + return confOverlay; + } + + private void setPamConfs(final HiveConf hiveConf) { + hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_PAM_SERVICES, "sshd"); + hiveConf.setBoolVar(ConfVars.HIVE_SERVER2_WEBUI_USE_PAM, true); + hiveConf.setBoolVar(ConfVars.HIVE_IN_TEST, true); + } + + private void unsetPamConfs(final HiveConf hiveConf) { + hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_PAM_SERVICES, ""); + hiveConf.setBoolVar(ConfVars.HIVE_SERVER2_WEBUI_USE_PAM, false); + hiveConf.setBoolVar(ConfVars.HIVE_IN_TEST, false); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/f1d4fcf6/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java ---------------------------------------------------------------------- diff --git a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java index adf7018..fa5edec 100644 --- 
a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java +++ b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper; import org.apache.hadoop.hive.shims.HadoopShims.MiniDFSShim; import org.apache.hadoop.hive.shims.HadoopShims.MiniMrShim; import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hive.http.security.PamAuthenticator; import org.apache.hive.jdbc.Utils; import org.apache.hive.service.Service; import org.apache.hive.service.cli.CLIServiceClient; @@ -76,6 +77,7 @@ public class MiniHS2 extends AbstractHiveService { private final boolean isMetastoreSecure; private MiniClusterType miniClusterType = MiniClusterType.LOCALFS_ONLY; private boolean usePortsFromConf = false; + private PamAuthenticator pamAuthenticator; public enum MiniClusterType { MR, @@ -352,6 +354,9 @@ public class MiniHS2 extends AbstractHiveService { for (int tryCount = 0; (tryCount < MetaStoreTestUtils.RETRY_COUNT); tryCount++) { try { hiveServer2 = new HiveServer2(); + if (pamAuthenticator != null) { + hiveServer2.setPamAuthenticator(pamAuthenticator); + } hiveServer2.init(getHiveConf()); hiveServer2.start(); hs2Started = true; @@ -411,6 +416,10 @@ public class MiniHS2 extends AbstractHiveService { return hiveServer2.isLeader(); } + public void setPamAuthenticator(final PamAuthenticator pamAuthenticator) { + this.pamAuthenticator = pamAuthenticator; + } + public CLIServiceClient getServiceClient() { verifyStarted(); return getServiceClientInternal(); http://git-wip-us.apache.org/repos/asf/hive/blob/f1d4fcf6/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java b/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java index 7c75489..f4b4362 100644 --- 
a/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java +++ b/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; public class HS2ActivePassiveHARegistry extends ZkRegistryBase<HiveServer2Instance> implements - ServiceRegistry<HiveServer2Instance>, HiveServer2HAInstanceSet { + ServiceRegistry<HiveServer2Instance>, HiveServer2HAInstanceSet, HiveServer2.FailoverHandler { private static final Logger LOG = LoggerFactory.getLogger(HS2ActivePassiveHARegistry.class); static final String ACTIVE_ENDPOINT = "activeEndpoint"; static final String PASSIVE_ENDPOINT = "passiveEndpoint"; @@ -60,6 +60,8 @@ public class HS2ActivePassiveHARegistry extends ZkRegistryBase<HiveServer2Instan private static final String INSTANCE_GROUP = "instances"; private static final String LEADER_LATCH_PATH = "/_LEADER"; private LeaderLatch leaderLatch; + private Map<LeaderLatchListener, ExecutorService> registeredListeners = new HashMap<>(); + private String latchPath; private ServiceRecord srv; private boolean isClient; private final String uniqueId; @@ -80,7 +82,7 @@ public class HS2ActivePassiveHARegistry extends ZkRegistryBase<HiveServer2Instan String keytab = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB); String zkNameSpacePrefix = zkNameSpace + "-"; return new HS2ActivePassiveHARegistry(null, zkNameSpacePrefix, LEADER_LATCH_PATH, principal, keytab, - SASL_LOGIN_CONTEXT_NAME, conf, isClient); + isClient ? 
null : SASL_LOGIN_CONTEXT_NAME, conf, isClient); } private HS2ActivePassiveHARegistry(final String instanceName, final String zkNamespacePrefix, @@ -96,7 +98,8 @@ public class HS2ActivePassiveHARegistry extends ZkRegistryBase<HiveServer2Instan } else { this.uniqueId = UNIQUE_ID.toString(); } - leaderLatch = new LeaderLatch(zooKeeperClient, leaderLatchPath, uniqueId, LeaderLatch.CloseMode.NOTIFY_LEADER); + this.latchPath = leaderLatchPath; + this.leaderLatch = getNewLeaderLatchPath(); } @Override @@ -105,7 +108,7 @@ public class HS2ActivePassiveHARegistry extends ZkRegistryBase<HiveServer2Instan if (!isClient) { this.srv = getNewServiceRecord(); register(); - leaderLatch.addListener(new HS2LeaderLatchListener()); + registerLeaderLatchListener(new HS2LeaderLatchListener(), null); try { // all participating instances uses the same latch path, and curator randomly chooses one instance to be leader // which can be verified via leaderLatch.hasLeadership() @@ -205,6 +208,38 @@ public class HS2ActivePassiveHARegistry extends ZkRegistryBase<HiveServer2Instan return leaderLatch.hasLeadership(); } + @Override + public void failover() throws Exception { + if (hasLeadership()) { + LOG.info("Failover request received for HS2 instance: {}. Restarting leader latch..", uniqueId); + leaderLatch.close(LeaderLatch.CloseMode.NOTIFY_LEADER); + leaderLatch = getNewLeaderLatchPath(); + // re-attach all registered listeners + for (Map.Entry<LeaderLatchListener, ExecutorService> registeredListener : registeredListeners.entrySet()) { + if (registeredListener.getValue() == null) { + leaderLatch.addListener(registeredListener.getKey()); + } else { + leaderLatch.addListener(registeredListener.getKey(), registeredListener.getValue()); + } + } + leaderLatch.start(); + LOG.info("Failover complete. Leader latch restarted successfully. New leader: {}", + leaderLatch.getLeader().getId()); + } else { + LOG.warn("Failover request received for HS2 instance: {} that is not leader. 
Skipping..", uniqueId); + } + } + + /** + * Returns a new instance of leader latch path but retains the same uniqueId. This is only used when HS2 starts up or + * when a manual failover is triggered (in which case uniqueId will still remain as the instance has not restarted) + * + * @return - new leader latch + */ + private LeaderLatch getNewLeaderLatchPath() { + return new LeaderLatch(zooKeeperClient, latchPath, uniqueId, LeaderLatch.CloseMode.NOTIFY_LEADER); + } + private class HS2LeaderLatchListener implements LeaderLatchListener { // leadership state changes and sending out notifications to listener happens inside synchronous method in curator. @@ -283,7 +318,12 @@ public class HS2ActivePassiveHARegistry extends ZkRegistryBase<HiveServer2Instan * @param executorService - event handler executor service */ void registerLeaderLatchListener(final LeaderLatchListener latchListener, final ExecutorService executorService) { - leaderLatch.addListener(latchListener, executorService); + registeredListeners.put(latchListener, executorService); + if (executorService == null) { + leaderLatch.addListener(latchListener); + } else { + leaderLatch.addListener(latchListener, executorService); + } } private Map<String, String> getConfsToPublish() { @@ -291,6 +331,9 @@ public class HS2ActivePassiveHARegistry extends ZkRegistryBase<HiveServer2Instan // Hostname confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname, conf.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname)); + // Web port + confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_WEBUI_PORT.varname, + conf.get(HiveConf.ConfVars.HIVE_SERVER2_WEBUI_PORT.varname)); // Hostname:port confsToPublish.put(INSTANCE_URI_CONFIG, conf.get(INSTANCE_URI_CONFIG)); confsToPublish.put(UNIQUE_IDENTIFIER, uniqueId); http://git-wip-us.apache.org/repos/asf/hive/blob/f1d4fcf6/service/src/java/org/apache/hive/service/server/HiveServer2.java ---------------------------------------------------------------------- diff 
--git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java index bb92c44..90ba752 100644 --- a/service/src/java/org/apache/hive/service/server/HiveServer2.java +++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -18,9 +18,15 @@ package org.apache.hive.service.server; +import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.URL; import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.Base64; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -39,6 +45,9 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; +import org.apache.commons.httpclient.HttpClient; +import org.apache.commons.httpclient.HttpMethodBase; +import org.apache.commons.httpclient.methods.DeleteMethod; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.curator.framework.CuratorFramework; @@ -87,12 +96,14 @@ import org.apache.hive.http.security.PamAuthenticator; import org.apache.hive.service.CompositeService; import org.apache.hive.service.ServiceException; import org.apache.hive.service.cli.CLIService; +import org.apache.hive.service.cli.session.SessionManager; import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService; import org.apache.hive.service.cli.thrift.ThriftCLIService; import org.apache.hive.service.cli.thrift.ThriftHttpCLIService; import org.apache.hive.service.servlet.HS2LeadershipStatus; import org.apache.hive.service.servlet.HS2Peers; import org.apache.hive.service.servlet.QueryProfileServlet; +import org.apache.http.HttpHeaders; import org.apache.logging.log4j.util.Strings; import org.apache.zookeeper.CreateMode; import 
org.apache.zookeeper.KeeperException; @@ -150,6 +161,10 @@ public class HiveServer2 extends CompositeService { this.pamAuthenticator = pamAuthenticator; } + @VisibleForTesting + public void setPamAuthenticator(PamAuthenticator pamAuthenticator) { + this.pamAuthenticator = pamAuthenticator; + } @Override public synchronized void init(HiveConf hiveConf) { @@ -222,6 +237,22 @@ public class HiveServer2 extends CompositeService { this.serviceDiscovery = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY); this.activePassiveHA = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE); + try { + if (serviceDiscovery) { + serviceUri = getServerInstanceURI(); + addConfsToPublish(hiveConf, confsToPublish, serviceUri); + if (activePassiveHA) { + hiveConf.set(INSTANCE_URI_CONFIG, serviceUri); + leaderLatchListener = new HS2LeaderLatchListener(this, SessionState.get()); + leaderActionsExecutorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("Leader Actions Handler Thread").build()); + hs2HARegistry = HS2ActivePassiveHARegistry.create(hiveConf, false); + } + } + } catch (Exception e) { + throw new ServiceException(e); + } + // Setup web UI final int webUIPort; final String webHost; @@ -295,6 +326,7 @@ public class HiveServer2 extends CompositeService { } if (serviceDiscovery && activePassiveHA) { builder.setContextAttribute("hs2.isLeader", isLeader); + builder.setContextAttribute("hs2.failover.callback", new FailoverHandlerCallback(hs2HARegistry)); builder.setContextAttribute("hiveconf", hiveConf); builder.addServlet("leader", HS2LeadershipStatus.class); builder.addServlet("peers", HS2Peers.class); @@ -311,22 +343,6 @@ public class HiveServer2 extends CompositeService { throw new ServiceException(ie); } - try { - if (serviceDiscovery) { - serviceUri = getServerInstanceURI(); - addConfsToPublish(hiveConf, confsToPublish, serviceUri); - if (activePassiveHA) { - 
hiveConf.set(INSTANCE_URI_CONFIG, serviceUri); - leaderLatchListener = new HS2LeaderLatchListener(this, SessionState.get()); - leaderActionsExecutorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("Leader Actions Handler Thread").build()); - hs2HARegistry = HS2ActivePassiveHARegistry.create(hiveConf, false); - } - } - } catch (Exception e) { - throw new ServiceException(e); - } - // Add a shutdown hook for catching SIGTERM & SIGINT ShutdownHookManager.addShutdownHook(() -> hiveServer2.stop()); } @@ -532,7 +548,22 @@ public class HiveServer2 extends CompositeService { return isLeader.get(); } + interface FailoverHandler { + void failover() throws Exception; + } + + public static class FailoverHandlerCallback implements FailoverHandler { + private HS2ActivePassiveHARegistry hs2HARegistry; + + FailoverHandlerCallback(HS2ActivePassiveHARegistry hs2HARegistry) { + this.hs2HARegistry = hs2HARegistry; + } + @Override + public void failover() throws Exception { + hs2HARegistry.failover(); + } + } /** * The watcher class which sets the de-register flag when the znode corresponding to this server * instance is deleted. Additionally, it shuts down the server if there are no more active client @@ -663,6 +694,9 @@ public class HiveServer2 extends CompositeService { public void notLeader() { LOG.info("HS2 instance {} LOST LEADERSHIP. Stopping/Disconnecting tez sessions..", hiveServer2.serviceUri); hiveServer2.isLeader.set(false); + // TODO: should we explicitly close client connections with appropriate error msg? SessionManager.closeSession() + // will shut itself down upon explicit --deregister after all connections are closed. Something similar but for + // failover. 
hiveServer2.stopOrDisconnectTezSessions(); LOG.info("Stopped/Disconnected tez sessions."); } @@ -998,6 +1032,19 @@ public class HiveServer2 extends CompositeService { .withLongOpt("deregister") .withDescription("Deregister all instances of given version from dynamic service discovery") .create()); + // --listHAPeers + options.addOption(OptionBuilder + .hasArgs(0) + .withLongOpt("listHAPeers") + .withDescription("List all HS2 instances when running in Active Passive HA mode") + .create()); + // --failover <workerIdentity> + options.addOption(OptionBuilder + .hasArgs(1) + .withArgName("workerIdentity") + .withLongOpt("failover") + .withDescription("Manually failover Active HS2 instance to passive standby mode") + .create()); options.addOption(new Option("H", "help", false, "Print help information")); } @@ -1027,6 +1074,18 @@ public class HiveServer2 extends CompositeService { return new ServerOptionsProcessorResponse(new DeregisterOptionExecutor( commandLine.getOptionValue("deregister"))); } + + // Process --listHAPeers + if (commandLine.hasOption("listHAPeers")) { + return new ServerOptionsProcessorResponse(new ListHAPeersExecutor()); + } + + // Process --failover + if (commandLine.hasOption("failover")) { + return new ServerOptionsProcessorResponse(new FailoverHS2InstanceExecutor( + commandLine.getOptionValue("failover") + )); + } } catch (ParseException e) { // Error out & exit - we were not able to parse the args successfully System.err.println("Error starting HiveServer2 with given arguments: "); @@ -1124,4 +1183,112 @@ public class HiveServer2 extends CompositeService { System.exit(0); } } + + /** + * Handler for --failover <workerIdentity> command. 
The way failover works is, + * - the client gets <workerIdentity> from user input + * - the client uses HS2 HA registry to get list of HS2 instances and finds the one that matches <workerIdentity> + * - if there is a match, client makes sure the instance is a leader (only leader can failover) + * - if the matched instance is a leader, its web endpoint is obtained from service record then http DELETE method + * is invoked on /leader endpoint (Yes. Manual failover requires web UI to be enabled) + * - the web endpoint checks if admin ACLs are set, if so will close and restart the leader latch triggering a failover + */ + static class FailoverHS2InstanceExecutor implements ServerOptionsExecutor { + private final String workerIdentity; + + FailoverHS2InstanceExecutor(String workerIdentity) { + this.workerIdentity = workerIdentity; + } + + @Override + public void execute() { + try { + HiveConf hiveConf = new HiveConf(); + HS2ActivePassiveHARegistry haRegistry = HS2ActivePassiveHARegistryClient.getClient(hiveConf); + Collection<HiveServer2Instance> hs2Instances = haRegistry.getAll(); + // no HS2 instances are running + if (hs2Instances.isEmpty()) { + LOG.error("No HiveServer2 instances are running in HA mode"); + System.err.println("No HiveServer2 instances are running in HA mode"); + System.exit(-1); + } + HiveServer2Instance targetInstance = null; + for (HiveServer2Instance instance : hs2Instances) { + if (instance.getWorkerIdentity().equals(workerIdentity)) { + targetInstance = instance; + break; + } + } + // no match for workerIdentity + if (targetInstance == null) { + LOG.error("Cannot find any HiveServer2 instance with workerIdentity: " + workerIdentity); + System.err.println("Cannot find any HiveServer2 instance with workerIdentity: " + workerIdentity); + System.exit(-1); + } + // only one HS2 instance available (cannot failover) + if (hs2Instances.size() == 1) { + LOG.error("Only one HiveServer2 instance running in the cluster. 
Cannot failover: " + workerIdentity); + System.err.println("Only one HiveServer2 instance running in the cluster. Cannot failover: " + workerIdentity); + System.exit(-1); + } + // matched HS2 instance is not leader + if (!targetInstance.isLeader()) { + LOG.error("HiveServer2 instance (workerIdentity: " + workerIdentity + ") is not a leader. Cannot failover"); + System.err.println("HiveServer2 instance (workerIdentity: " + workerIdentity + ") is not a leader. Cannot failover"); + System.exit(-1); + } + + String webPort = targetInstance.getProperties().get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname); + // web port cannot be obtained + if (StringUtils.isEmpty(webPort)) { + LOG.error("Unable to determine web port for instance: " + workerIdentity); + System.err.println("Unable to determine web port for instance: " + workerIdentity); + System.exit(-1); + } + + // invoke DELETE /leader endpoint for failover + String webEndpoint = "http://" + targetInstance.getHost() + ":" + webPort + "/leader"; + HttpClient client = new HttpClient(); + HttpMethodBase method = new DeleteMethod(webEndpoint); + try { + int statusCode = client.executeMethod(method); + if (statusCode == 200) { + System.out.println(method.getResponseBodyAsString()); + } else { + String response = method.getStatusLine().getReasonPhrase(); + LOG.error("Unable to failover HiveServer2 instance: " + workerIdentity + ". status code: " + + statusCode + " error: " + response); + System.err.println("Unable to failover HiveServer2 instance: " + workerIdentity + ". 
status code: " + + statusCode + " error: " + response); + System.exit(-1); + } + } finally { + method.releaseConnection(); + } + } catch (IOException e) { + LOG.error("Error listing HiveServer2 HA instances from ZooKeeper", e); + System.err.println("Error listing HiveServer2 HA instances from ZooKeeper" + e); + System.exit(-1); + } + System.exit(0); + } + } + + static class ListHAPeersExecutor implements ServerOptionsExecutor { + @Override + public void execute() { + try { + HiveConf hiveConf = new HiveConf(); + HS2ActivePassiveHARegistry haRegistry = HS2ActivePassiveHARegistryClient.getClient(hiveConf); + HS2Peers.HS2Instances hs2Instances = new HS2Peers.HS2Instances(haRegistry.getAll()); + String jsonOut = hs2Instances.toJson(); + System.out.println(jsonOut); + } catch (IOException e) { + LOG.error("Error listing HiveServer2 HA instances from ZooKeeper", e); + System.err.println("Error listing HiveServer2 HA instances from ZooKeeper" + e); + System.exit(-1); + } + System.exit(0); + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/f1d4fcf6/service/src/java/org/apache/hive/service/servlet/HS2LeadershipStatus.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/servlet/HS2LeadershipStatus.java b/service/src/java/org/apache/hive/service/servlet/HS2LeadershipStatus.java index 33529ed..708fa0c 100644 --- a/service/src/java/org/apache/hive/service/servlet/HS2LeadershipStatus.java +++ b/service/src/java/org/apache/hive/service/servlet/HS2LeadershipStatus.java @@ -17,6 +17,8 @@ */ package org.apache.hive.service.servlet; +import static org.apache.hive.http.HttpServer.CONF_CONTEXT_ATTRIBUTE; + import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; @@ -25,18 +27,36 @@ import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.apache.hadoop.conf.Configuration; +import 
org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.security.authentication.client.KerberosAuthenticator; +import org.apache.hive.http.HttpServer; +import org.apache.hive.service.server.HiveServer2; import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Returns "true" if this HS2 instance is leader else "false". + * Invoking a "DELETE" method on this endpoint will trigger a failover if this instance is a leader. + * hadoop.security.instrumentation.requires.admin should be set to true and current user has to be in admin ACLS + * for accessing any of these endpoints. */ public class HS2LeadershipStatus extends HttpServlet { private static final Logger LOG = LoggerFactory.getLogger(HS2LeadershipStatus.class); @Override public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException { + // admin check - + // allows when hadoop.security.instrumentation.requires.admin is set to false + // when hadoop.security.instrumentation.requires.admin is set to true, checks if hadoop.security.authorization + // is true and if the logged in user (via PAM or SPNEGO + kerberos) is in hive.users.in.admin.role list + final ServletContext context = getServletContext(); + if (!HttpServer.isInstrumentationAccessAllowed(context, request, response)) { + LOG.warn("Unauthorized to perform GET action. 
remoteUser: {}", request.getRemoteUser()); + return; + } + ServletContext ctx = getServletContext(); AtomicBoolean isLeader = (AtomicBoolean) ctx.getAttribute("hs2.isLeader"); LOG.info("Returning isLeader: {}", isLeader); @@ -45,4 +65,73 @@ public class HS2LeadershipStatus extends HttpServlet { response.setStatus(HttpServletResponse.SC_OK); response.flushBuffer(); } + + private class FailoverResponse { + private boolean success; + private String message; + + FailoverResponse() { } + + public boolean isSuccess() { + return success; + } + + public void setSuccess(final boolean success) { + this.success = success; + } + + public String getMessage() { + return message; + } + + public void setMessage(final String message) { + this.message = message; + } + } + + @Override + public void doDelete(final HttpServletRequest request, final HttpServletResponse response) throws IOException { + // strict admin check - + // allows ONLY if hadoop.security.instrumentation.requires.admin is set to true + // when hadoop.security.instrumentation.requires.admin is set to true, checks if hadoop.security.authorization + // is true and if the logged in user (via PAM or SPNEGO + kerberos) is in hive.users.in.admin.role list + final ServletContext context = getServletContext(); + if (!HttpServer.isInstrumentationAccessAllowedStrict(context, request, response)) { + LOG.warn("Unauthorized to perform DELETE action. 
remoteUser: {}", request.getRemoteUser()); + return; + } + + LOG.info("DELETE handler invoked for failover.."); + ObjectMapper mapper = new ObjectMapper(); + FailoverResponse failoverResponse = new FailoverResponse(); + AtomicBoolean isLeader = (AtomicBoolean) context.getAttribute("hs2.isLeader"); + if (!isLeader.get()) { + String msg = "Cannot failover an instance that is not a leader"; + LOG.info(msg); + failoverResponse.setSuccess(false); + failoverResponse.setMessage(msg); + mapper.writerWithDefaultPrettyPrinter().writeValue(response.getWriter(), failoverResponse); + response.setStatus(HttpServletResponse.SC_FORBIDDEN); + return; + } + + HiveServer2.FailoverHandlerCallback failoverHandler = (HiveServer2.FailoverHandlerCallback) context + .getAttribute("hs2.failover.callback"); + try { + String msg = "Failover successful!"; + LOG.info(msg); + failoverHandler.failover(); + failoverResponse.setSuccess(true); + failoverResponse.setMessage(msg); + mapper.writerWithDefaultPrettyPrinter().writeValue(response.getWriter(), failoverResponse); + response.setStatus(HttpServletResponse.SC_OK); + } catch (Exception e) { + String errMsg = "Cannot perform failover of HS2 instance. 
err: " + e.getMessage(); + LOG.error(errMsg, e); + failoverResponse.setSuccess(false); + failoverResponse.setMessage(errMsg); + mapper.writerWithDefaultPrettyPrinter().writeValue(response.getWriter(), failoverResponse); + response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/f1d4fcf6/service/src/java/org/apache/hive/service/servlet/HS2Peers.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/servlet/HS2Peers.java b/service/src/java/org/apache/hive/service/servlet/HS2Peers.java index a51bbeb..bde6d6b 100644 --- a/service/src/java/org/apache/hive/service/servlet/HS2Peers.java +++ b/service/src/java/org/apache/hive/service/servlet/HS2Peers.java @@ -26,17 +26,22 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.security.authentication.client.KerberosAuthenticator; +import org.apache.hive.http.HttpServer; import org.apache.hive.service.server.HS2ActivePassiveHARegistry; import org.apache.hive.service.server.HS2ActivePassiveHARegistryClient; import org.apache.hive.service.server.HiveServer2Instance; -import org.codehaus.jackson.annotate.JsonAutoDetect; +import org.codehaus.jackson.annotate.JsonIgnore; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.SerializationConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Returns all HS2 instances in Active-Passive standy modes. 
*/ public class HS2Peers extends HttpServlet { + private static final Logger LOG = LoggerFactory.getLogger(HS2Peers.class); public static class HS2Instances { private Collection<HiveServer2Instance> hiveServer2Instances; @@ -55,20 +60,32 @@ public class HS2Peers extends HttpServlet { public void setHiveServer2Instances(final Collection<HiveServer2Instance> hiveServer2Instances) { this.hiveServer2Instances = hiveServer2Instances; } + + @JsonIgnore + public String toJson() throws IOException { + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, false); + return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(this); + } } @Override public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException { + // admin check - + // allows when hadoop.security.instrumentation.requires.admin is set to false + // when hadoop.security.instrumentation.requires.admin is set to true, checks if hadoop.security.authorization + // is true and if the logged in user (via PAM or SPNEGO + kerberos) is in hive.users.in.admin.role list + final ServletContext context = getServletContext(); + if (!HttpServer.isInstrumentationAccessAllowed(context, request, response)) { + LOG.warn("Unauthorized to perform GET action. 
remoteUser: {}", request.getRemoteUser()); + return; + } + ServletContext ctx = getServletContext(); HiveConf hiveConf = (HiveConf) ctx.getAttribute("hiveconf"); - ObjectMapper mapper = new ObjectMapper(); - mapper.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, false); - // serialize json based on field annotations only - mapper.setVisibilityChecker(mapper.getSerializationConfig().getDefaultVisibilityChecker() - .withSetterVisibility(JsonAutoDetect.Visibility.NONE)); HS2ActivePassiveHARegistry hs2Registry = HS2ActivePassiveHARegistryClient.getClient(hiveConf); HS2Instances instances = new HS2Instances(hs2Registry.getAll()); - mapper.writerWithDefaultPrettyPrinter().writeValue(response.getWriter(), instances); + response.getWriter().write(instances.toJson()); response.setStatus(HttpServletResponse.SC_OK); response.flushBuffer(); } http://git-wip-us.apache.org/repos/asf/hive/blob/f1d4fcf6/service/src/test/org/apache/hive/service/server/TestHS2HttpServerPam.java ---------------------------------------------------------------------- diff --git a/service/src/test/org/apache/hive/service/server/TestHS2HttpServerPam.java b/service/src/test/org/apache/hive/service/server/TestHS2HttpServerPam.java index d1b3ce0..04f66b4 100644 --- a/service/src/test/org/apache/hive/service/server/TestHS2HttpServerPam.java +++ b/service/src/test/org/apache/hive/service/server/TestHS2HttpServerPam.java @@ -155,7 +155,7 @@ public class TestHS2HttpServerPam { public static class TestPamAuthenticator extends PamAuthenticator { private static final Map<String, String> users = new HashMap<>(); - TestPamAuthenticator(HiveConf conf) throws AuthenticationException { + public TestPamAuthenticator(HiveConf conf) throws AuthenticationException { super(conf); }