http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/31e05853/slider-core/src/main/java/org/apache/slider/client/SliderClient.java.orig
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java.orig 
b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java.orig
new file mode 100644
index 0000000..50a7097
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java.orig
@@ -0,0 +1,2913 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.slider.client;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathNotFoundException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
+import org.apache.hadoop.registry.client.exceptions.InvalidRecordException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.alias.CredentialProvider;
+import org.apache.hadoop.security.alias.CredentialProviderFactory;
+import org.apache.hadoop.security.alias.CredentialShell;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.registry.client.api.RegistryConstants;
+import org.apache.hadoop.registry.client.api.RegistryOperations;
+import static org.apache.hadoop.registry.client.binding.RegistryUtils.*;
+
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import org.apache.hadoop.registry.client.exceptions.NoRecordException;
+import org.apache.hadoop.registry.client.types.Endpoint;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
+import org.apache.slider.api.ClusterDescription;
+import org.apache.slider.api.ClusterNode;
+import org.apache.slider.api.InternalKeys;
+import org.apache.slider.api.OptionKeys;
+import org.apache.slider.api.ResourceKeys;
+import org.apache.slider.api.SliderClusterProtocol;
+import org.apache.slider.api.proto.Messages;
+import org.apache.slider.common.Constants;
+import org.apache.slider.common.SliderExitCodes;
+import org.apache.slider.common.SliderKeys;
+import org.apache.slider.common.params.AbstractActionArgs;
+import org.apache.slider.common.params.AbstractClusterBuildingActionArgs;
+import org.apache.slider.common.params.ActionDiagnosticArgs;
+import org.apache.slider.common.params.ActionInstallPackageArgs;
+import org.apache.slider.common.params.ActionAMSuicideArgs;
+import org.apache.slider.common.params.ActionCreateArgs;
+import org.apache.slider.common.params.ActionEchoArgs;
+import org.apache.slider.common.params.ActionFlexArgs;
+import org.apache.slider.common.params.ActionFreezeArgs;
+import org.apache.slider.common.params.ActionKillContainerArgs;
+import org.apache.slider.common.params.ActionRegistryArgs;
+import org.apache.slider.common.params.ActionResolveArgs;
+import org.apache.slider.common.params.ActionStatusArgs;
+import org.apache.slider.common.params.ActionThawArgs;
+import org.apache.slider.common.params.Arguments;
+import org.apache.slider.common.params.ClientArgs;
+import org.apache.slider.common.params.LaunchArgsAccessor;
+import org.apache.slider.common.tools.ConfigHelper;
+import org.apache.slider.common.tools.Duration;
+import org.apache.slider.common.tools.SliderFileSystem;
+import org.apache.slider.common.tools.SliderUtils;
+import org.apache.slider.common.tools.SliderVersionInfo;
+import org.apache.slider.core.build.InstanceBuilder;
+import org.apache.slider.core.build.InstanceIO;
+import org.apache.slider.core.conf.AggregateConf;
+import org.apache.slider.core.conf.ConfTree;
+import org.apache.slider.core.conf.ConfTreeOperations;
+import org.apache.slider.core.conf.MapOperations;
+import org.apache.slider.core.conf.ResourcesInputPropertiesValidator;
+import org.apache.slider.core.conf.TemplateInputPropertiesValidator;
+import org.apache.slider.core.exceptions.BadClusterStateException;
+import org.apache.slider.core.exceptions.BadCommandArgumentsException;
+import org.apache.slider.core.exceptions.BadConfigException;
+import org.apache.slider.core.exceptions.ErrorStrings;
+import org.apache.slider.core.exceptions.NoSuchNodeException;
+import org.apache.slider.core.exceptions.SliderException;
+import org.apache.slider.core.exceptions.UnknownApplicationInstanceException;
+import org.apache.slider.core.exceptions.WaitTimeoutException;
+import org.apache.slider.core.launch.AppMasterLauncher;
+import org.apache.slider.core.launch.ClasspathConstructor;
+import org.apache.slider.core.launch.CommandLineBuilder;
+import org.apache.slider.core.launch.JavaCommandLineBuilder;
+import org.apache.slider.core.launch.LaunchedApplication;
+import org.apache.slider.core.launch.RunningApplication;
+import org.apache.slider.core.main.RunService;
+import org.apache.slider.core.persist.ConfPersister;
+import org.apache.slider.core.persist.LockAcquireFailedException;
+import org.apache.slider.core.registry.SliderRegistryUtils;
+import org.apache.slider.core.registry.YarnAppListClient;
+import org.apache.slider.core.registry.docstore.ConfigFormat;
+import org.apache.slider.core.registry.docstore.PublishedConfigSet;
+import org.apache.slider.core.registry.docstore.PublishedConfiguration;
+import 
org.apache.slider.core.registry.docstore.PublishedConfigurationOutputter;
+import org.apache.slider.core.registry.retrieve.RegistryRetriever;
+import org.apache.slider.core.zk.BlockingZKWatcher;
+import org.apache.slider.core.zk.ZKIntegration;
+import org.apache.slider.core.zk.ZKPathBuilder;
+import org.apache.slider.providers.AbstractClientProvider;
+import org.apache.slider.providers.SliderProviderFactory;
+import org.apache.slider.providers.agent.AgentKeys;
+import org.apache.slider.providers.slideram.SliderAMClientProvider;
+import org.apache.slider.server.appmaster.SliderAppMaster;
+import org.apache.slider.server.appmaster.rpc.RpcBinder;
+import org.apache.slider.server.services.utility.AbstractSliderLaunchedService;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static org.apache.slider.common.params.SliderActions.*;
+
+/**
+ * Client service for Slider
+ */
+
+public class SliderClient extends AbstractSliderLaunchedService implements 
RunService,
+    SliderExitCodes, SliderKeys, ErrorStrings, SliderClientAPI {
+  private static final Logger log = 
LoggerFactory.getLogger(SliderClient.class);
+
+  private ClientArgs serviceArgs;
+  public ApplicationId applicationId;
+  
+  private String deployedClusterName;
+  /**
+   * Cluster opaerations against the deployed cluster -will be null
+   * if no bonding has yet taken place
+   */
+  private SliderClusterOperations sliderClusterOperations;
+
+  private SliderFileSystem sliderFileSystem;
+
+  /**
+   * Yarn client service
+   */
+  private SliderYarnClientImpl yarnClient;
+  private YarnAppListClient YarnAppListClient;
+  private AggregateConf launchedInstanceDefinition;
+//  private SliderRegistryService registry;
+
+
+  /**
+   * The YARN registry service
+   */
+  private RegistryOperations registryOperations;
+
+  /**
+   * Constructor
+   */
+  public SliderClient() {
+    super("Slider Client");
+    new HdfsConfiguration();
+    new YarnConfiguration();
+  }
+
+  /**
+   * This is called <i>Before serviceInit is called</i>
+   * @param config the initial configuration build up by the
+   * service launcher.
+   * @param args argument list list of arguments passed to the command line
+   * after any launcher-specific commands have been stripped.
+   * @return the post-binding configuration to pass to the <code>init()</code>
+   * operation.
+   * @throws Exception
+   */
+  @Override
+  public Configuration bindArgs(Configuration config, String... args) throws 
Exception {
+    config = super.bindArgs(config, args);
+    serviceArgs = new ClientArgs(args);
+    serviceArgs.parse();
+    // yarn-ify
+    YarnConfiguration yarnConfiguration = new YarnConfiguration(config);
+    return SliderUtils.patchConfiguration(yarnConfiguration);
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    Configuration clientConf = SliderUtils.loadClientConfigurationResource();
+    ConfigHelper.mergeConfigurations(conf, clientConf, CLIENT_RESOURCE);
+    serviceArgs.applyDefinitions(conf);
+    serviceArgs.applyFileSystemBinding(conf);
+    // init security with our conf
+    if (SliderUtils.isHadoopClusterSecure(conf)) {
+      SliderUtils.forceLogin();
+      SliderUtils.initProcessSecurity(conf);
+    }
+    AbstractActionArgs coreAction = serviceArgs.getCoreAction();
+    if (coreAction.getHadoopServicesRequired()) {
+      initHadoopBinding();
+    }
+    super.serviceInit(conf);
+  }
+
+  /**
+   * this is where the work is done.
+   * @return the exit code
+   * @throws Throwable anything that went wrong
+   */
+/* JDK7
+
+  @Override
+  public int runService() throws Throwable {
+
+    // choose the action
+    String action = serviceArgs.getAction();
+    int exitCode = EXIT_SUCCESS;
+    String clusterName = serviceArgs.getClusterName();
+    // actions
+    switch (action) {
+      case ACTION_BUILD:
+        exitCode = actionBuild(clusterName, serviceArgs.getActionBuildArgs());
+        break;
+      case ACTION_UPDATE:
+        exitCode = actionUpdate(clusterName, 
serviceArgs.getActionUpdateArgs());
+        break;
+      case ACTION_CREATE:
+        exitCode = actionCreate(clusterName, 
serviceArgs.getActionCreateArgs());
+        break;
+      case ACTION_FREEZE:
+        exitCode = actionFreeze(clusterName, 
serviceArgs.getActionFreezeArgs());
+        break;
+      case ACTION_THAW:
+        exitCode = actionThaw(clusterName, serviceArgs.getActionThawArgs());
+        break;
+      case ACTION_DESTROY:
+        exitCode = actionDestroy(clusterName);
+        break;
+      case ACTION_EXISTS:
+        exitCode = actionExists(clusterName,
+            serviceArgs.getActionExistsArgs().live);
+        break;
+      case ACTION_FLEX:
+        exitCode = actionFlex(clusterName, serviceArgs.getActionFlexArgs());
+        break;
+      case ACTION_GETCONF:
+        exitCode =
+            actionGetConf(clusterName, serviceArgs.getActionGetConfArgs());
+        break;
+      case ACTION_HELP:
+      case ACTION_USAGE:
+        log.info(serviceArgs.usage());
+        break;
+      case ACTION_KILL_CONTAINER:
+        exitCode = actionKillContainer(clusterName,
+            serviceArgs.getActionKillContainerArgs());
+        break;
+      case ACTION_AM_SUICIDE:
+        exitCode = actionAmSuicide(clusterName,
+            serviceArgs.getActionAMSuicideArgs());
+        break;
+      case ACTION_LIST:
+        exitCode = actionList(clusterName);
+        break;
+      case ACTION_REGISTRY:
+        exitCode = actionRegistry(
+            serviceArgs.getActionRegistryArgs());
+        break;
+      case ACTION_STATUS:
+        exitCode = actionStatus(clusterName,
+            serviceArgs.getActionStatusArgs());
+        break;
+      case ACTION_VERSION:
+        exitCode = actionVersion();
+        break;
+      default:
+        throw new SliderException(EXIT_UNIMPLEMENTED,
+            "Unimplemented: " + action);
+    }
+
+    return exitCode;
+  }
+
+*/
+  @Override
+  public int runService() throws Throwable {
+
+    // choose the action
+    String action = serviceArgs.getAction();
+
+    int exitCode = EXIT_SUCCESS;
+    String clusterName = serviceArgs.getClusterName();
+    // actions
+    if (ACTION_INSTALL_PACKAGE.equals(action)) {
+      exitCode = actionInstallPkg(serviceArgs.getActionInstallPackageArgs());
+    } else if (ACTION_BUILD.equals(action)) {
+      exitCode = actionBuild(clusterName, serviceArgs.getActionBuildArgs());
+    } else if (ACTION_CREATE.equals(action)) {
+      exitCode = actionCreate(clusterName, serviceArgs.getActionCreateArgs());
+    } else if (ACTION_FREEZE.equals(action)) {
+      exitCode = actionFreeze(clusterName,
+            serviceArgs.getActionFreezeArgs());
+    } else if (ACTION_THAW.equals(action)) {
+      exitCode = actionThaw(clusterName, serviceArgs.getActionThawArgs());
+    } else if (ACTION_DESTROY.equals(action)) {
+      exitCode = actionDestroy(clusterName);
+    } else if (ACTION_DIAGNOSTIC.equals(action)) {
+      exitCode = actionDiagnostic(serviceArgs.getActionDiagnosticArgs());
+    } else if (ACTION_EXISTS.equals(action)) {
+      exitCode = actionExists(clusterName,
+          serviceArgs.getActionExistsArgs().live);
+    } else if (ACTION_FLEX.equals(action)) {
+      exitCode = actionFlex(clusterName, serviceArgs.getActionFlexArgs());
+    } else if (ACTION_HELP.equals(action) ||
+               ACTION_USAGE.equals(action)) {
+      log.info(serviceArgs.usage());
+    } else if (ACTION_KILL_CONTAINER.equals(action)) {
+      exitCode = actionKillContainer(clusterName,
+          serviceArgs.getActionKillContainerArgs());
+    } else if (ACTION_AM_SUICIDE.equals(action)) {
+      exitCode = actionAmSuicide(clusterName,
+          serviceArgs.getActionAMSuicideArgs());
+    } else if (ACTION_LIST.equals(action)) {
+      exitCode = actionList(clusterName);
+    } else if (ACTION_REGISTRY.equals(action)) {
+      exitCode = actionRegistry(serviceArgs.getActionRegistryArgs());
+    } else if (ACTION_RESOLVE.equals(action)) {
+      exitCode = actionResolve(serviceArgs.getActionResolveArgs());
+    } else if (ACTION_STATUS.equals(action)) {
+      exitCode = actionStatus(clusterName,
+          serviceArgs.getActionStatusArgs());
+    } else if (ACTION_UPDATE.equals(action)) {
+      exitCode = actionUpdate(clusterName, serviceArgs.getActionUpdateArgs());
+    } else if (ACTION_VERSION.equals(action)) {
+      exitCode = actionVersion();
+    } else {
+      throw new SliderException(EXIT_UNIMPLEMENTED,
+          "Unimplemented: " + action);
+    }
+
+    return exitCode;
+  }
+
+/**
+   * Perform everything needed to init the hadoop binding.
+   * This assumes that the service is already  in inited or started state
+   * @throws IOException
+   * @throws SliderException
+   */
+  protected void initHadoopBinding() throws IOException, SliderException {
+    // validate the client
+    SliderUtils.validateSliderClientEnvironment(null);
+    //create the YARN client
+    yarnClient = new SliderYarnClientImpl();
+    yarnClient.init(getConfig());
+    if (getServiceState() == STATE.STARTED) {
+      yarnClient.start();
+    }
+    addService(yarnClient);
+    YarnAppListClient =
+        new YarnAppListClient(yarnClient, getUsername(), getConfig());
+    // create the filesystem
+    sliderFileSystem = new SliderFileSystem(getConfig());    
+  }
+
+  /**
+   * Delete the zookeeper node associated with the calling user and the cluster
+   * TODO: YARN registry operations
+   **/
+  @Deprecated
+  @VisibleForTesting
+  public boolean deleteZookeeperNode(String clusterName) throws YarnException, 
IOException {
+    String user = getUsername();
+    String zkPath = ZKIntegration.mkClusterPath(user, clusterName);
+    Exception e = null;
+    try {
+      Configuration config = getConfig();
+      if (!SliderUtils.isHadoopClusterSecure(config)) {
+        ZKIntegration client = getZkClient(clusterName, user);
+        if (client != null) {
+          if (client.exists(zkPath)) {
+            log.info("Deleting zookeeper path {}", zkPath);
+          }
+          client.deleteRecursive(zkPath);
+          return true;
+        }
+      } else {
+        log.warn("Default zookeeper node is not available for secure cluster");
+      }
+    } catch (InterruptedException ignored) {
+      e = ignored;
+    } catch (KeeperException ignored) {
+      e = ignored;
+    } catch (BadConfigException ignored) {
+      e = ignored;
+    }
+    if (e != null) {
+      log.warn("Unable to recursively delete zk node {}", zkPath);
+      log.debug("Reason: ", e);
+    }
+
+    return false;
+  }
+
+  /**
+   * Create the zookeeper node associated with the calling user and the cluster
+   */
+  @VisibleForTesting
+  @Deprecated
+  public String createZookeeperNode(String clusterName, Boolean nameOnly) 
throws YarnException, IOException {
+    String user = getUsername();
+    String zkPath = ZKIntegration.mkClusterPath(user, clusterName);
+    if(nameOnly) {
+      return zkPath;
+    }
+    Configuration config = getConfig();
+    if (!SliderUtils.isHadoopClusterSecure(config)) {
+      ZKIntegration client = getZkClient(clusterName, user);
+      if (client != null) {
+        try {
+          client.createPath(zkPath, "", ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PERSISTENT);
+          return zkPath;
+          
+          //JDK7
+//        } catch (InterruptedException | KeeperException e) {
+        } catch (InterruptedException e) {
+          log.warn("Unable to create zk node {}", zkPath, e);
+        } catch ( KeeperException e) {
+          log.warn("Unable to create zk node {}", zkPath, e);
+        }
+      }
+    }
+
+    return null;
+  }
+
+  /**
+   * Gets a zookeeper client, returns null if it cannot connect to zookeeper
+   **/
+  protected ZKIntegration getZkClient(String clusterName, String user) throws 
YarnException {
+    String registryQuorum = lookupZKQuorum();
+    ZKIntegration client = null;
+    try {
+      BlockingZKWatcher watcher = new BlockingZKWatcher();
+      client = ZKIntegration.newInstance(registryQuorum, user, clusterName, 
true, false, watcher);
+      client.init();
+      watcher.waitForZKConnection(2 * 1000);
+    } catch (InterruptedException e) {
+      client = null;
+      log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e);
+    } catch (IOException e) {
+      log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e);
+    }
+    return client;
+  }
+
+  @Override
+  public int actionDestroy(String clustername) throws YarnException,
+                                                      IOException {
+    // verify that a live cluster isn't there
+    SliderUtils.validateClusterName(clustername);
+    //no=op, it is now mandatory. 
+    verifyBindingsDefined();
+    verifyNoLiveClusters(clustername);
+
+    // create the directory path
+    Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
+    // delete the directory;
+    boolean exists = sliderFileSystem.getFileSystem().exists(clusterDirectory);
+    if (exists) {
+      log.info("Application Instance {} found at {}: destroying", clustername, 
clusterDirectory);
+    } else {
+      log.info("Application Instance {} already destroyed", clustername);
+    }
+    boolean deleted =
+      sliderFileSystem.getFileSystem().delete(clusterDirectory, true);
+    if (!deleted) {
+      log.warn("Filesystem returned false from delete() operation");
+    }
+
+    // rm the registry entry —do not let this block the destroy operations
+    String registryPath = SliderRegistryUtils.registryPathForInstance(
+        clustername);
+    try {
+      getRegistryOperations().delete(registryPath, true);
+    } catch (IOException e) {
+      log.warn("Error deleting {}: {} ", registryPath, e, e);
+    } catch (SliderException e) {
+      log.warn("Error binding to registry {} ", e, e);
+    }
+
+    List<ApplicationReport> instances = findAllLiveInstances(clustername);
+    // detect any race leading to cluster creation during the check/destroy 
process
+    // and report a problem.
+    if (!instances.isEmpty()) {
+      throw new SliderException(EXIT_APPLICATION_IN_USE,
+                              clustername + ": "
+                              + E_DESTROY_CREATE_RACE_CONDITION
+                              + " :" +
+                              instances.get(0));
+    }
+    log.info("Destroyed cluster {}", clustername);
+    return EXIT_SUCCESS;
+  }
+  
+  @Override
+  public int actionAmSuicide(String clustername,
+      ActionAMSuicideArgs args) throws YarnException, IOException {
+    SliderClusterOperations clusterOperations =
+      createClusterOperations(clustername);
+    clusterOperations.amSuicide(args.message, args.exitcode, args.waittime);
+    return EXIT_SUCCESS;
+  }
+
+  @Override
+  public AbstractClientProvider createClientProvider(String provider)
+    throws SliderException {
+    SliderProviderFactory factory =
+      SliderProviderFactory.createSliderProviderFactory(provider);
+    return factory.createClientProvider();
+  }
+
+  /**
+   * Create the cluster -saving the arguments to a specification file first
+   * @param clustername cluster name
+   * @return the status code
+   * @throws YarnException Yarn problems
+   * @throws IOException other problems
+   * @throws BadCommandArgumentsException bad arguments.
+   */
+  public int actionCreate(String clustername, ActionCreateArgs createArgs) 
throws
+                                               YarnException,
+                                               IOException {
+
+    actionBuild(clustername, createArgs);
+    Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
+    AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved(
+        clustername, clusterDirectory);
+    try {
+      checkForCredentials(getConfig(), instanceDefinition.getAppConf());
+    } catch (Exception e) {
+      throw new IOException("problem setting credentials", e);
+    }
+    return startCluster(clustername, createArgs);
+  }
+
+  private void checkForCredentials(Configuration conf,
+      ConfTree tree) throws Exception {
+    if (tree.credentials == null || tree.credentials.size()==0) {
+      log.info("No credentials requested");
+      return;
+    }
+    CredentialShell credentialShell = null;
+    for (Entry<String, List<String>> cred : tree.credentials.entrySet()) {
+      String provider = cred.getKey();
+      List<String> aliases = cred.getValue();
+      if (aliases == null || aliases.size()==0) {
+        continue;
+      }
+      if (credentialShell == null) {
+        credentialShell = new CredentialShell();
+        credentialShell.setConf(conf);
+      }
+      Configuration c = new Configuration(conf);
+      c.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, provider);
+      CredentialProvider credentialProvider =
+          CredentialProviderFactory.getProviders(c).get(0);
+      Set<String> existingAliases = new 
HashSet<String>(credentialProvider.getAliases());
+      for (String alias : aliases) {
+        if (existingAliases.contains(alias.toLowerCase())) {
+          log.warn("Skipping creation of credentials for {}, " +
+              "alias already exists in {}", alias, provider);
+          continue;
+        }
+        String[] csarg = new String[]{
+            "create", alias, "-provider", provider};
+        log.info("Creating credentials for {} in {}", alias, provider);
+        credentialShell.run(csarg);
+      }
+    }
+  }
+
+  @Override
+  public int actionBuild(String clustername,
+      AbstractClusterBuildingActionArgs buildInfo) throws
+                                               YarnException,
+                                               IOException {
+
+    buildInstanceDefinition(clustername, buildInfo, false, false);
+    return EXIT_SUCCESS; 
+  }
+
+  @Override
+  public int actionInstallPkg(ActionInstallPackageArgs installPkgInfo) throws
+      YarnException,
+      IOException {
+
+    Path srcFile = null;
+    if (StringUtils.isEmpty(installPkgInfo.name )) {
+      throw new BadCommandArgumentsException("A valid application type name is 
required (e.g. HBASE).");
+    }
+
+    if (StringUtils.isEmpty(installPkgInfo.packageURI)) {
+      throw new BadCommandArgumentsException("A valid application package 
location required.");
+    } else {
+      File pkgFile = new File(installPkgInfo.packageURI);
+      if (!pkgFile.exists() || pkgFile.isDirectory()) {
+        throw new BadCommandArgumentsException("Unable to access supplied pkg 
file at " +
+                                               pkgFile.getAbsolutePath());
+      } else {
+        srcFile = new Path(pkgFile.toURI());
+      }
+    }
+
+    Path pkgPath = sliderFileSystem.buildPackageDirPath(installPkgInfo.name);
+    sliderFileSystem.getFileSystem().mkdirs(pkgPath);
+
+    Path fileInFs = new Path(pkgPath, srcFile.getName());
+    log.info("Installing package {} at {} and overwrite is {}.", srcFile, 
fileInFs, installPkgInfo.replacePkg);
+    if (sliderFileSystem.getFileSystem().exists(fileInFs) && 
!installPkgInfo.replacePkg) {
+      throw new BadCommandArgumentsException("Pkg exists at " +
+                                             fileInFs.toUri().toString() +
+                                             ". Use --replacepkg to 
overwrite.");
+    }
+
+    sliderFileSystem.getFileSystem().copyFromLocalFile(false, 
installPkgInfo.replacePkg, srcFile, fileInFs);
+    return EXIT_SUCCESS;
+  }
+
+  @Override
+  public int actionUpdate(String clustername,
+      AbstractClusterBuildingActionArgs buildInfo) throws
+      YarnException, IOException {
+    buildInstanceDefinition(clustername, buildInfo, true, true);
+    return EXIT_SUCCESS; 
+  }
+
+  /**
+   * Build up the AggregateConfiguration for an application instance then
+   * persists it
+   * @param clustername name of the cluster
+   * @param buildInfo the arguments needed to build the cluster
+   * @param overwrite true if existing cluster directory can be overwritten
+   * @param liveClusterAllowed true if live cluster can be modified
+   * @throws YarnException
+   * @throws IOException
+   */
+  
+  public void buildInstanceDefinition(String clustername,
+      AbstractClusterBuildingActionArgs buildInfo, boolean overwrite, boolean 
liveClusterAllowed)
+        throws YarnException, IOException {
+    // verify that a live cluster isn't there
+    SliderUtils.validateClusterName(clustername);
+    verifyBindingsDefined();
+    if (!liveClusterAllowed) {
+      verifyNoLiveClusters(clustername);
+    }
+
+    Configuration conf = getConfig();
+    String registryQuorum = lookupZKQuorum();
+
+    Path appconfdir = buildInfo.getConfdir();
+    // Provider
+    String providerName = buildInfo.getProvider();
+    requireArgumentSet(Arguments.ARG_PROVIDER, providerName);
+    log.debug("Provider is {}", providerName);
+    SliderAMClientProvider sliderAM = new SliderAMClientProvider(conf);
+    AbstractClientProvider provider =
+      createClientProvider(providerName);
+    InstanceBuilder builder =
+      new InstanceBuilder(sliderFileSystem, 
+                          getConfig(),
+                          clustername);
+    
+    AggregateConf instanceDefinition = new AggregateConf();
+    ConfTreeOperations appConf = instanceDefinition.getAppConfOperations();
+    ConfTreeOperations resources = instanceDefinition.getResourceOperations();
+    ConfTreeOperations internal = instanceDefinition.getInternalOperations();
+    //initial definition is set by the providers 
+    sliderAM.prepareInstanceConfiguration(instanceDefinition);
+    provider.prepareInstanceConfiguration(instanceDefinition);
+
+    //load in any specified on the command line
+    if (buildInfo.resources != null) {
+      try {
+        resources.mergeFile(buildInfo.resources,
+                            new ResourcesInputPropertiesValidator());
+
+      } catch (IOException e) {
+        throw new BadConfigException(e,
+               "incorrect argument to %s: \"%s\" : %s ", 
+                                     Arguments.ARG_RESOURCES,
+                                     buildInfo.resources,
+                                     e.toString());
+      }
+    }
+    if (buildInfo.template != null) {
+      try {
+        appConf.mergeFile(buildInfo.template,
+                          new TemplateInputPropertiesValidator());
+      } catch (IOException e) {
+        throw new BadConfigException(e,
+                                     "incorrect argument to %s: \"%s\" : %s ",
+                                     Arguments.ARG_TEMPLATE,
+                                     buildInfo.template,
+                                     e.toString());
+      }
+    }
+
+    //get the command line options
+    ConfTree cmdLineAppOptions = buildInfo.buildAppOptionsConfTree();
+    ConfTree cmdLineResourceOptions = buildInfo.buildResourceOptionsConfTree();
+
+    appConf.merge(cmdLineAppOptions);
+
+    // put the role counts into the resources file
+    Map<String, String> argsRoleMap = buildInfo.getComponentMap();
+    for (Map.Entry<String, String> roleEntry : argsRoleMap.entrySet()) {
+      String count = roleEntry.getValue();
+      String key = roleEntry.getKey();
+      log.debug("{} => {}", key, count);
+      resources.getOrAddComponent(key)
+                 .put(ResourceKeys.COMPONENT_INSTANCES, count);
+    }
+
+    //all CLI role options
+    Map<String, Map<String, String>> appOptionMap =
+      buildInfo.getCompOptionMap();
+    appConf.mergeComponents(appOptionMap);
+
+    //internal picks up core. values only
+    internal.propagateGlobalKeys(appConf, "slider.");
+    internal.propagateGlobalKeys(appConf, "internal.");
+
+    //copy over role. and yarn. values ONLY to the resources
+    if (PROPAGATE_RESOURCE_OPTION) {
+      resources.propagateGlobalKeys(appConf, "component.");
+      resources.propagateGlobalKeys(appConf, "role.");
+      resources.propagateGlobalKeys(appConf, "yarn.");
+      resources.mergeComponentsPrefix(appOptionMap, "component.", true);
+      resources.mergeComponentsPrefix(appOptionMap, "yarn.", true);
+      resources.mergeComponentsPrefix(appOptionMap, "role.", true);
+    }
+
+    // resource component args
+    appConf.merge(cmdLineResourceOptions);
+    resources.merge(cmdLineResourceOptions);
+    resources.mergeComponents(buildInfo.getResourceCompOptionMap());
+
+    builder.init(providerName, instanceDefinition);
+    builder.propagateFilename();
+    builder.propagatePrincipals();
+    builder.setImageDetailsIfAvailable(buildInfo.getImage(),
+                                       buildInfo.getAppHomeDir());
+    builder.setQueue(buildInfo.queue);
+
+    String quorum = buildInfo.getZKhosts();
+    if (SliderUtils.isUnset(quorum)) {
+      quorum = registryQuorum;
+    }
+    if (isUnset(quorum)) {
+      throw new BadConfigException("No Zookeeper quorum defined");
+    }
+    ZKPathBuilder zkPaths = new ZKPathBuilder(getAppName(),
+        getUsername(),
+        clustername,
+        registryQuorum,
+        quorum);
+    String zookeeperRoot = buildInfo.getAppZKPath();
+
+    if (isSet(zookeeperRoot)) {
+      zkPaths.setAppPath(zookeeperRoot);
+    } else {
+      String createDefaultZkNode = 
appConf.getGlobalOptions().getOption(AgentKeys.CREATE_DEF_ZK_NODE, "false");
+      if (createDefaultZkNode.equals("true")) {
+        String defaultZKPath = createZookeeperNode(clustername, false);
+        log.info("ZK node created for application instance: {}.", 
defaultZKPath);
+        if (defaultZKPath != null) {
+          zkPaths.setAppPath(defaultZKPath);
+        }
+      } else {
+        // create AppPath if default is being used
+        String defaultZKPath = createZookeeperNode(clustername, true);
+        log.info("ZK node assigned to application instance: {}.", 
defaultZKPath);
+        zkPaths.setAppPath(defaultZKPath);
+      }
+    }
+
+    builder.addZKBinding(zkPaths);
+
+    //then propagate any package URI
+    if (buildInfo.packageURI != null) {
+      appConf.set(AgentKeys.PACKAGE_PATH, buildInfo.packageURI);
+    }
+
+    // make any substitutions needed at this stage
+    replaceTokens(appConf.getConfTree(), getUsername(), clustername);
+
+    // providers to validate what there is
+    AggregateConf instanceDescription = builder.getInstanceDescription();
+    validateInstanceDefinition(sliderAM, instanceDescription, 
sliderFileSystem);
+    validateInstanceDefinition(provider, instanceDescription, 
sliderFileSystem);
+    try {
+      persistInstanceDefinition(overwrite, appconfdir, builder);
+    } catch (LockAcquireFailedException e) {
+      log.warn("Failed to get a Lock on {} : {}", builder, e);
+      throw new BadClusterStateException("Failed to save " + clustername
+                                         + ": " + e);
+    }
+  }
+
+  protected void persistInstanceDefinition(boolean overwrite,
+                                         Path appconfdir,
+                                         InstanceBuilder builder)
+      throws IOException, SliderException, LockAcquireFailedException {
+    builder.persist(appconfdir, overwrite);
+  }
+
+  @VisibleForTesting
+  public static void replaceTokens(ConfTree conf,
+      String userName, String clusterName) throws IOException {
+    Map<String,String> newglobal = new HashMap<String,String>();
+    for (Entry<String,String> entry : conf.global.entrySet()) {
+      newglobal.put(entry.getKey(), replaceTokens(entry.getValue(),
+          userName, clusterName));
+    }
+    conf.global.putAll(newglobal);
+
+    Map<String,List<String>> newcred = new HashMap<String,List<String>>();
+    for (Entry<String,List<String>> entry : conf.credentials.entrySet()) {
+      List<String> resultList = new ArrayList<String>();
+      for (String v : entry.getValue()) {
+        resultList.add(replaceTokens(v, userName, clusterName));
+      }
+      newcred.put(replaceTokens(entry.getKey(), userName, clusterName),
+          resultList);
+    }
+    conf.credentials.clear();
+    conf.credentials.putAll(newcred);
+  }
+
+  private static String replaceTokens(String s, String userName,
+      String clusterName) throws IOException {
+    return s.replaceAll(Pattern.quote("${USER}"), userName)
+        .replaceAll(Pattern.quote("${CLUSTER_NAME}"), clusterName);
+  }
+
+  public FsPermission getClusterDirectoryPermissions(Configuration conf) {
+    String clusterDirPermsOct =
+      conf.get(CLUSTER_DIRECTORY_PERMISSIONS,
+          DEFAULT_CLUSTER_DIRECTORY_PERMISSIONS);
+    return new FsPermission(clusterDirPermsOct);
+  }
+
+  /**
+   * Verify that the Resource MAnager is configured, if not fail
+   * with a useful error message
+   * @throws BadCommandArgumentsException the exception raised on an invalid 
config
+   */
+  public void verifyBindingsDefined() throws BadCommandArgumentsException {
+    InetSocketAddress rmAddr = SliderUtils.getRmAddress(getConfig());
+    if (!SliderUtils.isAddressDefined(rmAddr)) {
+      throw new BadCommandArgumentsException(
+        "No valid Resource Manager address provided in the argument "
+        + Arguments.ARG_MANAGER
+        + " or the configuration property "
+        + YarnConfiguration.RM_ADDRESS 
+        + " value :" + rmAddr);
+    }
+
+  }
+
+  /**
+   * Load and start a cluster specification.
+   * This assumes that all validation of args and cluster state
+   * have already taken place
+   *
+   * @param clustername name of the cluster.
+   * @param launchArgs launch arguments
+   * @return the exit code
+   * @throws YarnException
+   * @throws IOException
+   */
+  private int startCluster(String clustername,
+                           LaunchArgsAccessor launchArgs) throws
+                                                          YarnException,
+                                                          IOException {
+    Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
+    AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved(
+      clustername,
+      clusterDirectory);
+
+    LaunchedApplication launchedApplication =
+      launchApplication(clustername, clusterDirectory, instanceDefinition,
+                        serviceArgs.isDebug());
+    applicationId = launchedApplication.getApplicationId();
+
+    return waitForAppAccepted(launchedApplication, launchArgs.getWaittime());
+  }
+
+  /**
+   * Load the instance definition. It is not resolved at this point
+   * @param name cluster name
+   * @param clusterDirectory cluster dir
+   * @return the loaded configuration
+   * @throws IOException
+   * @throws SliderException
+   * @throws UnknownApplicationInstanceException if the file is not found
+   */
+  public AggregateConf loadInstanceDefinitionUnresolved(String name,
+                                                         Path 
clusterDirectory) throws
+                                                                      
IOException,
+      SliderException {
+
+    try {
+      AggregateConf definition =
+        InstanceIO.loadInstanceDefinitionUnresolved(sliderFileSystem,
+                                                    clusterDirectory);
+      return definition;
+    } catch (FileNotFoundException e) {
+      throw UnknownApplicationInstanceException.unknownInstance(name, e);
+    }
+  }
+    /**
+   * Load the instance definition. 
+   * @param name cluster name
+   * @param resolved flag to indicate the cluster should be resolved
+   * @return the loaded configuration
+   * @throws IOException IO problems
+   * @throws SliderException slider explicit issues
+   * @throws UnknownApplicationInstanceException if the file is not found
+   */
+    public AggregateConf loadInstanceDefinition(String name,
+        boolean resolved) throws
+        IOException,
+        SliderException {
+
+    Path clusterDirectory = sliderFileSystem.buildClusterDirPath(name);
+    AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved(
+      name,
+      clusterDirectory);
+    if (resolved) {
+      instanceDefinition.resolve();
+    }
+    return instanceDefinition;
+
+  }
+
+  /**
+   *
+   * @param clustername name of the cluster
+   * @param clusterDirectory cluster dir
+   * @param instanceDefinition the instance definition
+   * @param debugAM enable debug AM options
+   * @return the launched application
+   * @throws YarnException
+   * @throws IOException
+   */
+  public LaunchedApplication launchApplication(String clustername,
+                                               Path clusterDirectory,
+                                               AggregateConf 
instanceDefinition,
+                                               boolean debugAM)
+    throws YarnException, IOException {
+
+
+    deployedClusterName = clustername;
+    SliderUtils.validateClusterName(clustername);
+    verifyNoLiveClusters(clustername);
+    Configuration config = getConfig();
+    lookupZKQuorum();
+    boolean clusterSecure = SliderUtils.isHadoopClusterSecure(config);
+    //create the Slider AM provider -this helps set up the AM
+    SliderAMClientProvider sliderAM = new SliderAMClientProvider(config);
+
+    instanceDefinition.resolve();
+    launchedInstanceDefinition = instanceDefinition;
+
+    ConfTreeOperations internalOperations =
+      instanceDefinition.getInternalOperations();
+    MapOperations internalOptions = internalOperations.getGlobalOptions();
+    ConfTreeOperations resourceOperations =
+      instanceDefinition.getResourceOperations();
+    ConfTreeOperations appOperations =
+      instanceDefinition.getAppConfOperations();
+    Path generatedConfDirPath =
+      createPathThatMustExist(internalOptions.getMandatoryOption(
+        InternalKeys.INTERNAL_GENERATED_CONF_PATH));
+    Path snapshotConfPath =
+      createPathThatMustExist(internalOptions.getMandatoryOption(
+        InternalKeys.INTERNAL_SNAPSHOT_CONF_PATH));
+
+
+    // cluster Provider
+    AbstractClientProvider provider = createClientProvider(
+      internalOptions.getMandatoryOption(
+        InternalKeys.INTERNAL_PROVIDER_NAME));
+    // make sure the conf dir is valid;
+    
+    if (log.isDebugEnabled()) {
+      log.debug(instanceDefinition.toString());
+    }
+    MapOperations sliderAMResourceComponent =
+      resourceOperations.getOrAddComponent(SliderKeys.COMPONENT_AM);
+    MapOperations resourceGlobalOptions = 
resourceOperations.getGlobalOptions();
+
+    // add the tags if available
+    Set<String> applicationTags = provider.getApplicationTags(sliderFileSystem,
+        appOperations.getGlobalOptions().get(AgentKeys.APP_DEF));
+    AppMasterLauncher amLauncher = new AppMasterLauncher(clustername,
+        SliderKeys.APP_TYPE,
+        config,
+        sliderFileSystem,
+        yarnClient,
+        clusterSecure,
+        sliderAMResourceComponent,
+        resourceGlobalOptions,
+        applicationTags);
+
+    ApplicationId appId = amLauncher.getApplicationId();
+    // set the application name;
+    amLauncher.setKeepContainersOverRestarts(true);
+
+    int maxAppAttempts = config.getInt(KEY_AM_RESTART_LIMIT, 0);
+    amLauncher.setMaxAppAttempts(maxAppAttempts);
+
+    sliderFileSystem.purgeAppInstanceTempFiles(clustername);
+    Path tempPath = sliderFileSystem.createAppInstanceTempPath(
+        clustername,
+        appId.toString() + "/am");
+    String libdir = "lib";
+    Path libPath = new Path(tempPath, libdir);
+    sliderFileSystem.getFileSystem().mkdirs(libPath);
+    log.debug("FS={}, tempPath={}, libdir={}", sliderFileSystem.toString(),
+              tempPath, libPath);
+    // set local resources for the application master
+    // local files or archives as needed
+    // In this scenario, the jar file for the application master is part of 
the local resources
+    Map<String, LocalResource> localResources = amLauncher.getLocalResources();
+    // conf directory setup
+    Path remoteConfPath = null;
+    String relativeConfDir = null;
+    String confdirProp =
+      System.getProperty(SliderKeys.PROPERTY_CONF_DIR);
+    if (confdirProp == null || confdirProp.isEmpty()) {
+      log.debug("No local configuration directory provided as system 
property");
+    } else {
+      File confDir = new File(confdirProp);
+      if (!confDir.exists()) {
+        throw new BadConfigException(E_CONFIGURATION_DIRECTORY_NOT_FOUND,
+                                     confDir);
+      }
+      Path localConfDirPath = SliderUtils.createLocalPath(confDir);
+      log.debug("Copying AM configuration data from {}", localConfDirPath);
+      remoteConfPath = new Path(clusterDirectory,
+                                    SliderKeys.SUBMITTED_CONF_DIR);
+      SliderUtils.copyDirectory(config, localConfDirPath, remoteConfPath,
+          null);
+    }
+    // the assumption here is that minimr cluster => this is a test run
+    // and the classpath can look after itself
+
+    boolean usingMiniMRCluster = getUsingMiniMRCluster();
+    if (!usingMiniMRCluster) {
+
+      log.debug("Destination is not a MiniYARNCluster -copying full 
classpath");
+
+      // insert conf dir first
+      if (remoteConfPath != null) {
+        relativeConfDir = SliderKeys.SUBMITTED_CONF_DIR;
+        Map<String, LocalResource> submittedConfDir =
+          sliderFileSystem.submitDirectory(remoteConfPath,
+                                         relativeConfDir);
+        SliderUtils.mergeMaps(localResources, submittedConfDir);
+      }
+    }
+    // build up the configuration 
+    // IMPORTANT: it is only after this call that site configurations
+    // will be valid.
+
+    propagatePrincipals(config, instanceDefinition);
+    // validate security data
+/*
+    // turned off until tested
+    SecurityConfiguration securityConfiguration =
+        new SecurityConfiguration(config,
+            instanceDefinition, clustername);
+    
+*/
+    Configuration clientConfExtras = new Configuration(false);
+    // then build up the generated path.
+    FsPermission clusterPerms = getClusterDirectoryPermissions(config);
+    SliderUtils.copyDirectory(config, snapshotConfPath, generatedConfDirPath,
+        clusterPerms);
+
+
+    // standard AM resources
+    sliderAM.prepareAMAndConfigForLaunch(sliderFileSystem,
+                                       config,
+                                       amLauncher,
+                                       instanceDefinition,
+                                       snapshotConfPath,
+                                       generatedConfDirPath,
+                                       clientConfExtras,
+                                       libdir,
+                                       tempPath,
+                                       usingMiniMRCluster);
+    //add provider-specific resources
+    provider.prepareAMAndConfigForLaunch(sliderFileSystem,
+                                         config,
+                                         amLauncher,
+                                         instanceDefinition,
+                                         snapshotConfPath,
+                                         generatedConfDirPath,
+                                         clientConfExtras,
+                                         libdir,
+                                         tempPath,
+                                         usingMiniMRCluster);
+
+    // now that the site config is fully generated, the provider gets
+    // to do a quick review of them.
+    log.debug("Preflight validation of cluster configuration");
+
+
+    sliderAM.preflightValidateClusterConfiguration(sliderFileSystem,
+                                                 clustername,
+                                                 config,
+                                                 instanceDefinition,
+                                                 clusterDirectory,
+                                                 generatedConfDirPath,
+                                                 clusterSecure
+                                                );
+
+    provider.preflightValidateClusterConfiguration(sliderFileSystem,
+                                                   clustername,
+                                                   config,
+                                                   instanceDefinition,
+                                                   clusterDirectory,
+                                                   generatedConfDirPath,
+                                                   clusterSecure
+                                                  );
+
+
+    // TODO: consider supporting apps that don't have an image path
+    Path imagePath =
+        SliderUtils.extractImagePath(sliderFileSystem, internalOptions);
+    if (sliderFileSystem.maybeAddImagePath(localResources, imagePath)) {
+      log.debug("Registered image path {}", imagePath);
+    }
+
+    // build the environment
+    amLauncher.putEnv(
+      SliderUtils.buildEnvMap(sliderAMResourceComponent));
+    ClasspathConstructor classpath = 
SliderUtils.buildClasspath(relativeConfDir,
+        libdir,
+        getConfig(),
+        usingMiniMRCluster);
+    amLauncher.setClasspath(classpath);
+    if (log.isDebugEnabled()) {
+      log.debug("AM classpath={}", classpath);
+      log.debug("Environment Map:\n{}",
+                SliderUtils.stringifyMap(amLauncher.getEnv()));
+      log.debug("Files in lib path\n{}", sliderFileSystem.listFSDir(libPath));
+    }
+
+    // rm address
+
+    InetSocketAddress rmSchedulerAddress;
+    try {
+      rmSchedulerAddress = SliderUtils.getRmSchedulerAddress(config);
+    } catch (IllegalArgumentException e) {
+      throw new BadConfigException("%s Address invalid: %s",
+                                   YarnConfiguration.RM_SCHEDULER_ADDRESS,
+                                   config.get(
+                                     YarnConfiguration.RM_SCHEDULER_ADDRESS)
+      );
+
+    }
+    String rmAddr = NetUtils.getHostPortString(rmSchedulerAddress);
+
+    JavaCommandLineBuilder commandLine = new JavaCommandLineBuilder();
+    // insert any JVM options);
+    sliderAM.addJVMOptions(instanceDefinition, commandLine);
+    // enable asserts if the text option is set
+    commandLine.enableJavaAssertions();
+    // add the AM sevice entry point
+    commandLine.add(SliderAppMaster.SERVICE_CLASSNAME);
+
+    // create action and the cluster name
+    commandLine.add(ACTION_CREATE, clustername);
+
+    // debug
+    if (debugAM) {
+      commandLine.add(Arguments.ARG_DEBUG);
+    }
+
+    // set the cluster directory path
+    commandLine.add(Arguments.ARG_CLUSTER_URI, clusterDirectory.toUri());
+
+    if (!isUnset(rmAddr)) {
+      commandLine.add(Arguments.ARG_RM_ADDR, rmAddr);
+    }
+
+    if (serviceArgs.getFilesystemBinding() != null) {
+      commandLine.add(Arguments.ARG_FILESYSTEM, 
serviceArgs.getFilesystemBinding());
+    }
+    
+    addConfOptionToCLI(commandLine, config, REGISTRY_PATH,
+        DEFAULT_REGISTRY_PATH);
+    addMandatoryConfOptionToCLI(commandLine, config,
+        RegistryConstants.KEY_REGISTRY_ZK_QUORUM);
+
+    if (clusterSecure) {
+      // if the cluster is secure, make sure that
+      // the relevant security settings go over
+/*
+      addConfOptionToCLI(commandLine, config, KEY_SECURITY);
+*/
+      addConfOptionToCLI(commandLine,
+          config,
+          DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY);
+    }
+    // write out the path output
+    commandLine.addOutAndErrFiles(STDOUT_AM, STDERR_AM);
+
+    String cmdStr = commandLine.build();
+    log.debug("Completed setting up app master command {}", cmdStr);
+
+    amLauncher.addCommandLine(commandLine);
+
+    // the Slider AM gets to configure the AM requirements, not the custom 
provider
+    sliderAM.prepareAMResourceRequirements(sliderAMResourceComponent,
+        amLauncher.getResource());
+
+
+    // Set the priority for the application master
+
+    int amPriority = config.getInt(KEY_YARN_QUEUE_PRIORITY,
+                                   DEFAULT_YARN_QUEUE_PRIORITY);
+
+
+    amLauncher.setPriority(amPriority);
+
+    // Set the queue to which this application is to be submitted in the RM
+    // Queue for App master
+    String amQueue = config.get(KEY_YARN_QUEUE, DEFAULT_YARN_QUEUE);
+    String suppliedQueue = 
internalOperations.getGlobalOptions().get(InternalKeys.INTERNAL_QUEUE);
+    if(!SliderUtils.isUnset(suppliedQueue)) {
+      amQueue = suppliedQueue;
+      log.info("Using queue {} for the application instance.", amQueue);
+    }
+
+    if (amQueue != null) {
+      amLauncher.setQueue(amQueue);
+    }
+
+    // Submit the application to the applications manager
+    // SubmitApplicationResponse submitResp = 
applicationsManager.submitApplication(appRequest);
+    // Ignore the response as either a valid response object is returned on 
success
+    // or an exception thrown to denote some form of a failure
+    
+
+    // submit the application
+    LaunchedApplication launchedApplication = amLauncher.submitApplication();
+    return launchedApplication;
+  }
+  
+  
+  /**
+   * Wait for the launched app to be accepted
+   * @param waittime time in millis
+   * @return exit code
+   * @throws YarnException
+   * @throws IOException
+   */
+  public int waitForAppAccepted(LaunchedApplication launchedApplication, 
+                                int waittime) throws
+                                              YarnException,
+                                              IOException {
+    assert launchedApplication != null;
+    int exitCode;
+    // wait for the submit state to be reached
+    ApplicationReport report = launchedApplication.monitorAppToState(
+      YarnApplicationState.ACCEPTED,
+      new Duration(Constants.ACCEPT_TIME));
+
+
+    // may have failed, so check that
+    if (SliderUtils.hasAppFinished(report)) {
+      exitCode = buildExitCode(report);
+    } else {
+      // exit unless there is a wait
+      exitCode = EXIT_SUCCESS;
+
+      if (waittime != 0) {
+        // waiting for state to change
+        Duration duration = new Duration(waittime * 1000);
+        duration.start();
+        report = launchedApplication.monitorAppToState(
+          YarnApplicationState.RUNNING, duration);
+        if (report != null &&
+            report.getYarnApplicationState() == YarnApplicationState.RUNNING) {
+          exitCode = EXIT_SUCCESS;
+        } else {
+
+          launchedApplication.kill("");
+          exitCode = buildExitCode(report);
+        }
+      }
+    }
+    return exitCode;
+  }
+
+
+  /**
+   * Propagate any critical principals from the current site config down to 
the HBase one.
+   * @param config config to read from
+   * @param clusterSpec cluster spec
+   */
+  private void propagatePrincipals(Configuration config,
+                                   AggregateConf clusterSpec) {
+    String dfsPrincipal = config.get(
+        DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY);
+    if (dfsPrincipal != null) {
+      String siteDfsPrincipal = OptionKeys.SITE_XML_PREFIX +
+                                
DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
+      clusterSpec.getAppConfOperations().getGlobalOptions().putIfUnset(
+        siteDfsPrincipal,
+        dfsPrincipal);
+    }
+  }
+
+
+  private boolean addConfOptionToCLI(CommandLineBuilder cmdLine,
+      Configuration conf,
+      String key) {
+    String val = conf.get(key);
+    return defineIfSet(cmdLine, key, val);
+  }
+
+  private String addConfOptionToCLI(CommandLineBuilder cmdLine,
+      Configuration conf,
+      String key,
+      String defVal) {
+    String val = conf.get(key, defVal);
+    define(cmdLine, key, val);
+    return val;
+  }
+
+  /**
+   * Add a <code>-D key=val</code> command to the CLI
+   * @param cmdLine command line
+   * @param key key
+   * @param val value
+   */
+  private void define(CommandLineBuilder cmdLine, String key, String val) {
+    Preconditions.checkArgument(key != null, "null key");
+    Preconditions.checkArgument(val != null, "null value");
+    cmdLine.add(Arguments.ARG_DEFINE, key + "=" + val);
+  }
+
+  /**
+   * Add a <code>-D key=val</code> command to the CLI if <code>val</code>
+   * is not null
+   * @param cmdLine command line
+   * @param key key
+   * @param val value
+   */
+  private boolean defineIfSet(CommandLineBuilder cmdLine, String key, String 
val) {
+    Preconditions.checkArgument(key != null, "null key");
+    if (val != null) {
+      define(cmdLine, key, val);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  private void addMandatoryConfOptionToCLI(CommandLineBuilder cmdLine,
+      Configuration conf,
+      String key) throws BadConfigException {
+    if (!addConfOptionToCLI(cmdLine, conf, key)) {
+      throw new BadConfigException("Missing configuration option: " + key);
+    }
+  }
+  
+  /**
+   * Create a path that must exist in the cluster fs
+   * @param uri uri to create
+   * @return the path
+   * @throws FileNotFoundException if the path does not exist
+   */
+  public Path createPathThatMustExist(String uri) throws
+      SliderException,
+                                                  IOException {
+    return sliderFileSystem.createPathThatMustExist(uri);
+  }
+
+  /**
+   * verify that a live cluster isn't there
+   * @param clustername cluster name
+   * @throws SliderException with exit code EXIT_CLUSTER_LIVE
+   * if a cluster of that name is either live or starting up.
+   */
+  public void verifyNoLiveClusters(String clustername) throws
+                                                       IOException,
+                                                       YarnException {
+    List<ApplicationReport> existing = findAllLiveInstances(clustername);
+
+    if (!existing.isEmpty()) {
+      throw new SliderException(EXIT_APPLICATION_IN_USE,
+                              clustername + ": " + E_CLUSTER_RUNNING + " :" +
+                              existing.get(0));
+    }
+  }
+
+  public String getUsername() throws IOException {
+    return RegistryUtils.currentUser();
+  }
+
+  /**
+   * Get the name of any deployed cluster
+   * @return the cluster name
+   */
+  public String getDeployedClusterName() {
+    return deployedClusterName;
+  }
+
+  @VisibleForTesting
+  public void setDeployedClusterName(String deployedClusterName) {
+    this.deployedClusterName = deployedClusterName;
+  }
+
+  /**
+   * ask if the client is using a mini MR cluster
+   * @return true if they are
+   */
+  private boolean getUsingMiniMRCluster() {
+    return getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER,
+        false);
+  }
+
+  /**
+   * Get the application name used in the zookeeper root paths
+   * @return an application-specific path in ZK
+   */
+  private String getAppName() {
+    return "slider";
+  }
+
+  /**
+   * Wait for the app to start running (or go past that state)
+   * @param duration time to wait
+   * @return the app report; null if the duration turned out
+   * @throws YarnException YARN or app issues
+   * @throws IOException IO problems
+   */
+  @VisibleForTesting
+  public ApplicationReport monitorAppToRunning(Duration duration)
+    throws YarnException, IOException {
+    return monitorAppToState(YarnApplicationState.RUNNING, duration);
+  }
+
+  /**
+   * Build an exit code for an application Id and its report.
+   * If the report parameter is null, the app is killed
+   * @param report report
+   * @return the exit code
+   */
+  private int buildExitCode(ApplicationReport report) throws
+                                                      IOException,
+                                                      YarnException {
+    if (null == report) {
+      forceKillApplication("Reached client specified timeout for application");
+      return EXIT_TIMED_OUT;
+    }
+
+    YarnApplicationState state = report.getYarnApplicationState();
+    FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
+    switch (state) {
+      case FINISHED:
+        if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
+          log.info("Application has completed successfully");
+          return EXIT_SUCCESS;
+        } else {
+          log.info("Application finished unsuccessfully." +
+                   "YarnState = {}, DSFinalStatus = {} Breaking monitoring 
loop",
+                   state, dsStatus);
+          return EXIT_YARN_SERVICE_FINISHED_WITH_ERROR;
+        }
+
+      case KILLED:
+        log.info("Application did not finish. YarnState={}, DSFinalStatus={}",
+                 state, dsStatus);
+        return EXIT_YARN_SERVICE_KILLED;
+
+      case FAILED:
+        log.info("Application Failed. YarnState={}, DSFinalStatus={}", state,
+                 dsStatus);
+        return EXIT_YARN_SERVICE_FAILED;
+      default:
+        //not in any of these states
+        return EXIT_SUCCESS;
+    }
+  }
+
+  /**
+   * Monitor the submitted application for reaching the requested state.
+   * Will also report if the app reaches a later state (failed, killed, etc)
+   * Kill application if duration!= null & time expires. 
+   * Prerequisite: the applicatin was launched.
+   * @param desiredState desired state.
+   * @param duration how long to wait -must be more than 0
+   * @return the application report -null on a timeout
+   * @throws YarnException
+   * @throws IOException
+   */
+  @VisibleForTesting
+  public ApplicationReport monitorAppToState(
+    YarnApplicationState desiredState,
+    Duration duration)
+    throws YarnException, IOException {
+    LaunchedApplication launchedApplication =
+      new LaunchedApplication(applicationId, yarnClient);
+    return launchedApplication.monitorAppToState(desiredState, duration);
+  }
+
+  @Override
+  public ApplicationReport getApplicationReport() throws
+                                                  IOException,
+                                                  YarnException {
+    return getApplicationReport(applicationId);
+  }
+
+  @Override
+  public boolean forceKillApplication(String reason)
+    throws YarnException, IOException {
+    if (applicationId != null) {
+      new LaunchedApplication(applicationId, yarnClient).forceKill(reason);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * List Slider instances belonging to a specific user
+   * @param user user: "" means all users
+   * @return a possibly empty list of Slider AMs
+   */
+  @VisibleForTesting
+  public List<ApplicationReport> listSliderInstances(String user)
+    throws YarnException, IOException {
+    return YarnAppListClient.listInstances();
+  }
+
+  @Override
+  @VisibleForTesting
+  public int actionList(String clustername) throws IOException, YarnException {
+    verifyBindingsDefined();
+
+    String user = UserGroupInformation.getCurrentUser().getUserName();
+    List<ApplicationReport> instances = listSliderInstances(user);
+
+    if (isUnset(clustername)) {
+      log.info("Instances for {}: {}",
+               (user != null ? user : "all users"),
+               instances.size());
+      for (ApplicationReport report : instances) {
+        logAppReport(report);
+      }
+      return EXIT_SUCCESS;
+    } else {
+      SliderUtils.validateClusterName(clustername);
+      log.debug("Listing cluster named {}", clustername);
+      ApplicationReport report =
+        findClusterInInstanceList(instances, clustername);
+      if (report != null) {
+        logAppReport(report);
+        return EXIT_SUCCESS;
+      } else {
+        throw unknownClusterException(clustername);
+      }
+    }
+  }
+
+  /**
+   * Log the application report at INFO
+   * @param report report to log
+   */
+  public void logAppReport(ApplicationReport report) {
+    log.info(SliderUtils.appReportToString(report, "\n"));
+  }
+
+  @Override
+  @VisibleForTesting
+  public int actionFlex(String name, ActionFlexArgs args) throws 
YarnException, IOException {
+    verifyBindingsDefined();
+    SliderUtils.validateClusterName(name);
+    log.debug("actionFlex({})", name);
+    Map<String, Integer> roleInstances = new HashMap<String, Integer>();
+    Map<String, String> roleMap = args.getComponentMap();
+    for (Map.Entry<String, String> roleEntry : roleMap.entrySet()) {
+      String key = roleEntry.getKey();
+      String val = roleEntry.getValue();
+      try {
+        roleInstances.put(key, Integer.valueOf(val));
+      } catch (NumberFormatException e) {
+        throw new BadCommandArgumentsException("Requested count of role %s" +
+                                               " is not a number: \"%s\"",
+                                               key, val);
+      }
+    }
+    return flex(name, roleInstances);
+  }
+
+  @Override
+  @VisibleForTesting
+  public int actionExists(String name, boolean checkLive) throws 
YarnException, IOException {
+    verifyBindingsDefined();
+    SliderUtils.validateClusterName(name);
+    log.debug("actionExists({}, {})", name, checkLive);
+
+    //initial probe for a cluster in the filesystem
+    Path clusterDirectory = sliderFileSystem.buildClusterDirPath(name);
+    if (!sliderFileSystem.getFileSystem().exists(clusterDirectory)) {
+      throw unknownClusterException(name);
+    }
+    
+    //test for liveness if desired
+
+    if (checkLive) {
+      ApplicationReport instance = findInstance(name);
+      if (instance == null) {
+        log.info("Cluster {} not running", name);
+        return EXIT_FALSE;
+      } else {
+        // the app exists, but it may be in a terminated state
+        SliderUtils.OnDemandReportStringifier report =
+          new SliderUtils.OnDemandReportStringifier(instance);
+        YarnApplicationState state =
+          instance.getYarnApplicationState();
+        if (state.ordinal() >= YarnApplicationState.FINISHED.ordinal()) {
+          //cluster in the list of apps but not running
+          log.info("Cluster {} found but is in state {}", name, state);
+          log.debug("State {}", report);
+          return EXIT_FALSE;
+        }
+        log.info("Cluster {} is live:\n{}", name, report);
+      }
+    } else {
+      log.info("Cluster {} exists", name);
+    }
+    return EXIT_SUCCESS;
+  }
+
+
+  @Override
+  public int actionKillContainer(String name,
+      ActionKillContainerArgs args) throws YarnException, IOException {
+    String id = args.id;
+    if (SliderUtils.isUnset(id)) {
+      throw new BadCommandArgumentsException("Missing container id");
+    }
+    log.info("killingContainer {}:{}", name, id);
+    SliderClusterOperations clusterOps =
+      new SliderClusterOperations(bondToCluster(name));
+    try {
+      clusterOps.killContainer(id);
+    } catch (NoSuchNodeException e) {
+      throw new BadClusterStateException("Container %s not found in cluster 
%s",
+                                         id, name);
+    }
+    return EXIT_SUCCESS;
+  }
+
+  @Override
+  public String actionEcho(String name, ActionEchoArgs args) throws
+                                                             YarnException,
+                                                             IOException {
+    String message = args.message;
+    if (message == null) {
+      throw new BadCommandArgumentsException("missing message");
+    }
+    SliderClusterOperations clusterOps =
+      new SliderClusterOperations(bondToCluster(name));
+    return clusterOps.echo(message);
+  }
+
+  /**
+   * Get at the service registry operations
+   * @return registry client -valid after the service is inited.
+   */
+  public YarnAppListClient getYarnAppListClient() {
+    return YarnAppListClient;
+  }
+
+  /**
+   * Find an instance of an application belonging to the current user
+   * @param appname application name
+   * @return the app report or null if none is found
+   * @throws YarnException YARN issues
+   * @throws IOException IO problems
+   */
+  private ApplicationReport findInstance(String appname)
+      throws YarnException, IOException {
+    return YarnAppListClient.findInstance(appname);
+  }
+  
+  private RunningApplication findApplication(String appname)
+      throws YarnException, IOException {
+    ApplicationReport applicationReport = findInstance(appname);
+    return applicationReport != null ? new RunningApplication(yarnClient, 
applicationReport): null; 
+      
+  }
+
+  /**
+   * find all live instances of a specific app -if there is >1 in the cluster,
+   * this returns them all. State should be running or less
+   * @param appname application name
+   * @return the list of all matching application instances
+   */
+  private List<ApplicationReport> findAllLiveInstances(String appname)
+    throws YarnException, IOException {
+    
+    return YarnAppListClient.findAllLiveInstances(appname);
+  }
+
+
+  public ApplicationReport findClusterInInstanceList(List<ApplicationReport> 
instances,
+                                                     String appname) {
+    return yarnClient.findClusterInInstanceList(instances, appname);
+  }
+
+  /**
+   * Connect to a Slider AM
+   * @param app application report providing the details on the application
+   * @return an instance
+   * @throws YarnException
+   * @throws IOException
+   */
+  private SliderClusterProtocol connect(ApplicationReport app)
+      throws YarnException, IOException {
+
+    try {
+      return RpcBinder.getProxy(getConfig(),
+                                yarnClient.getRmClient(),
+                                app,
+                                Constants.CONNECT_TIMEOUT,
+                                Constants.RPC_TIMEOUT);
+    } catch (InterruptedException e) {
+      throw new SliderException(SliderExitCodes.EXIT_TIMED_OUT,
+                              e,
+                              "Interrupted waiting for communications with the 
Slider AM");
+    }
+  }
+
+  @Override
+  @VisibleForTesting
+  public int actionStatus(String clustername, ActionStatusArgs statusArgs) 
throws
+                                              YarnException,
+                                              IOException {
+    verifyBindingsDefined();
+    SliderUtils.validateClusterName(clustername);
+    String outfile = statusArgs.getOutput();
+    ClusterDescription status = getClusterDescription(clustername);
+    String text = status.toJsonString();
+    if (outfile == null) {
+      log.info(text);
+    } else {
+      status.save(new File(outfile).getAbsoluteFile());
+    }
+    return EXIT_SUCCESS;
+  }
+
+  @Override
+  public int actionVersion() {
+    SliderVersionInfo.loadAndPrintVersionInfo(log);
+    return EXIT_SUCCESS;
+  }
+
+  @Override
+  public int actionFreeze(String clustername,
+      ActionFreezeArgs freezeArgs) throws YarnException, IOException {
+    verifyBindingsDefined();
+    SliderUtils.validateClusterName(clustername);
+    int waittime = freezeArgs.getWaittime();
+    String text = freezeArgs.message;
+    boolean forcekill = freezeArgs.force;
+    log.debug("actionFreeze({}, reason={}, wait={}, force={})", clustername,
+              text,
+              waittime,
+              forcekill);
+    
+    //is this actually a known cluster?
+    sliderFileSystem.locateInstanceDefinition(clustername);
+    ApplicationReport app = findInstance(clustername);
+    if (app == null) {
+      // exit early
+      log.info("Cluster {} not running", clustername);
+      // not an error to stop a stopped cluster
+      return EXIT_SUCCESS;
+    }
+    log.debug("App to stop was found: {}:\n{}", clustername,
+              new SliderUtils.OnDemandReportStringifier(app));
+    if (app.getYarnApplicationState().ordinal() >=
+        YarnApplicationState.FINISHED.ordinal()) {
+      log.info("Cluster {} is a terminated state {}", clustername,
+               app.getYarnApplicationState());
+      return EXIT_SUCCESS;
+    }
+    LaunchedApplication application = new LaunchedApplication(yarnClient, app);
+    applicationId = application.getApplicationId();
+    
+    if (forcekill) {
+      //escalating to forced kill
+      application.kill("Forced stop of " + clustername +
+                       ": " + text);
+    } else {
+      try {
+        SliderClusterProtocol appMaster = connect(app);
+        Messages.StopClusterRequestProto r =
+          Messages.StopClusterRequestProto
+                  .newBuilder()
+                  .setMessage(text)
+                  .build();
+        appMaster.stopCluster(r);
+
+        log.debug("Cluster stop command issued");
+
+      } catch (YarnException e) {
+        log.warn("Exception while trying to terminate {}: {}", clustername, e);
+        return EXIT_FALSE;
+      } catch (IOException e) {
+        log.warn("Exception while trying to terminate {}: {}", clustername, e);
+        return EXIT_FALSE;
+      }
+    }
+
+    //wait for completion. We don't currently return an exception during this 
process
+    //as the stop operation has been issued, this is just YARN.
+    try {
+      if (waittime > 0) {
+        ApplicationReport applicationReport =
+          application.monitorAppToState(YarnApplicationState.FINISHED,
+                                        new Duration(waittime * 1000));
+        if (applicationReport == null) {
+          log.info("application did not shut down in time");
+          return EXIT_FALSE;
+        }
+      }
+
+// JDK7    } catch (YarnException | IOException e) {
+    } catch (YarnException e) {
+      log.warn("Exception while waiting for the cluster {} to shut down: {}",
+               clustername, e);
+    } catch ( IOException e) {
+      log.warn("Exception while waiting for the cluster {} to shut down: {}",
+               clustername, e);
+    }
+
+    return EXIT_SUCCESS;
+  }
+
+  @Override
+  public int actionThaw(String clustername, ActionThawArgs thaw) throws 
YarnException, IOException {
+    SliderUtils.validateClusterName(clustername);
+    // see if it is actually running and bail out;
+    verifyBindingsDefined();
+    verifyNoLiveClusters(clustername);
+
+
+    //start the cluster
+    return startCluster(clustername, thaw);
+  }
+
+  /**
+   * Implement flexing
+   * @param clustername name of the cluster
+   * @param roleInstances map of new role instances
+   * @return EXIT_SUCCESS if the #of nodes in a live cluster changed
+   * @throws YarnException
+   * @throws IOException
+   */
+  public int flex(String clustername, Map<String, Integer> roleInstances)
+      throws YarnException, IOException {
+    verifyBindingsDefined();
+    SliderUtils.validateClusterName(clustername);
+    Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
+    AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved(
+      clustername,
+      clusterDirectory);
+
+    ConfTreeOperations resources =
+      instanceDefinition.getResourceOperations();
+    for (Map.Entry<String, Integer> entry : roleInstances.entrySet()) {
+      String role = entry.getKey();
+      int count = entry.getValue();
+      resources.getOrAddComponent(role).put(ResourceKeys.COMPONENT_INSTANCES,
+                                            Integer.toString(count));
+
+      log.debug("Flexed cluster specification ( {} -> {}) : \n{}",
+                role,
+                count,
+                resources);
+    }
+    SliderAMClientProvider sliderAM = new SliderAMClientProvider(getConfig());
+    AbstractClientProvider provider = createClientProvider(
+        
instanceDefinition.getInternalOperations().getGlobalOptions().getMandatoryOption(
+            InternalKeys.INTERNAL_PROVIDER_NAME));
+    // slider provider to validate what there is
+    validateInstanceDefinition(sliderAM, instanceDefinition, sliderFileSystem);
+    validateInstanceDefinition(provider, instanceDefinition, sliderFileSystem);
+
+    int exitCode = EXIT_FALSE;
+    // save the specification
+    try {
+      InstanceIO.updateInstanceDefinition(sliderFileSystem, clusterDirectory, 
instanceDefinition);
+    } catch (LockAcquireFailedException e) {
+      // lock failure
+      log.debug("Failed to lock dir {}", clusterDirectory, e);
+      log.warn("Failed to save new resource definition to {} : {}", 
clusterDirectory, e);
+    }
+
+    // now see if it is actually running and tell it about the update if it is
+    ApplicationReport instance = findInstance(clustername);
+    if (instance != null) {
+      log.info("Flexing running cluster");
+      SliderClusterProtocol appMaster = connect(instance);
+      SliderClusterOperations clusterOps = new 
SliderClusterOperations(appMaster);
+      clusterOps.flex(instanceDefinition.getResources());
+      log.info("application instance size updated");
+      exitCode = EXIT_SUCCESS;
+    } else {
+      log.info("No running instance to update");
+    }
+    return exitCode;
+  }
+
+  /**
+   * Validate an instance definition against a provider.
+   * @param provider the provider performing the validation
+   * @param instanceDefinition the instance definition
+   * @throws SliderException if invalid.
+   */
+  protected void validateInstanceDefinition(AbstractClientProvider provider,
+      AggregateConf instanceDefinition, SliderFileSystem fs) throws 
SliderException {
+    try {
+      provider.validateInstanceDefinition(instanceDefinition, fs);
+    } catch (SliderException e) {
+      //problem, reject it
+      log.info("Error {} validating application instance definition ", 
e.getMessage());
+      log.debug("Error validating application instance definition ", e);
+      log.info(instanceDefinition.toString());
+      throw e;
+    }
+  }
+
+
+  /**
+   * Load the persistent cluster description
+   * @param clustername name of the cluster
+   * @return the description in the filesystem
+   * @throws IOException any problems loading -including a missing file
+   */
+  @VisibleForTesting
+  public AggregateConf loadPersistedClusterDescription(String clustername)
+      throws IOException, SliderException, LockAcquireFailedException {
+    Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
+    ConfPersister persister = new ConfPersister(sliderFileSystem, 
clusterDirectory);
+    AggregateConf instanceDescription = new AggregateConf();
+    persister.load(instanceDescription);
+    return instanceDescription;
+  }
+
+    /**
+     * Connect to a live cluster and get its current state
+     * @param clustername the cluster name
+     * @return its description
+     */
+  @VisibleForTesting
+  public ClusterDescription getClusterDescription(String clustername) throws
+                                                                 YarnException,
+                                                                 IOException {
+    SliderClusterOperations clusterOperations =
+      createClusterOperations(clustername);
+    return clusterOperations.getClusterDescription();
+  }
+
+  /**
+   * Connect to the cluster and get its current state
+   * @return its description
+   */
+  @VisibleForTesting
+  public ClusterDescription getClusterDescription() throws
+                                               YarnException,
+                                               IOException {
+    return getClusterDescription(getDeployedClusterName());
+  }
+
+  /**
+   * List all node UUIDs in a role
+   * @param role role name or "" for all
+   * @return an array of UUID strings
+   * @throws IOException
+   * @throws YarnException
+   */
+  @VisibleForTesting
+  public String[] listNodeUUIDsByRole(String role) throws
+                                               IOException,
+                                               YarnException {
+    return createClusterOperations()
+              .listNodeUUIDsByRole(role);
+  }
+
+  /**
+   * List all nodes in a role. This is a double round trip: once to list
+   * the nodes in a role, another to get their details
+   * @param role component/role to look for
+   * @return an array of ContainerNode instances
+   * @throws IOException
+   * @throws YarnException
+   */
+  @VisibleForTesting
+  public List<ClusterNode> listClusterNodesInRole(String role) throws
+                                               IOException,
+                                               YarnException {
+    return createClusterOperations().listClusterNodesInRole(role);
+  }
+
+  /**
+   * Get the details on a list of uuids
+   * @param uuids uuids to ask for 
+   * @return a possibly empty list of node details
+   * @throws IOException
+   * @throws YarnException
+   */
+  @VisibleForTesting
+  public List<ClusterNode> listClusterNodes(String[] uuids) throws
+                                               IOException,
+                                               YarnException {
+
+    if (uuids.length == 0) {
+      // short cut on an empty list
+      return new LinkedList<ClusterNode>();
+    }
+    return createClusterOperations().listClusterNodes(uuids);
+  }
+
+  /**
+   * Get a node from the AM
+   * @param uuid uuid of node
+   * @return deserialized node
+   * @throws IOException IO problems
+   * @throws NoSuchNodeException if the node isn't found
+   */
+  @VisibleForTesting
+  public ClusterNode getNode(String uuid) throws IOException, YarnException {
+    return createClusterOperations().getNode(uuid);
+  }
+  
+  /**
+   * Get the instance definition from the far end
+   */
+  @VisibleForTesting
+  public AggregateConf getLiveInstanceDefinition() throws IOException, 
YarnException {
+    return createClusterOperations().getInstanceDefinition();
+  }
+
+  /**
+   * Bond to a running cluster
+   * @param clustername cluster name
+   * @return the AM RPC client
+   * @throws SliderException if the cluster is unkown
+   */
+  private SliderClusterProtocol bondToCluster(String clustername) throws
+                                                                  
YarnException,
+                                                                  IOException {
+    verifyBindingsDefined();
+    if (clustername == null) {
+      throw unknownClusterException("(undefined)");
+    }
+    ApplicationReport instance = findInstance(clustername);
+    if (null == instance) {
+      throw unknownClusterException(clustername);
+    }
+    return connect(instance);
+  }
+
+  /**
+   * Create a cluster operations instance against a given cluster
+   * @param clustername cluster name
+   * @return a bonded cluster operations instance
+   * @throws YarnException YARN issues
+   * @throws IOException IO problems
+   */
+  private SliderClusterOperations createClusterOperations(String clustername) 
throws
+                                                                            
YarnException,
+                                                                            
IOException {
+    SliderClusterProtocol sliderAM = bondToCluster(clustername);
+    return new SliderClusterOperations(sliderAM);
+  }
+
+  /**
+   * Create a cluster operations instance against the active cluster
+   * -returning any previous created one if held.
+   * @return a bonded cluster operations instance
+   * @throws YarnException YARN issues
+   * @throws IOException IO problems
+   */
+  public SliderClusterOperations createClusterOperations() throws
+                                                         YarnException,
+                                                         IOException {
+    if (sliderClusterOperations == null) {
+      sliderClusterOperations =
+        createClusterOperations(getDeployedClusterName());
+    }
+    return sliderClusterOperations;
+  }
+
+  /**
+   * Wait for an instance of a named role to be live (or past it in the 
lifecycle)
+   * @param role role to look for
+   * @param timeout time to wait
+   * @return the state. If still in CREATED, the cluster didn't come up
+   * in the time period. If LIVE, all is well. If >LIVE, it has shut for a 
reason
+   * @throws IOException IO
+   * @throws SliderException Slider
+   * @throws WaitTimeoutException if the wait timed out
+   */
+  @VisibleForTesting
+  public int waitForRoleInstanceLive(String role, long timeout)
+    throws WaitTimeoutException, IOException, YarnException {
+    return createClusterOperations().waitForRoleInstanceLive(role, timeout);
+  }
+
+  /**
+   * Generate an exception for an unknown cluster
+   * @param clustername cluster name
+   * @return an exception with text and a relevant exit code
+   */
+  public UnknownApplicationInstanceException unknownClusterException(String 
clustername) {
+    return UnknownApplicationInstanceException.unknownInstance(clustername);
+  }
+
+  @Override
+  public String toString() {
+    return "Slider Client in state " + getServiceState()
+           + " and Slider Application Instance " + deployedClusterName;
+  }
+
+  /**
+   * Get all YARN applications
+   * @return a possibly empty list
+   * @throws YarnException
+   * @throws IOException
+   */
+  @VisibleForTesting
+  public List<ApplicationReport> getApplications() throws YarnException, 
IOException {
+    return yarnClient.getApplications();
+  }
+
+  @VisibleForTesting
+  public ApplicationReport getApplicationReport(ApplicationId appId)
+    throws YarnException, IOException {
+    return new LaunchedApplication(appId, yarnClient).getApplicationReport();
+
+  }
+
+  /**
+   * The configuration used for deployment (after resolution)
+   * @return
+   */
+  @VisibleForTesting
+  public AggregateConf getLaunchedInstanceDefinition() {
+    return launchedInstanceDefinition;
+  }
+
+
+  @Override
+  public int actionResolve(ActionResolveArgs args)
+      throws YarnException, IOException {
+    // as this is an API entry point, validate
+    // the arguments
+    args.validate();
+    RegistryOperations operations = getRegistryOperations();
+    String path = SliderRegistryUtils.resolvePath(args.path);
+    ServiceRecordMarshal serviceRecordMarshal = new ServiceRecordMarshal();
+    try {
+      if (args.list) {
+        File destDir = args.destdir;
+        if (destDir != null) {
+          destDir.mkdirs();
+        }
+
+        Map<String, ServiceRecord> recordMap =
+            listServiceRecords(operations, path);
+
+        for (Entry<String, ServiceRecord> recordEntry : recordMap.entrySet()) {
+          String name = recordEntry.getKey();
+          ServiceRecord instance = recordEntry.getValue();
+          String json = serviceRecordMarshal.toJson(instance);
+          if (destDir == null) {
+            print(name);
+            print(json);
+          } else {
+            String filename = RegistryPathUtils.lastPathEntry(name) + ".json";
+            File jsonFile = new File(destDir, filename);
+            SliderUtils.write(jsonFile,
+                serviceRecordMarshal.toBytes(instance), true);
+          }
+        }
+      } else  {
+        // resolve single entry
+        ServiceRecord instance = resolve(path);
+        File outFile = args.out;
+        if (args.destdir != null) {
+          outFile = new File(args.destdir, 
RegistryPathUtils.lastPathEntry(path));
+        }
+        if (outFile != null) {
+          SliderUtils.write(outFile, serviceRecordMarshal.toBytes(instance), 
true);
+        } else {
+          print(serviceRecordMarshal.toJson(instance));
+        }
+      }
+//      TODO JDK7
+    } catch (PathNotFoundException e) {
+      // no record at this path
+      return EXIT_NOT_FOUND;
+    } catch (NoRecordException e) {
+      return EXIT_NOT_FOUND;
+    } catch (UnknownApplicationInstanceException e) {
+      return EXIT_NOT_FOUND;
+    } catch (InvalidRecordException e) {
+      // it is not a record
+      log.error("{}", e);
+      log.debug("{}", e, e);
+      return EXIT_EXCEPTION_THROWN;
+    }
+    return EXIT_SUCCESS;
+  }
+
+  @Override
+  public int actionRegistry(ActionRegistryArgs registryArgs) throws
+      YarnException,
+      IOException {
+    // as this is also a test entry point, validate
+    // the arguments
+    registryArgs.validate();
+    try {
+      if (registryArgs.list) {
+        actionRegistryList(registryArgs);
+      } else if (registryArgs.listConf) {
+        // list the configurations
+        actionRegistryListConfigsYarn(registryArgs);
+      } else if (SliderUtils.isSet(registryArgs.getConf)) {
+        // get a configuration
+        PublishedConfiguration publishedConfiguration =
+            actionRegistryGetConfig(registryArgs);
+        outputConfig(publishedConfiguration, registryArgs);
+      } else {
+        // it's an unknown command
+        log.info(ActionRegistryArgs.USAGE);
+        return EXIT_USAGE;
+      }
+//      JDK7
+    } catch (FileNotFoundException e) {
+      log.info("{}", e);
+      log.debug("{}", e, e);
+      return EXIT_NOT_FOUND;
+    } catch (PathNotFoundException e) {
+      log.info("{}", e);
+      log.debug("{}", e, e);
+      return EXIT_NOT_FOUND;
+    }
+    return EXIT_SUCCESS;
+  }
+
+  /**
+   * Registry operation
+   *
+   * @param registryArgs registry Arguments
+   * @return the instances (for tests)
+   * @throws YarnException YARN problems
+   * @throws IOException Network or other problems
+   */
+  @VisibleForTesting
+  public Collection<ServiceRecord> actionRegistryList(
+      ActionRegistryArgs registryArgs)
+      throws YarnException, IOException {
+    String serviceType = registryArgs.serviceType;
+    String name = registryArgs.name;
+    RegistryOperations operations = getRegistryOperations();
+    Collection<ServiceRecord> serviceRecords;
+    if (StringUtils.isEmpty(name)) {
+      String path =
+          serviceclassPath(
+              currentUser(),
+              serviceType);
+
+      try {
+        Map<String, ServiceRecord> recordMap =
+            listS

<TRUNCATED>

Reply via email to