YARN-5610. Initial code for native services REST API. Contributed by Gour Saha
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/689cbc40 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/689cbc40 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/689cbc40 Branch: refs/heads/yarn-native-services Commit: 689cbc40c11cb032f8470c148507538a8d709881 Parents: b627324 Author: Jian He <jia...@apache.org> Authored: Tue Oct 11 11:36:57 2016 -0700 Committer: Jian He <jia...@apache.org> Committed: Tue Dec 13 14:46:44 2016 -0800 ---------------------------------------------------------------------- hadoop-project/pom.xml | 20 + .../dev-support/findbugs-exclude.xml | 20 + .../hadoop-yarn-services-api/pom.xml | 225 +++ .../yarn/services/api/ApplicationApi.java | 38 + .../api/impl/ApplicationApiService.java | 1527 ++++++++++++++++++ .../yarn/services/resource/Application.java | 452 ++++++ .../services/resource/ApplicationState.java | 25 + .../services/resource/ApplicationStatus.java | 147 ++ .../hadoop/yarn/services/resource/Artifact.java | 155 ++ .../yarn/services/resource/BaseResource.java | 48 + .../yarn/services/resource/Component.java | 377 +++++ .../yarn/services/resource/ConfigFile.java | 190 +++ .../yarn/services/resource/Configuration.java | 147 ++ .../yarn/services/resource/Container.java | 256 +++ .../yarn/services/resource/ContainerState.java | 25 + .../hadoop/yarn/services/resource/Error.java | 125 ++ .../yarn/services/resource/PlacementPolicy.java | 97 ++ .../yarn/services/resource/ReadinessCheck.java | 161 ++ .../hadoop/yarn/services/resource/Resource.java | 149 ++ .../yarn/services/utils/RestApiConstants.java | 66 + .../services/utils/RestApiErrorMessages.java | 79 + .../services/webapp/ApplicationApiWebApp.java | 127 ++ .../src/main/resources/log4j-server.properties | 76 + .../resources/webapps/services-rest-api/app | 16 + .../src/main/scripts/run_rest_service.sh | 28 + .../src/main/webapp/WEB-INF/web.xml | 36 + .../api/impl/TestApplicationApiService.java | 232 +++ .../hadoop-yarn-applications/pom.xml | 2 +- 28 files changed, 4845 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/689cbc40/hadoop-project/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 9ec24ea..8e8db00 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -129,6 +129,9 @@ <!-- the version of Hadoop declared in the version resources; can be overridden so that Hadoop 3.x can declare itself a 2.x artifact. --> <declared.hadoop.version>${pom.version}</declared.hadoop.version> + + <swagger-annotations-version>1.5.4</swagger-annotations-version> + <maven-doxia-module-markdown.version>1.4</maven-doxia-module-markdown.version> </properties> <dependencyManagement> @@ -1238,6 +1241,23 @@ <artifactId>kerb-simplekdc</artifactId> <version>1.0.0-RC2</version> </dependency> + + <dependency> + <groupId>io.swagger</groupId> + <artifactId>swagger-annotations</artifactId> + <version>${swagger-annotations-version}</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.jaxrs</groupId> + <artifactId>jackson-jaxrs-json-provider</artifactId> + <version>${jackson2.version}</version> + </dependency> + <dependency> + <groupId>org.apache.maven.doxia</groupId> + <artifactId>doxia-module-markdown</artifactId> + <version>${maven-doxia-module-markdown.version}</version> + </dependency> + </dependencies> </dependencyManagement> http://git-wip-us.apache.org/repos/asf/hadoop/blob/689cbc40/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/dev-support/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/dev-support/findbugs-exclude.xml new file mode 100644 index 0000000..b89146a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/dev-support/findbugs-exclude.xml @@ -0,0 +1,20 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<FindBugsFilter> + +</FindBugsFilter> http://git-wip-us.apache.org/repos/asf/hadoop/blob/689cbc40/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/pom.xml new file mode 100644 index 0000000..78b7855 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/pom.xml @@ -0,0 +1,225 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-applications</artifactId> + <version>3.0.0-alpha2-SNAPSHOT</version> + </parent> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-services-api</artifactId> + <name>Apache Hadoop YARN Services API</name> + <version>3.0.0-alpha2-SNAPSHOT</version> + <packaging>jar</packaging> + <description>Hadoop YARN REST APIs for services</description> + + <properties> + <test.failIfNoTests>false</test.failIfNoTests> + <powermock.version>1.6.5</powermock.version> + </properties> + + <build> + + <!-- resources are filtered for dynamic updates. This gets build info in--> + <resources> + <resource> + <directory>src/main/resources</directory> + <filtering>true</filtering> + </resource> + <resource> + <directory>src/main/scripts/</directory> + <filtering>true</filtering> + </resource> + </resources> + + <plugins> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>${maven-jar-plugin.version}</version> + <!-- The configuration of the plugin --> + <configuration> + <!-- Configuration of the archiver --> + <archive> + <manifestEntries> + <mode>development</mode> + <url>${project.url}</url> + </manifestEntries> + <!-- Manifest specific configuration --> + <manifest> + </manifest> + </archive> + <excludes> + <exclude>**/run_rest_service.sh</exclude> + </excludes> + </configuration> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>${maven-surefire-plugin.version}</version> + <configuration> + <reuseForks>${test.reuseForks}</reuseForks> + <forkMode>${test.forkMode}</forkMode> + <forkCount>1</forkCount> + <forkedProcessTimeoutInSeconds>${test.forkedProcessTimeoutInSeconds} + </forkedProcessTimeoutInSeconds> + <threadCount>1</threadCount> + <argLine>${test.argLine}</argLine> + <failIfNoTests>${test.failIfNoTests}</failIfNoTests> + <redirectTestOutputToFile>${build.redirect.test.output.to.file}</redirectTestOutputToFile> + <environmentVariables> + <PATH>${test.env.path}</PATH> + </environmentVariables> + <systemPropertyVariables> + <java.net.preferIPv4Stack>true</java.net.preferIPv4Stack> + <java.awt.headless>true</java.awt.headless> + </systemPropertyVariables> + <includes> + <include>**/Test*.java</include> + </includes> + <excludes> + <exclude>**/Test*$*.java</exclude> + </excludes> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-site-plugin</artifactId> + <version>${maven-site-plugin.version}</version> + + <dependencies> + <dependency> + <groupId>org.apache.maven.doxia</groupId> + <artifactId>doxia-module-markdown</artifactId> + <version>${maven-doxia-module-markdown.version}</version> + </dependency> + </dependencies> + </plugin> + </plugins> + </build> + + <reporting> + </reporting> + + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-slider-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <type>test-jar</type> + <scope>test</scope> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>io.swagger</groupId> + <artifactId>swagger-annotations</artifactId> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.jaxrs</groupId> + <artifactId>jackson-jaxrs-json-provider</artifactId> + <exclusions> + <exclusion> + <groupId>com.fasterxml.jackson.jaxrs</groupId> + <artifactId>jackson-jaxrs-base</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-module-junit4</artifactId> + <version>${powermock.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-api-easymock</artifactId> + <version>${powermock.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.easymock</groupId> + <artifactId>easymock</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + + <profiles> + + <profile> + <id>rat</id> + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <version>${apache-rat-plugin.version}</version> + <executions> + <execution> + <id>check-licenses</id> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + <configuration> + <excludes> + <exclude>**/*.json</exclude> + <exclude>**/THIRD-PARTY.properties</exclude> + </excludes> + </configuration> + </plugin> + </plugins> + </build> + </profile> + + </profiles> + +</project> http://git-wip-us.apache.org/repos/asf/hadoop/blob/689cbc40/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/services/api/ApplicationApi.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/services/api/ApplicationApi.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/services/api/ApplicationApi.java new file mode 100644 index 0000000..654413c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/services/api/ApplicationApi.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.services.api; + +import javax.ws.rs.core.Response; + +import org.apache.hadoop.yarn.services.resource.Application; + +/** + * Apache Hadoop YARN Services REST API interface. + * + */ +public interface ApplicationApi { + Response createApplication(Application application); + + Response getApplications(String state); + + Response getApplication(String appName); + + Response deleteApplication(String appName); + + Response updateApplication(String appName, Application updateAppData); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/689cbc40/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/services/api/impl/ApplicationApiService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/services/api/impl/ApplicationApiService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/services/api/impl/ApplicationApiService.java new file mode 100644 index 0000000..9645696 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/services/api/impl/ApplicationApiService.java @@ -0,0 +1,1527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.services.api.impl; + +import static org.apache.hadoop.yarn.services.utils.RestApiConstants.*; +import static org.apache.hadoop.yarn.services.utils.RestApiErrorMessages.*; + +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.regex.Pattern; + +import javax.inject.Singleton; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; + +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang.SerializationUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.services.api.ApplicationApi; +import org.apache.hadoop.yarn.services.resource.Application; +import org.apache.hadoop.yarn.services.resource.ApplicationState; +import org.apache.hadoop.yarn.services.resource.ApplicationStatus; +import org.apache.hadoop.yarn.services.resource.Artifact; +import org.apache.hadoop.yarn.services.resource.Component; +import org.apache.hadoop.yarn.services.resource.ConfigFile; +import org.apache.hadoop.yarn.services.resource.Configuration; +import org.apache.hadoop.yarn.services.resource.Container; +import org.apache.hadoop.yarn.services.resource.ContainerState; +import org.apache.hadoop.yarn.services.resource.Resource; +import org.apache.slider.api.ResourceKeys; +import org.apache.slider.api.StateValues; +import org.apache.slider.client.SliderClient; +import org.apache.slider.common.SliderExitCodes; +import org.apache.slider.common.params.ActionCreateArgs; +import org.apache.slider.common.params.ActionFlexArgs; +import org.apache.slider.common.params.ActionFreezeArgs; +import org.apache.slider.common.params.ActionListArgs; +import org.apache.slider.common.params.ActionRegistryArgs; +import org.apache.slider.common.params.ActionStatusArgs; +import org.apache.slider.common.params.ActionThawArgs; +import org.apache.slider.common.params.ComponentArgsDelegate; +import org.apache.slider.common.tools.SliderUtils; +import org.apache.slider.common.tools.SliderVersionInfo; +import org.apache.slider.core.buildutils.BuildHelper; +import org.apache.slider.core.exceptions.BadClusterStateException; +import org.apache.slider.core.exceptions.NotFoundException; +import org.apache.slider.core.exceptions.SliderException; +import org.apache.slider.core.exceptions.UnknownApplicationInstanceException; +import org.apache.slider.core.registry.docstore.ConfigFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonNull; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; + +@Singleton +@Path(APPLICATIONS_API_RESOURCE_PATH) +@Consumes({ MediaType.APPLICATION_JSON }) +@Produces({ MediaType.APPLICATION_JSON }) +public class ApplicationApiService implements ApplicationApi { + private static final Logger logger = LoggerFactory + .getLogger(ApplicationApiService.class); + private static org.apache.hadoop.conf.Configuration SLIDER_CONFIG; + private static UserGroupInformation SLIDER_USER; + private static SliderClient SLIDER_CLIENT; + + static { + init(); + } + + // initialize all the common resources - order is important + protected static void init() { + SLIDER_CONFIG = getSliderClientConfiguration(); + SLIDER_USER = getSliderUser(); + SLIDER_CLIENT = createSliderClient(); + } + + @GET + @Path("/slider-version") + @Consumes({ MediaType.APPLICATION_JSON }) + @Produces({ MediaType.APPLICATION_JSON }) + public Response getSliderVersion() { + logger.info("GET: getSliderVersion"); + + Map<String, Object> metadata = new HashMap<>(); + BuildHelper.addBuildMetadata(metadata, "org.apache.hadoop.yarn.services"); + String sliderVersion = metadata.toString(); + logger.info("Slider version = {}", sliderVersion); + String hadoopVersion = SliderVersionInfo.getHadoopVersionString(); + logger.info("Hadoop version = {}", hadoopVersion); + return Response.ok( + "{ \"slider_version\": \"" + sliderVersion + + "\", \"hadoop_version\": \"" + hadoopVersion + "\"}").build(); + } + + @POST + @Consumes({ MediaType.APPLICATION_JSON }) + @Produces({ MediaType.APPLICATION_JSON }) + public Response createApplication(Application application) { + logger.info("POST: createApplication for app = {}", application); + ApplicationStatus applicationStatus = new ApplicationStatus(); + + Map<String, String> compNameArtifactIdMap = new HashMap<>(); + // post payload validation + try { + validateApplicationPostPayload(application, compNameArtifactIdMap); + } catch (IllegalArgumentException e) { + applicationStatus.setDiagnostics(e.getMessage()); + return Response.status(Status.BAD_REQUEST).entity(applicationStatus) + .build(); + } + String applicationId = null; + try { + applicationId = createSliderApp(application, compNameArtifactIdMap); + applicationStatus.setState(ApplicationState.ACCEPTED); + } catch (SliderException se) { + logger.error("Create application failed", se); + if (se.getExitCode() == SliderExitCodes.EXIT_APPLICATION_IN_USE) { + applicationStatus.setDiagnostics(ERROR_APPLICATION_IN_USE); + return Response.status(Status.BAD_REQUEST).entity(applicationStatus) + .build(); + } else { + applicationStatus.setDiagnostics(se.getMessage()); + } + } catch (Exception e) { + logger.error("Create application failed", e); + applicationStatus.setDiagnostics(e.getMessage()); + } + + if (StringUtils.isNotEmpty(applicationId)) { + applicationStatus.setUri(CONTEXT_ROOT + APPLICATIONS_API_RESOURCE_PATH + + "/" + application.getName()); + // 202 = ACCEPTED + return Response.status(HTTP_STATUS_CODE_ACCEPTED) + .entity(applicationStatus).build(); + } else { + return Response.status(Status.INTERNAL_SERVER_ERROR) + .entity(applicationStatus).build(); + } + } + + @VisibleForTesting + protected void validateApplicationPostPayload(Application application, + Map<String, String> compNameArtifactIdMap) { + if (StringUtils.isEmpty(application.getName())) { + throw new IllegalArgumentException(ERROR_APPLICATION_NAME_INVALID); + } + if (!SliderUtils.isClusternameValid(application.getName())) { + throw new IllegalArgumentException(ERROR_APPLICATION_NAME_INVALID_FORMAT); + } + + // If the application has no components do top-level checks + if (application.getComponents() == null) { + // artifact + if (application.getArtifact() == null) { + throw new IllegalArgumentException(ERROR_ARTIFACT_INVALID); + } + if (StringUtils.isEmpty(application.getArtifact().getId())) { + throw new IllegalArgumentException(ERROR_ARTIFACT_ID_INVALID); + } + + // If artifact is of type APPLICATION, add a slider specific property + if (application.getArtifact().getType() == Artifact.TypeEnum.APPLICATION) { + if (application.getConfiguration() == null) { + application.setConfiguration(new Configuration()); + } + addPropertyToConfiguration(application.getConfiguration(), + PROPERTY_COMPONENT_TYPE, COMPONENT_TYPE_EXTERNAL); + } + // resource + validateApplicationResource(application.getResource(), null, application + .getArtifact().getType()); + + // container size + if (application.getNumberOfContainers() == null) { + throw new IllegalArgumentException(ERROR_CONTAINERS_COUNT_INVALID); + } + } else { + // If the application has components, then run checks for each component. + // Let global values take effect if component level values are not + // provided. + Artifact globalArtifact = application.getArtifact(); + Resource globalResource = application.getResource(); + Long globalNumberOfContainers = application.getNumberOfContainers(); + for (Component comp : application.getComponents()) { + // artifact + if (comp.getArtifact() == null) { + comp.setArtifact(globalArtifact); + } + // If still null raise validation exception + if (comp.getArtifact() == null) { + throw new IllegalArgumentException(String.format( + ERROR_ARTIFACT_FOR_COMP_INVALID, comp.getName())); + } + if (StringUtils.isEmpty(comp.getArtifact().getId())) { + throw new IllegalArgumentException(String.format( + ERROR_ARTIFACT_ID_FOR_COMP_INVALID, comp.getName())); + } + + // If artifact is of type APPLICATION, add a slider specific property + if (comp.getArtifact().getType() == Artifact.TypeEnum.APPLICATION) { + if (comp.getConfiguration() == null) { + comp.setConfiguration(new Configuration()); + } + addPropertyToConfiguration(comp.getConfiguration(), + PROPERTY_COMPONENT_TYPE, COMPONENT_TYPE_EXTERNAL); + compNameArtifactIdMap.put(comp.getName(), comp.getArtifact().getId()); + comp.setName(comp.getArtifact().getId()); + } + + // resource + if (comp.getResource() == null) { + comp.setResource(globalResource); + } + validateApplicationResource(comp.getResource(), comp, comp + .getArtifact().getType()); + + // container count + if (comp.getNumberOfContainers() == null) { + comp.setNumberOfContainers(globalNumberOfContainers); + } + if (comp.getNumberOfContainers() == null) { + throw new IllegalArgumentException(String.format( + ERROR_CONTAINERS_COUNT_FOR_COMP_INVALID, comp.getName())); + } + } + } + + // If it is a simple app with no components, then create a default component + if (application.getComponents() == null) { + application.setComponents(getDefaultComponentAsList()); + } + + // Application lifetime if not specified, is set to unlimited lifetime + if (application.getLifetime() == null) { + application.setLifetime(DEFAULT_UNLIMITED_LIFETIME); + } + } + + private void validateApplicationResource(Resource resource, Component comp, + Artifact.TypeEnum artifactType) { + // Only apps/components of type APPLICATION can skip resource requirement + if (resource == null && artifactType == Artifact.TypeEnum.APPLICATION) { + return; + } + if (resource == null) { + throw new IllegalArgumentException(comp == null ? ERROR_RESOURCE_INVALID + : String.format(ERROR_RESOURCE_FOR_COMP_INVALID, comp.getName())); + } + // One and only one of profile OR cpus & memory can be specified. Specifying + // both raises validation error. + if (StringUtils.isNotEmpty(resource.getProfile()) + && (resource.getCpus() != null + || StringUtils.isNotEmpty(resource.getMemory()))) { + throw new IllegalArgumentException( + comp == null ? ERROR_RESOURCE_PROFILE_MULTIPLE_VALUES_NOT_SUPPORTED + : String.format( + ERROR_RESOURCE_PROFILE_MULTIPLE_VALUES_FOR_COMP_NOT_SUPPORTED, + comp.getName())); + } + // Currently resource profile is not supported yet, so we will raise + // validation error if only resource profile is specified + if (StringUtils.isNotEmpty(resource.getProfile())) { + throw new IllegalArgumentException( + ERROR_RESOURCE_PROFILE_NOT_SUPPORTED_YET); + } + + String memory = resource.getMemory(); + Integer cpus = resource.getCpus(); + if (StringUtils.isEmpty(memory)) { + throw new IllegalArgumentException( + comp == null ? ERROR_RESOURCE_MEMORY_INVALID : String.format( + ERROR_RESOURCE_MEMORY_FOR_COMP_INVALID, comp.getName())); + } + if (cpus == null) { + throw new IllegalArgumentException( + comp == null ? ERROR_RESOURCE_CPUS_INVALID : String.format( + ERROR_RESOURCE_CPUS_FOR_COMP_INVALID, comp.getName())); + } + if (cpus <= 0) { + throw new IllegalArgumentException( + comp == null ? ERROR_RESOURCE_CPUS_INVALID_RANGE : String.format( + ERROR_RESOURCE_CPUS_FOR_COMP_INVALID_RANGE, comp.getName())); + } + } + + private String createSliderApp(Application application, + Map<String, String> compNameArtifactIdMap) throws IOException, + YarnException, InterruptedException { + final String appName = application.getName(); + final String queueName = application.getQueue(); + + final ActionCreateArgs createArgs = new ActionCreateArgs(); + addAppConfOptions(createArgs, application, compNameArtifactIdMap); + addResourceOptions(createArgs, application); + String metainfoJson = getMetainfoJson(application, compNameArtifactIdMap); + + createArgs.appMetaInfoJson = metainfoJson; + if (queueName != null && queueName.trim().length() > 0) { + createArgs.queue = queueName.trim(); + } + + return invokeSliderClientRunnable(new SliderClientContextRunnable<String>() { + @Override + public String run(SliderClient sliderClient) throws YarnException, + IOException, InterruptedException { + sliderClient.actionCreate(appName, createArgs); + ApplicationId applicationId = sliderClient.applicationId; + if (applicationId != null) { + return applicationId.toString(); + // return getApplicationIdString(applicationId); + } + return null; + } + }); + } + + private void addAppConfOptions(ActionCreateArgs createArgs, + Application application, Map<String, String> compNameArtifactIdMap) throws IOException { + List<String> appCompOptionTriples = createArgs.optionsDelegate.compOptTriples; // TODO: optionTuples instead of compOptTriples + logger.info("Initial appCompOptionTriples = {}", + Arrays.toString(appCompOptionTriples.toArray())); + List<String> appOptions = createArgs.optionsDelegate.optionTuples; + logger.info("Initial appOptions = {}", + Arrays.toString(appOptions.toArray())); + // TODO: Set Slider-AM memory and vcores here + // appCompOptionTriples.addAll(Arrays.asList(SLIDER_APPMASTER_COMPONENT_NAME, + // "", "")); + + // Global configuration - for override purpose + // TODO: add it to yaml + Configuration globalConfig = null; + // Configuration globalConfig = (Configuration) SerializationUtils + // .clone(application.getConfiguration()); + + // TODO: Add the below into globalConfig + // if (application.getConfigurations() != null) { + // for (Entry<String, String> entry : application.getConfigurations() + // .entrySet()) { + // globalConf.addProperty(entry.getKey(), entry.getValue()); + // } + // } + + Set<String> uniqueGlobalPropertyCache = new HashSet<>(); + if (application.getConfiguration() != null + && application.getConfiguration().getProperties() != null) { + for (Map.Entry<String, String> propEntry : application.getConfiguration() + .getProperties().entrySet()) { + addOptionsIfNotPresent(appOptions, uniqueGlobalPropertyCache, + propEntry.getKey(), propEntry.getValue()); + } + } + if (application.getComponents() != null) { + Map<String, String> placeholders = new HashMap<>(); + for (Component comp : application.getComponents()) { + if (Boolean.TRUE.equals(comp.getUniqueComponentSupport())) { + for (int i = 1; i <= comp.getNumberOfContainers(); i++) { + placeholders.put(PLACEHOLDER_COMPONENT_ID, Integer.toString(i)); + appCompOptionTriples.addAll(createAppConfigComponent( + comp.getName() + i, comp, comp.getName() + i, globalConfig, + placeholders, compNameArtifactIdMap)); + } + } else { + appCompOptionTriples.addAll(createAppConfigComponent(comp.getName(), + comp, comp.getName(), globalConfig, null, compNameArtifactIdMap)); + } + } + } + + logger.info("Updated appCompOptionTriples = {}", + Arrays.toString(appCompOptionTriples.toArray())); + logger.info("Updated appOptions = {}", + Arrays.toString(appOptions.toArray())); + } + + private void addOptionsIfNotPresent(List<String> options, + Set<String> uniqueGlobalPropertyCache, String key, String value) { + if (uniqueGlobalPropertyCache == null) { + options.addAll(Arrays.asList(key, value)); + } else if (!uniqueGlobalPropertyCache.contains(key)) { + options.addAll(Arrays.asList(key, value)); + uniqueGlobalPropertyCache.add(key); + } + } + + private void addPropertyToConfiguration(Configuration conf, String key, + String value) { + if (conf == null) { + return; + } + if (conf.getProperties() == null) { + conf.setProperties(new HashMap<String, String>()); + } + conf.getProperties().put(key, value); + } + + private List<String> createAppConfigComponent(String compName, + Component component, String configPrefix, Configuration globalConf, + Map<String, String> placeholders, + Map<String, String> compNameArtifactIdMap) { + List<String> appConfOptTriples = new ArrayList<>(); + + if (component.getConfiguration() != null + && component.getConfiguration().getProperties() != null) { + for (Map.Entry<String, String> propEntry : component.getConfiguration() + .getProperties().entrySet()) { + appConfOptTriples.addAll(Arrays.asList(compName, propEntry.getKey(), + replacePlaceholders(propEntry.getValue(), placeholders))); + } + } + + // If artifact is of type APPLICATION, then in the POST JSON there will + // be no component definition for that artifact. Hence it's corresponding id + // field is added. Every external APPLICATION has a unique id field. + List<String> convertedDeps = new ArrayList<>(); + for (String dep : component.getDependencies()) { + if (compNameArtifactIdMap.containsKey(dep)) { + convertedDeps.add(compNameArtifactIdMap.get(dep)); + } else { + convertedDeps.add(dep); + } + } + // If the DNS dependency property is set to true for a component, it means + // that it is ensured that DNS entry has been added for all the containers + // of this component, before moving on to the next component in the DAG. + if (hasPropertyWithValue(component, PROPERTY_DNS_DEPENDENCY, "true")) { + if (component.getArtifact().getType() == Artifact.TypeEnum.APPLICATION) { + convertedDeps.add(component.getArtifact().getId()); + } else { + convertedDeps.add(compName); + } + } + if (convertedDeps.size() > 0) { + appConfOptTriples.addAll(Arrays.asList(compName, "requires", + StringUtils.join(convertedDeps, ","))); + } + return appConfOptTriples; + } + + private String replacePlaceholders(String value, + Map<String, String> placeholders) { + if (StringUtils.isEmpty(value) || placeholders == null) { + return value; + } + for (Map.Entry<String, String> placeholder : placeholders.entrySet()) { + value = value.replaceAll(Pattern.quote(placeholder.getKey()), + placeholder.getValue()); + } + return value; + } + + private List<String> createAppConfigGlobal(Component component, + Configuration globalConf, Set<String> uniqueGlobalPropertyCache) { + List<String> appOptions = new ArrayList<>(); + if (component.getConfiguration() != null + && component.getConfiguration().getProperties() != null) { + for (Map.Entry<String, String> propEntry : component.getConfiguration() + .getProperties().entrySet()) { + addOptionsIfNotPresent(appOptions, uniqueGlobalPropertyCache, + propEntry.getKey(), propEntry.getValue()); + } + } + return appOptions; + } + + private void addResourceOptions(ActionCreateArgs createArgs, + Application application) throws IOException { + List<String> resCompOptionTriples = createArgs.optionsDelegate.resCompOptTriples; + logger.info("Initial resCompOptTriples = {}", + Arrays.toString(resCompOptionTriples.toArray())); + // TODO: Add any Slider AM resource specific props here like jvm.heapsize + // resCompOptionTriples.addAll(Arrays.asList(SLIDER_APPMASTER_COMPONENT_NAME, + // "", "")); + + // Global resource - for override purpose + Resource globalResource = (Resource) SerializationUtils.clone(application + .getResource()); + // Priority seeded with 1, expecting every new component will increase it by + // 1 making it ready for the next component to use. + if (application.getComponents() != null) { + int priority = 1; + for (Component comp : application.getComponents()) { + if (hasPropertyWithValue(comp, PROPERTY_COMPONENT_TYPE, + COMPONENT_TYPE_EXTERNAL)) { + continue; + } + if (Boolean.TRUE.equals(comp.getUniqueComponentSupport())) { + for (int i = 1; i <= comp.getNumberOfContainers(); i++) { + resCompOptionTriples.addAll(createResourcesComponent(comp.getName() + + i, comp, priority, 1, globalResource)); + priority++; + } + } else { + resCompOptionTriples.addAll(createResourcesComponent(comp.getName(), + comp, priority, comp.getNumberOfContainers(), globalResource)); + priority++; + } + } + } + + logger.info("Updated resCompOptTriples = {}", + Arrays.toString(resCompOptionTriples.toArray())); + } + + private boolean hasPropertyWithValue(Component comp, String key, String value) { + if (comp == null || key == null) { + return false; + } + if (comp.getConfiguration() == null + || comp.getConfiguration().getProperties() == null) { + return false; + } + Map<String, String> props = comp.getConfiguration().getProperties(); + if (props.containsKey(key)) { + if (value == null) { + return props.get(key) == null; + } else { + if (value.equals(props.get(key))) { + return true; + } + } + } + return false; + } + + private List<String> createResourcesComponent(String compName, + Component component, int priority, long numInstances, + Resource globalResource) { + String memory = component.getResource() == null ? globalResource + .getMemory() : component.getResource().getMemory(); + Integer cpus = component.getResource() == null ? globalResource.getCpus() + : component.getResource().getCpus(); + + List<String> resCompOptTriples = new ArrayList<String>(); + resCompOptTriples.addAll(Arrays.asList(compName, + ResourceKeys.COMPONENT_PRIORITY, Integer.toString(priority))); + resCompOptTriples.addAll(Arrays.asList(compName, + ResourceKeys.COMPONENT_INSTANCES, Long.toString(numInstances))); + resCompOptTriples.addAll(Arrays.asList(compName, ResourceKeys.YARN_MEMORY, + memory)); + resCompOptTriples.addAll(Arrays.asList(compName, ResourceKeys.YARN_CORES, + cpus.toString())); + if (component.getPlacementPolicy() != null) { + resCompOptTriples.addAll(Arrays.asList(compName, + ResourceKeys.COMPONENT_PLACEMENT_POLICY, + component.getPlacementPolicy().getLabel())); + } + + return resCompOptTriples; + } + + private String getMetainfoJson(Application application, + Map<String, String> compNameArtifactIdMap) throws SliderException, + IOException { + JsonObject rootObj = new JsonObject(); + rootObj.addProperty("schemaVersion", METAINFO_SCHEMA_VERSION); + JsonObject applicationObj = new JsonObject(); + rootObj.add("application", applicationObj); + applicationObj.addProperty("name", application.getName().toUpperCase()); + JsonArray componentsArray = new JsonArray(); + applicationObj.add("components", componentsArray); + JsonArray commandOrdersArray = new JsonArray(); + applicationObj.add("commandOrders", commandOrdersArray); + + JsonArray exportGroupsArray = new JsonArray(); + applicationObj.add("exportGroups", exportGroupsArray); + // Use only one export group + JsonObject exportGroup = new JsonObject(); + exportGroup.addProperty("name", EXPORT_GROUP_NAME); + exportGroupsArray.add(exportGroup); + JsonArray exportsArray = new JsonArray(); + exportGroup.add("exports", exportsArray); + + if (application.getComponents() != null) { + + // Set exports at application level + Map<String, String> appQuicklinks = application.getQuicklinks(); + Map<String, String> placeholders = new HashMap<>(); + placeholders.put(PLACEHOLDER_APP_NAME, application.getName()); + if (appQuicklinks != null) { + for (Map.Entry<String, String> quicklink : appQuicklinks.entrySet()) { + JsonObject export = new JsonObject(); + export.addProperty("name", quicklink.getKey()); + export.addProperty("value", + replacePlaceholders(quicklink.getValue(), placeholders)); + exportsArray.add(export); + } + } + + for (Component comp : application.getComponents()) { + JsonObject compObject = null; + if (!hasPropertyWithValue(comp, PROPERTY_COMPONENT_TYPE, + COMPONENT_TYPE_EXTERNAL)) { + if (Boolean.TRUE.equals(comp.getUniqueComponentSupport())) { + for (int i = 1; i <= comp.getNumberOfContainers(); i++) { + // we also need the capability to specify ports and mount points + // sometime + compObject = createMetainfoComponent(comp, application, + comp.getName() + i); + componentsArray.add(compObject); + } + } else { + compObject = createMetainfoComponent(comp, application, + comp.getName()); + componentsArray.add(compObject); + } + } + + // Translate dependencies into command orders + List<String> dependencies = comp.getDependencies(); + if (dependencies != null && !dependencies.isEmpty()) { + JsonObject commandOrder = new JsonObject(); + commandOrder.addProperty("command", comp.getName() + + COMMAND_ORDER_SUFFIX_START); + for (String dependency : dependencies) { + // If APPLICATION type artifact then map component name dependencies + // to artifact id + if (comp.getArtifact().getType() == Artifact.TypeEnum.APPLICATION) { + dependency = compNameArtifactIdMap.get(dependency); + } + commandOrder.addProperty("requires", dependency + + COMMAND_ORDER_SUFFIX_STARTED); + } + commandOrdersArray.add(commandOrder); + } + + // Quicklinks need to be added as appExports and componentExports at the + // component level + List<String> compQuicklinks = comp.getQuicklinks(); + if (compQuicklinks != null && !compQuicklinks.isEmpty()) { + if (MapUtils.isEmpty(appQuicklinks)) { + throw new SliderException(ERROR_QUICKLINKS_FOR_COMP_INVALID); + } + List<String> appExports = new ArrayList<>(); + JsonArray compExportsArray = new JsonArray(); + compObject.add("componentExports", compExportsArray); + + for (String quicklink : compQuicklinks) { + appExports.add(EXPORT_GROUP_NAME + "-" + quicklink); + + JsonObject compExport = new JsonObject(); + compExport.addProperty("name", quicklink); + compExport.addProperty("value", appQuicklinks.get(quicklink)); + compExportsArray.add(compExport); + } + compObject.addProperty("appExports", + StringUtils.join(appExports, ",")); + // specify that there are published configs for this component + compObject.addProperty("publishConfig", "true"); + } + } + } + + String jsonString = new GsonBuilder().setPrettyPrinting().create() + .toJson(rootObj); + logger.info("Metainfo = \n{}", jsonString); + return jsonString; + } + + private JsonObject createMetainfoComponent(Component comp, + Application application, String compName) { + JsonObject compObj = new JsonObject(); + compObj.addProperty("name", compName); + // below is diff for each type + if (comp.getArtifact() != null && comp.getArtifact().getType() != null + && comp.getArtifact().getType() == Artifact.TypeEnum.DOCKER) { + compObj.addProperty("type", COMPONENT_TYPE_YARN_DOCKER); + JsonArray dockerContainerArray = new JsonArray(); + compObj.add("dockerContainers", dockerContainerArray); + JsonObject dockerContainerObj = new JsonObject(); + dockerContainerArray.add(dockerContainerObj); + dockerContainerObj.addProperty("name", compName.toLowerCase()); + // if image not specified, then use global value + dockerContainerObj.addProperty("image", + comp.getArtifact().getId() == null ? application.getArtifact() + .getId() : comp.getArtifact().getId()); + // If launch command not specified, then use global value. Resolve all + // placeholders. + Map<String, String> placeholders = new HashMap<>(); + placeholders.put(PLACEHOLDER_APP_NAME, application.getName()); + placeholders.put(PLACEHOLDER_APP_COMPONENT_NAME, compName); + dockerContainerObj.addProperty( + "startCommand", + comp.getLaunchCommand() == null ? replacePlaceholders( + application.getLaunchCommand(), placeholders) + : replacePlaceholders(comp.getLaunchCommand(), placeholders)); + dockerContainerObj.addProperty("network", DEFAULT_NETWORK); + dockerContainerObj.addProperty("commandPath", DEFAULT_COMMAND_PATH); + // TODO: What to do with privContainer ? + dockerContainerObj.addProperty("runPrivilegedContainer", + comp.getRunPrivilegedContainer()); + if (comp.getConfiguration() != null) { + List<ConfigFile> configFiles = comp.getConfiguration().getFiles(); + if (configFiles != null && !configFiles.isEmpty()) { + JsonArray configFileArray = new JsonArray(); + for (ConfigFile configFile : configFiles) { + JsonObject configFileObj = new JsonObject(); + configFileObj.addProperty("type", configFile.getType().toString()); + configFileObj.addProperty("fileName", configFile.getDestFile()); + // TODO: add all properties which should include dictionaryName + configFileObj.addProperty("dictionaryName", + configFile.getSrcFile()); + configFileArray.add(configFileObj); + } + dockerContainerObj.add("configFiles", configFileArray); + } + } + // we also need to specify artifact_management_service sometime + } + // we also need the capability to specify ports and mount points sometime + return compObj; + } + + private static UserGroupInformation getSliderUser() { + if (SLIDER_USER != null) { + return SLIDER_USER; + } + UserGroupInformation sliderUser = null; + UserGroupInformation.setConfiguration(SLIDER_CONFIG); + String loggedInUser = getUserToRunAs(); + try { + sliderUser = UserGroupInformation.getBestUGI(null, loggedInUser); + // TODO: Once plugged into RM process we should remove the previous call + // and replace it with getCurrentUser as commented below. + // sliderUser = UserGroupInformation.getCurrentUser(); + } catch (IOException e) { + throw new RuntimeException("Unable to create UGI (slider user)", e); + } + return sliderUser; + } + + private <T> T invokeSliderClientRunnable( + final SliderClientContextRunnable<T> runnable) + throws IOException, InterruptedException, YarnException { + try { + T value = SLIDER_USER.doAs(new PrivilegedExceptionAction<T>() { + @Override + public T run() throws Exception { + return runnable.run(SLIDER_CLIENT); + } + }); + return value; + } catch (UndeclaredThrowableException e) { + Throwable cause = e.getCause(); + if (cause instanceof YarnException) { + YarnException ye = (YarnException) cause; + throw ye; + } + throw e; + } + } + + protected static SliderClient createSliderClient() { + if (SLIDER_CLIENT != null) { + return SLIDER_CLIENT; + } + org.apache.hadoop.conf.Configuration sliderClientConfiguration = SLIDER_CONFIG; + SliderClient client = new SliderClient() { + @Override + public void init(org.apache.hadoop.conf.Configuration conf) { + super.init(conf); + try { + initHadoopBinding(); + } catch (SliderException e) { + throw new RuntimeException( + "Unable to automatically init Hadoop binding", e); + } catch (IOException e) { + throw new RuntimeException( + "Unable to automatically init Hadoop binding", e); + } + } + }; + try { + logger + .debug("Slider Client configuration: {}", sliderClientConfiguration); + sliderClientConfiguration = client.bindArgs(sliderClientConfiguration, + new String[] { "help" }); + client.init(sliderClientConfiguration); + client.start(); + } catch (Exception e) { + logger.error("Unable to create SliderClient", e); + throw new RuntimeException(e.getMessage(), e); + } + return client; + } + + private static String getUserToRunAs() { + String user = System.getenv(PROPERTY_APP_RUNAS_USER); + if (StringUtils.isEmpty(user)) { + user = "root"; + } + return user; + } + + private static org.apache.hadoop.conf.Configuration getSliderClientConfiguration() { + if (SLIDER_CONFIG != null) { + return SLIDER_CONFIG; + } + YarnConfiguration yarnConfig = new YarnConfiguration(); + logger.info("prop yarn.resourcemanager.address = {}", + yarnConfig.get("yarn.resourcemanager.address")); + + return yarnConfig; + } + + private interface SliderClientContextRunnable<T> { + T run(SliderClient sliderClient) + throws YarnException, IOException, InterruptedException; + } + + @GET + @Consumes({ MediaType.APPLICATION_JSON }) + @Produces({ MediaType.APPLICATION_JSON }) + public Response getApplications(@QueryParam("state") String state) { + logger.info("GET: getApplications with param state = {}", state); + + // Get all applications in a specific state - lighter projection. For full + // detail, call getApplication on a specific app. + Set<String> applications; + try { + if (StringUtils.isNotEmpty(state)) { + ApplicationStatus appStatus = new ApplicationStatus(); + try { + ApplicationState.valueOf(state); + } catch (IllegalArgumentException e) { + appStatus.setDiagnostics("Invalid value for param state - " + state); + return Response.status(Status.BAD_REQUEST).entity(appStatus).build(); + } + applications = getSliderApplications(state); + } else { + applications = getSliderApplications(true); + } + } catch (Exception e) { + logger.error("Get applications failed", e); + return Response.status(Status.INTERNAL_SERVER_ERROR).build(); + } + + Set<Application> apps = new HashSet<Application>(); + if (applications.size() > 0) { + try { + for (String app : applications) { + Application application = new Application(); + // TODO: Need to get lifetime, launch-time and privileged container + // status from YARN + application.setLifetime(null); + application.setLaunchTime(new Date()); + application.setName(app); + // Containers not required, setting to null to avoid empty list + application.setContainers(null); + apps.add(application); + } + } catch (Exception e) { + logger.error("Get applications failed", e); + return Response.status(Status.INTERNAL_SERVER_ERROR).build(); + } + } + + return Response.ok().entity(apps).build(); + } + + @GET + @Path("/{app_name}") + @Consumes({ MediaType.APPLICATION_JSON }) + @Produces({ MediaType.APPLICATION_JSON }) + public Response getApplication(@PathParam("app_name") String appName) { + logger.info("GET: getApplication for appName = {}", appName); + + // app name validation + if (!SliderUtils.isClusternameValid(appName)) { + ApplicationStatus applicationStatus = new ApplicationStatus(); + applicationStatus.setDiagnostics("Invalid application name"); + applicationStatus.setCode(ERROR_CODE_APP_NAME_INVALID); + return Response.status(Status.NOT_FOUND).entity(applicationStatus) + .build(); + } + + // Check if app exists + try { + int livenessCheck = getSliderList(appName); + if (livenessCheck < 0) { + logger.info("Application not running"); + ApplicationStatus applicationStatus = new ApplicationStatus(); + applicationStatus.setDiagnostics(ERROR_APPLICATION_NOT_RUNNING); + applicationStatus.setCode(ERROR_CODE_APP_IS_NOT_RUNNING); + return Response.status(Status.NOT_FOUND).entity(applicationStatus) + .build(); + } + } catch (UnknownApplicationInstanceException e) { + logger.error("Get application failed, application not found", e); + ApplicationStatus applicationStatus = new ApplicationStatus(); + applicationStatus.setDiagnostics(ERROR_APPLICATION_DOES_NOT_EXIST); + applicationStatus.setCode(ERROR_CODE_APP_DOES_NOT_EXIST); + return Response.status(Status.NOT_FOUND).entity(applicationStatus) + .build(); + } catch (Exception e) { + logger.error("Get application failed, application not running", e); + ApplicationStatus applicationStatus = new ApplicationStatus(); + applicationStatus.setDiagnostics(ERROR_APPLICATION_NOT_RUNNING); + applicationStatus.setCode(ERROR_CODE_APP_IS_NOT_RUNNING); + return Response.status(Status.NOT_FOUND).entity(applicationStatus) + .build(); + } + + Application app = new Application(); + app.setName(appName); + app.setUri(CONTEXT_ROOT + APPLICATIONS_API_RESOURCE_PATH + "/" + + appName); + // TODO: add status + app.setState(ApplicationState.ACCEPTED); + JsonObject appStatus = null; + JsonObject appRegistryDocker = null; + JsonObject appRegistryQuicklinks = null; + try { + appStatus = getSliderApplicationStatus(appName); + appRegistryDocker = getSliderApplicationRegistry(appName, "docker"); + appRegistryQuicklinks = getSliderApplicationRegistry(appName, + "quicklinks"); + return populateAppData(app, appStatus, appRegistryDocker, + appRegistryQuicklinks); + } catch (BadClusterStateException | NotFoundException e) { + logger.error( + "Get application failed, application not in running state yet", e); + ApplicationStatus applicationStatus = new ApplicationStatus(); + applicationStatus.setDiagnostics("Application not running yet"); + applicationStatus.setCode(ERROR_CODE_APP_SUBMITTED_BUT_NOT_RUNNING_YET); + return Response.status(Status.NOT_FOUND).entity(applicationStatus) + .build(); + } catch (Exception e) { + logger.error("Get application failed", e); + ApplicationStatus applicationStatus = new ApplicationStatus(); + applicationStatus.setDiagnostics("Failed to retrieve application: " + + e.getMessage()); + return Response.status(Status.INTERNAL_SERVER_ERROR) + .entity(applicationStatus).build(); + } + } + + private Response populateAppData(Application app, JsonObject appStatus, + JsonObject appRegistryDocker, JsonObject appRegistryQuicklinks) { + String appName = jsonGetAsString(appStatus, "name"); + Long totalNumberOfRunningContainers = 0L; + Long totalExpectedNumberOfRunningContainers = 0L; + Long totalNumberOfIpAssignedContainers = 0L; + + // info + JsonObject applicationInfo = jsonGetAsObject(appStatus, "info"); + if (applicationInfo != null) { + String applicationId = jsonGetAsString(applicationInfo, "info.am.app.id"); + if (applicationId != null) { + app.setId(applicationId); + } + } + + // state + String appState = jsonGetAsString(appStatus, "state"); + switch (Integer.parseInt(appState)) { + case StateValues.STATE_LIVE: + app.setState(ApplicationState.STARTED); + break; + case StateValues.STATE_CREATED: + case StateValues.STATE_INCOMPLETE: + case StateValues.STATE_SUBMITTED: + app.setState(ApplicationState.ACCEPTED); + return Response.ok(app).build(); + case StateValues.STATE_DESTROYED: + case StateValues.STATE_STOPPED: + app.setState(ApplicationState.STOPPED); + return Response.ok(app).build(); + default: + break; + } + + // start time + app.setLaunchTime(appStatus.get("createTime") == null ? null + : new Date(appStatus.get("createTime").getAsLong())); + + // lifetime - set it to unlimited for now + // TODO: Once YARN-3813 and YARN-4205 are available - get it from YARN + app.setLifetime(DEFAULT_UNLIMITED_LIFETIME); + + // Quicklinks + Map<String, String> appQuicklinks = new HashMap<>(); + for (Map.Entry<String, JsonElement> quicklink : appRegistryQuicklinks + .entrySet()) { + appQuicklinks.put(quicklink.getKey(), quicklink.getValue() == null ? null + : quicklink.getValue().getAsString()); + } + if (!appQuicklinks.isEmpty()) { + app.setQuicklinks(appQuicklinks); + } + + ArrayList<String> componentNames = new ArrayList<>(); + + // status.live + JsonObject applicationStatus = jsonGetAsObject(appStatus, "status"); + // roles + JsonObject applicationRoles = jsonGetAsObject(appStatus, "roles"); + // statistics + JsonObject applicationStatistics = jsonGetAsObject(appStatus, "statistics"); + if (applicationRoles == null) { + // initialize to empty object to avoid too many null checks + applicationRoles = new JsonObject(); + } + if (applicationStatus != null) { + JsonObject applicationLive = jsonGetAsObject(applicationStatus, "live"); + if (applicationLive != null) { + for (Entry<String, JsonElement> entry : applicationLive.entrySet()) { + if (entry.getKey().equals(SLIDER_APPMASTER_COMPONENT_NAME)) { + continue; + } + componentNames.add(entry.getKey()); + JsonObject componentRole = applicationRoles.get(entry.getKey()) == null ? new JsonObject() + : applicationRoles.get(entry.getKey()).getAsJsonObject(); + JsonObject liveContainers = entry.getValue().getAsJsonObject(); + if (liveContainers != null) { + for (Map.Entry<String, JsonElement> liveContainerEntry : liveContainers + .entrySet()) { + String containerId = liveContainerEntry.getKey(); + Container container = new Container(); + container.setId(containerId); + JsonObject liveContainer = (JsonObject) liveContainerEntry + .getValue(); + container + .setLaunchTime(liveContainer.get("startTime") == null ? null + : new Date(liveContainer.get("startTime").getAsLong())); + container + .setComponentName(jsonGetAsString(liveContainer, "role")); + container.setIp(jsonGetAsString(liveContainer, "ip")); + // If ip is non-null increment count + if (container.getIp() != null) { + totalNumberOfIpAssignedContainers++; + } + container.setHostname(jsonGetAsString(liveContainer, "hostname")); + container.setState(ContainerState.INIT); + if (StringUtils.isNotEmpty(container.getIp()) + && StringUtils.isNotEmpty(container.getHostname())) { + container.setState(ContainerState.READY); + } + container.setBareHost(jsonGetAsString(liveContainer, "host")); + container.setUri(CONTEXT_ROOT + APPLICATIONS_API_RESOURCE_PATH + + "/" + appName + CONTAINERS_API_RESOURCE_PATH + "/" + + containerId); + Resource resource = new Resource(); + resource.setCpus(jsonGetAsInt(componentRole, "yarn.vcores")); + resource.setMemory(jsonGetAsString(componentRole, "yarn.memory")); + container.setResource(resource); + // TODO: add container property - for response only? + app.addContainer(container); + } + } + } + } + } + + // application info + if (applicationRoles != null && !componentNames.isEmpty()) { + JsonObject applicationRole = jsonGetAsObject(applicationRoles, + componentNames.get(0)); + if (applicationRole != null) { + Artifact artifact = new Artifact(); + // how to get artifact id - docker image name?? + artifact.setId(null); + } + } + + // actual and expected number of containers + if (applicationStatistics != null) { + for (Entry<String, JsonElement> entry : applicationStatistics.entrySet()) { + if (entry.getKey().equals(SLIDER_APPMASTER_COMPONENT_NAME)) { + continue; + } + JsonObject containerStats = (JsonObject) entry.getValue(); + totalNumberOfRunningContainers += jsonGetAsInt(containerStats, + "containers.live"); + totalExpectedNumberOfRunningContainers += jsonGetAsInt(containerStats, + "containers.desired"); + } + app.setNumberOfContainers(totalExpectedNumberOfRunningContainers); + app.setNumberOfRunningContainers(totalNumberOfRunningContainers); + } + + // If all containers of the app has IP assigned, then according to the REST + // API it is considered to be READY. Note, application readiness from + // end-users point of view, is out of scope of the REST API. Also, this + // readiness has nothing to do with readiness-check defined at the component + // level (which is used for dependency resolution of component DAG). + if (totalNumberOfIpAssignedContainers == totalExpectedNumberOfRunningContainers) { + app.setState(ApplicationState.READY); + } + logger.info("Application = {}", app); + return Response.ok(app).build(); + } + + private String jsonGetAsString(JsonObject object, String key) { + return object.get(key) == null ? null : object.get(key).getAsString(); + } + + private Integer jsonGetAsInt(JsonObject object, String key) { + return object.get(key) == null ? null + : object.get(key).isJsonNull() ? null : object.get(key).getAsInt(); + } + + private JsonObject jsonGetAsObject(JsonObject object, String key) { + return object.get(key) == null ? null : object.get(key).getAsJsonObject(); + } + + private JsonObject getSliderApplicationStatus(final String appName) + throws IOException, YarnException, InterruptedException { + final File appStatusOutputFile = File.createTempFile("status_", ".json"); + final ActionStatusArgs statusArgs = new ActionStatusArgs(); + statusArgs.output = appStatusOutputFile.getAbsolutePath(); + + return invokeSliderClientRunnable(new SliderClientContextRunnable<JsonObject>() { + @Override + public JsonObject run(SliderClient sliderClient) throws YarnException, + IOException, InterruptedException { + sliderClient.actionStatus(appName, statusArgs); + JsonParser parser = new JsonParser(); + FileReader reader = null; + JsonElement statusElement = null; + try { + reader = new FileReader(appStatusOutputFile); + statusElement = parser.parse(reader); + } finally { + if (reader != null) { + reader.close(); + } + appStatusOutputFile.delete(); + } + return (statusElement == null || statusElement instanceof JsonNull) ? + new JsonObject() : (JsonObject) statusElement; + } + }); + } + + private JsonObject getSliderApplicationRegistry(final String appName, + final String registryName) throws IOException, YarnException, + InterruptedException { + final File appRegistryOutputFile = File + .createTempFile("registry_", ".json"); + final ActionRegistryArgs registryArgs = new ActionRegistryArgs(); + registryArgs.out = appRegistryOutputFile; + registryArgs.name = appName; + registryArgs.getConf = registryName; + registryArgs.format = ConfigFormat.JSON.toString(); + + return invokeSliderClientRunnable(new SliderClientContextRunnable<JsonObject>() { + @Override + public JsonObject run(SliderClient sliderClient) throws YarnException, + IOException, InterruptedException { + sliderClient.actionRegistry(registryArgs); + JsonParser parser = new JsonParser(); + FileReader reader = null; + JsonElement registryElement = null; + try { + reader = new FileReader(appRegistryOutputFile); + registryElement = parser.parse(reader); + } catch (Throwable t) { + logger.error("Error reading file {}", appRegistryOutputFile); + } finally { + if (reader != null) { + reader.close(); + } + appRegistryOutputFile.delete(); + } + return (registryElement == null || registryElement instanceof JsonNull) ? + new JsonObject() : (JsonObject) registryElement; + } + }); + } + + private Integer getSliderList(final String appName) + throws IOException, YarnException, InterruptedException { + return getSliderList(appName, true); + } + + private Integer getSliderList(final String appName, final boolean liveOnly) + throws IOException, YarnException, InterruptedException { + return invokeSliderClientRunnable(new SliderClientContextRunnable<Integer>() { + @Override + public Integer run(SliderClient sliderClient) throws YarnException, + IOException, InterruptedException { + int status = 0; + if (liveOnly) { + status = sliderClient.actionList(appName); + } else { + ActionListArgs listArgs = new ActionListArgs(); + status = sliderClient.actionList(appName, listArgs); + } + return status; + } + }); + } + + private Set<String> getSliderApplications(final String state) + throws IOException, YarnException, InterruptedException { + return getSliderApplications(false, state); + } + + private Set<String> getSliderApplications(final boolean liveOnly) + throws IOException, YarnException, InterruptedException { + return getSliderApplications(liveOnly, null); + } + + private Set<String> getSliderApplications(final boolean liveOnly, + final String state) throws IOException, YarnException, + InterruptedException { + return invokeSliderClientRunnable(new SliderClientContextRunnable<Set<String>>() { + @Override + public Set<String> run(SliderClient sliderClient) throws YarnException, + IOException, InterruptedException { + Set<String> apps; + ActionListArgs listArgs = new ActionListArgs(); + if (liveOnly) { + apps = sliderClient.getApplicationList(null); + } else if (StringUtils.isNotEmpty(state)) { + listArgs.state = state; + apps = sliderClient.getApplicationList(null, listArgs); + } else { + apps = sliderClient.getApplicationList(null, listArgs); + } + return apps; + } + }); + } + + @DELETE + @Path("/{app_name}") + @Consumes({ MediaType.APPLICATION_JSON }) + @Produces({ MediaType.APPLICATION_JSON }) + public Response deleteApplication(@PathParam("app_name") String appName) { + logger.info("DELETE: deleteApplication for appName = {}", appName); + + try { + Response stopResponse = stopSliderApplication(appName); + if (stopResponse.getStatus() == Status.INTERNAL_SERVER_ERROR + .getStatusCode()) { + return Response.status(Status.NOT_FOUND).build(); + } + } catch (UnknownApplicationInstanceException e) { + logger.error("Application does not exist", e); + return Response.status(Status.NOT_FOUND).build(); + } catch (Exception e) { + logger.error("Delete application failed", e); + return Response.status(Status.INTERNAL_SERVER_ERROR).build(); + } + + // Although slider client stop returns immediately, it usually takes a + // little longer for it to stop from YARN point of view. Slider destroy + // fails if the application is not completely stopped. Hence the need to + // call destroy in a controlled loop few times (only if exit code is + // EXIT_APPLICATION_IN_USE), before giving up. + boolean keepTrying = true; + int maxDeleteAttempt = 5; + int deleteAttempt = 0; + while (keepTrying && deleteAttempt < maxDeleteAttempt) { + try { + destroySliderApplication(appName); + keepTrying = false; + } catch (SliderException e) { + logger.error("Delete application threw exception", e); + if (e.getExitCode() == SliderExitCodes.EXIT_APPLICATION_IN_USE) { + deleteAttempt++; + try { + Thread.sleep(500); + } catch (InterruptedException e1) { + } + } else { + return Response.status(Status.INTERNAL_SERVER_ERROR).build(); + } + } catch (Exception e) { + logger.error("Delete application failed", e); + return Response.status(Status.INTERNAL_SERVER_ERROR).build(); + } + } + return Response.status(Status.NO_CONTENT).build(); + } + + private Response stopSliderApplication(final String appName) + throws IOException, YarnException, InterruptedException { + return invokeSliderClientRunnable(new SliderClientContextRunnable<Response>() { + @Override + public Response run(SliderClient sliderClient) throws YarnException, + IOException, InterruptedException { + ActionFreezeArgs freezeArgs = new ActionFreezeArgs(); + int returnCode = sliderClient.actionFreeze(appName, freezeArgs); + if (returnCode == 0) { + logger.info("Successfully stopped application {}", appName); + return Response.status(Status.NO_CONTENT).build(); + } else { + logger.error("Stop of application {} failed with return code ", + appName, returnCode); + ApplicationStatus applicationStatus = new ApplicationStatus(); + applicationStatus.setDiagnostics("Stop of application " + appName + + " failed"); + return Response.status(Status.INTERNAL_SERVER_ERROR) + .entity(applicationStatus).build(); + } + } + }); + } + + private Response startSliderApplication(final String appName) + throws IOException, YarnException, InterruptedException { + return invokeSliderClientRunnable(new SliderClientContextRunnable<Response>() { + @Override + public Response run(SliderClient sliderClient) throws YarnException, + IOException, InterruptedException { + ActionThawArgs thawArgs = new ActionThawArgs(); + int returnCode = sliderClient.actionThaw(appName, thawArgs); + if (returnCode == 0) { + logger.info("Successfully started application {}", appName); + ApplicationStatus applicationStatus = new ApplicationStatus(); + applicationStatus.setState(ApplicationState.ACCEPTED); + applicationStatus.setUri(CONTEXT_ROOT + + APPLICATIONS_API_RESOURCE_PATH + "/" + appName); + // 202 = ACCEPTED + return Response.status(HTTP_STATUS_CODE_ACCEPTED) + .entity(applicationStatus).build(); + } else { + logger.error("Start of application {} failed with returnCode ", + appName, returnCode); + ApplicationStatus applicationStatus = new ApplicationStatus(); + applicationStatus.setDiagnostics("Start of application " + appName + + " failed"); + return Response.status(Status.INTERNAL_SERVER_ERROR) + .entity(applicationStatus).build(); + } + } + }); + } + + private Void destroySliderApplication(final String appName) + throws IOException, YarnException, InterruptedException { + return invokeSliderClientRunnable(new SliderClientContextRunnable<Void>() { + @Override + public Void run(SliderClient sliderClient) throws YarnException, + IOException, InterruptedException { + sliderClient.actionDestroy(appName); + return null; + } + }); + } + + @PUT + @Path("/{app_name}") + @Consumes({ MediaType.APPLICATION_JSON }) + @Produces({ MediaType.APPLICATION_JSON }) + public Response updateApplication(@PathParam("app_name") String appName, + Application updateAppData) { + logger.info("PUT: updateApplication for app = {} with data = {}", appName, + updateAppData); + + // Ignore the app name provided in updateAppData and always use appName + // path param + updateAppData.setName(appName); + + // Adding support for stop and start + // For STOP the app should be running. If already stopped then this + // operation will be a no-op. For START it should be in stopped state. + // If already running then this operation will be a no-op. + + // Check if app exists in any state + try { + int appsFound = getSliderList(appName, false); + if (appsFound < 0) { + return Response.status(Status.NOT_FOUND).build(); + } + } catch (Exception e) { + logger.error("Update application failed", e); + return Response.status(Status.NOT_FOUND).build(); + } + + // If a STOP is requested + if (updateAppData.getState() != null + && updateAppData.getState() == ApplicationState.STOPPED) { + try { + int livenessCheck = getSliderList(appName); + if (livenessCheck == 0) { + return stopSliderApplication(appName); + } else { + logger.info("Application {} is already stopped", appName); + ApplicationStatus applicationStatus = new ApplicationStatus(); + applicationStatus.setDiagnostics("Application " + appName + + " is already stopped"); + return Response.status(Status.BAD_REQUEST).entity(applicationStatus) + .build(); + } + } catch (Exception e) { + logger.error("Stop application failed", e); + return Response.status(Status.INTERNAL_SERVER_ERROR).build(); + } + } + + // If a START is requested + if (updateAppData.getState() != null + && updateAppData.getState() == ApplicationState.STARTED) { + try { + int livenessCheck = getSliderList(appName); + if (livenessCheck != 0) { + return startSliderApplication(appName); + } else { + logger.info("Application {} is already running", appName); + ApplicationStatus applicationStatus = new ApplicationStatus(); + applicationStatus.setDiagnostics("Application " + appName + + " is already running"); + applicationStatus.setUri(CONTEXT_ROOT + + APPLICATIONS_API_RESOURCE_PATH + "/" + appName); + return Response.status(Status.BAD_REQUEST).entity(applicationStatus) + .build(); + } + } catch (Exception e) { + logger.error("Start application failed", e); + return Response.status(Status.INTERNAL_SERVER_ERROR).build(); + } + } + + // If no of instances specified then treat it as a flex + if (updateAppData.getNumberOfContainers() != null + && updateAppData.getComponents() == null) { + updateAppData.setComponents(getDefaultComponentAsList()); + } + + // At this point if there are components then it is a flex + if (updateAppData.getComponents() != null) { + try { + int livenessCheck = getSliderList(appName); + if (livenessCheck == 0) { + flexSliderApplication(appName, updateAppData); + } + return Response.status(Status.NO_CONTENT).build(); + } catch (Exception e) { + logger.error("Update application failed", e); + return Response.status(Status.INTERNAL_SERVER_ERROR).build(); + } + } + + // If new lifetime value specified then update it + if (updateAppData.getLifetime() != null) { + // TODO: Once YARN-3813 and YARN-4205 are available + } + + // If nothing happens consider it a no-op + return Response.status(Status.NO_CONTENT).build(); + } + + private List<Component> getDefaultComponentAsList() { + Component comp = new Component(); + comp.setName(DEFAULT_COMPONENT_NAME); + List<Component> comps = new ArrayList<>(); + comps.add(comp); + return comps; + } + + private Void flexSliderApplication(final String appName, + final Application updateAppData) throws IOException, YarnException, + InterruptedException { + return invokeSliderClientRunnable(new SliderClientContextRunnable<Void>() { + @Override + public Void run(SliderClient sliderClient) throws YarnException, + IOException, InterruptedException { + ActionFlexArgs flexArgs = new ActionFlexArgs(); + ComponentArgsDelegate compDelegate = new ComponentArgsDelegate(); + Long globalNumberOfContainers = updateAppData.getNumberOfContainers(); + for (Component comp : updateAppData.getComponents()) { + Long noOfContainers = comp.getNumberOfContainers() == null + ? globalNumberOfContainers : comp.getNumberOfContainers(); + if (noOfContainers != null) { + compDelegate.componentTuples.addAll( + Arrays.asList(comp.getName(), String.valueOf(noOfContainers))); + } + } + if (!compDelegate.componentTuples.isEmpty()) { + flexArgs.componentDelegate = compDelegate; + sliderClient.actionFlex(appName, flexArgs); + } + return null; + } + }); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org