HIVE-17409 : refactor LLAP ZK registry to make the ZK-registry part reusable (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4795b996
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4795b996
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4795b996

Branch: refs/heads/master
Commit: 4795b99690c9428636a5a28c5d4be40ce5fc430d
Parents: 2e226d2
Author: sergey <ser...@apache.org>
Authored: Fri Sep 1 11:44:11 2017 -0700
Committer: sergey <ser...@apache.org>
Committed: Fri Sep 1 11:48:01 2017 -0700

----------------------------------------------------------------------
 .../hive/llap/registry/LlapServiceInstance.java |  59 ++
 .../llap/registry/LlapServiceInstanceSet.java   |  34 +
 .../hive/llap/registry/ServiceInstance.java     |  86 --
 .../hive/llap/registry/ServiceInstanceSet.java  |  69 --
 .../ServiceInstanceStateChangeListener.java     |  42 -
 .../hive/llap/registry/ServiceRegistry.java     |   8 +-
 .../registry/impl/InactiveServiceInstance.java  |   4 +-
 .../registry/impl/LlapFixedRegistryImpl.java    |  41 +-
 .../llap/registry/impl/LlapRegistryService.java |  18 +-
 .../impl/LlapZookeeperRegistryImpl.java         | 800 +++----------------
 .../hive/llap/security/LlapTokenClient.java     |  18 +-
 .../hadoop/hive/registry/ServiceInstance.java   |  47 ++
 .../hive/registry/ServiceInstanceSet.java       |  61 ++
 .../ServiceInstanceStateChangeListener.java     |  42 +
 .../hive/registry/impl/ServiceInstanceBase.java |  93 +++
 .../hive/registry/impl/ZkRegistryBase.java      | 549 +++++++++++++
 .../hive/registry/impl/ZookeeperUtils.java      | 116 +++
 .../hadoop/hive/llap/LlapBaseInputFormat.java   |  30 +-
 .../hive/llap/cli/LlapStatusServiceDriver.java  |   6 +-
 .../daemon/services/impl/LlapWebServices.java   |   4 +-
 .../llap/tezplugins/LlapTaskCommunicator.java   |   8 +-
 .../tezplugins/LlapTaskSchedulerService.java    |  42 +-
 .../TestLlapTaskSchedulerService.java           |  16 +-
 .../apache/hadoop/hive/ql/exec/tez/Utils.java   |   6 +-
 .../physical/LlapClusterStateForCompile.java    |   8 +-
 25 files changed, 1242 insertions(+), 965 deletions(-)
----------------------------------------------------------------------
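
The diffstat above reflects the split this refactor makes: the generic registry pieces (ServiceInstance, ServiceInstanceSet, ServiceInstanceStateChangeListener, ServiceInstanceBase, ZkRegistryBase, ZookeeperUtils) move to org.apache.hadoop.hive.registry, while the LLAP-specific surface stays in org.apache.hadoop.hive.llap.registry as LlapServiceInstance and LlapServiceInstanceSet. As a rough illustration (not part of the patch, and assuming the generic interfaces keep the getAll/getWorkerIdentity/getHost signatures carried over from the old LLAP-only types), a consumer that only needs generic instance data can now be written against the shared package:

import org.apache.hadoop.hive.registry.ServiceInstance;
import org.apache.hadoop.hive.registry.ServiceInstanceSet;

public final class RegistryDump {
  private RegistryDump() {}

  // Works against any registry built on the shared types, LLAP or otherwise.
  public static <T extends ServiceInstance> void dump(ServiceInstanceSet<T> instances) {
    for (T instance : instances.getAll()) {
      System.out.println(instance.getWorkerIdentity() + " @ " + instance.getHost());
    }
  }
}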


http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-client/src/java/org/apache/hadoop/hive/llap/registry/LlapServiceInstance.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/LlapServiceInstance.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/LlapServiceInstance.java
new file mode 100644
index 0000000..30b1810
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/LlapServiceInstance.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.registry;
+
+
+import org.apache.hadoop.hive.registry.ServiceInstance;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+
+public interface LlapServiceInstance extends ServiceInstance {
+
+  /**
+   * Management endpoint for service instance
+   *
+   * @return
+   */
+  public int getManagementPort();
+
+  /**
+   * Shuffle Endpoint for service instance
+   * 
+   * @return
+   */
+  public int getShufflePort();
+
+
+  /**
+   * Address for services hosted on http
+   * @return
+   */
+  public String getServicesAddress();
+  /**
+   * OutputFormat endpoint for service instance
+   *
+   * @return
+   */
+  public int getOutputFormatPort();
+
+
+  /**
+   * Memory and Executors available for the LLAP tasks
+   * 
+   * This does not include the size of the cache or the actual vCores allocated via Slider.
+   * 
+   * @return
+   */
+  public Resource getResource();
+}

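LlapServiceInstance now carries only the LLAP-specific endpoints; the host and worker identity accessors come from the generic parent interface. A hedged usage sketch (hypothetical helper, not part of the patch):

import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;

public final class LlapEndpoints {
  private LlapEndpoints() {}

  // "host:port" for the daemon's shuffle service; getHost() is inherited from the
  // generic ServiceInstance.
  public static String shuffleAddress(LlapServiceInstance instance) {
    return instance.getHost() + ":" + instance.getShufflePort();
  }

  // Executors advertised for LLAP tasks; per the javadoc this excludes cache memory
  // and the vCores actually allocated via Slider.
  public static int taskVcores(LlapServiceInstance instance) {
    return instance.getResource().getVirtualCores();
  }
}
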
http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-client/src/java/org/apache/hadoop/hive/llap/registry/LlapServiceInstanceSet.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/LlapServiceInstanceSet.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/LlapServiceInstanceSet.java
new file mode 100644
index 0000000..a728f4a
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/LlapServiceInstanceSet.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.registry;
+
+import java.util.Collection;
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+public interface LlapServiceInstanceSet extends ServiceInstanceSet<LlapServiceInstance> {
+
+  /**
+   * Gets a list containing all the instances. This list has the same iteration order across
+   * different processes, assuming the list of registry entries is the same.
+   * @param consistentIndexes if true, also try to maintain the same exact index for each node
+   *                          across calls, by inserting inactive instances to replace the
+   *                          removed ones.
+   */
+  Collection<LlapServiceInstance> getAllInstancesOrdered(
+      boolean consistentIndexes);
+
+  /** LLAP application ID */
+  ApplicationId getApplicationId();
+}
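getAllInstancesOrdered(true) keeps node indexes stable across calls by padding removed slots with inactive placeholders, which matters for callers that map work to daemons by position. A hypothetical caller sketch (not part of the patch; the returned slot may hold an inactive placeholder, so callers must tolerate that):

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;

public final class ConsistentRouting {
  private ConsistentRouting() {}

  // Maps a key hash to a daemon slot; the index stays stable across calls as long as
  // the consistentIndexes ordering is requested.
  public static LlapServiceInstance pick(LlapServiceInstanceSet instances, int keyHash) {
    List<LlapServiceInstance> ordered = new ArrayList<>(instances.getAllInstancesOrdered(true));
    if (ordered.isEmpty()) {
      return null;
    }
    return ordered.get((keyHash & Integer.MAX_VALUE) % ordered.size());
  }
}
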

http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java
deleted file mode 100644
index 70515c4..0000000
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.hive.llap.registry;
-
-import java.util.Map;
-
-import org.apache.hadoop.yarn.api.records.Resource;
-
-public interface ServiceInstance {
-
-  /**
-   * Worker identity is a UUID (unique across restarts), to identify a node which died &amp; was brought
-   * back on the same host/port
-   */
-  public String getWorkerIdentity();
-
-  /**
-   * Hostname of the service instance
-   * 
-   * @return
-   */
-  public String getHost();
-
-  /**
-   * RPC Endpoint for service instance
-   * 
-   * @return
-   */
-  public int getRpcPort();
-
-  /**
-   * Management endpoint for service instance
-   *
-   * @return
-   */
-  public int getManagementPort();
-
-  /**
-   * Shuffle Endpoint for service instance
-   * 
-   * @return
-   */
-  public int getShufflePort();
-
-
-  /**
-   * Address for services hosted on http
-   * @return
-   */
-  public String getServicesAddress();
-  /**
-   * OutputFormat endpoint for service instance
-   *
-   * @return
-   */
-  public int getOutputFormatPort();
-
-
-  /**
-   * Config properties of the Service Instance (llap.daemon.*)
-   * 
-   * @return
-   */
-
-  public Map<String, String> getProperties();
-
-  /**
-   * Memory and Executors available for the LLAP tasks
-   * 
-   * This does not include the size of the cache or the actual vCores allocated via Slider.
-   * 
-   * @return
-   */
-  public Resource getResource();
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
deleted file mode 100644
index cc124e7..0000000
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.hive.llap.registry;
-
-import java.util.Collection;
-import java.util.Set;
-
-/**
- * Note: For most of the implementations, there's no guarantee that the ServiceInstance returned by
- * one invocation is the same as the instance returned by another invocation. e.g. the ZK registry
- * returns a new ServiceInstance object each time a getInstance call is made.
- */
-public interface ServiceInstanceSet {
-
-  /**
-   * Get an instance mapping which map worker identity to each instance.
-   * 
-   * The worker identity does not collide between restarts, so each restart will have a unique id,
-   * while having the same host/ip pair.
-   * 
-   * @return
-   */
-  Collection<ServiceInstance> getAll();
-
-  /**
-   * Gets a list containing all the instances. This list has the same iteration order across
-   * different processes, assuming the list of registry entries is the same.
-   * @param consistentIndexes if true, also try to maintain the same exact index for each node
-   *                          across calls, by inserting inactive instances to replace the
-   *                          removed ones.
-   */
-  Collection<ServiceInstance> getAllInstancesOrdered(boolean consistentIndexes);
-
-  /**
-   * Get an instance by worker identity.
-   * 
-   * @param name
-   * @return
-   */
-  ServiceInstance getInstance(String name);
-
-  /**
-   * Get a list of service instances for a given host.
-   * 
-   * The list could include dead and alive instances.
-   * 
-   * @param host
-   * @return
-   */
-  Set<ServiceInstance> getByHost(String host);
-
-  /**
-   * Get number of instances in the currently availabe.
-   *
-   * @return - number of instances
-   */
-  int size();
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceStateChangeListener.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceStateChangeListener.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceStateChangeListener.java
deleted file mode 100644
index 92eb8bd..0000000
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceStateChangeListener.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.llap.registry;
-
-/**
- * Callback listener for instance state change events
- */
-public interface ServiceInstanceStateChangeListener {
-  /**
-   * Called when new {@link ServiceInstance} is created.
-   *
-   * @param serviceInstance - created service instance
-   */
-  void onCreate(ServiceInstance serviceInstance);
-
-  /**
-   * Called when an existing {@link ServiceInstance} is updated.
-   *
-   * @param serviceInstance - updated service instance
-   */
-  void onUpdate(ServiceInstance serviceInstance);
-
-  /**
-   * Called when an existing {@link ServiceInstance} is removed.
-   *
-   * @param serviceInstance - removed service instance
-   */
-  void onRemove(ServiceInstance serviceInstance);
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
index 5739d72..5d7f813 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
@@ -14,7 +14,7 @@
 package org.apache.hadoop.hive.llap.registry;
 
 import java.io.IOException;
-
+import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 
 /**
@@ -49,14 +49,14 @@ public interface ServiceRegistry {
   * @param clusterReadyTimeoutMs The time to wait for the cluster to be ready, if it's not
   *                              started yet. 0 means do not wait.
   */
-  ServiceInstanceSet getInstances(String component, long clusterReadyTimeoutMs) throws IOException;
+  LlapServiceInstanceSet getInstances(String component, long clusterReadyTimeoutMs) throws IOException;
 
   /**
    * Adds state change listeners for service instances.
    * @param listener - state change listener
    */
-  void registerStateChangeListener(ServiceInstanceStateChangeListener listener)
-      throws IOException;
+  void registerStateChangeListener(
+      ServiceInstanceStateChangeListener<LlapServiceInstance> listener) throws IOException;
 
   /**
    * @return The application ID of the LLAP cluster.

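With the listener interface now parameterized, LLAP callers receive LlapServiceInstance callbacks directly instead of downcasting. A minimal sketch, assuming the generic ServiceInstanceStateChangeListener keeps the single-argument onCreate/onUpdate/onRemove callbacks of the LLAP-specific interface it replaces:

import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;

public class LoggingListener implements ServiceInstanceStateChangeListener<LlapServiceInstance> {
  @Override
  public void onCreate(LlapServiceInstance instance) {
    System.out.println("LLAP daemon registered: " + instance.getWorkerIdentity());
  }

  @Override
  public void onUpdate(LlapServiceInstance instance) {
    System.out.println("LLAP daemon updated: " + instance.getWorkerIdentity());
  }

  @Override
  public void onRemove(LlapServiceInstance instance) {
    System.out.println("LLAP daemon removed: " + instance.getWorkerIdentity());
  }
}
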
http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java
index 9f2f3b4..1d6b716 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java
@@ -16,10 +16,10 @@ package org.apache.hadoop.hive.llap.registry.impl;
 
 import java.util.Map;
 
-import org.apache.hadoop.hive.llap.registry.ServiceInstance;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
 import org.apache.hadoop.yarn.api.records.Resource;
 
-public class InactiveServiceInstance implements ServiceInstance {
+public class InactiveServiceInstance implements LlapServiceInstance {
   private final String name;
   public InactiveServiceInstance(String name) {
     this.name = name;

http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
index ebc32a1..c88198f 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
@@ -29,15 +29,14 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.registry.ServiceInstance;
-import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
-import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
+import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -117,7 +116,7 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
     return "host-" + host;
   }
 
-  private final class FixedServiceInstance implements ServiceInstance {
+  private final class FixedServiceInstance implements LlapServiceInstance {
 
     private final String host;
     private final String serviceAddress;
@@ -206,10 +205,10 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
     }
   }
 
-  private final class FixedServiceInstanceSet implements ServiceInstanceSet {
+  private final class FixedServiceInstanceSet implements LlapServiceInstanceSet {
 
     // LinkedHashMap have a repeatable iteration order.
-    private final Map<String, ServiceInstance> instances = new LinkedHashMap<>();
+    private final Map<String, LlapServiceInstance> instances = new LinkedHashMap<>();
 
     public FixedServiceInstanceSet() {
       for (String host : hosts) {
@@ -219,17 +218,17 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
     }
 
     @Override
-    public Collection<ServiceInstance> getAll() {
+    public Collection<LlapServiceInstance> getAll() {
       return instances.values();
     }
 
     @Override
-    public List<ServiceInstance> getAllInstancesOrdered(boolean consistentIndexes) {
-      List<ServiceInstance> list = new LinkedList<>();
+    public List<LlapServiceInstance> getAllInstancesOrdered(boolean consistentIndexes) {
+      List<LlapServiceInstance> list = new LinkedList<>();
+      List<LlapServiceInstance> list = new LinkedList<>();
       list.addAll(instances.values());
-      Collections.sort(list, new Comparator<ServiceInstance>() {
+      Collections.sort(list, new Comparator<LlapServiceInstance>() {
         @Override
-        public int compare(ServiceInstance o1, ServiceInstance o2) {
+        public int compare(LlapServiceInstance o1, LlapServiceInstance o2) {
           return o2.getWorkerIdentity().compareTo(o2.getWorkerIdentity());
         }
       });
@@ -237,14 +236,14 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
     }
 
     @Override
-    public ServiceInstance getInstance(String name) {
+    public LlapServiceInstance getInstance(String name) {
       return instances.get(name);
     }
 
     @Override
-    public Set<ServiceInstance> getByHost(String host) {
-      Set<ServiceInstance> byHost = new HashSet<ServiceInstance>();
-      ServiceInstance inst = getInstance(getWorkerIdentity(host));
+    public Set<LlapServiceInstance> getByHost(String host) {
+      Set<LlapServiceInstance> byHost = new HashSet<LlapServiceInstance>();
+      LlapServiceInstance inst = getInstance(getWorkerIdentity(host));
       if (inst != null) {
         byHost.add(inst);
       }
@@ -255,15 +254,21 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
     public int size() {
       return instances.size();
     }
+
+    @Override
+    public ApplicationId getApplicationId() {
+      return null;
+    }
   }
 
   @Override
-  public ServiceInstanceSet getInstances(String component, long timeoutMs) throws IOException {
+  public LlapServiceInstanceSet getInstances(String component, long timeoutMs) throws IOException {
     return new FixedServiceInstanceSet();
   }
 
   @Override
-  public void registerStateChangeListener(final ServiceInstanceStateChangeListener listener) {
+  public void registerStateChangeListener(
+      final ServiceInstanceStateChangeListener<LlapServiceInstance> listener) {
     // nothing to set
    LOG.warn("Callbacks for instance state changes are not supported in fixed registry.");
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
index 76fc9c7..80a6aba 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
@@ -13,24 +13,24 @@
  */
 package org.apache.hadoop.hive.llap.registry.impl;
 
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
+
+import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
-import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
+import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
 import org.apache.hadoop.registry.client.binding.RegistryUtils;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-
 public class LlapRegistryService extends AbstractService {
 
  private static final Logger LOG = LoggerFactory.getLogger(LlapRegistryService.class);
@@ -131,16 +131,16 @@ public class LlapRegistryService extends AbstractService {
     }
   }
 
-  public ServiceInstanceSet getInstances() throws IOException {
+  public LlapServiceInstanceSet getInstances() throws IOException {
     return getInstances(0);
   }
 
-  public ServiceInstanceSet getInstances(long clusterReadyTimeoutMs) throws IOException {
+  public LlapServiceInstanceSet getInstances(long clusterReadyTimeoutMs) throws IOException {
     return this.registry.getInstances("LLAP", clusterReadyTimeoutMs);
   }
 
-  public void registerStateChangeListener(ServiceInstanceStateChangeListener listener)
-      throws IOException {
+  public void registerStateChangeListener(
+      ServiceInstanceStateChangeListener<LlapServiceInstance> listener) throws IOException {
     this.registry.registerStateChangeListener(listener);
   }
 

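On the client side the change is mostly in return types: LlapRegistryService now hands back LlapServiceInstanceSet directly. A usage sketch (assuming the pre-existing LlapRegistryService.getClient(Configuration) factory and service lifecycle; not part of the patch):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;

public final class RegistryClientExample {
  private RegistryClientExample() {}

  public static void printCluster(Configuration conf) throws IOException {
    LlapRegistryService registry = LlapRegistryService.getClient(conf);
    LlapServiceInstanceSet instances = registry.getInstances();
    System.out.println("LLAP daemons visible: " + instances.size()
        + ", appId=" + instances.getApplicationId());
  }
}
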
http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
index ad17144..65f8f94 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@ -13,83 +13,47 @@
  */
 package org.apache.hadoop.hive.llap.registry.impl;
 
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
+
+import com.google.common.collect.Sets;
 import java.io.IOException;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.URISyntaxException;
 import java.net.URL;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import javax.security.auth.login.AppConfigurationEntry;
-
-import com.google.common.collect.Sets;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.api.ACLProvider;
-import org.apache.curator.framework.imps.CuratorFrameworkState;
 import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
-import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
-import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode.Mode;
-import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.llap.io.api.LlapProxy;
-import org.apache.hadoop.hive.llap.registry.ServiceInstance;
-import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
-import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
+import org.apache.hadoop.hive.registry.impl.ServiceInstanceBase;
+import org.apache.hadoop.hive.registry.impl.ZkRegistryBase;
 import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
-import org.apache.hadoop.registry.client.binding.RegistryUtils;
 import org.apache.hadoop.registry.client.binding.RegistryUtils.ServiceRecordMarshal;
 import org.apache.hadoop.registry.client.types.AddressTypes;
 import org.apache.hadoop.registry.client.types.Endpoint;
 import org.apache.hadoop.registry.client.types.ProtocolTypes;
 import org.apache.hadoop.registry.client.types.ServiceRecord;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authentication.util.KerberosUtil;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.KeeperException.InvalidACLException;
-import org.apache.zookeeper.client.ZooKeeperSaslClient;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Id;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-public class LlapZookeeperRegistryImpl implements ServiceRegistry {
-
+public class LlapZookeeperRegistryImpl
+    extends ZkRegistryBase<LlapServiceInstance> implements ServiceRegistry {
   private static final Logger LOG = LoggerFactory.getLogger(LlapZookeeperRegistryImpl.class);
 
   /**
@@ -100,155 +64,29 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
   private static final String IPC_SHUFFLE = "shuffle";
   private static final String IPC_LLAP = "llap";
   private static final String IPC_OUTPUTFORMAT = "llapoutputformat";
-  private final static String SASL_NAMESPACE = "llap-sasl";
-  private final static String UNSECURE_NAMESPACE = "llap-unsecure";
+  private final static String NAMESPACE_PREFIX = "llap-";
   private final static String USER_SCOPE_PATH_PREFIX = "user-";
-  private static final String DISABLE_MESSAGE =
-      "Set " + ConfVars.LLAP_VALIDATE_ACLS.varname + " to false to disable ACL validation";
   private static final String WORKER_PREFIX = "worker-";
   private static final String SLOT_PREFIX = "slot-";
+  private static final String SASL_LOGIN_CONTEXT_NAME = "LlapZooKeeperClient";
 
-  private final Configuration conf;
-  private final CuratorFramework zooKeeperClient;
-  // userPathPrefix is the path specific to the user for which ACLs should be restrictive.
-  // workersPath is the directory path where all the worker znodes are located.
-  private final String userPathPrefix, workersPath;
-  private String userNameFromPrincipal; // Only set when setting up the secure config for ZK.
-
-  private PersistentEphemeralNode znode;
 
   private SlotZnode slotZnode;
-  private String znodePath; // unique identity for this instance
-  private final ServiceRecordMarshal encoder; // to marshal/unmarshal znode data
 
-  // to be used by clients of ServiceRegistry
+  // to be used by clients of ServiceRegistry TODO: this is unnecessary
   private DynamicServiceInstanceSet instances;
-  private PathChildrenCache instancesCache;
-
-  private static final UUID uniq = UUID.randomUUID();
-  private static final String UNIQUE_IDENTIFIER = "llap.unique.id";
-
-  private Set<ServiceInstanceStateChangeListener> stateChangeListeners;
-  private final Map<String, Set<ServiceInstance>> pathToInstanceCache;
-  private final Map<String, Set<ServiceInstance>> nodeToInstanceCache;
-  private final Lock instanceCacheLock = new ReentrantLock();
-
-  // get local hostname
-  private static final String hostname;
-
-  static {
-    String localhost = "localhost";
-    try {
-      localhost = InetAddress.getLocalHost().getCanonicalHostName();
-    } catch (UnknownHostException uhe) {
-      // ignore
-    }
-    hostname = localhost;
-  }
 
   public LlapZookeeperRegistryImpl(String instanceName, Configuration conf) {
-    this.conf = new Configuration(conf);
-    this.conf.addResource(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
-    String zkEnsemble = getQuorumServers(this.conf);
-    this.encoder = new RegistryUtils.ServiceRecordMarshal();
-    int sessionTimeout = (int) HiveConf.getTimeVar(conf, ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT,
-        TimeUnit.MILLISECONDS);
-    int baseSleepTime = (int) HiveConf
-        .getTimeVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
-            TimeUnit.MILLISECONDS);
-    int maxRetries = HiveConf.getIntVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
-
-    // sample path: /llap-sasl/hiveuser/hostname/workers/worker-0000000
-    // worker-0000000 is the sequence number which will be retained until session timeout. If a
-    // worker does not respond due to communication interruptions it will retain the same sequence
-    // number when it returns back. If session timeout expires, the node will be deleted and new
-    // addition of the same node (restart) will get next sequence number
-    this.userPathPrefix = USER_SCOPE_PATH_PREFIX + getZkPathUser(this.conf);
-    this.workersPath =  "/" + userPathPrefix + "/" + instanceName + "/workers";
-    this.instancesCache = null;
-    this.instances = null;
-    this.stateChangeListeners = new HashSet<>();
-    this.pathToInstanceCache = new ConcurrentHashMap<>();
-    this.nodeToInstanceCache = new ConcurrentHashMap<>();
-
-    final boolean isSecure = UserGroupInformation.isSecurityEnabled();
-    ACLProvider zooKeeperAclProvider = new ACLProvider() {
-      @Override
-      public List<ACL> getDefaultAcl() {
-        // We always return something from getAclForPath so this should not happen.
-        LOG.warn("getDefaultAcl was called");
-        return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE);
-      }
-
-      @Override
-      public List<ACL> getAclForPath(String path) {
-        if (!isSecure || path == null || !path.contains(userPathPrefix)) {
-          // No security or the path is below the user path - full access.
-          return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE);
-        }
-        return createSecureAcls();
-      }
-    };
-    String rootNs = HiveConf.getVar(conf, ConfVars.LLAP_ZK_REGISTRY_NAMESPACE);
-    if (rootNs == null) {
-      rootNs = isSecure ? SASL_NAMESPACE : UNSECURE_NAMESPACE; // The normal path.
-    }
-
-    // Create a CuratorFramework instance to be used as the ZooKeeper client
-    // Use the zooKeeperAclProvider to create appropriate ACLs
-    this.zooKeeperClient = CuratorFrameworkFactory.builder()
-        .connectString(zkEnsemble)
-        .sessionTimeoutMs(sessionTimeout)
-        .aclProvider(zooKeeperAclProvider)
-        .namespace(rootNs)
-        .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries))
-        .build();
-
+    super(instanceName, conf,
+        HiveConf.getVar(conf, ConfVars.LLAP_ZK_REGISTRY_NAMESPACE), NAMESPACE_PREFIX,
+        USER_SCOPE_PATH_PREFIX, WORKER_PREFIX,
+        LlapProxy.isDaemon() ? SASL_LOGIN_CONTEXT_NAME : null,
+        HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL),
+        HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE),
+        ConfVars.LLAP_VALIDATE_ACLS);
     LOG.info("Llap Zookeeper Registry is enabled with registryid: " + instanceName);
   }
 
-  private static List<ACL> createSecureAcls() {
-    // Read all to the world
-    List<ACL> nodeAcls = new ArrayList<ACL>(ZooDefs.Ids.READ_ACL_UNSAFE);
-    // Create/Delete/Write/Admin to creator
-    nodeAcls.addAll(ZooDefs.Ids.CREATOR_ALL_ACL);
-    return nodeAcls;
-  }
-
-  /**
-   * Get the ensemble server addresses from the configuration. The format is: host1:port,
-   * host2:port..
-   *
-   * @param conf
-   **/
-  private String getQuorumServers(Configuration conf) {
-    String[] hosts = conf.getTrimmedStrings(ConfVars.HIVE_ZOOKEEPER_QUORUM.varname);
-    String port = conf.get(ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.varname,
-        ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.getDefaultValue());
-    StringBuilder quorum = new StringBuilder();
-    for (int i = 0; i < hosts.length; i++) {
-      quorum.append(hosts[i].trim());
-      if (!hosts[i].contains(":")) {
-        // if the hostname doesn't contain a port, add the configured port to hostname
-        quorum.append(":");
-        quorum.append(port);
-      }
-
-      if (i != hosts.length - 1) {
-        quorum.append(",");
-      }
-    }
-
-    return quorum.toString();
-  }
-
-  private String getZkPathUser(Configuration conf) {
-    // External LLAP clients would need to set LLAP_ZK_REGISTRY_USER to the LLAP daemon user (hive),
-    // rather than relying on RegistryUtils.currentUser().
-    String user = HiveConf.getVar(conf, ConfVars.LLAP_ZK_REGISTRY_USER, RegistryUtils.currentUser());
-    return user;
-  }
-
   public Endpoint getRpcEndpoint() {
     final int rpcPort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_RPC_PORT);
     return RegistryTypeUtils.ipcEndpoint(IPC_LLAP, new InetSocketAddress(hostname, rpcPort));
@@ -304,117 +142,37 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
       }
     }
 
-    // restart sensitive instance id
-    srv.set(UNIQUE_IDENTIFIER, uniq.toString());
+    String uniqueId = registerServiceRecord(srv);
+    long znodeCreationTimeout = 120;
 
     // Create a znode under the rootNamespace parent for this instance of the server
     try {
-      // PersistentEphemeralNode will make sure the ephemeral node created on server will be present
-      // even under connection or session interruption (will automatically handle retries)
-      znode = new PersistentEphemeralNode(zooKeeperClient, Mode.EPHEMERAL_SEQUENTIAL,
-          workersPath + "/" + WORKER_PREFIX, encoder.toBytes(srv));
-
-      // start the creation of znodes
-      znode.start();
-
-      // We'll wait for 120s for node creation
-      long znodeCreationTimeout = 120;
-      if (!znode.waitForInitialCreate(znodeCreationTimeout, TimeUnit.SECONDS)) {
-        throw new Exception(
-            "Max znode creation wait time: " + znodeCreationTimeout + "s exhausted");
-      }
-
-      znodePath = znode.getActualPath();
-
       slotZnode = new SlotZnode(
-          zooKeeperClient, workersPath, SLOT_PREFIX, WORKER_PREFIX, uniq.toString());
+          zooKeeperClient, workersPath, SLOT_PREFIX, WORKER_PREFIX, uniqueId);
       if (!slotZnode.start(znodeCreationTimeout, TimeUnit.SECONDS)) {
         throw new Exception(
             "Max znode creation wait time: " + znodeCreationTimeout + "s exhausted");
       }
-
-      if (HiveConf.getBoolVar(conf, ConfVars.LLAP_VALIDATE_ACLS)) {
-        try {
-          checkAndSetAcls();
-        } catch (Exception ex) {
-          throw new IOException("Error validating or setting ACLs. " + DISABLE_MESSAGE, ex);
-        }
-      }
-      if (zooKeeperClient.checkExists().forPath(znodePath) == null) {
-        // No node exists, throw exception
-        throw new Exception("Unable to create znode for this LLAP instance on ZooKeeper.");
-      }
-      LOG.info(
-          "Registered node. Created a znode on ZooKeeper for LLAP instance: rpc: {}, shuffle: {}," +
-              " webui: {}, mgmt: {}, znodePath: {} ",
-          rpcEndpoint, getShuffleEndpoint(), getServicesEndpoint(), getMngEndpoint(), znodePath);
     } catch (Exception e) {
       LOG.error("Unable to create a znode for this server instance", e);
-      CloseableUtils.closeQuietly(znode);
       CloseableUtils.closeQuietly(slotZnode);
+      super.stop();
       throw (e instanceof IOException) ? (IOException)e : new IOException(e);
     }
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Created zknode with path: {} service record: {}", znodePath, srv);
-    }
-
-    return uniq.toString();
+    LOG.info("Registered node. Created a znode on ZooKeeper for LLAP instance: rpc: {}, " +
+            "shuffle: {}, webui: {}, mgmt: {}, znodePath: {}", rpcEndpoint, getShuffleEndpoint(),
+            getServicesEndpoint(), getMngEndpoint(), getRegistrationZnodePath());
+    return uniqueId;
   }
 
-  private void checkAndSetAcls() throws Exception {
-    if (!UserGroupInformation.isSecurityEnabled()) return;
-    // We are trying to check ACLs on the "workers" directory, which noone except us should be
-    // able to write to. Higher-level directories shouldn't matter - we don't read them.
-    String pathToCheck = workersPath;
-    List<ACL> acls = zooKeeperClient.getACL().forPath(pathToCheck);
-    if (acls == null || acls.isEmpty()) {
-      // Can there be no ACLs? There's some access (to get ACLs), so assume it means free for all.
-      LOG.warn("No ACLs on "  + pathToCheck + "; setting up ACLs. " + DISABLE_MESSAGE);
-      setUpAcls(pathToCheck);
-      return;
-    }
-    // This could be brittle.
-    assert userNameFromPrincipal != null;
-    Id currentUser = new Id("sasl", userNameFromPrincipal);
-    for (ACL acl : acls) {
-      if ((acl.getPerms() & ~ZooDefs.Perms.READ) == 0 || currentUser.equals(acl.getId())) {
-        continue; // Read permission/no permissions, or the expected user.
-      }
-      LOG.warn("The ACL " + acl + " is unnacceptable for " + pathToCheck
-        + "; setting up ACLs. " + DISABLE_MESSAGE);
-      setUpAcls(pathToCheck);
-      return;
-    }
-  }
-
-  private void setUpAcls(String path) throws Exception {
-    List<ACL> acls = createSecureAcls();
-    LinkedList<String> paths = new LinkedList<>();
-    paths.add(path);
-    while (!paths.isEmpty()) {
-      String currentPath = paths.poll();
-      List<String> children = zooKeeperClient.getChildren().forPath(currentPath);
-      if (children != null) {
-        for (String child : children) {
-          paths.add(currentPath + "/" + child);
-        }
-      }
-      zooKeeperClient.setACL().withACL(acls).forPath(currentPath);
-    }
-  }
-
-
   @Override
   public void unregister() throws IOException {
     // Nothing for the zkCreate models
   }
 
-  private class DynamicServiceInstance implements ServiceInstance {
-
-    private final ServiceRecord srv;
-    private final String host;
-    private final int rpcPort;
+  private class DynamicServiceInstance
+      extends ServiceInstanceBase implements LlapServiceInstance {
     private final int mngPort;
     private final int shufflePort;
     private final int outputFormatPort;
@@ -422,24 +180,13 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
     private final Resource resource;
 
     public DynamicServiceInstance(ServiceRecord srv) throws IOException {
-      this.srv = srv;
-
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Working with ServiceRecord: {}", srv);
-      }
+      super(srv, IPC_LLAP);
 
       final Endpoint shuffle = srv.getInternalEndpoint(IPC_SHUFFLE);
-      final Endpoint rpc = srv.getInternalEndpoint(IPC_LLAP);
       final Endpoint mng = srv.getInternalEndpoint(IPC_MNG);
       final Endpoint outputFormat = srv.getInternalEndpoint(IPC_OUTPUTFORMAT);
       final Endpoint services = srv.getExternalEndpoint(IPC_SERVICES);
 
-      this.host =
-          RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
-              AddressTypes.ADDRESS_HOSTNAME_FIELD);
-      this.rpcPort =
-          Integer.parseInt(RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
-              AddressTypes.ADDRESS_PORT_FIELD));
       this.mngPort =
           Integer.parseInt(RegistryTypeUtils.getAddressField(mng.addresses.get(0),
               AddressTypes.ADDRESS_PORT_FIELD));
@@ -462,39 +209,6 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
     }
 
     @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      DynamicServiceInstance other = (DynamicServiceInstance) o;
-      return this.getWorkerIdentity().equals(other.getWorkerIdentity());
-    }
-
-    @Override
-    public int hashCode() {
-      return getWorkerIdentity().hashCode();
-    }
-
-    @Override
-    public String getWorkerIdentity() {
-      return srv.get(UNIQUE_IDENTIFIER);
-    }
-
-    @Override
-    public String getHost() {
-      return host;
-    }
-
-    @Override
-    public int getRpcPort() {
-      return rpcPort;
-    }
-
-    @Override
     public int getShufflePort() {
       return shufflePort;
     }
@@ -505,11 +219,6 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
     }
 
     @Override
-    public Map<String, String> getProperties() {
-      return srv.attributes();
-    }
-
-    @Override
     public Resource getResource() {
       return resource;
     }
@@ -530,83 +239,60 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
     public int getOutputFormatPort() {
       return outputFormatPort;
     }
-
-    // TODO: This needs a hashCode/equality implementation if used as a key in various structures.
-    // A new ServiceInstance is created each time.
   }
 
-  private void addToCache(String path, String host, ServiceInstance instance) {
-    instanceCacheLock.lock();
-    try {
-      putInCache(path, pathToInstanceCache, instance);
-      putInCache(host, nodeToInstanceCache, instance);
-    } finally {
-      instanceCacheLock.unlock();
-    }
-    LOG.debug("Added path={}, host={} instance={} to cache."
-            + " pathToInstanceCache:size={}, nodeToInstanceCache:size={}",
-        path, host, instance, pathToInstanceCache.size(), nodeToInstanceCache.size());
-  }
 
-  private void removeFromCache(String path, String host) {
-    instanceCacheLock.lock();
-    try {
-      pathToInstanceCache.remove(path);
-      nodeToInstanceCache.remove(host);
-    } finally {
-      instanceCacheLock.unlock();
-    }
-    LOG.debug("Removed path={}, host={} from cache."
-            + " pathToInstanceCache:size={}, nodeToInstanceCache:size={}",
-        path, host, pathToInstanceCache.size(), nodeToInstanceCache.size());
-  }
+  // TODO: this class is completely unnecessary... 1-on-1 mapping with parent.
+  //       Remains here as the legacy of the original higher-level interface (getInstance).
+  private static class DynamicServiceInstanceSet implements LlapServiceInstanceSet {
+    private final PathChildrenCache instancesCache;
+    private final LlapZookeeperRegistryImpl parent;
+    private final ServiceRecordMarshal encoder;
 
-  private void putInCache(String key, Map<String, Set<ServiceInstance>> cache,
-      ServiceInstance instance) {
-    Set<ServiceInstance> instanceSet = cache.get(key);
-    if (instanceSet == null) {
-      instanceSet = Sets.newHashSet();
-      cache.put(key, instanceSet);
+    public DynamicServiceInstanceSet(PathChildrenCache cache,
+        LlapZookeeperRegistryImpl parent, ServiceRecordMarshal encoder) {
+      this.instancesCache = cache;
+      this.parent = parent;
+      this.encoder = encoder;
+      parent.populateCache(instancesCache);
     }
-    instanceSet.add(instance);
-  }
 
 
-  private class DynamicServiceInstanceSet implements ServiceInstanceSet {
-    private final PathChildrenCache instancesCache;
+    @Override
+    public Collection<LlapServiceInstance> getAll() {
+      return parent.getAll();
+    }
 
-    public DynamicServiceInstanceSet(final PathChildrenCache cache) {
-      this.instancesCache = cache;
-      populateCache();
+    @Override
+    public Collection<LlapServiceInstance> getAllInstancesOrdered(boolean consistentIndexes) {
+      return parent.getAllInstancesOrdered(consistentIndexes, instancesCache);
     }
 
-    private void populateCache() {
-      for (ChildData childData : instancesCache.getCurrentData()) {
-        byte[] data = getWorkerData(childData);
-        if (data == null) continue;
-        try {
-          ServiceRecord srv = encoder.fromBytes(childData.getPath(), data);
-          ServiceInstance instance = new DynamicServiceInstance(srv);
-          addToCache(childData.getPath(), instance.getHost(), instance);
-        } catch (IOException e) {
-          LOG.error("Unable to decode data for zkpath: {}." +
-              " Ignoring from current instances list..", childData.getPath());
+    @Override
+    public LlapServiceInstance getInstance(String name) {
+      Collection<LlapServiceInstance> instances = getAll();
+      for(LlapServiceInstance instance : instances) {
+        if (instance.getWorkerIdentity().equals(name)) {
+          return instance;
         }
       }
+      return null;
     }
 
     @Override
-    public Collection<ServiceInstance> getAll() {
-      Set<ServiceInstance> instances =  new HashSet<>();
-      for(Set<ServiceInstance> instanceSet : pathToInstanceCache.values()) {
-        instances.addAll(instanceSet);
-      }
-      return instances;
+    public Set<LlapServiceInstance> getByHost(String host) {
+      return parent.getByHost(host);
+    }
+
+    @Override
+    public int size() {
+      return parent.size();
     }
 
+    @Override
     public ApplicationId getApplicationId() {
       for (ChildData childData : instancesCache.getCurrentData()) {
-        byte[] data = getWorkerData(childData);
+        byte[] data = getWorkerData(childData, WORKER_PREFIX);
         if (data == null) continue;
         ServiceRecord sr = null;
         try {
@@ -622,147 +308,70 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
       }
       return null;
     }
+  }
 
-    private byte[] getWorkerData(ChildData childData) {
-        if (childData == null) return null;
-        byte[] data = childData.getData();
-        if (data == null) return null;
-        if (!extractNodeName(childData).startsWith(WORKER_PREFIX)) return null;
-        return data;
-    }
-
-    @Override
-    public Collection<ServiceInstance> getAllInstancesOrdered(boolean consistentIndexes) {
-      Map<String, Long> slotByWorker = new HashMap<String, Long>();
-      Set<ServiceInstance> unsorted = Sets.newHashSet();
-      for (ChildData childData : instancesCache.getCurrentData()) {
-        if (childData == null) continue;
-        byte[] data = childData.getData();
-        if (data == null) continue;
-        String nodeName = extractNodeName(childData);
-        if (nodeName.startsWith(WORKER_PREFIX)) {
-          Set<ServiceInstance> instances = pathToInstanceCache.get(childData.getPath());
-          if (instances != null) {
-            unsorted.addAll(instances);
-          }
-        } else if (nodeName.startsWith(SLOT_PREFIX)) {
-          slotByWorker.put(extractWorkerIdFromSlot(childData),
-              Long.parseLong(nodeName.substring(SLOT_PREFIX.length())));
-        } else {
-          LOG.info("Ignoring unknown node {}", childData.getPath());
-        }
-      }
-
-      TreeMap<Long, ServiceInstance> sorted = new TreeMap<>();
-      long maxSlot = Long.MIN_VALUE;
-      for (ServiceInstance worker : unsorted) {
-        Long slot = slotByWorker.get(worker.getWorkerIdentity());
-        if (slot == null) {
-          LOG.info("Unknown slot for {}", worker.getWorkerIdentity());
-          continue;
-        }
-        maxSlot = Math.max(maxSlot, slot);
-        sorted.put(slot, worker);
-      }
+  private static String extractWorkerIdFromSlot(ChildData childData) {
+    return new String(childData.getData(), SlotZnode.CHARSET);
+  }
 
-      if (consistentIndexes) {
-        // Add dummy instances to all slots where LLAPs are MIA... I can haz insert_iterator?
-        TreeMap<Long, ServiceInstance> dummies = new TreeMap<>();
-        Iterator<Long> keyIter = sorted.keySet().iterator();
-        long expected = 0;
-        Long ts = null;
-        while (keyIter.hasNext()) {
-          Long slot = keyIter.next();
-          assert slot >= expected;
-          while (slot > expected) {
-            if (ts == null) {
-              ts = System.nanoTime(); // Inactive nodes restart every call!
-            }
-            dummies.put(expected, new InactiveServiceInstance("inactive-" + expected + "-" + ts));
-            ++expected;
-          }
-          ++expected;
+  // The real implementation for the instanceset... instanceset has its own copy of the
+  // ZK cache yet completely depends on the parent in every other aspect and is thus unneeded.
+
+  Collection<LlapServiceInstance> getAllInstancesOrdered(
+      boolean consistentIndexes, PathChildrenCache instancesCache) {
+    Map<String, Long> slotByWorker = new HashMap<String, Long>();
+    Set<LlapServiceInstance> unsorted = Sets.newHashSet();
+    for (ChildData childData : instancesCache.getCurrentData()) {
+      if (childData == null) continue;
+      byte[] data = childData.getData();
+      if (data == null) continue;
+      String nodeName = extractNodeName(childData);
+      if (nodeName.startsWith(WORKER_PREFIX)) {
+        Set<LlapServiceInstance> instances = getInstancesByPath(childData.getPath());
+        if (instances != null) {
+          unsorted.addAll(instances);
         }
-        sorted.putAll(dummies);
+      } else if (nodeName.startsWith(SLOT_PREFIX)) {
+        slotByWorker.put(extractWorkerIdFromSlot(childData),
+            Long.parseLong(nodeName.substring(SLOT_PREFIX.length())));
+      } else {
+        LOG.info("Ignoring unknown node {}", childData.getPath());
       }
-      return sorted.values();
     }
 
-    @Override
-    public ServiceInstance getInstance(String name) {
-      Collection<ServiceInstance> instances = getAll();
-      for(ServiceInstance instance : instances) {
-        if (instance.getWorkerIdentity().equals(name)) {
-          return instance;
-        }
+    TreeMap<Long, LlapServiceInstance> sorted = new TreeMap<>();
+    long maxSlot = Long.MIN_VALUE;
+    for (LlapServiceInstance worker : unsorted) {
+      Long slot = slotByWorker.get(worker.getWorkerIdentity());
+      if (slot == null) {
+        LOG.info("Unknown slot for {}", worker.getWorkerIdentity());
+        continue;
       }
-      return null;
-    }
-
-    @Override
-    public Set<ServiceInstance> getByHost(String host) {
-      Set<ServiceInstance> byHost = nodeToInstanceCache.get(host);
-      byHost = (byHost == null) ? Sets.<ServiceInstance>newHashSet() : byHost;
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Returning " + byHost.size() + " hosts for locality allocation on " + host);
-      }
-      return byHost;
-    }
-
-    @Override
-    public int size() {
-      // not using the path child cache here as there could be more than 1 path per host (worker and slot znodes)
-      return nodeToInstanceCache.size();
-    }
-  }
-
-  // TODO: make class static? fields leak
-  private class InstanceStateChangeListener implements PathChildrenCacheListener {
-    private final Logger LOG = LoggerFactory.getLogger(InstanceStateChangeListener.class);
-
-    @Override
-    public void childEvent(final CuratorFramework client, final PathChildrenCacheEvent event)
-        throws Exception {
-      Preconditions.checkArgument(client != null
-          && client.getState() == CuratorFrameworkState.STARTED, "client is 
not started");
-
-      synchronized (this) {
-        ChildData childData = event.getData();
-        if (childData == null)
-          return;
-        String nodeName = extractNodeName(childData);
-        if (!nodeName.startsWith(WORKER_PREFIX))
-          return; // No need to propagate slot updates.
-        LOG.info("{} for zknode {} in llap namespace", event.getType(), childData.getPath());
-        ServiceInstance instance = extractServiceInstance(event, childData);
-        switch (event.getType()) {
-        case CHILD_ADDED:
-          addToCache(childData.getPath(), instance.getHost(), instance);
-          for (ServiceInstanceStateChangeListener listener : stateChangeListeners) {
-            listener.onCreate(instance);
-          }
-          break;
-        case CHILD_UPDATED:
-          addToCache(childData.getPath(), instance.getHost(), instance);
-          for (ServiceInstanceStateChangeListener listener : stateChangeListeners) {
-            listener.onUpdate(instance);
-          }
-          break;
-        case CHILD_REMOVED:
-          removeFromCache(childData.getPath(), instance.getHost());
-          for (ServiceInstanceStateChangeListener listener : stateChangeListeners) {
-            listener.onRemove(instance);
+      maxSlot = Math.max(maxSlot, slot);
+      sorted.put(slot, worker);
+    }
+
+    if (consistentIndexes) {
+      // Add dummy instances to all slots where LLAPs are MIA... I can haz insert_iterator?
+      TreeMap<Long, LlapServiceInstance> dummies = new TreeMap<>();
+      Iterator<Long> keyIter = sorted.keySet().iterator();
+      long expected = 0;
+      Long ts = null;
+      while (keyIter.hasNext()) {
+        Long slot = keyIter.next();
+        assert slot >= expected;
+        while (slot > expected) {
+          if (ts == null) {
+            ts = System.nanoTime(); // Inactive nodes restart every call!
           }
-          break;
-        default:
-          // Ignore all the other events; logged above.
+          dummies.put(expected, new InactiveServiceInstance("inactive-" + expected + "-" + ts));
+          ++expected;
         }
+        ++expected;
       }
+      sorted.putAll(dummies);
     }
-  }
-
-  private static String extractWorkerIdFromSlot(ChildData childData) {
-    return new String(childData.getData(), SlotZnode.CHARSET);
+    return sorted.values();
   }
 
   private static String extractNodeName(ChildData childData) {
@@ -774,187 +383,44 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
     return nodeName;
   }
 
-  private ServiceInstance extractServiceInstance(
-      PathChildrenCacheEvent event, ChildData childData) {
-    byte[] data = childData.getData();
-    if (data == null) return null;
-    try {
-      ServiceRecord srv = encoder.fromBytes(event.getData().getPath(), data);
-      return new DynamicServiceInstance(srv);
-    } catch (IOException e) {
-      LOG.error("Unable to decode data for zknode: {}." +
-          " Dropping notification of type: {}", childData.getPath(), 
event.getType());
-      return null;
-    }
-  }
-  
+
   @Override
-  public ServiceInstanceSet getInstances(
+  public LlapServiceInstanceSet getInstances(
       String component, long clusterReadyTimeoutMs) throws IOException {
-    checkPathChildrenCache(clusterReadyTimeoutMs);
+    PathChildrenCache instancesCache = ensureInstancesCache(clusterReadyTimeoutMs);
 
     // lazily create instances
     if (instances == null) {
-      this.instances = new DynamicServiceInstanceSet(instancesCache);
+      this.instances = new DynamicServiceInstanceSet(instancesCache, this, encoder);
     }
     return instances;
   }
 
   @Override
   public ApplicationId getApplicationId() throws IOException {
-    getInstances("LLAP", 0);
-    return instances.getApplicationId();
-  }
-
-  @Override
-  public synchronized void registerStateChangeListener(
-      final ServiceInstanceStateChangeListener listener)
-      throws IOException {
-    checkPathChildrenCache(0);
-
-    this.stateChangeListeners.add(listener);
-  }
-
-  private synchronized void checkPathChildrenCache(long clusterReadyTimeoutMs) throws IOException {
-    Preconditions.checkArgument(zooKeeperClient != null &&
-            zooKeeperClient.getState() == CuratorFrameworkState.STARTED, "client is not started");
-    // lazily create PathChildrenCache
-    if (instancesCache != null) return;
-    ExecutorService tp = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
-              .setDaemon(true).setNameFormat("StateChangeNotificationHandler").build());
-    long startTimeNs = System.nanoTime(), deltaNs = clusterReadyTimeoutMs * 1000000L;
-    long sleepTimeMs = Math.min(16, clusterReadyTimeoutMs);
-    while (true) {
-      PathChildrenCache instancesCache = new PathChildrenCache(zooKeeperClient, workersPath, true);
-      instancesCache.getListenable().addListener(new InstanceStateChangeListener(), tp);
-      try {
-        instancesCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
-        this.instancesCache = instancesCache;
-        break;
-      } catch (InvalidACLException e) {
-        // PathChildrenCache tried to mkdir when the znode wasn't there, and failed.
-        CloseableUtils.closeQuietly(instancesCache);
-        long elapsedNs = System.nanoTime() - startTimeNs;
-        if (deltaNs == 0 || deltaNs <= elapsedNs) {
-          LOG.error("Unable to start curator PathChildrenCache", e);
-          throw new IOException(e);
-        }
-        LOG.warn("The cluster is not started yet (InvalidACL); will retry");
-        try {
-          Thread.sleep(Math.min(sleepTimeMs, (deltaNs - elapsedNs)/1000000L));
-        } catch (InterruptedException e1) {
-          LOG.error("Interrupted while retrying the PathChildrenCache 
startup");
-          throw new IOException(e1);
-        }
-        sleepTimeMs = sleepTimeMs << 1;
-      } catch (Exception e) {
-        CloseableUtils.closeQuietly(instancesCache);
-        LOG.error("Unable to start curator PathChildrenCache", e);
-        throw new IOException(e);
-      }
-    }
+    return getInstances("LLAP", 0).getApplicationId();
   }
 
   @Override
   public void start() throws IOException {
-    if (zooKeeperClient != null) {
-      setupZookeeperAuth(this.conf);
-      zooKeeperClient.start();
-    }
-    // Init closeable utils in case register is not called (see HIVE-13322)
-    CloseableUtils.class.getName();
+    super.start();
   }
 
   @Override
   public void stop() throws IOException {
-    CloseableUtils.closeQuietly(znode);
+    super.stop();
     CloseableUtils.closeQuietly(slotZnode);
-    CloseableUtils.closeQuietly(instancesCache);
-    CloseableUtils.closeQuietly(zooKeeperClient);
   }
 
-
-  private void setupZookeeperAuth(final Configuration conf) throws IOException {
-    if (UserGroupInformation.isSecurityEnabled() && LlapProxy.isDaemon()) {
-      LOG.info("UGI security is enabled. Setting up ZK auth.");
-
-      String llapPrincipal = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL);
-      if (llapPrincipal == null || llapPrincipal.isEmpty()) {
-        throw new IOException("Llap Kerberos principal is empty");
-      }
-
-      String llapKeytab = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE);
-      if (llapKeytab == null || llapKeytab.isEmpty()) {
-        throw new IOException("Llap Kerberos keytab is empty");
-      }
-
-      // Install the JAAS Configuration for the runtime
-      setZookeeperClientKerberosJaasConfig(llapPrincipal, llapKeytab);
-    } else {
-      LOG.info("UGI security is not enabled, or non-daemon environment. Skipping setting up ZK auth.");
-    }
-  }
-
-  /**
-   * Dynamically sets up the JAAS configuration that uses kerberos
-   *
-   * @param principal
-   * @param keyTabFile
-   * @throws IOException
-   */
-  private void setZookeeperClientKerberosJaasConfig(String principal, String keyTabFile)
-      throws IOException {
-    // ZooKeeper property name to pick the correct JAAS conf section
-    final String SASL_LOGIN_CONTEXT_NAME = "LlapZooKeeperClient";
-    System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, SASL_LOGIN_CONTEXT_NAME);
-
-    principal = SecurityUtil.getServerPrincipal(principal, "0.0.0.0");
-    userNameFromPrincipal = LlapUtil.getUserNameFromPrincipal(principal);
-    JaasConfiguration jaasConf = new JaasConfiguration(SASL_LOGIN_CONTEXT_NAME, principal,
-        keyTabFile);
-
-    // Install the Configuration in the runtime.
-    javax.security.auth.login.Configuration.setConfiguration(jaasConf);
+  @Override
+  protected LlapServiceInstance createServiceInstance(ServiceRecord srv) throws IOException {
+    return new DynamicServiceInstance(srv);
   }
 
-  /**
-   * A JAAS configuration for ZooKeeper clients intended to use for SASL
-   * Kerberos.
-   */
-  private static class JaasConfiguration extends javax.security.auth.login.Configuration {
-    // Current installed Configuration
-    private final javax.security.auth.login.Configuration baseConfig = javax.security.auth.login.Configuration
-        .getConfiguration();
-    private final String loginContextName;
-    private final String principal;
-    private final String keyTabFile;
-
-    public JaasConfiguration(String llapLoginContextName, String principal, String keyTabFile) {
-      this.loginContextName = llapLoginContextName;
-      this.principal = principal;
-      this.keyTabFile = keyTabFile;
-    }
-
-    @Override
-    public AppConfigurationEntry[] getAppConfigurationEntry(String appName) {
-      if (loginContextName.equals(appName)) {
-        Map<String, String> krbOptions = new HashMap<String, String>();
-        krbOptions.put("doNotPrompt", "true");
-        krbOptions.put("storeKey", "true");
-        krbOptions.put("useKeyTab", "true");
-        krbOptions.put("principal", principal);
-        krbOptions.put("keyTab", keyTabFile);
-        krbOptions.put("refreshKrb5Config", "true");
-        AppConfigurationEntry llapZooKeeperClientEntry = new AppConfigurationEntry(
-            KerberosUtil.getKrb5LoginModuleName(),
-            AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, krbOptions);
-        return new AppConfigurationEntry[]{llapZooKeeperClientEntry};
-      }
-      // Try the base config
-      if (baseConfig != null) {
-        return baseConfig.getAppConfigurationEntry(appName);
-      }
-      return null;
-    }
+  @Override
+  protected String getZkPathUser(Configuration conf) {
+    // External LLAP clients would need to set LLAP_ZK_REGISTRY_USER to the LLAP daemon user (hive),
+    // rather than relying on RegistryUtils.currentUser().
+    return HiveConf.getVar(conf, ConfVars.LLAP_ZK_REGISTRY_USER, RegistryUtils.currentUser());
   }
 }
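
As a reading aid for the slot-ordering hunk above: when consistent indexes are requested, live workers are sorted by slot number and every missing slot is back-filled with a placeholder instance, so a given slot keeps its position across calls. Below is a minimal standalone sketch of just that gap-filling walk; it uses plain strings instead of the real LlapServiceInstance/InactiveServiceInstance types, and the class and variable names are illustrative, not part of the patch.

    import java.util.Iterator;
    import java.util.TreeMap;

    public class SlotGapFillExample {
      public static void main(String[] args) {
        // Live workers keyed by slot; slots 1 and 3 are missing (daemons not registered).
        TreeMap<Long, String> sorted = new TreeMap<>();
        sorted.put(0L, "worker-a");
        sorted.put(2L, "worker-b");
        sorted.put(4L, "worker-c");

        // Same walk as the patch: insert a dummy entry for every gap below the highest slot.
        TreeMap<Long, String> dummies = new TreeMap<>();
        long expected = 0;
        Long ts = null;
        for (Iterator<Long> it = sorted.keySet().iterator(); it.hasNext(); ) {
          long slot = it.next();
          while (slot > expected) {
            if (ts == null) {
              ts = System.nanoTime(); // placeholder identities change on every call
            }
            dummies.put(expected, "inactive-" + expected + "-" + ts);
            ++expected;
          }
          ++expected;
        }
        sorted.putAll(dummies);
        // Prints {0=worker-a, 1=inactive-1-..., 2=worker-b, 3=inactive-3-..., 4=worker-c}
        System.out.println(sorted);
      }
    }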

http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
index ace9475..783a19f 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
@@ -33,8 +33,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
-import org.apache.hadoop.hive.llap.registry.ServiceInstance;
-import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
 import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
 import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.retry.RetryPolicies;
@@ -55,10 +55,10 @@ public class LlapTokenClient {
   private final SocketFactory socketFactory;
   private final RetryPolicy retryPolicy;
   private final Configuration conf;
-  private ServiceInstanceSet activeInstances;
-  private Collection<ServiceInstance> lastKnownInstances;
+  private LlapServiceInstanceSet activeInstances;
+  private Collection<LlapServiceInstance> lastKnownInstances;
   private LlapManagementProtocolClientImpl client;
-  private ServiceInstance clientInstance;
+  private LlapServiceInstance clientInstance;
 
   public LlapTokenClient(Configuration conf) {
     this.conf = conf;
@@ -71,7 +71,7 @@ public class LlapTokenClient {
 
  public Token<LlapTokenIdentifier> getDelegationToken(String appId) throws IOException {
     if (!UserGroupInformation.isSecurityEnabled()) return null;
-    Iterator<ServiceInstance> llaps = null;
+    Iterator<LlapServiceInstance> llaps = null;
     if (clientInstance == null) {
       assert client == null;
       llaps = getLlapServices(false).iterator();
@@ -128,7 +128,7 @@ public class LlapTokenClient {
   }
 
   /** Synchronized - LLAP registry and instance set are not thread safe. */
-  private synchronized List<ServiceInstance> getLlapServices(
+  private synchronized List<LlapServiceInstance> getLlapServices(
       boolean doForceRefresh) throws IOException {
     if (!doForceRefresh && lastKnownInstances != null) {
       return new ArrayList<>(lastKnownInstances);
@@ -137,12 +137,12 @@ public class LlapTokenClient {
       registry.start();
       activeInstances = registry.getInstances();
     }
-    Collection<ServiceInstance> daemons = activeInstances.getAll();
+    Collection<LlapServiceInstance> daemons = activeInstances.getAll();
     if (daemons == null || daemons.isEmpty()) {
       throw new RuntimeException("No LLAPs found");
     }
     lastKnownInstances = daemons;
-    return new ArrayList<ServiceInstance>(lastKnownInstances);
+    return new ArrayList<LlapServiceInstance>(lastKnownInstances);
   }
 
 }
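
The changes above are a type rename only; LlapTokenClient is still driven the same way. A rough usage sketch, assuming a Configuration that already carries the LLAP/ZooKeeper registry settings; the application id string is a placeholder, not a value from the patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.llap.security.LlapTokenClient;
    import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
    import org.apache.hadoop.security.token.Token;

    public class TokenClientUsageSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(); // assumes LLAP registry settings are present
        LlapTokenClient client = new LlapTokenClient(conf);
        // Returns null when Hadoop security is disabled, per the guard at the top of the method.
        Token<LlapTokenIdentifier> token = client.getDelegationToken("some-app-id");
        System.out.println(token == null ? "security disabled, no token" : token.toString());
      }
    }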

http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java
new file mode 100644
index 0000000..908b3bb
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hive.registry;
+
+import java.util.Map;
+
+public interface ServiceInstance {
+
+  /**
+   * Worker identity is a UUID (unique across restarts), used to identify a node that died and was
+   * brought back on the same host/port.
+   */
+  public abstract String getWorkerIdentity();
+
+  /**
+   * Hostname of the service instance.
+   *
+   * @return the hostname
+   */
+  public abstract String getHost();
+
+  /**
+   * RPC endpoint of the service instance.
+   *
+   * @return the RPC port
+   */
+  public int getRpcPort();
+
+  /**
+   * Config properties of the service instance (e.g. llap.daemon.*).
+   *
+   * @return the properties map
+   */
+  public abstract Map<String, String> getProperties();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java
new file mode 100644
index 0000000..34fba5c
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hive.registry;
+
+import java.util.Collection;
+import java.util.Set;
+
+/**
+ * Note: For most of the implementations, there's no guarantee that the ServiceInstance returned by
+ * one invocation is the same as the instance returned by another invocation. E.g. the ZK registry
+ * returns a new ServiceInstance object each time a getInstance call is made.
+ */
+public interface ServiceInstanceSet<InstanceType extends ServiceInstance> {
+
+  /**
+   * Get all instances, identified by their worker identities.
+   *
+   * Worker identities do not collide across restarts: each restart gets a unique identity
+   * while keeping the same host/ip pair.
+   *
+   * @return all known instances
+   */
+  Collection<InstanceType> getAll();
+
+  /**
+   * Get an instance by worker identity.
+   * 
+   * @param name the worker identity
+   * @return the matching instance, or null if there is none
+   */
+  InstanceType getInstance(String name);
+
+  /**
+   * Get the service instances for a given host.
+   *
+   * The result may include both dead and alive instances.
+   *
+   * @param host the hostname
+   * @return the set of instances on that host
+   */
+  Set<InstanceType> getByHost(String host);
+
+  /**
+   * Get the number of instances currently available.
+   *
+   * @return - number of instances
+   */
+  int size();
+
+}
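
Since this generic interface is new in this patch, here is a small sketch of how a consumer might walk it. The instances parameter is assumed to come from whatever registry implementation is in use; only methods declared on ServiceInstance and ServiceInstanceSet are called, and the class name is made up for the example.

    import java.util.Set;
    import org.apache.hadoop.hive.registry.ServiceInstance;
    import org.apache.hadoop.hive.registry.ServiceInstanceSet;

    public class InstanceSetUsageSketch {
      // Prints every known instance, then the instances co-located on one host.
      static void dump(ServiceInstanceSet<ServiceInstance> instances, String host) {
        for (ServiceInstance si : instances.getAll()) {
          System.out.println(si.getWorkerIdentity() + " -> " + si.getHost() + ":" + si.getRpcPort());
        }
        Set<ServiceInstance> onHost = instances.getByHost(host);
        System.out.println(onHost.size() + " of " + instances.size() + " instance(s) on " + host);
      }
    }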

http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceStateChangeListener.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceStateChangeListener.java b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceStateChangeListener.java
new file mode 100644
index 0000000..0a44179
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceStateChangeListener.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.registry;
+
+/**
+ * Callback listener for instance state change events
+ */
+public interface ServiceInstanceStateChangeListener<InstanceType extends ServiceInstance> {
+  /**
+   * Called when a new {@link ServiceInstance} is created.
+   *
+   * @param serviceInstance - created service instance
+   */
+  void onCreate(InstanceType serviceInstance);
+
+  /**
+   * Called when an existing {@link ServiceInstance} is updated.
+   *
+   * @param serviceInstance - updated service instance
+   */
+  void onUpdate(InstanceType serviceInstance);
+
+  /**
+   * Called when an existing {@link ServiceInstance} is removed.
+   *
+   * @param serviceInstance - removed service instance
+   */
+  void onRemove(InstanceType serviceInstance);
+}
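
A trivial listener sketch, only to show the shape of the callback contract above; the class is hypothetical and just logs each event.

    import org.apache.hadoop.hive.registry.ServiceInstance;
    import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LoggingStateChangeListener
        implements ServiceInstanceStateChangeListener<ServiceInstance> {
      private static final Logger LOG = LoggerFactory.getLogger(LoggingStateChangeListener.class);

      @Override
      public void onCreate(ServiceInstance si) {
        LOG.info("Instance created: {} on {}", si.getWorkerIdentity(), si.getHost());
      }

      @Override
      public void onUpdate(ServiceInstance si) {
        LOG.info("Instance updated: {}", si.getWorkerIdentity());
      }

      @Override
      public void onRemove(ServiceInstance si) {
        LOG.info("Instance removed: {}", si.getWorkerIdentity());
      }
    }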

http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java
new file mode 100644
index 0000000..db3d788
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hive.registry.impl;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.hadoop.hive.registry.ServiceInstance;
+import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
+import org.apache.hadoop.registry.client.types.AddressTypes;
+import org.apache.hadoop.registry.client.types.Endpoint;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ServiceInstanceBase implements ServiceInstance {
+  private static final Logger LOG = LoggerFactory.getLogger(ServiceInstanceBase.class);
+
+  protected final ServiceRecord srv;
+  protected final String host;
+  protected final int rpcPort;
+
+  public ServiceInstanceBase(ServiceRecord srv, String rpcName) throws IOException {
+    this.srv = srv;
+
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Working with ServiceRecord: {}", srv);
+    }
+
+    final Endpoint rpc = srv.getInternalEndpoint(rpcName);
+
+    this.host =
+        RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
+            AddressTypes.ADDRESS_HOSTNAME_FIELD);
+    this.rpcPort =
+        Integer.parseInt(RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
+            AddressTypes.ADDRESS_PORT_FIELD));
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    ServiceInstanceBase other = (ServiceInstanceBase) o;
+    return this.getWorkerIdentity().equals(other.getWorkerIdentity());
+  }
+
+  @Override
+  public int hashCode() {
+    return getWorkerIdentity().hashCode();
+  }
+
+  @Override
+  public String getWorkerIdentity() {
+    return srv.get(ZkRegistryBase.UNIQUE_IDENTIFIER);
+  }
+
+  @Override
+  public String getHost() {
+    return host;
+  }
+
+  @Override
+  public int getRpcPort() {
+    return rpcPort;
+  }
+
+  @Override
+  public Map<String, String> getProperties() {
+    return srv.attributes();
+  }
+
+  @Override
+  public String toString() {
+    return "DynamicServiceInstance [id=" + getWorkerIdentity() + ", host="
+        + host + ":" + rpcPort + "]";
+  }
+}
\ No newline at end of file
