This is an automated email from the ASF dual-hosted git repository.

chtyim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/twill.git


The following commit(s) were added to refs/heads/master by this push:
     new 77b993c  (TWILL-270) Remove support for old Hadoop-2.0 alpha
77b993c is described below

commit 77b993c050883324f9e3c62fbe6ea131a86d1e04
Author: Terence Yim <[email protected]>
AuthorDate: Mon Jan 6 13:57:10 2020 -0800

    (TWILL-270) Remove support for old Hadoop-2.0 alpha
    
    This closes #82 from GitHub.
    
    Signed-off-by: Terence Yim <[email protected]>
---
 .travis.yml                                        |   2 -
 pom.xml                                            | 164 +-------
 twill-yarn/pom.xml                                 |  84 -----
 .../twill/internal/yarn/Hadoop20YarnAMClient.java  | 183 ---------
 .../twill/internal/yarn/Hadoop20YarnAppClient.java | 238 ------------
 .../yarn/Hadoop20YarnApplicationReport.java        | 107 ------
 .../internal/yarn/Hadoop20YarnContainerInfo.java   |  70 ----
 .../internal/yarn/Hadoop20YarnContainerStatus.java |  53 ---
 .../internal/yarn/Hadoop20YarnLaunchContext.java   |  99 -----
 .../internal/yarn/Hadoop20YarnLocalResource.java   | 101 -----
 .../twill/internal/yarn/Hadoop20YarnNMClient.java  | 125 -------
 .../twill/internal/yarn/ports/AMRMClient.java      | 149 --------
 .../twill/internal/yarn/ports/AMRMClientImpl.java  | 412 ---------------------
 .../internal/yarn/ports/AllocationResponse.java    |  38 --
 .../internal/yarn/ports/AllocationResponses.java   | 111 ------
 .../yarn/VersionDetectYarnAMClientFactory.java     |   5 -
 .../yarn/VersionDetectYarnAppClientFactory.java    |   3 -
 .../org/apache/twill/internal/yarn/YarnUtils.java  |  79 +---
 .../org/apache/twill/yarn/YarnTwillController.java |  11 +-
 .../twill/filesystem/FileContextLocationTest.java  |  11 -
 .../apache/twill/filesystem/LocationTestBase.java  |  10 +-
 .../java/org/apache/twill/yarn/TwillTester.java    |  20 +-
 22 files changed, 30 insertions(+), 2045 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 2c0770a..139fff7 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -32,13 +32,11 @@ script: mvn --batch-mode test -P $PROFILE 
-Dsurefire.redirectTestOutputToFile=fa
 install: mvn --batch-mode install -P $PROFILE -DskipTests=true
 
 env:
-  - PROFILE='hadoop-2.0'
   - PROFILE='hadoop-2.1'
   - PROFILE='hadoop-2.2'
   - PROFILE='hadoop-2.4'
   - PROFILE='hadoop-2.5'
   - PROFILE='hadoop-2.6'
-  - PROFILE='cdh-4.4.0'
   - PROFILE='mapr-hadoop-2.4'
   - PROFILE='hadoop-2.6,java8-test'
 
diff --git a/pom.xml b/pom.xml
index 8c29192..5c8ad78 100644
--- a/pom.xml
+++ b/pom.xml
@@ -179,7 +179,6 @@
         <junit.version>4.11</junit.version>
         <jopt-simple.version>3.2</jopt-simple.version>
         <commons-compress.version>1.5</commons-compress.version>
-        <hadoop20.output.dir>target/hadoop20-classes</hadoop20.output.dir>
         <force.mac.tests>false</force.mac.tests>
     </properties>
 
@@ -225,11 +224,13 @@
                     <configuration>
                         
<excludePackageNames>*.internal.*:echo:*.example.*</excludePackageNames>
                         <links>
-                            
<link>http://docs.oracle.com/javase/7/docs/api/</link>
+                            
<link>http://docs.oracle.com/javase/8/docs/api/</link>
                         </links>
                         <bottom>
-                            <![CDATA[Copyright &#169; 2013-2016 <a 
href="http://www.apache.org">The Apache Software Foundation</a>. All rights 
reserved.]]>
+                            <![CDATA[Copyright &#169; 2013-2020 <a 
href="http://www.apache.org">The Apache Software Foundation</a>. All rights 
reserved.]]>
                         </bottom>
+                        <doclint>none</doclint>
+                        <failOnError>false</failOnError>
                     </configuration>
                     <executions>
                         <execution>
@@ -375,12 +376,13 @@
                     <plugin>
                         <groupId>org.apache.maven.plugins</groupId>
                         <artifactId>maven-assembly-plugin</artifactId>
-                        <version>2.4</version>
+                        <version>3.2.0</version>
                         <configuration>
                             
<finalName>apache-twill-${project.version}</finalName>
                             <formats>
                                 <format>tar.gz</format>
                             </formats>
+                            <tarLongFileMode>posix</tarLongFileMode>
                         </configuration>
                     </plugin>
                     <plugin>
@@ -395,76 +397,6 @@
             </build>
         </profile>
         <profile>
-            <id>hadoop-2.0</id>
-            <properties>
-                <hadoop.version>2.0.2-alpha</hadoop.version>
-            </properties>
-            <build>
-                <plugins>
-                    <plugin>
-                        <groupId>org.codehaus.mojo</groupId>
-                        <artifactId>build-helper-maven-plugin</artifactId>
-                        <version>1.8</version>
-                        <executions>
-                            <execution>
-                                <id>add-source</id>
-                                <phase>generate-sources</phase>
-                                <goals>
-                                    <goal>add-source</goal>
-                                </goals>
-                                <configuration>
-                                    <sources>
-                                        <source>src/main/hadoop20</source>
-                                    </sources>
-                                </configuration>
-                            </execution>
-                        </executions>
-                    </plugin>
-                </plugins>
-            </build>
-        </profile>
-        <profile>
-            <id>cdh-4.4.0</id>
-            <properties>
-                <hadoop.version>2.0.0-cdh4.4.0</hadoop.version>
-            </properties>
-            <repositories>
-                <repository>
-                    <id>cloudera-releases</id>
-                    
<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
-                    <releases>
-                        <enabled>true</enabled>
-                    </releases>
-                    <snapshots>
-                        <enabled>false</enabled>
-                    </snapshots>
-                </repository>
-            </repositories>
-            <build>
-                <plugins>
-                    <plugin>
-                        <groupId>org.codehaus.mojo</groupId>
-                        <artifactId>build-helper-maven-plugin</artifactId>
-                        <version>1.8</version>
-                        <executions>
-                            <execution>
-                                <id>add-source</id>
-                                <phase>generate-sources</phase>
-                                <goals>
-                                    <goal>add-source</goal>
-                                </goals>
-                                <configuration>
-                                    <sources>
-                                        <source>src/main/hadoop20</source>
-                                    </sources>
-                                </configuration>
-                            </execution>
-                        </executions>
-                    </plugin>
-                </plugins>
-            </build>
-        </profile>
-        <profile>
             <id>hadoop-2.1</id>
             <properties>
                 <hadoop.version>2.1.0-beta</hadoop.version>
@@ -488,18 +420,6 @@
                                     </sources>
                                 </configuration>
                             </execution>
-                            <execution>
-                                <id>add-source-2.0</id>
-                                <phase>prepare-package</phase>
-                                <goals>
-                                    <goal>add-source</goal>
-                                </goals>
-                                <configuration>
-                                    <sources>
-                                        <source>src/main/hadoop20</source>
-                                    </sources>
-                                </configuration>
-                            </execution>
                         </executions>
                     </plugin>
                 </plugins>
@@ -530,18 +450,6 @@
                                     </sources>
                                 </configuration>
                             </execution>
-                            <execution>
-                                <id>add-source-2.0</id>
-                                <phase>prepare-package</phase>
-                                <goals>
-                                    <goal>add-source</goal>
-                                </goals>
-                                <configuration>
-                                    <sources>
-                                        <source>src/main/hadoop20</source>
-                                    </sources>
-                                </configuration>
-                            </execution>
                         </executions>
                     </plugin>
                 </plugins>
@@ -573,18 +481,6 @@
                                     </sources>
                                 </configuration>
                             </execution>
-                            <execution>
-                                <id>add-source-2.0</id>
-                                <phase>prepare-package</phase>
-                                <goals>
-                                    <goal>add-source</goal>
-                                </goals>
-                                <configuration>
-                                    <sources>
-                                        <source>src/main/hadoop20</source>
-                                    </sources>
-                                </configuration>
-                            </execution>
                         </executions>
                     </plugin>
                 </plugins>
@@ -616,18 +512,6 @@
                                     </sources>
                                 </configuration>
                             </execution>
-                            <execution>
-                                <id>add-source-2.0</id>
-                                <phase>prepare-package</phase>
-                                <goals>
-                                    <goal>add-source</goal>
-                                </goals>
-                                <configuration>
-                                    <sources>
-                                        <source>src/main/hadoop20</source>
-                                    </sources>
-                                </configuration>
-                            </execution>
                         </executions>
                     </plugin>
                 </plugins>
