HIVE-17409 : refactor LLAP ZK registry to make the ZK-registry part reusable (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4795b996 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4795b996 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4795b996 Branch: refs/heads/master Commit: 4795b99690c9428636a5a28c5d4be40ce5fc430d Parents: 2e226d2 Author: sergey <ser...@apache.org> Authored: Fri Sep 1 11:44:11 2017 -0700 Committer: sergey <ser...@apache.org> Committed: Fri Sep 1 11:48:01 2017 -0700 ---------------------------------------------------------------------- .../hive/llap/registry/LlapServiceInstance.java | 59 ++ .../llap/registry/LlapServiceInstanceSet.java | 34 + .../hive/llap/registry/ServiceInstance.java | 86 -- .../hive/llap/registry/ServiceInstanceSet.java | 69 -- .../ServiceInstanceStateChangeListener.java | 42 - .../hive/llap/registry/ServiceRegistry.java | 8 +- .../registry/impl/InactiveServiceInstance.java | 4 +- .../registry/impl/LlapFixedRegistryImpl.java | 41 +- .../llap/registry/impl/LlapRegistryService.java | 18 +- .../impl/LlapZookeeperRegistryImpl.java | 800 +++---------------- .../hive/llap/security/LlapTokenClient.java | 18 +- .../hadoop/hive/registry/ServiceInstance.java | 47 ++ .../hive/registry/ServiceInstanceSet.java | 61 ++ .../ServiceInstanceStateChangeListener.java | 42 + .../hive/registry/impl/ServiceInstanceBase.java | 93 +++ .../hive/registry/impl/ZkRegistryBase.java | 549 +++++++++++++ .../hive/registry/impl/ZookeeperUtils.java | 116 +++ .../hadoop/hive/llap/LlapBaseInputFormat.java | 30 +- .../hive/llap/cli/LlapStatusServiceDriver.java | 6 +- .../daemon/services/impl/LlapWebServices.java | 4 +- .../llap/tezplugins/LlapTaskCommunicator.java | 8 +- .../tezplugins/LlapTaskSchedulerService.java | 42 +- .../TestLlapTaskSchedulerService.java | 16 +- .../apache/hadoop/hive/ql/exec/tez/Utils.java | 6 +- .../physical/LlapClusterStateForCompile.java | 8 +- 25 files changed, 1242 insertions(+), 965 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-client/src/java/org/apache/hadoop/hive/llap/registry/LlapServiceInstance.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/LlapServiceInstance.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/LlapServiceInstance.java new file mode 100644 index 0000000..30b1810 --- /dev/null +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/LlapServiceInstance.java @@ -0,0 +1,59 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.llap.registry; + + +import org.apache.hadoop.hive.registry.ServiceInstance; + +import org.apache.hadoop.yarn.api.records.Resource; + +public interface LlapServiceInstance extends ServiceInstance { + + /** + * Management endpoint for service instance + * + * @return + */ + public int getManagementPort(); + + /** + * Shuffle Endpoint for service instance + * + * @return + */ + public int getShufflePort(); + + + /** + * Address for services hosted on http + * @return + */ + public String getServicesAddress(); + /** + * OutputFormat endpoint for service instance + * + * @return + */ + public int getOutputFormatPort(); + + + /** + * Memory and Executors available for the LLAP tasks + * + * This does not include the size of the cache or the actual vCores allocated via Slider. + * + * @return + */ + public Resource getResource(); +} http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-client/src/java/org/apache/hadoop/hive/llap/registry/LlapServiceInstanceSet.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/LlapServiceInstanceSet.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/LlapServiceInstanceSet.java new file mode 100644 index 0000000..a728f4a --- /dev/null +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/LlapServiceInstanceSet.java @@ -0,0 +1,34 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.registry; + +import java.util.Collection; +import org.apache.hadoop.hive.registry.ServiceInstanceSet; +import org.apache.hadoop.yarn.api.records.ApplicationId; + +public interface LlapServiceInstanceSet extends ServiceInstanceSet<LlapServiceInstance> { + + /** + * Gets a list containing all the instances. This list has the same iteration order across + * different processes, assuming the list of registry entries is the same. + * @param consistentIndexes if true, also try to maintain the same exact index for each node + * across calls, by inserting inactive instances to replace the + * removed ones. + */ + Collection<LlapServiceInstance> getAllInstancesOrdered( + boolean consistentIndexes); + + /** LLAP application ID */ + ApplicationId getApplicationId(); +} http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java deleted file mode 100644 index 70515c4..0000000 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.llap.registry; - -import java.util.Map; - -import org.apache.hadoop.yarn.api.records.Resource; - -public interface ServiceInstance { - - /** - * Worker identity is a UUID (unique across restarts), to identify a node which died & was brought - * back on the same host/port - */ - public String getWorkerIdentity(); - - /** - * Hostname of the service instance - * - * @return - */ - public String getHost(); - - /** - * RPC Endpoint for service instance - * - * @return - */ - public int getRpcPort(); - - /** - * Management endpoint for service instance - * - * @return - */ - public int getManagementPort(); - - /** - * Shuffle Endpoint for service instance - * - * @return - */ - public int getShufflePort(); - - - /** - * Address for services hosted on http - * @return - */ - public String getServicesAddress(); - /** - * OutputFormat endpoint for service instance - * - * @return - */ - public int getOutputFormatPort(); - - - /** - * Config properties of the Service Instance (llap.daemon.*) - * - * @return - */ - - public Map<String, String> getProperties(); - - /** - * Memory and Executors available for the LLAP tasks - * - * This does not include the size of the cache or the actual vCores allocated via Slider. 
- * - * @return - */ - public Resource getResource(); -} http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java deleted file mode 100644 index cc124e7..0000000 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.llap.registry; - -import java.util.Collection; -import java.util.Set; - -/** - * Note: For most of the implementations, there's no guarantee that the ServiceInstance returned by - * one invocation is the same as the instance returned by another invocation. e.g. the ZK registry - * returns a new ServiceInstance object each time a getInstance call is made. - */ -public interface ServiceInstanceSet { - - /** - * Get an instance mapping which map worker identity to each instance. - * - * The worker identity does not collide between restarts, so each restart will have a unique id, - * while having the same host/ip pair. - * - * @return - */ - Collection<ServiceInstance> getAll(); - - /** - * Gets a list containing all the instances. 
This list has the same iteration order across - * different processes, assuming the list of registry entries is the same. - * @param consistentIndexes if true, also try to maintain the same exact index for each node - * across calls, by inserting inactive instances to replace the - * removed ones. - */ - Collection<ServiceInstance> getAllInstancesOrdered(boolean consistentIndexes); - - /** - * Get an instance by worker identity. - * - * @param name - * @return - */ - ServiceInstance getInstance(String name); - - /** - * Get a list of service instances for a given host. - * - * The list could include dead and alive instances. - * - * @param host - * @return - */ - Set<ServiceInstance> getByHost(String host); - - /** - * Get number of instances in the currently availabe. - * - * @return - number of instances - */ - int size(); -} http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceStateChangeListener.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceStateChangeListener.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceStateChangeListener.java deleted file mode 100644 index 92eb8bd..0000000 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceStateChangeListener.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.llap.registry; - -/** - * Callback listener for instance state change events - */ -public interface ServiceInstanceStateChangeListener { - /** - * Called when new {@link ServiceInstance} is created. - * - * @param serviceInstance - created service instance - */ - void onCreate(ServiceInstance serviceInstance); - - /** - * Called when an existing {@link ServiceInstance} is updated. - * - * @param serviceInstance - updated service instance - */ - void onUpdate(ServiceInstance serviceInstance); - - /** - * Called when an existing {@link ServiceInstance} is removed. 
- * - * @param serviceInstance - removed service instance - */ - void onRemove(ServiceInstance serviceInstance); -} http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java index 5739d72..5d7f813 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java @@ -14,7 +14,7 @@ package org.apache.hadoop.hive.llap.registry; import java.io.IOException; - +import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener; import org.apache.hadoop.yarn.api.records.ApplicationId; /** @@ -49,14 +49,14 @@ public interface ServiceRegistry { * @param clusterReadyTimeoutMs The time to wait for the cluster to be ready, if it's not * started yet. 0 means do not wait. */ - ServiceInstanceSet getInstances(String component, long clusterReadyTimeoutMs) throws IOException; + LlapServiceInstanceSet getInstances(String component, long clusterReadyTimeoutMs) throws IOException; /** * Adds state change listeners for service instances. * @param listener - state change listener */ - void registerStateChangeListener(ServiceInstanceStateChangeListener listener) - throws IOException; + void registerStateChangeListener( + ServiceInstanceStateChangeListener<LlapServiceInstance> listener) throws IOException; /** * @return The application ID of the LLAP cluster. 
http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java index 9f2f3b4..1d6b716 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java @@ -16,10 +16,10 @@ package org.apache.hadoop.hive.llap.registry.impl; import java.util.Map; -import org.apache.hadoop.hive.llap.registry.ServiceInstance; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; import org.apache.hadoop.yarn.api.records.Resource; -public class InactiveServiceInstance implements ServiceInstance { +public class InactiveServiceInstance implements LlapServiceInstance { private final String name; public InactiveServiceInstance(String name) { this.name = name; http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java index ebc32a1..c88198f 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java @@ -29,15 +29,14 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; - import org.apache.hadoop.classification.InterfaceAudience; import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.llap.registry.ServiceInstance; -import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; -import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.ServiceRegistry; +import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -117,7 +116,7 @@ public class LlapFixedRegistryImpl implements ServiceRegistry { return "host-" + host; } - private final class FixedServiceInstance implements ServiceInstance { + private final class FixedServiceInstance implements LlapServiceInstance { private final String host; private final String serviceAddress; @@ -206,10 +205,10 @@ public class LlapFixedRegistryImpl implements ServiceRegistry { } } - private final class FixedServiceInstanceSet implements ServiceInstanceSet { + private final class FixedServiceInstanceSet implements LlapServiceInstanceSet { // LinkedHashMap have a repeatable iteration order. 
- private final Map<String, ServiceInstance> instances = new LinkedHashMap<>(); + private final Map<String, LlapServiceInstance> instances = new LinkedHashMap<>(); public FixedServiceInstanceSet() { for (String host : hosts) { @@ -219,17 +218,17 @@ public class LlapFixedRegistryImpl implements ServiceRegistry { } @Override - public Collection<ServiceInstance> getAll() { + public Collection<LlapServiceInstance> getAll() { return instances.values(); } @Override - public List<ServiceInstance> getAllInstancesOrdered(boolean consistentIndexes) { - List<ServiceInstance> list = new LinkedList<>(); + public List<LlapServiceInstance> getAllInstancesOrdered(boolean consistentIndexes) { + List<LlapServiceInstance> list = new LinkedList<>(); list.addAll(instances.values()); - Collections.sort(list, new Comparator<ServiceInstance>() { + Collections.sort(list, new Comparator<LlapServiceInstance>() { @Override - public int compare(ServiceInstance o1, ServiceInstance o2) { + public int compare(LlapServiceInstance o1, LlapServiceInstance o2) { return o2.getWorkerIdentity().compareTo(o2.getWorkerIdentity()); } }); @@ -237,14 +236,14 @@ public class LlapFixedRegistryImpl implements ServiceRegistry { } @Override - public ServiceInstance getInstance(String name) { + public LlapServiceInstance getInstance(String name) { return instances.get(name); } @Override - public Set<ServiceInstance> getByHost(String host) { - Set<ServiceInstance> byHost = new HashSet<ServiceInstance>(); - ServiceInstance inst = getInstance(getWorkerIdentity(host)); + public Set<LlapServiceInstance> getByHost(String host) { + Set<LlapServiceInstance> byHost = new HashSet<LlapServiceInstance>(); + LlapServiceInstance inst = getInstance(getWorkerIdentity(host)); if (inst != null) { byHost.add(inst); } @@ -255,15 +254,21 @@ public class LlapFixedRegistryImpl implements ServiceRegistry { public int size() { return instances.size(); } + + @Override + public ApplicationId getApplicationId() { + return null; + } } 
@Override - public ServiceInstanceSet getInstances(String component, long timeoutMs) throws IOException { + public LlapServiceInstanceSet getInstances(String component, long timeoutMs) throws IOException { return new FixedServiceInstanceSet(); } @Override - public void registerStateChangeListener(final ServiceInstanceStateChangeListener listener) { + public void registerStateChangeListener( + final ServiceInstanceStateChangeListener<LlapServiceInstance> listener) { // nothing to set LOG.warn("Callbacks for instance state changes are not supported in fixed registry."); } http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java index 76fc9c7..80a6aba 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java @@ -13,24 +13,24 @@ */ package org.apache.hadoop.hive.llap.registry.impl; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; + +import com.google.common.base.Preconditions; import java.io.IOException; import java.util.HashMap; import java.util.Map; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; -import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.ServiceRegistry; +import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener; import 
org.apache.hadoop.registry.client.binding.RegistryUtils; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; - public class LlapRegistryService extends AbstractService { private static final Logger LOG = LoggerFactory.getLogger(LlapRegistryService.class); @@ -131,16 +131,16 @@ public class LlapRegistryService extends AbstractService { } } - public ServiceInstanceSet getInstances() throws IOException { + public LlapServiceInstanceSet getInstances() throws IOException { return getInstances(0); } - public ServiceInstanceSet getInstances(long clusterReadyTimeoutMs) throws IOException { + public LlapServiceInstanceSet getInstances(long clusterReadyTimeoutMs) throws IOException { return this.registry.getInstances("LLAP", clusterReadyTimeoutMs); } - public void registerStateChangeListener(ServiceInstanceStateChangeListener listener) - throws IOException { + public void registerStateChangeListener( + ServiceInstanceStateChangeListener<LlapServiceInstance> listener) throws IOException { this.registry.registerStateChangeListener(listener); } http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java index ad17144..65f8f94 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java @@ -13,83 +13,47 @@ */ package org.apache.hadoop.hive.llap.registry.impl; +import 
org.apache.hadoop.registry.client.binding.RegistryUtils; + +import com.google.common.collect.Sets; import java.io.IOException; -import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.MalformedURLException; import java.net.URISyntaxException; import java.net.URL; -import java.net.UnknownHostException; -import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import javax.security.auth.login.AppConfigurationEntry; - -import com.google.common.collect.Sets; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.framework.api.ACLProvider; -import org.apache.curator.framework.imps.CuratorFrameworkState; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; -import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode; -import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode.Mode; -import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.utils.CloseableUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.llap.LlapUtil; import 
org.apache.hadoop.hive.llap.io.api.LlapProxy; -import org.apache.hadoop.hive.llap.registry.ServiceInstance; -import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; -import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.ServiceRegistry; +import org.apache.hadoop.hive.registry.impl.ServiceInstanceBase; +import org.apache.hadoop.hive.registry.impl.ZkRegistryBase; import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; -import org.apache.hadoop.registry.client.binding.RegistryUtils; import org.apache.hadoop.registry.client.binding.RegistryUtils.ServiceRecordMarshal; import org.apache.hadoop.registry.client.types.AddressTypes; import org.apache.hadoop.registry.client.types.Endpoint; import org.apache.hadoop.registry.client.types.ProtocolTypes; import org.apache.hadoop.registry.client.types.ServiceRecord; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.authentication.util.KerberosUtil; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.KeeperException.InvalidACLException; -import org.apache.zookeeper.client.ZooKeeperSaslClient; -import org.apache.zookeeper.data.ACL; -import org.apache.zookeeper.data.Id; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -public class LlapZookeeperRegistryImpl implements ServiceRegistry { - +public class LlapZookeeperRegistryImpl + extends 
ZkRegistryBase<LlapServiceInstance> implements ServiceRegistry { private static final Logger LOG = LoggerFactory.getLogger(LlapZookeeperRegistryImpl.class); /** @@ -100,155 +64,29 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry { private static final String IPC_SHUFFLE = "shuffle"; private static final String IPC_LLAP = "llap"; private static final String IPC_OUTPUTFORMAT = "llapoutputformat"; - private final static String SASL_NAMESPACE = "llap-sasl"; - private final static String UNSECURE_NAMESPACE = "llap-unsecure"; + private final static String NAMESPACE_PREFIX = "llap-"; private final static String USER_SCOPE_PATH_PREFIX = "user-"; - private static final String DISABLE_MESSAGE = - "Set " + ConfVars.LLAP_VALIDATE_ACLS.varname + " to false to disable ACL validation"; private static final String WORKER_PREFIX = "worker-"; private static final String SLOT_PREFIX = "slot-"; + private static final String SASL_LOGIN_CONTEXT_NAME = "LlapZooKeeperClient"; - private final Configuration conf; - private final CuratorFramework zooKeeperClient; - // userPathPrefix is the path specific to the user for which ACLs should be restrictive. - // workersPath is the directory path where all the worker znodes are located. - private final String userPathPrefix, workersPath; - private String userNameFromPrincipal; // Only set when setting up the secure config for ZK. 
- - private PersistentEphemeralNode znode; private SlotZnode slotZnode; - private String znodePath; // unique identity for this instance - private final ServiceRecordMarshal encoder; // to marshal/unmarshal znode data - // to be used by clients of ServiceRegistry + // to be used by clients of ServiceRegistry TODO: this is unnecessary private DynamicServiceInstanceSet instances; - private PathChildrenCache instancesCache; - - private static final UUID uniq = UUID.randomUUID(); - private static final String UNIQUE_IDENTIFIER = "llap.unique.id"; - - private Set<ServiceInstanceStateChangeListener> stateChangeListeners; - private final Map<String, Set<ServiceInstance>> pathToInstanceCache; - private final Map<String, Set<ServiceInstance>> nodeToInstanceCache; - private final Lock instanceCacheLock = new ReentrantLock(); - - // get local hostname - private static final String hostname; - - static { - String localhost = "localhost"; - try { - localhost = InetAddress.getLocalHost().getCanonicalHostName(); - } catch (UnknownHostException uhe) { - // ignore - } - hostname = localhost; - } public LlapZookeeperRegistryImpl(String instanceName, Configuration conf) { - this.conf = new Configuration(conf); - this.conf.addResource(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); - String zkEnsemble = getQuorumServers(this.conf); - this.encoder = new RegistryUtils.ServiceRecordMarshal(); - int sessionTimeout = (int) HiveConf.getTimeVar(conf, ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, - TimeUnit.MILLISECONDS); - int baseSleepTime = (int) HiveConf - .getTimeVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, - TimeUnit.MILLISECONDS); - int maxRetries = HiveConf.getIntVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES); - - // sample path: /llap-sasl/hiveuser/hostname/workers/worker-0000000 - // worker-0000000 is the sequence number which will be retained until session timeout. 
If a - // worker does not respond due to communication interruptions it will retain the same sequence - // number when it returns back. If session timeout expires, the node will be deleted and new - // addition of the same node (restart) will get next sequence number - this.userPathPrefix = USER_SCOPE_PATH_PREFIX + getZkPathUser(this.conf); - this.workersPath = "/" + userPathPrefix + "/" + instanceName + "/workers"; - this.instancesCache = null; - this.instances = null; - this.stateChangeListeners = new HashSet<>(); - this.pathToInstanceCache = new ConcurrentHashMap<>(); - this.nodeToInstanceCache = new ConcurrentHashMap<>(); - - final boolean isSecure = UserGroupInformation.isSecurityEnabled(); - ACLProvider zooKeeperAclProvider = new ACLProvider() { - @Override - public List<ACL> getDefaultAcl() { - // We always return something from getAclForPath so this should not happen. - LOG.warn("getDefaultAcl was called"); - return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE); - } - - @Override - public List<ACL> getAclForPath(String path) { - if (!isSecure || path == null || !path.contains(userPathPrefix)) { - // No security or the path is below the user path - full access. - return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE); - } - return createSecureAcls(); - } - }; - String rootNs = HiveConf.getVar(conf, ConfVars.LLAP_ZK_REGISTRY_NAMESPACE); - if (rootNs == null) { - rootNs = isSecure ? SASL_NAMESPACE : UNSECURE_NAMESPACE; // The normal path. 
- } - - // Create a CuratorFramework instance to be used as the ZooKeeper client - // Use the zooKeeperAclProvider to create appropriate ACLs - this.zooKeeperClient = CuratorFrameworkFactory.builder() - .connectString(zkEnsemble) - .sessionTimeoutMs(sessionTimeout) - .aclProvider(zooKeeperAclProvider) - .namespace(rootNs) - .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries)) - .build(); - + super(instanceName, conf, + HiveConf.getVar(conf, ConfVars.LLAP_ZK_REGISTRY_NAMESPACE), NAMESPACE_PREFIX, + USER_SCOPE_PATH_PREFIX, WORKER_PREFIX, + LlapProxy.isDaemon() ? SASL_LOGIN_CONTEXT_NAME : null, + HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL), + HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE), + ConfVars.LLAP_VALIDATE_ACLS); LOG.info("Llap Zookeeper Registry is enabled with registryid: " + instanceName); } - private static List<ACL> createSecureAcls() { - // Read all to the world - List<ACL> nodeAcls = new ArrayList<ACL>(ZooDefs.Ids.READ_ACL_UNSAFE); - // Create/Delete/Write/Admin to creator - nodeAcls.addAll(ZooDefs.Ids.CREATOR_ALL_ACL); - return nodeAcls; - } - - /** - * Get the ensemble server addresses from the configuration. The format is: host1:port, - * host2:port.. 
- * - * @param conf - **/ - private String getQuorumServers(Configuration conf) { - String[] hosts = conf.getTrimmedStrings(ConfVars.HIVE_ZOOKEEPER_QUORUM.varname); - String port = conf.get(ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.varname, - ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.getDefaultValue()); - StringBuilder quorum = new StringBuilder(); - for (int i = 0; i < hosts.length; i++) { - quorum.append(hosts[i].trim()); - if (!hosts[i].contains(":")) { - // if the hostname doesn't contain a port, add the configured port to hostname - quorum.append(":"); - quorum.append(port); - } - - if (i != hosts.length - 1) { - quorum.append(","); - } - } - - return quorum.toString(); - } - - private String getZkPathUser(Configuration conf) { - // External LLAP clients would need to set LLAP_ZK_REGISTRY_USER to the LLAP daemon user (hive), - // rather than relying on RegistryUtils.currentUser(). - String user = HiveConf.getVar(conf, ConfVars.LLAP_ZK_REGISTRY_USER, RegistryUtils.currentUser()); - return user; - } - public Endpoint getRpcEndpoint() { final int rpcPort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_RPC_PORT); return RegistryTypeUtils.ipcEndpoint(IPC_LLAP, new InetSocketAddress(hostname, rpcPort)); @@ -304,117 +142,37 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry { } } - // restart sensitive instance id - srv.set(UNIQUE_IDENTIFIER, uniq.toString()); + String uniqueId = registerServiceRecord(srv); + long znodeCreationTimeout = 120; // Create a znode under the rootNamespace parent for this instance of the server try { - // PersistentEphemeralNode will make sure the ephemeral node created on server will be present - // even under connection or session interruption (will automatically handle retries) - znode = new PersistentEphemeralNode(zooKeeperClient, Mode.EPHEMERAL_SEQUENTIAL, - workersPath + "/" + WORKER_PREFIX, encoder.toBytes(srv)); - - // start the creation of znodes - znode.start(); - - // We'll wait for 120s for node creation - long 
znodeCreationTimeout = 120; - if (!znode.waitForInitialCreate(znodeCreationTimeout, TimeUnit.SECONDS)) { - throw new Exception( - "Max znode creation wait time: " + znodeCreationTimeout + "s exhausted"); - } - - znodePath = znode.getActualPath(); - slotZnode = new SlotZnode( - zooKeeperClient, workersPath, SLOT_PREFIX, WORKER_PREFIX, uniq.toString()); + zooKeeperClient, workersPath, SLOT_PREFIX, WORKER_PREFIX, uniqueId); if (!slotZnode.start(znodeCreationTimeout, TimeUnit.SECONDS)) { throw new Exception( "Max znode creation wait time: " + znodeCreationTimeout + "s exhausted"); } - - if (HiveConf.getBoolVar(conf, ConfVars.LLAP_VALIDATE_ACLS)) { - try { - checkAndSetAcls(); - } catch (Exception ex) { - throw new IOException("Error validating or setting ACLs. " + DISABLE_MESSAGE, ex); - } - } - if (zooKeeperClient.checkExists().forPath(znodePath) == null) { - // No node exists, throw exception - throw new Exception("Unable to create znode for this LLAP instance on ZooKeeper."); - } - LOG.info( - "Registered node. Created a znode on ZooKeeper for LLAP instance: rpc: {}, shuffle: {}," + - " webui: {}, mgmt: {}, znodePath: {} ", - rpcEndpoint, getShuffleEndpoint(), getServicesEndpoint(), getMngEndpoint(), znodePath); } catch (Exception e) { LOG.error("Unable to create a znode for this server instance", e); - CloseableUtils.closeQuietly(znode); CloseableUtils.closeQuietly(slotZnode); + super.stop(); throw (e instanceof IOException) ? (IOException)e : new IOException(e); } - if (LOG.isDebugEnabled()) { - LOG.debug("Created zknode with path: {} service record: {}", znodePath, srv); - } - - return uniq.toString(); + LOG.info("Registered node. 
Created a znode on ZooKeeper for LLAP instance: rpc: {}, " + + "shuffle: {}, webui: {}, mgmt: {}, znodePath: {}", rpcEndpoint, getShuffleEndpoint(), + getServicesEndpoint(), getMngEndpoint(), getRegistrationZnodePath()); + return uniqueId; } - private void checkAndSetAcls() throws Exception { - if (!UserGroupInformation.isSecurityEnabled()) return; - // We are trying to check ACLs on the "workers" directory, which noone except us should be - // able to write to. Higher-level directories shouldn't matter - we don't read them. - String pathToCheck = workersPath; - List<ACL> acls = zooKeeperClient.getACL().forPath(pathToCheck); - if (acls == null || acls.isEmpty()) { - // Can there be no ACLs? There's some access (to get ACLs), so assume it means free for all. - LOG.warn("No ACLs on " + pathToCheck + "; setting up ACLs. " + DISABLE_MESSAGE); - setUpAcls(pathToCheck); - return; - } - // This could be brittle. - assert userNameFromPrincipal != null; - Id currentUser = new Id("sasl", userNameFromPrincipal); - for (ACL acl : acls) { - if ((acl.getPerms() & ~ZooDefs.Perms.READ) == 0 || currentUser.equals(acl.getId())) { - continue; // Read permission/no permissions, or the expected user. - } - LOG.warn("The ACL " + acl + " is unnacceptable for " + pathToCheck - + "; setting up ACLs. 
" + DISABLE_MESSAGE); - setUpAcls(pathToCheck); - return; - } - } - - private void setUpAcls(String path) throws Exception { - List<ACL> acls = createSecureAcls(); - LinkedList<String> paths = new LinkedList<>(); - paths.add(path); - while (!paths.isEmpty()) { - String currentPath = paths.poll(); - List<String> children = zooKeeperClient.getChildren().forPath(currentPath); - if (children != null) { - for (String child : children) { - paths.add(currentPath + "/" + child); - } - } - zooKeeperClient.setACL().withACL(acls).forPath(currentPath); - } - } - - @Override public void unregister() throws IOException { // Nothing for the zkCreate models } - private class DynamicServiceInstance implements ServiceInstance { - - private final ServiceRecord srv; - private final String host; - private final int rpcPort; + private class DynamicServiceInstance + extends ServiceInstanceBase implements LlapServiceInstance { private final int mngPort; private final int shufflePort; private final int outputFormatPort; @@ -422,24 +180,13 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry { private final Resource resource; public DynamicServiceInstance(ServiceRecord srv) throws IOException { - this.srv = srv; - - if (LOG.isTraceEnabled()) { - LOG.trace("Working with ServiceRecord: {}", srv); - } + super(srv, IPC_LLAP); final Endpoint shuffle = srv.getInternalEndpoint(IPC_SHUFFLE); - final Endpoint rpc = srv.getInternalEndpoint(IPC_LLAP); final Endpoint mng = srv.getInternalEndpoint(IPC_MNG); final Endpoint outputFormat = srv.getInternalEndpoint(IPC_OUTPUTFORMAT); final Endpoint services = srv.getExternalEndpoint(IPC_SERVICES); - this.host = - RegistryTypeUtils.getAddressField(rpc.addresses.get(0), - AddressTypes.ADDRESS_HOSTNAME_FIELD); - this.rpcPort = - Integer.parseInt(RegistryTypeUtils.getAddressField(rpc.addresses.get(0), - AddressTypes.ADDRESS_PORT_FIELD)); this.mngPort = Integer.parseInt(RegistryTypeUtils.getAddressField(mng.addresses.get(0), 
AddressTypes.ADDRESS_PORT_FIELD)); @@ -462,39 +209,6 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry { } @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - DynamicServiceInstance other = (DynamicServiceInstance) o; - return this.getWorkerIdentity().equals(other.getWorkerIdentity()); - } - - @Override - public int hashCode() { - return getWorkerIdentity().hashCode(); - } - - @Override - public String getWorkerIdentity() { - return srv.get(UNIQUE_IDENTIFIER); - } - - @Override - public String getHost() { - return host; - } - - @Override - public int getRpcPort() { - return rpcPort; - } - - @Override public int getShufflePort() { return shufflePort; } @@ -505,11 +219,6 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry { } @Override - public Map<String, String> getProperties() { - return srv.attributes(); - } - - @Override public Resource getResource() { return resource; } @@ -530,83 +239,60 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry { public int getOutputFormatPort() { return outputFormatPort; } - - // TODO: This needs a hashCode/equality implementation if used as a key in various structures. - // A new ServiceInstance is created each time. } - private void addToCache(String path, String host, ServiceInstance instance) { - instanceCacheLock.lock(); - try { - putInCache(path, pathToInstanceCache, instance); - putInCache(host, nodeToInstanceCache, instance); - } finally { - instanceCacheLock.unlock(); - } - LOG.debug("Added path={}, host={} instance={} to cache." 
- + " pathToInstanceCache:size={}, nodeToInstanceCache:size={}", - path, host, instance, pathToInstanceCache.size(), nodeToInstanceCache.size()); - } - private void removeFromCache(String path, String host) { - instanceCacheLock.lock(); - try { - pathToInstanceCache.remove(path); - nodeToInstanceCache.remove(host); - } finally { - instanceCacheLock.unlock(); - } - LOG.debug("Removed path={}, host={} from cache." - + " pathToInstanceCache:size={}, nodeToInstanceCache:size={}", - path, host, pathToInstanceCache.size(), nodeToInstanceCache.size()); - } + // TODO: this class is completely unnecessary... 1-on-1 mapping with parent. + // Remains here as the legacy of the original higher-level interface (getInstance). + private static class DynamicServiceInstanceSet implements LlapServiceInstanceSet { + private final PathChildrenCache instancesCache; + private final LlapZookeeperRegistryImpl parent; + private final ServiceRecordMarshal encoder; - private void putInCache(String key, Map<String, Set<ServiceInstance>> cache, - ServiceInstance instance) { - Set<ServiceInstance> instanceSet = cache.get(key); - if (instanceSet == null) { - instanceSet = Sets.newHashSet(); - cache.put(key, instanceSet); + public DynamicServiceInstanceSet(PathChildrenCache cache, + LlapZookeeperRegistryImpl parent, ServiceRecordMarshal encoder) { + this.instancesCache = cache; + this.parent = parent; + this.encoder = encoder; + parent.populateCache(instancesCache); } - instanceSet.add(instance); - } - private class DynamicServiceInstanceSet implements ServiceInstanceSet { - private final PathChildrenCache instancesCache; + @Override + public Collection<LlapServiceInstance> getAll() { + return parent.getAll(); + } - public DynamicServiceInstanceSet(final PathChildrenCache cache) { - this.instancesCache = cache; - populateCache(); + @Override + public Collection<LlapServiceInstance> getAllInstancesOrdered(boolean consistentIndexes) { + return parent.getAllInstancesOrdered(consistentIndexes, 
instancesCache); } - private void populateCache() { - for (ChildData childData : instancesCache.getCurrentData()) { - byte[] data = getWorkerData(childData); - if (data == null) continue; - try { - ServiceRecord srv = encoder.fromBytes(childData.getPath(), data); - ServiceInstance instance = new DynamicServiceInstance(srv); - addToCache(childData.getPath(), instance.getHost(), instance); - } catch (IOException e) { - LOG.error("Unable to decode data for zkpath: {}." + - " Ignoring from current instances list..", childData.getPath()); + @Override + public LlapServiceInstance getInstance(String name) { + Collection<LlapServiceInstance> instances = getAll(); + for(LlapServiceInstance instance : instances) { + if (instance.getWorkerIdentity().equals(name)) { + return instance; } } + return null; } @Override - public Collection<ServiceInstance> getAll() { - Set<ServiceInstance> instances = new HashSet<>(); - for(Set<ServiceInstance> instanceSet : pathToInstanceCache.values()) { - instances.addAll(instanceSet); - } - return instances; + public Set<LlapServiceInstance> getByHost(String host) { + return parent.getByHost(host); + } + + @Override + public int size() { + return parent.size(); } + @Override public ApplicationId getApplicationId() { for (ChildData childData : instancesCache.getCurrentData()) { - byte[] data = getWorkerData(childData); + byte[] data = getWorkerData(childData, WORKER_PREFIX); if (data == null) continue; ServiceRecord sr = null; try { @@ -622,147 +308,70 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry { } return null; } + } - private byte[] getWorkerData(ChildData childData) { - if (childData == null) return null; - byte[] data = childData.getData(); - if (data == null) return null; - if (!extractNodeName(childData).startsWith(WORKER_PREFIX)) return null; - return data; - } - - @Override - public Collection<ServiceInstance> getAllInstancesOrdered(boolean consistentIndexes) { - Map<String, Long> slotByWorker = new 
HashMap<String, Long>(); - Set<ServiceInstance> unsorted = Sets.newHashSet(); - for (ChildData childData : instancesCache.getCurrentData()) { - if (childData == null) continue; - byte[] data = childData.getData(); - if (data == null) continue; - String nodeName = extractNodeName(childData); - if (nodeName.startsWith(WORKER_PREFIX)) { - Set<ServiceInstance> instances = pathToInstanceCache.get(childData.getPath()); - if (instances != null) { - unsorted.addAll(instances); - } - } else if (nodeName.startsWith(SLOT_PREFIX)) { - slotByWorker.put(extractWorkerIdFromSlot(childData), - Long.parseLong(nodeName.substring(SLOT_PREFIX.length()))); - } else { - LOG.info("Ignoring unknown node {}", childData.getPath()); - } - } - - TreeMap<Long, ServiceInstance> sorted = new TreeMap<>(); - long maxSlot = Long.MIN_VALUE; - for (ServiceInstance worker : unsorted) { - Long slot = slotByWorker.get(worker.getWorkerIdentity()); - if (slot == null) { - LOG.info("Unknown slot for {}", worker.getWorkerIdentity()); - continue; - } - maxSlot = Math.max(maxSlot, slot); - sorted.put(slot, worker); - } + private static String extractWorkerIdFromSlot(ChildData childData) { + return new String(childData.getData(), SlotZnode.CHARSET); + } - if (consistentIndexes) { - // Add dummy instances to all slots where LLAPs are MIA... I can haz insert_iterator? - TreeMap<Long, ServiceInstance> dummies = new TreeMap<>(); - Iterator<Long> keyIter = sorted.keySet().iterator(); - long expected = 0; - Long ts = null; - while (keyIter.hasNext()) { - Long slot = keyIter.next(); - assert slot >= expected; - while (slot > expected) { - if (ts == null) { - ts = System.nanoTime(); // Inactive nodes restart every call! - } - dummies.put(expected, new InactiveServiceInstance("inactive-" + expected + "-" + ts)); - ++expected; - } - ++expected; + // The real implementation for the instanceset... 
instanceset has its own copy of the + // ZK cache yet completely depends on the parent in every other aspect and is thus unneeded. + + Collection<LlapServiceInstance> getAllInstancesOrdered( + boolean consistentIndexes, PathChildrenCache instancesCache) { + Map<String, Long> slotByWorker = new HashMap<String, Long>(); + Set<LlapServiceInstance> unsorted = Sets.newHashSet(); + for (ChildData childData : instancesCache.getCurrentData()) { + if (childData == null) continue; + byte[] data = childData.getData(); + if (data == null) continue; + String nodeName = extractNodeName(childData); + if (nodeName.startsWith(WORKER_PREFIX)) { + Set<LlapServiceInstance> instances = getInstancesByPath(childData.getPath()); + if (instances != null) { + unsorted.addAll(instances); } - sorted.putAll(dummies); + } else if (nodeName.startsWith(SLOT_PREFIX)) { + slotByWorker.put(extractWorkerIdFromSlot(childData), + Long.parseLong(nodeName.substring(SLOT_PREFIX.length()))); + } else { + LOG.info("Ignoring unknown node {}", childData.getPath()); } - return sorted.values(); } - @Override - public ServiceInstance getInstance(String name) { - Collection<ServiceInstance> instances = getAll(); - for(ServiceInstance instance : instances) { - if (instance.getWorkerIdentity().equals(name)) { - return instance; - } + TreeMap<Long, LlapServiceInstance> sorted = new TreeMap<>(); + long maxSlot = Long.MIN_VALUE; + for (LlapServiceInstance worker : unsorted) { + Long slot = slotByWorker.get(worker.getWorkerIdentity()); + if (slot == null) { + LOG.info("Unknown slot for {}", worker.getWorkerIdentity()); + continue; } - return null; - } - - @Override - public Set<ServiceInstance> getByHost(String host) { - Set<ServiceInstance> byHost = nodeToInstanceCache.get(host); - byHost = (byHost == null) ? 
Sets.<ServiceInstance>newHashSet() : byHost; - if (LOG.isDebugEnabled()) { - LOG.debug("Returning " + byHost.size() + " hosts for locality allocation on " + host); - } - return byHost; - } - - @Override - public int size() { - // not using the path child cache here as there could be more than 1 path per host (worker and slot znodes) - return nodeToInstanceCache.size(); - } - } - - // TODO: make class static? fields leak - private class InstanceStateChangeListener implements PathChildrenCacheListener { - private final Logger LOG = LoggerFactory.getLogger(InstanceStateChangeListener.class); - - @Override - public void childEvent(final CuratorFramework client, final PathChildrenCacheEvent event) - throws Exception { - Preconditions.checkArgument(client != null - && client.getState() == CuratorFrameworkState.STARTED, "client is not started"); - - synchronized (this) { - ChildData childData = event.getData(); - if (childData == null) - return; - String nodeName = extractNodeName(childData); - if (!nodeName.startsWith(WORKER_PREFIX)) - return; // No need to propagate slot updates. 
- LOG.info("{} for zknode {} in llap namespace", event.getType(), childData.getPath()); - ServiceInstance instance = extractServiceInstance(event, childData); - switch (event.getType()) { - case CHILD_ADDED: - addToCache(childData.getPath(), instance.getHost(), instance); - for (ServiceInstanceStateChangeListener listener : stateChangeListeners) { - listener.onCreate(instance); - } - break; - case CHILD_UPDATED: - addToCache(childData.getPath(), instance.getHost(), instance); - for (ServiceInstanceStateChangeListener listener : stateChangeListeners) { - listener.onUpdate(instance); - } - break; - case CHILD_REMOVED: - removeFromCache(childData.getPath(), instance.getHost()); - for (ServiceInstanceStateChangeListener listener : stateChangeListeners) { - listener.onRemove(instance); + maxSlot = Math.max(maxSlot, slot); + sorted.put(slot, worker); + } + + if (consistentIndexes) { + // Add dummy instances to all slots where LLAPs are MIA... I can haz insert_iterator? + TreeMap<Long, LlapServiceInstance> dummies = new TreeMap<>(); + Iterator<Long> keyIter = sorted.keySet().iterator(); + long expected = 0; + Long ts = null; + while (keyIter.hasNext()) { + Long slot = keyIter.next(); + assert slot >= expected; + while (slot > expected) { + if (ts == null) { + ts = System.nanoTime(); // Inactive nodes restart every call! } - break; - default: - // Ignore all the other events; logged above. 
+ dummies.put(expected, new InactiveServiceInstance("inactive-" + expected + "-" + ts)); + ++expected; } + ++expected; } + sorted.putAll(dummies); } - } - - private static String extractWorkerIdFromSlot(ChildData childData) { - return new String(childData.getData(), SlotZnode.CHARSET); + return sorted.values(); } private static String extractNodeName(ChildData childData) { @@ -774,187 +383,44 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry { return nodeName; } - private ServiceInstance extractServiceInstance( - PathChildrenCacheEvent event, ChildData childData) { - byte[] data = childData.getData(); - if (data == null) return null; - try { - ServiceRecord srv = encoder.fromBytes(event.getData().getPath(), data); - return new DynamicServiceInstance(srv); - } catch (IOException e) { - LOG.error("Unable to decode data for zknode: {}." + - " Dropping notification of type: {}", childData.getPath(), event.getType()); - return null; - } - } - + @Override - public ServiceInstanceSet getInstances( + public LlapServiceInstanceSet getInstances( String component, long clusterReadyTimeoutMs) throws IOException { - checkPathChildrenCache(clusterReadyTimeoutMs); + PathChildrenCache instancesCache = ensureInstancesCache(clusterReadyTimeoutMs); // lazily create instances if (instances == null) { - this.instances = new DynamicServiceInstanceSet(instancesCache); + this.instances = new DynamicServiceInstanceSet(instancesCache, this, encoder); } return instances; } @Override public ApplicationId getApplicationId() throws IOException { - getInstances("LLAP", 0); - return instances.getApplicationId(); - } - - @Override - public synchronized void registerStateChangeListener( - final ServiceInstanceStateChangeListener listener) - throws IOException { - checkPathChildrenCache(0); - - this.stateChangeListeners.add(listener); - } - - private synchronized void checkPathChildrenCache(long clusterReadyTimeoutMs) throws IOException { - 
Preconditions.checkArgument(zooKeeperClient != null && - zooKeeperClient.getState() == CuratorFrameworkState.STARTED, "client is not started"); - // lazily create PathChildrenCache - if (instancesCache != null) return; - ExecutorService tp = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() - .setDaemon(true).setNameFormat("StateChangeNotificationHandler").build()); - long startTimeNs = System.nanoTime(), deltaNs = clusterReadyTimeoutMs * 1000000L; - long sleepTimeMs = Math.min(16, clusterReadyTimeoutMs); - while (true) { - PathChildrenCache instancesCache = new PathChildrenCache(zooKeeperClient, workersPath, true); - instancesCache.getListenable().addListener(new InstanceStateChangeListener(), tp); - try { - instancesCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); - this.instancesCache = instancesCache; - break; - } catch (InvalidACLException e) { - // PathChildrenCache tried to mkdir when the znode wasn't there, and failed. - CloseableUtils.closeQuietly(instancesCache); - long elapsedNs = System.nanoTime() - startTimeNs; - if (deltaNs == 0 || deltaNs <= elapsedNs) { - LOG.error("Unable to start curator PathChildrenCache", e); - throw new IOException(e); - } - LOG.warn("The cluster is not started yet (InvalidACL); will retry"); - try { - Thread.sleep(Math.min(sleepTimeMs, (deltaNs - elapsedNs)/1000000L)); - } catch (InterruptedException e1) { - LOG.error("Interrupted while retrying the PathChildrenCache startup"); - throw new IOException(e1); - } - sleepTimeMs = sleepTimeMs << 1; - } catch (Exception e) { - CloseableUtils.closeQuietly(instancesCache); - LOG.error("Unable to start curator PathChildrenCache", e); - throw new IOException(e); - } - } + return getInstances("LLAP", 0).getApplicationId(); } @Override public void start() throws IOException { - if (zooKeeperClient != null) { - setupZookeeperAuth(this.conf); - zooKeeperClient.start(); - } - // Init closeable utils in case register is not called (see HIVE-13322) - 
CloseableUtils.class.getName(); + super.start(); } @Override public void stop() throws IOException { - CloseableUtils.closeQuietly(znode); + super.stop(); CloseableUtils.closeQuietly(slotZnode); - CloseableUtils.closeQuietly(instancesCache); - CloseableUtils.closeQuietly(zooKeeperClient); } - - private void setupZookeeperAuth(final Configuration conf) throws IOException { - if (UserGroupInformation.isSecurityEnabled() && LlapProxy.isDaemon()) { - LOG.info("UGI security is enabled. Setting up ZK auth."); - - String llapPrincipal = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL); - if (llapPrincipal == null || llapPrincipal.isEmpty()) { - throw new IOException("Llap Kerberos principal is empty"); - } - - String llapKeytab = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE); - if (llapKeytab == null || llapKeytab.isEmpty()) { - throw new IOException("Llap Kerberos keytab is empty"); - } - - // Install the JAAS Configuration for the runtime - setZookeeperClientKerberosJaasConfig(llapPrincipal, llapKeytab); - } else { - LOG.info("UGI security is not enabled, or non-daemon environment. Skipping setting up ZK auth."); - } - } - - /** - * Dynamically sets up the JAAS configuration that uses kerberos - * - * @param principal - * @param keyTabFile - * @throws IOException - */ - private void setZookeeperClientKerberosJaasConfig(String principal, String keyTabFile) - throws IOException { - // ZooKeeper property name to pick the correct JAAS conf section - final String SASL_LOGIN_CONTEXT_NAME = "LlapZooKeeperClient"; - System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, SASL_LOGIN_CONTEXT_NAME); - - principal = SecurityUtil.getServerPrincipal(principal, "0.0.0.0"); - userNameFromPrincipal = LlapUtil.getUserNameFromPrincipal(principal); - JaasConfiguration jaasConf = new JaasConfiguration(SASL_LOGIN_CONTEXT_NAME, principal, - keyTabFile); - - // Install the Configuration in the runtime. 
- javax.security.auth.login.Configuration.setConfiguration(jaasConf); + @Override + protected LlapServiceInstance createServiceInstance(ServiceRecord srv) throws IOException { + return new DynamicServiceInstance(srv); } - /** - * A JAAS configuration for ZooKeeper clients intended to use for SASL - * Kerberos. - */ - private static class JaasConfiguration extends javax.security.auth.login.Configuration { - // Current installed Configuration - private final javax.security.auth.login.Configuration baseConfig = javax.security.auth.login.Configuration - .getConfiguration(); - private final String loginContextName; - private final String principal; - private final String keyTabFile; - - public JaasConfiguration(String llapLoginContextName, String principal, String keyTabFile) { - this.loginContextName = llapLoginContextName; - this.principal = principal; - this.keyTabFile = keyTabFile; - } - - @Override - public AppConfigurationEntry[] getAppConfigurationEntry(String appName) { - if (loginContextName.equals(appName)) { - Map<String, String> krbOptions = new HashMap<String, String>(); - krbOptions.put("doNotPrompt", "true"); - krbOptions.put("storeKey", "true"); - krbOptions.put("useKeyTab", "true"); - krbOptions.put("principal", principal); - krbOptions.put("keyTab", keyTabFile); - krbOptions.put("refreshKrb5Config", "true"); - AppConfigurationEntry llapZooKeeperClientEntry = new AppConfigurationEntry( - KerberosUtil.getKrb5LoginModuleName(), - AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, krbOptions); - return new AppConfigurationEntry[]{llapZooKeeperClientEntry}; - } - // Try the base config - if (baseConfig != null) { - return baseConfig.getAppConfigurationEntry(appName); - } - return null; - } + @Override + protected String getZkPathUser(Configuration conf) { + // External LLAP clients would need to set LLAP_ZK_REGISTRY_USER to the LLAP daemon user (hive), + // rather than relying on RegistryUtils.currentUser(). 
+ return HiveConf.getVar(conf, ConfVars.LLAP_ZK_REGISTRY_USER, RegistryUtils.currentUser()); } } http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java index ace9475..783a19f 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java @@ -33,8 +33,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; -import org.apache.hadoop.hive.llap.registry.ServiceInstance; -import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.retry.RetryPolicies; @@ -55,10 +55,10 @@ public class LlapTokenClient { private final SocketFactory socketFactory; private final RetryPolicy retryPolicy; private final Configuration conf; - private ServiceInstanceSet activeInstances; - private Collection<ServiceInstance> lastKnownInstances; + private LlapServiceInstanceSet activeInstances; + private Collection<LlapServiceInstance> lastKnownInstances; private LlapManagementProtocolClientImpl client; - private ServiceInstance clientInstance; + private LlapServiceInstance clientInstance; public LlapTokenClient(Configuration conf) { this.conf = conf; @@ -71,7 
+71,7 @@ public class LlapTokenClient { public Token<LlapTokenIdentifier> getDelegationToken(String appId) throws IOException { if (!UserGroupInformation.isSecurityEnabled()) return null; - Iterator<ServiceInstance> llaps = null; + Iterator<LlapServiceInstance> llaps = null; if (clientInstance == null) { assert client == null; llaps = getLlapServices(false).iterator(); @@ -128,7 +128,7 @@ public class LlapTokenClient { } /** Synchronized - LLAP registry and instance set are not thread safe. */ - private synchronized List<ServiceInstance> getLlapServices( + private synchronized List<LlapServiceInstance> getLlapServices( boolean doForceRefresh) throws IOException { if (!doForceRefresh && lastKnownInstances != null) { return new ArrayList<>(lastKnownInstances); @@ -137,12 +137,12 @@ public class LlapTokenClient { registry.start(); activeInstances = registry.getInstances(); } - Collection<ServiceInstance> daemons = activeInstances.getAll(); + Collection<LlapServiceInstance> daemons = activeInstances.getAll(); if (daemons == null || daemons.isEmpty()) { throw new RuntimeException("No LLAPs found"); } lastKnownInstances = daemons; - return new ArrayList<ServiceInstance>(lastKnownInstances); + return new ArrayList<LlapServiceInstance>(lastKnownInstances); } } http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java new file mode 100644 index 0000000..908b3bb --- /dev/null +++ b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java @@ -0,0 +1,47 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.registry; + +import java.util.Map; + +public interface ServiceInstance { + + /** + * Worker identity is a UUID (unique across restarts), to identify a node which died & was brought + * back on the same host/port + */ + public abstract String getWorkerIdentity(); + + /** + * Hostname of the service instance + * + * @return + */ + public abstract String getHost(); + + /** + * RPC Endpoint for service instance + * + * @return + */ + public int getRpcPort(); + + /** + * Config properties of the Service Instance (llap.daemon.*) + * + * @return + */ + public abstract Map<String, String> getProperties(); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java new file mode 100644 index 0000000..34fba5c --- /dev/null +++ b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java @@ -0,0 +1,61 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.registry; + +import java.util.Collection; +import java.util.Set; + +/** + * Note: For most of the implementations, there's no guarantee that the ServiceInstance returned by + * one invocation is the same as the instance returned by another invocation. e.g. the ZK registry + * returns a new ServiceInstance object each time a getInstance call is made. + */ +public interface ServiceInstanceSet<InstanceType extends ServiceInstance> { + + /** + * Get an instance mapping which maps worker identity to each instance. + * + * The worker identity does not collide between restarts, so each restart will have a unique id, + * while having the same host/ip pair. + * + * @return + */ + Collection<InstanceType> getAll(); + + /** + * Get an instance by worker identity. + * + * @param name + * @return + */ + InstanceType getInstance(String name); + + /** + * Get a list of service instances for a given host. + * + * The list could include dead and alive instances. + * + * @param host + * @return + */ + Set<InstanceType> getByHost(String host); + + /** + * Get the number of instances currently available. 
+ * + * @return - number of instances + */ + int size(); + +} http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceStateChangeListener.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceStateChangeListener.java b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceStateChangeListener.java new file mode 100644 index 0000000..0a44179 --- /dev/null +++ b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceStateChangeListener.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.registry; + +/** + * Callback listener for instance state change events + */ +public interface ServiceInstanceStateChangeListener<InstanceType extends ServiceInstance> { + /** + * Called when new {@link ServiceInstance} is created. + * + * @param serviceInstance - created service instance + */ + void onCreate(InstanceType serviceInstance); + + /** + * Called when an existing {@link ServiceInstance} is updated. 
+ * + * @param serviceInstance - updated service instance + */ + void onUpdate(InstanceType serviceInstance); + + /** + * Called when an existing {@link ServiceInstance} is removed. + * + * @param serviceInstance - removed service instance + */ + void onRemove(InstanceType serviceInstance); +} http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java new file mode 100644 index 0000000..db3d788 --- /dev/null +++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java @@ -0,0 +1,93 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.registry.impl; + +import java.io.IOException; +import java.util.Map; +import org.apache.hadoop.hive.registry.ServiceInstance; +import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; +import org.apache.hadoop.registry.client.types.AddressTypes; +import org.apache.hadoop.registry.client.types.Endpoint; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ServiceInstanceBase implements ServiceInstance { + private static final Logger LOG = LoggerFactory.getLogger(ServiceInstanceBase.class); + + protected final ServiceRecord srv; + protected final String host; + protected final int rpcPort; + + public ServiceInstanceBase(ServiceRecord srv, String rpcName) throws IOException { + this.srv = srv; + + if (LOG.isTraceEnabled()) { + LOG.trace("Working with ServiceRecord: {}", srv); + } + + final Endpoint rpc = srv.getInternalEndpoint(rpcName); + + this.host = + RegistryTypeUtils.getAddressField(rpc.addresses.get(0), + AddressTypes.ADDRESS_HOSTNAME_FIELD); + this.rpcPort = + Integer.parseInt(RegistryTypeUtils.getAddressField(rpc.addresses.get(0), + AddressTypes.ADDRESS_PORT_FIELD)); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ServiceInstanceBase other = (ServiceInstanceBase) o; + return this.getWorkerIdentity().equals(other.getWorkerIdentity()); + } + + @Override + public int hashCode() { + return getWorkerIdentity().hashCode(); + } + + @Override + public String getWorkerIdentity() { + return srv.get(ZkRegistryBase.UNIQUE_IDENTIFIER); + } + + @Override + public String getHost() { + return host; + } + + @Override + public int getRpcPort() { + return rpcPort; + } + + @Override + public Map<String, String> getProperties() { + return srv.attributes(); + } + + @Override + public String toString() { + return "DynamicServiceInstance 
[id=" + getWorkerIdentity() + ", host=" + + host + ":" + rpcPort + "]"; + } +} \ No newline at end of file