Modified: hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java?rev=1626482&r1=1626481&r2=1626482&view=diff ============================================================================== --- hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java (original) +++ hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java Sat Sep 20 17:34:39 2014 @@ -18,6 +18,8 @@ package org.apache.hive.service.server; +import java.nio.charset.Charset; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.common.LogUtils; @@ -26,12 +28,21 @@ import org.apache.hadoop.hive.conf.HiveC import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl; import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; +import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper; import org.apache.hive.common.util.HiveStringUtils; +import org.apache.hive.common.util.HiveVersionInfo; import org.apache.hive.service.CompositeService; +import org.apache.hive.service.ServiceException; import org.apache.hive.service.cli.CLIService; import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService; import org.apache.hive.service.cli.thrift.ThriftCLIService; import org.apache.hive.service.cli.thrift.ThriftHttpCLIService; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooKeeper; /** * HiveServer2. 
@@ -42,9 +53,12 @@ public class HiveServer2 extends Composi private CLIService cliService; private ThriftCLIService thriftCLIService; + private String znodePath; + private ZooKeeper zooKeeperClient; + private volatile boolean registeredWithZooKeeper = false; public HiveServer2() { - super("HiveServer2"); + super(HiveServer2.class.getSimpleName()); HiveConf.setLoadHiveServer2Config(true); } @@ -53,20 +67,132 @@ public class HiveServer2 extends Composi public synchronized void init(HiveConf hiveConf) { cliService = new CLIService(); addService(cliService); + if (isHTTPTransportMode(hiveConf)) { + thriftCLIService = new ThriftHttpCLIService(cliService); + } else { + thriftCLIService = new ThriftBinaryCLIService(cliService); + } + addService(thriftCLIService); + thriftCLIService.setHiveServer2(this); + super.init(hiveConf); + // Add a shutdown hook for catching SIGTERM & SIGINT + final HiveServer2 hiveServer2 = this; + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + hiveServer2.stop(); + } + }); + } + + public static boolean isHTTPTransportMode(HiveConf hiveConf) { String transportMode = System.getenv("HIVE_SERVER2_TRANSPORT_MODE"); - if(transportMode == null) { + if (transportMode == null) { transportMode = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE); } - if(transportMode != null && (transportMode.equalsIgnoreCase("http"))) { - thriftCLIService = new ThriftHttpCLIService(cliService); + if (transportMode != null && (transportMode.equalsIgnoreCase("http"))) { + return true; } - else { - thriftCLIService = new ThriftBinaryCLIService(cliService); + return false; + } + + /** + * Adds a server instance to ZooKeeper as a znode.
+ * + * @param hiveConf the server configuration (ZooKeeper quorum, session timeout and namespace) + * @throws Exception if the namespace or the znode for this instance cannot be created + */ + private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception { + int zooKeeperSessionTimeout = + hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT); + String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf); + String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE); + String instanceURI = getServerInstanceURI(hiveConf); + byte[] znodeDataUTF8 = instanceURI.getBytes(Charset.forName("UTF-8")); + zooKeeperClient = + new ZooKeeper(zooKeeperEnsemble, zooKeeperSessionTimeout, + new ZooKeeperHiveHelper.DummyWatcher()); + + // Create the parent znodes recursively; ignore if the parent already exists + try { + ZooKeeperHiveHelper.createPathRecursively(zooKeeperClient, rootNamespace, + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + LOG.info("Created the root name space: " + rootNamespace + " on ZooKeeper for HiveServer2"); + } catch (KeeperException e) { + if (e.code() != KeeperException.Code.NODEEXISTS) { + LOG.fatal("Unable to create HiveServer2 namespace: " + rootNamespace + " on ZooKeeper", e); + throw e; + } } + // Create a znode under the rootNamespace parent for this instance of the server + // Znode name: server-host:port-versionInfo-sequence; the created path is kept in znodePath + try { + znodePath = + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace + + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "server-" + instanceURI + "-" + + HiveVersionInfo.getVersion() + "-"; + znodePath = + zooKeeperClient.create(znodePath, znodeDataUTF8, Ids.OPEN_ACL_UNSAFE, + CreateMode.EPHEMERAL_SEQUENTIAL); + setRegisteredWithZooKeeper(true); + // Set a watch on the znode + if (zooKeeperClient.exists(znodePath, new DeRegisterWatcher()) == null) { + // No node exists, throw exception + throw new Exception("Unable to create znode for this HiveServer2 instance on ZooKeeper."); + } + LOG.info("Created a znode on ZooKeeper for HiveServer2 uri: " + instanceURI); + }
catch (KeeperException e) { + LOG.fatal("Unable to create a znode for this server instance", e); + throw new Exception(e); + } + } - addService(thriftCLIService); - super.init(hiveConf); + /** + * The watcher class which sets the de-register flag when the znode corresponding to this server + * instance is deleted. Additionally, it shuts down the server if there are no more active client + * sessions at the time of receiving a 'NodeDeleted' notification from ZooKeeper. + */ + private class DeRegisterWatcher implements Watcher { + public void process(WatchedEvent event) { + if (event.getType() == Watcher.Event.EventType.NodeDeleted) { + HiveServer2.this.setRegisteredWithZooKeeper(false); + LOG.warn("This HiveServer2 instance is now de-registered from ZooKeeper. "
+ "The server will be shut down after the last client session completes."); + // If there are no more active client sessions, stop the server + if (cliService.getSessionManager().getOpenSessionCount() == 0) { + LOG.warn("This instance of HiveServer2 has been removed from the list of server " + "instances available for dynamic service discovery. " + "The last client session has ended - will shut down now."); + HiveServer2.this.stop(); + } + } + } + } + + private void removeServerInstanceFromZooKeeper() throws Exception { + setRegisteredWithZooKeeper(false); + // zooKeeperClient is still null when stop() runs before registration (e.g. a failed start attempt) + if (zooKeeperClient != null) { + zooKeeperClient.close(); + LOG.info("Server instance removed from ZooKeeper."); + } + } + + public boolean isRegisteredWithZooKeeper() { + return registeredWithZooKeeper; + } + + private void setRegisteredWithZooKeeper(boolean registeredWithZooKeeper) { + this.registeredWithZooKeeper = registeredWithZooKeeper; + } + + private String getServerInstanceURI(HiveConf hiveConf) throws Exception { + if ((thriftCLIService == null) || (thriftCLIService.getServerAddress() == null)) { + throw new Exception("Unable to get the server address; it hasn't been initialized yet."); + } + return thriftCLIService.getServerAddress().getHostName() + ":" + + thriftCLIService.getPortNumber(); } @Override @@ -76,16 +202,25 @@ public class HiveServer2 extends Composi @Override public synchronized void stop() { - super.stop(); - // there should already be an instance of the session pool manager. - // if not, ignoring is fine while stopping the hive server. + LOG.info("Shutting down HiveServer2"); HiveConf hiveConf = this.getHiveConf(); + super.stop(); + // Remove this server instance from ZooKeeper if dynamic service discovery is set + if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) { + try { + removeServerInstanceFromZooKeeper(); + } catch (Exception e) { + LOG.error("Error removing znode for this HiveServer2 instance from ZooKeeper.", e); + } + } + // There should already be an instance of the session pool manager. + // If not, ignoring is fine while stopping HiveServer2.
 if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) { try { TezSessionPoolManager.getInstance().stop(); } catch (Exception e) { - LOG.error("Tez session pool manager stop had an error during stop of hive server"); - e.printStackTrace(); + LOG.error("Tez session pool manager stop had an error during stop of HiveServer2. " + + "Shutting down HiveServer2 anyway.", e); } } @@ -100,7 +235,7 @@ public class HiveServer2 extends Composi private static void startHiveServer2() throws Throwable { long attempts = 0, maxAttempts = 1; - while(true) { + while (true) { HiveConf hiveConf = new HiveConf(); maxAttempts = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_SERVER2_MAX_START_ATTEMPTS); HiveServer2 server = null; @@ -108,6 +243,11 @@ public class HiveServer2 extends Composi server = new HiveServer2(); server.init(hiveConf); server.start(); + // If we're supporting dynamic service discovery, we'll add the service uri for this + // HiveServer2 instance to ZooKeeper as a znode.
+ if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) { + server.addServerInstanceToZooKeeper(hiveConf); + } if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) { TezSessionPoolManager sessionPool = TezSessionPoolManager.getInstance(); sessionPool.setupPool(hiveConf); @@ -119,19 +259,19 @@ public class HiveServer2 extends Composi } break; } catch (Throwable throwable) { - if(++attempts >= maxAttempts) { + if (++attempts >= maxAttempts) { throw new Error("Max start attempts " + maxAttempts + " exhausted", throwable); } else { - LOG.warn("Error starting HiveServer2 on attempt " + attempts + - ", will retry in 60 seconds", throwable); + LOG.warn("Error starting HiveServer2 on attempt " + attempts + + ", will retry in 60 seconds", throwable); try { if (server != null) { server.stop(); server = null; } } catch (Exception e) { - LOG.info("Exception caught when calling stop of HiveServer2 before" + - " retrying start", e); + LOG.info( + "Exception caught when calling stop of HiveServer2 before retrying start", e); } try { Thread.sleep(60L * 1000L); @@ -152,14 +292,15 @@ public class HiveServer2 extends Composi System.exit(-1); } - //NOTE: It is critical to do this here so that log4j is reinitialized + // NOTE: It is critical to do this here so that log4j is reinitialized // before any of the other core hive classes are loaded String initLog4jMessage = LogUtils.initHiveLog4j(); LOG.debug(initLog4jMessage); HiveStringUtils.startupShutdownMessage(HiveServer2.class, args, LOG); - //log debug message from "oproc" after log4j initialize properly + // log debug message from "oproc" after log4j initialize properly LOG.debug(oproc.getDebugMessage().toString()); + startHiveServer2(); } catch (LogInitializationException e) { LOG.error("Error initializing log: " + e.getMessage(), e); @@ -169,6 +310,5 @@ public class HiveServer2 extends Composi System.exit(-1); } } - }
Modified: hive/branches/spark/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java?rev=1626482&r1=1626481&r2=1626482&view=diff ============================================================================== --- hive/branches/spark/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java (original) +++ hive/branches/spark/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java Sat Sep 20 17:34:39 2014 @@ -27,7 +27,11 @@ import junit.framework.TestCase; import org.apache.commons.io.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hive.service.cli.*; +import org.apache.hive.service.cli.CLIService; +import org.apache.hive.service.cli.ICLIService; +import org.apache.hive.service.cli.OperationHandle; +import org.apache.hive.service.cli.RowSet; +import org.apache.hive.service.cli.SessionHandle; import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService; import org.apache.hive.service.cli.thrift.ThriftCLIServiceClient; import org.junit.After; @@ -83,7 +87,7 @@ public class TestSessionGlobalInitFile e // set up service and client HiveConf hiveConf = new HiveConf(); - hiveConf.setVar(HiveConf.ConfVars.HIVE_GLOBAL_INIT_FILE_LOCATION, + hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_GLOBAL_INIT_FILE_LOCATION, initFile.getParentFile().getAbsolutePath()); service = new FakeEmbeddedThriftBinaryCLIService(hiveConf); service.init(new HiveConf()); Modified: hive/branches/spark/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java?rev=1626482&r1=1626481&r2=1626482&view=diff ============================================================================== --- 
hive/branches/spark/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java (original) +++ hive/branches/spark/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java Sat Sep 20 17:34:39 2014 @@ -37,6 +37,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeMap; import javax.security.auth.Subject; import javax.security.auth.login.LoginException; @@ -652,6 +653,17 @@ public class Hadoop20Shims implements Ha } @Override + public TreeMap<Long, BlockLocation> getLocationsWithOffset(FileSystem fs, + FileStatus status) throws IOException { + TreeMap<Long, BlockLocation> offsetBlockMap = new TreeMap<Long, BlockLocation>(); + BlockLocation[] locations = getLocations(fs, status); + for (BlockLocation location : locations) { + offsetBlockMap.put(location.getOffset(), location); + } + return offsetBlockMap; + } + + @Override public void hflush(FSDataOutputStream stream) throws IOException { stream.sync(); } Modified: hive/branches/spark/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java?rev=1626482&r1=1626481&r2=1626482&view=diff ============================================================================== --- hive/branches/spark/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java (original) +++ hive/branches/spark/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java Sat Sep 20 17:34:39 2014 @@ -27,6 +27,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.TreeMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; @@ -403,6 +404,17 @@ public class Hadoop20SShims extends Hado } @Override + public TreeMap<Long, BlockLocation> getLocationsWithOffset(FileSystem fs,
FileStatus status) throws IOException { + TreeMap<Long, BlockLocation> offsetBlockMap = new TreeMap<Long, BlockLocation>(); + BlockLocation[] locations = getLocations(fs, status); + for (BlockLocation location : locations) { + offsetBlockMap.put(location.getOffset(), location); + } + return offsetBlockMap; + } + + @Override public void hflush(FSDataOutputStream stream) throws IOException { stream.sync(); } Modified: hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java?rev=1626482&r1=1626481&r2=1626482&view=diff ============================================================================== --- hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java (original) +++ hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java Sat Sep 20 17:34:39 2014 @@ -29,6 +29,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.TreeMap; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -511,6 +512,17 @@ public class Hadoop23Shims extends Hadoo } @Override + public TreeMap<Long, BlockLocation> getLocationsWithOffset(FileSystem fs, + FileStatus status) throws IOException { + TreeMap<Long, BlockLocation> offsetBlockMap = new TreeMap<Long, BlockLocation>(); + BlockLocation[] locations = getLocations(fs, status); + for (BlockLocation location : locations) { + offsetBlockMap.put(location.getOffset(), location); + } + return offsetBlockMap; + } + + @Override public void hflush(FSDataOutputStream stream) throws IOException { stream.hflush(); } Modified: hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java URL:
http://svn.apache.org/viewvc/hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java?rev=1626482&r1=1626481&r2=1626482&view=diff ============================================================================== --- hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java (original) +++ hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java Sat Sep 20 17:34:39 2014 @@ -30,6 +30,7 @@ import java.security.PrivilegedException import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.TreeMap; import javax.security.auth.login.LoginException; @@ -477,6 +478,19 @@ public interface HadoopShims { FileStatus status) throws IOException; /** + * Converts the block locations returned by getLocations() into a TreeMap keyed by + * each block's start offset ({@code offset -> BlockLocation}) by iterating over them. + * Keying the map by offset makes it O(log n) to look up a particular + * block based upon offset. + * @param fs the file system + * @param status the file information + * @return a {@code TreeMap<Long, BlockLocation>} from block start offset to block location + * @throws IOException if the block locations cannot be retrieved + */ + TreeMap<Long, BlockLocation> getLocationsWithOffset(FileSystem fs, + FileStatus status) throws IOException; + + /** * Flush and make visible to other users the changes to the given stream. * @param stream the stream to hflush. * @throws IOException
