Repository: hive Updated Branches: refs/heads/master 2e226d22f -> 4795b9969
http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java new file mode 100644 index 0000000..c773770 --- /dev/null +++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java @@ -0,0 +1,549 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.registry.impl; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.ACLProvider; +import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode; +import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode.Mode; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.utils.CloseableUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.LlapUtil; +import org.apache.hadoop.hive.registry.ServiceInstance; +import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener; +import org.apache.hadoop.registry.client.binding.RegistryUtils; 
+import org.apache.hadoop.registry.client.binding.RegistryUtils.ServiceRecordMarshal; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.zookeeper.KeeperException.InvalidACLException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Id; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is currently used for implementation inheritance only; it doesn't provide a unified flow + * into which one can just plug a few abstract method implementations, because providing one with + * getInstance method is a huge pain involving lots of generics. Also, different registries may + * have slightly different usage patterns anyway and no one would use a registry without knowing + * what type it is. So, it's mostly a grab bag of methods used by ServiceInstanceSet and other + * parts of each implementation. + */ +public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> { + private static final Logger LOG = LoggerFactory.getLogger(ZkRegistryBase.class); + private final static String SASL_NAMESPACE = "sasl"; + private final static String UNSECURE_NAMESPACE = "unsecure"; + + static final String UNIQUE_IDENTIFIER = "registry.unique.id"; + private static final UUID uniq = UUID.randomUUID(); + + protected final Configuration conf; + protected final CuratorFramework zooKeeperClient; + // userPathPrefix is the path specific to the user for which ACLs should be restrictive. + // workersPath is the directory path where all the worker znodes are located.
+ protected final String workersPath; + private final String userPathPrefix, workerNodePrefix; + + protected final ServiceRecordMarshal encoder; // to marshal/unmarshal znode data + + private final Set<ServiceInstanceStateChangeListener<InstanceType>> stateChangeListeners; + + private final boolean doCheckAcls; + // Secure ZK is only set up by the registering service; anyone can read the registrations. + private final String zkPrincipal, zkKeytab, saslLoginContextName; + private String userNameFromPrincipal; // Only set when setting up the secure config for ZK. + private final String disableMessage; + + private final Lock instanceCacheLock = new ReentrantLock(); + private final Map<String, Set<InstanceType>> pathToInstanceCache; + private final Map<String, Set<InstanceType>> nodeToInstanceCache; + + // The registration znode. + private PersistentEphemeralNode znode; + private String znodePath; // unique identity for this instance + + private PathChildrenCache instancesCache; // Created on demand. + + /** Local hostname. */ + protected static final String hostname; + static { + String localhost = "localhost"; + try { + localhost = InetAddress.getLocalHost().getCanonicalHostName(); + } catch (UnknownHostException uhe) { + // ignore + } + hostname = localhost; + } + + /** + * @param rootNs A single root namespace override. Not recommended. + * @param nsPrefix The namespace prefix to use with default namespaces. + * @param userScopePathPrefix The prefix to use for the user-specific part of the path. + * @param workerPrefix The prefix to use for each worker znode. + * @param zkSaslLoginContextName SASL login context name for ZK security; null if not needed. + * @param zkPrincipal ZK security principal. + * @param zkKeytab ZK security keytab. + * @param aclsConfig A config setting to use to determine if ACLs should be verified. 
+ */ + public ZkRegistryBase(String instanceName, Configuration conf, String rootNs, String nsPrefix, + String userScopePathPrefix, String workerPrefix, + String zkSaslLoginContextName, String zkPrincipal, String zkKeytab, ConfVars aclsConfig) { + this.conf = new Configuration(conf); + this.saslLoginContextName = zkSaslLoginContextName; + this.zkPrincipal = zkPrincipal; + this.zkKeytab = zkKeytab; + if (aclsConfig != null) { + this.doCheckAcls = HiveConf.getBoolVar(conf, aclsConfig); + this.disableMessage = "Set " + aclsConfig.varname + " to false to disable ACL validation"; + } else { + this.doCheckAcls = true; + this.disableMessage = ""; + } + this.conf.addResource(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); + String zkEnsemble = getQuorumServers(this.conf); + this.encoder = new RegistryUtils.ServiceRecordMarshal(); + int sessionTimeout = (int) HiveConf.getTimeVar(conf, + ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS); + int baseSleepTime = (int) HiveConf.getTimeVar(conf, + ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS); + int maxRetries = HiveConf.getIntVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES); + + // sample path: /llap-sasl/hiveuser/hostname/workers/worker-0000000 + // worker-0000000 is the sequence number which will be retained until session timeout. If a + // worker does not respond due to communication interruptions it will retain the same sequence + // number when it returns back. 
If session timeout expires, the node will be deleted and new + // addition of the same node (restart) will get next sequence number + this.userPathPrefix = userScopePathPrefix + getZkPathUser(this.conf); + this.workerNodePrefix = workerPrefix; + this.workersPath = "/" + userPathPrefix + "/" + instanceName + "/workers"; + this.instancesCache = null; + this.stateChangeListeners = new HashSet<>(); + this.pathToInstanceCache = new ConcurrentHashMap<>(); + this.nodeToInstanceCache = new ConcurrentHashMap<>(); + + final boolean isSecure = UserGroupInformation.isSecurityEnabled(); + ACLProvider zooKeeperAclProvider = new ACLProvider() { + @Override + public List<ACL> getDefaultAcl() { + // We always return something from getAclForPath so this should not happen. + LOG.warn("getDefaultAcl was called"); + return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE); + } + + @Override + public List<ACL> getAclForPath(String path) { + if (!isSecure || path == null || !path.contains(userPathPrefix)) { + // No security or the path is below the user path - full access. + return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE); + } + return createSecureAcls(); + } + }; + if (rootNs == null) { + rootNs = nsPrefix + (isSecure ? SASL_NAMESPACE : UNSECURE_NAMESPACE); // The normal path. 
+ } + + // Create a CuratorFramework instance to be used as the ZooKeeper client + // Use the zooKeeperAclProvider to create appropriate ACLs + this.zooKeeperClient = CuratorFrameworkFactory.builder() + .connectString(zkEnsemble) + .sessionTimeoutMs(sessionTimeout) + .aclProvider(zooKeeperAclProvider) + .namespace(rootNs) + .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries)) + .build(); + } + + private static List<ACL> createSecureAcls() { + // Read all to the world + List<ACL> nodeAcls = new ArrayList<ACL>(ZooDefs.Ids.READ_ACL_UNSAFE); + // Create/Delete/Write/Admin to creator + nodeAcls.addAll(ZooDefs.Ids.CREATOR_ALL_ACL); + return nodeAcls; + } + + /** + * Get the ensemble server addresses from the configuration. The format is: host1:port, + * host2:port.. + * + * @param conf + **/ + private String getQuorumServers(Configuration conf) { + String[] hosts = conf.getTrimmedStrings(ConfVars.HIVE_ZOOKEEPER_QUORUM.varname); + String port = conf.get(ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.varname, + ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.getDefaultValue()); + StringBuilder quorum = new StringBuilder(); + for (int i = 0; i < hosts.length; i++) { + quorum.append(hosts[i].trim()); + if (!hosts[i].contains(":")) { + // if the hostname doesn't contain a port, add the configured port to hostname + quorum.append(":"); + quorum.append(port); + } + + if (i != hosts.length - 1) { + quorum.append(","); + } + } + + return quorum.toString(); + } + + protected abstract String getZkPathUser(Configuration conf); + + protected final String registerServiceRecord(ServiceRecord srv) throws IOException { + // restart sensitive instance id + srv.set(UNIQUE_IDENTIFIER, uniq.toString()); + + // Create a znode under the rootNamespace parent for this instance of the server + try { + // PersistentEphemeralNode will make sure the ephemeral node created on server will be present + // even under connection or session interruption (will automatically handle retries) + znode = new 
PersistentEphemeralNode(zooKeeperClient, Mode.EPHEMERAL_SEQUENTIAL, + workersPath + "/" + workerNodePrefix, encoder.toBytes(srv)); + + // start the creation of znodes + znode.start(); + + // We'll wait for 120s for node creation + long znodeCreationTimeout = 120; + if (!znode.waitForInitialCreate(znodeCreationTimeout, TimeUnit.SECONDS)) { + throw new Exception( + "Max znode creation wait time: " + znodeCreationTimeout + "s exhausted"); + } + + znodePath = znode.getActualPath(); + + if (doCheckAcls) { + try { + checkAndSetAcls(); + } catch (Exception ex) { + throw new IOException("Error validating or setting ACLs. " + disableMessage, ex); + } + } + if (zooKeeperClient.checkExists().forPath(znodePath) == null) { + // No node exists, throw exception + throw new Exception("Unable to create znode for this instance on ZooKeeper."); + } + } catch (Exception e) { + LOG.error("Unable to create a znode for this server instance", e); + CloseableUtils.closeQuietly(znode); + throw (e instanceof IOException) ? (IOException)e : new IOException(e); + } + return uniq.toString(); + } + + + private void checkAndSetAcls() throws Exception { + if (!UserGroupInformation.isSecurityEnabled()) return; + // We are trying to check ACLs on the "workers" directory, which noone except us should be + // able to write to. Higher-level directories shouldn't matter - we don't read them. + String pathToCheck = workersPath; + List<ACL> acls = zooKeeperClient.getACL().forPath(pathToCheck); + if (acls == null || acls.isEmpty()) { + // Can there be no ACLs? There's some access (to get ACLs), so assume it means free for all. + LOG.warn("No ACLs on " + pathToCheck + "; setting up ACLs. " + disableMessage); + setUpAcls(pathToCheck); + return; + } + // This could be brittle. 
+ assert userNameFromPrincipal != null; + Id currentUser = new Id("sasl", userNameFromPrincipal); + for (ACL acl : acls) { + if ((acl.getPerms() & ~ZooDefs.Perms.READ) == 0 || currentUser.equals(acl.getId())) { + continue; // Read permission/no permissions, or the expected user. + } + LOG.warn("The ACL " + acl + " is unacceptable for " + pathToCheck + + "; setting up ACLs. " + disableMessage); + setUpAcls(pathToCheck); + return; + } + } + + private void setUpAcls(String path) throws Exception { + List<ACL> acls = createSecureAcls(); + LinkedList<String> paths = new LinkedList<>(); + paths.add(path); + while (!paths.isEmpty()) { + String currentPath = paths.poll(); + List<String> children = zooKeeperClient.getChildren().forPath(currentPath); + if (children != null) { + for (String child : children) { + paths.add(currentPath + "/" + child); + } + } + zooKeeperClient.setACL().withACL(acls).forPath(currentPath); + } + } + + private void addToCache(String path, String host, InstanceType instance) { + instanceCacheLock.lock(); + try { + putInCache(path, pathToInstanceCache, instance); + putInCache(host, nodeToInstanceCache, instance); + } finally { + instanceCacheLock.unlock(); + } + LOG.debug("Added path={}, host={} instance={} to cache." + + " pathToInstanceCache:size={}, nodeToInstanceCache:size={}", + path, host, instance, pathToInstanceCache.size(), nodeToInstanceCache.size()); + } + + private void removeFromCache(String path, String host) { + instanceCacheLock.lock(); + try { + pathToInstanceCache.remove(path); + nodeToInstanceCache.remove(host); + } finally { + instanceCacheLock.unlock(); + } + LOG.debug("Removed path={}, host={} from cache."
+ + " pathToInstanceCache:size={}, nodeToInstanceCache:size={}", + path, host, pathToInstanceCache.size(), nodeToInstanceCache.size()); + } + + private void putInCache(String key, Map<String, Set<InstanceType>> cache, + InstanceType instance) { + Set<InstanceType> instanceSet = cache.get(key); + if (instanceSet == null) { + instanceSet = Sets.newHashSet(); + cache.put(key, instanceSet); + } + instanceSet.add(instance); + } + + protected final void populateCache(PathChildrenCache instancesCache) { + for (ChildData childData : instancesCache.getCurrentData()) { + byte[] data = getWorkerData(childData, workerNodePrefix); + if (data == null) continue; + try { + ServiceRecord srv = encoder.fromBytes(childData.getPath(), data); + InstanceType instance = createServiceInstance(srv); + addToCache(childData.getPath(), instance.getHost(), instance); + } catch (IOException e) { + LOG.error("Unable to decode data for zkpath: {}." + + " Ignoring from current instances list..", childData.getPath()); + } + } + } + + protected abstract InstanceType createServiceInstance(ServiceRecord srv) throws IOException; + + protected static final byte[] getWorkerData(ChildData childData, String workerNodePrefix) { + if (childData == null) return null; + byte[] data = childData.getData(); + if (data == null) return null; + if (!extractNodeName(childData).startsWith(workerNodePrefix)) return null; + return data; + } + + private class InstanceStateChangeListener implements PathChildrenCacheListener { + private final Logger LOG = LoggerFactory.getLogger(InstanceStateChangeListener.class); + + @Override + public void childEvent(final CuratorFramework client, final PathChildrenCacheEvent event) + throws Exception { + Preconditions.checkArgument(client != null + && client.getState() == CuratorFrameworkState.STARTED, "client is not started"); + + synchronized (this) { + ChildData childData = event.getData(); + if (childData == null) return; + String nodeName = extractNodeName(childData); + if 
(!nodeName.startsWith(workerNodePrefix)) return; + LOG.info("{} for zknode {}", event.getType(), childData.getPath()); + InstanceType instance = extractServiceInstance(event, childData); + switch (event.getType()) { + case CHILD_ADDED: + addToCache(childData.getPath(), instance.getHost(), instance); + for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) { + listener.onCreate(instance); + } + break; + case CHILD_UPDATED: + addToCache(childData.getPath(), instance.getHost(), instance); + for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) { + listener.onUpdate(instance); + } + break; + case CHILD_REMOVED: + removeFromCache(childData.getPath(), instance.getHost()); + for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) { + listener.onRemove(instance); + } + break; + default: + // Ignore all the other events; logged above. + } + } + } + } + + // The real implementation for the instanceset... instanceset has its own copy of the + // ZK cache yet completely depends on the parent in every other aspect and is thus unneeded. + + public int size() { + // not using the path child cache here as there could be more than 1 path per host (worker and slot znodes) + return nodeToInstanceCache.size(); + } + + protected final Set<InstanceType> getByHost(String host) { + Set<InstanceType> byHost = nodeToInstanceCache.get(host); + byHost = (byHost == null) ? 
Sets.<InstanceType>newHashSet() : byHost; + if (LOG.isDebugEnabled()) { + LOG.debug("Returning " + byHost.size() + " hosts for locality allocation on " + host); + } + return byHost; + } + + protected final Collection<InstanceType> getAll() { + Set<InstanceType> instances = new HashSet<>(); + for(Set<InstanceType> instanceSet : pathToInstanceCache.values()) { + instances.addAll(instanceSet); + } + return instances; + } + + private static String extractNodeName(ChildData childData) { + String nodeName = childData.getPath(); + int ix = nodeName.lastIndexOf("/"); + if (ix >= 0) { + nodeName = nodeName.substring(ix + 1); + } + return nodeName; + } + + private InstanceType extractServiceInstance( + PathChildrenCacheEvent event, ChildData childData) { + byte[] data = childData.getData(); + if (data == null) return null; + try { + ServiceRecord srv = encoder.fromBytes(event.getData().getPath(), data); + return createServiceInstance(srv); + } catch (IOException e) { + LOG.error("Unable to decode data for zknode: {}." + + " Dropping notification of type: {}", childData.getPath(), event.getType()); + return null; + } + } + + public synchronized void registerStateChangeListener( + ServiceInstanceStateChangeListener<InstanceType> listener) throws IOException { + ensureInstancesCache(0); + this.stateChangeListeners.add(listener); + } + + @SuppressWarnings("resource") // Bogus warnings despite closeQuietly. 
+ protected final synchronized PathChildrenCache ensureInstancesCache( + long clusterReadyTimeoutMs) throws IOException { + Preconditions.checkArgument(zooKeeperClient != null && + zooKeeperClient.getState() == CuratorFrameworkState.STARTED, "client is not started"); + // lazily create PathChildrenCache + PathChildrenCache instancesCache = this.instancesCache; + if (instancesCache != null) return instancesCache; + ExecutorService tp = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() + .setDaemon(true).setNameFormat("StateChangeNotificationHandler").build()); + long startTimeNs = System.nanoTime(), deltaNs = clusterReadyTimeoutMs * 1000000L; + long sleepTimeMs = Math.min(16, clusterReadyTimeoutMs); + while (true) { + instancesCache = new PathChildrenCache(zooKeeperClient, workersPath, true); + instancesCache.getListenable().addListener(new InstanceStateChangeListener(), tp); + try { + instancesCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); + this.instancesCache = instancesCache; + return instancesCache; + } catch (InvalidACLException e) { + // PathChildrenCache tried to mkdir when the znode wasn't there, and failed. 
+ CloseableUtils.closeQuietly(instancesCache); + long elapsedNs = System.nanoTime() - startTimeNs; + if (deltaNs == 0 || deltaNs <= elapsedNs) { + LOG.error("Unable to start curator PathChildrenCache", e); + throw new IOException(e); + } + LOG.warn("The cluster is not started yet (InvalidACL); will retry"); + try { + Thread.sleep(Math.min(sleepTimeMs, (deltaNs - elapsedNs)/1000000L)); + } catch (InterruptedException e1) { + LOG.error("Interrupted while retrying the PathChildrenCache startup"); + throw new IOException(e1); + } + sleepTimeMs = sleepTimeMs << 1; + } catch (Exception e) { + CloseableUtils.closeQuietly(instancesCache); + LOG.error("Unable to start curator PathChildrenCache", e); + throw new IOException(e); + } + } + } + + public void start() throws IOException { + if (zooKeeperClient != null) { + String principal = ZookeeperUtils.setupZookeeperAuth( + conf, saslLoginContextName, zkPrincipal, zkKeytab); + if (principal != null) { + userNameFromPrincipal = LlapUtil.getUserNameFromPrincipal(principal); + } + zooKeeperClient.start(); + } + // Init closeable utils in case register is not called (see HIVE-13322) + CloseableUtils.class.getName(); + } + + public void stop() throws IOException { + CloseableUtils.closeQuietly(znode); + CloseableUtils.closeQuietly(instancesCache); + CloseableUtils.closeQuietly(zooKeeperClient); + } + + protected final Set<InstanceType> getInstancesByPath(String path) { + return pathToInstanceCache.get(path); + } + + protected final String getRegistrationZnodePath() { + return znodePath; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZookeeperUtils.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZookeeperUtils.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZookeeperUtils.java new file mode 100644 index 0000000..454d503 --- /dev/null +++ 
b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZookeeperUtils.java @@ -0,0 +1,116 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.registry.impl; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import javax.security.auth.login.AppConfigurationEntry; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.util.KerberosUtil; +import org.apache.zookeeper.client.ZooKeeperSaslClient; + +public class ZookeeperUtils { + private static final Logger LOG = LoggerFactory.getLogger(ZookeeperUtils.class); + + public static String setupZookeeperAuth(Configuration conf, String saslLoginContextName, + String zkPrincipal, String zkKeytab) throws IOException { + // If the login context name is not set, we are in the client and don't need auth. + if (UserGroupInformation.isSecurityEnabled() && saslLoginContextName != null) { + LOG.info("UGI security is enabled. 
Setting up ZK auth."); + + if (zkPrincipal == null || zkPrincipal.isEmpty()) { + throw new IOException("Kerberos principal is empty"); + } + + if (zkKeytab == null || zkKeytab.isEmpty()) { + throw new IOException("Kerberos keytab is empty"); + } + + // Install the JAAS Configuration for the runtime + return setZookeeperClientKerberosJaasConfig(saslLoginContextName, zkPrincipal, zkKeytab); + } else { + LOG.info("UGI security is not enabled, or no SASL context name. " + + "Skipping setting up ZK auth."); + return null; + } + } + + /** + * Dynamically sets up the JAAS configuration that uses kerberos + * + * @param principal + * @param keyTabFile + * @throws IOException + */ + private static String setZookeeperClientKerberosJaasConfig( + String saslLoginContextName, String zkPrincipal, String zkKeytab) throws IOException { + // ZooKeeper property name to pick the correct JAAS conf section + System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, saslLoginContextName); + + String principal = SecurityUtil.getServerPrincipal(zkPrincipal, "0.0.0.0"); + JaasConfiguration jaasConf = new JaasConfiguration( + saslLoginContextName, principal, zkKeytab); + + // Install the Configuration in the runtime. + javax.security.auth.login.Configuration.setConfiguration(jaasConf); + return principal; + } + + /** + * A JAAS configuration for ZooKeeper clients intended to use for SASL + * Kerberos. 
+ */ + private static class JaasConfiguration extends javax.security.auth.login.Configuration { + // Current installed Configuration + private final javax.security.auth.login.Configuration baseConfig = javax.security.auth.login.Configuration + .getConfiguration(); + private final String loginContextName; + private final String principal; + private final String keyTabFile; + + public JaasConfiguration(String loginContextName, String principal, String keyTabFile) { + this.loginContextName = loginContextName; + this.principal = principal; + this.keyTabFile = keyTabFile; + } + + @Override + public AppConfigurationEntry[] getAppConfigurationEntry(String appName) { + if (loginContextName.equals(appName)) { + Map<String, String> krbOptions = new HashMap<String, String>(); + krbOptions.put("doNotPrompt", "true"); + krbOptions.put("storeKey", "true"); + krbOptions.put("useKeyTab", "true"); + krbOptions.put("principal", principal); + krbOptions.put("keyTab", keyTabFile); + krbOptions.put("refreshKrb5Config", "true"); + AppConfigurationEntry zooKeeperClientEntry = new AppConfigurationEntry( + KerberosUtil.getKrb5LoginModuleName(), + AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, krbOptions); + return new AppConfigurationEntry[] { zooKeeperClientEntry }; + } + // Try the base config + if (baseConfig != null) { + return baseConfig.getAppConfigurationEntry(appName); + } + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java ---------------------------------------------------------------------- diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java index 201f5fa..f2d9074 100644 --- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java @@ -45,8 
+45,8 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWor import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrBinary; import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient; import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient.LlapTaskUmbilicalExternalResponder; -import org.apache.hadoop.hive.llap.registry.ServiceInstance; -import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; import org.apache.hadoop.hive.llap.tez.Converters; @@ -104,7 +104,7 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>> public static final String PWD_KEY = "llap.if.pwd"; public final String SPLIT_QUERY = "select get_splits(\"%s\",%d)"; - public static final ServiceInstance[] serviceInstanceArray = new ServiceInstance[0]; + public static final LlapServiceInstance[] serviceInstanceArray = new LlapServiceInstance[0]; public LlapBaseInputFormat(String url, String user, String pwd, String query) { this.url = url; @@ -126,7 +126,7 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>> HiveConf.setVar(job, HiveConf.ConfVars.LLAP_ZK_REGISTRY_USER, llapSplit.getLlapUser()); SubmitWorkInfo submitWorkInfo = SubmitWorkInfo.fromBytes(llapSplit.getPlanBytes()); - ServiceInstance serviceInstance = getServiceInstance(job, llapSplit); + LlapServiceInstance serviceInstance = getServiceInstance(job, llapSplit); String host = serviceInstance.getHost(); int llapSubmitPort = serviceInstance.getRpcPort(); @@ -230,11 +230,11 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>> return ins.toArray(new InputSplit[ins.size()]); } - private ServiceInstance getServiceInstance(JobConf job, LlapInputSplit 
llapSplit) throws IOException { + private LlapServiceInstance getServiceInstance(JobConf job, LlapInputSplit llapSplit) throws IOException { LlapRegistryService registryService = LlapRegistryService.getClient(job); String host = llapSplit.getLocations()[0]; - ServiceInstance serviceInstance = getServiceInstanceForHost(registryService, host); + LlapServiceInstance serviceInstance = getServiceInstanceForHost(registryService, host); if (serviceInstance == null) { LOG.info("No service instances found for " + host + " in registry."); serviceInstance = getServiceInstanceRandom(registryService); @@ -246,10 +246,10 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>> return serviceInstance; } - private ServiceInstance getServiceInstanceForHost(LlapRegistryService registryService, String host) throws IOException { + private LlapServiceInstance getServiceInstanceForHost(LlapRegistryService registryService, String host) throws IOException { InetAddress address = InetAddress.getByName(host); - ServiceInstanceSet instanceSet = registryService.getInstances(); - ServiceInstance serviceInstance = null; + LlapServiceInstanceSet instanceSet = registryService.getInstances(); + LlapServiceInstance serviceInstance = null; // The name used in the service registry may not match the host name we're using. 
// Try hostname/canonical hostname/host address @@ -279,12 +279,12 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>> } - private ServiceInstance getServiceInstanceRandom(LlapRegistryService registryService) throws IOException { - ServiceInstanceSet instanceSet = registryService.getInstances(); - ServiceInstance serviceInstance = null; + private LlapServiceInstance getServiceInstanceRandom(LlapRegistryService registryService) throws IOException { + LlapServiceInstanceSet instanceSet = registryService.getInstances(); + LlapServiceInstance serviceInstance = null; LOG.info("Finding random live service instance"); - Collection<ServiceInstance> allInstances = instanceSet.getAll(); + Collection<LlapServiceInstance> allInstances = instanceSet.getAll(); if (allInstances.size() > 0) { int randIdx = rand.nextInt() % allInstances.size(); serviceInstance = allInstances.toArray(serviceInstanceArray)[randIdx]; @@ -292,13 +292,13 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>> return serviceInstance; } - private ServiceInstance selectServiceInstance(Set<ServiceInstance> serviceInstances) { + private LlapServiceInstance selectServiceInstance(Set<LlapServiceInstance> serviceInstances) { if (serviceInstances == null || serviceInstances.isEmpty()) { return null; } // Get the first live service instance - for (ServiceInstance serviceInstance : serviceInstances) { + for (LlapServiceInstance serviceInstance : serviceInstances) { return serviceInstance; } http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java index 1b57e38..4fad441 100644 --- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java @@ -45,7 +45,7 @@ import org.apache.hadoop.hive.llap.cli.status.LlapStatusHelpers; import org.apache.hadoop.hive.llap.cli.status.LlapStatusHelpers.AppStatusBuilder; import org.apache.hadoop.hive.llap.cli.status.LlapStatusHelpers.LlapInstance; import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration; -import org.apache.hadoop.hive.llap.registry.ServiceInstance; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.yarn.api.records.ApplicationReport; @@ -563,7 +563,7 @@ public class LlapStatusServiceDriver { } } - Collection<ServiceInstance> serviceInstances; + Collection<LlapServiceInstance> serviceInstances; try { serviceInstances = llapRegistry.getInstances(watchTimeoutMs).getAll(); } catch (Exception e) { @@ -583,7 +583,7 @@ public class LlapStatusServiceDriver { List<LlapInstance> validatedInstances = new LinkedList<>(); List<String> llapExtraInstances = new LinkedList<>(); - for (ServiceInstance serviceInstance : serviceInstances) { + for (LlapServiceInstance serviceInstance : serviceInstances) { String containerIdString = serviceInstance.getProperties().get( HiveConf.ConfVars.LLAP_DAEMON_CONTAINER_ID.varname); http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java index ebc3437..58bf8dc 100644 --- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java @@ -33,7 +33,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.llap.registry.ServiceInstance; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.hive.llap.registry.impl.LlapZookeeperRegistryImpl; import org.apache.hadoop.security.UserGroupInformation; @@ -230,7 +230,7 @@ public class LlapWebServices extends AbstractService { } jg.writeStringField("identity", registry.getWorkerIdentity()); jg.writeArrayFieldStart("peers"); - for (ServiceInstance s : registry.getInstances().getAllInstancesOrdered(false)) { + for (LlapServiceInstance s : registry.getInstances().getAllInstancesOrdered(false)) { jg.writeStartObject(); jg.writeStringField("identity", s.getWorkerIdentity()); jg.writeStringField("host", s.getHost()); http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index ff00aba..100c132 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -15,7 +15,7 @@ package org.apache.hadoop.hive.llap.tezplugins; import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.hive.llap.registry.ServiceInstance; 
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; import org.apache.hadoop.io.Writable; import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.TezAttemptArray; @@ -542,7 +542,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { if (timelineServerUri == null || containerNodeId == null) { return null; } - Set<ServiceInstance> instanceSet; + Set<LlapServiceInstance> instanceSet; try { instanceSet = serviceRegistry.getInstances().getByHost(containerNodeId.getHost()); } catch (IOException e) { @@ -554,8 +554,8 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { } // Once NodeId includes fragmentId - this becomes a lot more reliable. if (instanceSet != null) { - ServiceInstance matchedInstance = null; - for (ServiceInstance instance : instanceSet) { + LlapServiceInstance matchedInstance = null; + for (LlapServiceInstance instance : instanceSet) { if (instance.getRpcPort() == containerNodeId.getPort()) { matchedInstance = instance; break; http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index 6bedccb..94f1369 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -14,6 +14,8 @@ package org.apache.hadoop.hive.llap.tezplugins; +import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -62,9 +64,8 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import 
org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem; import org.apache.hadoop.hive.llap.metrics.MetricsUtils; -import org.apache.hadoop.hive.llap.registry.ServiceInstance; -import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; -import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.ServiceRegistry; import org.apache.hadoop.hive.llap.registry.impl.InactiveServiceInstance; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; @@ -107,7 +108,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { private final Configuration conf; // interface into the registry service - private ServiceInstanceSet activeInstances; + private LlapServiceInstanceSet activeInstances; // Tracks all instances, including ones which have been disabled in the past. // LinkedHashMap to provide the same iteration order when selecting a random host. 
@@ -330,7 +331,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { registry.start(); registry.registerStateChangeListener(new NodeStateChangeListener()); activeInstances = registry.getInstances(); - for (ServiceInstance inst : activeInstances.getAll()) { + for (LlapServiceInstance inst : activeInstances.getAll()) { addNode(new NodeInfo(inst, nodeBlacklistConf, clock, numSchedulableTasksPerNode, metrics), inst); } @@ -340,15 +341,16 @@ public class LlapTaskSchedulerService extends TaskScheduler { } @VisibleForTesting - public void setServiceInstanceSet(ServiceInstanceSet serviceInstanceSet) { + public void setServiceInstanceSet(LlapServiceInstanceSet serviceInstanceSet) { this.activeInstances = serviceInstanceSet; } - private class NodeStateChangeListener implements ServiceInstanceStateChangeListener { + private class NodeStateChangeListener + implements ServiceInstanceStateChangeListener<LlapServiceInstance> { private final Logger LOG = LoggerFactory.getLogger(NodeStateChangeListener.class); @Override - public void onCreate(ServiceInstance serviceInstance) { + public void onCreate(LlapServiceInstance serviceInstance) { LOG.info("Added node with identity: {} as a result of registry callback", serviceInstance.getWorkerIdentity()); addNode(new NodeInfo(serviceInstance, nodeBlacklistConf, clock, @@ -356,7 +358,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { } @Override - public void onUpdate(ServiceInstance serviceInstance) { + public void onUpdate(LlapServiceInstance serviceInstance) { // TODO In what situations will this be invoked? LOG.warn( "Not expecing Updates from the registry. Received update for instance={}. 
Ignoring", @@ -364,7 +366,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { } @Override - public void onRemove(ServiceInstance serviceInstance) { + public void onRemove(LlapServiceInstance serviceInstance) { NodeReport nodeReport = constructNodeReport(serviceInstance, false); LOG.info("Sending out nodeReport for onRemove: {}", nodeReport); getContext().nodesUpdated(Collections.singletonList(nodeReport)); @@ -475,7 +477,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { readLock.lock(); try { int numInstancesFound = 0; - for (ServiceInstance inst : activeInstances.getAll()) { + for (LlapServiceInstance inst : activeInstances.getAll()) { Resource r = inst.getResource(); memory += r.getMemory(); vcores += r.getVirtualCores(); @@ -506,7 +508,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { readLock.lock(); try { int numInstancesFound = 0; - for (ServiceInstance inst : activeInstances.getAll()) { + for (LlapServiceInstance inst : activeInstances.getAll()) { NodeInfo nodeInfo = instanceToNodeMap.get(inst.getWorkerIdentity()); if (nodeInfo != null && !nodeInfo.isDisabled()) { Resource r = inst.getResource(); @@ -766,9 +768,9 @@ public class LlapTaskSchedulerService extends TaskScheduler { for (String host : requestedHosts) { prefHostCount++; // Pick the first host always. Weak attempt at cache affinity. 
- Set<ServiceInstance> instances = activeInstances.getByHost(host); + Set<LlapServiceInstance> instances = activeInstances.getByHost(host); if (!instances.isEmpty()) { - for (ServiceInstance inst : instances) { + for (LlapServiceInstance inst : instances) { NodeInfo nodeInfo = instanceToNodeMap.get(inst.getWorkerIdentity()); if (nodeInfo != null) { if (nodeInfo.canAcceptTask()) { @@ -828,10 +830,10 @@ public class LlapTaskSchedulerService extends TaskScheduler { } /* fall through - miss in locality or no locality-requested */ - Collection<ServiceInstance> instances = activeInstances.getAllInstancesOrdered(true); + Collection<LlapServiceInstance> instances = activeInstances.getAllInstancesOrdered(true); List<NodeInfo> allNodes = new ArrayList<>(instances.size()); List<NodeInfo> activeNodesWithFreeSlots = new ArrayList<>(); - for (ServiceInstance inst : instances) { + for (LlapServiceInstance inst : instances) { if (inst instanceof InactiveServiceInstance) { allNodes.add(null); } else { @@ -918,7 +920,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { return new SelectHostResult(randomNode); } - private void addNode(NodeInfo node, ServiceInstance serviceInstance) { + private void addNode(NodeInfo node, LlapServiceInstance serviceInstance) { // we have just added a new node. Signal timeout monitor to reset timer if (activeInstances.size() != 0 && timeoutFutureRef.get() != null) { LOG.info("New node added. 
Signalling scheduler timeout monitor thread to stop timer."); @@ -1006,7 +1008,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { } } - private static NodeReport constructNodeReport(ServiceInstance serviceInstance, + private static NodeReport constructNodeReport(LlapServiceInstance serviceInstance, boolean healthy) { NodeReport nodeReport = NodeReport.newInstance(NodeId .newInstance(serviceInstance.getHost(), serviceInstance.getRpcPort()), @@ -1576,7 +1578,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { @VisibleForTesting static class NodeInfo implements Delayed { private final NodeBlacklistConf blacklistConf; - final ServiceInstance serviceInstance; + final LlapServiceInstance serviceInstance; private final Clock clock; long expireTimeMillis = -1; @@ -1609,7 +1611,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { * detect based on the serviceInstance, -1 indicates indicates * @param metrics */ - NodeInfo(ServiceInstance serviceInstance, NodeBlacklistConf blacklistConf, Clock clock, + NodeInfo(LlapServiceInstance serviceInstance, NodeBlacklistConf blacklistConf, Clock clock, int numSchedulableTasksConf, final LlapTaskSchedulerMetrics metrics) { Preconditions.checkArgument(numSchedulableTasksConf >= -1, "NumSchedulableTasks must be >=-1"); this.serviceInstance = serviceInstance; http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java index 339f513..21a39c7 100644 --- a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java +++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java @@ 
-40,8 +40,8 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.llap.registry.ServiceInstance; -import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.impl.InactiveServiceInstance; import org.apache.hadoop.hive.llap.registry.impl.LlapFixedRegistryImpl; import org.apache.hadoop.hive.llap.testhelpers.ControlledClock; @@ -1447,7 +1447,7 @@ public class TestLlapTaskSchedulerService { static final Resource resource = Resource.newInstance(1024, 1); Configuration conf; TaskSchedulerContext mockAppCallback = mock(TaskSchedulerContext.class); - ServiceInstanceSet mockServiceInstanceSet = mock(ServiceInstanceSet.class); + LlapServiceInstanceSet mockServiceInstanceSet = mock(LlapServiceInstanceSet.class); ControlledClock clock = new ControlledClock(new MonotonicClock()); ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(1000, 1), 1); LlapTaskSchedulerServiceForTest ts; @@ -1499,16 +1499,16 @@ public class TestLlapTaskSchedulerService { doReturn(userPayload).when(mockAppCallback).getInitialUserPayload(); if (useMockRegistry) { - List<ServiceInstance> liveInstances = new ArrayList<>(); + List<LlapServiceInstance> liveInstances = new ArrayList<>(); for (String host : liveHosts) { if (host == null) { - ServiceInstance mockInactive = mock(InactiveServiceInstance.class); + LlapServiceInstance mockInactive = mock(InactiveServiceInstance.class); doReturn(host).when(mockInactive).getHost(); doReturn("inactive-host-" + host).when(mockInactive).getWorkerIdentity(); doReturn(ImmutableSet.builder().add(mockInactive).build()).when(mockServiceInstanceSet).getByHost(host); liveInstances.add(mockInactive); } else { - ServiceInstance 
mockActive = mock(ServiceInstance.class); + LlapServiceInstance mockActive = mock(LlapServiceInstance.class); doReturn(host).when(mockActive).getHost(); doReturn("host-" + host).when(mockActive).getWorkerIdentity(); doReturn(ImmutableSet.builder().add(mockActive).build()).when(mockServiceInstanceSet).getByHost(host); @@ -1517,9 +1517,9 @@ public class TestLlapTaskSchedulerService { } doReturn(liveInstances).when(mockServiceInstanceSet).getAllInstancesOrdered(true); - List<ServiceInstance> allInstances = new ArrayList<>(); + List<LlapServiceInstance> allInstances = new ArrayList<>(); for (String host : hosts) { - ServiceInstance mockActive = mock(ServiceInstance.class); + LlapServiceInstance mockActive = mock(LlapServiceInstance.class); doReturn(host).when(mockActive).getHost(); doReturn(Resource.newInstance(100, 1)).when(mockActive).getResource(); doReturn("host-" + host).when(mockActive).getWorkerIdentity(); http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java index 2b57d90..4c8e7bb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java @@ -25,7 +25,7 @@ import com.google.common.base.Preconditions; import org.apache.commons.lang.ArrayUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.llap.registry.ServiceInstance; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.split.SplitLocationProvider; @@ -43,12 +43,12 @@ public class Utils { LlapRegistryService serviceRegistry = 
LlapRegistryService.getClient(conf); LOG.info("Using LLAP instance " + serviceRegistry.getApplicationId()); - Collection<ServiceInstance> serviceInstances = + Collection<LlapServiceInstance> serviceInstances = serviceRegistry.getInstances().getAllInstancesOrdered(true); Preconditions.checkArgument(!serviceInstances.isEmpty(), "No running LLAP daemons! Please check LLAP service status and zookeeper configuration"); ArrayList<String> locations = new ArrayList<>(serviceInstances.size()); - for (ServiceInstance serviceInstance : serviceInstances) { + for (LlapServiceInstance serviceInstance : serviceInstances) { if (LOG.isDebugEnabled()) { LOG.debug("Adding " + serviceInstance.getWorkerIdentity() + " with hostname=" + serviceInstance.getHost() + " to list for split locations"); http://git-wip-us.apache.org/repos/asf/hive/blob/4795b996/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java index a5ed308..7a02a56 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java @@ -28,8 +28,8 @@ import java.util.concurrent.Callable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.llap.registry.ServiceInstance; -import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.impl.InactiveServiceInstance; import 
org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.slf4j.Logger; @@ -100,7 +100,7 @@ public class LlapClusterStateForCompile { return; // Don't fail; this is best-effort. } } - ServiceInstanceSet instances; + LlapServiceInstanceSet instances; try { instances = svc.getInstances(10); } catch (IOException e) { @@ -108,7 +108,7 @@ public class LlapClusterStateForCompile { return; // Don't wait for the cluster if not started; this is best-effort. } int executorsLocal = 0, noConfigNodesLocal = 0; - for (ServiceInstance si : instances.getAll()) { + for (LlapServiceInstance si : instances.getAll()) { if (si instanceof InactiveServiceInstance) continue; // Shouldn't happen in getAll. Map<String, String> props = si.getProperties(); if (props == null) {