@@ -673,18 +557,6 @@
                                     </sources>
                                 </configuration>
                             </execution>
-                            <execution>
-                                <id>add-source-2.0</id>
-                                <phase>prepare-package</phase>
-                                <goals>
-                                    <goal>add-source</goal>
-                                </goals>
-                                <configuration>
-                                    <sources>
-                                        <source>src/main/hadoop20</source>
-                                    </sources>
-                                </configuration>
-                            </execution>
                         </executions>
                     </plugin>
                 </plugins>
@@ -716,18 +588,6 @@
                                     </sources>
                                 </configuration>
                             </execution>
-                            <execution>
-                                <id>add-source-2.0</id>
-                                <phase>prepare-package</phase>
-                                <goals>
-                                    <goal>add-source</goal>
-                                </goals>
-                                <configuration>
-                                    <sources>
-                                        <source>src/main/hadoop20</source>
-                                    </sources>
-                                </configuration>
-                            </execution>
                         </executions>
                     </plugin>
                 </plugins>
@@ -763,18 +623,6 @@
                                     </sources>
                                 </configuration>
                             </execution>
-                            <execution>
-                                <id>add-source-2.0</id>
-                                <phase>prepare-package</phase>
-                                <goals>
-                                    <goal>add-source</goal>
-                                </goals>
-                                <configuration>
-                                    <sources>
-                                        <source>src/main/hadoop20</source>
-                                    </sources>
-                                </configuration>
-                            </execution>
                         </executions>
                     </plugin>
                 </plugins>
diff --git a/twill-yarn/pom.xml b/twill-yarn/pom.xml
index bbe189f..c3816ab 100644
--- a/twill-yarn/pom.xml
+++ b/twill-yarn/pom.xml
@@ -107,90 +107,6 @@
 
     <profiles>
         <profile>
-            <id>hadoop-2.0</id>
-            <properties>
-                <output.dir>${hadoop20.output.dir}</output.dir>
-            </properties>
-        </profile>
-        <profile>
-            <id>hadoop-2.1</id>
-            <build>
-                <resources>
-                    <resource>
-                        <directory>${hadoop20.output.dir}</directory>
-                    </resource>
-                    <resource>
-                        <directory>src/main/resources</directory>
-                    </resource>
-                </resources>
-            </build>
-        </profile>
-        <profile>
-            <id>hadoop-2.2</id>
-            <build>
-                <resources>
-                    <resource>
-                        <directory>${hadoop20.output.dir}</directory>
-                    </resource>
-                    <resource>
-                        <directory>src/main/resources</directory>
-                    </resource>
-                </resources>
-            </build>
-        </profile>
-        <profile>
-            <id>hadoop-2.3</id>
-            <build>
-                <resources>
-                    <resource>
-                        <directory>${hadoop20.output.dir}</directory>
-                    </resource>
-                    <resource>
-                        <directory>src/main/resources</directory>
-                    </resource>
-                </resources>
-            </build>
-        </profile>
-        <profile>
-            <id>hadoop-2.4</id>
-            <build>
-                <resources>
-                    <resource>
-                        <directory>${hadoop20.output.dir}</directory>
-                    </resource>
-                    <resource>
-                        <directory>src/main/resources</directory>
-                    </resource>
-                </resources>
-            </build>
-        </profile>
-        <profile>
-            <id>hadoop-2.5</id>
-            <build>
-                <resources>
-                    <resource>
-                        <directory>${hadoop20.output.dir}</directory>
-                    </resource>
-                    <resource>
-                        <directory>src/main/resources</directory>
-                    </resource>
-                </resources>
-            </build>
-        </profile>
-        <profile>
-            <id>hadoop-2.6</id>
-            <build>
-                <resources>
-                    <resource>
-                        <directory>${hadoop20.output.dir}</directory>
-                    </resource>
-                    <resource>
-                        <directory>src/main/resources</directory>
-                    </resource>
-                </resources>
-            </build>
-        </profile>
-        <profile>
             <id>mapr-hadoop-2.4</id>
             <dependencies>
                 <dependency>
diff --git 
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
 
b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
deleted file mode 100644
index 76de0c0..0000000
--- 
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.twill.internal.appmaster.RunnableProcessLauncher;
-import org.apache.twill.internal.yarn.ports.AMRMClient;
-import org.apache.twill.internal.yarn.ports.AMRMClientImpl;
-import org.apache.twill.internal.yarn.ports.AllocationResponse;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import javax.annotation.Nullable;
-
-/**
- *
- */
-public final class Hadoop20YarnAMClient extends 
AbstractYarnAMClient<AMRMClient.ContainerRequest> {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(Hadoop20YarnAMClient.class);
-  private static final Function<ContainerStatus, YarnContainerStatus> 
STATUS_TRANSFORM;
-
-  static {
-    STATUS_TRANSFORM = new Function<ContainerStatus, YarnContainerStatus>() {
-      @Override
-      public YarnContainerStatus apply(ContainerStatus status) {
-        return new Hadoop20YarnContainerStatus(status);
-      }
-    };
-  }
-
-  private final AMRMClient amrmClient;
-  private final YarnNMClient nmClient;
-  private Resource maxCapability;
-  private Resource minCapability;
-
-  public Hadoop20YarnAMClient(Configuration conf) {
-    super(ApplicationConstants.AM_CONTAINER_ID_ENV);
-
-    this.amrmClient = new 
AMRMClientImpl(containerId.getApplicationAttemptId());
-    this.amrmClient.init(conf);
-    this.nmClient = new Hadoop20YarnNMClient(YarnRPC.create(conf), conf);
-  }
-
-  @Override
-  protected ContainerId containerIdLookup(String containerIdStr) {
-    return (ConverterUtils.toContainerId(containerIdStr));
-  }
-
-  @Override
-  protected void startUp() throws Exception {
-    Preconditions.checkNotNull(trackerAddr, "Tracker address not set.");
-    Preconditions.checkNotNull(trackerUrl, "Tracker URL not set.");
-
-    amrmClient.start();
-
-    String url = String.format("%s:%d",
-                               trackerUrl.getHost(),
-                               trackerUrl.getPort() == -1 ? 
trackerUrl.getDefaultPort() : trackerUrl.getPort());
-    RegisterApplicationMasterResponse response = 
amrmClient.registerApplicationMaster(trackerAddr.getHostName(),
-                                                                               
       trackerAddr.getPort(),
-                                                                               
       url);
-    maxCapability = response.getMaximumResourceCapability();
-    minCapability = response.getMinimumResourceCapability();
-  }
-
-  @Override
-  protected void shutDown() throws Exception {
-    amrmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, 
null, trackerUrl.toString());
-    amrmClient.stop();
-  }
-
-  @Override
-  public String getHost() {
-    return System.getenv().get(ApplicationConstants.NM_HOST_ENV);
-  }
-
-  @Override
-  public int getNMPort() {
-    return 
Integer.parseInt(System.getenv().get(ApplicationConstants.NM_PORT_ENV));
-  }
-
-  @Override
-  protected Resource adjustCapability(Resource resource) {
-    int cores = YarnUtils.getVirtualCores(resource);
-    int updatedCores = Math.max(Math.min(cores, 
YarnUtils.getVirtualCores(maxCapability)),
-                                YarnUtils.getVirtualCores(minCapability));
-    // Try and set the virtual cores, which older versions of YARN don't 
support this.
-    if (cores != updatedCores && YarnUtils.setVirtualCores(resource, 
updatedCores)) {
-      LOG.info("Adjust virtual cores requirement from {} to {}.", cores, 
updatedCores);
-    }
-
-    int updatedMemory = Math.min(resource.getMemory(), 
maxCapability.getMemory());
-    int minMemory = minCapability.getMemory();
-    updatedMemory = (int) Math.ceil(((double) updatedMemory / minMemory)) * 
minMemory;
-
-    if (resource.getMemory() != updatedMemory) {
-      LOG.info("Adjust memory requirement from {} to {} MB.", 
resource.getMemory(), updatedMemory);
-      resource.setMemory(updatedMemory);
-    }
-
-    return resource;
-  }
-
-  @Override
-  protected AMRMClient.ContainerRequest createContainerRequest(Priority 
priority, Resource capability,
-                                                               @Nullable 
String[] hosts, @Nullable String[] racks,
-                                                               boolean 
relaxLocality) {
-    // Ignore relaxLocality param since the corresponding support is not 
present in Hadoop 2.0.
-    return new AMRMClient.ContainerRequest(capability, hosts, racks, priority, 
1);
-  }
-
-  @Override
-  protected void addContainerRequest(AMRMClient.ContainerRequest request) {
-    amrmClient.addContainerRequest(request);
-  }
-
-  @Override
-  protected void removeContainerRequest(AMRMClient.ContainerRequest request) {
-    amrmClient.removeContainerRequest(request);
-  }
-
-  @Override
-  protected void updateBlacklist(List<String> blacklistAdditions, List<String> 
blacklistRemovals) {
-    // An empty implementation since Blacklist is not supported in Hadoop-2.0.
-    if (recordUnsupportedFeature("blacklist")) {
-      LOG.warn("Blacklist is not supported in Hadoop 2.0");
-    }
-  }
-
-  @Override
-  protected AllocateResult doAllocate(float progress) throws Exception {
-    AllocationResponse response = amrmClient.allocate(progress);
-    List<RunnableProcessLauncher> launchers
-      = 
Lists.newArrayListWithCapacity(response.getAllocatedContainers().size());
-
-    for (Container container : response.getAllocatedContainers()) {
-      launchers.add(new RunnableProcessLauncher(new 
Hadoop20YarnContainerInfo(container), nmClient));
-    }
-
-    List<YarnContainerStatus> completed = ImmutableList.copyOf(
-      Iterables.transform(response.getCompletedContainersStatuses(), 
STATUS_TRANSFORM));
-
-    return new AllocateResult(launchers, completed);
-  }
-
-  @Override
-  protected void releaseAssignedContainer(YarnContainerInfo containerInfo) {
-    Container container = containerInfo.getContainer();
-    amrmClient.releaseAssignedContainer(container.getId());
-  }
-}
diff --git 
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
 
