http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2a9fa84/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java new file mode 100644 index 0000000..2eb7aa5 --- /dev/null +++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java @@ -0,0 +1,896 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.registry.client.impl.zk; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.curator.ensemble.EnsembleProvider; +import org.apache.curator.ensemble.fixed.FixedEnsembleProvider; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CreateBuilder; +import org.apache.curator.framework.api.DeleteBuilder; +import org.apache.curator.framework.api.GetChildrenBuilder; +import org.apache.curator.framework.recipes.cache.TreeCache; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent; +import org.apache.curator.framework.recipes.cache.TreeCacheListener; +import org.apache.curator.retry.BoundedExponentialBackoffRetry; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; +import org.apache.hadoop.fs.PathNotFoundException; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.registry.client.api.RegistryConstants; +import org.apache.hadoop.registry.client.binding.RegistryPathUtils; +import org.apache.hadoop.registry.client.exceptions.AuthenticationFailedException; +import org.apache.hadoop.registry.client.exceptions.NoChildrenForEphemeralsException; +import org.apache.hadoop.registry.client.exceptions.NoPathPermissionsException; +import org.apache.hadoop.registry.client.exceptions.RegistryIOException; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.service.ServiceStateException; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import 
org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; + +/** + * This service binds to Zookeeper via Apache Curator. It is more + * generic than just the YARN service registry; it does not implement + * any of the Registry Operations API. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class CuratorService extends CompositeService + implements RegistryConstants, RegistryBindingSource { + + private static final Logger LOG = + LoggerFactory.getLogger(CuratorService.class); + + /** + * the Curator binding. + */ + private CuratorFramework curator; + + /** + * Path to the registry root. + */ + private String registryRoot; + + /** + * Supplied binding source. This defaults to being this + * service itself. + */ + private final RegistryBindingSource bindingSource; + + /** + * Security service. + */ + private RegistrySecurity registrySecurity; + + /** + * the connection binding text for messages. + */ + private String connectionDescription; + + /** + * Security connection diagnostics. + */ + private String securityConnectionDiagnostics = ""; + + /** + * Provider of curator "ensemble"; offers a basis for + * more flexible bonding in future. + */ + private EnsembleProvider ensembleProvider; + + /** + * Registry tree cache. + */ + private TreeCache treeCache; + + /** + * Construct the service. + * + * @param name service name + * @param bindingSource source of binding information. + * If null: use this instance + */ + public CuratorService(String name, RegistryBindingSource bindingSource) { + super(name); + if (bindingSource != null) { + this.bindingSource = bindingSource; + } else { + this.bindingSource = this; + } + registrySecurity = new RegistrySecurity("registry security"); + } + + /** + * Create an instance using this service as the binding source (i.e. read + * configuration options from the registry).
+ * + * @param name service name + */ + public CuratorService(String name) { + this(name, null); + } + + /** + * Init the service. + * This is where the security bindings are set up. + * + * @param conf configuration of the service + * @throws Exception + */ + @Override + protected void serviceInit(Configuration conf) throws Exception { + + registryRoot = conf.getTrimmed(KEY_REGISTRY_ZK_ROOT, + DEFAULT_ZK_REGISTRY_ROOT); + + // add the registry service + addService(registrySecurity); + + if (LOG.isDebugEnabled()) { + LOG.debug("Creating Registry with root {}", registryRoot); + } + + super.serviceInit(conf); + } + + public void setKerberosPrincipalAndKeytab(String principal, String keytab) { + registrySecurity.setKerberosPrincipalAndKeytab(principal, keytab); + } + + /** + * Start the service. + * This is where the curator instance is started. + * + * @throws Exception + */ + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + + // create the curator; rely on the registry security code + // to set up the JVM context and curator + curator = createCurator(); + } + + /** + * Close the ZK connection if it is open. + */ + @Override + protected void serviceStop() throws Exception { + IOUtils.closeStream(curator); + + if (treeCache != null) { + treeCache.close(); + } + super.serviceStop(); + } + + /** + * Internal check that a service is in the live state. + * + * @throws ServiceStateException if not + */ + private void checkServiceLive() throws ServiceStateException { + if (!isInState(STATE.STARTED)) { + throw new ServiceStateException( + "Service " + getName() + " is in wrong state: " + + getServiceState()); + } + } + + /** + * Flag to indicate whether or not the registry is secure. + * Valid once the service is inited. + * + * @return service security policy + */ + public boolean isSecure() { + return registrySecurity.isSecureRegistry(); + } + + /** + * Get the registry security helper.
+ * + * @return the registry security helper + */ + protected RegistrySecurity getRegistrySecurity() { + return registrySecurity; + } + + /** + * Build the security diagnostics string. + * + * @return a string for diagnostics + */ + protected String buildSecurityDiagnostics() { + // build up the security connection diags + if (!isSecure()) { + return "security disabled"; + } else { + StringBuilder builder = new StringBuilder(); + builder.append("secure cluster; "); + builder.append(registrySecurity.buildSecurityDiagnostics()); + return builder.toString(); + } + } + + /** + * Create a new curator instance off the root path; using configuration + * options provided in the service configuration to set timeouts and + * retry policy. + * + * @return the newly created curator + */ + private CuratorFramework createCurator() throws IOException { + Configuration conf = getConfig(); + createEnsembleProvider(); + int sessionTimeout = conf.getInt(KEY_REGISTRY_ZK_SESSION_TIMEOUT, + DEFAULT_ZK_SESSION_TIMEOUT); + int connectionTimeout = conf.getInt(KEY_REGISTRY_ZK_CONNECTION_TIMEOUT, + DEFAULT_ZK_CONNECTION_TIMEOUT); + int retryTimes = conf.getInt(KEY_REGISTRY_ZK_RETRY_TIMES, + DEFAULT_ZK_RETRY_TIMES); + int retryInterval = conf.getInt(KEY_REGISTRY_ZK_RETRY_INTERVAL, + DEFAULT_ZK_RETRY_INTERVAL); + int retryCeiling = conf.getInt(KEY_REGISTRY_ZK_RETRY_CEILING, + DEFAULT_ZK_RETRY_CEILING); + + LOG.info("Creating CuratorService with connection {}", + connectionDescription); + + CuratorFramework framework; + + synchronized (CuratorService.class) { + // set the security options + + // build up the curator itself + CuratorFrameworkFactory.Builder builder = + CuratorFrameworkFactory.builder(); + builder.ensembleProvider(ensembleProvider) + .connectionTimeoutMs(connectionTimeout) + .sessionTimeoutMs(sessionTimeout) + + .retryPolicy(new BoundedExponentialBackoffRetry(retryInterval, + retryCeiling, + retryTimes)); + + // set up the builder AND any JVM context +
registrySecurity.applySecurityEnvironment(builder); + //log them + securityConnectionDiagnostics = buildSecurityDiagnostics(); + if (LOG.isDebugEnabled()) { + LOG.debug(securityConnectionDiagnostics); + } + framework = builder.build(); + framework.start(); + } + + return framework; + } + + @Override + public String toString() { + return super.toString() + + " " + bindingDiagnosticDetails(); + } + + /** + * Get the binding diagnostics. + * + * @return a diagnostics string valid after the service is started. + */ + public String bindingDiagnosticDetails() { + return " Connection=\"" + connectionDescription + "\"" + + " root=\"" + registryRoot + "\"" + + " " + securityConnectionDiagnostics; + } + + /** + * Create a full path from the registry root and the supplied subdir. + * + * @param path path of operation + * @return an absolute path + * @throws IllegalArgumentException if the path is invalid + */ + protected String createFullPath(String path) throws IOException { + return RegistryPathUtils.createFullPath(registryRoot, path); + } + + /** + * Get the registry binding source ... this can be used to + * create new ensemble providers + * + * @return the registry binding source in use + */ + public RegistryBindingSource getBindingSource() { + return bindingSource; + } + + /** + * Create the ensemble provider for this registry, by invoking + * {@link RegistryBindingSource#supplyBindingInformation()} on + * the provider stored in {@link #bindingSource}. + * Sets {@link #ensembleProvider} to that value; + * sets {@link #connectionDescription} to the binding info + * for use in toString and logging; + */ + protected void createEnsembleProvider() { + BindingInformation binding = bindingSource.supplyBindingInformation(); + connectionDescription = binding.description + + " " + securityConnectionDiagnostics; + ensembleProvider = binding.ensembleProvider; + } + + /** + * Supply the binding information.
+ * This implementation returns a fixed ensemble bonded to + * the quorum supplied by {@link #buildConnectionString()}. + * + * @return the binding information + */ + @Override + public BindingInformation supplyBindingInformation() { + BindingInformation binding = new BindingInformation(); + String connectString = buildConnectionString(); + binding.ensembleProvider = new FixedEnsembleProvider(connectString); + binding.description = + "fixed ZK quorum \"" + connectString + "\""; + return binding; + } + + /** + * Override point: get the connection string used to connect to + * the ZK service. + * + * @return a registry quorum + */ + protected String buildConnectionString() { + return getConfig().getTrimmed(KEY_REGISTRY_ZK_QUORUM, + DEFAULT_REGISTRY_ZK_QUORUM); + } + + /** + * Create an IOE when an operation fails. + * + * @param path path of operation + * @param operation operation attempted + * @param exception caught the exception caught + * @return an IOE to throw that contains the path and operation details. + */ + protected IOException operationFailure(String path, + String operation, + Exception exception) { + return operationFailure(path, operation, exception, null); + } + + /** + * Create an IOE when an operation fails. + * + * @param path path of operation + * @param operation operation attempted + * @param exception caught the exception caught + * @return an IOE to throw that contains the path and operation details. 
+ */ + protected IOException operationFailure(String path, + String operation, + Exception exception, + List<ACL> acls) { + IOException ioe; + String aclList = "[" + RegistrySecurity.aclsToString(acls) + "]"; + if (exception instanceof KeeperException.NoNodeException) { + ioe = new PathNotFoundException(path); + } else if (exception instanceof KeeperException.NodeExistsException) { + ioe = new FileAlreadyExistsException(path); + } else if (exception instanceof KeeperException.NoAuthException) { + ioe = new NoPathPermissionsException(path, + "Not authorized to access path; ACLs: " + aclList); + } else if (exception instanceof KeeperException.NotEmptyException) { + ioe = new PathIsNotEmptyDirectoryException(path); + } else if (exception instanceof KeeperException.AuthFailedException) { + ioe = new AuthenticationFailedException(path, + "Authentication Failed: " + exception + + "; " + securityConnectionDiagnostics, + exception); + } else if (exception instanceof + KeeperException.NoChildrenForEphemeralsException) { + ioe = new NoChildrenForEphemeralsException(path, + "Cannot create a path under an ephemeral node: " + exception, + exception); + } else if (exception instanceof KeeperException.InvalidACLException) { + // this is a security exception of a kind + // include the ACLs to help the diagnostics + StringBuilder builder = new StringBuilder(); + builder.append("Path access failure ").append(aclList); + builder.append(" "); + builder.append(securityConnectionDiagnostics); + ioe = new NoPathPermissionsException(path, builder.toString()); + } else { + ioe = new RegistryIOException(path, + "Failure of " + operation + " on " + path + ": " + + exception.toString(), + exception); + } + if (ioe.getCause() == null) { + ioe.initCause(exception); + } + return ioe; + } + + /** + * Create a path if it does not exist. 
+ * The check is poll + create; there's a risk that another process + * may create the same path before the create() operation is executed/ + * propagated to the ZK node polled. + * + * @param path path to create + * @param acl ACL for path -used when creating a new entry + * @param createParents flag to trigger parent creation + * @return true iff the path was created + * @throws IOException + */ + @VisibleForTesting + public boolean maybeCreate(String path, + CreateMode mode, + List<ACL> acl, + boolean createParents) throws IOException { + return zkMkPath(path, mode, createParents, acl); + } + + /** + * Stat the file. + * + * @param path path of operation + * @return a curator stat entry + * @throws IOException on a failure + * @throws PathNotFoundException if the path was not found + */ + public Stat zkStat(String path) throws IOException { + checkServiceLive(); + String fullpath = createFullPath(path); + Stat stat; + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Stat {}", fullpath); + } + stat = curator.checkExists().forPath(fullpath); + } catch (Exception e) { + throw operationFailure(fullpath, "read()", e); + } + if (stat == null) { + throw new PathNotFoundException(path); + } + return stat; + } + + /** + * Get the ACLs of a path. + * + * @param path path of operation + * @return a possibly empty list of ACLs + * @throws IOException + */ + public List<ACL> zkGetACLS(String path) throws IOException { + checkServiceLive(); + String fullpath = createFullPath(path); + List<ACL> acls; + try { + if (LOG.isDebugEnabled()) { + LOG.debug("GetACLS {}", fullpath); + } + acls = curator.getACL().forPath(fullpath); + } catch (Exception e) { + throw operationFailure(fullpath, "read()", e); + } + if (acls == null) { + throw new PathNotFoundException(path); + } + return acls; + } + + /** + * Probe for a path existing. + * + * @param path path of operation + * @return true if the path was visible from the ZK server + * queried. 
+ * @throws IOException on any exception other than + * {@link PathNotFoundException} + */ + public boolean zkPathExists(String path) throws IOException { + checkServiceLive(); + try { + // if zkStat(path) returns without throwing an exception, the return value + // is guaranteed to be not null + zkStat(path); + return true; + } catch (PathNotFoundException e) { + return false; + } catch (IOException e) { + throw e; + } + } + + /** + * Verify a path exists. + * + * @param path path of operation + * @throws PathNotFoundException if the path is absent + * @throws IOException + */ + public String zkPathMustExist(String path) throws IOException { + zkStat(path); + return path; + } + + /** + * Create a directory. It is not an error if it already exists. + * + * @param path path to create + * @param mode mode for path + * @param createParents flag to trigger parent creation + * @param acls ACL for path + * @throws IOException any problem + */ + public boolean zkMkPath(String path, + CreateMode mode, + boolean createParents, + List<ACL> acls) + throws IOException { + checkServiceLive(); + path = createFullPath(path); + if (acls == null || acls.isEmpty()) { + throw new NoPathPermissionsException(path, "Empty ACL list"); + } + + try { + RegistrySecurity.AclListInfo aclInfo = + new RegistrySecurity.AclListInfo(acls); + if (LOG.isDebugEnabled()) { + LOG.debug("Creating path {} with mode {} and ACL {}", + path, mode, aclInfo); + } + CreateBuilder createBuilder = curator.create(); + createBuilder.withMode(mode).withACL(acls); + if (createParents) { + createBuilder.creatingParentsIfNeeded(); + } + createBuilder.forPath(path); + + } catch (KeeperException.NodeExistsException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("path already present: {}", path, e); + } + return false; + } catch (Exception e) { + throw operationFailure(path, "mkdir() ", e, acls); + } + return true; + } + + /** + * Recursively make a path. 
+ * + * @param path path to create + * @param acl ACL for path + * @throws IOException any problem + */ + public void zkMkParentPath(String path, + List<ACL> acl) throws + IOException { + // split path into elements + + zkMkPath(RegistryPathUtils.parentOf(path), + CreateMode.PERSISTENT, true, acl); + } + + /** + * Create a path with given data. byte[0] is used for a path + * without data. + * + * @param path path of operation + * @param data initial data + * @param acls + * @throws IOException + */ + public void zkCreate(String path, + CreateMode mode, + byte[] data, + List<ACL> acls) throws IOException { + Preconditions.checkArgument(data != null, "null data"); + checkServiceLive(); + String fullpath = createFullPath(path); + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Creating {} with {} bytes of data and ACL {}", + fullpath, data.length, + new RegistrySecurity.AclListInfo(acls)); + } + curator.create().withMode(mode).withACL(acls).forPath(fullpath, data); + } catch (Exception e) { + throw operationFailure(fullpath, "create()", e, acls); + } + } + + /** + * Update the data for a path. + * + * @param path path of operation + * @param data new data + * @throws IOException + */ + public void zkUpdate(String path, byte[] data) throws IOException { + Preconditions.checkArgument(data != null, "null data"); + checkServiceLive(); + path = createFullPath(path); + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Updating {} with {} bytes", path, data.length); + } + curator.setData().forPath(path, data); + } catch (Exception e) { + throw operationFailure(path, "update()", e); + } + } + + /** + * Create or update an entry. + * + * @param path path + * @param data data + * @param acl ACL for path -used when creating a new entry + * @param overwrite enable overwrite + * @return true if the entry was created, false if it was simply updated. 
+ * @throws IOException + */ + public boolean zkSet(String path, + CreateMode mode, + byte[] data, + List<ACL> acl, boolean overwrite) throws IOException { + Preconditions.checkArgument(data != null, "null data"); + checkServiceLive(); + if (!zkPathExists(path)) { + zkCreate(path, mode, data, acl); + return true; + } else { + if (overwrite) { + zkUpdate(path, data); + return false; + } else { + throw new FileAlreadyExistsException(path); + } + } + } + + /** + * Delete a directory/directory tree. + * It is not an error to delete a path that does not exist. + * + * @param path path of operation + * @param recursive flag to trigger recursive deletion + * @param backgroundCallback callback; this being set converts the operation + * into an async/background operation. + * task + * @throws IOException on problems other than no-such-path + */ + public void zkDelete(String path, + boolean recursive, + BackgroundCallback backgroundCallback) throws IOException { + checkServiceLive(); + String fullpath = createFullPath(path); + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Deleting {}", fullpath); + } + DeleteBuilder delete = curator.delete(); + if (recursive) { + delete.deletingChildrenIfNeeded(); + } + if (backgroundCallback != null) { + delete.inBackground(backgroundCallback); + } + delete.forPath(fullpath); + } catch (KeeperException.NoNodeException e) { + // not an error + } catch (Exception e) { + throw operationFailure(fullpath, "delete()", e); + } + } + + /** + * List all children of a path. 
+ * + * @param path path of operation + * @return a possibly empty list of children + * @throws IOException + */ + public List<String> zkList(String path) throws IOException { + checkServiceLive(); + String fullpath = createFullPath(path); + try { + if (LOG.isDebugEnabled()) { + LOG.debug("ls {}", fullpath); + } + GetChildrenBuilder builder = curator.getChildren(); + List<String> children = builder.forPath(fullpath); + return children; + } catch (Exception e) { + throw operationFailure(path, "ls()", e); + } + } + + /** + * Read data on a path. + * + * @param path path of operation + * @return the data + * @throws IOException read failure + */ + public byte[] zkRead(String path) throws IOException { + checkServiceLive(); + String fullpath = createFullPath(path); + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Reading {}", fullpath); + } + return curator.getData().forPath(fullpath); + } catch (Exception e) { + throw operationFailure(fullpath, "read()", e); + } + } + + /** + * Return a path dumper instance which can do a full dump + * of the registry tree in its <code>toString()</code> + * operation. + * + * @param verbose verbose flag - includes more details (such as ACLs) + * @return a class to dump the registry + */ + public ZKPathDumper dumpPath(boolean verbose) { + return new ZKPathDumper(curator, registryRoot, verbose); + } + + /** + * Add a new write access entry for all future write operations. + * + * @param id ID to use + * @param pass password + * @throws IOException on any failure to build the digest + */ + public boolean addWriteAccessor(String id, String pass) throws IOException { + RegistrySecurity security = getRegistrySecurity(); + ACL digestACL = new ACL(ZooDefs.Perms.ALL, + security.toDigestId(security.digest(id, pass))); + return security.addDigestACL(digestACL); + } + + /** + * Clear all write accessors. 
+ */ + public void clearWriteAccessors() { + getRegistrySecurity().resetDigestACLs(); + } + + /** + * Diagnostics method to dump a registry robustly. + * Any exception raised is swallowed. + * + * @param verbose verbose path dump + * @return the registry tree + */ + protected String dumpRegistryRobustly(boolean verbose) { + try { + ZKPathDumper pathDumper = dumpPath(verbose); + return pathDumper.toString(); + } catch (Exception e) { + // ignore + LOG.debug("Ignoring exception: {}", e); + } + return ""; + } + + /** + * Registers a listener to path related events. + * + * @param listener the listener. + * @return a handle allowing for the management of the listener. + * @throws Exception if registration fails due to error. + */ + public ListenerHandle registerPathListener(final PathListener listener) + throws Exception { + + final TreeCacheListener pathChildrenCacheListener = + new TreeCacheListener() { + + public void childEvent(CuratorFramework curatorFramework, + TreeCacheEvent event) + throws Exception { + String path = null; + if (event != null && event.getData() != null) { + path = event.getData().getPath(); + } + assert event != null; + switch (event.getType()) { + case NODE_ADDED: + LOG.info("Informing listener of added node {}", path); + listener.nodeAdded(path); + + break; + + case NODE_REMOVED: + LOG.info("Informing listener of removed node {}", path); + listener.nodeRemoved(path); + + break; + + case NODE_UPDATED: + LOG.info("Informing listener of updated node {}", path); + listener.nodeAdded(path); + + break; + + default: + // do nothing + break; + + } + } + }; + treeCache.getListenable().addListener(pathChildrenCacheListener); + + return new ListenerHandle() { + @Override + public void remove() { + treeCache.getListenable().removeListener(pathChildrenCacheListener); + } + }; + + } + + // TODO: should caches be stopped and then restarted if need be? + + /** + * Create the tree cache that monitors the registry for node addition, update, + * and deletion. 
+ * + * @throws Exception if any issue arises during monitoring. + */ + public void monitorRegistryEntries() + throws Exception { + String registryPath = + getConfig().get(RegistryConstants.KEY_REGISTRY_ZK_ROOT, + RegistryConstants.DEFAULT_ZK_REGISTRY_ROOT); + treeCache = new TreeCache(curator, registryPath); + treeCache.start(); + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2a9fa84/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/ListenerHandle.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/ListenerHandle.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/ListenerHandle.java new file mode 100644 index 0000000..e43dbbe --- /dev/null +++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/ListenerHandle.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.registry.client.impl.zk; + +/** + * + */ +public interface ListenerHandle { + void remove(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2a9fa84/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/PathListener.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/PathListener.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/PathListener.java new file mode 100644 index 0000000..db1e509 --- /dev/null +++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/PathListener.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.registry.client.impl.zk; + +import java.io.IOException; + +/** + * + */ +public interface PathListener { + + void nodeAdded(String path) throws IOException; + + void nodeRemoved(String path) throws IOException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2a9fa84/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryBindingSource.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryBindingSource.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryBindingSource.java new file mode 100644 index 0000000..bab4742 --- /dev/null +++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryBindingSource.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.registry.client.impl.zk; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Interface which can be implemented by a registry binding source + */ [email protected] [email protected] +public interface RegistryBindingSource { + + /** + * Supply the binding information for this registry + * @return the binding information data + */ + BindingInformation supplyBindingInformation(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2a9fa84/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryInternalConstants.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryInternalConstants.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryInternalConstants.java new file mode 100644 index 0000000..f04673a --- /dev/null +++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryInternalConstants.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.impl.zk;
+
+import org.apache.zookeeper.ZooDefs;
+
+/**
+ * Internal constants for the registry.
+ *
+ * These are the things which aren't visible to users.
+ *
+ */
+public interface RegistryInternalConstants {
+
+  /**
+   * Pattern of a single entry in the registry path: {@value}.
+   * <p>
+   * This is what constitutes a valid hostname according to current RFCs.
+   * An entry is either a single lowercase alphanumeric character, or it
+   * starts and ends with a lowercase alphanumeric character, with lowercase
+   * alphanumerics and hyphens allowed in between.
+   * <p>
+   * No upper limit is placed on the size of an entry.
+   */
+  String VALID_PATH_ENTRY_PATTERN =
+      "([a-z0-9]|[a-z0-9][a-z0-9\\-]*[a-z0-9])";
+
+  /**
+   * Permissions for readers: {@value}.
+   */
+  int PERMISSIONS_REGISTRY_READERS = ZooDefs.Perms.READ;
+
+  /**
+   * Permissions for system services: {@value}
+   */
+  int PERMISSIONS_REGISTRY_SYSTEM_SERVICES = ZooDefs.Perms.ALL;
+
+  /**
+   * Permissions for a user's root entry: {@value}.
+   * All except the admin permissions (ACL access) on a node
+   */
+  int PERMISSIONS_REGISTRY_USER_ROOT =
+      ZooDefs.Perms.READ | ZooDefs.Perms.WRITE | ZooDefs.Perms.CREATE |
+      ZooDefs.Perms.DELETE;
+
+  /**
+   * Name of the SASL auth provider which has to be added to ZK server to enable
+   * sasl: auth patterns: {@value}.
+   *
+   * Without this callers can connect via SASL, but
+   * they can't use it in ACLs
+   */
+  String SASLAUTHENTICATION_PROVIDER =
+      "org.apache.zookeeper.server.auth.SASLAuthenticationProvider";
+
+  /**
+   * String to use as the prefix when declaring a new auth provider: {@value}.
+   */
+  String ZOOKEEPER_AUTH_PROVIDER = "zookeeper.authProvider";
+
+  /**
+   * This is the Hadoop environment variable which propagates the identity
+   * of a user in an insecure cluster
+   */
+  String HADOOP_USER_NAME = "HADOOP_USER_NAME";
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2a9fa84/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryOperationsService.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryOperationsService.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryOperationsService.java
new file mode 100644
index 0000000..4c911da
--- /dev/null
+++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryOperationsService.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.registry.client.impl.zk; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.registry.client.api.BindFlags; +import org.apache.hadoop.registry.client.api.RegistryOperations; + +import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; +import org.apache.hadoop.registry.client.binding.RegistryUtils; +import org.apache.hadoop.registry.client.binding.RegistryPathUtils; +import org.apache.hadoop.registry.client.exceptions.InvalidPathnameException; +import org.apache.hadoop.registry.client.exceptions.NoRecordException; +import org.apache.hadoop.registry.client.types.RegistryPathStatus; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; + +/** + * The Registry operations service. + * <p> + * This service implements the {@link RegistryOperations} + * API by mapping the commands to zookeeper operations, and translating + * results and exceptions back into those specified by the API. + * <p> + * Factory methods should hide the detail that this has been implemented via + * the {@link CuratorService} by returning it cast to that + * {@link RegistryOperations} interface, rather than this implementation class. 
+ */ [email protected] [email protected] +public class RegistryOperationsService extends CuratorService + implements RegistryOperations { + + private static final Logger LOG = + LoggerFactory.getLogger(RegistryOperationsService.class); + + private final RegistryUtils.ServiceRecordMarshal serviceRecordMarshal + = new RegistryUtils.ServiceRecordMarshal(); + + public RegistryOperationsService(String name) { + this(name, null); + } + + public RegistryOperationsService() { + this("RegistryOperationsService"); + } + + public RegistryOperationsService(String name, + RegistryBindingSource bindingSource) { + super(name, bindingSource); + } + + /** + * Get the aggregate set of ACLs the client should use + * to create directories + * @return the ACL list + */ + public List<ACL> getClientAcls() { + return getRegistrySecurity().getClientACLs(); + } + + /** + * Validate a path + * @param path path to validate + * @throws InvalidPathnameException if a path is considered invalid + */ + protected void validatePath(String path) throws InvalidPathnameException { + // currently no checks are performed + } + + @Override + public boolean mknode(String path, boolean createParents) throws IOException { + validatePath(path); + return zkMkPath(path, CreateMode.PERSISTENT, createParents, getClientAcls()); + } + + @Override + public void bind(String path, + ServiceRecord record, + int flags) throws IOException { + Preconditions.checkArgument(record != null, "null record"); + validatePath(path); + // validate the record before putting it + RegistryTypeUtils.validateServiceRecord(path, record); + + if (LOG.isDebugEnabled()) { + LOG.debug("Bound at {} : ServiceRecord = {}", path, record); + } + CreateMode mode = CreateMode.PERSISTENT; + byte[] bytes = serviceRecordMarshal.toBytes(record); + zkSet(path, mode, bytes, getClientAcls(), + ((flags & BindFlags.OVERWRITE) != 0)); + } + + @Override + public ServiceRecord resolve(String path) throws IOException { + byte[] bytes = zkRead(path); + + 
ServiceRecord record = serviceRecordMarshal.fromBytes(path, + bytes, ServiceRecord.RECORD_TYPE); + RegistryTypeUtils.validateServiceRecord(path, record); + return record; + } + + @Override + public boolean exists(String path) throws IOException { + validatePath(path); + return zkPathExists(path); + } + + @Override + public RegistryPathStatus stat(String path) throws IOException { + validatePath(path); + Stat stat = zkStat(path); + + String name = RegistryPathUtils.lastPathEntry(path); + RegistryPathStatus status = new RegistryPathStatus( + name, + stat.getCtime(), + stat.getDataLength(), + stat.getNumChildren()); + if (LOG.isDebugEnabled()) { + LOG.debug("Stat {} => {}", path, status); + } + return status; + } + + @Override + public List<String> list(String path) throws IOException { + validatePath(path); + return zkList(path); + } + + @Override + public void delete(String path, boolean recursive) throws IOException { + validatePath(path); + zkDelete(path, recursive, null); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2a9fa84/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistrySecurity.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistrySecurity.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistrySecurity.java new file mode 100644 index 0000000..12a4133 --- /dev/null +++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistrySecurity.java @@ -0,0 +1,1143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.registry.client.impl.zk; + +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.collect.Lists; +import org.apache.commons.lang3.StringUtils; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.util.KerberosUtil; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.service.ServiceStateException; +import org.apache.hadoop.util.ZKUtil; +import org.apache.zookeeper.Environment; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.client.ZooKeeperSaslClient; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Id; +import org.apache.zookeeper.server.auth.DigestAuthenticationProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.auth.login.AppConfigurationEntry; +import java.io.File; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.ListIterator; +import java.util.Locale; +import 
java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; + +import static org.apache.hadoop.registry.client.impl.zk.ZookeeperConfigOptions.*; +import static org.apache.hadoop.registry.client.api.RegistryConstants.*; + +import static org.apache.hadoop.util.PlatformName.IBM_JAVA; + +/** + * Implement the registry security ... a self contained service for + * testability. + * <p> + * This class contains: + * <ol> + * <li> + * The registry security policy implementation, configuration reading, ACL + * setup and management + * </li> + * <li>Lots of static helper methods to aid security setup and debugging</li> + * </ol> + */ + +public class RegistrySecurity extends AbstractService { + + private static final Logger LOG = + LoggerFactory.getLogger(RegistrySecurity.class); + + public static final String E_UNKNOWN_AUTHENTICATION_MECHANISM = + "Unknown/unsupported authentication mechanism; "; + + /** + * there's no default user to add with permissions, so it would be + * impossible to create nodes with unrestricted user access + */ + public static final String E_NO_USER_DETERMINED_FOR_ACLS = + "No user for ACLs determinable from current user or registry option " + + KEY_REGISTRY_USER_ACCOUNTS; + + /** + * Error raised when the registry is tagged as secure but this + * process doesn't have hadoop security enabled. + */ + public static final String E_NO_KERBEROS = + "Registry security is enabled -but Hadoop security is not enabled"; + + /** + * Access policy options + */ + private enum AccessPolicy { + anon, sasl, digest, simple + } + + /** + * Access mechanism + */ + private AccessPolicy access; + + /** + * User used for digest auth + */ + + private String digestAuthUser; + + /** + * Password used for digest auth + */ + + private String digestAuthPassword; + + /** + * Auth data used for digest auth + */ + private byte[] digestAuthData; + + /** + * flag set to true if the registry has security enabled. 
+   */
+  private boolean secureRegistry;
+
+  /**
+   * An ACL granting all permissions to anyone
+   */
+  public static final ACL ALL_READWRITE_ACCESS =
+      new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.ANYONE_ID_UNSAFE);
+
+  /**
+   * An ACL with read access for anyone
+   */
+  public static final ACL ALL_READ_ACCESS =
+      new ACL(ZooDefs.Perms.READ, ZooDefs.Ids.ANYONE_ID_UNSAFE);
+
+  /**
+   * An ACL list containing the {@link #ALL_READWRITE_ACCESS} entry.
+   * It is copy on write so can be shared without worry
+   */
+  public static final List<ACL> WorldReadWriteACL;
+
+  static {
+    List<ACL> acls = new ArrayList<ACL>();
+    acls.add(ALL_READWRITE_ACCESS);
+    WorldReadWriteACL = new CopyOnWriteArrayList<ACL>(acls);
+  }
+
+  /**
+   * the list of system ACLs
+   */
+  private final List<ACL> systemACLs = new ArrayList<ACL>();
+
+  /**
+   * Set during security initialization to whether the system accounts
+   * option contains an "@"; selects full versus short user names when
+   * SASL ACLs are created from a user.
+   */
+  private boolean usesRealm = true;
+
+  /**
+   * A list of digest ACLs which can be added to permissions
+   * and cleared later.
+   */
+  private final List<ACL> digestACLs = new ArrayList<ACL>();
+
+  /**
+   * the default kerberos realm
+   */
+  private String kerberosRealm;
+
+  /**
+   * Client context
+   */
+  private String jaasClientEntry;
+
+  /**
+   * Client identity
+   */
+  private String jaasClientIdentity;
+
+  private String principal;
+
+  private String keytab;
+
+  /**
+   * Create an instance
+   * @param name service name
+   */
+  public RegistrySecurity(String name) {
+    super(name);
+  }
+
+  /**
+   * Init the service: this sets up security based on the configuration.
+   * The client authentication option is mapped to an access policy, then
+   * the security settings are initialized.
+   * @param conf configuration
+   * @throws Exception on any failure, including a
+   * {@link ServiceStateException} for an unknown authentication mechanism
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    String auth = conf.getTrimmed(KEY_REGISTRY_CLIENT_AUTH,
+        REGISTRY_CLIENT_AUTH_ANONYMOUS);
+
+    switch (auth) {
+    case REGISTRY_CLIENT_AUTH_KERBEROS:
+      access = AccessPolicy.sasl;
+      break;
+    case REGISTRY_CLIENT_AUTH_DIGEST:
+      access = AccessPolicy.digest;
+      break;
+    case REGISTRY_CLIENT_AUTH_ANONYMOUS:
+      access = AccessPolicy.anon;
+      break;
+    case REGISTRY_CLIENT_AUTH_SIMPLE:
+      access = AccessPolicy.simple;
+      break;
+    default:
+      throw new ServiceStateException(E_UNKNOWN_AUTHENTICATION_MECHANISM
+          + "\"" + auth + "\"");
+    }
+    initSecurity();
+  }
+
+  /**
+   * Init security.
+   *
+   * After this operation, the {@link #systemACLs} list is valid.
+   * In a secure registry it holds the world-read ACL, the configured system
+   * accounts and, depending on the access policy, the user ACLs; in an
+   * insecure registry it holds the world read-write ACL.
+   * @throws IOException on a missing option, an invalid digest id:password
+   * pair, or SASL access being requested without Hadoop security enabled
+   */
+  private void initSecurity() throws IOException {
+
+    secureRegistry =
+        getConfig().getBoolean(KEY_REGISTRY_SECURE, DEFAULT_REGISTRY_SECURE);
+    systemACLs.clear();
+    if (secureRegistry) {
+      addSystemACL(ALL_READ_ACCESS);
+
+      // determine the kerberos realm from JVM and settings
+      kerberosRealm = getConfig().get(KEY_REGISTRY_KERBEROS_REALM,
+          getDefaultRealmInJVM());
+
+      // System Accounts
+      String system = getOrFail(KEY_REGISTRY_SYSTEM_ACCOUNTS,
+          DEFAULT_REGISTRY_SYSTEM_ACCOUNTS);
+      usesRealm = system.contains("@");
+
+      systemACLs.addAll(buildACLs(system, kerberosRealm, ZooDefs.Perms.ALL));
+
+      LOG.info("Registry default system acls: " + System.lineSeparator() +
+          systemACLs);
+      // user accounts (may be empty, but for digest one user ACL must
+      // be built up)
+      String user = getConfig().get(KEY_REGISTRY_USER_ACCOUNTS,
+          DEFAULT_REGISTRY_USER_ACCOUNTS);
+      List<ACL> userACLs = buildACLs(user, kerberosRealm, ZooDefs.Perms.ALL);
+
+      // add self if the current user can be determined
+      ACL self;
+      if (UserGroupInformation.isSecurityEnabled()) {
+        self = createSaslACLFromCurrentUser(ZooDefs.Perms.ALL);
+        if (self != null) {
+          userACLs.add(self);
+        }
+      }
+      LOG.info("Registry User ACLs " + System.lineSeparator()+ userACLs);
+
+      // here check for UGI having secure on or digest + ID
+      switch (access) {
+      case sasl:
+        // secure + SASL => has to be authenticated
+        if (!UserGroupInformation.isSecurityEnabled()) {
+          throw new IOException("Kerberos required for secure registry access");
+        }
+        UserGroupInformation currentUser =
+            UserGroupInformation.getCurrentUser();
+        jaasClientEntry = getOrFail(KEY_REGISTRY_CLIENT_JAAS_CONTEXT,
+            DEFAULT_REGISTRY_CLIENT_JAAS_CONTEXT);
+        jaasClientIdentity = currentUser.getShortUserName();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Auth is SASL user=\"{}\" JAAS context=\"{}\"",
+              jaasClientIdentity, jaasClientEntry);
+        }
+        break;
+
+      case digest:
+        String id = getOrFail(KEY_REGISTRY_CLIENT_AUTHENTICATION_ID, "");
+        String pass = getOrFail(KEY_REGISTRY_CLIENT_AUTHENTICATION_PASSWORD, "");
+        if (userACLs.isEmpty()) {
+          // no user ACL came from the user accounts option or the current user
+          throw new ServiceStateException(E_NO_USER_DETERMINED_FOR_ACLS);
+        }
+        // result is discarded: the call is made for its validation of the
+        // id:password pair
+        digest(id, pass);
+        ACL acl = new ACL(ZooDefs.Perms.ALL, toDigestId(id, pass));
+        userACLs.add(acl);
+        digestAuthUser = id;
+        digestAuthPassword = pass;
+        String authPair = id + ":" + pass;
+        digestAuthData = authPair.getBytes("UTF-8");
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Auth is Digest ACL: {}", aclToString(acl));
+        }
+        break;
+
+      case anon:
+      case simple:
+        // nothing is needed; account is read only.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Auth is anonymous");
+        }
+        // discard any user ACLs built above
+        userACLs = new ArrayList<ACL>(0);
+        break;
+      }
+      systemACLs.addAll(userACLs);
+
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Registry has no security");
+      }
+      // wide open cluster, adding system acls
+      systemACLs.addAll(WorldReadWriteACL);
+    }
+  }
+
+  /**
+   * Add another system ACL
+   * @param acl add ACL
+   */
+  public void addSystemACL(ACL acl) {
+    systemACLs.add(acl);
+  }
+
+  /**
+   * Add a digest ACL; ignored unless the registry is secure.
+   * @param acl add ACL
+   * @return true if the ACL was added, false if it was ignored
+   */
+  public boolean addDigestACL(ACL acl) {
+    if (secureRegistry) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Added ACL {}", aclToString(acl));
+      }
+      digestACLs.add(acl);
+      return true;
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Ignoring added ACL - registry is insecure{}",
+            aclToString(acl));
+      }
+      return false;
+    }
+  }
+
+  /**
+   * Reset the digest ACL list
+   */
+  public void resetDigestACLs() {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Cleared digest ACLs");
+    }
+    digestACLs.clear();
+  }
+
+  /**
+   * Flag to indicate the cluster is secure
+   * @return
true if the config enabled security + */ + public boolean isSecureRegistry() { + return secureRegistry; + } + + /** + * Get the system principals + * @return the system principals + */ + public List<ACL> getSystemACLs() { + Preconditions.checkNotNull(systemACLs, "registry security is uninitialized"); + return Collections.unmodifiableList(systemACLs); + } + + /** + * Get all ACLs needed for a client to use when writing to the repo. + * That is: system ACLs, its own ACL, any digest ACLs + * @return the client ACLs + */ + public List<ACL> getClientACLs() { + List<ACL> clientACLs = new ArrayList<ACL>(systemACLs); + clientACLs.addAll(digestACLs); + return clientACLs; + } + + /** + * Create a SASL ACL for the user + * @param perms permissions + * @return an ACL for the current user or null if they aren't a kerberos user + * @throws IOException + */ + public ACL createSaslACLFromCurrentUser(int perms) throws IOException { + UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); + if (currentUser.hasKerberosCredentials()) { + return createSaslACL(currentUser, perms); + } else { + return null; + } + } + + /** + * Given a UGI, create a SASL ACL from it + * @param ugi UGI + * @param perms permissions + * @return a new ACL + */ + public ACL createSaslACL(UserGroupInformation ugi, int perms) { + String userName = null; + if (usesRealm) { + userName = ugi.getUserName(); + } else { + userName = ugi.getShortUserName(); + } + return new ACL(perms, new Id(SCHEME_SASL, userName)); + } + + /** + * Get a conf option, throw an exception if it is null/empty + * @param key key + * @param defval default value + * @return the value + * @throws IOException if missing + */ + private String getOrFail(String key, String defval) throws IOException { + String val = getConfig().get(key, defval); + if (StringUtils.isEmpty(val)) { + throw new IOException("Missing value for configuration option " + key); + } + return val; + } + + /** + * Check for an id:password tuple being valid. 
+ * This test is stricter than that in {@link DigestAuthenticationProvider}, + * which splits the string, but doesn't check the contents of each + * half for being non-"". + * @param idPasswordPair id:pass pair + * @return true if the pass is considered valid. + */ + public boolean isValid(String idPasswordPair) { + String[] parts = idPasswordPair.split(":"); + return parts.length == 2 + && !StringUtils.isEmpty(parts[0]) + && !StringUtils.isEmpty(parts[1]); + } + + /** + * Get the derived kerberos realm. + * @return this is built from the JVM realm, or the configuration if it + * overrides it. If "", it means "don't know". + */ + public String getKerberosRealm() { + return kerberosRealm; + } + + /** + * Generate a base-64 encoded digest of the idPasswordPair pair + * @param idPasswordPair id:password + * @return a string that can be used for authentication + */ + public String digest(String idPasswordPair) throws IOException { + if (StringUtils.isEmpty(idPasswordPair) || !isValid(idPasswordPair)) { + throw new IOException("Invalid id:password"); + } + try { + return DigestAuthenticationProvider.generateDigest(idPasswordPair); + } catch (NoSuchAlgorithmException e) { + // unlikely since it is standard to the JVM, but maybe JCE restrictions + // could trigger it + throw new IOException(e.toString(), e); + } + } + + /** + * Generate a base-64 encoded digest of the idPasswordPair pair + * @param id ID + * @param password pass + * @return a string that can be used for authentication + * @throws IOException + */ + public String digest(String id, String password) throws IOException { + return digest(id + ":" + password); + } + + /** + * Given a digest, create an ID from it + * @param digest digest + * @return ID + */ + public Id toDigestId(String digest) { + return new Id(SCHEME_DIGEST, digest); + } + + /** + * Create a Digest ID from an id:pass pair + * @param id ID + * @param password password + * @return an ID + * @throws IOException + */ + public Id toDigestId(String 
id, String password) throws IOException { + return toDigestId(digest(id, password)); + } + + /** + * Split up a list of the form + * <code>sasl:mapred@,digest:5f55d66, sasl@[email protected]</code> + * into a list of possible ACL values, trimming as needed + * + * The supplied realm is added to entries where + * <ol> + * <li>the string begins "sasl:"</li> + * <li>the string ends with "@"</li> + * </ol> + * No attempt is made to validate any of the acl patterns. + * + * @param aclString list of 0 or more ACLs + * @param realm realm to add + * @return a list of split and potentially patched ACL pairs. + * + */ + public List<String> splitAclPairs(String aclString, String realm) { + List<String> list = Lists.newArrayList( + Splitter.on(',').omitEmptyStrings().trimResults() + .split(aclString)); + ListIterator<String> listIterator = list.listIterator(); + while (listIterator.hasNext()) { + String next = listIterator.next(); + if (next.startsWith(SCHEME_SASL +":") && next.endsWith("@")) { + listIterator.set(next + realm); + } + } + return list; + } + + /** + * Parse a string down to an ID, adding a realm if needed + * @param idPair id:data tuple + * @param realm realm to add + * @return the ID. 
+ * @throws IllegalArgumentException if the idPair is invalid + */ + public Id parse(String idPair, String realm) { + int firstColon = idPair.indexOf(':'); + int lastColon = idPair.lastIndexOf(':'); + if (firstColon == -1 || lastColon == -1 || firstColon != lastColon) { + throw new IllegalArgumentException( + "ACL '" + idPair + "' not of expected form scheme:id"); + } + String scheme = idPair.substring(0, firstColon); + String id = idPair.substring(firstColon + 1); + if (id.endsWith("@")) { + Preconditions.checkArgument( + StringUtils.isNotEmpty(realm), + "@ suffixed account but no realm %s", id); + id = id + realm; + } + return new Id(scheme, id); + } + + /** + * Parse the IDs, adding a realm if needed, setting the permissions + * @param principalList id string + * @param realm realm to add + * @param perms permissions + * @return the relevant ACLs + * @throws IOException + */ + public List<ACL> buildACLs(String principalList, String realm, int perms) + throws IOException { + List<String> aclPairs = splitAclPairs(principalList, realm); + List<ACL> ids = new ArrayList<ACL>(aclPairs.size()); + for (String aclPair : aclPairs) { + ACL newAcl = new ACL(); + newAcl.setId(parse(aclPair, realm)); + newAcl.setPerms(perms); + ids.add(newAcl); + } + return ids; + } + + /** + * Parse an ACL list. This includes configuration indirection + * {@link ZKUtil#resolveConfIndirection(String)} + * @param zkAclConf configuration string + * @return an ACL list + * @throws IOException on a bad ACL parse + */ + public List<ACL> parseACLs(String zkAclConf) throws IOException { + try { + return ZKUtil.parseACLs(ZKUtil.resolveConfIndirection(zkAclConf)); + } catch (ZKUtil.BadAclFormatException e) { + throw new IOException("Parsing " + zkAclConf + " :" + e, e); + } + } + + /** + * Get the appropriate Kerberos Auth module for JAAS entries + * for this JVM. + * @return a JVM-specific kerberos login module classname. 
+ */ + public static String getKerberosAuthModuleForJVM() { + if (System.getProperty("java.vendor").contains("IBM")) { + return "com.ibm.security.auth.module.Krb5LoginModule"; + } else { + return "com.sun.security.auth.module.Krb5LoginModule"; + } + } + + /** + * JAAS template: {@value} + * Note the semicolon on the last entry + */ + private static final String JAAS_ENTRY = + (IBM_JAVA ? + "%s { %n" + + " %s required%n" + + " useKeytab=\"%s\"%n" + + " debug=true%n" + + " principal=\"%s\"%n" + + " credsType=both%n" + + " refreshKrb5Config=true;%n" + + "}; %n" + : + "%s { %n" + + " %s required%n" + // kerberos module + + " keyTab=\"%s\"%n" + + " debug=true%n" + + " principal=\"%s\"%n" + + " useKeyTab=true%n" + + " useTicketCache=false%n" + + " doNotPrompt=true%n" + + " storeKey=true;%n" + + "}; %n" + ); + + /** + * Create a JAAS entry for insertion + * @param context context of the entry + * @param principal kerberos principal + * @param keytab keytab + * @return a context + */ + public String createJAASEntry( + String context, + String principal, + File keytab) { + Preconditions.checkArgument(StringUtils.isNotEmpty(principal), + "invalid principal"); + Preconditions.checkArgument(StringUtils.isNotEmpty(context), + "invalid context"); + Preconditions.checkArgument(keytab != null && keytab.isFile(), + "Keytab null or missing: "); + String keytabpath = keytab.getAbsolutePath(); + // fix up for windows; no-op on unix + keytabpath = keytabpath.replace('\\', '/'); + return String.format( + Locale.ENGLISH, + JAAS_ENTRY, + context, + getKerberosAuthModuleForJVM(), + keytabpath, + principal); + } + + /** + * Bind the JVM JAS setting to the specified JAAS file. 
+ * + * <b>Important:</b> once a file has been loaded the JVM doesn't pick up + * changes + * @param jaasFile the JAAS file + */ + public static void bindJVMtoJAASFile(File jaasFile) { + String path = jaasFile.getAbsolutePath(); + if (LOG.isDebugEnabled()) { + LOG.debug("Binding {} to {}", Environment.JAAS_CONF_KEY, path); + } + System.setProperty(Environment.JAAS_CONF_KEY, path); + } + + /** + * Set the Zookeeper server property + * {@link ZookeeperConfigOptions#PROP_ZK_SERVER_SASL_CONTEXT} + * to the SASL context. When the ZK server starts, this is the context + * which it will read in + * @param contextName the name of the context + */ + public static void bindZKToServerJAASContext(String contextName) { + System.setProperty(PROP_ZK_SERVER_SASL_CONTEXT, contextName); + } + + /** + * Reset any system properties related to JAAS + */ + public static void clearJaasSystemProperties() { + System.clearProperty(Environment.JAAS_CONF_KEY); + } + + /** + * Resolve the context of an entry. This is an effective test of + * JAAS setup, because it will relay detected problems up + * @param context context name + * @return the entry + * @throws RuntimeException if there is no context entry found + */ + public static AppConfigurationEntry[] validateContext(String context) { + if (context == null) { + throw new RuntimeException("Null context argument"); + } + if (context.isEmpty()) { + throw new RuntimeException("Empty context argument"); + } + javax.security.auth.login.Configuration configuration = + javax.security.auth.login.Configuration.getConfiguration(); + AppConfigurationEntry[] entries = + configuration.getAppConfigurationEntry(context); + if (entries == null) { + throw new RuntimeException( + String.format("Entry \"%s\" not found; " + + "JAAS config = %s", + context, + describeProperty(Environment.JAAS_CONF_KEY) )); + } + return entries; + } + + /** + * Apply the security environment to this curator instance. 
This + * may include setting up the ZK system properties for SASL + * @param builder curator builder + * @throws IOException if jaas configuration can't be generated or found + */ + public void applySecurityEnvironment(CuratorFrameworkFactory.Builder + builder) throws IOException { + + if (isSecureRegistry()) { + switch (access) { + case anon: + clearZKSaslClientProperties(); + break; + + case digest: + // no SASL + clearZKSaslClientProperties(); + builder.authorization(SCHEME_DIGEST, digestAuthData); + break; + + case sasl: + String existingJaasConf = System.getProperty( + "java.security.auth.login.config"); + if (existingJaasConf == null || existingJaasConf.isEmpty()) { + if (principal == null || keytab == null) { + throw new IOException("SASL is configured for registry, " + + "but neither keytab/principal nor java.security.auth.login" + + ".config system property are specified"); + } + // in this case, keytab and principal are specified and no jaas + // config is specified, so we will create one + LOG.info( + "Enabling ZK sasl client: jaasClientEntry = " + jaasClientEntry + + ", principal = " + principal + ", keytab = " + keytab); + JaasConfiguration jconf = + new JaasConfiguration(jaasClientEntry, principal, keytab); + javax.security.auth.login.Configuration.setConfiguration(jconf); + setSystemPropertyIfUnset(ZooKeeperSaslClient.ENABLE_CLIENT_SASL_KEY, + "true"); + setSystemPropertyIfUnset(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, + jaasClientEntry); + } else { + // in this case, jaas config is specified so we will not change it + LOG.info("Using existing ZK sasl configuration: " + + "jaasClientEntry = " + System.getProperty( + ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, "Client") + + ", sasl client = " + System.getProperty( + ZooKeeperSaslClient.ENABLE_CLIENT_SASL_KEY, + ZooKeeperSaslClient.ENABLE_CLIENT_SASL_DEFAULT) + + ", jaas = " + existingJaasConf); + } + break; + + default: + clearZKSaslClientProperties(); + break; + } + } + } + + public void 
setKerberosPrincipalAndKeytab(String principal, String keytab) { + this.principal = principal; + this.keytab = keytab; + } + + /** + * Creates a programmatic version of a jaas.conf file. This can be used + * instead of writing a jaas.conf file and setting the system property, + * "java.security.auth.login.config", to point to that file. It is meant to be + * used for connecting to ZooKeeper. + */ + @InterfaceAudience.Private + public static class JaasConfiguration extends + javax.security.auth.login.Configuration { + + private final javax.security.auth.login.Configuration baseConfig = + javax.security.auth.login.Configuration.getConfiguration(); + private static AppConfigurationEntry[] entry; + private String entryName; + + /** + * Add an entry to the jaas configuration with the passed in name, + * principal, and keytab. The other necessary options will be set for you. + * + * @param entryName The name of the entry (e.g. "Client") + * @param principal The principal of the user + * @param keytab The location of the keytab + */ + public JaasConfiguration(String entryName, String principal, String keytab) { + this.entryName = entryName; + Map<String, String> options = new HashMap<String, String>(); + options.put("keyTab", keytab); + options.put("principal", principal); + options.put("useKeyTab", "true"); + options.put("storeKey", "true"); + options.put("useTicketCache", "false"); + options.put("refreshKrb5Config", "true"); + String jaasEnvVar = System.getenv("HADOOP_JAAS_DEBUG"); + if (jaasEnvVar != null && "true".equalsIgnoreCase(jaasEnvVar)) { + options.put("debug", "true"); + } + entry = new AppConfigurationEntry[]{ + new AppConfigurationEntry(getKrb5LoginModuleName(), + AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, + options)}; + } + + @Override + public AppConfigurationEntry[] getAppConfigurationEntry(String name) { + return (entryName.equals(name)) ? entry : ((baseConfig != null) + ? 
baseConfig.getAppConfigurationEntry(name) : null); + } + + private String getKrb5LoginModuleName() { + String krb5LoginModuleName; + if (System.getProperty("java.vendor").contains("IBM")) { + krb5LoginModuleName = "com.ibm.security.auth.module.Krb5LoginModule"; + } else { + krb5LoginModuleName = "com.sun.security.auth.module.Krb5LoginModule"; + } + return krb5LoginModuleName; + } + } + + /** + * Set the client properties. This forces the ZK client into + * failing if it can't auth. + * <b>Important:</b>This is JVM-wide. + * @param username username + * @param context login context + * @throws RuntimeException if the context cannot be found in the current + * JAAS context + */ + public static void setZKSaslClientProperties(String username, + String context) { + RegistrySecurity.validateContext(context); + enableZookeeperClientSASL(); + setSystemPropertyIfUnset(PROP_ZK_SASL_CLIENT_USERNAME, username); + setSystemPropertyIfUnset(PROP_ZK_SASL_CLIENT_CONTEXT, context); + } + + private static void setSystemPropertyIfUnset(String name, String value) { + String existingValue = System.getProperty(name); + if (existingValue == null || existingValue.isEmpty()) { + System.setProperty(name, value); + } + } + + /** + * Clear all the ZK SASL Client properties + * <b>Important:</b>This is JVM-wide + */ + public static void clearZKSaslClientProperties() { + disableZookeeperClientSASL(); + System.clearProperty(PROP_ZK_SASL_CLIENT_CONTEXT); + System.clearProperty(PROP_ZK_SASL_CLIENT_USERNAME); + } + + /** + * Turn ZK SASL on + * <b>Important:</b>This is JVM-wide + */ + protected static void enableZookeeperClientSASL() { + System.setProperty(PROP_ZK_ENABLE_SASL_CLIENT, "true"); + } + + /** + * Force disable ZK SASL bindings. + * <b>Important:</b>This is JVM-wide + */ + public static void disableZookeeperClientSASL() { + System.setProperty(ZookeeperConfigOptions.PROP_ZK_ENABLE_SASL_CLIENT, "false"); + } + + /** + * Is the system property enabling the SASL client set? 
+ * @return true if the SASL client system property is set. + */ + public static boolean isClientSASLEnabled() { + return Boolean.parseBoolean(System.getProperty( + ZookeeperConfigOptions.PROP_ZK_ENABLE_SASL_CLIENT, "true")); + } + + /** + * Log details about the current Hadoop user at INFO. + * Robust against IOEs when trying to get the current user + */ + public void logCurrentHadoopUser() { + try { + UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); + LOG.info("Current user = {}",currentUser); + UserGroupInformation realUser = currentUser.getRealUser(); + LOG.info("Real User = {}" , realUser); + } catch (IOException e) { + LOG.warn("Failed to get current user {}, {}", e); + } + } + + /** + * Stringify a list of ACLs for logging. Digest ACLs have their + * digest values stripped for security. + * @param acls ACL list + * @return a string for logs, exceptions, ... + */ + public static String aclsToString(List<ACL> acls) { + StringBuilder builder = new StringBuilder(); + if (acls == null) { + builder.append("null ACL"); + } else { + builder.append('\n'); + for (ACL acl : acls) { + builder.append(aclToString(acl)) + .append(" "); + } + } + return builder.toString(); + } + + /** + * Convert an ACL to a string, with any obfuscation needed + * @param acl ACL + * @return ACL string value + */ + public static String aclToString(ACL acl) { + return String.format(Locale.ENGLISH, + "0x%02x: %s", + acl.getPerms(), + idToString(acl.getId()) + ); + } + + /** + * Convert an ID to a string, stripping out all but the first few characters + * of any digest auth hash for security reasons + * @param id ID + * @return a string description of a Zookeeper ID + */ + public static String idToString(Id id) { + String s; + if (id.getScheme().equals(SCHEME_DIGEST)) { + String ids = id.getId(); + int colon = ids.indexOf(':'); + if (colon > 0) { + ids = ids.substring(colon + 3); + } + s = SCHEME_DIGEST + ": " + ids; + } else { + s = id.toString(); + } + return s; + } + 
+ /** + * Build up low-level security diagnostics to aid debugging + * @return a string to use in diagnostics + */ + public String buildSecurityDiagnostics() { + StringBuilder builder = new StringBuilder(); + builder.append(secureRegistry ? "secure registry; " + : "insecure registry; "); + builder.append("Curator service access policy: ").append(access); + + builder.append("; System ACLs: ").append(aclsToString(systemACLs)); + builder.append("User: ").append(UgiInfo.fromCurrentUser()); + builder.append("; Kerberos Realm: ").append(kerberosRealm); + builder.append(describeProperty(Environment.JAAS_CONF_KEY)); + String sasl = + System.getProperty(PROP_ZK_ENABLE_SASL_CLIENT, + DEFAULT_ZK_ENABLE_SASL_CLIENT); + boolean saslEnabled = Boolean.parseBoolean(sasl); + builder.append(describeProperty(PROP_ZK_ENABLE_SASL_CLIENT, + DEFAULT_ZK_ENABLE_SASL_CLIENT)); + if (saslEnabled) { + builder.append("; JAAS Client Identity") + .append("=") + .append(jaasClientIdentity) + .append("; "); + builder.append(KEY_REGISTRY_CLIENT_JAAS_CONTEXT) + .append("=") + .append(jaasClientEntry) + .append("; "); + builder.append(describeProperty(PROP_ZK_SASL_CLIENT_USERNAME)); + builder.append(describeProperty(PROP_ZK_SASL_CLIENT_CONTEXT)); + } + builder.append(describeProperty(PROP_ZK_ALLOW_FAILED_SASL_CLIENTS, + "(undefined but defaults to true)")); + builder.append(describeProperty( + PROP_ZK_SERVER_MAINTAIN_CONNECTION_DESPITE_SASL_FAILURE)); + return builder.toString(); + } + + private static String describeProperty(String name) { + return describeProperty(name, "(undefined)"); + } + + private static String describeProperty(String name, String def) { + return "; " + name + "=" + System.getProperty(name, def); + } + + /** + * Get the default kerberos realm âreturning "" if there + * is no realm or other problem + * @return the default realm of the system if it + * could be determined + */ + public static String getDefaultRealmInJVM() { + try { + return KerberosUtil.getDefaultRealm(); + // 
JDK7 + } catch (ClassNotFoundException ignored) { + // ignored + } catch (NoSuchMethodException ignored) { + // ignored + } catch (IllegalAccessException ignored) { + // ignored + } catch (InvocationTargetException ignored) { + // ignored + } + return ""; + } + + /** + * Create an ACL For a user. + * @param ugi User identity + * @return the ACL For the specified user. Ifthe username doesn't end + * in "@" then the realm is added + */ + public ACL createACLForUser(UserGroupInformation ugi, int perms) { + if (LOG.isDebugEnabled()) { + LOG.debug("Creating ACL For ", new UgiInfo(ugi)); + } + if (!secureRegistry) { + return ALL_READWRITE_ACCESS; + } else { + return createACLfromUsername(ugi.getUserName(), perms); + } + } + + /** + * Given a user name (short or long), create a SASL ACL + * @param username user name; if it doesn't contain an "@" symbol, the + * service's kerberos realm is added + * @param perms permissions + * @return an ACL for the user + */ + public ACL createACLfromUsername(String username, int perms) { + if (usesRealm && !username.contains("@")) { + username = username + "@" + kerberosRealm; + if (LOG.isDebugEnabled()) { + LOG.debug("Appending kerberos realm to make {}", username); + } + } + return new ACL(perms, new Id(SCHEME_SASL, username)); + } + + /** + * On demand string-ifier for UGI with extra details + */ + public static class UgiInfo { + + public static UgiInfo fromCurrentUser() { + try { + return new UgiInfo(UserGroupInformation.getCurrentUser()); + } catch (IOException e) { + LOG.info("Failed to get current user {}", e, e); + return new UgiInfo(null); + } + } + + private final UserGroupInformation ugi; + + public UgiInfo(UserGroupInformation ugi) { + this.ugi = ugi; + } + + @Override + public String toString() { + if (ugi==null) { + return "(null ugi)"; + } + StringBuilder builder = new StringBuilder(); + builder.append(ugi.getUserName()).append(": "); + builder.append(ugi.toString()); + builder.append(" hasKerberosCredentials=").append( + 
ugi.hasKerberosCredentials()); + builder.append(" isFromKeytab=").append(ugi.isFromKeytab()); + builder.append(" kerberos is enabled in Hadoop =").append(UserGroupInformation.isSecurityEnabled()); + return builder.toString(); + } + + } + + /** + * on-demand stringifier for a list of ACLs + */ + public static class AclListInfo { + public final List<ACL> acls; + + public AclListInfo(List<ACL> acls) { + this.acls = acls; + } + + @Override + public String toString() { + return aclsToString(acls); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2a9fa84/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/ZKPathDumper.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/ZKPathDumper.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/ZKPathDumper.java new file mode 100644 index 0000000..3c4a730 --- /dev/null +++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/ZKPathDumper.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.registry.client.impl.zk; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.GetChildrenBuilder; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; + +import java.util.List; + +/** + * This class dumps a registry tree to a string. + * It does this in the <code>toString()</code> method, so it + * can be used in a log statement -the operation + * will only take place if the method is evaluated. + * + */ +@VisibleForTesting +public class ZKPathDumper { + + public static final int INDENT = 2; + private final CuratorFramework curator; + private final String root; + private final boolean verbose; + + /** + * Create a path dumper -but do not dump the path until asked + * @param curator curator instance + * @param root root + * @param verbose verbose flag - includes more details (such as ACLs) + */ + public ZKPathDumper(CuratorFramework curator, + String root, + boolean verbose) { + Preconditions.checkArgument(curator != null); + Preconditions.checkArgument(root != null); + this.curator = curator; + this.root = root; + this.verbose = verbose; + } + + /** + * Trigger the recursive registry dump. 
+ * @return a string view of the registry + */ + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("ZK tree for ").append(root).append('\n'); + expand(builder, root, 1); + return builder.toString(); + } + + /** + * Recursively expand the path into the supplied string builder, increasing + * the indentation by {@link #INDENT} as it proceeds (depth first) down + * the tree + * @param builder string build to append to + * @param path path to examine + * @param indent current indentation + */ + private void expand(StringBuilder builder, + String path, + int indent) { + try { + GetChildrenBuilder childrenBuilder = curator.getChildren(); + List<String> children = childrenBuilder.forPath(path); + for (String child : children) { + String childPath = path + "/" + child; + String body; + Stat stat = curator.checkExists().forPath(childPath); + StringBuilder bodyBuilder = new StringBuilder(256); + bodyBuilder.append(" [") + .append(stat.getDataLength()) + .append("]"); + if (stat.getEphemeralOwner() > 0) { + bodyBuilder.append("*"); + } + if (verbose) { + // verbose: extract ACLs + builder.append(" -- "); + List<ACL> acls = + curator.getACL().forPath(childPath); + for (ACL acl : acls) { + builder.append(RegistrySecurity.aclToString(acl)); + builder.append(" "); + } + } + body = bodyBuilder.toString(); + // print each child + append(builder, indent, ' '); + builder.append('/').append(child); + builder.append(body); + builder.append('\n'); + // recurse + expand(builder, childPath, indent + INDENT); + } + } catch (Exception e) { + builder.append(e.toString()).append("\n"); + } + } + + /** + * Append the specified indentation to a builder + * @param builder string build to append to + * @param indent current indentation + * @param c charactor to use for indentation + */ + private void append(StringBuilder builder, int indent, char c) { + for (int i = 0; i < indent; i++) { + builder.append(c); + } + } +} 
--------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
