Repository: hive
Updated Branches:
  refs/heads/master 2e226d22f -> 4795b9969


http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
new file mode 100644
index 0000000..c773770
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
@@ -0,0 +1,549 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hive.registry.impl;
+
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode.Mode;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.LlapUtil;
import org.apache.hadoop.hive.registry.ServiceInstance;
import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.registry.client.binding.RegistryUtils.ServiceRecordMarshal;
import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.zookeeper.KeeperException.InvalidACLException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+/**
+ * This is currently used for implementation inheritance only; it doesn't 
provide a unified flow
+ * into which one can just plug a few abstract method implementations, because 
providing one with
+ * getInstance method is a huge pain involving lots of generics. Also, 
different registries may
+ * have slightly different usage patterns anyway and noone would use a 
registry without knowing
+ * what type it is. So, it's mostly a grab bag of methods used by 
ServiceInstanceSet and other
+ * parts of each implementation.
+ */
+public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ZkRegistryBase.class);
+  private final static String SASL_NAMESPACE = "sasl";
+  private final static String UNSECURE_NAMESPACE = "unsecure";
+
+  static final String UNIQUE_IDENTIFIER = "registry.unique.id";
+  private static final UUID uniq = UUID.randomUUID();
+
+  protected final Configuration conf;
+  protected final CuratorFramework zooKeeperClient;
+  // userPathPrefix is the path specific to the user for which ACLs should be 
restrictive.
+  // workersPath is the directory path where all the worker znodes are located.
+  protected final String workersPath;
+  private final String userPathPrefix, workerNodePrefix;
+
+  protected final ServiceRecordMarshal encoder; // to marshal/unmarshal znode 
data
+
+  private final Set<ServiceInstanceStateChangeListener<InstanceType>> 
stateChangeListeners;
+
+  private final boolean doCheckAcls;
+  // Secure ZK is only set up by the registering service; anyone can read the 
registrations.
+  private final String zkPrincipal, zkKeytab, saslLoginContextName;
+  private String userNameFromPrincipal; // Only set when setting up the secure 
config for ZK.
+  private final String disableMessage;
+
+  private final Lock instanceCacheLock = new ReentrantLock();
+  private final Map<String, Set<InstanceType>> pathToInstanceCache;
+  private final Map<String, Set<InstanceType>> nodeToInstanceCache;
+
+  // The registration znode.
+  private PersistentEphemeralNode znode;
+  private String znodePath; // unique identity for this instance
+
+  private PathChildrenCache instancesCache; // Created on demand.
+
+  /** Local hostname. */
+  protected static final String hostname;
+  static {
+    String localhost = "localhost";
+    try {
+      localhost = InetAddress.getLocalHost().getCanonicalHostName();
+    } catch (UnknownHostException uhe) {
+      // ignore
+    }
+    hostname = localhost;
+  }
+
+  /**
+   * @param rootNs A single root namespace override. Not recommended.
+   * @param nsPrefix The namespace prefix to use with default namespaces.
+   * @param userScopePathPrefix The prefix to use for the user-specific part 
of the path.
+   * @param workerPrefix The prefix to use for each worker znode.
+   * @param zkSaslLoginContextName SASL login context name for ZK security; 
null if not needed.
+   * @param zkPrincipal ZK security principal.
+   * @param zkKeytab ZK security keytab.
+   * @param aclsConfig A config setting to use to determine if ACLs should be 
verified.
+   */
+  public ZkRegistryBase(String instanceName, Configuration conf, String 
rootNs, String nsPrefix,
+      String userScopePathPrefix, String workerPrefix,
+      String zkSaslLoginContextName, String zkPrincipal, String zkKeytab, 
ConfVars aclsConfig) {
+    this.conf = new Configuration(conf);
+    this.saslLoginContextName = zkSaslLoginContextName;
+    this.zkPrincipal = zkPrincipal;
+    this.zkKeytab = zkKeytab;
+    if (aclsConfig != null) {
+      this.doCheckAcls = HiveConf.getBoolVar(conf, aclsConfig);
+      this.disableMessage = "Set " + aclsConfig.varname + " to false to 
disable ACL validation";
+    } else {
+      this.doCheckAcls = true;
+      this.disableMessage = "";
+    }
+    this.conf.addResource(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
+    String zkEnsemble = getQuorumServers(this.conf);
+    this.encoder = new RegistryUtils.ServiceRecordMarshal();
+    int sessionTimeout = (int) HiveConf.getTimeVar(conf,
+        ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
+    int baseSleepTime = (int) HiveConf.getTimeVar(conf,
+        ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, 
TimeUnit.MILLISECONDS);
+    int maxRetries = HiveConf.getIntVar(conf, 
ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
+
+    // sample path: /llap-sasl/hiveuser/hostname/workers/worker-0000000
+    // worker-0000000 is the sequence number which will be retained until 
session timeout. If a
+    // worker does not respond due to communication interruptions it will 
retain the same sequence
+    // number when it returns back. If session timeout expires, the node will 
be deleted and new
+    // addition of the same node (restart) will get next sequence number
+    this.userPathPrefix = userScopePathPrefix + getZkPathUser(this.conf);
+    this.workerNodePrefix = workerPrefix;
+    this.workersPath =  "/" + userPathPrefix + "/" + instanceName + "/workers";
+    this.instancesCache = null;
+    this.stateChangeListeners = new HashSet<>();
+    this.pathToInstanceCache = new ConcurrentHashMap<>();
+    this.nodeToInstanceCache = new ConcurrentHashMap<>();
+
+    final boolean isSecure = UserGroupInformation.isSecurityEnabled();
+    ACLProvider zooKeeperAclProvider = new ACLProvider() {
+      @Override
+      public List<ACL> getDefaultAcl() {
+        // We always return something from getAclForPath so this should not 
happen.
+        LOG.warn("getDefaultAcl was called");
+        return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE);
+      }
+
+      @Override
+      public List<ACL> getAclForPath(String path) {
+        if (!isSecure || path == null || !path.contains(userPathPrefix)) {
+          // No security or the path is below the user path - full access.
+          return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE);
+        }
+        return createSecureAcls();
+      }
+    };
+    if (rootNs == null) {
+      rootNs = nsPrefix + (isSecure ? SASL_NAMESPACE : UNSECURE_NAMESPACE); // 
The normal path.
+    }
+
+    // Create a CuratorFramework instance to be used as the ZooKeeper client
+    // Use the zooKeeperAclProvider to create appropriate ACLs
+    this.zooKeeperClient = CuratorFrameworkFactory.builder()
+        .connectString(zkEnsemble)
+        .sessionTimeoutMs(sessionTimeout)
+        .aclProvider(zooKeeperAclProvider)
+        .namespace(rootNs)
+        .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries))
+        .build();
+  }
+
+  private static List<ACL> createSecureAcls() {
+    // Read all to the world
+    List<ACL> nodeAcls = new ArrayList<ACL>(ZooDefs.Ids.READ_ACL_UNSAFE);
+    // Create/Delete/Write/Admin to creator
+    nodeAcls.addAll(ZooDefs.Ids.CREATOR_ALL_ACL);
+    return nodeAcls;
+  }
+
+  /**
+   * Get the ensemble server addresses from the configuration. The format is: 
host1:port,
+   * host2:port..
+   *
+   * @param conf
+   **/
+  private String getQuorumServers(Configuration conf) {
+    String[] hosts = 
conf.getTrimmedStrings(ConfVars.HIVE_ZOOKEEPER_QUORUM.varname);
+    String port = conf.get(ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.varname,
+        ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.getDefaultValue());
+    StringBuilder quorum = new StringBuilder();
+    for (int i = 0; i < hosts.length; i++) {
+      quorum.append(hosts[i].trim());
+      if (!hosts[i].contains(":")) {
+        // if the hostname doesn't contain a port, add the configured port to 
hostname
+        quorum.append(":");
+        quorum.append(port);
+      }
+
+      if (i != hosts.length - 1) {
+        quorum.append(",");
+      }
+    }
+
+    return quorum.toString();
+  }
+
+  protected abstract String getZkPathUser(Configuration conf);
+
+  protected final String registerServiceRecord(ServiceRecord srv) throws 
IOException {
+    // restart sensitive instance id
+    srv.set(UNIQUE_IDENTIFIER, uniq.toString());
+
+    // Create a znode under the rootNamespace parent for this instance of the 
server
+    try {
+      // PersistentEphemeralNode will make sure the ephemeral node created on 
server will be present
+      // even under connection or session interruption (will automatically 
handle retries)
+      znode = new PersistentEphemeralNode(zooKeeperClient, 
Mode.EPHEMERAL_SEQUENTIAL,
+          workersPath + "/" + workerNodePrefix, encoder.toBytes(srv));
+
+      // start the creation of znodes
+      znode.start();
+
+      // We'll wait for 120s for node creation
+      long znodeCreationTimeout = 120;
+      if (!znode.waitForInitialCreate(znodeCreationTimeout, TimeUnit.SECONDS)) 
{
+        throw new Exception(
+            "Max znode creation wait time: " + znodeCreationTimeout + "s 
exhausted");
+      }
+
+      znodePath = znode.getActualPath();
+
+      if (doCheckAcls) {
+        try {
+          checkAndSetAcls();
+        } catch (Exception ex) {
+          throw new IOException("Error validating or setting ACLs. " + 
disableMessage, ex);
+        }
+      }
+      if (zooKeeperClient.checkExists().forPath(znodePath) == null) {
+        // No node exists, throw exception
+        throw new Exception("Unable to create znode for this instance on 
ZooKeeper.");
+      }
+    } catch (Exception e) {
+      LOG.error("Unable to create a znode for this server instance", e);
+      CloseableUtils.closeQuietly(znode);
+      throw (e instanceof IOException) ? (IOException)e : new IOException(e);
+    }
+    return uniq.toString();
+  }
+
+
+  private void checkAndSetAcls() throws Exception {
+    if (!UserGroupInformation.isSecurityEnabled()) return;
+    // We are trying to check ACLs on the "workers" directory, which noone 
except us should be
+    // able to write to. Higher-level directories shouldn't matter - we don't 
read them.
+    String pathToCheck = workersPath;
+    List<ACL> acls = zooKeeperClient.getACL().forPath(pathToCheck);
+    if (acls == null || acls.isEmpty()) {
+      // Can there be no ACLs? There's some access (to get ACLs), so assume it 
means free for all.
+      LOG.warn("No ACLs on "  + pathToCheck + "; setting up ACLs. " + 
disableMessage);
+      setUpAcls(pathToCheck);
+      return;
+    }
+    // This could be brittle.
+    assert userNameFromPrincipal != null;
+    Id currentUser = new Id("sasl", userNameFromPrincipal);
+    for (ACL acl : acls) {
+      if ((acl.getPerms() & ~ZooDefs.Perms.READ) == 0 || 
currentUser.equals(acl.getId())) {
+        continue; // Read permission/no permissions, or the expected user.
+      }
+      LOG.warn("The ACL " + acl + " is unnacceptable for " + pathToCheck
+        + "; setting up ACLs. " + disableMessage);
+      setUpAcls(pathToCheck);
+      return;
+    }
+  }
+
+  private void setUpAcls(String path) throws Exception {
+    List<ACL> acls = createSecureAcls();
+    LinkedList<String> paths = new LinkedList<>();
+    paths.add(path);
+    while (!paths.isEmpty()) {
+      String currentPath = paths.poll();
+      List<String> children = 
zooKeeperClient.getChildren().forPath(currentPath);
+      if (children != null) {
+        for (String child : children) {
+          paths.add(currentPath + "/" + child);
+        }
+      }
+      zooKeeperClient.setACL().withACL(acls).forPath(currentPath);
+    }
+  }
+
+  private void addToCache(String path, String host, InstanceType instance) {
+    instanceCacheLock.lock();
+    try {
+      putInCache(path, pathToInstanceCache, instance);
+      putInCache(host, nodeToInstanceCache, instance);
+    } finally {
+      instanceCacheLock.unlock();
+    }
+    LOG.debug("Added path={}, host={} instance={} to cache."
+            + " pathToInstanceCache:size={}, nodeToInstanceCache:size={}",
+        path, host, instance, pathToInstanceCache.size(), 
nodeToInstanceCache.size());
+  }
+
+  private void removeFromCache(String path, String host) {
+    instanceCacheLock.lock();
+    try {
+      pathToInstanceCache.remove(path);
+      nodeToInstanceCache.remove(host);
+    } finally {
+      instanceCacheLock.unlock();
+    }
+    LOG.debug("Removed path={}, host={} from cache."
+            + " pathToInstanceCache:size={}, nodeToInstanceCache:size={}",
+        path, host, pathToInstanceCache.size(), nodeToInstanceCache.size());
+  }
+
+  private void putInCache(String key, Map<String, Set<InstanceType>> cache,
+      InstanceType instance) {
+    Set<InstanceType> instanceSet = cache.get(key);
+    if (instanceSet == null) {
+      instanceSet = Sets.newHashSet();
+      cache.put(key, instanceSet);
+    }
+    instanceSet.add(instance);
+  }
+
+  protected final void populateCache(PathChildrenCache instancesCache) {
+    for (ChildData childData : instancesCache.getCurrentData()) {
+      byte[] data = getWorkerData(childData, workerNodePrefix);
+      if (data == null) continue;
+      try {
+        ServiceRecord srv = encoder.fromBytes(childData.getPath(), data);
+        InstanceType instance = createServiceInstance(srv);
+        addToCache(childData.getPath(), instance.getHost(), instance);
+      } catch (IOException e) {
+        LOG.error("Unable to decode data for zkpath: {}." +
+            " Ignoring from current instances list..", childData.getPath());
+      }
+    }
+  }
+
+  protected abstract InstanceType createServiceInstance(ServiceRecord srv) 
throws IOException;
+
+  protected static final byte[] getWorkerData(ChildData childData, String 
workerNodePrefix) {
+    if (childData == null) return null;
+    byte[] data = childData.getData();
+    if (data == null) return null;
+    if (!extractNodeName(childData).startsWith(workerNodePrefix)) return null;
+    return data;
+  }
+
+  private class InstanceStateChangeListener implements 
PathChildrenCacheListener {
+    private final Logger LOG = 
LoggerFactory.getLogger(InstanceStateChangeListener.class);
+
+    @Override
+    public void childEvent(final CuratorFramework client, final 
PathChildrenCacheEvent event)
+        throws Exception {
+      Preconditions.checkArgument(client != null
+          && client.getState() == CuratorFrameworkState.STARTED, "client is 
not started");
+
+      synchronized (this) {
+        ChildData childData = event.getData();
+        if (childData == null) return;
+        String nodeName = extractNodeName(childData);
+        if (!nodeName.startsWith(workerNodePrefix)) return;
+        LOG.info("{} for zknode {}", event.getType(), childData.getPath());
+        InstanceType instance = extractServiceInstance(event, childData);
+        switch (event.getType()) {
+        case CHILD_ADDED:
+          addToCache(childData.getPath(), instance.getHost(), instance);
+          for (ServiceInstanceStateChangeListener<InstanceType> listener : 
stateChangeListeners) {
+            listener.onCreate(instance);
+          }
+          break;
+        case CHILD_UPDATED:
+          addToCache(childData.getPath(), instance.getHost(), instance);
+          for (ServiceInstanceStateChangeListener<InstanceType> listener : 
stateChangeListeners) {
+            listener.onUpdate(instance);
+          }
+          break;
+        case CHILD_REMOVED:
+          removeFromCache(childData.getPath(), instance.getHost());
+          for (ServiceInstanceStateChangeListener<InstanceType> listener : 
stateChangeListeners) {
+            listener.onRemove(instance);
+          }
+          break;
+        default:
+          // Ignore all the other events; logged above.
+        }
+      }
+    }
+  }
+
+  // The real implementation for the instanceset... instanceset has its own 
copy of the
+  // ZK cache yet completely depends on the parent in every other aspect and 
is thus unneeded.
+
+  public int size() {
+    // not using the path child cache here as there could be more than 1 path 
per host (worker and slot znodes)
+    return nodeToInstanceCache.size();
+  }
+
+  protected final Set<InstanceType> getByHost(String host) {
+    Set<InstanceType> byHost = nodeToInstanceCache.get(host);
+    byHost = (byHost == null) ? Sets.<InstanceType>newHashSet() : byHost;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Returning " + byHost.size() + " hosts for locality allocation 
on " + host);
+    }
+    return byHost;
+  }
+
+  protected final Collection<InstanceType> getAll() {
+    Set<InstanceType> instances =  new HashSet<>();
+    for(Set<InstanceType> instanceSet : pathToInstanceCache.values()) {
+      instances.addAll(instanceSet);
+    }
+    return instances;
+  }
+
+  private static String extractNodeName(ChildData childData) {
+    String nodeName = childData.getPath();
+    int ix = nodeName.lastIndexOf("/");
+    if (ix >= 0) {
+      nodeName = nodeName.substring(ix + 1);
+    }
+    return nodeName;
+  }
+
+  private InstanceType extractServiceInstance(
+      PathChildrenCacheEvent event, ChildData childData) {
+    byte[] data = childData.getData();
+    if (data == null) return null;
+    try {
+      ServiceRecord srv = encoder.fromBytes(event.getData().getPath(), data);
+      return createServiceInstance(srv);
+    } catch (IOException e) {
+      LOG.error("Unable to decode data for zknode: {}." +
+          " Dropping notification of type: {}", childData.getPath(), 
event.getType());
+      return null;
+    }
+  }
+
+  public synchronized void registerStateChangeListener(
+      ServiceInstanceStateChangeListener<InstanceType> listener) throws 
IOException {
+    ensureInstancesCache(0);
+    this.stateChangeListeners.add(listener);
+  }
+
+  @SuppressWarnings("resource") // Bogus warnings despite closeQuietly.
+  protected final synchronized PathChildrenCache ensureInstancesCache(
+      long clusterReadyTimeoutMs) throws IOException {
+    Preconditions.checkArgument(zooKeeperClient != null &&
+            zooKeeperClient.getState() == CuratorFrameworkState.STARTED, 
"client is not started");
+    // lazily create PathChildrenCache
+    PathChildrenCache instancesCache = this.instancesCache;
+    if (instancesCache != null) return instancesCache;
+    ExecutorService tp = Executors.newFixedThreadPool(1, new 
ThreadFactoryBuilder()
+              
.setDaemon(true).setNameFormat("StateChangeNotificationHandler").build());
+    long startTimeNs = System.nanoTime(), deltaNs = clusterReadyTimeoutMs * 
1000000L;
+    long sleepTimeMs = Math.min(16, clusterReadyTimeoutMs);
+    while (true) {
+      instancesCache = new PathChildrenCache(zooKeeperClient, workersPath, 
true);
+      instancesCache.getListenable().addListener(new 
InstanceStateChangeListener(), tp);
+      try {
+        instancesCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
+        this.instancesCache = instancesCache;
+        return instancesCache;
+      } catch (InvalidACLException e) {
+        // PathChildrenCache tried to mkdir when the znode wasn't there, and 
failed.
+        CloseableUtils.closeQuietly(instancesCache);
+        long elapsedNs = System.nanoTime() - startTimeNs;
+        if (deltaNs == 0 || deltaNs <= elapsedNs) {
+          LOG.error("Unable to start curator PathChildrenCache", e);
+          throw new IOException(e);
+        }
+        LOG.warn("The cluster is not started yet (InvalidACL); will retry");
+        try {
+          Thread.sleep(Math.min(sleepTimeMs, (deltaNs - elapsedNs)/1000000L));
+        } catch (InterruptedException e1) {
+          LOG.error("Interrupted while retrying the PathChildrenCache 
startup");
+          throw new IOException(e1);
+        }
+        sleepTimeMs = sleepTimeMs << 1;
+      } catch (Exception e) {
+        CloseableUtils.closeQuietly(instancesCache);
+        LOG.error("Unable to start curator PathChildrenCache", e);
+        throw new IOException(e);
+      }
+    }
+  }
+
+  public void start() throws IOException {
+    if (zooKeeperClient != null) {
+      String principal = ZookeeperUtils.setupZookeeperAuth(
+          conf, saslLoginContextName, zkPrincipal, zkKeytab);
+      if (principal != null) {
+        userNameFromPrincipal = LlapUtil.getUserNameFromPrincipal(principal);
+      }
+      zooKeeperClient.start();
+    }
+    // Init closeable utils in case register is not called (see HIVE-13322)
+    CloseableUtils.class.getName();
+  }
+
+  public void stop() throws IOException {
+    CloseableUtils.closeQuietly(znode);
+    CloseableUtils.closeQuietly(instancesCache);
+    CloseableUtils.closeQuietly(zooKeeperClient);
+  }
+
+  protected final Set<InstanceType> getInstancesByPath(String path) {
+    return pathToInstanceCache.get(path);
+  }
+
+  protected final String getRegistrationZnodePath() {
+    return znodePath;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZookeeperUtils.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZookeeperUtils.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZookeeperUtils.java
new file mode 100644
index 0000000..454d503
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZookeeperUtils.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hive.registry.impl;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import javax.security.auth.login.AppConfigurationEntry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.apache.zookeeper.client.ZooKeeperSaslClient;
+
+public class ZookeeperUtils {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ZookeeperUtils.class);
+
+  public static String setupZookeeperAuth(Configuration conf, String 
saslLoginContextName,
+      String zkPrincipal, String zkKeytab) throws IOException {
+    // If the login context name is not set, we are in the client and don't 
need auth.
+    if (UserGroupInformation.isSecurityEnabled() && saslLoginContextName != 
null) {
+      LOG.info("UGI security is enabled. Setting up ZK auth.");
+
+      if (zkPrincipal == null || zkPrincipal.isEmpty()) {
+        throw new IOException("Kerberos principal is empty");
+      }
+
+      if (zkKeytab == null || zkKeytab.isEmpty()) {
+        throw new IOException("Kerberos keytab is empty");
+      }
+
+      // Install the JAAS Configuration for the runtime
+      return setZookeeperClientKerberosJaasConfig(saslLoginContextName, 
zkPrincipal, zkKeytab);
+    } else {
+      LOG.info("UGI security is not enabled, or no SASL context name. " +
+          "Skipping setting up ZK auth.");
+      return null;
+    }
+  }
+
+  /**
+   * Dynamically sets up the JAAS configuration that uses kerberos
+   *
+   * @param principal
+   * @param keyTabFile
+   * @throws IOException
+   */
+  private static String setZookeeperClientKerberosJaasConfig(
+      String saslLoginContextName, String zkPrincipal, String zkKeytab) throws 
IOException {
+    // ZooKeeper property name to pick the correct JAAS conf section
+    System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, 
saslLoginContextName);
+
+    String principal = SecurityUtil.getServerPrincipal(zkPrincipal, "0.0.0.0");
+    JaasConfiguration jaasConf = new JaasConfiguration(
+        saslLoginContextName, principal, zkKeytab);
+
+    // Install the Configuration in the runtime.
+    javax.security.auth.login.Configuration.setConfiguration(jaasConf);
+    return principal;
+  }
+
+  /**
+   * A JAAS configuration for ZooKeeper clients intended to use for SASL
+   * Kerberos.
+   */
+  private static class JaasConfiguration extends 
javax.security.auth.login.Configuration {
+    // Current installed Configuration
+    private final javax.security.auth.login.Configuration baseConfig = 
javax.security.auth.login.Configuration
+        .getConfiguration();
+    private final String loginContextName;
+    private final String principal;
+    private final String keyTabFile;
+
+    public JaasConfiguration(String loginContextName, String principal, String 
keyTabFile) {
+      this.loginContextName = loginContextName;
+      this.principal = principal;
+      this.keyTabFile = keyTabFile;
+    }
+
+    @Override
+    public AppConfigurationEntry[] getAppConfigurationEntry(String appName) {
+      if (loginContextName.equals(appName)) {
+        Map<String, String> krbOptions = new HashMap<String, String>();
+        krbOptions.put("doNotPrompt", "true");
+        krbOptions.put("storeKey", "true");
+        krbOptions.put("useKeyTab", "true");
+        krbOptions.put("principal", principal);
+        krbOptions.put("keyTab", keyTabFile);
+        krbOptions.put("refreshKrb5Config", "true");
+        AppConfigurationEntry zooKeeperClientEntry = new AppConfigurationEntry(
+            KerberosUtil.getKrb5LoginModuleName(),
+            AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, krbOptions);
+        return new AppConfigurationEntry[] { zooKeeperClientEntry };
+      }
+      // Try the base config
+      if (baseConfig != null) {
+        return baseConfig.getAppConfigurationEntry(appName);
+      }
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index 201f5fa..f2d9074 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -45,8 +45,8 @@ import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWor
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrBinary;
 import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient;
 import 
org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient.LlapTaskUmbilicalExternalResponder;
-import org.apache.hadoop.hive.llap.registry.ServiceInstance;
-import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
 import org.apache.hadoop.hive.llap.tez.Converters;
@@ -104,7 +104,7 @@ public class LlapBaseInputFormat<V extends 
WritableComparable<?>>
   public static final String PWD_KEY = "llap.if.pwd";
 
   public final String SPLIT_QUERY = "select get_splits(\"%s\",%d)";
-  public static final ServiceInstance[] serviceInstanceArray = new 
ServiceInstance[0];
+  public static final LlapServiceInstance[] serviceInstanceArray = new 
LlapServiceInstance[0];
 
   public LlapBaseInputFormat(String url, String user, String pwd, String 
query) {
     this.url = url;
@@ -126,7 +126,7 @@ public class LlapBaseInputFormat<V extends 
WritableComparable<?>>
     HiveConf.setVar(job, HiveConf.ConfVars.LLAP_ZK_REGISTRY_USER, 
llapSplit.getLlapUser());
     SubmitWorkInfo submitWorkInfo = 
SubmitWorkInfo.fromBytes(llapSplit.getPlanBytes());
 
-    ServiceInstance serviceInstance = getServiceInstance(job, llapSplit);
+    LlapServiceInstance serviceInstance = getServiceInstance(job, llapSplit);
     String host = serviceInstance.getHost();
     int llapSubmitPort = serviceInstance.getRpcPort();
 
@@ -230,11 +230,11 @@ public class LlapBaseInputFormat<V extends 
WritableComparable<?>>
     return ins.toArray(new InputSplit[ins.size()]);
   }
 
-  private ServiceInstance getServiceInstance(JobConf job, LlapInputSplit 
llapSplit) throws IOException {
+  private LlapServiceInstance getServiceInstance(JobConf job, LlapInputSplit 
llapSplit) throws IOException {
     LlapRegistryService registryService = LlapRegistryService.getClient(job);
     String host = llapSplit.getLocations()[0];
 
-    ServiceInstance serviceInstance = 
getServiceInstanceForHost(registryService, host);
+    LlapServiceInstance serviceInstance = 
getServiceInstanceForHost(registryService, host);
     if (serviceInstance == null) {
       LOG.info("No service instances found for " + host + " in registry.");
       serviceInstance = getServiceInstanceRandom(registryService);
@@ -246,10 +246,10 @@ public class LlapBaseInputFormat<V extends 
WritableComparable<?>>
     return serviceInstance;
   }
 
-  private ServiceInstance getServiceInstanceForHost(LlapRegistryService 
registryService, String host) throws IOException {
+  private LlapServiceInstance getServiceInstanceForHost(LlapRegistryService 
registryService, String host) throws IOException {
     InetAddress address = InetAddress.getByName(host);
-    ServiceInstanceSet instanceSet = registryService.getInstances();
-    ServiceInstance serviceInstance = null;
+    LlapServiceInstanceSet instanceSet = registryService.getInstances();
+    LlapServiceInstance serviceInstance = null;
 
     // The name used in the service registry may not match the host name we're 
using.
     // Try hostname/canonical hostname/host address
@@ -279,12 +279,12 @@ public class LlapBaseInputFormat<V extends 
WritableComparable<?>>
   }
 
 
-  private ServiceInstance getServiceInstanceRandom(LlapRegistryService 
registryService) throws IOException {
-    ServiceInstanceSet instanceSet = registryService.getInstances();
-    ServiceInstance serviceInstance = null;
+  private LlapServiceInstance getServiceInstanceRandom(LlapRegistryService 
registryService) throws IOException {
+    LlapServiceInstanceSet instanceSet = registryService.getInstances();
+    LlapServiceInstance serviceInstance = null;
 
     LOG.info("Finding random live service instance");
-    Collection<ServiceInstance> allInstances = instanceSet.getAll();
+    Collection<LlapServiceInstance> allInstances = instanceSet.getAll();
     if (allInstances.size() > 0) {
       int randIdx = rand.nextInt() % allInstances.size();
       serviceInstance = allInstances.toArray(serviceInstanceArray)[randIdx];
@@ -292,13 +292,13 @@ public class LlapBaseInputFormat<V extends 
WritableComparable<?>>
     return serviceInstance;
   }
 
-  private ServiceInstance selectServiceInstance(Set<ServiceInstance> 
serviceInstances) {
+  private LlapServiceInstance selectServiceInstance(Set<LlapServiceInstance> 
serviceInstances) {
     if (serviceInstances == null || serviceInstances.isEmpty()) {
       return null;
     }
 
     // Get the first live service instance
-    for (ServiceInstance serviceInstance : serviceInstances) {
+    for (LlapServiceInstance serviceInstance : serviceInstances) {
       return serviceInstance;
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java
index 1b57e38..4fad441 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java
@@ -45,7 +45,7 @@ import 
org.apache.hadoop.hive.llap.cli.status.LlapStatusHelpers;
 import 
org.apache.hadoop.hive.llap.cli.status.LlapStatusHelpers.AppStatusBuilder;
 import org.apache.hadoop.hive.llap.cli.status.LlapStatusHelpers.LlapInstance;
 import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration;
-import org.apache.hadoop.hive.llap.registry.ServiceInstance;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -563,7 +563,7 @@ public class LlapStatusServiceDriver {
       }
     }
 
-    Collection<ServiceInstance> serviceInstances;
+    Collection<LlapServiceInstance> serviceInstances;
     try {
       serviceInstances = llapRegistry.getInstances(watchTimeoutMs).getAll();
     } catch (Exception e) {
@@ -583,7 +583,7 @@ public class LlapStatusServiceDriver {
       List<LlapInstance> validatedInstances = new LinkedList<>();
       List<String> llapExtraInstances = new LinkedList<>();
 
-      for (ServiceInstance serviceInstance : serviceInstances) {
+      for (LlapServiceInstance serviceInstance : serviceInstances) {
         String containerIdString = serviceInstance.getProperties().get(
           HiveConf.ConfVars.LLAP_DAEMON_CONTAINER_ID.varname);
 

http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
index ebc3437..58bf8dc 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.registry.ServiceInstance;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.hive.llap.registry.impl.LlapZookeeperRegistryImpl;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -230,7 +230,7 @@ public class LlapWebServices extends AbstractService {
           }
           jg.writeStringField("identity", registry.getWorkerIdentity());
           jg.writeArrayFieldStart("peers");
-          for (ServiceInstance s : 
registry.getInstances().getAllInstancesOrdered(false)) {
+          for (LlapServiceInstance s : 
registry.getInstances().getAllInstancesOrdered(false)) {
             jg.writeStartObject();
             jg.writeStringField("identity", s.getWorkerIdentity());
             jg.writeStringField("host", s.getHost());

http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index ff00aba..100c132 100644
--- 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -15,7 +15,7 @@
 package org.apache.hadoop.hive.llap.tezplugins;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.hive.llap.registry.ServiceInstance;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
 import org.apache.hadoop.io.Writable;
 
 import 
org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.TezAttemptArray;
@@ -542,7 +542,7 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
     if (timelineServerUri == null || containerNodeId == null) {
       return null;
     }
-    Set<ServiceInstance> instanceSet;
+    Set<LlapServiceInstance> instanceSet;
     try {
       instanceSet = 
serviceRegistry.getInstances().getByHost(containerNodeId.getHost());
     } catch (IOException e) {
@@ -554,8 +554,8 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
     }
     // Once NodeId includes fragmentId - this becomes a lot more reliable.
     if (instanceSet != null) {
-      ServiceInstance matchedInstance = null;
-      for (ServiceInstance instance : instanceSet) {
+      LlapServiceInstance matchedInstance = null;
+      for (LlapServiceInstance instance : instanceSet) {
         if (instance.getRpcPort() == containerNodeId.getPort()) {
           matchedInstance = instance;
           break;

http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 6bedccb..94f1369 100644
--- 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -14,6 +14,8 @@
 
 package org.apache.hadoop.hive.llap.tezplugins;
 
+import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -62,9 +64,8 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem;
 import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
-import org.apache.hadoop.hive.llap.registry.ServiceInstance;
-import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
-import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
 import org.apache.hadoop.hive.llap.registry.impl.InactiveServiceInstance;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
@@ -107,7 +108,7 @@ public class LlapTaskSchedulerService extends TaskScheduler 
{
   private final Configuration conf;
 
   // interface into the registry service
-  private ServiceInstanceSet activeInstances;
+  private LlapServiceInstanceSet activeInstances;
 
   // Tracks all instances, including ones which have been disabled in the past.
   // LinkedHashMap to provide the same iteration order when selecting a random 
host.
@@ -330,7 +331,7 @@ public class LlapTaskSchedulerService extends TaskScheduler 
{
       registry.start();
       registry.registerStateChangeListener(new NodeStateChangeListener());
       activeInstances = registry.getInstances();
-      for (ServiceInstance inst : activeInstances.getAll()) {
+      for (LlapServiceInstance inst : activeInstances.getAll()) {
         addNode(new NodeInfo(inst, nodeBlacklistConf, clock,
             numSchedulableTasksPerNode, metrics), inst);
       }
@@ -340,15 +341,16 @@ public class LlapTaskSchedulerService extends 
TaskScheduler {
   }
 
   @VisibleForTesting
-  public void setServiceInstanceSet(ServiceInstanceSet serviceInstanceSet) {
+  public void setServiceInstanceSet(LlapServiceInstanceSet serviceInstanceSet) 
{
     this.activeInstances = serviceInstanceSet;
   }
 
-  private class NodeStateChangeListener implements 
ServiceInstanceStateChangeListener {
+  private class NodeStateChangeListener
+      implements ServiceInstanceStateChangeListener<LlapServiceInstance> {
     private final Logger LOG = 
LoggerFactory.getLogger(NodeStateChangeListener.class);
 
     @Override
-    public void onCreate(ServiceInstance serviceInstance) {
+    public void onCreate(LlapServiceInstance serviceInstance) {
       LOG.info("Added node with identity: {} as a result of registry callback",
           serviceInstance.getWorkerIdentity());
       addNode(new NodeInfo(serviceInstance, nodeBlacklistConf, clock,
@@ -356,7 +358,7 @@ public class LlapTaskSchedulerService extends TaskScheduler 
{
     }
 
     @Override
-    public void onUpdate(ServiceInstance serviceInstance) {
+    public void onUpdate(LlapServiceInstance serviceInstance) {
       // TODO In what situations will this be invoked?
       LOG.warn(
           "Not expecing Updates from the registry. Received update for 
instance={}. Ignoring",
@@ -364,7 +366,7 @@ public class LlapTaskSchedulerService extends TaskScheduler 
{
     }
 
     @Override
-    public void onRemove(ServiceInstance serviceInstance) {
+    public void onRemove(LlapServiceInstance serviceInstance) {
       NodeReport nodeReport = constructNodeReport(serviceInstance, false);
       LOG.info("Sending out nodeReport for onRemove: {}", nodeReport);
       getContext().nodesUpdated(Collections.singletonList(nodeReport));
@@ -475,7 +477,7 @@ public class LlapTaskSchedulerService extends TaskScheduler 
{
     readLock.lock();
     try {
       int numInstancesFound = 0;
-      for (ServiceInstance inst : activeInstances.getAll()) {
+      for (LlapServiceInstance inst : activeInstances.getAll()) {
         Resource r = inst.getResource();
         memory += r.getMemory();
         vcores += r.getVirtualCores();
@@ -506,7 +508,7 @@ public class LlapTaskSchedulerService extends TaskScheduler 
{
     readLock.lock();
     try {
       int numInstancesFound = 0;
-      for (ServiceInstance inst : activeInstances.getAll()) {
+      for (LlapServiceInstance inst : activeInstances.getAll()) {
         NodeInfo nodeInfo = instanceToNodeMap.get(inst.getWorkerIdentity());
         if (nodeInfo != null && !nodeInfo.isDisabled()) {
           Resource r = inst.getResource();
@@ -766,9 +768,9 @@ public class LlapTaskSchedulerService extends TaskScheduler 
{
         for (String host : requestedHosts) {
           prefHostCount++;
           // Pick the first host always. Weak attempt at cache affinity.
-          Set<ServiceInstance> instances = activeInstances.getByHost(host);
+          Set<LlapServiceInstance> instances = activeInstances.getByHost(host);
           if (!instances.isEmpty()) {
-            for (ServiceInstance inst : instances) {
+            for (LlapServiceInstance inst : instances) {
               NodeInfo nodeInfo = 
instanceToNodeMap.get(inst.getWorkerIdentity());
               if (nodeInfo != null) {
                 if  (nodeInfo.canAcceptTask()) {
@@ -828,10 +830,10 @@ public class LlapTaskSchedulerService extends 
TaskScheduler {
       }
 
       /* fall through - miss in locality or no locality-requested */
-      Collection<ServiceInstance> instances = 
activeInstances.getAllInstancesOrdered(true);
+      Collection<LlapServiceInstance> instances = 
activeInstances.getAllInstancesOrdered(true);
       List<NodeInfo> allNodes = new ArrayList<>(instances.size());
       List<NodeInfo> activeNodesWithFreeSlots = new ArrayList<>();
-      for (ServiceInstance inst : instances) {
+      for (LlapServiceInstance inst : instances) {
         if (inst instanceof InactiveServiceInstance) {
           allNodes.add(null);
         } else {
@@ -918,7 +920,7 @@ public class LlapTaskSchedulerService extends TaskScheduler 
{
     return new SelectHostResult(randomNode);
   }
 
-  private void addNode(NodeInfo node, ServiceInstance serviceInstance) {
+  private void addNode(NodeInfo node, LlapServiceInstance serviceInstance) {
     // we have just added a new node. Signal timeout monitor to reset timer
     if (activeInstances.size() != 0 && timeoutFutureRef.get() != null) {
       LOG.info("New node added. Signalling scheduler timeout monitor thread to 
stop timer.");
@@ -1006,7 +1008,7 @@ public class LlapTaskSchedulerService extends 
TaskScheduler {
     }
   }
 
-  private static NodeReport constructNodeReport(ServiceInstance 
serviceInstance,
+  private static NodeReport constructNodeReport(LlapServiceInstance 
serviceInstance,
                                          boolean healthy) {
     NodeReport nodeReport = NodeReport.newInstance(NodeId
             .newInstance(serviceInstance.getHost(), 
serviceInstance.getRpcPort()),
@@ -1576,7 +1578,7 @@ public class LlapTaskSchedulerService extends 
TaskScheduler {
   @VisibleForTesting
   static class NodeInfo implements Delayed {
     private final NodeBlacklistConf blacklistConf;
-    final ServiceInstance serviceInstance;
+    final LlapServiceInstance serviceInstance;
     private final Clock clock;
 
     long expireTimeMillis = -1;
@@ -1609,7 +1611,7 @@ public class LlapTaskSchedulerService extends 
TaskScheduler {
 *                                detect based on the serviceInstance, -1 
indicates indicates
      * @param metrics
      */
-    NodeInfo(ServiceInstance serviceInstance, NodeBlacklistConf blacklistConf, 
Clock clock,
+    NodeInfo(LlapServiceInstance serviceInstance, NodeBlacklistConf 
blacklistConf, Clock clock,
         int numSchedulableTasksConf, final LlapTaskSchedulerMetrics metrics) {
       Preconditions.checkArgument(numSchedulableTasksConf >= -1, 
"NumSchedulableTasks must be >=-1");
       this.serviceInstance = serviceInstance;

http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git 
a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
 
b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
index 339f513..21a39c7 100644
--- 
a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
+++ 
b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
@@ -40,8 +40,8 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.registry.ServiceInstance;
-import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.impl.InactiveServiceInstance;
 import org.apache.hadoop.hive.llap.registry.impl.LlapFixedRegistryImpl;
 import org.apache.hadoop.hive.llap.testhelpers.ControlledClock;
@@ -1447,7 +1447,7 @@ public class TestLlapTaskSchedulerService {
     static final Resource resource = Resource.newInstance(1024, 1);
     Configuration conf;
     TaskSchedulerContext mockAppCallback = mock(TaskSchedulerContext.class);
-    ServiceInstanceSet mockServiceInstanceSet = mock(ServiceInstanceSet.class);
+    LlapServiceInstanceSet mockServiceInstanceSet = 
mock(LlapServiceInstanceSet.class);
     ControlledClock clock = new ControlledClock(new MonotonicClock());
     ApplicationAttemptId appAttemptId = 
ApplicationAttemptId.newInstance(ApplicationId.newInstance(1000, 1), 1);
     LlapTaskSchedulerServiceForTest ts;
@@ -1499,16 +1499,16 @@ public class TestLlapTaskSchedulerService {
       doReturn(userPayload).when(mockAppCallback).getInitialUserPayload();
 
       if (useMockRegistry) {
-        List<ServiceInstance> liveInstances = new ArrayList<>();
+        List<LlapServiceInstance> liveInstances = new ArrayList<>();
         for (String host : liveHosts) {
           if (host == null) {
-            ServiceInstance mockInactive = mock(InactiveServiceInstance.class);
+            LlapServiceInstance mockInactive = 
mock(InactiveServiceInstance.class);
             doReturn(host).when(mockInactive).getHost();
             doReturn("inactive-host-" + 
host).when(mockInactive).getWorkerIdentity();
             
doReturn(ImmutableSet.builder().add(mockInactive).build()).when(mockServiceInstanceSet).getByHost(host);
             liveInstances.add(mockInactive);
           } else {
-            ServiceInstance mockActive = mock(ServiceInstance.class);
+            LlapServiceInstance mockActive = mock(LlapServiceInstance.class);
             doReturn(host).when(mockActive).getHost();
             doReturn("host-" + host).when(mockActive).getWorkerIdentity();
             
doReturn(ImmutableSet.builder().add(mockActive).build()).when(mockServiceInstanceSet).getByHost(host);
@@ -1517,9 +1517,9 @@ public class TestLlapTaskSchedulerService {
         }
         
doReturn(liveInstances).when(mockServiceInstanceSet).getAllInstancesOrdered(true);
 
-        List<ServiceInstance> allInstances = new ArrayList<>();
+        List<LlapServiceInstance> allInstances = new ArrayList<>();
         for (String host : hosts) {
-          ServiceInstance mockActive = mock(ServiceInstance.class);
+          LlapServiceInstance mockActive = mock(LlapServiceInstance.class);
           doReturn(host).when(mockActive).getHost();
           doReturn(Resource.newInstance(100, 
1)).when(mockActive).getResource();
           doReturn("host-" + host).when(mockActive).getWorkerIdentity();

http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
index 2b57d90..4c8e7bb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
@@ -25,7 +25,7 @@ import com.google.common.base.Preconditions;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.registry.ServiceInstance;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.split.SplitLocationProvider;
@@ -43,12 +43,12 @@ public class Utils {
       LlapRegistryService serviceRegistry = 
LlapRegistryService.getClient(conf);
       LOG.info("Using LLAP instance " + serviceRegistry.getApplicationId());
 
-      Collection<ServiceInstance> serviceInstances =
+      Collection<LlapServiceInstance> serviceInstances =
           serviceRegistry.getInstances().getAllInstancesOrdered(true);
       Preconditions.checkArgument(!serviceInstances.isEmpty(),
           "No running LLAP daemons! Please check LLAP service status and 
zookeeper configuration");
       ArrayList<String> locations = new ArrayList<>(serviceInstances.size());
-      for (ServiceInstance serviceInstance : serviceInstances) {
+      for (LlapServiceInstance serviceInstance : serviceInstances) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Adding " + serviceInstance.getWorkerIdentity() + " with 
hostname=" +
               serviceInstance.getHost() + " to list for split locations");

http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
index a5ed308..7a02a56 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
@@ -28,8 +28,8 @@ import java.util.concurrent.Callable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.registry.ServiceInstance;
-import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.impl.InactiveServiceInstance;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.slf4j.Logger;
@@ -100,7 +100,7 @@ public class LlapClusterStateForCompile {
         return; // Don't fail; this is best-effort.
       }
     }
-    ServiceInstanceSet instances;
+    LlapServiceInstanceSet instances;
     try {
       instances = svc.getInstances(10);
     } catch (IOException e) {
@@ -108,7 +108,7 @@ public class LlapClusterStateForCompile {
       return; // Don't wait for the cluster if not started; this is 
best-effort.
     }
     int executorsLocal = 0, noConfigNodesLocal = 0;
-    for (ServiceInstance si : instances.getAll()) {
+    for (LlapServiceInstance si : instances.getAll()) {
       if (si instanceof InactiveServiceInstance) continue; // Shouldn't happen 
in getAll.
       Map<String, String> props = si.getProperties();
       if (props == null) {

Reply via email to