b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
deleted file mode 100644
index 787f8a4..0000000
--- 
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.DelegationToken;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.client.YarnClient;
-import org.apache.hadoop.yarn.client.YarnClientImpl;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.twill.api.Configs;
-import org.apache.twill.api.TwillSpecification;
-import org.apache.twill.internal.ProcessController;
-import org.apache.twill.internal.ProcessLauncher;
-import org.apache.twill.internal.appmaster.ApplicationMasterInfo;
-import org.apache.twill.internal.appmaster.ApplicationMasterProcessLauncher;
-import org.apache.twill.internal.appmaster.ApplicationSubmitter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.List;
-import javax.annotation.Nullable;
-
-/**
- * <p>
- * The service implementation of {@link YarnAppClient} for Apache Hadoop 2.0.
- *
- * The {@link VersionDetectYarnAppClientFactory} class will decide to return 
instance of this class for
- * Apache Hadoop 2.0.
- * </p>
- */
-@SuppressWarnings("unused")
-public final class Hadoop20YarnAppClient implements YarnAppClient {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(Hadoop20YarnAppClient.class);
-  private final Configuration configuration;
-  private String user;
-
-  public Hadoop20YarnAppClient(Configuration configuration) {
-    this.configuration = configuration;
-    this.user = System.getProperty("user.name");
-  }
-
-  // Creates and starts a yarn client
-  private YarnClient createYarnClient() {
-    YarnClient yarnClient = new YarnClientImpl();
-    yarnClient.init(configuration);
-    yarnClient.start();
-    return yarnClient;
-  }
-
-  @Override
-  public ProcessLauncher<ApplicationMasterInfo> 
createLauncher(TwillSpecification twillSpec,
-                                                               @Nullable 
String schedulerQueue) throws Exception {
-    YarnClient yarnClient = createYarnClient();
-    try {
-      // Request for new application
-      final GetNewApplicationResponse response = 
yarnClient.getNewApplication();
-      final ApplicationId appId = response.getApplicationId();
-
-      // Setup the context for application submission
-      final ApplicationSubmissionContext appSubmissionContext = 
Records.newRecord(ApplicationSubmissionContext.class);
-      appSubmissionContext.setApplicationId(appId);
-      appSubmissionContext.setApplicationName(twillSpec.getName());
-      appSubmissionContext.setUser(user);
-
-      if (schedulerQueue != null) {
-        appSubmissionContext.setQueue(schedulerQueue);
-      }
-
-
-      int memoryMB = configuration.getInt(Configs.Keys.YARN_AM_MEMORY_MB, 
Configs.Defaults.YARN_AM_MEMORY_MB);
-      // Set the resource requirement for AM
-      Resource amResource = Records.newRecord(Resource.class);
-      amResource.setMemory(memoryMB);
-      final Resource capability = adjustMemory(response, amResource);
-      ApplicationMasterInfo appMasterInfo = new ApplicationMasterInfo(appId, 
capability.getMemory(), 1);
-
-      ApplicationSubmitter submitter = new ApplicationSubmitter() {
-
-        @Override
-        public ProcessController<YarnApplicationReport> 
submit(YarnLaunchContext launchContext) {
-          YarnClient yarnClient = createYarnClient();
-          try {
-            ContainerLaunchContext context = launchContext.getLaunchContext();
-            addRMToken(context, yarnClient);
-            context.setUser(appSubmissionContext.getUser());
-            context.setResource(adjustMemory(response, capability));
-            appSubmissionContext.setAMContainerSpec(context);
-
-            yarnClient.submitApplication(appSubmissionContext);
-            return new ProcessControllerImpl(yarnClient, appId);
-          } catch (YarnRemoteException e) {
-            throw new RuntimeException("Failed to submit application " + 
appId, e);
-          } finally {
-            yarnClient.stop();
-          }
-        }
-      };
-
-      return new ApplicationMasterProcessLauncher(appMasterInfo, submitter);
-    } finally {
-      yarnClient.stop();
-    }
-  }
-
-  private Resource adjustMemory(GetNewApplicationResponse response, Resource 
capability) {
-    int minMemory = response.getMinimumResourceCapability().getMemory();
-
-    int updatedMemory = Math.min(capability.getMemory(), 
response.getMaximumResourceCapability().getMemory());
-    updatedMemory = (int) Math.ceil(((double) updatedMemory / minMemory)) * 
minMemory;
-
-    if (updatedMemory != capability.getMemory()) {
-      capability.setMemory(updatedMemory);
-    }
-
-    return capability;
-  }
-
-  private void addRMToken(ContainerLaunchContext context, YarnClient 
yarnClient) {
-    if (!UserGroupInformation.isSecurityEnabled()) {
-      return;
-    }
-
-    try {
-      Credentials credentials = 
YarnUtils.decodeCredentials(context.getContainerTokens());
-
-      Configuration config = yarnClient.getConfig();
-      Token<TokenIdentifier> token = convertToken(
-        yarnClient.getRMDelegationToken(new 
Text(YarnUtils.getYarnTokenRenewer(config))),
-        YarnUtils.getRMAddress(config));
-
-      LOG.debug("Added RM delegation token {}", token);
-      credentials.addToken(token.getService(), token);
-
-      context.setContainerTokens(YarnUtils.encodeCredentials(credentials));
-    } catch (IOException e) {
-      throw new RuntimeException("Failed to acquire RM delegation token", e);
-    }
-  }
-
-  private <T extends TokenIdentifier> Token<T> convertToken(DelegationToken 
protoToken,
-                                                            @Nullable 
InetSocketAddress serviceAddr) {
-    Token<T> token = new Token<>(protoToken.getIdentifier().array(),
-                                 protoToken.getPassword().array(),
-                                 new Text(protoToken.getKind()),
-                                 new Text(protoToken.getService()));
-    if (serviceAddr != null) {
-      SecurityUtil.setTokenService(token, serviceAddr);
-    }
-    return token;
-  }
-
-  @Override
-  public ProcessLauncher<ApplicationMasterInfo> createLauncher(String user,
-                                                               
TwillSpecification twillSpec,
-                                                               @Nullable 
String schedulerQueue) throws Exception {
-    this.user = user;
-    return createLauncher(twillSpec, schedulerQueue);
-  }
-
-  @Override
-  public ProcessController<YarnApplicationReport> 
createProcessController(ApplicationId appId) {
-    return new ProcessControllerImpl(createYarnClient(), appId);
-  }
-
-  @Override
-  public List<NodeReport> getNodeReports() throws Exception {
-    YarnClient yarnClient = createYarnClient();
-    try {
-      return yarnClient.getNodeReports();
-    } finally {
-      yarnClient.stop();
-    }
-  }
-
-  private static final class ProcessControllerImpl implements 
ProcessController<YarnApplicationReport> {
-    private final YarnClient yarnClient;
-    private final ApplicationId appId;
-
-    ProcessControllerImpl(YarnClient yarnClient, ApplicationId appId) {
-      this.yarnClient = yarnClient;
-      this.appId = appId;
-    }
-
-    @Override
-    public YarnApplicationReport getReport() {
-      try {
-        return new 
Hadoop20YarnApplicationReport(yarnClient.getApplicationReport(appId));
-      } catch (YarnRemoteException e) {
-        throw new RuntimeException("Failed to get application report for " + 
appId, e);
-      }
-    }
-
-    @Override
-    public void cancel() {
-      try {
-        yarnClient.killApplication(appId);
-      } catch (YarnRemoteException e) {
-        throw new RuntimeException("Failed to kill application " + appId, e);
-      }
-    }
-
-    @Override
-    public void close() throws Exception {
-      yarnClient.stop();
-    }
-  }
-}
diff --git 
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java
 
