http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/client/SliderClient.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java deleted file mode 100644 index fd3647d..0000000 --- a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java +++ /dev/null @@ -1,4572 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ - -package org.apache.slider.client; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.io.Files; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang.ArrayUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathNotFoundException; -import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.fs.permission.FsAction; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.registry.client.api.RegistryConstants; -import org.apache.hadoop.registry.client.api.RegistryOperations; -import org.apache.hadoop.registry.client.binding.RegistryPathUtils; -import org.apache.hadoop.registry.client.binding.RegistryUtils; -import org.apache.hadoop.registry.client.exceptions.NoRecordException; -import org.apache.hadoop.registry.client.types.Endpoint; -import org.apache.hadoop.registry.client.types.RegistryPathStatus; -import org.apache.hadoop.registry.client.types.ServiceRecord; -import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes; -import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.KerberosDiags; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.alias.CredentialProvider; -import org.apache.hadoop.security.alias.CredentialProviderFactory; -import org.apache.hadoop.util.Shell; -import org.apache.hadoop.yarn.api.ApplicationConstants; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationReport; -import 
org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.LocalResource; -import org.apache.hadoop.yarn.api.records.NodeReport; -import org.apache.hadoop.yarn.api.records.NodeState; -import org.apache.hadoop.yarn.api.records.YarnApplicationState; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; -import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.slider.api.ClusterDescription; -import org.apache.slider.api.ClusterNode; -import org.apache.slider.api.SliderApplicationApi; -import org.apache.slider.api.SliderClusterProtocol; -import org.apache.slider.api.StateValues; -import org.apache.slider.api.proto.Messages; -import org.apache.slider.api.types.ContainerInformation; -import org.apache.slider.api.types.NodeInformationList; -import org.apache.slider.api.types.SliderInstanceDescription; -import org.apache.slider.client.ipc.SliderApplicationIpcClient; -import org.apache.slider.client.ipc.SliderClusterOperations; -import org.apache.slider.common.Constants; -import org.apache.slider.common.SliderExitCodes; -import org.apache.slider.common.SliderKeys; -import org.apache.slider.common.params.AbstractActionArgs; -import org.apache.slider.common.params.AbstractClusterBuildingActionArgs; -import org.apache.slider.common.params.ActionAMSuicideArgs; -import org.apache.slider.common.params.ActionClientArgs; -import org.apache.slider.common.params.ActionCreateArgs; -import org.apache.slider.common.params.ActionDependencyArgs; -import org.apache.slider.common.params.ActionDestroyArgs; -import org.apache.slider.common.params.ActionDiagnosticArgs; -import org.apache.slider.common.params.ActionEchoArgs; -import org.apache.slider.common.params.ActionExistsArgs; -import 
org.apache.slider.common.params.ActionFlexArgs; -import org.apache.slider.common.params.ActionFreezeArgs; -import org.apache.slider.common.params.ActionInstallKeytabArgs; -import org.apache.slider.common.params.ActionInstallPackageArgs; -import org.apache.slider.common.params.ActionKDiagArgs; -import org.apache.slider.common.params.ActionKeytabArgs; -import org.apache.slider.common.params.ActionKillContainerArgs; -import org.apache.slider.common.params.ActionListArgs; -import org.apache.slider.common.params.ActionLookupArgs; -import org.apache.slider.common.params.ActionNodesArgs; -import org.apache.slider.common.params.ActionPackageArgs; -import org.apache.slider.common.params.ActionRegistryArgs; -import org.apache.slider.common.params.ActionResolveArgs; -import org.apache.slider.common.params.ActionResourceArgs; -import org.apache.slider.common.params.ActionStatusArgs; -import org.apache.slider.common.params.ActionThawArgs; -import org.apache.slider.common.params.ActionTokensArgs; -import org.apache.slider.common.params.ActionUpgradeArgs; -import org.apache.slider.common.params.Arguments; -import org.apache.slider.common.params.ClientArgs; -import org.apache.slider.common.params.CommonArgs; -import org.apache.slider.common.params.LaunchArgsAccessor; -import org.apache.slider.common.tools.ConfigHelper; -import org.apache.slider.common.tools.Duration; -import org.apache.slider.common.tools.SliderFileSystem; -import org.apache.slider.common.tools.SliderUtils; -import org.apache.slider.common.tools.SliderVersionInfo; -import org.apache.slider.core.build.InstanceBuilder; -import org.apache.slider.core.build.InstanceIO; -import org.apache.slider.core.conf.AggregateConf; -import org.apache.slider.core.conf.ConfTree; -import org.apache.slider.core.conf.ConfTreeOperations; -import org.apache.slider.core.conf.MapOperations; -import org.apache.slider.core.conf.ResourcesInputPropertiesValidator; -import org.apache.slider.core.conf.TemplateInputPropertiesValidator; -import 
org.apache.slider.core.exceptions.BadClusterStateException; -import org.apache.slider.core.exceptions.BadCommandArgumentsException; -import org.apache.slider.core.exceptions.BadConfigException; -import org.apache.slider.core.exceptions.ErrorStrings; -import org.apache.slider.core.exceptions.NoSuchNodeException; -import org.apache.slider.core.exceptions.NotFoundException; -import org.apache.slider.core.exceptions.SliderException; -import org.apache.slider.core.exceptions.UnknownApplicationInstanceException; -import org.apache.slider.core.exceptions.UsageException; -import org.apache.slider.core.exceptions.WaitTimeoutException; -import org.apache.slider.core.launch.AppMasterLauncher; -import org.apache.slider.core.launch.ClasspathConstructor; -import org.apache.slider.core.launch.CredentialUtils; -import org.apache.slider.core.launch.JavaCommandLineBuilder; -import org.apache.slider.core.launch.LaunchedApplication; -import org.apache.slider.core.launch.RunningApplication; -import org.apache.slider.core.launch.SerializedApplicationReport; -import org.apache.slider.core.main.RunService; -import org.apache.slider.core.persist.AppDefinitionPersister; -import org.apache.slider.core.persist.ApplicationReportSerDeser; -import org.apache.slider.core.persist.ConfPersister; -import org.apache.slider.core.persist.JsonSerDeser; -import org.apache.slider.core.persist.LockAcquireFailedException; -import org.apache.slider.core.registry.SliderRegistryUtils; -import org.apache.slider.core.registry.YarnAppListClient; -import org.apache.slider.core.registry.docstore.ConfigFormat; -import org.apache.slider.core.registry.docstore.PublishedConfigSet; -import org.apache.slider.core.registry.docstore.PublishedConfiguration; -import org.apache.slider.core.registry.docstore.PublishedConfigurationOutputter; -import org.apache.slider.core.registry.docstore.PublishedExports; -import org.apache.slider.core.registry.docstore.PublishedExportsOutputter; -import 
org.apache.slider.core.registry.docstore.PublishedExportsSet; -import org.apache.slider.core.registry.retrieve.RegistryRetriever; -import org.apache.slider.core.zk.BlockingZKWatcher; -import org.apache.slider.core.zk.ZKIntegration; -import org.apache.slider.core.zk.ZKPathBuilder; -import org.apache.slider.providers.AbstractClientProvider; -import org.apache.slider.providers.SliderProviderFactory; -import org.apache.slider.providers.agent.AgentKeys; -import org.apache.slider.providers.slideram.SliderAMClientProvider; -import org.apache.slider.server.appmaster.SliderAppMaster; -import org.apache.slider.server.appmaster.rpc.RpcBinder; -import org.apache.slider.server.services.security.SecurityStore; -import org.apache.slider.server.services.utility.AbstractSliderLaunchedService; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.data.ACL; -import org.codehaus.jettison.json.JSONException; -import org.codehaus.jettison.json.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.InterruptedIOException; -import java.io.PrintStream; -import java.io.PrintWriter; -import java.io.StringWriter; -import java.io.Writer; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.URISyntaxException; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import 
java.util.regex.Matcher; -import java.util.regex.Pattern; - -import static org.apache.hadoop.registry.client.binding.RegistryUtils.*; -import static org.apache.slider.api.InternalKeys.*; -import static org.apache.slider.api.OptionKeys.*; -import static org.apache.slider.api.ResourceKeys.*; -import static org.apache.slider.common.Constants.HADOOP_JAAS_DEBUG; -import static org.apache.slider.common.params.SliderActions.*; -import static org.apache.slider.common.tools.SliderUtils.*; - - -/** - * Client service for Slider - */ - -public class SliderClient extends AbstractSliderLaunchedService implements RunService, - SliderExitCodes, SliderKeys, ErrorStrings, SliderClientAPI { - private static final Logger log = LoggerFactory.getLogger(SliderClient.class); - public static final String E_MUST_BE_A_VALID_JSON_FILE - = "Invalid configuration. Must be a valid json file."; - public static final String E_INVALID_INSTALL_LOCATION - = "A valid install location must be provided for the client."; - public static final String E_UNABLE_TO_READ_SUPPLIED_PACKAGE_FILE - = "Unable to read supplied package file"; - public static final String E_INVALID_APPLICATION_PACKAGE_LOCATION - = "A valid application package location required."; - public static final String E_INVALID_INSTALL_PATH = "Install path is not a valid directory"; - public static final String E_INSTALL_PATH_DOES_NOT_EXIST = "Install path does not exist"; - public static final String E_INVALID_APPLICATION_TYPE_NAME - = "A valid application type name is required (e.g. 
HBASE)."; - public static final String E_USE_REPLACEPKG_TO_OVERWRITE = "Use --replacepkg to overwrite."; - public static final String E_PACKAGE_DOES_NOT_EXIST = "Package does not exist"; - public static final String E_NO_ZOOKEEPER_QUORUM = "No Zookeeper quorum defined"; - public static final String E_NO_RESOURCE_MANAGER = "No valid Resource Manager address provided"; - public static final String E_PACKAGE_EXISTS = "Package exists"; - private static PrintStream clientOutputStream = System.out; - - // value should not be changed without updating string find in slider.py - private static final String PASSWORD_PROMPT = "Enter password for"; - - private ClientArgs serviceArgs; - public ApplicationId applicationId; - - private String deployedClusterName; - /** - * Cluster operations against the deployed cluster -will be null - * if no bonding has yet taken place - */ - private SliderClusterOperations sliderClusterOperations; - - protected SliderFileSystem sliderFileSystem; - - /** - * Yarn client service - */ - private SliderYarnClientImpl yarnClient; - private YarnAppListClient yarnAppListClient; - private AggregateConf launchedInstanceDefinition; - - /** - * The YARN registry service - */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private RegistryOperations registryOperations; - - /** - * Constructor - */ - public SliderClient() { - super("Slider Client"); - new HdfsConfiguration(); - new YarnConfiguration(); - } - - /** - * This is called <i>Before serviceInit is called</i> - * @param config the initial configuration build up by the - * service launcher. - * @param args argument list list of arguments passed to the command line - * after any launcher-specific commands have been stripped. - * @return the post-binding configuration to pass to the <code>init()</code> - * operation. - * @throws Exception - */ - @Override - public Configuration bindArgs(Configuration config, String... 
args) throws Exception { - config = super.bindArgs(config, args); - serviceArgs = new ClientArgs(args); - serviceArgs.parse(); - // add the slider XML config - ConfigHelper.injectSliderXMLResource(); - // yarn-ify - YarnConfiguration yarnConfiguration = new YarnConfiguration(config); - return patchConfiguration(yarnConfiguration); - } - - @Override - protected void serviceInit(Configuration conf) throws Exception { - Configuration clientConf = loadSliderClientXML(); - ConfigHelper.mergeConfigurations(conf, clientConf, SLIDER_CLIENT_XML, true); - serviceArgs.applyDefinitions(conf); - serviceArgs.applyFileSystemBinding(conf); - AbstractActionArgs coreAction = serviceArgs.getCoreAction(); - // init security with our conf - if (!coreAction.disableSecureLogin() && isHadoopClusterSecure(conf)) { - forceLogin(); - initProcessSecurity(conf); - } - if (coreAction.getHadoopServicesRequired()) { - initHadoopBinding(); - } - super.serviceInit(conf); - } - - /** - * Launched service execution. This runs {@link #exec()} - * then catches some exceptions and converts them to exit codes - * @return an exit code - * @throws Throwable - */ - @Override - public int runService() throws Throwable { - try { - return exec(); - } catch (FileNotFoundException | PathNotFoundException nfe) { - throw new NotFoundException(nfe, nfe.toString()); - } - } - - /** - * Execute the command line - * @return an exit code - * @throws Throwable on a failure - */ - public int exec() throws Throwable { - - // choose the action - String action = serviceArgs.getAction(); - if (isUnset(action)) { - throw new SliderException(EXIT_USAGE, serviceArgs.usage()); - } - - int exitCode = EXIT_SUCCESS; - String clusterName = serviceArgs.getClusterName(); - // actions - - switch (action) { - case ACTION_AM_SUICIDE: - exitCode = actionAmSuicide(clusterName, - serviceArgs.getActionAMSuicideArgs()); - break; - - case ACTION_BUILD: - exitCode = actionBuild(clusterName, serviceArgs.getActionBuildArgs()); - break; - - case 
ACTION_CLIENT: - exitCode = actionClient(serviceArgs.getActionClientArgs()); - break; - - case ACTION_CREATE: - exitCode = actionCreate(clusterName, serviceArgs.getActionCreateArgs()); - break; - - case ACTION_DEPENDENCY: - exitCode = actionDependency(serviceArgs.getActionDependencyArgs()); - break; - - case ACTION_DESTROY: - exitCode = actionDestroy(clusterName, serviceArgs.getActionDestroyArgs()); - break; - - case ACTION_DIAGNOSTICS: - exitCode = actionDiagnostic(serviceArgs.getActionDiagnosticArgs()); - break; - - case ACTION_EXISTS: - exitCode = actionExists(clusterName, - serviceArgs.getActionExistsArgs()); - break; - - case ACTION_FLEX: - exitCode = actionFlex(clusterName, serviceArgs.getActionFlexArgs()); - break; - - case ACTION_FREEZE: - exitCode = actionFreeze(clusterName, serviceArgs.getActionFreezeArgs()); - break; - - case ACTION_HELP: - log.info(serviceArgs.usage()); - break; - - case ACTION_KDIAG: - exitCode = actionKDiag(serviceArgs.getActionKDiagArgs()); - break; - - case ACTION_KILL_CONTAINER: - exitCode = actionKillContainer(clusterName, - serviceArgs.getActionKillContainerArgs()); - break; - - case ACTION_INSTALL_KEYTAB: - exitCode = actionInstallKeytab(serviceArgs.getActionInstallKeytabArgs()); - break; - - case ACTION_INSTALL_PACKAGE: - exitCode = actionInstallPkg(serviceArgs.getActionInstallPackageArgs()); - break; - - case ACTION_KEYTAB: - exitCode = actionKeytab(serviceArgs.getActionKeytabArgs()); - break; - - case ACTION_LIST: - exitCode = actionList(clusterName, serviceArgs.getActionListArgs()); - break; - - case ACTION_LOOKUP: - exitCode = actionLookup(serviceArgs.getActionLookupArgs()); - break; - - case ACTION_NODES: - exitCode = actionNodes("", serviceArgs.getActionNodesArgs()); - break; - - case ACTION_PACKAGE: - exitCode = actionPackage(serviceArgs.getActionPackageArgs()); - break; - - case ACTION_REGISTRY: - exitCode = actionRegistry(serviceArgs.getActionRegistryArgs()); - break; - - case ACTION_RESOLVE: - exitCode = 
actionResolve(serviceArgs.getActionResolveArgs()); - break; - - case ACTION_RESOURCE: - exitCode = actionResource(serviceArgs.getActionResourceArgs()); - break; - - case ACTION_STATUS: - exitCode = actionStatus(clusterName, serviceArgs.getActionStatusArgs()); - break; - - case ACTION_THAW: - exitCode = actionThaw(clusterName, serviceArgs.getActionThawArgs()); - break; - - case ACTION_TOKENS: - exitCode = actionTokens(serviceArgs.getActionTokenArgs()); - break; - - case ACTION_UPDATE: - exitCode = actionUpdate(clusterName, serviceArgs.getActionUpdateArgs()); - break; - - case ACTION_UPGRADE: - exitCode = actionUpgrade(clusterName, serviceArgs.getActionUpgradeArgs()); - break; - - case ACTION_VERSION: - exitCode = actionVersion(); - break; - - default: - throw new SliderException(EXIT_UNIMPLEMENTED, - "Unimplemented: " + action); - } - - return exitCode; - } - - /** - * Perform everything needed to init the hadoop binding. - * This assumes that the service is already in inited or started state - * @throws IOException - * @throws SliderException - */ - protected void initHadoopBinding() throws IOException, SliderException { - // validate the client - validateSliderClientEnvironment(null); - //create the YARN client - yarnClient = new SliderYarnClientImpl(); - yarnClient.init(getConfig()); - if (getServiceState() == STATE.STARTED) { - yarnClient.start(); - } - addService(yarnClient); - yarnAppListClient = - new YarnAppListClient(yarnClient, getUsername(), getConfig()); - // create the filesystem - sliderFileSystem = new SliderFileSystem(getConfig()); - } - - /** - * Delete the zookeeper node associated with the calling user and the cluster - * TODO: YARN registry operations - **/ - @VisibleForTesting - public boolean deleteZookeeperNode(String clusterName) throws YarnException, IOException { - String user = getUsername(); - String zkPath = ZKIntegration.mkClusterPath(user, clusterName); - Exception e = null; - try { - Configuration config = getConfig(); - ZKIntegration 
client = getZkClient(clusterName, user); - if (client != null) { - if (client.exists(zkPath)) { - log.info("Deleting zookeeper path {}", zkPath); - } - client.deleteRecursive(zkPath); - return true; - } - } catch (InterruptedException | BadConfigException | KeeperException ex) { - e = ex; - } - if (e != null) { - log.warn("Unable to recursively delete zk node {}", zkPath, e); - } - - return false; - } - - /** - * Create the zookeeper node associated with the calling user and the cluster - * - * @param clusterName slider application name - * @param nameOnly should the name only be created (i.e. don't create ZK node) - * @return the path, using the policy implemented in - * {@link ZKIntegration#mkClusterPath(String, String)} - * @throws YarnException - * @throws IOException - */ - @VisibleForTesting - public String createZookeeperNode(String clusterName, Boolean nameOnly) throws YarnException, IOException { - try { - return createZookeeperNodeInner(clusterName, nameOnly); - } catch (KeeperException.NodeExistsException e) { - return null; - } catch (KeeperException e) { - return null; - } catch (InterruptedException e) { - throw new InterruptedIOException(e.toString()); - } - } - - /** - * Create the zookeeper node associated with the calling user and the cluster - * -throwing exceptions on any failure - * @param clusterName cluster name - * @param nameOnly create the path, not the node - * @return the path, using the policy implemented in - * {@link ZKIntegration#mkClusterPath(String, String)} - * @throws YarnException - * @throws IOException - * @throws KeeperException - * @throws InterruptedException - */ - @VisibleForTesting - public String createZookeeperNodeInner(String clusterName, Boolean nameOnly) - throws YarnException, IOException, KeeperException, InterruptedException { - String user = getUsername(); - String zkPath = ZKIntegration.mkClusterPath(user, clusterName); - if (nameOnly) { - return zkPath; - } - ZKIntegration client = getZkClient(clusterName, 
user); - if (client != null) { - // set up the permissions. This must be done differently on a secure cluster from an insecure - // one - List<ACL> zkperms = new ArrayList<>(); - if (UserGroupInformation.isSecurityEnabled()) { - zkperms.add(new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.AUTH_IDS)); - zkperms.add(new ACL(ZooDefs.Perms.READ, ZooDefs.Ids.ANYONE_ID_UNSAFE)); - } else { - zkperms.add(new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.ANYONE_ID_UNSAFE)); - } - client.createPath(zkPath, "", - zkperms, - CreateMode.PERSISTENT); - return zkPath; - } else { - return null; - } - } - - /** - * Gets a zookeeper client, returns null if it cannot connect to zookeeper - **/ - protected ZKIntegration getZkClient(String clusterName, String user) throws YarnException { - String registryQuorum = lookupZKQuorum(); - ZKIntegration client = null; - try { - BlockingZKWatcher watcher = new BlockingZKWatcher(); - client = ZKIntegration.newInstance(registryQuorum, user, clusterName, true, false, watcher, - ZKIntegration.SESSION_TIMEOUT); - client.init(); - watcher.waitForZKConnection(2 * 1000); - } catch (InterruptedException e) { - client = null; - log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e); - } catch (IOException e) { - log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e); - } - return client; - } - - /** - * Keep this signature for backward compatibility with - * force=true by default. - */ - @Override - public int actionDestroy(String clustername) throws YarnException, - IOException { - ActionDestroyArgs destroyArgs = new ActionDestroyArgs(); - destroyArgs.force = true; - return actionDestroy(clustername, destroyArgs); - } - - @Override - public int actionDestroy(String clustername, - ActionDestroyArgs destroyArgs) throws YarnException, IOException { - // verify that a live cluster isn't there - validateClusterName(clustername); - //no=op, it is now mandatory. 
- verifyBindingsDefined(); - verifyNoLiveClusters(clustername, "Destroy"); - boolean forceDestroy = destroyArgs.force; - log.debug("actionDestroy({}, force={})", clustername, forceDestroy); - - // create the directory path - Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername); - // delete the directory; - FileSystem fs = sliderFileSystem.getFileSystem(); - boolean exists = fs.exists(clusterDirectory); - if (exists) { - log.debug("Application Instance {} found at {}: destroying", clustername, clusterDirectory); - if (!forceDestroy) { - // fail the command if --force is not explicitly specified - throw new UsageException("Destroy will permanently delete directories and registries. " - + "Reissue this command with the --force option if you want to proceed."); - } - if (!fs.delete(clusterDirectory, true)) { - log.warn("Filesystem returned false from delete() operation"); - } - - if(!deleteZookeeperNode(clustername)) { - log.warn("Unable to perform node cleanup in Zookeeper."); - } - - if (fs.exists(clusterDirectory)) { - log.warn("Failed to delete {}", clusterDirectory); - } - - } else { - log.debug("Application Instance {} already destroyed", clustername); - } - - // rm the registry entry -do not let this block the destroy operations - String registryPath = SliderRegistryUtils.registryPathForInstance( - clustername); - try { - getRegistryOperations().delete(registryPath, true); - } catch (IOException e) { - log.warn("Error deleting registry entry {}: {} ", registryPath, e, e); - } catch (SliderException e) { - log.warn("Error binding to registry {} ", e, e); - } - - List<ApplicationReport> instances = findAllLiveInstances(clustername); - // detect any race leading to cluster creation during the check/destroy process - // and report a problem. 
- if (!instances.isEmpty()) { - throw new SliderException(EXIT_APPLICATION_IN_USE, - clustername + ": " - + E_DESTROY_CREATE_RACE_CONDITION - + " :" + - instances.get(0)); - } - log.info("Destroyed cluster {}", clustername); - return EXIT_SUCCESS; - } - - @Override - public int actionAmSuicide(String clustername, - ActionAMSuicideArgs args) throws YarnException, IOException { - SliderClusterOperations clusterOperations = - createClusterOperations(clustername); - clusterOperations.amSuicide(args.message, args.exitcode, args.waittime); - return EXIT_SUCCESS; - } - - @Override - public AbstractClientProvider createClientProvider(String provider) - throws SliderException { - SliderProviderFactory factory = - SliderProviderFactory.createSliderProviderFactory(provider); - return factory.createClientProvider(); - } - - /** - * Create the cluster -saving the arguments to a specification file first - * @param clustername cluster name - * @return the status code - * @throws YarnException Yarn problems - * @throws IOException other problems - * @throws BadCommandArgumentsException bad arguments. 
- */ - public int actionCreate(String clustername, ActionCreateArgs createArgs) throws - YarnException, - IOException { - - actionBuild(clustername, createArgs); - Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername); - AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved( - clustername, clusterDirectory); - try { - checkForCredentials(getConfig(), instanceDefinition.getAppConf(), - clustername); - } catch (IOException e) { - sliderFileSystem.getFileSystem().delete(clusterDirectory, true); - throw e; - } - return startCluster(clustername, createArgs); - } - - @Override - public int actionUpgrade(String clustername, ActionUpgradeArgs upgradeArgs) - throws YarnException, IOException { - File template = upgradeArgs.template; - File resources = upgradeArgs.resources; - List<String> containers = upgradeArgs.containers; - List<String> components = upgradeArgs.components; - - // For upgrade spec, let's be little more strict with validation. If either - // --template or --resources is specified, then both needs to be specified. - // Otherwise the internal app config and resources states of the app will be - // unwantedly modified and the change will take effect to the running app - // immediately. 
- require(!(template != null && resources == null), - "Option %s must be specified with option %s", - Arguments.ARG_RESOURCES, Arguments.ARG_TEMPLATE); - - require(!(resources != null && template == null), - "Option %s must be specified with option %s", - Arguments.ARG_TEMPLATE, Arguments.ARG_RESOURCES); - - // For upgrade spec, both --template and --resources should be specified - // and neither of --containers or --components should be used - if (template != null && resources != null) { - require(CollectionUtils.isEmpty(containers), - "Option %s cannot be specified with %s or %s", - Arguments.ARG_CONTAINERS, Arguments.ARG_TEMPLATE, - Arguments.ARG_RESOURCES); - require(CollectionUtils.isEmpty(components), - "Option %s cannot be specified with %s or %s", - Arguments.ARG_COMPONENTS, Arguments.ARG_TEMPLATE, - Arguments.ARG_RESOURCES); - - // not an error to try to upgrade a stopped cluster, just return success - // code, appropriate log messages have already been dumped - if (!isAppInRunningState(clustername)) { - return EXIT_SUCCESS; - } - - // Now initiate the upgrade spec flow - buildInstanceDefinition(clustername, upgradeArgs, true, true, true); - SliderClusterOperations clusterOperations = createClusterOperations(clustername); - clusterOperations.amSuicide("AM restarted for application upgrade", 1, 1000); - return EXIT_SUCCESS; - } - - // Since neither --template or --resources were specified, it is upgrade - // containers flow. Here any one or both of --containers and --components - // can be specified. If a container is specified with --containers option - // and also belongs to a component type specified with --components, it will - // be upgraded only once. 
- return actionUpgradeContainers(clustername, upgradeArgs); - } - - private int actionUpgradeContainers(String clustername, - ActionUpgradeArgs upgradeArgs) throws YarnException, IOException { - verifyBindingsDefined(); - validateClusterName(clustername); - int waittime = upgradeArgs.getWaittime(); // ignored for now - String text = "Upgrade containers"; - log.debug("actionUpgradeContainers({}, reason={}, wait={})", clustername, - text, waittime); - - // not an error to try to upgrade a stopped cluster, just return success - // code, appropriate log messages have already been dumped - if (!isAppInRunningState(clustername)) { - return EXIT_SUCCESS; - } - - // Create sets of containers and components to get rid of duplicates and - // for quick lookup during checks below - Set<String> containers = new HashSet<>(); - if (upgradeArgs.containers != null) { - containers.addAll(new ArrayList<>(upgradeArgs.containers)); - } - Set<String> components = new HashSet<>(); - if (upgradeArgs.components != null) { - components.addAll(new ArrayList<>(upgradeArgs.components)); - } - - // check validity of component names and running containers here - List<ContainerInformation> liveContainers = getContainers(clustername); - Set<String> validContainers = new HashSet<>(); - Set<String> validComponents = new HashSet<>(); - for (ContainerInformation liveContainer : liveContainers) { - boolean allContainersAndComponentsAccountedFor = true; - if (CollectionUtils.isNotEmpty(containers)) { - if (containers.contains(liveContainer.containerId)) { - containers.remove(liveContainer.containerId); - validContainers.add(liveContainer.containerId); - } - allContainersAndComponentsAccountedFor = false; - } - if (CollectionUtils.isNotEmpty(components)) { - if (components.contains(liveContainer.component)) { - components.remove(liveContainer.component); - validComponents.add(liveContainer.component); - } - allContainersAndComponentsAccountedFor = false; - } - if (allContainersAndComponentsAccountedFor) 
{ - break; - } - } - - // If any item remains in containers or components then they are invalid. - // Log warning for them and proceed. - if (CollectionUtils.isNotEmpty(containers)) { - log.warn("Invalid set of containers provided {}", containers); - } - if (CollectionUtils.isNotEmpty(components)) { - log.warn("Invalid set of components provided {}", components); - } - - // If not a single valid container or component is specified do not proceed - if (CollectionUtils.isEmpty(validContainers) - && CollectionUtils.isEmpty(validComponents)) { - log.error("Not a single valid container or component specified. Nothing to do."); - return EXIT_NOT_FOUND; - } - - SliderClusterProtocol appMaster = connect(findInstance(clustername)); - Messages.UpgradeContainersRequestProto r = - Messages.UpgradeContainersRequestProto - .newBuilder() - .setMessage(text) - .addAllContainer(validContainers) - .addAllComponent(validComponents) - .build(); - appMaster.upgradeContainers(r); - log.info("Cluster upgrade issued for -"); - if (CollectionUtils.isNotEmpty(validContainers)) { - log.info(" Containers (total {}): {}", validContainers.size(), - validContainers); - } - if (CollectionUtils.isNotEmpty(validComponents)) { - log.info(" Components (total {}): {}", validComponents.size(), - validComponents); - } - - return EXIT_SUCCESS; - } - - // returns true if and only if app is in RUNNING state - private boolean isAppInRunningState(String clustername) throws YarnException, - IOException { - // is this actually a known cluster? - sliderFileSystem.locateInstanceDefinition(clustername); - ApplicationReport app = findInstance(clustername); - if (app == null) { - // exit early - log.info("Cluster {} not running", clustername); - return false; - } - log.debug("App to upgrade was found: {}:\n{}", clustername, - new OnDemandReportStringifier(app)); - if (app.getYarnApplicationState().ordinal() >= YarnApplicationState.FINISHED.ordinal()) { - log.info("Cluster {} is in a terminated state {}. 
Use command '{}' instead.", - clustername, app.getYarnApplicationState(), ACTION_UPDATE); - return false; - } - - // IPC request to upgrade containers is possible if the app is running. - if (app.getYarnApplicationState().ordinal() < YarnApplicationState.RUNNING - .ordinal()) { - log.info("Cluster {} is in a pre-running state {}. To upgrade it needs " - + "to be RUNNING.", clustername, app.getYarnApplicationState()); - return false; - } - - return true; - } - - protected static void checkForCredentials(Configuration conf, - ConfTree tree, String clusterName) throws IOException { - if (tree.credentials == null || tree.credentials.isEmpty()) { - log.info("No credentials requested"); - return; - } - - BufferedReader br = null; - try { - for (Entry<String, List<String>> cred : tree.credentials.entrySet()) { - String provider = cred.getKey() - .replaceAll(Pattern.quote("${CLUSTER_NAME}"), clusterName) - .replaceAll(Pattern.quote("${CLUSTER}"), clusterName); - List<String> aliases = cred.getValue(); - if (aliases == null || aliases.isEmpty()) { - continue; - } - Configuration c = new Configuration(conf); - c.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, provider); - CredentialProvider credentialProvider = CredentialProviderFactory.getProviders(c).get(0); - Set<String> existingAliases = new HashSet<>(credentialProvider.getAliases()); - for (String alias : aliases) { - if (existingAliases.contains(alias.toLowerCase(Locale.ENGLISH))) { - log.info("Credentials for " + alias + " found in " + provider); - } else { - if (br == null) { - br = new BufferedReader(new InputStreamReader(System.in)); - } - char[] pass = readPassword(alias, br); - credentialProvider.createCredentialEntry(alias, pass); - credentialProvider.flush(); - Arrays.fill(pass, ' '); - } - } - } - } finally { - org.apache.hadoop.io.IOUtils.closeStream(br); - } - } - - private static char[] readOnePassword(String alias) throws IOException { - try(BufferedReader br = new BufferedReader(new 
InputStreamReader(System.in))) { - return readPassword(alias, br); - } - } - - // using a normal reader instead of a secure one, - // because stdin is not hooked up to the command line - private static char[] readPassword(String alias, BufferedReader br) - throws IOException { - char[] cred = null; - - boolean noMatch; - do { - log.info(String.format("%s %s: ", PASSWORD_PROMPT, alias)); - char[] newPassword1 = br.readLine().toCharArray(); - log.info(String.format("%s %s again: ", PASSWORD_PROMPT, alias)); - char[] newPassword2 = br.readLine().toCharArray(); - noMatch = !Arrays.equals(newPassword1, newPassword2); - if (noMatch) { - if (newPassword1 != null) Arrays.fill(newPassword1, ' '); - log.info(String.format("Passwords don't match. Try again.")); - } else { - cred = newPassword1; - } - if (newPassword2 != null) Arrays.fill(newPassword2, ' '); - } while (noMatch); - if (cred == null) - throw new IOException("Could not read credentials for " + alias + - " from stdin"); - return cred; - } - - @Override - public int actionBuild(String clustername, - AbstractClusterBuildingActionArgs buildInfo) throws - YarnException, - IOException { - - buildInstanceDefinition(clustername, buildInfo, false, false); - return EXIT_SUCCESS; - } - - @Override - public int actionKeytab(ActionKeytabArgs keytabInfo) - throws YarnException, IOException { - if (keytabInfo.install) { - return actionInstallKeytab(keytabInfo); - } else if (keytabInfo.delete) { - return actionDeleteKeytab(keytabInfo); - } else if (keytabInfo.list) { - return actionListKeytab(keytabInfo); - } else { - throw new BadCommandArgumentsException( - "Keytab option specified not found.\n" - + CommonArgs.usage(serviceArgs, ACTION_KEYTAB)); - } - } - - private int actionListKeytab(ActionKeytabArgs keytabInfo) throws IOException { - String folder = keytabInfo.folder != null ? 
keytabInfo.folder : StringUtils.EMPTY; - Path keytabPath = sliderFileSystem.buildKeytabInstallationDirPath(folder); - RemoteIterator<LocatedFileStatus> files = - sliderFileSystem.getFileSystem().listFiles(keytabPath, true); - log.info("Keytabs:"); - while (files.hasNext()) { - log.info("\t" + files.next().getPath().toString()); - } - - return EXIT_SUCCESS; - } - - private int actionDeleteKeytab(ActionKeytabArgs keytabInfo) - throws BadCommandArgumentsException, IOException { - if (StringUtils.isEmpty(keytabInfo.folder)) { - throw new BadCommandArgumentsException( - "A valid destination keytab sub-folder name is required (e.g. 'security').\n" - + CommonArgs.usage(serviceArgs, ACTION_KEYTAB)); - } - - if (StringUtils.isEmpty(keytabInfo.keytab)) { - throw new BadCommandArgumentsException("A keytab name is required."); - } - - Path pkgPath = sliderFileSystem.buildKeytabInstallationDirPath(keytabInfo.folder); - - Path fileInFs = new Path(pkgPath, keytabInfo.keytab ); - log.info("Deleting keytab {}", fileInFs); - FileSystem sfs = sliderFileSystem.getFileSystem(); - require(sfs.exists(fileInFs), "No keytab to delete found at %s", - fileInFs.toUri()); - sfs.delete(fileInFs, false); - - return EXIT_SUCCESS; - } - - private int actionInstallKeytab(ActionKeytabArgs keytabInfo) - throws BadCommandArgumentsException, IOException { - Path srcFile = null; - require(isSet(keytabInfo.folder), - "A valid destination keytab sub-folder name is required (e.g. 
'security').\n" - + CommonArgs.usage(serviceArgs, ACTION_KEYTAB)); - - requireArgumentSet(Arguments.ARG_KEYTAB, keytabInfo.keytab); - File keytabFile = new File(keytabInfo.keytab); - require(keytabFile.isFile(), - "Unable to access supplied keytab file at %s", keytabFile.getAbsolutePath()); - srcFile = new Path(keytabFile.toURI()); - - Path pkgPath = sliderFileSystem.buildKeytabInstallationDirPath(keytabInfo.folder); - FileSystem sfs = sliderFileSystem.getFileSystem(); - sfs.mkdirs(pkgPath); - sfs.setPermission(pkgPath, new FsPermission( - FsAction.ALL, FsAction.NONE, FsAction.NONE)); - - Path fileInFs = new Path(pkgPath, srcFile.getName()); - log.info("Installing keytab {} at {} and overwrite is {}.", - srcFile, fileInFs, keytabInfo.overwrite); - require(!(sfs.exists(fileInFs) && !keytabInfo.overwrite), - "Keytab exists at %s. Use --overwrite to overwrite.", fileInFs.toUri()); - - sfs.copyFromLocalFile(false, keytabInfo.overwrite, srcFile, fileInFs); - sfs.setPermission(fileInFs, - new FsPermission(FsAction.READ_WRITE, FsAction.NONE, FsAction.NONE)); - - return EXIT_SUCCESS; - } - - @Override - public int actionInstallKeytab(ActionInstallKeytabArgs installKeytabInfo) - throws YarnException, IOException { - log.warn("The 'install-keytab' option has been deprecated. Please use 'keytab --install'."); - return actionKeytab(new ActionKeytabArgs(installKeytabInfo)); - } - - @Override - public int actionInstallPkg(ActionInstallPackageArgs installPkgInfo) throws - YarnException, - IOException { - log.warn("The " + ACTION_INSTALL_PACKAGE - + " option has been deprecated. 
Please use '" - + ACTION_PACKAGE + " " + ClientArgs.ARG_INSTALL + "'."); - if (StringUtils.isEmpty(installPkgInfo.name)) { - throw new BadCommandArgumentsException( - E_INVALID_APPLICATION_TYPE_NAME + "\n" - + CommonArgs.usage(serviceArgs, ACTION_INSTALL_PACKAGE)); - } - Path srcFile = extractPackagePath(installPkgInfo.packageURI); - - // Do not provide new options to install-package command as it is in - // deprecated mode. So version is kept null here. Use package --install. - Path pkgPath = sliderFileSystem.buildPackageDirPath(installPkgInfo.name, - null); - FileSystem sfs = sliderFileSystem.getFileSystem(); - sfs.mkdirs(pkgPath); - - Path fileInFs = new Path(pkgPath, srcFile.getName()); - log.info("Installing package {} at {} and overwrite is {}.", - srcFile, fileInFs, installPkgInfo.replacePkg); - require(!(sfs.exists(fileInFs) && !installPkgInfo.replacePkg), - "Package exists at %s. : %s", fileInFs.toUri(), E_USE_REPLACEPKG_TO_OVERWRITE); - sfs.copyFromLocalFile(false, installPkgInfo.replacePkg, srcFile, fileInFs); - return EXIT_SUCCESS; - } - - @Override - public int actionResource(ActionResourceArgs resourceInfo) - throws YarnException, IOException { - if (resourceInfo.help) { - actionHelp(ACTION_RESOURCE); - return EXIT_SUCCESS; - } else if (resourceInfo.install) { - return actionInstallResource(resourceInfo); - } else if (resourceInfo.delete) { - return actionDeleteResource(resourceInfo); - } else if (resourceInfo.list) { - return actionListResource(resourceInfo); - } else { - throw new BadCommandArgumentsException( - "Resource option specified not found.\n" - + CommonArgs.usage(serviceArgs, ACTION_RESOURCE)); - } - } - - private int actionListResource(ActionResourceArgs resourceInfo) throws IOException { - String folder = resourceInfo.folder != null ? 
resourceInfo.folder : StringUtils.EMPTY; - Path path = sliderFileSystem.buildResourcePath(folder); - RemoteIterator<LocatedFileStatus> files = - sliderFileSystem.getFileSystem().listFiles(path, true); - log.info("Resources:"); - while (files.hasNext()) { - log.info("\t" + files.next().getPath().toString()); - } - - return EXIT_SUCCESS; - } - - private int actionDeleteResource(ActionResourceArgs resourceInfo) - throws BadCommandArgumentsException, IOException { - if (StringUtils.isEmpty(resourceInfo.resource)) { - throw new BadCommandArgumentsException("A file name is required."); - } - - Path fileInFs; - if (resourceInfo.folder == null) { - fileInFs = sliderFileSystem.buildResourcePath(resourceInfo.resource); - } else { - fileInFs = sliderFileSystem.buildResourcePath(resourceInfo.folder, - resourceInfo.resource); - } - - log.info("Deleting resource {}", fileInFs); - FileSystem sfs = sliderFileSystem.getFileSystem(); - require(sfs.exists(fileInFs), "No resource to delete found at %s", fileInFs.toUri()); - sfs.delete(fileInFs, true); - - return EXIT_SUCCESS; - } - - private int actionInstallResource(ActionResourceArgs resourceInfo) - throws BadCommandArgumentsException, IOException { - Path srcFile = null; - String folder = resourceInfo.folder != null ? 
resourceInfo.folder : StringUtils.EMPTY; - - requireArgumentSet(Arguments.ARG_RESOURCE, resourceInfo.resource); - File file = new File(resourceInfo.resource); - require(file.isFile() || file.isDirectory(), - "Unable to access supplied file at %s", file.getAbsolutePath()); - - File[] files; - if (file.isDirectory()) { - files = file.listFiles(); - } else { - files = new File[] { file }; - } - - Path pkgPath = sliderFileSystem.buildResourcePath(folder); - FileSystem sfs = sliderFileSystem.getFileSystem(); - - if (!sfs.exists(pkgPath)) { - sfs.mkdirs(pkgPath); - sfs.setPermission(pkgPath, new FsPermission( - FsAction.ALL, FsAction.NONE, FsAction.NONE)); - } else { - require(sfs.isDirectory(pkgPath), "Specified folder %s exists and is " + - "not a directory", folder); - } - - for (File f : files) { - srcFile = new Path(f.toURI()); - - Path fileInFs = new Path(pkgPath, srcFile.getName()); - log.info("Installing file {} at {} and overwrite is {}.", - srcFile, fileInFs, resourceInfo.overwrite); - require(!(sfs.exists(fileInFs) && !resourceInfo.overwrite), - "File exists at %s. 
Use --overwrite to overwrite.", fileInFs.toUri()); - - sfs.copyFromLocalFile(false, resourceInfo.overwrite, srcFile, fileInFs); - sfs.setPermission(fileInFs, - new FsPermission(FsAction.READ_WRITE, FsAction.NONE, FsAction.NONE)); - } - - return EXIT_SUCCESS; - } - - @Override - public int actionClient(ActionClientArgs clientInfo) throws - YarnException, - IOException { - if (clientInfo.install) { - return doClientInstall(clientInfo); - } else if (clientInfo.getCertStore) { - return doCertificateStoreRetrieval(clientInfo); - } else { - throw new BadCommandArgumentsException( - "Only install, keystore, and truststore commands are supported for the client.\n" - + CommonArgs.usage(serviceArgs, ACTION_CLIENT)); - - } - } - - private int doCertificateStoreRetrieval(ActionClientArgs clientInfo) - throws YarnException, IOException { - if (clientInfo.keystore != null && clientInfo.truststore != null) { - throw new BadCommandArgumentsException( - "Only one of either keystore or truststore can be retrieved at one time. " - + "Retrieval of both should be done separately\n" - + CommonArgs.usage(serviceArgs, ACTION_CLIENT)); - } - - requireArgumentSet(Arguments.ARG_NAME, clientInfo.name); - - File storeFile = null; - SecurityStore.StoreType type; - if (clientInfo.keystore != null) { - storeFile = clientInfo.keystore; - type = SecurityStore.StoreType.keystore; - } else { - storeFile = clientInfo.truststore; - type = SecurityStore.StoreType.truststore; - } - - require (!storeFile.exists(), - "File %s already exists. Please remove that file or select a different file name.", - storeFile.getAbsolutePath()); - String hostname = null; - if (type == SecurityStore.StoreType.keystore) { - hostname = clientInfo.hostname; - if (hostname == null) { - hostname = InetAddress.getLocalHost().getCanonicalHostName(); - log.info("No hostname specified via command line. 
Using {}", hostname); - } - } - - String password = clientInfo.password; - if (password == null) { - String provider = clientInfo.provider; - String alias = clientInfo.alias; - if (provider != null && alias != null) { - Configuration conf = new Configuration(getConfig()); - conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, provider); - char[] chars = conf.getPassword(alias); - if (chars == null) { - CredentialProvider credentialProvider = - CredentialProviderFactory.getProviders(conf).get(0); - chars = readOnePassword(alias); - credentialProvider.createCredentialEntry(alias, chars); - credentialProvider.flush(); - } - password = String.valueOf(chars); - Arrays.fill(chars, ' '); - } else { - log.info("No password and no provider/alias pair were provided, " + - "prompting for password"); - // get a password - password = String.valueOf(readOnePassword(type.name())); - } - } - - byte[] keystore = createClusterOperations(clientInfo.name) - .getClientCertificateStore(hostname, "client", password, type.name()); - // persist to file - FileOutputStream storeFileOutputStream = null; - try { - storeFileOutputStream = new FileOutputStream(storeFile); - IOUtils.write(keystore, storeFileOutputStream); - } catch (Exception e) { - log.error("Unable to persist to file {}", storeFile); - throw e; - } finally { - if (storeFileOutputStream != null) { - storeFileOutputStream.close(); - } - } - - return EXIT_SUCCESS; - } - - private int doClientInstall(ActionClientArgs clientInfo) - throws IOException, SliderException { - - require(clientInfo.installLocation != null, - E_INVALID_INSTALL_LOCATION +"\n" - + CommonArgs.usage(serviceArgs, ACTION_CLIENT)); - require(clientInfo.installLocation.exists(), - E_INSTALL_PATH_DOES_NOT_EXIST + ": " + clientInfo.installLocation.getAbsolutePath()); - - require(clientInfo.installLocation.isDirectory(), - E_INVALID_INSTALL_PATH + ": " + clientInfo.installLocation.getAbsolutePath()); - - File pkgFile; - File tmpDir = null; - - 
require(isSet(clientInfo.packageURI) || isSet(clientInfo.name), - E_INVALID_APPLICATION_PACKAGE_LOCATION); - if (isSet(clientInfo.packageURI)) { - pkgFile = new File(clientInfo.packageURI); - } else { - Path appDirPath = sliderFileSystem.buildAppDefDirPath(clientInfo.name); - Path appDefPath = new Path(appDirPath, SliderKeys.DEFAULT_APP_PKG); - require(sliderFileSystem.isFile(appDefPath), - E_INVALID_APPLICATION_PACKAGE_LOCATION); - tmpDir = Files.createTempDir(); - pkgFile = new File(tmpDir, SliderKeys.DEFAULT_APP_PKG); - sliderFileSystem.copyHdfsFileToLocal(appDefPath, pkgFile); - } - require(pkgFile.isFile(), - E_UNABLE_TO_READ_SUPPLIED_PACKAGE_FILE + " at %s", pkgFile.getAbsolutePath()); - - JSONObject config = null; - if(clientInfo.clientConfig != null) { - try { - byte[] encoded = Files.toByteArray(clientInfo.clientConfig); - config = new JSONObject(new String(encoded, Charset.defaultCharset())); - } catch (JSONException jsonEx) { - log.error("Unable to read supplied configuration at {}: {}", - clientInfo.clientConfig, jsonEx); - log.debug("Unable to read supplied configuration at {}: {}", - clientInfo.clientConfig, jsonEx, jsonEx); - throw new BadConfigException(E_MUST_BE_A_VALID_JSON_FILE, jsonEx); - } - } - - // Only INSTALL is supported - AbstractClientProvider - provider = createClientProvider(SliderProviderFactory.DEFAULT_CLUSTER_TYPE); - provider.processClientOperation(sliderFileSystem, - getRegistryOperations(), - getConfig(), - "INSTALL", - clientInfo.installLocation, - pkgFile, - config, - clientInfo.name); - return EXIT_SUCCESS; - } - - - @Override - public int actionPackage(ActionPackageArgs actionPackageInfo) - throws YarnException, IOException { - initializeOutputStream(actionPackageInfo.out); - int exitCode = -1; - if (actionPackageInfo.help) { - exitCode = actionHelp(ACTION_PACKAGE); - } - if (actionPackageInfo.install) { - exitCode = actionPackageInstall(actionPackageInfo); - } - if (actionPackageInfo.delete) { - exitCode = 
actionPackageDelete(actionPackageInfo); - } - if (actionPackageInfo.list) { - exitCode = actionPackageList(); - } - if (actionPackageInfo.instances) { - exitCode = actionPackageInstances(); - } - finalizeOutputStream(actionPackageInfo.out); - if (exitCode != -1) { - return exitCode; - } - throw new BadCommandArgumentsException( - "Select valid package operation option"); - } - - private void initializeOutputStream(String outFile) - throws FileNotFoundException { - if (outFile != null) { - clientOutputStream = new PrintStream(new FileOutputStream(outFile)); - } else { - clientOutputStream = System.out; - } - } - - private void finalizeOutputStream(String outFile) { - if (outFile != null && clientOutputStream != null) { - clientOutputStream.flush(); - clientOutputStream.close(); - } - clientOutputStream = System.out; - } - - private int actionPackageInstances() throws YarnException, IOException { - Map<String, Path> persistentInstances = sliderFileSystem - .listPersistentInstances(); - if (persistentInstances.isEmpty()) { - log.info("No slider cluster specification available"); - return EXIT_SUCCESS; - } - String pkgPathValue = sliderFileSystem - .buildPackageDirPath(StringUtils.EMPTY, StringUtils.EMPTY).toUri() - .getPath(); - FileSystem fs = sliderFileSystem.getFileSystem(); - Iterator<Map.Entry<String, Path>> instanceItr = persistentInstances - .entrySet().iterator(); - log.info("List of applications with its package name and path"); - println("%-25s %15s %30s %s", "Cluster Name", "Package Name", - "Package Version", "Application Location"); - while(instanceItr.hasNext()) { - Map.Entry<String, Path> entry = instanceItr.next(); - String clusterName = entry.getKey(); - Path clusterPath = entry.getValue(); - AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved( - clusterName, clusterPath); - Path appDefPath = null; - try { - appDefPath = new Path( - getApplicationDefinitionPath(instanceDefinition - .getAppConfOperations())); - } catch 
(BadConfigException e) { - // Invalid cluster state, so move on to next. No need to log anything - // as this is just listing of instances. - continue; - } - if (!appDefPath.isUriPathAbsolute()) { - appDefPath = new Path(fs.getHomeDirectory(), appDefPath); - } - String appDefPathStr = appDefPath.toUri().toString(); - try { - if (appDefPathStr.contains(pkgPathValue) && fs.isFile(appDefPath)) { - String packageName = appDefPath.getParent().getName(); - String packageVersion = StringUtils.EMPTY; - if (instanceDefinition.isVersioned()) { - packageVersion = packageName; - packageName = appDefPath.getParent().getParent().getName(); - } - println("%-25s %15s %30s %s", clusterName, packageName, - packageVersion, appDefPathStr); - } - } catch (IOException e) { - log.debug("{} application definition path {} is not found.", clusterName, appDefPathStr); - } - } - return EXIT_SUCCESS; - } - - private int actionPackageList() throws IOException { - Path pkgPath = sliderFileSystem.buildPackageDirPath(StringUtils.EMPTY, - StringUtils.EMPTY); - log.info("Package install path : {}", pkgPath); - FileSystem sfs = sliderFileSystem.getFileSystem(); - if (!sfs.isDirectory(pkgPath)) { - log.info("No package(s) installed"); - return EXIT_SUCCESS; - } - FileStatus[] fileStatus = sfs.listStatus(pkgPath); - boolean hasPackage = false; - StringBuilder sb = new StringBuilder(); - sb.append("List of installed packages:\n"); - for (FileStatus fstat : fileStatus) { - if (fstat.isDirectory()) { - sb.append("\t").append(fstat.getPath().getName()); - sb.append("\n"); - hasPackage = true; - } - } - if (hasPackage) { - println(sb.toString()); - } else { - log.info("No package(s) installed"); - } - return EXIT_SUCCESS; - } - - private void createSummaryMetainfoFile(Path srcFile, Path destFile, - boolean overwrite) throws IOException { - FileSystem srcFs = srcFile.getFileSystem(getConfig()); - try (InputStream inputStreamJson = SliderUtils - .getApplicationResourceInputStream(srcFs, srcFile, 
"metainfo.json"); - InputStream inputStreamXml = SliderUtils - .getApplicationResourceInputStream(srcFs, srcFile, "metainfo.xml");) { - InputStream inputStream = null; - Path summaryFileInFs = null; - if (inputStreamJson != null) { - inputStream = inputStreamJson; - summaryFileInFs = new Path(destFile.getParent(), destFile.getName() - + ".metainfo.json"); - log.info("Found JSON metainfo file in package"); - } else if (inputStreamXml != null) { - inputStream = inputStreamXml; - summaryFileInFs = new Path(destFile.getParent(), destFile.getName() - + ".metainfo.xml"); - log.info("Found XML metainfo file in package"); - } - if (inputStream != null) { - try (FSDataOutputStream dataOutputStream = sliderFileSystem - .getFileSystem().create(summaryFileInFs, overwrite)) { - log.info("Creating summary metainfo file"); - IOUtils.copy(inputStream, dataOutputStream); - } - } - } - } - - private int actionPackageInstall(ActionPackageArgs actionPackageArgs) - throws YarnException, IOException { - requireArgumentSet(Arguments.ARG_NAME, actionPackageArgs.name); - - Path srcFile = extractPackagePath(actionPackageArgs.packageURI); - - Path pkgPath = sliderFileSystem.buildPackageDirPath(actionPackageArgs.name, - actionPackageArgs.version); - FileSystem fs = sliderFileSystem.getFileSystem(); - if (!fs.exists(pkgPath)) { - fs.mkdirs(pkgPath); - } - - Path fileInFs = new Path(pkgPath, srcFile.getName()); - require(actionPackageArgs.replacePkg || !fs.exists(fileInFs), - E_PACKAGE_EXISTS +" at %s. 
Use --replacepkg to overwrite.", fileInFs.toUri()); - - log.info("Installing package {} to {} (overwrite set to {})", srcFile, - fileInFs, actionPackageArgs.replacePkg); - fs.copyFromLocalFile(false, actionPackageArgs.replacePkg, srcFile, fileInFs); - createSummaryMetainfoFile(srcFile, fileInFs, actionPackageArgs.replacePkg); - - String destPathWithHomeDir = Path - .getPathWithoutSchemeAndAuthority(fileInFs).toString(); - String destHomeDir = Path.getPathWithoutSchemeAndAuthority( - fs.getHomeDirectory()).toString(); - // a somewhat contrived approach to stripping out the home directory and any trailing - // separator; designed to work on windows and unix - String destPathWithoutHomeDir; - if (destPathWithHomeDir.startsWith(destHomeDir)) { - destPathWithoutHomeDir = destPathWithHomeDir.substring(destHomeDir.length()); - if (destPathWithoutHomeDir.startsWith("/") || destPathWithoutHomeDir.startsWith("\\")) { - destPathWithoutHomeDir = destPathWithoutHomeDir.substring(1); - } - } else { - destPathWithoutHomeDir = destPathWithHomeDir; - } - log.info("Set " + AgentKeys.APP_DEF + " in your app config JSON to {}", - destPathWithoutHomeDir); - - return EXIT_SUCCESS; - } - - private Path extractPackagePath(String packageURI) - throws BadCommandArgumentsException { - require(isSet(packageURI), E_INVALID_APPLICATION_PACKAGE_LOCATION); - File pkgFile = new File(packageURI); - require(pkgFile.isFile(), - E_UNABLE_TO_READ_SUPPLIED_PACKAGE_FILE + ": " + pkgFile.getAbsolutePath()); - return new Path(pkgFile.toURI()); - } - - private int actionPackageDelete(ActionPackageArgs actionPackageArgs) throws - YarnException, IOException { - requireArgumentSet(Arguments.ARG_NAME, actionPackageArgs.name); - - Path pkgPath = sliderFileSystem.buildPackageDirPath(actionPackageArgs.name, - actionPackageArgs.version); - FileSystem fs = sliderFileSystem.getFileSystem(); - require(fs.exists(pkgPath), E_PACKAGE_DOES_NOT_EXIST +": %s ", pkgPath.toUri()); - log.info("Deleting package {} at {}.", 
actionPackageArgs.name, pkgPath); - - if(fs.delete(pkgPath, true)) { - log.info("Deleted package {} " + actionPackageArgs.name); - return EXIT_SUCCESS; - } else { - log.warn("Package deletion failed."); - return EXIT_NOT_FOUND; - } - } - - @Override - public int actionUpdate(String clustername, - AbstractClusterBuildingActionArgs buildInfo) throws - YarnException, IOException { - buildInstanceDefinition(clustername, buildInfo, true, true); - return EXIT_SUCCESS; - } - - /** - * Build up the AggregateConfiguration for an application instance then - * persists it - * @param clustername name of the cluster - * @param buildInfo the arguments needed to build the cluster - * @param overwrite true if existing cluster directory can be overwritten - * @param liveClusterAllowed true if live cluster can be modified - * @throws YarnException - * @throws IOException - */ - - public void buildInstanceDefinition(String clustername, - AbstractClusterBuildingActionArgs buildInfo, boolean overwrite, - boolean liveClusterAllowed) throws YarnException, IOException { - buildInstanceDefinition(clustername, buildInfo, overwrite, - liveClusterAllowed, false); - } - - public void buildInstanceDefinition(String clustername, - AbstractClusterBuildingActionArgs buildInfo, boolean overwrite, - boolean liveClusterAllowed, boolean isUpgradeFlow) throws YarnException, - IOException { - // verify that a live cluster isn't there - validateClusterName(clustername); - verifyBindingsDefined(); - if (!liveClusterAllowed) { - verifyNoLiveClusters(clustername, "Create"); - } - - Configuration conf = getConfig(); - String registryQuorum = lookupZKQuorum(); - - Path appconfdir = buildInfo.getConfdir(); - // Provider - String providerName = buildInfo.getProvider(); - requireArgumentSet(Arguments.ARG_PROVIDER, providerName); - log.debug("Provider is {}", providerName); - SliderAMClientProvider sliderAM = new SliderAMClientProvider(conf); - AbstractClientProvider provider = - 
createClientProvider(providerName); - InstanceBuilder builder = - new InstanceBuilder(sliderFileSystem, - getConfig(), - clustername); - - AggregateConf instanceDefinition = new AggregateConf(); - ConfTreeOperations appConf = instanceDefinition.getAppConfOperations(); - ConfTreeOperations resources = instanceDefinition.getResourceOperations(); - ConfTreeOperations internal = instanceDefinition.getInternalOperations(); - //initial definition is set by the providers - sliderAM.prepareInstanceConfiguration(instanceDefinition); - provider.prepareInstanceConfiguration(instanceDefinition); - - //load in any specified on the command line - if (buildInfo.resources != null) { - try { - resources.mergeFile(buildInfo.resources, - new ResourcesInputPropertiesValidator()); - - } catch (IOException e) { - throw new BadConfigException(e, - "incorrect argument to %s: \"%s\" : %s ", - Arguments.ARG_RESOURCES, - buildInfo.resources, - e.toString()); - } - } - if (buildInfo.template != null) { - try { - appConf.mergeFile(buildInfo.template, - new TemplateInputPropertiesValidator()); - } catch (IOException e) { - throw new BadConfigException(e, - "incorrect argument to %s: \"%s\" : %s ", - Arguments.ARG_TEMPLATE, - buildInfo.template, - e.toString()); - } - } - - if (isUpgradeFlow) { - ActionUpgradeArgs upgradeInfo = (ActionUpgradeArgs) buildInfo; - if (!upgradeInfo.force) { - validateClientAndClusterResource(clustername, resources); - } - } - - //get the command line options - ConfTree cmdLineAppOptions = buildInfo.buildAppOptionsConfTree(); - ConfTree cmdLineResourceOptions = buildInfo.buildResourceOptionsConfTree(); - - appConf.merge(cmdLineAppOptions); - - AppDefinitionPersister appDefinitionPersister = new AppDefinitionPersister(sliderFileSystem); - appDefinitionPersister.processSuppliedDefinitions(clustername, buildInfo, appConf); - - // put the role counts into the resources file - Map<String, String> argsRoleMap = buildInfo.getComponentMap(); - for (Map.Entry<String, String> 
roleEntry : argsRoleMap.entrySet()) { - String count = roleEntry.getValue(); - String key = roleEntry.getKey(); - log.info("{} => {}", key, count); - resources.getOrAddComponent(key).put(COMPONENT_INSTANCES, count); - } - - //all CLI role options - Map<String, Map<String, String>> appOptionMap = - buildInfo.getCompOptionMap(); - appConf.mergeComponents(appOptionMap); - - //internal picks up core. values only - internal.propagateGlobalKeys(appConf, "slider."); - internal.propagateGlobalKeys(appConf, "internal."); - - //copy over role. and yarn. values ONLY to the resources - if (PROPAGATE_RESOURCE_OPTION) { - resources.propagateGlobalKeys(appConf, "component."); - resources.propagateGlobalKeys(appConf, "role."); - resources.propagateGlobalKeys(appConf, "yarn."); - resources.mergeComponentsPrefix(appOptionMap, "component.", true); - resources.mergeComponentsPrefix(appOptionMap, "yarn.", true); - resources.mergeComponentsPrefix(appOptionMap, "role.", true); - } - - // resource component args - appConf.merge(cmdLineResourceOptions); - resources.merge(cmdLineResourceOptions); - resources.mergeComponents(buildInfo.getResourceCompOptionMap()); - - builder.init(providerName, instanceDefinition); - builder.resolve(); - builder.propagateFilename(); - builder.propagatePrincipals(); - builder.setImageDetailsIfAvailable(buildInfo.getImage(), - buildInfo.getAppHomeDir()); - builder.setQueue(buildInfo.queue); - - String quorum = buildInfo.getZKhosts(); - if (isUnset(quorum)) { - quorum = registryQuorum; - } - if (isUnset(quorum)) { - throw new BadConfigException(E_NO_ZOOKEEPER_QUORUM); - } - ZKPathBuilder zkPaths = new ZKPathBuilder(getAppName(), - getUsername(), - clustername, - registryQuorum, - quorum); - String zookeeperRoot = buildInfo.getAppZKPath(); - - if (isSet(zookeeperRoot)) { - zkPaths.setAppPath(zookeeperRoot); - } else { - String createDefaultZkNode = appConf.getGlobalOptions() - .getOption(AgentKeys.CREATE_DEF_ZK_NODE, "false"); - if 
(createDefaultZkNode.equals("true")) { - String defaultZKPath = createZookeeperNode(clustername, false); - log.debug("ZK node created for application instance: {}", defaultZKPath); - if (defaultZKPath != null) { - zkPaths.setAppPath(defaultZKPath); - } - } else { - // create AppPath if default is being used - String defaultZKPath = createZookeeperNode(clustername, true); - log.debug("ZK node assigned to application instance: {}", defaultZKPath); - zkPaths.setAppPath(defaultZKPath); - } - } - - builder.addZKBinding(zkPaths); - - //then propagate any package URI - if (buildInfo.packageURI != null) { - appConf.set(AgentKeys.PACKAGE_PATH, buildInfo.packageURI); - } - - propagatePythonExecutable(conf, instanceDefinition); - - // make any substitutions needed at this stage - replaceTokens(appConf.getConfTree(), getUsername(), clustername); - - // TODO: Refactor the validation code and persistence code - try { - persistInstanceDefinition(overwrite, appconfdir, builder); - appDefinitionPersister.persistPackages(); - - } catch (LockAcquireFailedException e) { - log.warn("Failed to get a Lock on {} : {}", builder, e, e); - throw new BadClusterStateException("Failed to save " + clustername - + ": " + e); - } - - // providers to validate what there is - // TODO: Validation should be done before persistence - AggregateConf instanceDescription = builder.getInstanceDescription(); - validateInstanceDefinition(sliderAM, instanceDescription, sliderFileSystem); - validateInstanceDefinition(provider, instanceDescription, sliderFileSystem); - } - - private void validateClientAndClusterResource(String clustername, - ConfTreeOperations clientResources) throws BadClusterStateException, - SliderException, IOException { - log.info("Validating upgrade resource definition with current cluster " - + "state (components and instance count)"); - Map<String, Integer> clientComponentInstances = new HashMap<>(); - for (String componentName : clientResources.getComponentNames()) { - if 
(!SliderKeys.COMPONENT_AM.equals(componentName)) { - clientComponentInstances.put(componentName, clientResources - .getComponentOptInt(componentName, - COMPONENT_INSTANCES, -1)); - } - } - - AggregateConf clusterConf = null; - try { - clusterConf = loadPersistedClusterDescription(clustername); - } catch (LockAcquireFailedException e) { - log.warn("Failed to get a Lock on cluster resource : {}", e, e); - throw new BadClusterStateException( - "Failed to load client resource definition " + clustername + ": " + e, e); - } - Map<String, Integer> clusterComponentInstances = new HashMap<>(); - for (Map.Entry<String, Map<String, String>> component : clusterConf - .getResources().components.entrySet()) { - if (!SliderKeys.COMPONENT_AM.equals(component.getKey())) { - clusterComponentInstances.put( - component.getKey(), - Integer.decode(component.getValue().get( - COMPONENT_INSTANCES))); - } - } - - // client and cluster should be an exact match - Iterator<Map.Entry<String, Integer>> clientComponentInstanceIt = clientComponentInstances - .entrySet().iterator(); - while (clientComponentInstanceIt.hasNext()) { - Map.Entry<String, Integer> clientComponentInstanceEntry = clientComponentInstanceIt.next(); - if (clusterComponentInstances.containsKey(clientComponentInstanceEntry.getKey())) { - // compare instance count now and remove from both maps if they match - if (clusterComponentInstances - .get(clientComponentInstanceEntry.getKey()).intValue() == clientComponentInstanceEntry - .getValue().intValue()) { - clusterComponentInstances.remove(clientComponentInstanceEntry - .getKey()); - clientComponentInstanceIt.remove(); - } - } - } - - if (!clientComponentInstances.isEmpty() - || !clusterComponentInstances.isEmpty()) { - log.error("Mismatch found in upgrade resource definition and cluster " - + "resource state"); - if (!clientComponentInstances.isEmpty()) { - log.info("The upgrade resource definitions that do not match are:"); - for (Map.Entry<String, Integer> 
clientComponentInstanceEntry : clientComponentInstances - .entrySet()) { - log.info(" Component Name: {}, Instance count: {}", - clientComponentInstanceEntry.getKey(), - clientComponentInstanceEntry.getValue()); - } - } - if (!clusterComponentInstances.isEmpty()) { - log.info("The cluster resources that do not match are:"); - for (Map.Entry<String, Integer> clusterComponentInstanceEntry : clusterComponentInstances - .entrySet()) { - log.info(" Component Name: {}, Instance count: {}", - clusterComponentInstanceEntry.getKey(), - clusterComponentInstanceEntry.getValue()); - } - } - throw new BadConfigException("Resource definition provided for " - + "upgrade does not match with that of the currently running " - + "cluster.\nIf you are aware of what you are doing, rerun the " - + "command with " + Arguments.ARG_FORCE + " option."); - } - } - - protected void persistInstanceDefinition(boolean overwrite, - Path appconfdir, - InstanceBuilder builder) - throws IOException, SliderException, LockAcquireFailedException { - builder.persist(appconfdir, overwrite); - } - - @VisibleForTesting - public static void replaceTokens(ConfTree conf, - String userName, String clusterName) throws IOException { - Map<String,String> newglobal = new HashMap<>(); - for (Entry<String,String> entry : conf.global.entrySet()) { - newglobal.put(entry.getKey(), replaceTokens(entry.getValue(), - userName, clusterName)); - } - conf.global.putAll(newglobal); - - for (String component : conf.components.keySet()) { - Map<String,String> newComponent = new HashMap<>(); - for (Entry<String,String> entry : conf.components.get(component).entrySet()) { - newComponent.put(entry.getKey(), replaceTokens(entry.getValue(), - userName, clusterName)); - } - conf.components.get(component).putAll(newComponent); - } - - Map<String,List<String>> newcred = new HashMap<>(); - for (Entry<String,List<String>> entry : conf.credentials.entrySet()) { - List<String> resultList = new ArrayList<>(); - for (String v : 
entry.getValue()) { - resultList.add(replaceTokens(v, userName, clusterName)); - } - newcred.put(replaceTokens(entry.getKey(), userName, clusterName), - resultList); - } - conf.credentials.clear(); - conf.credentials.putAll(newcred); - } - - private static String replaceTokens(String s, String userName, - String clusterName) throws IOException { - return s.replaceAll(Pattern.quote("${USER}"), userName) - .replaceAll(Pattern.quote("${USER_NAME}"), userName); - } - - public FsPermission getClusterDirectoryPermissions(Configuration conf) { - String clusterDirPermsOct = - conf.get(CLUSTER_DIRECTORY_PERMISSIONS, DEFAULT_CLUSTER_DIRECTORY_PERMISSIONS); - return new FsPermission(clusterDirPermsOct); - } - - /** - * Verify that the Resource Manager is configured (on a non-HA cluster). - * with a useful error message - * @throws BadCommandArgumentsException the exception raised on an invalid config - */ - public void verifyBindingsDefined() throws BadCommandArgumentsException { - InetSocketAddress rmAddr = getRmAddress(getConfig()); - if (!getConfig().getBoolean(YarnConfiguration.RM_HA_ENABLED, false) - && !isAddressDefined(rmAddr)) { - throw new BadCommandArgumentsException( - E_NO_RESOURCE_MANAGER - + " in the argument " - + Arguments.ARG_MANAGER - + " or the configuration property " - + YarnConfiguration.RM_ADDRESS - + " value :" + rmAddr); - } - } - - /** - * Load and start a cluster specification. - * This assumes that all validation of args and cluster state - * have already taken place - * - * @param clustername name of the cluster. 
- * @param launchArgs launch arguments - * @return the exit code - * @throws YarnException - * @throws IOException - */ - protected int startCluster(String clustername, - LaunchArgsAccessor launchArgs) throws - YarnException, - IOException { - Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername); - AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved( - clustername, - clusterDirectory); - - LaunchedApplication launchedApplication = - launchApplication(clustername, clusterDirectory, instanceDefinition, - serviceArgs.isDebug()); - - if (launchArgs.getOutputFile() != null) { - // output file has been requested. Get the app report and serialize it - ApplicationReport report = - launchedApplication.getApplicationReport(); - SerializedApplicationReport sar = new SerializedApplicationReport(report); - sar.submitTime = System.currentTimeMillis(); - ApplicationReportSerDeser serDeser = new ApplicationReportSerDeser(); - serDeser.save(sar, launchArgs.getOutputFile()); - } - int waittime = launchArgs.getWaittime(); - if (waittime > 0) { - return waitForAppRunning(launchedApplication, waittime, waittime); - } else { - // no waiting - return EXIT_SUCCESS; - } - } - - /** - * Load the instance definition. It is not resolved at this point - * @param name cluster name - * @param clusterDirectory cluster dir - * @return the loaded configuration - * @throws IOException - * @throws SliderException - * @throws UnknownApplicationInstanceException if the file is not found - */ - public AggregateConf loadInstanceDefinitionUnresolved(String name, - Path clusterDirectory) throws IOException, SliderException { - - try { - AggregateConf definition = - InstanceIO.loadInstanceDefinitionUnresolved(sliderFileSystem, - clusterDirectory); - definition.setName(name); - return definition; - } catch (FileNotFoundException e) { - throw UnknownApplicationInstanceException.unknownInstance(name, e); - } - } - - /** - * Load the instance definition. 
- * @param name cluster name - * @param resolved flag to indicate the cluster should be resolved - * @return the loaded configuration - * @throws IOException IO problems - * @throws SliderException slider explicit issues - * @throws UnknownApplicationInstanceException if the file is not found - */ - public AggregateConf loadInstanceDefinition(String name, - boolean resolved) throws - IOException, - SliderException { - - Path clusterDirectory = sliderFileSystem.buildClusterDirPath(name); - AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved( - name, - clusterDirectory); - if (resolved) { - instanceDefinition.resolve(); - } - return instanceDefinition; - - } - - protected AppMasterLauncher setupAppMasterLauncher(String clustername, - Path clusterDirectory, - AggregateConf instanceDefinition, - boolean debugAM) - throws YarnException, IOException{ - deployedClusterName = clustername; - validateClusterName(clustername); - verifyNoLiveClusters(clustername, "Launch"); - Configuration config = getConfig(); - lookupZKQuorum(); - boolean clusterSecure = isHadoopClusterSecure(config); - //create the Slider AM provider -this helps set up the AM - SliderAMClientProvider sliderAM = new SliderAMClientProvider(config); - - instanceDefinition.resolve(); - launchedInstanceDefinition = instanceDefinition; - - ConfTreeOperations internalOperations = instanceDefinition.getInternalOperations(); - MapOperations internalOptions = internalOperations.getGlobalOptions(); - ConfTreeOperations resourceOperations = instanceDefinition.getResourceOperations(); - ConfTreeOperations appOperations = instanceDefinition.getAppConfOperations(); - Path generatedConfDirPath = - createPathThatMustExist(internalOptions.getMandatoryOption( - INTERNAL_GENERATED_CONF_PATH)); - Path snapshotConfPath = - createPathThatMustExist(internalOptions.getMandatoryOption( - INTERNAL_SNAPSHOT_CONF_PATH)); - - - // cluster Provider - AbstractClientProvider provider = createClientProvider( - 
internalOptions.getMandatoryOption(INTERNAL_PROVIDER_NAME)); - if (log.isDebugEnabled()) { - log.debug(instanceDefinition.toString()); - } - MapOperations sliderAMResourceComponent = - resourceOperations.getOrAddComponent(SliderKeys.COMPONENT_AM); - MapOperations resourceGlobalOptions = resourceOperations.getGlobalOptions(); - - // add the tags if available - Set<String> applicationTags = provider.getApplicationTags(sliderFileSystem, - getApplicationDefinitionPath(appOperations)); - - Credentials credentials = null; - if (clusterSecure) { - // pick up oozie credentials - credentials = CredentialUtils.loadTokensFromEnvironment(System.getenv(), - config); - if (credentials == null) { - // nothing from oozie, so build up directly - credentials = new Credentials( - UserGroupInformation.getCurrentUser().getCredentials()); - CredentialUtils.addRMRenewableFSDelegationTokens(config, - sliderFileSystem.getFileSystem(), - credentials); - CredentialUtils.addRMDelegationToken(yarnClient, credentials); - - } else { - log.info("Using externally supplied credentials to launch AM"); - } - } - - AppMasterLauncher amLauncher = new AppMasterLauncher(clustername, - SliderKeys.APP_TYPE, - config, - sliderFileSystem, - yarnClient, - clusterSecure, - sliderAMResourceComponent, - resourceGlobalOptions, - applicationTags, - credentials); - - ApplicationId appId = amLauncher.getApplicationId(); - // set the application name; - amLauncher.setKeepContainersOverRestarts(true); - - int maxAppAttempts = config.getInt(KEY_AM_RESTART_LIMIT, 0); - amLauncher.setMaxAppAttempts(maxAppAttempts); - - sliderFileSystem.purgeAppInstanceTempFiles(clustername); - Path tempPath = sliderFileSystem.createAppInstanceTempPath( - clustername, - appId.toString() + "/am"); - String libdir = "lib"; - Path libPath = new Path(tempPath, libdir); - sliderFileSystem.getFileSystem().mkdirs(libPath); - log.debug("FS={}, tempPath={}, libdir={}", sliderFileSystem, tempPath, libPath); - - // set local resources for the 
application master - // local files or archives as needed - // In this scenario, the jar file for the application master is part of the local resources - Map<String, LocalResource> localResources = amLauncher.getLocalResources(); - - // look for the configuration directory named on the command line - boolean hasServerLog4jProperties = false; - Path remoteConfPath = null; - String relativeConfDir = null; - String confdirProp = System.getProperty(SliderKeys.PROPERTY_CONF_DIR); - if (isUnset(confdirProp)) { - log.debug("No local configuration directory provided as system property"); - } else { - File confDir = new File(confdirProp); - if (!confDir.exists()) { - throw new BadConfigException(E_CONFIGURATION_DIRECTORY_NOT_FOUND, - confDir); - } - Path localConfDirPath = createLocalPath(confDir); - remoteConfPath = new Path(clusterDirectory, SliderKeys.SUBMITTED_CONF_DIR); - log.debug("Slider configuration directory is {}; remote to be {}", - localConfDirPath, remoteConfPath); - copyDirectory(config, localConfDirPath, remoteConfPath, null); - - File log4jserver = - new File(confDir, SliderKeys.LOG4J_SERVER_PROP_FILENAME); - hasServerLog4jProperties = log4jserver.isFile(); - } - // the assumption here is that minimr cluster => this is a test run - // and the classpath can look after itself - - boolean usingMiniMRCluster = getUsingMiniMRCluster(); - if (!usingMiniMRCluster) { - - log.debug("Destination is not a MiniYARNCluster -copying full classpath"); - - // insert conf dir first - if (remoteConfPath != null) { - relativeConfDir = SliderKeys.SUBMITTED_CONF_DIR; - Map<String, LocalResource> submittedConfDir = - sliderFileSystem.submitDirectory(remoteConfPath, - relativeConfDir); - mergeMaps(localResources, submittedConfDir); - } - } - // build up the configuration - // IMPORTANT: it is only after this call that site configurations - // will be valid. 
- - propagatePrincipals(config, instanceDefinition); - // validate security data - -/* - // turned off until tested - SecurityConfiguration securityConfiguration = - new SecurityConfiguration(config, - instanceDefinition, clustername); - -*/ - Configuration clientConfExtras = new Configuration(false); - // then build up the generated path. - FsPermission clusterPerms = getClusterDirectoryPermissions(config); - copyDirectory(config, snapshotConfPath, generatedConfDirPath, - clusterPerms); - - - // standard AM resources - sliderAM.prepareAMAndConfigForLaunch(sliderFileSystem, - config, - amLauncher, - instanceDefinition, -
<TRUNCATED>