http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0536f18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java index f67ea58..4922c2d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java @@ -30,6 +30,8 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.io.Text; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.registry.client.api.RegistryOperations; import org.apache.hadoop.registry.client.binding.RegistryPathUtils; import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; @@ -62,6 +64,7 @@ import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.client.api.async.NMClientAsync; import 
org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; @@ -77,13 +80,12 @@ import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.webapp.WebAppException; import org.apache.hadoop.yarn.webapp.WebApps; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; -import org.apache.slider.api.ClusterDescription; import org.apache.slider.api.InternalKeys; import org.apache.slider.api.ResourceKeys; import org.apache.slider.api.RoleKeys; -import org.apache.slider.api.StatusKeys; +import org.apache.slider.api.proto.Messages; import org.apache.slider.api.proto.SliderClusterAPI; -import org.apache.slider.client.SliderYarnClientImpl; +import org.apache.slider.api.resource.Application; import org.apache.slider.common.SliderExitCodes; import org.apache.slider.common.SliderKeys; import org.apache.slider.common.params.AbstractActionArgs; @@ -95,10 +97,7 @@ import org.apache.slider.common.tools.PortScanner; import org.apache.slider.common.tools.SliderFileSystem; import org.apache.slider.common.tools.SliderUtils; import org.apache.slider.common.tools.SliderVersionInfo; -import org.apache.slider.core.buildutils.InstanceIO; import org.apache.slider.core.conf.AggregateConf; -import org.apache.slider.core.conf.ConfTree; -import org.apache.slider.core.conf.ConfTreeOperations; import org.apache.slider.core.conf.MapOperations; import org.apache.slider.core.exceptions.BadConfigException; import org.apache.slider.core.exceptions.SliderException; @@ -109,13 +108,12 @@ import org.apache.slider.core.main.ExitCodeProvider; import org.apache.slider.core.main.LauncherExitCodes; import org.apache.slider.core.main.RunService; import org.apache.slider.core.main.ServiceLauncher; +import org.apache.slider.core.persist.JsonSerDeser; import org.apache.slider.core.registry.info.CustomRegistryConstants; import org.apache.slider.providers.ProviderCompleted; import org.apache.slider.providers.ProviderRole; import org.apache.slider.providers.ProviderService; import 
org.apache.slider.providers.SliderProviderFactory; -import org.apache.slider.providers.slideram.SliderAMClientProvider; -import org.apache.slider.providers.slideram.SliderAMProviderService; import org.apache.slider.server.appmaster.actions.ActionHalt; import org.apache.slider.server.appmaster.actions.ActionRegisterServiceInstance; import org.apache.slider.server.appmaster.actions.ActionStopSlider; @@ -136,7 +134,6 @@ import org.apache.slider.server.appmaster.monkey.ChaosKillContainer; import org.apache.slider.server.appmaster.monkey.ChaosMonkeyService; import org.apache.slider.server.appmaster.operations.AbstractRMOperation; import org.apache.slider.server.appmaster.operations.AsyncRMOperationHandler; -import org.apache.slider.server.appmaster.operations.ProviderNotifyingOperationHandler; import org.apache.slider.server.appmaster.operations.RMOperationHandler; import org.apache.slider.server.appmaster.rpc.RpcBinder; import org.apache.slider.server.appmaster.rpc.SliderAMPolicyProvider; @@ -146,6 +143,7 @@ import org.apache.slider.server.appmaster.security.SecurityConfiguration; import org.apache.slider.server.appmaster.state.AppState; import org.apache.slider.server.appmaster.state.AppStateBindingInfo; import org.apache.slider.server.appmaster.state.ContainerAssignment; +import org.apache.slider.server.appmaster.state.MostRecentContainerReleaseSelector; import org.apache.slider.server.appmaster.state.ProviderAppState; import org.apache.slider.server.appmaster.state.RoleInstance; import org.apache.slider.server.appmaster.web.SliderAMWebApp; @@ -161,18 +159,20 @@ import org.apache.slider.server.services.workflow.ServiceThreadFactory; import org.apache.slider.server.services.workflow.WorkflowExecutorService; import org.apache.slider.server.services.workflow.WorkflowRpcService; import org.apache.slider.server.services.yarnregistry.YarnRegistryViewForProviders; +import org.codehaus.jackson.map.PropertyNamingStrategy; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; -import java.net.URI; import java.net.URL; import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -242,8 +242,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") private RMOperationHandler rmOperationHandler; - - private RMOperationHandler providerRMOperationHandler; /** Handle to communicate with the Node Manager*/ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") @@ -252,7 +250,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService /** * Credentials for propagating down to launched containers */ - private Credentials containerCredentials; + private Credentials containerCredentials = new Credentials(); /** * Slider IPC: Real service handler @@ -320,13 +318,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService */ private final AtomicBoolean initCompleted = new AtomicBoolean(false); - /** - * Flag to set if the process exit code was set before shutdown started - */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private boolean spawnedProcessExitedBeforeShutdownTriggered; - - /** Arguments passed in : raw*/ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") private SliderAMArgs serviceArgs; @@ -371,7 +362,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService private SliderAMWebApp webApp; @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") private InetSocketAddress rpcServiceAddress; - private SliderAMProviderService sliderAMProvider; /** * Executor. 
@@ -398,12 +388,15 @@ public class SliderAppMaster extends AbstractSliderLaunchedService */ private boolean securityEnabled; private ContentCache contentCache; + private static final JsonSerDeser<Application> jsonSerDeser = + new JsonSerDeser<Application>(Application.class, + PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES); /** * resource limits */ private Resource maximumResourceCapability; - + private Application application; /** * Service Constructor */ @@ -586,84 +579,31 @@ public class SliderAppMaster extends AbstractSliderLaunchedService /** * Create and run the cluster. - * @param clustername cluster name + * @param appName cluster name * @return exit code * @throws Throwable on a failure */ - private int createAndRunCluster(String clustername) throws Throwable { - - //load the cluster description from the cd argument - String sliderClusterDir = serviceArgs.getSliderClusterURI(); - URI sliderClusterURI = new URI(sliderClusterDir); - Path clusterDirPath = new Path(sliderClusterURI); - log.info("Application defined at {}", sliderClusterURI); + private int createAndRunCluster(String appName) throws Throwable { + Path appDir = new Path((serviceArgs.getAppDefDir())); SliderFileSystem fs = getClusterFS(); - - // build up information about the running application -this - // will be passed down to the cluster status - MapOperations appInformation = new MapOperations(); - - AggregateConf instanceDefinition = - InstanceIO.loadInstanceDefinitionUnresolved(fs, clusterDirPath); - instanceDefinition.setName(clustername); - - log.info("Deploying cluster {}:", instanceDefinition); - - // and resolve it - AggregateConf resolvedInstance = new AggregateConf( instanceDefinition); - resolvedInstance.resolve(); - - stateForProviders.setApplicationName(clustername); - + fs.setAppDir(appDir); + Path appJson = new Path(appDir, appName + ".json"); + log.info("Loading application definition from " + appJson); + application = jsonSerDeser.load(fs.getFileSystem(), 
appJson); + log.info("Application Json: " + application); + stateForProviders.setApplicationName(appName); Configuration serviceConf = getConfig(); - // extend AM configuration with component resource - MapOperations amConfiguration = resolvedInstance - .getAppConfOperations().getComponent(COMPONENT_AM); - // and patch configuration with prefix - if (amConfiguration != null) { - Map<String, String> sliderAppConfKeys = amConfiguration.prefixedWith("slider."); - for (Map.Entry<String, String> entry : sliderAppConfKeys.entrySet()) { - String k = entry.getKey(); - String v = entry.getValue(); - boolean exists = serviceConf.get(k) != null; - log.info("{} {} to {}", (exists ? "Overwriting" : "Setting"), k, v); - serviceConf.set(k, v); - } - } - - securityConfiguration = new SecurityConfiguration(serviceConf, resolvedInstance, clustername); // obtain security state - securityEnabled = securityConfiguration.isSecurityEnabled(); // set the global security flag for the instance definition - instanceDefinition.getAppConfOperations().set(KEY_SECURITY_ENABLED, securityEnabled); - - // triggers resolution and snapshotting for agent - appState.setInitialInstanceDefinition(instanceDefinition); - File confDir = getLocalConfDir(); - if (!confDir.exists() || !confDir.isDirectory()) { - log.info("Conf dir {} does not exist.", confDir); - File parentFile = confDir.getParentFile(); - log.info("Parent dir {}:\n{}", parentFile, SliderUtils.listDir(parentFile)); - } - //get our provider - MapOperations globalInternalOptions = getGlobalInternalOptions(); - String providerType = globalInternalOptions.getMandatoryOption( - InternalKeys.INTERNAL_PROVIDER_NAME); - log.info("Cluster provider type is {}", providerType); SliderProviderFactory factory = - SliderProviderFactory.createSliderProviderFactory(providerType); + SliderProviderFactory.createSliderProviderFactory("docker"); providerService = factory.createServerProvider(); // init the provider BUT DO NOT START IT YET 
initAndAddService(providerService); - providerRMOperationHandler = new ProviderNotifyingOperationHandler(providerService); - - // create a slider AM provider - sliderAMProvider = new SliderAMProviderService(); - initAndAddService(sliderAMProvider); - + InetSocketAddress rmSchedulerAddress = SliderUtils.getRmSchedulerAddress(serviceConf); log.info("RM is at {}", rmSchedulerAddress); yarnRPC = YarnRPC.create(serviceConf); @@ -689,10 +629,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService ApplicationId appid = appAttemptID.getApplicationId(); log.info("AM for ID {}", appid.getId()); - appInformation.put(StatusKeys.INFO_AM_CONTAINER_ID, appMasterContainerID.toString()); - appInformation.put(StatusKeys.INFO_AM_APP_ID, appid.toString()); - appInformation.put(StatusKeys.INFO_AM_ATTEMPT_ID, appAttemptID.toString()); - Map<String, String> envVars; List<Container> liveContainers; @@ -731,28 +667,22 @@ public class SliderAppMaster extends AbstractSliderLaunchedService } //bring up the Slider RPC service - buildPortScanner(instanceDefinition); - startSliderRPCServer(instanceDefinition); + buildPortScanner(); + startSliderRPCServer(); rpcServiceAddress = rpcService.getConnectAddress(); appMasterHostname = rpcServiceAddress.getAddress().getCanonicalHostName(); appMasterRpcPort = rpcServiceAddress.getPort(); appMasterTrackingUrl = null; log.info("AM Server is listening at {}:{}", appMasterHostname, appMasterRpcPort); - appInformation.put(StatusKeys.INFO_AM_HOSTNAME, appMasterHostname); - appInformation.set(StatusKeys.INFO_AM_RPC_PORT, appMasterRpcPort); log.info("Starting Yarn registry"); registryOperations = startRegistryOperationsService(); log.info(registryOperations.toString()); //build the role map - List<ProviderRole> providerRoles = new ArrayList<>(providerService.getRoles()); - providerRoles.addAll(SliderAMClientProvider.ROLES); - + List<ProviderRole> providerRoles = Collections.EMPTY_LIST; // Start up the WebApp and track the URL for it - 
MapOperations component = instanceDefinition.getAppConfOperations() - .getComponent(SliderKeys.COMPONENT_AM); // Web service endpoints: initialize WebAppApiImpl webAppApi = @@ -760,9 +690,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService stateForProviders, providerService, registryOperations, metricsAndMonitoring, - actionQueues, - this, - contentCache); + actionQueues); initAMFilterOptions(serviceConf); int webAppPort = deployWebApplication(webAppApi); @@ -770,9 +698,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService String scheme = WebAppUtils.HTTP_PREFIX; appMasterTrackingUrl = scheme + appMasterHostname + ":" + webAppPort; - appInformation.put(StatusKeys.INFO_AM_WEB_URL, appMasterTrackingUrl + "/"); - appInformation.set(StatusKeys.INFO_AM_WEB_PORT, webAppPort); - // ***************************************************** // Register self with ResourceManager // This will start heartbeating to the RM @@ -785,6 +710,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService appMasterTrackingUrl); maximumResourceCapability = amRegistrationData.getMaximumResourceCapability(); + //TODO should not read local configs !!! 
int minMemory = serviceConf.getInt(RM_SCHEDULER_MINIMUM_ALLOCATION_MB, DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); // validate scheduler vcores allocation setting @@ -798,11 +724,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService // the max value as part of its lookup rmOperationHandler = new AsyncRMOperationHandler(asyncRMClient, maximumResourceCapability); - // set the RM-defined maximum cluster values - appInformation.put(ResourceKeys.YARN_CORES, Integer.toString(maxCores)); - appInformation.put(ResourceKeys.YARN_MEMORY, Integer.toString(maxMemory)); - - processAMCredentials(securityConfiguration); +// processAMCredentials(securityConfiguration); if (securityEnabled) { secretManager.setMasterKey( @@ -817,7 +739,9 @@ public class SliderAppMaster extends AbstractSliderLaunchedService // principal. Can do so now since AM registration with RM above required // tokens associated to principal String principal = securityConfiguration.getPrincipal(); - File localKeytabFile = securityConfiguration.getKeytabFile(instanceDefinition); + //TODO read key tab file from slider-am.xml + File localKeytabFile = + securityConfiguration.getKeytabFile(new AggregateConf()); // Now log in... login(principal, localKeytabFile); // obtain new FS reference that should be kerberos based and different @@ -829,10 +753,10 @@ public class SliderAppMaster extends AbstractSliderLaunchedService // YARN client. // Important: this is only valid at startup, and must be executed within // the right UGI context. Use with care. 
- SliderYarnClientImpl yarnClient = null; + YarnClient yarnClient = null; List<NodeReport> nodeReports; try { - yarnClient = new SliderYarnClientImpl(); + yarnClient = YarnClient.createYarnClient(); yarnClient.init(getConfig()); yarnClient.start(); nodeReports = getNodeReports(yarnClient); @@ -856,45 +780,23 @@ public class SliderAppMaster extends AbstractSliderLaunchedService // extract container list liveContainers = amRegistrationData.getContainersFromPreviousAttempts(); - - //now validate the installation - Configuration providerConf = - providerService.loadProviderConfigurationInformation(confDir); - - providerService.initializeApplicationConfiguration(instanceDefinition, - fs, null); - - providerService.validateApplicationConfiguration(instanceDefinition, - confDir, - securityEnabled); + DefaultMetricsSystem.initialize("SliderAppMaster"); //determine the location for the role history data - Path historyDir = new Path(clusterDirPath, HISTORY_DIR_NAME); + Path historyDir = new Path(appDir, HISTORY_DIR_NAME); //build the instance AppStateBindingInfo binding = new AppStateBindingInfo(); - binding.instanceDefinition = instanceDefinition; binding.serviceConfig = serviceConf; - binding.publishedProviderConf = providerConf; binding.roles = providerRoles; binding.fs = fs.getFileSystem(); binding.historyPath = historyDir; binding.liveContainers = liveContainers; - binding.applicationInfo = appInformation; - binding.releaseSelector = providerService.createContainerReleaseSelector(); + binding.releaseSelector = new MostRecentContainerReleaseSelector(); binding.nodeReports = nodeReports; + binding.application = application; appState.buildInstance(binding); - providerService.rebuildContainerDetails(liveContainers, - instanceDefinition.getName(), appState.getRolePriorityMap()); - - // add the AM to the list of nodes in the cluster - - appState.buildAppMasterNode(appMasterContainerID, - appMasterHostname, - webAppPort, - appMasterHostname + ":" + webAppPort); - // build up 
environment variables that the AM wants set in every container // irrespective of provider and role. envVars = new HashMap<>(); @@ -908,8 +810,8 @@ public class SliderAppMaster extends AbstractSliderLaunchedService } String rolesTmpSubdir = appMasterContainerID.toString() + "/roles"; - String amTmpDir = globalInternalOptions.getMandatoryOption(InternalKeys.INTERNAL_AM_TMP_DIR); - + String amTmpDir = "/tmp"; + //TODO read tmpDir from slider-am.xml Path tmpDirPath = new Path(amTmpDir); Path launcherTmpDirPath = new Path(tmpDirPath, rolesTmpSubdir); fs.getFileSystem().mkdirs(launcherTmpDirPath); @@ -917,29 +819,15 @@ public class SliderAppMaster extends AbstractSliderLaunchedService //launcher service launchService = new RoleLaunchService(actionQueues, providerService, - fs, - new Path(getGeneratedConfDir()), - envVars, - launcherTmpDirPath); + fs, envVars); deployChildService(launchService); - appState.noteAMLaunched(); - - //Give the provider access to the state, and AM - providerService.bind(stateForProviders, actionQueues, liveContainers); - sliderAMProvider.bind(stateForProviders, actionQueues, liveContainers); + providerService.setAMState(stateForProviders); // chaos monkey - maybeStartMonkey(); - - // setup token renewal and expiry handling for long lived apps -// if (!securityConfiguration.isKeytabProvided() && -// SliderUtils.isHadoopClusterSecure(getConfig())) { -// fsDelegationTokenManager = new FsDelegationTokenManager(actionQueues); -// fsDelegationTokenManager.acquireDelegationToken(getConfig()); -// } +// maybeStartMonkey(); // if not a secure cluster, extract the username -it will be // propagated to workers @@ -955,25 +843,21 @@ public class SliderAppMaster extends AbstractSliderLaunchedService log.info("Application Master Initialization Completed"); initCompleted.set(true); - scheduleFailureWindowResets(instanceDefinition.getResources()); - scheduleEscalation(instanceDefinition.getInternal()); + 
scheduleFailureWindowResets(application.getConfiguration()); + scheduleEscalation(application.getConfiguration()); try { // schedule YARN Registry registration - queue(new ActionRegisterServiceInstance(clustername, appid)); + queue(new ActionRegisterServiceInstance(appName, appid, application)); // log the YARN and web UIs log.info("RM Webapp address {}", serviceConf.get(YarnConfiguration.RM_WEBAPP_ADDRESS)); log.info("Slider webapp address {} proxied at {}", appMasterTrackingUrl, appMasterProxiedUrl); - - // Start the Slider AM provider - sliderAMProvider.start(); - // launch the real provider; this is expected to trigger a callback that // starts the node review process - launchProviderService(instanceDefinition, confDir); + launchProviderService(); // start handling any scheduled events @@ -1000,7 +884,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService * @throws InterruptedException */ private ApplicationAttemptReport getApplicationAttemptReport( - final SliderYarnClientImpl yarnClient) + final YarnClient yarnClient) throws YarnException, IOException, InterruptedException { Preconditions.checkNotNull(yarnClient, "Null Yarn client"); ApplicationAttemptReport report; @@ -1019,14 +903,14 @@ public class SliderAppMaster extends AbstractSliderLaunchedService } /** - * List the node reports: uses {@link SliderYarnClientImpl} as the login user + * List the node reports: uses {@link YarnClient} as the login user * @param yarnClient client to the RM * @return the node reports * @throws IOException * @throws YarnException * @throws InterruptedException */ - private List<NodeReport> getNodeReports(final SliderYarnClientImpl yarnClient) + private List<NodeReport> getNodeReports(final YarnClient yarnClient) throws IOException, YarnException, InterruptedException { Preconditions.checkNotNull(yarnClient, "Null Yarn client"); List<NodeReport> nodeReports; @@ -1051,7 +935,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService * Creates and 
starts the web application, and adds a * <code>WebAppService</code> service under the AM, to ensure * a managed web application shutdown. - * @param webAppApi web app API instance + * @param webAppApi web application API instance * @return port the web application is deployed on * @throws IOException general problems starting the webapp (network, etc) * @throws WebAppException other issues @@ -1117,12 +1001,14 @@ public class SliderAppMaster extends AbstractSliderLaunchedService /** * Build up the port scanner. This may include setting a port range. */ - private void buildPortScanner(AggregateConf instanceDefinition) + private void buildPortScanner() throws BadConfigException { portScanner = new PortScanner(); - String portRange = instanceDefinition. - getAppConfOperations().getGlobalOptions(). - getOption(SliderKeys.KEY_ALLOWED_PORT_RANGE, "0"); + String portRange = "0"; + //TODO read from slider-am.xml +// String portRange = instanceDefinition. +// getAppConfOperations().getGlobalOptions(). 
+// getOption(SliderKeys.KEY_ALLOWED_PORT_RANGE, "0"); if (!"0".equals(portRange)) { portScanner.setPortRange(portRange); } @@ -1203,11 +1089,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService * @throws IOException */ public void registerServiceInstance(String instanceName, - ApplicationId appId) throws IOException { - - - // the registry is running, so register services - URL amWebURI = new URL(appMasterProxiedUrl); + ApplicationId appId, Application application) throws IOException { //Give the provider restricted access to the state, registry setupInitialRegistryPaths(); @@ -1218,7 +1100,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService instanceName, appAttemptID); providerService.bindToYarnRegistry(yarnRegistryOperations); - sliderAMProvider.bindToYarnRegistry(yarnRegistryOperations); // Yarn registry ServiceRecord serviceRecord = new ServiceRecord(); @@ -1231,19 +1112,10 @@ public class SliderAppMaster extends AbstractSliderLaunchedService RegistryTypeUtils.ipcEndpoint( CustomRegistryConstants.AM_IPC_PROTOCOL, rpcServiceAddress)); - - // internal services - sliderAMProvider.applyInitialRegistryDefinitions(amWebURI, - serviceRecord); - - // provider service dynamic definitions. 
- providerService.applyInitialRegistryDefinitions(amWebURI, serviceRecord); - // set any provided attributes - setProvidedServiceRecordAttributes( - getInstanceDefinition().getAppConfOperations().getComponent( - SliderKeys.COMPONENT_AM), serviceRecord); + setUserProvidedServiceRecordAttributes(application.getConfiguration(), + serviceRecord); // register the service's entry log.info("Service Record \n{}", serviceRecord); @@ -1276,7 +1148,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService /** * Handler for {@link RegisterComponentInstance action} - * Register/re-register an ephemeral container that is already in the app state + * Register/re-register an ephemeral container that is already in the application state * @param id the component * @param description component description * @param type component type @@ -1291,32 +1163,36 @@ public class SliderAppMaster extends AbstractSliderLaunchedService // this is where component registrations go log.info("Registering component {}", id); String cid = RegistryPathUtils.encodeYarnID(id.toString()); - ServiceRecord container = new ServiceRecord(); - container.set(YarnRegistryAttributes.YARN_ID, cid); - container.description = description; - container.set(YarnRegistryAttributes.YARN_PERSISTENCE, + ServiceRecord record = new ServiceRecord(); + record.set(YarnRegistryAttributes.YARN_ID, cid); + record.description = description; + record.set(YarnRegistryAttributes.YARN_PERSISTENCE, PersistencePolicies.CONTAINER); - MapOperations compOps = getInstanceDefinition().getAppConfOperations(). 
- getComponent(type); - setProvidedServiceRecordAttributes(compOps, container); + setUserProvidedServiceRecordAttributes( + instance.providerRole.component.getConfiguration(), record); try { - yarnRegistryOperations.putComponent(cid, container); + yarnRegistryOperations.putComponent(cid, record); } catch (IOException e) { log.warn("Failed to register container {}/{}: {}", id, description, e, e); return false; } + org.apache.slider.api.resource.Container container = + new org.apache.slider.api.resource.Container(); + container.setId(id.toString()); + container.setLaunchTime(new Date()); + container.setState(org.apache.slider.api.resource.ContainerState.INIT); + container.setBareHost(instance.host); + instance.providerRole.component.addContainer(container); return true; } - protected void setProvidedServiceRecordAttributes(MapOperations ops, - ServiceRecord record) { + protected void setUserProvidedServiceRecordAttributes( + org.apache.slider.api.resource.Configuration conf, ServiceRecord record) { String prefix = RoleKeys.SERVICE_RECORD_ATTRIBUTE_PREFIX; - for (Map.Entry<String, String> entry : ops.entrySet()) { - if (entry.getKey().startsWith( - prefix)) { - String key = entry.getKey().substring( - prefix.length() + 1); + for (Map.Entry<String, String> entry : conf.getProperties().entrySet()) { + if (entry.getKey().startsWith(prefix)) { + String key = entry.getKey().substring(prefix.length() + 1); record.set(key, entry.getValue().trim()); } } @@ -1366,35 +1242,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService } /** - * Build the configuration directory passed in or of the target FS - * @return the file - */ - public File getLocalConfDir() { - File confdir = - new File(SliderKeys.PROPAGATED_CONF_DIR_NAME).getAbsoluteFile(); - return confdir; - } - - /** - * Get the path to the DFS configuration that is defined in the cluster specification - * @return the generated configuration dir - */ - public String getGeneratedConfDir() { - return 
getGlobalInternalOptions().get( - InternalKeys.INTERNAL_GENERATED_CONF_PATH); - } - - /** - * Get the global internal options for the AM - * @return a map to access the internals - */ - public MapOperations getGlobalInternalOptions() { - return getInstanceDefinition() - .getInternalOperations(). - getGlobalOptions(); - } - - /** * Get the filesystem of this cluster * @return the FS of the config */ @@ -1480,11 +1327,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService Exception exception = stopAction.getEx(); appStatus = stopAction.getFinalApplicationStatus(); - if (!spawnedProcessExitedBeforeShutdownTriggered) { - //stopped the forked process but don't worry about its exit code - int forkedExitCode = stopForkedProcess(); - log.debug("Stopped forked process: exit code={}", forkedExitCode); - } // make sure the AM is actually registered. If not, there's no point // trying to unregister it @@ -1500,7 +1342,8 @@ public class SliderAppMaster extends AbstractSliderLaunchedService launchService.stop(); //now release all containers - releaseAllContainers(); + releaseAllContainers(application); + DefaultMetricsSystem.shutdown(); // When the application completes, it should send a finish application // signal to the RM @@ -1536,7 +1379,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService /** * Start the slider RPC server */ - private void startSliderRPCServer(AggregateConf instanceDefinition) + private void startSliderRPCServer() throws IOException, SliderException { verifyIPCAccess(); @@ -1612,16 +1455,8 @@ public class SliderAppMaster extends AbstractSliderLaunchedService //for each assignment: instantiate that role for (ContainerAssignment assignment : assignments) { - try { - launchService.launchRole(assignment, getInstanceDefinition(), - buildContainerCredentials()); - } catch (IOException e) { - // Can be caused by failure to renew credentials with the remote - // service. If so, don't launch the application. 
Container is retained, - // though YARN will take it away after a timeout. - log.error("Failed to build credentials to launch container: {}", e, e); - - } + //TODO Do we need to pass credentials to containers? + launchService.launchRole(assignment, application, null); } //for all the operations, exec them @@ -1645,7 +1480,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService // non complete containers should not be here assert (status.getState() == ContainerState.COMPLETE); - AppState.NodeCompletionResult result = appState.onCompletedNode(status); + AppState.NodeCompletionResult result = appState.onCompletedContainer(status); if (result.containerFailed) { RoleInstance ri = result.roleInstance; log.error("Role instance {} failed ", ri); @@ -1653,7 +1488,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService // known nodes trigger notifications if(!result.unknownNode) { - getProviderService().notifyContainerCompleted(containerId); queue(new UnregisterComponentInstance(containerId, 0, TimeUnit.MILLISECONDS)); } @@ -1724,22 +1558,14 @@ public class SliderAppMaster extends AbstractSliderLaunchedService * Implementation of cluster flexing. * It should be the only way that anything -even the AM itself on startup- * asks for nodes. 
- * @param resources the resource tree * @throws SliderException slider problems, including invalid configs * @throws IOException IO problems */ - public void flexCluster(ConfTree resources) + public void flexCluster(Messages.FlexComponentRequestProto request) throws IOException, SliderException { - - AggregateConf newConf = - new AggregateConf(appState.getInstanceDefinitionSnapshot()); - newConf.setResources(resources); - // verify the new definition is valid - sliderAMProvider.validateInstanceDefinition(newConf); - providerService.validateInstanceDefinition(newConf); - - appState.updateResourceDefinitions(resources); - + if (request != null) { + appState.updateComponents(request); + } // reset the scheduled windows...the values // may have changed appState.resetFailureCounts(); @@ -1750,24 +1576,37 @@ public class SliderAppMaster extends AbstractSliderLaunchedService /** * Schedule the failure window - * @param resources the resource tree * @throws BadConfigException if the window is out of range */ - private void scheduleFailureWindowResets(ConfTree resources) throws - BadConfigException { + private void scheduleFailureWindowResets( + org.apache.slider.api.resource.Configuration conf) { + ResetFailureWindow reset = new ResetFailureWindow(rmOperationHandler); - ConfTreeOperations ops = new ConfTreeOperations(resources); - MapOperations globals = ops.getGlobalOptions(); - long seconds = globals.getTimeRange(ResourceKeys.CONTAINER_FAILURE_WINDOW, - ResourceKeys.DEFAULT_CONTAINER_FAILURE_WINDOW_DAYS, - ResourceKeys.DEFAULT_CONTAINER_FAILURE_WINDOW_HOURS, - ResourceKeys.DEFAULT_CONTAINER_FAILURE_WINDOW_MINUTES, 0); - if (seconds > 0) { - log.info( - "Scheduling the failure window reset interval to every {} seconds", - seconds); - RenewingAction<ResetFailureWindow> renew = new RenewingAction<>( - reset, seconds, seconds, TimeUnit.SECONDS, 0); + + long days = + conf.getPropertyLong(ResourceKeys.CONTAINER_FAILURE_WINDOW + ".days", + 
ResourceKeys.DEFAULT_CONTAINER_FAILURE_WINDOW_DAYS); + long hours = + conf.getPropertyLong(ResourceKeys.CONTAINER_FAILURE_WINDOW + ".hours", + ResourceKeys.DEFAULT_CONTAINER_FAILURE_WINDOW_HOURS); + long minutes = + conf.getPropertyLong(ResourceKeys.CONTAINER_FAILURE_WINDOW + ".minutes", + ResourceKeys.DEFAULT_CONTAINER_FAILURE_WINDOW_MINUTES); + long seconds = + conf.getPropertyLong(ResourceKeys.CONTAINER_FAILURE_WINDOW + ".seconds", + 0); + Preconditions + .checkState(days >= 0 && hours >= 0 && minutes >= 0 && seconds >= 0, + "Time range for has negative time component %s:%s:%s:%s", days, + hours, minutes, seconds); + long totalMinutes = days * 24 * 60 + hours * 24 + minutes; + long totalSeconds = totalMinutes * 60 + seconds; + if (totalSeconds > 0) { + log.info("Scheduling the failure window reset interval to every {}" + + " seconds", totalSeconds); + RenewingAction<ResetFailureWindow> renew = + new RenewingAction<>(reset, totalSeconds, totalSeconds, + TimeUnit.SECONDS, 0); actionQueues.renewing("failures", renew); } else { log.info("Failure window reset interval is not set"); @@ -1776,16 +1615,15 @@ public class SliderAppMaster extends AbstractSliderLaunchedService /** * Schedule the escalation action - * @param internal * @throws BadConfigException */ - private void scheduleEscalation(ConfTree internal) throws BadConfigException { + private void scheduleEscalation( + org.apache.slider.api.resource.Configuration conf) { EscalateOutstandingRequests escalate = new EscalateOutstandingRequests(); - ConfTreeOperations ops = new ConfTreeOperations(internal); - int seconds = ops.getGlobalOptions().getOptionInt(InternalKeys.ESCALATION_CHECK_INTERVAL, + long seconds = conf.getPropertyLong(InternalKeys.ESCALATION_CHECK_INTERVAL, InternalKeys.DEFAULT_ESCALATION_CHECK_INTERVAL); - RenewingAction<EscalateOutstandingRequests> renew = new RenewingAction<>( - escalate, seconds, seconds, TimeUnit.SECONDS, 0); + RenewingAction<EscalateOutstandingRequests> renew = + new 
RenewingAction<>(escalate, seconds, seconds, TimeUnit.SECONDS, 0); actionQueues.renewing("escalation", renew); } @@ -1794,7 +1632,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService * @param reason reason for operation */ private synchronized void reviewRequestAndReleaseNodes(String reason) { - log.debug("reviewRequestAndReleaseNodes({})", reason); + log.info("reviewRequestAndReleaseNodes({})", reason); queue(new ReviewAndFlexApplicationSize(reason, 0, TimeUnit.SECONDS)); } @@ -1810,6 +1648,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService if ( actionQueues.hasQueuedActionWithAttribute( AsyncAction.ATTR_REVIEWS_APP_SIZE | AsyncAction.ATTR_HALTS_APP)) { + //TODO Loop all actions to check duplicate ?? // this operation isn't needed at all -existing duplicate or shutdown due return; } @@ -1829,14 +1668,12 @@ public class SliderAppMaster extends AbstractSliderLaunchedService public synchronized void executeNodeReview(String reason) throws SliderInternalStateException { - log.debug("in executeNodeReview({})", reason); + log.info("in executeNodeReview({})", reason); if (amCompletionFlag.get()) { log.info("Ignoring node review operation: shutdown in progress"); } try { List<AbstractRMOperation> allOperations = appState.reviewRequestAndReleaseNodes(); - // tell the provider - providerRMOperationHandler.execute(allOperations); //now apply the operations execute(allOperations); } catch (TriggerClusterTeardownException e) { @@ -1853,7 +1690,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService */ public void escalateOutstandingRequests() { List<AbstractRMOperation> operations = appState.escalateOutstandingRequests(); - providerRMOperationHandler.execute(operations); execute(operations); } @@ -1861,11 +1697,11 @@ public class SliderAppMaster extends AbstractSliderLaunchedService /** * Shutdown operation: release all containers */ - private void releaseAllContainers() { + private void releaseAllContainers(Application
application) { // Add the sleep here (before releasing containers) so that applications get // time to perform graceful shutdown try { - long timeout = getContainerReleaseTimeout(); + long timeout = getContainerReleaseTimeout(application); if (timeout > 0) { Thread.sleep(timeout); } @@ -1873,22 +1709,16 @@ public class SliderAppMaster extends AbstractSliderLaunchedService log.info("Sleep for container release interrupted"); } finally { List<AbstractRMOperation> operations = appState.releaseAllContainers(); - providerRMOperationHandler.execute(operations); // now apply the operations execute(operations); } } - private long getContainerReleaseTimeout() { + private long getContainerReleaseTimeout(Application application) { // Get container release timeout in millis or 0 if the property is not set. - // If non-zero then add the agent heartbeat delay time, since it can take up - // to that much time for agents to receive the stop command. - int timeout = getInstanceDefinition().getAppConfOperations() - .getGlobalOptions() - .getOptionInt(SliderKeys.APP_CONTAINER_RELEASE_TIMEOUT, 0); - if (timeout > 0) { - timeout += SliderKeys.APP_CONTAINER_HEARTBEAT_INTERVAL_SEC; - } + long timeout = application.getConfiguration() + .getPropertyLong(SliderKeys.APP_CONTAINER_RELEASE_TIMEOUT, 0); + // convert to millis long timeoutInMillis = timeout * 1000l; log.info("Container release timeout in millis = {}", timeoutInMillis); @@ -2000,27 +1830,15 @@ public class SliderAppMaster extends AbstractSliderLaunchedService /** * Launch the provider service - * - * @param instanceDefinition definition of the service - * @param confDir directory of config data * @throws IOException * @throws SliderException */ - protected synchronized void launchProviderService(AggregateConf instanceDefinition, - File confDir) - throws IOException, SliderException { - Map<String, String> env = new HashMap<>(); - boolean execStarted = providerService.exec(instanceDefinition, confDir, env, - this); - if 
(execStarted) { - providerService.registerServiceListener(this); - providerService.start(); - } else { - // didn't start, so don't register - providerService.start(); - // and send the started event ourselves - eventCallbackEvent(null); - } + protected synchronized void launchProviderService() + throws IOException, SliderException { + // didn't start, so don't register + providerService.start(); + // and send the started event ourselves + eventCallbackEvent(null); } /* =================================================================== */ @@ -2029,11 +1847,9 @@ public class SliderAppMaster extends AbstractSliderLaunchedService @Override // ProviderCompleted public void eventCallbackEvent(Object parameter) { - // signalled that the child process is up. - appState.noteAMLive(); // now ask for the cluster nodes try { - flexCluster(getInstanceDefinition().getResources()); + flexCluster(null); } catch (Exception e) { // cluster flex failure: log log.error("Failed to flex cluster nodes: {}", e, e); @@ -2064,62 +1880,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService } } - /* =================================================================== */ - /* ServiceStateChangeListener */ - /* =================================================================== */ - - /** - * Received on listening service termination. - * @param service the service that has changed. 
- */ - @Override //ServiceStateChangeListener - public void stateChanged(Service service) { - if (service == providerService && service.isInState(STATE.STOPPED)) { - //its the current master process in play - int exitCode = providerService.getExitCode(); - int mappedProcessExitCode = exitCode; - - boolean shouldTriggerFailure = !amCompletionFlag.get() - && (mappedProcessExitCode != 0); - - if (shouldTriggerFailure) { - String reason = - "Spawned process failed with raw " + exitCode + " mapped to " + - mappedProcessExitCode; - ActionStopSlider stop = new ActionStopSlider("stop", - mappedProcessExitCode, - FinalApplicationStatus.FAILED, - reason); - //this wasn't expected: the process finished early - spawnedProcessExitedBeforeShutdownTriggered = true; - log.info( - "Process has exited with exit code {} mapped to {} -triggering termination", - exitCode, - mappedProcessExitCode); - - //tell the AM the cluster is complete - signalAMComplete(stop); - } else { - //we don't care - log.info( - "Process has exited with exit code {} mapped to {} -ignoring", - exitCode, - mappedProcessExitCode); - } - } else { - super.stateChanged(service); - } - } - - /** - * stop forked process if it the running process var is not null - * @return the process exit code - */ - protected synchronized Integer stopForkedProcess() { - providerService.stop(); - return providerService.getExitCode(); - } - /** * Async start container request * @param container container @@ -2221,16 +1981,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService LOG_YARN.warn("Failed to stop Container {}", containerId); } - public AggregateConf getInstanceDefinition() { - return appState.getInstanceDefinition(); - } - - /** - * This is the status, the live model - */ - public ClusterDescription getClusterDescription() { - return appState.getClusterStatus(); - } public ProviderService getProviderService() { return providerService; @@ -2278,12 +2028,12 @@ public class SliderAppMaster extends 
AbstractSliderLaunchedService } /** - * Start the chaos monkey + * TODO Start the chaos monkey * @return true if it started */ private boolean maybeStartMonkey() { - MapOperations internals = getGlobalInternalOptions(); - +// MapOperations internals = getGlobalInternalOptions(); + MapOperations internals = new MapOperations(); Boolean enabled = internals.getOptionBool(InternalKeys.CHAOS_MONKEY_ENABLED, InternalKeys.DEFAULT_CHAOS_MONKEY_ENABLED);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0536f18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionFlexCluster.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionFlexCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionFlexCluster.java index 6b61681..a660958 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionFlexCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionFlexCluster.java @@ -18,6 +18,7 @@ package org.apache.slider.server.appmaster.actions; +import org.apache.slider.api.proto.Messages; import org.apache.slider.core.conf.ConfTree; import org.apache.slider.server.appmaster.SliderAppMaster; import org.apache.slider.server.appmaster.state.AppState; @@ -26,19 +27,16 @@ import java.util.concurrent.TimeUnit; public class ActionFlexCluster extends AsyncAction { - public final ConfTree resources; - - public ActionFlexCluster(String name, - long delay, - TimeUnit timeUnit, ConfTree resources) { + final Messages.FlexComponentRequestProto requestProto; + public ActionFlexCluster(String name, long delay, TimeUnit timeUnit, + Messages.FlexComponentRequestProto requestProto) { super(name, delay, timeUnit, ATTR_CHANGES_APP_SIZE); - this.resources = resources; + this.requestProto = requestProto; } - @Override public void execute(SliderAppMaster appMaster, QueueAccess queueService, 
AppState appState) throws Exception { - appMaster.flexCluster(resources); + appMaster.flexCluster(requestProto); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0536f18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionRegisterServiceInstance.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionRegisterServiceInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionRegisterServiceInstance.java index ca330af..0d7f7d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionRegisterServiceInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionRegisterServiceInstance.java @@ -19,6 +19,7 @@ package org.apache.slider.server.appmaster.actions; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.slider.api.resource.Application; import org.apache.slider.server.appmaster.SliderAppMaster; import org.apache.slider.server.appmaster.state.AppState; @@ -31,21 +32,13 @@ public class ActionRegisterServiceInstance extends AsyncAction { private final String instanceName; private final ApplicationId appId; - + private final Application application; public ActionRegisterServiceInstance(String instanceName, - ApplicationId appId) { + ApplicationId appId, Application application) { super("ActionRegisterServiceInstance"); this.instanceName = instanceName; this.appId = appId; - } - - 
public ActionRegisterServiceInstance(String instanceName, - ApplicationId appId, - long delay, - TimeUnit timeUnit) { - super("ActionRegisterServiceInstance", delay, timeUnit); - this.instanceName = instanceName; - this.appId = appId; + this.application = application; } @Override @@ -54,6 +47,6 @@ public class ActionRegisterServiceInstance extends AsyncAction { AppState appState) throws Exception { // YARN Registry do the registration - appMaster.registerServiceInstance(instanceName, appId); + appMaster.registerServiceInstance(instanceName, appId, application); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0536f18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/metrics/SliderMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/metrics/SliderMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/metrics/SliderMetrics.java new file mode 100644 index 0000000..510ff73 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/metrics/SliderMetrics.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.metrics; + +import com.codahale.metrics.Counter; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableGaugeInt; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.hadoop.metrics2.lib.Interns.info; + +@Metrics(context = "yarn-native-service") +public class SliderMetrics implements MetricsSource { + + @Metric("containers pending") + public MutableGaugeInt containersPending; + @Metric("anti-affinity containers pending") + public MutableGaugeInt pendingAAContainers; + @Metric("containers pending") + public MutableGaugeInt containersRunning; + @Metric("containers requested") + public MutableGaugeInt containersDesired; + @Metric("containers completed") + public MutableGaugeInt containersCompleted; + @Metric("containers failed") + public MutableGaugeInt containersFailed; + @Metric("containers failed since last threshold") + public MutableGaugeInt failedSinceLastThreshold; + @Metric("containers preempted") + public MutableGaugeInt containersPreempted; + @Metric("containers surplus") + public MutableGaugeInt surplusContainers; + + protected final MetricsRegistry 
registry; + + public SliderMetrics(MetricsInfo metricsInfo) { + registry = new MetricsRegistry(metricsInfo); + } + + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + registry.snapshot(collector.addRecord(registry.info()), all); + } + + public static SliderMetrics register(String name, String description) { + SliderMetrics metrics = new SliderMetrics(info(name, description)); + DefaultMetricsSystem.instance().register(name, description, metrics); + return metrics; + } + + public void tag(String name, String description, String value) { + registry.tag(name, description, value); + } +} + http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0536f18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ProviderNotifyingOperationHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ProviderNotifyingOperationHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ProviderNotifyingOperationHandler.java deleted file mode 100644 index 972cc30..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ProviderNotifyingOperationHandler.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.slider.server.appmaster.operations; - -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.client.api.AMRMClient; -import org.apache.slider.providers.ProviderService; - -import java.util.List; - -public class ProviderNotifyingOperationHandler extends RMOperationHandler { - - private final ProviderService providerService; - - public ProviderNotifyingOperationHandler(ProviderService providerService) { - this.providerService = providerService; - } - - @Override - public void releaseAssignedContainer(ContainerId containerId) { - providerService.releaseAssignedContainer(containerId); - } - - @Override - public void addContainerRequest(AMRMClient.ContainerRequest req) { - providerService.addContainerRequest(req); - } - - @Override - public int cancelContainerRequests(Priority priority1, - Priority priority2, - int count) { - return providerService.cancelContainerRequests(priority1, priority2, count); - } - - @Override - public void cancelSingleRequest(AMRMClient.ContainerRequest request) { - providerService.cancelSingleRequest(request); - } - - @Override - public void updateBlacklist(List<String> blacklistAdditions, - List<String> blacklistRemovals) { - providerService.updateBlacklist(blacklistAdditions, blacklistRemovals); - } -} 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0536f18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolPBImpl.java index fbd408e..4d483c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolPBImpl.java @@ -70,11 +70,12 @@ public class SliderClusterProtocolPBImpl implements SliderClusterProtocolPB { } @Override - public Messages.FlexClusterResponseProto flexCluster(RpcController controller, - Messages.FlexClusterRequestProto request) throws ServiceException { + public Messages.FlexComponentResponseProto flexComponent( + RpcController controller, Messages.FlexComponentRequestProto request) + throws ServiceException { try { - return real.flexCluster(request); - } catch (Exception e) { + return real.flexComponent(request); + } catch (IOException e) { throw wrap(e); } } @@ -90,19 +91,6 @@ public class SliderClusterProtocolPBImpl implements SliderClusterProtocolPB { } } - - @Override - public Messages.GetInstanceDefinitionResponseProto getInstanceDefinition( - RpcController controller, - Messages.GetInstanceDefinitionRequestProto request) - throws ServiceException { - try 
{ - return real.getInstanceDefinition(request); - } catch (Exception e) { - throw wrap(e); - } - } - @Override public Messages.ListNodeUUIDsByRoleResponseProto listNodeUUIDsByRole( RpcController controller, http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0536f18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolProxy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolProxy.java index 448c6f3..c60d609 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolProxy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolProxy.java @@ -110,10 +110,10 @@ public class SliderClusterProtocolProxy implements SliderClusterProtocol { } @Override - public Messages.FlexClusterResponseProto flexCluster(Messages.FlexClusterRequestProto request) - throws IOException { + public Messages.FlexComponentResponseProto flexComponent( + Messages.FlexComponentRequestProto request) throws IOException { try { - return endpoint.flexCluster(NULL_CONTROLLER, request); + return endpoint.flexComponent(NULL_CONTROLLER, request); } catch (ServiceException e) { throw convert(e); } @@ -131,19 +131,6 @@ public class SliderClusterProtocolProxy implements SliderClusterProtocol { } } - - @Override - public 
Messages.GetInstanceDefinitionResponseProto getInstanceDefinition( - Messages.GetInstanceDefinitionRequestProto request) throws - IOException, - YarnException { - try { - return endpoint.getInstanceDefinition(NULL_CONTROLLER, request); - } catch (ServiceException e) { - throw convert(e); - } - } - @Override public Messages.ListNodeUUIDsByRoleResponseProto listNodeUUIDsByRole(Messages.ListNodeUUIDsByRoleRequestProto request) throws IOException, http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0536f18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderIPCService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderIPCService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderIPCService.java index 70c2f05..344495b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderIPCService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderIPCService.java @@ -24,9 +24,9 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.slider.api.ClusterDescription; import org.apache.slider.api.SliderClusterProtocol; import org.apache.slider.api.proto.Messages; +import org.apache.slider.api.resource.Application; import 
org.apache.slider.api.types.ApplicationLivenessInformation; import org.apache.slider.api.types.ComponentInformation; import org.apache.slider.api.types.ContainerInformation; @@ -38,6 +38,7 @@ import org.apache.slider.core.exceptions.ServiceNotReadyException; import org.apache.slider.core.main.LauncherExitCodes; import org.apache.slider.core.persist.AggregateConfSerDeser; import org.apache.slider.core.persist.ConfTreeSerDeser; +import org.apache.slider.core.persist.JsonSerDeser; import org.apache.slider.server.appmaster.AppMasterActionOperations; import org.apache.slider.server.appmaster.actions.ActionFlexCluster; import org.apache.slider.server.appmaster.actions.ActionHalt; @@ -78,6 +79,9 @@ public class SliderIPCService extends AbstractService private final MetricsAndMonitoring metricsAndMonitoring; private final AppMasterActionOperations amOperations; private final ContentCache cache; + private static final JsonSerDeser<Application> jsonSerDeser = + new JsonSerDeser<Application>(Application.class); + /** * This is the prefix used for metrics @@ -195,17 +199,12 @@ public class SliderIPCService extends AbstractService return Messages.UpgradeContainersResponseProto.getDefaultInstance(); } - @Override //SliderClusterProtocol - public Messages.FlexClusterResponseProto flexCluster(Messages.FlexClusterRequestProto request) - throws IOException { + @Override + public Messages.FlexComponentResponseProto flexComponent( + Messages.FlexComponentRequestProto request) throws IOException { onRpcCall("flex"); - String payload = request.getClusterSpec(); - ConfTreeSerDeser confTreeSerDeser = new ConfTreeSerDeser(); - ConfTree updatedResources = confTreeSerDeser.fromJson(payload); - schedule(new ActionFlexCluster("flex", 1, TimeUnit.MILLISECONDS, - updatedResources)); - return Messages.FlexClusterResponseProto.newBuilder().setResponse( - true).build(); + schedule(new ActionFlexCluster("flex", 1, TimeUnit.MILLISECONDS, request)); + return 
Messages.FlexComponentResponseProto.newBuilder().build(); } @Override //SliderClusterProtocol @@ -216,38 +215,10 @@ public class SliderIPCService extends AbstractService String result; //quick update //query and json-ify - ClusterDescription cd = state.refreshClusterStatus(); - result = cd.toJsonString(); - String stat = result; + Application application = state.refreshClusterStatus(); + String stat = jsonSerDeser.toJson(application); return Messages.GetJSONClusterStatusResponseProto.newBuilder() - .setClusterSpec(stat) - .build(); - } - - @Override - public Messages.GetInstanceDefinitionResponseProto getInstanceDefinition( - Messages.GetInstanceDefinitionRequestProto request) - throws IOException, YarnException { - - onRpcCall("getinstancedefinition"); - String internal; - String resources; - String app; - AggregateConf instanceDefinition = - state.getInstanceDefinitionSnapshot(); - internal = instanceDefinition.getInternal().toJson(); - resources = instanceDefinition.getResources().toJson(); - app = instanceDefinition.getAppConf().toJson(); - assert internal != null; - assert resources != null; - assert app != null; - log.debug("Generating getInstanceDefinition Response"); - Messages.GetInstanceDefinitionResponseProto.Builder builder = - Messages.GetInstanceDefinitionResponseProto.newBuilder(); - builder.setInternal(internal); - builder.setResources(resources); - builder.setApplication(app); - return builder.build(); + .setClusterSpec(stat).build(); } @Override //SliderClusterProtocol http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0536f18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/security/SecurityConfiguration.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/security/SecurityConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/security/SecurityConfiguration.java index 9a89c39..b31babc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/security/SecurityConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/security/SecurityConfiguration.java @@ -138,6 +138,7 @@ public class SecurityConfiguration { public File getKeytabFile(AggregateConf instanceDefinition) throws SliderException, IOException { + //TODO implement this for dash semantic String keytabFullPath = instanceDefinition.getAppConfOperations() .getComponent(SliderKeys.COMPONENT_AM) .get(SliderXmlConfKeys.KEY_AM_KEYTAB_LOCAL_PATH); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