b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java
deleted file mode 100644
index 8d6e2df..0000000
--- 
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-
-/**
- *
- */
-public final class Hadoop20YarnApplicationReport implements 
YarnApplicationReport {
-
-  private final ApplicationReport report;
-
-  public Hadoop20YarnApplicationReport(ApplicationReport report) {
-    this.report = report;
-  }
-
-  @Override
-  public ApplicationId getApplicationId() {
-    return report.getApplicationId();
-  }
-
-  @Override
-  public ApplicationAttemptId getCurrentApplicationAttemptId() {
-    return report.getCurrentApplicationAttemptId();
-  }
-
-  @Override
-  public String getQueue() {
-    return report.getQueue();
-  }
-
-  @Override
-  public String getName() {
-    return report.getName();
-  }
-
-  @Override
-  public String getHost() {
-    return report.getHost();
-  }
-
-  @Override
-  public int getRpcPort() {
-    return report.getRpcPort();
-  }
-
-  @Override
-  public YarnApplicationState getYarnApplicationState() {
-    return report.getYarnApplicationState();
-  }
-
-  @Override
-  public String getDiagnostics() {
-    return report.getDiagnostics();
-  }
-
-  @Override
-  public String getTrackingUrl() {
-    return "http://"; + report.getTrackingUrl();
-  }
-
-  @Override
-  public String getOriginalTrackingUrl() {
-    return "http://"; + report.getOriginalTrackingUrl();
-  }
-
-  @Override
-  public long getStartTime() {
-    return report.getStartTime();
-  }
-
-  @Override
-  public long getFinishTime() {
-    return report.getFinishTime();
-  }
-
-  @Override
-  public FinalApplicationStatus getFinalApplicationStatus() {
-    return report.getFinalApplicationStatus();
-  }
-
-  @Override
-  public ApplicationResourceUsageReport getApplicationResourceUsageReport() {
-    return report.getApplicationResourceUsageReport();
-  }
-}
diff --git 
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerInfo.java
 
