http://git-wip-us.apache.org/repos/asf/hadoop/blob/d218ab58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/integration/SelectByYarnPersistence.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/integration/SelectByYarnPersistence.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/integration/SelectByYarnPersistence.java new file mode 100644 index 0000000..004be86 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/integration/SelectByYarnPersistence.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.registry.server.integration; + +import com.google.common.base.Preconditions; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.registry.client.types.RegistryPathStatus; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes; +import org.apache.hadoop.registry.server.services.RegistryAdminService; + +/** + * Select an entry by the YARN persistence policy + */ +public class SelectByYarnPersistence + implements RegistryAdminService.NodeSelector { + private final String id; + private final String targetPolicy; + + public SelectByYarnPersistence(String id, String targetPolicy) { + Preconditions.checkArgument(!StringUtils.isEmpty(id), "id"); + Preconditions.checkArgument(!StringUtils.isEmpty(targetPolicy), + "targetPolicy"); + this.id = id; + this.targetPolicy = targetPolicy; + } + + @Override + public boolean shouldSelect(String path, + RegistryPathStatus registryPathStatus, + ServiceRecord serviceRecord) { + String policy = + serviceRecord.get(YarnRegistryAttributes.YARN_PERSISTENCE, ""); + return id.equals(serviceRecord.get(YarnRegistryAttributes.YARN_ID, "")) + && (targetPolicy.equals(policy)); + } + + @Override + public String toString() { + return String.format( + "Select by ID %s and policy %s: {}", + id, targetPolicy); + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d218ab58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/integration/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/integration/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/integration/package-info.java new file mode 100644 index 0000000..22d8bc5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/integration/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains the classes which integrate with the YARN resource + * manager. 
+ */ +package org.apache.hadoop.registry.server.integration; http://git-wip-us.apache.org/repos/asf/hadoop/blob/d218ab58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/package-info.java new file mode 100644 index 0000000..6962eb8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/package-info.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Server-side classes for the registry + * <p> + * These are components intended to be deployed only on servers or in test + * JVMs, rather than on client machines. + * <p> + * Example components are: server-side ZK support, a REST service, etc. 
 */
package org.apache.hadoop.registry.server;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.registry.server.services;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.service.Service;

