http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java new file mode 100644 index 0000000..b767059 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java @@ -0,0 +1,2450 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.server.appmaster; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.health.HealthCheckRegistry; +import com.codahale.metrics.jvm.GarbageCollectorMetricSet; +import com.codahale.metrics.jvm.MemoryUsageGaugeSet; +import com.codahale.metrics.jvm.ThreadStatesGaugeSet; +import com.google.common.base.Preconditions; +import com.google.protobuf.BlockingService; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.http.HttpConfig; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; +import org.apache.hadoop.registry.client.binding.RegistryUtils; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.service.ServiceOperations; +import org.apache.hadoop.service.ServiceStateChangeListener; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import 
org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.client.api.async.NMClientAsync; +import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.*; +import static org.apache.slider.common.Constants.HADOOP_JAAS_DEBUG; + +import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.registry.client.binding.RegistryPathUtils; +import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes; +import org.apache.hadoop.registry.server.integration.RMRegistryOperationsService; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager; +import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.webapp.WebAppException; +import org.apache.hadoop.yarn.webapp.WebApps; +import org.apache.hadoop.yarn.webapp.util.WebAppUtils; +import 
org.apache.slider.api.ClusterDescription; +import org.apache.slider.api.InternalKeys; +import org.apache.slider.api.ResourceKeys; +import org.apache.slider.api.RoleKeys; +import org.apache.slider.api.StatusKeys; +import org.apache.slider.api.proto.SliderClusterAPI; +import org.apache.slider.client.SliderYarnClientImpl; +import org.apache.slider.common.SliderExitCodes; +import org.apache.slider.common.SliderKeys; +import org.apache.slider.common.params.AbstractActionArgs; +import org.apache.slider.common.params.SliderAMArgs; +import org.apache.slider.common.params.SliderAMCreateAction; +import org.apache.slider.common.params.SliderActions; +import org.apache.slider.common.tools.ConfigHelper; +import org.apache.slider.common.tools.PortScanner; +import org.apache.slider.common.tools.SliderFileSystem; +import org.apache.slider.common.tools.SliderUtils; +import org.apache.slider.common.tools.SliderVersionInfo; +import org.apache.slider.core.build.InstanceIO; +import org.apache.slider.core.conf.AggregateConf; +import org.apache.slider.core.conf.ConfTree; +import org.apache.slider.core.conf.ConfTreeOperations; +import org.apache.slider.core.conf.MapOperations; +import org.apache.slider.core.exceptions.BadConfigException; +import org.apache.slider.core.exceptions.SliderException; +import org.apache.slider.core.exceptions.SliderInternalStateException; +import org.apache.slider.core.exceptions.TriggerClusterTeardownException; +import org.apache.slider.core.launch.CredentialUtils; +import org.apache.slider.core.main.ExitCodeProvider; +import org.apache.slider.core.main.LauncherExitCodes; +import org.apache.slider.core.main.RunService; +import org.apache.slider.core.main.ServiceLauncher; +import org.apache.slider.core.registry.info.CustomRegistryConstants; +import org.apache.slider.providers.ProviderCompleted; +import org.apache.slider.providers.ProviderRole; +import org.apache.slider.providers.ProviderService; +import org.apache.slider.providers.SliderProviderFactory; +import 
org.apache.slider.providers.agent.AgentKeys; +import org.apache.slider.providers.agent.AgentProviderService; +import org.apache.slider.providers.slideram.SliderAMClientProvider; +import org.apache.slider.providers.slideram.SliderAMProviderService; +import org.apache.slider.server.appmaster.actions.ActionRegisterServiceInstance; +import org.apache.slider.server.appmaster.actions.EscalateOutstandingRequests; +import org.apache.slider.server.appmaster.actions.RegisterComponentInstance; +import org.apache.slider.server.appmaster.actions.QueueExecutor; +import org.apache.slider.server.appmaster.actions.QueueService; +import org.apache.slider.server.appmaster.actions.ActionStopSlider; +import org.apache.slider.server.appmaster.actions.ActionUpgradeContainers; +import org.apache.slider.server.appmaster.actions.AsyncAction; +import org.apache.slider.server.appmaster.actions.RenewingAction; +import org.apache.slider.server.appmaster.actions.ResetFailureWindow; +import org.apache.slider.server.appmaster.actions.ReviewAndFlexApplicationSize; +import org.apache.slider.server.appmaster.actions.UnregisterComponentInstance; +import org.apache.slider.server.appmaster.management.MetricsAndMonitoring; +import org.apache.slider.server.appmaster.management.YarnServiceHealthCheck; +import org.apache.slider.server.appmaster.monkey.ChaosKillAM; +import org.apache.slider.server.appmaster.monkey.ChaosKillContainer; +import org.apache.slider.server.appmaster.monkey.ChaosMonkeyService; +import org.apache.slider.server.appmaster.operations.AsyncRMOperationHandler; +import org.apache.slider.server.appmaster.operations.ProviderNotifyingOperationHandler; +import org.apache.slider.server.appmaster.rpc.RpcBinder; +import org.apache.slider.server.appmaster.rpc.SliderAMPolicyProvider; +import org.apache.slider.server.appmaster.rpc.SliderClusterProtocolPBImpl; +import org.apache.slider.server.appmaster.operations.AbstractRMOperation; +import org.apache.slider.server.appmaster.rpc.SliderIPCService; 
+import org.apache.slider.server.appmaster.security.SecurityConfiguration; +import org.apache.slider.server.appmaster.state.AppState; +import org.apache.slider.server.appmaster.state.AppStateBindingInfo; +import org.apache.slider.server.appmaster.state.ContainerAssignment; +import org.apache.slider.server.appmaster.state.ProviderAppState; +import org.apache.slider.server.appmaster.operations.RMOperationHandler; +import org.apache.slider.server.appmaster.state.RoleInstance; +import org.apache.slider.server.appmaster.web.AgentService; +import org.apache.slider.server.appmaster.web.rest.InsecureAmFilterInitializer; +import org.apache.slider.server.appmaster.web.rest.agent.AgentWebApp; +import org.apache.slider.server.appmaster.web.SliderAMWebApp; +import org.apache.slider.server.appmaster.web.WebAppApi; +import org.apache.slider.server.appmaster.web.WebAppApiImpl; +import org.apache.slider.server.appmaster.web.rest.RestPaths; +import org.apache.slider.server.appmaster.web.rest.application.ApplicationResouceContentCacheFactory; +import org.apache.slider.server.appmaster.web.rest.application.resources.ContentCache; +import org.apache.slider.server.services.security.CertificateManager; +import org.apache.slider.server.services.utility.AbstractSliderLaunchedService; +import org.apache.slider.server.services.utility.WebAppService; +import org.apache.slider.server.services.workflow.ServiceThreadFactory; +import org.apache.slider.server.services.workflow.WorkflowExecutorService; +import org.apache.slider.server.services.workflow.WorkflowRpcService; +import org.apache.slider.server.services.yarnregistry.YarnRegistryViewForProviders; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URL; +import java.net.URLClassLoader; +import java.nio.ByteBuffer; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import 
java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * This is the AM, which directly implements the callbacks from the AM and NM + */ +public class SliderAppMaster extends AbstractSliderLaunchedService + implements AMRMClientAsync.CallbackHandler, + NMClientAsync.CallbackHandler, + RunService, + SliderExitCodes, + SliderKeys, + ServiceStateChangeListener, + RoleKeys, + ProviderCompleted, + AppMasterActionOperations { + + protected static final Logger log = + LoggerFactory.getLogger(SliderAppMaster.class); + + /** + * log for YARN events + */ + protected static final Logger LOG_YARN = log; + + public static final String SERVICE_CLASSNAME_SHORT = "SliderAppMaster"; + public static final String SERVICE_CLASSNAME = + "org.apache.slider.server.appmaster." + SERVICE_CLASSNAME_SHORT; + + public static final int HEARTBEAT_INTERVAL = 1000; + public static final int NUM_RPC_HANDLERS = 5; + + /** + * Metrics and monitoring services. 
+ * Deployed in {@link #serviceInit(Configuration)} + */ + private final MetricsAndMonitoring metricsAndMonitoring = new MetricsAndMonitoring(); + + /** + * metrics registry + */ + public MetricRegistry metrics; + + /** Error string on chaos monkey launch failure action: {@value} */ + public static final String E_TRIGGERED_LAUNCH_FAILURE = + "Chaos monkey triggered launch failure"; + + /** YARN RPC to communicate with the Resource Manager or Node Manager */ + private YarnRPC yarnRPC; + + /** Handle to communicate with the Resource Manager*/ + @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") + private AMRMClientAsync asyncRMClient; + + @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") + private RMOperationHandler rmOperationHandler; + + private RMOperationHandler providerRMOperationHandler; + + /** Handle to communicate with the Node Manager*/ + @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") + public NMClientAsync nmClientAsync; + + /** + * Credentials for propagating down to launched containers + */ + private Credentials containerCredentials; + + /** + * Slider IPC: Real service handler + */ + private SliderIPCService sliderIPCService; + /** + * Slider IPC: binding + */ + private WorkflowRpcService rpcService; + + /** + * Secret manager + */ + @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") + private ClientToAMTokenSecretManager secretManager; + + /** Hostname of the container*/ + private String appMasterHostname = ""; + /* Port on which the app master listens for status updates from clients*/ + private int appMasterRpcPort = 0; + /** Tracking url to which app master publishes info for clients to monitor*/ + @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") + private String appMasterTrackingUrl = ""; + + /** Proxied app master URL (as retrieved from AM report at launch time) */ + @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") + private String appMasterProxiedUrl = ""; + + 
/** Application Attempt Id ( combination of attemptId and fail count )*/ + private ApplicationAttemptId appAttemptID; + + /** + * App ACLs + */ + protected Map<ApplicationAccessType, String> applicationACLs; + + /** + * Ongoing state of the cluster: containers, nodes they + * live on, etc. + */ + private final AppState appState = + new AppState(new ProtobufClusterServices(), metricsAndMonitoring); + + /** + * App state for external objects. This is almost entirely + * a read-only view of the application state. To change the state, + * Providers (or anything else) are expected to queue async changes. + */ + private final ProviderAppState stateForProviders = + new ProviderAppState("undefined", appState); + + /** + * model the state using locks and conditions + */ + private final ReentrantLock AMExecutionStateLock = new ReentrantLock(); + private final Condition isAMCompleted = AMExecutionStateLock.newCondition(); + + /** + * Flag set if the AM is to be shutdown + */ + private final AtomicBoolean amCompletionFlag = new AtomicBoolean(false); + + /** + * Flag set during the init process + */ + private final AtomicBoolean initCompleted = new AtomicBoolean(false); + + /** + * Flag to set if the process exit code was set before shutdown started + */ + @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") + private boolean spawnedProcessExitedBeforeShutdownTriggered; + + + /** Arguments passed in : raw*/ + @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") + private SliderAMArgs serviceArgs; + + /** + * ID of the AM container + */ + @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") + private ContainerId appMasterContainerID; + + /** + * Monkey Service -may be null + */ + private ChaosMonkeyService monkey; + + /** + * ProviderService of this cluster + */ + @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") + private ProviderService providerService; + + /** + * The YARN registry service + */ + 
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") + private RegistryOperations registryOperations; + + /** + * The stop request received...the exit details are extracted + * from this + */ + private volatile ActionStopSlider stopAction; + + @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") + private RoleLaunchService launchService; + + //username -null if it is not known/not to be set + @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") + private String hadoop_user_name; + private String service_user_name; + + private SliderAMWebApp webApp; + @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") + private InetSocketAddress rpcServiceAddress; + private SliderAMProviderService sliderAMProvider; + private CertificateManager certificateManager; + + /** + * Executor. + * Assigned in {@link #serviceInit(Configuration)} + */ + private WorkflowExecutorService<ExecutorService> executorService; + + /** + * Action queues. Created at instance creation, but + * added as a child and inited in {@link #serviceInit(Configuration)} + */ + private final QueueService actionQueues = new QueueService(); + private String agentOpsUrl; + private String agentStatusUrl; + private YarnRegistryViewForProviders yarnRegistryOperations; + //private FsDelegationTokenManager fsDelegationTokenManager; + private RegisterApplicationMasterResponse amRegistrationData; + private PortScanner portScanner; + private SecurityConfiguration securityConfiguration; + + /** + * Is security enabled? + * Set early on in the {@link #createAndRunCluster(String)} operation. 
+ */ + private boolean securityEnabled; + private ContentCache contentCache; + + /** + * resource limits + */ + private Resource maximumResourceCapability; + + /** + * Service Constructor + */ + public SliderAppMaster() { + super(SERVICE_CLASSNAME_SHORT); + new HdfsConfiguration(); + new YarnConfiguration(); + } + +/* =================================================================== */ +/* service lifecycle methods */ +/* =================================================================== */ + + @Override //AbstractService + public synchronized void serviceInit(Configuration conf) throws Exception { + // slider client if found + + Configuration customConf = SliderUtils.loadSliderClientXML(); + // Load in the server configuration - if it is actually on the Classpath + URL serverXmlUrl = ConfigHelper.getResourceUrl(SLIDER_SERVER_XML); + if (serverXmlUrl != null) { + log.info("Loading {} at {}", SLIDER_SERVER_XML, serverXmlUrl); + Configuration serverConf = ConfigHelper.loadFromResource(SLIDER_SERVER_XML); + ConfigHelper.mergeConfigurations(customConf, serverConf, + SLIDER_SERVER_XML, true); + } + serviceArgs.applyDefinitions(customConf); + serviceArgs.applyFileSystemBinding(customConf); + // conf now contains all customizations + + AbstractActionArgs action = serviceArgs.getCoreAction(); + SliderAMCreateAction createAction = (SliderAMCreateAction) action; + + // sort out the location of the AM + String rmAddress = createAction.getRmAddress(); + if (rmAddress != null) { + log.debug("Setting RM address from the command line: {}", rmAddress); + SliderUtils.setRmSchedulerAddress(customConf, rmAddress); + } + + log.info("AM configuration:\n{}", + ConfigHelper.dumpConfigToString(customConf)); + for (Map.Entry<String, String> envs : System.getenv().entrySet()) { + log.info("System env {}={}", envs.getKey(), envs.getValue()); + } + + ConfigHelper.mergeConfigurations(conf, customConf, SLIDER_CLIENT_XML, true); + //init security with our conf + if 
(SliderUtils.isHadoopClusterSecure(conf)) { + log.info("Secure mode with kerberos realm {}", + SliderUtils.getKerberosRealm()); + UserGroupInformation.setConfiguration(conf); + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + log.debug("Authenticating as {}", ugi); + SliderUtils.verifyPrincipalSet(conf, DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY); + } else { + log.info("Cluster is insecure"); + } + log.info("Login user is {}", UserGroupInformation.getLoginUser()); + + //look at settings of Hadoop Auth, to pick up a problem seen once + checkAndWarnForAuthTokenProblems(); + + // validate server env + boolean dependencyChecks = + !conf.getBoolean(KEY_SLIDER_AM_DEPENDENCY_CHECKS_DISABLED, + false); + SliderUtils.validateSliderServerEnvironment(log, dependencyChecks); + + // create and register monitoring services + addService(metricsAndMonitoring); + metrics = metricsAndMonitoring.getMetrics(); +/* TODO: turn these one once the metrics testing is more under control + metrics.registerAll(new ThreadStatesGaugeSet()); + metrics.registerAll(new MemoryUsageGaugeSet()); + metrics.registerAll(new GarbageCollectorMetricSet()); + +*/ + contentCache = ApplicationResouceContentCacheFactory.createContentCache(stateForProviders); + + executorService = new WorkflowExecutorService<>("AmExecutor", + Executors.newFixedThreadPool(2, + new ServiceThreadFactory("AmExecutor", true))); + addService(executorService); + + addService(actionQueues); + + //init all child services + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + HealthCheckRegistry health = metricsAndMonitoring.getHealth(); + health.register("AM Health", new YarnServiceHealthCheck(this)); + } + + /** + * Start the queue processing + */ + private void startQueueProcessing() { + log.info("Queue Processing started"); + executorService.execute(actionQueues); + executorService.execute(new QueueExecutor(this, actionQueues)); + } + +/* 
=================================================================== */ +/* RunService methods called from ServiceLauncher */ +/* =================================================================== */ + + /** + * pick up the args from the service launcher + * @param config configuration + * @param args argument list + */ + @Override // RunService + public Configuration bindArgs(Configuration config, String... args) throws Exception { + // let the superclass process it + Configuration superConf = super.bindArgs(config, args); + // add the slider XML config + ConfigHelper.injectSliderXMLResource(); + + //yarn-ify + YarnConfiguration yarnConfiguration = new YarnConfiguration( + superConf); + serviceArgs = new SliderAMArgs(args); + serviceArgs.parse(); + + return SliderUtils.patchConfiguration(yarnConfiguration); + } + + + /** + * this is called by service launcher; when it returns the application finishes + * @return the exit code to return by the app + * @throws Throwable + */ + @Override + public int runService() throws Throwable { + SliderVersionInfo.loadAndPrintVersionInfo(log); + + //dump the system properties if in debug mode + if (log.isDebugEnabled()) { + log.debug("System properties:\n" + SliderUtils.propertiesToString(System.getProperties())); + } + + //choose the action + String action = serviceArgs.getAction(); + List<String> actionArgs = serviceArgs.getActionArgs(); + int exitCode; + switch (action) { + case SliderActions.ACTION_HELP: + log.info("{}: {}", getName(), serviceArgs.usage()); + exitCode = SliderExitCodes.EXIT_USAGE; + break; + case SliderActions.ACTION_CREATE: + exitCode = createAndRunCluster(actionArgs.get(0)); + break; + default: + throw new SliderException("Unimplemented: " + action); + } + log.info("Exiting AM; final exit code = {}", exitCode); + return exitCode; + } + + /** + * Initialize a newly created service then add it. 
+ * Because the service is not started, this MUST be done before + * the AM itself starts, or it is explicitly added after + * @param service the service to init + */ + public Service initAndAddService(Service service) { + service.init(getConfig()); + addService(service); + return service; + } + + /* =================================================================== */ + + /** + * Create and run the cluster. + * @param clustername cluster name + * @return exit code + * @throws Throwable on a failure + */ + private int createAndRunCluster(String clustername) throws Throwable { + + //load the cluster description from the cd argument + String sliderClusterDir = serviceArgs.getSliderClusterURI(); + URI sliderClusterURI = new URI(sliderClusterDir); + Path clusterDirPath = new Path(sliderClusterURI); + log.info("Application defined at {}", sliderClusterURI); + SliderFileSystem fs = getClusterFS(); + + // build up information about the running application -this + // will be passed down to the cluster status + MapOperations appInformation = new MapOperations(); + + AggregateConf instanceDefinition = + InstanceIO.loadInstanceDefinitionUnresolved(fs, clusterDirPath); + instanceDefinition.setName(clustername); + + log.info("Deploying cluster {}:", instanceDefinition); + + // and resolve it + AggregateConf resolvedInstance = new AggregateConf( instanceDefinition); + resolvedInstance.resolve(); + + stateForProviders.setApplicationName(clustername); + + Configuration serviceConf = getConfig(); + + // extend AM configuration with component resource + MapOperations amConfiguration = resolvedInstance + .getAppConfOperations().getComponent(COMPONENT_AM); + // and patch configuration with prefix + if (amConfiguration != null) { + Map<String, String> sliderAppConfKeys = amConfiguration.prefixedWith("slider."); + for (Map.Entry<String, String> entry : sliderAppConfKeys.entrySet()) { + String k = entry.getKey(); + String v = entry.getValue(); + boolean exists = serviceConf.get(k) != 
null; + log.info("{} {} to {}", (exists ? "Overwriting" : "Setting"), k, v); + serviceConf.set(k, v); + } + } + + securityConfiguration = new SecurityConfiguration(serviceConf, resolvedInstance, clustername); + // obtain security state + securityEnabled = securityConfiguration.isSecurityEnabled(); + // set the global security flag for the instance definition + instanceDefinition.getAppConfOperations().set(KEY_SECURITY_ENABLED, securityEnabled); + + // triggers resolution and snapshotting for agent + appState.setInitialInstanceDefinition(instanceDefinition); + + File confDir = getLocalConfDir(); + if (!confDir.exists() || !confDir.isDirectory()) { + log.info("Conf dir {} does not exist.", confDir); + File parentFile = confDir.getParentFile(); + log.info("Parent dir {}:\n{}", parentFile, SliderUtils.listDir(parentFile)); + } + + //get our provider + MapOperations globalInternalOptions = getGlobalInternalOptions(); + String providerType = globalInternalOptions.getMandatoryOption( + InternalKeys.INTERNAL_PROVIDER_NAME); + log.info("Cluster provider type is {}", providerType); + SliderProviderFactory factory = + SliderProviderFactory.createSliderProviderFactory(providerType); + providerService = factory.createServerProvider(); + // init the provider BUT DO NOT START IT YET + initAndAddService(providerService); + providerRMOperationHandler = new ProviderNotifyingOperationHandler(providerService); + + // create a slider AM provider + sliderAMProvider = new SliderAMProviderService(); + initAndAddService(sliderAMProvider); + + InetSocketAddress rmSchedulerAddress = SliderUtils.getRmSchedulerAddress(serviceConf); + log.info("RM is at {}", rmSchedulerAddress); + yarnRPC = YarnRPC.create(serviceConf); + + // set up the YARN client. This may require patching in the RM client-API address if it + // is (somehow) unset server-side. 
String clientRMaddr = serviceConf.get(YarnConfiguration.RM_ADDRESS); + InetSocketAddress clientRpcAddress = SliderUtils.getRmAddress(serviceConf); + if (!SliderUtils.isAddressDefined(clientRpcAddress)) { + // client addr is being unset. We can lift it from the other RM APIs + log.warn("Yarn RM address was unbound; attempting to fix up"); + serviceConf.set(YarnConfiguration.RM_ADDRESS, + String.format("%s:%d", rmSchedulerAddress.getHostString(), clientRpcAddress.getPort() )); + } + + /* + * Extract the container ID. This is then + * turned into an (incomplete) container + */ + appMasterContainerID = ConverterUtils.toContainerId( + SliderUtils.mandatoryEnvVariable(ApplicationConstants.Environment.CONTAINER_ID.name())); + appAttemptID = appMasterContainerID.getApplicationAttemptId(); + + ApplicationId appid = appAttemptID.getApplicationId(); + log.info("AM for ID {}", appid.getId()); + + appInformation.put(StatusKeys.INFO_AM_CONTAINER_ID, appMasterContainerID.toString()); + appInformation.put(StatusKeys.INFO_AM_APP_ID, appid.toString()); + appInformation.put(StatusKeys.INFO_AM_ATTEMPT_ID, appAttemptID.toString()); + + Map<String, String> envVars; + List<Container> liveContainers; + + /* + * It is critical this section is synchronized, to stop async AM events + * arriving while registering a restarting AM. 
+ */ + synchronized (appState) { + int heartbeatInterval = HEARTBEAT_INTERVAL; + + // add the RM client -this brings the callbacks in + asyncRMClient = AMRMClientAsync.createAMRMClientAsync(heartbeatInterval, this); + addService(asyncRMClient); + //now bring it up + deployChildService(asyncRMClient); + + + // nmclient relays callbacks back to this class + nmClientAsync = new NMClientAsyncImpl("nmclient", this); + deployChildService(nmClientAsync); + + // set up secret manager + secretManager = new ClientToAMTokenSecretManager(appAttemptID, null); + + if (securityEnabled) { + // fix up the ACLs if they are not set + String acls = serviceConf.get(KEY_PROTOCOL_ACL); + if (acls == null) { + getConfig().set(KEY_PROTOCOL_ACL, "*"); + } + } + + certificateManager = new CertificateManager(); + + //bring up the Slider RPC service + buildPortScanner(instanceDefinition); + startSliderRPCServer(instanceDefinition); + + rpcServiceAddress = rpcService.getConnectAddress(); + appMasterHostname = rpcServiceAddress.getAddress().getCanonicalHostName(); + appMasterRpcPort = rpcServiceAddress.getPort(); + appMasterTrackingUrl = null; + log.info("AM Server is listening at {}:{}", appMasterHostname, appMasterRpcPort); + appInformation.put(StatusKeys.INFO_AM_HOSTNAME, appMasterHostname); + appInformation.set(StatusKeys.INFO_AM_RPC_PORT, appMasterRpcPort); + + log.info("Starting Yarn registry"); + registryOperations = startRegistryOperationsService(); + log.info(registryOperations.toString()); + + //build the role map + List<ProviderRole> providerRoles = new ArrayList<>(providerService.getRoles()); + providerRoles.addAll(SliderAMClientProvider.ROLES); + + // Start up the WebApp and track the URL for it + MapOperations component = instanceDefinition.getAppConfOperations() + .getComponent(SliderKeys.COMPONENT_AM); + certificateManager.initialize(component, appMasterHostname, + appMasterContainerID.toString(), + clustername); + 
certificateManager.setPassphrase(instanceDefinition.getPassphrase()); + + if (component.getOptionBool( + AgentKeys.KEY_AGENT_TWO_WAY_SSL_ENABLED, false)) { + uploadServerCertForLocalization(clustername, fs); + } + + // Web service endpoints: initialize + WebAppApiImpl webAppApi = + new WebAppApiImpl( + stateForProviders, + providerService, + certificateManager, + registryOperations, + metricsAndMonitoring, + actionQueues, + this, + contentCache); + initAMFilterOptions(serviceConf); + + // start the agent web app + startAgentWebApp(appInformation, serviceConf, webAppApi); + int webAppPort = deployWebApplication(webAppApi); + + String scheme = WebAppUtils.HTTP_PREFIX; + appMasterTrackingUrl = scheme + appMasterHostname + ":" + webAppPort; + + appInformation.put(StatusKeys.INFO_AM_WEB_URL, appMasterTrackingUrl + "/"); + appInformation.set(StatusKeys.INFO_AM_WEB_PORT, webAppPort); + + // ***************************************************** + // Register self with ResourceManager + // This will start heartbeating to the RM + // address = SliderUtils.getRmSchedulerAddress(asyncRMClient.getConfig()); + // ***************************************************** + log.info("Connecting to RM at {}; AM tracking URL={}", + appMasterRpcPort, appMasterTrackingUrl); + amRegistrationData = asyncRMClient.registerApplicationMaster(appMasterHostname, + appMasterRpcPort, + appMasterTrackingUrl); + maximumResourceCapability = amRegistrationData.getMaximumResourceCapability(); + + int minMemory = serviceConf.getInt(RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); + // validate scheduler vcores allocation setting + int minCores = serviceConf.getInt(RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, + DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); + int maxMemory = maximumResourceCapability.getMemory(); + int maxCores = maximumResourceCapability.getVirtualCores(); + appState.setContainerLimits(minMemory,maxMemory, minCores, maxCores ); + + // build the handler for
RM request/release operations; this uses + // the max value as part of its lookup + rmOperationHandler = new AsyncRMOperationHandler(asyncRMClient, maximumResourceCapability); + + // set the RM-defined maximum cluster values + appInformation.put(ResourceKeys.YARN_CORES, Integer.toString(maxCores)); + appInformation.put(ResourceKeys.YARN_MEMORY, Integer.toString(maxMemory)); + + processAMCredentials(securityConfiguration); + + if (securityEnabled) { + secretManager.setMasterKey( + amRegistrationData.getClientToAMTokenMasterKey().array()); + applicationACLs = amRegistrationData.getApplicationACLs(); + + //tell the server what the ACLs are + rpcService.getServer().refreshServiceAcl(serviceConf, + new SliderAMPolicyProvider()); + if (securityConfiguration.isKeytabProvided()) { + // perform keytab based login to establish kerberos authenticated + // principal. Can do so now since AM registration with RM above required + // tokens associated to principal + String principal = securityConfiguration.getPrincipal(); + File localKeytabFile = securityConfiguration.getKeytabFile(instanceDefinition); + // Now log in... + login(principal, localKeytabFile); + // obtain new FS reference that should be kerberos based and different + // than the previously cached reference + fs = new SliderFileSystem(serviceConf); + } + } + + // YARN client. + // Important: this is only valid at startup, and must be executed within + // the right UGI context. Use with care. + SliderYarnClientImpl yarnClient = null; + List<NodeReport> nodeReports; + try { + yarnClient = new SliderYarnClientImpl(); + yarnClient.init(getConfig()); + yarnClient.start(); + nodeReports = getNodeReports(yarnClient); + log.info("Yarn node report count: {}", nodeReports.size()); + // look up the application itself -this is needed to get the proxied + // URL of the AM, for registering endpoints.
+ // this call must be made after the AM has registered itself, obviously + ApplicationAttemptReport report = getApplicationAttemptReport(yarnClient); + appMasterProxiedUrl = report.getTrackingUrl(); + if (SliderUtils.isUnset(appMasterProxiedUrl)) { + log.warn("Proxied URL is not set in application report"); + appMasterProxiedUrl = appMasterTrackingUrl; + } + } finally { + // at this point yarnClient is no longer needed. + // stop it immediately + ServiceOperations.stop(yarnClient); + yarnClient = null; + } + + // extract container list + + liveContainers = amRegistrationData.getContainersFromPreviousAttempts(); + + //now validate the installation + Configuration providerConf = + providerService.loadProviderConfigurationInformation(confDir); + + providerService.initializeApplicationConfiguration(instanceDefinition, fs); + + providerService.validateApplicationConfiguration(instanceDefinition, + confDir, + securityEnabled); + + //determine the location for the role history data + Path historyDir = new Path(clusterDirPath, HISTORY_DIR_NAME); + + //build the instance + AppStateBindingInfo binding = new AppStateBindingInfo(); + binding.instanceDefinition = instanceDefinition; + binding.serviceConfig = serviceConf; + binding.publishedProviderConf = providerConf; + binding.roles = providerRoles; + binding.fs = fs.getFileSystem(); + binding.historyPath = historyDir; + binding.liveContainers = liveContainers; + binding.applicationInfo = appInformation; + binding.releaseSelector = providerService.createContainerReleaseSelector(); + binding.nodeReports = nodeReports; + appState.buildInstance(binding); + + providerService.rebuildContainerDetails(liveContainers, + instanceDefinition.getName(), appState.getRolePriorityMap()); + + // add the AM to the list of nodes in the cluster + + appState.buildAppMasterNode(appMasterContainerID, + appMasterHostname, + webAppPort, + appMasterHostname + ":" + webAppPort); + + // build up environment variables that the AM wants set in every
container + // irrespective of provider and role. + envVars = new HashMap<>(); + if (hadoop_user_name != null) { + envVars.put(HADOOP_USER_NAME, hadoop_user_name); + } + String debug_kerberos = System.getenv(HADOOP_JAAS_DEBUG); + if (debug_kerberos != null) { + envVars.put(HADOOP_JAAS_DEBUG, debug_kerberos); + } + } + String rolesTmpSubdir = appMasterContainerID.toString() + "/roles"; + + String amTmpDir = globalInternalOptions.getMandatoryOption(InternalKeys.INTERNAL_AM_TMP_DIR); + + Path tmpDirPath = new Path(amTmpDir); + Path launcherTmpDirPath = new Path(tmpDirPath, rolesTmpSubdir); + fs.getFileSystem().mkdirs(launcherTmpDirPath); + + //launcher service + launchService = new RoleLaunchService(actionQueues, + providerService, + fs, + new Path(getGeneratedConfDir()), + envVars, + launcherTmpDirPath); + + deployChildService(launchService); + + appState.noteAMLaunched(); + + + //Give the provider access to the state, and AM + providerService.bind(stateForProviders, actionQueues, liveContainers); + sliderAMProvider.bind(stateForProviders, actionQueues, liveContainers); + + // chaos monkey + maybeStartMonkey(); + + // setup token renewal and expiry handling for long lived apps +// if (!securityConfiguration.isKeytabProvided() && +// SliderUtils.isHadoopClusterSecure(getConfig())) { +// fsDelegationTokenManager = new FsDelegationTokenManager(actionQueues); +// fsDelegationTokenManager.acquireDelegationToken(getConfig()); +// } + + // if not a secure cluster, extract the username -it will be + // propagated to workers + if (!UserGroupInformation.isSecurityEnabled()) { + hadoop_user_name = System.getenv(HADOOP_USER_NAME); + log.info(HADOOP_USER_NAME + "='{}'", hadoop_user_name); + } + service_user_name = RegistryUtils.currentUser(); + log.info("Registry service username ={}", service_user_name); + + + // declare the cluster initialized + log.info("Application Master Initialization Completed"); + initCompleted.set(true); +
+ scheduleFailureWindowResets(instanceDefinition.getResources()); + scheduleEscalation(instanceDefinition.getInternal()); + + try { + // schedule YARN Registry registration + queue(new ActionRegisterServiceInstance(clustername, appid)); + + // log the YARN and web UIs + log.info("RM Webapp address {}", + serviceConf.get(YarnConfiguration.RM_WEBAPP_ADDRESS)); + log.info("Slider webapp address {} proxied at {}", + appMasterTrackingUrl, appMasterProxiedUrl); + + // Start the Slider AM provider + sliderAMProvider.start(); + + // launch the real provider; this is expected to trigger a callback that + // starts the node review process + launchProviderService(instanceDefinition, confDir); + + // start handling any scheduled events + + startQueueProcessing(); + + //now block waiting to be told to exit the process + waitForAMCompletionSignal(); + } catch(Exception e) { + log.error("Exception : {}", e, e); + // call the AM stop command as if it had been queued (but without + // going via the queue, which may not have started + onAMStop(new ActionStopSlider(e)); + } + //shutdown time + return finish(); + } + + /** + * Get the YARN application Attempt report as the logged in user + * @param yarnClient client to the RM + * @return the application report + * @throws YarnException + * @throws IOException + * @throws InterruptedException + */ + private ApplicationAttemptReport getApplicationAttemptReport( + final SliderYarnClientImpl yarnClient) + throws YarnException, IOException, InterruptedException { + Preconditions.checkNotNull(yarnClient, "Null Yarn client"); + ApplicationAttemptReport report; + if (securityEnabled) { + UserGroupInformation ugi = UserGroupInformation.getLoginUser(); + report = ugi.doAs(new PrivilegedExceptionAction<ApplicationAttemptReport>() { + @Override + public ApplicationAttemptReport run() throws Exception { + return yarnClient.getApplicationAttemptReport(appAttemptID); + } + }); + } else { + report = yarnClient.getApplicationAttemptReport(appAttemptID);
+ } + return report; + } + + /** + * List the node reports: uses {@link SliderYarnClientImpl} as the login user + * @param yarnClient client to the RM + * @return the node reports + * @throws IOException + * @throws YarnException + * @throws InterruptedException + */ + private List<NodeReport> getNodeReports(final SliderYarnClientImpl yarnClient) + throws IOException, YarnException, InterruptedException { + Preconditions.checkNotNull(yarnClient, "Null Yarn client"); + List<NodeReport> nodeReports; + if (securityEnabled) { + nodeReports = UserGroupInformation.getLoginUser().doAs( + new PrivilegedExceptionAction<List<NodeReport>>() { + @Override + public List<NodeReport> run() throws Exception { + return yarnClient.getNodeReports(NodeState.RUNNING); + } + }); + } else { + nodeReports = yarnClient.getNodeReports(NodeState.RUNNING); + } + log.info("Yarn node report count: {}", nodeReports.size()); + return nodeReports; + } + + /** + * Deploy the web application. + * <p> + * Creates and starts the web application, and adds a + * <code>WebAppService</code> service under the AM, to ensure + * a managed web application shutdown.
+ * @param webAppApi web app API instance + * @return port the web application is deployed on + * @throws IOException general problems starting the webapp (network, etc) + * @throws WebAppException other issues + */ + private int deployWebApplication(WebAppApiImpl webAppApi) + throws IOException, SliderException { + + try { + webApp = new SliderAMWebApp(webAppApi); + HttpConfig.Policy policy = HttpConfig.Policy.HTTP_ONLY; + int port = getPortToRequest(); + log.info("Launching web application at port {} with policy {}", port, policy); + + WebApps.$for(SliderAMWebApp.BASE_PATH, + WebAppApi.class, + webAppApi, + RestPaths.WS_CONTEXT) + .withHttpPolicy(getConfig(), policy) + .at("0.0.0.0", port, true) + .inDevMode() + .start(webApp); + + WebAppService<SliderAMWebApp> webAppService = + new WebAppService<>("slider", webApp); + + deployChildService(webAppService); + return webApp.port(); + } catch (WebAppException e) { + if (e.getCause() instanceof IOException) { + throw (IOException)e.getCause(); + } else { + throw e; + } + } + } + + /** + * Process the initial user to obtain the set of user + * supplied credentials (tokens were passed in by client). + * Removes the AM/RM token. + * If a keytab has been provided, also strip the HDFS delegation token. + * @param securityConfig slider security config + * @throws IOException + */ + private void processAMCredentials(SecurityConfiguration securityConfig) + throws IOException { + + List<Text> filteredTokens = new ArrayList<>(3); + filteredTokens.add(AMRMTokenIdentifier.KIND_NAME); + filteredTokens.add(TimelineDelegationTokenIdentifier.KIND_NAME); + + boolean keytabProvided = securityConfig.isKeytabProvided(); + log.info("Slider AM Security Mode: {}", keytabProvided ?
"KEYTAB" : "TOKEN"); + if (keytabProvided) { + filteredTokens.add(DelegationTokenIdentifier.HDFS_DELEGATION_KIND); + } + containerCredentials = CredentialUtils.filterTokens( + UserGroupInformation.getCurrentUser().getCredentials(), + filteredTokens); + log.info(CredentialUtils.dumpTokens(containerCredentials, "\n")); + } + + /** + * Build up the port scanner. This may include setting a port range. + */ + private void buildPortScanner(AggregateConf instanceDefinition) + throws BadConfigException { + portScanner = new PortScanner(); + String portRange = instanceDefinition. + getAppConfOperations().getGlobalOptions(). + getOption(SliderKeys.KEY_ALLOWED_PORT_RANGE, "0"); + if (!"0".equals(portRange)) { + portScanner.setPortRange(portRange); + } + } + + /** + * Locate a port to request for a service such as RPC or web/REST. + * This uses port range definitions in the <code>instanceDefinition</code> + * to fix the port range, if one is set. + * <p> + * The port returned is available at the time of the request; there are + * no guarantees as to how long that situation will last. + * @return the port to request.
+ * @throws SliderException + */ + private int getPortToRequest() throws SliderException, IOException { + return portScanner.getAvailablePort(); + } + + private void uploadServerCertForLocalization(String clustername, + SliderFileSystem fs) + throws IOException { + Path certsDir = fs.buildClusterSecurityDirPath(clustername); + if (!fs.getFileSystem().exists(certsDir)) { + fs.getFileSystem().mkdirs(certsDir, + new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE)); + } + Path destPath = new Path(certsDir, SliderKeys.CRT_FILE_NAME); + if (!fs.getFileSystem().exists(destPath)) { + fs.getFileSystem().copyFromLocalFile( + new Path(CertificateManager.getServerCertficateFilePath().getAbsolutePath()), + destPath); + log.info("Uploaded server cert to localization path {}", destPath); + } + + fs.getFileSystem().setPermission(destPath, + new FsPermission(FsAction.READ, FsAction.NONE, FsAction.NONE)); + } + + protected void login(String principal, File localKeytabFile) + throws IOException, SliderException { + log.info("Logging in as {} with keytab {}", principal, localKeytabFile); + UserGroupInformation.loginUserFromKeytab(principal, + localKeytabFile.getAbsolutePath()); + validateLoginUser(UserGroupInformation.getLoginUser()); + } + + /** + * Ensure that the user is generated from a keytab and has no HDFS delegation + * tokens. + * + * @param user user to validate + * @throws SliderException + */ + protected void validateLoginUser(UserGroupInformation user) + throws SliderException { + if (!user.isFromKeytab()) { + log.error("User is not holding on a keytab in a secure deployment:" + + " slider will fail as tokens expire"); + } + Credentials credentials = user.getCredentials(); + Iterator<Token<? extends TokenIdentifier>> iter = + credentials.getAllTokens().iterator(); + while (iter.hasNext()) { + Token<?
extends TokenIdentifier> token = iter.next(); + log.info("Token {}", token.getKind()); + if (token.getKind().equals( + DelegationTokenIdentifier.HDFS_DELEGATION_KIND)) { + log.info("HDFS delegation token {}. Removing...", token); + iter.remove(); + } + } + } + + /** + * Set up and start the agent web application + * @param appInformation application information + * @param serviceConf service configuration + * @param webAppApi web app API instance to bind to + * @throws IOException + */ + private void startAgentWebApp(MapOperations appInformation, + Configuration serviceConf, WebAppApiImpl webAppApi) throws IOException, SliderException { + // Dump the classpath at debug level. The class loader is not guaranteed + // to be a URLClassLoader (the JDK 9+ application class loader is not one), + // and an unconditional cast would abort AM startup with a + // ClassCastException just to produce a debug message. + ClassLoader loader = AgentWebApp.class.getClassLoader(); + if (loader instanceof URLClassLoader) { + URL[] urls = ((URLClassLoader) loader).getURLs(); + StringBuilder sb = new StringBuilder("AM classpath:"); + for (URL url : urls) { + sb.append("\n").append(url.toString()); + } + LOG_YARN.debug(sb.append("\n").toString()); + } + initAMFilterOptions(serviceConf); + + + // Start up the agent web app and track the URL for it + MapOperations appMasterConfig = getInstanceDefinition() + .getAppConfOperations().getComponent(SliderKeys.COMPONENT_AM); + AgentWebApp agentWebApp = AgentWebApp.$for(AgentWebApp.BASE_PATH, + webAppApi, + RestPaths.AGENT_WS_CONTEXT) + .withComponentConfig(appMasterConfig) + .withPort(getPortToRequest()) + .withSecuredPort(getPortToRequest()) + .start(); + agentOpsUrl = + "https://" + appMasterHostname + ":" + agentWebApp.getSecuredPort(); + agentStatusUrl = + "https://" + appMasterHostname + ":" + agentWebApp.getPort(); + AgentService agentService = + new AgentService("slider-agent", agentWebApp); + + agentService.init(serviceConf); + agentService.start(); + addService(agentService); + + appInformation.put(StatusKeys.INFO_AM_AGENT_OPS_URL, agentOpsUrl + "/"); + appInformation.put(StatusKeys.INFO_AM_AGENT_STATUS_URL, agentStatusUrl + "/"); + appInformation.set(StatusKeys.INFO_AM_AGENT_STATUS_PORT, + agentWebApp.getPort()); + appInformation.set(StatusKeys.INFO_AM_AGENT_OPS_PORT, +
agentWebApp.getSecuredPort()); + } + + /** + * Set up the AM filter + * @param serviceConf configuration to patch + */ + private void initAMFilterOptions(Configuration serviceConf) { + // IP filtering + String amFilterName = AM_FILTER_NAME; + + // This is here until YARN supports proxy & redirect operations + // on verbs other than GET, and is only supported for testing + if (X_DEV_INSECURE_REQUIRED && serviceConf.getBoolean(X_DEV_INSECURE_WS, + X_DEV_INSECURE_DEFAULT)) { + log.warn("Insecure filter enabled: REST operations are unauthenticated"); + amFilterName = InsecureAmFilterInitializer.NAME; + } + + serviceConf.set(HADOOP_HTTP_FILTER_INITIALIZERS, amFilterName); + } + + /** + * This registers the service instance and its external values + * @param instanceName name of this instance + * @param appId application ID + * @throws IOException + */ + public void registerServiceInstance(String instanceName, + ApplicationId appId) throws IOException { + + + // the registry is running, so register services + URL amWebURI = new URL(appMasterProxiedUrl); + URL agentOpsURI = new URL(agentOpsUrl); + URL agentStatusURI = new URL(agentStatusUrl); + + //Give the provider restricted access to the state, registry + setupInitialRegistryPaths(); + yarnRegistryOperations = new YarnRegistryViewForProviders( + registryOperations, + service_user_name, + SliderKeys.APP_TYPE, + instanceName, + appAttemptID); + providerService.bindToYarnRegistry(yarnRegistryOperations); + sliderAMProvider.bindToYarnRegistry(yarnRegistryOperations); + + // Yarn registry + ServiceRecord serviceRecord = new ServiceRecord(); + serviceRecord.set(YarnRegistryAttributes.YARN_ID, appId.toString()); + serviceRecord.set(YarnRegistryAttributes.YARN_PERSISTENCE, + PersistencePolicies.APPLICATION); + serviceRecord.description = "Slider Application Master"; + + serviceRecord.addExternalEndpoint( + RegistryTypeUtils.ipcEndpoint( + CustomRegistryConstants.AM_IPC_PROTOCOL, + rpcServiceAddress)); + + // internal services
+ sliderAMProvider.applyInitialRegistryDefinitions(amWebURI, + agentOpsURI, + agentStatusURI, + serviceRecord); + + // provider service dynamic definitions. + providerService.applyInitialRegistryDefinitions(amWebURI, + agentOpsURI, + agentStatusURI, + serviceRecord); + + // set any provided attributes + setProvidedServiceRecordAttributes( + getInstanceDefinition().getAppConfOperations().getComponent( + SliderKeys.COMPONENT_AM), serviceRecord); + + // register the service's entry + log.info("Service Record \n{}", serviceRecord); + yarnRegistryOperations.registerSelf(serviceRecord, true); + log.info("Registered service under {}; absolute path {}", + yarnRegistryOperations.getSelfRegistrationPath(), + yarnRegistryOperations.getAbsoluteSelfRegistrationPath()); + + boolean isFirstAttempt = 1 == appAttemptID.getAttemptId(); + // delete the children in case there are any and this is an AM startup. + // just to make sure everything underneath is purged + if (isFirstAttempt) { + yarnRegistryOperations.deleteChildren( + yarnRegistryOperations.getSelfRegistrationPath(), + true); + } + } + + /** + * TODO: purge this once RM is doing the work + * @throws IOException + */ + protected void setupInitialRegistryPaths() throws IOException { + if (registryOperations instanceof RMRegistryOperationsService) { + RMRegistryOperationsService rmRegOperations = + (RMRegistryOperationsService) registryOperations; + rmRegOperations.initUserRegistryAsync(service_user_name); + } + } + + /** + * Handler for {@link RegisterComponentInstance action} + * Register/re-register an ephemeral container that is already in the app state + * @param id the component + * @param description component description + * @param type component type + * @return true if the component is registered + */ + public boolean registerComponent(ContainerId id, String description, + String type) throws IOException { + RoleInstance instance = appState.getOwnedContainer(id); + if (instance == null) { + return false; + } + //
this is where component registrations go + log.info("Registering component {}", id); + String cid = RegistryPathUtils.encodeYarnID(id.toString()); + ServiceRecord container = new ServiceRecord(); + container.set(YarnRegistryAttributes.YARN_ID, cid); + container.description = description; + container.set(YarnRegistryAttributes.YARN_PERSISTENCE, + PersistencePolicies.CONTAINER); + MapOperations compOps = getInstanceDefinition().getAppConfOperations(). + getComponent(type); + setProvidedServiceRecordAttributes(compOps, container); + try { + yarnRegistryOperations.putComponent(cid, container); + } catch (IOException e) { + log.warn("Failed to register container {}/{}: {}", + id, description, e, e); + return false; + } + return true; + } + + /** + * Copy every option whose key starts with + * {@code RoleKeys.SERVICE_RECORD_ATTRIBUTE_PREFIX} into a service record. + * The record attribute name is the option key with the prefix and the single + * separator character after it removed; the value is trimmed. + * Keys that carry no attribute name after the prefix and separator are + * skipped rather than causing a StringIndexOutOfBoundsException or + * registering an empty attribute name. + * @param ops options to scan + * @param record service record to update + */ + protected void setProvidedServiceRecordAttributes(MapOperations ops, + ServiceRecord record) { + String prefix = RoleKeys.SERVICE_RECORD_ATTRIBUTE_PREFIX; + // attribute names start after the prefix plus one separator character + int nameStart = prefix.length() + 1; + for (Map.Entry<String, String> entry : ops.entrySet()) { + String optionKey = entry.getKey(); + if (optionKey.startsWith(prefix) && optionKey.length() > nameStart) { + String key = optionKey.substring(nameStart); + record.set(key, entry.getValue().trim()); + } + } + } + + /** + * Handler for {@link UnregisterComponentInstance} + * + * unregister a component. At the time this message is received, + * the component may not have been registered + * @param id the component + */ + public void unregisterComponent(ContainerId id) { + log.info("Unregistering component {}", id); + if (yarnRegistryOperations == null) { + log.warn("Processing unregister component event before initialization " + + "completed; init flag ={}", initCompleted); + return; + } + String cid = RegistryPathUtils.encodeYarnID(id.toString()); + try { + yarnRegistryOperations.deleteComponent(cid); + } catch (IOException e) { + log.warn("Failed to delete container {} : {}", id, e, e); + } + } + + /** + * looks for a specific case where a token file is provided as an environment + * variable, yet the file is not there.
+ * + * This surfaced (once) in HBase, where its HDFS library was looking for this, + * and somehow the token was missing. This is a check in the AM so that + * if the problem re-occurs, the AM logs a more meaningful warning. + * + */ + private void checkAndWarnForAuthTokenProblems() { + String fileLocation = + System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION); + if (fileLocation != null) { + File tokenFile = new File(fileLocation); + if (!tokenFile.exists()) { + log.warn("Token file {} specified in {} not found", tokenFile, + UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION); + } + } + } + + /** + * Build the configuration directory passed in or of the target FS + * @return the file + */ + public File getLocalConfDir() { + File confdir = + new File(SliderKeys.PROPAGATED_CONF_DIR_NAME).getAbsoluteFile(); + return confdir; + } + + /** + * Get the path to the DFS configuration that is defined in the cluster specification + * @return the generated configuration dir + */ + public String getGeneratedConfDir() { + return getGlobalInternalOptions().get( + InternalKeys.INTERNAL_GENERATED_CONF_PATH); + } + + /** + * Get the global internal options for the AM + * @return a map to access the internals + */ + public MapOperations getGlobalInternalOptions() { + return getInstanceDefinition() + .getInternalOperations().
+ getGlobalOptions(); + } + + /** + * Get the filesystem of this cluster + * @return the FS of the config + */ + public SliderFileSystem getClusterFS() throws IOException { + return new SliderFileSystem(getConfig()); + } + + /** + * Get the AM log + * @return the log of the AM + */ + public static Logger getLog() { + return log; + } + + /** + * Get the application state + * @return the application state + */ + public AppState getAppState() { + return appState; + } + + /** + * Block until it is signalled that the AM is done. + * <p> + * The completion flag is re-checked after every wakeup: a condition wait + * such as {@code awaitUninterruptibly()} may return spuriously, and + * returning early would let {@code finish()} run before a stop action + * has been recorded by {@link #onAMStop(ActionStopSlider)}. + */ + private void waitForAMCompletionSignal() { + AMExecutionStateLock.lock(); + try { + // loop, not if: guard against spurious wakeups + while (!amCompletionFlag.get()) { + log.debug("blocking until signalled to terminate"); + isAMCompleted.awaitUninterruptibly(); + } + } finally { + AMExecutionStateLock.unlock(); + } + } + + /** + * Signal that the AM is complete .. queues it in a separate thread + * + * @param stopActionRequest request containing shutdown details + */ + public synchronized void signalAMComplete(ActionStopSlider stopActionRequest) { + // this is a queued action: schedule it through the queues + schedule(stopActionRequest); + } + + /** + * Signal that the AM is complete + * + * @param stopActionRequest request containing shutdown details + */ + public synchronized void onAMStop(ActionStopSlider stopActionRequest) { + + AMExecutionStateLock.lock(); + try { + if (amCompletionFlag.compareAndSet(false, true)) { + // first stop request received + this.stopAction = stopActionRequest; + isAMCompleted.signal(); + } + } finally { + AMExecutionStateLock.unlock(); + } + } + + + /** + * trigger the YARN cluster termination process + * @return the exit code + * @throws Exception if the stop action contained an Exception which implements + * ExitCodeProvider + */ + private synchronized int finish() throws Exception { + Preconditions.checkNotNull(stopAction, "null stop action"); + FinalApplicationStatus appStatus; + log.info("Triggering shutdown of the AM: {}", stopAction); + + String
appMessage = stopAction.getMessage(); + //stop the daemon & grab its exit code + int exitCode = stopAction.getExitCode(); + Exception exception = stopAction.getEx(); + + appStatus = stopAction.getFinalApplicationStatus(); + if (!spawnedProcessExitedBeforeShutdownTriggered) { + //stopped the forked process but don't worry about its exit code + int forkedExitCode = stopForkedProcess(); + log.debug("Stopped forked process: exit code={}", forkedExitCode); + } + + // make sure the AM is actually registered. If not, there's no point + // trying to unregister it + if (amRegistrationData == null) { + log.info("Application attempt not yet registered; skipping unregistration"); + if (exception != null) { + throw exception; + } + return exitCode; + } + + //stop any launches in progress + launchService.stop(); + + //now release all containers + releaseAllContainers(); + + // When the application completes, it should send a finish application + // signal to the RM + log.info("Application completed. Signalling finish to RM"); + + try { + log.info("Unregistering AM status={} message={}", appStatus, appMessage); + asyncRMClient.unregisterApplicationMaster(appStatus, appMessage, null); + } catch (InvalidApplicationMasterRequestException e) { + log.info("Application not found in YARN application list;" + + " it may have been terminated/YARN shutdown in progress: {}", e, e); + } catch (YarnException | IOException e) { + log.info("Failed to unregister application: " + e, e); + } + if (exception != null) { + throw exception; + } + return exitCode; + } + + /** + * Get diagnostics info about containers + */ + private String getContainerDiagnosticInfo() { + + return appState.getContainerDiagnosticInfo(); + } + + public Object getProxy(Class protocol, InetSocketAddress addr) { + return yarnRPC.getProxy(protocol, addr, getConfig()); + } + + /** + * Start the slider RPC server + */ + private void startSliderRPCServer(AggregateConf instanceDefinition) + throws IOException, SliderException { +
verifyIPCAccess(); + + sliderIPCService = new SliderIPCService( + this, + certificateManager, + stateForProviders, + actionQueues, + metricsAndMonitoring, + contentCache); + + deployChildService(sliderIPCService); + SliderClusterProtocolPBImpl protobufRelay = + new SliderClusterProtocolPBImpl(sliderIPCService); + BlockingService blockingService = SliderClusterAPI.SliderClusterProtocolPB + .newReflectiveBlockingService( + protobufRelay); + + int port = getPortToRequest(); + InetSocketAddress rpcAddress = new InetSocketAddress("0.0.0.0", port); + rpcService = + new WorkflowRpcService("SliderRPC", + RpcBinder.createProtobufServer(rpcAddress, getConfig(), + secretManager, + NUM_RPC_HANDLERS, + blockingService, + null)); + deployChildService(rpcService); + } + + /** + * verify that if the cluster is authed, the ACLs are set. + * @throws BadConfigException if Authorization is set without any ACL + */ + private void verifyIPCAccess() throws BadConfigException { + boolean authorization = getConfig().getBoolean( + CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, + false); + String acls = getConfig().get(KEY_PROTOCOL_ACL); + if (authorization && SliderUtils.isUnset(acls)) { + throw new BadConfigException("Application has IPC authorization enabled in " + + CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION + + " but no ACLs in " + KEY_PROTOCOL_ACL); + } + } + + +/* =================================================================== */ +/* AMRMClientAsync callbacks */ +/* =================================================================== */ + + /** + * Callback event when a container is allocated. + * + * The app state is updated with the allocation, and builds up a list + * of assignments and RM operations. The assignments are + * handed off into the pool of service launchers to asynchronously schedule + * container launch operations.
+ * + * The operations are run in sequence; they are expected to be 0 or more + * release operations (to handle over-allocations) + * + * @param allocatedContainers list of containers that are now ready to be + * given work. + */ + @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") + @Override //AMRMClientAsync + public void onContainersAllocated(List<Container> allocatedContainers) { + LOG_YARN.info("onContainersAllocated({})", allocatedContainers.size()); + List<ContainerAssignment> assignments = new ArrayList<>(); + List<AbstractRMOperation> operations = new ArrayList<>(); + + //app state makes all the decisions + appState.onContainersAllocated(allocatedContainers, assignments, operations); + + //for each assignment: instantiate that role + for (ContainerAssignment assignment : assignments) { + try { + launchService.launchRole(assignment, getInstanceDefinition(), + buildContainerCredentials()); + } catch (IOException e) { + // Can be caused by failure to renew credentials with the remote + // service. If so, don't launch the application. Container is retained, + // though YARN will take it away after a timeout.
+ log.error("Failed to build credentials to launch container: {}", e, e); + + } + } + + //for all the operations, exec them + execute(operations); + log.info("Diagnostics: {}", getContainerDiagnosticInfo()); + } + + @Override //AMRMClientAsync + public synchronized void onContainersCompleted(List<ContainerStatus> completedContainers) { + LOG_YARN.info("onContainersCompleted([{}]", completedContainers.size()); + for (ContainerStatus status : completedContainers) { + ContainerId containerId = status.getContainerId(); + LOG_YARN.info("Container Completion for" + + " containerID={}," + + " state={}," + + " exitStatus={}," + + " diagnostics={}", + containerId, status.getState(), + status.getExitStatus(), + status.getDiagnostics()); + + // non complete containers should not be here + assert (status.getState() == ContainerState.COMPLETE); + AppState.NodeCompletionResult result = appState.onCompletedNode(status); + if (result.containerFailed) { + RoleInstance ri = result.roleInstance; + log.error("Role instance {} failed ", ri); + } + + // known nodes trigger notifications + if(!result.unknownNode) { + getProviderService().notifyContainerCompleted(containerId); + queue(new UnregisterComponentInstance(containerId, 0, + TimeUnit.MILLISECONDS)); + } + } + + reviewRequestAndReleaseNodes("onContainersCompleted"); + } + + /** + * Signal that containers are being upgraded. Containers specified with + * --containers option and all containers of all roles specified with + * --components option are merged and upgraded. + * + * @param upgradeContainersRequest + * request containing upgrade details + */ + public synchronized void onUpgradeContainers( + ActionUpgradeContainers upgradeContainersRequest) throws IOException, + SliderException { + LOG_YARN.info("onUpgradeContainers({})", + upgradeContainersRequest.getMessage()); + // work on a copy of the requested container set: it is extended with the + // containers of the requested roles below, which must not modify the + // request itself (and would fail if the request holds an unmodifiable set) + Set<String> containers = upgradeContainersRequest.getContainers() == null ?
new HashSet<String>() + : new HashSet<String>(upgradeContainersRequest.getContainers()); + LOG_YARN.info(" Container list provided (total {}) : {}", + containers.size(), containers); + Set<String> components = upgradeContainersRequest.getComponents() == null ? new HashSet<String>() + : upgradeContainersRequest.getComponents(); + LOG_YARN.info(" Component list provided (total {}) : {}", + components.size(), components); + // If components are specified as well, then grab all the containers of + // each of the components (roles) + if (CollectionUtils.isNotEmpty(components)) { + Map<ContainerId, RoleInstance> liveContainers = appState.getLiveContainers(); + if (CollectionUtils.isNotEmpty(liveContainers.keySet())) { + Map<String, Set<String>> roleContainerMap = prepareRoleContainerMap(liveContainers); + for (String component : components) { + Set<String> roleContainers = roleContainerMap.get(component); + if (roleContainers != null) { + containers.addAll(roleContainers); + } + } + } + } + LOG_YARN.info("Final list of containers to be upgraded (total {}) : {}", + containers.size(), containers); + if (providerService instanceof AgentProviderService) { + AgentProviderService agentProviderService = (AgentProviderService) providerService; + agentProviderService.setInUpgradeMode(true); + agentProviderService.addUpgradeContainers(containers); + } + } + + // create a reverse map of roles -> set of all live containers + private Map<String, Set<String>> prepareRoleContainerMap( + Map<ContainerId, RoleInstance> liveContainers) { + // liveContainers is ensured to be not empty + Map<String, Set<String>> roleContainerMap = new HashMap<>(); + for (Map.Entry<ContainerId, RoleInstance> liveContainer : liveContainers + .entrySet()) { + RoleInstance role = liveContainer.getValue(); + if (roleContainerMap.containsKey(role.role)) { + roleContainerMap.get(role.role).add(liveContainer.getKey().toString()); + } else { + Set<String> containers = new HashSet<String>(); +
containers.add(liveContainer.getKey().toString()); + roleContainerMap.put(role.role, containers); + } + } + return roleContainerMap; + } + + /** + * Implementation of cluster flexing. + * It should be the only way that anything -even the AM itself on startup- + * asks for nodes. + * @param resources the resource tree + * @throws SliderException slider problems, including invalid configs + * @throws IOException IO problems + */ + public void flexCluster(ConfTree resources) + throws IOException, SliderException { + + AggregateConf newConf = + new AggregateConf(appState.getInstanceDefinitionSnapshot()); + newConf.setResources(resources); + // verify the new definition is valid + sliderAMProvider.validateInstanceDefinition(newConf); + providerService.validateInstanceDefinition(newConf); + + appState.updateResourceDefinitions(resources); + + // reset the scheduled windows...the values + // may have changed + appState.resetFailureCounts(); + + // ask for more containers if needed + reviewRequestAndReleaseNodes("flexCluster"); + } + + /** + * Schedule the failure window + * @param resources the resource tree + * @throws BadConfigException if the window is out of range + */ + private void scheduleFailureWindowResets(ConfTree resources) throws + BadConfigException { + ResetFailureWindow reset = new ResetFailureWindow(); + ConfTreeOperations ops = new ConfTreeOperations(resources); + MapOperations globals = ops.getGlobalOptions(); + long seconds = globals.getTimeRange(ResourceKeys.CONTAINER_FAILURE_WINDOW, + ResourceKeys.DEFAULT_CONTAINER_FAILURE_WINDOW_DAYS, + ResourceKeys.DEFAULT_CONTAINER_FAILURE_WINDOW_HOURS, + ResourceKeys.DEFAULT_CONTAINER_FAILURE_WINDOW_MINUTES, 0); + if (seconds > 0) { + log.info( + "Scheduling the failure window reset interval to every {} seconds", + seconds); + RenewingAction<ResetFailureWindow> renew = new RenewingAction<>( + reset, seconds, seconds, TimeUnit.SECONDS, 0); + actionQueues.renewing("failures", renew); + } else { + log.info("Failure
window reset interval is not set"); + } + } + + /** + * Schedule the escalation action + * @param internal + * @throws BadConfigException + */ + private void scheduleEscalation(ConfTree internal) throws BadConfigException { + EscalateOutstandingRequests escalate = new EscalateOutstandingRequests(); + ConfTreeOperations ops = new ConfTreeOperations(internal); + int seconds = ops.getGlobalOptions().getOptionInt(InternalKeys.ESCALATION_CHECK_INTERVAL, + InternalKeys.DEFAULT_ESCALATION_CHECK_INTERVAL); + RenewingAction<EscalateOutstandingRequests> renew = new RenewingAction<>( + escalate, seconds, seconds, TimeUnit.SECONDS, 0); + actionQueues.renewing("escalation", renew); + } + + /** + * Look at where the current node state is -and whether it should be changed + * @param reason reason for operation + */ + private synchronized void reviewRequestAndReleaseNodes(String reason) { + log.debug("reviewRequestAndReleaseNodes({})", reason); + queue(new ReviewAndFlexApplicationSize(reason, 0, TimeUnit.SECONDS)); + } + + /** + * Handle the event requesting a review ... look at the queue and decide + * whether to act or not + * @param action action triggering the event.
It may be put + * back into the queue + * @throws SliderInternalStateException + */ + public void handleReviewAndFlexApplicationSize(ReviewAndFlexApplicationSize action) + throws SliderInternalStateException { + + if ( actionQueues.hasQueuedActionWithAttribute( + AsyncAction.ATTR_REVIEWS_APP_SIZE | AsyncAction.ATTR_HALTS_APP)) { + // this operation isn't needed at all -existing duplicate or shutdown due + return; + } + // if there is an action which changes cluster size, wait + if (actionQueues.hasQueuedActionWithAttribute( + AsyncAction.ATTR_CHANGES_APP_SIZE)) { + // place the action at the back of the queue + actionQueues.put(action); + } + + executeNodeReview(action.name); + } + + /** + * Look at where the current node state is -and whether it should be changed + */ + public synchronized void executeNodeReview(String reason) + throws SliderInternalStateException { + + log.debug("in executeNodeReview({})", reason); + if (amCompletionFlag.get()) { + log.info("Ignoring node review operation: shutdown in progress"); + } + try { + List<AbstractRMOperation> allOperations = appState.reviewRequestAndReleaseNodes(); + // tell the provider + providerRMOperationHandler.execute(allOperations); + //now apply the operations + execute(allOperations); + } catch (TriggerClusterTeardownException e) { + //App state has decided that it is time to exit + log.error("Cluster teardown triggered {}", e, e); + queue(new ActionStopSlider(e)); + } + } + + /** + * Escalate operation as triggered by external timer. + * <p> + * Get the list of new operations off the AM, then executest them. 
+ */ + public void escalateOutstandingRequests() { + List<AbstractRMOperation> operations = appState.escalateOutstandingRequests(); + providerRMOperationHandler.execute(operations); + execute(operations); + } + + + /** + * Shutdown operation: release all containers + */ + private void releaseAllContainers() { + if (providerService instanceof AgentProviderService) { + log.info("Setting stopInitiated flag to true"); + AgentProviderService agentProviderService = (AgentProviderService) providerService; + agentProviderService.setAppStopInitiated(true); + } + // Add the sleep here (before releasing containers) so that applications get + // time to perform graceful shutdown + try { + long timeout = getContainerReleaseTimeout(); + if (timeout > 0) { + Thread.sleep(timeout); + } + } catch (InterruptedException e) { + log.info("Sleep for container release interrupted"); + } finally { + List<AbstractRMOperation> operations = appState.releaseAllContainers(); + providerRMOperationHandler.execute(operations); + // now apply the operations + execute(operations); + } + } + + private long getContainerReleaseTimeout() { + // Get container release timeout in millis or 0 if the property is not set. + // If non-zero then add the agent heartbeat delay time, since it can take up + // to that much time for agents to receive the stop command. 
+ int timeout = getInstanceDefinition().getAppConfOperations() + .getGlobalOptions() + .getOptionInt(SliderKeys.APP_CONTAINER_RELEASE_TIMEOUT, 0); + if (timeout > 0) { + timeout += SliderKeys.APP_CONTAINER_HEARTBEAT_INTERVAL_SEC; + } + // convert to millis + long timeoutInMillis = timeout * 1000l; + log.info("Container release timeout in millis = {}", timeoutInMillis); + return timeoutInMillis; + } + + /** + * RM wants to shut down the AM + */ + @Override //AMRMClientAsync + public void onShutdownRequest() { + LOG_YARN.info("Shutdown Request received"); + signalAMComplete(new ActionStopSlider("stop", + EXIT_SUCCESS, + FinalApplicationStatus.SUCCEEDED, + "Shutdown requested from RM")); + } + + /** + * Monitored nodes have been changed + * @param updatedNodes list of updated nodes + */ + @Override //AMRMClientAsync + public void onNodesUpdated(List<NodeReport> updatedNodes) { + LOG_YARN.info("onNodesUpdated({})", updatedNodes.size()); + log.info("Updated nodes {}", updatedNodes); + // Check if any nodes are lost or revived and update state accordingly + + AppState.NodeUpdatedOutcome outcome = appState.onNodesUpdated(updatedNodes); + if (!outcome.operations.isEmpty()) { + execute(outcome.operations); + } + // trigger a review if the cluster changed + if (outcome.clusterChanged) { + reviewRequestAndReleaseNodes("nodes updated"); + } + } + + /** + * heartbeat operation; return the ratio of requested + * to actual + * @return progress + */ + @Override //AMRMClientAsync + public float getProgress() { + return appState.getApplicationProgressPercentage(); + } + + @Override //AMRMClientAsync + public void onError(Throwable e) { + //callback says it's time to finish + LOG_YARN.error("AMRMClientAsync.onError() received {}", e, e); + signalAMComplete(new ActionStopSlider("stop", + EXIT_EXCEPTION_THROWN, + FinalApplicationStatus.FAILED, + "AMRMClientAsync.onError() received " + e)); + } + +/* =================================================================== */ +/* 
RMOperationHandlerActions */ +/* =================================================================== */ + + + @Override + public void execute(List<AbstractRMOperation> operations) { + rmOperationHandler.execute(operations); + } + + @Override + public void releaseAssignedContainer(ContainerId containerId) { + rmOperationHandler.releaseAssignedContainer(containerId); + } + + @Override + public void addContainerRequest(AMRMClient.ContainerRequest req) { + rmOperationHandler.addContainerRequest(req); + } + + @Override + public int cancelContainerRequests(Priority priority1, + Priority priority2, + int count) { + return rmOperationHandler.cancelContainerRequests(priority1, priority2, count); + } + + @Override + public void cancelSingleRequest(AMRMClient.ContainerRequest request) { + rmOperationHandler.cancelSingleRequest(request); + } + +/* =================================================================== */ +/* END */ +/* =================================================================== */ + + /** + * Launch the provider service + * + * @param instanceDefinition definition of the service + * @param confDir directory of config data + * @throws IOException + * @throws SliderException + */ + protected synchronized void launchProviderService(AggregateConf instanceDefinition, + File confDir) + throws IOException, SliderException { + Map<String, String> env = new HashMap<>(); + boolean execStarted = providerService.exec(instanceDefinition, confDir, env, + this); + if (execStarted) { + providerService.registerServiceListener(this); + providerService.start(); + } else { + // didn't start, so don't register + providerService.start(); + // and send the started event ourselves + eventCallbackEvent(null); + } + } + + /* =================================================================== */ + /* EventCallback from the child or ourselves directly */ + /* =================================================================== */ + + @Override // ProviderCompleted + public void 
eventCallbackEvent(Object parameter) { + // signalled that the child process is up. + appState.noteAMLive(); + // now ask for the cluster nodes + try { + flexCluster(getInstanceDefinition().getResources()); + } catch (Exception e) { + // cluster flex failure: log + log.error("Failed to flex cluster nodes: {}", e, e); + // then what? exit + queue(new ActionStopSlider(e)); + } + } + + /** + * report container loss. If this isn't already known about, react + * + * @param containerId id of the container which has failed + * @throws SliderException + */ + public synchronized void providerLostContainer( + ContainerId containerId) + throws SliderException { + log.info("containerLostContactWithProvider: container {} lost", + containerId); + RoleInstance activeContainer = appState.getOwnedContainer(containerId); + if (activeContainer != null) { + execute(appState.releaseContainer(containerId)); + // ask for more containers if needed + log.info("Container released; triggering review"); + reviewRequestAndReleaseNodes("Loss of container"); + } else { + log.info("Container not in active set - ignoring"); + } + } + + /* =================================================================== */ + /* ServiceStateChangeListener */ + /* =================================================================== */ + + /** + * Received on listening service termination. + * @param service the service that has changed. 
+ */ + @Override //ServiceStateChangeListener + public void stateChanged(Service service) { + if (service == providerService && service.isInState(STATE.STOPPED)) { + //its the current master process in play + int exitCode = providerService.getExitCode(); + int mappedProcessExitCode = exitCode; + + boolean shouldTriggerFailure = !amCompletionFlag.get() + && (mappedProcessExitCode != 0); + + if (shouldTriggerFailure) { + String reason = + "Spawned process failed with raw " + exitCode + " mapped to " + + mappedProcessExitCode; + ActionStopSlider stop = new ActionStopSlider("stop", + mappedProcessExitCode, + FinalApplicationStatus.FAILED, + reason); + //this wasn't expected: the process finished early + spawnedProcessExitedBeforeShutdownTriggered = true; + log.info( + "Process has exited with exit code {} mapped to {} -triggering termination", + exitCode, + mappedProcessExitCode); + + //tell the AM the cluster is complete + signalAMComplete(stop); + } else { + //we don't care + log.info( + "Process has exited with exit code {} mapped to {} -ignoring", + exitCode, + mappedProcessExitCode); + } + } else { + super.stateChanged(service); + } + } + + /** + * stop forked process if it the running process var is not null + * @return the process exit code + */ + protected synchronized Integer stopForkedProcess() { + providerService.stop(); + return providerService.getExitCode(); + } + + /** + * Async start container request + * @param container container + * @param ctx context + * @param instance node details + */ + public void startContainer(Container container, + ContainerLaunchContext ctx, + RoleInstance instance) throws IOException { + appState.containerStartSubmitted(container, instance); + + nmClientAsync.startContainerAsync(container, ctx); + } + + /** + * Build the credentials needed for containers. This will include + * getting new delegation tokens for HDFS if the AM is running + * with a keytab. 
+ * @return a buffer of credentials + * @throws IOException + */ + + private Credentials buildContainerCredentials() throws IOException { + Credentials credentials = new Credentials(containerCredentials); + if (securityConfiguration.isKeytabProvided()) { + CredentialUtils.addSelfRenewableFSDelegationTokens( + getClusterFS().getFileSystem(), + credentials); + } + return credentials; + } + + @Override // NMClientAsync.CallbackHandler + public void onContainerStopped(ContainerId containerId) { + // do nothing but log: container events from the AM + // are the source of container halt details to react to + log.info("onContainerStopped {} ", containerId); + } + + @Override // NMClientAsync.CallbackHandler + public void onContainerStarted(ContainerId containerId, + Map<String, ByteBuffer> allServiceResponse) { + LOG_YARN.info("Started Container {} ", containerId); + RoleInstance cinfo = appState.onNodeManagerContainerStarted(containerId); + if (cinfo != null) { + LOG_YARN.info("Deployed instance of role {} onto {}", + cinfo.role, containerId); + //trigger an async container status + nmClientAsync.getContainerStatusAsync(containerId, + cinfo.container.getNodeId()); + // push out a registration + queue(new RegisterComponentInstance(containerId, cinfo.role, cinfo.group, + 0, TimeUnit.MILLISECONDS)); + + } else { + //this is a hypothetical path not seen. We react by warning + log.error("Noti
<TRUNCATED> --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org