YARN-7530. Refactored YARN service API project location. Contributed by Chandni Singh
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a23ff8d8 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a23ff8d8 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a23ff8d8 Branch: refs/heads/HDDS-48 Commit: a23ff8d88001ad8e4ac4c36fc1f7691d193dc1d0 Parents: 89f5911 Author: Eric Yang <ey...@apache.org> Authored: Fri May 18 17:29:10 2018 -0400 Committer: Eric Yang <ey...@apache.org> Committed: Fri May 18 17:29:10 2018 -0400 ---------------------------------------------------------------------- .../resources/assemblies/hadoop-yarn-dist.xml | 2 +- .../dev-support/findbugs-exclude.xml | 20 - .../hadoop-yarn-services-api/pom.xml | 144 ---- .../yarn/service/client/ApiServiceClient.java | 598 -------------- .../client/SystemServiceManagerImpl.java | 391 --------- .../yarn/service/client/package-info.java | 28 - .../hadoop/yarn/service/webapp/ApiServer.java | 818 ------------------- .../yarn/service/webapp/ApiServerWebApp.java | 161 ---- .../yarn/service/webapp/package-info.java | 28 - .../definition/YARN-Services-Examples.md | 444 ---------- ...RN-Simplified-V1-API-Layer-For-Services.yaml | 594 -------------- .../src/main/resources/log4j-server.properties | 76 -- .../src/main/resources/webapps/api-server/app | 16 - .../src/main/webapp/WEB-INF/web.xml | 36 - .../hadoop/yarn/service/ServiceClientTest.java | 210 ----- .../hadoop/yarn/service/TestApiServer.java | 623 -------------- .../service/client/TestApiServiceClient.java | 314 ------- .../client/TestSystemServiceManagerImpl.java | 182 ----- .../src/test/resources/example-app.json | 16 - .../src/test/resources/log4j.properties | 19 - .../resources/system-services/bad/bad.yarnfile | 16 - .../sync/user1/example-app1.yarnfile | 16 - .../sync/user1/example-app2.yarnfile | 16 - .../sync/user1/example-app3.json | 16 - .../sync/user2/example-app1.yarnfile | 16 - .../sync/user2/example-app2.yarnfile | 16 - .../dev-support/findbugs-exclude.xml | 20 + .../hadoop-yarn-services-api/pom.xml | 144 ++++ .../yarn/service/client/ApiServiceClient.java | 598 ++++++++++++++ .../client/SystemServiceManagerImpl.java | 391 +++++++++ .../yarn/service/client/package-info.java | 28 + .../hadoop/yarn/service/webapp/ApiServer.java | 818 +++++++++++++++++++ .../yarn/service/webapp/ApiServerWebApp.java | 161 ++++ .../yarn/service/webapp/package-info.java | 28 + .../definition/YARN-Services-Examples.md | 444 ++++++++++ ...RN-Simplified-V1-API-Layer-For-Services.yaml | 594 ++++++++++++++ .../src/main/resources/log4j-server.properties | 76 ++ .../src/main/resources/webapps/api-server/app | 16 + .../src/main/webapp/WEB-INF/web.xml | 36 + .../hadoop/yarn/service/ServiceClientTest.java | 210 +++++ .../hadoop/yarn/service/TestApiServer.java | 623 ++++++++++++++ .../service/client/TestApiServiceClient.java | 314 +++++++ .../client/TestSystemServiceManagerImpl.java | 182 +++++ .../src/test/resources/example-app.json | 16 + .../src/test/resources/log4j.properties | 19 + .../resources/system-services/bad/bad.yarnfile | 16 + .../sync/user1/example-app1.yarnfile | 16 + .../sync/user1/example-app2.yarnfile | 16 + .../sync/user1/example-app3.json | 16 + .../sync/user2/example-app1.yarnfile | 16 + .../sync/user2/example-app2.yarnfile | 16 + .../hadoop-yarn-services/pom.xml | 1 + .../hadoop-yarn-applications/pom.xml | 1 - 53 files changed, 4816 insertions(+), 4816 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/a23ff8d8/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml ---------------------------------------------------------------------- diff --git a/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml b/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml index 382c967..a2ea08c 100644 --- a/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml +++ b/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml @@ -105,7 +105,7 @@ </includes> </fileSet> <fileSet> - <directory>hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/target</directory> + <directory>hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/target</directory> <outputDirectory>/share/hadoop/${hadoop.component}/sources</outputDirectory> <includes> <include>*-sources.jar</include> http://git-wip-us.apache.org/repos/asf/hadoop/blob/a23ff8d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/dev-support/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/dev-support/findbugs-exclude.xml deleted file mode 100644 index b89146a..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/dev-support/findbugs-exclude.xml +++ /dev/null @@ -1,20 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<FindBugsFilter> - -</FindBugsFilter> http://git-wip-us.apache.org/repos/asf/hadoop/blob/a23ff8d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/pom.xml deleted file mode 100644 index 354c9b5..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/pom.xml +++ /dev/null @@ -1,144 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-applications</artifactId> - <version>3.2.0-SNAPSHOT</version> - </parent> - <artifactId>hadoop-yarn-services-api</artifactId> - <name>Apache Hadoop YARN Services API</name> - <packaging>jar</packaging> - <description>Hadoop YARN REST APIs for services</description> - - <build> - - <!-- resources are filtered for dynamic updates. This gets build info in--> - <resources> - <resource> - <directory>src/main/resources</directory> - <filtering>true</filtering> - </resource> - <resource> - <directory>src/main/scripts/</directory> - <filtering>true</filtering> - </resource> - </resources> - - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <!-- The configuration of the plugin --> - <configuration> - <!-- Configuration of the archiver --> - <archive> - <manifestEntries> - <mode>development</mode> - <url>${project.url}</url> - </manifestEntries> - <!-- Manifest specific configuration --> - <manifest> - </manifest> - </archive> - </configuration> - <executions> - <execution> - <goals> - <goal>test-jar</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.rat</groupId> - <artifactId>apache-rat-plugin</artifactId> - <configuration> - <excludes> - <exclude>**/*.json</exclude> - <exclude>**/*.yarnfile</exclude> - </excludes> - </configuration> - </plugin> - </plugins> - </build> - - <reporting> - </reporting> - - <dependencies> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-services-core</artifactId> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-common</artifactId> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-server-common</artifactId> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - </dependency> - <dependency> - <groupId>org.eclipse.jetty</groupId> - <artifactId>jetty-webapp</artifactId> - </dependency> - <dependency> - <groupId>com.google.inject</groupId> - <artifactId>guice</artifactId> - </dependency> - <dependency> - <groupId>javax.ws.rs</groupId> - <artifactId>jsr311-api</artifactId> - </dependency> - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-all</artifactId> - <scope>test</scope> - </dependency> - - <!-- ======================================================== --> - <!-- Test dependencies --> - <!-- ======================================================== --> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <type>test-jar</type> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - - </dependencies> -</project> http://git-wip-us.apache.org/repos/asf/hadoop/blob/a23ff8d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java deleted file mode 100644 index a8e2f51..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java +++ /dev/null @@ -1,598 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.service.client; - -import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser; - -import java.io.File; -import java.io.IOException; -import java.text.MessageFormat; -import java.util.List; -import java.util.Map; - -import javax.ws.rs.core.MediaType; - -import com.google.common.base.Preconditions; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.authentication.client.AuthenticatedURL; -import org.apache.hadoop.yarn.api.ApplicationConstants; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationReport; -import org.apache.hadoop.yarn.client.api.AppAdminClient; -import org.apache.hadoop.yarn.client.api.YarnClient; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.service.api.records.Component; -import org.apache.hadoop.yarn.service.api.records.ComponentState; -import org.apache.hadoop.yarn.service.api.records.Container; -import org.apache.hadoop.yarn.service.api.records.ContainerState; -import org.apache.hadoop.yarn.service.api.records.Service; -import org.apache.hadoop.yarn.service.api.records.ServiceState; -import org.apache.hadoop.yarn.service.api.records.ServiceStatus; -import org.apache.hadoop.yarn.service.conf.RestApiConstants; -import org.apache.hadoop.yarn.service.utils.JsonSerDeser; -import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; -import org.apache.hadoop.yarn.util.RMHAUtils; -import org.codehaus.jackson.map.PropertyNamingStrategy; -import org.eclipse.jetty.util.UrlEncoded; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.sun.jersey.api.client.Client; -import com.sun.jersey.api.client.ClientResponse; -import com.sun.jersey.api.client.WebResource; -import com.sun.jersey.api.client.WebResource.Builder; -import com.sun.jersey.api.client.config.ClientConfig; -import com.sun.jersey.api.client.config.DefaultClientConfig; - -import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.*; - -/** - * The rest API client for users to manage services on YARN. - */ -public class ApiServiceClient extends AppAdminClient { - private static final Logger LOG = - LoggerFactory.getLogger(ApiServiceClient.class); - protected YarnClient yarnClient; - - @Override protected void serviceInit(Configuration configuration) - throws Exception { - yarnClient = YarnClient.createYarnClient(); - addService(yarnClient); - super.serviceInit(configuration); - } - - /** - * Calculate Resource Manager address base on working REST API. - */ - private String getRMWebAddress() { - Configuration conf = getConfig(); - String scheme = "http://"; - String path = "/app/v1/services/version"; - String rmAddress = conf - .get("yarn.resourcemanager.webapp.address"); - if (YarnConfiguration.useHttps(conf)) { - scheme = "https://"; - rmAddress = conf - .get("yarn.resourcemanager.webapp.https.address"); - } - boolean useKerberos = UserGroupInformation.isSecurityEnabled(); - List<String> rmServers = RMHAUtils - .getRMHAWebappAddresses(new YarnConfiguration(conf)); - for (String host : rmServers) { - try { - Client client = Client.create(); - StringBuilder sb = new StringBuilder(); - sb.append(scheme); - sb.append(host); - sb.append(path); - if (!useKerberos) { - try { - String username = UserGroupInformation.getCurrentUser().getShortUserName(); - sb.append("?user.name="); - sb.append(username); - } catch (IOException e) { - LOG.debug("Fail to resolve username: {}", e); - } - } - WebResource webResource = client - .resource(sb.toString()); - if (useKerberos) { - AuthenticatedURL.Token token = new AuthenticatedURL.Token(); - webResource.header("WWW-Authenticate", token); - } - ClientResponse test = webResource.get(ClientResponse.class); - if (test.getStatus() == 200) { - rmAddress = host; - break; - } - } catch (Exception e) { - LOG.debug("Fail to connect to: "+host, e); - } - } - return scheme+rmAddress; - } - - /** - * Compute active resource manager API service location. - * - * @param appName - YARN service name - * @return URI to API Service - * @throws IOException - */ - private String getServicePath(String appName) throws IOException { - String url = getRMWebAddress(); - StringBuilder api = new StringBuilder(); - api.append(url); - api.append("/app/v1/services"); - if (appName != null) { - api.append("/"); - api.append(appName); - } - Configuration conf = getConfig(); - if (conf.get("hadoop.http.authentication.type").equalsIgnoreCase("simple")) { - api.append("?user.name=" + UrlEncoded - .encodeString(System.getProperty("user.name"))); - } - return api.toString(); - } - - private String getInstancesPath(String appName) throws IOException { - Preconditions.checkNotNull(appName); - String url = getRMWebAddress(); - StringBuilder api = new StringBuilder(); - api.append(url); - api.append("/app/v1/services/").append(appName).append("/") - .append(RestApiConstants.COMP_INSTANCES); - Configuration conf = getConfig(); - if (conf.get("hadoop.http.authentication.type").equalsIgnoreCase( - "simple")) { - api.append("?user.name=" + UrlEncoded - .encodeString(System.getProperty("user.name"))); - } - return api.toString(); - } - - private String getComponentsPath(String appName) throws IOException { - Preconditions.checkNotNull(appName); - String url = getRMWebAddress(); - StringBuilder api = new StringBuilder(); - api.append(url); - api.append("/app/v1/services/").append(appName).append("/") - .append(RestApiConstants.COMPONENTS); - Configuration conf = getConfig(); - if (conf.get("hadoop.http.authentication.type").equalsIgnoreCase( - "simple")) { - api.append("?user.name=" + UrlEncoded - .encodeString(System.getProperty("user.name"))); - } - return api.toString(); - } - - private Builder getApiClient() throws IOException { - return getApiClient(getServicePath(null)); - } - - /** - * Setup API service web request. - * - * @param requestPath - * @return - * @throws IOException - */ - private Builder getApiClient(String requestPath) - throws IOException { - Client client = Client.create(getClientConfig()); - Configuration conf = getConfig(); - client.setChunkedEncodingSize(null); - Builder builder = client - .resource(requestPath).type(MediaType.APPLICATION_JSON); - if (conf.get("hadoop.http.authentication.type").equals("kerberos")) { - AuthenticatedURL.Token token = new AuthenticatedURL.Token(); - builder.header("WWW-Authenticate", token); - } - return builder - .accept("application/json;charset=utf-8"); - } - - private ClientConfig getClientConfig() { - ClientConfig config = new DefaultClientConfig(); - config.getProperties().put( - ClientConfig.PROPERTY_CHUNKED_ENCODING_SIZE, 0); - config.getProperties().put( - ClientConfig.PROPERTY_BUFFER_RESPONSE_ENTITY_ON_EXCEPTION, true); - return config; - } - - private int processResponse(ClientResponse response) { - response.bufferEntity(); - String output; - if (response.getStatus() == 401) { - LOG.error("Authentication required"); - return EXIT_EXCEPTION_THROWN; - } - if (response.getStatus() == 503) { - LOG.error("YARN Service is unavailable or disabled."); - return EXIT_EXCEPTION_THROWN; - } - try { - ServiceStatus ss = response.getEntity(ServiceStatus.class); - output = ss.getDiagnostics(); - } catch (Throwable t) { - output = response.getEntity(String.class); - } - if (output==null) { - output = response.getEntity(String.class); - } - if (response.getStatus() <= 299) { - LOG.info(output); - return EXIT_SUCCESS; - } else { - LOG.error(output); - return EXIT_EXCEPTION_THROWN; - } - } - - /** - * Utility method to load Service json from disk or from - * YARN examples. - * - * @param fileName - path to yarnfile - * @param serviceName - YARN Service Name - * @param lifetime - application lifetime - * @param queue - Queue to submit application - * @return - * @throws IOException - * @throws YarnException - */ - public Service loadAppJsonFromLocalFS(String fileName, String serviceName, - Long lifetime, String queue) throws IOException, YarnException { - File file = new File(fileName); - if (!file.exists() && fileName.equals(file.getName())) { - String examplesDirStr = System.getenv("YARN_SERVICE_EXAMPLES_DIR"); - String[] examplesDirs; - if (examplesDirStr == null) { - String yarnHome = System - .getenv(ApplicationConstants.Environment.HADOOP_YARN_HOME.key()); - examplesDirs = new String[]{ - yarnHome + "/share/hadoop/yarn/yarn-service-examples", - yarnHome + "/yarn-service-examples" - }; - } else { - examplesDirs = StringUtils.split(examplesDirStr, ":"); - } - for (String dir : examplesDirs) { - file = new File(MessageFormat.format("{0}/{1}/{2}.json", - dir, fileName, fileName)); - if (file.exists()) { - break; - } - // Then look for secondary location. - file = new File(MessageFormat.format("{0}/{1}.json", - dir, fileName)); - if (file.exists()) { - break; - } - } - } - if (!file.exists()) { - throw new YarnException("File or example could not be found: " + - fileName); - } - Path filePath = new Path(file.getAbsolutePath()); - LOG.info("Loading service definition from local FS: " + filePath); - Service service = jsonSerDeser - .load(FileSystem.getLocal(getConfig()), filePath); - if (!StringUtils.isEmpty(serviceName)) { - service.setName(serviceName); - } - if (lifetime != null && lifetime > 0) { - service.setLifetime(lifetime); - } - if (!StringUtils.isEmpty(queue)) { - service.setQueue(queue); - } - return service; - } - - /** - * Launch YARN service application. - * - * @param fileName - path to yarnfile - * @param appName - YARN Service Name - * @param lifetime - application lifetime - * @param queue - Queue to submit application - */ - @Override - public int actionLaunch(String fileName, String appName, Long lifetime, - String queue) throws IOException, YarnException { - int result = EXIT_SUCCESS; - try { - Service service = - loadAppJsonFromLocalFS(fileName, appName, lifetime, queue); - String buffer = jsonSerDeser.toJson(service); - ClientResponse response = getApiClient() - .post(ClientResponse.class, buffer); - result = processResponse(response); - } catch (Exception e) { - LOG.error("Fail to launch application: ", e); - result = EXIT_EXCEPTION_THROWN; - } - return result; - } - - /** - * Stop YARN service application. - * - * @param appName - YARN Service Name - */ - @Override - public int actionStop(String appName) throws IOException, YarnException { - int result = EXIT_SUCCESS; - try { - Service service = new Service(); - service.setName(appName); - service.setState(ServiceState.STOPPED); - String buffer = jsonSerDeser.toJson(service); - ClientResponse response = getApiClient(getServicePath(appName)) - .put(ClientResponse.class, buffer); - result = processResponse(response); - } catch (Exception e) { - LOG.error("Fail to stop application: ", e); - result = EXIT_EXCEPTION_THROWN; - } - return result; - } - - /** - * Start YARN service application. - * - * @param appName - YARN Service Name - */ - @Override - public int actionStart(String appName) throws IOException, YarnException { - int result = EXIT_SUCCESS; - try { - Service service = new Service(); - service.setName(appName); - service.setState(ServiceState.STARTED); - String buffer = jsonSerDeser.toJson(service); - ClientResponse response = getApiClient(getServicePath(appName)) - .put(ClientResponse.class, buffer); - result = processResponse(response); - } catch (Exception e) { - LOG.error("Fail to start application: ", e); - result = EXIT_EXCEPTION_THROWN; - } - return result; - } - - /** - * Save Service configuration. - * - * @param fileName - path to Yarnfile - * @param appName - YARN Service Name - * @param lifetime - container life time - * @param queue - Queue to submit the application - */ - @Override - public int actionSave(String fileName, String appName, Long lifetime, - String queue) throws IOException, YarnException { - int result = EXIT_SUCCESS; - try { - Service service = - loadAppJsonFromLocalFS(fileName, appName, lifetime, queue); - service.setState(ServiceState.STOPPED); - String buffer = jsonSerDeser.toJson(service); - ClientResponse response = getApiClient() - .post(ClientResponse.class, buffer); - result = processResponse(response); - } catch (Exception e) { - LOG.error("Fail to save application: ", e); - result = EXIT_EXCEPTION_THROWN; - } - return result; - } - - /** - * Decommission a YARN service. - * - * @param appName - YARN Service Name - */ - @Override - public int actionDestroy(String appName) throws IOException, YarnException { - int result = EXIT_SUCCESS; - try { - ClientResponse response = getApiClient(getServicePath(appName)) - .delete(ClientResponse.class); - result = processResponse(response); - } catch (Exception e) { - LOG.error("Fail to destroy application: ", e); - result = EXIT_EXCEPTION_THROWN; - } - return result; - } - - /** - * Change number of containers associated with a service. - * - * @param appName - YARN Service Name - * @param componentCounts - list of components and desired container count - */ - @Override - public int actionFlex(String appName, Map<String, String> componentCounts) - throws IOException, YarnException { - int result = EXIT_SUCCESS; - try { - Service service = new Service(); - service.setName(appName); - service.setState(ServiceState.FLEX); - for (Map.Entry<String, String> entry : componentCounts.entrySet()) { - Component component = new Component(); - component.setName(entry.getKey()); - Long numberOfContainers = Long.parseLong(entry.getValue()); - component.setNumberOfContainers(numberOfContainers); - service.addComponent(component); - } - String buffer = jsonSerDeser.toJson(service); - ClientResponse response = getApiClient(getServicePath(appName)) - .put(ClientResponse.class, buffer); - result = processResponse(response); - } catch (Exception e) { - LOG.error("Fail to flex application: ", e); - result = EXIT_EXCEPTION_THROWN; - } - return result; - } - - @Override - public int enableFastLaunch(String destinationFolder) throws IOException, YarnException { - ServiceClient sc = new ServiceClient(); - sc.init(getConfig()); - sc.start(); - int result = sc.enableFastLaunch(destinationFolder); - sc.close(); - return result; - } - - /** - * Retrieve Service Status through REST API. - * - * @param appIdOrName - YARN application ID or application name - * @return Status output - */ - @Override - public String getStatusString(String appIdOrName) throws IOException, - YarnException { - String output = ""; - String appName; - try { - ApplicationId appId = ApplicationId.fromString(appIdOrName); - ApplicationReport appReport = yarnClient.getApplicationReport(appId); - appName = appReport.getName(); - } catch (IllegalArgumentException e) { - // not app Id format, may be app name - appName = appIdOrName; - ServiceApiUtil.validateNameFormat(appName, getConfig()); - } - try { - ClientResponse response = getApiClient(getServicePath(appName)) - .get(ClientResponse.class); - if (response.getStatus() == 404) { - StringBuilder sb = new StringBuilder(); - sb.append(" Service "); - sb.append(appName); - sb.append(" not found"); - return sb.toString(); - } - if (response.getStatus() != 200) { - StringBuilder sb = new StringBuilder(); - sb.append(appName); - sb.append(" Failed : HTTP error code : "); - sb.append(response.getStatus()); - return sb.toString(); - } - output = response.getEntity(String.class); - } catch (Exception e) { - LOG.error("Fail to check application status: ", e); - } - return output; - } - - @Override - public int initiateUpgrade(String appName, - String fileName, boolean autoFinalize) throws IOException, YarnException { - int result; - try { - Service service = - loadAppJsonFromLocalFS(fileName, appName, null, null); - if (autoFinalize) { - service.setState(ServiceState.UPGRADING_AUTO_FINALIZE); - } else { - service.setState(ServiceState.UPGRADING); - } - String buffer = jsonSerDeser.toJson(service); - ClientResponse response = getApiClient(getServicePath(appName)) - .put(ClientResponse.class, buffer); - result = processResponse(response); - } catch (Exception e) { - LOG.error("Failed to upgrade application: ", e); - result = EXIT_EXCEPTION_THROWN; - } - return result; - } - - @Override - public int actionUpgradeInstances(String appName, List<String> compInstances) - throws IOException, YarnException { - int result; - Container[] toUpgrade = new Container[compInstances.size()]; - try { - int idx = 0; - for (String instanceName : compInstances) { - Container container = new Container(); - container.setComponentInstanceName(instanceName); - container.setState(ContainerState.UPGRADING); - toUpgrade[idx++] = container; - } - String buffer = CONTAINER_JSON_SERDE.toJson(toUpgrade); - ClientResponse response = getApiClient(getInstancesPath(appName)) - .put(ClientResponse.class, buffer); - result = processResponse(response); - } catch (Exception e) { - LOG.error("Failed to upgrade component instance: ", e); - result = EXIT_EXCEPTION_THROWN; - } - return result; - } - - @Override - public int actionUpgradeComponents(String appName, List<String> components) - throws IOException, YarnException { - int result; - Component[] toUpgrade = new Component[components.size()]; - try { - int idx = 0; - for (String compName : components) { - Component component = new Component(); - component.setName(compName); - component.setState(ComponentState.UPGRADING); - toUpgrade[idx++] = component; - } - String buffer = COMP_JSON_SERDE.toJson(toUpgrade); - ClientResponse response = getApiClient(getComponentsPath(appName)) - .put(ClientResponse.class, buffer); - result = processResponse(response); - } catch (Exception e) { - LOG.error("Failed to upgrade components: ", e); - result = EXIT_EXCEPTION_THROWN; - } - return result; - } - - private static final JsonSerDeser<Container[]> CONTAINER_JSON_SERDE = - new JsonSerDeser<>(Container[].class, - PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES); - - private static final JsonSerDeser<Component[]> COMP_JSON_SERDE = - new JsonSerDeser<>(Component[].class, - PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES); -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a23ff8d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/SystemServiceManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/SystemServiceManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/SystemServiceManagerImpl.java deleted file mode 100644 index f9cfa92..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/SystemServiceManagerImpl.java +++ /dev/null @@ -1,391 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.service.client; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.service.SystemServiceManager; -import org.apache.hadoop.yarn.service.api.records.Service; -import org.apache.hadoop.yarn.service.api.records.ServiceState; -import org.apache.hadoop.yarn.service.conf.YarnServiceConf; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.lang.reflect.UndeclaredThrowableException; -import java.security.PrivilegedExceptionAction; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser; - -/** - * SystemServiceManager implementation. - * Scan for configure system service path. - * - * The service path structure is as follows: - * SYSTEM_SERVICE_DIR_PATH - * |---- sync - * | |--- user1 - * | | |---- service1.yarnfile - * | | |---- service2.yarnfile - * | |--- user2 - * | | |---- service1.yarnfile - * | | .... - * | | - * |---- async - * | |--- user3 - * | | |---- service1.yarnfile - * | | |---- service2.yarnfile - * | |--- user4 - * | | |---- service1.yarnfile - * | | .... - * | | - * - * sync: These services are launched at the time of service start synchronously. - * It is a blocking service start. - * async: These services are launched in separate thread without any delay after - * service start. Non-blocking service start. - */ -public class SystemServiceManagerImpl extends AbstractService - implements SystemServiceManager { - - private static final Logger LOG = - LoggerFactory.getLogger(SystemServiceManagerImpl.class); - - private static final String YARN_FILE_SUFFIX = ".yarnfile"; - private static final String SYNC = "sync"; - private static final String ASYNC = "async"; - - private FileSystem fs; - private Path systemServiceDir; - private AtomicBoolean stopExecutors = new AtomicBoolean(false); - private Map<String, Set<Service>> syncUserServices = new HashMap<>(); - private Map<String, Set<Service>> asyncUserServices = new HashMap<>(); - private UserGroupInformation loginUGI; - private Thread serviceLaucher; - - @VisibleForTesting - private int badFileNameExtensionSkipCounter; - @VisibleForTesting - private Map<String, Integer> ignoredUserServices = - new HashMap<>(); - @VisibleForTesting - private int badDirSkipCounter; - - public SystemServiceManagerImpl() { - super(SystemServiceManagerImpl.class.getName()); - } - - @Override - protected void serviceInit(Configuration conf) throws Exception { - String dirPath = - conf.get(YarnServiceConf.YARN_SERVICES_SYSTEM_SERVICE_DIRECTORY); - if (dirPath != null) { - systemServiceDir = new Path(dirPath); - LOG.info("System Service Directory is configured to {}", - systemServiceDir); - fs = systemServiceDir.getFileSystem(conf); - this.loginUGI = UserGroupInformation.isSecurityEnabled() ? - UserGroupInformation.getLoginUser() : - UserGroupInformation.getCurrentUser(); - LOG.info("UserGroupInformation initialized to {}", loginUGI); - } - } - - @Override - protected void serviceStart() throws Exception { - scanForUserServices(); - launchUserService(syncUserServices); - // Create a thread and submit services in background otherwise it - // block RM switch time. - serviceLaucher = new Thread(createRunnable()); - serviceLaucher.setName("System service launcher"); - serviceLaucher.start(); - } - - @Override - protected void serviceStop() throws Exception { - LOG.info("Stopping {}", getName()); - stopExecutors.set(true); - - if (serviceLaucher != null) { - serviceLaucher.interrupt(); - try { - serviceLaucher.join(); - } catch (InterruptedException ie) { - LOG.warn("Interrupted Exception while stopping", ie); - } - } - } - - private Runnable createRunnable() { - return new Runnable() { - @Override - public void run() { - launchUserService(asyncUserServices); - } - }; - } - - void launchUserService(Map<String, Set<Service>> userServices) { - for (Map.Entry<String, Set<Service>> entry : userServices.entrySet()) { - String user = entry.getKey(); - Set<Service> services = entry.getValue(); - if (services.isEmpty()) { - continue; - } - ServiceClient serviceClient = null; - try { - UserGroupInformation userUgi = getProxyUser(user); - serviceClient = createServiceClient(userUgi); - for (Service service : services) { - LOG.info("POST: createService = {} user = {}", service, userUgi); - try { - launchServices(userUgi, serviceClient, service); - } catch (IOException | UndeclaredThrowableException e) { - if (e.getCause() != null) { - LOG.warn(e.getCause().getMessage()); - } else { - String message = - "Failed to create service " + service.getName() + " : "; - LOG.error(message, e); - } - } - } - } catch (InterruptedException e) { - LOG.warn("System service launcher thread interrupted", e); - break; - } catch (Exception e) { - LOG.error("Error while submitting services for user " + user, e); - } finally { - if (serviceClient != null) { - try { - serviceClient.close(); - } catch (IOException e) { - LOG.warn("Error while closing serviceClient for user {}", user); - } - } - } - } - } - - private ServiceClient createServiceClient(UserGroupInformation userUgi) - throws IOException, InterruptedException { - ServiceClient serviceClient = - userUgi.doAs(new PrivilegedExceptionAction<ServiceClient>() { - @Override public ServiceClient run() - throws IOException, YarnException { - ServiceClient sc = getServiceClient(); - sc.init(getConfig()); - sc.start(); - return sc; - } - }); - return serviceClient; - } - - private void launchServices(UserGroupInformation userUgi, - ServiceClient serviceClient, Service service) - throws IOException, InterruptedException { - if (service.getState() == ServiceState.STOPPED) { - userUgi.doAs(new PrivilegedExceptionAction<Void>() { - @Override public Void run() throws IOException, YarnException { - serviceClient.actionBuild(service); - return null; - } - }); - LOG.info("Service {} version {} saved.", service.getName(), - service.getVersion()); - } else { - ApplicationId applicationId = - userUgi.doAs(new PrivilegedExceptionAction<ApplicationId>() { - @Override public ApplicationId run() - throws IOException, YarnException { - ApplicationId applicationId = serviceClient.actionCreate(service); - return applicationId; - } - }); - LOG.info("Service {} submitted with Application ID: {}", - service.getName(), applicationId); - } - } - - ServiceClient getServiceClient() { - return new ServiceClient(); - } - - private UserGroupInformation getProxyUser(String user) { - UserGroupInformation ugi; - if (UserGroupInformation.isSecurityEnabled()) { - ugi = UserGroupInformation.createProxyUser(user, loginUGI); - } else { - ugi = UserGroupInformation.createRemoteUser(user); - } - return ugi; - } - - // scan for both launch service types i.e sync and async - void scanForUserServices() throws IOException { - if (systemServiceDir == null) { - return; - } - try { - LOG.info("Scan for launch type on {}", systemServiceDir); - RemoteIterator<FileStatus> iterLaunchType = list(systemServiceDir); - while (iterLaunchType.hasNext()) { - FileStatus launchType = iterLaunchType.next(); - if (!launchType.isDirectory()) { - LOG.debug("Scanner skips for unknown file {}", launchType.getPath()); - continue; - } - if (launchType.getPath().getName().equals(SYNC)) { - scanForUserServiceDefinition(launchType.getPath(), syncUserServices); - } else if (launchType.getPath().getName().equals(ASYNC)) { - scanForUserServiceDefinition(launchType.getPath(), asyncUserServices); - } else { - badDirSkipCounter++; - LOG.debug("Scanner skips for unknown dir {}.", launchType.getPath()); - } - } - } catch (FileNotFoundException e) { - LOG.warn("System service directory {} doesn't not exist.", - systemServiceDir); - } - } - - // Files are under systemServiceDir/<users>. Scan for 2 levels - // 1st level for users - // 2nd level for service definitions under user - private void scanForUserServiceDefinition(Path userDirPath, - Map<String, Set<Service>> userServices) throws IOException { - LOG.info("Scan for users on {}", userDirPath); - RemoteIterator<FileStatus> iterUsers = list(userDirPath); - while (iterUsers.hasNext()) { - FileStatus userDir = iterUsers.next(); - // if 1st level is not user directory then skip it. - if (!userDir.isDirectory()) { - LOG.info( - "Service definition {} doesn't belong to any user. Ignoring.. ", - userDir.getPath().getName()); - continue; - } - String userName = userDir.getPath().getName(); - LOG.info("Scanning service definitions for user {}.", userName); - - //2nd level scan - RemoteIterator<FileStatus> iterServices = list(userDir.getPath()); - while (iterServices.hasNext()) { - FileStatus serviceCache = iterServices.next(); - String filename = serviceCache.getPath().getName(); - if (!serviceCache.isFile()) { - LOG.info("Scanner skips for unknown dir {}", filename); - continue; - } - if (!filename.endsWith(YARN_FILE_SUFFIX)) { - LOG.info("Scanner skips for unknown file extension, filename = {}", - filename); - badFileNameExtensionSkipCounter++; - continue; - } - Service service = getServiceDefinition(serviceCache.getPath()); - if (service != null) { - Set<Service> services = userServices.get(userName); - if (services == null) { - services = new HashSet<>(); - userServices.put(userName, services); - } - if (!services.add(service)) { - int count = ignoredUserServices.containsKey(userName) ? - ignoredUserServices.get(userName) : 0; - ignoredUserServices.put(userName, count + 1); - LOG.warn( - "Ignoring service {} for the user {} as it is already present," - + " filename = {}", service.getName(), userName, filename); - } else { - LOG.info("Added service {} for the user {}, filename = {}", - service.getName(), userName, filename); - } - } - } - } - } - - private Service getServiceDefinition(Path filePath) { - Service service = null; - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Loading service definition from FS: " + filePath); - } - service = jsonSerDeser.load(fs, filePath); - } catch (IOException e) { - LOG.info("Error while loading service definition from FS: {}", e); - } - return service; - } - - private RemoteIterator<FileStatus> list(Path path) throws IOException { - return new StoppableRemoteIterator(fs.listStatusIterator(path)); - } - - @VisibleForTesting Map<String, Integer> getIgnoredUserServices() { - return ignoredUserServices; - } - - private class StoppableRemoteIterator implements RemoteIterator<FileStatus> { - private final RemoteIterator<FileStatus> remote; - - StoppableRemoteIterator(RemoteIterator<FileStatus> remote) { - this.remote = remote; - } - - @Override public boolean hasNext() throws IOException { - return !stopExecutors.get() && remote.hasNext(); - } - - @Override public FileStatus next() throws IOException { - return remote.next(); - } - } - - @VisibleForTesting - Map<String, Set<Service>> getSyncUserServices() { - return syncUserServices; - } - - @VisibleForTesting - int getBadFileNameExtensionSkipCounter() { - return badFileNameExtensionSkipCounter; - } - - @VisibleForTesting - int getBadDirSkipCounter() { - return badDirSkipCounter; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a23ff8d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/package-info.java deleted file mode 100644 index cf5ce11..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/package-info.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Package org.apache.hadoop.yarn.service.client contains classes - * for YARN Services Client API. - */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -package org.apache.hadoop.yarn.service.client; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/a23ff8d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java deleted file mode 100644 index 46c9abe..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java +++ /dev/null @@ -1,818 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.service.webapp; - -import com.google.common.base.Joiner; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import com.google.inject.Inject; -import com.google.inject.Singleton; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.AccessControlException; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.VersionInfo; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.service.api.records.Component; -import org.apache.hadoop.yarn.service.api.records.ComponentState; -import org.apache.hadoop.yarn.service.api.records.Container; -import org.apache.hadoop.yarn.service.api.records.ContainerState; -import org.apache.hadoop.yarn.service.api.records.Service; -import org.apache.hadoop.yarn.service.api.records.ServiceState; -import org.apache.hadoop.yarn.service.api.records.ServiceStatus; -import org.apache.hadoop.yarn.service.client.ServiceClient; -import org.apache.hadoop.yarn.service.conf.RestApiConstants; -import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.Consumes; -import javax.ws.rs.DELETE; -import javax.ws.rs.GET; -import javax.ws.rs.POST; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.Response.Status; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.lang.reflect.UndeclaredThrowableException; -import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; - -import static org.apache.hadoop.yarn.service.api.records.ServiceState.ACCEPTED; -import static org.apache.hadoop.yarn.service.conf.RestApiConstants.*; -import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.*; - -/** - * The rest API endpoints for users to manage services on YARN. - */ -@Singleton -@Path(CONTEXT_ROOT) -public class ApiServer { - - public ApiServer() { - super(); - } - - @Inject - public ApiServer(Configuration conf) { - super(); - } - - private static final Logger LOG = - LoggerFactory.getLogger(ApiServer.class); - private static Configuration YARN_CONFIG = new YarnConfiguration(); - private ServiceClient serviceClientUnitTest; - private boolean unitTest = false; - - static { - init(); - } - - // initialize all the common resources - order is important - private static void init() { - } - - @GET - @Path(VERSION) - @Consumes({ MediaType.APPLICATION_JSON }) - @Produces({ MediaType.APPLICATION_JSON + ";charset=utf-8" }) - public Response getVersion() { - String version = VersionInfo.getBuildVersion(); - LOG.info(version); - return Response.ok("{ \"hadoop_version\": \"" + version + "\"}").build(); - } - - @POST - @Path(SERVICE_ROOT_PATH) - @Consumes({ MediaType.APPLICATION_JSON }) - @Produces({ MediaType.APPLICATION_JSON + ";charset=utf-8" }) - public Response createService(@Context HttpServletRequest request, - Service service) { - ServiceStatus serviceStatus = new ServiceStatus(); - try { - UserGroupInformation ugi = getProxyUser(request); - LOG.info("POST: createService = {} user = {}", service, ugi); - if(service.getState()==ServiceState.STOPPED) { - ugi.doAs(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws YarnException, IOException { - ServiceClient sc = getServiceClient(); - sc.init(YARN_CONFIG); - sc.start(); - sc.actionBuild(service); - sc.close(); - return null; - } - }); - serviceStatus.setDiagnostics("Service " + service.getName() + - " version " + service.getVersion() + " saved."); - } else { - ApplicationId applicationId = ugi - .doAs(new PrivilegedExceptionAction<ApplicationId>() { - @Override - public ApplicationId run() throws IOException, YarnException { - ServiceClient sc = getServiceClient(); - sc.init(YARN_CONFIG); - sc.start(); - ApplicationId applicationId = sc.actionCreate(service); - sc.close(); - return applicationId; - } - }); - serviceStatus.setDiagnostics("Application ID: " + applicationId); - } - serviceStatus.setState(ACCEPTED); - serviceStatus.setUri( - CONTEXT_ROOT + SERVICE_ROOT_PATH + "/" + service - .getName()); - return formatResponse(Status.ACCEPTED, serviceStatus); - } catch (AccessControlException e) { - serviceStatus.setDiagnostics(e.getMessage()); - return formatResponse(Status.FORBIDDEN, e.getCause().getMessage()); - } catch (IllegalArgumentException e) { - return formatResponse(Status.BAD_REQUEST, e.getMessage()); - } catch (IOException | InterruptedException e) { - String message = "Failed to create service " + service.getName() - + ": {}"; - LOG.error(message, e); - return formatResponse(Status.INTERNAL_SERVER_ERROR, e.getMessage()); - } catch (UndeclaredThrowableException e) { - String message = "Failed to create service " + service.getName() - + ": {}"; - LOG.error(message, e); - if (e.getCause().getMessage().contains("already exists")) { - message = "Service name " + service.getName() + " is already taken."; - } else { - message = e.getCause().getMessage(); - } - return formatResponse(Status.INTERNAL_SERVER_ERROR, - message); - } - } - - @GET - @Path(SERVICE_PATH) - @Consumes({ MediaType.APPLICATION_JSON }) - @Produces({ MediaType.APPLICATION_JSON + ";charset=utf-8" }) - public Response getService(@Context HttpServletRequest request, - @PathParam(SERVICE_NAME) String appName) { - ServiceStatus serviceStatus = new ServiceStatus(); - try { - if (appName == null) { - throw new IllegalArgumentException("Service name cannot be null."); - } - UserGroupInformation ugi = getProxyUser(request); - LOG.info("GET: getService for appName = {} user = {}", appName, ugi); - Service app = getServiceFromClient(ugi, appName); - return Response.ok(app).build(); - } catch (AccessControlException e) { - return formatResponse(Status.FORBIDDEN, e.getMessage()); - } catch (IllegalArgumentException e) { - serviceStatus.setDiagnostics(e.getMessage()); - serviceStatus.setCode(ERROR_CODE_APP_NAME_INVALID); - return Response.status(Status.NOT_FOUND).entity(serviceStatus) - .build(); - } catch (FileNotFoundException e) { - serviceStatus.setDiagnostics("Service " + appName + " not found"); - serviceStatus.setCode(ERROR_CODE_APP_NAME_INVALID); - return Response.status(Status.NOT_FOUND).entity(serviceStatus) - .build(); - } catch (IOException | InterruptedException e) { - LOG.error("Get service failed: {}", e); - return formatResponse(Status.INTERNAL_SERVER_ERROR, e.getMessage()); - } catch (UndeclaredThrowableException e) { - LOG.error("Get service failed: {}", e); - return formatResponse(Status.INTERNAL_SERVER_ERROR, - e.getCause().getMessage()); - } - } - - @DELETE - @Path(SERVICE_PATH) - @Consumes({ MediaType.APPLICATION_JSON }) - @Produces({ MediaType.APPLICATION_JSON + ";charset=utf-8" }) - public Response deleteService(@Context HttpServletRequest request, - @PathParam(SERVICE_NAME) String appName) { - try { - if (appName == null) { - throw new IllegalArgumentException("Service name can not be null."); - } - UserGroupInformation ugi = getProxyUser(request); - LOG.info("DELETE: deleteService for appName = {} user = {}", - appName, ugi); - return stopService(appName, true, ugi); - } catch (AccessControlException e) { - return formatResponse(Status.FORBIDDEN, e.getMessage()); - } catch (IllegalArgumentException e) { - return formatResponse(Status.BAD_REQUEST, e.getMessage()); - } catch (UndeclaredThrowableException e) { - LOG.error("Fail to stop service: {}", e); - return formatResponse(Status.BAD_REQUEST, - e.getCause().getMessage()); - } catch (YarnException | FileNotFoundException e) { - return formatResponse(Status.NOT_FOUND, e.getMessage()); - } catch (Exception e) { - LOG.error("Fail to stop service: {}", e); - return formatResponse(Status.INTERNAL_SERVER_ERROR, e.getMessage()); - } - } - - private Response stopService(String appName, boolean destroy, - final UserGroupInformation ugi) throws Exception { - int result = ugi.doAs(new PrivilegedExceptionAction<Integer>() { - @Override - public Integer run() throws Exception { - int result = 0; - ServiceClient sc = getServiceClient(); - sc.init(YARN_CONFIG); - sc.start(); - Exception stopException = null; - try { - result = sc.actionStop(appName, destroy); - if (result == EXIT_SUCCESS) { - LOG.info("Successfully stopped service {}", appName); - } - } catch (Exception e) { - LOG.info("Got exception stopping service", e); - stopException = e; - } - if (destroy) { - result = sc.actionDestroy(appName); - if (result == EXIT_SUCCESS) { - LOG.info("Successfully deleted service {}", appName); - } - } else { - if (stopException != null) { - throw stopException; - } - } - sc.close(); - return result; - } - }); - ServiceStatus serviceStatus = new ServiceStatus(); - if (destroy) { - if (result == EXIT_SUCCESS) { - serviceStatus.setDiagnostics("Successfully destroyed service " + - appName); - } else { - if (result == EXIT_NOT_FOUND) { - serviceStatus - .setDiagnostics("Service " + appName + " doesn't exist"); - return formatResponse(Status.BAD_REQUEST, serviceStatus); - } else { - serviceStatus - .setDiagnostics("Service " + appName + " error cleaning up " + - "registry"); - return formatResponse(Status.INTERNAL_SERVER_ERROR, serviceStatus); - } - } - } else { - if (result == EXIT_COMMAND_ARGUMENT_ERROR) { - serviceStatus - .setDiagnostics("Service " + appName + " is already stopped"); - return formatResponse(Status.BAD_REQUEST, serviceStatus); - } else { - serviceStatus.setDiagnostics("Successfully stopped service " + appName); - } - } - return formatResponse(Status.OK, serviceStatus); - } - - @PUT - @Path(COMPONENTS_PATH) - @Consumes({MediaType.APPLICATION_JSON}) - @Produces({RestApiConstants.MEDIA_TYPE_JSON_UTF8, MediaType.TEXT_PLAIN}) - public Response updateComponents(@Context HttpServletRequest request, - @PathParam(SERVICE_NAME) String serviceName, - List<Component> requestComponents) { - - try { - if (requestComponents == null || requestComponents.isEmpty()) { - throw new YarnException("No components provided."); - } - UserGroupInformation ugi = getProxyUser(request); - Set<String> compNamesToUpgrade = new HashSet<>(); - requestComponents.forEach(reqComp -> { - if (reqComp.getState() != null && - reqComp.getState().equals(ComponentState.UPGRADING)) { - compNamesToUpgrade.add(reqComp.getName()); - } - }); - LOG.info("PUT: upgrade components {} for service {} " + - "user = {}", compNamesToUpgrade, serviceName, ugi); - return processComponentsUpgrade(ugi, serviceName, compNamesToUpgrade); - } catch (AccessControlException e) { - return formatResponse(Response.Status.FORBIDDEN, e.getMessage()); - } catch (YarnException e) { - return formatResponse(Response.Status.BAD_REQUEST, e.getMessage()); - } catch (IOException | InterruptedException e) { - return formatResponse(Response.Status.INTERNAL_SERVER_ERROR, - e.getMessage()); - } catch (UndeclaredThrowableException e) { - return formatResponse(Response.Status.INTERNAL_SERVER_ERROR, - e.getCause().getMessage()); - } - } - - @PUT - @Path(COMPONENT_PATH) - @Consumes({ MediaType.APPLICATION_JSON }) - @Produces({ MediaType.APPLICATION_JSON + ";charset=utf-8", - MediaType.TEXT_PLAIN }) - public Response updateComponent(@Context HttpServletRequest request, - @PathParam(SERVICE_NAME) String appName, - @PathParam(COMPONENT_NAME) String componentName, Component component) { - - try { - if (component == null) { - throw new YarnException("No component data provided"); - } - if (component.getName() != null - && !component.getName().equals(componentName)) { - String msg = "Component name in the request object (" - + component.getName() + ") does not match that in the URI path (" - + componentName + ")"; - throw new YarnException(msg); - } - UserGroupInformation ugi = getProxyUser(request); - if (component.getState() != null && - component.getState().equals(ComponentState.UPGRADING)) { - LOG.info("PUT: upgrade component {} for service {} " + - "user = {}", component.getName(), appName, ugi); - return processComponentsUpgrade(ugi, appName, - Sets.newHashSet(componentName)); - } - - if (component.getNumberOfContainers() == null) { - throw new YarnException("No container count provided"); - } - if (component.getNumberOfContainers() < 0) { - String message = "Invalid number of containers specified " - + component.getNumberOfContainers(); - throw new YarnException(message); - } - Map<String, Long> original = ugi - .doAs(new PrivilegedExceptionAction<Map<String, Long>>() { - @Override - public Map<String, Long> run() throws YarnException, IOException { - ServiceClient sc = new ServiceClient(); - sc.init(YARN_CONFIG); - sc.start(); - Map<String, Long> original = sc.flexByRestService(appName, - Collections.singletonMap(componentName, - component.getNumberOfContainers())); - sc.close(); - return original; - } - }); - ServiceStatus status = new ServiceStatus(); - status.setDiagnostics( - "Updating component (" + componentName + ") size from " + original - .get(componentName) + " to " + component.getNumberOfContainers()); - return formatResponse(Status.OK, status); - } catch (AccessControlException e) { - return formatResponse(Status.FORBIDDEN, e.getMessage()); - } catch (YarnException e) { - return formatResponse(Status.BAD_REQUEST, e.getMessage()); - } catch (IOException | InterruptedException e) { - return formatResponse(Status.INTERNAL_SERVER_ERROR, - e.getMessage()); - } catch (UndeclaredThrowableException e) { - return formatResponse(Status.INTERNAL_SERVER_ERROR, - e.getCause().getMessage()); - } - } - - @PUT - @Path(SERVICE_PATH) - @Consumes({ MediaType.APPLICATION_JSON }) - @Produces({ MediaType.APPLICATION_JSON + ";charset=utf-8" }) - public Response updateService(@Context HttpServletRequest request, - @PathParam(SERVICE_NAME) String appName, - Service updateServiceData) { - try { - UserGroupInformation ugi = getProxyUser(request); - LOG.info("PUT: updateService for app = {} with data = {} user = {}", - appName, updateServiceData, ugi); - // Ignore the app name provided in updateServiceData and always use - // appName path param - updateServiceData.setName(appName); - - if (updateServiceData.getState() != null - && updateServiceData.getState() == ServiceState.FLEX) { - return flexService(updateServiceData, ugi); - } - // For STOP the app should be running. If already stopped then this - // operation will be a no-op. For START it should be in stopped state. - // If already running then this operation will be a no-op. - if (updateServiceData.getState() != null - && updateServiceData.getState() == ServiceState.STOPPED) { - return stopService(appName, false, ugi); - } - - // If a START is requested - if (updateServiceData.getState() != null - && updateServiceData.getState() == ServiceState.STARTED) { - return startService(appName, ugi); - } - - // If an UPGRADE is requested - if (updateServiceData.getState() != null && ( - updateServiceData.getState() == ServiceState.UPGRADING || - updateServiceData.getState() == - ServiceState.UPGRADING_AUTO_FINALIZE)) { - return upgradeService(updateServiceData, ugi); - } - - // If new lifetime value specified then update it - if (updateServiceData.getLifetime() != null - && updateServiceData.getLifetime() > 0) { - return updateLifetime(appName, updateServiceData, ugi); - } - } catch (UndeclaredThrowableException e) { - return formatResponse(Status.BAD_REQUEST, - e.getCause().getMessage()); - } catch (AccessControlException e) { - return formatResponse(Status.FORBIDDEN, e.getMessage()); - } catch (FileNotFoundException e) { - String message = "Application is not found app: " + appName; - LOG.error(message, e); - return formatResponse(Status.NOT_FOUND, e.getMessage()); - } catch (YarnException e) { - String message = "Service is not found in hdfs: " + appName; - LOG.error(message, e); - return formatResponse(Status.NOT_FOUND, e.getMessage()); - } catch (Exception e) { - String message = "Error while performing operation for app: " + appName; - LOG.error(message, e); - return formatResponse(Status.INTERNAL_SERVER_ERROR, e.getMessage()); - } - - // If nothing happens consider it a no-op - return Response.status(Status.NO_CONTENT).build(); - } - - @PUT - @Path(COMP_INSTANCE_LONG_PATH) - @Consumes({MediaType.APPLICATION_JSON}) - @Produces({RestApiConstants.MEDIA_TYPE_JSON_UTF8, MediaType.TEXT_PLAIN}) - public Response updateComponentInstance(@Context HttpServletRequest request, - @PathParam(SERVICE_NAME) String serviceName, - @PathParam(COMPONENT_NAME) String componentName, - @PathParam(COMP_INSTANCE_NAME) String compInstanceName, - Container reqContainer) { - - try { - UserGroupInformation ugi = getProxyUser(request); - LOG.info("PUT: update component instance {} for component = {}" + - " service = {} user = {}", compInstanceName, componentName, - serviceName, ugi); - if (reqContainer == null) { - throw new YarnException("No container data provided."); - } - Service service = getServiceFromClient(ugi, serviceName); - Component component = service.getComponent(componentName); - if (component == null) { - throw new YarnException(String.format( - "The component name in the URI path (%s) is invalid.", - componentName)); - } - - Container liveContainer = component.getComponentInstance( - compInstanceName); - if (liveContainer == null) { - throw new YarnException(String.format( - "The component (%s) does not have a component instance (%s).", - componentName, compInstanceName)); - } - - if (reqContainer.getState() != null - && reqContainer.getState().equals(ContainerState.UPGRADING)) { - return processContainersUpgrade(ugi, service, - Lists.newArrayList(liveContainer)); - } - } catch (AccessControlException e) { - return formatResponse(Response.Status.FORBIDDEN, e.getMessage()); - } catch (YarnException e) { - return formatResponse(Response.Status.BAD_REQUEST, e.getMessage()); - } catch (IOException | InterruptedException e) { - return formatResponse(Response.Status.INTERNAL_SERVER_ERROR, - e.getMessage()); - } catch (UndeclaredThrowableException e) { - return formatResponse(Response.Status.INTERNAL_SERVER_ERROR, - e.getCause().getMessage()); - } - return Response.status(Status.NO_CONTENT).build(); - } - - @PUT - @Path(COMP_INSTANCES_PATH) - @Consumes({MediaType.APPLICATION_JSON}) - @Produces({RestApiConstants.MEDIA_TYPE_JSON_UTF8, MediaType.TEXT_PLAIN}) - public Response updateComponentInstances(@Context HttpServletRequest request, - @PathParam(SERVICE_NAME) String serviceName, - List<Container> requestContainers) { - - try { - if (requestContainers == null || requestContainers.isEmpty()) { - throw new YarnException("No containers provided."); - } - UserGroupInformation ugi = getProxyUser(request); - List<String> toUpgrade = new ArrayList<>(); - for (Container reqContainer : requestContainers) { - if (reqContainer.getState() != null && - reqContainer.getState().equals(ContainerState.UPGRADING)) { - toUpgrade.add(reqContainer.getComponentInstanceName()); - } - } - - if (!toUpgrade.isEmpty()) { - Service service = getServiceFromClient(ugi, serviceName); - LOG.info("PUT: upgrade component instances {} for service = {} " + - "user = {}", toUpgrade, serviceName, ugi); - List<Container> liveContainers = ServiceApiUtil - .getLiveContainers(service, toUpgrade); - - return processContainersUpgrade(ugi, service, liveContainers); - } - } catch (AccessControlException e) { - return formatResponse(Response.Status.FORBIDDEN, e.getMessage()); - } catch (YarnException e) { - return formatResponse(Response.Status.BAD_REQUEST, e.getMessage()); - } catch (IOException | InterruptedException e) { - return formatResponse(Response.Status.INTERNAL_SERVER_ERROR, - e.getMessage()); - } catch (UndeclaredThrowableException e) { - return formatResponse(Response.Status.INTERNAL_SERVER_ERROR, - e.getCause().getMessage()); - } - return Response.status(Status.NO_CONTENT).build(); - } - - private Response flexService(Service service, UserGroupInformation ugi) - throws IOException, InterruptedException { - String appName = service.getName(); - Response response = Response.status(Status.BAD_REQUEST).build(); - Map<String, String> componentCountStrings = new HashMap<String, String>(); - for (Component c : service.getComponents()) { - componentCountStrings.put(c.getName(), - c.getNumberOfContainers().toString()); - } - Integer result = ugi.doAs(new PrivilegedExceptionAction<Integer>() { - - @Override - public Integer run() throws YarnException, IOException { - int result = 0; - ServiceClient sc = new ServiceClient(); - sc.init(YARN_CONFIG); - sc.start(); - result = sc - .actionFlex(appName, componentCountStrings); - sc.close(); - return Integer.valueOf(result); - } - }); - if (result == EXIT_SUCCESS) { - String message = "Service " + appName + " is successfully flexed."; - LOG.info(message); - ServiceStatus status = new ServiceStatus(); - status.setDiagnostics(message); - status.setState(ServiceState.ACCEPTED); - response = formatResponse(Status.ACCEPTED, status); - } - return response; - } - - private Response updateLifetime(String appName, Service updateAppData, - final UserGroupInformation ugi) throws IOException, - InterruptedException { - String newLifeTime = ugi.doAs(new PrivilegedExceptionAction<String>() { - @Override - public String run() throws YarnException, IOException { - ServiceClient sc = getServiceClient(); - sc.init(YARN_CONFIG); - sc.start(); - String newLifeTime = sc.updateLifetime(appName, - updateAppData.getLifetime()); - sc.close(); - return newLifeTime; - } - }); - ServiceStatus status = new ServiceStatus(); - status.setDiagnostics( - "Service (" + appName + ")'s lifeTime is updated to " + newLifeTime - + ", " + updateAppData.getLifetime() + " seconds remaining"); - return formatResponse(Status.OK, status); - } - - private Response startService(String appName, - final UserGroupInformation ugi) throws IOException, - InterruptedException { - ugi.doAs(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws YarnException, IOException { - ServiceClient sc = getServiceClient(); - sc.init(YARN_CONFIG); - sc.start(); - sc.actionStart(appName); - sc.close(); - return null; - } - }); - LOG.info("Successfully started service " + appName); - ServiceStatus status = new ServiceStatus(); - status.setDiagnostics("Service " + appName + " is successfully started."); - status.setState(ServiceState.ACCEPTED); - return formatResponse(Status.OK, status); - } - - private Response upgradeService(Service service, - final UserGroupInformation ugi) throws IOException, InterruptedException { - ServiceStatus status = new ServiceStatus(); - ugi.doAs((PrivilegedExceptionAction<Void>) () -> { - ServiceClient sc = getServiceClient(); - sc.init(YARN_CONFIG); - sc.start(); - sc.initiateUpgrade(service); - sc.close(); - return null; - }); - LOG.info("Service {} version {} upgrade initialized", service.getName(), - service.getVersion()); - status.setDiagnostics("Service " + service.getName() + - " version " + service.getVersion() + " saved."); - status.setState(ServiceState.ACCEPTED); - return formatResponse(Status.ACCEPTED, status); - } - - private Response processComponentsUpgrade(UserGroupInformation ugi, - String serviceName, Set<String> compNames) throws YarnException, - IOException, InterruptedException { - Service service = getServiceFromClient(ugi, serviceName); - if (service.getState() != ServiceState.UPGRADING) { - throw new YarnException( - String.format("The upgrade of service %s has not been initiated.", - service.getName())); - } - List<Container> containersToUpgrade = ServiceApiUtil - .validateAndResolveCompsUpgrade(service, compNames); - Integer result = invokeContainersUpgrade(ugi, service, containersToUpgrade); - if (result == EXIT_SUCCESS) { - ServiceStatus status = new ServiceStatus(); - status.setDiagnostics( - "Upgrading components " + Joiner.on(',').join(compNames) + "."); - return formatResponse(Response.Status.ACCEPTED, status); - } - // If result is not a success, consider it a no-op - return Response.status(Response.Status.NO_CONTENT).build(); - } - - private Response processContainersUpgrade(UserGroupInformation ugi, - Service service, List<Container> containers) throws YarnException, - IOException, InterruptedException { - - if (service.getState() != ServiceState.UPGRADING) { - throw new YarnException( - String.format("The upgrade of service %s has not been initiated.", - service.getName())); - } - ServiceApiUtil.validateInstancesUpgrade(containers); - Integer result = invokeContainersUpgrade(ugi, service, containers); - if (result == EXIT_SUCCESS) { - ServiceStatus status = new ServiceStatus(); - status.setDiagnostics( - "Upgrading component instances " + containers.stream() - .map(Container::getId).collect(Collectors.joining(",")) + "."); - return formatResponse(Response.Status.ACCEPTED, status); - } - // If result is not a success, consider it a no-op - return Response.status(Response.Status.NO_CONTENT).build(); - } - - private int invokeContainersUpgrade(UserGroupInformation ugi, - Service service, List<Container> containers) throws IOException, - InterruptedException { - return ugi.doAs((PrivilegedExceptionAction<Integer>) () -> { - int result1; - ServiceClient sc = getServiceClient(); - sc.init(YARN_CONFIG); - sc.start(); - result1 = sc.actionUpgrade(service, containers); - sc.close(); - return result1; - }); - } - - private Service getServiceFromClient(UserGroupInformation ugi, - String serviceName) throws IOException, InterruptedException { - - return ugi.doAs((PrivilegedExceptionAction<Service>) () -> { - ServiceClient sc = getServiceClient(); - sc.init(YARN_CONFIG); - sc.start(); - Service app1 = sc.getStatus(serviceName); - sc.close(); - return app1; - }); - } - - /** - * Used by negative test case. - * - * @param mockServerClient - A mocked version of ServiceClient - */ - public void setServiceClient(ServiceClient mockServerClient) { - serviceClientUnitTest = mockServerClient; - unitTest = true; - } - - private ServiceClient getServiceClient() { - if (unitTest) { - return serviceClientUnitTest; - } else { - return new ServiceClient(); - } - } - - /** - * Configure impersonation callback. - * - * @param request - web request - * @return - configured UGI class for proxy callback - * @throws IOException - if user is not login. - */ - private UserGroupInformation getProxyUser(HttpServletRequest request) - throws AccessControlException { - UserGroupInformation proxyUser; - UserGroupInformation ugi; - String remoteUser = request.getRemoteUser(); - try { - if (UserGroupInformation.isSecurityEnabled()) { - proxyUser = UserGroupInformation.getLoginUser(); - ugi = UserGroupInformation.createProxyUser(remoteUser, proxyUser); - } else { - ugi = UserGroupInformation.createRemoteUser(remoteUser); - } - return ugi; - } catch (IOException e) { - throw new AccessControlException(e.getCause()); - } - } - - /** - * Format HTTP response. - * - * @param status - HTTP Code - * @param message - Diagnostic message - * @return - HTTP response - */ - private Response formatResponse(Status status, String message) { - ServiceStatus entity = new ServiceStatus(); - entity.setDiagnostics(message); - return formatResponse(status, entity); - } - - /** - * Format HTTP response. - * - * @param status - HTTP Code - * @param entity - ServiceStatus object - * @return - HTTP response - */ - private Response formatResponse(Status status, ServiceStatus entity) { - return Response.status(status).entity(entity).build(); - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org