/**
 * Composite service that exports the add/remove methods.
 * <p>
 * This allows external classes to add services to these methods, after which
 * they follow the same lifecycle.
 * <p>
 * It is essential that any service added is in a state where it can be moved
 * on with that of the parent services. Specifically, do not add an uninited
 * service to a parent that is already inited &mdash; as the <code>start</code>
 * operation will then fail
 *
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class AddingCompositeService extends CompositeService {


  /**
   * Create an instance.
   * @param name service name, passed to the superclass
   */
  public AddingCompositeService(String name) {
    super(name);
  }

  /**
   * Add a child service. This widens the visibility of the
   * (protected) {@link CompositeService#addService(Service)} method
   * to public; the delegated behavior is unchanged.
   * @param service service to add
   */
  @Override
  public void addService(Service service) {
    super.addService(service);
  }

  /**
   * Remove a child service; public re-export of
   * {@link CompositeService#removeService(Service)}.
   * @param service service to remove
   * @return the outcome of the superclass's remove operation
   */
  @Override
  public boolean removeService(Service service) {
    return super.removeService(service);
  }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.registry.server.services; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.hadoop.registry.server.integration.RMRegistryOperationsService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Curator callback for delete operations completing. + * <p> + * This callback logs at debug and increments the event counter. + */ +public class DeleteCompletionCallback implements BackgroundCallback { + private static final Logger LOG = + LoggerFactory.getLogger(RMRegistryOperationsService.class); + + private AtomicInteger events = new AtomicInteger(0); + + @Override + public void processResult(CuratorFramework client, + CuratorEvent event) throws + Exception { + if (LOG.isDebugEnabled()) { + LOG.debug("Delete event {}", event); + } + events.incrementAndGet(); + } + + /** + * Get the number of deletion events + * @return the count of events + */ + public int getEventCount() { + return events.get(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d218ab58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/MicroZookeeperService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/MicroZookeeperService.java 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.registry.server.services;

import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.registry.client.api.RegistryConstants;
import org.apache.hadoop.registry.client.impl.zk.BindingInformation;
import org.apache.hadoop.registry.client.impl.zk.RegistryBindingSource;
import org.apache.hadoop.registry.client.impl.zk.RegistryInternalConstants;
import org.apache.hadoop.registry.client.impl.zk.RegistrySecurity;
import org.apache.hadoop.registry.client.impl.zk.ZookeeperConfigOptions;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;

/**
 * This is a small, localhost Zookeeper service instance that is contained
 * in a YARN service...it's been derived from Apache Twill.
 *
 * It implements {@link RegistryBindingSource} and provides binding information,
 * <i>once started</i>. Until <code>start()</code> is called, the hostname and
 * port may be undefined. Accordingly, the service raises an exception in this
 * condition.
 *
 * If you wish to chain together a registry service with this one under
 * the same <code>CompositeService</code>, this service must be added
 * as a child first.
 *
 * It also sets the configuration parameter
 * {@link RegistryConstants#KEY_REGISTRY_ZK_QUORUM}
 * to its connection string. Any code with access to the service configuration
 * can view it.
 */
@InterfaceStability.Evolving
public class MicroZookeeperService
    extends AbstractService
    implements RegistryBindingSource, RegistryConstants,
    ZookeeperConfigOptions,
    MicroZookeeperServiceKeys {


  private static final Logger
      LOG = LoggerFactory.getLogger(MicroZookeeperService.class);

  // root directory for this instance; chosen in serviceInit()
  private File instanceDir;
  // ZK transaction/snapshot directory; deleted on service stop
  private File dataDir;
  // ZK tick time in ms (KEY_ZKSERVICE_TICK_TIME)
  private int tickTime;
  // requested port; <= 0 means "any free port"
  private int port;
  // hostname to bind to (KEY_ZKSERVICE_HOST)
  private String host;
  // true iff a JAAS context was configured in setupSecurity()
  private boolean secureServer;

  // connection factory; non-null only once the service is started
  private ServerCnxnFactory factory;
  // binding info built in serviceStart(); null before then
  private BindingInformation binding;
  private File confDir;
  // free-form diagnostics text, appended to via addDiagnostics()
  private StringBuilder diagnostics = new StringBuilder();

  /**
   * Create an instance
   * @param name service name
   */
  public MicroZookeeperService(String name) {
    super(name);
  }

  /**
   * Get the connection string.
   * @return the string
   * @throws IllegalStateException if the connection is not yet valid
   */
  public String getConnectionString() {
    Preconditions.checkState(factory != null, "service not started");
    InetSocketAddress addr = factory.getLocalAddress();
    return String.format("%s:%d", addr.getHostName(), addr.getPort());
  }

  /**
   * Get the connection address
   * @return the connection as an address
   * @throws IllegalStateException if the connection is not yet valid
   */
  public InetSocketAddress getConnectionAddress() {
    Preconditions.checkState(factory != null, "service not started");
    return factory.getLocalAddress();
  }

  /**
   * Create an inet socket addr from the local host + port number
   * @param port port to use; negative values are mapped to 0 ("any")
   * @return a (hostname, port) pair
   * @throws UnknownHostException if the server cannot resolve the host
   */
  private InetSocketAddress getAddress(int port) throws UnknownHostException {
    return new InetSocketAddress(host, port < 0 ? 0 : port);
  }

  /**
   * Initialize the service, including choosing a path for the data
   * @param conf configuration
   * @throws Exception
   */
  @Override
  protected void serviceInit(Configuration conf) throws Exception {
    port = conf.getInt(KEY_ZKSERVICE_PORT, 0);
    tickTime = conf.getInt(KEY_ZKSERVICE_TICK_TIME,
        ZooKeeperServer.DEFAULT_TICK_TIME);
    String instancedirname = conf.getTrimmed(
        KEY_ZKSERVICE_DIR, "");
    host = conf.getTrimmed(KEY_ZKSERVICE_HOST, DEFAULT_ZKSERVICE_HOST);
    if (instancedirname.isEmpty()) {
      // no dir configured: default to a per-service-name dir under
      // the test dir (or "target"); NB: this branch does not delete
      // any pre-existing contents, unlike the configured-dir branch
      File testdir = new File(System.getProperty("test.dir", "target"));
      instanceDir = new File(testdir, "zookeeper" + getName());
    } else {
      instanceDir = new File(instancedirname);
      // wipe any previous instance's state
      FileUtil.fullyDelete(instanceDir);
    }
    LOG.debug("Instance directory is {}", instanceDir);
    mkdirStrict(instanceDir);
    dataDir = new File(instanceDir, "data");
    confDir = new File(instanceDir, "conf");
    mkdirStrict(dataDir);
    mkdirStrict(confDir);
    super.serviceInit(conf);
  }

  /**
   * Create a directory, ignoring if the dir is already there,
   * and failing if a file or something else was at the end of that
   * path
   * @param dir dir to guarantee the existence of
   * @throws IOException IO problems, or path exists but is not a dir
   */
  private void mkdirStrict(File dir) throws IOException {
    if (!dir.mkdirs()) {
      // mkdirs() returning false is fine iff the dir already exists
      if (!dir.isDirectory()) {
        throw new IOException("Failed to mkdir " + dir);
      }
    }
  }

  /**
   * Append a formatted string to the diagnostics.
   * <p>
   * A newline is appended afterwards.
   * @param text text including any format commands
   * @param args arguments for the format operation.
   */
  protected void addDiagnostics(String text, Object ... args) {
    diagnostics.append(String.format(text, args)).append('\n');
  }

  /**
   * Get the diagnostics info
   * @return the diagnostics string built up
   */
  public String getDiagnostics() {
    return diagnostics.toString();
  }

  /**
   * set up security. this must be done prior to creating
   * the ZK instance, as it sets up JAAS if that has not been done already.
   *
   * @return true if the cluster has security enabled.
   */
  public boolean setupSecurity() throws IOException {
    Configuration conf = getConfig();
    String jaasContext = conf.getTrimmed(KEY_REGISTRY_ZKSERVICE_JAAS_CONTEXT);
    // security is enabled iff a JAAS context is configured
    secureServer = StringUtils.isNotEmpty(jaasContext);
    if (secureServer) {
      RegistrySecurity.validateContext(jaasContext);
      RegistrySecurity.bindZKToServerJAASContext(jaasContext);
      // policy on failed auth
      System.setProperty(PROP_ZK_ALLOW_FAILED_SASL_CLIENTS,
          conf.get(KEY_ZKSERVICE_ALLOW_FAILED_SASL_CLIENTS,
              "true"));

      //needed so that you can use sasl: strings in the registry
      System.setProperty(RegistryInternalConstants.ZOOKEEPER_AUTH_PROVIDER + ".1",
          RegistryInternalConstants.SASLAUTHENTICATION_PROVIDER);
      String serverContext =
          System.getProperty(PROP_ZK_SERVER_SASL_CONTEXT);
      addDiagnostics("Server JAAS context s = %s", serverContext);
      return true;
    } else {
      return false;
    }
  }

  /**
   * Startup: start ZK. It is only after this that
   * the binding information is valid.
   * @throws Exception
   */
  @Override
  protected void serviceStart() throws Exception {

    // must run before the ZK server is created, as it sets JAAS
    // system properties
    setupSecurity();

    ZooKeeperServer zkServer = new ZooKeeperServer();
    // same dir for both transaction logs and snapshots
    FileTxnSnapLog ftxn = new FileTxnSnapLog(dataDir, dataDir);
    zkServer.setTxnLogFactory(ftxn);
    zkServer.setTickTime(tickTime);

    LOG.info("Starting Local Zookeeper service");
    factory = ServerCnxnFactory.createFactory();
    // -1: no limit on client connections
    factory.configure(getAddress(port), -1);
    factory.startup(zkServer);

    String connectString = getConnectionString();
    LOG.info("In memory ZK started at {}\n", connectString);

    if (LOG.isDebugEnabled()) {
      StringWriter sw = new StringWriter();
      PrintWriter pw = new PrintWriter(sw);
      zkServer.dumpConf(pw);
      pw.flush();
      LOG.debug(sw.toString());
    }
    binding = new BindingInformation();
    binding.ensembleProvider = new FixedEnsembleProvider(connectString);
    binding.description =
        getName() + " reachable at \"" + connectString + "\"";

    addDiagnostics(binding.description);
    // finally: set the binding information in the config
    getConfig().set(KEY_REGISTRY_ZK_QUORUM, connectString);
  }

  /**
   * When the service is stopped, it deletes the data directory
   * and its contents
   * @throws Exception
   */
  @Override
  protected void serviceStop() throws Exception {
    if (factory != null) {
      factory.shutdown();
      factory = null;
    }
    if (dataDir != null) {
      FileUtil.fullyDelete(dataDir);
    }
  }

  @Override
  public BindingInformation supplyBindingInformation() {
    Preconditions.checkNotNull(binding,
        "Service is not started: binding information undefined");
    return binding;
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.registry.server.services;

import org.apache.hadoop.registry.client.api.RegistryConstants;

/**
 * Service keys for configuring the {@link MicroZookeeperService}.
 * These are not used in registry clients or the RM-side service,
 * so are kept separate.
 */
public interface MicroZookeeperServiceKeys {
  /** Prefix shared by all the keys in this interface: {@value}. */
  public static final String ZKSERVICE_PREFIX =
      RegistryConstants.REGISTRY_PREFIX + "zk.service.";
  /**
   * Key to define the JAAS context for the ZK service: {@value}.
   * NOTE(review): the suffix repeats "service.", so the full key contains
   * "zk.service.service.jaas.context" — looks like an accident, but the
   * value cannot be changed without breaking existing configurations.
   */
  public static final String KEY_REGISTRY_ZKSERVICE_JAAS_CONTEXT =
      ZKSERVICE_PREFIX + "service.jaas.context";

  /**
   * ZK server tick time: {@value}
   */
  public static final String KEY_ZKSERVICE_TICK_TIME =
      ZKSERVICE_PREFIX + "ticktime";

  /**
   * host to register on: {@value}.
   */
  public static final String KEY_ZKSERVICE_HOST = ZKSERVICE_PREFIX + "host";
  /**
   * Default host to serve on -this is <code>localhost</code> as it
   * is the only one guaranteed to be available: {@value}.
   */
  public static final String DEFAULT_ZKSERVICE_HOST = "localhost";
  /**
   * port; 0 or below means "any": {@value}
   */
  public static final String KEY_ZKSERVICE_PORT = ZKSERVICE_PREFIX + "port";

  /**
   * Directory containing data: {@value}
   */
  public static final String KEY_ZKSERVICE_DIR = ZKSERVICE_PREFIX + "dir";

  /**
   * Should failed SASL clients be allowed: {@value}?
   *
   * Default is the ZK default: true
   */
  public static final String KEY_ZKSERVICE_ALLOW_FAILED_SASL_CLIENTS =
      ZKSERVICE_PREFIX + "allow.failed.sasl.clients";
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.registry.server.services; + + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.lang.StringUtils; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; +import org.apache.hadoop.fs.PathNotFoundException; +import org.apache.hadoop.service.ServiceStateException; +import org.apache.hadoop.registry.client.binding.RegistryUtils; +import org.apache.hadoop.registry.client.binding.RegistryPathUtils; +import org.apache.hadoop.registry.client.exceptions.InvalidRecordException; +import org.apache.hadoop.registry.client.exceptions.NoPathPermissionsException; +import org.apache.hadoop.registry.client.exceptions.NoRecordException; +import org.apache.hadoop.registry.client.impl.zk.RegistryBindingSource; +import org.apache.hadoop.registry.client.impl.zk.RegistryOperationsService; +import org.apache.hadoop.registry.client.impl.zk.RegistrySecurity; +import org.apache.hadoop.registry.client.types.RegistryPathStatus; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Administrator service for the registry. This is the one with + * permissions to create the base directories and those for users. + * + * It also includes support for asynchronous operations, so that + * zookeeper connectivity problems do not hold up the server code + * performing the actions. + * + * Any action queued via {@link #submit(Callable)} will be + * run asynchronously. The {@link #createDirAsync(String, List, boolean)} + * is an example of such an an action + * + * A key async action is the depth-first tree purge, which supports + * pluggable policies for deleting entries. The method + * {@link #purge(String, NodeSelector, PurgePolicy, BackgroundCallback)} + * implements the recursive purge operation âthe class + * {{AsyncPurge}} provides the asynchronous scheduling of this. + */ +public class RegistryAdminService extends RegistryOperationsService { + + private static final Logger LOG = + LoggerFactory.getLogger(RegistryAdminService.class); + /** + * The ACL permissions for the user's homedir ACL. 
+ */ + public static final int USER_HOMEDIR_ACL_PERMISSIONS = + ZooDefs.Perms.READ | ZooDefs.Perms.WRITE + | ZooDefs.Perms.CREATE | ZooDefs.Perms.DELETE; + + /** + * Executor for async operations + */ + protected final ExecutorService executor; + + /** + * Construct an instance of the service + * @param name service name + */ + public RegistryAdminService(String name) { + this(name, null); + } + + /** + * construct an instance of the service, using the + * specified binding source to bond to ZK + * @param name service name + * @param bindingSource provider of ZK binding information + */ + public RegistryAdminService(String name, + RegistryBindingSource bindingSource) { + super(name, bindingSource); + executor = Executors.newCachedThreadPool( + new ThreadFactory() { + private AtomicInteger counter = new AtomicInteger(1); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, + "RegistryAdminService " + counter.getAndIncrement()); + } + }); + } + + /** + * Stop the service: halt the executor. + * @throws Exception exception. + */ + @Override + protected void serviceStop() throws Exception { + stopExecutor(); + super.serviceStop(); + } + + /** + * Stop the executor if it is not null. + * This uses {@link ExecutorService#shutdownNow()} + * and so does not block until they have completed. 
+ */ + protected synchronized void stopExecutor() { + if (executor != null) { + executor.shutdownNow(); + } + } + + /** + * Get the executor + * @return the executor + */ + protected ExecutorService getExecutor() { + return executor; + } + + /** + * Submit a callable + * @param callable callable + * @param <V> type of the final get + * @return a future to wait on + */ + public <V> Future<V> submit(Callable<V> callable) { + if (LOG.isDebugEnabled()) { + LOG.debug("Submitting {}", callable); + } + return getExecutor().submit(callable); + } + + /** + * Asynchronous operation to create a directory + * @param path path + * @param acls ACL list + * @param createParents flag to indicate parent dirs should be created + * as needed + * @return the future which will indicate whether or not the operation + * succeeded âand propagate any exceptions + * @throws IOException + */ + public Future<Boolean> createDirAsync(final String path, + final List<ACL> acls, + final boolean createParents) throws IOException { + return submit(new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + return maybeCreate(path, CreateMode.PERSISTENT, + acls, createParents); + } + }); + } + + /** + * Init operation sets up the system ACLs. 
+ * @param conf configuration of the service + * @throws Exception + */ + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + RegistrySecurity registrySecurity = getRegistrySecurity(); + if (registrySecurity.isSecureRegistry()) { + ACL sasl = registrySecurity.createSaslACLFromCurrentUser(ZooDefs.Perms.ALL); + registrySecurity.addSystemACL(sasl); + LOG.info("Registry System ACLs:", + RegistrySecurity.aclsToString( + registrySecurity.getSystemACLs())); + } + } + + /** + * Start the service, including creating base directories with permissions + * @throws Exception + */ + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + // create the root directories + try { + createRootRegistryPaths(); + } catch (NoPathPermissionsException e) { + + String message = String.format(Locale.ENGLISH, + "Failed to create root paths {%s};" + + "\ndiagnostics={%s}" + + "\ncurrent registry is:" + + "\n{%s}", + e, + bindingDiagnosticDetails(), + dumpRegistryRobustly(true)); + + LOG.error(" Failure {}", e, e); + LOG.error(message); + + // TODO: this is something temporary to deal with the problem + // that jenkins is failing this test + throw new NoPathPermissionsException(e.getPath().toString(), message, e); + } + } + + /** + * Create the initial registry paths + * @throws IOException any failure + */ + @VisibleForTesting + public void createRootRegistryPaths() throws IOException { + + List<ACL> systemACLs = getRegistrySecurity().getSystemACLs(); + LOG.info("System ACLs {}", + RegistrySecurity.aclsToString(systemACLs)); + maybeCreate("", CreateMode.PERSISTENT, systemACLs, false); + maybeCreate(PATH_USERS, CreateMode.PERSISTENT, + systemACLs, false); + maybeCreate(PATH_SYSTEM_SERVICES, + CreateMode.PERSISTENT, + systemACLs, false); + } + + /** + * Get the path to a user's home dir + * @param username username + * @return a path for services underneath + */ + protected String homeDir(String username) { + 
return RegistryUtils.homePathForUser(username); + } + + /** + * Set up the ACL for the user. + * <b>Important: this must run client-side as it needs + * to know the id:pass tuple for a user</b> + * @param username user name + * @param perms permissions + * @return an ACL list + * @throws IOException ACL creation/parsing problems + */ + public List<ACL> aclsForUser(String username, int perms) throws IOException { + List<ACL> clientACLs = getClientAcls(); + RegistrySecurity security = getRegistrySecurity(); + if (security.isSecureRegistry()) { + clientACLs.add(security.createACLfromUsername(username, perms)); + } + return clientACLs; + } + + /** + * Start an async operation to create the home path for a user + * if it does not exist + * @param shortname username, without any @REALM in kerberos + * @return the path created + * @throws IOException any failure while setting up the operation + * + */ + public Future<Boolean> initUserRegistryAsync(final String shortname) + throws IOException { + + String homeDir = homeDir(shortname); + if (!exists(homeDir)) { + // create the directory. The user does not + return createDirAsync(homeDir, + aclsForUser(shortname, + USER_HOMEDIR_ACL_PERMISSIONS), + false); + } + return null; + } + + /** + * Create the home path for a user if it does not exist. + * + * This uses {@link #initUserRegistryAsync(String)} and then waits for the + * result ... 
the code path is the same as the async operation; this just + * picks up and relays/converts exceptions + * @param username username + * @return the path created + * @throws IOException any failure + * + */ + public String initUserRegistry(final String username) + throws IOException { + + try { + Future<Boolean> future = initUserRegistryAsync(username); + future.get(); + } catch (InterruptedException e) { + throw (InterruptedIOException) + (new InterruptedIOException(e.toString()).initCause(e)); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof IOException) { + throw (IOException) (cause); + } else { + throw new IOException(cause.toString(), cause); + } + } + + return homeDir(username); + } + + /** + * Method to validate the validity of the kerberos realm. + * <ul> + * <li>Insecure: not needed.</li> + * <li>Secure: must have been determined.</li> + * </ul> + */ + protected void verifyRealmValidity() throws ServiceStateException { + if (isSecure()) { + String realm = getRegistrySecurity().getKerberosRealm(); + if (StringUtils.isEmpty(realm)) { + throw new ServiceStateException("Cannot determine service realm"); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Started Registry operations in realm {}", realm); + } + } + } + + /** + * Policy to purge entries + */ + public enum PurgePolicy { + PurgeAll, + FailOnChildren, + SkipOnChildren + } + + /** + * Recursive operation to purge all matching records under a base path. + * <ol> + * <li>Uses a depth first search</li> + * <li>A match is on ID and persistence policy, or, if policy==-1, any match</li> + * <li>If a record matches then it is deleted without any child searches</li> + * <li>Deletions will be asynchronous if a callback is provided</li> + * </ol> + * + * The code is designed to be robust against parallel deletions taking place; + * in such a case it will stop attempting that part of the tree. 
This + * avoid the situation of more than 1 purge happening in parallel and + * one of the purge operations deleteing the node tree above the other. + * @param path base path + * @param selector selector for the purge policy + * @param purgePolicy what to do if there is a matching record with children + * @param callback optional curator callback + * @return the number of delete operations perfomed. As deletes may be for + * everything under a path, this may be less than the number of records + * actually deleted + * @throws IOException problems + * @throws PathIsNotEmptyDirectoryException if an entry cannot be deleted + * as it has children and the purge policy is FailOnChildren + */ + @VisibleForTesting + public int purge(String path, + NodeSelector selector, + PurgePolicy purgePolicy, + BackgroundCallback callback) throws IOException { + + + boolean toDelete = false; + // look at self to see if it has a service record + Map<String, RegistryPathStatus> childEntries; + Collection<RegistryPathStatus> entries; + try { + // list this path's children + childEntries = RegistryUtils.statChildren(this, path); + entries = childEntries.values(); + } catch (PathNotFoundException e) { + // there's no record here, it may have been deleted already. + // exit + return 0; + } + + try { + RegistryPathStatus registryPathStatus = stat(path); + ServiceRecord serviceRecord = resolve(path); + // there is now an entry here. + toDelete = selector.shouldSelect(path, registryPathStatus, serviceRecord); + } catch (EOFException ignored) { + // ignore + } catch (InvalidRecordException ignored) { + // ignore + } catch (NoRecordException ignored) { + // ignore + } catch (PathNotFoundException e) { + // there's no record here, it may have been deleted already. 
+ // exit + return 0; + } + + if (toDelete && !entries.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Match on record @ {} with children ", path); + } + // there's children + switch (purgePolicy) { + case SkipOnChildren: + // don't do the deletion... continue to next record + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping deletion"); + } + toDelete = false; + break; + case PurgeAll: + // mark for deletion + if (LOG.isDebugEnabled()) { + LOG.debug("Scheduling for deletion with children"); + } + toDelete = true; + entries = new ArrayList<RegistryPathStatus>(0); + break; + case FailOnChildren: + if (LOG.isDebugEnabled()) { + LOG.debug("Failing deletion operation"); + } + throw new PathIsNotEmptyDirectoryException(path); + } + } + + int deleteOps = 0; + if (toDelete) { + try { + zkDelete(path, true, callback); + } catch (PathNotFoundException e) { + // sign that the path was deleted during the operation. + // this is a no-op, and all children can be skipped + return deleteOps; + } + deleteOps++; + } + + // now go through the children + for (RegistryPathStatus status : entries) { + String childname = status.path; + String childpath = RegistryPathUtils.join(path, childname); + deleteOps += purge(childpath, + selector, + purgePolicy, + callback); + } + + return deleteOps; + } + + /** + * Comparator used for purge logic + */ + public interface NodeSelector { + + boolean shouldSelect(String path, + RegistryPathStatus registryPathStatus, + ServiceRecord serviceRecord); + } + + /** + * An async registry purge action taking + * a selector which decides what to delete + */ + public class AsyncPurge implements Callable<Integer> { + + private final BackgroundCallback callback; + private final NodeSelector selector; + private final String path; + private final PurgePolicy purgePolicy; + + public AsyncPurge(String path, + NodeSelector selector, + PurgePolicy purgePolicy, + BackgroundCallback callback) { + this.callback = callback; + this.selector = selector; + this.path 
= path; + this.purgePolicy = purgePolicy; + } + + @Override + public Integer call() throws Exception { + if (LOG.isDebugEnabled()) { + LOG.debug("Executing {}", this); + } + return purge(path, + selector, + purgePolicy, + callback); + } + + @Override + public String toString() { + return String.format( + "Record purge under %s with selector %s", + path, selector); + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d218ab58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/package-info.java new file mode 100644 index 0000000..85d24b3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/package-info.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/** + * Basic services for the YARN registry + * <ul> + * <li>The {@link org.apache.hadoop.registry.server.services.RegistryAdminService}</ol> + * extends the shared Yarn Registry client with registry setup and + * (potentially asynchronous) administrative actions. + * </li> + * <li> + * The {@link org.apache.hadoop.registry.server.services.MicroZookeeperService} + * is a transient Zookeeper instance bound to the YARN service lifecycle. + * It is suitable for testing. + * </li> + * <li> + * The {@link org.apache.hadoop.registry.server.services.AddingCompositeService} + * extends the standard YARN composite service by making its add and remove + * methods public. It is a utility service used in parts of the codebase + * </li> + * + * </ul> + * + */ +package org.apache.hadoop.registry.server.services; http://git-wip-us.apache.org/repos/asf/hadoop/blob/d218ab58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/resources/.keep ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/resources/.keep b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/resources/.keep new file mode 100644 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/hadoop/blob/d218ab58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/tla/yarnregistry.tla ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/tla/yarnregistry.tla b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/tla/yarnregistry.tla new file mode 100644 index 0000000..1c19ade --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/tla/yarnregistry.tla @@ -0,0 +1,538 @@ +---------------------------- MODULE yarnregistry ---------------------------- + +EXTENDS FiniteSets, Sequences, Naturals, TLC + + +(* + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *) + +(* + +============================================================================ + +This defines the YARN registry in terms of operations on sets of records. + +Every registry entry is represented as a record containing both the path and the data. + +It assumes that + +1. operations on this set are immediate. +2. selection operations (such as \A and \E are atomic) +3. changes are immediately visible to all other users of the registry. +4. This clearly implies that changes are visible in the sequence in which they happen. + +A multi-server Zookeeper-based registry may not meet all those assumptions + +1. changes may take time to propagate across the ZK quorum, hence changes cannot +be considered immediate from the perspective of other registry clients. +(assumptions (1) and (3)). + +2. Selection operations may not be atomic. (assumption (2)). + +Operations will still happen in the order received by the elected ZK master + +A stricter definition would try to state that all operations are eventually +true excluding other changes happening during a sequence of action. +This is left as an exercise for the reader. + +The specification also omits all coverage of the permissions policy. 
+*) + + + +CONSTANTS + PathChars, \* the set of valid characters in a path + Paths, \* the set of all possible valid paths + Data, \* the set of all possible sequences of bytes + Address, \* the set of all possible address n-tuples + Addresses, \* the set of all possible address instances + Endpoints , \* the set of all possible endpoints + PersistPolicies,\* the set of persistence policies + ServiceRecords, \* all service records + Registries, \* the set of all possible registries + BindActions, \* all possible put actions + DeleteActions, \* all possible delete actions + PurgeActions, \* all possible purge actions + MknodeActions \* all possible mkdir actions + + + +(* the registry*) +VARIABLE registry + +(* Sequence of actions to apply to the registry *) +VARIABLE actions + +---------------------------------------------------------------------------------------- +(* Tuple of all variables. *) + + +vars == << registry, actions >> + + +---------------------------------------------------------------------------------------- + + + + +(* Persistence policy *) +PersistPolicySet == { + "", \* Undefined; field not present. PERMANENT is implied. + "permanent", \* persists until explicitly removed + "application", \* persists until the application finishes + "application-attempt", \* persists until the application attempt finishes + "container" \* persists until the container finishes + } + +(* Type invariants. *) +TypeInvariant == + /\ \A p \in PersistPolicies: p \in PersistPolicySet + + + +---------------------------------------------------------------------------------------- + + + +(* + +An Entry is defined as a path, and the actual +data which it contains. + +By including the path in an entry, we avoid having to define some +function mapping Path -> entry. Instead a registry can be defined as a +set of RegistryEntries matching the validity criteria. 
+ +*) + +RegistryEntry == [ + \* The path to the entry + path: Paths, + + \* the data in the entry + data: Data + ] + + +(* + An endpoint in a service record +*) +Endpoint == [ + \* API of the endpoint: some identifier + api: STRING, + + \* A list of address n-tuples + addresses: Addresses +] + +(* Attributes are the set of all string to string mappings *) + +Attributes == [ +STRING |-> STRING +] + +(* + A service record +*) +ServiceRecord == [ + \* ID -used when applying the persistence policy + yarn_id: STRING, + + \* the persistence policy + yarn_persistence: PersistPolicySet, + + \*A description + description: STRING, + + \* A set of endpoints + external: Endpoints, + + \* Endpoints intended for use internally + internal: Endpoints, + + \* Attributes are a function + attributes: Attributes +] + + +---------------------------------------------------------------------------------------- + +(* Action Records *) + +putAction == [ + type: "put", + record: ServiceRecord +] + +deleteAction == [ + type: "delete", + path: STRING, + recursive: BOOLEAN +] + +purgeAction == [ + type: "purge", + path: STRING, + persistence: PersistPolicySet +] + +mkNodeAction == [ + type: "mknode", + path: STRING, + parents: BOOLEAN +] + + +---------------------------------------------------------------------------------------- + +(* + + Path operations + +*) + +(* +Parent is defined for non empty sequences + *) + +parent(path) == SubSeq(path, 1, Len(path)-1) + +isParent(path, c) == path = parent(c) + +---------------------------------------------------------------------------------------- +(* +Registry Access Operations +*) + +(* +Lookup all entries in a registry with a matching path +*) + +resolve(Registry, path) == \A entry \in Registry: entry.path = path + +(* +A path exists in the registry iff there is an entry with that path +*) + +exists(Registry, path) == resolve(Registry, path) /= {} + +(* +A parent entry, or an empty set if there is none +*) +parentEntry(Registry, path) == 
resolve(Registry, parent(path)) + +(* +A root path is the empty sequence +*) +isRootPath(path) == path = <<>> + +(* +The root entry is the entry whose path is the root path +*) +isRootEntry(entry) == entry.path = <<>> + + +(* +A path p is an ancestor of another path d if they are different, and the path d +starts with path p +*) + +isAncestorOf(path, d) == + /\ path /= d + /\ \E k : SubSeq(d, 0, k) = path + + +ancestorPathOf(path) == + \A a \in Paths: isAncestorOf(a, path) + +(* +The set of all children of a path in the registry +*) + +children(R, path) == \A c \in R: isParent(path, c.path) + +(* +A path has children if the children() function does not return the empty set +*) +hasChildren(R, path) == children(R, path) /= {} + +(* +Descendant: a child of a path or a descendant of a child of a path +*) + +descendants(R, path) == \A e \in R: isAncestorOf(path, e.path) + +(* +Ancestors: all entries in the registry whose path is an entry of the path argument +*) +ancestors(R, path) == \A e \in R: isAncestorOf(e.path, path) + +(* +The set of entries that are a path and its descendants +*) +pathAndDescendants(R, path) == + \/ \A e \in R: isAncestorOf(path, e.path) + \/ resolve(R, path) + + +(* +For validity, all entries must match the following criteria + *) + +validRegistry(R) == + \* there can be at most one entry for a path. 
+ /\ \A e \in R: Cardinality(resolve(R, e.path)) = 1 + + \* There's at least one root entry + /\ \E e \in R: isRootEntry(e) + + \* an entry must be the root entry or have a parent entry + /\ \A e \in R: isRootEntry(e) \/ exists(R, parent(e.path)) + + \* If the entry has data, it must be a service record + /\ \A e \in R: (e.data = << >> \/ e.data \in ServiceRecords) + + +---------------------------------------------------------------------------------------- +(* +Registry Manipulation +*) + +(* +An entry can be put into the registry iff +its parent is present or it is the root entry +*) +canBind(R, e) == + isRootEntry(e) \/ exists(R, parent(e.path)) + +(* +'bind() adds/replaces an entry if permitted +*) + +bind(R, e) == + /\ canBind(R, e) + /\ R' = (R \ resolve(R, e.path)) \union {e} + + +(* +mknode() adds a new empty entry where there was none before, iff +-the parent exists +-it meets the requirement for being "bindable" +*) + +mknodeSimple(R, path) == + LET record == [ path |-> path, data |-> <<>> ] + IN \/ exists(R, path) + \/ (exists(R, parent(path)) /\ canBind(R, record) /\ (R' = R \union {record} )) + + +(* +For all parents, the mknodeSimpl() criteria must apply. +This could be defined recursively, though as TLA+ does not support recursion, +an alternative is required + + +Because this specification is declaring the final state of a operation, not +the implemental, all that is needed is to describe those parents. 
+ +It declares that the mkdirSimple state applies to the path and all its parents in the set R' + +*) +mknodeWithParents(R, path) == + /\ \A p2 \in ancestors(R, path) : mknodeSimple(R, p2) + /\ mknodeSimple(R, path) + + +mknode(R, path, recursive) == + IF recursive THEN mknodeWithParents(R, path) ELSE mknodeSimple(R, path) + +(* +Deletion is set difference on any existing entries +*) + +simpleDelete(R, path) == + /\ ~isRootPath(path) + /\ children(R, path) = {} + /\ R' = R \ resolve(R, path) + +(* +Recursive delete: neither the path or its descendants exists in the new registry +*) + +recursiveDelete(R, path) == + \* Root path: the new registry is the initial registry again + /\ isRootPath(path) => R' = { [ path |-> <<>>, data |-> <<>> ] } + \* Any other entry: the new registry is a set with any existing + \* entry for that path is removed, and the new entry added + /\ ~isRootPath(path) => R' = R \ ( resolve(R, path) \union descendants(R, path)) + + +(* +Delete operation which chooses the recursiveness policy based on an argument +*) + +delete(R, path, recursive) == + IF recursive THEN recursiveDelete(R, path) ELSE simpleDelete(R, path) + + +(* +Purge ensures that all entries under a path with the matching ID and policy are not there +afterwards +*) + +purge(R, path, id, persistence) == + /\ (persistence \in PersistPolicySet) + /\ \A p2 \in pathAndDescendants(R, path) : + (p2.attributes["yarn:id"] = id /\ p2.attributes["yarn:persistence"] = persistence) + => recursiveDelete(R, p2.path) + +(* +resolveRecord() resolves the record at a path or fails. + +It relies on the fact that if the cardinality of a set is 1, then the CHOOSE operator +is guaranteed to return the single entry of that set, iff the choice predicate holds. + +Using a predicate of TRUE, it always succeeds, so this function selects +the sole entry of the resolve operation. 
+*) + +resolveRecord(R, path) == + LET l == resolve(R, path) IN + /\ Cardinality(l) = 1 + /\ CHOOSE e \in l : TRUE + +(* +The specific action of putting an entry into a record includes validating the record +*) + +validRecordToBind(path, record) == + \* The root entry must have permanent persistence + isRootPath(path) => (record.attributes["yarn:persistence"] = "permanent" + \/ record.attributes["yarn:persistence"] = "") + + +(* +Binding a service record involves validating it then putting it in the registry +marshalled as the data in the entry + *) +bindRecord(R, path, record) == + /\ validRecordToBind(path, record) + /\ bind(R, [path |-> path, data |-> record]) + + +---------------------------------------------------------------------------------------- + + + +(* +The action queue can only contain one of the sets of action types, and +by giving each a unique name, those sets are guaranteed to be disjoint +*) + QueueInvariant == + /\ \A a \in actions: + \/ (a \in BindActions /\ a.type="bind") + \/ (a \in DeleteActions /\ a.type="delete") + \/ (a \in PurgeActions /\ a.type="purge") + \/ (a \in MknodeActions /\ a.type="mknode") + + +(* +Applying queued actions +*) + +applyAction(R, a) == + \/ (a \in BindActions /\ bindRecord(R, a.path, a.record) ) + \/ (a \in MknodeActions /\ mknode(R, a.path, a.recursive) ) + \/ (a \in DeleteActions /\ delete(R, a.path, a.recursive) ) + \/ (a \in PurgeActions /\ purge(R, a.path, a.id, a.persistence)) + + +(* +Apply the first action in a list and then update the actions +*) +applyFirstAction(R, a) == + /\ actions /= <<>> + /\ applyAction(R, Head(a)) + /\ actions' = Tail(a) + + +Next == applyFirstAction(registry, actions) + +(* +All submitted actions must eventually be applied. +*) + + +Liveness == <>( actions = <<>> ) + + +(* +The initial state of a registry has the root entry. 
+*) + +InitialRegistry == registry = { + [ path |-> <<>>, data |-> <<>> ] +} + + +(* +The valid state of the "registry" variable is defined as +Via the validRegistry predicate +*) + +ValidRegistryState == validRegistry(registry) + + + +(* +The initial state of the system +*) +InitialState == + /\ InitialRegistry + /\ ValidRegistryState + /\ actions = <<>> + + +(* +The registry has an initial state, the series of state changes driven by the actions, +and the requirement that it does act on those actions. +*) +RegistrySpec == + /\ InitialState + /\ [][Next]_vars + /\ Liveness + + +---------------------------------------------------------------------------------------- + +(* +Theorem: For all operations from that initial state, the registry state is still valid +*) +THEOREM InitialState => [] ValidRegistryState + +(* +Theorem: for all operations from that initial state, the type invariants hold +*) +THEOREM InitialState => [] TypeInvariant + +(* +Theorem: the queue invariants hold +*) +THEOREM InitialState => [] QueueInvariant + +============================================================================= http://git-wip-us.apache.org/repos/asf/hadoop/blob/d218ab58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/AbstractRegistryTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/AbstractRegistryTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/AbstractRegistryTest.java new file mode 100644 index 0000000..5b34f60 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/AbstractRegistryTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.registry; + +import org.apache.hadoop.fs.PathNotFoundException; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.registry.client.binding.RegistryPathUtils; +import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.registry.server.integration.RMRegistryOperationsService; +import org.junit.Before; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URISyntaxException; + +/** + * Abstract registry tests .. inits the field {@link #registry} + * before the test with an instance of {@link RMRegistryOperationsService}; + * and {@link #operations} with the same instance cast purely + * to the type {@link RegistryOperations}. 
+ * + */ +public class AbstractRegistryTest extends AbstractZKRegistryTest { + private static final Logger LOG = + LoggerFactory.getLogger(AbstractRegistryTest.class); + protected RMRegistryOperationsService registry; + protected RegistryOperations operations; + + @Before + public void setupRegistry() throws IOException { + registry = new RMRegistryOperationsService("yarnRegistry"); + operations = registry; + registry.init(createRegistryConfiguration()); + registry.start(); + operations.delete("/", true); + registry.createRootRegistryPaths(); + addToTeardown(registry); + } + + /** + * Create a service entry with the sample endpoints, and put it + * at the destination + * @param path path + * @param createFlags flags + * @return the record + * @throws IOException on a failure + */ + protected ServiceRecord putExampleServiceEntry(String path, int createFlags) throws + IOException, + URISyntaxException { + return putExampleServiceEntry(path, createFlags, PersistencePolicies.PERMANENT); + } + + /** + * Create a service entry with the sample endpoints, and put it + * at the destination + * @param path path + * @param createFlags flags + * @return the record + * @throws IOException on a failure + */ + protected ServiceRecord putExampleServiceEntry(String path, + int createFlags, + String persistence) + throws IOException, URISyntaxException { + ServiceRecord record = buildExampleServiceEntry(persistence); + + registry.mknode(RegistryPathUtils.parentOf(path), true); + operations.bind(path, record, createFlags); + return record; + } + + /** + * Assert a path exists + * @param path path in the registry + * @throws IOException + */ + public void assertPathExists(String path) throws IOException { + operations.stat(path); + } + + /** + * assert that a path does not exist + * @param path path in the registry + * @throws IOException + */ + public void assertPathNotFound(String path) throws IOException { + try { + operations.stat(path); + fail("Path unexpectedly found: " + path); 
+ } catch (PathNotFoundException e) { + + } + } + + /** + * Assert that a path resolves to a service record + * @param path path in the registry + * @throws IOException + */ + public void assertResolves(String path) throws IOException { + operations.resolve(path); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d218ab58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/AbstractZKRegistryTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/AbstractZKRegistryTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/AbstractZKRegistryTest.java new file mode 100644 index 0000000..bcff622 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/AbstractZKRegistryTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.registry; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.registry.client.api.RegistryConstants; +import org.apache.hadoop.registry.server.services.AddingCompositeService; +import org.apache.hadoop.registry.server.services.MicroZookeeperService; +import org.apache.hadoop.registry.server.services.MicroZookeeperServiceKeys; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.rules.TestName; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; + +public class AbstractZKRegistryTest extends RegistryTestHelper { + private static final Logger LOG = + LoggerFactory.getLogger(AbstractZKRegistryTest.class); + + private static final AddingCompositeService servicesToTeardown = + new AddingCompositeService("teardown"); + // static initializer guarantees it is always started + // ahead of any @BeforeClass methods + static { + servicesToTeardown.init(new Configuration()); + servicesToTeardown.start(); + } + + @Rule + public final Timeout testTimeout = new Timeout(10000); + + @Rule + public TestName methodName = new TestName(); + + protected static void addToTeardown(Service svc) { + servicesToTeardown.addService(svc); + } + + @AfterClass + public static void teardownServices() throws IOException { + describe(LOG, "teardown of static services"); + servicesToTeardown.close(); + } + + protected static MicroZookeeperService zookeeper; + + + @BeforeClass + public static void createZKServer() throws Exception { + File zkDir = new File("target/zookeeper"); + FileUtils.deleteDirectory(zkDir); + assertTrue(zkDir.mkdirs()); + zookeeper = new MicroZookeeperService("InMemoryZKService"); + YarnConfiguration conf = new 
YarnConfiguration(); + conf.set(MicroZookeeperServiceKeys.KEY_ZKSERVICE_DIR, zkDir.getAbsolutePath()); + zookeeper.init(conf); + zookeeper.start(); + addToTeardown(zookeeper); + } + + /** + * give our thread a name + */ + @Before + public void nameThread() { + Thread.currentThread().setName("JUnit"); + } + + /** + * Returns the connection string to use + * + * @return connection string + */ + public String getConnectString() { + return zookeeper.getConnectionString(); + } + + public YarnConfiguration createRegistryConfiguration() { + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(RegistryConstants.KEY_REGISTRY_ZK_CONNECTION_TIMEOUT, 1000); + conf.setInt(RegistryConstants.KEY_REGISTRY_ZK_RETRY_INTERVAL, 500); + conf.setInt(RegistryConstants.KEY_REGISTRY_ZK_RETRY_TIMES, 10); + conf.setInt(RegistryConstants.KEY_REGISTRY_ZK_RETRY_CEILING, 10); + conf.set(RegistryConstants.KEY_REGISTRY_ZK_QUORUM, + zookeeper.getConnectionString()); + return conf; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d218ab58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/RegistryTestHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/RegistryTestHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/RegistryTestHelper.java new file mode 100644 index 0000000..38cc2cb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/RegistryTestHelper.java @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.registry; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Shell; +import org.apache.hadoop.registry.client.api.RegistryConstants; +import org.apache.hadoop.registry.client.binding.RegistryUtils; +import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; +import org.apache.hadoop.registry.client.types.AddressTypes; +import org.apache.hadoop.registry.client.types.Endpoint; +import org.apache.hadoop.registry.client.types.ProtocolTypes; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes; +import org.apache.hadoop.registry.secure.AbstractSecureRegistryTest; +import org.apache.zookeeper.common.PathUtils; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.auth.Subject; +import javax.security.auth.login.LoginContext; +import javax.security.auth.login.LoginException; +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.List; +import java.util.Map; + +import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.inetAddrEndpoint; +import static 
org.apache.hadoop.registry.client.binding.RegistryTypeUtils.ipcEndpoint; +import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.restEndpoint; +import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.tuple; +import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.webEndpoint; + +/** + * This is a set of static methods to aid testing the registry operations. + * The methods can be imported statically âor the class used as a base + * class for tests. + */ +public class RegistryTestHelper extends Assert { + public static final String SC_HADOOP = "org-apache-hadoop"; + public static final String USER = "devteam/"; + public static final String NAME = "hdfs"; + public static final String API_WEBHDFS = "org_apache_hadoop_namenode_webhdfs"; + public static final String API_HDFS = "org_apache_hadoop_namenode_dfs"; + public static final String USERPATH = RegistryConstants.PATH_USERS + USER; + public static final String PARENT_PATH = USERPATH + SC_HADOOP + "/"; + public static final String ENTRY_PATH = PARENT_PATH + NAME; + public static final String NNIPC = "nnipc"; + public static final String IPC2 = "IPC2"; + private static final Logger LOG = + LoggerFactory.getLogger(RegistryTestHelper.class); + public static final String KTUTIL = "ktutil"; + private static final RegistryUtils.ServiceRecordMarshal recordMarshal = + new RegistryUtils.ServiceRecordMarshal(); + + /** + * Assert the path is valid by ZK rules + * @param path path to check + */ + public static void assertValidZKPath(String path) { + try { + PathUtils.validatePath(path); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Invalid Path " + path + ": " + e, e); + } + } + + /** + * Assert that a string is not empty (null or "") + * @param message message to raise if the string is empty + * @param check string to check + */ + public static void assertNotEmpty(String message, String check) { + if (StringUtils.isEmpty(check)) { + 
fail(message); + } + } + + /** + * Assert that a string is empty (null or "") + * @param check string to check + */ + public static void assertNotEmpty(String check) { + if (StringUtils.isEmpty(check)) { + fail("Empty string"); + } + } + + /** + * Log the details of a login context + * @param name name to assert that the user is logged in as + * @param loginContext the login context + */ + public static void logLoginDetails(String name, + LoginContext loginContext) { + assertNotNull("Null login context", loginContext); + Subject subject = loginContext.getSubject(); + LOG.info("Logged in as {}:\n {}", name, subject); + } + + /** + * Set the JVM property to enable Kerberos debugging + */ + public static void enableKerberosDebugging() { + System.setProperty(AbstractSecureRegistryTest.SUN_SECURITY_KRB5_DEBUG, + "true"); + } + /** + * Set the JVM property to enable Kerberos debugging + */ + public static void disableKerberosDebugging() { + System.setProperty(AbstractSecureRegistryTest.SUN_SECURITY_KRB5_DEBUG, + "false"); + } + + /** + * General code to validate bits of a component/service entry built iwth + * {@link #addSampleEndpoints(ServiceRecord, String)} + * @param record instance to check + */ + public static void validateEntry(ServiceRecord record) { + assertNotNull("null service record", record); + List<Endpoint> endpoints = record.external; + assertEquals(2, endpoints.size()); + + Endpoint webhdfs = findEndpoint(record, API_WEBHDFS, true, 1, 1); + assertEquals(API_WEBHDFS, webhdfs.api); + assertEquals(AddressTypes.ADDRESS_URI, webhdfs.addressType); + assertEquals(ProtocolTypes.PROTOCOL_REST, webhdfs.protocolType); + List<List<String>> addressList = webhdfs.addresses; + List<String> url = addressList.get(0); + String addr = url.get(0); + assertTrue(addr.contains("http")); + assertTrue(addr.contains(":8020")); + + Endpoint nnipc = findEndpoint(record, NNIPC, false, 1,2); + assertEquals("wrong protocol in " + nnipc, ProtocolTypes.PROTOCOL_THRIFT, + 
nnipc.protocolType); + + Endpoint ipc2 = findEndpoint(record, IPC2, false, 1,2); + + Endpoint web = findEndpoint(record, "web", true, 1, 1); + assertEquals(1, web.addresses.size()); + assertEquals(1, web.addresses.get(0).size()); + } + + /** + * Assert that an endpoint matches the criteria + * @param endpoint endpoint to examine + * @param addressType expected address type + * @param protocolType expected protocol type + * @param api API + */ + public static void assertMatches(Endpoint endpoint, + String addressType, + String protocolType, + String api) { + assertNotNull(endpoint); + assertEquals(addressType, endpoint.addressType); + assertEquals(protocolType, endpoint.protocolType); + assertEquals(api, endpoint.api); + } + + /** + * Assert the records match. + * @param source record that was written + * @param resolved the one that resolved. + */ + public static void assertMatches(ServiceRecord source, ServiceRecord resolved) { + assertNotNull("Null source record ", source); + assertNotNull("Null resolved record ", resolved); + assertEquals(source.description, resolved.description); + + Map<String, String> srcAttrs = source.attributes(); + Map<String, String> resolvedAttrs = resolved.attributes(); + String sourceAsString = source.toString(); + String resolvedAsString = resolved.toString(); + assertEquals("Wrong count of attrs in \n" + sourceAsString + + "\nfrom\n" + resolvedAsString, + srcAttrs.size(), + resolvedAttrs.size()); + for (Map.Entry<String, String> entry : srcAttrs.entrySet()) { + String attr = entry.getKey(); + assertEquals("attribute "+ attr, entry.getValue(), resolved.get(attr)); + } + assertEquals("wrong external endpoint count", + source.external.size(), resolved.external.size()); + assertEquals("wrong external endpoint count", + source.internal.size(), resolved.internal.size()); + } + + /** + * Find an endpoint in a record or fail, + * @param record record + * @param api API + * @param external external? 
+ * @param addressElements expected # of address elements? + * @param addressTupleSize expected size of a type + * @return the endpoint. + */ + public static Endpoint findEndpoint(ServiceRecord record, + String api, boolean external, int addressElements, int addressTupleSize) { + Endpoint epr = external ? record.getExternalEndpoint(api) + : record.getInternalEndpoint(api); + if (epr != null) { + assertEquals("wrong # of addresses", + addressElements, epr.addresses.size()); + assertEquals("wrong # of elements in an address tuple", + addressTupleSize, epr.addresses.get(0).size()); + return epr; + } + List<Endpoint> endpoints = external ? record.external : record.internal; + StringBuilder builder = new StringBuilder(); + for (Endpoint endpoint : endpoints) { + builder.append("\"").append(endpoint).append("\" "); + } + fail("Did not find " + api + " in endpoints " + builder); + // never reached; here to keep the compiler happy + return null; + } + + /** + * Log a record + * @param name record name + * @param record details + * @throws IOException only if something bizarre goes wrong marshalling + * a record. 
+ */ + public static void logRecord(String name, ServiceRecord record) throws + IOException { + LOG.info(" {} = \n{}\n", name, recordMarshal.toJson(record)); + } + + /** + * Create a service entry with the sample endpoints + * @param persistence persistence policy + * @return the record + * @throws IOException on a failure + */ + public static ServiceRecord buildExampleServiceEntry(String persistence) throws + IOException, + URISyntaxException { + ServiceRecord record = new ServiceRecord(); + record.set(YarnRegistryAttributes.YARN_ID, "example-0001"); + record.set(YarnRegistryAttributes.YARN_PERSISTENCE, persistence); + addSampleEndpoints(record, "namenode"); + return record; + } + + /** + * Add some endpoints + * @param entry entry + */ + public static void addSampleEndpoints(ServiceRecord entry, String hostname) + throws URISyntaxException { + assertNotNull(hostname); + entry.addExternalEndpoint(webEndpoint("web", + new URI("http", hostname + ":80", "/"))); + entry.addExternalEndpoint( + restEndpoint(API_WEBHDFS, + new URI("http", hostname + ":8020", "/"))); + + Endpoint endpoint = ipcEndpoint(API_HDFS, true, null); + endpoint.addresses.add(tuple(hostname, "8030")); + entry.addInternalEndpoint(endpoint); + InetSocketAddress localhost = new InetSocketAddress("localhost", 8050); + entry.addInternalEndpoint( + inetAddrEndpoint(NNIPC, ProtocolTypes.PROTOCOL_THRIFT, "localhost", + 8050)); + entry.addInternalEndpoint( + RegistryTypeUtils.ipcEndpoint( + IPC2, + true, + RegistryTypeUtils.marshall(localhost))); + } + + /** + * Describe the stage in the process with a box around it -so as + * to highlight it in test logs + * @param log log to use + * @param text text + * @param args logger args + */ + public static void describe(Logger log, String text, Object...args) { + log.info("\n======================================="); + log.info(text, args); + log.info("=======================================\n"); + } + + /** + * log out from a context if non-null ... 
exceptions are caught and logged + * @param login login context + * @return null, always + */ + public static LoginContext logout(LoginContext login) { + try { + if (login != null) { + LOG.debug("Logging out login context {}", login.toString()); + login.logout(); + } + } catch (LoginException e) { + LOG.warn("Exception logging out: {}", e, e); + } + return null; + } + + /** + * Exec the native <code>ktutil</code> to list the keys + * (primarily to verify that the generated keytabs are compatible). + * This operation is not executed on windows. On other platforms + * it requires <code>ktutil</code> to be installed and on the path + * <pre> + * ktutil --keytab=target/kdc/zookeeper.keytab list --keys + * </pre> + * @param keytab keytab to list + * @throws IOException on any execution problem, including the executable + * being missing + */ + public static String ktList(File keytab) throws IOException { + if (!Shell.WINDOWS) { + String path = keytab.getAbsolutePath(); + String out = Shell.execCommand( + KTUTIL, + "--keytab=" + path, + "list", + "--keys" + ); + LOG.info("Listing of keytab {}:\n{}\n", path, out); + return out; + } + return ""; + } + + /** + * Perform a robust <code>ktutils -l</code> ... catches and ignores + * exceptions, otherwise the output is logged. + * @param keytab keytab to list + * @return the result of the operation, or "" on any problem + */ + public static String ktListRobust(File keytab) { + try { + return ktList(keytab); + } catch (IOException e) { + // probably not on the path + return ""; + } + } + + /** + * Login via a UGI. 
Requres UGI to have been set up + * @param user username + * @param keytab keytab to list + * @return the UGI + * @throws IOException + */ + public static UserGroupInformation loginUGI(String user, File keytab) throws + IOException { + LOG.info("Logging in as {} from {}", user, keytab); + return UserGroupInformation.loginUserFromKeytabAndReturnUGI(user, + keytab.getAbsolutePath()); + } + + public static ServiceRecord createRecord(String persistence) { + return createRecord("01", persistence, "description"); + } + + public static ServiceRecord createRecord(String id, String persistence, + String description) { + ServiceRecord serviceRecord = new ServiceRecord(); + serviceRecord.set(YarnRegistryAttributes.YARN_ID, id); + serviceRecord.description = description; + serviceRecord.set(YarnRegistryAttributes.YARN_PERSISTENCE, persistence); + return serviceRecord; + } + + public static ServiceRecord createRecord(String id, String persistence, + String description, String data) { + return createRecord(id, persistence, description); + } +}