b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerInfo.java
deleted file mode 100644
index 79b2cb5..0000000
--- 
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerInfo.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import com.google.common.base.Throwables;
-import org.apache.hadoop.yarn.api.records.Container;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-
-/**
- *
- */
-public final class Hadoop20YarnContainerInfo implements YarnContainerInfo {
-
-  private final Container container;
-
-  public Hadoop20YarnContainerInfo(Container container) {
-    this.container = container;
-  }
-
-  @Override
-  public <T> T getContainer() {
-    return (T) container;
-  }
-
-  @Override
-  public String getId() {
-    return container.getId().toString();
-  }
-
-  @Override
-  public InetAddress getHost() {
-    try {
-      return InetAddress.getByName(container.getNodeId().getHost());
-    } catch (UnknownHostException e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  @Override
-  public int getPort() {
-    return container.getNodeId().getPort();
-  }
-
-  @Override
-  public int getMemoryMB() {
-    return container.getResource().getMemory();
-  }
-
-  @Override
-  public int getVirtualCores() {
-    return YarnUtils.getVirtualCores(container.getResource());
-  }
-}
diff --git 
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerStatus.java
 
b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerStatus.java
deleted file mode 100644
index cc61856..0000000
--- 
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerStatus.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-
-/**
- *
- */
-public final class Hadoop20YarnContainerStatus implements YarnContainerStatus {
-
-  private final ContainerStatus containerStatus;
-
-  public Hadoop20YarnContainerStatus(ContainerStatus containerStatus) {
-    this.containerStatus = containerStatus;
-  }
-
-  @Override
-  public String getContainerId() {
-    return containerStatus.getContainerId().toString();
-  }
-
-  @Override
-  public ContainerState getState() {
-    return containerStatus.getState();
-  }
-
-  @Override
-  public int getExitStatus() {
-    return containerStatus.getExitStatus();
-  }
-
-  @Override
-  public String getDiagnostics() {
-    return containerStatus.getDiagnostics();
-  }
-}
diff --git 
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLaunchContext.java
 
b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLaunchContext.java
deleted file mode 100644
index b1f6d66..0000000
--- 
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLaunchContext.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Maps;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.util.Records;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-
-/**
- *
- */
-public final class Hadoop20YarnLaunchContext implements YarnLaunchContext {
-
-  private static final Function<YarnLocalResource, LocalResource> 
RESOURCE_TRANSFORM;
-
-  static {
-    // Creates transform function from YarnLocalResource -> LocalResource
-    RESOURCE_TRANSFORM = new Function<YarnLocalResource, LocalResource>() {
-      @Override
-      public LocalResource apply(YarnLocalResource input) {
-        return input.getLocalResource();
-      }
-    };
-  }
-
-  private final ContainerLaunchContext launchContext;
-
-  public Hadoop20YarnLaunchContext() {
-    launchContext = Records.newRecord(ContainerLaunchContext.class);
-  }
-
-  @Override
-  public <T> T getLaunchContext() {
-    return (T) launchContext;
-  }
-
-  @Override
-  public void setCredentials(Credentials credentials) {
-    launchContext.setContainerTokens(YarnUtils.encodeCredentials(credentials));
-  }
-
-  @Override
-  public void setLocalResources(Map<String, YarnLocalResource> localResources) 
{
-    launchContext.setLocalResources(Maps.transformValues(localResources, 
RESOURCE_TRANSFORM));
-  }
-
-  @Override
-  public void setServiceData(Map<String, ByteBuffer> serviceData) {
-    launchContext.setServiceData(serviceData);
-  }
-
-  @Override
-  public Map<String, String> getEnvironment() {
-    return launchContext.getEnvironment();
-  }
-
-  @Override
-  public void setEnvironment(Map<String, String> environment) {
-    launchContext.setEnvironment(environment);
-  }
-
-  @Override
-  public List<String> getCommands() {
-    return launchContext.getCommands();
-  }
-
-  @Override
-  public void setCommands(List<String> commands) {
-    launchContext.setCommands(commands);
-  }
-
-  @Override
-  public void setApplicationACLs(Map<ApplicationAccessType, String> acls) {
-    launchContext.setApplicationACLs(acls);
-  }
-}
diff --git 
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLocalResource.java
 
b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLocalResource.java
deleted file mode 100644
index b327b94..0000000
--- 
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLocalResource.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.util.Records;
-
-/**
- *
- */
-public final class Hadoop20YarnLocalResource implements YarnLocalResource {
-
-  private final LocalResource localResource;
-
-  public Hadoop20YarnLocalResource() {
-    this.localResource = Records.newRecord(LocalResource.class);
-  }
-
-  @Override
-  public <T> T getLocalResource() {
-    return (T) localResource;
-  }
-
-  @Override
-  public URL getResource() {
-    return localResource.getResource();
-  }
-
-  @Override
-  public void setResource(URL resource) {
-    localResource.setResource(resource);
-  }
-
-  @Override
-  public long getSize() {
-    return localResource.getSize();
-  }
-
-  @Override
-  public void setSize(long size) {
-    localResource.setSize(size);
-  }
-
-  @Override
-  public long getTimestamp() {
-    return localResource.getTimestamp();
-  }
-
-  @Override
-  public void setTimestamp(long timestamp) {
-    localResource.setTimestamp(timestamp);
-  }
-
-  @Override
-  public LocalResourceType getType() {
-    return localResource.getType();
-  }
-
-  @Override
-  public void setType(LocalResourceType type) {
-    localResource.setType(type);
-  }
-
-  @Override
-  public LocalResourceVisibility getVisibility() {
-    return localResource.getVisibility();
-  }
-
-  @Override
-  public void setVisibility(LocalResourceVisibility visibility) {
-    localResource.setVisibility(visibility);
-  }
-
-  @Override
-  public String getPattern() {
-    return localResource.getPattern();
-  }
-
-  @Override
-  public void setPattern(String pattern) {
-    localResource.setPattern(pattern);
-  }
-}
diff --git 
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java
 
b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java
deleted file mode 100644
index e8628da..0000000
--- 
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.Uninterruptibles;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.api.ContainerManager;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.twill.common.Cancellable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.TimeUnit;
-
-/**
- *
- */
-public final class Hadoop20YarnNMClient implements YarnNMClient {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(Hadoop20YarnNMClient.class);
-
-  private final YarnRPC yarnRPC;
-  private final Configuration yarnConf;
-
-  public Hadoop20YarnNMClient(YarnRPC yarnRPC, Configuration yarnConf) {
-    this.yarnRPC = yarnRPC;
-    this.yarnConf = yarnConf;
-  }
-
-  @Override
-  public Cancellable start(YarnContainerInfo containerInfo, YarnLaunchContext 
launchContext) {
-    ContainerLaunchContext context = launchContext.getLaunchContext();
-    context.setUser(System.getProperty("user.name"));
-
-    Container container = containerInfo.getContainer();
-
-    context.setContainerId(container.getId());
-    context.setResource(container.getResource());
-
-    StartContainerRequest startRequest = 
Records.newRecord(StartContainerRequest.class);
-    startRequest.setContainerLaunchContext(context);
-
-    ContainerManager manager = connectContainerManager(container);
-    try {
-      manager.startContainer(startRequest);
-      return new ContainerTerminator(container, manager);
-    } catch (YarnRemoteException e) {
-      LOG.error("Error in launching process", e);
-      throw Throwables.propagate(e);
-    }
-
-  }
-
-  /**
-   * Helper to connect to container manager (node manager).
-   */
-  private ContainerManager connectContainerManager(Container container) {
-    String cmIpPortStr = String.format("%s:%d", 
container.getNodeId().getHost(), container.getNodeId().getPort());
-    InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
-    return ((ContainerManager) yarnRPC.getProxy(ContainerManager.class, 
cmAddress, yarnConf));
-  }
-
-  private static final class ContainerTerminator implements Cancellable {
-
-    private final Container container;
-    private final ContainerManager manager;
-
-    private ContainerTerminator(Container container, ContainerManager manager) 
{
-      this.container = container;
-      this.manager = manager;
-    }
-
-    @Override
-    public void cancel() {
-      LOG.info("Request to stop container {}.", container.getId());
-      StopContainerRequest stopRequest = 
Records.newRecord(StopContainerRequest.class);
-      stopRequest.setContainerId(container.getId());
-      try {
-        manager.stopContainer(stopRequest);
-        while (true) {
-          GetContainerStatusRequest statusRequest = 
Records.newRecord(GetContainerStatusRequest.class);
-          statusRequest.setContainerId(container.getId());
-          GetContainerStatusResponse statusResponse = 
manager.getContainerStatus(statusRequest);
-          LOG.trace("Container status: {} {}", statusResponse.getStatus(), 
statusResponse.getStatus().getDiagnostics());
-
-          if (statusResponse.getStatus().getState() == 
ContainerState.COMPLETE) {
-            break;
-          }
-          Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
-        }
-        LOG.info("Container {} stopped.", container.getId());
-      } catch (YarnRemoteException e) {
-        LOG.error("Fail to stop container {}", container.getId(), e);
-        throw Throwables.propagate(e);
-      }
-    }
-  }
-}
diff --git 
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClient.java
 
b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClient.java
deleted file mode 100644
index 26b6fa2..0000000
--- 
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClient.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn.ports;
-
-import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.service.Service;
-
-/**
- * Ported from Apache Hadoop YARN.
- */
-public interface AMRMClient extends Service {
-
-  /**
-   * Value used to define no locality.
-   */
-  static final String ANY = "*";
-
-  /**
-   * Object to represent container request for resources.
-   * Resources may be localized to nodes and racks.
-   * Resources may be assigned priorities.
-   * Can ask for multiple containers of a given type.
-   */
-  public static class ContainerRequest {
-    Resource capability;
-    String[] hosts;
-    String[] racks;
-    Priority priority;
-    int containerCount;
-
-    public ContainerRequest(Resource capability, String[] hosts,
-                            String[] racks, Priority priority, int 
containerCount) {
-      this.capability = capability;
-      this.hosts = (hosts != null ? hosts.clone() : null);
-      this.racks = (racks != null ? racks.clone() : null);
-      this.priority = priority;
-      this.containerCount = containerCount;
-    }
-    public String toString() {
-      StringBuilder sb = new StringBuilder();
-      sb.append("Capability[").append(capability).append("]");
-      sb.append("Priority[").append(priority).append("]");
-      sb.append("ContainerCount[").append(containerCount).append("]");
-      return sb.toString();
-    }
-  }
-
-  /**
-   * Register the application master. This must be called before any
-   * other interaction
-   * @param appHostName Name of the host on which master is running
-   * @param appHostPort Port master is listening on
-   * @param appTrackingUrl URL at which the master info can be seen
-   * @return <code>RegisterApplicationMasterResponse</code>
-   * @throws org.apache.hadoop.yarn.exceptions.YarnRemoteException
-   */
-  public RegisterApplicationMasterResponse
-  registerApplicationMaster(String appHostName,
-                            int appHostPort,
-                            String appTrackingUrl)
-    throws YarnRemoteException;
-
-  /**
-   * Request additional containers and receive new container allocations.
-   * Requests made via <code>addContainerRequest</code> are sent to the
-   * <code>ResourceManager</code>. New containers assigned to the master are
-   * retrieved. Status of completed containers and node health updates are
-   * also retrieved.
-   * This also doubles as a heartbeat to the ResourceManager and must be
-   * made periodically.
-   * The call may not always return any new allocations of containers.
-   * App should not make concurrent allocate requests. May cause request loss.
-   * @param progressIndicator Indicates progress made by the master
-   * @return the response of the allocate request
-   * @throws YarnRemoteException
-   */
-  public AllocationResponse allocate(float progressIndicator)
-    throws YarnRemoteException;
-
-  /**
-   * Unregister the Application Master. This must be called in the end.
-   * @param appStatus Success/Failure status of the master
-   * @param appMessage Diagnostics message on failure
-   * @param appTrackingUrl New URL to get master info
-   * @throws YarnRemoteException
-   */
-  public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
-                                          String appMessage,
-                                          String appTrackingUrl)
-    throws YarnRemoteException;
-
-  /**
-   * Request containers for resources before calling <code>allocate</code>.
-   * @param req Resource request
-   */
-  public void addContainerRequest(ContainerRequest req);
-
-  /**
-   * Remove previous container request. The previous container request may have
-   * already been sent to the ResourceManager. So even after the remove request
-   * the app must be prepared to receive an allocation for the previous request
-   * even after the remove request
-   * @param req Resource request
-   */
-  public void removeContainerRequest(ContainerRequest req);
-
-  /**
-   * Release containers assigned by the Resource Manager. If the app cannot use
-   * the container or wants to give up the container then it can release it.
-   * The app needs to make new requests for the released resource capability if
-   * it still needs it. For example, if it released non-local resources
-   * @param containerId
-   */
-  public void releaseAssignedContainer(ContainerId containerId);
-
-  /**
-   * Get the currently available resources in the cluster.
-   * A valid value is available after a call to allocate has been made
-   * @return Currently available resources
-   */
-  public Resource getClusterAvailableResources();
-
-  /**
-   * Get the current number of nodes in the cluster.
-   * A valid values is available after a call to allocate has been made
-   * @return Current number of nodes in the cluster
-   */
-  public int getClusterNodeCount();
-}
diff --git 
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClientImpl.java
 
b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClientImpl.java
deleted file mode 100644
index c1bd75a..0000000
--- 
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClientImpl.java
+++ /dev/null
@@ -1,412 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn.ports;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.api.AMRMProtocol;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import 
org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.hadoop.yarn.util.BuilderUtils;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-/**
- * Ported from Apache Hadoop YARN.
- */
-public final class AMRMClientImpl extends AbstractService implements 
AMRMClient {
-
-  private static final Log LOG = LogFactory.getLog(AMRMClientImpl.class);
-
-  private final RecordFactory recordFactory =
-    RecordFactoryProvider.getRecordFactory(null);
-
-  private int lastResponseId = 0;
-
-  protected AMRMProtocol rmClient;
-  protected final ApplicationAttemptId appAttemptId;
-  protected Resource clusterAvailableResources;
-  protected int clusterNodeCount;
-
-  //Key -> Priority
-  //Value -> Map
-  //Key->ResourceName (e.g., hostname, rackname, *)
-  //Value->Map
-  //Key->Resource Capability
-  //Value->ResourceRequest
-  protected final
-  Map<Priority, Map<String, Map<Resource, ResourceRequest>>>
-    remoteRequestsTable =
-    new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();
-
-  protected final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
-    new org.apache.hadoop.yarn.util.BuilderUtils.ResourceRequestComparator());
-  protected final Set<ContainerId> release = new TreeSet<ContainerId>();
-
-  public AMRMClientImpl(ApplicationAttemptId appAttemptId) {
-    super(AMRMClientImpl.class.getName());
-    this.appAttemptId = appAttemptId;
-  }
-
-  @Override
-  public synchronized void init(Configuration conf) {
-    super.init(conf);
-  }
-
-  @Override
-  public synchronized void start() {
-    final YarnConfiguration conf = new YarnConfiguration(getConfig());
-    final YarnRPC rpc = YarnRPC.create(conf);
-    final InetSocketAddress rmAddress = conf.getSocketAddr(
-      YarnConfiguration.RM_SCHEDULER_ADDRESS,
-      YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
-      YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
-
-    UserGroupInformation currentUser;
-    try {
-      currentUser = UserGroupInformation.getCurrentUser();
-    } catch (IOException e) {
-      throw new YarnException(e);
-    }
-
-    if (UserGroupInformation.isSecurityEnabled()) {
-      String tokenURLEncodedStr = System.getenv().get(
-        ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
-      Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
-
-      try {
-        token.decodeFromUrlString(tokenURLEncodedStr);
-      } catch (IOException e) {
-        throw new YarnException(e);
-      }
-
-      SecurityUtil.setTokenService(token, rmAddress);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("AppMasterToken is " + token);
-      }
-      currentUser.addToken(token);
-    }
-
-    rmClient = currentUser.doAs(new PrivilegedAction<AMRMProtocol>() {
-      @Override
-      public AMRMProtocol run() {
-        return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress,
-                                           conf);
-      }
-    });
-    LOG.debug("Connecting to ResourceManager at " + rmAddress);
-    super.start();
-  }
-
-  @Override
-  public synchronized void stop() {
-    if (this.rmClient != null) {
-      RPC.stopProxy(this.rmClient);
-    }
-    super.stop();
-  }
-
-  @Override
-  public RegisterApplicationMasterResponse registerApplicationMaster(
-    String appHostName, int appHostPort, String appTrackingUrl)
-    throws YarnRemoteException {
-    // do this only once ???
-    RegisterApplicationMasterRequest request = recordFactory
-      .newRecordInstance(RegisterApplicationMasterRequest.class);
-    synchronized (this) {
-      request.setApplicationAttemptId(appAttemptId);
-    }
-    request.setHost(appHostName);
-    request.setRpcPort(appHostPort);
-    if (appTrackingUrl != null) {
-      request.setTrackingUrl(appTrackingUrl);
-    }
-    RegisterApplicationMasterResponse response = rmClient
-      .registerApplicationMaster(request);
-    return response;
-  }
-
-  @Override
-  public AllocationResponse allocate(float progressIndicator)
-    throws YarnRemoteException {
-    AllocateResponse allocateResponse = null;
-    ArrayList<ResourceRequest> askList = null;
-    ArrayList<ContainerId> releaseList = null;
-    AllocateRequest allocateRequest = null;
-
-    try {
-      synchronized (this) {
-        askList = new ArrayList<ResourceRequest>(ask);
-        releaseList = new ArrayList<ContainerId>(release);
-        // optimistically clear this collection assuming no RPC failure
-        ask.clear();
-        release.clear();
-        allocateRequest = BuilderUtils
-          .newAllocateRequest(appAttemptId, lastResponseId, progressIndicator,
-                              askList, releaseList);
-      }
-
-      allocateResponse = rmClient.allocate(allocateRequest);
-      AllocationResponse response = 
AllocationResponses.create(allocateResponse);
-
-      synchronized (this) {
-        // update these on successful RPC
-        clusterNodeCount = allocateResponse.getNumClusterNodes();
-        lastResponseId = response.getResponseId();
-        clusterAvailableResources = response.getAvailableResources();
-      }
-
-      return response;
-    } finally {
-      // TODO how to differentiate remote YARN exception vs error in RPC
-      if (allocateResponse == null) {
-        // We hit an exception in allocate()
-        // Preserve ask and release for next call to allocate()
-        synchronized (this) {
-          release.addAll(releaseList);
-          // Requests could have been added or deleted during call to allocate.
-          // If requests were added/removed then there is nothing to do since
-          // the ResourceRequest object in ask would have the actual new value.
-          // If ask does not have this ResourceRequest then it was unchanged 
and
-          // so we can add the value back safely.
-          // This assumes that there will no concurrent calls to allocate() and
-          // so we don't have to worry about ask being changed in the
-          // synchronized block at the beginning of this method.
-          for (ResourceRequest oldAsk : askList) {
-            if (!ask.contains(oldAsk)) {
-              ask.add(oldAsk);
-            }
-          }
-        }
-      }
-    }
-  }
-
-  @Override
-  public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
-                                          String appMessage, String 
appTrackingUrl) throws YarnRemoteException {
-    FinishApplicationMasterRequest request = recordFactory
-      .newRecordInstance(FinishApplicationMasterRequest.class);
-    request.setAppAttemptId(appAttemptId);
-    request.setFinishApplicationStatus(appStatus);
-    if (appMessage != null) {
-      request.setDiagnostics(appMessage);
-    }
-    if (appTrackingUrl != null) {
-      request.setTrackingUrl(appTrackingUrl);
-    }
-    rmClient.finishApplicationMaster(request);
-  }
-
-  @Override
-  public synchronized void addContainerRequest(ContainerRequest req) {
-    // Create resource requests
-    if (req.hosts != null) {
-      for (String host : req.hosts) {
-        addResourceRequest(req.priority, host, req.capability, 
req.containerCount);
-      }
-    }
-
-    if (req.racks != null) {
-      for (String rack : req.racks) {
-        addResourceRequest(req.priority, rack, req.capability, 
req.containerCount);
-      }
-    }
-
-    // Off switch
-    addResourceRequest(req.priority, ANY, req.capability, req.containerCount);
-  }
-
-  @Override
-  public synchronized void removeContainerRequest(ContainerRequest req) {
-    // Update resource requests
-    if (req.hosts != null) {
-      for (String hostName : req.hosts) {
-        decResourceRequest(req.priority, hostName, req.capability, 
req.containerCount);
-      }
-    }
-
-    if (req.racks != null) {
-      for (String rack : req.racks) {
-        decResourceRequest(req.priority, rack, req.capability, 
req.containerCount);
-      }
-    }
-
-    decResourceRequest(req.priority, ANY, req.capability, req.containerCount);
-  }
-
-  @Override
-  public synchronized void releaseAssignedContainer(ContainerId containerId) {
-    release.add(containerId);
-  }
-
-  @Override
-  public synchronized Resource getClusterAvailableResources() {
-    return clusterAvailableResources;
-  }
-
-  @Override
-  public synchronized int getClusterNodeCount() {
-    return clusterNodeCount;
-  }
-
-  private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
-    // This code looks weird but is needed because of the following scenario.
-    // A ResourceRequest is removed from the remoteRequestTable. A 0 container 
-    // request is added to 'ask' to notify the RM about not needing it any 
more.
-    // Before the call to allocate, the user now requests more containers. If 
-    // the locations of the 0 size request and the new request are the same
-    // (with the difference being only container count), then the set 
comparator
-    // will consider both to be the same and not add the new request to ask. 
So 
-    // we need to check for the "same" request being present and remove it and 
-    // then add it back. The comparator is container count agnostic.
-    // This should happen only rarely but we do need to guard against it.
-    if (ask.contains(remoteRequest)) {
-      ask.remove(remoteRequest);
-    }
-    ask.add(remoteRequest);
-  }
-
-  private void addResourceRequest(Priority priority, String resourceName,
-                                  Resource capability, int containerCount) {
-    Map<String, Map<Resource, ResourceRequest>> remoteRequests =
-      this.remoteRequestsTable.get(priority);
-    if (remoteRequests == null) {
-      remoteRequests = new HashMap<String, Map<Resource, ResourceRequest>>();
-      this.remoteRequestsTable.put(priority, remoteRequests);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Added priority=" + priority);
-      }
-    }
-    Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
-    if (reqMap == null) {
-      reqMap = new HashMap<Resource, ResourceRequest>();
-      remoteRequests.put(resourceName, reqMap);
-    }
-    ResourceRequest remoteRequest = reqMap.get(capability);
-    if (remoteRequest == null) {
-      remoteRequest = BuilderUtils.
-        newResourceRequest(priority, resourceName, capability, 0);
-      reqMap.put(capability, remoteRequest);
-    }
-
-    remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 
containerCount);
-
-    // Note this down for next interaction with ResourceManager
-    addResourceRequestToAsk(remoteRequest);
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("addResourceRequest:" + " applicationId="
-                  + appAttemptId + " priority=" + priority.getPriority()
-                  + " resourceName=" + resourceName + " numContainers="
-                  + remoteRequest.getNumContainers() + " #asks=" + ask.size());
-    }
-  }
-
-  private void decResourceRequest(Priority priority, String resourceName,
-                                  Resource capability, int containerCount) {
-    Map<String, Map<Resource, ResourceRequest>> remoteRequests =
-      this.remoteRequestsTable.get(priority);
-
-    if (remoteRequests == null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Not decrementing resource as priority " + priority
-                    + " is not present in request table");
-      }
-      return;
-    }
-
-    Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
-    if (reqMap == null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Not decrementing resource as " + resourceName
-                    + " is not present in request table");
-      }
-      return;
-    }
-    ResourceRequest remoteRequest = reqMap.get(capability);
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("BEFORE decResourceRequest:" + " applicationId="
-                  + appAttemptId + " priority=" + priority.getPriority()
-                  + " resourceName=" + resourceName + " numContainers="
-                  + remoteRequest.getNumContainers() + " #asks=" + ask.size());
-    }
-
-    remoteRequest.
-      setNumContainers(remoteRequest.getNumContainers() - containerCount);
-    if (remoteRequest.getNumContainers() < 0) {
-      // guard against spurious removals
-      remoteRequest.setNumContainers(0);
-    }
-    // Send the ResourceRequest to RM even if is 0 because it needs to override
-    // a previously sent value. If ResourceRequest was not sent previously then
-    // sending 0 ought to be a no-op on RM.
-    addResourceRequestToAsk(remoteRequest);
-
-    // Delete entries from map if no longer needed.
-    if (remoteRequest.getNumContainers() == 0) {
-      reqMap.remove(capability);
-      if (reqMap.size() == 0) {
-        remoteRequests.remove(resourceName);
-      }
-      if (remoteRequests.size() == 0) {
-        remoteRequestsTable.remove(priority);
-      }
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.info("AFTER decResourceRequest:" + " applicationId="
-                 + appAttemptId + " priority=" + priority.getPriority()
-                 + " resourceName=" + resourceName + " numContainers="
-                 + remoteRequest.getNumContainers() + " #asks=" + ask.size());
-    }
-  }
-
-}
diff --git 
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponse.java
 
