YARN-6050. AMs can't be scheduled on racks or nodes (rkanter)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a65011a1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a65011a1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a65011a1

Branch: refs/heads/branch-2
Commit: a65011a1282e33828d3b7d8f7596e42de7d9b7a2
Parents: 97c83f2
Author: Robert Kanter <rkan...@apache.org>
Authored: Tue Mar 28 14:33:08 2017 -0700
Committer: Robert Kanter <rkan...@apache.org>
Committed: Tue Mar 28 14:33:08 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/mapred/YARNRunner.java    |   4 +-
 .../apache/hadoop/mapred/TestYARNRunner.java    |   2 +-
 .../records/ApplicationSubmissionContext.java   |  57 +++-
 .../src/main/proto/yarn_protos.proto            |   2 +-
 .../pb/ApplicationSubmissionContextPBImpl.java  |  64 +++-
 .../nodelabels/CommonNodeLabelsManager.java     |  22 ++
 .../server/resourcemanager/RMAppManager.java    | 104 +++++--
 .../server/resourcemanager/RMServerUtils.java   |  68 ++++-
 .../server/resourcemanager/rmapp/RMApp.java     |   3 +-
 .../server/resourcemanager/rmapp/RMAppImpl.java |  20 +-
 .../rmapp/attempt/RMAppAttemptImpl.java         |  35 ++-
 .../scheduler/AbstractYarnScheduler.java        |   5 +
 .../scheduler/ClusterNodeTracker.java           |  65 +++-
 .../scheduler/ResourceScheduler.java            |   9 +
 .../scheduler/common/fica/FiCaSchedulerApp.java |  10 +-
 .../resourcemanager/webapp/dao/AppInfo.java     |   2 +-
 .../yarn/server/resourcemanager/MockRM.java     |  45 ++-
 .../server/resourcemanager/TestAppManager.java  | 195 +++++++++++-
 .../resourcemanager/TestClientRMService.java    |   8 +-
 .../TestNodeBlacklistingOnAMFailures.java       | 184 ++++++++++++
 .../resourcemanager/TestRMServerUtils.java      | 297 +++++++++++++++++++
 .../applicationsmanager/MockAsm.java            |   6 +-
 .../metrics/TestSystemMetricsPublisher.java     |   3 +-
 .../server/resourcemanager/rmapp/MockRMApp.java |  11 +-
 .../rmapp/TestRMAppTransitions.java             |   9 +-
 .../attempt/TestRMAppAttemptTransitions.java    |  19 +-
 .../capacity/TestApplicationLimits.java         |   4 +-
 .../TestApplicationLimitsByPartition.java       |   4 +-
 .../capacity/TestCapacityScheduler.java         |   4 +-
 .../scheduler/capacity/TestLeafQueue.java       |   3 +-
 .../scheduler/fair/TestFairScheduler.java       |  78 +++++
 .../webapp/TestRMWebServicesApps.java           |  10 +-
 32 files changed, 1212 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a65011a1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
