http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java new file mode 100644 index 0000000..8210f4d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java @@ -0,0 +1,4569 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.slider.client; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.io.Files; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathNotFoundException; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.registry.client.api.RegistryConstants; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.registry.client.binding.RegistryPathUtils; +import org.apache.hadoop.registry.client.binding.RegistryUtils; +import org.apache.hadoop.registry.client.exceptions.NoRecordException; +import org.apache.hadoop.registry.client.types.Endpoint; +import org.apache.hadoop.registry.client.types.RegistryPathStatus; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.KerberosDiags; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.alias.CredentialProvider; +import org.apache.hadoop.security.alias.CredentialProviderFactory; +import org.apache.hadoop.util.Shell; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import 
org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.slider.api.ClusterDescription; +import org.apache.slider.api.ClusterNode; +import org.apache.slider.api.SliderApplicationApi; +import org.apache.slider.api.SliderClusterProtocol; +import org.apache.slider.api.StateValues; +import org.apache.slider.api.proto.Messages; +import org.apache.slider.api.types.ContainerInformation; +import org.apache.slider.api.types.NodeInformationList; +import org.apache.slider.api.types.SliderInstanceDescription; +import org.apache.slider.client.ipc.SliderApplicationIpcClient; +import org.apache.slider.client.ipc.SliderClusterOperations; +import org.apache.slider.common.Constants; +import org.apache.slider.common.SliderExitCodes; +import org.apache.slider.common.SliderKeys; +import org.apache.slider.common.params.AbstractActionArgs; +import org.apache.slider.common.params.AbstractClusterBuildingActionArgs; +import org.apache.slider.common.params.ActionAMSuicideArgs; +import org.apache.slider.common.params.ActionClientArgs; +import org.apache.slider.common.params.ActionCreateArgs; +import org.apache.slider.common.params.ActionDependencyArgs; +import org.apache.slider.common.params.ActionDestroyArgs; +import org.apache.slider.common.params.ActionDiagnosticArgs; +import org.apache.slider.common.params.ActionEchoArgs; +import org.apache.slider.common.params.ActionExistsArgs; +import 
org.apache.slider.common.params.ActionFlexArgs; +import org.apache.slider.common.params.ActionFreezeArgs; +import org.apache.slider.common.params.ActionInstallKeytabArgs; +import org.apache.slider.common.params.ActionInstallPackageArgs; +import org.apache.slider.common.params.ActionKDiagArgs; +import org.apache.slider.common.params.ActionKeytabArgs; +import org.apache.slider.common.params.ActionKillContainerArgs; +import org.apache.slider.common.params.ActionListArgs; +import org.apache.slider.common.params.ActionLookupArgs; +import org.apache.slider.common.params.ActionNodesArgs; +import org.apache.slider.common.params.ActionPackageArgs; +import org.apache.slider.common.params.ActionRegistryArgs; +import org.apache.slider.common.params.ActionResolveArgs; +import org.apache.slider.common.params.ActionResourceArgs; +import org.apache.slider.common.params.ActionStatusArgs; +import org.apache.slider.common.params.ActionThawArgs; +import org.apache.slider.common.params.ActionTokensArgs; +import org.apache.slider.common.params.ActionUpgradeArgs; +import org.apache.slider.common.params.Arguments; +import org.apache.slider.common.params.ClientArgs; +import org.apache.slider.common.params.CommonArgs; +import org.apache.slider.common.params.LaunchArgsAccessor; +import org.apache.slider.common.tools.ConfigHelper; +import org.apache.slider.common.tools.Duration; +import org.apache.slider.common.tools.SliderFileSystem; +import org.apache.slider.common.tools.SliderUtils; +import org.apache.slider.common.tools.SliderVersionInfo; +import org.apache.slider.core.build.InstanceBuilder; +import org.apache.slider.core.build.InstanceIO; +import org.apache.slider.core.conf.AggregateConf; +import org.apache.slider.core.conf.ConfTree; +import org.apache.slider.core.conf.ConfTreeOperations; +import org.apache.slider.core.conf.MapOperations; +import org.apache.slider.core.conf.ResourcesInputPropertiesValidator; +import org.apache.slider.core.conf.TemplateInputPropertiesValidator; +import 
org.apache.slider.core.exceptions.BadClusterStateException; +import org.apache.slider.core.exceptions.BadCommandArgumentsException; +import org.apache.slider.core.exceptions.BadConfigException; +import org.apache.slider.core.exceptions.ErrorStrings; +import org.apache.slider.core.exceptions.NoSuchNodeException; +import org.apache.slider.core.exceptions.NotFoundException; +import org.apache.slider.core.exceptions.SliderException; +import org.apache.slider.core.exceptions.UnknownApplicationInstanceException; +import org.apache.slider.core.exceptions.UsageException; +import org.apache.slider.core.exceptions.WaitTimeoutException; +import org.apache.slider.core.launch.AppMasterLauncher; +import org.apache.slider.core.launch.ClasspathConstructor; +import org.apache.slider.core.launch.CredentialUtils; +import org.apache.slider.core.launch.JavaCommandLineBuilder; +import org.apache.slider.core.launch.LaunchedApplication; +import org.apache.slider.core.launch.RunningApplication; +import org.apache.slider.core.launch.SerializedApplicationReport; +import org.apache.slider.core.main.RunService; +import org.apache.slider.core.persist.AppDefinitionPersister; +import org.apache.slider.core.persist.ApplicationReportSerDeser; +import org.apache.slider.core.persist.ConfPersister; +import org.apache.slider.core.persist.JsonSerDeser; +import org.apache.slider.core.persist.LockAcquireFailedException; +import org.apache.slider.core.registry.SliderRegistryUtils; +import org.apache.slider.core.registry.YarnAppListClient; +import org.apache.slider.core.registry.docstore.ConfigFormat; +import org.apache.slider.core.registry.docstore.PublishedConfigSet; +import org.apache.slider.core.registry.docstore.PublishedConfiguration; +import org.apache.slider.core.registry.docstore.PublishedConfigurationOutputter; +import org.apache.slider.core.registry.docstore.PublishedExports; +import org.apache.slider.core.registry.docstore.PublishedExportsOutputter; +import 
org.apache.slider.core.registry.docstore.PublishedExportsSet; +import org.apache.slider.core.registry.retrieve.RegistryRetriever; +import org.apache.slider.core.zk.BlockingZKWatcher; +import org.apache.slider.core.zk.ZKIntegration; +import org.apache.slider.core.zk.ZKPathBuilder; +import org.apache.slider.providers.AbstractClientProvider; +import org.apache.slider.providers.SliderProviderFactory; +import org.apache.slider.providers.agent.AgentKeys; +import org.apache.slider.providers.slideram.SliderAMClientProvider; +import org.apache.slider.server.appmaster.SliderAppMaster; +import org.apache.slider.server.appmaster.rpc.RpcBinder; +import org.apache.slider.server.services.security.SecurityStore; +import org.apache.slider.server.services.utility.AbstractSliderLaunchedService; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.InterruptedIOException; +import java.io.PrintStream; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.io.Writer; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.URISyntaxException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import 
java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.apache.hadoop.registry.client.binding.RegistryUtils.*; +import static org.apache.slider.api.InternalKeys.*; +import static org.apache.slider.api.OptionKeys.*; +import static org.apache.slider.api.ResourceKeys.*; +import static org.apache.slider.common.Constants.HADOOP_JAAS_DEBUG; +import static org.apache.slider.common.params.SliderActions.*; +import static org.apache.slider.common.tools.SliderUtils.*; + + +/** + * Client service for Slider + */ + +public class SliderClient extends AbstractSliderLaunchedService implements RunService, + SliderExitCodes, SliderKeys, ErrorStrings, SliderClientAPI { + private static final Logger log = LoggerFactory.getLogger(SliderClient.class); + public static final String E_MUST_BE_A_VALID_JSON_FILE + = "Invalid configuration. Must be a valid json file."; + public static final String E_INVALID_INSTALL_LOCATION + = "A valid install location must be provided for the client."; + public static final String E_UNABLE_TO_READ_SUPPLIED_PACKAGE_FILE + = "Unable to read supplied package file"; + public static final String E_INVALID_APPLICATION_PACKAGE_LOCATION + = "A valid application package location required."; + public static final String E_INVALID_INSTALL_PATH = "Install path is not a valid directory"; + public static final String E_INSTALL_PATH_DOES_NOT_EXIST = "Install path does not exist"; + public static final String E_INVALID_APPLICATION_TYPE_NAME + = "A valid application type name is required (e.g. 
HBASE)."; + public static final String E_USE_REPLACEPKG_TO_OVERWRITE = "Use --replacepkg to overwrite."; + public static final String E_PACKAGE_DOES_NOT_EXIST = "Package does not exist"; + public static final String E_NO_ZOOKEEPER_QUORUM = "No Zookeeper quorum defined"; + public static final String E_NO_RESOURCE_MANAGER = "No valid Resource Manager address provided"; + public static final String E_PACKAGE_EXISTS = "Package exists"; + private static PrintStream clientOutputStream = System.out; + + // value should not be changed without updating string find in slider.py + private static final String PASSWORD_PROMPT = "Enter password for"; + + private ClientArgs serviceArgs; + public ApplicationId applicationId; + + private String deployedClusterName; + /** + * Cluster operations against the deployed cluster -will be null + * if no bonding has yet taken place + */ + private SliderClusterOperations sliderClusterOperations; + + protected SliderFileSystem sliderFileSystem; + + /** + * Yarn client service + */ + private SliderYarnClientImpl yarnClient; + private YarnAppListClient yarnAppListClient; + private AggregateConf launchedInstanceDefinition; + + /** + * The YARN registry service + */ + @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") + private RegistryOperations registryOperations; + + /** + * Constructor + */ + public SliderClient() { + super("Slider Client"); + new HdfsConfiguration(); + new YarnConfiguration(); + } + + /** + * This is called <i>Before serviceInit is called</i> + * @param config the initial configuration build up by the + * service launcher. + * @param args argument list list of arguments passed to the command line + * after any launcher-specific commands have been stripped. + * @return the post-binding configuration to pass to the <code>init()</code> + * operation. + * @throws Exception + */ + @Override + public Configuration bindArgs(Configuration config, String... 
args) throws Exception { + config = super.bindArgs(config, args); + serviceArgs = new ClientArgs(args); + serviceArgs.parse(); + // add the slider XML config + ConfigHelper.injectSliderXMLResource(); + // yarn-ify + YarnConfiguration yarnConfiguration = new YarnConfiguration(config); + return patchConfiguration(yarnConfiguration); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + Configuration clientConf = loadSliderClientXML(); + ConfigHelper.mergeConfigurations(conf, clientConf, SLIDER_CLIENT_XML, true); + serviceArgs.applyDefinitions(conf); + serviceArgs.applyFileSystemBinding(conf); + AbstractActionArgs coreAction = serviceArgs.getCoreAction(); + // init security with our conf + if (!coreAction.disableSecureLogin() && isHadoopClusterSecure(conf)) { + forceLogin(); + initProcessSecurity(conf); + } + if (coreAction.getHadoopServicesRequired()) { + initHadoopBinding(); + } + super.serviceInit(conf); + } + + /** + * Launched service execution. This runs {@link #exec()} + * then catches some exceptions and converts them to exit codes + * @return an exit code + * @throws Throwable + */ + @Override + public int runService() throws Throwable { + try { + return exec(); + } catch (FileNotFoundException | PathNotFoundException nfe) { + throw new NotFoundException(nfe, nfe.toString()); + } + } + + /** + * Execute the command line + * @return an exit code + * @throws Throwable on a failure + */ + public int exec() throws Throwable { + + // choose the action + String action = serviceArgs.getAction(); + if (isUnset(action)) { + throw new SliderException(EXIT_USAGE, serviceArgs.usage()); + } + + int exitCode = EXIT_SUCCESS; + String clusterName = serviceArgs.getClusterName(); + // actions + + switch (action) { + case ACTION_AM_SUICIDE: + exitCode = actionAmSuicide(clusterName, + serviceArgs.getActionAMSuicideArgs()); + break; + + case ACTION_BUILD: + exitCode = actionBuild(clusterName, serviceArgs.getActionBuildArgs()); + break; + + case 
ACTION_CLIENT: + exitCode = actionClient(serviceArgs.getActionClientArgs()); + break; + + case ACTION_CREATE: + exitCode = actionCreate(clusterName, serviceArgs.getActionCreateArgs()); + break; + + case ACTION_DEPENDENCY: + exitCode = actionDependency(serviceArgs.getActionDependencyArgs()); + break; + + case ACTION_DESTROY: + exitCode = actionDestroy(clusterName, serviceArgs.getActionDestroyArgs()); + break; + + case ACTION_DIAGNOSTICS: + exitCode = actionDiagnostic(serviceArgs.getActionDiagnosticArgs()); + break; + + case ACTION_EXISTS: + exitCode = actionExists(clusterName, + serviceArgs.getActionExistsArgs()); + break; + + case ACTION_FLEX: + exitCode = actionFlex(clusterName, serviceArgs.getActionFlexArgs()); + break; + + case ACTION_FREEZE: + exitCode = actionFreeze(clusterName, serviceArgs.getActionFreezeArgs()); + break; + + case ACTION_HELP: + log.info(serviceArgs.usage()); + break; + + case ACTION_KDIAG: + exitCode = actionKDiag(serviceArgs.getActionKDiagArgs()); + break; + + case ACTION_KILL_CONTAINER: + exitCode = actionKillContainer(clusterName, + serviceArgs.getActionKillContainerArgs()); + break; + + case ACTION_INSTALL_KEYTAB: + exitCode = actionInstallKeytab(serviceArgs.getActionInstallKeytabArgs()); + break; + + case ACTION_INSTALL_PACKAGE: + exitCode = actionInstallPkg(serviceArgs.getActionInstallPackageArgs()); + break; + + case ACTION_KEYTAB: + exitCode = actionKeytab(serviceArgs.getActionKeytabArgs()); + break; + + case ACTION_LIST: + exitCode = actionList(clusterName, serviceArgs.getActionListArgs()); + break; + + case ACTION_LOOKUP: + exitCode = actionLookup(serviceArgs.getActionLookupArgs()); + break; + + case ACTION_NODES: + exitCode = actionNodes("", serviceArgs.getActionNodesArgs()); + break; + + case ACTION_PACKAGE: + exitCode = actionPackage(serviceArgs.getActionPackageArgs()); + break; + + case ACTION_REGISTRY: + exitCode = actionRegistry(serviceArgs.getActionRegistryArgs()); + break; + + case ACTION_RESOLVE: + exitCode = 
actionResolve(serviceArgs.getActionResolveArgs()); + break; + + case ACTION_RESOURCE: + exitCode = actionResource(serviceArgs.getActionResourceArgs()); + break; + + case ACTION_STATUS: + exitCode = actionStatus(clusterName, serviceArgs.getActionStatusArgs()); + break; + + case ACTION_THAW: + exitCode = actionThaw(clusterName, serviceArgs.getActionThawArgs()); + break; + + case ACTION_TOKENS: + exitCode = actionTokens(serviceArgs.getActionTokenArgs()); + break; + + case ACTION_UPDATE: + exitCode = actionUpdate(clusterName, serviceArgs.getActionUpdateArgs()); + break; + + case ACTION_UPGRADE: + exitCode = actionUpgrade(clusterName, serviceArgs.getActionUpgradeArgs()); + break; + + case ACTION_VERSION: + exitCode = actionVersion(); + break; + + default: + throw new SliderException(EXIT_UNIMPLEMENTED, + "Unimplemented: " + action); + } + + return exitCode; + } + + /** + * Perform everything needed to init the hadoop binding. + * This assumes that the service is already in inited or started state + * @throws IOException + * @throws SliderException + */ + protected void initHadoopBinding() throws IOException, SliderException { + // validate the client + validateSliderClientEnvironment(null); + //create the YARN client + yarnClient = new SliderYarnClientImpl(); + yarnClient.init(getConfig()); + if (getServiceState() == STATE.STARTED) { + yarnClient.start(); + } + addService(yarnClient); + yarnAppListClient = + new YarnAppListClient(yarnClient, getUsername(), getConfig()); + // create the filesystem + sliderFileSystem = new SliderFileSystem(getConfig()); + } + + /** + * Delete the zookeeper node associated with the calling user and the cluster + * TODO: YARN registry operations + **/ + @VisibleForTesting + public boolean deleteZookeeperNode(String clusterName) throws YarnException, IOException { + String user = getUsername(); + String zkPath = ZKIntegration.mkClusterPath(user, clusterName); + Exception e = null; + try { + Configuration config = getConfig(); + ZKIntegration 
client = getZkClient(clusterName, user); + if (client != null) { + if (client.exists(zkPath)) { + log.info("Deleting zookeeper path {}", zkPath); + } + client.deleteRecursive(zkPath); + return true; + } + } catch (InterruptedException | BadConfigException | KeeperException ex) { + e = ex; + } + if (e != null) { + log.warn("Unable to recursively delete zk node {}", zkPath, e); + } + + return false; + } + + /** + * Create the zookeeper node associated with the calling user and the cluster + * + * @param clusterName slider application name + * @param nameOnly should the name only be created (i.e. don't create ZK node) + * @return the path, using the policy implemented in + * {@link ZKIntegration#mkClusterPath(String, String)} + * @throws YarnException + * @throws IOException + */ + @VisibleForTesting + public String createZookeeperNode(String clusterName, Boolean nameOnly) throws YarnException, IOException { + try { + return createZookeeperNodeInner(clusterName, nameOnly); + } catch (KeeperException.NodeExistsException e) { + return null; + } catch (KeeperException e) { + return null; + } catch (InterruptedException e) { + throw new InterruptedIOException(e.toString()); + } + } + + /** + * Create the zookeeper node associated with the calling user and the cluster + * -throwing exceptions on any failure + * @param clusterName cluster name + * @param nameOnly create the path, not the node + * @return the path, using the policy implemented in + * {@link ZKIntegration#mkClusterPath(String, String)} + * @throws YarnException + * @throws IOException + * @throws KeeperException + * @throws InterruptedException + */ + @VisibleForTesting + public String createZookeeperNodeInner(String clusterName, Boolean nameOnly) + throws YarnException, IOException, KeeperException, InterruptedException { + String user = getUsername(); + String zkPath = ZKIntegration.mkClusterPath(user, clusterName); + if (nameOnly) { + return zkPath; + } + ZKIntegration client = getZkClient(clusterName, 
user); + if (client != null) { + // set up the permissions. This must be done differently on a secure cluster from an insecure + // one + List<ACL> zkperms = new ArrayList<>(); + if (UserGroupInformation.isSecurityEnabled()) { + zkperms.add(new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.AUTH_IDS)); + zkperms.add(new ACL(ZooDefs.Perms.READ, ZooDefs.Ids.ANYONE_ID_UNSAFE)); + } else { + zkperms.add(new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.ANYONE_ID_UNSAFE)); + } + client.createPath(zkPath, "", + zkperms, + CreateMode.PERSISTENT); + return zkPath; + } else { + return null; + } + } + + /** + * Gets a zookeeper client, returns null if it cannot connect to zookeeper + **/ + protected ZKIntegration getZkClient(String clusterName, String user) throws YarnException { + String registryQuorum = lookupZKQuorum(); + ZKIntegration client = null; + try { + BlockingZKWatcher watcher = new BlockingZKWatcher(); + client = ZKIntegration.newInstance(registryQuorum, user, clusterName, true, false, watcher, + ZKIntegration.SESSION_TIMEOUT); + client.init(); + watcher.waitForZKConnection(2 * 1000); + } catch (InterruptedException e) { + client = null; + log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e); + } catch (IOException e) { + log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e); + } + return client; + } + + /** + * Keep this signature for backward compatibility with + * force=true by default. + */ + @Override + public int actionDestroy(String clustername) throws YarnException, + IOException { + ActionDestroyArgs destroyArgs = new ActionDestroyArgs(); + destroyArgs.force = true; + return actionDestroy(clustername, destroyArgs); + } + + @Override + public int actionDestroy(String clustername, + ActionDestroyArgs destroyArgs) throws YarnException, IOException { + // verify that a live cluster isn't there + validateClusterName(clustername); + //no=op, it is now mandatory. 
+ verifyBindingsDefined(); + verifyNoLiveClusters(clustername, "Destroy"); + boolean forceDestroy = destroyArgs.force; + log.debug("actionDestroy({}, force={})", clustername, forceDestroy); + + // create the directory path + Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername); + // delete the directory; + FileSystem fs = sliderFileSystem.getFileSystem(); + boolean exists = fs.exists(clusterDirectory); + if (exists) { + log.debug("Application Instance {} found at {}: destroying", clustername, clusterDirectory); + if (!forceDestroy) { + // fail the command if --force is not explicitly specified + throw new UsageException("Destroy will permanently delete directories and registries. " + + "Reissue this command with the --force option if you want to proceed."); + } + if (!fs.delete(clusterDirectory, true)) { + log.warn("Filesystem returned false from delete() operation"); + } + + if(!deleteZookeeperNode(clustername)) { + log.warn("Unable to perform node cleanup in Zookeeper."); + } + + if (fs.exists(clusterDirectory)) { + log.warn("Failed to delete {}", clusterDirectory); + } + + } else { + log.debug("Application Instance {} already destroyed", clustername); + } + + // rm the registry entry âdo not let this block the destroy operations + String registryPath = SliderRegistryUtils.registryPathForInstance( + clustername); + try { + getRegistryOperations().delete(registryPath, true); + } catch (IOException e) { + log.warn("Error deleting registry entry {}: {} ", registryPath, e, e); + } catch (SliderException e) { + log.warn("Error binding to registry {} ", e, e); + } + + List<ApplicationReport> instances = findAllLiveInstances(clustername); + // detect any race leading to cluster creation during the check/destroy process + // and report a problem. 
+ if (!instances.isEmpty()) { + throw new SliderException(EXIT_APPLICATION_IN_USE, + clustername + ": " + + E_DESTROY_CREATE_RACE_CONDITION + + " :" + + instances.get(0)); + } + log.info("Destroyed cluster {}", clustername); + return EXIT_SUCCESS; + } + + @Override + public int actionAmSuicide(String clustername, + ActionAMSuicideArgs args) throws YarnException, IOException { + SliderClusterOperations clusterOperations = + createClusterOperations(clustername); + clusterOperations.amSuicide(args.message, args.exitcode, args.waittime); + return EXIT_SUCCESS; + } + + @Override + public AbstractClientProvider createClientProvider(String provider) + throws SliderException { + SliderProviderFactory factory = + SliderProviderFactory.createSliderProviderFactory(provider); + return factory.createClientProvider(); + } + + /** + * Create the cluster -saving the arguments to a specification file first + * @param clustername cluster name + * @return the status code + * @throws YarnException Yarn problems + * @throws IOException other problems + * @throws BadCommandArgumentsException bad arguments. 
+ */ + public int actionCreate(String clustername, ActionCreateArgs createArgs) throws + YarnException, + IOException { + + actionBuild(clustername, createArgs); + Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername); + AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved( + clustername, clusterDirectory); + try { + checkForCredentials(getConfig(), instanceDefinition.getAppConf()); + } catch (IOException e) { + sliderFileSystem.getFileSystem().delete(clusterDirectory, true); + throw e; + } + return startCluster(clustername, createArgs); + } + + @Override + public int actionUpgrade(String clustername, ActionUpgradeArgs upgradeArgs) + throws YarnException, IOException { + File template = upgradeArgs.template; + File resources = upgradeArgs.resources; + List<String> containers = upgradeArgs.containers; + List<String> components = upgradeArgs.components; + + // For upgrade spec, let's be little more strict with validation. If either + // --template or --resources is specified, then both needs to be specified. + // Otherwise the internal app config and resources states of the app will be + // unwantedly modified and the change will take effect to the running app + // immediately. 
+ require(!(template != null && resources == null), + "Option %s must be specified with option %s", + Arguments.ARG_RESOURCES, Arguments.ARG_TEMPLATE); + + require(!(resources != null && template == null), + "Option %s must be specified with option %s", + Arguments.ARG_TEMPLATE, Arguments.ARG_RESOURCES); + + // For upgrade spec, both --template and --resources should be specified + // and neither of --containers or --components should be used + if (template != null && resources != null) { + require(CollectionUtils.isEmpty(containers), + "Option %s cannot be specified with %s or %s", + Arguments.ARG_CONTAINERS, Arguments.ARG_TEMPLATE, + Arguments.ARG_RESOURCES); + require(CollectionUtils.isEmpty(components), + "Option %s cannot be specified with %s or %s", + Arguments.ARG_COMPONENTS, Arguments.ARG_TEMPLATE, + Arguments.ARG_RESOURCES); + + // not an error to try to upgrade a stopped cluster, just return success + // code, appropriate log messages have already been dumped + if (!isAppInRunningState(clustername)) { + return EXIT_SUCCESS; + } + + // Now initiate the upgrade spec flow + buildInstanceDefinition(clustername, upgradeArgs, true, true, true); + SliderClusterOperations clusterOperations = createClusterOperations(clustername); + clusterOperations.amSuicide("AM restarted for application upgrade", 1, 1000); + return EXIT_SUCCESS; + } + + // Since neither --template or --resources were specified, it is upgrade + // containers flow. Here any one or both of --containers and --components + // can be specified. If a container is specified with --containers option + // and also belongs to a component type specified with --components, it will + // be upgraded only once. 
+    return actionUpgradeContainers(clustername, upgradeArgs);
+  }
+
+  /**
+   * Asks the running AM to upgrade the requested containers and/or all
+   * containers of the requested components.
+   *
+   * Requested container IDs and component names are checked against the
+   * live containers of the instance; any that match nothing are logged and
+   * dropped before the request is sent.
+   *
+   * @param clustername application instance name
+   * @param upgradeArgs parsed upgrade arguments
+   * @return {@code EXIT_SUCCESS} if the request was issued or the instance is
+   * not in the RUNNING state; {@code EXIT_NOT_FOUND} if nothing valid was
+   * requested
+   * @throws YarnException on YARN/AM failures
+   * @throws IOException on IO failures
+   */
+  private int actionUpgradeContainers(String clustername,
+      ActionUpgradeArgs upgradeArgs) throws YarnException, IOException {
+    verifyBindingsDefined();
+    validateClusterName(clustername);
+    int waittime = upgradeArgs.getWaittime(); // ignored for now
+    String text = "Upgrade containers";
+    log.debug("actionUpgradeContainers({}, reason={}, wait={})", clustername,
+        text, waittime);
+
+    // not an error to try to upgrade a stopped cluster, just return success
+    // code, appropriate log messages have already been dumped
+    if (!isAppInRunningState(clustername)) {
+      return EXIT_SUCCESS;
+    }
+
+    // Create sets of containers and components to get rid of duplicates and
+    // for quick lookup during checks below
+    Set<String> containers = new HashSet<>();
+    if (upgradeArgs.containers != null) {
+      containers.addAll(upgradeArgs.containers);
+    }
+    Set<String> components = new HashSet<>();
+    if (upgradeArgs.components != null) {
+      components.addAll(upgradeArgs.components);
+    }
+
+    // check validity of component names and running containers here:
+    // every requested item seen on a live container moves from the
+    // requested set into the matching valid set
+    List<ContainerInformation> liveContainers = getContainers(clustername);
+    Set<String> validContainers = new HashSet<>();
+    Set<String> validComponents = new HashSet<>();
+    for (ContainerInformation liveContainer : liveContainers) {
+      if (containers.isEmpty() && components.isEmpty()) {
+        // everything requested has been accounted for
+        break;
+      }
+      if (containers.remove(liveContainer.containerId)) {
+        validContainers.add(liveContainer.containerId);
+      }
+      if (components.remove(liveContainer.component)) {
+        validComponents.add(liveContainer.component);
+      }
+    }
+
+    // If any item remains in containers or components then they are invalid.
+    // Log warning for them and proceed.
+    if (CollectionUtils.isNotEmpty(containers)) {
+      log.warn("Invalid set of containers provided {}", containers);
+    }
+    if (CollectionUtils.isNotEmpty(components)) {
+      log.warn("Invalid set of components provided {}", components);
+    }
+
+    // If not a single valid container or component is specified do not proceed
+    if (CollectionUtils.isEmpty(validContainers)
+        && CollectionUtils.isEmpty(validComponents)) {
+      log.error("Not a single valid container or component specified. Nothing to do.");
+      return EXIT_NOT_FOUND;
+    }
+
+    SliderClusterProtocol appMaster = connect(findInstance(clustername));
+    Messages.UpgradeContainersRequestProto r =
+        Messages.UpgradeContainersRequestProto
+            .newBuilder()
+            .setMessage(text)
+            .addAllContainer(validContainers)
+            .addAllComponent(validComponents)
+            .build();
+    appMaster.upgradeContainers(r);
+    log.info("Cluster upgrade issued for -");
+    if (CollectionUtils.isNotEmpty(validContainers)) {
+      log.info(" Containers (total {}): {}", validContainers.size(),
+          validContainers);
+    }
+    if (CollectionUtils.isNotEmpty(validComponents)) {
+      log.info(" Components (total {}): {}", validComponents.size(),
+          validComponents);
+    }
+
+    return EXIT_SUCCESS;
+  }
+
+  /**
+   * Checks whether the named instance is a running YARN application, logging
+   * the reason when it is not.
+   *
+   * @param clustername application instance name
+   * @return true if and only if the application is in the RUNNING state
+   * @throws YarnException on YARN failures
+   * @throws IOException on IO failures
+   */
+  private boolean isAppInRunningState(String clustername) throws YarnException,
+      IOException {
+    // is this actually a known cluster?
+    sliderFileSystem.locateInstanceDefinition(clustername);
+    ApplicationReport app = findInstance(clustername);
+    if (app == null) {
+      // exit early
+      log.info("Cluster {} not running", clustername);
+      return false;
+    }
+    log.debug("App to upgrade was found: {}:\n{}", clustername,
+        new OnDemandReportStringifier(app));
+    YarnApplicationState state = app.getYarnApplicationState();
+    // relies on the declaration order of YarnApplicationState:
+    // every state from FINISHED onwards is terminal
+    if (state.ordinal() >= YarnApplicationState.FINISHED.ordinal()) {
+      log.info("Cluster {} is in a terminated state {}. Use command '{}' instead.",
+          clustername, state, ACTION_UPDATE);
+      return false;
+    }
+
+    // IPC request to upgrade containers is possible if the app is running.
+    if (state.ordinal() < YarnApplicationState.RUNNING.ordinal()) {
+      log.info("Cluster {} is in a pre-running state {}. To upgrade it needs " +
+          "to be RUNNING.", clustername, state);
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Makes sure every credential alias listed in {@code tree.credentials}
+   * exists in its credential provider, prompting on stdin for (and storing)
+   * any alias that the provider does not already report.
+   *
+   * @param conf base configuration; copied once per provider
+   * @param tree configuration tree whose credentials map provider path to
+   * the aliases wanted from it
+   * @throws IOException if a password cannot be read or stored
+   */
+  protected static void checkForCredentials(Configuration conf,
+      ConfTree tree) throws IOException {
+    if (tree.credentials == null || tree.credentials.isEmpty()) {
+      log.info("No credentials requested");
+      return;
+    }
+
+    // created lazily and deliberately never closed: closing a reader over
+    // System.in closes stdin for the rest of the JVM
+    BufferedReader br = null;
+    for (Entry<String, List<String>> cred : tree.credentials.entrySet()) {
+      String provider = cred.getKey();
+      List<String> aliases = cred.getValue();
+      if (aliases == null || aliases.isEmpty()) {
+        continue;
+      }
+      Configuration c = new Configuration(conf);
+      c.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, provider);
+      CredentialProvider credentialProvider =
+          CredentialProviderFactory.getProviders(c).get(0);
+      Set<String> existingAliases =
+          new HashSet<>(credentialProvider.getAliases());
+      for (String alias : aliases) {
+        // the requested alias is lower-cased before the lookup
+        if (existingAliases.contains(alias.toLowerCase(Locale.ENGLISH))) {
+          log.info("Credentials for {} found in {}", alias, provider);
+        } else {
+          if (br == null) {
+            br = new BufferedReader(new InputStreamReader(System.in));
+          }
+          char[] pass = readPassword(alias, br);
+          try {
+            credentialProvider.createCredentialEntry(alias, pass);
+            credentialProvider.flush();
+          } finally {
+            // blank the secret even if storing it failed
+            Arrays.fill(pass, ' ');
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Prompts on stdin for a single password for {@code alias}.
+   *
+   * The reader is intentionally not closed: closing it would also close
+   * {@code System.in} for the remainder of the JVM.
+   *
+   * @param alias name shown in the prompt
+   * @return the password; callers should blank the array after use
+   * @throws IOException if stdin is exhausted
+   */
+  private static char[] readOnePassword(String alias) throws IOException {
+    BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
+    return readPassword(alias, br);
+  }
+
+  /**
+   * Reads a password for {@code alias} from {@code br}, asking for it twice
+   * and repeating until both entries match.
+   *
+   * A normal reader is used instead of a secure one, because stdin is not
+   * hooked up to the command line.
+   *
+   * @param alias name shown in the prompt
+   * @param br reader to take the password lines from
+   * @return the password; callers should blank the array after use
+   * @throws IOException if the reader reaches end of stream first
+   */
+  private static char[] readPassword(String alias, BufferedReader br)
+      throws IOException {
+    while (true) {
+      log.info(String.format("%s %s: ", PASSWORD_PROMPT, alias));
+      char[] newPassword1 = readPasswordLine(alias, br);
+      log.info(String.format("%s %s again: ", PASSWORD_PROMPT, alias));
+      char[] newPassword2;
+      try {
+        newPassword2 = readPasswordLine(alias, br);
+      } catch (IOException e) {
+        Arrays.fill(newPassword1, ' ');
+        throw e;
+      }
+      boolean match = Arrays.equals(newPassword1, newPassword2);
+      Arrays.fill(newPassword2, ' ');
+      if (match) {
+        return newPassword1;
+      }
+      Arrays.fill(newPassword1, ' ');
+      log.info("Passwords don't match. Try again.");
+    }
+  }
+
+  /**
+   * Reads one line from {@code br} as a char array.
+   *
+   * @throws IOException if the reader is at end of stream (readLine() == null)
+   */
+  private static char[] readPasswordLine(String alias, BufferedReader br)
+      throws IOException {
+    String line = br.readLine();
+    if (line == null) {
+      throw new IOException("Could not read credentials for " + alias
+          + " from stdin");
+    }
+    return line.toCharArray();
+  }
+
+  @Override
+  public int actionBuild(String clustername,
+      AbstractClusterBuildingActionArgs buildInfo) throws
+      YarnException,
+      IOException {
+
+    buildInstanceDefinition(clustername, buildInfo, false, false);
+    return EXIT_SUCCESS;
+  }
+
+  /**
+   * Dispatches the keytab sub-command: install, delete or list.
+   */
+  @Override
+  public int actionKeytab(ActionKeytabArgs keytabInfo)
+      throws YarnException, IOException {
+    if (keytabInfo.install) {
+      return actionInstallKeytab(keytabInfo);
+    } else if (keytabInfo.delete) {
+      return actionDeleteKeytab(keytabInfo);
+    } else if (keytabInfo.list) {
+      return actionListKeytab(keytabInfo);
+    } else {
+      throw new BadCommandArgumentsException(
+          "Keytab option specified not found.\n" +
+          CommonArgs.usage(serviceArgs, ACTION_KEYTAB));
+    }
+  }
+
+  /**
+   * Logs every file (recursively) under the keytab installation directory,
+   * optionally restricted to a sub-folder.
+   */
+  private int actionListKeytab(ActionKeytabArgs keytabInfo) throws IOException {
+    String folder = StringUtils.defaultString(keytabInfo.folder);
+    Path keytabPath = sliderFileSystem.buildKeytabInstallationDirPath(folder);
+    RemoteIterator<LocatedFileStatus> files =
+        sliderFileSystem.getFileSystem().listFiles(keytabPath, true);
+    log.info("Keytabs:");
+    while (files.hasNext()) {
+      log.info("\t" + files.next().getPath().toString());
+    }
+
+    return EXIT_SUCCESS;
+  }
+
+  /**
+   * Deletes one installed keytab; both the sub-folder and the keytab name
+   * are mandatory.
+   */
+  private int actionDeleteKeytab(ActionKeytabArgs keytabInfo)
+      throws BadCommandArgumentsException, IOException {
+    if (StringUtils.isEmpty(keytabInfo.folder)) {
+      throw new BadCommandArgumentsException(
+          "A valid destination keytab sub-folder name is required (e.g. 'security').\n" +
+          CommonArgs.usage(serviceArgs, ACTION_KEYTAB));
+    }
+
+    if (StringUtils.isEmpty(keytabInfo.keytab)) {
+      throw new BadCommandArgumentsException("A keytab name is required.");
+    }
+
+    Path pkgPath = sliderFileSystem.buildKeytabInstallationDirPath(keytabInfo.folder);
+
+    Path fileInFs = new Path(pkgPath, keytabInfo.keytab);
+    log.info("Deleting keytab {}", fileInFs);
+    FileSystem sfs = sliderFileSystem.getFileSystem();
+    require(sfs.exists(fileInFs), "No keytab to delete found at %s",
+        fileInFs.toUri());
+    sfs.delete(fileInFs, false);
+
+    return EXIT_SUCCESS;
+  }
+
+  /**
+   * Uploads a local keytab into the cluster filesystem sub-folder, with the
+   * folder restricted to owner-only (rwx) and the file to owner read/write.
+   */
+  private int actionInstallKeytab(ActionKeytabArgs keytabInfo)
+      throws BadCommandArgumentsException, IOException {
+    require(isSet(keytabInfo.folder),
+        "A valid destination keytab sub-folder name is required (e.g. 'security').\n" +
+        CommonArgs.usage(serviceArgs, ACTION_KEYTAB));
+
+    requireArgumentSet(Arguments.ARG_KEYTAB, keytabInfo.keytab);
+    File keytabFile = new File(keytabInfo.keytab);
+    require(keytabFile.isFile(),
+        "Unable to access supplied keytab file at %s", keytabFile.getAbsolutePath());
+    Path srcFile = new Path(keytabFile.toURI());
+
+    Path pkgPath = sliderFileSystem.buildKeytabInstallationDirPath(keytabInfo.folder);
+    FileSystem sfs = sliderFileSystem.getFileSystem();
+    sfs.mkdirs(pkgPath);
+    sfs.setPermission(pkgPath, new FsPermission(
+        FsAction.ALL, FsAction.NONE, FsAction.NONE));
+
+    Path fileInFs = new Path(pkgPath, srcFile.getName());
+    log.info("Installing keytab {} at {} and overwrite is {}.",
+        srcFile, fileInFs, keytabInfo.overwrite);
+    require(!(sfs.exists(fileInFs) && !keytabInfo.overwrite),
+        "Keytab exists at %s. Use --overwrite to overwrite.", fileInFs.toUri());
+
+    sfs.copyFromLocalFile(false, keytabInfo.overwrite, srcFile, fileInFs);
+    sfs.setPermission(fileInFs,
+        new FsPermission(FsAction.READ_WRITE, FsAction.NONE, FsAction.NONE));
+
+    return EXIT_SUCCESS;
+  }
+
+  @Override
+  public int actionInstallKeytab(ActionInstallKeytabArgs installKeytabInfo)
+      throws YarnException, IOException {
+    log.warn("The 'install-keytab' option has been deprecated.  Please use 'keytab --install'.");
+    return actionKeytab(new ActionKeytabArgs(installKeytabInfo));
+  }
+
+  /**
+   * Deprecated install-package command: uploads an application package to
+   * the unversioned package directory for {@code installPkgInfo.name}.
+   */
+  @Override
+  public int actionInstallPkg(ActionInstallPackageArgs installPkgInfo) throws
+      YarnException,
+      IOException {
+    log.warn("The " + ACTION_INSTALL_PACKAGE +
+        " option has been deprecated. Please use '" +
+        ACTION_PACKAGE + " " + ClientArgs.ARG_INSTALL + "'.");
+    if (StringUtils.isEmpty(installPkgInfo.name)) {
+      throw new BadCommandArgumentsException(
+          E_INVALID_APPLICATION_TYPE_NAME + "\n" +
+          CommonArgs.usage(serviceArgs, ACTION_INSTALL_PACKAGE));
+    }
+    Path srcFile = extractPackagePath(installPkgInfo.packageURI);
+
+    // Do not provide new options to install-package command as it is in
+    // deprecated mode. So version is kept null here. Use package --install.
+    Path pkgPath = sliderFileSystem.buildPackageDirPath(installPkgInfo.name,
+        null);
+    FileSystem sfs = sliderFileSystem.getFileSystem();
+    sfs.mkdirs(pkgPath);
+
+    Path fileInFs = new Path(pkgPath, srcFile.getName());
+    log.info("Installing package {} at {} and overwrite is {}.",
+        srcFile, fileInFs, installPkgInfo.replacePkg);
+    require(!(sfs.exists(fileInFs) && !installPkgInfo.replacePkg),
+        "Package exists at %s. : %s", fileInFs.toUri(), E_USE_REPLACEPKG_TO_OVERWRITE);
+    sfs.copyFromLocalFile(false, installPkgInfo.replacePkg, srcFile, fileInFs);
+    return EXIT_SUCCESS;
+  }
+
+  /**
+   * Dispatches the resource sub-command: help, install, delete or list.
+   */
+  @Override
+  public int actionResource(ActionResourceArgs resourceInfo)
+      throws YarnException, IOException {
+    if (resourceInfo.help) {
+      actionHelp(ACTION_RESOURCE);
+      return EXIT_SUCCESS;
+    } else if (resourceInfo.install) {
+      return actionInstallResource(resourceInfo);
+    } else if (resourceInfo.delete) {
+      return actionDeleteResource(resourceInfo);
+    } else if (resourceInfo.list) {
+      return actionListResource(resourceInfo);
+    } else {
+      throw new BadCommandArgumentsException(
+          "Resource option specified not found.\n" +
+          CommonArgs.usage(serviceArgs, ACTION_RESOURCE));
+    }
+  }
+
+  /**
+   * Logs every file (recursively) under the resource directory, optionally
+   * restricted to a sub-folder.
+   */
+  private int actionListResource(ActionResourceArgs resourceInfo) throws IOException {
+    String folder = StringUtils.defaultString(resourceInfo.folder);
+    Path path = sliderFileSystem.buildResourcePath(folder);
+    RemoteIterator<LocatedFileStatus> files =
+        sliderFileSystem.getFileSystem().listFiles(path, true);
+    log.info("Resources:");
+    while (files.hasNext()) {
+      log.info("\t" + files.next().getPath().toString());
+    }
+
+    return EXIT_SUCCESS;
+  }
+
+  /**
+   * Recursively deletes one resource, optionally inside a sub-folder.
+   */
+  private int actionDeleteResource(ActionResourceArgs resourceInfo)
+      throws BadCommandArgumentsException, IOException {
+    if (StringUtils.isEmpty(resourceInfo.resource)) {
+      throw new BadCommandArgumentsException("A file name is required.");
+    }
+
+    Path fileInFs;
+    if (resourceInfo.folder == null) {
+      fileInFs = sliderFileSystem.buildResourcePath(resourceInfo.resource);
+    } else {
+      fileInFs = sliderFileSystem.buildResourcePath(resourceInfo.folder,
+          resourceInfo.resource);
+    }
+
+    log.info("Deleting resource {}", fileInFs);
+    FileSystem sfs = sliderFileSystem.getFileSystem();
+    require(sfs.exists(fileInFs), "No resource to delete found at %s", fileInFs.toUri());
+    sfs.delete(fileInFs, true);
+
+    return EXIT_SUCCESS;
+  }
+
+  /**
+   * Uploads a local file, or the direct children of a local directory, into
+   * the resource folder. A newly created folder is owner-only (rwx) and each
+   * uploaded file is owner read/write.
+   */
+  private int actionInstallResource(ActionResourceArgs resourceInfo)
+      throws BadCommandArgumentsException, IOException {
+    String folder = StringUtils.defaultString(resourceInfo.folder);
+
+    requireArgumentSet(Arguments.ARG_RESOURCE, resourceInfo.resource);
+    File file = new File(resourceInfo.resource);
+    require(file.isFile() || file.isDirectory(),
+        "Unable to access supplied file at %s", file.getAbsolutePath());
+
+    File[] files;
+    if (file.isDirectory()) {
+      files = file.listFiles();
+      // File.listFiles() returns null when the directory cannot be read
+      require(files != null, "Unable to list files in %s",
+          file.getAbsolutePath());
+    } else {
+      files = new File[] { file };
+    }
+
+    Path pkgPath = sliderFileSystem.buildResourcePath(folder);
+    FileSystem sfs = sliderFileSystem.getFileSystem();
+
+    if (!sfs.exists(pkgPath)) {
+      sfs.mkdirs(pkgPath);
+      sfs.setPermission(pkgPath, new FsPermission(
+          FsAction.ALL, FsAction.NONE, FsAction.NONE));
+    } else {
+      require(sfs.isDirectory(pkgPath), "Specified folder %s exists and is " +
+          "not a directory", folder);
+    }
+
+    for (File f : files) {
+      Path srcFile = new Path(f.toURI());
+
+      Path fileInFs = new Path(pkgPath, srcFile.getName());
+      log.info("Installing file {} at {} and overwrite is {}.",
+          srcFile, fileInFs, resourceInfo.overwrite);
+      require(!(sfs.exists(fileInFs) && !resourceInfo.overwrite),
+          "File exists at %s. Use --overwrite to overwrite.", fileInFs.toUri());
+
+      sfs.copyFromLocalFile(false, resourceInfo.overwrite, srcFile, fileInFs);
+      sfs.setPermission(fileInFs,
+          new FsPermission(FsAction.READ_WRITE, FsAction.NONE, FsAction.NONE));
+    }
+
+    return EXIT_SUCCESS;
+  }
+
+  /**
+   * Dispatches the client sub-command: install, or certificate store
+   * retrieval.
+   */
+  @Override
+  public int actionClient(ActionClientArgs clientInfo) throws
+      YarnException,
+      IOException {
+    if (clientInfo.install) {
+      return doClientInstall(clientInfo);
+    } else if (clientInfo.getCertStore) {
+      return doCertificateStoreRetrieval(clientInfo);
+    } else {
+      throw new BadCommandArgumentsException(
+          "Only install, keystore, and truststore commands are supported for the client.\n" +
+          CommonArgs.usage(serviceArgs, ACTION_CLIENT));
+
+    }
+  }
+
+  /**
+   * Fetches a client keystore or truststore from the application and writes
+   * it to a new local file.
+   *
+   * The store password comes from, in order:
+   * <ol>
+   *   <li>the command line;</li>
+   *   <li>the given credential provider/alias, prompting and storing it if
+   *   absent;</li>
+   *   <li>a stdin prompt.</li>
+   * </ol>
+   */
+  private int doCertificateStoreRetrieval(ActionClientArgs clientInfo)
+      throws YarnException, IOException {
+    if (clientInfo.keystore != null && clientInfo.truststore != null) {
+      throw new BadCommandArgumentsException(
+          "Only one of either keystore or truststore can be retrieved at one time. " +
+          "Retrieval of both should be done separately\n" +
+          CommonArgs.usage(serviceArgs, ACTION_CLIENT));
+    }
+
+    requireArgumentSet(Arguments.ARG_NAME, clientInfo.name);
+
+    File storeFile;
+    SecurityStore.StoreType type;
+    if (clientInfo.keystore != null) {
+      storeFile = clientInfo.keystore;
+      type = SecurityStore.StoreType.keystore;
+    } else {
+      storeFile = clientInfo.truststore;
+      type = SecurityStore.StoreType.truststore;
+    }
+    // neither store given: fail with a usage message instead of an NPE below
+    require(storeFile != null,
+        "Either a keystore or a truststore file must be specified.\n" +
+        CommonArgs.usage(serviceArgs, ACTION_CLIENT));
+
+    require (!storeFile.exists(),
+        "File %s already exists.  Please remove that file or select a different file name.",
+        storeFile.getAbsolutePath());
+    String hostname = null;
+    if (type == SecurityStore.StoreType.keystore) {
+      hostname = clientInfo.hostname;
+      if (hostname == null) {
+        hostname = InetAddress.getLocalHost().getCanonicalHostName();
+        log.info("No hostname specified via command line. Using {}", hostname);
+      }
+    }
+
+    String password = clientInfo.password;
+    if (password == null) {
+      String provider = clientInfo.provider;
+      String alias = clientInfo.alias;
+      if (provider != null && alias != null) {
+        Configuration conf = new Configuration(getConfig());
+        conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, provider);
+        char[] chars = conf.getPassword(alias);
+        if (chars == null) {
+          CredentialProvider credentialProvider =
+              CredentialProviderFactory.getProviders(conf).get(0);
+          chars = readOnePassword(alias);
+          credentialProvider.createCredentialEntry(alias, chars);
+          credentialProvider.flush();
+        }
+        password = String.valueOf(chars);
+        Arrays.fill(chars, ' ');
+      } else {
+        log.info("No password and no provider/alias pair were provided, " +
+            "prompting for password");
+        // get a password, blanking the temporary array afterwards
+        char[] chars = readOnePassword(type.name());
+        password = String.valueOf(chars);
+        Arrays.fill(chars, ' ');
+      }
+    }
+
+    byte[] keystore = createClusterOperations(clientInfo.name)
+        .getClientCertificateStore(hostname, "client", password, type.name());
+    // persist to file; the stream is closed whether or not the write succeeds
+    try (FileOutputStream storeFileOutputStream =
+             new FileOutputStream(storeFile)) {
+      IOUtils.write(keystore, storeFileOutputStream);
+    } catch (IOException | RuntimeException e) {
+      log.error("Unable to persist to file {}", storeFile);
+      throw e;
+    }
+
+    return EXIT_SUCCESS;
+  }
+
+  /**
+   * Installs the client side of an application into a local directory.
+   *
+   * The package is either a local file ({@code packageURI}) or the default
+   * package of the named application, copied down into a temporary directory
+   * that is removed once the install completes.
+   */
+  private int doClientInstall(ActionClientArgs clientInfo)
+      throws IOException, SliderException {
+
+    require(clientInfo.installLocation != null,
+        E_INVALID_INSTALL_LOCATION +"\n" +
+        CommonArgs.usage(serviceArgs, ACTION_CLIENT));
+    require(clientInfo.installLocation.exists(),
+        E_INSTALL_PATH_DOES_NOT_EXIST + ": " + clientInfo.installLocation.getAbsolutePath());
+
+    require(clientInfo.installLocation.isDirectory(),
+        E_INVALID_INSTALL_PATH + ": " + clientInfo.installLocation.getAbsolutePath());
+
+    require(isSet(clientInfo.packageURI) || isSet(clientInfo.name),
+        E_INVALID_APPLICATION_PACKAGE_LOCATION);
+
+    File tmpDir = null;
+    try {
+      File pkgFile;
+      if (isSet(clientInfo.packageURI)) {
+        pkgFile = new File(clientInfo.packageURI);
+      } else {
+        Path appDirPath = sliderFileSystem.buildAppDefDirPath(clientInfo.name);
+        Path appDefPath = new Path(appDirPath, SliderKeys.DEFAULT_APP_PKG);
+        require(sliderFileSystem.isFile(appDefPath),
+            E_INVALID_APPLICATION_PACKAGE_LOCATION);
+        tmpDir = Files.createTempDir();
+        pkgFile = new File(tmpDir, SliderKeys.DEFAULT_APP_PKG);
+        sliderFileSystem.copyHdfsFileToLocal(appDefPath, pkgFile);
+      }
+      require(pkgFile.isFile(),
+          E_UNABLE_TO_READ_SUPPLIED_PACKAGE_FILE + " at %s", pkgFile.getAbsolutePath());
+
+      JSONObject config = null;
+      if (clientInfo.clientConfig != null) {
+        try {
+          byte[] encoded = Files.toByteArray(clientInfo.clientConfig);
+          config = new JSONObject(new String(encoded, Charset.defaultCharset()));
+        } catch (JSONException jsonEx) {
+          log.error("Unable to read supplied configuration at {}: {}",
+              clientInfo.clientConfig, jsonEx);
+          log.debug("Unable to read supplied configuration at {}: {}",
+              clientInfo.clientConfig, jsonEx, jsonEx);
+          throw new BadConfigException(E_MUST_BE_A_VALID_JSON_FILE, jsonEx);
+        }
+      }
+
+      // Only INSTALL is supported
+      AbstractClientProvider
+          provider = createClientProvider(SliderProviderFactory.DEFAULT_CLUSTER_TYPE);
+      provider.processClientOperation(sliderFileSystem,
+          getRegistryOperations(),
+          getConfig(),
+          "INSTALL",
+          clientInfo.installLocation,
+          pkgFile,
+          config,
+          clientInfo.name);
+    } finally {
+      // the package copied down from the cluster filesystem is only needed
+      // for the duration of the install; don't leave it behind
+      if (tmpDir != null) {
+        org.apache.commons.io.FileUtils.deleteQuietly(tmpDir);
+      }
+    }
+    return EXIT_SUCCESS;
+  }
+
+
+  /**
+   * Runs every package operation whose flag is set, with output optionally
+   * redirected to the {@code --out} file.
+   *
+   * @return exit code of the last operation run
+   * @throws BadCommandArgumentsException if no operation flag was set
+   */
+  @Override
+  public int actionPackage(ActionPackageArgs actionPackageInfo)
+      throws YarnException, IOException {
+    initializeOutputStream(actionPackageInfo.out);
+    int exitCode = -1;
+    try {
+      if (actionPackageInfo.help) {
+        exitCode = actionHelp(ACTION_PACKAGE);
+      }
+      if (actionPackageInfo.install) {
+        exitCode = actionPackageInstall(actionPackageInfo);
+      }
+      if (actionPackageInfo.delete) {
+        exitCode = actionPackageDelete(actionPackageInfo);
+      }
+      if (actionPackageInfo.list) {
+        exitCode = actionPackageList();
+      }
+      if (actionPackageInfo.instances) {
+        exitCode = actionPackageInstances();
+      }
+    } finally {
+      // always close any --out file and restore System.out, even on failure
+      finalizeOutputStream(actionPackageInfo.out);
+    }
+    if (exitCode != -1) {
+      return exitCode;
+    }
+    throw new BadCommandArgumentsException(
+        "Select valid package operation option");
+  }
+
+  /**
+   * Points {@code clientOutputStream} at {@code outFile} if given, otherwise
+   * at {@code System.out}.
+   */
+  private void initializeOutputStream(String outFile)
+      throws FileNotFoundException {
+    if (outFile != null) {
+      clientOutputStream = new PrintStream(new FileOutputStream(outFile));
+    } else {
+      clientOutputStream = System.out;
+    }
+  }
+
+  /**
+   * Flushes and closes a file-backed {@code clientOutputStream}, then resets
+   * it to {@code System.out}.
+   */
+  private void finalizeOutputStream(String outFile) {
+    if (outFile != null && clientOutputStream != null) {
+      clientOutputStream.flush();
+      clientOutputStream.close();
+    }
+    clientOutputStream = System.out;
+  }
+
+  /**
+   * Prints each persisted instance whose application definition lives under
+   * the package directory, with its package name and version.
+   */
+  private int actionPackageInstances() throws YarnException, IOException {
+    Map<String, Path> persistentInstances = sliderFileSystem
+        .listPersistentInstances();
+    if (persistentInstances.isEmpty()) {
+      log.info("No slider cluster specification available");
+      return EXIT_SUCCESS;
+    }
+    String pkgPathValue = sliderFileSystem
+        .buildPackageDirPath(StringUtils.EMPTY, StringUtils.EMPTY).toUri()
+        .getPath();
+    FileSystem fs = sliderFileSystem.getFileSystem();
+    log.info("List of applications with its package name and path");
+    println("%-25s  %15s  %30s  %s", "Cluster Name", "Package Name",
+        "Package Version", "Application Location");
+    for (Map.Entry<String, Path> entry : persistentInstances.entrySet()) {
+      String clusterName = entry.getKey();
+      Path clusterPath = entry.getValue();
+      AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved(
+          clusterName, clusterPath);
+      Path appDefPath;
+      try {
+        appDefPath = new Path(
+            getApplicationDefinitionPath(instanceDefinition
+                .getAppConfOperations()));
+      } catch (BadConfigException e) {
+        // Invalid cluster state, so move on to next. No need to log anything
+        // as this is just listing of instances.
+        continue;
+      }
+      if (!appDefPath.isUriPathAbsolute()) {
+        appDefPath = new Path(fs.getHomeDirectory(), appDefPath);
+      }
+      String appDefPathStr = appDefPath.toUri().toString();
+      try {
+        if (appDefPathStr.contains(pkgPathValue) && fs.isFile(appDefPath)) {
+          String packageName = appDefPath.getParent().getName();
+          String packageVersion = StringUtils.EMPTY;
+          // versioned layout is <pkgdir>/<name>/<version>/<file>
+          if (instanceDefinition.isVersioned()) {
+            packageVersion = packageName;
+            packageName = appDefPath.getParent().getParent().getName();
+          }
+          println("%-25s  %15s  %30s  %s", clusterName, packageName,
+              packageVersion, appDefPathStr);
+        }
+      } catch (IOException e) {
+        log.debug("{} application definition path {} is not found.", clusterName, appDefPathStr);
+      }
+    }
+    return EXIT_SUCCESS;
+  }
+
+  /**
+   * Prints the names of the directories directly under the package install
+   * path.
+   */
+  private int actionPackageList() throws IOException {
+    Path pkgPath = sliderFileSystem.buildPackageDirPath(StringUtils.EMPTY,
+        StringUtils.EMPTY);
+    log.info("Package install path : {}", pkgPath);
+    FileSystem sfs = sliderFileSystem.getFileSystem();
+    if (!sfs.isDirectory(pkgPath)) {
+      log.info("No package(s) installed");
+      return EXIT_SUCCESS;
+    }
+    FileStatus[] fileStatus = sfs.listStatus(pkgPath);
+    boolean hasPackage = false;
+    StringBuilder sb = new StringBuilder();
+    sb.append("List of installed packages:\n");
+    for (FileStatus fstat : fileStatus) {
+      if (fstat.isDirectory()) {
+        sb.append("\t").append(fstat.getPath().getName());
+        sb.append("\n");
+        hasPackage = true;
+      }
+    }
+    if (hasPackage) {
+      println(sb.toString());
+    } else {
+      log.info("No package(s) installed");
+    }
+    return EXIT_SUCCESS;
+  }
+
+  /**
+   * Copies the package's metainfo.json (preferred) or metainfo.xml next to
+   * the installed package as {@code <package>.metainfo.json|xml}.
+   */
+  private void createSummaryMetainfoFile(Path srcFile, Path destFile,
+      boolean overwrite) throws IOException {
+    FileSystem srcFs = srcFile.getFileSystem(getConfig());
+    try (InputStream inputStreamJson = SliderUtils
+        .getApplicationResourceInputStream(srcFs, srcFile, "metainfo.json");
+         InputStream inputStreamXml = SliderUtils
+        .getApplicationResourceInputStream(srcFs, srcFile, "metainfo.xml")) {
+      InputStream inputStream = null;
+      Path summaryFileInFs = null;
+      if (inputStreamJson != null) {
+        inputStream = inputStreamJson;
+        summaryFileInFs = new Path(destFile.getParent(), destFile.getName() +
+            ".metainfo.json");
+        log.info("Found JSON metainfo file in package");
+      } else if (inputStreamXml != null) {
+        inputStream = inputStreamXml;
+        summaryFileInFs = new Path(destFile.getParent(), destFile.getName() +
+            ".metainfo.xml");
+        log.info("Found XML metainfo file in package");
+      }
+      if (inputStream != null) {
+        try (FSDataOutputStream dataOutputStream = sliderFileSystem
+            .getFileSystem().create(summaryFileInFs, overwrite)) {
+          log.info("Creating summary metainfo file");
+          IOUtils.copy(inputStream, dataOutputStream);
+        }
+      }
+    }
+  }
+
+  /**
+   * Uploads a local application package into the (optionally versioned)
+   * package directory. It also writes its summary metainfo file and logs the
+   * home-relative path to use as the app definition.
+   */
+  private int actionPackageInstall(ActionPackageArgs actionPackageArgs)
+      throws YarnException, IOException {
+    requireArgumentSet(Arguments.ARG_NAME, actionPackageArgs.name);
+
+    Path srcFile = extractPackagePath(actionPackageArgs.packageURI);
+
+    Path pkgPath = sliderFileSystem.buildPackageDirPath(actionPackageArgs.name,
+        actionPackageArgs.version);
+    FileSystem fs = sliderFileSystem.getFileSystem();
+    if (!fs.exists(pkgPath)) {
+      fs.mkdirs(pkgPath);
+    }
+
+    Path fileInFs = new Path(pkgPath, srcFile.getName());
+    require(actionPackageArgs.replacePkg || !fs.exists(fileInFs),
+        E_PACKAGE_EXISTS +" at  %s. Use --replacepkg to overwrite.", fileInFs.toUri());
+
+    log.info("Installing package {} to {} (overwrite set to {})", srcFile,
+        fileInFs, actionPackageArgs.replacePkg);
+    fs.copyFromLocalFile(false, actionPackageArgs.replacePkg, srcFile, fileInFs);
+    createSummaryMetainfoFile(srcFile, fileInFs, actionPackageArgs.replacePkg);
+
+    String destPathWithHomeDir = Path
+        .getPathWithoutSchemeAndAuthority(fileInFs).toString();
+    String destHomeDir = Path.getPathWithoutSchemeAndAuthority(
+        fs.getHomeDirectory()).toString();
+    // a somewhat contrived approach to stripping out the home directory and any trailing
+    // separator; designed to work on windows and unix
+    String destPathWithoutHomeDir;
+    if (destPathWithHomeDir.startsWith(destHomeDir)) {
+      destPathWithoutHomeDir = destPathWithHomeDir.substring(destHomeDir.length());
+      if (destPathWithoutHomeDir.startsWith("/") || destPathWithoutHomeDir.startsWith("\\")) {
+        destPathWithoutHomeDir = destPathWithoutHomeDir.substring(1);
+      }
+    } else {
+      destPathWithoutHomeDir = destPathWithHomeDir;
+    }
+    log.info("Set " + AgentKeys.APP_DEF + " in your app config JSON to {}",
+        destPathWithoutHomeDir);
+
+    return EXIT_SUCCESS;
+  }
+
+  /**
+   * Validates that {@code packageURI} names a readable local file and returns
+   * it as a Hadoop path.
+   */
+  private Path extractPackagePath(String packageURI)
+      throws BadCommandArgumentsException {
+    require(isSet(packageURI), E_INVALID_APPLICATION_PACKAGE_LOCATION);
+    File pkgFile = new File(packageURI);
+    require(pkgFile.isFile(),
+        E_UNABLE_TO_READ_SUPPLIED_PACKAGE_FILE + ":  " + pkgFile.getAbsolutePath());
+    return new Path(pkgFile.toURI());
+  }
+
+  /**
+   * Recursively deletes an installed package (optionally one version of it).
+   *
+   * @return {@code EXIT_SUCCESS} if deleted, {@code EXIT_NOT_FOUND} if the
+   * filesystem reported the delete as failed
+   */
+  private int actionPackageDelete(ActionPackageArgs actionPackageArgs) throws
+      YarnException, IOException {
+    requireArgumentSet(Arguments.ARG_NAME, actionPackageArgs.name);
+
+    Path pkgPath = sliderFileSystem.buildPackageDirPath(actionPackageArgs.name,
+        actionPackageArgs.version);
+    FileSystem fs = sliderFileSystem.getFileSystem();
+    require(fs.exists(pkgPath), E_PACKAGE_DOES_NOT_EXIST +": %s ", pkgPath.toUri());
+    log.info("Deleting package {} at {}.", actionPackageArgs.name, pkgPath);
+
+    if (fs.delete(pkgPath, true)) {
+      log.info("Deleted package {}", actionPackageArgs.name);
+      return EXIT_SUCCESS;
+    } else {
+      log.warn("Package deletion failed.");
+      return EXIT_NOT_FOUND;
+    }
+  }
+
+  @Override
+  public int actionUpdate(String clustername,
+      AbstractClusterBuildingActionArgs buildInfo) throws
+      YarnException, IOException {
+    buildInstanceDefinition(clustername, buildInfo, true, true);
+    return EXIT_SUCCESS;
+  }
+
+  /**
+   * Build up the AggregateConfiguration for an application instance then
+   * persists it
+   * @param clustername name of the cluster
+   * @param buildInfo the arguments needed to build the cluster
+   * @param overwrite true if existing cluster directory can be overwritten
+   * @param liveClusterAllowed true if live cluster can be modified
+   * @throws YarnException
+   * @throws IOException
+   */
+
+  public void buildInstanceDefinition(String clustername,
+      AbstractClusterBuildingActionArgs buildInfo, boolean overwrite,
+      boolean liveClusterAllowed) throws YarnException, IOException {
+    buildInstanceDefinition(clustername, buildInfo, overwrite,
+        liveClusterAllowed, false);
+  }
+
+  /**
+   * Build up the AggregateConfiguration for an application instance, persist
+   * it, then have the AM and application providers validate it.
+   *
+   * @param clustername name of the cluster
+   * @param buildInfo the arguments needed to build the cluster; must be an
+   * {@link ActionUpgradeArgs} when {@code isUpgradeFlow} is true
+   * @param overwrite true if existing cluster directory can be overwritten
+   * @param liveClusterAllowed true if live cluster can be modified
+   * @param isUpgradeFlow true when called for an upgrade; unless forced, the
+   * supplied resources must then match the cluster's current components
+   * @throws YarnException on validation or configuration failures
+   * @throws IOException on IO failures
+   */
+  public void buildInstanceDefinition(String clustername,
+      AbstractClusterBuildingActionArgs buildInfo, boolean overwrite,
+      boolean liveClusterAllowed, boolean isUpgradeFlow) throws YarnException,
+      IOException {
+    // verify that a live cluster isn't there
+    validateClusterName(clustername);
+    verifyBindingsDefined();
+    if (!liveClusterAllowed) {
+      verifyNoLiveClusters(clustername, "Create");
+    }
+
+    Configuration conf = getConfig();
+    String registryQuorum = lookupZKQuorum();
+
+    Path appconfdir = buildInfo.getConfdir();
+    // Provider
+    String providerName = buildInfo.getProvider();
+    requireArgumentSet(Arguments.ARG_PROVIDER, providerName);
+    log.debug("Provider is {}", providerName);
+    SliderAMClientProvider sliderAM = new SliderAMClientProvider(conf);
+    AbstractClientProvider provider =
+        createClientProvider(providerName);
+    InstanceBuilder builder =
+        new InstanceBuilder(sliderFileSystem,
+            getConfig(),
+            clustername);
+
+    AggregateConf instanceDefinition = new AggregateConf();
+    ConfTreeOperations appConf = instanceDefinition.getAppConfOperations();
+    ConfTreeOperations resources = instanceDefinition.getResourceOperations();
+    ConfTreeOperations internal = instanceDefinition.getInternalOperations();
+    //initial definition is set by the providers
+    sliderAM.prepareInstanceConfiguration(instanceDefinition);
+    provider.prepareInstanceConfiguration(instanceDefinition);
+
+    //load in any specified on the command line
+    if (buildInfo.resources != null) {
+      try {
+        resources.mergeFile(buildInfo.resources,
+            new ResourcesInputPropertiesValidator());
+
+      } catch (IOException e) {
+        throw new BadConfigException(e,
+            "incorrect argument to %s: \"%s\" : %s ",
+            Arguments.ARG_RESOURCES,
+            buildInfo.resources,
+            e.toString());
+      }
+    }
+    if (buildInfo.template != null) {
+      try {
+        appConf.mergeFile(buildInfo.template,
+            new TemplateInputPropertiesValidator());
+      } catch (IOException e) {
+        throw new BadConfigException(e,
+            "incorrect argument to %s: \"%s\" : %s ",
+            Arguments.ARG_TEMPLATE,
+            buildInfo.template,
+            e.toString());
+      }
+    }
+
+    if (isUpgradeFlow) {
+      ActionUpgradeArgs upgradeInfo = (ActionUpgradeArgs) buildInfo;
+      if (!upgradeInfo.force) {
+        validateClientAndClusterResource(clustername, resources);
+      }
+    }
+
+    //get the command line options
+    ConfTree cmdLineAppOptions = buildInfo.buildAppOptionsConfTree();
+    ConfTree cmdLineResourceOptions = buildInfo.buildResourceOptionsConfTree();
+
+    appConf.merge(cmdLineAppOptions);
+
+    AppDefinitionPersister appDefinitionPersister = new AppDefinitionPersister(sliderFileSystem);
+    appDefinitionPersister.processSuppliedDefinitions(clustername, buildInfo, appConf);
+
+    // put the role counts into the resources file
+    Map<String, String> argsRoleMap = buildInfo.getComponentMap();
+    for (Map.Entry<String, String> roleEntry : argsRoleMap.entrySet()) {
+      String count = roleEntry.getValue();
+      String key = roleEntry.getKey();
+      log.info("{} => {}", key, count);
+      resources.getOrAddComponent(key).put(COMPONENT_INSTANCES, count);
+    }
+
+    //all CLI role options
+    Map<String, Map<String, String>> appOptionMap =
+        buildInfo.getCompOptionMap();
+    appConf.mergeComponents(appOptionMap);
+
+    //internal picks up core. values only
+    internal.propagateGlobalKeys(appConf, "slider.");
+    internal.propagateGlobalKeys(appConf, "internal.");
+
+    //copy over role. and yarn. values ONLY to the resources
+    if (PROPAGATE_RESOURCE_OPTION) {
+      resources.propagateGlobalKeys(appConf, "component.");
+      resources.propagateGlobalKeys(appConf, "role.");
+      resources.propagateGlobalKeys(appConf, "yarn.");
+      resources.mergeComponentsPrefix(appOptionMap, "component.", true);
+      resources.mergeComponentsPrefix(appOptionMap, "yarn.", true);
+      resources.mergeComponentsPrefix(appOptionMap, "role.", true);
+    }
+
+    // resource component args
+    appConf.merge(cmdLineResourceOptions);
+    resources.merge(cmdLineResourceOptions);
+    resources.mergeComponents(buildInfo.getResourceCompOptionMap());
+
+    builder.init(providerName, instanceDefinition);
+    builder.propagateFilename();
+    builder.propagatePrincipals();
+    builder.setImageDetailsIfAvailable(buildInfo.getImage(),
+        buildInfo.getAppHomeDir());
+    builder.setQueue(buildInfo.queue);
+
+    // the command-line ZK hosts win; fall back to the registry quorum
+    String quorum = buildInfo.getZKhosts();
+    if (isUnset(quorum)) {
+      quorum = registryQuorum;
+    }
+    if (isUnset(quorum)) {
+      throw new BadConfigException(E_NO_ZOOKEEPER_QUORUM);
+    }
+    ZKPathBuilder zkPaths = new ZKPathBuilder(getAppName(),
+        getUsername(),
+        clustername,
+        registryQuorum,
+        quorum);
+    String zookeeperRoot = buildInfo.getAppZKPath();
+
+    if (isSet(zookeeperRoot)) {
+      zkPaths.setAppPath(zookeeperRoot);
+    } else {
+      String createDefaultZkNode = appConf.getGlobalOptions()
+          .getOption(AgentKeys.CREATE_DEF_ZK_NODE, "false");
+      if ("true".equals(createDefaultZkNode)) {
+        String defaultZKPath = createZookeeperNode(clustername, false);
+        log.debug("ZK node created for application instance: {}", defaultZKPath);
+        if (defaultZKPath != null) {
+          zkPaths.setAppPath(defaultZKPath);
+        }
+      } else {
+        // create AppPath if default is being used
+        String defaultZKPath = createZookeeperNode(clustername, true);
+        log.debug("ZK node assigned to application instance: {}", defaultZKPath);
+        zkPaths.setAppPath(defaultZKPath);
+      }
+    }
+
+    builder.addZKBinding(zkPaths);
+
+    //then propagate any package URI
+    if (buildInfo.packageURI != null) {
+      appConf.set(AgentKeys.PACKAGE_PATH, buildInfo.packageURI);
+    }
+
+    propagatePythonExecutable(conf, instanceDefinition);
+
+    // make any substitutions needed at this stage
+    replaceTokens(appConf.getConfTree(), getUsername(), clustername);
+
+    // TODO: Refactor the validation code and persistence code
+    try {
+      persistInstanceDefinition(overwrite, appconfdir, builder);
+      appDefinitionPersister.persistPackages();
+
+    } catch (LockAcquireFailedException e) {
+      log.warn("Failed to get a Lock on {} : {}", builder, e, e);
+      throw new BadClusterStateException("Failed to save " + clustername +
+          ": " + e);
+    }
+
+    // providers to validate what there is
+    // TODO: Validation should be done before persistence
+    AggregateConf instanceDescription = builder.getInstanceDescription();
+    validateInstanceDefinition(sliderAM, instanceDescription, sliderFileSystem);
+    validateInstanceDefinition(provider, instanceDescription, sliderFileSystem);
+  }
+
+  private void validateClientAndClusterResource(String clustername,
+      ConfTreeOperations clientResources) throws BadClusterStateException,
+      SliderException, IOException {
+    log.info("Validating upgrade resource definition with current cluster " +
+        "state (components and instance count)");
+    Map<String, Integer> clientComponentInstances = new HashMap<>();
+    for (String componentName : clientResources.getComponentNames()) {
+      if (!SliderKeys.COMPONENT_AM.equals(componentName)) {
+        clientComponentInstances.put(componentName, clientResources
+            .getComponentOptInt(componentName,
+                COMPONENT_INSTANCES, -1));
+      }
+    }
+
+    AggregateConf clusterConf = null;
+    try {
+      clusterConf = loadPersistedClusterDescription(clustername);
+    } catch (LockAcquireFailedException e) {
+      log.warn("Failed to get a Lock on cluster resource : {}", e, e);
+      throw new BadClusterStateException(
+          "Failed to load client resource definition " + clustername + ": " + e, e);
+    }
+    Map<String, Integer> clusterComponentInstances = new HashMap<>();
+    for (Map.Entry<String, Map<String, String>> component : clusterConf
+        .getResources().components.entrySet()) {
+      if (!SliderKeys.COMPONENT_AM.equals(component.getKey())) {
+        clusterComponentInstances.put(
+            component.getKey(),
+            Integer.decode(component.getValue().get(
+                COMPONENT_INSTANCES)));
+      }
+    }
+
+    // client and cluster should be an exact match
+    Iterator<Map.Entry<String, Integer>> clientComponentInstanceIt = clientComponentInstances
+        .entrySet().iterator();
+    while (clientComponentInstanceIt.hasNext()) {
+      Map.Entry<String, Integer> clientComponentInstanceEntry = clientComponentInstanceIt.next();
+      if (clusterComponentInstances.containsKey(clientComponentInstanceEntry.getKey())) {
+        // compare instance count now and remove from both maps if they match
+        if (clusterComponentInstances
+            .get(clientComponentInstanceEntry.getKey()).intValue() == clientComponentInstanceEntry
+            .getValue().intValue()) {
+          clusterComponentInstances.remove(clientComponentInstanceEntry
+              .getKey());
+          clientComponentInstanceIt.remove();
+        }
+      }
+    }
+
+    if (!clientComponentInstances.isEmpty()
+        || !clusterComponentInstances.isEmpty()) {
+      log.error("Mismatch found in upgrade resource definition and cluster " +
+          "resource state");
+      if (!clientComponentInstances.isEmpty()) {
+        log.info("The upgrade resource definitions that do not match are:");
+        for (Map.Entry<String, Integer> clientComponentInstanceEntry : clientComponentInstances
+            .entrySet()) {
+ log.info(" Component Name: {}, Instance count: {}", + clientComponentInstanceEntry.getKey(), + clientComponentInstanceEntry.getValue()); + } + } + if (!clusterComponentInstances.isEmpty()) { + log.info("The cluster resources that do not match are:"); + for (Map.Entry<String, Integer> clusterComponentInstanceEntry : clusterComponentInstances + .entrySet()) { + log.info(" Component Name: {}, Instance count: {}", + clusterComponentInstanceEntry.getKey(), + clusterComponentInstanceEntry.getValue()); + } + } + throw new BadConfigException("Resource definition provided for " + + "upgrade does not match with that of the currently running " + + "cluster.\nIf you are aware of what you are doing, rerun the " + + "command with " + Arguments.ARG_FORCE + " option."); + } + } + + protected void persistInstanceDefinition(boolean overwrite, + Path appconfdir, + InstanceBuilder builder) + throws IOException, SliderException, LockAcquireFailedException { + builder.persist(appconfdir, overwrite); + } + + @VisibleForTesting + public static void replaceTokens(ConfTree conf, + String userName, String clusterName) throws IOException { + Map<String,String> newglobal = new HashMap<>(); + for (Entry<String,String> entry : conf.global.entrySet()) { + newglobal.put(entry.getKey(), replaceTokens(entry.getValue(), + userName, clusterName)); + } + conf.global.putAll(newglobal); + + for (String component : conf.components.keySet()) { + Map<String,String> newComponent = new HashMap<>(); + for (Entry<String,String> entry : conf.components.get(component).entrySet()) { + newComponent.put(entry.getKey(), replaceTokens(entry.getValue(), + userName, clusterName)); + } + conf.components.get(component).putAll(newComponent); + } + + Map<String,List<String>> newcred = new HashMap<>(); + for (Entry<String,List<String>> entry : conf.credentials.entrySet()) { + List<String> resultList = new ArrayList<>(); + for (String v : entry.getValue()) { + resultList.add(replaceTokens(v, userName, clusterName)); + } + 
newcred.put(replaceTokens(entry.getKey(), userName, clusterName), + resultList); + } + conf.credentials.clear(); + conf.credentials.putAll(newcred); + } + + private static String replaceTokens(String s, String userName, + String clusterName) throws IOException { + return s.replaceAll(Pattern.quote("${USER}"), userName) + .replaceAll(Pattern.quote("${USER_NAME}"), userName) + .replaceAll(Pattern.quote("${CLUSTER_NAME}"), clusterName); + } + + public FsPermission getClusterDirectoryPermissions(Configuration conf) { + String clusterDirPermsOct = + conf.get(CLUSTER_DIRECTORY_PERMISSIONS, DEFAULT_CLUSTER_DIRECTORY_PERMISSIONS); + return new FsPermission(clusterDirPermsOct); + } + + /** + * Verify that the Resource Manager is configured (on a non-HA cluster). + * with a useful error message + * @throws BadCommandArgumentsException the exception raised on an invalid config + */ + public void verifyBindingsDefined() throws BadCommandArgumentsException { + InetSocketAddress rmAddr = getRmAddress(getConfig()); + if (!getConfig().getBoolean(YarnConfiguration.RM_HA_ENABLED, false) + && !isAddressDefined(rmAddr)) { + throw new BadCommandArgumentsException( + E_NO_RESOURCE_MANAGER + + " in the argument " + + Arguments.ARG_MANAGER + + " or the configuration property " + + YarnConfiguration.RM_ADDRESS + + " value :" + rmAddr); + } + } + + /** + * Load and start a cluster specification. + * This assumes that all validation of args and cluster state + * have already taken place + * + * @param clustername name of the cluster. 
+ * @param launchArgs launch arguments + * @return the exit code + * @throws YarnException + * @throws IOException + */ + protected int startCluster(String clustername, + LaunchArgsAccessor launchArgs) throws + YarnException, + IOException { + Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername); + AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved( + clustername, + clusterDirectory); + + LaunchedApplication launchedApplication = + launchApplication(clustername, clusterDirectory, instanceDefinition, + serviceArgs.isDebug()); + + if (launchArgs.getOutputFile() != null) { + // output file has been requested. Get the app report and serialize it + ApplicationReport report = + launchedApplication.getApplicationReport(); + SerializedApplicationReport sar = new SerializedApplicationReport(report); + sar.submitTime = System.currentTimeMillis(); + ApplicationReportSerDeser serDeser = new ApplicationReportSerDeser(); + serDeser.save(sar, launchArgs.getOutputFile()); + } + int waittime = launchArgs.getWaittime(); + if (waittime > 0) { + return waitForAppRunning(launchedApplication, waittime, waittime); + } else { + // no waiting + return EXIT_SUCCESS; + } + } + + /** + * Load the instance definition. It is not resolved at this point + * @param name cluster name + * @param clusterDirectory cluster dir + * @return the loaded configuration + * @throws IOException + * @throws SliderException + * @throws UnknownApplicationInstanceException if the file is not found + */ + public AggregateConf loadInstanceDefinitionUnresolved(String name, + Path clusterDirectory) throws IOException, SliderException { + + try { + AggregateConf definition = + InstanceIO.loadInstanceDefinitionUnresolved(sliderFileSystem, + clusterDirectory); + definition.setName(name); + return definition; + } catch (FileNotFoundException e) { + throw UnknownApplicationInstanceException.unknownInstance(name, e); + } + } + + /** + * Load the instance definition. 
+ * @param name cluster name + * @param resolved flag to indicate the cluster should be resolved + * @return the loaded configuration + * @throws IOException IO problems + * @throws SliderException slider explicit issues + * @throws UnknownApplicationInstanceException if the file is not found + */ + public AggregateConf loadInstanceDefinition(String name, + boolean resolved) throws + IOException, + SliderException { + + Path clusterDirectory = sliderFileSystem.buildClusterDirPath(name); + AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved( + name, + clusterDirectory); + if (resolved) { + instanceDefinition.resolve(); + } + return instanceDefinition; + + } + + protected AppMasterLauncher setupAppMasterLauncher(String clustername, + Path clusterDirectory, + AggregateConf instanceDefinition, + boolean debugAM) + throws YarnException, IOException{ + deployedClusterName = clustername; + validateClusterName(clustername); + verifyNoLiveClusters(clustername, "Launch"); + Configuration config = getConfig(); + lookupZKQuorum(); + boolean clusterSecure = isHadoopClusterSecure(config); + //create the Slider AM provider -this helps set up the AM + SliderAMClientProvider sliderAM = new SliderAMClientProvider(config); + + instanceDefinition.resolve(); + launchedInstanceDefinition = instanceDefinition; + + ConfTreeOperations internalOperations = instanceDefinition.getInternalOperations(); + MapOperations internalOptions = internalOperations.getGlobalOptions(); + ConfTreeOperations resourceOperations = instanceDefinition.getResourceOperations(); + ConfTreeOperations appOperations = instanceDefinition.getAppConfOperations(); + Path generatedConfDirPath = + createPathThatMustExist(internalOptions.getMandatoryOption( + INTERNAL_GENERATED_CONF_PATH)); + Path snapshotConfPath = + createPathThatMustExist(internalOptions.getMandatoryOption( + INTERNAL_SNAPSHOT_CONF_PATH)); + + + // cluster Provider + AbstractClientProvider provider = createClientProvider( + 
internalOptions.getMandatoryOption(INTERNAL_PROVIDER_NAME)); + if (log.isDebugEnabled()) { + log.debug(instanceDefinition.toString()); + } + MapOperations sliderAMResourceComponent = + resourceOperations.getOrAddComponent(SliderKeys.COMPONENT_AM); + MapOperations resourceGlobalOptions = resourceOperations.getGlobalOptions(); + + // add the tags if available + Set<String> applicationTags = provider.getApplicationTags(sliderFileSystem, + getApplicationDefinitionPath(appOperations)); + + Credentials credentials = null; + if (clusterSecure) { + // pick up oozie credentials + credentials = CredentialUtils.loadTokensFromEnvironment(System.getenv(), + config); + if (credentials == null) { + // nothing from oozie, so build up directly + credentials = new Credentials( + UserGroupInformation.getCurrentUser().getCredentials()); + CredentialUtils.addRMRenewableFSDelegationTokens(config, + sliderFileSystem.getFileSystem(), + credentials); + CredentialUtils.addRMDelegationToken(yarnClient, credentials); + + } else { + log.info("Using externally supplied credentials to launch AM"); + } + } + + AppMasterLauncher amLauncher = new AppMasterLauncher(clustername, + SliderKeys.APP_TYPE, + config, + sliderFileSystem, + yarnClient, + clusterSecure, + sliderAMResourceComponent, + resourceGlobalOptions, + applicationTags, + credentials); + + ApplicationId appId = amLauncher.getApplicationId(); + // set the application name; + amLauncher.setKeepContainersOverRestarts(true); + + int maxAppAttempts = config.getInt(KEY_AM_RESTART_LIMIT, 0); + amLauncher.setMaxAppAttempts(maxAppAttempts); + + sliderFileSystem.purgeAppInstanceTempFiles(clustername); + Path tempPath = sliderFileSystem.createAppInstanceTempPath( + clustername, + appId.toString() + "/am"); + String libdir = "lib"; + Path libPath = new Path(tempPath, libdir); + sliderFileSystem.getFileSystem().mkdirs(libPath); + log.debug("FS={}, tempPath={}, libdir={}", sliderFileSystem, tempPath, libPath); + + // set local resources for the 
application master + // local files or archives as needed + // In this scenario, the jar file for the application master is part of the local resources + Map<String, LocalResource> localResources = amLauncher.getLocalResources(); + + // look for the configuration directory named on the command line + boolean hasServerLog4jProperties = false; + Path remoteConfPath = null; + String relativeConfDir = null; + String confdirProp = System.getProperty(SliderKeys.PROPERTY_CONF_DIR); + if (isUnset(confdirProp)) { + log.debug("No local configuration directory provided as system property"); + } else { + File confDir = new File(confdirProp); + if (!confDir.exists()) { + throw new BadConfigException(E_CONFIGURATION_DIRECTORY_NOT_FOUND, + confDir); + } + Path localConfDirPath = createLocalPath(confDir); + remoteConfPath = new Path(clusterDirectory, SliderKeys.SUBMITTED_CONF_DIR); + log.debug("Slider configuration directory is {}; remote to be {}", + localConfDirPath, remoteConfPath); + copyDirectory(config, localConfDirPath, remoteConfPath, null); + + File log4jserver = + new File(confDir, SliderKeys.LOG4J_SERVER_PROP_FILENAME); + hasServerLog4jProperties = log4jserver.isFile(); + } + // the assumption here is that minimr cluster => this is a test run + // and the classpath can look after itself + + boolean usingMiniMRCluster = getUsingMiniMRCluster(); + if (!usingMiniMRCluster) { + + log.debug("Destination is not a MiniYARNCluster -copying full classpath"); + + // insert conf dir first + if (remoteConfPath != null) { + relativeConfDir = SliderKeys.SUBMITTED_CONF_DIR; + Map<String, LocalResource> submittedConfDir = + sliderFileSystem.submitDirectory(remoteConfPath, + relativeConfDir); + mergeMaps(localResources, submittedConfDir); + } + } + // build up the configuration + // IMPORTANT: it is only after this call that site configurations + // will be valid. 
+ + propagatePrincipals(config, instanceDefinition); + // validate security data + +/* + // turned off until tested + SecurityConfiguration securityConfiguration = + new SecurityConfiguration(config, + instanceDefinition, clustername); + +*/ + Configuration clientConfExtras = new Configuration(false); + // then build up the generated path. + FsPermission clusterPerms = getClusterDirectoryPermissions(config); + copyDirectory(config, snapshotConfPath, generatedConfDirPath, + clusterPerms); + + + // standard AM resources + sliderAM.prepareAMAnd
<TRUNCATED> --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org