http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a326711/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryTypeUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryTypeUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryTypeUtils.java new file mode 100644 index 0000000..b4254a3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryTypeUtils.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.registry.client.binding; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.registry.client.exceptions.InvalidRecordException; +import org.apache.hadoop.registry.client.types.AddressTypes; +import org.apache.hadoop.registry.client.types.Endpoint; +import org.apache.hadoop.registry.client.types.ProtocolTypes; + +import java.net.InetSocketAddress; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URL; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * Static methods to work with registry types - primarily endpoints and the + * list representation of addresses. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class RegistryTypeUtils { + + /** + * Create a URL endpoint from a list of URIs + * @param api implemented API + * @param protocolType protocol type + * @param uris URIs + * @return a new endpoint + */ + public static Endpoint urlEndpoint(String api, + String protocolType, + URI... uris) { + return new Endpoint(api, protocolType, uris); + } + + /** + * Create a REST endpoint from a list of URIs + * @param api implemented API + * @param uris URIs + * @return a new endpoint + */ + public static Endpoint restEndpoint(String api, + URI... uris) { + return urlEndpoint(api, ProtocolTypes.PROTOCOL_REST, uris); + } + + /** + * Create a Web UI endpoint from a list of URIs + * @param api implemented API + * @param uris URIs + * @return a new endpoint + */ + public static Endpoint webEndpoint(String api, + URI... 
uris) { + return urlEndpoint(api, ProtocolTypes.PROTOCOL_WEBUI, uris); + } + + /** + * Create an internet address endpoint from a hostname and port + * @param api implemented API + * @param protocolType protocol type + * @param hostname hostname/FQDN + * @param port port + * @return a new endpoint + */ + + public static Endpoint inetAddrEndpoint(String api, + String protocolType, + String hostname, + int port) { + Preconditions.checkArgument(api != null, "null API"); + Preconditions.checkArgument(protocolType != null, "null protocolType"); + Preconditions.checkArgument(hostname != null, "null hostname"); + return new Endpoint(api, + AddressTypes.ADDRESS_HOSTNAME_AND_PORT, + protocolType, + tuplelist(hostname, Integer.toString(port))); + } + + /** + * Create an IPC endpoint + * @param api API + * @param protobuf flag to indicate whether or not the IPC uses protocol + * buffers + * @param address the address as a tuple of (hostname, port) + * @return the new endpoint + */ + public static Endpoint ipcEndpoint(String api, + boolean protobuf, List<String> address) { + ArrayList<List<String>> addressList = new ArrayList<List<String>>(); + if (address != null) { + addressList.add(address); + } + return new Endpoint(api, + AddressTypes.ADDRESS_HOSTNAME_AND_PORT, + protobuf ? ProtocolTypes.PROTOCOL_HADOOP_IPC_PROTOBUF + : ProtocolTypes.PROTOCOL_HADOOP_IPC, + addressList); + } + + /** + * Create a single-element list of tuples from the input. + * that is, an input ("a","b","c") is converted into a list + * in the form [["a","b","c"]] + * @param t1 tuple elements + * @return a list containing a single tuple + */ + public static List<List<String>> tuplelist(String... t1) { + List<List<String>> outer = new ArrayList<List<String>>(); + outer.add(tuple(t1)); + return outer; + } + + /** + * Create a tuple from the input. 
+ * that is, an input ("a","b","c") is converted into a list + * in the form ["a","b","c"] + * @param t1 tuple elements + * @return a single tuple as a list + */ + public static List<String> tuple(String... t1) { + return Arrays.asList(t1); + } + + /** + * Create a tuple from the input, converting all to Strings in the process + * that is, an input ("a", 7, true) is converted into a list + * in the form ["a","7","true"] + * @param t1 tuple elements + * @return a single tuple as a list + */ + public static List<String> tuple(Object... t1) { + List<String> l = new ArrayList<String>(t1.length); + for (Object t : t1) { + l.add(t.toString()); + } + return l; + } + + /** + * Convert a socket address pair into a string tuple, (host, port). + * TODO JDK7: move to InetSocketAddress.getHostString() to avoid DNS lookups. + * @param address an address + * @return an element for the address list + */ + public static List<String> marshall(InetSocketAddress address) { + return tuple(address.getHostName(), address.getPort()); + } + + /** + * Require a specific address type on an endpoint + * @param required required type + * @param epr endpoint + * @throws InvalidRecordException if the type is wrong + */ + public static void requireAddressType(String required, Endpoint epr) throws + InvalidRecordException { + if (!required.equals(epr.addressType)) { + throw new InvalidRecordException( + epr.toString(), + "Address type of " + epr.addressType + + " does not match required type of " + + required); + } + } + + /** + * Get the URI addresses of an endpoint + * @param epr endpoint + * @return the URI of every entry in the address list. 
Null if the endpoint + * itself is null + * @throws InvalidRecordException if the type is wrong, there are no addresses + * or the payload is ill-formatted + */ + public static List<String> retrieveAddressesUriType(Endpoint epr) + throws InvalidRecordException { + if (epr == null) { + return null; + } + requireAddressType(AddressTypes.ADDRESS_URI, epr); + List<List<String>> addresses = epr.addresses; + if (addresses.size() < 1) { + throw new InvalidRecordException(epr.toString(), + "No addresses in endpoint"); + } + List<String> results = new ArrayList<String>(addresses.size()); + for (List<String> address : addresses) { + if (address.size() != 1) { + throw new InvalidRecordException(epr.toString(), + "Address payload invalid: wrong element count: " + + address.size()); + } + results.add(address.get(0)); + } + return results; + } + + /** + * Get the address URLs. Guaranteed to return at least one address. + * @param epr endpoint + * @return the addresses as URLs + * @throws InvalidRecordException if the type is wrong, there are no addresses + * or the payload is ill-formatted + * @throws MalformedURLException address can't be turned into a URL + */ + public static List<URL> retrieveAddressURLs(Endpoint epr) + throws InvalidRecordException, MalformedURLException { + if (epr == null) { + throw new InvalidRecordException("", "Null endpoint"); + } + List<String> addresses = retrieveAddressesUriType(epr); + List<URL> results = new ArrayList<URL>(addresses.size()); + for (String address : addresses) { + results.add(new URL(address)); + } + return results; + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a326711/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java new file mode 100644 index 0000000..3b28a02 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.registry.client.binding; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.PathNotFoundException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.registry.client.api.RegistryConstants; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.registry.client.exceptions.InvalidPathnameException; +import org.apache.hadoop.registry.client.exceptions.InvalidRecordException; +import org.apache.hadoop.registry.client.exceptions.NoRecordException; +import org.apache.hadoop.registry.client.impl.zk.RegistryInternalConstants; +import org.apache.hadoop.registry.client.types.RegistryPathStatus; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.registry.client.types.ServiceRecordHeader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.registry.client.binding.RegistryPathUtils.*; + +import java.io.EOFException; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Utility methods for working with a registry. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class RegistryUtils { + private static final Logger LOG = + LoggerFactory.getLogger(RegistryUtils.class); + + /** + * Build the user path - switches to the system path if the user is "". 
+ * It also cross-converts the username to ascii via punycode + * @param shortname username or "" + * @return the path to the user + */ + public static String homePathForUser(String shortname) { + Preconditions.checkArgument(shortname != null, "null user"); + + // catch recursion + if (shortname.startsWith(RegistryConstants.PATH_USERS)) { + return shortname; + } + if (shortname.isEmpty()) { + return RegistryConstants.PATH_SYSTEM_SERVICES; + } + return RegistryPathUtils.join(RegistryConstants.PATH_USERS, + encodeForRegistry(shortname)); + } + + /** + * Create a service classpath + * @param user username or "" + * @param serviceClass service name + * @return a full path + */ + public static String serviceclassPath(String user, + String serviceClass) { + String services = join(homePathForUser(user), + RegistryConstants.PATH_USER_SERVICES); + return join(services, + serviceClass); + } + + /** + * Create a path to a service under a user & service class + * @param user username or "" + * @param serviceClass service name + * @param serviceName service name unique for that user & service class + * @return a full path + */ + public static String servicePath(String user, + String serviceClass, + String serviceName) { + + return join( + serviceclassPath(user, serviceClass), + serviceName); + } + + /** + * Create a path for listing components under a service + * @param user username or "" + * @param serviceClass service name + * @param serviceName service name unique for that user & service class + * @return a full path + */ + public static String componentListPath(String user, + String serviceClass, String serviceName) { + + return join(servicePath(user, serviceClass, serviceName), + RegistryConstants.SUBPATH_COMPONENTS); + } + + /** + * Create the path to a service record for a component + * @param user username or "" + * @param serviceClass service name + * @param serviceName service name unique for that user & service class + * @param componentName unique name/ID of the 
component + * @return a full path + */ + public static String componentPath(String user, + String serviceClass, String serviceName, String componentName) { + + return join( + componentListPath(user, serviceClass, serviceName), + componentName); + } + + /** + * List service records directly under a path + * @param registryOperations registry operations instance + * @param path path to list + * @return a mapping of the service records that were resolved, indexed + * by their full path + * @throws IOException + */ + public static Map<String, ServiceRecord> listServiceRecords( + RegistryOperations registryOperations, + String path) throws IOException { + Map<String, RegistryPathStatus> children = + statChildren(registryOperations, path); + return extractServiceRecords(registryOperations, + path, + children.values()); + } + + /** + * List children of a directory and retrieve their + * {@link RegistryPathStatus} values. + * <p> + * This is not an atomic operation; a child may be deleted + * during the iteration through the child entries. If this happens, + * the <code>PathNotFoundException</code> is caught and that child + * entry is omitted. + * + * @param path path + * @return a possibly empty map of child entries listed by + * their short name. + * @throws PathNotFoundException path is not in the registry. + * @throws InvalidPathnameException the path is invalid. 
+ * @throws IOException Any other IO Exception + */ + public static Map<String, RegistryPathStatus> statChildren( + RegistryOperations registryOperations, + String path) + throws PathNotFoundException, + InvalidPathnameException, + IOException { + List<String> childNames = registryOperations.list(path); + Map<String, RegistryPathStatus> results = + new HashMap<String, RegistryPathStatus>(); + for (String childName : childNames) { + String child = join(path, childName); + try { + RegistryPathStatus stat = registryOperations.stat(child); + results.put(childName, stat); + } catch (PathNotFoundException pnfe) { + if (LOG.isDebugEnabled()) { + LOG.debug("stat failed on {}: moved? {}", child, pnfe, pnfe); + } + // and continue + } + } + return results; + } + + /** + * Get the home path of the current user. + * <p> + * In an insecure cluster, the environment variable + * <code>HADOOP_USER_NAME</code> is queried <i>first</i>. + * <p> + * This means that in a YARN container where the creator set this + * environment variable to propagate their identity, the defined + * user name is used in preference to the actual user. + * <p> + * In a secure cluster, the kerberos identity of the current user is used. + * @return a path for the current user's home dir. + * @throws RuntimeException if the current user identity cannot be determined + * from the OS/kerberos. + */ + public static String homePathForCurrentUser() { + String shortUserName = currentUsernameUnencoded(); + return homePathForUser(shortUserName); + } + + /** + * Get the current username, before any encoding has been applied. + * @return the current user from the kerberos identity, falling back + * to the user and/or env variables. 
+ */ + private static String currentUsernameUnencoded() { + String env_hadoop_username = System.getenv( + RegistryInternalConstants.HADOOP_USER_NAME); + return getCurrentUsernameUnencoded(env_hadoop_username); + } + + /** + * Get the current username, using the value of the parameter + * <code>env_hadoop_username</code> if it is set on an insecure cluster. + * This ensures that the username propagates correctly across processes + * started by YARN. + * <p> + * This method is primarily made visible for testing. + * @param env_hadoop_username the environment variable + * @return the selected username + * @throws RuntimeException if there is a problem getting the short user + * name of the current user. + */ + @VisibleForTesting + public static String getCurrentUsernameUnencoded(String env_hadoop_username) { + String shortUserName = null; + if (!UserGroupInformation.isSecurityEnabled()) { + shortUserName = env_hadoop_username; + } + if (StringUtils.isEmpty(shortUserName)) { + try { + shortUserName = UserGroupInformation.getCurrentUser().getShortUserName(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return shortUserName; + } + + /** + * Get the current user path formatted for the registry + * <p> + * In an insecure cluster, the environment variable + * <code>HADOOP_USER_NAME</code> is queried <i>first</i>. + * <p> + * This means that in a YARN container where the creator set this + * environment variable to propagate their identity, the defined + * user name is used in preference to the actual user. + * <p> + * In a secure cluster, the kerberos identity of the current user is used. + * @return the encoded shortname of the current user + * @throws RuntimeException if the current user identity cannot be determined + * from the OS/kerberos. 
+ * + */ + public static String currentUser() { + String shortUserName = currentUsernameUnencoded(); + return encodeForRegistry(shortUserName); + } + + /** + * Extract all service records under a list of stat operations...this + * skips entries that are too short or simply not matching + * @param operations operation support for fetches + * @param parentpath path of the parent of all the entries + * @param stats Collection of stat results + * @return a possibly empty map of fullpath:record. + * @throws IOException for any IO Operation that wasn't ignored. + */ + public static Map<String, ServiceRecord> extractServiceRecords( + RegistryOperations operations, + String parentpath, + Collection<RegistryPathStatus> stats) throws IOException { + Map<String, ServiceRecord> results = new HashMap<String, ServiceRecord>(stats.size()); + for (RegistryPathStatus stat : stats) { + if (stat.size > ServiceRecordHeader.getLength()) { + // maybe has data + String path = join(parentpath, stat.path); + try { + ServiceRecord serviceRecord = operations.resolve(path); + results.put(path, serviceRecord); + } catch (EOFException ignored) { + if (LOG.isDebugEnabled()) { + LOG.debug("data too short for {}", path); + } + } catch (InvalidRecordException record) { + if (LOG.isDebugEnabled()) { + LOG.debug("Invalid record at {}", path); + } + } catch (NoRecordException record) { + if (LOG.isDebugEnabled()) { + LOG.debug("No record at {}", path); + } + } + } + } + return results; + } + + /** + * Extract all service records under a list of stat operations...this + * non-atomic action skips entries that are too short or simply not matching. + * <p> + * @param operations operation support for fetches + * @param parentpath path of the parent of all the entries + * @param stats a map of name:value mappings. + * @return a possibly empty map of fullpath:record. + * @throws IOException for any IO Operation that wasn't ignored. 
+ */ + public static Map<String, ServiceRecord> extractServiceRecords( + RegistryOperations operations, + String parentpath, + Map<String , RegistryPathStatus> stats) throws IOException { + return extractServiceRecords(operations, parentpath, stats.values()); + } + + + /** + * Extract all service records under a list of stat operations...this + * non-atomic action skips entries that are too short or simply not matching. + * <p> + * @param operations operation support for fetches + * @param parentpath path of the parent of all the entries + * @param stats a map of name:value mappings. + * @return a possibly empty map of fullpath:record. + * @throws IOException for any IO Operation that wasn't ignored. + */ + public static Map<String, ServiceRecord> extractServiceRecords( + RegistryOperations operations, + String parentpath) throws IOException { + return + extractServiceRecords(operations, + parentpath, + statChildren(operations, parentpath).values()); + } + + + + /** + * Static instance of service record marshalling + */ + public static class ServiceRecordMarshal extends JsonSerDeser<ServiceRecord> { + public ServiceRecordMarshal() { + super(ServiceRecord.class, ServiceRecordHeader.getData()); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a326711/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/package-info.java new file mode 100644 index 0000000..f99aa71 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Registry binding utility classes. + */ +package org.apache.hadoop.registry.client.binding; http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a326711/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/AuthenticationFailedException.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/AuthenticationFailedException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/AuthenticationFailedException.java new file mode 100644 index 0000000..aadb7fc --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/AuthenticationFailedException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.registry.client.exceptions; + +/** + * Exception raised when client access wasn't authenticated. + * That is: the credentials provided were incomplete or invalid. + */ +public class AuthenticationFailedException extends RegistryIOException { + public AuthenticationFailedException(String path, Throwable cause) { + super(path, cause); + } + + public AuthenticationFailedException(String path, String error) { + super(path, error); + } + + public AuthenticationFailedException(String path, + String error, + Throwable cause) { + super(path, error, cause); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a326711/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/InvalidPathnameException.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/InvalidPathnameException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/InvalidPathnameException.java new file mode 100644 index 0000000..c984f41 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/InvalidPathnameException.java @@ -0,0 +1,40 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.registry.client.exceptions; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * A path name was invalid. This is raised when a path string has + * characters in it that are not permitted. 
+ */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class InvalidPathnameException extends RegistryIOException { + public InvalidPathnameException(String path, String message) { + super(path, message); + } + + public InvalidPathnameException(String path, + String message, + Throwable cause) { + super(path, message, cause); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a326711/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/InvalidRecordException.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/InvalidRecordException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/InvalidRecordException.java new file mode 100644 index 0000000..e4f545e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/InvalidRecordException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.registry.client.exceptions; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Raised if an attempt to parse a record failed. + * + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class InvalidRecordException extends RegistryIOException { + + public InvalidRecordException(String path, String error) { + super(path, error); + } + + public InvalidRecordException(String path, + String error, + Throwable cause) { + super(path, error, cause); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a326711/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoChildrenForEphemeralsException.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoChildrenForEphemeralsException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoChildrenForEphemeralsException.java new file mode 100644 index 0000000..24070a5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoChildrenForEphemeralsException.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.registry.client.exceptions; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * This is a manifestation of the Zookeeper restrictions about + * what nodes may act as parents. + * + * Children are not allowed under ephemeral nodes. This is an aspect + * of ZK which isn't directly exposed to the registry API. It may + * surface if the registry is manipulated outside of the registry API. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class NoChildrenForEphemeralsException extends RegistryIOException { + public NoChildrenForEphemeralsException(String path, Throwable cause) { + super(path, cause); + } + + public NoChildrenForEphemeralsException(String path, String error) { + super(path, error); + } + + public NoChildrenForEphemeralsException(String path, + String error, + Throwable cause) { + super(path, error, cause); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a326711/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoPathPermissionsException.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoPathPermissionsException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoPathPermissionsException.java new file mode 100644 index 
0000000..ce84f5b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoPathPermissionsException.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.registry.client.exceptions; + +import org.apache.hadoop.fs.PathIOException; + +/** + * Raised on path permission exceptions. 
+ * <p> + * This is similar to PathIOException, which it extends through + * {@link RegistryIOException}. + * NOTE(review): the original sentence here was cut off after + * "except that exception doesn't let"; confirm what was intended. + */ +public class NoPathPermissionsException extends RegistryIOException { + public NoPathPermissionsException(String path, Throwable cause) { + super(path, cause); + } + + public NoPathPermissionsException(String path, String error) { + super(path, error); + } + + public NoPathPermissionsException(String path, String error, Throwable cause) { + super(path, error, cause); + } + + /** + * Build from another path exception; the arguments are passed to the + * {@link RegistryIOException} constructor that takes a PathIOException cause. + * @param message more specific text + * @param cause cause + */ + public NoPathPermissionsException(String message, + PathIOException cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a326711/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoRecordException.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoRecordException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoRecordException.java new file mode 100644 index 0000000..160433f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoRecordException.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.registry.client.exceptions; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.registry.client.types.ServiceRecordHeader; + +/** + * Raised if there is no {@link ServiceRecord} resolved at the end + * of the specified path, for reasons such as: + * <ul> + * <li>There wasn't enough data to contain a Service Record.</li> + * <li>The start of the data did not match the {@link ServiceRecordHeader} + * header.</li> + * </ul> + * + * There may be valid data of some form at the end of the path, but it does + * not appear to be a Service Record. 
+ */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class NoRecordException extends RegistryIOException { + + /** + * @param path path passed to the superclass constructor + * @param error error text passed to the superclass constructor + */ + public NoRecordException(String path, String error) { + super(path, error); + } + + /** + * @param path path passed to the superclass constructor + * @param error error text passed to the superclass constructor + * @param cause underlying cause passed to the superclass constructor + */ + public NoRecordException(String path, + String error, + Throwable cause) { + super(path, error, cause); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a326711/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/RegistryIOException.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/RegistryIOException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/RegistryIOException.java new file mode 100644 index 0000000..ca966db --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/RegistryIOException.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.registry.client.exceptions; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.PathIOException; + +/** + * Base exception for registry operations. + * <p> + * These exceptions include the path of the failing operation wherever possible; + * this can be retrieved via {@link PathIOException#getPath()}. + * <p> + * Apart from the constructor that wraps another PathIOException, the + * constructors pass their arguments directly to the matching + * {@link PathIOException} constructor. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class RegistryIOException extends PathIOException { + + /** + * Build an exception from any other Path IO Exception. + * This propagates the path of the original exception, or uses an empty + * string if that exception has no path. + * @param message more specific text + * @param cause cause + */ + public RegistryIOException(String message, PathIOException cause) { + super(cause.getPath() != null ? cause.getPath().toString() : "", + message, + cause); + } + + public RegistryIOException(String path, Throwable cause) { + super(path, cause); + } + + public RegistryIOException(String path, String error) { + super(path, error); + } + + public RegistryIOException(String path, String error, Throwable cause) { + super(path, error, cause); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a326711/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/package-info.java new file mode 100644 index 0000000..7d9c8ad --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/package-info.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Registry Service Exceptions + * <p> + * These are the Registry-specific exceptions that may be raised during + * Registry operations. + * <p> + * Other exceptions may be raised, especially <code>IOExceptions</code> + * triggered by network problems, and <code>IllegalArgumentException</code> + * exceptions that may be raised if invalid (often null) arguments are passed + * to a method call. 
+ * <p> + * All exceptions in this package are derived from + * {@link org.apache.hadoop.registry.client.exceptions.RegistryIOException} + */ +package org.apache.hadoop.registry.client.exceptions; http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a326711/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/RegistryOperationsClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/RegistryOperationsClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/RegistryOperationsClient.java new file mode 100644 index 0000000..db03936 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/RegistryOperationsClient.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.registry.client.impl; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.registry.client.impl.zk.RegistryBindingSource; +import org.apache.hadoop.registry.client.impl.zk.RegistryOperationsService; + + +/** + * This is the client service for applications to work with the registry. + * + * It does not set up the root paths for the registry, is bound + * to a user, and can be set to use SASL, anonymous or id:pass auth. + * + * For SASL, the client must be operating in the context of an authed user. + * + * For id:pass the client must have the relevant id and password, SASL is + * not used even if the client has credentials. + * + * For anonymous, nothing is used. + * + * Any SASL-authed client also has the ability to add one or more authentication + * id:pass pair on all future writes, and to reset them later. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class RegistryOperationsClient extends RegistryOperationsService { + + /** + * @param name service name, passed to the superclass constructor + */ + public RegistryOperationsClient(String name) { + super(name); + } + + /** + * @param name service name, passed to the superclass constructor + * @param bindingSource binding source, passed to the superclass constructor + */ + public RegistryOperationsClient(String name, + RegistryBindingSource bindingSource) { + super(name, bindingSource); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a326711/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/package-info.java new file mode 100644 index 0000000..d85b6a7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/package-info.java @@ -0,0 
+1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Registry client services + * <p> + * These are classes which follow the YARN lifecycle and which implement + * the {@link org.apache.hadoop.registry.client.api.RegistryOperations} + * API. + */ +package org.apache.hadoop.registry.client.impl; http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a326711/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/BindingInformation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/BindingInformation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/BindingInformation.java new file mode 100644 index 0000000..8ae003d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/BindingInformation.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.registry.client.impl.zk; + +import org.apache.curator.ensemble.EnsembleProvider; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Binding information provided by a {@link RegistryBindingSource}. + * <p> + * This is a plain holder: both fields are public and non-final, and the + * class declares no methods of its own. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class BindingInformation { + + /** + * The Curator Ensemble Provider + */ + public EnsembleProvider ensembleProvider; + + /** + * Any information that may be useful for diagnostics + */ + public String description; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a326711/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java new file mode 100644 index 0000000..a0e6365 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java @@ -0,0 +1,769 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.registry.client.impl.zk; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.curator.ensemble.EnsembleProvider; +import org.apache.curator.ensemble.fixed.FixedEnsembleProvider; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CreateBuilder; +import org.apache.curator.framework.api.DeleteBuilder; +import org.apache.curator.framework.api.GetChildrenBuilder; +import org.apache.curator.retry.BoundedExponentialBackoffRetry; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; +import org.apache.hadoop.fs.PathNotFoundException; 
+import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.service.ServiceStateException; +import org.apache.hadoop.registry.client.api.RegistryConstants; +import org.apache.hadoop.registry.client.binding.RegistryPathUtils; +import org.apache.hadoop.registry.client.exceptions.AuthenticationFailedException; +import org.apache.hadoop.registry.client.exceptions.NoChildrenForEphemeralsException; +import org.apache.hadoop.registry.client.exceptions.NoPathPermissionsException; +import org.apache.hadoop.registry.client.exceptions.RegistryIOException; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; + +/** + * This service binds to Zookeeper via Apache Curator. It is more + * generic than just the YARN service registry; it does not implement + * any of the Registry Operations API. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class CuratorService extends CompositeService + implements RegistryConstants, RegistryBindingSource { + + private static final Logger LOG = + LoggerFactory.getLogger(CuratorService.class); + + /** + * the Curator binding + */ + private CuratorFramework curator; + + /** + * Path to the registry root + */ + private String registryRoot; + + /** + * Supplied binding source. This defaults to being this + * service itself. 
+ */ + private final RegistryBindingSource bindingSource; + + /** + * Security service + */ + private RegistrySecurity registrySecurity; + + /** + * the connection binding text for messages + */ + private String connectionDescription; + + /** + * Security connection diagnostics + */ + private String securityConnectionDiagnostics = ""; + + /** + * Provider of curator "ensemble"; offers a basis for + * more flexible binding in future. + */ + private EnsembleProvider ensembleProvider; + + /** + * Construct the service. + * @param name service name + * @param bindingSource source of binding information. + * If null: use this instance + */ + public CuratorService(String name, RegistryBindingSource bindingSource) { + super(name); + if (bindingSource != null) { + this.bindingSource = bindingSource; + } else { + this.bindingSource = this; + } + } + + /** + * Create an instance using this service as the binding source (i.e. read + * configuration options from the registry) + * @param name service name + */ + public CuratorService(String name) { + this(name, null); + } + + /** + * Init the service. + * This is where the security bindings are set up: the registry root is + * read from the configuration and a {@link RegistrySecurity} child service + * is created and added before the superclass is initialized. + * @param conf configuration of the service + * @throws Exception any failure during initialization + */ + @Override + protected void serviceInit(Configuration conf) throws Exception { + + registryRoot = conf.getTrimmed(KEY_REGISTRY_ZK_ROOT, + DEFAULT_ZK_REGISTRY_ROOT); + + // create and add the registry security service + registrySecurity = new RegistrySecurity("registry security"); + addService(registrySecurity); + + if (LOG.isDebugEnabled()) { + LOG.debug("Creating Registry with root {}", registryRoot); + } + + super.serviceInit(conf); + } + + /** + * Start the service. + * This is where the curator instance is started. 
+ * @throws Exception + */ + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + + // create the curator; rely on the registry security code + // to set up the JVM context and curator + curator = createCurator(); + } + + /** + * Close the ZK connection if it is open, then stop the superclass. + * @throws Exception any failure raised while stopping the superclass + */ + @Override + protected void serviceStop() throws Exception { + IOUtils.closeStream(curator); + super.serviceStop(); + } + + /** + * Internal check that a service is in the live state + * @throws ServiceStateException if the service is not in the STARTED state + */ + private void checkServiceLive() throws ServiceStateException { + if (!isInState(STATE.STARTED)) { + throw new ServiceStateException( + "Service " + getName() + " is in wrong state: " + + getServiceState()); + } + } + + /** + * Flag to indicate whether or not the registry is secure. + * Valid once the service is inited. + * @return true if the registry security service reports a secure registry + */ + public boolean isSecure() { + return registrySecurity.isSecureRegistry(); + } + + /** + * Get the registry security helper + * @return the registry security helper + */ + protected RegistrySecurity getRegistrySecurity() { + return registrySecurity; + } + + /** + * Build the security diagnostics string + * @return "security disabled" when the registry is not secure; otherwise + * "secure cluster; " followed by the diagnostics supplied by the + * registry security service + */ + protected String buildSecurityDiagnostics() { + // build up the security connection diags + if (!isSecure()) { + return "security disabled"; + } else { + StringBuilder builder = new StringBuilder(); + builder.append("secure cluster; "); + builder.append(registrySecurity.buildSecurityDiagnostics()); + return builder.toString(); + } + } + + /** + * Create a new curator instance off the root path; using configuration + * options provided in the service configuration to set timeouts and + * retry policy. 
+ * @return the newly created creator + */ + private CuratorFramework createCurator() throws IOException { + Configuration conf = getConfig(); + /* + * NOTE(review): this call builds connectionDescription from the current + * value of securityConnectionDiagnostics, which is only refreshed inside + * the synchronized block below; confirm that this ordering is intended. + */ + createEnsembleProvider(); + int sessionTimeout = conf.getInt(KEY_REGISTRY_ZK_SESSION_TIMEOUT, + DEFAULT_ZK_SESSION_TIMEOUT); + int connectionTimeout = conf.getInt(KEY_REGISTRY_ZK_CONNECTION_TIMEOUT, + DEFAULT_ZK_CONNECTION_TIMEOUT); + int retryTimes = conf.getInt(KEY_REGISTRY_ZK_RETRY_TIMES, + DEFAULT_ZK_RETRY_TIMES); + int retryInterval = conf.getInt(KEY_REGISTRY_ZK_RETRY_INTERVAL, + DEFAULT_ZK_RETRY_INTERVAL); + int retryCeiling = conf.getInt(KEY_REGISTRY_ZK_RETRY_CEILING, + DEFAULT_ZK_RETRY_CEILING); + + if (LOG.isDebugEnabled()) { + LOG.debug("Creating CuratorService with connection {}", + connectionDescription); + } + CuratorFramework framework; + + synchronized (CuratorService.class) { + // security options are applied to the builder below + + // record the security diagnostics for later messages + securityConnectionDiagnostics = buildSecurityDiagnostics(); + + // build up the curator itself + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); + builder.ensembleProvider(ensembleProvider) + .connectionTimeoutMs(connectionTimeout) + .sessionTimeoutMs(sessionTimeout) + + .retryPolicy(new BoundedExponentialBackoffRetry(retryInterval, + retryCeiling, + retryTimes)); + + // set up the builder AND any JVM context + registrySecurity.applySecurityEnvironment(builder); + + framework = builder.build(); + framework.start(); + } + + return framework; + } + + /** + * @return the superclass string followed by {@link #bindingDiagnosticDetails()} + */ + @Override + public String toString() { + return super.toString() + + bindingDiagnosticDetails(); + } + + /** + * Get the binding diagnostics + * @return a diagnostics string valid after the service is started. 
+ */ + public String bindingDiagnosticDetails() { + return " Connection=\"" + connectionDescription + "\"" + + " root=\"" + registryRoot + "\"" + + " " + securityConnectionDiagnostics; + } + + /** + * Create a full path from the registry root and the supplied subdir + * @param path path of operation + * @return an absolute path + * @throws IllegalArgumentException if the path is invalid + * @throws IOException declared by the signature (NOTE(review): confirm which + * exception {@link RegistryPathUtils#createFullPath} raises for a bad path) + */ + protected String createFullPath(String path) throws IOException { + return RegistryPathUtils.createFullPath(registryRoot, path); + } + + /** + * Get the registry binding source ... this can be used to + * create new ensemble providers + * @return the registry binding source in use + */ + public RegistryBindingSource getBindingSource() { + return bindingSource; + } + + /** + * Create the ensemble provider for this registry, by invoking + * {@link RegistryBindingSource#supplyBindingInformation()} on + * the provider stored in {@link #bindingSource}. + * Sets {@link #ensembleProvider} to that value; + * sets {@link #connectionDescription} to the binding description followed + * by the current security connection diagnostics, for use in toString + * and logging. + */ + protected void createEnsembleProvider() { + BindingInformation binding = bindingSource.supplyBindingInformation(); + connectionDescription = binding.description + + " " + securityConnectionDiagnostics; + ensembleProvider = binding.ensembleProvider; + } + + /** + * Supply the binding information. 
+ * This implementation returns a fixed ensemble bonded to + * the quorum supplied by {@link #buildConnectionString()} + * @return the binding information + */ + @Override + public BindingInformation supplyBindingInformation() { + BindingInformation binding = new BindingInformation(); + String connectString = buildConnectionString(); + binding.ensembleProvider = new FixedEnsembleProvider(connectString); + binding.description = + "fixed ZK quorum \"" + connectString + "\""; + return binding; + } + + /** + * Override point: get the connection string used to connect to + * the ZK service. This implementation reads the trimmed value of + * KEY_REGISTRY_ZK_QUORUM from the service configuration, falling back + * to DEFAULT_REGISTRY_ZK_QUORUM. + * @return a registry quorum + */ + protected String buildConnectionString() { + return getConfig().getTrimmed(KEY_REGISTRY_ZK_QUORUM, + DEFAULT_REGISTRY_ZK_QUORUM); + } + + /** + * Create an IOE when an operation fails. + * Delegates to the four-argument overload with a null ACL list. + * @param path path of operation + * @param operation operation attempted + * @param exception the exception caught + * @return an IOE to throw that contains the path and operation details. + */ + protected IOException operationFailure(String path, + String operation, + Exception exception) { + return operationFailure(path, operation, exception, null); + } + + /** + * Create an IOE when an operation fails + * @param path path of operation + * @param operation operation attempted + * @param exception caught the exception caught + * @return an IOE to throw that contains the path and operation details. 
+ */ + protected IOException operationFailure(String path, + String operation, + Exception exception, + List<ACL> acls) { + IOException ioe; + String aclList = "[" + RegistrySecurity.aclsToString(acls) + "]"; + /* + * Map the recognised ZooKeeper failures onto more specific IOException + * subclasses; anything else falls through to a generic + * RegistryIOException carrying the operation name and path. + */ + if (exception instanceof KeeperException.NoNodeException) { + ioe = new PathNotFoundException(path); + } else if (exception instanceof KeeperException.NodeExistsException) { + ioe = new FileAlreadyExistsException(path); + } else if (exception instanceof KeeperException.NoAuthException) { + ioe = new NoPathPermissionsException(path, + "Not authorized to access path; ACLs: " + aclList); + } else if (exception instanceof KeeperException.NotEmptyException) { + ioe = new PathIsNotEmptyDirectoryException(path); + } else if (exception instanceof KeeperException.AuthFailedException) { + ioe = new AuthenticationFailedException(path, + "Authentication Failed: " + exception, exception); + } else if (exception instanceof KeeperException.NoChildrenForEphemeralsException) { + ioe = new NoChildrenForEphemeralsException(path, + "Cannot create a path under an ephemeral node: " + exception, + exception); + } else if (exception instanceof KeeperException.InvalidACLException) { + // this is a security exception of a kind + // include the ACLs to help the diagnostics + StringBuilder builder = new StringBuilder(); + builder.append("Path access failure ").append(aclList); + builder.append(" "); + builder.append(securityConnectionDiagnostics); + ioe = new NoPathPermissionsException(path, builder.toString()); + } else { + ioe = new RegistryIOException(path, + "Failure of " + operation + " on " + path + ": " + + exception.toString(), + exception); + } + /* + * Several branches above build the exception without a cause; attach + * the caught exception so that it is always available to the caller. + */ + if (ioe.getCause() == null) { + ioe.initCause(exception); + } + return ioe; + } + + /** + * Create a path if it does not exist. + * The check is poll + create; there's a risk that another process + * may create the same path before the create() operation is executed/ + * propagated to the ZK node polled. 
+ * + * @param path path to create + * @param acl ACL for path -used when creating a new entry + * @param createParents flag to trigger parent creation + * @return true iff the path was created + * @throws IOException + */ + @VisibleForTesting + public boolean maybeCreate(String path, + CreateMode mode, + List<ACL> acl, + boolean createParents) throws IOException { + return zkMkPath(path, mode, createParents, acl); + } + + /** + * Stat a path. + * The path is resolved against the registry root before the lookup. + * @param path path of operation + * @return the ZooKeeper stat entry for the path + * @throws IOException on a failure + * @throws PathNotFoundException if the path was not found + */ + public Stat zkStat(String path) throws IOException { + checkServiceLive(); + String fullpath = createFullPath(path); + Stat stat; + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Stat {}", fullpath); + } + stat = curator.checkExists().forPath(fullpath); + } catch (Exception e) { + throw operationFailure(fullpath, "read()", e); + } + if (stat == null) { + throw new PathNotFoundException(path); + } + return stat; + } + + /** + * Get the ACLs of a path. + * The path is resolved against the registry root before the lookup. + * @param path path of operation + * @return a possibly empty list of ACLs + * @throws PathNotFoundException if the lookup returned no ACL list + * @throws IOException on any other failure of the lookup + */ + public List<ACL> zkGetACLS(String path) throws IOException { + checkServiceLive(); + String fullpath = createFullPath(path); + List<ACL> acls; + try { + if (LOG.isDebugEnabled()) { + LOG.debug("GetACLS {}", fullpath); + } + acls = curator.getACL().forPath(fullpath); + } catch (Exception e) { + throw operationFailure(fullpath, "read()", e); + } + if (acls == null) { + throw new PathNotFoundException(path); + } + return acls; + } + + /** + * Probe for a path existing + * @param path path of operation + * @return true if the path was visible from the ZK server + * queried. 
+ * @throws IOException on any exception other than + * {@link PathNotFoundException} + */ + public boolean zkPathExists(String path) throws IOException { + checkServiceLive(); + try { + return zkStat(path) != null; + } catch (PathNotFoundException e) { + return false; + } catch (IOException e) { + throw e; + } + } + + /** + * Verify a path exists + * @param path path of operation + * @throws PathNotFoundException if the path is absent + * @throws IOException + */ + public String zkPathMustExist(String path) throws IOException { + zkStat(path); + return path; + } + + /** + * Create a directory. It is not an error if it already exists + * @param path path to create + * @param mode mode for path + * @param createParents flag to trigger parent creation + * @param acls ACL for path + * @throws IOException any problem + */ + public boolean zkMkPath(String path, + CreateMode mode, + boolean createParents, + List<ACL> acls) + throws IOException { + checkServiceLive(); + path = createFullPath(path); + if (acls == null || acls.isEmpty()) { + throw new NoPathPermissionsException(path, "Empty ACL list"); + } + + try { + RegistrySecurity.AclListInfo aclInfo = + new RegistrySecurity.AclListInfo(acls); + if (LOG.isDebugEnabled()) { + LOG.debug("Creating path {} with mode {} and ACL {}", + path, mode, aclInfo); + } + CreateBuilder createBuilder = curator.create(); + createBuilder.withMode(mode).withACL(acls); + if (createParents) { + createBuilder.creatingParentsIfNeeded(); + } + createBuilder.forPath(path); + + } catch (KeeperException.NodeExistsException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("path already present: {}", path, e); + } + return false; + } catch (Exception e) { + throw operationFailure(path, "mkdir() ", e, acls); + } + return true; + } + + /** + * Recursively make a path + * @param path path to create + * @param acl ACL for path + * @throws IOException any problem + */ + public void zkMkParentPath(String path, + List<ACL> acl) throws + IOException { + // 
split path into elements + + zkMkPath(RegistryPathUtils.parentOf(path), + CreateMode.PERSISTENT, true, acl); + } + + /** + * Create a path with given data. byte[0] is used for a path + * without data + * @param path path of operation + * @param data initial data + * @param acls + * @throws IOException + */ + public void zkCreate(String path, + CreateMode mode, + byte[] data, + List<ACL> acls) throws IOException { + Preconditions.checkArgument(data != null, "null data"); + checkServiceLive(); + String fullpath = createFullPath(path); + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Creating {} with {} bytes of data and ACL {}", + fullpath, data.length, + new RegistrySecurity.AclListInfo(acls)); + } + curator.create().withMode(mode).withACL(acls).forPath(fullpath, data); + } catch (Exception e) { + throw operationFailure(fullpath, "create()", e, acls); + } + } + + /** + * Update the data for a path + * @param path path of operation + * @param data new data + * @throws IOException + */ + public void zkUpdate(String path, byte[] data) throws IOException { + Preconditions.checkArgument(data != null, "null data"); + checkServiceLive(); + path = createFullPath(path); + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Updating {} with {} bytes", path, data.length); + } + curator.setData().forPath(path, data); + } catch (Exception e) { + throw operationFailure(path, "update()", e); + } + } + + /** + * Create or update an entry + * @param path path + * @param data data + * @param acl ACL for path -used when creating a new entry + * @param overwrite enable overwrite + * @throws IOException + * @return true if the entry was created, false if it was simply updated. 
*/
  public boolean zkSet(String path,
      CreateMode mode,
      byte[] data,
      List<ACL> acl, boolean overwrite) throws IOException {
    Preconditions.checkArgument(data != null, "null data");
    checkServiceLive();
    // NOTE: this is a probe followed by a create/update, not an atomic
    // operation; another client may create or delete the node in between,
    // in which case the failure of zkCreate()/zkUpdate() is propagated.
    if (!zkPathExists(path)) {
      zkCreate(path, mode, data, acl);
      return true;
    }
    if (!overwrite) {
      throw new FileAlreadyExistsException(path);
    }
    zkUpdate(path, data);
    return false;
  }

  /**
   * Delete a directory/directory tree.
   * It is not an error to delete a path that does not exist.
   * @param path path of operation
   * @param recursive flag to trigger recursive deletion
   * @param backgroundCallback callback; this being set converts the operation
   * into an async/background operation.
   * @throws IOException on problems other than no-such-path
   */
  public void zkDelete(String path,
      boolean recursive,
      BackgroundCallback backgroundCallback) throws IOException {
    checkServiceLive();
    String fullpath = createFullPath(path);
    try {
      LOG.debug("Deleting {}", fullpath);
      DeleteBuilder delete = curator.delete();
      if (recursive) {
        delete.deletingChildrenIfNeeded();
      }
      if (backgroundCallback != null) {
        delete.inBackground(backgroundCallback);
      }
      delete.forPath(fullpath);
    } catch (KeeperException.NoNodeException e) {
      // not an error: the path is already absent
    } catch (Exception e) {
      throw operationFailure(fullpath, "delete()", e);
    }
  }

  /**
   * List all children of a path.
   * @param path path of operation
   * @return a possibly empty list of children
   * @throws IOException on any failure; the path reported in the exception
   * is the full path that was queried, as with the other operations
   */
  public List<String> zkList(String path) throws IOException {
    checkServiceLive();
    String fullpath = createFullPath(path);
    try {
      LOG.debug("ls {}", fullpath);
      GetChildrenBuilder builder = curator.getChildren();
      return builder.forPath(fullpath);
    } catch (Exception e) {
      // report the resolved path, consistent with stat/read/create/delete
      throw operationFailure(fullpath, "ls()", e);
    }
  }
+ + /** + * Read data on a path + * @param path path of operation + * @return the data + * @throws IOException read failure + */ + public byte[] zkRead(String path) throws IOException { + checkServiceLive(); + String fullpath = createFullPath(path); + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Reading {}", fullpath); + } + return curator.getData().forPath(fullpath); + } catch (Exception e) { + throw operationFailure(fullpath, "read()", e); + } + } + + /** + * Return a path dumper instance which can do a full dump + * of the registry tree in its <code>toString()</code> + * operation + * @return a class to dump the registry + * @param verbose verbose flag - includes more details (such as ACLs) + */ + public ZKPathDumper dumpPath(boolean verbose) { + return new ZKPathDumper(curator, registryRoot, verbose); + } + + /** + * Add a new write access entry for all future write operations. + * @param id ID to use + * @param pass password + * @throws IOException on any failure to build the digest + */ + public boolean addWriteAccessor(String id, String pass) throws IOException { + RegistrySecurity security = getRegistrySecurity(); + ACL digestACL = new ACL(ZooDefs.Perms.ALL, + security.toDigestId(security.digest(id, pass))); + return security.addDigestACL(digestACL); + } + + /** + * Clear all write accessors + */ + public void clearWriteAccessors() { + getRegistrySecurity().resetDigestACLs(); + } + + + /** + * Diagnostics method to dump a registry robustly. 
+ * Any exception raised is swallowed + * @param verbose verbose path dump + * @return the registry tree + */ + protected String dumpRegistryRobustly(boolean verbose) { + try { + ZKPathDumper pathDumper = dumpPath(verbose); + return pathDumper.toString(); + } catch (Exception e) { + // ignore + LOG.debug("Ignoring exception: {}", e); + } + return ""; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a326711/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryBindingSource.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryBindingSource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryBindingSource.java new file mode 100644 index 0000000..bab4742 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryBindingSource.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.registry.client.impl.zk; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Interface which can be implemented by a registry binding source + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface RegistryBindingSource { + + /** + * Supply the binding information for this registry + * @return the binding information data + */ + BindingInformation supplyBindingInformation(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a326711/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryInternalConstants.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryInternalConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryInternalConstants.java new file mode 100644 index 0000000..f04673a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryInternalConstants.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.registry.client.impl.zk; + +import org.apache.zookeeper.ZooDefs; + +/** + * Internal constants for the registry. + * + * These are the things which aren't visible to users. + * + */ +public interface RegistryInternalConstants { + + /** + * Pattern of a single entry in the registry path. : {@value}. + * <p> + * This is what constitutes a valid hostname according to current RFCs. + * Alphanumeric first two and last one digit, alphanumeric + * and hyphens allowed in between. + * <p> + * No upper limit is placed on the size of an entry. + */ + String VALID_PATH_ENTRY_PATTERN = + "([a-z0-9]|[a-z0-9][a-z0-9\\-]*[a-z0-9])"; + + /** + * Permissions for readers: {@value}. + */ + int PERMISSIONS_REGISTRY_READERS = ZooDefs.Perms.READ; + + /** + * Permissions for system services: {@value} + */ + int PERMISSIONS_REGISTRY_SYSTEM_SERVICES = ZooDefs.Perms.ALL; + + /** + * Permissions for a user's root entry: {@value}. + * All except the admin permissions (ACL access) on a node + */ + int PERMISSIONS_REGISTRY_USER_ROOT = + ZooDefs.Perms.READ | ZooDefs.Perms.WRITE | ZooDefs.Perms.CREATE | + ZooDefs.Perms.DELETE; + + /** + * Name of the SASL auth provider which has to be added to ZK server to enable + * sasl: auth patterns: {@value}. + * + * Without this callers can connect via SASL, but + * they can't use it in ACLs + */ + String SASLAUTHENTICATION_PROVIDER = + "org.apache.zookeeper.server.auth.SASLAuthenticationProvider"; + + /** + * String to use as the prefix when declaring a new auth provider: {@value}. 
+ */ + String ZOOKEEPER_AUTH_PROVIDER = "zookeeper.authProvider"; + + /** + * This the Hadoop environment variable which propagates the identity + * of a user in an insecure cluster + */ + String HADOOP_USER_NAME = "HADOOP_USER_NAME"; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a326711/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryOperationsService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryOperationsService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryOperationsService.java new file mode 100644 index 0000000..c54c205 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryOperationsService.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.hadoop.registry.client.impl.zk;

import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.registry.client.api.BindFlags;
import org.apache.hadoop.registry.client.api.RegistryOperations;

import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
import org.apache.hadoop.registry.client.exceptions.InvalidPathnameException;
import org.apache.hadoop.registry.client.types.RegistryPathStatus;
import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;

/**
 * The Registry operations service.
 * <p>
 * This service implements the {@link RegistryOperations}
 * API by mapping the commands to zookeeper operations, and translating
 * results and exceptions back into those specified by the API.
 * <p>
 * Factory methods should hide the detail that this has been implemented via
 * the {@link CuratorService} by returning it cast to that
 * {@link RegistryOperations} interface, rather than this implementation class.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class RegistryOperationsService extends CuratorService
    implements RegistryOperations {

  private static final Logger LOG =
      LoggerFactory.getLogger(RegistryOperationsService.class);

  /** Converts service records to and from their byte representation. */
  private final RegistryUtils.ServiceRecordMarshal serviceRecordMarshal
      = new RegistryUtils.ServiceRecordMarshal();

  /**
   * Create an instance with the given service name and no binding source.
   * @param name service name
   */
  public RegistryOperationsService(String name) {
    this(name, null);
  }

  /**
   * Create an instance with the default service name,
   * "RegistryOperationsService", and no binding source.
   */
  public RegistryOperationsService() {
    this("RegistryOperationsService");
  }

  /**
   * Create an instance.
   * @param name service name
   * @param bindingSource binding source handed to the superclass;
   * the other constructors pass null here
   */
  public RegistryOperationsService(String name,
      RegistryBindingSource bindingSource) {
    super(name, bindingSource);
  }

  /**
   * Get the aggregate set of ACLs the client should use
   * to create directories
   * @return the ACL list
   */
  public List<ACL> getClientAcls() {
    return getRegistrySecurity().getClientACLs();
  }

  /**
   * Validate a path ... this includes checking that they are DNS-valid
   * @param path path to validate
   * @throws InvalidPathnameException if a path is considered invalid
   */
  protected void validatePath(String path) throws InvalidPathnameException {
    RegistryPathUtils.validateElementsAsDNS(path);
  }

  @Override
  public boolean mknode(String path, boolean createParents) throws IOException {
    validatePath(path);
    return zkMkPath(path, CreateMode.PERSISTENT, createParents, getClientAcls());
  }

  @Override
  public void bind(String path,
      ServiceRecord record,
      int flags) throws IOException {
    Preconditions.checkArgument(record != null, "null record");
    validatePath(path);

    byte[] bytes = serviceRecordMarshal.toByteswithHeader(record);
    boolean overwrite = (flags & BindFlags.OVERWRITE) != 0;
    zkSet(path, CreateMode.PERSISTENT, bytes, getClientAcls(), overwrite);
    // only report the binding once the write has actually succeeded;
    // a failed zkSet() must not leave a "Bound at" entry in the log
    LOG.info("Bound at {} : {}", path, record);
  }

  @Override
  public ServiceRecord resolve(String path) throws IOException {
    // NOTE(review): unlike the other operations, the path is not passed
    // through validatePath() here - confirm whether that is intentional
    byte[] bytes = zkRead(path);
    return serviceRecordMarshal.fromBytesWithHeader(path, bytes);
  }

  @Override
  public boolean exists(String path) throws IOException {
    validatePath(path);
    return zkPathExists(path);
  }

  @Override
  public RegistryPathStatus stat(String path) throws IOException {
    validatePath(path);
    Stat stat = zkStat(path);

    // build the API-level status from the last path element and the
    // creation time, data length and child count of the ZK stat entry
    String name = RegistryPathUtils.lastPathEntry(path);
    RegistryPathStatus status = new RegistryPathStatus(
        name,
        stat.getCtime(),
        stat.getDataLength(),
        stat.getNumChildren());
    LOG.debug("Stat {} => {}", path, status);
    return status;
  }

  @Override
  public List<String> list(String path) throws IOException {
    validatePath(path);
    return zkList(path);
  }

  @Override
  public void delete(String path, boolean recursive) throws IOException {
    validatePath(path);
    // no background callback: the delete is issued as a foreground operation
    zkDelete(path, recursive, null);
  }

}