http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/client/SliderClient.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java 
b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java
deleted file mode 100644
index fd3647d..0000000
--- a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java
+++ /dev/null
@@ -1,4572 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.slider.client;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.io.Files;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathNotFoundException;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.registry.client.api.RegistryConstants;
-import org.apache.hadoop.registry.client.api.RegistryOperations;
-import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
-import org.apache.hadoop.registry.client.binding.RegistryUtils;
-import org.apache.hadoop.registry.client.exceptions.NoRecordException;
-import org.apache.hadoop.registry.client.types.Endpoint;
-import org.apache.hadoop.registry.client.types.RegistryPathStatus;
-import org.apache.hadoop.registry.client.types.ServiceRecord;
-import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.KerberosDiags;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.alias.CredentialProvider;
-import org.apache.hadoop.security.alias.CredentialProviderFactory;
-import org.apache.hadoop.util.Shell;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.NodeState;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
-import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.slider.api.ClusterDescription;
-import org.apache.slider.api.ClusterNode;
-import org.apache.slider.api.SliderApplicationApi;
-import org.apache.slider.api.SliderClusterProtocol;
-import org.apache.slider.api.StateValues;
-import org.apache.slider.api.proto.Messages;
-import org.apache.slider.api.types.ContainerInformation;
-import org.apache.slider.api.types.NodeInformationList;
-import org.apache.slider.api.types.SliderInstanceDescription;
-import org.apache.slider.client.ipc.SliderApplicationIpcClient;
-import org.apache.slider.client.ipc.SliderClusterOperations;
-import org.apache.slider.common.Constants;
-import org.apache.slider.common.SliderExitCodes;
-import org.apache.slider.common.SliderKeys;
-import org.apache.slider.common.params.AbstractActionArgs;
-import org.apache.slider.common.params.AbstractClusterBuildingActionArgs;
-import org.apache.slider.common.params.ActionAMSuicideArgs;
-import org.apache.slider.common.params.ActionClientArgs;
-import org.apache.slider.common.params.ActionCreateArgs;
-import org.apache.slider.common.params.ActionDependencyArgs;
-import org.apache.slider.common.params.ActionDestroyArgs;
-import org.apache.slider.common.params.ActionDiagnosticArgs;
-import org.apache.slider.common.params.ActionEchoArgs;
-import org.apache.slider.common.params.ActionExistsArgs;
-import org.apache.slider.common.params.ActionFlexArgs;
-import org.apache.slider.common.params.ActionFreezeArgs;
-import org.apache.slider.common.params.ActionInstallKeytabArgs;
-import org.apache.slider.common.params.ActionInstallPackageArgs;
-import org.apache.slider.common.params.ActionKDiagArgs;
-import org.apache.slider.common.params.ActionKeytabArgs;
-import org.apache.slider.common.params.ActionKillContainerArgs;
-import org.apache.slider.common.params.ActionListArgs;
-import org.apache.slider.common.params.ActionLookupArgs;
-import org.apache.slider.common.params.ActionNodesArgs;
-import org.apache.slider.common.params.ActionPackageArgs;
-import org.apache.slider.common.params.ActionRegistryArgs;
-import org.apache.slider.common.params.ActionResolveArgs;
-import org.apache.slider.common.params.ActionResourceArgs;
-import org.apache.slider.common.params.ActionStatusArgs;
-import org.apache.slider.common.params.ActionThawArgs;
-import org.apache.slider.common.params.ActionTokensArgs;
-import org.apache.slider.common.params.ActionUpgradeArgs;
-import org.apache.slider.common.params.Arguments;
-import org.apache.slider.common.params.ClientArgs;
-import org.apache.slider.common.params.CommonArgs;
-import org.apache.slider.common.params.LaunchArgsAccessor;
-import org.apache.slider.common.tools.ConfigHelper;
-import org.apache.slider.common.tools.Duration;
-import org.apache.slider.common.tools.SliderFileSystem;
-import org.apache.slider.common.tools.SliderUtils;
-import org.apache.slider.common.tools.SliderVersionInfo;
-import org.apache.slider.core.build.InstanceBuilder;
-import org.apache.slider.core.build.InstanceIO;
-import org.apache.slider.core.conf.AggregateConf;
-import org.apache.slider.core.conf.ConfTree;
-import org.apache.slider.core.conf.ConfTreeOperations;
-import org.apache.slider.core.conf.MapOperations;
-import org.apache.slider.core.conf.ResourcesInputPropertiesValidator;
-import org.apache.slider.core.conf.TemplateInputPropertiesValidator;
-import org.apache.slider.core.exceptions.BadClusterStateException;
-import org.apache.slider.core.exceptions.BadCommandArgumentsException;
-import org.apache.slider.core.exceptions.BadConfigException;
-import org.apache.slider.core.exceptions.ErrorStrings;
-import org.apache.slider.core.exceptions.NoSuchNodeException;
-import org.apache.slider.core.exceptions.NotFoundException;
-import org.apache.slider.core.exceptions.SliderException;
-import org.apache.slider.core.exceptions.UnknownApplicationInstanceException;
-import org.apache.slider.core.exceptions.UsageException;
-import org.apache.slider.core.exceptions.WaitTimeoutException;
-import org.apache.slider.core.launch.AppMasterLauncher;
-import org.apache.slider.core.launch.ClasspathConstructor;
-import org.apache.slider.core.launch.CredentialUtils;
-import org.apache.slider.core.launch.JavaCommandLineBuilder;
-import org.apache.slider.core.launch.LaunchedApplication;
-import org.apache.slider.core.launch.RunningApplication;
-import org.apache.slider.core.launch.SerializedApplicationReport;
-import org.apache.slider.core.main.RunService;
-import org.apache.slider.core.persist.AppDefinitionPersister;
-import org.apache.slider.core.persist.ApplicationReportSerDeser;
-import org.apache.slider.core.persist.ConfPersister;
-import org.apache.slider.core.persist.JsonSerDeser;
-import org.apache.slider.core.persist.LockAcquireFailedException;
-import org.apache.slider.core.registry.SliderRegistryUtils;
-import org.apache.slider.core.registry.YarnAppListClient;
-import org.apache.slider.core.registry.docstore.ConfigFormat;
-import org.apache.slider.core.registry.docstore.PublishedConfigSet;
-import org.apache.slider.core.registry.docstore.PublishedConfiguration;
-import 
org.apache.slider.core.registry.docstore.PublishedConfigurationOutputter;
-import org.apache.slider.core.registry.docstore.PublishedExports;
-import org.apache.slider.core.registry.docstore.PublishedExportsOutputter;
-import org.apache.slider.core.registry.docstore.PublishedExportsSet;
-import org.apache.slider.core.registry.retrieve.RegistryRetriever;
-import org.apache.slider.core.zk.BlockingZKWatcher;
-import org.apache.slider.core.zk.ZKIntegration;
-import org.apache.slider.core.zk.ZKPathBuilder;
-import org.apache.slider.providers.AbstractClientProvider;
-import org.apache.slider.providers.SliderProviderFactory;
-import org.apache.slider.providers.agent.AgentKeys;
-import org.apache.slider.providers.slideram.SliderAMClientProvider;
-import org.apache.slider.server.appmaster.SliderAppMaster;
-import org.apache.slider.server.appmaster.rpc.RpcBinder;
-import org.apache.slider.server.services.security.SecurityStore;
-import org.apache.slider.server.services.utility.AbstractSliderLaunchedService;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.ACL;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.InterruptedIOException;
-import java.io.PrintStream;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.io.Writer;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.URISyntaxException;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import static org.apache.hadoop.registry.client.binding.RegistryUtils.*;
-import static org.apache.slider.api.InternalKeys.*;
-import static org.apache.slider.api.OptionKeys.*;
-import static org.apache.slider.api.ResourceKeys.*;
-import static org.apache.slider.common.Constants.HADOOP_JAAS_DEBUG;
-import static org.apache.slider.common.params.SliderActions.*;
-import static org.apache.slider.common.tools.SliderUtils.*;
-
-
-/**
- * Client service for Slider
- */
-
-public class SliderClient extends AbstractSliderLaunchedService implements 
RunService,
-    SliderExitCodes, SliderKeys, ErrorStrings, SliderClientAPI {
-  private static final Logger log = 
LoggerFactory.getLogger(SliderClient.class);
-  public static final String E_MUST_BE_A_VALID_JSON_FILE
-      = "Invalid configuration. Must be a valid json file.";
-  public static final String E_INVALID_INSTALL_LOCATION
-      = "A valid install location must be provided for the client.";
-  public static final String E_UNABLE_TO_READ_SUPPLIED_PACKAGE_FILE
-      = "Unable to read supplied package file";
-  public static final String E_INVALID_APPLICATION_PACKAGE_LOCATION
-      = "A valid application package location required.";
-  public static final String E_INVALID_INSTALL_PATH = "Install path is not a 
valid directory";
-  public static final String E_INSTALL_PATH_DOES_NOT_EXIST = "Install path 
does not exist";
-  public static final String E_INVALID_APPLICATION_TYPE_NAME
-      = "A valid application type name is required (e.g. HBASE).";
-  public static final String E_USE_REPLACEPKG_TO_OVERWRITE = "Use --replacepkg 
to overwrite.";
-  public static final String E_PACKAGE_DOES_NOT_EXIST = "Package does not 
exist";
-  public static final String E_NO_ZOOKEEPER_QUORUM = "No Zookeeper quorum 
defined";
-  public static final String E_NO_RESOURCE_MANAGER = "No valid Resource 
Manager address provided";
-  public static final String E_PACKAGE_EXISTS = "Package exists";
-  private static PrintStream clientOutputStream = System.out;
-
-  // value should not be changed without updating string find in slider.py
-  private static final String PASSWORD_PROMPT = "Enter password for";
-
-  private ClientArgs serviceArgs;
-  public ApplicationId applicationId;
-  
-  private String deployedClusterName;
-  /**
-   * Cluster operations against the deployed cluster -will be null
-   * if no bonding has yet taken place
-   */
-  private SliderClusterOperations sliderClusterOperations;
-
-  protected SliderFileSystem sliderFileSystem;
-
-  /**
-   * Yarn client service
-   */
-  private SliderYarnClientImpl yarnClient;
-  private YarnAppListClient yarnAppListClient;
-  private AggregateConf launchedInstanceDefinition;
-
-  /**
-   * The YARN registry service
-   */
-  @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
-  private RegistryOperations registryOperations;
-
-  /**
-   * Constructor
-   */
-  public SliderClient() {
-    super("Slider Client");
-    new HdfsConfiguration();
-    new YarnConfiguration();
-  }
-
-  /**
-   * This is called <i>Before serviceInit is called</i>
-   * @param config the initial configuration build up by the
-   * service launcher.
-   * @param args argument list list of arguments passed to the command line
-   * after any launcher-specific commands have been stripped.
-   * @return the post-binding configuration to pass to the <code>init()</code>
-   * operation.
-   * @throws Exception
-   */
-  @Override
-  public Configuration bindArgs(Configuration config, String... args) throws 
Exception {
-    config = super.bindArgs(config, args);
-    serviceArgs = new ClientArgs(args);
-    serviceArgs.parse();
-    // add the slider XML config
-    ConfigHelper.injectSliderXMLResource();
-    // yarn-ify
-    YarnConfiguration yarnConfiguration = new YarnConfiguration(config);
-    return patchConfiguration(yarnConfiguration);
-  }
-
-  @Override
-  protected void serviceInit(Configuration conf) throws Exception {
-    Configuration clientConf = loadSliderClientXML();
-    ConfigHelper.mergeConfigurations(conf, clientConf, SLIDER_CLIENT_XML, 
true);
-    serviceArgs.applyDefinitions(conf);
-    serviceArgs.applyFileSystemBinding(conf);
-    AbstractActionArgs coreAction = serviceArgs.getCoreAction();
-    // init security with our conf
-    if (!coreAction.disableSecureLogin() && isHadoopClusterSecure(conf)) {
-      forceLogin();
-      initProcessSecurity(conf);
-    }
-    if (coreAction.getHadoopServicesRequired()) {
-      initHadoopBinding();
-    }
-    super.serviceInit(conf);
-  }
-
-  /**
-   * Launched service execution. This runs {@link #exec()}
-   * then catches some exceptions and converts them to exit codes
-   * @return an exit code
-   * @throws Throwable
-   */
-  @Override
-  public int runService() throws Throwable {
-    try {
-      return exec();
-    } catch (FileNotFoundException | PathNotFoundException nfe) {
-      throw new NotFoundException(nfe, nfe.toString());
-    }
-  }
-
-  /**
-   * Execute the command line
-   * @return an exit code
-   * @throws Throwable on a failure
-   */
-  public int exec() throws Throwable {
-
-    // choose the action
-    String action = serviceArgs.getAction();
-    if (isUnset(action)) {
-      throw new SliderException(EXIT_USAGE, serviceArgs.usage());
-    }
-
-    int exitCode = EXIT_SUCCESS;
-    String clusterName = serviceArgs.getClusterName();
-    // actions
-
-    switch (action) {
-      case ACTION_AM_SUICIDE:
-        exitCode = actionAmSuicide(clusterName,
-            serviceArgs.getActionAMSuicideArgs());
-        break;
-      
-      case ACTION_BUILD:
-        exitCode = actionBuild(clusterName, serviceArgs.getActionBuildArgs());
-        break;
-      
-      case ACTION_CLIENT:
-        exitCode = actionClient(serviceArgs.getActionClientArgs());
-        break;
-
-      case ACTION_CREATE:
-        exitCode = actionCreate(clusterName, 
serviceArgs.getActionCreateArgs());
-        break;
-
-      case ACTION_DEPENDENCY:
-        exitCode = actionDependency(serviceArgs.getActionDependencyArgs());
-        break;
-
-      case ACTION_DESTROY:
-        exitCode = actionDestroy(clusterName, 
serviceArgs.getActionDestroyArgs());
-        break;
-
-      case ACTION_DIAGNOSTICS:
-        exitCode = actionDiagnostic(serviceArgs.getActionDiagnosticArgs());
-        break;
-      
-      case ACTION_EXISTS:
-        exitCode = actionExists(clusterName,
-            serviceArgs.getActionExistsArgs());
-        break;
-      
-      case ACTION_FLEX:
-        exitCode = actionFlex(clusterName, serviceArgs.getActionFlexArgs());
-        break;
-      
-      case ACTION_FREEZE:
-        exitCode = actionFreeze(clusterName, 
serviceArgs.getActionFreezeArgs());
-        break;
-      
-      case ACTION_HELP:
-        log.info(serviceArgs.usage());
-        break;
-
-      case ACTION_KDIAG:
-        exitCode = actionKDiag(serviceArgs.getActionKDiagArgs());
-        break;
-
-      case ACTION_KILL_CONTAINER:
-        exitCode = actionKillContainer(clusterName,
-            serviceArgs.getActionKillContainerArgs());
-        break;
-
-      case ACTION_INSTALL_KEYTAB:
-        exitCode = 
actionInstallKeytab(serviceArgs.getActionInstallKeytabArgs());
-        break;
-      
-      case ACTION_INSTALL_PACKAGE:
-        exitCode = actionInstallPkg(serviceArgs.getActionInstallPackageArgs());
-        break;
-
-      case ACTION_KEYTAB:
-        exitCode = actionKeytab(serviceArgs.getActionKeytabArgs());
-        break;
-
-      case ACTION_LIST:
-        exitCode = actionList(clusterName, serviceArgs.getActionListArgs());
-        break;
-
-      case ACTION_LOOKUP:
-        exitCode = actionLookup(serviceArgs.getActionLookupArgs());
-        break;
-
-      case ACTION_NODES:
-        exitCode = actionNodes("", serviceArgs.getActionNodesArgs());
-        break;
-
-      case ACTION_PACKAGE:
-        exitCode = actionPackage(serviceArgs.getActionPackageArgs());
-        break;
-
-      case ACTION_REGISTRY:
-        exitCode = actionRegistry(serviceArgs.getActionRegistryArgs());
-        break;
-      
-      case ACTION_RESOLVE:
-        exitCode = actionResolve(serviceArgs.getActionResolveArgs());
-        break;
-
-      case ACTION_RESOURCE:
-        exitCode = actionResource(serviceArgs.getActionResourceArgs());
-        break;
-
-      case ACTION_STATUS:
-        exitCode = actionStatus(clusterName, 
serviceArgs.getActionStatusArgs());
-        break;
-
-      case ACTION_THAW:
-        exitCode = actionThaw(clusterName, serviceArgs.getActionThawArgs());
-        break;
-
-      case ACTION_TOKENS:
-        exitCode = actionTokens(serviceArgs.getActionTokenArgs());
-        break;
-
-      case ACTION_UPDATE:
-        exitCode = actionUpdate(clusterName, 
serviceArgs.getActionUpdateArgs());
-        break;
-
-      case ACTION_UPGRADE:
-        exitCode = actionUpgrade(clusterName, 
serviceArgs.getActionUpgradeArgs());
-        break;
-
-      case ACTION_VERSION:
-        exitCode = actionVersion();
-        break;
-      
-      default:
-        throw new SliderException(EXIT_UNIMPLEMENTED,
-            "Unimplemented: " + action);
-    }
-   
-    return exitCode;
-  }
-
-  /**
-   * Perform everything needed to init the hadoop binding.
-   * This assumes that the service is already  in inited or started state
-   * @throws IOException
-   * @throws SliderException
-   */
-  protected void initHadoopBinding() throws IOException, SliderException {
-    // validate the client
-    validateSliderClientEnvironment(null);
-    //create the YARN client
-    yarnClient = new SliderYarnClientImpl();
-    yarnClient.init(getConfig());
-    if (getServiceState() == STATE.STARTED) {
-      yarnClient.start();
-    }
-    addService(yarnClient);
-    yarnAppListClient =
-        new YarnAppListClient(yarnClient, getUsername(), getConfig());
-    // create the filesystem
-    sliderFileSystem = new SliderFileSystem(getConfig());
-  }
-
-  /**
-   * Delete the zookeeper node associated with the calling user and the cluster
-   * TODO: YARN registry operations
-   **/
-  @VisibleForTesting
-  public boolean deleteZookeeperNode(String clusterName) throws YarnException, 
IOException {
-    String user = getUsername();
-    String zkPath = ZKIntegration.mkClusterPath(user, clusterName);
-    Exception e = null;
-    try {
-      Configuration config = getConfig();
-      ZKIntegration client = getZkClient(clusterName, user);
-      if (client != null) {
-        if (client.exists(zkPath)) {
-          log.info("Deleting zookeeper path {}", zkPath);
-        }
-        client.deleteRecursive(zkPath);
-        return true;
-      }
-    } catch (InterruptedException | BadConfigException | KeeperException ex) {
-      e = ex;
-    }
-    if (e != null) {
-      log.warn("Unable to recursively delete zk node {}", zkPath, e);
-    }
-
-    return false;
-  }
-
-  /**
-   * Create the zookeeper node associated with the calling user and the cluster
-   *
-   * @param clusterName slider application name
-   * @param nameOnly should the name only be created (i.e. don't create ZK 
node)
-   * @return the path, using the policy implemented in
-   *   {@link ZKIntegration#mkClusterPath(String, String)}
-   * @throws YarnException
-   * @throws IOException
-   */
-  @VisibleForTesting
-  public String createZookeeperNode(String clusterName, Boolean nameOnly) 
throws YarnException, IOException {
-    try {
-      return createZookeeperNodeInner(clusterName, nameOnly);
-    } catch (KeeperException.NodeExistsException e) {
-      return null;
-    } catch (KeeperException e) {
-      return null;
-    } catch (InterruptedException e) {
-      throw new InterruptedIOException(e.toString());
-    }
-  }
-
-  /**
-   * Create the zookeeper node associated with the calling user and the cluster
-   * -throwing exceptions on any failure
-   * @param clusterName cluster name
-   * @param nameOnly create the path, not the node
-   * @return the path, using the policy implemented in
-   *   {@link ZKIntegration#mkClusterPath(String, String)}
-   * @throws YarnException
-   * @throws IOException
-   * @throws KeeperException
-   * @throws InterruptedException
-   */
-  @VisibleForTesting
-  public String createZookeeperNodeInner(String clusterName, Boolean nameOnly)
-      throws YarnException, IOException, KeeperException, InterruptedException 
{
-    String user = getUsername();
-    String zkPath = ZKIntegration.mkClusterPath(user, clusterName);
-    if (nameOnly) {
-      return zkPath;
-    }
-    ZKIntegration client = getZkClient(clusterName, user);
-    if (client != null) {
-      // set up the permissions. This must be done differently on a secure 
cluster from an insecure
-      // one
-      List<ACL> zkperms = new ArrayList<>();
-      if (UserGroupInformation.isSecurityEnabled()) {
-        zkperms.add(new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.AUTH_IDS));
-        zkperms.add(new ACL(ZooDefs.Perms.READ, ZooDefs.Ids.ANYONE_ID_UNSAFE));
-      } else {
-        zkperms.add(new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.ANYONE_ID_UNSAFE));
-      }
-      client.createPath(zkPath, "",
-          zkperms,
-          CreateMode.PERSISTENT);
-      return zkPath;
-    } else {
-      return null;
-    }
-  }
-
-  /**
-   * Gets a zookeeper client, returns null if it cannot connect to zookeeper
-   **/
-  protected ZKIntegration getZkClient(String clusterName, String user) throws 
YarnException {
-    String registryQuorum = lookupZKQuorum();
-    ZKIntegration client = null;
-    try {
-      BlockingZKWatcher watcher = new BlockingZKWatcher();
-      client = ZKIntegration.newInstance(registryQuorum, user, clusterName, 
true, false, watcher,
-          ZKIntegration.SESSION_TIMEOUT);
-      client.init();
-      watcher.waitForZKConnection(2 * 1000);
-    } catch (InterruptedException e) {
-      client = null;
-      log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e);
-    } catch (IOException e) {
-      log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e);
-    }
-    return client;
-  }
-
-  /**
-   * Keep this signature for backward compatibility with
-   * force=true by default.
-   */
-  @Override
-  public int actionDestroy(String clustername) throws YarnException,
-                                                      IOException {
-    ActionDestroyArgs destroyArgs = new ActionDestroyArgs();
-    destroyArgs.force = true;
-    return actionDestroy(clustername, destroyArgs);
-  }
-
-  @Override
-  public int actionDestroy(String clustername,
-      ActionDestroyArgs destroyArgs) throws YarnException, IOException {
-    // verify that a live cluster isn't there
-    validateClusterName(clustername);
-    //no=op, it is now mandatory. 
-    verifyBindingsDefined();
-    verifyNoLiveClusters(clustername, "Destroy");
-    boolean forceDestroy = destroyArgs.force;
-    log.debug("actionDestroy({}, force={})", clustername, forceDestroy);
-
-    // create the directory path
-    Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
-    // delete the directory;
-    FileSystem fs = sliderFileSystem.getFileSystem();
-    boolean exists = fs.exists(clusterDirectory);
-    if (exists) {
-      log.debug("Application Instance {} found at {}: destroying", 
clustername, clusterDirectory);
-      if (!forceDestroy) {
-        // fail the command if --force is not explicitly specified
-        throw new UsageException("Destroy will permanently delete directories 
and registries. "
-            + "Reissue this command with the --force option if you want to 
proceed.");
-      }
-      if (!fs.delete(clusterDirectory, true)) {
-        log.warn("Filesystem returned false from delete() operation");
-      }
-
-      if(!deleteZookeeperNode(clustername)) {
-        log.warn("Unable to perform node cleanup in Zookeeper.");
-      }
-
-      if (fs.exists(clusterDirectory)) {
-        log.warn("Failed to delete {}", clusterDirectory);
-      }
-
-    } else {
-      log.debug("Application Instance {} already destroyed", clustername);
-    }
-
-    // rm the registry entry —do not let this block the destroy operations
-    String registryPath = SliderRegistryUtils.registryPathForInstance(
-        clustername);
-    try {
-      getRegistryOperations().delete(registryPath, true);
-    } catch (IOException e) {
-      log.warn("Error deleting registry entry {}: {} ", registryPath, e, e);
-    } catch (SliderException e) {
-      log.warn("Error binding to registry {} ", e, e);
-    }
-
-    List<ApplicationReport> instances = findAllLiveInstances(clustername);
-    // detect any race leading to cluster creation during the check/destroy 
process
-    // and report a problem.
-    if (!instances.isEmpty()) {
-      throw new SliderException(EXIT_APPLICATION_IN_USE,
-                              clustername + ": "
-                              + E_DESTROY_CREATE_RACE_CONDITION
-                              + " :" +
-                              instances.get(0));
-    }
-    log.info("Destroyed cluster {}", clustername);
-    return EXIT_SUCCESS;
-  }
-  
-  @Override
-  public int actionAmSuicide(String clustername,
-      ActionAMSuicideArgs args) throws YarnException, IOException {
-    SliderClusterOperations clusterOperations =
-      createClusterOperations(clustername);
-    clusterOperations.amSuicide(args.message, args.exitcode, args.waittime);
-    return EXIT_SUCCESS;
-  }
-
-  @Override
-  public AbstractClientProvider createClientProvider(String provider)
-    throws SliderException {
-    SliderProviderFactory factory =
-      SliderProviderFactory.createSliderProviderFactory(provider);
-    return factory.createClientProvider();
-  }
-
-  /**
-   * Create the cluster -saving the arguments to a specification file first
-   * @param clustername cluster name
-   * @return the status code
-   * @throws YarnException Yarn problems
-   * @throws IOException other problems
-   * @throws BadCommandArgumentsException bad arguments.
-   */
-  public int actionCreate(String clustername, ActionCreateArgs createArgs) 
throws
-                                               YarnException,
-                                               IOException {
-
-    actionBuild(clustername, createArgs);
-    Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
-    AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved(
-        clustername, clusterDirectory);
-    try {
-      checkForCredentials(getConfig(), instanceDefinition.getAppConf(),
-          clustername);
-    } catch (IOException e) {
-      sliderFileSystem.getFileSystem().delete(clusterDirectory, true);
-      throw e;
-    }
-    return startCluster(clustername, createArgs);
-  }
-
-  @Override
-  public int actionUpgrade(String clustername, ActionUpgradeArgs upgradeArgs)
-      throws YarnException, IOException {
-    File template = upgradeArgs.template;
-    File resources = upgradeArgs.resources;
-    List<String> containers = upgradeArgs.containers;
-    List<String> components = upgradeArgs.components;
-
-    // For upgrade spec, let's be little more strict with validation. If either
-    // --template or --resources is specified, then both needs to be specified.
-    // Otherwise the internal app config and resources states of the app will 
be
-    // unwantedly modified and the change will take effect to the running app
-    // immediately.
-    require(!(template != null && resources == null),
-          "Option %s must be specified with option %s",
-          Arguments.ARG_RESOURCES, Arguments.ARG_TEMPLATE);
-
-    require(!(resources != null && template == null),
-          "Option %s must be specified with option %s",
-          Arguments.ARG_TEMPLATE, Arguments.ARG_RESOURCES);
-
-    // For upgrade spec, both --template and --resources should be specified
-    // and neither of --containers or --components should be used
-    if (template != null && resources != null) {
-      require(CollectionUtils.isEmpty(containers),
-            "Option %s cannot be specified with %s or %s",
-            Arguments.ARG_CONTAINERS, Arguments.ARG_TEMPLATE,
-            Arguments.ARG_RESOURCES);
-      require(CollectionUtils.isEmpty(components),
-              "Option %s cannot be specified with %s or %s",
-              Arguments.ARG_COMPONENTS, Arguments.ARG_TEMPLATE,
-              Arguments.ARG_RESOURCES);
-
-      // not an error to try to upgrade a stopped cluster, just return success
-      // code, appropriate log messages have already been dumped
-      if (!isAppInRunningState(clustername)) {
-        return EXIT_SUCCESS;
-      }
-
-      // Now initiate the upgrade spec flow
-      buildInstanceDefinition(clustername, upgradeArgs, true, true, true);
-      SliderClusterOperations clusterOperations = 
createClusterOperations(clustername);
-      clusterOperations.amSuicide("AM restarted for application upgrade", 1, 
1000);
-      return EXIT_SUCCESS;
-    }
-
-    // Since neither --template or --resources were specified, it is upgrade
-    // containers flow. Here any one or both of --containers and --components
-    // can be specified. If a container is specified with --containers option
-    // and also belongs to a component type specified with --components, it 
will
-    // be upgraded only once.
-    return actionUpgradeContainers(clustername, upgradeArgs);
-  }
-
-  private int actionUpgradeContainers(String clustername,
-      ActionUpgradeArgs upgradeArgs) throws YarnException, IOException {
-    verifyBindingsDefined();
-    validateClusterName(clustername);
-    int waittime = upgradeArgs.getWaittime(); // ignored for now
-    String text = "Upgrade containers";
-    log.debug("actionUpgradeContainers({}, reason={}, wait={})", clustername,
-        text, waittime);
-
-    // not an error to try to upgrade a stopped cluster, just return success
-    // code, appropriate log messages have already been dumped
-    if (!isAppInRunningState(clustername)) {
-      return EXIT_SUCCESS;
-    }
-
-    // Create sets of containers and components to get rid of duplicates and
-    // for quick lookup during checks below
-    Set<String> containers = new HashSet<>();
-    if (upgradeArgs.containers != null) {
-      containers.addAll(new ArrayList<>(upgradeArgs.containers));
-    }
-    Set<String> components = new HashSet<>();
-    if (upgradeArgs.components != null) {
-      components.addAll(new ArrayList<>(upgradeArgs.components));
-    }
-
-    // check validity of component names and running containers here
-    List<ContainerInformation> liveContainers = getContainers(clustername);
-    Set<String> validContainers = new HashSet<>();
-    Set<String> validComponents = new HashSet<>();
-    for (ContainerInformation liveContainer : liveContainers) {
-      boolean allContainersAndComponentsAccountedFor = true;
-      if (CollectionUtils.isNotEmpty(containers)) {
-        if (containers.contains(liveContainer.containerId)) {
-          containers.remove(liveContainer.containerId);
-          validContainers.add(liveContainer.containerId);
-        }
-        allContainersAndComponentsAccountedFor = false;
-      }
-      if (CollectionUtils.isNotEmpty(components)) {
-        if (components.contains(liveContainer.component)) {
-          components.remove(liveContainer.component);
-          validComponents.add(liveContainer.component);
-        }
-        allContainersAndComponentsAccountedFor = false;
-      }
-      if (allContainersAndComponentsAccountedFor) {
-        break;
-      }
-    }
-
-    // If any item remains in containers or components then they are invalid.
-    // Log warning for them and proceed.
-    if (CollectionUtils.isNotEmpty(containers)) {
-      log.warn("Invalid set of containers provided {}", containers);
-    }
-    if (CollectionUtils.isNotEmpty(components)) {
-      log.warn("Invalid set of components provided {}", components);
-    }
-
-    // If not a single valid container or component is specified do not proceed
-    if (CollectionUtils.isEmpty(validContainers)
-        && CollectionUtils.isEmpty(validComponents)) {
-      log.error("Not a single valid container or component specified. Nothing 
to do.");
-      return EXIT_NOT_FOUND;
-    }
-
-    SliderClusterProtocol appMaster = connect(findInstance(clustername));
-    Messages.UpgradeContainersRequestProto r =
-      Messages.UpgradeContainersRequestProto
-              .newBuilder()
-              .setMessage(text)
-              .addAllContainer(validContainers)
-              .addAllComponent(validComponents)
-              .build();
-    appMaster.upgradeContainers(r);
-    log.info("Cluster upgrade issued for -");
-    if (CollectionUtils.isNotEmpty(validContainers)) {
-      log.info(" Containers (total {}): {}", validContainers.size(),
-          validContainers);
-    }
-    if (CollectionUtils.isNotEmpty(validComponents)) {
-      log.info(" Components (total {}): {}", validComponents.size(),
-          validComponents);
-    }
-
-    return EXIT_SUCCESS;
-  }
-
-  // returns true if and only if app is in RUNNING state
-  private boolean isAppInRunningState(String clustername) throws YarnException,
-      IOException {
-    // is this actually a known cluster?
-    sliderFileSystem.locateInstanceDefinition(clustername);
-    ApplicationReport app = findInstance(clustername);
-    if (app == null) {
-      // exit early
-      log.info("Cluster {} not running", clustername);
-      return false;
-    }
-    log.debug("App to upgrade was found: {}:\n{}", clustername,
-        new OnDemandReportStringifier(app));
-    if (app.getYarnApplicationState().ordinal() >= 
YarnApplicationState.FINISHED.ordinal()) {
-      log.info("Cluster {} is in a terminated state {}. Use command '{}' 
instead.",
-          clustername, app.getYarnApplicationState(), ACTION_UPDATE);
-      return false;
-    }
-
-    // IPC request to upgrade containers is possible if the app is running.
-    if (app.getYarnApplicationState().ordinal() < YarnApplicationState.RUNNING
-        .ordinal()) {
-      log.info("Cluster {} is in a pre-running state {}. To upgrade it needs "
-          + "to be RUNNING.", clustername, app.getYarnApplicationState());
-      return false;
-    }
-
-    return true;
-  }
-
-  protected static void checkForCredentials(Configuration conf,
-      ConfTree tree, String clusterName) throws IOException {
-    if (tree.credentials == null || tree.credentials.isEmpty()) {
-      log.info("No credentials requested");
-      return;
-    }
-
-    BufferedReader br = null;
-    try {
-      for (Entry<String, List<String>> cred : tree.credentials.entrySet()) {
-        String provider = cred.getKey()
-            .replaceAll(Pattern.quote("${CLUSTER_NAME}"), clusterName)
-            .replaceAll(Pattern.quote("${CLUSTER}"), clusterName);
-        List<String> aliases = cred.getValue();
-        if (aliases == null || aliases.isEmpty()) {
-          continue;
-        }
-        Configuration c = new Configuration(conf);
-        c.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, provider);
-        CredentialProvider credentialProvider = 
CredentialProviderFactory.getProviders(c).get(0);
-        Set<String> existingAliases = new 
HashSet<>(credentialProvider.getAliases());
-        for (String alias : aliases) {
-          if (existingAliases.contains(alias.toLowerCase(Locale.ENGLISH))) {
-            log.info("Credentials for " + alias + " found in " + provider);
-          } else {
-            if (br == null) {
-              br = new BufferedReader(new InputStreamReader(System.in));
-            }
-            char[] pass = readPassword(alias, br);
-            credentialProvider.createCredentialEntry(alias, pass);
-            credentialProvider.flush();
-            Arrays.fill(pass, ' ');
-          }
-        }
-      }
-    } finally {
-      org.apache.hadoop.io.IOUtils.closeStream(br);
-    }
-  }
-
-  private static char[] readOnePassword(String alias) throws IOException {
-    try(BufferedReader br = new BufferedReader(new 
InputStreamReader(System.in))) {
-      return readPassword(alias, br);
-    }
-  }
-
-  // using a normal reader instead of a secure one,
-  // because stdin is not hooked up to the command line
-  private static char[] readPassword(String alias, BufferedReader br)
-      throws IOException {
-    char[] cred = null;
-
-    boolean noMatch;
-    do {
-      log.info(String.format("%s %s: ", PASSWORD_PROMPT, alias));
-      char[] newPassword1 = br.readLine().toCharArray();
-      log.info(String.format("%s %s again: ", PASSWORD_PROMPT, alias));
-      char[] newPassword2 = br.readLine().toCharArray();
-      noMatch = !Arrays.equals(newPassword1, newPassword2);
-      if (noMatch) {
-        if (newPassword1 != null) Arrays.fill(newPassword1, ' ');
-        log.info(String.format("Passwords don't match. Try again."));
-      } else {
-        cred = newPassword1;
-      }
-      if (newPassword2 != null) Arrays.fill(newPassword2, ' ');
-    } while (noMatch);
-    if (cred == null)
-      throw new IOException("Could not read credentials for " + alias +
-          " from stdin");
-    return cred;
-  }
-
-  @Override
-  public int actionBuild(String clustername,
-      AbstractClusterBuildingActionArgs buildInfo) throws
-                                               YarnException,
-                                               IOException {
-
-    buildInstanceDefinition(clustername, buildInfo, false, false);
-    return EXIT_SUCCESS; 
-  }
-
-  @Override
-  public int actionKeytab(ActionKeytabArgs keytabInfo)
-      throws YarnException, IOException {
-    if (keytabInfo.install) {
-      return actionInstallKeytab(keytabInfo);
-    } else if (keytabInfo.delete) {
-      return actionDeleteKeytab(keytabInfo);
-    } else if (keytabInfo.list) {
-      return actionListKeytab(keytabInfo);
-    } else {
-      throw new BadCommandArgumentsException(
-          "Keytab option specified not found.\n"
-          + CommonArgs.usage(serviceArgs, ACTION_KEYTAB));
-    }
-  }
-
-  private int actionListKeytab(ActionKeytabArgs keytabInfo) throws IOException 
{
-    String folder = keytabInfo.folder != null ? keytabInfo.folder : 
StringUtils.EMPTY;
-    Path keytabPath = sliderFileSystem.buildKeytabInstallationDirPath(folder);
-    RemoteIterator<LocatedFileStatus> files =
-        sliderFileSystem.getFileSystem().listFiles(keytabPath, true);
-    log.info("Keytabs:");
-    while (files.hasNext()) {
-      log.info("\t" + files.next().getPath().toString());
-    }
-
-    return EXIT_SUCCESS;
-  }
-
-  private int actionDeleteKeytab(ActionKeytabArgs keytabInfo)
-      throws BadCommandArgumentsException, IOException {
-    if (StringUtils.isEmpty(keytabInfo.folder)) {
-      throw new BadCommandArgumentsException(
-          "A valid destination keytab sub-folder name is required (e.g. 
'security').\n"
-          + CommonArgs.usage(serviceArgs, ACTION_KEYTAB));
-    }
-
-    if (StringUtils.isEmpty(keytabInfo.keytab)) {
-      throw new BadCommandArgumentsException("A keytab name is required.");
-    }
-
-    Path pkgPath = 
sliderFileSystem.buildKeytabInstallationDirPath(keytabInfo.folder);
-
-    Path fileInFs = new Path(pkgPath, keytabInfo.keytab );
-    log.info("Deleting keytab {}", fileInFs);
-    FileSystem sfs = sliderFileSystem.getFileSystem();
-    require(sfs.exists(fileInFs), "No keytab to delete found at %s",
-        fileInFs.toUri());
-    sfs.delete(fileInFs, false);
-
-    return EXIT_SUCCESS;
-  }
-
-  private int actionInstallKeytab(ActionKeytabArgs keytabInfo)
-      throws BadCommandArgumentsException, IOException {
-    Path srcFile = null;
-    require(isSet(keytabInfo.folder),
-        "A valid destination keytab sub-folder name is required (e.g. 
'security').\n"
-        + CommonArgs.usage(serviceArgs, ACTION_KEYTAB));
-
-    requireArgumentSet(Arguments.ARG_KEYTAB, keytabInfo.keytab);
-    File keytabFile = new File(keytabInfo.keytab);
-    require(keytabFile.isFile(),
-        "Unable to access supplied keytab file at %s", 
keytabFile.getAbsolutePath());
-    srcFile = new Path(keytabFile.toURI());
-
-    Path pkgPath = 
sliderFileSystem.buildKeytabInstallationDirPath(keytabInfo.folder);
-    FileSystem sfs = sliderFileSystem.getFileSystem();
-    sfs.mkdirs(pkgPath);
-    sfs.setPermission(pkgPath, new FsPermission(
-        FsAction.ALL, FsAction.NONE, FsAction.NONE));
-
-    Path fileInFs = new Path(pkgPath, srcFile.getName());
-    log.info("Installing keytab {} at {} and overwrite is {}.",
-        srcFile, fileInFs, keytabInfo.overwrite);
-    require(!(sfs.exists(fileInFs) && !keytabInfo.overwrite),
-        "Keytab exists at %s. Use --overwrite to overwrite.", 
fileInFs.toUri());
-
-    sfs.copyFromLocalFile(false, keytabInfo.overwrite, srcFile, fileInFs);
-    sfs.setPermission(fileInFs,
-        new FsPermission(FsAction.READ_WRITE, FsAction.NONE, FsAction.NONE));
-
-    return EXIT_SUCCESS;
-  }
-
-  @Override
-  public int actionInstallKeytab(ActionInstallKeytabArgs installKeytabInfo)
-      throws YarnException, IOException {
-    log.warn("The 'install-keytab' option has been deprecated.  Please use 
'keytab --install'.");
-    return actionKeytab(new ActionKeytabArgs(installKeytabInfo));
-  }
-
-  @Override
-  public int actionInstallPkg(ActionInstallPackageArgs installPkgInfo) throws
-      YarnException,
-      IOException {
-    log.warn("The " + ACTION_INSTALL_PACKAGE
-        + " option has been deprecated. Please use '"
-        + ACTION_PACKAGE + " " + ClientArgs.ARG_INSTALL + "'.");
-    if (StringUtils.isEmpty(installPkgInfo.name)) {
-      throw new BadCommandArgumentsException(
-          E_INVALID_APPLICATION_TYPE_NAME + "\n"
-              + CommonArgs.usage(serviceArgs, ACTION_INSTALL_PACKAGE));
-    }
-    Path srcFile = extractPackagePath(installPkgInfo.packageURI);
-
-    // Do not provide new options to install-package command as it is in
-    // deprecated mode. So version is kept null here. Use package --install.
-    Path pkgPath = sliderFileSystem.buildPackageDirPath(installPkgInfo.name,
-        null);
-    FileSystem sfs = sliderFileSystem.getFileSystem();
-    sfs.mkdirs(pkgPath);
-
-    Path fileInFs = new Path(pkgPath, srcFile.getName());
-    log.info("Installing package {} at {} and overwrite is {}.",
-        srcFile, fileInFs, installPkgInfo.replacePkg);
-    require(!(sfs.exists(fileInFs) && !installPkgInfo.replacePkg),
-          "Package exists at %s. : %s", fileInFs.toUri(), 
E_USE_REPLACEPKG_TO_OVERWRITE);
-    sfs.copyFromLocalFile(false, installPkgInfo.replacePkg, srcFile, fileInFs);
-    return EXIT_SUCCESS;
-  }
-
-  @Override
-  public int actionResource(ActionResourceArgs resourceInfo)
-      throws YarnException, IOException {
-    if (resourceInfo.help) {
-      actionHelp(ACTION_RESOURCE);
-      return EXIT_SUCCESS;
-    } else if (resourceInfo.install) {
-      return actionInstallResource(resourceInfo);
-    } else if (resourceInfo.delete) {
-      return actionDeleteResource(resourceInfo);
-    } else if (resourceInfo.list) {
-      return actionListResource(resourceInfo);
-    } else {
-      throw new BadCommandArgumentsException(
-          "Resource option specified not found.\n"
-              + CommonArgs.usage(serviceArgs, ACTION_RESOURCE));
-    }
-  }
-
-  private int actionListResource(ActionResourceArgs resourceInfo) throws 
IOException {
-    String folder = resourceInfo.folder != null ? resourceInfo.folder : 
StringUtils.EMPTY;
-    Path path = sliderFileSystem.buildResourcePath(folder);
-    RemoteIterator<LocatedFileStatus> files =
-        sliderFileSystem.getFileSystem().listFiles(path, true);
-    log.info("Resources:");
-    while (files.hasNext()) {
-      log.info("\t" + files.next().getPath().toString());
-    }
-
-    return EXIT_SUCCESS;
-  }
-
-  private int actionDeleteResource(ActionResourceArgs resourceInfo)
-      throws BadCommandArgumentsException, IOException {
-    if (StringUtils.isEmpty(resourceInfo.resource)) {
-      throw new BadCommandArgumentsException("A file name is required.");
-    }
-
-    Path fileInFs;
-    if (resourceInfo.folder == null) {
-      fileInFs = sliderFileSystem.buildResourcePath(resourceInfo.resource);
-    } else {
-      fileInFs = sliderFileSystem.buildResourcePath(resourceInfo.folder,
-          resourceInfo.resource);
-    }
-
-    log.info("Deleting resource {}", fileInFs);
-    FileSystem sfs = sliderFileSystem.getFileSystem();
-    require(sfs.exists(fileInFs), "No resource to delete found at %s", 
fileInFs.toUri());
-    sfs.delete(fileInFs, true);
-
-    return EXIT_SUCCESS;
-  }
-
-  private int actionInstallResource(ActionResourceArgs resourceInfo)
-      throws BadCommandArgumentsException, IOException {
-    Path srcFile = null;
-    String folder = resourceInfo.folder != null ? resourceInfo.folder : 
StringUtils.EMPTY;
-
-    requireArgumentSet(Arguments.ARG_RESOURCE, resourceInfo.resource);
-    File file = new File(resourceInfo.resource);
-    require(file.isFile() || file.isDirectory(),
-        "Unable to access supplied file at %s", file.getAbsolutePath());
-
-    File[] files;
-    if (file.isDirectory()) {
-      files = file.listFiles();
-    } else {
-      files = new File[] { file };
-    }
-
-    Path pkgPath = sliderFileSystem.buildResourcePath(folder);
-    FileSystem sfs = sliderFileSystem.getFileSystem();
-
-    if (!sfs.exists(pkgPath)) {
-      sfs.mkdirs(pkgPath);
-      sfs.setPermission(pkgPath, new FsPermission(
-          FsAction.ALL, FsAction.NONE, FsAction.NONE));
-    } else {
-      require(sfs.isDirectory(pkgPath), "Specified folder %s exists and is " +
-          "not a directory", folder);
-    }
-
-    for (File f : files) {
-      srcFile = new Path(f.toURI());
-
-      Path fileInFs = new Path(pkgPath, srcFile.getName());
-      log.info("Installing file {} at {} and overwrite is {}.",
-          srcFile, fileInFs, resourceInfo.overwrite);
-      require(!(sfs.exists(fileInFs) && !resourceInfo.overwrite),
-          "File exists at %s. Use --overwrite to overwrite.", 
fileInFs.toUri());
-
-      sfs.copyFromLocalFile(false, resourceInfo.overwrite, srcFile, fileInFs);
-      sfs.setPermission(fileInFs,
-          new FsPermission(FsAction.READ_WRITE, FsAction.NONE, FsAction.NONE));
-    }
-
-    return EXIT_SUCCESS;
-  }
-
-  @Override
-  public int actionClient(ActionClientArgs clientInfo) throws
-      YarnException,
-      IOException {
-    if (clientInfo.install) {
-      return doClientInstall(clientInfo);
-    } else if (clientInfo.getCertStore) {
-      return doCertificateStoreRetrieval(clientInfo);
-    } else {
-      throw new BadCommandArgumentsException(
-          "Only install, keystore, and truststore commands are supported for 
the client.\n"
-          + CommonArgs.usage(serviceArgs, ACTION_CLIENT));
-
-    }
-  }
-
-  private int doCertificateStoreRetrieval(ActionClientArgs clientInfo)
-      throws YarnException, IOException {
-    if (clientInfo.keystore != null && clientInfo.truststore != null) {
-      throw new BadCommandArgumentsException(
-          "Only one of either keystore or truststore can be retrieved at one 
time.  "
-          + "Retrieval of both should be done separately\n"
-          + CommonArgs.usage(serviceArgs, ACTION_CLIENT));
-    }
-
-    requireArgumentSet(Arguments.ARG_NAME, clientInfo.name);
-
-    File storeFile = null;
-    SecurityStore.StoreType type;
-    if (clientInfo.keystore != null) {
-      storeFile = clientInfo.keystore;
-      type = SecurityStore.StoreType.keystore;
-    } else {
-      storeFile = clientInfo.truststore;
-      type = SecurityStore.StoreType.truststore;
-    }
-
-    require (!storeFile.exists(),
-        "File %s already exists.  Please remove that file or select a 
different file name.",
-         storeFile.getAbsolutePath());
-    String hostname = null;
-    if (type == SecurityStore.StoreType.keystore) {
-      hostname = clientInfo.hostname;
-      if (hostname == null) {
-        hostname = InetAddress.getLocalHost().getCanonicalHostName();
-        log.info("No hostname specified via command line. Using {}", hostname);
-      }
-    }
-
-    String password = clientInfo.password;
-    if (password == null) {
-      String provider = clientInfo.provider;
-      String alias = clientInfo.alias;
-      if (provider != null && alias != null) {
-        Configuration conf = new Configuration(getConfig());
-        conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, provider);
-        char[] chars = conf.getPassword(alias);
-        if (chars == null) {
-          CredentialProvider credentialProvider =
-              CredentialProviderFactory.getProviders(conf).get(0);
-          chars = readOnePassword(alias);
-          credentialProvider.createCredentialEntry(alias, chars);
-          credentialProvider.flush();
-        }
-        password = String.valueOf(chars);
-        Arrays.fill(chars, ' ');
-      } else {
-        log.info("No password and no provider/alias pair were provided, " +
-            "prompting for password");
-        // get a password
-        password = String.valueOf(readOnePassword(type.name()));
-      }
-    }
-
-    byte[] keystore = createClusterOperations(clientInfo.name)
-        .getClientCertificateStore(hostname, "client", password, type.name());
-    // persist to file
-    FileOutputStream storeFileOutputStream = null;
-    try {
-      storeFileOutputStream = new FileOutputStream(storeFile);
-      IOUtils.write(keystore, storeFileOutputStream);
-    } catch (Exception e) {
-      log.error("Unable to persist to file {}", storeFile);
-      throw e;
-    } finally {
-      if (storeFileOutputStream != null) {
-        storeFileOutputStream.close();
-      }
-    }
-
-    return EXIT_SUCCESS;
-  }
-
-  private int doClientInstall(ActionClientArgs clientInfo)
-      throws IOException, SliderException {
-
-    require(clientInfo.installLocation != null,
-          E_INVALID_INSTALL_LOCATION +"\n"
-          + CommonArgs.usage(serviceArgs, ACTION_CLIENT));
-    require(clientInfo.installLocation.exists(),
-        E_INSTALL_PATH_DOES_NOT_EXIST + ": " + 
clientInfo.installLocation.getAbsolutePath());
-
-    require(clientInfo.installLocation.isDirectory(),
-        E_INVALID_INSTALL_PATH + ": " + 
clientInfo.installLocation.getAbsolutePath());
-
-    File pkgFile;
-    File tmpDir = null;
-
-    require(isSet(clientInfo.packageURI) || isSet(clientInfo.name),
-        E_INVALID_APPLICATION_PACKAGE_LOCATION);
-    if (isSet(clientInfo.packageURI)) {
-      pkgFile = new File(clientInfo.packageURI);
-    } else {
-      Path appDirPath = sliderFileSystem.buildAppDefDirPath(clientInfo.name);
-      Path appDefPath = new Path(appDirPath, SliderKeys.DEFAULT_APP_PKG);
-      require(sliderFileSystem.isFile(appDefPath),
-          E_INVALID_APPLICATION_PACKAGE_LOCATION);
-      tmpDir = Files.createTempDir();
-      pkgFile = new File(tmpDir, SliderKeys.DEFAULT_APP_PKG);
-      sliderFileSystem.copyHdfsFileToLocal(appDefPath, pkgFile);
-    }
-    require(pkgFile.isFile(),
-        E_UNABLE_TO_READ_SUPPLIED_PACKAGE_FILE + " at %s", 
pkgFile.getAbsolutePath());
-
-    JSONObject config = null;
-    if(clientInfo.clientConfig != null) {
-      try {
-        byte[] encoded = Files.toByteArray(clientInfo.clientConfig);
-        config = new JSONObject(new String(encoded, Charset.defaultCharset()));
-      } catch (JSONException jsonEx) {
-        log.error("Unable to read supplied configuration at {}: {}",
-            clientInfo.clientConfig, jsonEx);
-        log.debug("Unable to read supplied configuration at {}: {}",
-            clientInfo.clientConfig, jsonEx, jsonEx);
-        throw new BadConfigException(E_MUST_BE_A_VALID_JSON_FILE, jsonEx);
-      }
-    }
-
-    // Only INSTALL is supported
-    AbstractClientProvider
-        provider = 
createClientProvider(SliderProviderFactory.DEFAULT_CLUSTER_TYPE);
-    provider.processClientOperation(sliderFileSystem,
-        getRegistryOperations(),
-        getConfig(),
-        "INSTALL",
-        clientInfo.installLocation,
-        pkgFile,
-        config,
-        clientInfo.name);
-    return EXIT_SUCCESS;
-  }
-
-
-  @Override
-  public int actionPackage(ActionPackageArgs actionPackageInfo)
-      throws YarnException, IOException {
-    initializeOutputStream(actionPackageInfo.out);
-    int exitCode = -1;
-    if (actionPackageInfo.help) {
-      exitCode = actionHelp(ACTION_PACKAGE);
-    }
-    if (actionPackageInfo.install) {
-      exitCode = actionPackageInstall(actionPackageInfo);
-    }
-    if (actionPackageInfo.delete) {
-      exitCode = actionPackageDelete(actionPackageInfo);
-    }
-    if (actionPackageInfo.list) {
-      exitCode = actionPackageList();
-    }
-    if (actionPackageInfo.instances) {
-      exitCode = actionPackageInstances();
-    }
-    finalizeOutputStream(actionPackageInfo.out);
-    if (exitCode != -1) {
-      return exitCode;
-    }
-    throw new BadCommandArgumentsException(
-        "Select valid package operation option");
-  }
-
-  private void initializeOutputStream(String outFile)
-      throws FileNotFoundException {
-    if (outFile != null) {
-      clientOutputStream = new PrintStream(new FileOutputStream(outFile));
-    } else {
-      clientOutputStream = System.out;
-    }
-  }
-
-  private void finalizeOutputStream(String outFile) {
-    if (outFile != null && clientOutputStream != null) {
-      clientOutputStream.flush();
-      clientOutputStream.close();
-    }
-    clientOutputStream = System.out;
-  }
-
-  private int actionPackageInstances() throws YarnException, IOException {
-    Map<String, Path> persistentInstances = sliderFileSystem
-        .listPersistentInstances();
-    if (persistentInstances.isEmpty()) {
-      log.info("No slider cluster specification available");
-      return EXIT_SUCCESS;
-    }
-    String pkgPathValue = sliderFileSystem
-        .buildPackageDirPath(StringUtils.EMPTY, StringUtils.EMPTY).toUri()
-        .getPath();
-    FileSystem fs = sliderFileSystem.getFileSystem();
-    Iterator<Map.Entry<String, Path>> instanceItr = persistentInstances
-        .entrySet().iterator();
-    log.info("List of applications with its package name and path");
-    println("%-25s  %15s  %30s  %s", "Cluster Name", "Package Name",
-        "Package Version", "Application Location");
-    while(instanceItr.hasNext()) {
-      Map.Entry<String, Path> entry = instanceItr.next();
-      String clusterName = entry.getKey();
-      Path clusterPath = entry.getValue();
-      AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved(
-          clusterName, clusterPath);
-      Path appDefPath = null;
-      try {
-        appDefPath = new Path(
-            getApplicationDefinitionPath(instanceDefinition
-                .getAppConfOperations()));
-      } catch (BadConfigException e) {
-        // Invalid cluster state, so move on to next. No need to log anything
-        // as this is just listing of instances.
-        continue;
-      }
-      if (!appDefPath.isUriPathAbsolute()) {
-        appDefPath = new Path(fs.getHomeDirectory(), appDefPath);
-      }
-      String appDefPathStr = appDefPath.toUri().toString();
-      try {
-        if (appDefPathStr.contains(pkgPathValue) && fs.isFile(appDefPath)) {
-          String packageName = appDefPath.getParent().getName();
-          String packageVersion = StringUtils.EMPTY;
-          if (instanceDefinition.isVersioned()) {
-            packageVersion = packageName;
-            packageName = appDefPath.getParent().getParent().getName();
-          }
-          println("%-25s  %15s  %30s  %s", clusterName, packageName,
-              packageVersion, appDefPathStr);
-        }
-      } catch (IOException e) {
-        log.debug("{} application definition path {} is not found.", 
clusterName, appDefPathStr);
-      }
-    }
-    return EXIT_SUCCESS;
-  }
-
-  private int actionPackageList() throws IOException {
-    Path pkgPath = sliderFileSystem.buildPackageDirPath(StringUtils.EMPTY,
-        StringUtils.EMPTY);
-    log.info("Package install path : {}", pkgPath);
-    FileSystem sfs = sliderFileSystem.getFileSystem();
-    if (!sfs.isDirectory(pkgPath)) {
-      log.info("No package(s) installed");
-      return EXIT_SUCCESS;
-    }
-    FileStatus[] fileStatus = sfs.listStatus(pkgPath);
-    boolean hasPackage = false;
-    StringBuilder sb = new StringBuilder();
-    sb.append("List of installed packages:\n");
-    for (FileStatus fstat : fileStatus) {
-      if (fstat.isDirectory()) {
-        sb.append("\t").append(fstat.getPath().getName());
-        sb.append("\n");
-        hasPackage = true;
-      }
-    }
-    if (hasPackage) {
-      println(sb.toString());
-    } else {
-      log.info("No package(s) installed");
-    }
-    return EXIT_SUCCESS;
-  }
-
-  private void createSummaryMetainfoFile(Path srcFile, Path destFile,
-      boolean overwrite) throws IOException {
-    FileSystem srcFs = srcFile.getFileSystem(getConfig());
-    try (InputStream inputStreamJson = SliderUtils
-        .getApplicationResourceInputStream(srcFs, srcFile, "metainfo.json");
-        InputStream inputStreamXml = SliderUtils
-            .getApplicationResourceInputStream(srcFs, srcFile, 
"metainfo.xml");) {
-      InputStream inputStream = null;
-      Path summaryFileInFs = null;
-      if (inputStreamJson != null) {
-        inputStream = inputStreamJson;
-        summaryFileInFs = new Path(destFile.getParent(), destFile.getName()
-            + ".metainfo.json");
-        log.info("Found JSON metainfo file in package");
-      } else if (inputStreamXml != null) {
-        inputStream = inputStreamXml;
-        summaryFileInFs = new Path(destFile.getParent(), destFile.getName()
-            + ".metainfo.xml");
-        log.info("Found XML metainfo file in package");
-      }
-      if (inputStream != null) {
-        try (FSDataOutputStream dataOutputStream = sliderFileSystem
-            .getFileSystem().create(summaryFileInFs, overwrite)) {
-          log.info("Creating summary metainfo file");
-          IOUtils.copy(inputStream, dataOutputStream);
-        }
-      }
-    }
-  }
-
-  private int actionPackageInstall(ActionPackageArgs actionPackageArgs)
-      throws YarnException, IOException {
-    requireArgumentSet(Arguments.ARG_NAME, actionPackageArgs.name);
-
-    Path srcFile = extractPackagePath(actionPackageArgs.packageURI);
-
-    Path pkgPath = sliderFileSystem.buildPackageDirPath(actionPackageArgs.name,
-        actionPackageArgs.version);
-    FileSystem fs = sliderFileSystem.getFileSystem();
-    if (!fs.exists(pkgPath)) {
-      fs.mkdirs(pkgPath);
-    }
-
-    Path fileInFs = new Path(pkgPath, srcFile.getName());
-    require(actionPackageArgs.replacePkg || !fs.exists(fileInFs),
-        E_PACKAGE_EXISTS +" at  %s. Use --replacepkg to overwrite.", 
fileInFs.toUri());
-
-    log.info("Installing package {} to {} (overwrite set to {})", srcFile,
-        fileInFs, actionPackageArgs.replacePkg);
-    fs.copyFromLocalFile(false, actionPackageArgs.replacePkg, srcFile, 
fileInFs);
-    createSummaryMetainfoFile(srcFile, fileInFs, actionPackageArgs.replacePkg);
-
-    String destPathWithHomeDir = Path
-        .getPathWithoutSchemeAndAuthority(fileInFs).toString();
-    String destHomeDir = Path.getPathWithoutSchemeAndAuthority(
-        fs.getHomeDirectory()).toString();
-    // a somewhat contrived approach to stripping out the home directory and 
any trailing
-    // separator; designed to work on windows and unix
-    String destPathWithoutHomeDir;
-    if (destPathWithHomeDir.startsWith(destHomeDir)) {
-      destPathWithoutHomeDir = 
destPathWithHomeDir.substring(destHomeDir.length());
-      if (destPathWithoutHomeDir.startsWith("/") || 
destPathWithoutHomeDir.startsWith("\\")) {
-        destPathWithoutHomeDir = destPathWithoutHomeDir.substring(1);
-      }
-    } else {
-      destPathWithoutHomeDir = destPathWithHomeDir;
-    }
-    log.info("Set " + AgentKeys.APP_DEF + " in your app config JSON to {}",
-        destPathWithoutHomeDir);
-
-    return EXIT_SUCCESS;
-  }
-
-  private Path extractPackagePath(String packageURI)
-      throws BadCommandArgumentsException {
-    require(isSet(packageURI), E_INVALID_APPLICATION_PACKAGE_LOCATION);
-    File pkgFile = new File(packageURI);
-    require(pkgFile.isFile(),
-        E_UNABLE_TO_READ_SUPPLIED_PACKAGE_FILE + ":  " + 
pkgFile.getAbsolutePath());
-    return new Path(pkgFile.toURI());
-  }
-
-  private int actionPackageDelete(ActionPackageArgs actionPackageArgs) throws
-      YarnException, IOException {
-    requireArgumentSet(Arguments.ARG_NAME, actionPackageArgs.name);
-
-    Path pkgPath = sliderFileSystem.buildPackageDirPath(actionPackageArgs.name,
-        actionPackageArgs.version);
-    FileSystem fs = sliderFileSystem.getFileSystem();
-    require(fs.exists(pkgPath), E_PACKAGE_DOES_NOT_EXIST +": %s ", 
pkgPath.toUri());
-    log.info("Deleting package {} at {}.", actionPackageArgs.name, pkgPath);
-
-    if(fs.delete(pkgPath, true)) {
-      log.info("Deleted package {} " + actionPackageArgs.name);
-      return EXIT_SUCCESS;
-    } else {
-      log.warn("Package deletion failed.");
-      return EXIT_NOT_FOUND;
-    }
-  }
-
-  @Override
-  public int actionUpdate(String clustername,
-      AbstractClusterBuildingActionArgs buildInfo) throws
-      YarnException, IOException {
-    buildInstanceDefinition(clustername, buildInfo, true, true);
-    return EXIT_SUCCESS; 
-  }
-
-  /**
-   * Build up the AggregateConfiguration for an application instance then
-   * persists it
-   * @param clustername name of the cluster
-   * @param buildInfo the arguments needed to build the cluster
-   * @param overwrite true if existing cluster directory can be overwritten
-   * @param liveClusterAllowed true if live cluster can be modified
-   * @throws YarnException
-   * @throws IOException
-   */
-
-  public void buildInstanceDefinition(String clustername,
-      AbstractClusterBuildingActionArgs buildInfo, boolean overwrite,
-      boolean liveClusterAllowed) throws YarnException, IOException {
-    buildInstanceDefinition(clustername, buildInfo, overwrite,
-        liveClusterAllowed, false);
-  }
-
-  public void buildInstanceDefinition(String clustername,
-      AbstractClusterBuildingActionArgs buildInfo, boolean overwrite,
-      boolean liveClusterAllowed, boolean isUpgradeFlow) throws YarnException,
-      IOException {
-    // verify that a live cluster isn't there
-    validateClusterName(clustername);
-    verifyBindingsDefined();
-    if (!liveClusterAllowed) {
-      verifyNoLiveClusters(clustername, "Create");
-    }
-
-    Configuration conf = getConfig();
-    String registryQuorum = lookupZKQuorum();
-
-    Path appconfdir = buildInfo.getConfdir();
-    // Provider
-    String providerName = buildInfo.getProvider();
-    requireArgumentSet(Arguments.ARG_PROVIDER, providerName);
-    log.debug("Provider is {}", providerName);
-    SliderAMClientProvider sliderAM = new SliderAMClientProvider(conf);
-    AbstractClientProvider provider =
-      createClientProvider(providerName);
-    InstanceBuilder builder =
-      new InstanceBuilder(sliderFileSystem, 
-                          getConfig(),
-                          clustername);
-    
-    AggregateConf instanceDefinition = new AggregateConf();
-    ConfTreeOperations appConf = instanceDefinition.getAppConfOperations();
-    ConfTreeOperations resources = instanceDefinition.getResourceOperations();
-    ConfTreeOperations internal = instanceDefinition.getInternalOperations();
-    //initial definition is set by the providers 
-    sliderAM.prepareInstanceConfiguration(instanceDefinition);
-    provider.prepareInstanceConfiguration(instanceDefinition);
-
-    //load in any specified on the command line
-    if (buildInfo.resources != null) {
-      try {
-        resources.mergeFile(buildInfo.resources,
-                            new ResourcesInputPropertiesValidator());
-
-      } catch (IOException e) {
-        throw new BadConfigException(e,
-               "incorrect argument to %s: \"%s\" : %s ", 
-                                     Arguments.ARG_RESOURCES,
-                                     buildInfo.resources,
-                                     e.toString());
-      }
-    }
-    if (buildInfo.template != null) {
-      try {
-        appConf.mergeFile(buildInfo.template,
-                          new TemplateInputPropertiesValidator());
-      } catch (IOException e) {
-        throw new BadConfigException(e,
-                                     "incorrect argument to %s: \"%s\" : %s ",
-                                     Arguments.ARG_TEMPLATE,
-                                     buildInfo.template,
-                                     e.toString());
-      }
-    }
-
-    if (isUpgradeFlow) {
-      ActionUpgradeArgs upgradeInfo = (ActionUpgradeArgs) buildInfo;
-      if (!upgradeInfo.force) {
-        validateClientAndClusterResource(clustername, resources);
-      }
-    }
-
-    //get the command line options
-    ConfTree cmdLineAppOptions = buildInfo.buildAppOptionsConfTree();
-    ConfTree cmdLineResourceOptions = buildInfo.buildResourceOptionsConfTree();
-
-    appConf.merge(cmdLineAppOptions);
-
-    AppDefinitionPersister appDefinitionPersister = new 
AppDefinitionPersister(sliderFileSystem);
-    appDefinitionPersister.processSuppliedDefinitions(clustername, buildInfo, 
appConf);
-
-    // put the role counts into the resources file
-    Map<String, String> argsRoleMap = buildInfo.getComponentMap();
-    for (Map.Entry<String, String> roleEntry : argsRoleMap.entrySet()) {
-      String count = roleEntry.getValue();
-      String key = roleEntry.getKey();
-      log.info("{} => {}", key, count);
-      resources.getOrAddComponent(key).put(COMPONENT_INSTANCES, count);
-    }
-
-    //all CLI role options
-    Map<String, Map<String, String>> appOptionMap =
-      buildInfo.getCompOptionMap();
-    appConf.mergeComponents(appOptionMap);
-
-    //internal picks up core. values only
-    internal.propagateGlobalKeys(appConf, "slider.");
-    internal.propagateGlobalKeys(appConf, "internal.");
-
-    //copy over role. and yarn. values ONLY to the resources
-    if (PROPAGATE_RESOURCE_OPTION) {
-      resources.propagateGlobalKeys(appConf, "component.");
-      resources.propagateGlobalKeys(appConf, "role.");
-      resources.propagateGlobalKeys(appConf, "yarn.");
-      resources.mergeComponentsPrefix(appOptionMap, "component.", true);
-      resources.mergeComponentsPrefix(appOptionMap, "yarn.", true);
-      resources.mergeComponentsPrefix(appOptionMap, "role.", true);
-    }
-
-    // resource component args
-    appConf.merge(cmdLineResourceOptions);
-    resources.merge(cmdLineResourceOptions);
-    resources.mergeComponents(buildInfo.getResourceCompOptionMap());
-
-    builder.init(providerName, instanceDefinition);
-    builder.resolve();
-    builder.propagateFilename();
-    builder.propagatePrincipals();
-    builder.setImageDetailsIfAvailable(buildInfo.getImage(),
-                                       buildInfo.getAppHomeDir());
-    builder.setQueue(buildInfo.queue);
-
-    String quorum = buildInfo.getZKhosts();
-    if (isUnset(quorum)) {
-      quorum = registryQuorum;
-    }
-    if (isUnset(quorum)) {
-      throw new BadConfigException(E_NO_ZOOKEEPER_QUORUM);
-    }
-    ZKPathBuilder zkPaths = new ZKPathBuilder(getAppName(),
-        getUsername(),
-        clustername,
-        registryQuorum,
-        quorum);
-    String zookeeperRoot = buildInfo.getAppZKPath();
-
-    if (isSet(zookeeperRoot)) {
-      zkPaths.setAppPath(zookeeperRoot);
-    } else {
-      String createDefaultZkNode = appConf.getGlobalOptions()
-          .getOption(AgentKeys.CREATE_DEF_ZK_NODE, "false");
-      if (createDefaultZkNode.equals("true")) {
-        String defaultZKPath = createZookeeperNode(clustername, false);
-        log.debug("ZK node created for application instance: {}", 
defaultZKPath);
-        if (defaultZKPath != null) {
-          zkPaths.setAppPath(defaultZKPath);
-        }
-      } else {
-        // create AppPath if default is being used
-        String defaultZKPath = createZookeeperNode(clustername, true);
-        log.debug("ZK node assigned to application instance: {}", 
defaultZKPath);
-        zkPaths.setAppPath(defaultZKPath);
-      }
-    }
-
-    builder.addZKBinding(zkPaths);
-
-    //then propagate any package URI
-    if (buildInfo.packageURI != null) {
-      appConf.set(AgentKeys.PACKAGE_PATH, buildInfo.packageURI);
-    }
-
-    propagatePythonExecutable(conf, instanceDefinition);
-
-    // make any substitutions needed at this stage
-    replaceTokens(appConf.getConfTree(), getUsername(), clustername);
-
-    // TODO: Refactor the validation code and persistence code
-    try {
-      persistInstanceDefinition(overwrite, appconfdir, builder);
-      appDefinitionPersister.persistPackages();
-
-    } catch (LockAcquireFailedException e) {
-      log.warn("Failed to get a Lock on {} : {}", builder, e, e);
-      throw new BadClusterStateException("Failed to save " + clustername
-                                         + ": " + e);
-    }
-
-    // providers to validate what there is
-    // TODO: Validation should be done before persistence
-    AggregateConf instanceDescription = builder.getInstanceDescription();
-    validateInstanceDefinition(sliderAM, instanceDescription, 
sliderFileSystem);
-    validateInstanceDefinition(provider, instanceDescription, 
sliderFileSystem);
-  }
-
-  private void validateClientAndClusterResource(String clustername,
-      ConfTreeOperations clientResources) throws BadClusterStateException,
-      SliderException, IOException {
-    log.info("Validating upgrade resource definition with current cluster "
-        + "state (components and instance count)");
-    Map<String, Integer> clientComponentInstances = new HashMap<>();
-    for (String componentName : clientResources.getComponentNames()) {
-      if (!SliderKeys.COMPONENT_AM.equals(componentName)) {
-        clientComponentInstances.put(componentName, clientResources
-            .getComponentOptInt(componentName,
-                COMPONENT_INSTANCES, -1));
-      }
-    }
-
-    AggregateConf clusterConf = null;
-    try {
-      clusterConf = loadPersistedClusterDescription(clustername);
-    } catch (LockAcquireFailedException e) {
-      log.warn("Failed to get a Lock on cluster resource : {}", e, e);
-      throw new BadClusterStateException(
-          "Failed to load client resource definition " + clustername + ": " + 
e, e);
-    }
-    Map<String, Integer> clusterComponentInstances = new HashMap<>();
-    for (Map.Entry<String, Map<String, String>> component : clusterConf
-        .getResources().components.entrySet()) {
-      if (!SliderKeys.COMPONENT_AM.equals(component.getKey())) {
-        clusterComponentInstances.put(
-            component.getKey(),
-            Integer.decode(component.getValue().get(
-                COMPONENT_INSTANCES)));
-      }
-    }
-
-    // client and cluster should be an exact match
-    Iterator<Map.Entry<String, Integer>> clientComponentInstanceIt = 
clientComponentInstances
-        .entrySet().iterator();
-    while (clientComponentInstanceIt.hasNext()) {
-      Map.Entry<String, Integer> clientComponentInstanceEntry = 
clientComponentInstanceIt.next();
-      if 
(clusterComponentInstances.containsKey(clientComponentInstanceEntry.getKey())) {
-        // compare instance count now and remove from both maps if they match
-        if (clusterComponentInstances
-            .get(clientComponentInstanceEntry.getKey()).intValue() == 
clientComponentInstanceEntry
-            .getValue().intValue()) {
-          clusterComponentInstances.remove(clientComponentInstanceEntry
-              .getKey());
-          clientComponentInstanceIt.remove();
-        }
-      }
-    }
-
-    if (!clientComponentInstances.isEmpty()
-        || !clusterComponentInstances.isEmpty()) {
-      log.error("Mismatch found in upgrade resource definition and cluster "
-          + "resource state");
-      if (!clientComponentInstances.isEmpty()) {
-        log.info("The upgrade resource definitions that do not match are:");
-        for (Map.Entry<String, Integer> clientComponentInstanceEntry : 
clientComponentInstances
-            .entrySet()) {
-          log.info("    Component Name: {}, Instance count: {}",
-              clientComponentInstanceEntry.getKey(),
-              clientComponentInstanceEntry.getValue());
-        }
-      }
-      if (!clusterComponentInstances.isEmpty()) {
-        log.info("The cluster resources that do not match are:");
-        for (Map.Entry<String, Integer> clusterComponentInstanceEntry : 
clusterComponentInstances
-            .entrySet()) {
-          log.info("    Component Name: {}, Instance count: {}",
-              clusterComponentInstanceEntry.getKey(),
-              clusterComponentInstanceEntry.getValue());
-        }
-      }
-      throw new BadConfigException("Resource definition provided for "
-          + "upgrade does not match with that of the currently running "
-          + "cluster.\nIf you are aware of what you are doing, rerun the "
-          + "command with " + Arguments.ARG_FORCE + " option.");
-    }
-  }
-
-  protected void persistInstanceDefinition(boolean overwrite,
-                                         Path appconfdir,
-                                         InstanceBuilder builder)
-      throws IOException, SliderException, LockAcquireFailedException {
-    builder.persist(appconfdir, overwrite);
-  }
-
-  @VisibleForTesting
-  public static void replaceTokens(ConfTree conf,
-      String userName, String clusterName) throws IOException {
-    Map<String,String> newglobal = new HashMap<>();
-    for (Entry<String,String> entry : conf.global.entrySet()) {
-      newglobal.put(entry.getKey(), replaceTokens(entry.getValue(),
-          userName, clusterName));
-    }
-    conf.global.putAll(newglobal);
-
-    for (String component : conf.components.keySet()) {
-      Map<String,String> newComponent = new HashMap<>();
-      for (Entry<String,String> entry : 
conf.components.get(component).entrySet()) {
-        newComponent.put(entry.getKey(), replaceTokens(entry.getValue(),
-            userName, clusterName));
-      }
-      conf.components.get(component).putAll(newComponent);
-    }
-
-    Map<String,List<String>> newcred = new HashMap<>();
-    for (Entry<String,List<String>> entry : conf.credentials.entrySet()) {
-      List<String> resultList = new ArrayList<>();
-      for (String v : entry.getValue()) {
-        resultList.add(replaceTokens(v, userName, clusterName));
-      }
-      newcred.put(replaceTokens(entry.getKey(), userName, clusterName),
-          resultList);
-    }
-    conf.credentials.clear();
-    conf.credentials.putAll(newcred);
-  }
-
-  private static String replaceTokens(String s, String userName,
-      String clusterName) throws IOException {
-    return s.replaceAll(Pattern.quote("${USER}"), userName)
-        .replaceAll(Pattern.quote("${USER_NAME}"), userName);
-  }
-
-  public FsPermission getClusterDirectoryPermissions(Configuration conf) {
-    String clusterDirPermsOct =
-      conf.get(CLUSTER_DIRECTORY_PERMISSIONS, 
DEFAULT_CLUSTER_DIRECTORY_PERMISSIONS);
-    return new FsPermission(clusterDirPermsOct);
-  }
-
-  /**
-   * Verify that the Resource Manager is configured (on a non-HA cluster).
-   * with a useful error message
-   * @throws BadCommandArgumentsException the exception raised on an invalid 
config
-   */
-  public void verifyBindingsDefined() throws BadCommandArgumentsException {
-    InetSocketAddress rmAddr = getRmAddress(getConfig());
-    if (!getConfig().getBoolean(YarnConfiguration.RM_HA_ENABLED, false)
-     && !isAddressDefined(rmAddr)) {
-      throw new BadCommandArgumentsException(
-        E_NO_RESOURCE_MANAGER
-        + " in the argument "
-        + Arguments.ARG_MANAGER
-        + " or the configuration property "
-        + YarnConfiguration.RM_ADDRESS 
-        + " value :" + rmAddr);
-    }
-  }
-
-  /**
-   * Load and start a cluster specification.
-   * This assumes that all validation of args and cluster state
-   * have already taken place
-   *
-   * @param clustername name of the cluster.
-   * @param launchArgs launch arguments
-   * @return the exit code
-   * @throws YarnException
-   * @throws IOException
-   */
-  protected int startCluster(String clustername,
-                           LaunchArgsAccessor launchArgs) throws
-                                                          YarnException,
-                                                          IOException {
-    Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
-    AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved(
-      clustername,
-      clusterDirectory);
-
-    LaunchedApplication launchedApplication =
-      launchApplication(clustername, clusterDirectory, instanceDefinition,
-                        serviceArgs.isDebug());
-
-    if (launchArgs.getOutputFile() != null) {
-      // output file has been requested. Get the app report and serialize it
-      ApplicationReport report =
-          launchedApplication.getApplicationReport();
-      SerializedApplicationReport sar = new 
SerializedApplicationReport(report);
-      sar.submitTime = System.currentTimeMillis();
-      ApplicationReportSerDeser serDeser = new ApplicationReportSerDeser();
-      serDeser.save(sar, launchArgs.getOutputFile());
-    }
-    int waittime = launchArgs.getWaittime();
-    if (waittime > 0) {
-      return waitForAppRunning(launchedApplication, waittime, waittime);
-    } else {
-      // no waiting
-      return EXIT_SUCCESS;
-    }
-  }
-
-  /**
-   * Load the instance definition. It is not resolved at this point
-   * @param name cluster name
-   * @param clusterDirectory cluster dir
-   * @return the loaded configuration
-   * @throws IOException
-   * @throws SliderException
-   * @throws UnknownApplicationInstanceException if the file is not found
-   */
-  public AggregateConf loadInstanceDefinitionUnresolved(String name,
-            Path clusterDirectory) throws IOException, SliderException {
-
-    try {
-      AggregateConf definition =
-        InstanceIO.loadInstanceDefinitionUnresolved(sliderFileSystem,
-                                                    clusterDirectory);
-      definition.setName(name);
-      return definition;
-    } catch (FileNotFoundException e) {
-      throw UnknownApplicationInstanceException.unknownInstance(name, e);
-    }
-  }
-
-  /**
-   * Load the instance definition. 
-   * @param name cluster name
-   * @param resolved flag to indicate the cluster should be resolved
-   * @return the loaded configuration
-   * @throws IOException IO problems
-   * @throws SliderException slider explicit issues
-   * @throws UnknownApplicationInstanceException if the file is not found
-   */
-    public AggregateConf loadInstanceDefinition(String name,
-        boolean resolved) throws
-        IOException,
-        SliderException {
-
-    Path clusterDirectory = sliderFileSystem.buildClusterDirPath(name);
-    AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved(
-      name,
-      clusterDirectory);
-    if (resolved) {
-      instanceDefinition.resolve();
-    }
-    return instanceDefinition;
-
-  }
-
-  protected AppMasterLauncher setupAppMasterLauncher(String clustername,
-      Path clusterDirectory,
-      AggregateConf instanceDefinition,
-      boolean debugAM)
-    throws YarnException, IOException{
-    deployedClusterName = clustername;
-    validateClusterName(clustername);
-    verifyNoLiveClusters(clustername, "Launch");
-    Configuration config = getConfig();
-    lookupZKQuorum();
-    boolean clusterSecure = isHadoopClusterSecure(config);
-    //create the Slider AM provider -this helps set up the AM
-    SliderAMClientProvider sliderAM = new SliderAMClientProvider(config);
-
-    instanceDefinition.resolve();
-    launchedInstanceDefinition = instanceDefinition;
-
-    ConfTreeOperations internalOperations = 
instanceDefinition.getInternalOperations();
-    MapOperations internalOptions = internalOperations.getGlobalOptions();
-    ConfTreeOperations resourceOperations = 
instanceDefinition.getResourceOperations();
-    ConfTreeOperations appOperations = 
instanceDefinition.getAppConfOperations();
-    Path generatedConfDirPath =
-      createPathThatMustExist(internalOptions.getMandatoryOption(
-        INTERNAL_GENERATED_CONF_PATH));
-    Path snapshotConfPath =
-      createPathThatMustExist(internalOptions.getMandatoryOption(
-        INTERNAL_SNAPSHOT_CONF_PATH));
-
-
-    // cluster Provider
-    AbstractClientProvider provider = createClientProvider(
-      internalOptions.getMandatoryOption(INTERNAL_PROVIDER_NAME));
-    if (log.isDebugEnabled()) {
-      log.debug(instanceDefinition.toString());
-    }
-    MapOperations sliderAMResourceComponent =
-      resourceOperations.getOrAddComponent(SliderKeys.COMPONENT_AM);
-    MapOperations resourceGlobalOptions = 
resourceOperations.getGlobalOptions();
-
-    // add the tags if available
-    Set<String> applicationTags = provider.getApplicationTags(sliderFileSystem,
-        getApplicationDefinitionPath(appOperations));
-
-    Credentials credentials = null;
-    if (clusterSecure) {
-      // pick up oozie credentials
-      credentials = CredentialUtils.loadTokensFromEnvironment(System.getenv(),
-          config);
-      if (credentials == null) {
-        // nothing from oozie, so build up directly
-        credentials = new Credentials(
-            UserGroupInformation.getCurrentUser().getCredentials());
-        CredentialUtils.addRMRenewableFSDelegationTokens(config,
-            sliderFileSystem.getFileSystem(),
-            credentials);
-        CredentialUtils.addRMDelegationToken(yarnClient, credentials);
-
-      } else {
-        log.info("Using externally supplied credentials to launch AM");
-      }
-    }
-
-    AppMasterLauncher amLauncher = new AppMasterLauncher(clustername,
-        SliderKeys.APP_TYPE,
-        config,
-        sliderFileSystem,
-        yarnClient,
-        clusterSecure,
-        sliderAMResourceComponent,
-        resourceGlobalOptions,
-        applicationTags,
-        credentials);
-
-    ApplicationId appId = amLauncher.getApplicationId();
-    // set the application name;
-    amLauncher.setKeepContainersOverRestarts(true);
-
-    int maxAppAttempts = config.getInt(KEY_AM_RESTART_LIMIT, 0);
-    amLauncher.setMaxAppAttempts(maxAppAttempts);
-
-    sliderFileSystem.purgeAppInstanceTempFiles(clustername);
-    Path tempPath = sliderFileSystem.createAppInstanceTempPath(
-        clustername,
-        appId.toString() + "/am");
-    String libdir = "lib";
-    Path libPath = new Path(tempPath, libdir);
-    sliderFileSystem.getFileSystem().mkdirs(libPath);
-    log.debug("FS={}, tempPath={}, libdir={}", sliderFileSystem, tempPath, 
libPath);
- 
-    // set local resources for the application master
-    // local files or archives as needed
-    // In this scenario, the jar file for the application master is part of 
the local resources
-    Map<String, LocalResource> localResources = amLauncher.getLocalResources();
-    
-    // look for the configuration directory named on the command line
-    boolean hasServerLog4jProperties = false;
-    Path remoteConfPath = null;
-    String relativeConfDir = null;
-    String confdirProp = System.getProperty(SliderKeys.PROPERTY_CONF_DIR);
-    if (isUnset(confdirProp)) {
-      log.debug("No local configuration directory provided as system 
property");
-    } else {
-      File confDir = new File(confdirProp);
-      if (!confDir.exists()) {
-        throw new BadConfigException(E_CONFIGURATION_DIRECTORY_NOT_FOUND,
-                                     confDir);
-      }
-      Path localConfDirPath = createLocalPath(confDir);
-      remoteConfPath = new Path(clusterDirectory, 
SliderKeys.SUBMITTED_CONF_DIR);
-      log.debug("Slider configuration directory is {}; remote to be {}", 
-          localConfDirPath, remoteConfPath);
-      copyDirectory(config, localConfDirPath, remoteConfPath, null);
-
-      File log4jserver =
-          new File(confDir, SliderKeys.LOG4J_SERVER_PROP_FILENAME);
-      hasServerLog4jProperties = log4jserver.isFile();
-    }
-    // the assumption here is that minimr cluster => this is a test run
-    // and the classpath can look after itself
-
-    boolean usingMiniMRCluster = getUsingMiniMRCluster();
-    if (!usingMiniMRCluster) {
-
-      log.debug("Destination is not a MiniYARNCluster -copying full 
classpath");
-
-      // insert conf dir first
-      if (remoteConfPath != null) {
-        relativeConfDir = SliderKeys.SUBMITTED_CONF_DIR;
-        Map<String, LocalResource> submittedConfDir =
-          sliderFileSystem.submitDirectory(remoteConfPath,
-                                         relativeConfDir);
-        mergeMaps(localResources, submittedConfDir);
-      }
-    }
-    // build up the configuration 
-    // IMPORTANT: it is only after this call that site configurations
-    // will be valid.
-
-    propagatePrincipals(config, instanceDefinition);
-    // validate security data
-
-/*
-    // turned off until tested
-    SecurityConfiguration securityConfiguration =
-        new SecurityConfiguration(config,
-            instanceDefinition, clustername);
-    
-*/
-    Configuration clientConfExtras = new Configuration(false);
-    // then build up the generated path.
-    FsPermission clusterPerms = getClusterDirectoryPermissions(config);
-    copyDirectory(config, snapshotConfPath, generatedConfDirPath,
-        clusterPerms);
-
-
-    // standard AM resources
-    sliderAM.prepareAMAndConfigForLaunch(sliderFileSystem,
-                                       config,
-                                       amLauncher,
-                                       instanceDefinition,
-    

<TRUNCATED>

Reply via email to