http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/31e05853/slider-core/src/main/java/org/apache/slider/client/SliderClient.java.orig
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java.orig b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java.orig
new file mode 100644
index 0000000..50a7097
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java.orig
@@ -0,0 +1,2913 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.slider.client;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathNotFoundException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
+import org.apache.hadoop.registry.client.exceptions.InvalidRecordException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.alias.CredentialProvider;
+import org.apache.hadoop.security.alias.CredentialProviderFactory;
+import org.apache.hadoop.security.alias.CredentialShell;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.registry.client.api.RegistryConstants;
+import org.apache.hadoop.registry.client.api.RegistryOperations;
+import static org.apache.hadoop.registry.client.binding.RegistryUtils.*;
+
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import org.apache.hadoop.registry.client.exceptions.NoRecordException;
+import org.apache.hadoop.registry.client.types.Endpoint;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
+import org.apache.slider.api.ClusterDescription;
+import org.apache.slider.api.ClusterNode;
+import org.apache.slider.api.InternalKeys;
+import org.apache.slider.api.OptionKeys;
+import org.apache.slider.api.ResourceKeys;
+import
org.apache.slider.api.SliderClusterProtocol; +import org.apache.slider.api.proto.Messages; +import org.apache.slider.common.Constants; +import org.apache.slider.common.SliderExitCodes; +import org.apache.slider.common.SliderKeys; +import org.apache.slider.common.params.AbstractActionArgs; +import org.apache.slider.common.params.AbstractClusterBuildingActionArgs; +import org.apache.slider.common.params.ActionDiagnosticArgs; +import org.apache.slider.common.params.ActionInstallPackageArgs; +import org.apache.slider.common.params.ActionAMSuicideArgs; +import org.apache.slider.common.params.ActionCreateArgs; +import org.apache.slider.common.params.ActionEchoArgs; +import org.apache.slider.common.params.ActionFlexArgs; +import org.apache.slider.common.params.ActionFreezeArgs; +import org.apache.slider.common.params.ActionKillContainerArgs; +import org.apache.slider.common.params.ActionRegistryArgs; +import org.apache.slider.common.params.ActionResolveArgs; +import org.apache.slider.common.params.ActionStatusArgs; +import org.apache.slider.common.params.ActionThawArgs; +import org.apache.slider.common.params.Arguments; +import org.apache.slider.common.params.ClientArgs; +import org.apache.slider.common.params.LaunchArgsAccessor; +import org.apache.slider.common.tools.ConfigHelper; +import org.apache.slider.common.tools.Duration; +import org.apache.slider.common.tools.SliderFileSystem; +import org.apache.slider.common.tools.SliderUtils; +import org.apache.slider.common.tools.SliderVersionInfo; +import org.apache.slider.core.build.InstanceBuilder; +import org.apache.slider.core.build.InstanceIO; +import org.apache.slider.core.conf.AggregateConf; +import org.apache.slider.core.conf.ConfTree; +import org.apache.slider.core.conf.ConfTreeOperations; +import org.apache.slider.core.conf.MapOperations; +import org.apache.slider.core.conf.ResourcesInputPropertiesValidator; +import org.apache.slider.core.conf.TemplateInputPropertiesValidator; +import org.apache.slider.core.exceptions.BadClusterStateException; +import org.apache.slider.core.exceptions.BadCommandArgumentsException; +import org.apache.slider.core.exceptions.BadConfigException; +import org.apache.slider.core.exceptions.ErrorStrings; +import org.apache.slider.core.exceptions.NoSuchNodeException; +import org.apache.slider.core.exceptions.SliderException; +import org.apache.slider.core.exceptions.UnknownApplicationInstanceException; +import org.apache.slider.core.exceptions.WaitTimeoutException; +import org.apache.slider.core.launch.AppMasterLauncher; +import org.apache.slider.core.launch.ClasspathConstructor; +import org.apache.slider.core.launch.CommandLineBuilder; +import org.apache.slider.core.launch.JavaCommandLineBuilder; +import org.apache.slider.core.launch.LaunchedApplication; +import org.apache.slider.core.launch.RunningApplication; +import org.apache.slider.core.main.RunService; +import org.apache.slider.core.persist.ConfPersister; +import org.apache.slider.core.persist.LockAcquireFailedException; +import org.apache.slider.core.registry.SliderRegistryUtils; +import org.apache.slider.core.registry.YarnAppListClient; +import org.apache.slider.core.registry.docstore.ConfigFormat; +import org.apache.slider.core.registry.docstore.PublishedConfigSet; +import org.apache.slider.core.registry.docstore.PublishedConfiguration; +import org.apache.slider.core.registry.docstore.PublishedConfigurationOutputter; +import org.apache.slider.core.registry.retrieve.RegistryRetriever; +import org.apache.slider.core.zk.BlockingZKWatcher; +import 
org.apache.slider.core.zk.ZKIntegration; +import org.apache.slider.core.zk.ZKPathBuilder; +import org.apache.slider.providers.AbstractClientProvider; +import org.apache.slider.providers.SliderProviderFactory; +import org.apache.slider.providers.agent.AgentKeys; +import org.apache.slider.providers.slideram.SliderAMClientProvider; +import org.apache.slider.server.appmaster.SliderAppMaster; +import org.apache.slider.server.appmaster.rpc.RpcBinder; +import org.apache.slider.server.services.utility.AbstractSliderLaunchedService; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.StringWriter; +import java.io.Writer; +import java.net.InetSocketAddress; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.regex.Pattern; + +import static org.apache.slider.common.params.SliderActions.*; + +/** + * Client service for Slider + */ + +public class SliderClient extends AbstractSliderLaunchedService implements RunService, + SliderExitCodes, SliderKeys, ErrorStrings, SliderClientAPI { + private static final Logger log = LoggerFactory.getLogger(SliderClient.class); + + private ClientArgs serviceArgs; + public ApplicationId applicationId; + + private String deployedClusterName; + /** + * Cluster opaerations against the deployed cluster -will be null + * if no bonding has yet taken place + */ + private SliderClusterOperations sliderClusterOperations; + + private SliderFileSystem sliderFileSystem; + + /** + * Yarn client service + */ + private SliderYarnClientImpl yarnClient; + private YarnAppListClient YarnAppListClient; + private AggregateConf launchedInstanceDefinition; +// private SliderRegistryService registry; + + + /** + * The YARN registry service + */ + private RegistryOperations registryOperations; + + /** + * Constructor + */ + public SliderClient() { + super("Slider Client"); + new HdfsConfiguration(); + new YarnConfiguration(); + } + + /** + * This is called <i>Before serviceInit is called</i> + * @param config the initial configuration build up by the + * service launcher. + * @param args argument list list of arguments passed to the command line + * after any launcher-specific commands have been stripped. + * @return the post-binding configuration to pass to the <code>init()</code> + * operation. + * @throws Exception + */ + @Override + public Configuration bindArgs(Configuration config, String... 
args) throws Exception { + config = super.bindArgs(config, args); + serviceArgs = new ClientArgs(args); + serviceArgs.parse(); + // yarn-ify + YarnConfiguration yarnConfiguration = new YarnConfiguration(config); + return SliderUtils.patchConfiguration(yarnConfiguration); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + Configuration clientConf = SliderUtils.loadClientConfigurationResource(); + ConfigHelper.mergeConfigurations(conf, clientConf, CLIENT_RESOURCE); + serviceArgs.applyDefinitions(conf); + serviceArgs.applyFileSystemBinding(conf); + // init security with our conf + if (SliderUtils.isHadoopClusterSecure(conf)) { + SliderUtils.forceLogin(); + SliderUtils.initProcessSecurity(conf); + } + AbstractActionArgs coreAction = serviceArgs.getCoreAction(); + if (coreAction.getHadoopServicesRequired()) { + initHadoopBinding(); + } + super.serviceInit(conf); + } + + /** + * this is where the work is done. + * @return the exit code + * @throws Throwable anything that went wrong + */ +/* JDK7 + + @Override + public int runService() throws Throwable { + + // choose the action + String action = serviceArgs.getAction(); + int exitCode = EXIT_SUCCESS; + String clusterName = serviceArgs.getClusterName(); + // actions + switch (action) { + case ACTION_BUILD: + exitCode = actionBuild(clusterName, serviceArgs.getActionBuildArgs()); + break; + case ACTION_UPDATE: + exitCode = actionUpdate(clusterName, serviceArgs.getActionUpdateArgs()); + break; + case ACTION_CREATE: + exitCode = actionCreate(clusterName, serviceArgs.getActionCreateArgs()); + break; + case ACTION_FREEZE: + exitCode = actionFreeze(clusterName, serviceArgs.getActionFreezeArgs()); + break; + case ACTION_THAW: + exitCode = actionThaw(clusterName, serviceArgs.getActionThawArgs()); + break; + case ACTION_DESTROY: + exitCode = actionDestroy(clusterName); + break; + case ACTION_EXISTS: + exitCode = actionExists(clusterName, + serviceArgs.getActionExistsArgs().live); + break; + case ACTION_FLEX: + exitCode = actionFlex(clusterName, serviceArgs.getActionFlexArgs()); + break; + case ACTION_GETCONF: + exitCode = + actionGetConf(clusterName, serviceArgs.getActionGetConfArgs()); + break; + case ACTION_HELP: + case ACTION_USAGE: + log.info(serviceArgs.usage()); + break; + case ACTION_KILL_CONTAINER: + exitCode = actionKillContainer(clusterName, + serviceArgs.getActionKillContainerArgs()); + break; + case ACTION_AM_SUICIDE: + exitCode = actionAmSuicide(clusterName, + serviceArgs.getActionAMSuicideArgs()); + break; + case ACTION_LIST: + exitCode = actionList(clusterName); + break; + case ACTION_REGISTRY: + exitCode = actionRegistry( + serviceArgs.getActionRegistryArgs()); + break; + case ACTION_STATUS: + exitCode = actionStatus(clusterName, + serviceArgs.getActionStatusArgs()); + break; + case ACTION_VERSION: + exitCode = actionVersion(); + break; + default: + throw new SliderException(EXIT_UNIMPLEMENTED, + "Unimplemented: " + action); + } + + return exitCode; + } + +*/ + @Override + public int runService() throws Throwable { + + // choose the action + String action = serviceArgs.getAction(); + + int exitCode = EXIT_SUCCESS; + String clusterName = serviceArgs.getClusterName(); + // actions + if (ACTION_INSTALL_PACKAGE.equals(action)) { + exitCode = actionInstallPkg(serviceArgs.getActionInstallPackageArgs()); + } else if (ACTION_BUILD.equals(action)) { + exitCode = actionBuild(clusterName, serviceArgs.getActionBuildArgs()); + } else if (ACTION_CREATE.equals(action)) { + exitCode = actionCreate(clusterName, 
serviceArgs.getActionCreateArgs()); + } else if (ACTION_FREEZE.equals(action)) { + exitCode = actionFreeze(clusterName, + serviceArgs.getActionFreezeArgs()); + } else if (ACTION_THAW.equals(action)) { + exitCode = actionThaw(clusterName, serviceArgs.getActionThawArgs()); + } else if (ACTION_DESTROY.equals(action)) { + exitCode = actionDestroy(clusterName); + } else if (ACTION_DIAGNOSTIC.equals(action)) { + exitCode = actionDiagnostic(serviceArgs.getActionDiagnosticArgs()); + } else if (ACTION_EXISTS.equals(action)) { + exitCode = actionExists(clusterName, + serviceArgs.getActionExistsArgs().live); + } else if (ACTION_FLEX.equals(action)) { + exitCode = actionFlex(clusterName, serviceArgs.getActionFlexArgs()); + } else if (ACTION_HELP.equals(action) || + ACTION_USAGE.equals(action)) { + log.info(serviceArgs.usage()); + } else if (ACTION_KILL_CONTAINER.equals(action)) { + exitCode = actionKillContainer(clusterName, + serviceArgs.getActionKillContainerArgs()); + } else if (ACTION_AM_SUICIDE.equals(action)) { + exitCode = actionAmSuicide(clusterName, + serviceArgs.getActionAMSuicideArgs()); + } else if (ACTION_LIST.equals(action)) { + exitCode = actionList(clusterName); + } else if (ACTION_REGISTRY.equals(action)) { + exitCode = actionRegistry(serviceArgs.getActionRegistryArgs()); + } else if (ACTION_RESOLVE.equals(action)) { + exitCode = actionResolve(serviceArgs.getActionResolveArgs()); + } else if (ACTION_STATUS.equals(action)) { + exitCode = actionStatus(clusterName, + serviceArgs.getActionStatusArgs()); + } else if (ACTION_UPDATE.equals(action)) { + exitCode = actionUpdate(clusterName, serviceArgs.getActionUpdateArgs()); + } else if (ACTION_VERSION.equals(action)) { + exitCode = actionVersion(); + } else { + throw new SliderException(EXIT_UNIMPLEMENTED, + "Unimplemented: " + action); + } + + return exitCode; + } + +/** + * Perform everything needed to init the hadoop binding. 
+ * This assumes that the service is already in inited or started state + * @throws IOException + * @throws SliderException + */ + protected void initHadoopBinding() throws IOException, SliderException { + // validate the client + SliderUtils.validateSliderClientEnvironment(null); + //create the YARN client + yarnClient = new SliderYarnClientImpl(); + yarnClient.init(getConfig()); + if (getServiceState() == STATE.STARTED) { + yarnClient.start(); + } + addService(yarnClient); + YarnAppListClient = + new YarnAppListClient(yarnClient, getUsername(), getConfig()); + // create the filesystem + sliderFileSystem = new SliderFileSystem(getConfig()); + } + + /** + * Delete the zookeeper node associated with the calling user and the cluster + * TODO: YARN registry operations + **/ + @Deprecated + @VisibleForTesting + public boolean deleteZookeeperNode(String clusterName) throws YarnException, IOException { + String user = getUsername(); + String zkPath = ZKIntegration.mkClusterPath(user, clusterName); + Exception e = null; + try { + Configuration config = getConfig(); + if (!SliderUtils.isHadoopClusterSecure(config)) { + ZKIntegration client = getZkClient(clusterName, user); + if (client != null) { + if (client.exists(zkPath)) { + log.info("Deleting zookeeper path {}", zkPath); + } + client.deleteRecursive(zkPath); + return true; + } + } else { + log.warn("Default zookeeper node is not available for secure cluster"); + } + } catch (InterruptedException ignored) { + e = ignored; + } catch (KeeperException ignored) { + e = ignored; + } catch (BadConfigException ignored) { + e = ignored; + } + if (e != null) { + log.warn("Unable to recursively delete zk node {}", zkPath); + log.debug("Reason: ", e); + } + + return false; + } + + /** + * Create the zookeeper node associated with the calling user and the cluster + */ + @VisibleForTesting + @Deprecated + public String createZookeeperNode(String clusterName, Boolean nameOnly) throws YarnException, IOException { + String user = getUsername(); + String zkPath = ZKIntegration.mkClusterPath(user, clusterName); + if(nameOnly) { + return zkPath; + } + Configuration config = getConfig(); + if (!SliderUtils.isHadoopClusterSecure(config)) { + ZKIntegration client = getZkClient(clusterName, user); + if (client != null) { + try { + client.createPath(zkPath, "", ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + return zkPath; + + //JDK7 +// } catch (InterruptedException | KeeperException e) { + } catch (InterruptedException e) { + log.warn("Unable to create zk node {}", zkPath, e); + } catch ( KeeperException e) { + log.warn("Unable to create zk node {}", zkPath, e); + } + } + } + + return null; + } + + /** + * Gets a zookeeper client, returns null if it cannot connect to zookeeper + **/ + protected ZKIntegration getZkClient(String clusterName, String user) throws YarnException { + String registryQuorum = lookupZKQuorum(); + ZKIntegration client = null; + try { + BlockingZKWatcher watcher = new BlockingZKWatcher(); + client = ZKIntegration.newInstance(registryQuorum, user, clusterName, true, false, watcher); + client.init(); + watcher.waitForZKConnection(2 * 1000); + } catch (InterruptedException e) { + client = null; + log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e); + } catch (IOException e) { + log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e); + } + return client; + } + + @Override + public int actionDestroy(String clustername) throws YarnException, + IOException { + // verify that a live cluster isn't there + 
+    SliderUtils.validateClusterName(clustername);
+    //no-op, it is now mandatory.
+    verifyBindingsDefined();
+    verifyNoLiveClusters(clustername);
+
+    // create the directory path
+    Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
+    // delete the directory;
+    boolean exists = sliderFileSystem.getFileSystem().exists(clusterDirectory);
+    if (exists) {
+      log.info("Application Instance {} found at {}: destroying", clustername, clusterDirectory);
+    } else {
+      log.info("Application Instance {} already destroyed", clustername);
+    }
+    boolean deleted =
+        sliderFileSystem.getFileSystem().delete(clusterDirectory, true);
+    if (!deleted) {
+      log.warn("Filesystem returned false from delete() operation");
+    }
+
+    // rm the registry entry -do not let this block the destroy operations
+    String registryPath = SliderRegistryUtils.registryPathForInstance(
+        clustername);
+    try {
+      getRegistryOperations().delete(registryPath, true);
+    } catch (IOException e) {
+      log.warn("Error deleting {}: {} ", registryPath, e, e);
+    } catch (SliderException e) {
+      log.warn("Error binding to registry {} ", e, e);
+    }
+
+    List<ApplicationReport> instances = findAllLiveInstances(clustername);
+    // detect any race leading to cluster creation during the check/destroy process
+    // and report a problem.
+    if (!instances.isEmpty()) {
+      throw new SliderException(EXIT_APPLICATION_IN_USE,
+          clustername + ": "
+          + E_DESTROY_CREATE_RACE_CONDITION
+          + " :"
+          + instances.get(0));
+    }
+    log.info("Destroyed cluster {}", clustername);
+    return EXIT_SUCCESS;
+  }
+
+  @Override
+  public int actionAmSuicide(String clustername,
+      ActionAMSuicideArgs args) throws YarnException, IOException {
+    SliderClusterOperations clusterOperations =
+        createClusterOperations(clustername);
+    clusterOperations.amSuicide(args.message, args.exitcode, args.waittime);
+    return EXIT_SUCCESS;
+  }
+
+  @Override
+  public AbstractClientProvider createClientProvider(String provider)
+      throws SliderException {
+    SliderProviderFactory factory =
+        SliderProviderFactory.createSliderProviderFactory(provider);
+    return factory.createClientProvider();
+  }
+
+  /**
+   * Create the cluster -saving the arguments to a specification file first
+   * @param clustername cluster name
+   * @return the status code
+   * @throws YarnException Yarn problems
+   * @throws IOException other problems
+   * @throws BadCommandArgumentsException bad arguments.
+ */ + public int actionCreate(String clustername, ActionCreateArgs createArgs) throws + YarnException, + IOException { + + actionBuild(clustername, createArgs); + Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername); + AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved( + clustername, clusterDirectory); + try { + checkForCredentials(getConfig(), instanceDefinition.getAppConf()); + } catch (Exception e) { + throw new IOException("problem setting credentials", e); + } + return startCluster(clustername, createArgs); + } + + private void checkForCredentials(Configuration conf, + ConfTree tree) throws Exception { + if (tree.credentials == null || tree.credentials.size()==0) { + log.info("No credentials requested"); + return; + } + CredentialShell credentialShell = null; + for (Entry<String, List<String>> cred : tree.credentials.entrySet()) { + String provider = cred.getKey(); + List<String> aliases = cred.getValue(); + if (aliases == null || aliases.size()==0) { + continue; + } + if (credentialShell == null) { + credentialShell = new CredentialShell(); + credentialShell.setConf(conf); + } + Configuration c = new Configuration(conf); + c.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, provider); + CredentialProvider credentialProvider = + CredentialProviderFactory.getProviders(c).get(0); + Set<String> existingAliases = new HashSet<String>(credentialProvider.getAliases()); + for (String alias : aliases) { + if (existingAliases.contains(alias.toLowerCase())) { + log.warn("Skipping creation of credentials for {}, " + + "alias already exists in {}", alias, provider); + continue; + } + String[] csarg = new String[]{ + "create", alias, "-provider", provider}; + log.info("Creating credentials for {} in {}", alias, provider); + credentialShell.run(csarg); + } + } + } + + @Override + public int actionBuild(String clustername, + AbstractClusterBuildingActionArgs buildInfo) throws + YarnException, + IOException { + + buildInstanceDefinition(clustername, buildInfo, false, false); + return EXIT_SUCCESS; + } + + @Override + public int actionInstallPkg(ActionInstallPackageArgs installPkgInfo) throws + YarnException, + IOException { + + Path srcFile = null; + if (StringUtils.isEmpty(installPkgInfo.name )) { + throw new BadCommandArgumentsException("A valid application type name is required (e.g. HBASE)."); + } + + if (StringUtils.isEmpty(installPkgInfo.packageURI)) { + throw new BadCommandArgumentsException("A valid application package location required."); + } else { + File pkgFile = new File(installPkgInfo.packageURI); + if (!pkgFile.exists() || pkgFile.isDirectory()) { + throw new BadCommandArgumentsException("Unable to access supplied pkg file at " + + pkgFile.getAbsolutePath()); + } else { + srcFile = new Path(pkgFile.toURI()); + } + } + + Path pkgPath = sliderFileSystem.buildPackageDirPath(installPkgInfo.name); + sliderFileSystem.getFileSystem().mkdirs(pkgPath); + + Path fileInFs = new Path(pkgPath, srcFile.getName()); + log.info("Installing package {} at {} and overwrite is {}.", srcFile, fileInFs, installPkgInfo.replacePkg); + if (sliderFileSystem.getFileSystem().exists(fileInFs) && !installPkgInfo.replacePkg) { + throw new BadCommandArgumentsException("Pkg exists at " + + fileInFs.toUri().toString() + + ". 
Use --replacepkg to overwrite."); + } + + sliderFileSystem.getFileSystem().copyFromLocalFile(false, installPkgInfo.replacePkg, srcFile, fileInFs); + return EXIT_SUCCESS; + } + + @Override + public int actionUpdate(String clustername, + AbstractClusterBuildingActionArgs buildInfo) throws + YarnException, IOException { + buildInstanceDefinition(clustername, buildInfo, true, true); + return EXIT_SUCCESS; + } + + /** + * Build up the AggregateConfiguration for an application instance then + * persists it + * @param clustername name of the cluster + * @param buildInfo the arguments needed to build the cluster + * @param overwrite true if existing cluster directory can be overwritten + * @param liveClusterAllowed true if live cluster can be modified + * @throws YarnException + * @throws IOException + */ + + public void buildInstanceDefinition(String clustername, + AbstractClusterBuildingActionArgs buildInfo, boolean overwrite, boolean liveClusterAllowed) + throws YarnException, IOException { + // verify that a live cluster isn't there + SliderUtils.validateClusterName(clustername); + verifyBindingsDefined(); + if (!liveClusterAllowed) { + verifyNoLiveClusters(clustername); + } + + Configuration conf = getConfig(); + String registryQuorum = lookupZKQuorum(); + + Path appconfdir = buildInfo.getConfdir(); + // Provider + String providerName = buildInfo.getProvider(); + requireArgumentSet(Arguments.ARG_PROVIDER, providerName); + log.debug("Provider is {}", providerName); + SliderAMClientProvider sliderAM = new SliderAMClientProvider(conf); + AbstractClientProvider provider = + createClientProvider(providerName); + InstanceBuilder builder = + new InstanceBuilder(sliderFileSystem, + getConfig(), + clustername); + + AggregateConf instanceDefinition = new AggregateConf(); + ConfTreeOperations appConf = instanceDefinition.getAppConfOperations(); + ConfTreeOperations resources = instanceDefinition.getResourceOperations(); + ConfTreeOperations internal = instanceDefinition.getInternalOperations(); + //initial definition is set by the providers + sliderAM.prepareInstanceConfiguration(instanceDefinition); + provider.prepareInstanceConfiguration(instanceDefinition); + + //load in any specified on the command line + if (buildInfo.resources != null) { + try { + resources.mergeFile(buildInfo.resources, + new ResourcesInputPropertiesValidator()); + + } catch (IOException e) { + throw new BadConfigException(e, + "incorrect argument to %s: \"%s\" : %s ", + Arguments.ARG_RESOURCES, + buildInfo.resources, + e.toString()); + } + } + if (buildInfo.template != null) { + try { + appConf.mergeFile(buildInfo.template, + new TemplateInputPropertiesValidator()); + } catch (IOException e) { + throw new BadConfigException(e, + "incorrect argument to %s: \"%s\" : %s ", + Arguments.ARG_TEMPLATE, + buildInfo.template, + e.toString()); + } + } + + //get the command line options + ConfTree cmdLineAppOptions = buildInfo.buildAppOptionsConfTree(); + ConfTree cmdLineResourceOptions = buildInfo.buildResourceOptionsConfTree(); + + appConf.merge(cmdLineAppOptions); + + // put the role counts into the resources file + Map<String, String> argsRoleMap = buildInfo.getComponentMap(); + for (Map.Entry<String, String> roleEntry : argsRoleMap.entrySet()) { + String count = roleEntry.getValue(); + String key = roleEntry.getKey(); + log.debug("{} => {}", key, count); + resources.getOrAddComponent(key) + .put(ResourceKeys.COMPONENT_INSTANCES, count); + } + + //all CLI role options + Map<String, Map<String, String>> appOptionMap = + 
buildInfo.getCompOptionMap(); + appConf.mergeComponents(appOptionMap); + + //internal picks up core. values only + internal.propagateGlobalKeys(appConf, "slider."); + internal.propagateGlobalKeys(appConf, "internal."); + + //copy over role. and yarn. values ONLY to the resources + if (PROPAGATE_RESOURCE_OPTION) { + resources.propagateGlobalKeys(appConf, "component."); + resources.propagateGlobalKeys(appConf, "role."); + resources.propagateGlobalKeys(appConf, "yarn."); + resources.mergeComponentsPrefix(appOptionMap, "component.", true); + resources.mergeComponentsPrefix(appOptionMap, "yarn.", true); + resources.mergeComponentsPrefix(appOptionMap, "role.", true); + } + + // resource component args + appConf.merge(cmdLineResourceOptions); + resources.merge(cmdLineResourceOptions); + resources.mergeComponents(buildInfo.getResourceCompOptionMap()); + + builder.init(providerName, instanceDefinition); + builder.propagateFilename(); + builder.propagatePrincipals(); + builder.setImageDetailsIfAvailable(buildInfo.getImage(), + buildInfo.getAppHomeDir()); + builder.setQueue(buildInfo.queue); + + String quorum = buildInfo.getZKhosts(); + if (SliderUtils.isUnset(quorum)) { + quorum = registryQuorum; + } + if (isUnset(quorum)) { + throw new BadConfigException("No Zookeeper quorum defined"); + } + ZKPathBuilder zkPaths = new ZKPathBuilder(getAppName(), + getUsername(), + clustername, + registryQuorum, + quorum); + String zookeeperRoot = buildInfo.getAppZKPath(); + + if (isSet(zookeeperRoot)) { + zkPaths.setAppPath(zookeeperRoot); + } else { + String createDefaultZkNode = appConf.getGlobalOptions().getOption(AgentKeys.CREATE_DEF_ZK_NODE, "false"); + if (createDefaultZkNode.equals("true")) { + String defaultZKPath = createZookeeperNode(clustername, false); + log.info("ZK node created for application instance: {}.", defaultZKPath); + if (defaultZKPath != null) { + zkPaths.setAppPath(defaultZKPath); + } + } else { + // create AppPath if default is being used + String defaultZKPath = createZookeeperNode(clustername, true); + log.info("ZK node assigned to application instance: {}.", defaultZKPath); + zkPaths.setAppPath(defaultZKPath); + } + } + + builder.addZKBinding(zkPaths); + + //then propagate any package URI + if (buildInfo.packageURI != null) { + appConf.set(AgentKeys.PACKAGE_PATH, buildInfo.packageURI); + } + + // make any substitutions needed at this stage + replaceTokens(appConf.getConfTree(), getUsername(), clustername); + + // providers to validate what there is + AggregateConf instanceDescription = builder.getInstanceDescription(); + validateInstanceDefinition(sliderAM, instanceDescription, sliderFileSystem); + validateInstanceDefinition(provider, instanceDescription, sliderFileSystem); + try { + persistInstanceDefinition(overwrite, appconfdir, builder); + } catch (LockAcquireFailedException e) { + log.warn("Failed to get a Lock on {} : {}", builder, e); + throw new BadClusterStateException("Failed to save " + clustername + + ": " + e); + } + } + + protected void persistInstanceDefinition(boolean overwrite, + Path appconfdir, + InstanceBuilder builder) + throws IOException, SliderException, LockAcquireFailedException { + builder.persist(appconfdir, overwrite); + } + + @VisibleForTesting + public static void replaceTokens(ConfTree conf, + String userName, String clusterName) throws IOException { + Map<String,String> newglobal = new HashMap<String,String>(); + for (Entry<String,String> entry : conf.global.entrySet()) { + newglobal.put(entry.getKey(), replaceTokens(entry.getValue(), + userName, 
clusterName)); + } + conf.global.putAll(newglobal); + + Map<String,List<String>> newcred = new HashMap<String,List<String>>(); + for (Entry<String,List<String>> entry : conf.credentials.entrySet()) { + List<String> resultList = new ArrayList<String>(); + for (String v : entry.getValue()) { + resultList.add(replaceTokens(v, userName, clusterName)); + } + newcred.put(replaceTokens(entry.getKey(), userName, clusterName), + resultList); + } + conf.credentials.clear(); + conf.credentials.putAll(newcred); + } + + private static String replaceTokens(String s, String userName, + String clusterName) throws IOException { + return s.replaceAll(Pattern.quote("${USER}"), userName) + .replaceAll(Pattern.quote("${CLUSTER_NAME}"), clusterName); + } + + public FsPermission getClusterDirectoryPermissions(Configuration conf) { + String clusterDirPermsOct = + conf.get(CLUSTER_DIRECTORY_PERMISSIONS, + DEFAULT_CLUSTER_DIRECTORY_PERMISSIONS); + return new FsPermission(clusterDirPermsOct); + } + + /** + * Verify that the Resource MAnager is configured, if not fail + * with a useful error message + * @throws BadCommandArgumentsException the exception raised on an invalid config + */ + public void verifyBindingsDefined() throws BadCommandArgumentsException { + InetSocketAddress rmAddr = SliderUtils.getRmAddress(getConfig()); + if (!SliderUtils.isAddressDefined(rmAddr)) { + throw new BadCommandArgumentsException( + "No valid Resource Manager address provided in the argument " + + Arguments.ARG_MANAGER + + " or the configuration property " + + YarnConfiguration.RM_ADDRESS + + " value :" + rmAddr); + } + + } + + /** + * Load and start a cluster specification. + * This assumes that all validation of args and cluster state + * have already taken place + * + * @param clustername name of the cluster. + * @param launchArgs launch arguments + * @return the exit code + * @throws YarnException + * @throws IOException + */ + private int startCluster(String clustername, + LaunchArgsAccessor launchArgs) throws + YarnException, + IOException { + Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername); + AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved( + clustername, + clusterDirectory); + + LaunchedApplication launchedApplication = + launchApplication(clustername, clusterDirectory, instanceDefinition, + serviceArgs.isDebug()); + applicationId = launchedApplication.getApplicationId(); + + return waitForAppAccepted(launchedApplication, launchArgs.getWaittime()); + } + + /** + * Load the instance definition. It is not resolved at this point + * @param name cluster name + * @param clusterDirectory cluster dir + * @return the loaded configuration + * @throws IOException + * @throws SliderException + * @throws UnknownApplicationInstanceException if the file is not found + */ + public AggregateConf loadInstanceDefinitionUnresolved(String name, + Path clusterDirectory) throws + IOException, + SliderException { + + try { + AggregateConf definition = + InstanceIO.loadInstanceDefinitionUnresolved(sliderFileSystem, + clusterDirectory); + return definition; + } catch (FileNotFoundException e) { + throw UnknownApplicationInstanceException.unknownInstance(name, e); + } + } + /** + * Load the instance definition. 
+ * @param name cluster name + * @param resolved flag to indicate the cluster should be resolved + * @return the loaded configuration + * @throws IOException IO problems + * @throws SliderException slider explicit issues + * @throws UnknownApplicationInstanceException if the file is not found + */ + public AggregateConf loadInstanceDefinition(String name, + boolean resolved) throws + IOException, + SliderException { + + Path clusterDirectory = sliderFileSystem.buildClusterDirPath(name); + AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved( + name, + clusterDirectory); + if (resolved) { + instanceDefinition.resolve(); + } + return instanceDefinition; + + } + + /** + * + * @param clustername name of the cluster + * @param clusterDirectory cluster dir + * @param instanceDefinition the instance definition + * @param debugAM enable debug AM options + * @return the launched application + * @throws YarnException + * @throws IOException + */ + public LaunchedApplication launchApplication(String clustername, + Path clusterDirectory, + AggregateConf instanceDefinition, + boolean debugAM) + throws YarnException, IOException { + + + deployedClusterName = clustername; + SliderUtils.validateClusterName(clustername); + verifyNoLiveClusters(clustername); + Configuration config = getConfig(); + lookupZKQuorum(); + boolean clusterSecure = SliderUtils.isHadoopClusterSecure(config); + //create the Slider AM provider -this helps set up the AM + SliderAMClientProvider sliderAM = new SliderAMClientProvider(config); + + instanceDefinition.resolve(); + launchedInstanceDefinition = instanceDefinition; + + ConfTreeOperations internalOperations = + instanceDefinition.getInternalOperations(); + MapOperations internalOptions = internalOperations.getGlobalOptions(); + ConfTreeOperations resourceOperations = + instanceDefinition.getResourceOperations(); + ConfTreeOperations appOperations = + instanceDefinition.getAppConfOperations(); + Path generatedConfDirPath = + createPathThatMustExist(internalOptions.getMandatoryOption( + InternalKeys.INTERNAL_GENERATED_CONF_PATH)); + Path snapshotConfPath = + createPathThatMustExist(internalOptions.getMandatoryOption( + InternalKeys.INTERNAL_SNAPSHOT_CONF_PATH)); + + + // cluster Provider + AbstractClientProvider provider = createClientProvider( + internalOptions.getMandatoryOption( + InternalKeys.INTERNAL_PROVIDER_NAME)); + // make sure the conf dir is valid; + + if (log.isDebugEnabled()) { + log.debug(instanceDefinition.toString()); + } + MapOperations sliderAMResourceComponent = + resourceOperations.getOrAddComponent(SliderKeys.COMPONENT_AM); + MapOperations resourceGlobalOptions = resourceOperations.getGlobalOptions(); + + // add the tags if available + Set<String> applicationTags = provider.getApplicationTags(sliderFileSystem, + appOperations.getGlobalOptions().get(AgentKeys.APP_DEF)); + AppMasterLauncher amLauncher = new AppMasterLauncher(clustername, + SliderKeys.APP_TYPE, + config, + sliderFileSystem, + yarnClient, + clusterSecure, + sliderAMResourceComponent, + resourceGlobalOptions, + applicationTags); + + ApplicationId appId = amLauncher.getApplicationId(); + // set the application name; + amLauncher.setKeepContainersOverRestarts(true); + + int maxAppAttempts = config.getInt(KEY_AM_RESTART_LIMIT, 0); + amLauncher.setMaxAppAttempts(maxAppAttempts); + + sliderFileSystem.purgeAppInstanceTempFiles(clustername); + Path tempPath = sliderFileSystem.createAppInstanceTempPath( + clustername, + appId.toString() + "/am"); + String libdir = "lib"; + Path libPath = 
new Path(tempPath, libdir); + sliderFileSystem.getFileSystem().mkdirs(libPath); + log.debug("FS={}, tempPath={}, libdir={}", sliderFileSystem.toString(), + tempPath, libPath); + // set local resources for the application master + // local files or archives as needed + // In this scenario, the jar file for the application master is part of the local resources + Map<String, LocalResource> localResources = amLauncher.getLocalResources(); + // conf directory setup + Path remoteConfPath = null; + String relativeConfDir = null; + String confdirProp = + System.getProperty(SliderKeys.PROPERTY_CONF_DIR); + if (confdirProp == null || confdirProp.isEmpty()) { + log.debug("No local configuration directory provided as system property"); + } else { + File confDir = new File(confdirProp); + if (!confDir.exists()) { + throw new BadConfigException(E_CONFIGURATION_DIRECTORY_NOT_FOUND, + confDir); + } + Path localConfDirPath = SliderUtils.createLocalPath(confDir); + log.debug("Copying AM configuration data from {}", localConfDirPath); + remoteConfPath = new Path(clusterDirectory, + SliderKeys.SUBMITTED_CONF_DIR); + SliderUtils.copyDirectory(config, localConfDirPath, remoteConfPath, + null); + } + // the assumption here is that minimr cluster => this is a test run + // and the classpath can look after itself + + boolean usingMiniMRCluster = getUsingMiniMRCluster(); + if (!usingMiniMRCluster) { + + log.debug("Destination is not a MiniYARNCluster -copying full classpath"); + + // insert conf dir first + if (remoteConfPath != null) { + relativeConfDir = SliderKeys.SUBMITTED_CONF_DIR; + Map<String, LocalResource> submittedConfDir = + sliderFileSystem.submitDirectory(remoteConfPath, + relativeConfDir); + SliderUtils.mergeMaps(localResources, submittedConfDir); + } + } + // build up the configuration + // IMPORTANT: it is only after this call that site configurations + // will be valid. + + propagatePrincipals(config, instanceDefinition); + // validate security data +/* + // turned off until tested + SecurityConfiguration securityConfiguration = + new SecurityConfiguration(config, + instanceDefinition, clustername); + +*/ + Configuration clientConfExtras = new Configuration(false); + // then build up the generated path. + FsPermission clusterPerms = getClusterDirectoryPermissions(config); + SliderUtils.copyDirectory(config, snapshotConfPath, generatedConfDirPath, + clusterPerms); + + + // standard AM resources + sliderAM.prepareAMAndConfigForLaunch(sliderFileSystem, + config, + amLauncher, + instanceDefinition, + snapshotConfPath, + generatedConfDirPath, + clientConfExtras, + libdir, + tempPath, + usingMiniMRCluster); + //add provider-specific resources + provider.prepareAMAndConfigForLaunch(sliderFileSystem, + config, + amLauncher, + instanceDefinition, + snapshotConfPath, + generatedConfDirPath, + clientConfExtras, + libdir, + tempPath, + usingMiniMRCluster); + + // now that the site config is fully generated, the provider gets + // to do a quick review of them. 
+ log.debug("Preflight validation of cluster configuration"); + + + sliderAM.preflightValidateClusterConfiguration(sliderFileSystem, + clustername, + config, + instanceDefinition, + clusterDirectory, + generatedConfDirPath, + clusterSecure + ); + + provider.preflightValidateClusterConfiguration(sliderFileSystem, + clustername, + config, + instanceDefinition, + clusterDirectory, + generatedConfDirPath, + clusterSecure + ); + + + // TODO: consider supporting apps that don't have an image path + Path imagePath = + SliderUtils.extractImagePath(sliderFileSystem, internalOptions); + if (sliderFileSystem.maybeAddImagePath(localResources, imagePath)) { + log.debug("Registered image path {}", imagePath); + } + + // build the environment + amLauncher.putEnv( + SliderUtils.buildEnvMap(sliderAMResourceComponent)); + ClasspathConstructor classpath = SliderUtils.buildClasspath(relativeConfDir, + libdir, + getConfig(), + usingMiniMRCluster); + amLauncher.setClasspath(classpath); + if (log.isDebugEnabled()) { + log.debug("AM classpath={}", classpath); + log.debug("Environment Map:\n{}", + SliderUtils.stringifyMap(amLauncher.getEnv())); + log.debug("Files in lib path\n{}", sliderFileSystem.listFSDir(libPath)); + } + + // rm address + + InetSocketAddress rmSchedulerAddress; + try { + rmSchedulerAddress = SliderUtils.getRmSchedulerAddress(config); + } catch (IllegalArgumentException e) { + throw new BadConfigException("%s Address invalid: %s", + YarnConfiguration.RM_SCHEDULER_ADDRESS, + config.get( + YarnConfiguration.RM_SCHEDULER_ADDRESS) + ); + + } + String rmAddr = NetUtils.getHostPortString(rmSchedulerAddress); + + JavaCommandLineBuilder commandLine = new JavaCommandLineBuilder(); + // insert any JVM options); + sliderAM.addJVMOptions(instanceDefinition, commandLine); + // enable asserts if the text option is set + commandLine.enableJavaAssertions(); + // add the AM sevice entry point + commandLine.add(SliderAppMaster.SERVICE_CLASSNAME); + + // create action and the cluster name + commandLine.add(ACTION_CREATE, clustername); + + // debug + if (debugAM) { + commandLine.add(Arguments.ARG_DEBUG); + } + + // set the cluster directory path + commandLine.add(Arguments.ARG_CLUSTER_URI, clusterDirectory.toUri()); + + if (!isUnset(rmAddr)) { + commandLine.add(Arguments.ARG_RM_ADDR, rmAddr); + } + + if (serviceArgs.getFilesystemBinding() != null) { + commandLine.add(Arguments.ARG_FILESYSTEM, serviceArgs.getFilesystemBinding()); + } + + addConfOptionToCLI(commandLine, config, REGISTRY_PATH, + DEFAULT_REGISTRY_PATH); + addMandatoryConfOptionToCLI(commandLine, config, + RegistryConstants.KEY_REGISTRY_ZK_QUORUM); + + if (clusterSecure) { + // if the cluster is secure, make sure that + // the relevant security settings go over +/* + addConfOptionToCLI(commandLine, config, KEY_SECURITY); +*/ + addConfOptionToCLI(commandLine, + config, + DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY); + } + // write out the path output + commandLine.addOutAndErrFiles(STDOUT_AM, STDERR_AM); + + String cmdStr = commandLine.build(); + log.debug("Completed setting up app master command {}", cmdStr); + + amLauncher.addCommandLine(commandLine); + + // the Slider AM gets to configure the AM requirements, not the custom provider + sliderAM.prepareAMResourceRequirements(sliderAMResourceComponent, + amLauncher.getResource()); + + + // Set the priority for the application master + + int amPriority = config.getInt(KEY_YARN_QUEUE_PRIORITY, + DEFAULT_YARN_QUEUE_PRIORITY); + + + amLauncher.setPriority(amPriority); + + // Set the queue to which this 
application is to be submitted in the RM + // Queue for App master + String amQueue = config.get(KEY_YARN_QUEUE, DEFAULT_YARN_QUEUE); + String suppliedQueue = internalOperations.getGlobalOptions().get(InternalKeys.INTERNAL_QUEUE); + if(!SliderUtils.isUnset(suppliedQueue)) { + amQueue = suppliedQueue; + log.info("Using queue {} for the application instance.", amQueue); + } + + if (amQueue != null) { + amLauncher.setQueue(amQueue); + } + + // Submit the application to the applications manager + // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest); + // Ignore the response as either a valid response object is returned on success + // or an exception thrown to denote some form of a failure + + + // submit the application + LaunchedApplication launchedApplication = amLauncher.submitApplication(); + return launchedApplication; + } + + + /** + * Wait for the launched app to be accepted + * @param waittime time in millis + * @return exit code + * @throws YarnException + * @throws IOException + */ + public int waitForAppAccepted(LaunchedApplication launchedApplication, + int waittime) throws + YarnException, + IOException { + assert launchedApplication != null; + int exitCode; + // wait for the submit state to be reached + ApplicationReport report = launchedApplication.monitorAppToState( + YarnApplicationState.ACCEPTED, + new Duration(Constants.ACCEPT_TIME)); + + + // may have failed, so check that + if (SliderUtils.hasAppFinished(report)) { + exitCode = buildExitCode(report); + } else { + // exit unless there is a wait + exitCode = EXIT_SUCCESS; + + if (waittime != 0) { + // waiting for state to change + Duration duration = new Duration(waittime * 1000); + duration.start(); + report = launchedApplication.monitorAppToState( + YarnApplicationState.RUNNING, duration); + if (report != null && + report.getYarnApplicationState() == YarnApplicationState.RUNNING) { + exitCode = EXIT_SUCCESS; + } else { + + launchedApplication.kill(""); + exitCode = buildExitCode(report); + } + } + } + return exitCode; + } + + + /** + * Propagate any critical principals from the current site config down to the HBase one. 
+ * @param config config to read from + * @param clusterSpec cluster spec + */ + private void propagatePrincipals(Configuration config, + AggregateConf clusterSpec) { + String dfsPrincipal = config.get( + DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY); + if (dfsPrincipal != null) { + String siteDfsPrincipal = OptionKeys.SITE_XML_PREFIX + + DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY; + clusterSpec.getAppConfOperations().getGlobalOptions().putIfUnset( + siteDfsPrincipal, + dfsPrincipal); + } + } + + + private boolean addConfOptionToCLI(CommandLineBuilder cmdLine, + Configuration conf, + String key) { + String val = conf.get(key); + return defineIfSet(cmdLine, key, val); + } + + private String addConfOptionToCLI(CommandLineBuilder cmdLine, + Configuration conf, + String key, + String defVal) { + String val = conf.get(key, defVal); + define(cmdLine, key, val); + return val; + } + + /** + * Add a <code>-D key=val</code> command to the CLI + * @param cmdLine command line + * @param key key + * @param val value + */ + private void define(CommandLineBuilder cmdLine, String key, String val) { + Preconditions.checkArgument(key != null, "null key"); + Preconditions.checkArgument(val != null, "null value"); + cmdLine.add(Arguments.ARG_DEFINE, key + "=" + val); + } + + /** + * Add a <code>-D key=val</code> command to the CLI if <code>val</code> + * is not null + * @param cmdLine command line + * @param key key + * @param val value + */ + private boolean defineIfSet(CommandLineBuilder cmdLine, String key, String val) { + Preconditions.checkArgument(key != null, "null key"); + if (val != null) { + define(cmdLine, key, val); + return true; + } else { + return false; + } + } + + private void addMandatoryConfOptionToCLI(CommandLineBuilder cmdLine, + Configuration conf, + String key) throws BadConfigException { + if (!addConfOptionToCLI(cmdLine, conf, key)) { + throw new BadConfigException("Missing configuration option: " + key); + } + } + + /** + * Create a path that must exist in the cluster fs + * @param uri uri to create + * @return the path + * @throws FileNotFoundException if the path does not exist + */ + public Path createPathThatMustExist(String uri) throws + SliderException, + IOException { + return sliderFileSystem.createPathThatMustExist(uri); + } + + /** + * verify that a live cluster isn't there + * @param clustername cluster name + * @throws SliderException with exit code EXIT_CLUSTER_LIVE + * if a cluster of that name is either live or starting up. 
+ */ + public void verifyNoLiveClusters(String clustername) throws + IOException, + YarnException { + List<ApplicationReport> existing = findAllLiveInstances(clustername); + + if (!existing.isEmpty()) { + throw new SliderException(EXIT_APPLICATION_IN_USE, + clustername + ": " + E_CLUSTER_RUNNING + " :" + + existing.get(0)); + } + } + + public String getUsername() throws IOException { + return RegistryUtils.currentUser(); + } + + /** + * Get the name of any deployed cluster + * @return the cluster name + */ + public String getDeployedClusterName() { + return deployedClusterName; + } + + @VisibleForTesting + public void setDeployedClusterName(String deployedClusterName) { + this.deployedClusterName = deployedClusterName; + } + + /** + * ask if the client is using a mini MR cluster + * @return true if they are + */ + private boolean getUsingMiniMRCluster() { + return getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, + false); + } + + /** + * Get the application name used in the zookeeper root paths + * @return an application-specific path in ZK + */ + private String getAppName() { + return "slider"; + } + + /** + * Wait for the app to start running (or go past that state) + * @param duration time to wait + * @return the app report; null if the duration turned out + * @throws YarnException YARN or app issues + * @throws IOException IO problems + */ + @VisibleForTesting + public ApplicationReport monitorAppToRunning(Duration duration) + throws YarnException, IOException { + return monitorAppToState(YarnApplicationState.RUNNING, duration); + } + + /** + * Build an exit code for an application Id and its report. + * If the report parameter is null, the app is killed + * @param report report + * @return the exit code + */ + private int buildExitCode(ApplicationReport report) throws + IOException, + YarnException { + if (null == report) { + forceKillApplication("Reached client specified timeout for application"); + return EXIT_TIMED_OUT; + } + + YarnApplicationState state = report.getYarnApplicationState(); + FinalApplicationStatus dsStatus = report.getFinalApplicationStatus(); + switch (state) { + case FINISHED: + if (FinalApplicationStatus.SUCCEEDED == dsStatus) { + log.info("Application has completed successfully"); + return EXIT_SUCCESS; + } else { + log.info("Application finished unsuccessfully." + + "YarnState = {}, DSFinalStatus = {} Breaking monitoring loop", + state, dsStatus); + return EXIT_YARN_SERVICE_FINISHED_WITH_ERROR; + } + + case KILLED: + log.info("Application did not finish. YarnState={}, DSFinalStatus={}", + state, dsStatus); + return EXIT_YARN_SERVICE_KILLED; + + case FAILED: + log.info("Application Failed. YarnState={}, DSFinalStatus={}", state, + dsStatus); + return EXIT_YARN_SERVICE_FAILED; + default: + //not in any of these states + return EXIT_SUCCESS; + } + } + + /** + * Monitor the submitted application for reaching the requested state. + * Will also report if the app reaches a later state (failed, killed, etc) + * Kill application if duration!= null & time expires. + * Prerequisite: the applicatin was launched. + * @param desiredState desired state. 
+ * @param duration how long to wait -must be more than 0 + * @return the application report -null on a timeout + * @throws YarnException + * @throws IOException + */ + @VisibleForTesting + public ApplicationReport monitorAppToState( + YarnApplicationState desiredState, + Duration duration) + throws YarnException, IOException { + LaunchedApplication launchedApplication = + new LaunchedApplication(applicationId, yarnClient); + return launchedApplication.monitorAppToState(desiredState, duration); + } + + @Override + public ApplicationReport getApplicationReport() throws + IOException, + YarnException { + return getApplicationReport(applicationId); + } + + @Override + public boolean forceKillApplication(String reason) + throws YarnException, IOException { + if (applicationId != null) { + new LaunchedApplication(applicationId, yarnClient).forceKill(reason); + return true; + } + return false; + } + + /** + * List Slider instances belonging to a specific user + * @param user user: "" means all users + * @return a possibly empty list of Slider AMs + */ + @VisibleForTesting + public List<ApplicationReport> listSliderInstances(String user) + throws YarnException, IOException { + return YarnAppListClient.listInstances(); + } + + @Override + @VisibleForTesting + public int actionList(String clustername) throws IOException, YarnException { + verifyBindingsDefined(); + + String user = UserGroupInformation.getCurrentUser().getUserName(); + List<ApplicationReport> instances = listSliderInstances(user); + + if (isUnset(clustername)) { + log.info("Instances for {}: {}", + (user != null ? user : "all users"), + instances.size()); + for (ApplicationReport report : instances) { + logAppReport(report); + } + return EXIT_SUCCESS; + } else { + SliderUtils.validateClusterName(clustername); + log.debug("Listing cluster named {}", clustername); + ApplicationReport report = + findClusterInInstanceList(instances, clustername); + if (report != null) { + logAppReport(report); + return EXIT_SUCCESS; + } else { + throw unknownClusterException(clustername); + } + } + } + + /** + * Log the application report at INFO + * @param report report to log + */ + public void logAppReport(ApplicationReport report) { + log.info(SliderUtils.appReportToString(report, "\n")); + } + + @Override + @VisibleForTesting + public int actionFlex(String name, ActionFlexArgs args) throws YarnException, IOException { + verifyBindingsDefined(); + SliderUtils.validateClusterName(name); + log.debug("actionFlex({})", name); + Map<String, Integer> roleInstances = new HashMap<String, Integer>(); + Map<String, String> roleMap = args.getComponentMap(); + for (Map.Entry<String, String> roleEntry : roleMap.entrySet()) { + String key = roleEntry.getKey(); + String val = roleEntry.getValue(); + try { + roleInstances.put(key, Integer.valueOf(val)); + } catch (NumberFormatException e) { + throw new BadCommandArgumentsException("Requested count of role %s" + + " is not a number: \"%s\"", + key, val); + } + } + return flex(name, roleInstances); + } + + @Override + @VisibleForTesting + public int actionExists(String name, boolean checkLive) throws YarnException, IOException { + verifyBindingsDefined(); + SliderUtils.validateClusterName(name); + log.debug("actionExists({}, {})", name, checkLive); + + //initial probe for a cluster in the filesystem + Path clusterDirectory = sliderFileSystem.buildClusterDirPath(name); + if (!sliderFileSystem.getFileSystem().exists(clusterDirectory)) { + throw unknownClusterException(name); + } + + //test for liveness if desired + + 
if (checkLive) { + ApplicationReport instance = findInstance(name); + if (instance == null) { + log.info("Cluster {} not running", name); + return EXIT_FALSE; + } else { + // the app exists, but it may be in a terminated state + SliderUtils.OnDemandReportStringifier report = + new SliderUtils.OnDemandReportStringifier(instance); + YarnApplicationState state = + instance.getYarnApplicationState(); + if (state.ordinal() >= YarnApplicationState.FINISHED.ordinal()) { + //cluster in the list of apps but not running + log.info("Cluster {} found but is in state {}", name, state); + log.debug("State {}", report); + return EXIT_FALSE; + } + log.info("Cluster {} is live:\n{}", name, report); + } + } else { + log.info("Cluster {} exists", name); + } + return EXIT_SUCCESS; + } + + + @Override + public int actionKillContainer(String name, + ActionKillContainerArgs args) throws YarnException, IOException { + String id = args.id; + if (SliderUtils.isUnset(id)) { + throw new BadCommandArgumentsException("Missing container id"); + } + log.info("killingContainer {}:{}", name, id); + SliderClusterOperations clusterOps = + new SliderClusterOperations(bondToCluster(name)); + try { + clusterOps.killContainer(id); + } catch (NoSuchNodeException e) { + throw new BadClusterStateException("Container %s not found in cluster %s", + id, name); + } + return EXIT_SUCCESS; + } + + @Override + public String actionEcho(String name, ActionEchoArgs args) throws + YarnException, + IOException { + String message = args.message; + if (message == null) { + throw new BadCommandArgumentsException("missing message"); + } + SliderClusterOperations clusterOps = + new SliderClusterOperations(bondToCluster(name)); + return clusterOps.echo(message); + } + + /** + * Get at the service registry operations + * @return registry client -valid after the service is inited. + */ + public YarnAppListClient getYarnAppListClient() { + return YarnAppListClient; + } + + /** + * Find an instance of an application belonging to the current user + * @param appname application name + * @return the app report or null if none is found + * @throws YarnException YARN issues + * @throws IOException IO problems + */ + private ApplicationReport findInstance(String appname) + throws YarnException, IOException { + return YarnAppListClient.findInstance(appname); + } + + private RunningApplication findApplication(String appname) + throws YarnException, IOException { + ApplicationReport applicationReport = findInstance(appname); + return applicationReport != null ? new RunningApplication(yarnClient, applicationReport): null; + + } + + /** + * find all live instances of a specific app -if there is >1 in the cluster, + * this returns them all. 
State should be running or less + * @param appname application name + * @return the list of all matching application instances + */ + private List<ApplicationReport> findAllLiveInstances(String appname) + throws YarnException, IOException { + + return YarnAppListClient.findAllLiveInstances(appname); + } + + + public ApplicationReport findClusterInInstanceList(List<ApplicationReport> instances, + String appname) { + return yarnClient.findClusterInInstanceList(instances, appname); + } + + /** + * Connect to a Slider AM + * @param app application report providing the details on the application + * @return an instance + * @throws YarnException + * @throws IOException + */ + private SliderClusterProtocol connect(ApplicationReport app) + throws YarnException, IOException { + + try { + return RpcBinder.getProxy(getConfig(), + yarnClient.getRmClient(), + app, + Constants.CONNECT_TIMEOUT, + Constants.RPC_TIMEOUT); + } catch (InterruptedException e) { + throw new SliderException(SliderExitCodes.EXIT_TIMED_OUT, + e, + "Interrupted waiting for communications with the Slider AM"); + } + } + + @Override + @VisibleForTesting + public int actionStatus(String clustername, ActionStatusArgs statusArgs) throws + YarnException, + IOException { + verifyBindingsDefined(); + SliderUtils.validateClusterName(clustername); + String outfile = statusArgs.getOutput(); + ClusterDescription status = getClusterDescription(clustername); + String text = status.toJsonString(); + if (outfile == null) { + log.info(text); + } else { + status.save(new File(outfile).getAbsoluteFile()); + } + return EXIT_SUCCESS; + } + + @Override + public int actionVersion() { + SliderVersionInfo.loadAndPrintVersionInfo(log); + return EXIT_SUCCESS; + } + + @Override + public int actionFreeze(String clustername, + ActionFreezeArgs freezeArgs) throws YarnException, IOException { + verifyBindingsDefined(); + SliderUtils.validateClusterName(clustername); + int waittime = freezeArgs.getWaittime(); + String text = freezeArgs.message; + boolean forcekill = freezeArgs.force; + log.debug("actionFreeze({}, reason={}, wait={}, force={})", clustername, + text, + waittime, + forcekill); + + //is this actually a known cluster? 
+ sliderFileSystem.locateInstanceDefinition(clustername); + ApplicationReport app = findInstance(clustername); + if (app == null) { + // exit early + log.info("Cluster {} not running", clustername); + // not an error to stop a stopped cluster + return EXIT_SUCCESS; + } + log.debug("App to stop was found: {}:\n{}", clustername, + new SliderUtils.OnDemandReportStringifier(app)); + if (app.getYarnApplicationState().ordinal() >= + YarnApplicationState.FINISHED.ordinal()) { + log.info("Cluster {} is a terminated state {}", clustername, + app.getYarnApplicationState()); + return EXIT_SUCCESS; + } + LaunchedApplication application = new LaunchedApplication(yarnClient, app); + applicationId = application.getApplicationId(); + + if (forcekill) { + //escalating to forced kill + application.kill("Forced stop of " + clustername + + ": " + text); + } else { + try { + SliderClusterProtocol appMaster = connect(app); + Messages.StopClusterRequestProto r = + Messages.StopClusterRequestProto + .newBuilder() + .setMessage(text) + .build(); + appMaster.stopCluster(r); + + log.debug("Cluster stop command issued"); + + } catch (YarnException e) { + log.warn("Exception while trying to terminate {}: {}", clustername, e); + return EXIT_FALSE; + } catch (IOException e) { + log.warn("Exception while trying to terminate {}: {}", clustername, e); + return EXIT_FALSE; + } + } + + //wait for completion. We don't currently return an exception during this process + //as the stop operation has been issued, this is just YARN. + try { + if (waittime > 0) { + ApplicationReport applicationReport = + application.monitorAppToState(YarnApplicationState.FINISHED, + new Duration(waittime * 1000)); + if (applicationReport == null) { + log.info("application did not shut down in time"); + return EXIT_FALSE; + } + } + +// JDK7 } catch (YarnException | IOException e) { + } catch (YarnException e) { + log.warn("Exception while waiting for the cluster {} to shut down: {}", + clustername, e); + } catch ( IOException e) { + log.warn("Exception while waiting for the cluster {} to shut down: {}", + clustername, e); + } + + return EXIT_SUCCESS; + } + + @Override + public int actionThaw(String clustername, ActionThawArgs thaw) throws YarnException, IOException { + SliderUtils.validateClusterName(clustername); + // see if it is actually running and bail out; + verifyBindingsDefined(); + verifyNoLiveClusters(clustername); + + + //start the cluster + return startCluster(clustername, thaw); + } + + /** + * Implement flexing + * @param clustername name of the cluster + * @param roleInstances map of new role instances + * @return EXIT_SUCCESS if the #of nodes in a live cluster changed + * @throws YarnException + * @throws IOException + */ + public int flex(String clustername, Map<String, Integer> roleInstances) + throws YarnException, IOException { + verifyBindingsDefined(); + SliderUtils.validateClusterName(clustername); + Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername); + AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved( + clustername, + clusterDirectory); + + ConfTreeOperations resources = + instanceDefinition.getResourceOperations(); + for (Map.Entry<String, Integer> entry : roleInstances.entrySet()) { + String role = entry.getKey(); + int count = entry.getValue(); + resources.getOrAddComponent(role).put(ResourceKeys.COMPONENT_INSTANCES, + Integer.toString(count)); + + log.debug("Flexed cluster specification ( {} -> {}) : \n{}", + role, + count, + resources); + } + SliderAMClientProvider sliderAM = new 
SliderAMClientProvider(getConfig()); + AbstractClientProvider provider = createClientProvider( + instanceDefinition.getInternalOperations().getGlobalOptions().getMandatoryOption( + InternalKeys.INTERNAL_PROVIDER_NAME)); + // slider provider to validate what there is + validateInstanceDefinition(sliderAM, instanceDefinition, sliderFileSystem); + validateInstanceDefinition(provider, instanceDefinition, sliderFileSystem); + + int exitCode = EXIT_FALSE; + // save the specification + try { + InstanceIO.updateInstanceDefinition(sliderFileSystem, clusterDirectory, instanceDefinition); + } catch (LockAcquireFailedException e) { + // lock failure + log.debug("Failed to lock dir {}", clusterDirectory, e); + log.warn("Failed to save new resource definition to {} : {}", clusterDirectory, e); + } + + // now see if it is actually running and tell it about the update if it is + ApplicationReport instance = findInstance(clustername); + if (instance != null) { + log.info("Flexing running cluster"); + SliderClusterProtocol appMaster = connect(instance); + SliderClusterOperations clusterOps = new SliderClusterOperations(appMaster); + clusterOps.flex(instanceDefinition.getResources()); + log.info("application instance size updated"); + exitCode = EXIT_SUCCESS; + } else { + log.info("No running instance to update"); + } + return exitCode; + } + + /** + * Validate an instance definition against a provider. + * @param provider the provider performing the validation + * @param instanceDefinition the instance definition + * @throws SliderException if invalid. + */ + protected void validateInstanceDefinition(AbstractClientProvider provider, + AggregateConf instanceDefinition, SliderFileSystem fs) throws SliderException { + try { + provider.validateInstanceDefinition(instanceDefinition, fs); + } catch (SliderException e) { + //problem, reject it + log.info("Error {} validating application instance definition ", e.getMessage()); + log.debug("Error validating application instance definition ", e); + log.info(instanceDefinition.toString()); + throw e; + } + } + + + /** + * Load the persistent cluster description + * @param clustername name of the cluster + * @return the description in the filesystem + * @throws IOException any problems loading -including a missing file + */ + @VisibleForTesting + public AggregateConf loadPersistedClusterDescription(String clustername) + throws IOException, SliderException, LockAcquireFailedException { + Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername); + ConfPersister persister = new ConfPersister(sliderFileSystem, clusterDirectory); + AggregateConf instanceDescription = new AggregateConf(); + persister.load(instanceDescription); + return instanceDescription; + } + + /** + * Connect to a live cluster and get its current state + * @param clustername the cluster name + * @return its description + */ + @VisibleForTesting + public ClusterDescription getClusterDescription(String clustername) throws + YarnException, + IOException { + SliderClusterOperations clusterOperations = + createClusterOperations(clustername); + return clusterOperations.getClusterDescription(); + } + + /** + * Connect to the cluster and get its current state + * @return its description + */ + @VisibleForTesting + public ClusterDescription getClusterDescription() throws + YarnException, + IOException { + return getClusterDescription(getDeployedClusterName()); + } + + /** + * List all node UUIDs in a role + * @param role role name or "" for all + * @return an array of UUID strings + * @throws 
IOException + * @throws YarnException + */ + @VisibleForTesting + public String[] listNodeUUIDsByRole(String role) throws + IOException, + YarnException { + return createClusterOperations() + .listNodeUUIDsByRole(role); + } + + /** + * List all nodes in a role. This is a double round trip: once to list + * the nodes in a role, another to get their details + * @param role component/role to look for + * @return a list of ClusterNode instances + * @throws IOException + * @throws YarnException + */ + @VisibleForTesting + public List<ClusterNode> listClusterNodesInRole(String role) throws + IOException, + YarnException { + return createClusterOperations().listClusterNodesInRole(role); + } + + /** + * Get the details on a list of uuids + * @param uuids uuids to ask for + * @return a possibly empty list of node details + * @throws IOException + * @throws YarnException + */ + @VisibleForTesting + public List<ClusterNode> listClusterNodes(String[] uuids) throws + IOException, + YarnException { + + if (uuids.length == 0) { + // short cut on an empty list + return new LinkedList<ClusterNode>(); + } + return createClusterOperations().listClusterNodes(uuids); + } + + /** + * Get a node from the AM + * @param uuid uuid of node + * @return deserialized node + * @throws IOException IO problems + * @throws NoSuchNodeException if the node isn't found + */ + @VisibleForTesting + public ClusterNode getNode(String uuid) throws IOException, YarnException { + return createClusterOperations().getNode(uuid); + } + + /** + * Get the instance definition from the far end + */ + @VisibleForTesting + public AggregateConf getLiveInstanceDefinition() throws IOException, YarnException { + return createClusterOperations().getInstanceDefinition(); + } + + /** + * Bond to a running cluster + * @param clustername cluster name + * @return the AM RPC client + * @throws SliderException if the cluster is unknown + */ + private SliderClusterProtocol bondToCluster(String clustername) throws + YarnException, + IOException { + verifyBindingsDefined(); + if (clustername == null) { + throw unknownClusterException("(undefined)"); + } + ApplicationReport instance = findInstance(clustername); + if (null == instance) { + throw unknownClusterException(clustername); + } + return connect(instance); + } + + /** + * Create a cluster operations instance against a given cluster + * @param clustername cluster name + * @return a bonded cluster operations instance + * @throws YarnException YARN issues + * @throws IOException IO problems + */ + private SliderClusterOperations createClusterOperations(String clustername) throws + YarnException, + IOException { + SliderClusterProtocol sliderAM = bondToCluster(clustername); + return new SliderClusterOperations(sliderAM); + } + + /** + * Create a cluster operations instance against the active cluster + * -returning any previously created one if held. + * @return a bonded cluster operations instance + * @throws YarnException YARN issues + * @throws IOException IO problems + */ + public SliderClusterOperations createClusterOperations() throws + YarnException, + IOException { + if (sliderClusterOperations == null) { + sliderClusterOperations = + createClusterOperations(getDeployedClusterName()); + } + return sliderClusterOperations; + } + + /** + * Wait for an instance of a named role to be live (or past it in the lifecycle) + * @param role role to look for + * @param timeout time to wait + * @return the state. If still in CREATED, the cluster didn't come up + * in the time period. If LIVE, all is well. 
If >LIVE, it has shut down for a reason + * @throws IOException IO + * @throws SliderException Slider + * @throws WaitTimeoutException if the wait timed out + */ + @VisibleForTesting + public int waitForRoleInstanceLive(String role, long timeout) + throws WaitTimeoutException, IOException, YarnException { + return createClusterOperations().waitForRoleInstanceLive(role, timeout); + } + + /** + * Generate an exception for an unknown cluster + * @param clustername cluster name + * @return an exception with text and a relevant exit code + */ + public UnknownApplicationInstanceException unknownClusterException(String clustername) { + return UnknownApplicationInstanceException.unknownInstance(clustername); + } + + @Override + public String toString() { + return "Slider Client in state " + getServiceState() + + " and Slider Application Instance " + deployedClusterName; + } + + /** + * Get all YARN applications + * @return a possibly empty list + * @throws YarnException + * @throws IOException + */ + @VisibleForTesting + public List<ApplicationReport> getApplications() throws YarnException, IOException { + return yarnClient.getApplications(); + } + + @VisibleForTesting + public ApplicationReport getApplicationReport(ApplicationId appId) + throws YarnException, IOException { + return new LaunchedApplication(appId, yarnClient).getApplicationReport(); + + } + + /** + * The configuration used for deployment (after resolution) + * @return the launched instance definition + */ + @VisibleForTesting + public AggregateConf getLaunchedInstanceDefinition() { + return launchedInstanceDefinition; + } + + + @Override + public int actionResolve(ActionResolveArgs args) + throws YarnException, IOException { + // as this is an API entry point, validate + // the arguments + args.validate(); + RegistryOperations operations = getRegistryOperations(); + String path = SliderRegistryUtils.resolvePath(args.path); + ServiceRecordMarshal serviceRecordMarshal = new ServiceRecordMarshal(); + try { + if (args.list) { + File destDir = args.destdir; + if (destDir != null) { + destDir.mkdirs(); + } + + Map<String, ServiceRecord> recordMap = + listServiceRecords(operations, path); + + for (Entry<String, ServiceRecord> recordEntry : recordMap.entrySet()) { + String name = recordEntry.getKey(); + ServiceRecord instance = recordEntry.getValue(); + String json = serviceRecordMarshal.toJson(instance); + if (destDir == null) { + print(name); + print(json); + } else { + String filename = RegistryPathUtils.lastPathEntry(name) + ".json"; + File jsonFile = new File(destDir, filename); + SliderUtils.write(jsonFile, + serviceRecordMarshal.toBytes(instance), true); + } + } + } else { + // resolve single entry + ServiceRecord instance = resolve(path); + File outFile = args.out; + if (args.destdir != null) { + outFile = new File(args.destdir, RegistryPathUtils.lastPathEntry(path)); + } + if (outFile != null) { + SliderUtils.write(outFile, serviceRecordMarshal.toBytes(instance), true); + } else { + print(serviceRecordMarshal.toJson(instance)); + } + } +// TODO JDK7 + } catch (PathNotFoundException e) { + // no record at this path + return EXIT_NOT_FOUND; + } catch (NoRecordException e) { + return EXIT_NOT_FOUND; + } catch (UnknownApplicationInstanceException e) { + return EXIT_NOT_FOUND; + } catch (InvalidRecordException e) { + // it is not a record + log.error("{}", e); + log.debug("{}", e, e); + return EXIT_EXCEPTION_THROWN; + } + return EXIT_SUCCESS; + } + + @Override + public int actionRegistry(ActionRegistryArgs registryArgs) throws + YarnException, + IOException { + // as this 
is also a test entry point, validate + // the arguments + registryArgs.validate(); + try { + if (registryArgs.list) { + actionRegistryList(registryArgs); + } else if (registryArgs.listConf) { + // list the configurations + actionRegistryListConfigsYarn(registryArgs); + } else if (SliderUtils.isSet(registryArgs.getConf)) { + // get a configuration + PublishedConfiguration publishedConfiguration = + actionRegistryGetConfig(registryArgs); + outputConfig(publishedConfiguration, registryArgs); + } else { + // it's an unknown command + log.info(ActionRegistryArgs.USAGE); + return EXIT_USAGE; + } +// JDK7 + } catch (FileNotFoundException e) { + log.info("{}", e); + log.debug("{}", e, e); + return EXIT_NOT_FOUND; + } catch (PathNotFoundException e) { + log.info("{}", e); + log.debug("{}", e, e); + return EXIT_NOT_FOUND; + } + return EXIT_SUCCESS; + } + + /** + * Registry operation + * + * @param registryArgs registry Arguments + * @return the instances (for tests) + * @throws YarnException YARN problems + * @throws IOException Network or other problems + */ + @VisibleForTesting + public Collection<ServiceRecord> actionRegistryList( + ActionRegistryArgs registryArgs) + throws YarnException, IOException { + String serviceType = registryArgs.serviceType; + String name = registryArgs.name; + RegistryOperations operations = getRegistryOperations(); + Collection<ServiceRecord> serviceRecords; + if (StringUtils.isEmpty(name)) { + String path = + serviceclassPath( + currentUser(), + serviceType); + + try { + Map<String, ServiceRecord> recordMap = + listS
<TRUNCATED>
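
The actionExists() and actionFreeze() paths above both decide whether an application is still alive by comparing its YarnApplicationState ordinal against FINISHED. A minimal sketch of that test, using only the public YARN records API (the wrapper class name is illustrative, not part of the diff):

import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;

public final class LivenessCheck {

  /**
   * Mirror of the test used above: an application counts as terminated once
   * its state is FINISHED or later in the enum (FINISHED, FAILED, KILLED).
   */
  public static boolean isTerminated(ApplicationReport report) {
    YarnApplicationState state = report.getYarnApplicationState();
    return state.ordinal() >= YarnApplicationState.FINISHED.ordinal();
  }

  private LivenessCheck() {
  }
}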
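
A sketch of driving the monitorAppToState() entry point shown above to wait for a launched application to reach RUNNING. It assumes a SliderClient that has already been initialised, started and bound to an application (that bootstrap is outside this excerpt), and the Slider package names in the imports are assumptions rather than taken from the diff:

import java.io.IOException;

import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.slider.client.SliderClient;   // assumed package
import org.apache.slider.core.main.Duration;    // assumed package

public final class MonitorSketch {

  /** @return true if the application reached RUNNING inside the timeout */
  public static boolean waitUntilRunning(SliderClient client, long timeoutMillis)
      throws YarnException, IOException {
    // per the javadoc above, a null report means the wait timed out
    ApplicationReport report =
        client.monitorAppToState(YarnApplicationState.RUNNING,
            new Duration(timeoutMillis));
    return report != null
        && report.getYarnApplicationState() == YarnApplicationState.RUNNING;
  }

  private MonitorSketch() {
  }
}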
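
The flex path above parses each requested component count, rewrites ResourceKeys.COMPONENT_INSTANCES in the persisted resources tree, and then notifies a live AM of the change. Below is a hypothetical caller of the public flex() method that mirrors the number parsing done in actionFlex(); the helper class and its error handling are illustrative, not part of Slider:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.slider.client.SliderClient;   // assumed package

public final class FlexSketch {

  /**
   * Set the instance count of one component, e.g.
   * flexComponent(client, "hbase1", "worker", "4") for a hypothetical cluster.
   * flex() persists the new count and, if the AM is running, pushes the
   * updated resource tree to it.
   */
  public static int flexComponent(SliderClient client,
                                  String clustername,
                                  String component,
                                  String count)
      throws YarnException, IOException {
    Map<String, Integer> roleInstances = new HashMap<String, Integer>();
    try {
      roleInstances.put(component, Integer.valueOf(count));
    } catch (NumberFormatException e) {
      // the real client raises BadCommandArgumentsException here
      throw new IllegalArgumentException(
          "Requested count of role " + component + " is not a number: \"" + count + "\"", e);
    }
    return client.flex(clustername, roleInstances);
  }

  private FlexSketch() {
  }
}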
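
listClusterNodesInRole() above notes that node listing is a double round trip; the same two steps are exposed separately as listNodeUUIDsByRole() and listClusterNodes(). A sketch of composing them by hand, again assuming an initialised client and Slider package names:

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.slider.api.ClusterNode;       // assumed package
import org.apache.slider.client.SliderClient;   // assumed package

public final class NodeListingSketch {

  public static List<ClusterNode> nodesInRole(SliderClient client, String role)
      throws IOException, YarnException {
    // round trip 1: UUIDs of the nodes in the role ("" means all roles)
    String[] uuids = client.listNodeUUIDsByRole(role);
    // round trip 2: resolve those UUIDs into full ClusterNode details
    return client.listClusterNodes(uuids);
  }

  private NodeListingSketch() {
  }
}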
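
In its list branch, actionResolve() above enumerates the service records under a registry path with listServiceRecords() and marshals each one to JSON. A stripped-down sketch of that branch against the Hadoop registry API, printing instead of writing files (the wrapper class is illustrative):

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.registry.client.types.ServiceRecord;

public final class RegistryListSketch {

  public static void printRecords(RegistryOperations operations, String path)
      throws IOException {
    RegistryUtils.ServiceRecordMarshal marshal =
        new RegistryUtils.ServiceRecordMarshal();
    // enumerate all service records directly under the given registry path
    Map<String, ServiceRecord> records =
        RegistryUtils.listServiceRecords(operations, path);
    for (Map.Entry<String, ServiceRecord> entry : records.entrySet()) {
      System.out.println(entry.getKey());
      System.out.println(marshal.toJson(entry.getValue()));
    }
  }

  private RegistryListSketch() {
  }
}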