index 228c6af..2339c79 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -589,7 +590,8 @@ public class YARNRunner implements ClientProtocol {
       amResourceRequest.setCapability(capability);
       amResourceRequest.setNumContainers(1);
       amResourceRequest.setNodeLabelExpression(amNodelabelExpression.trim());
-      appContext.setAMContainerResourceRequest(amResourceRequest);
+      appContext.setAMContainerResourceRequests(
+          Collections.singletonList(amResourceRequest));
     }
     // set labels for the Job containers
     appContext.setNodeLabelExpression(jobConf

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a65011a1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
index c6da9a3..cb355b5 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
@@ -571,7 +571,7 @@ public class TestYARNRunner {
         buildSubmitContext(yarnRunner, jobConf);
 
     assertEquals(appSubCtx.getNodeLabelExpression(), "GPU");
-    assertEquals(appSubCtx.getAMContainerResourceRequest()
+    assertEquals(appSubCtx.getAMContainerResourceRequests().get(0)
         .getNodeLabelExpression(), "highMem");
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a65011a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
index e562aaa..4f1d147 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.api.records;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -100,7 +102,7 @@ public abstract class ApplicationSubmissionContext {
     amReq.setNumContainers(1);
     amReq.setRelaxLocality(true);
     amReq.setNodeLabelExpression(amContainerLabelExpression);
-    context.setAMContainerResourceRequest(amReq);
+    context.setAMContainerResourceRequests(Collections.singletonList(amReq));
     return context;
   }
   
@@ -159,7 +161,8 @@ public abstract class ApplicationSubmissionContext {
     context.setApplicationType(applicationType);
     context.setKeepContainersAcrossApplicationAttempts(keepContainers);
     context.setNodeLabelExpression(appLabelExpression);
-    context.setAMContainerResourceRequest(resourceRequest);
+    context.setAMContainerResourceRequests(
+        Collections.singletonList(resourceRequest));
     return context;
   }
 
@@ -454,30 +457,62 @@ public abstract class ApplicationSubmissionContext {
   public abstract void setNodeLabelExpression(String nodeLabelExpression);
   
   /**
-   * Get ResourceRequest of AM container, if this is not null, scheduler will
-   * use this to acquire resource for AM container.
-   * 
+   * Get the ResourceRequest of the AM container.
+   *
+   * If this is not null, scheduler will use this to acquire resource for AM
+   * container.
+   *
    * If this is null, scheduler will assemble a ResourceRequest by using
    * <em>getResource</em> and <em>getPriority</em> of
    * <em>ApplicationSubmissionContext</em>.
-   * 
-   * Number of containers and Priority will be ignore.
-   * 
-   * @return ResourceRequest of AM container
+   *
+   * Number of containers and Priority will be ignored.
+   *
+   * @return ResourceRequest of the AM container
+   * @deprecated See {@link #getAMContainerResourceRequests()}
    */
   @Public
   @Evolving
+  @Deprecated
   public abstract ResourceRequest getAMContainerResourceRequest();
   
   /**
-   * Set ResourceRequest of AM container
-   * @param request of AM container
+   * Set ResourceRequest of the AM container
+   * @param request of the AM container
+   * @deprecated See {@link #setAMContainerResourceRequests(List)}
    */
   @Public
   @Evolving
+  @Deprecated
   public abstract void setAMContainerResourceRequest(ResourceRequest request);
 
   /**
+   * Get the ResourceRequests of the AM container.
+   *
+   * If this is not null, scheduler will use this to acquire resource for AM
+   * container.
+   *
+   * If this is null, scheduler will use the ResourceRequest as determined by
+   * <em>getAMContainerResourceRequest</em> and its behavior.
+   *
+   * Number of containers and Priority will be ignored.
+   *
+   * @return List of ResourceRequests of the AM container
+   */
+  @Public
+  @Evolving
+  public abstract List<ResourceRequest> getAMContainerResourceRequests();
+
+  /**
+   * Set ResourceRequests of the AM container.
+   * @param requests of the AM container
+   */
+  @Public
+  @Evolving
+  public abstract void setAMContainerResourceRequests(
+      List<ResourceRequest> requests);
+
+  /**
    * Get the attemptFailuresValidityInterval in milliseconds for the 
application
    *
    * @return the attemptFailuresValidityInterval

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a65011a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index fbb2089..ff30c37 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -383,7 +383,7 @@ message ApplicationSubmissionContextProto {
   optional LogAggregationContextProto log_aggregation_context = 14;
   optional ReservationIdProto reservation_id = 15;
   optional string node_label_expression = 16;
-  optional ResourceRequestProto am_container_resource_request = 17;
+  repeated ResourceRequestProto am_container_resource_request = 17;
   repeated ApplicationTimeoutMapProto application_timeouts = 18;
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a65011a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
index 62b54e7..1a6719a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.api.records.impl.pb;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -66,7 +68,7 @@ extends ApplicationSubmissionContext {
   private ContainerLaunchContext amContainer = null;
   private Resource resource = null;
   private Set<String> applicationTags = null;
-  private ResourceRequest amResourceRequest = null;
+  private List<ResourceRequest> amResourceRequests = null;
   private LogAggregationContext logAggregationContext = null;
   private ReservationId reservationId = null;
   private Map<ApplicationTimeoutType, Long> applicationTimeouts = null;
@@ -127,9 +129,10 @@ extends ApplicationSubmissionContext {
       builder.clearApplicationTags();
       builder.addAllApplicationTags(this.applicationTags);
     }
-    if (this.amResourceRequest != null) {
-      builder.setAmContainerResourceRequest(
-          convertToProtoFormat(this.amResourceRequest));
+    if (this.amResourceRequests != null) {
+      builder.clearAmContainerResourceRequest();
+      builder.addAllAmContainerResourceRequest(
+          convertToProtoFormat(this.amResourceRequests));
     }
     if (this.logAggregationContext != null) {
       builder.setLogAggregationContext(
@@ -430,13 +433,23 @@ extends ApplicationSubmissionContext {
   private PriorityProto convertToProtoFormat(Priority t) {
     return ((PriorityPBImpl)t).getProto();
   }
-  
-  private ResourceRequestPBImpl convertFromProtoFormat(ResourceRequestProto p) 
{
-    return new ResourceRequestPBImpl(p);
+
+  private List<ResourceRequest> convertFromProtoFormat(
+      List<ResourceRequestProto> ps) {
+    List<ResourceRequest> rs = new ArrayList<>();
+    for (ResourceRequestProto p : ps) {
+      rs.add(new ResourceRequestPBImpl(p));
+    }
+    return rs;
   }
 
-  private ResourceRequestProto convertToProtoFormat(ResourceRequest t) {
-    return ((ResourceRequestPBImpl)t).getProto();
+  private List<ResourceRequestProto> convertToProtoFormat(
+      List<ResourceRequest> ts) {
+    List<ResourceRequestProto> rs = new ArrayList<>(ts.size());
+    for (ResourceRequest t : ts) {
+      rs.add(((ResourceRequestPBImpl)t).getProto());
+    }
+    return rs;
   }
 
   private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
@@ -485,25 +498,46 @@ extends ApplicationSubmissionContext {
   }
   
   @Override
+  @Deprecated
   public ResourceRequest getAMContainerResourceRequest() {
+    List<ResourceRequest> reqs = getAMContainerResourceRequests();
+    if (reqs == null || reqs.isEmpty()) {
+      return null;
+    }
+    return getAMContainerResourceRequests().get(0);
+  }
+
+  @Override
+  public List<ResourceRequest> getAMContainerResourceRequests() {
     ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
-    if (this.amResourceRequest != null) {
-      return amResourceRequest;
+    if (this.amResourceRequests != null) {
+      return amResourceRequests;
     } // Else via proto
-    if (!p.hasAmContainerResourceRequest()) {
+    if (p.getAmContainerResourceRequestCount() == 0) {
       return null;
     }
-    amResourceRequest = 
convertFromProtoFormat(p.getAmContainerResourceRequest());
-    return amResourceRequest;
+    amResourceRequests =
+        convertFromProtoFormat(p.getAmContainerResourceRequestList());
+    return amResourceRequests;
   }
 
   @Override
+  @Deprecated
   public void setAMContainerResourceRequest(ResourceRequest request) {
     maybeInitBuilder();
     if (request == null) {
       builder.clearAmContainerResourceRequest();
     }
-    this.amResourceRequest = request;
+    this.amResourceRequests = Collections.singletonList(request);
+  }
+
+  @Override
+  public void setAMContainerResourceRequests(List<ResourceRequest> requests) {
+    maybeInitBuilder();
+    if (requests == null) {
+      builder.clearAmContainerResourceRequest();
+    }
+    this.amResourceRequests = requests;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a65011a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java
index f3f4ba0..60ade2d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java
@@ -801,6 +801,28 @@ public class CommonNodeLabelsManager extends 
AbstractService {
     }
   }
 
+  /**
+   * Get nodes that have no labels.
+   *
+   * @return set of nodes with no labels
+   */
+  public Set<NodeId> getNodesWithoutALabel() {
+    try {
+      readLock.lock();
+      Set<NodeId> nodes = new HashSet<>();
+      for (Host host : nodeCollections.values()) {
+        for (NodeId nodeId : host.nms.keySet()) {
+          if (getLabelsByNode(nodeId).isEmpty()) {
+            nodes.add(nodeId);
+          }
+        }
+      }
+      return Collections.unmodifiableSet(nodes);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
 
   /**
    * Get mapping of labels to nodes for all the labels.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a65011a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index 7c8408e..0e6aa39 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -17,7 +17,9 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import java.util.Collections;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
@@ -31,6 +33,8 @@ import 
org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -335,14 +339,16 @@ public class RMAppManager implements 
EventHandler<RMAppManagerEvent>,
     // has been disabled. Reject the recovery of this application if it
     // is true and give clear message so that user can react properly.
     if (!appContext.getUnmanagedAM() &&
-        application.getAMResourceRequest() == null &&
+        (application.getAMResourceRequests() == null ||
+            application.getAMResourceRequests().isEmpty()) &&
         !YarnConfiguration.areNodeLabelsEnabled(this.conf)) {
       // check application submission context and see if am resource request
       // or application itself contains any node label expression.
-      ResourceRequest amReqFromAppContext =
-          appContext.getAMContainerResourceRequest();
-      String labelExp = (amReqFromAppContext != null) ?
-          amReqFromAppContext.getNodeLabelExpression() : null;
+      List<ResourceRequest> amReqsFromAppContext =
+          appContext.getAMContainerResourceRequests();
+      String labelExp =
+          (amReqsFromAppContext != null && !amReqsFromAppContext.isEmpty()) ?
+          amReqsFromAppContext.get(0).getNodeLabelExpression() : null;
       if (labelExp == null) {
         labelExp = appContext.getNodeLabelExpression();
       }
@@ -377,9 +383,9 @@ public class RMAppManager implements 
EventHandler<RMAppManagerEvent>,
     }
 
     ApplicationId applicationId = submissionContext.getApplicationId();
-    ResourceRequest amReq = null;
+    List<ResourceRequest> amReqs = null;
     try {
-      amReq = validateAndCreateResourceRequest(submissionContext, isRecovery);
+      amReqs = validateAndCreateResourceRequest(submissionContext, isRecovery);
     } catch (InvalidLabelResourceRequestException e) {
       // This can happen if the application had been submitted and run
       // with Node Label enabled but recover with Node Label disabled.
@@ -440,7 +446,7 @@ public class RMAppManager implements 
EventHandler<RMAppManagerEvent>,
         submissionContext.getApplicationName(), user,
         submissionContext.getQueue(), submissionContext, this.scheduler,
         this.masterService, submitTime, submissionContext.getApplicationType(),
-        submissionContext.getApplicationTags(), amReq);
+        submissionContext.getApplicationTags(), amReqs);
 
     // Concurrent app submissions with same applicationId will fail here
     // Concurrent app submissions with different applicationIds will not
@@ -462,7 +468,7 @@ public class RMAppManager implements 
EventHandler<RMAppManagerEvent>,
     return application;
   }
 
-  private ResourceRequest validateAndCreateResourceRequest(
+  private List<ResourceRequest> validateAndCreateResourceRequest(
       ApplicationSubmissionContext submissionContext, boolean isRecovery)
       throws InvalidResourceRequestException {
     // Validation of the ApplicationSubmissionContext needs to be completed
@@ -472,33 +478,77 @@ public class RMAppManager implements 
EventHandler<RMAppManagerEvent>,
 
     // Check whether AM resource requirements are within required limits
     if (!submissionContext.getUnmanagedAM()) {
-      ResourceRequest amReq = 
submissionContext.getAMContainerResourceRequest();
-      if (amReq == null) {
-        amReq = BuilderUtils
-            .newResourceRequest(RMAppAttemptImpl.AM_CONTAINER_PRIORITY,
-                ResourceRequest.ANY, submissionContext.getResource(), 1);
-      }
-
-      // set label expression for AM container
-      if (null == amReq.getNodeLabelExpression()) {
-        amReq.setNodeLabelExpression(submissionContext
-            .getNodeLabelExpression());
+      List<ResourceRequest> amReqs =
+          submissionContext.getAMContainerResourceRequests();
+      if (amReqs == null || amReqs.isEmpty()) {
+        if (submissionContext.getResource() != null) {
+          amReqs = Collections.singletonList(BuilderUtils
+              .newResourceRequest(RMAppAttemptImpl.AM_CONTAINER_PRIORITY,
+                  ResourceRequest.ANY, submissionContext.getResource(), 1));
+        } else {
+          throw new InvalidResourceRequestException("Invalid resource request, 
"
+              + "no resources requested");
+        }
       }
 
       try {
-        SchedulerUtils.normalizeAndValidateRequest(amReq,
-            scheduler.getMaximumResourceCapability(),
-            submissionContext.getQueue(), scheduler, isRecovery, rmContext);
+        // Find the ANY request and ensure there's only one
+        ResourceRequest anyReq = null;
+        for (ResourceRequest amReq : amReqs) {
+          if (amReq.getResourceName().equals(ResourceRequest.ANY)) {
+            if (anyReq == null) {
+              anyReq = amReq;
+            } else {
+              throw new InvalidResourceRequestException("Invalid resource "
+                  + "request, only one resource request with "
+                  + ResourceRequest.ANY + " is allowed");
+            }
+          }
+        }
+        if (anyReq == null) {
+          throw new InvalidResourceRequestException("Invalid resource request, 
"
+              + "no resource request specified with " + ResourceRequest.ANY);
+        }
+
+        // Make sure that all of the requests agree with the ANY request
+        // and have correct values
+        for (ResourceRequest amReq : amReqs) {
+          amReq.setCapability(anyReq.getCapability());
+          amReq.setExecutionTypeRequest(
+              ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED));
+          amReq.setNumContainers(1);
+          amReq.setPriority(RMAppAttemptImpl.AM_CONTAINER_PRIORITY);
+        }
+
+        // set label expression for AM ANY request if not set
+        if (null == anyReq.getNodeLabelExpression()) {
+          anyReq.setNodeLabelExpression(submissionContext
+              .getNodeLabelExpression());
+        }
+
+        // Put ANY request at the front
+        if (!amReqs.get(0).equals(anyReq)) {
+          amReqs.remove(anyReq);
+          amReqs.add(0, anyReq);
+        }
+
+        // Normalize all requests
+        for (ResourceRequest amReq : amReqs) {
+          SchedulerUtils.normalizeAndValidateRequest(amReq,
+              scheduler.getMaximumResourceCapability(),
+              submissionContext.getQueue(), scheduler, isRecovery, rmContext);
+
+          amReq.setCapability(
+              scheduler.getNormalizedResource(amReq.getCapability()));
+        }
+        return amReqs;
       } catch (InvalidResourceRequestException e) {
         LOG.warn("RM app submission failed in validating AM resource request"
             + " for application " + submissionContext.getApplicationId(), e);
         throw e;
       }
-
-      
amReq.setCapability(scheduler.getNormalizedResource(amReq.getCapability()));
-      return amReq;
     }
-    
+
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a65011a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
index 49aa3b0..92c2313 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
@@ -21,13 +21,16 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 import java.io.IOException;
 import java.text.ParseException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.AccessControlException;
@@ -41,6 +44,7 @@ import 
org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -564,19 +568,65 @@ public class RMServerUtils {
    *
    * @param rmContext context
    * @param conf configuration
-   * @param amreq am resource request
+   * @param amReqs am resource requests
    * @return applicable node count
    */
   public static int getApplicableNodeCountForAM(RMContext rmContext,
-      Configuration conf, ResourceRequest amreq) {
+      Configuration conf, List<ResourceRequest> amReqs) {
+    // Determine the list of nodes that are eligible based on the strict
+    // resource requests
+    Set<NodeId> nodesForReqs = new HashSet<>();
+    for (ResourceRequest amReq : amReqs) {
+      if (amReq.getRelaxLocality() &&
+          !amReq.getResourceName().equals(ResourceRequest.ANY)) {
+        nodesForReqs.addAll(
+            rmContext.getScheduler().getNodeIds(amReq.getResourceName()));
+      }
+    }
+
     if (YarnConfiguration.areNodeLabelsEnabled(conf)) {
-      RMNodeLabelsManager labelManager = rmContext.getNodeLabelManager();
-      String amNodeLabelExpression = amreq.getNodeLabelExpression();
-      amNodeLabelExpression = (amNodeLabelExpression == null
-          || amNodeLabelExpression.trim().isEmpty())
-              ? RMNodeLabelsManager.NO_LABEL : amNodeLabelExpression;
-      return labelManager.getActiveNMCountPerLabel(amNodeLabelExpression);
+      // Determine the list of nodes that are eligible based on the node label
+      String amNodeLabelExpression = amReqs.get(0).getNodeLabelExpression();
+      Set<NodeId> nodesForLabels =
+          getNodeIdsForLabel(rmContext, amNodeLabelExpression);
+      if (nodesForLabels != null && !nodesForLabels.isEmpty()) {
+        // If only node labels, strip out any wildcard NodeIds and return
+        if (nodesForReqs.isEmpty()) {
+          for (Iterator<NodeId> it = nodesForLabels.iterator(); it.hasNext();) 
{
+            if (it.next().getPort() == 0) {
+              it.remove();
+            }
+          }
+          return nodesForLabels.size();
+        } else {
+          // The NodeIds common to both the strict resource requests and the
+          // node label is the eligible set
+          return Sets.intersection(nodesForReqs, nodesForLabels).size();
+        }
+      }
+    }
+
+    // If no strict resource request NodeIds nor node label NodeIds, then just
+    // return the entire cluster
+    if (nodesForReqs.isEmpty()) {
+      return rmContext.getScheduler().getNumClusterNodes();
+    }
+    // No node label NodeIds, so return the strict resource request NodeIds
+    return nodesForReqs.size();
+  }
+
+  private static Set<NodeId> getNodeIdsForLabel(RMContext rmContext,
+      String label) {
+    label = (label == null || label.trim().isEmpty())
+        ? RMNodeLabelsManager.NO_LABEL : label;
+    if (label.equals(RMNodeLabelsManager.NO_LABEL)) {
+      // NO_LABEL nodes aren't tracked directly
+      return rmContext.getNodeLabelManager().getNodesWithoutALabel();
+    } else {
+      Map<String, Set<NodeId>> labelsToNodes =
+          rmContext.getNodeLabelManager().getLabelsToNodes(
+              Collections.singleton(label));
+      return labelsToNodes.get(label);
     }
-    return rmContext.getScheduler().getNumClusterNodes();
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a65011a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
index e55649d..a42b3bf 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -246,7 +247,7 @@ public interface RMApp extends EventHandler<RMAppEvent> {
 
   ReservationId getReservationId();
   
-  ResourceRequest getAMResourceRequest();
+  List<ResourceRequest> getAMResourceRequests();
 
   Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a65011a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index bda87c4..41f6e04 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -194,7 +194,7 @@ public class RMAppImpl implements RMApp, Recoverable {
   private RMAppEvent eventCausingFinalSaving;
   private RMAppState targetedFinalState;
   private RMAppState recoveredFinalState;
-  private ResourceRequest amReq;
+  private List<ResourceRequest> amReqs;
   
   private CallerContext callerContext;
 
@@ -405,8 +405,8 @@ public class RMAppImpl implements RMApp, Recoverable {
       Configuration config, String name, String user, String queue,
       ApplicationSubmissionContext submissionContext, YarnScheduler scheduler,
       ApplicationMasterService masterService, long submitTime,
-      String applicationType, Set<String> applicationTags, 
-      ResourceRequest amReq) {
+      String applicationType, Set<String> applicationTags,
+      List<ResourceRequest> amReqs) {
 
     this.systemClock = SystemClock.getInstance();
 
@@ -425,7 +425,7 @@ public class RMAppImpl implements RMApp, Recoverable {
     this.startTime = this.systemClock.getTime();
     this.applicationType = applicationType;
     this.applicationTags = applicationTags;
-    this.amReq = amReq;
+    this.amReqs = amReqs;
     if (submissionContext.getPriority() != null) {
       this.applicationPriority = Priority
           .newInstance(submissionContext.getPriority().getPriority());
@@ -919,7 +919,7 @@ public class RMAppImpl implements RMApp, Recoverable {
       if (amBlacklistingEnabled && !submissionContext.getUnmanagedAM()) {
         currentAMBlacklistManager = new SimpleBlacklistManager(
             RMServerUtils.getApplicableNodeCountForAM(rmContext, conf,
-                getAMResourceRequest()),
+                getAMResourceRequests()),
             blacklistDisableThreshold);
       } else {
         currentAMBlacklistManager = new DisabledBlacklistManager();
@@ -927,7 +927,7 @@ public class RMAppImpl implements RMApp, Recoverable {
     }
     RMAppAttempt attempt =
         new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService,
-          submissionContext, conf, amReq, this, currentAMBlacklistManager);
+          submissionContext, conf, amReqs, this, currentAMBlacklistManager);
     attempts.put(appAttemptId, attempt);
     currentAttempt = attempt;
   }
@@ -1605,8 +1605,8 @@ public class RMAppImpl implements RMApp, Recoverable {
   }
   
   @Override
-  public ResourceRequest getAMResourceRequest() {
-    return this.amReq; 
+  public List<ResourceRequest> getAMResourceRequests() {
+    return this.amReqs;
   }
 
   @Override
@@ -1879,7 +1879,9 @@ public class RMAppImpl implements RMApp, Recoverable {
   public String getAmNodeLabelExpression() {
     String amNodeLabelExpression = null;
     if (!getApplicationSubmissionContext().getUnmanagedAM()) {
-      amNodeLabelExpression = getAMResourceRequest().getNodeLabelExpression();
+      amNodeLabelExpression =
+          getAMResourceRequests() != null && !getAMResourceRequests().isEmpty()
+              ? getAMResourceRequests().get(0).getNodeLabelExpression() : null;
       amNodeLabelExpression = (amNodeLabelExpression == null)
           ? NodeLabel.NODE_LABEL_EXPRESSION_NOT_SET : amNodeLabelExpression;
       amNodeLabelExpression = (amNodeLabelExpression.trim().isEmpty())

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a65011a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 6158f88..1f182a3 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -192,7 +192,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, 
Recoverable {
   private Object transitionTodo;
   
   private RMAppAttemptMetrics attemptMetrics = null;
-  private ResourceRequest amReq = null;
+  private List<ResourceRequest> amReqs = null;
   private BlacklistManager blacklistedNodesForAM = null;
 
   private String amLaunchDiagnostics;
@@ -485,16 +485,16 @@ public class RMAppAttemptImpl implements RMAppAttempt, 
Recoverable {
       RMContext rmContext, YarnScheduler scheduler,
       ApplicationMasterService masterService,
       ApplicationSubmissionContext submissionContext,
-      Configuration conf, ResourceRequest amReq, RMApp rmApp) {
+      Configuration conf, List<ResourceRequest> amReqs, RMApp rmApp) {
     this(appAttemptId, rmContext, scheduler, masterService, submissionContext,
-        conf, amReq, rmApp, new DisabledBlacklistManager());
+        conf, amReqs, rmApp, new DisabledBlacklistManager());
   }
 
   public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
       RMContext rmContext, YarnScheduler scheduler,
       ApplicationMasterService masterService,
       ApplicationSubmissionContext submissionContext,
-      Configuration conf, ResourceRequest amReq, RMApp rmApp,
+      Configuration conf, List<ResourceRequest> amReqs, RMApp rmApp,
       BlacklistManager amBlacklistManager) {
     this.conf = conf;
     this.applicationAttemptId = appAttemptId;
@@ -514,7 +514,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, 
Recoverable {
     this.attemptMetrics =
         new RMAppAttemptMetrics(applicationAttemptId, rmContext);
 
-    this.amReq = amReq;
+    this.amReqs = amReqs;
     this.blacklistedNodesForAM = amBlacklistManager;
 
     final int diagnosticsLimitKC = getDiagnosticsLimitKCOrThrow(conf);
@@ -1092,18 +1092,21 @@ public class RMAppAttemptImpl implements RMAppAttempt, 
Recoverable {
         // will be passed to scheduler, and scheduler will deduct the number 
after
         // AM container allocated
         
-        // Currently, following fields are all hard code,
+        // Currently, following fields are all hard coded,
         // TODO: change these fields when we want to support
-        // priority/resource-name/relax-locality specification for AM 
containers
-        // allocation.
-        appAttempt.amReq.setNumContainers(1);
-        appAttempt.amReq.setPriority(AM_CONTAINER_PRIORITY);
-        appAttempt.amReq.setResourceName(ResourceRequest.ANY);
-        appAttempt.amReq.setRelaxLocality(true);
-
-        appAttempt.getAMBlacklistManager().refreshNodeHostCount(
+        // priority or multiple containers AM container allocation.
+        for (ResourceRequest amReq : appAttempt.amReqs) {
+          amReq.setNumContainers(1);
+          amReq.setPriority(AM_CONTAINER_PRIORITY);
+        }
+
+        int numNodes =
             RMServerUtils.getApplicableNodeCountForAM(appAttempt.rmContext,
-                appAttempt.conf, appAttempt.amReq));
+                appAttempt.conf, appAttempt.amReqs);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting node count for blacklist to " + numNodes);
+        }
+        appAttempt.getAMBlacklistManager().refreshNodeHostCount(numNodes);
 
         ResourceBlacklistRequest amBlacklist =
             appAttempt.getAMBlacklistManager().getBlacklistUpdates();
@@ -1116,7 +1119,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, 
Recoverable {
         Allocation amContainerAllocation =
             appAttempt.scheduler.allocate(
                 appAttempt.applicationAttemptId,
-                Collections.singletonList(appAttempt.amReq),
+                appAttempt.amReqs,
                 EMPTY_CONTAINER_RELEASE_LIST,
                 amBlacklist.getBlacklistAdditions(),
                 amBlacklist.getBlacklistRemovals(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a65011a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index ce6d2a2..1d6ce6c8 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -1162,4 +1162,9 @@ public abstract class AbstractYarnScheduler
     return SchedulerUtils.createOpportunisticRmContainer(
         rmContext, demotedContainer, false);
   }
+
+  @Override
+  public List<NodeId> getNodeIds(String resourceName) {
+    return nodeTracker.getNodeIdsByResourceName(resourceName);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a65011a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
index e487f69..010e645 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
@@ -268,6 +268,9 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
 
   /**
    * Convenience method to filter nodes based on a condition.
+   *
+   * @param nodeFilter A {@link NodeFilter} for filtering the nodes
+   * @return A list of filtered nodes
    */
   public List<N> getNodes(NodeFilter nodeFilter) {
     List<N> nodeList = new ArrayList<>();
@@ -288,6 +291,37 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
     return nodeList;
   }
 
+  public List<NodeId> getAllNodeIds() {
+    return getNodeIds(null);
+  }
+
+  /**
+   * Convenience method to filter nodes based on a condition.
+   *
+   * @param nodeFilter A {@link NodeFilter} for filtering the nodes
+   * @return A list of filtered nodes
+   */
+  public List<NodeId> getNodeIds(NodeFilter nodeFilter) {
+    List<NodeId> nodeList = new ArrayList<>();
+    readLock.lock();
+    try {
+      if (nodeFilter == null) {
+        for (N node : nodes.values()) {
+          nodeList.add(node.getNodeID());
+        }
+      } else {
+        for (N node : nodes.values()) {
+          if (nodeFilter.accept(node)) {
+            nodeList.add(node.getNodeID());
+          }
+        }
+      }
+    } finally {
+      readLock.unlock();
+    }
+    return nodeList;
+  }
+
   /**
    * Convenience method to sort nodes.
    *
@@ -320,11 +354,38 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
         resourceName != null && !resourceName.isEmpty());
     List<N> retNodes = new ArrayList<>();
     if (ResourceRequest.ANY.equals(resourceName)) {
-      return getAllNodes();
+      retNodes.addAll(getAllNodes());
     } else if (nodeNameToNodeMap.containsKey(resourceName)) {
       retNodes.add(nodeNameToNodeMap.get(resourceName));
     } else if (nodesPerRack.containsKey(resourceName)) {
-      return nodesPerRack.get(resourceName);
+      retNodes.addAll(nodesPerRack.get(resourceName));
+    } else {
+      LOG.info(
+          "Could not find a node matching given resourceName " + resourceName);
+    }
+    return retNodes;
+  }
+
+  /**
+   * Convenience method to return list of {@link NodeId} corresponding to
+   * resourceName passed in the {@link ResourceRequest}.
+   *
+   * @param resourceName Host/rack name of the resource, or
+   * {@link ResourceRequest#ANY}
+   * @return list of {@link NodeId} that match the resourceName
+   */
+  public List<NodeId> getNodeIdsByResourceName(final String resourceName) {
+    Preconditions.checkArgument(
+        resourceName != null && !resourceName.isEmpty());
+    List<NodeId> retNodes = new ArrayList<>();
+    if (ResourceRequest.ANY.equals(resourceName)) {
+      retNodes.addAll(getAllNodeIds());
+    } else if (nodeNameToNodeMap.containsKey(resourceName)) {
+      retNodes.add(nodeNameToNodeMap.get(resourceName).getNodeID());
+    } else if (nodesPerRack.containsKey(resourceName)) {
+      for (N node : nodesPerRack.get(resourceName)) {
+        retNodes.add(node.getNodeID());
+      }
     } else {
       LOG.info(
           "Could not find a node matching given resourceName " + resourceName);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a65011a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java
index 5649ccf..d96d625 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java
@@ -19,10 +19,12 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
 
@@ -49,4 +51,11 @@ public interface ResourceScheduler extends YarnScheduler, 
Recoverable {
    * @throws IOException
    */
   void reinitialize(Configuration conf, RMContext rmContext) throws 
IOException;
+
+  /**
+   * Get the {@link NodeId} available in the cluster by resource name.
+   * @param resourceName resource name
+   * @return the number of available {@link NodeId} by resource name.
+   */
+  List<NodeId> getNodeIds(String resourceName);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a65011a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index f12efb4..e824b6c 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -139,18 +139,20 @@ public class FiCaSchedulerApp extends 
SchedulerApplicationAttempt {
     Resource amResource;
     String partition;
 
-    if (rmApp == null || rmApp.getAMResourceRequest() == null) {
+    if (rmApp == null || rmApp.getAMResourceRequests() == null
+        || rmApp.getAMResourceRequests().isEmpty()) {
       // the rmApp may be undefined (the resource manager checks for this too)
       // and unmanaged applications do not provide an amResource request
       // in these cases, provide a default using the scheduler
       amResource = rmContext.getScheduler().getMinimumResourceCapability();
       partition = CommonNodeLabelsManager.NO_LABEL;
     } else {
-      amResource = rmApp.getAMResourceRequest().getCapability();
+      amResource = rmApp.getAMResourceRequests().get(0).getCapability();
       partition =
-          (rmApp.getAMResourceRequest().getNodeLabelExpression() == null)
+          (rmApp.getAMResourceRequests().get(0)
+              .getNodeLabelExpression() == null)
           ? CommonNodeLabelsManager.NO_LABEL
-          : rmApp.getAMResourceRequest().getNodeLabelExpression();
+          : rmApp.getAMResourceRequests().get(0).getNodeLabelExpression();
     }
 
     setAppAMNodePartitionName(partition);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a65011a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java
index 4e85b67..10e627a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java
@@ -229,7 +229,7 @@ public class AppInfo {
       appNodeLabelExpression =
           app.getApplicationSubmissionContext().getNodeLabelExpression();
       amNodeLabelExpression = (unmanagedApplication) ? null
-          : app.getAMResourceRequest().getNodeLabelExpression();
+          : app.getAMResourceRequests().get(0).getNodeLabelExpression();
 
       // Setting partition based resource usage of application
       ResourceScheduler scheduler = rm.getRMContext().getScheduler();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a65011a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index f9f42ad..aca2fc5 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -24,6 +24,7 @@ import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
@@ -678,6 +679,17 @@ public class MockRM extends ResourceManager {
         tokensConf);
   }
 
+  public RMApp submitApp(List<ResourceRequest> amResourceRequests)
+      throws Exception {
+    return submitApp(amResourceRequests, "app1",
+        "user", null, false, null,
+        super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true,
+        false, false, null, 0, null, true,
+        amResourceRequests.get(0).getPriority(),
+        amResourceRequests.get(0).getNodeLabelExpression(), null, null);
+  }
+
   public RMApp submitApp(Resource capability, String name, String user,
       Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
       int maxAppAttempts, Credentials ts, String appType,
@@ -688,6 +700,30 @@ public class MockRM extends ResourceManager {
       Map<ApplicationTimeoutType, Long> applicationTimeouts,
       ByteBuffer tokensConf)
       throws Exception {
+    priority = (priority == null) ? Priority.newInstance(0) : priority;
+    ResourceRequest amResourceRequest = ResourceRequest.newInstance(
+        priority, ResourceRequest.ANY, capability, 1);
+    if (amLabel != null && !amLabel.isEmpty()) {
+      amResourceRequest.setNodeLabelExpression(amLabel.trim());
+    }
+    return submitApp(Collections.singletonList(amResourceRequest), name, user,
+        acls, unmanaged, queue, maxAppAttempts, ts, appType, waitForAccepted,
+        keepContainers, isAppIdProvided, applicationId,
+        attemptFailuresValidityInterval, logAggregationContext,
+        cancelTokensWhenComplete, priority, amLabel, applicationTimeouts,
+        tokensConf);
+  }
+
+  public RMApp submitApp(List<ResourceRequest> amResourceRequests, String name,
+      String user, Map<ApplicationAccessType, String> acls, boolean unmanaged,
+      String queue, int maxAppAttempts, Credentials ts, String appType,
+      boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
+      ApplicationId applicationId, long attemptFailuresValidityInterval,
+      LogAggregationContext logAggregationContext,
+      boolean cancelTokensWhenComplete, Priority priority, String amLabel,
+      Map<ApplicationTimeoutType, Long> applicationTimeouts,
+      ByteBuffer tokensConf)
+      throws Exception {
     ApplicationId appId = isAppIdProvided ? applicationId : null;
     ApplicationClientProtocol client = getClientRMService();
     if (! isAppIdProvided) {
@@ -718,7 +754,6 @@ public class MockRM extends ResourceManager {
     sub.setApplicationType(appType);
     ContainerLaunchContext clc = Records
         .newRecord(ContainerLaunchContext.class);
-    sub.setResource(capability);
     clc.setApplicationACLs(acls);
     if (ts != null && UserGroupInformation.isSecurityEnabled()) {
       DataOutputBuffer dob = new DataOutputBuffer();
@@ -733,12 +768,12 @@ public class MockRM extends ResourceManager {
       sub.setLogAggregationContext(logAggregationContext);
     }
     sub.setCancelTokensWhenComplete(cancelTokensWhenComplete);
-    ResourceRequest amResourceRequest = ResourceRequest.newInstance(
-        Priority.newInstance(0), ResourceRequest.ANY, capability, 1);
     if (amLabel != null && !amLabel.isEmpty()) {
-      amResourceRequest.setNodeLabelExpression(amLabel.trim());
+      for (ResourceRequest amResourceRequest : amResourceRequests) {
+        amResourceRequest.setNodeLabelExpression(amLabel.trim());
+      }
     }
-    sub.setAMContainerResourceRequest(amResourceRequest);
+    sub.setAMContainerResourceRequests(amResourceRequests);
     req.setApplicationSubmissionContext(sub);
     UserGroupInformation fakeUser =
       UserGroupInformation.createUserForTesting(user, new String[] 
{"someGroup"});

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a65011a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
index 1aec76f..8ec3eae 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
@@ -31,6 +31,8 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.ConcurrentMap;
@@ -50,6 +52,8 @@ import 
org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -57,11 +61,13 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import 
org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
 import 
org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import 
org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
@@ -72,6 +78,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -307,7 +314,7 @@ public class TestAppManager{
     ResourceRequest resReg =
         ResourceRequest.newInstance(Priority.newInstance(0),
             ResourceRequest.ANY, Resource.newInstance(1024, 1), 1);
-    sub.setAMContainerResourceRequest(resReg);
+    sub.setAMContainerResourceRequests(Collections.singletonList(resReg));
     req.setApplicationSubmissionContext(sub);
     sub.setAMContainerSpec(mock(ContainerLaunchContext.class));
     try {
@@ -517,8 +524,157 @@ public class TestAppManager{
     Assert.assertEquals("app event type is wrong before", RMAppEventType.KILL, 
appEventType);
   }
 
+  @SuppressWarnings("deprecation")
+  @Test
+  public void testRMAppSubmitAMContainerResourceRequests() throws Exception {
+    asContext.setResource(Resources.createResource(1024));
+    asContext.setAMContainerResourceRequest(
+        ResourceRequest.newInstance(Priority.newInstance(0),
+            ResourceRequest.ANY, Resources.createResource(1024), 1, true));
+    List<ResourceRequest> reqs = new ArrayList<>();
+    reqs.add(ResourceRequest.newInstance(Priority.newInstance(0),
+        ResourceRequest.ANY, Resources.createResource(1025), 1, false));
+    reqs.add(ResourceRequest.newInstance(Priority.newInstance(0),
+        "/rack", Resources.createResource(1025), 1, false));
+    reqs.add(ResourceRequest.newInstance(Priority.newInstance(0),
+        "/rack/node", Resources.createResource(1025), 1, true));
+    asContext.setAMContainerResourceRequests(cloneResourceRequests(reqs));
+    // getAMContainerResourceRequest uses the first entry of
+    // getAMContainerResourceRequests
+    Assert.assertEquals(reqs.get(0), 
asContext.getAMContainerResourceRequest());
+    Assert.assertEquals(reqs, asContext.getAMContainerResourceRequests());
+    RMApp app = testRMAppSubmit();
+    for (ResourceRequest req : reqs) {
+      req.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL);
+    }
+    // setAMContainerResourceRequests has priority over
+    // setAMContainerResourceRequest and setResource
+    Assert.assertEquals(reqs, app.getAMResourceRequests());
+  }
+
+  @SuppressWarnings("deprecation")
+  @Test
+  public void testRMAppSubmitAMContainerResourceRequest() throws Exception {
+    asContext.setResource(Resources.createResource(1024));
+    asContext.setAMContainerResourceRequests(null);
+    ResourceRequest req =
+        ResourceRequest.newInstance(Priority.newInstance(0),
+            ResourceRequest.ANY, Resources.createResource(1025), 1, true);
+    asContext.setAMContainerResourceRequest(cloneResourceRequest(req));
+    // getAMContainerResourceRequests uses a singleton list of
+    // getAMContainerResourceRequest
+    Assert.assertEquals(req, asContext.getAMContainerResourceRequest());
+    Assert.assertEquals(req, 
asContext.getAMContainerResourceRequests().get(0));
+    Assert.assertEquals(1, asContext.getAMContainerResourceRequests().size());
+    RMApp app = testRMAppSubmit();
+    req.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL);
+    // setAMContainerResourceRequest has priority over setResource
+    Assert.assertEquals(Collections.singletonList(req),
+        app.getAMResourceRequests());
+  }
+
   @Test
-  public void testRMAppSubmit() throws Exception {
+  public void testRMAppSubmitResource() throws Exception {
+    asContext.setResource(Resources.createResource(1024));
+    asContext.setAMContainerResourceRequests(null);
+    RMApp app = testRMAppSubmit();
+    // setResource
+    Assert.assertEquals(Collections.singletonList(
+        ResourceRequest.newInstance(RMAppAttemptImpl.AM_CONTAINER_PRIORITY,
+        ResourceRequest.ANY, Resources.createResource(1024), 1, true, "")),
+        app.getAMResourceRequests());
+  }
+
+  @Test
+  public void testRMAppSubmitNoResourceRequests() throws Exception {
+    asContext.setResource(null);
+    asContext.setAMContainerResourceRequests(null);
+    try {
+      testRMAppSubmit();
+      Assert.fail("Should have failed due to no ResourceRequest");
+    } catch (InvalidResourceRequestException e) {
+      Assert.assertEquals(
+          "Invalid resource request, no resources requested",
+          e.getMessage());
+    }
+  }
+
+  @Test
+  public void testRMAppSubmitAMContainerResourceRequestsDisagree()
+      throws Exception {
+    asContext.setResource(null);
+    List<ResourceRequest> reqs = new ArrayList<>();
+    ResourceRequest anyReq = ResourceRequest.newInstance(
+        Priority.newInstance(1),
+        ResourceRequest.ANY, Resources.createResource(1024), 1, false, 
"label1",
+        ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED));
+    reqs.add(anyReq);
+    reqs.add(ResourceRequest.newInstance(Priority.newInstance(2),
+        "/rack", Resources.createResource(1025), 2, false, "",
+        ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)));
+    reqs.add(ResourceRequest.newInstance(Priority.newInstance(3),
+        "/rack/node", Resources.createResource(1026), 3, true, "",
+        ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)));
+    asContext.setAMContainerResourceRequests(cloneResourceRequests(reqs));
+    RMApp app = testRMAppSubmit();
+    // It should force the requests to all agree on these points
+    for (ResourceRequest req : reqs) {
+      req.setCapability(anyReq.getCapability());
+      req.setExecutionTypeRequest(
+          ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED));
+      req.setNumContainers(1);
+      req.setPriority(Priority.newInstance(0));
+    }
+    Assert.assertEquals(reqs, app.getAMResourceRequests());
+  }
+
+  @Test
+  public void testRMAppSubmitAMContainerResourceRequestsNoAny()
+      throws Exception {
+    asContext.setResource(null);
+    List<ResourceRequest> reqs = new ArrayList<>();
+    reqs.add(ResourceRequest.newInstance(Priority.newInstance(1),
+        "/rack", Resources.createResource(1025), 1, false));
+    reqs.add(ResourceRequest.newInstance(Priority.newInstance(1),
+        "/rack/node", Resources.createResource(1025), 1, true));
+    asContext.setAMContainerResourceRequests(cloneResourceRequests(reqs));
+    // getAMContainerResourceRequest uses the first entry of
+    // getAMContainerResourceRequests
+    Assert.assertEquals(reqs, asContext.getAMContainerResourceRequests());
+    try {
+      testRMAppSubmit();
+      Assert.fail("Should have failed due to missing ANY ResourceRequest");
+    } catch (InvalidResourceRequestException e) {
+      Assert.assertEquals(
+          "Invalid resource request, no resource request specified with *",
+          e.getMessage());
+    }
+  }
+
+  @Test
+  public void testRMAppSubmitAMContainerResourceRequestsTwoManyAny()
+      throws Exception {
+    asContext.setResource(null);
+    List<ResourceRequest> reqs = new ArrayList<>();
+    reqs.add(ResourceRequest.newInstance(Priority.newInstance(1),
+        ResourceRequest.ANY, Resources.createResource(1025), 1, false));
+    reqs.add(ResourceRequest.newInstance(Priority.newInstance(1),
+        ResourceRequest.ANY, Resources.createResource(1025), 1, false));
+    asContext.setAMContainerResourceRequests(cloneResourceRequests(reqs));
+    // getAMContainerResourceRequest uses the first entry of
+    // getAMContainerResourceRequests
+    Assert.assertEquals(reqs, asContext.getAMContainerResourceRequests());
+    try {
+      testRMAppSubmit();
+      Assert.fail("Should have failed due to too many ANY ResourceRequests");
+    } catch (InvalidResourceRequestException e) {
+      Assert.assertEquals(
+          "Invalid resource request, only one resource request with * is " +
+              "allowed", e.getMessage());
+    }
+  }
+
+  private RMApp testRMAppSubmit() throws Exception {
     appMonitor.submitApplication(asContext, "test");
     RMApp app = rmContext.getRMApps().get(appId);
     Assert.assertNotNull("app is null", app);
@@ -529,12 +685,14 @@ public class TestAppManager{
 
     // wait for event to be processed
     int timeoutSecs = 0;
-    while ((getAppEventType() == RMAppEventType.KILL) && 
+    while ((getAppEventType() == RMAppEventType.KILL) &&
         timeoutSecs++ < 20) {
       Thread.sleep(1000);
     }
     Assert.assertEquals("app event type sent is wrong", RMAppEventType.START,
         getAppEventType());
+
+    return app;
   }
 
   @Test
@@ -732,6 +890,15 @@ public class TestAppManager{
     ResourceCalculator rs = mock(ResourceCalculator.class);
     when(scheduler.getResourceCalculator()).thenReturn(rs);
 
+    when(scheduler.getNormalizedResource((Resource) any()))
+        .thenAnswer(new Answer<Resource>() {
+      @Override
+      public Resource answer(InvocationOnMock invocationOnMock)
+          throws Throwable {
+        return (Resource) invocationOnMock.getArguments()[0];
+      }
+    });
+
     return scheduler;
   }
 
@@ -748,4 +915,26 @@ public class TestAppManager{
         YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
   }
 
+  private static ResourceRequest cloneResourceRequest(ResourceRequest req) {
+    return ResourceRequest.newInstance(
+        Priority.newInstance(req.getPriority().getPriority()),
+        new String(req.getResourceName()),
+        Resource.newInstance(req.getCapability().getMemorySize(),
+            req.getCapability().getVirtualCores()),
+        req.getNumContainers(),
+        req.getRelaxLocality(),
+        req.getNodeLabelExpression() != null
+            ? new String(req.getNodeLabelExpression()) : null,
+        ExecutionTypeRequest.newInstance(
+            req.getExecutionTypeRequest().getExecutionType()));
+  }
+
+  private static List<ResourceRequest> cloneResourceRequests(
+      List<ResourceRequest> reqs) {
+    List<ResourceRequest> cloneReqs = new ArrayList<>();
+    for (ResourceRequest req : reqs) {
+      cloneReqs.add(cloneResourceRequest(req));
+    }
+    return cloneReqs;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a65011a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
index 01c1b43..3c70efe 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
@@ -34,6 +34,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -1039,9 +1040,9 @@ public class TestClientRMService {
         spy(new RMAppImpl(applicationId3, rmContext, config, null, null,
             queueName, asContext, yarnScheduler, null,
             System.currentTimeMillis(), "YARN", null,
-            BuilderUtils.newResourceRequest(
+            Collections.singletonList(BuilderUtils.newResourceRequest(
                 RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
-                Resource.newInstance(1024, 1), 1)){
+                Resource.newInstance(1024, 1), 1))){
                   @Override
                   public ApplicationReport createAndGetApplicationReport(
                       String clientUserName, boolean allowAccess) {
@@ -1055,7 +1056,8 @@ public class TestClientRMService {
                     return report;
                   }
               });
-    app.getAMResourceRequest().setNodeLabelExpression(amNodeLabelExpression);
+    app.getAMResourceRequests().get(0)
+        .setNodeLabelExpression(amNodeLabelExpression);
     ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
         ApplicationId.newInstance(123456, 1), 1);
     RMAppAttemptImpl rmAppAttemptImpl = spy(new RMAppAttemptImpl(attemptId,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a65011a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java
index c80a799..b4adf48 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -28,6 +29,9 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
@@ -157,6 +161,186 @@ public class TestNodeBlacklistingOnAMFailures {
   }
 
   @Test(timeout = 100000)
+  public void testNodeBlacklistingOnAMFailureStrictNodeLocality()
+      throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setBoolean(YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED,
+        true);
+
+    DrainDispatcher dispatcher = new DrainDispatcher();
+    MockRM rm = startRM(conf, dispatcher);
+    CapacityScheduler scheduler = (CapacityScheduler) 
rm.getResourceScheduler();
+
+    // Register 5 nodes, so that we can blacklist atleast one if AM container
+    // is failed. As per calculation it will be like, 5nodes * 0.2 (default)=1.
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8000, rm.getResourceTrackerService());
+    nm1.registerNode();
+
+    MockNM nm2 =
+        new MockNM("127.0.0.2:2345", 8000, rm.getResourceTrackerService());
+    nm2.registerNode();
+
+    MockNM nm3 =
+        new MockNM("127.0.0.3:2345", 8000, rm.getResourceTrackerService());
+    nm3.registerNode();
+
+    MockNM nm4 =
+        new MockNM("127.0.0.4:2345", 8000, rm.getResourceTrackerService());
+    nm4.registerNode();
+
+    MockNM nm5 =
+        new MockNM("127.0.0.5:2345", 8000, rm.getResourceTrackerService());
+    nm5.registerNode();
+
+    // Specify a strict locality on nm2
+    List<ResourceRequest> reqs = new ArrayList<>();
+    ResourceRequest nodeReq = ResourceRequest.newInstance(
+        Priority.newInstance(0), nm2.getNodeId().getHost(),
+        Resource.newInstance(200, 1), 1, true);
+    ResourceRequest rackReq = ResourceRequest.newInstance(
+        Priority.newInstance(0), "/default-rack",
+        Resource.newInstance(200, 1), 1, false);
+    ResourceRequest anyReq = ResourceRequest.newInstance(
+        Priority.newInstance(0), ResourceRequest.ANY,
+        Resource.newInstance(200, 1), 1, false);
+    reqs.add(anyReq);
+    reqs.add(rackReq);
+    reqs.add(nodeReq);
+    RMApp app = rm.submitApp(reqs);
+
+    MockAM am1 = MockRM.launchAndRegisterAM(app, rm, nm2);
+    ContainerId amContainerId =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+    RMContainer rmContainer = scheduler.getRMContainer(amContainerId);
+    NodeId nodeWhereAMRan = rmContainer.getAllocatedNode();
+    Assert.assertEquals(nm2.getNodeId(), nodeWhereAMRan);
+
+    // Set the exist status to INVALID so that we can verify that the system
+    // automatically blacklisting the node
+    makeAMContainerExit(rm, amContainerId, nm2, ContainerExitStatus.INVALID);
+
+    // restart the am
+    RMAppAttempt attempt = MockRM.waitForAttemptScheduled(app, rm);
+    System.out.println("New AppAttempt launched " + attempt.getAppAttemptId());
+
+    nm2.nodeHeartbeat(true);
+    dispatcher.await();
+
+    // Now the AM container should be allocated
+    MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000);
+
+    MockAM am2 = rm.sendAMLaunched(attempt.getAppAttemptId());
+    rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
+    amContainerId =
+        ContainerId.newContainerId(am2.getApplicationAttemptId(), 1);
+    rmContainer = scheduler.getRMContainer(amContainerId);
+    nodeWhereAMRan = rmContainer.getAllocatedNode();
+
+    // The second AM should be on the same node because the strict locality
+    // made the eligible nodes only 1, so the blacklisting threshold kicked in
+    System.out.println("AM ran on " + nodeWhereAMRan);
+    Assert.assertEquals(nm2.getNodeId(), nodeWhereAMRan);
+
+    am2.registerAppAttempt();
+    rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
+  }
+
+  @Test(timeout = 100000)
+  public void testNodeBlacklistingOnAMFailureRelaxedNodeLocality()
+      throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setBoolean(YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED,
+        true);
+
+    DrainDispatcher dispatcher = new DrainDispatcher();
+    MockRM rm = startRM(conf, dispatcher);
+    CapacityScheduler scheduler = (CapacityScheduler) 
rm.getResourceScheduler();
+
+    // Register 5 nodes, so that we can blacklist atleast one if AM container
+    // is failed. As per calculation it will be like, 5nodes * 0.2 (default)=1.
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8000, rm.getResourceTrackerService());
+    nm1.registerNode();
+
+    MockNM nm2 =
+        new MockNM("127.0.0.2:2345", 8000, rm.getResourceTrackerService());
+    nm2.registerNode();
+
+    MockNM nm3 =
+        new MockNM("127.0.0.3:2345", 8000, rm.getResourceTrackerService());
+    nm3.registerNode();
+
+    MockNM nm4 =
+        new MockNM("127.0.0.4:2345", 8000, rm.getResourceTrackerService());
+    nm4.registerNode();
+
+    MockNM nm5 =
+        new MockNM("127.0.0.5:2345", 8000, rm.getResourceTrackerService());
+    nm5.registerNode();
+
+    // Specify a relaxed locality on nm2
+    List<ResourceRequest> reqs = new ArrayList<>();
+    ResourceRequest nodeReq = ResourceRequest.newInstance(
+        Priority.newInstance(0), nm2.getNodeId().getHost(),
+        Resource.newInstance(200, 1), 1, true);
+    ResourceRequest rackReq = ResourceRequest.newInstance(
+        Priority.newInstance(0), "/default-rack",
+        Resource.newInstance(200, 1), 1, true);
+    ResourceRequest anyReq = ResourceRequest.newInstance(
+        Priority.newInstance(0), ResourceRequest.ANY,
+        Resource.newInstance(200, 1), 1, true);
+    reqs.add(anyReq);
+    reqs.add(rackReq);
+    reqs.add(nodeReq);
+    RMApp app = rm.submitApp(reqs);
+
+    MockAM am1 = MockRM.launchAndRegisterAM(app, rm, nm2);
+    ContainerId amContainerId =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+    RMContainer rmContainer = scheduler.getRMContainer(amContainerId);
+    NodeId nodeWhereAMRan = rmContainer.getAllocatedNode();
+    Assert.assertEquals(nm2.getNodeId(), nodeWhereAMRan);
+
+    // Set the exist status to INVALID so that we can verify that the system
+    // automatically blacklisting the node
+    makeAMContainerExit(rm, amContainerId, nm2, ContainerExitStatus.INVALID);
+
+    // restart the am
+    RMAppAttempt attempt = MockRM.waitForAttemptScheduled(app, rm);
+    System.out.println("New AppAttempt launched " + attempt.getAppAttemptId());
+
+    nm2.nodeHeartbeat(true);
+    nm1.nodeHeartbeat(true);
+    nm3.nodeHeartbeat(true);
+    nm4.nodeHeartbeat(true);
+    nm5.nodeHeartbeat(true);
+    dispatcher.await();
+
+    // Now the AM container should be allocated
+    MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000);
+
+    MockAM am2 = rm.sendAMLaunched(attempt.getAppAttemptId());
+    rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
+    amContainerId =
+        ContainerId.newContainerId(am2.getApplicationAttemptId(), 1);
+    rmContainer = scheduler.getRMContainer(amContainerId);
+    nodeWhereAMRan = rmContainer.getAllocatedNode();
+
+    // The second AM should be on a different node because the relaxed locality
+    // made the app schedulable on other nodes and nm2 is blacklisted
+    System.out.println("AM ran on " + nodeWhereAMRan);
+    Assert.assertNotEquals(nm2.getNodeId(), nodeWhereAMRan);
+
+    am2.registerAppAttempt();
+    rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
+  }
+
+  @Test(timeout = 100000)
   public void testNoBlacklistingForNonSystemErrors() throws Exception {
 
     YarnConfiguration conf = new YarnConfiguration();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to