This is an automated email from the ASF dual-hosted git repository.
chtyim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/twill.git
The following commit(s) were added to refs/heads/master by this push:
new 77b993c (TWILL-270) Remove support for old Hadoop-2.0 alpha
77b993c is described below
commit 77b993c050883324f9e3c62fbe6ea131a86d1e04
Author: Terence Yim <[email protected]>
AuthorDate: Mon Jan 6 13:57:10 2020 -0800
(TWILL-270) Remove support for old Hadoop-2.0 alpha
This closes #82 from Github.
Signed-off-by: Terence Yim <[email protected]>
---
.travis.yml | 2 -
pom.xml | 164 +-------
twill-yarn/pom.xml | 84 -----
.../twill/internal/yarn/Hadoop20YarnAMClient.java | 183 ---------
.../twill/internal/yarn/Hadoop20YarnAppClient.java | 238 ------------
.../yarn/Hadoop20YarnApplicationReport.java | 107 ------
.../internal/yarn/Hadoop20YarnContainerInfo.java | 70 ----
.../internal/yarn/Hadoop20YarnContainerStatus.java | 53 ---
.../internal/yarn/Hadoop20YarnLaunchContext.java | 99 -----
.../internal/yarn/Hadoop20YarnLocalResource.java | 101 -----
.../twill/internal/yarn/Hadoop20YarnNMClient.java | 125 -------
.../twill/internal/yarn/ports/AMRMClient.java | 149 --------
.../twill/internal/yarn/ports/AMRMClientImpl.java | 412 ---------------------
.../internal/yarn/ports/AllocationResponse.java | 38 --
.../internal/yarn/ports/AllocationResponses.java | 111 ------
.../yarn/VersionDetectYarnAMClientFactory.java | 5 -
.../yarn/VersionDetectYarnAppClientFactory.java | 3 -
.../org/apache/twill/internal/yarn/YarnUtils.java | 79 +---
.../org/apache/twill/yarn/YarnTwillController.java | 11 +-
.../twill/filesystem/FileContextLocationTest.java | 11 -
.../apache/twill/filesystem/LocationTestBase.java | 10 +-
.../java/org/apache/twill/yarn/TwillTester.java | 20 +-
22 files changed, 30 insertions(+), 2045 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index 2c0770a..139fff7 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -32,13 +32,11 @@ script: mvn --batch-mode test -P $PROFILE
-Dsurefire.redirectTestOutputToFile=fa
install: mvn --batch-mode install -P $PROFILE -DskipTests=true
env:
- - PROFILE='hadoop-2.0'
- PROFILE='hadoop-2.1'
- PROFILE='hadoop-2.2'
- PROFILE='hadoop-2.4'
- PROFILE='hadoop-2.5'
- PROFILE='hadoop-2.6'
- - PROFILE='cdh-4.4.0'
- PROFILE='mapr-hadoop-2.4'
- PROFILE='hadoop-2.6,java8-test'
diff --git a/pom.xml b/pom.xml
index 8c29192..5c8ad78 100644
--- a/pom.xml
+++ b/pom.xml
@@ -179,7 +179,6 @@
<junit.version>4.11</junit.version>
<jopt-simple.version>3.2</jopt-simple.version>
<commons-compress.version>1.5</commons-compress.version>
- <hadoop20.output.dir>target/hadoop20-classes</hadoop20.output.dir>
<force.mac.tests>false</force.mac.tests>
</properties>
@@ -225,11 +224,13 @@
<configuration>
<excludePackageNames>*.internal.*:echo:*.example.*</excludePackageNames>
<links>
-
<link>http://docs.oracle.com/javase/7/docs/api/</link>
+
<link>http://docs.oracle.com/javase/8/docs/api/</link>
</links>
<bottom>
- <![CDATA[Copyright © 2013-2016 <a
href="http://www.apache.org">The Apache Software Foundation</a>. All rights
reserved.]]>
+ <![CDATA[Copyright © 2013-2020 <a
href="http://www.apache.org">The Apache Software Foundation</a>. All rights
reserved.]]>
</bottom>
+ <doclint>none</doclint>
+ <failOnError>false</failOnError>
</configuration>
<executions>
<execution>
@@ -375,12 +376,13 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
- <version>2.4</version>
+ <version>3.2.0</version>
<configuration>
<finalName>apache-twill-${project.version}</finalName>
<formats>
<format>tar.gz</format>
</formats>
+ <tarLongFileMode>posix</tarLongFileMode>
</configuration>
</plugin>
<plugin>
@@ -395,76 +397,6 @@
</build>
</profile>
<profile>
- <id>hadoop-2.0</id>
- <properties>
- <hadoop.version>2.0.2-alpha</hadoop.version>
- </properties>
- <build>
- <plugins>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>1.8</version>
- <executions>
- <execution>
- <id>add-source</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/main/hadoop20</source>
- </sources>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- </profile>
- <profile>
- <id>cdh-4.4.0</id>
- <properties>
- <hadoop.version>2.0.0-cdh4.4.0</hadoop.version>
- </properties>
- <repositories>
- <repository>
- <id>cloudera-releases</id>
-
<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
- <releases>
- <enabled>true</enabled>
- </releases>
- <snapshots>
- <enabled>false</enabled>
- </snapshots>
- </repository>
- </repositories>
- <build>
- <plugins>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>1.8</version>
- <executions>
- <execution>
- <id>add-source</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/main/hadoop20</source>
- </sources>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- </profile>
- <profile>
<id>hadoop-2.1</id>
<properties>
<hadoop.version>2.1.0-beta</hadoop.version>
@@ -488,18 +420,6 @@
</sources>
</configuration>
</execution>
- <execution>
- <id>add-source-2.0</id>
- <phase>prepare-package</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/main/hadoop20</source>
- </sources>
- </configuration>
- </execution>
</executions>
</plugin>
</plugins>
@@ -530,18 +450,6 @@
</sources>
</configuration>
</execution>
- <execution>
- <id>add-source-2.0</id>
- <phase>prepare-package</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/main/hadoop20</source>
- </sources>
- </configuration>
- </execution>
</executions>
</plugin>
</plugins>
@@ -573,18 +481,6 @@
</sources>
</configuration>
</execution>
- <execution>
- <id>add-source-2.0</id>
- <phase>prepare-package</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/main/hadoop20</source>
- </sources>
- </configuration>
- </execution>
</executions>
</plugin>
</plugins>
@@ -616,18 +512,6 @@
</sources>
</configuration>
</execution>
- <execution>
- <id>add-source-2.0</id>
- <phase>prepare-package</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/main/hadoop20</source>
- </sources>
- </configuration>
- </execution>
</executions>
</plugin>
</plugins>
@@ -673,18 +557,6 @@
</sources>
</configuration>
</execution>
- <execution>
- <id>add-source-2.0</id>
- <phase>prepare-package</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/main/hadoop20</source>
- </sources>
- </configuration>
- </execution>
</executions>
</plugin>
</plugins>
@@ -716,18 +588,6 @@
</sources>
</configuration>
</execution>
- <execution>
- <id>add-source-2.0</id>
- <phase>prepare-package</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/main/hadoop20</source>
- </sources>
- </configuration>
- </execution>
</executions>
</plugin>
</plugins>
@@ -763,18 +623,6 @@
</sources>
</configuration>
</execution>
- <execution>
- <id>add-source-2.0</id>
- <phase>prepare-package</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/main/hadoop20</source>
- </sources>
- </configuration>
- </execution>
</executions>
</plugin>
</plugins>
diff --git a/twill-yarn/pom.xml b/twill-yarn/pom.xml
index bbe189f..c3816ab 100644
--- a/twill-yarn/pom.xml
+++ b/twill-yarn/pom.xml
@@ -107,90 +107,6 @@
<profiles>
<profile>
- <id>hadoop-2.0</id>
- <properties>
- <output.dir>${hadoop20.output.dir}</output.dir>
- </properties>
- </profile>
- <profile>
- <id>hadoop-2.1</id>
- <build>
- <resources>
- <resource>
- <directory>${hadoop20.output.dir}</directory>
- </resource>
- <resource>
- <directory>src/main/resources</directory>
- </resource>
- </resources>
- </build>
- </profile>
- <profile>
- <id>hadoop-2.2</id>
- <build>
- <resources>
- <resource>
- <directory>${hadoop20.output.dir}</directory>
- </resource>
- <resource>
- <directory>src/main/resources</directory>
- </resource>
- </resources>
- </build>
- </profile>
- <profile>
- <id>hadoop-2.3</id>
- <build>
- <resources>
- <resource>
- <directory>${hadoop20.output.dir}</directory>
- </resource>
- <resource>
- <directory>src/main/resources</directory>
- </resource>
- </resources>
- </build>
- </profile>
- <profile>
- <id>hadoop-2.4</id>
- <build>
- <resources>
- <resource>
- <directory>${hadoop20.output.dir}</directory>
- </resource>
- <resource>
- <directory>src/main/resources</directory>
- </resource>
- </resources>
- </build>
- </profile>
- <profile>
- <id>hadoop-2.5</id>
- <build>
- <resources>
- <resource>
- <directory>${hadoop20.output.dir}</directory>
- </resource>
- <resource>
- <directory>src/main/resources</directory>
- </resource>
- </resources>
- </build>
- </profile>
- <profile>
- <id>hadoop-2.6</id>
- <build>
- <resources>
- <resource>
- <directory>${hadoop20.output.dir}</directory>
- </resource>
- <resource>
- <directory>src/main/resources</directory>
- </resource>
- </resources>
- </build>
- </profile>
- <profile>
<id>mapr-hadoop-2.4</id>
<dependencies>
<dependency>
diff --git
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
deleted file mode 100644
index 76de0c0..0000000
---
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.twill.internal.appmaster.RunnableProcessLauncher;
-import org.apache.twill.internal.yarn.ports.AMRMClient;
-import org.apache.twill.internal.yarn.ports.AMRMClientImpl;
-import org.apache.twill.internal.yarn.ports.AllocationResponse;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import javax.annotation.Nullable;
-
-/**
- *
- */
-public final class Hadoop20YarnAMClient extends
AbstractYarnAMClient<AMRMClient.ContainerRequest> {
-
- private static final Logger LOG =
LoggerFactory.getLogger(Hadoop20YarnAMClient.class);
- private static final Function<ContainerStatus, YarnContainerStatus>
STATUS_TRANSFORM;
-
- static {
- STATUS_TRANSFORM = new Function<ContainerStatus, YarnContainerStatus>() {
- @Override
- public YarnContainerStatus apply(ContainerStatus status) {
- return new Hadoop20YarnContainerStatus(status);
- }
- };
- }
-
- private final AMRMClient amrmClient;
- private final YarnNMClient nmClient;
- private Resource maxCapability;
- private Resource minCapability;
-
- public Hadoop20YarnAMClient(Configuration conf) {
- super(ApplicationConstants.AM_CONTAINER_ID_ENV);
-
- this.amrmClient = new
AMRMClientImpl(containerId.getApplicationAttemptId());
- this.amrmClient.init(conf);
- this.nmClient = new Hadoop20YarnNMClient(YarnRPC.create(conf), conf);
- }
-
- @Override
- protected ContainerId containerIdLookup(String containerIdStr) {
- return (ConverterUtils.toContainerId(containerIdStr));
- }
-
- @Override
- protected void startUp() throws Exception {
- Preconditions.checkNotNull(trackerAddr, "Tracker address not set.");
- Preconditions.checkNotNull(trackerUrl, "Tracker URL not set.");
-
- amrmClient.start();
-
- String url = String.format("%s:%d",
- trackerUrl.getHost(),
- trackerUrl.getPort() == -1 ?
trackerUrl.getDefaultPort() : trackerUrl.getPort());
- RegisterApplicationMasterResponse response =
amrmClient.registerApplicationMaster(trackerAddr.getHostName(),
-
trackerAddr.getPort(),
-
url);
- maxCapability = response.getMaximumResourceCapability();
- minCapability = response.getMinimumResourceCapability();
- }
-
- @Override
- protected void shutDown() throws Exception {
- amrmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
null, trackerUrl.toString());
- amrmClient.stop();
- }
-
- @Override
- public String getHost() {
- return System.getenv().get(ApplicationConstants.NM_HOST_ENV);
- }
-
- @Override
- public int getNMPort() {
- return
Integer.parseInt(System.getenv().get(ApplicationConstants.NM_PORT_ENV));
- }
-
- @Override
- protected Resource adjustCapability(Resource resource) {
- int cores = YarnUtils.getVirtualCores(resource);
- int updatedCores = Math.max(Math.min(cores,
YarnUtils.getVirtualCores(maxCapability)),
- YarnUtils.getVirtualCores(minCapability));
- // Try and set the virtual cores, which older versions of YARN don't
support this.
- if (cores != updatedCores && YarnUtils.setVirtualCores(resource,
updatedCores)) {
- LOG.info("Adjust virtual cores requirement from {} to {}.", cores,
updatedCores);
- }
-
- int updatedMemory = Math.min(resource.getMemory(),
maxCapability.getMemory());
- int minMemory = minCapability.getMemory();
- updatedMemory = (int) Math.ceil(((double) updatedMemory / minMemory)) *
minMemory;
-
- if (resource.getMemory() != updatedMemory) {
- LOG.info("Adjust memory requirement from {} to {} MB.",
resource.getMemory(), updatedMemory);
- resource.setMemory(updatedMemory);
- }
-
- return resource;
- }
-
- @Override
- protected AMRMClient.ContainerRequest createContainerRequest(Priority
priority, Resource capability,
- @Nullable
String[] hosts, @Nullable String[] racks,
- boolean
relaxLocality) {
- // Ignore relaxLocality param since the corresponding support is not
present in Hadoop 2.0.
- return new AMRMClient.ContainerRequest(capability, hosts, racks, priority,
1);
- }
-
- @Override
- protected void addContainerRequest(AMRMClient.ContainerRequest request) {
- amrmClient.addContainerRequest(request);
- }
-
- @Override
- protected void removeContainerRequest(AMRMClient.ContainerRequest request) {
- amrmClient.removeContainerRequest(request);
- }
-
- @Override
- protected void updateBlacklist(List<String> blacklistAdditions, List<String>
blacklistRemovals) {
- // An empty implementation since Blacklist is not supported in Hadoop-2.0.
- if (recordUnsupportedFeature("blacklist")) {
- LOG.warn("Blacklist is not supported in Hadoop 2.0");
- }
- }
-
- @Override
- protected AllocateResult doAllocate(float progress) throws Exception {
- AllocationResponse response = amrmClient.allocate(progress);
- List<RunnableProcessLauncher> launchers
- =
Lists.newArrayListWithCapacity(response.getAllocatedContainers().size());
-
- for (Container container : response.getAllocatedContainers()) {
- launchers.add(new RunnableProcessLauncher(new
Hadoop20YarnContainerInfo(container), nmClient));
- }
-
- List<YarnContainerStatus> completed = ImmutableList.copyOf(
- Iterables.transform(response.getCompletedContainersStatuses(),
STATUS_TRANSFORM));
-
- return new AllocateResult(launchers, completed);
- }
-
- @Override
- protected void releaseAssignedContainer(YarnContainerInfo containerInfo) {
- Container container = containerInfo.getContainer();
- amrmClient.releaseAssignedContainer(container.getId());
- }
-}
diff --git
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
deleted file mode 100644
index 787f8a4..0000000
---
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.DelegationToken;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.client.YarnClient;
-import org.apache.hadoop.yarn.client.YarnClientImpl;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.twill.api.Configs;
-import org.apache.twill.api.TwillSpecification;
-import org.apache.twill.internal.ProcessController;
-import org.apache.twill.internal.ProcessLauncher;
-import org.apache.twill.internal.appmaster.ApplicationMasterInfo;
-import org.apache.twill.internal.appmaster.ApplicationMasterProcessLauncher;
-import org.apache.twill.internal.appmaster.ApplicationSubmitter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.List;
-import javax.annotation.Nullable;
-
-/**
- * <p>
- * The service implementation of {@link YarnAppClient} for Apache Hadoop 2.0.
- *
- * The {@link VersionDetectYarnAppClientFactory} class will decide to return
instance of this class for
- * Apache Hadoop 2.0.
- * </p>
- */
-@SuppressWarnings("unused")
-public final class Hadoop20YarnAppClient implements YarnAppClient {
-
- private static final Logger LOG =
LoggerFactory.getLogger(Hadoop20YarnAppClient.class);
- private final Configuration configuration;
- private String user;
-
- public Hadoop20YarnAppClient(Configuration configuration) {
- this.configuration = configuration;
- this.user = System.getProperty("user.name");
- }
-
- // Creates and starts a yarn client
- private YarnClient createYarnClient() {
- YarnClient yarnClient = new YarnClientImpl();
- yarnClient.init(configuration);
- yarnClient.start();
- return yarnClient;
- }
-
- @Override
- public ProcessLauncher<ApplicationMasterInfo>
createLauncher(TwillSpecification twillSpec,
- @Nullable
String schedulerQueue) throws Exception {
- YarnClient yarnClient = createYarnClient();
- try {
- // Request for new application
- final GetNewApplicationResponse response =
yarnClient.getNewApplication();
- final ApplicationId appId = response.getApplicationId();
-
- // Setup the context for application submission
- final ApplicationSubmissionContext appSubmissionContext =
Records.newRecord(ApplicationSubmissionContext.class);
- appSubmissionContext.setApplicationId(appId);
- appSubmissionContext.setApplicationName(twillSpec.getName());
- appSubmissionContext.setUser(user);
-
- if (schedulerQueue != null) {
- appSubmissionContext.setQueue(schedulerQueue);
- }
-
-
- int memoryMB = configuration.getInt(Configs.Keys.YARN_AM_MEMORY_MB,
Configs.Defaults.YARN_AM_MEMORY_MB);
- // Set the resource requirement for AM
- Resource amResource = Records.newRecord(Resource.class);
- amResource.setMemory(memoryMB);
- final Resource capability = adjustMemory(response, amResource);
- ApplicationMasterInfo appMasterInfo = new ApplicationMasterInfo(appId,
capability.getMemory(), 1);
-
- ApplicationSubmitter submitter = new ApplicationSubmitter() {
-
- @Override
- public ProcessController<YarnApplicationReport>
submit(YarnLaunchContext launchContext) {
- YarnClient yarnClient = createYarnClient();
- try {
- ContainerLaunchContext context = launchContext.getLaunchContext();
- addRMToken(context, yarnClient);
- context.setUser(appSubmissionContext.getUser());
- context.setResource(adjustMemory(response, capability));
- appSubmissionContext.setAMContainerSpec(context);
-
- yarnClient.submitApplication(appSubmissionContext);
- return new ProcessControllerImpl(yarnClient, appId);
- } catch (YarnRemoteException e) {
- throw new RuntimeException("Failed to submit application " +
appId, e);
- } finally {
- yarnClient.stop();
- }
- }
- };
-
- return new ApplicationMasterProcessLauncher(appMasterInfo, submitter);
- } finally {
- yarnClient.stop();
- }
- }
-
- private Resource adjustMemory(GetNewApplicationResponse response, Resource
capability) {
- int minMemory = response.getMinimumResourceCapability().getMemory();
-
- int updatedMemory = Math.min(capability.getMemory(),
response.getMaximumResourceCapability().getMemory());
- updatedMemory = (int) Math.ceil(((double) updatedMemory / minMemory)) *
minMemory;
-
- if (updatedMemory != capability.getMemory()) {
- capability.setMemory(updatedMemory);
- }
-
- return capability;
- }
-
- private void addRMToken(ContainerLaunchContext context, YarnClient
yarnClient) {
- if (!UserGroupInformation.isSecurityEnabled()) {
- return;
- }
-
- try {
- Credentials credentials =
YarnUtils.decodeCredentials(context.getContainerTokens());
-
- Configuration config = yarnClient.getConfig();
- Token<TokenIdentifier> token = convertToken(
- yarnClient.getRMDelegationToken(new
Text(YarnUtils.getYarnTokenRenewer(config))),
- YarnUtils.getRMAddress(config));
-
- LOG.debug("Added RM delegation token {}", token);
- credentials.addToken(token.getService(), token);
-
- context.setContainerTokens(YarnUtils.encodeCredentials(credentials));
- } catch (IOException e) {
- throw new RuntimeException("Failed to acquire RM delegation token", e);
- }
- }
-
- private <T extends TokenIdentifier> Token<T> convertToken(DelegationToken
protoToken,
- @Nullable
InetSocketAddress serviceAddr) {
- Token<T> token = new Token<>(protoToken.getIdentifier().array(),
- protoToken.getPassword().array(),
- new Text(protoToken.getKind()),
- new Text(protoToken.getService()));
- if (serviceAddr != null) {
- SecurityUtil.setTokenService(token, serviceAddr);
- }
- return token;
- }
-
- @Override
- public ProcessLauncher<ApplicationMasterInfo> createLauncher(String user,
-
TwillSpecification twillSpec,
- @Nullable
String schedulerQueue) throws Exception {
- this.user = user;
- return createLauncher(twillSpec, schedulerQueue);
- }
-
- @Override
- public ProcessController<YarnApplicationReport>
createProcessController(ApplicationId appId) {
- return new ProcessControllerImpl(createYarnClient(), appId);
- }
-
- @Override
- public List<NodeReport> getNodeReports() throws Exception {
- YarnClient yarnClient = createYarnClient();
- try {
- return yarnClient.getNodeReports();
- } finally {
- yarnClient.stop();
- }
- }
-
- private static final class ProcessControllerImpl implements
ProcessController<YarnApplicationReport> {
- private final YarnClient yarnClient;
- private final ApplicationId appId;
-
- ProcessControllerImpl(YarnClient yarnClient, ApplicationId appId) {
- this.yarnClient = yarnClient;
- this.appId = appId;
- }
-
- @Override
- public YarnApplicationReport getReport() {
- try {
- return new
Hadoop20YarnApplicationReport(yarnClient.getApplicationReport(appId));
- } catch (YarnRemoteException e) {
- throw new RuntimeException("Failed to get application report for " +
appId, e);
- }
- }
-
- @Override
- public void cancel() {
- try {
- yarnClient.killApplication(appId);
- } catch (YarnRemoteException e) {
- throw new RuntimeException("Failed to kill application " + appId, e);
- }
- }
-
- @Override
- public void close() throws Exception {
- yarnClient.stop();
- }
- }
-}
diff --git
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java
b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java
deleted file mode 100644
index 8d6e2df..0000000
---
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-
-/**
- *
- */
-public final class Hadoop20YarnApplicationReport implements
YarnApplicationReport {
-
- private final ApplicationReport report;
-
- public Hadoop20YarnApplicationReport(ApplicationReport report) {
- this.report = report;
- }
-
- @Override
- public ApplicationId getApplicationId() {
- return report.getApplicationId();
- }
-
- @Override
- public ApplicationAttemptId getCurrentApplicationAttemptId() {
- return report.getCurrentApplicationAttemptId();
- }
-
- @Override
- public String getQueue() {
- return report.getQueue();
- }
-
- @Override
- public String getName() {
- return report.getName();
- }
-
- @Override
- public String getHost() {
- return report.getHost();
- }
-
- @Override
- public int getRpcPort() {
- return report.getRpcPort();
- }
-
- @Override
- public YarnApplicationState getYarnApplicationState() {
- return report.getYarnApplicationState();
- }
-
- @Override
- public String getDiagnostics() {
- return report.getDiagnostics();
- }
-
- @Override
- public String getTrackingUrl() {
- return "http://" + report.getTrackingUrl();
- }
-
- @Override
- public String getOriginalTrackingUrl() {
- return "http://" + report.getOriginalTrackingUrl();
- }
-
- @Override
- public long getStartTime() {
- return report.getStartTime();
- }
-
- @Override
- public long getFinishTime() {
- return report.getFinishTime();
- }
-
- @Override
- public FinalApplicationStatus getFinalApplicationStatus() {
- return report.getFinalApplicationStatus();
- }
-
- @Override
- public ApplicationResourceUsageReport getApplicationResourceUsageReport() {
- return report.getApplicationResourceUsageReport();
- }
-}
diff --git
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerInfo.java
b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerInfo.java
deleted file mode 100644
index 79b2cb5..0000000
---
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerInfo.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import com.google.common.base.Throwables;
-import org.apache.hadoop.yarn.api.records.Container;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-
-/**
- *
- */
-public final class Hadoop20YarnContainerInfo implements YarnContainerInfo {
-
- private final Container container;
-
- public Hadoop20YarnContainerInfo(Container container) {
- this.container = container;
- }
-
- @Override
- public <T> T getContainer() {
- return (T) container;
- }
-
- @Override
- public String getId() {
- return container.getId().toString();
- }
-
- @Override
- public InetAddress getHost() {
- try {
- return InetAddress.getByName(container.getNodeId().getHost());
- } catch (UnknownHostException e) {
- throw Throwables.propagate(e);
- }
- }
-
- @Override
- public int getPort() {
- return container.getNodeId().getPort();
- }
-
- @Override
- public int getMemoryMB() {
- return container.getResource().getMemory();
- }
-
- @Override
- public int getVirtualCores() {
- return YarnUtils.getVirtualCores(container.getResource());
- }
-}
diff --git
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerStatus.java
b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerStatus.java
deleted file mode 100644
index cc61856..0000000
---
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerStatus.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-
-/**
- *
- */
-public final class Hadoop20YarnContainerStatus implements YarnContainerStatus {
-
- private final ContainerStatus containerStatus;
-
- public Hadoop20YarnContainerStatus(ContainerStatus containerStatus) {
- this.containerStatus = containerStatus;
- }
-
- @Override
- public String getContainerId() {
- return containerStatus.getContainerId().toString();
- }
-
- @Override
- public ContainerState getState() {
- return containerStatus.getState();
- }
-
- @Override
- public int getExitStatus() {
- return containerStatus.getExitStatus();
- }
-
- @Override
- public String getDiagnostics() {
- return containerStatus.getDiagnostics();
- }
-}
diff --git
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLaunchContext.java
b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLaunchContext.java
deleted file mode 100644
index b1f6d66..0000000
---
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLaunchContext.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Maps;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.util.Records;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-
-/**
- *
- */
-public final class Hadoop20YarnLaunchContext implements YarnLaunchContext {
-
- private static final Function<YarnLocalResource, LocalResource>
RESOURCE_TRANSFORM;
-
- static {
- // Creates transform function from YarnLocalResource -> LocalResource
- RESOURCE_TRANSFORM = new Function<YarnLocalResource, LocalResource>() {
- @Override
- public LocalResource apply(YarnLocalResource input) {
- return input.getLocalResource();
- }
- };
- }
-
- private final ContainerLaunchContext launchContext;
-
- public Hadoop20YarnLaunchContext() {
- launchContext = Records.newRecord(ContainerLaunchContext.class);
- }
-
- @Override
- public <T> T getLaunchContext() {
- return (T) launchContext;
- }
-
- @Override
- public void setCredentials(Credentials credentials) {
- launchContext.setContainerTokens(YarnUtils.encodeCredentials(credentials));
- }
-
- @Override
- public void setLocalResources(Map<String, YarnLocalResource> localResources)
{
- launchContext.setLocalResources(Maps.transformValues(localResources,
RESOURCE_TRANSFORM));
- }
-
- @Override
- public void setServiceData(Map<String, ByteBuffer> serviceData) {
- launchContext.setServiceData(serviceData);
- }
-
- @Override
- public Map<String, String> getEnvironment() {
- return launchContext.getEnvironment();
- }
-
- @Override
- public void setEnvironment(Map<String, String> environment) {
- launchContext.setEnvironment(environment);
- }
-
- @Override
- public List<String> getCommands() {
- return launchContext.getCommands();
- }
-
- @Override
- public void setCommands(List<String> commands) {
- launchContext.setCommands(commands);
- }
-
- @Override
- public void setApplicationACLs(Map<ApplicationAccessType, String> acls) {
- launchContext.setApplicationACLs(acls);
- }
-}
diff --git
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLocalResource.java
b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLocalResource.java
deleted file mode 100644
index b327b94..0000000
---
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLocalResource.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.util.Records;
-
-/**
- *
- */
-public final class Hadoop20YarnLocalResource implements YarnLocalResource {
-
- private final LocalResource localResource;
-
- public Hadoop20YarnLocalResource() {
- this.localResource = Records.newRecord(LocalResource.class);
- }
-
- @Override
- public <T> T getLocalResource() {
- return (T) localResource;
- }
-
- @Override
- public URL getResource() {
- return localResource.getResource();
- }
-
- @Override
- public void setResource(URL resource) {
- localResource.setResource(resource);
- }
-
- @Override
- public long getSize() {
- return localResource.getSize();
- }
-
- @Override
- public void setSize(long size) {
- localResource.setSize(size);
- }
-
- @Override
- public long getTimestamp() {
- return localResource.getTimestamp();
- }
-
- @Override
- public void setTimestamp(long timestamp) {
- localResource.setTimestamp(timestamp);
- }
-
- @Override
- public LocalResourceType getType() {
- return localResource.getType();
- }
-
- @Override
- public void setType(LocalResourceType type) {
- localResource.setType(type);
- }
-
- @Override
- public LocalResourceVisibility getVisibility() {
- return localResource.getVisibility();
- }
-
- @Override
- public void setVisibility(LocalResourceVisibility visibility) {
- localResource.setVisibility(visibility);
- }
-
- @Override
- public String getPattern() {
- return localResource.getPattern();
- }
-
- @Override
- public void setPattern(String pattern) {
- localResource.setPattern(pattern);
- }
-}
diff --git
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java
b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java
deleted file mode 100644
index e8628da..0000000
---
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.Uninterruptibles;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.api.ContainerManager;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.twill.common.Cancellable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.TimeUnit;
-
-/**
- *
- */
-public final class Hadoop20YarnNMClient implements YarnNMClient {
-
- private static final Logger LOG =
LoggerFactory.getLogger(Hadoop20YarnNMClient.class);
-
- private final YarnRPC yarnRPC;
- private final Configuration yarnConf;
-
- public Hadoop20YarnNMClient(YarnRPC yarnRPC, Configuration yarnConf) {
- this.yarnRPC = yarnRPC;
- this.yarnConf = yarnConf;
- }
-
- @Override
- public Cancellable start(YarnContainerInfo containerInfo, YarnLaunchContext
launchContext) {
- ContainerLaunchContext context = launchContext.getLaunchContext();
- context.setUser(System.getProperty("user.name"));
-
- Container container = containerInfo.getContainer();
-
- context.setContainerId(container.getId());
- context.setResource(container.getResource());
-
- StartContainerRequest startRequest =
Records.newRecord(StartContainerRequest.class);
- startRequest.setContainerLaunchContext(context);
-
- ContainerManager manager = connectContainerManager(container);
- try {
- manager.startContainer(startRequest);
- return new ContainerTerminator(container, manager);
- } catch (YarnRemoteException e) {
- LOG.error("Error in launching process", e);
- throw Throwables.propagate(e);
- }
-
- }
-
- /**
- * Helper to connect to container manager (node manager).
- */
- private ContainerManager connectContainerManager(Container container) {
- String cmIpPortStr = String.format("%s:%d",
container.getNodeId().getHost(), container.getNodeId().getPort());
- InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
- return ((ContainerManager) yarnRPC.getProxy(ContainerManager.class,
cmAddress, yarnConf));
- }
-
- private static final class ContainerTerminator implements Cancellable {
-
- private final Container container;
- private final ContainerManager manager;
-
- private ContainerTerminator(Container container, ContainerManager manager)
{
- this.container = container;
- this.manager = manager;
- }
-
- @Override
- public void cancel() {
- LOG.info("Request to stop container {}.", container.getId());
- StopContainerRequest stopRequest =
Records.newRecord(StopContainerRequest.class);
- stopRequest.setContainerId(container.getId());
- try {
- manager.stopContainer(stopRequest);
- while (true) {
- GetContainerStatusRequest statusRequest =
Records.newRecord(GetContainerStatusRequest.class);
- statusRequest.setContainerId(container.getId());
- GetContainerStatusResponse statusResponse =
manager.getContainerStatus(statusRequest);
- LOG.trace("Container status: {} {}", statusResponse.getStatus(),
statusResponse.getStatus().getDiagnostics());
-
- if (statusResponse.getStatus().getState() ==
ContainerState.COMPLETE) {
- break;
- }
- Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
- }
- LOG.info("Container {} stopped.", container.getId());
- } catch (YarnRemoteException e) {
- LOG.error("Fail to stop container {}", container.getId(), e);
- throw Throwables.propagate(e);
- }
- }
- }
-}
diff --git
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClient.java
b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClient.java
deleted file mode 100644
index 26b6fa2..0000000
---
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClient.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn.ports;
-
-import
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.service.Service;
-
-/**
- * Ported from Apache Hadoop YARN.
- */
-public interface AMRMClient extends Service {
-
- /**
- * Value used to define no locality.
- */
- static final String ANY = "*";
-
- /**
- * Object to represent container request for resources.
- * Resources may be localized to nodes and racks.
- * Resources may be assigned priorities.
- * Can ask for multiple containers of a given type.
- */
- public static class ContainerRequest {
- Resource capability;
- String[] hosts;
- String[] racks;
- Priority priority;
- int containerCount;
-
- public ContainerRequest(Resource capability, String[] hosts,
- String[] racks, Priority priority, int
containerCount) {
- this.capability = capability;
- this.hosts = (hosts != null ? hosts.clone() : null);
- this.racks = (racks != null ? racks.clone() : null);
- this.priority = priority;
- this.containerCount = containerCount;
- }
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("Capability[").append(capability).append("]");
- sb.append("Priority[").append(priority).append("]");
- sb.append("ContainerCount[").append(containerCount).append("]");
- return sb.toString();
- }
- }
-
- /**
- * Register the application master. This must be called before any
- * other interaction
- * @param appHostName Name of the host on which master is running
- * @param appHostPort Port master is listening on
- * @param appTrackingUrl URL at which the master info can be seen
- * @return <code>RegisterApplicationMasterResponse</code>
- * @throws org.apache.hadoop.yarn.exceptions.YarnRemoteException
- */
- public RegisterApplicationMasterResponse
- registerApplicationMaster(String appHostName,
- int appHostPort,
- String appTrackingUrl)
- throws YarnRemoteException;
-
- /**
- * Request additional containers and receive new container allocations.
- * Requests made via <code>addContainerRequest</code> are sent to the
- * <code>ResourceManager</code>. New containers assigned to the master are
- * retrieved. Status of completed containers and node health updates are
- * also retrieved.
- * This also doubles as a heartbeat to the ResourceManager and must be
- * made periodically.
- * The call may not always return any new allocations of containers.
- * App should not make concurrent allocate requests. May cause request loss.
- * @param progressIndicator Indicates progress made by the master
- * @return the response of the allocate request
- * @throws YarnRemoteException
- */
- public AllocationResponse allocate(float progressIndicator)
- throws YarnRemoteException;
-
- /**
- * Unregister the Application Master. This must be called in the end.
- * @param appStatus Success/Failure status of the master
- * @param appMessage Diagnostics message on failure
- * @param appTrackingUrl New URL to get master info
- * @throws YarnRemoteException
- */
- public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
- String appMessage,
- String appTrackingUrl)
- throws YarnRemoteException;
-
- /**
- * Request containers for resources before calling <code>allocate</code>.
- * @param req Resource request
- */
- public void addContainerRequest(ContainerRequest req);
-
- /**
- * Remove previous container request. The previous container request may have
- * already been sent to the ResourceManager. So even after the remove request
- * the app must be prepared to receive an allocation for the previous request
- * even after the remove request
- * @param req Resource request
- */
- public void removeContainerRequest(ContainerRequest req);
-
- /**
- * Release containers assigned by the Resource Manager. If the app cannot use
- * the container or wants to give up the container then it can release it.
- * The app needs to make new requests for the released resource capability if
- * it still needs it. For example, if it released non-local resources
- * @param containerId
- */
- public void releaseAssignedContainer(ContainerId containerId);
-
- /**
- * Get the currently available resources in the cluster.
- * A valid value is available after a call to allocate has been made
- * @return Currently available resources
- */
- public Resource getClusterAvailableResources();
-
- /**
- * Get the current number of nodes in the cluster.
- * A valid values is available after a call to allocate has been made
- * @return Current number of nodes in the cluster
- */
- public int getClusterNodeCount();
-}
diff --git
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClientImpl.java
b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClientImpl.java
deleted file mode 100644
index c1bd75a..0000000
---
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClientImpl.java
+++ /dev/null
@@ -1,412 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn.ports;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.api.AMRMProtocol;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import
org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.hadoop.yarn.util.BuilderUtils;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-/**
- * Ported from Apache Hadoop YARN.
- */
-public final class AMRMClientImpl extends AbstractService implements
AMRMClient {
-
- private static final Log LOG = LogFactory.getLog(AMRMClientImpl.class);
-
- private final RecordFactory recordFactory =
- RecordFactoryProvider.getRecordFactory(null);
-
- private int lastResponseId = 0;
-
- protected AMRMProtocol rmClient;
- protected final ApplicationAttemptId appAttemptId;
- protected Resource clusterAvailableResources;
- protected int clusterNodeCount;
-
- //Key -> Priority
- //Value -> Map
- //Key->ResourceName (e.g., hostname, rackname, *)
- //Value->Map
- //Key->Resource Capability
- //Value->ResourceRequest
- protected final
- Map<Priority, Map<String, Map<Resource, ResourceRequest>>>
- remoteRequestsTable =
- new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();
-
- protected final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
- new org.apache.hadoop.yarn.util.BuilderUtils.ResourceRequestComparator());
- protected final Set<ContainerId> release = new TreeSet<ContainerId>();
-
- public AMRMClientImpl(ApplicationAttemptId appAttemptId) {
- super(AMRMClientImpl.class.getName());
- this.appAttemptId = appAttemptId;
- }
-
- @Override
- public synchronized void init(Configuration conf) {
- super.init(conf);
- }
-
- @Override
- public synchronized void start() {
- final YarnConfiguration conf = new YarnConfiguration(getConfig());
- final YarnRPC rpc = YarnRPC.create(conf);
- final InetSocketAddress rmAddress = conf.getSocketAddr(
- YarnConfiguration.RM_SCHEDULER_ADDRESS,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
-
- UserGroupInformation currentUser;
- try {
- currentUser = UserGroupInformation.getCurrentUser();
- } catch (IOException e) {
- throw new YarnException(e);
- }
-
- if (UserGroupInformation.isSecurityEnabled()) {
- String tokenURLEncodedStr = System.getenv().get(
- ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
- Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
-
- try {
- token.decodeFromUrlString(tokenURLEncodedStr);
- } catch (IOException e) {
- throw new YarnException(e);
- }
-
- SecurityUtil.setTokenService(token, rmAddress);
- if (LOG.isDebugEnabled()) {
- LOG.debug("AppMasterToken is " + token);
- }
- currentUser.addToken(token);
- }
-
- rmClient = currentUser.doAs(new PrivilegedAction<AMRMProtocol>() {
- @Override
- public AMRMProtocol run() {
- return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress,
- conf);
- }
- });
- LOG.debug("Connecting to ResourceManager at " + rmAddress);
- super.start();
- }
-
- @Override
- public synchronized void stop() {
- if (this.rmClient != null) {
- RPC.stopProxy(this.rmClient);
- }
- super.stop();
- }
-
- @Override
- public RegisterApplicationMasterResponse registerApplicationMaster(
- String appHostName, int appHostPort, String appTrackingUrl)
- throws YarnRemoteException {
- // do this only once ???
- RegisterApplicationMasterRequest request = recordFactory
- .newRecordInstance(RegisterApplicationMasterRequest.class);
- synchronized (this) {
- request.setApplicationAttemptId(appAttemptId);
- }
- request.setHost(appHostName);
- request.setRpcPort(appHostPort);
- if (appTrackingUrl != null) {
- request.setTrackingUrl(appTrackingUrl);
- }
- RegisterApplicationMasterResponse response = rmClient
- .registerApplicationMaster(request);
- return response;
- }
-
- @Override
- public AllocationResponse allocate(float progressIndicator)
- throws YarnRemoteException {
- AllocateResponse allocateResponse = null;
- ArrayList<ResourceRequest> askList = null;
- ArrayList<ContainerId> releaseList = null;
- AllocateRequest allocateRequest = null;
-
- try {
- synchronized (this) {
- askList = new ArrayList<ResourceRequest>(ask);
- releaseList = new ArrayList<ContainerId>(release);
- // optimistically clear this collection assuming no RPC failure
- ask.clear();
- release.clear();
- allocateRequest = BuilderUtils
- .newAllocateRequest(appAttemptId, lastResponseId, progressIndicator,
- askList, releaseList);
- }
-
- allocateResponse = rmClient.allocate(allocateRequest);
- AllocationResponse response =
AllocationResponses.create(allocateResponse);
-
- synchronized (this) {
- // update these on successful RPC
- clusterNodeCount = allocateResponse.getNumClusterNodes();
- lastResponseId = response.getResponseId();
- clusterAvailableResources = response.getAvailableResources();
- }
-
- return response;
- } finally {
- // TODO how to differentiate remote YARN exception vs error in RPC
- if (allocateResponse == null) {
- // We hit an exception in allocate()
- // Preserve ask and release for next call to allocate()
- synchronized (this) {
- release.addAll(releaseList);
- // Requests could have been added or deleted during call to allocate.
- // If requests were added/removed then there is nothing to do since
- // the ResourceRequest object in ask would have the actual new value.
- // If ask does not have this ResourceRequest then it was unchanged
and
- // so we can add the value back safely.
- // This assumes that there will no concurrent calls to allocate() and
- // so we don't have to worry about ask being changed in the
- // synchronized block at the beginning of this method.
- for (ResourceRequest oldAsk : askList) {
- if (!ask.contains(oldAsk)) {
- ask.add(oldAsk);
- }
- }
- }
- }
- }
- }
-
- @Override
- public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
- String appMessage, String
appTrackingUrl) throws YarnRemoteException {
- FinishApplicationMasterRequest request = recordFactory
- .newRecordInstance(FinishApplicationMasterRequest.class);
- request.setAppAttemptId(appAttemptId);
- request.setFinishApplicationStatus(appStatus);
- if (appMessage != null) {
- request.setDiagnostics(appMessage);
- }
- if (appTrackingUrl != null) {
- request.setTrackingUrl(appTrackingUrl);
- }
- rmClient.finishApplicationMaster(request);
- }
-
- @Override
- public synchronized void addContainerRequest(ContainerRequest req) {
- // Create resource requests
- if (req.hosts != null) {
- for (String host : req.hosts) {
- addResourceRequest(req.priority, host, req.capability,
req.containerCount);
- }
- }
-
- if (req.racks != null) {
- for (String rack : req.racks) {
- addResourceRequest(req.priority, rack, req.capability,
req.containerCount);
- }
- }
-
- // Off switch
- addResourceRequest(req.priority, ANY, req.capability, req.containerCount);
- }
-
- @Override
- public synchronized void removeContainerRequest(ContainerRequest req) {
- // Update resource requests
- if (req.hosts != null) {
- for (String hostName : req.hosts) {
- decResourceRequest(req.priority, hostName, req.capability,
req.containerCount);
- }
- }
-
- if (req.racks != null) {
- for (String rack : req.racks) {
- decResourceRequest(req.priority, rack, req.capability,
req.containerCount);
- }
- }
-
- decResourceRequest(req.priority, ANY, req.capability, req.containerCount);
- }
-
- @Override
- public synchronized void releaseAssignedContainer(ContainerId containerId) {
- release.add(containerId);
- }
-
- @Override
- public synchronized Resource getClusterAvailableResources() {
- return clusterAvailableResources;
- }
-
- @Override
- public synchronized int getClusterNodeCount() {
- return clusterNodeCount;
- }
-
- private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
- // This code looks weird but is needed because of the following scenario.
- // A ResourceRequest is removed from the remoteRequestTable. A 0 container
- // request is added to 'ask' to notify the RM about not needing it any
more.
- // Before the call to allocate, the user now requests more containers. If
- // the locations of the 0 size request and the new request are the same
- // (with the difference being only container count), then the set
comparator
- // will consider both to be the same and not add the new request to ask.
So
- // we need to check for the "same" request being present and remove it and
- // then add it back. The comparator is container count agnostic.
- // This should happen only rarely but we do need to guard against it.
- if (ask.contains(remoteRequest)) {
- ask.remove(remoteRequest);
- }
- ask.add(remoteRequest);
- }
-
- private void addResourceRequest(Priority priority, String resourceName,
- Resource capability, int containerCount) {
- Map<String, Map<Resource, ResourceRequest>> remoteRequests =
- this.remoteRequestsTable.get(priority);
- if (remoteRequests == null) {
- remoteRequests = new HashMap<String, Map<Resource, ResourceRequest>>();
- this.remoteRequestsTable.put(priority, remoteRequests);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Added priority=" + priority);
- }
- }
- Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
- if (reqMap == null) {
- reqMap = new HashMap<Resource, ResourceRequest>();
- remoteRequests.put(resourceName, reqMap);
- }
- ResourceRequest remoteRequest = reqMap.get(capability);
- if (remoteRequest == null) {
- remoteRequest = BuilderUtils.
- newResourceRequest(priority, resourceName, capability, 0);
- reqMap.put(capability, remoteRequest);
- }
-
- remoteRequest.setNumContainers(remoteRequest.getNumContainers() +
containerCount);
-
- // Note this down for next interaction with ResourceManager
- addResourceRequestToAsk(remoteRequest);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("addResourceRequest:" + " applicationId="
- + appAttemptId + " priority=" + priority.getPriority()
- + " resourceName=" + resourceName + " numContainers="
- + remoteRequest.getNumContainers() + " #asks=" + ask.size());
- }
- }
-
- private void decResourceRequest(Priority priority, String resourceName,
- Resource capability, int containerCount) {
- Map<String, Map<Resource, ResourceRequest>> remoteRequests =
- this.remoteRequestsTable.get(priority);
-
- if (remoteRequests == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Not decrementing resource as priority " + priority
- + " is not present in request table");
- }
- return;
- }
-
- Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
- if (reqMap == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Not decrementing resource as " + resourceName
- + " is not present in request table");
- }
- return;
- }
- ResourceRequest remoteRequest = reqMap.get(capability);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("BEFORE decResourceRequest:" + " applicationId="
- + appAttemptId + " priority=" + priority.getPriority()
- + " resourceName=" + resourceName + " numContainers="
- + remoteRequest.getNumContainers() + " #asks=" + ask.size());
- }
-
- remoteRequest.
- setNumContainers(remoteRequest.getNumContainers() - containerCount);
- if (remoteRequest.getNumContainers() < 0) {
- // guard against spurious removals
- remoteRequest.setNumContainers(0);
- }
- // Send the ResourceRequest to RM even if is 0 because it needs to override
- // a previously sent value. If ResourceRequest was not sent previously then
- // sending 0 ought to be a no-op on RM.
- addResourceRequestToAsk(remoteRequest);
-
- // Delete entries from map if no longer needed.
- if (remoteRequest.getNumContainers() == 0) {
- reqMap.remove(capability);
- if (reqMap.size() == 0) {
- remoteRequests.remove(resourceName);
- }
- if (remoteRequests.size() == 0) {
- remoteRequestsTable.remove(priority);
- }
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.info("AFTER decResourceRequest:" + " applicationId="
- + appAttemptId + " priority=" + priority.getPriority()
- + " resourceName=" + resourceName + " numContainers="
- + remoteRequest.getNumContainers() + " #asks=" + ask.size());
- }
- }
-
-}
diff --git
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponse.java
b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponse.java
deleted file mode 100644
index d8d4c19..0000000
---
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponse.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn.ports;
-
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.Resource;
-
-import java.util.List;
-
-/**
- * This interface is to abstract the differences in Vanilla Hadoop YARN 2.0
and CDH 4.4.
- */
-public interface AllocationResponse {
-
- int getResponseId();
-
- Resource getAvailableResources();
-
- List<Container> getAllocatedContainers();
-
- List<ContainerStatus> getCompletedContainersStatuses();
-}
diff --git
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponses.java
b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponses.java
deleted file mode 100644
index ba8b5fa..0000000
---
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponses.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn.ports;
-
-import com.google.common.base.Throwables;
-import com.google.common.reflect.TypeToken;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.Resource;
-
-import java.util.List;
-
-/**
- * Factory for building instance of {@link AllocationResponse} based on the
response type.
- */
-public final class AllocationResponses {
-
- /**
- * A hack for CDH 4.4.0, as the AllocateResponse class is being rewritten
and diverted from YARN 2.0.
- */
- private static final boolean IS_CDH_4_4;
-
- static {
- boolean result = false;
- try {
- try {
- // See if it is standard YARN 2.0 AllocateResponse object.
- AllocateResponse.class.getMethod("getAMResponse");
- } catch (NoSuchMethodException e) {
- // See if it is CDH 4.4 AllocateResponse object.
- AllocationResponse.class.getMethod("getAllocatedContainers");
- result = true;
- }
- } catch (Exception e) {
- // Something very wrong in here, as it shouldn't arrive here.
- e.printStackTrace();
- throw Throwables.propagate(e);
- }
-
- IS_CDH_4_4 = result;
- }
-
- public static AllocationResponse create(Object response) {
- if (IS_CDH_4_4) {
- return new ReflectionAllocationResponse(response);
- }
-
- try {
- Object amResponse =
response.getClass().getMethod("getAMResponse").invoke(response);
- return new ReflectionAllocationResponse(amResponse);
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
- }
-
- private static final class ReflectionAllocationResponse implements
AllocationResponse {
-
- private final Object response;
-
- private ReflectionAllocationResponse(Object response) {
- this.response = response;
- }
-
- @Override
- public int getResponseId() {
- return call("getResponseId", TypeToken.of(Integer.class));
- }
-
- @Override
- public Resource getAvailableResources() {
- return call("getAvailableResources", TypeToken.of(Resource.class));
- }
-
- @Override
- public List<Container> getAllocatedContainers() {
- return call("getAllocatedContainers", new TypeToken<List<Container>>() {
});
- }
-
- @Override
- public List<ContainerStatus> getCompletedContainersStatuses() {
- return call("getCompletedContainersStatuses", new
TypeToken<List<ContainerStatus>>() { });
- }
-
- private <T> T call(String methodName, TypeToken<T> resultType) {
- try {
- return (T)
resultType.getRawType().cast(response.getClass().getMethod(methodName).invoke(response));
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
- }
- }
-
- private AllocationResponses() {
- }
-}
diff --git
a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
index c6ab02b..799b5c7 100644
---
a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
+++
b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
@@ -38,11 +38,6 @@ public final class VersionDetectYarnAMClientFactory
implements YarnAMClientFacto
Class<YarnAMClient> clz;
String clzName;
switch (YarnUtils.getHadoopVersion()) {
- case HADOOP_20:
- // Uses hadoop-2.0 class
- clzName = getClass().getPackage().getName() +
".Hadoop20YarnAMClient";
- clz = (Class<YarnAMClient>) Class.forName(clzName);
- break;
case HADOOP_21:
// Uses hadoop-2.1 class
clzName = getClass().getPackage().getName() +
".Hadoop21YarnAMClient";
diff --git
a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
index 83de2a4..ee40e2e 100644
---
a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
+++
b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
@@ -31,9 +31,6 @@ public final class VersionDetectYarnAppClientFactory
implements YarnAppClientFac
try {
String clzName;
switch (YarnUtils.getHadoopVersion()) {
- case HADOOP_20:
- clzName = getClass().getPackage().getName() +
".Hadoop20YarnAppClient";
- break;
case HADOOP_21:
case HADOOP_22:
// 2.1 and 2.2 uses the same YarnAppClient
diff --git
a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
index 3a2f4a5..b6454b9 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
@@ -37,7 +37,6 @@ import
org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
import org.apache.twill.api.LocalFile;
import org.apache.twill.filesystem.FileContextLocationFactory;
import org.apache.twill.filesystem.ForwardingLocationFactory;
@@ -70,7 +69,6 @@ public class YarnUtils {
* Defines different versions of Hadoop.
*/
public enum HadoopVersions {
- HADOOP_20,
HADOOP_21,
HADOOP_22,
HADOOP_23,
@@ -109,7 +107,7 @@ public class YarnUtils {
Preconditions.checkArgument(localFile.getLastModified() >= 0, "Last
modified time should be >= 0.");
Preconditions.checkArgument(localFile.getSize() >= 0, "File size should be
>= 0.");
- YarnLocalResource resource = createAdapter(YarnLocalResource.class);
+ YarnLocalResource resource = new Hadoop21YarnLocalResource();
resource.setVisibility(LocalResourceVisibility.APPLICATION);
resource.setResource(ConverterUtils.getYarnUrlFromURI(localFile.getURI()));
resource.setTimestamp(localFile.getLastModified());
@@ -118,7 +116,7 @@ public class YarnUtils {
}
public static YarnLaunchContext createLaunchContext() {
- return createAdapter(YarnLaunchContext.class);
+ return new Hadoop21YarnLaunchContext();
}
// temporary workaround since older versions of hadoop don't have the
getVirtualCores method.
@@ -150,32 +148,6 @@ public class YarnUtils {
}
/**
- * Creates {@link ApplicationId} from the given cluster timestamp and id.
- */
- public static ApplicationId createApplicationId(long timestamp, int id) {
- try {
- try {
- // For Hadoop-2.1
- Method method = ApplicationId.class.getMethod("newInstance",
long.class, int.class);
- return (ApplicationId) method.invoke(null, timestamp, id);
- } catch (NoSuchMethodException e) {
- // Try with Hadoop-2.0 way
- ApplicationId appId = Records.newRecord(ApplicationId.class);
-
- Method setClusterTimestamp =
ApplicationId.class.getMethod("setClusterTimestamp", long.class);
- Method setId = ApplicationId.class.getMethod("setId", int.class);
-
- setClusterTimestamp.invoke(appId, timestamp);
- setId.invoke(appId, id);
-
- return appId;
- }
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
- }
-
- /**
* Helper method to get delegation tokens for the given LocationFactory.
* @param config The hadoop configuration.
* @param locationFactory The LocationFactory for generating tokens.
@@ -329,52 +301,27 @@ public class YarnUtils {
return hadoopVersion;
}
try {
- Class.forName("org.apache.hadoop.yarn.client.api.NMClient");
+ Class.forName("org.apache.hadoop.yarn.client.cli.LogsCLI");
try {
- Class.forName("org.apache.hadoop.yarn.client.cli.LogsCLI");
+ Class.forName("org.apache.hadoop.yarn.conf.HAUtil");
try {
- Class.forName("org.apache.hadoop.yarn.conf.HAUtil");
- try {
- Class[] args = new Class[1];
- args[0] = String.class;
- // see if we have a
org.apache.hadoop.yarn.api.records.ContainerId.fromString() method
-
Class.forName("org.apache.hadoop.yarn.api.records.ContainerId").getMethod("fromString",
args);
- HADOOP_VERSION.set(HadoopVersions.HADOOP_26);
- } catch (NoSuchMethodException e) {
- HADOOP_VERSION.set(HadoopVersions.HADOOP_23);
- }
- } catch (ClassNotFoundException e) {
- HADOOP_VERSION.set(HadoopVersions.HADOOP_22);
+ Class[] args = new Class[1];
+ args[0] = String.class;
+ // see if we have a
org.apache.hadoop.yarn.api.records.ContainerId.fromString() method
+
Class.forName("org.apache.hadoop.yarn.api.records.ContainerId").getMethod("fromString",
args);
+ HADOOP_VERSION.set(HadoopVersions.HADOOP_26);
+ } catch (NoSuchMethodException e) {
+ HADOOP_VERSION.set(HadoopVersions.HADOOP_23);
}
} catch (ClassNotFoundException e) {
- HADOOP_VERSION.set(HadoopVersions.HADOOP_21);
+ HADOOP_VERSION.set(HadoopVersions.HADOOP_22);
}
} catch (ClassNotFoundException e) {
- HADOOP_VERSION.set(HadoopVersions.HADOOP_20);
+ HADOOP_VERSION.set(HadoopVersions.HADOOP_21);
}
return HADOOP_VERSION.get();
}
- /**
- * Helper method to create adapter class for bridging between Hadoop 2.0 and
2.1.
- */
- private static <T> T createAdapter(Class<T> clz) {
- String className = clz.getPackage().getName();
-
- if (getHadoopVersion().equals(HadoopVersions.HADOOP_20)) {
- className += ".Hadoop20" + clz.getSimpleName();
- } else {
- className += ".Hadoop21" + clz.getSimpleName();
- }
-
- try {
- //noinspection unchecked
- return (T) Class.forName(className).newInstance();
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
- }
-
private static YarnLocalResource setLocalResourceType(YarnLocalResource
localResource, LocalFile localFile) {
if (localFile.isArchive()) {
if (localFile.getPattern() == null) {
diff --git
a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
index 8f844a2..72100a1 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
@@ -37,7 +37,6 @@ import org.apache.twill.internal.appmaster.TrackerService;
import org.apache.twill.internal.state.SystemMessages;
import org.apache.twill.internal.yarn.YarnAppClient;
import org.apache.twill.internal.yarn.YarnApplicationReport;
-import org.apache.twill.internal.yarn.YarnUtils;
import org.apache.twill.zookeeper.NodeData;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.zookeeper.data.Stat;
@@ -83,13 +82,9 @@ final class YarnTwillController extends
AbstractTwillController implements Twill
super(appName, runId, zkClient, amLiveNodeData.getKafkaZKConnect() !=
null, Collections.<LogHandler>emptyList());
this.appName = appName;
this.amLiveNodeData = amLiveNodeData;
- this.startUp = new Callable<ProcessController<YarnApplicationReport>>() {
- @Override
- public ProcessController<YarnApplicationReport> call() throws Exception {
- return yarnAppClient.createProcessController(
- YarnUtils.createApplicationId(amLiveNodeData.getAppIdClusterTime(),
amLiveNodeData.getAppId()));
- }
- };
+ this.startUp = () -> yarnAppClient.createProcessController(
+ ApplicationId.newInstance(amLiveNodeData.getAppIdClusterTime(),
+ amLiveNodeData.getAppId()));
this.startTimeout = Constants.APPLICATION_MAX_START_SECONDS;
this.startTimeoutUnit = TimeUnit.SECONDS;
}
diff --git
a/twill-yarn/src/test/java/org/apache/twill/filesystem/FileContextLocationTest.java
b/twill-yarn/src/test/java/org/apache/twill/filesystem/FileContextLocationTest.java
index 5629295..3372caf 100644
---
a/twill-yarn/src/test/java/org/apache/twill/filesystem/FileContextLocationTest.java
+++
b/twill-yarn/src/test/java/org/apache/twill/filesystem/FileContextLocationTest.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.twill.internal.yarn.YarnUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -58,16 +57,6 @@ public class FileContextLocationTest extends
LocationTestBase {
return new
FileContextLocationFactory(dfsCluster.getFileSystem().getConf(), pathBase);
}
- @Override
- protected String correctFilePermissions(String original) {
- if
(YarnUtils.HadoopVersions.HADOOP_20.equals(YarnUtils.getHadoopVersion())) {
- return original.substring(0, 2) + '-' + // strip the x for owner
- original.substring(3, 5) + '-' + // strip the x for group
- original.substring(6, 8) + '-'; // strip the x for world;
- }
- return original;
- }
-
@Test
public void testGetFileContext() throws Exception {
final FileContextLocationFactory locationFactory =
(FileContextLocationFactory) createLocationFactory("/testGetFC");
diff --git
a/twill-yarn/src/test/java/org/apache/twill/filesystem/LocationTestBase.java
b/twill-yarn/src/test/java/org/apache/twill/filesystem/LocationTestBase.java
index d42cf37..379e37b 100644
--- a/twill-yarn/src/test/java/org/apache/twill/filesystem/LocationTestBase.java
+++ b/twill-yarn/src/test/java/org/apache/twill/filesystem/LocationTestBase.java
@@ -329,7 +329,7 @@ public abstract class LocationTestBase {
Assert.assertFalse(textfile.isDirectory());
Assert.assertEquals(permissions, child.getPermissions());
Assert.assertEquals(permissions, grandchild.getPermissions());
- Assert.assertEquals(correctFilePermissions(permissions),
textfile.getPermissions());
+ Assert.assertEquals(permissions, textfile.getPermissions());
// mkdirs of existing file
Location file = factory.create("existingfile");
@@ -380,14 +380,6 @@ public abstract class LocationTestBase {
*/
protected abstract LocationFactory createLocationFactory(String pathBase)
throws Exception;
- /**
- * Some older versions of Hadoop always strip the execute permission from
files (but keep it for directories).
- * This allows subclasses to correct the expected file permissions, based on
the Hadoop version (if any).
- */
- protected String correctFilePermissions(String expectedFilePermissions) {
- return expectedFilePermissions; // unchanged by default
- }
-
protected UserGroupInformation createTestUGI() throws IOException {
String userName = System.getProperty("user.name").equals("tester") ?
"twiller" : "tester";
return UserGroupInformation.createUserForTesting(userName, new String[] {
"testgroup" });
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java
b/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java
index 04902d5..407d519 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java
@@ -42,7 +42,6 @@ import org.apache.twill.filesystem.FileContextLocationFactory;
import org.apache.twill.filesystem.LocationFactory;
import org.apache.twill.internal.yarn.VersionDetectYarnAppClientFactory;
import org.apache.twill.internal.yarn.YarnAppClient;
-import org.apache.twill.internal.yarn.YarnUtils;
import org.apache.twill.internal.zookeeper.InMemoryZKServer;
import org.junit.rules.ExternalResource;
import org.junit.rules.TemporaryFolder;
@@ -128,16 +127,11 @@ public class TwillTester extends ExternalResource {
Configuration conf = new
YarnConfiguration(dfsCluster.getFileSystem().getConf());
- if
(YarnUtils.getHadoopVersion().equals(YarnUtils.HadoopVersions.HADOOP_20)) {
- conf.set("yarn.resourcemanager.scheduler.class",
-
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler");
- } else {
- conf.set("yarn.resourcemanager.scheduler.class",
-
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler");
- conf.set("yarn.scheduler.capacity.resource-calculator",
-
"org.apache.hadoop.yarn.util.resource.DominantResourceCalculator");
- conf.setBoolean("yarn.scheduler.include-port-in-node-name", true);
- }
+ conf.set("yarn.resourcemanager.scheduler.class",
+
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler");
+ conf.set("yarn.scheduler.capacity.resource-calculator",
+
"org.apache.hadoop.yarn.util.resource.DominantResourceCalculator");
+ conf.setBoolean("yarn.scheduler.include-port-in-node-name", true);
conf.set("yarn.nodemanager.vmem-pmem-ratio", "100.1");
conf.set("yarn.nodemanager.vmem-check-enabled", "false");
conf.set("yarn.scheduler.minimum-allocation-mb", "128");
@@ -235,8 +229,8 @@ public class TwillTester extends ExternalResource {
public ApplicationResourceUsageReport getApplicationResourceReport(String
appId) throws Exception {
List<String> splits = Lists.newArrayList(Splitter.on('_').split(appId));
Preconditions.checkArgument(splits.size() == 3, "Invalid application id -
" + appId);
- ApplicationId applicationId =
- YarnUtils.createApplicationId(Long.parseLong(splits.get(1)),
Integer.parseInt(splits.get(2)));
+ ApplicationId applicationId =
ApplicationId.newInstance(Long.parseLong(splits.get(1)),
+
Integer.parseInt(splits.get(2)));
ClientRMService clientRMService =
cluster.getResourceManager().getClientRMService();
GetApplicationReportRequest request =
Records.newRecord(GetApplicationReportRequest.class);