b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponse.java
deleted file mode 100644
index d8d4c19..0000000
--- 
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponse.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn.ports;
-
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.Resource;
-
-import java.util.List;
-
-/**
- * This interface is to abstract the differences in Vanilla Hadoop YARN 2.0 
and CDH 4.4.
- */
-public interface AllocationResponse {
-
-  int getResponseId();
-
-  Resource getAvailableResources();
-
-  List<Container> getAllocatedContainers();
-
-  List<ContainerStatus> getCompletedContainersStatuses();
-}
diff --git 
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponses.java
 
b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponses.java
deleted file mode 100644
index ba8b5fa..0000000
--- 
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponses.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn.ports;
-
-import com.google.common.base.Throwables;
-import com.google.common.reflect.TypeToken;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.Resource;
-
-import java.util.List;
-
-/**
- * Factory for building instance of {@link AllocationResponse} based on the 
response type.
- */
-public final class AllocationResponses {
-
-  /**
-   * A hack for CDH 4.4.0, as the AllocateResponse class is being rewritten 
and diverted from YARN 2.0.
-   */
-  private static final boolean IS_CDH_4_4;
-
-  static {
-    boolean result = false;
-    try {
-      try {
-        // See if it is standard YARN 2.0 AllocateResponse object.
-        AllocateResponse.class.getMethod("getAMResponse");
-      } catch (NoSuchMethodException e) {
-          // See if it is CDH 4.4 AllocateResponse object.
-        AllocationResponse.class.getMethod("getAllocatedContainers");
-        result = true;
-      }
-    } catch (Exception e) {
-      // Something very wrong in here, as it shouldn't arrive here.
-      e.printStackTrace();
-      throw Throwables.propagate(e);
-    }
-
-    IS_CDH_4_4 = result;
-  }
-
-  public static AllocationResponse create(Object response) {
-    if (IS_CDH_4_4) {
-      return new ReflectionAllocationResponse(response);
-    }
-
-    try {
-      Object amResponse = 
response.getClass().getMethod("getAMResponse").invoke(response);
-      return new ReflectionAllocationResponse(amResponse);
-    } catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  private static final class ReflectionAllocationResponse implements 
AllocationResponse {
-
-    private final Object response;
-
-    private ReflectionAllocationResponse(Object response) {
-      this.response = response;
-    }
-
-    @Override
-    public int getResponseId() {
-      return call("getResponseId", TypeToken.of(Integer.class));
-    }
-
-    @Override
-    public Resource getAvailableResources() {
-      return call("getAvailableResources", TypeToken.of(Resource.class));
-    }
-
-    @Override
-    public List<Container> getAllocatedContainers() {
-      return call("getAllocatedContainers", new TypeToken<List<Container>>() { 
});
-    }
-
-    @Override
-    public List<ContainerStatus> getCompletedContainersStatuses() {
-      return call("getCompletedContainersStatuses", new 
TypeToken<List<ContainerStatus>>() { });
-    }
-
-    private <T> T call(String methodName, TypeToken<T> resultType) {
-      try {
-        return (T) 
resultType.getRawType().cast(response.getClass().getMethod(methodName).invoke(response));
-      } catch (Exception e) {
-        throw Throwables.propagate(e);
-      }
-    }
-  }
-
-  private AllocationResponses() {
-  }
-}
diff --git 
a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
 
b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
index c6ab02b..799b5c7 100644
--- 
a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
+++ 
b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
@@ -38,11 +38,6 @@ public final class VersionDetectYarnAMClientFactory 
implements YarnAMClientFacto
       Class<YarnAMClient> clz;
       String clzName;
       switch (YarnUtils.getHadoopVersion()) {
-        case HADOOP_20:
-          // Uses hadoop-2.0 class
-          clzName = getClass().getPackage().getName() + 
".Hadoop20YarnAMClient";
-          clz = (Class<YarnAMClient>) Class.forName(clzName);
-          break;
         case HADOOP_21:
           // Uses hadoop-2.1 class
           clzName = getClass().getPackage().getName() + 
".Hadoop21YarnAMClient";
diff --git 
a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
 
b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
index 83de2a4..ee40e2e 100644
--- 
a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
+++ 
b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
@@ -31,9 +31,6 @@ public final class VersionDetectYarnAppClientFactory 
implements YarnAppClientFac
     try {
       String clzName;
       switch (YarnUtils.getHadoopVersion()) {
-        case HADOOP_20:
-          clzName = getClass().getPackage().getName() + 
".Hadoop20YarnAppClient";
-          break;
         case HADOOP_21:
         case HADOOP_22:
           // 2.1 and 2.2 uses the same YarnAppClient
diff --git 
a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java 
b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
index 3a2f4a5..b6454b9 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
@@ -37,7 +37,6 @@ import 
org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
 import org.apache.twill.api.LocalFile;
 import org.apache.twill.filesystem.FileContextLocationFactory;
 import org.apache.twill.filesystem.ForwardingLocationFactory;
@@ -70,7 +69,6 @@ public class YarnUtils {
    * Defines different versions of Hadoop.
    */
   public enum HadoopVersions {
-    HADOOP_20,
     HADOOP_21,
     HADOOP_22,
     HADOOP_23,
@@ -109,7 +107,7 @@ public class YarnUtils {
     Preconditions.checkArgument(localFile.getLastModified() >= 0, "Last 
modified time should be >= 0.");
     Preconditions.checkArgument(localFile.getSize() >= 0, "File size should be 
>= 0.");
 
-    YarnLocalResource resource = createAdapter(YarnLocalResource.class);
+    YarnLocalResource resource = new Hadoop21YarnLocalResource();
     resource.setVisibility(LocalResourceVisibility.APPLICATION);
     resource.setResource(ConverterUtils.getYarnUrlFromURI(localFile.getURI()));
     resource.setTimestamp(localFile.getLastModified());
@@ -118,7 +116,7 @@ public class YarnUtils {
   }
 
   public static YarnLaunchContext createLaunchContext() {
-    return createAdapter(YarnLaunchContext.class);
+    return new Hadoop21YarnLaunchContext();
   }
 
   // temporary workaround since older versions of hadoop don't have the 
getVirtualCores method.
@@ -150,32 +148,6 @@ public class YarnUtils {
   }
 
   /**
-   * Creates {@link ApplicationId} from the given cluster timestamp and id.
-   */
-  public static ApplicationId createApplicationId(long timestamp, int id) {
-    try {
-      try {
-        // For Hadoop-2.1
-        Method method = ApplicationId.class.getMethod("newInstance", 
long.class, int.class);
-        return (ApplicationId) method.invoke(null, timestamp, id);
-      } catch (NoSuchMethodException e) {
-        // Try with Hadoop-2.0 way
-        ApplicationId appId = Records.newRecord(ApplicationId.class);
-
-        Method setClusterTimestamp = 
ApplicationId.class.getMethod("setClusterTimestamp", long.class);
-        Method setId = ApplicationId.class.getMethod("setId", int.class);
-
-        setClusterTimestamp.invoke(appId, timestamp);
-        setId.invoke(appId, id);
-
-        return appId;
-      }
-    } catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  /**
    * Helper method to get delegation tokens for the given LocationFactory.
    * @param config The hadoop configuration.
    * @param locationFactory The LocationFactory for generating tokens.
@@ -329,52 +301,27 @@ public class YarnUtils {
       return hadoopVersion;
     }
     try {
-      Class.forName("org.apache.hadoop.yarn.client.api.NMClient");
+      Class.forName("org.apache.hadoop.yarn.client.cli.LogsCLI");
       try {
-        Class.forName("org.apache.hadoop.yarn.client.cli.LogsCLI");
+        Class.forName("org.apache.hadoop.yarn.conf.HAUtil");
         try {
-          Class.forName("org.apache.hadoop.yarn.conf.HAUtil");
-          try {
-            Class[] args = new Class[1];
-            args[0] = String.class;
-            // see if we have a 
org.apache.hadoop.yarn.api.records.ContainerId.fromString() method
-            
Class.forName("org.apache.hadoop.yarn.api.records.ContainerId").getMethod("fromString",
 args);
-            HADOOP_VERSION.set(HadoopVersions.HADOOP_26);
-          } catch (NoSuchMethodException e) {
-            HADOOP_VERSION.set(HadoopVersions.HADOOP_23);
-          }
-        } catch (ClassNotFoundException e) {
-          HADOOP_VERSION.set(HadoopVersions.HADOOP_22);
+          Class[] args = new Class[1];
+          args[0] = String.class;
+          // see if we have a 
org.apache.hadoop.yarn.api.records.ContainerId.fromString() method
+          
Class.forName("org.apache.hadoop.yarn.api.records.ContainerId").getMethod("fromString",
 args);
+          HADOOP_VERSION.set(HadoopVersions.HADOOP_26);
+        } catch (NoSuchMethodException e) {
+          HADOOP_VERSION.set(HadoopVersions.HADOOP_23);
         }
       } catch (ClassNotFoundException e) {
-        HADOOP_VERSION.set(HadoopVersions.HADOOP_21);
+        HADOOP_VERSION.set(HadoopVersions.HADOOP_22);
       }
     } catch (ClassNotFoundException e) {
-      HADOOP_VERSION.set(HadoopVersions.HADOOP_20);
+      HADOOP_VERSION.set(HadoopVersions.HADOOP_21);
     }
     return HADOOP_VERSION.get();
   }
 
-  /**
-   * Helper method to create adapter class for bridging between Hadoop 2.0 and 
2.1.
-   */
-  private static <T> T createAdapter(Class<T> clz) {
-    String className = clz.getPackage().getName();
-
-    if (getHadoopVersion().equals(HadoopVersions.HADOOP_20)) {
-      className += ".Hadoop20" + clz.getSimpleName();
-    } else {
-      className += ".Hadoop21" + clz.getSimpleName();
-    }
-
-    try {
-      //noinspection unchecked
-      return (T) Class.forName(className).newInstance();
-    } catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
   private static YarnLocalResource setLocalResourceType(YarnLocalResource 
localResource, LocalFile localFile) {
     if (localFile.isArchive()) {
       if (localFile.getPattern() == null) {
diff --git 
a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java 
b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
index 8f844a2..72100a1 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
@@ -37,7 +37,6 @@ import org.apache.twill.internal.appmaster.TrackerService;
 import org.apache.twill.internal.state.SystemMessages;
 import org.apache.twill.internal.yarn.YarnAppClient;
 import org.apache.twill.internal.yarn.YarnApplicationReport;
-import org.apache.twill.internal.yarn.YarnUtils;
 import org.apache.twill.zookeeper.NodeData;
 import org.apache.twill.zookeeper.ZKClient;
 import org.apache.zookeeper.data.Stat;
@@ -83,13 +82,9 @@ final class YarnTwillController extends 
AbstractTwillController implements Twill
     super(appName, runId, zkClient, amLiveNodeData.getKafkaZKConnect() != 
null, Collections.<LogHandler>emptyList());
     this.appName = appName;
     this.amLiveNodeData = amLiveNodeData;
-    this.startUp = new Callable<ProcessController<YarnApplicationReport>>() {
-      @Override
-      public ProcessController<YarnApplicationReport> call() throws Exception {
-        return yarnAppClient.createProcessController(
-          YarnUtils.createApplicationId(amLiveNodeData.getAppIdClusterTime(), 
amLiveNodeData.getAppId()));
-      }
-    };
+    this.startUp = () -> yarnAppClient.createProcessController(
+      ApplicationId.newInstance(amLiveNodeData.getAppIdClusterTime(),
+                                amLiveNodeData.getAppId()));
     this.startTimeout = Constants.APPLICATION_MAX_START_SECONDS;
     this.startTimeoutUnit = TimeUnit.SECONDS;
   }
diff --git 
a/twill-yarn/src/test/java/org/apache/twill/filesystem/FileContextLocationTest.java
 
b/twill-yarn/src/test/java/org/apache/twill/filesystem/FileContextLocationTest.java
index 5629295..3372caf 100644
--- 
a/twill-yarn/src/test/java/org/apache/twill/filesystem/FileContextLocationTest.java
+++ 
b/twill-yarn/src/test/java/org/apache/twill/filesystem/FileContextLocationTest.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.twill.internal.yarn.YarnUtils;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -58,16 +57,6 @@ public class FileContextLocationTest extends LocationTestBase {
     return new FileContextLocationFactory(dfsCluster.getFileSystem().getConf(), pathBase);
   }
 
-  @Override
-  protected String correctFilePermissions(String original) {
-    if (YarnUtils.HadoopVersions.HADOOP_20.equals(YarnUtils.getHadoopVersion())) {
-      return original.substring(0, 2) + '-' + // strip the x for owner
-        original.substring(3, 5) + '-' + // strip the x for group
-        original.substring(6, 8) + '-'; // strip the x for world;
-    }
-    return original;
-  }
-
   @Test
   public void testGetFileContext() throws Exception {
     final FileContextLocationFactory locationFactory = (FileContextLocationFactory) createLocationFactory("/testGetFC");
diff --git a/twill-yarn/src/test/java/org/apache/twill/filesystem/LocationTestBase.java b/twill-yarn/src/test/java/org/apache/twill/filesystem/LocationTestBase.java
index d42cf37..379e37b 100644
--- a/twill-yarn/src/test/java/org/apache/twill/filesystem/LocationTestBase.java
+++ b/twill-yarn/src/test/java/org/apache/twill/filesystem/LocationTestBase.java
@@ -329,7 +329,7 @@ public abstract class LocationTestBase {
         Assert.assertFalse(textfile.isDirectory());
         Assert.assertEquals(permissions, child.getPermissions());
         Assert.assertEquals(permissions, grandchild.getPermissions());
-        Assert.assertEquals(correctFilePermissions(permissions), textfile.getPermissions());
+        Assert.assertEquals(permissions, textfile.getPermissions());
 
         // mkdirs of existing file
         Location file = factory.create("existingfile");
@@ -380,14 +380,6 @@ public abstract class LocationTestBase {
    */
   protected abstract LocationFactory createLocationFactory(String pathBase) throws Exception;
 
-  /**
-   * Some older versions of Hadoop always strip the execute permission from files (but keep it for directories).
-   * This allows subclasses to correct the expected file permissions, based on the Hadoop version (if any).
-   */
-  protected String correctFilePermissions(String expectedFilePermissions) {
-    return expectedFilePermissions; // unchanged by default
-  }
-
   protected UserGroupInformation createTestUGI() throws IOException {
     String userName = System.getProperty("user.name").equals("tester") ? "twiller" : "tester";
     return UserGroupInformation.createUserForTesting(userName, new String[] { "testgroup" });
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java b/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java
index 04902d5..407d519 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java
@@ -42,7 +42,6 @@ import org.apache.twill.filesystem.FileContextLocationFactory;
 import org.apache.twill.filesystem.LocationFactory;
 import org.apache.twill.internal.yarn.VersionDetectYarnAppClientFactory;
 import org.apache.twill.internal.yarn.YarnAppClient;
-import org.apache.twill.internal.yarn.YarnUtils;
 import org.apache.twill.internal.zookeeper.InMemoryZKServer;
 import org.junit.rules.ExternalResource;
 import org.junit.rules.TemporaryFolder;
@@ -128,16 +127,11 @@ public class TwillTester extends ExternalResource {
 
     Configuration conf = new YarnConfiguration(dfsCluster.getFileSystem().getConf());
 
-    if (YarnUtils.getHadoopVersion().equals(YarnUtils.HadoopVersions.HADOOP_20)) {
-      conf.set("yarn.resourcemanager.scheduler.class",
-               "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler");
-    } else {
-      conf.set("yarn.resourcemanager.scheduler.class",
-               "org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler");
-      conf.set("yarn.scheduler.capacity.resource-calculator",
-               "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator");
-      conf.setBoolean("yarn.scheduler.include-port-in-node-name", true);
-    }
+    conf.set("yarn.resourcemanager.scheduler.class",
+             "org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler");
+    conf.set("yarn.scheduler.capacity.resource-calculator",
+             "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator");
+    conf.setBoolean("yarn.scheduler.include-port-in-node-name", true);
     conf.set("yarn.nodemanager.vmem-pmem-ratio", "100.1");
     conf.set("yarn.nodemanager.vmem-check-enabled", "false");
     conf.set("yarn.scheduler.minimum-allocation-mb", "128");
@@ -235,8 +229,8 @@ public class TwillTester extends ExternalResource {
   public ApplicationResourceUsageReport getApplicationResourceReport(String appId) throws Exception {
     List<String> splits = Lists.newArrayList(Splitter.on('_').split(appId));
     Preconditions.checkArgument(splits.size() == 3, "Invalid application id - " + appId);
-    ApplicationId applicationId =
-      YarnUtils.createApplicationId(Long.parseLong(splits.get(1)), Integer.parseInt(splits.get(2)));
+    ApplicationId applicationId = ApplicationId.newInstance(Long.parseLong(splits.get(1)),
+                                                            Integer.parseInt(splits.get(2)));
 
     ClientRMService clientRMService = cluster.getResourceManager().getClientRMService();
     GetApplicationReportRequest request = Records.newRecord(GetApplicationReportRequest.class);

Reply via email to