HIVE-12935: LLAP: Replace Yarn registry with Zookeeper registry (Prasanth 
Jayachandran reviewed by Gopal V, Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c76eef2f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c76eef2f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c76eef2f

Branch: refs/heads/llap
Commit: c76eef2f9db4b6863a9665ed000276d6544309d0
Parents: 5eb1a62
Author: Prasanth Jayachandran <[email protected]>
Authored: Fri Feb 26 15:18:55 2016 -0600
Committer: Prasanth Jayachandran <[email protected]>
Committed: Fri Feb 26 15:19:04 2016 -0600

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   3 +-
 .../hive/llap/registry/ServiceInstanceSet.java  |   7 -
 .../ServiceInstanceStateChangeListener.java     |  42 ++
 .../hive/llap/registry/ServiceRegistry.java     |  27 +-
 .../registry/impl/LlapFixedRegistryImpl.java    |  18 +-
 .../llap/registry/impl/LlapRegistryService.java |   8 +-
 .../registry/impl/LlapYarnRegistryImpl.java     | 438 ------------
 .../impl/LlapZookeeperRegistryImpl.java         | 672 +++++++++++++++++++
 .../hive/llap/daemon/impl/LlapDaemon.java       |   8 +
 .../hive/llap/security/LlapSecurityHelper.java  |   1 -
 .../tezplugins/LlapTaskSchedulerService.java    |  71 +-
 11 files changed, 797 insertions(+), 498 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/c76eef2f/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index e868500..2723dad 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1587,7 +1587,8 @@ public class HiveConf extends Configuration {
         "org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager, 
\n" +
         "2. When HiveServer2 supports service discovery via Zookeeper.\n" +
         "3. For delegation token storage if zookeeper store is used, if\n" +
-        "hive.cluster.delegation.token.store.zookeeper.connectString is not 
set"),
+        "hive.cluster.delegation.token.store.zookeeper.connectString is not 
set\n" +
+        "4. LLAP daemon registry service"),
 
     HIVE_ZOOKEEPER_CLIENT_PORT("hive.zookeeper.client.port", "2181",
         "The port of ZooKeeper servers to talk to.\n" +

http://git-wip-us.apache.org/repos/asf/hive/blob/c76eef2f/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
----------------------------------------------------------------------
diff --git 
a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
 
b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
index be811eb..73f94f3 100644
--- 
a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
+++ 
b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
@@ -55,11 +55,4 @@ public interface ServiceInstanceSet {
    */
   public Set<ServiceInstance> getByHost(String host);
 
-  /**
-   * Refresh the instance set from registry backing store.
-   * 
-   * @throws IOException
-   */
-  public void refresh() throws IOException;
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/c76eef2f/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceStateChangeListener.java
----------------------------------------------------------------------
diff --git 
a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceStateChangeListener.java
 
b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceStateChangeListener.java
new file mode 100644
index 0000000..92eb8bd
--- /dev/null
+++ 
b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceStateChangeListener.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.registry;
+
+/**
+ * Callback listener for {@link ServiceInstance} state change events.
+ */
+public interface ServiceInstanceStateChangeListener {
+  /**
+   * Called when a new {@link ServiceInstance} is created.
+   *
+   * @param serviceInstance - created service instance
+   */
+  void onCreate(ServiceInstance serviceInstance);
+
+  /**
+   * Called when an existing {@link ServiceInstance} is updated.
+   *
+   * @param serviceInstance - updated service instance
+   */
+  void onUpdate(ServiceInstance serviceInstance);
+
+  /**
+   * Called when an existing {@link ServiceInstance} is removed.
+   *
+   * @param serviceInstance - removed service instance
+   */
+  void onRemove(ServiceInstance serviceInstance);
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/c76eef2f/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
----------------------------------------------------------------------
diff --git 
a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
 
b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
index d3fb517..f94a837 100644
--- 
a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
+++ 
b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
@@ -22,38 +22,47 @@ public interface ServiceRegistry {
 
   /**
    * Start the service registry
-   * 
-   * @throws InterruptedException
+   *
+   * @throws IOException
    */
-  public void start() throws InterruptedException;
+  public void start() throws IOException;
 
   /**
    * Stop the service registry
-   * 
-   * @throws InterruptedException
+   *
+   * @throws IOException
    */
-  public void stop() throws InterruptedException;
+  public void stop() throws IOException;
 
   /**
    * Register the current instance - the implementation takes care of the 
endpoints to register.
-   * 
+   *
    * @throws IOException
    */
   public void register() throws IOException;
 
   /**
    * Remove the current registration cleanly (implementation defined cleanup)
-   * 
+   *
    * @throws IOException
    */
   public void unregister() throws IOException;
 
   /**
    * Client API to get the list of instances registered via the current 
registry key.
-   * 
+   *
    * @param component
    * @return
    * @throws IOException
    */
   public ServiceInstanceSet getInstances(String component) throws IOException;
+
+  /**
+   * Registers a state change listener for service instances.
+   *
+   * @param listener - state change listener
+   * @throws IOException
+   */
+  public void registerStateChangeListener(ServiceInstanceStateChangeListener 
listener)
+      throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/c76eef2f/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
----------------------------------------------------------------------
diff --git 
a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
 
b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
index c3c16c4..8cace8f 100644
--- 
a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
+++ 
b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.registry.ServiceInstance;
 import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
+import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener;
 import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.StringUtils;
@@ -78,12 +79,12 @@ public class LlapFixedRegistryImpl implements 
ServiceRegistry {
   }
 
   @Override
-  public void start() throws InterruptedException {
+  public void start() throws IOException {
     // nothing to start
   }
 
   @Override
-  public void stop() throws InterruptedException {
+  public void stop() throws IOException {
     // nothing to stop
   }
 
@@ -124,7 +125,6 @@ public class LlapFixedRegistryImpl implements 
ServiceRegistry {
       this.host = host;
     }
 
-    @Override
     public String getWorkerIdentity() {
       return LlapFixedRegistryImpl.getWorkerIdentity(host);
     }
@@ -221,12 +221,6 @@ public class LlapFixedRegistryImpl implements 
ServiceRegistry {
       }
       return byHost;
     }
-
-    @Override
-    public void refresh() throws IOException {
-      // I will do no such thing
-    }
-
   }
 
   @Override
@@ -235,6 +229,12 @@ public class LlapFixedRegistryImpl implements 
ServiceRegistry {
   }
 
   @Override
+  public void registerStateChangeListener(final 
ServiceInstanceStateChangeListener listener) {
+    // nothing to set
+    LOG.warn("Callbacks for instance state changes are not supported in fixed 
registry.");
+  }
+
+  @Override
   public String toString() {
     return String.format("FixedRegistry hosts=%s", StringUtils.join(",", 
this.hosts));
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/c76eef2f/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
----------------------------------------------------------------------
diff --git 
a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
 
b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
index 907faed..5917156 100644
--- 
a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
+++ 
b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
+import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener;
 import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
 import org.apache.hadoop.service.AbstractService;
 import org.slf4j.Logger;
@@ -77,7 +78,7 @@ public class LlapRegistryService extends AbstractService {
   public void serviceInit(Configuration conf) {
     String hosts = HiveConf.getTrimmedVar(conf, 
ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
     if (hosts.startsWith("@")) {
-      registry = new LlapYarnRegistryImpl(hosts.substring(1), conf, isDaemon);
+      registry = new LlapZookeeperRegistryImpl(hosts.substring(1), conf);
     } else {
       registry = new LlapFixedRegistryImpl(hosts, conf);
     }
@@ -122,4 +123,9 @@ public class LlapRegistryService extends AbstractService {
   public ServiceInstanceSet getInstances() throws IOException {
     return this.registry.getInstances("LLAP");
   }
+
+  public void registerStateChangeListener(ServiceInstanceStateChangeListener 
listener)
+      throws IOException {
+    this.registry.registerStateChangeListener(listener);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/c76eef2f/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java
----------------------------------------------------------------------
diff --git 
a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java
 
b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java
deleted file mode 100644
index c83dd6e..0000000
--- 
a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java
+++ /dev/null
@@ -1,438 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.hive.llap.registry.impl;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.MalformedURLException;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.net.UnknownHostException;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.registry.ServiceInstance;
-import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
-import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
-import org.apache.hadoop.registry.client.api.RegistryOperationsFactory;
-import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
-import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
-import org.apache.hadoop.registry.client.binding.RegistryUtils;
-import 
org.apache.hadoop.registry.client.binding.RegistryUtils.ServiceRecordMarshal;
-import org.apache.hadoop.registry.client.impl.zk.RegistryOperationsService;
-import org.apache.hadoop.registry.client.types.AddressTypes;
-import org.apache.hadoop.registry.client.types.Endpoint;
-import org.apache.hadoop.registry.client.types.ProtocolTypes;
-import org.apache.hadoop.registry.client.types.ServiceRecord;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.zookeeper.CreateMode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-
-public class LlapYarnRegistryImpl implements ServiceRegistry {
-
-  /** IPC endpoint names. */
-  private static final String IPC_SERVICES = "services",
-      IPC_MNG = "llapmng", IPC_SHUFFLE = "shuffle", IPC_LLAP = "llap";
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(LlapYarnRegistryImpl.class);
-
-  private final RegistryOperationsService client;
-  private final Configuration conf;
-  private final ServiceRecordMarshal encoder;
-  private final String path;
-
-  private final DynamicServiceInstanceSet instances = new 
DynamicServiceInstanceSet();
-
-  private static final UUID uniq = UUID.randomUUID();
-  private static final String hostname;
-
-  private static final String UNIQUE_IDENTIFIER = "llap.unique.id";
-
-  private final static String SERVICE_CLASS = "org-apache-hive";
-
-  final ScheduledExecutorService refresher = 
Executors.newScheduledThreadPool(1,
-      new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapYarnRegistryRefresher").build());
-  final long refreshDelay;
-  private final boolean isDaemon;
-
-  static {
-    String localhost = "localhost";
-    try {
-      localhost = InetAddress.getLocalHost().getCanonicalHostName();
-    } catch (UnknownHostException uhe) {
-      // ignore
-    }
-    hostname = localhost;
-  }
-
-  public LlapYarnRegistryImpl(String instanceName, Configuration conf, boolean 
isDaemon) {
-
-    LOG.info("Llap Registry is enabled with registryid: " + instanceName);
-    this.conf = new Configuration(conf);
-    conf.addResource(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
-    // registry reference
-    client = (RegistryOperationsService) 
RegistryOperationsFactory.createInstance(conf);
-    encoder = new RegistryUtils.ServiceRecordMarshal();
-    this.path = 
RegistryPathUtils.join(RegistryUtils.componentPath(RegistryUtils.currentUser(),
-        SERVICE_CLASS, instanceName, "workers"), "worker-");
-    refreshDelay = HiveConf.getTimeVar(
-        conf, ConfVars.LLAP_DAEMON_SERVICE_REFRESH_INTERVAL, TimeUnit.SECONDS);
-    this.isDaemon = isDaemon;
-    Preconditions.checkArgument(refreshDelay > 0,
-        "Refresh delay for registry has to be positive = %d", refreshDelay);
-  }
-
-  public Endpoint getRpcEndpoint() {
-    final int rpcPort = HiveConf.getIntVar(conf, 
ConfVars.LLAP_DAEMON_RPC_PORT);
-    return RegistryTypeUtils.ipcEndpoint(IPC_LLAP, new 
InetSocketAddress(hostname, rpcPort));
-  }
-
-  public Endpoint getShuffleEndpoint() {
-    final int shufflePort = HiveConf.getIntVar(conf, 
ConfVars.LLAP_DAEMON_YARN_SHUFFLE_PORT);
-    // HTTP today, but might not be
-    return RegistryTypeUtils.inetAddrEndpoint(IPC_SHUFFLE, 
ProtocolTypes.PROTOCOL_TCP, hostname,
-        shufflePort);
-  }
-
-  public Endpoint getServicesEndpoint() {
-    final int servicePort = HiveConf.getIntVar(conf, 
ConfVars.LLAP_DAEMON_WEB_PORT);
-    final boolean isSSL = HiveConf.getBoolVar(conf, 
ConfVars.LLAP_DAEMON_WEB_SSL);
-    final String scheme = isSSL ? "https" : "http";
-    final URL serviceURL;
-    try {
-      serviceURL = new URL(scheme, hostname, servicePort, "");
-      return RegistryTypeUtils.webEndpoint(IPC_SERVICES, serviceURL.toURI());
-    } catch (MalformedURLException e) {
-      throw new RuntimeException(e);
-    } catch (URISyntaxException e) {
-      throw new RuntimeException("llap service URI for " + hostname + " is 
invalid", e);
-    }
-  }
-
-  public Endpoint getMngEndpoint() {
-    return RegistryTypeUtils.ipcEndpoint(IPC_MNG, new 
InetSocketAddress(hostname,
-        HiveConf.getIntVar(conf, ConfVars.LLAP_MANAGEMENT_RPC_PORT)));
-  }
-
-  private final String getPath() {
-    return this.path;
-  }
-
-  @Override
-  public void register() throws IOException {
-    String path = getPath();
-    ServiceRecord srv = new ServiceRecord();
-    srv.addInternalEndpoint(getRpcEndpoint());
-    srv.addInternalEndpoint(getMngEndpoint());
-    srv.addInternalEndpoint(getShuffleEndpoint());
-    srv.addExternalEndpoint(getServicesEndpoint());
-
-    for (Map.Entry<String, String> kv : this.conf) {
-      if (kv.getKey().startsWith(HiveConf.PREFIX_LLAP)
-          || kv.getKey().startsWith(HiveConf.PREFIX_HIVE_LLAP)) {
-        // TODO: read this somewhere useful, like the task scheduler
-        srv.set(kv.getKey(), kv.getValue());
-      }
-    }
-
-    // restart sensitive instance id
-    srv.set(UNIQUE_IDENTIFIER, uniq.toString());
-
-    client.mknode(RegistryPathUtils.parentOf(path), true);
-
-    // FIXME: YARN registry needs to expose Ephemeral_Seq nodes & return the 
paths
-    client.zkCreate(path, CreateMode.EPHEMERAL_SEQUENTIAL, 
encoder.toBytes(srv),
-        client.getClientAcls());
-  }
-
-  @Override
-  public void unregister() throws IOException {
-   // Nothing for the zkCreate models
-  }
-
-  private class DynamicServiceInstance implements ServiceInstance {
-
-    private final ServiceRecord srv;
-    private boolean alive = true;
-    private final String host;
-    private final int rpcPort;
-    private final int mngPort;
-    private final int shufflePort;
-
-    public DynamicServiceInstance(ServiceRecord srv) throws IOException {
-      this.srv = srv;
-
-      final Endpoint shuffle = srv.getInternalEndpoint(IPC_SHUFFLE);
-      final Endpoint rpc = srv.getInternalEndpoint(IPC_LLAP);
-      final Endpoint mng = srv.getInternalEndpoint(IPC_MNG);
-
-      this.host =
-          RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
-              AddressTypes.ADDRESS_HOSTNAME_FIELD);
-      this.rpcPort =
-          
Integer.valueOf(RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
-              AddressTypes.ADDRESS_PORT_FIELD));
-      this.mngPort =
-          
Integer.valueOf(RegistryTypeUtils.getAddressField(mng.addresses.get(0),
-              AddressTypes.ADDRESS_PORT_FIELD));
-      this.shufflePort =
-          
Integer.valueOf(RegistryTypeUtils.getAddressField(shuffle.addresses.get(0),
-              AddressTypes.ADDRESS_PORT_FIELD));
-    }
-
-    @Override
-    public String getWorkerIdentity() {
-      return srv.get(UNIQUE_IDENTIFIER);
-    }
-
-    @Override
-    public String getHost() {
-      return host;
-    }
-
-    @Override
-    public int getRpcPort() {
-      return rpcPort;
-    }
-
-    @Override
-    public int getShufflePort() {
-      return shufflePort;
-    }
-
-    @Override
-    public boolean isAlive() {
-      return alive ;
-    }
-
-    public void kill() {
-      // May be possible to generate a notification back to the scheduler from 
here.
-      LOG.info("Killing service instance: " + this);
-      this.alive = false;
-    }
-
-    @Override
-    public Map<String, String> getProperties() {
-      return srv.attributes();
-    }
-
-    @Override
-    public Resource getResource() {
-      int memory = 
Integer.valueOf(srv.get(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname));
-      int vCores = 
Integer.valueOf(srv.get(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname));
-      return Resource.newInstance(memory, vCores);
-    }
-
-    @Override
-    public String toString() {
-      return "DynamicServiceInstance [alive=" + alive + ", host=" + host + ":" 
+ rpcPort + " with resources=" + getResource() +"]";
-    }
-
-    @Override
-    public int getManagementPort() {
-      return mngPort;
-    }
-
-    // Relying on the identity hashCode and equality, since refreshing 
instances retains the old copy
-    // of an already known instance.
-  }
-
-  private class DynamicServiceInstanceSet implements ServiceInstanceSet {
-
-    // LinkedHashMap to retain iteration order.
-    private final Map<String, ServiceInstance> instances = new 
LinkedHashMap<>();
-    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-    private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
-    private final ReentrantReadWriteLock.WriteLock writeLock = 
lock.writeLock();
-
-    @Override
-    public Map<String, ServiceInstance> getAll() {
-      // Return a copy. Instances may be modified during a refresh.
-      readLock.lock();
-      try {
-        return new LinkedHashMap<>(instances);
-      } finally {
-        readLock.unlock();
-      }
-    }
-
-    @Override
-    public List<ServiceInstance> getAllInstancesOrdered() {
-      List<ServiceInstance> list = new LinkedList<>();
-      readLock.lock();
-      try {
-        list.addAll(instances.values());
-      } finally {
-        readLock.unlock();
-      }
-      Collections.sort(list, new Comparator<ServiceInstance>() {
-        @Override
-        public int compare(ServiceInstance o1, ServiceInstance o2) {
-          return o2.getWorkerIdentity().compareTo(o2.getWorkerIdentity());
-        }
-      });
-      return list;
-    }
-
-    @Override
-    public ServiceInstance getInstance(String name) {
-      readLock.lock();
-      try {
-        return instances.get(name);
-      } finally {
-        readLock.unlock();
-      }
-    }
-
-    @Override
-    public  void refresh() throws IOException {
-      /* call this from wherever */
-      Map<String, ServiceInstance> freshInstances = new HashMap<String, 
ServiceInstance>();
-
-      String path = getPath();
-      Map<String, ServiceRecord> records =
-          RegistryUtils.listServiceRecords(client, 
RegistryPathUtils.parentOf(path));
-      // Synchronize after reading the service records from the external 
service (ZK)
-      writeLock.lock();
-      try {
-        Set<String> latestKeys = new HashSet<String>();
-        LOG.info("Starting to refresh ServiceInstanceSet " + 
System.identityHashCode(this));
-        for (ServiceRecord rec : records.values()) {
-          ServiceInstance instance = new DynamicServiceInstance(rec);
-          if (instance != null) {
-            if (instances != null && 
instances.containsKey(instance.getWorkerIdentity()) == false) {
-              // add a new object
-              freshInstances.put(instance.getWorkerIdentity(), instance);
-              if (LOG.isInfoEnabled()) {
-                LOG.info("Adding new worker " + instance.getWorkerIdentity() + 
" which mapped to "
-                    + instance);
-              }
-            } else {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Retaining running worker " + 
instance.getWorkerIdentity() +
-                    " which mapped to " + instance);
-              }
-            }
-          }
-          latestKeys.add(instance.getWorkerIdentity());
-        }
-
-        if (instances != null) {
-          // deep-copy before modifying
-          Set<String> oldKeys = new HashSet<>(instances.keySet());
-          if (oldKeys.removeAll(latestKeys)) {
-            // This is all the records which have not checked in, and are 
effectively dead.
-            for (String k : oldKeys) {
-              // this is so that people can hold onto ServiceInstance 
references as placeholders for tasks
-              final DynamicServiceInstance dead = (DynamicServiceInstance) 
instances.get(k);
-              dead.kill();
-              if (LOG.isInfoEnabled()) {
-                LOG.info("Deleting dead worker " + k + " which mapped to " + 
dead);
-              }
-            }
-          }
-          // oldKeys contains the set of dead instances at this point.
-          this.instances.keySet().removeAll(oldKeys);
-          this.instances.putAll(freshInstances);
-        } else {
-          this.instances.putAll(freshInstances);
-        }
-      } finally {
-        writeLock.unlock();
-      }
-    }
-
-    @Override
-    public Set<ServiceInstance> getByHost(String host) {
-      // TODO Maybe store this as a map which is populated during 
construction, to avoid walking
-      // the map on each request.
-      readLock.lock();
-      Set<ServiceInstance> byHost = new HashSet<ServiceInstance>();
-      try {
-        for (ServiceInstance i : instances.values()) {
-          if (host.equals(i.getHost())) {
-            // all hosts in instances should be alive in this impl
-            byHost.add(i);
-          }
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Locality comparing " + host + " to " + i.getHost());
-          }
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Returning " + byHost.size() + " hosts for locality 
allocation on " + host);
-        }
-        return byHost;
-      } finally {
-        readLock.unlock();
-      }
-    }
-  }
-
-  @Override
-  public ServiceInstanceSet getInstances(String component) throws IOException {
-    Preconditions.checkArgument("LLAP".equals(component)); // right now there 
is only 1 component 
-    if (this.client != null) {
-      instances.refresh();
-      return instances;
-    } else {
-      Preconditions.checkNotNull(this.client, "Yarn registry client is not 
intialized");
-      return null;
-    }
-  }
-
-  @Override
-  public void start() {
-    if (client == null) return;
-    client.start();
-    if (isDaemon) return;
-    refresher.scheduleWithFixedDelay(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          instances.refresh();
-        } catch (IOException ioe) {
-          LOG.warn("Could not refresh hosts during scheduled refresh", ioe);
-        }
-      }
-    }, 0, refreshDelay, TimeUnit.SECONDS);
-  }
-
-  @Override
-  public void stop() {
-    if (client != null) {
-      client.stop();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/c76eef2f/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
----------------------------------------------------------------------
diff --git 
a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
 
b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
new file mode 100644
index 0000000..ab9fa39
--- /dev/null
+++ 
b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@ -0,0 +1,672 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.registry.impl;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.security.auth.login.AppConfigurationEntry;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.ACLProvider;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.registry.ServiceInstance;
+import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
+import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener;
+import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
+import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
+import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import 
org.apache.hadoop.registry.client.binding.RegistryUtils.ServiceRecordMarshal;
+import org.apache.hadoop.registry.client.types.AddressTypes;
+import org.apache.hadoop.registry.client.types.Endpoint;
+import org.apache.hadoop.registry.client.types.ProtocolTypes;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.client.ZooKeeperSaslClient;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class LlapZookeeperRegistryImpl implements ServiceRegistry {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(LlapZookeeperRegistryImpl.class);
+
+  /**
+   * IPC endpoint names.
+   */
+  private static final String IPC_SERVICES = "services";
+  private static final String IPC_MNG = "llapmng";
+  private static final String IPC_SHUFFLE = "shuffle";
+  private static final String IPC_LLAP = "llap";
+  private final static String ROOT_NAMESPACE = "llap";
+
+  private final Configuration conf;
+  private final CuratorFramework zooKeeperClient;
+  private final String pathPrefix;
+  private PersistentEphemeralNode znode;
+  private String znodePath; // unique identity for this instance
+  private final ServiceRecordMarshal encoder; // to marshal/unmarshal znode 
data
+
+  // to be used by clients of ServiceRegistry
+  private DynamicServiceInstanceSet instances;
+  private PathChildrenCache instancesCache;
+
+  private static final UUID uniq = UUID.randomUUID();
+  private static final String UNIQUE_IDENTIFIER = "llap.unique.id";
+
+  private Set<ServiceInstanceStateChangeListener> stateChangeListeners;
+
+  // Canonical hostname of the local machine, resolved once when the class is initialized.
+  // It is the host part of every endpoint built by the get*Endpoint() methods of this class.
+  private static final String hostname;
+
+  static {
+    // Start from "localhost" so that a failed lookup still leaves a usable value.
+    String localhost = "localhost";
+    try {
+      localhost = InetAddress.getLocalHost().getCanonicalHostName();
+    } catch (UnknownHostException uhe) {
+      // Deliberately ignored: the "localhost" default assigned above is kept as the fallback.
+    }
+    hostname = localhost;
+  }
+
+  /**
+   * ACL source handed to the CuratorFramework builder. When security is enabled the world
+   * may read and the authenticated user holds every permission; otherwise every permission
+   * is open to the world.
+   */
+  private final ACLProvider zooKeeperAclProvider = new ACLProvider() {
+
+    @Override
+    public List<ACL> getDefaultAcl() {
+      if (!UserGroupInformation.isSecurityEnabled()) {
+        // Non-kerberized cluster: create/read/delete/write/admin to the world
+        return new ArrayList<ACL>(ZooDefs.Ids.OPEN_ACL_UNSAFE);
+      }
+      // Kerberized cluster: read for the world ...
+      List<ACL> secureAcls = new ArrayList<ACL>(ZooDefs.Ids.READ_ACL_UNSAFE);
+      // ... and create/delete/write/admin for the authenticated user
+      secureAcls.add(new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.AUTH_IDS));
+      return secureAcls;
+    }
+
+    @Override
+    public List<ACL> getAclForPath(String path) {
+      // The path is not consulted; every znode gets the default ACLs
+      return getDefaultAcl();
+    }
+  };
+
+  /**
+   * Builds (but does not start) the Curator client used by this registry.
+   *
+   * @param instanceName name of the LLAP instance; it becomes part of the worker znode path
+   * @param conf configuration to copy; yarn-site is added as an extra resource to the copy
+   */
+  public LlapZookeeperRegistryImpl(String instanceName, Configuration conf) {
+    this.conf = new Configuration(conf);
+    this.conf.addResource(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
+    this.encoder = new RegistryUtils.ServiceRecordMarshal();
+
+    final String quorum = getQuorumServers(this.conf);
+    final int sessionTimeoutMs = (int) HiveConf.getTimeVar(conf,
+        ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
+    final int retryBaseSleepMs = (int) HiveConf.getTimeVar(conf,
+        ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS);
+    final int retryLimit =
+        HiveConf.getIntVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
+
+    // The ZooKeeper client: everything lives under the "llap" namespace and new znodes
+    // receive their ACLs from zooKeeperAclProvider
+    this.zooKeeperClient = CuratorFrameworkFactory.builder()
+        .connectString(quorum)
+        .sessionTimeoutMs(sessionTimeoutMs)
+        .aclProvider(zooKeeperAclProvider)
+        .namespace(ROOT_NAMESPACE)
+        .retryPolicy(new ExponentialBackoffRetry(retryBaseSleepMs, retryLimit))
+        .build();
+
+    // Resulting znode layout: /llap/<user>/<instanceName>/workers/worker-<sequence>
+    // The sequence number is retained until session timeout: a worker that is unreachable
+    // for a while because of communication interruptions keeps its number when it returns.
+    // Once the session times out the node is deleted, and a re-added (restarted) worker
+    // gets the next sequence number.
+    this.pathPrefix =
+        "/" + RegistryUtils.currentUser() + "/" + instanceName + "/workers/worker-";
+    this.instancesCache = null;
+    this.instances = null;
+    this.stateChangeListeners = new HashSet<>();
+    LOG.info("Llap Zookeeper Registry is enabled with registryid: " + instanceName);
+  }
+
+  /**
+   * Assembles the ZooKeeper connect string from the configuration, in the form
+   * {@code host1:port,host2:port,...}. A configured host that already contains a ':' is
+   * used unchanged; any other host gets the configured client port appended.
+   *
+   * @param conf configuration holding the quorum hosts and the client port
+   * @return the comma separated connect string; empty when no hosts are configured
+   **/
+  private String getQuorumServers(Configuration conf) {
+    final String[] quorumHosts =
+        conf.getTrimmedStrings(ConfVars.HIVE_ZOOKEEPER_QUORUM.varname);
+    final String clientPort = conf.get(ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.varname,
+        ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.getDefaultValue());
+
+    final StringBuilder connectString = new StringBuilder();
+    String separator = "";
+    for (String quorumHost : quorumHosts) {
+      connectString.append(separator).append(quorumHost.trim());
+      if (!quorumHost.contains(":")) {
+        // no explicit port on this host, fall back to the configured client port
+        connectString.append(":").append(clientPort);
+      }
+      separator = ",";
+    }
+    return connectString.toString();
+  }
+
+  /**
+   * @return the IPC endpoint named "llap", pointing at the local hostname and the
+   *         configured daemon RPC port
+   */
+  public Endpoint getRpcEndpoint() {
+    final InetSocketAddress rpcAddress = new InetSocketAddress(hostname,
+        HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_RPC_PORT));
+    return RegistryTypeUtils.ipcEndpoint(IPC_LLAP, rpcAddress);
+  }
+
+  /**
+   * @return the endpoint named "shuffle", registered with the TCP protocol type on the
+   *         local hostname and the configured shuffle port
+   */
+  public Endpoint getShuffleEndpoint() {
+    // The shuffle is served over HTTP today, but that might change
+    return RegistryTypeUtils.inetAddrEndpoint(
+        IPC_SHUFFLE,
+        ProtocolTypes.PROTOCOL_TCP,
+        hostname,
+        HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_YARN_SHUFFLE_PORT));
+  }
+
+  /**
+   * @return the web endpoint named "services", built from the local hostname and the
+   *         configured web port; the scheme is https when web SSL is enabled, http otherwise
+   * @throws RuntimeException if the URL or its URI form cannot be constructed
+   */
+  public Endpoint getServicesEndpoint() {
+    final String scheme =
+        HiveConf.getBoolVar(conf, ConfVars.LLAP_DAEMON_WEB_SSL) ? "https" : "http";
+    final int webPort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_WEB_PORT);
+    try {
+      final URL servicesUrl = new URL(scheme, hostname, webPort, "");
+      return RegistryTypeUtils.webEndpoint(IPC_SERVICES, servicesUrl.toURI());
+    } catch (MalformedURLException e) {
+      throw new RuntimeException(e);
+    } catch (URISyntaxException e) {
+      throw new RuntimeException("llap service URI for " + hostname + " is invalid", e);
+    }
+  }
+
+  /**
+   * @return the IPC endpoint named "llapmng", pointing at the local hostname and the
+   *         configured management RPC port
+   */
+  public Endpoint getMngEndpoint() {
+    final int managementPort = HiveConf.getIntVar(conf, ConfVars.LLAP_MANAGEMENT_RPC_PORT);
+    final InetSocketAddress managementAddress = new InetSocketAddress(hostname, managementPort);
+    return RegistryTypeUtils.ipcEndpoint(IPC_MNG, managementAddress);
+  }
+
+  /**
+   * Publishes this daemon in ZooKeeper: builds a service record holding the rpc,
+   * management, shuffle and web endpoints plus all llap configuration entries, and stores
+   * it in an ephemeral sequential znode under the worker path prefix.
+   *
+   * @throws IOException if the znode cannot be created within the wait time or cannot be
+   *         found afterwards; the partially created node is closed before the failure is
+   *         reported, so a failed registration is never mistaken for a successful one
+   */
+  @Override
+  public void register() throws IOException {
+    ServiceRecord srv = new ServiceRecord();
+    Endpoint rpcEndpoint = getRpcEndpoint();
+    srv.addInternalEndpoint(rpcEndpoint);
+    srv.addInternalEndpoint(getMngEndpoint());
+    srv.addInternalEndpoint(getShuffleEndpoint());
+    srv.addExternalEndpoint(getServicesEndpoint());
+
+    for (Map.Entry<String, String> kv : this.conf) {
+      if (kv.getKey().startsWith(HiveConf.PREFIX_LLAP)
+          || kv.getKey().startsWith(HiveConf.PREFIX_HIVE_LLAP)) {
+        // TODO: read this somewhere useful, like the task scheduler
+        srv.set(kv.getKey(), kv.getValue());
+      }
+    }
+
+    // restart sensitive instance id
+    srv.set(UNIQUE_IDENTIFIER, uniq.toString());
+
+    // Create a znode under the rootNamespace parent for this instance of the server
+    try {
+      // PersistentEphemeralNode will make sure the ephemeral node created on server will be
+      // present even under connection or session interruption (will automatically handle
+      // retries)
+      znode = new PersistentEphemeralNode(zooKeeperClient,
+          PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, encoder.toBytes(srv));
+
+      // start the creation of znode
+      znode.start();
+
+      // We'll wait for 120s for node creation
+      long znodeCreationTimeout = 120;
+      if (!znode.waitForInitialCreate(znodeCreationTimeout, TimeUnit.SECONDS)) {
+        throw new IOException(
+            "Max znode creation wait time: " + znodeCreationTimeout + "s exhausted");
+      }
+
+      znodePath = znode.getActualPath();
+      // Double check that the node is really there (this does not register a watch)
+      if (zooKeeperClient.checkExists().forPath(znodePath) == null) {
+        // No node exists, report the failure
+        throw new IOException("Unable to create znode for this LLAP instance on ZooKeeper.");
+      }
+      LOG.info("Created a znode on ZooKeeper for LLAP instance: {} znodePath: {}", rpcEndpoint,
+          znodePath);
+    } catch (Exception e) {
+      LOG.error("Unable to create a znode for this server instance", e);
+      CloseableUtils.closeQuietly(znode);
+      if (e instanceof InterruptedException) {
+        // keep the interrupt visible to the caller's thread
+        Thread.currentThread().interrupt();
+      }
+      // Propagate instead of swallowing: the caller must learn that registration failed
+      throw (e instanceof IOException) ? (IOException) e : new IOException(e);
+    }
+
+    // Only reached when the znode was actually created
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Created zknode with path: {} service record: {}", znodePath, srv);
+    }
+  }
+
+  /**
+   * Intentionally a no-op: this registry performs no explicit unregistration here.
+   * The znode created by {@link #register()} is closed in {@link #stop()}.
+   */
+  @Override
+  public void unregister() throws IOException {
+    // Nothing for the zkCreate models
+  }
+
+  /**
+   * A single LLAP daemon as described by the {@link ServiceRecord} stored in its znode.
+   * Host and ports are extracted once at construction time. hashCode and equals are not
+   * overridden, so identity semantics apply.
+   */
+  private class DynamicServiceInstance implements ServiceInstance {
+
+    private final ServiceRecord srv;
+    private final String host;
+    private final int rpcPort;
+    private final int mngPort;
+    private final int shufflePort;
+    private boolean alive = true;
+
+    /**
+     * @param srv record carrying the "llap", "llapmng" and "shuffle" internal endpoints
+     * @throws IOException if an address field cannot be read from an endpoint
+     */
+    public DynamicServiceInstance(ServiceRecord srv) throws IOException {
+      this.srv = srv;
+
+      final Endpoint rpc = srv.getInternalEndpoint(IPC_LLAP);
+      final Endpoint mng = srv.getInternalEndpoint(IPC_MNG);
+      final Endpoint shuffle = srv.getInternalEndpoint(IPC_SHUFFLE);
+
+      this.host = RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
+          AddressTypes.ADDRESS_HOSTNAME_FIELD);
+      this.rpcPort = portOf(rpc);
+      this.mngPort = portOf(mng);
+      this.shufflePort = portOf(shuffle);
+    }
+
+    /** Reads the port field of the first address of the given endpoint. */
+    private int portOf(Endpoint endpoint) throws IOException {
+      return Integer.parseInt(RegistryTypeUtils.getAddressField(endpoint.addresses.get(0),
+          AddressTypes.ADDRESS_PORT_FIELD));
+    }
+
+    @Override
+    public String getWorkerIdentity() {
+      return srv.get(UNIQUE_IDENTIFIER);
+    }
+
+    @Override
+    public String getHost() {
+      return host;
+    }
+
+    @Override
+    public int getRpcPort() {
+      return rpcPort;
+    }
+
+    @Override
+    public int getManagementPort() {
+      return mngPort;
+    }
+
+    @Override
+    public int getShufflePort() {
+      return shufflePort;
+    }
+
+    @Override
+    public boolean isAlive() {
+      return alive;
+    }
+
+    /** Marks this instance as no longer alive; the log line is written before the flag flips. */
+    public void kill() {
+      // May be possible to generate a notification back to the scheduler from here.
+      LOG.info("Killing service instance: " + this);
+      this.alive = false;
+    }
+
+    @Override
+    public Map<String, String> getProperties() {
+      return srv.attributes();
+    }
+
+    /**
+     * @return a resource built from the memory-per-instance and number-of-executors values
+     *         stored in the service record
+     */
+    @Override
+    public Resource getResource() {
+      final int memoryMb =
+          Integer.parseInt(srv.get(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname));
+      final int executors =
+          Integer.parseInt(srv.get(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname));
+      return Resource.newInstance(memoryMb, executors);
+    }
+
+    @Override
+    public String toString() {
+      return "DynamicServiceInstance [alive=" + alive + ", host=" + host + ":" + rpcPort
+          + " with resources=" + getResource() + "]";
+    }
+  }
+
+  /**
+   * View over the worker znodes held by a {@link PathChildrenCache}. Every query decodes
+   * the currently cached znode data into fresh {@link ServiceInstance} objects; entries
+   * that are missing, have no data or cannot be decoded are skipped.
+   */
+  private class DynamicServiceInstanceSet implements ServiceInstanceSet {
+    private final PathChildrenCache instancesCache;
+
+    public DynamicServiceInstanceSet(final PathChildrenCache cache) {
+      this.instancesCache = cache;
+    }
+
+    /** Decodes raw znode data into a service instance. */
+    private ServiceInstance decode(String path, byte[] data) throws IOException {
+      return new DynamicServiceInstance(encoder.fromBytes(path, data));
+    }
+
+    /**
+     * @return all decodable instances keyed by znode path, in the iteration order of the
+     *         cache's current data
+     */
+    @Override
+    public Map<String, ServiceInstance> getAll() {
+      Map<String, ServiceInstance> all = new LinkedHashMap<>();
+      for (ChildData childData : instancesCache.getCurrentData()) {
+        if (childData == null) {
+          continue;
+        }
+        byte[] data = childData.getData();
+        if (data == null) {
+          continue;
+        }
+        try {
+          all.put(childData.getPath(), decode(childData.getPath(), data));
+        } catch (IOException e) {
+          LOG.error("Unable to decode data for zkpath: {}." +
+              " Ignoring from current instances list..", childData.getPath(), e);
+        }
+      }
+      return all;
+    }
+
+    /**
+     * @return all instances sorted by ascending worker identity
+     */
+    @Override
+    public List<ServiceInstance> getAllInstancesOrdered() {
+      // Query this set directly rather than going through the enclosing registry's field
+      List<ServiceInstance> ordered = new ArrayList<>(getAll().values());
+      Collections.sort(ordered, new Comparator<ServiceInstance>() {
+        @Override
+        public int compare(ServiceInstance o1, ServiceInstance o2) {
+          // compare the two *different* instances; comparing o2 with itself never sorts
+          return o1.getWorkerIdentity().compareTo(o2.getWorkerIdentity());
+        }
+      });
+      return ordered;
+    }
+
+    /**
+     * @param name full znode path of the instance
+     * @return the decoded instance, or null when the path is not cached, carries no data or
+     *         cannot be decoded
+     */
+    @Override
+    public ServiceInstance getInstance(String name) {
+      ChildData childData = instancesCache.getCurrentData(name);
+      if (childData == null) {
+        return null;
+      }
+      byte[] data = childData.getData();
+      if (data == null) {
+        return null;
+      }
+      try {
+        return decode(name, data);
+      } catch (IOException e) {
+        LOG.error("Unable to decode data for zkpath: {}", name, e);
+        return null;
+      }
+    }
+
+    /**
+     * @param host hostname to match against {@link ServiceInstance#getHost()}
+     * @return the decodable instances whose host equals the given one; possibly empty
+     */
+    @Override
+    public Set<ServiceInstance> getByHost(String host) {
+      Set<ServiceInstance> byHost = new HashSet<>();
+      for (ChildData childData : instancesCache.getCurrentData()) {
+        if (childData == null) {
+          continue;
+        }
+        byte[] data = childData.getData();
+        if (data == null) {
+          continue;
+        }
+        try {
+          ServiceInstance instance = decode(childData.getPath(), data);
+          if (host.equals(instance.getHost())) {
+            byHost.add(instance);
+          }
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Locality comparing " + host + " to " + instance.getHost());
+          }
+        } catch (IOException e) {
+          LOG.error("Unable to decode data for zkpath: {}." +
+              " Ignoring host from current instances list..", childData.getPath(), e);
+        }
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Returning " + byHost.size() + " hosts for locality allocation on " + host);
+      }
+      return byHost;
+    }
+  }
+
+  /**
+   * Translates {@link PathChildrenCache} child events into calls on the registered
+   * {@link ServiceInstanceStateChangeListener}s. Only CHILD_ADDED, CHILD_UPDATED and
+   * CHILD_REMOVED events whose znode data can be decoded are forwarded; everything else is
+   * dropped, so listeners never receive a null instance.
+   */
+  private class InstanceStateChangeListener implements PathChildrenCacheListener {
+    private final Logger LOG = LoggerFactory.getLogger(InstanceStateChangeListener.class);
+
+    /**
+     * @param client the curator client; must be started
+     * @param event the cache event to translate
+     * @throws IllegalArgumentException if the client is null or not started
+     */
+    @Override
+    public void childEvent(final CuratorFramework client, final PathChildrenCacheEvent event)
+        throws Exception {
+      Preconditions.checkArgument(client != null
+          && client.getState() == CuratorFrameworkState.STARTED, "client is not started");
+
+      // Listeners are added under the registry's monitor (registerStateChangeListener is a
+      // synchronized method of the enclosing class), so take the snapshot under that same
+      // monitor. The callbacks run outside of it: no foreign code is called under the lock.
+      final List<ServiceInstanceStateChangeListener> listeners;
+      synchronized (LlapZookeeperRegistryImpl.this) {
+        listeners = new ArrayList<>(stateChangeListeners);
+      }
+      if (listeners.isEmpty()) {
+        return;
+      }
+
+      final PathChildrenCacheEvent.Type type = event.getType();
+      if (type != PathChildrenCacheEvent.Type.CHILD_ADDED
+          && type != PathChildrenCacheEvent.Type.CHILD_UPDATED
+          && type != PathChildrenCacheEvent.Type.CHILD_REMOVED) {
+        // other event types carry nothing to forward
+        return;
+      }
+
+      final ChildData childData = event.getData();
+      if (childData == null || childData.getData() == null) {
+        LOG.warn("No data attached to event of type: {}. Dropping notification.", type);
+        return;
+      }
+
+      final ServiceInstance instance;
+      try {
+        ServiceRecord srv = encoder.fromBytes(childData.getPath(), childData.getData());
+        instance = new DynamicServiceInstance(srv);
+      } catch (IOException e) {
+        LOG.error("Unable to decode data for zknode: {}." +
+            " Dropping notification of type: {}", childData.getPath(), type, e);
+        return;
+      }
+
+      // notify listeners of the new data
+      if (type == PathChildrenCacheEvent.Type.CHILD_ADDED) {
+        LOG.info("Added zknode {} to llap namespace. Notifying state change listener.",
+            childData.getPath());
+        for (ServiceInstanceStateChangeListener listener : listeners) {
+          listener.onCreate(instance);
+        }
+      } else if (type == PathChildrenCacheEvent.Type.CHILD_UPDATED) {
+        LOG.info("Updated zknode {} in llap namespace. Notifying state change listener.",
+            childData.getPath());
+        for (ServiceInstanceStateChangeListener listener : listeners) {
+          listener.onUpdate(instance);
+        }
+      } else {
+        LOG.info("Removed zknode {} from llap namespace. Notifying state change listener.",
+            childData.getPath());
+        for (ServiceInstanceStateChangeListener listener : listeners) {
+          listener.onRemove(instance);
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns the set of known instances, creating the underlying cache and the set itself
+   * on first use.
+   *
+   * @param component not consulted by this implementation
+   * @throws IOException if the children cache cannot be started
+   */
+  @Override
+  public ServiceInstanceSet getInstances(String component) throws IOException {
+    checkPathChildrenCache();
+
+    if (instances != null) {
+      return instances;
+    }
+    // first call: wrap the (now started) cache
+    instances = new DynamicServiceInstanceSet(instancesCache);
+    return instances;
+  }
+
+  /**
+   * Adds a listener that is told about worker znodes being added, updated or removed.
+   * The children cache is created first if it does not exist yet.
+   *
+   * @param listener the listener to add
+   * @throws IOException if the children cache cannot be started
+   */
+  @Override
+  public synchronized void registerStateChangeListener(
+      final ServiceInstanceStateChangeListener listener) throws IOException {
+    // make sure events are being produced before anybody subscribes
+    checkPathChildrenCache();
+    stateChangeListeners.add(listener);
+  }
+
+  /**
+   * Lazily creates and starts the {@link PathChildrenCache} watching the parent path of
+   * the worker znodes. The field is only published after the cache started successfully,
+   * so a failed attempt is retried by the next call instead of leaving an unstarted cache
+   * behind.
+   *
+   * @throws IllegalArgumentException if the ZooKeeper client is not started
+   * @throws IOException if the cache cannot be started
+   */
+  private synchronized void checkPathChildrenCache() throws IOException {
+    Preconditions.checkArgument(zooKeeperClient != null &&
+            zooKeeperClient.getState() == CuratorFrameworkState.STARTED,
+        "client is not started");
+
+    if (instancesCache != null) {
+      return;
+    }
+
+    // lazily create PathChildrenCache
+    final PathChildrenCache cache = new PathChildrenCache(zooKeeperClient,
+        RegistryPathUtils.parentOf(pathPrefix).toString(), true);
+    final ExecutorService notifier = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
+        .setDaemon(true)
+        .setNameFormat("StateChangeNotificationHandler")
+        .build());
+    cache.getListenable().addListener(new InstanceStateChangeListener(), notifier);
+    try {
+      cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
+    } catch (Exception e) {
+      LOG.error("Unable to start curator PathChildrenCache", e);
+      // release what was created for the failed attempt
+      CloseableUtils.closeQuietly(cache);
+      notifier.shutdownNow();
+      throw new IOException(e);
+    }
+    this.instancesCache = cache;
+  }
+
+  /**
+   * Sets up ZooKeeper authentication and then starts the client.
+   *
+   * @throws IOException if security is enabled but the Kerberos settings are incomplete
+   */
+  @Override
+  public void start() throws IOException {
+    if (zooKeeperClient == null) {
+      return;
+    }
+    // the JAAS configuration has to be in place before the client connects
+    setupZookeeperAuth(this.conf);
+    zooKeeperClient.start();
+  }
+
+  /**
+   * Releases the ZooKeeper resources held by this registry: first this instance's znode,
+   * then the children cache, and finally the client both of them use.
+   */
+  @Override
+  public void stop() throws IOException {
+    // NOTE(review): znode and instancesCache are only set by register() and by the lazy
+    // cache creation, so either may still be null here; closeQuietly presumably tolerates
+    // null - confirm against the Curator version in use.
+    CloseableUtils.closeQuietly(znode);
+    CloseableUtils.closeQuietly(instancesCache);
+    CloseableUtils.closeQuietly(zooKeeperClient);
+  }
+
+
+  /**
+   * Installs the Kerberos JAAS configuration for the ZooKeeper client when UGI security is
+   * enabled; does nothing but log otherwise.
+   *
+   * @param conf configuration holding the LLAP Kerberos principal and keytab file
+   * @throws IOException if security is enabled and the principal or the keytab is not set
+   */
+  private void setupZookeeperAuth(final Configuration conf) throws IOException {
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      LOG.info("UGI security is not enabled. Skipping setting up ZK auth.");
+      return;
+    }
+    LOG.info("UGI security is enabled. Setting up ZK auth.");
+
+    final String principal = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL);
+    if (principal == null || principal.isEmpty()) {
+      throw new IOException("Llap Kerberos principal is empty");
+    }
+
+    final String keytab = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE);
+    if (keytab == null || keytab.isEmpty()) {
+      throw new IOException("Llap Kerberos keytab is empty");
+    }
+
+    // Install the JAAS Configuration for the runtime
+    setZookeeperClientKerberosJaasConfig(principal, keytab);
+  }
+
+  /**
+   * Installs, at runtime, a JAAS configuration that logs the ZooKeeper client in through
+   * Kerberos with the given keytab. The login context name is published through the
+   * ZooKeeper SASL client system property so that the matching JAAS section is picked.
+   *
+   * @param principal Kerberos principal pattern; resolved against the host "0.0.0.0"
+   * @param keyTabFile path of the keytab file
+   * @throws IOException if the principal cannot be resolved
+   */
+  private void setZookeeperClientKerberosJaasConfig(String principal, String keyTabFile)
+      throws IOException {
+    // Name of the JAAS section the ZooKeeper client is told to use
+    final String loginContext = "LlapZooKeeperClient";
+    System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, loginContext);
+
+    final String resolvedPrincipal = SecurityUtil.getServerPrincipal(principal, "0.0.0.0");
+
+    // Install the Configuration in the runtime.
+    javax.security.auth.login.Configuration.setConfiguration(
+        new JaasConfiguration(loginContext, resolvedPrincipal, keyTabFile));
+  }
+
+  /**
+   * JAAS configuration for ZooKeeper clients using SASL Kerberos. It answers for a single
+   * login context name with a keytab based Kerberos login module entry; every other name
+   * is delegated to the configuration that was installed when this object was created.
+   */
+  private static class JaasConfiguration extends javax.security.auth.login.Configuration {
+    // Configuration installed at the time this one was constructed
+    private final javax.security.auth.login.Configuration baseConfig;
+    private final String loginContextName;
+    private final String principal;
+    private final String keyTabFile;
+
+    public JaasConfiguration(String llapLoginContextName, String principal, String keyTabFile) {
+      this.baseConfig = javax.security.auth.login.Configuration.getConfiguration();
+      this.loginContextName = llapLoginContextName;
+      this.principal = principal;
+      this.keyTabFile = keyTabFile;
+    }
+
+    @Override
+    public AppConfigurationEntry[] getAppConfigurationEntry(String appName) {
+      if (!loginContextName.equals(appName)) {
+        // Not ours: try the base config
+        return baseConfig != null ? baseConfig.getAppConfigurationEntry(appName) : null;
+      }
+
+      final Map<String, String> options = new HashMap<String, String>();
+      options.put("useKeyTab", "true");
+      options.put("keyTab", keyTabFile);
+      options.put("principal", principal);
+      options.put("storeKey", "true");
+      options.put("doNotPrompt", "true");
+      options.put("refreshKrb5Config", "true");
+
+      return new AppConfigurationEntry[]{
+          new AppConfigurationEntry(
+              KerberosUtil.getKrb5LoginModuleName(),
+              AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+              options)};
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/c76eef2f/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 8621826..8394004 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -114,6 +114,14 @@ public class LlapDaemon extends CompositeService 
implements ContainerRunner, Lla
         "Work dirs must be specified");
     Preconditions.checkArgument(shufflePort == 0 || (shufflePort > 1024 && 
shufflePort < 65536),
         "Shuffle Port must be betwee 1024 and 65535, or 0 for automatic 
selection");
+    String hosts = HiveConf.getTrimmedVar(daemonConf, 
ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
+    if (hosts.startsWith("@")) {
+      String zkHosts = HiveConf.getTrimmedVar(daemonConf, 
ConfVars.HIVE_ZOOKEEPER_QUORUM);
+      LOG.info("Zookeeper Quorum: {}", zkHosts);
+      Preconditions.checkArgument(zkHosts != null && !zkHosts.trim().isEmpty(),
+          "LLAP service hosts startswith '@' but hive.zookeeper.quorum is not 
set." +
+              " hive.zookeeper.quorum must be set.");
+    }
 
     this.maxJvmMemory = getTotalHeapSize();
     this.llapIoEnabled = ioEnabled;

http://git-wip-us.apache.org/repos/asf/hive/blob/c76eef2f/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java
index 6d8b2da..76ba225 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java
@@ -151,7 +151,6 @@ public class LlapSecurityHelper implements 
LlapTokenProvider {
     }
     Map<String, ServiceInstance> daemons = activeInstances.getAll();
     if (doForceRefresh || daemons == null || daemons.isEmpty()) {
-      activeInstances.refresh();
       daemons = activeInstances.getAll();
       if (daemons == null || daemons.isEmpty()) throw new RuntimeException("No 
LLAPs found");
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/c76eef2f/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 3bca0da..0cb770b 100644
--- 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.registry.ServiceInstance;
 import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
+import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -91,7 +92,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   // Tracks all instances, including ones which have been disabled in the past.
   // LinkedHashMap to provide the same iteration order when selecting a random 
host.
   @VisibleForTesting
-  final Map<ServiceInstance, NodeInfo> instanceToNodeMap = new 
LinkedHashMap<>();
+  final Map<String, NodeInfo> instanceToNodeMap = new LinkedHashMap<>();
   // TODO Ideally, remove elements from this once it's known that no tasks are 
linked to the instance (all deallocated)
 
   // Tracks tasks which could not be allocated immediately.
@@ -258,6 +259,7 @@ public class LlapTaskSchedulerService extends TaskScheduler 
{
         }
       });
       registry.start();
+      registry.registerStateChangeListener(new NodeStateChangeListener());
       activeInstances = registry.getInstances();
       for (ServiceInstance inst : activeInstances.getAll().values()) {
         addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, 
numSchedulableTasksPerNode));
@@ -267,6 +269,31 @@ public class LlapTaskSchedulerService extends 
TaskScheduler {
     }
   }
 
+  /**
+   * Receives instance state changes from the registry and mirrors them into the
+   * scheduler's node bookkeeping. Removal is currently only logged.
+   */
+  private class NodeStateChangeListener implements ServiceInstanceStateChangeListener {
+    private final Logger LOG = LoggerFactory.getLogger(NodeStateChangeListener.class);
+
+    @Override
+    public void onCreate(final ServiceInstance serviceInstance) {
+      final NodeInfo created =
+          new NodeInfo(serviceInstance, nodeBlacklistConf, clock, numSchedulableTasksPerNode);
+      addNode(serviceInstance, created);
+      LOG.info("Added node with identity: {}", serviceInstance.getWorkerIdentity());
+    }
+
+    @Override
+    public void onUpdate(final ServiceInstance serviceInstance) {
+      // replace whatever is tracked for this worker identity with a fresh NodeInfo
+      final NodeInfo updated =
+          new NodeInfo(serviceInstance, nodeBlacklistConf, clock, numSchedulableTasksPerNode);
+      instanceToNodeMap.put(serviceInstance.getWorkerIdentity(), updated);
+      LOG.info("Updated node with identity: {}", serviceInstance.getWorkerIdentity());
+    }
+
+    @Override
+    public void onRemove(final ServiceInstance serviceInstance) {
+      // FIXME: disabling this for now
+      // instanceToNodeMap.remove(serviceInstance.getWorkerIdentity());
+      LOG.info("Removed node with identity: {}", serviceInstance.getWorkerIdentity());
+    }
+  }
+
   @Override
   public void shutdown() {
     writeLock.lock();
@@ -328,9 +355,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     int vcores = 0;
     readLock.lock();
     try {
-      for (Entry<ServiceInstance, NodeInfo> entry : instanceToNodeMap.entrySet()) {
-        if (entry.getKey().isAlive() && !entry.getValue().isDisabled()) {
-          Resource r = entry.getKey().getResource();
+      for (Entry<String, NodeInfo> entry : instanceToNodeMap.entrySet()) {
+        if (entry.getValue().getServiceInstance().isAlive() && !entry.getValue().isDisabled()) {
+          Resource r = entry.getValue().getServiceInstance().getResource();
           memory += r.getMemory();
           vcores += r.getVirtualCores();
         }
@@ -440,7 +467,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       ServiceInstance assignedInstance = taskInfo.assignedInstance;
       assert assignedInstance != null;
 
-      NodeInfo nodeInfo = instanceToNodeMap.get(assignedInstance);
+      NodeInfo nodeInfo = instanceToNodeMap.get(assignedInstance.getWorkerIdentity());
       assert nodeInfo != null;
 
       // Re-enable the node if preempted
@@ -538,7 +565,6 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Refreshing instances since total memory is 0");
         }
-        refreshInstances();
       }
 
       // If there's no memory available, fail
@@ -556,7 +582,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
           if (!instances.isEmpty()) {
             requestedHostExists = true;
             for (ServiceInstance inst : instances) {
-              NodeInfo nodeInfo = instanceToNodeMap.get(inst);
+              NodeInfo nodeInfo = instanceToNodeMap.get(inst.getWorkerIdentity());
               if (nodeInfo != null && nodeInfo.canAcceptTask()) {
                 LOG.info("Assigning " + inst + " when looking for " + host + "." +
                     " FirstRequestedHost=" + (prefHostCount == 0) +
@@ -584,20 +610,19 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         }
       }
       /* fall through - miss in locality (random scheduling) */
-      Entry<ServiceInstance, NodeInfo>[] all =
-          instanceToNodeMap.entrySet().toArray(new Entry[instanceToNodeMap.size()]);
+      Entry<String, NodeInfo>[] all = instanceToNodeMap.entrySet().toArray(new Entry[0]);
       // Check again
       if (all.length > 0) {
         int n = random.nextInt(all.length);
         // start at random offset and iterate whole list
         for (int i = 0; i < all.length; i++) {
-          Entry<ServiceInstance, NodeInfo> inst = all[(i + n) % all.length];
+          Entry<String, NodeInfo> inst = all[(i + n) % all.length];
           if (inst.getValue().canAcceptTask()) {
             LOG.info("Assigning " + inst + " when looking for any host, from #hosts=" + all.length +
                 ", requestedHosts=" +
                 ((requestedHosts == null || requestedHosts.length == 0) ? "null" :
                     Arrays.toString(requestedHosts)));
-            return new SelectHostResult(inst.getKey(), inst.getValue());
+            return new SelectHostResult(inst.getValue().getServiceInstance(), inst.getValue());
           }
         }
       }
@@ -607,24 +632,12 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     }
   }
 
-  // TODO Each refresh operation should addNodes if they don't already exist.
-  // Even better would be to get notifications from the service impl when a node gets added or removed.
-  // Instead of having to walk through the entire list. The computation of a node getting added or
-  // removed already exists in the DynamicRegistry implementation.
-  private void refreshInstances() {
-    try {
-      activeInstances.refresh(); // handles its own sync
-    } catch (IOException ioe) {
-      LOG.warn("Could not refresh list of active instances", ioe);
-    }
-  }
-
   private void scanForNodeChanges() {
     /* check again whether nodes are disabled or just missing */
     writeLock.lock();
     try {
       for (ServiceInstance inst : activeInstances.getAll().values()) {
-        if (inst.isAlive() && instanceToNodeMap.containsKey(inst) == false) {
+        if (inst.isAlive() && instanceToNodeMap.containsKey(inst.getWorkerIdentity()) == false) {
           /* that's a good node, not added to the allocations yet */
           LOG.info("Found a new node: " + inst + ".");
           addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, numSchedulableTasksPerNode));
@@ -637,7 +650,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
 
   private void addNode(ServiceInstance inst, NodeInfo node) {
     LOG.info("Adding node: " + inst);
-    instanceToNodeMap.put(inst, node);
+    instanceToNodeMap.put(inst.getWorkerIdentity(), node);
     // Trigger scheduling since a new node became available.
     trySchedulingPendingTasks();
   }
@@ -645,11 +658,6 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   private void reenableDisabledNode(NodeInfo nodeInfo) {
     writeLock.lock();
     try {
-      if (nodeInfo.hadCommFailure()) {
-        // If the node being re-enabled was not marked busy previously, then it was disabled due to
-        // some other failure. Refresh the service list to see if it's been removed permanently.
-        refreshInstances();
-      }
       LOG.info("Attempting to re-enable node: " + nodeInfo.getServiceInstance());
       if (nodeInfo.getServiceInstance().isAlive()) {
         nodeInfo.enableNode();
@@ -666,7 +674,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   private void disableInstance(ServiceInstance instance, boolean isCommFailure) {
     writeLock.lock();
     try {
-      NodeInfo nodeInfo = instanceToNodeMap.get(instance);
+      NodeInfo nodeInfo = instanceToNodeMap.get(instance.getWorkerIdentity());
       if (nodeInfo == null || nodeInfo.isDisabled()) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Node: " + instance + " already disabled, or invalid. Not doing anything.");
@@ -1012,7 +1020,6 @@ public class LlapTaskSchedulerService extends TaskScheduler {
               if (LOG.isDebugEnabled()) {
                 LOG.debug("Refreshing instances based on poll interval");
               }
-              refreshInstances();
               scanForNodeChanges();
             }
           }

Reply via email to