http://git-wip-us.apache.org/repos/asf/hadoop/blob/b733348d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocationE2E.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocationE2E.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocationE2E.java
new file mode 100644
index 0000000..94cb28e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocationE2E.java
@@ -0,0 +1,784 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ProfileCapability;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
+import org.apache.hadoop.yarn.api.records.UpdatedContainer;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMTokenCache;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler
+    .AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Class that tests the allocation of OPPORTUNISTIC containers through the
+ * centralized ResourceManager.
+ */
+public class TestOpportunisticContainerAllocationE2E {
+  private static Configuration conf = null;
+  private static MiniYARNCluster yarnCluster = null;
+  private static YarnClient yarnClient = null;
+  private static List<NodeReport> nodeReports = null;
+  private static int nodeCount = 3;
+
+  private static final int ROLLING_INTERVAL_SEC = 13;
+  private static final long AM_EXPIRE_MS = 4000;
+
+  private static Resource capability;
+  private static ProfileCapability profileCapability;
+  private static Priority priority;
+  private static Priority priority2;
+  private static Priority priority3;
+  private static Priority priority4;
+  private static String node;
+  private static String rack;
+  private static String[] nodes;
+  private static String[] racks;
+  private final static int DEFAULT_ITERATION = 3;
+
+  // Per test..
+  private ApplicationAttemptId attemptId = null;
+  private AMRMClientImpl<AMRMClient.ContainerRequest> amClient = null;
+  private long availMB;
+  private int availVCores;
+  private long allocMB;
+  private int allocVCores;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    // start minicluster
+    conf = new YarnConfiguration();
+    conf.setLong(
+        YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
+        ROLLING_INTERVAL_SEC);
+    conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, AM_EXPIRE_MS);
+    conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 1000);
+    // set the minimum allocation so that resource decrease can go under 1024
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
+    conf.setBoolean(
+        YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
+    conf.setInt(
+        YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10);
+    conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
+    yarnCluster =
+        new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
+    yarnCluster.init(conf);
+    yarnCluster.start();
+
+    // start rm client
+    yarnClient = YarnClient.createYarnClient();
+    yarnClient.init(conf);
+    yarnClient.start();
+
+    // get node info
+    nodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
+
+    priority = Priority.newInstance(1);
+    priority2 = Priority.newInstance(2);
+    priority3 = Priority.newInstance(3);
+    priority4 = Priority.newInstance(4);
+    capability = Resource.newInstance(512, 1);
+    profileCapability = ProfileCapability.newInstance(capability);
+
+    node = nodeReports.get(0).getNodeId().getHost();
+    rack = nodeReports.get(0).getRackName();
+    nodes = new String[]{node};
+    racks = new String[]{rack};
+  }
+
+  @Before
+  public void startApp() throws Exception {
+    // submit new app
+    ApplicationSubmissionContext appContext =
+        yarnClient.createApplication().getApplicationSubmissionContext();
+    ApplicationId appId = appContext.getApplicationId();
+    // set the application name
+    appContext.setApplicationName("Test");
+    // Set the priority for the application master
+    Priority pri = Records.newRecord(Priority.class);
+    pri.setPriority(0);
+    appContext.setPriority(pri);
+    // Set the queue to which this application is to be submitted in the RM
+    appContext.setQueue("default");
+    // Set up the container launch context for the application master
+    ContainerLaunchContext amContainer = BuilderUtils.newContainerLaunchContext(
+        Collections.<String, LocalResource>emptyMap(),
+        new HashMap<String, String>(), Arrays.asList("sleep", "100"),
+        new HashMap<String, ByteBuffer>(), null,
+        new HashMap<ApplicationAccessType, String>());
+    appContext.setAMContainerSpec(amContainer);
+    appContext.setResource(Resource.newInstance(1024, 1));
+    // Create the request to send to the applications manager
+    SubmitApplicationRequest appRequest =
+        Records.newRecord(SubmitApplicationRequest.class);
+    appRequest.setApplicationSubmissionContext(appContext);
+    // Submit the application to the applications manager
+    yarnClient.submitApplication(appContext);
+
+    // wait for app to start
+    RMAppAttempt appAttempt = null;
+    while (true) {
+      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+      if (appReport.getYarnApplicationState() ==
+          YarnApplicationState.ACCEPTED) {
+        attemptId = appReport.getCurrentApplicationAttemptId();
+        appAttempt = yarnCluster.getResourceManager().getRMContext().getRMApps()
+            .get(attemptId.getApplicationId()).getCurrentAppAttempt();
+        while (true) {
+          if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
+            break;
+          }
+        }
+        break;
+      }
+    }
+    // Just dig into the ResourceManager and get the AMRMToken just for the sake
+    // of testing.
+    UserGroupInformation.setLoginUser(UserGroupInformation
+        .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
+
+    // emulate RM setup of AMRM token in credentials by adding the token
+    // *before* setting the token service
+    UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
+    appAttempt.getAMRMToken()
+        .setService(ClientRMProxy.getAMRMTokenService(conf));
+
+    // start am rm client
+    amClient = (AMRMClientImpl<AMRMClient.ContainerRequest>)AMRMClient
+        .createAMRMClient();
+
+    //setting an instance NMTokenCache
+    amClient.setNMTokenCache(new NMTokenCache());
+    //asserting we are not using the singleton instance cache
+    Assert.assertNotSame(NMTokenCache.getSingleton(),
+        amClient.getNMTokenCache());
+
+    amClient.init(conf);
+    amClient.start();
+
+    amClient.registerApplicationMaster("Host", 10000, "");
+  }
+
+  @After
+  public void cancelApp() throws YarnException, IOException {
+    try {
+      amClient
+          .unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null,
+              null);
+    } finally {
+      if (amClient != null &&
+          amClient.getServiceState() == Service.STATE.STARTED) {
+        amClient.stop();
+      }
+    }
+    yarnClient.killApplication(attemptId.getApplicationId());
+    attemptId = null;
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    if (yarnClient != null &&
+        yarnClient.getServiceState() == Service.STATE.STARTED) {
+      yarnClient.stop();
+    }
+    if (yarnCluster != null &&
+        yarnCluster.getServiceState() == Service.STATE.STARTED) {
+      yarnCluster.stop();
+    }
+  }
+
+  @Test(timeout = 60000)
+  public void testPromotionFromAcquired() throws YarnException, IOException {
+    // setup container request
+    assertEquals(0, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+
+    amClient.addContainerRequest(
+        new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
+            true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true)));
+
+    int oppContainersRequestedAny =
+        amClient.getTable(0).get(priority2, ResourceRequest.ANY,
+            ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest
+            .getNumContainers();
+
+    assertEquals(1, oppContainersRequestedAny);
+
+    assertEquals(1, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+
+    // RM should allocate container within 2 calls to allocate()
+    int allocatedContainerCount = 0;
+    Map<ContainerId, Container> allocatedOpportContainers = new HashMap<>();
+    int iterationsLeft = 50;
+
+    amClient.getNMTokenCache().clearCache();
+    Assert.assertEquals(0,
+        amClient.getNMTokenCache().numberOfTokensInCache());
+    HashMap<String, Token> receivedNMTokens = new HashMap<>();
+
+    updateMetrics("Before Opp Allocation");
+
+    while (allocatedContainerCount < oppContainersRequestedAny
+        && iterationsLeft-- > 0) {
+      AllocateResponse allocResponse = amClient.allocate(0.1f);
+      assertEquals(0, amClient.ask.size());
+      assertEquals(0, amClient.release.size());
+
+      allocatedContainerCount +=
+          allocResponse.getAllocatedContainers().size();
+      for (Container container : allocResponse.getAllocatedContainers()) {
+        if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
+          allocatedOpportContainers.put(container.getId(), container);
+          removeCR(container);
+        }
+      }
+
+      for (NMToken token : allocResponse.getNMTokens()) {
+        String nodeID = token.getNodeId().toString();
+        receivedNMTokens.put(nodeID, token.getToken());
+      }
+
+      if (allocatedContainerCount < oppContainersRequestedAny) {
+        // sleep to let NM's heartbeat to RM and trigger allocations
+        sleep(100);
+      }
+    }
+
+    assertEquals(oppContainersRequestedAny, allocatedContainerCount);
+    assertEquals(oppContainersRequestedAny, allocatedOpportContainers.size());
+
+    updateMetrics("After Opp Allocation / Before Promotion");
+
+    try {
+      Container c = allocatedOpportContainers.values().iterator().next();
+      amClient.requestContainerUpdate(
+          c, UpdateContainerRequest.newInstance(c.getVersion(),
+              c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
+              null, ExecutionType.OPPORTUNISTIC));
+      Assert.fail("Should throw Exception..");
+    } catch (IllegalArgumentException e) {
+      System.out.println("## " + e.getMessage());
+      Assert.assertTrue(e.getMessage().contains(
+          "target should be GUARANTEED and original should be OPPORTUNISTIC"));
+    }
+
+    Container c = allocatedOpportContainers.values().iterator().next();
+    amClient.requestContainerUpdate(
+        c, UpdateContainerRequest.newInstance(c.getVersion(),
+            c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
+            null, ExecutionType.GUARANTEED));
+    iterationsLeft = 120;
+    Map<ContainerId, UpdatedContainer> updatedContainers = new HashMap<>();
+    // do a few iterations to ensure RM is not going to send new containers
+    while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) {
+      // inform RM of rejection
+      AllocateResponse allocResponse = amClient.allocate(0.1f);
+      // RM did not send new containers because AM does not need any
+      if (allocResponse.getUpdatedContainers() != null) {
+        for (UpdatedContainer updatedContainer : allocResponse
+            .getUpdatedContainers()) {
+          System.out.println("Got update..");
+          updatedContainers.put(updatedContainer.getContainer().getId(),
+              updatedContainer);
+        }
+      }
+      if (iterationsLeft > 0) {
+        // sleep to make sure NM's heartbeat
+        sleep(100);
+      }
+    }
+
+    updateMetrics("After Promotion");
+
+    assertEquals(1, updatedContainers.size());
+    for (ContainerId cId : allocatedOpportContainers.keySet()) {
+      Container orig = allocatedOpportContainers.get(cId);
+      UpdatedContainer updatedContainer = updatedContainers.get(cId);
+      assertNotNull(updatedContainer);
+      assertEquals(ExecutionType.GUARANTEED,
+          updatedContainer.getContainer().getExecutionType());
+      assertEquals(orig.getResource(),
+          updatedContainer.getContainer().getResource());
+      assertEquals(orig.getNodeId(),
+          updatedContainer.getContainer().getNodeId());
+      assertEquals(orig.getVersion() + 1,
+          updatedContainer.getContainer().getVersion());
+    }
+    assertEquals(0, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+    amClient.ask.clear();
+  }
+
+  @Test(timeout = 60000)
+  public void testDemotionFromAcquired() throws YarnException, IOException {
+    // setup container request
+    assertEquals(0, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+
+    amClient.addContainerRequest(
+        new AMRMClient.ContainerRequest(capability, null, null, priority3));
+
+    int guarContainersRequestedAny = amClient.getTable(0).get(priority3,
+        ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
+        .remoteRequest.getNumContainers();
+
+    assertEquals(1, guarContainersRequestedAny);
+
+    assertEquals(1, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+
+    // RM should allocate container within 2 calls to allocate()
+    int allocatedContainerCount = 0;
+    Map<ContainerId, Container> allocatedGuarContainers = new HashMap<>();
+    int iterationsLeft = 50;
+
+    amClient.getNMTokenCache().clearCache();
+    Assert.assertEquals(0,
+        amClient.getNMTokenCache().numberOfTokensInCache());
+    HashMap<String, Token> receivedNMTokens = new HashMap<>();
+
+    updateMetrics("Before Guar Allocation");
+
+    while (allocatedContainerCount < guarContainersRequestedAny
+        && iterationsLeft-- > 0) {
+      AllocateResponse allocResponse = amClient.allocate(0.1f);
+      assertEquals(0, amClient.ask.size());
+      assertEquals(0, amClient.release.size());
+
+      allocatedContainerCount +=
+          allocResponse.getAllocatedContainers().size();
+      for (Container container : allocResponse.getAllocatedContainers()) {
+        if (container.getExecutionType() == ExecutionType.GUARANTEED) {
+          allocatedGuarContainers.put(container.getId(), container);
+          removeCR(container);
+        }
+      }
+
+      for (NMToken token : allocResponse.getNMTokens()) {
+        String nodeID = token.getNodeId().toString();
+        receivedNMTokens.put(nodeID, token.getToken());
+      }
+
+      if (allocatedContainerCount < guarContainersRequestedAny) {
+        // sleep to let NM's heartbeat to RM and trigger allocations
+        sleep(100);
+      }
+    }
+
+    assertEquals(guarContainersRequestedAny, allocatedContainerCount);
+    assertEquals(guarContainersRequestedAny, allocatedGuarContainers.size());
+
+    updateMetrics("After Guar Allocation / Before Demotion");
+
+    try {
+      Container c = allocatedGuarContainers.values().iterator().next();
+      amClient.requestContainerUpdate(
+          c, UpdateContainerRequest.newInstance(c.getVersion(),
+              c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
+              null, ExecutionType.GUARANTEED));
+      Assert.fail("Should throw Exception..");
+    } catch (IllegalArgumentException e) {
+      System.out.println("## " + e.getMessage());
+      Assert.assertTrue(e.getMessage().contains(
+          "target should be OPPORTUNISTIC and original should be GUARANTEED"));
+    }
+
+    Container c = allocatedGuarContainers.values().iterator().next();
+    amClient.requestContainerUpdate(
+        c, UpdateContainerRequest.newInstance(c.getVersion(),
+            c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
+            null, ExecutionType.OPPORTUNISTIC));
+    iterationsLeft = 120;
+    Map<ContainerId, UpdatedContainer> updatedContainers = new HashMap<>();
+    // do a few iterations to ensure RM is not going to send new containers
+    while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) {
+      // inform RM of rejection
+      AllocateResponse allocResponse = amClient.allocate(0.1f);
+      // RM did not send new containers because AM does not need any
+      if (allocResponse.getUpdatedContainers() != null) {
+        for (UpdatedContainer updatedContainer : allocResponse
+            .getUpdatedContainers()) {
+          System.out.println("Got update..");
+          updatedContainers.put(updatedContainer.getContainer().getId(),
+              updatedContainer);
+        }
+      }
+      if (iterationsLeft > 0) {
+        // sleep to make sure NM's heartbeat
+        sleep(100);
+      }
+    }
+
+    updateMetrics("After Demotion");
+
+    assertEquals(1, updatedContainers.size());
+    for (ContainerId cId : allocatedGuarContainers.keySet()) {
+      Container orig = allocatedGuarContainers.get(cId);
+      UpdatedContainer updatedContainer = updatedContainers.get(cId);
+      assertNotNull(updatedContainer);
+      assertEquals(ExecutionType.OPPORTUNISTIC,
+          updatedContainer.getContainer().getExecutionType());
+      assertEquals(orig.getResource(),
+          updatedContainer.getContainer().getResource());
+      assertEquals(orig.getNodeId(),
+          updatedContainer.getContainer().getNodeId());
+      assertEquals(orig.getVersion() + 1,
+          updatedContainer.getContainer().getVersion());
+    }
+    assertEquals(0, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+    amClient.ask.clear();
+  }
+
+  @Test(timeout = 60000)
+  public void testMixedAllocationAndRelease() throws YarnException,
+      IOException {
+    // setup container request
+    assertEquals(0, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+
+
+    amClient.addContainerRequest(
+        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+    amClient.addContainerRequest(
+        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+    amClient.addContainerRequest(
+        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+    amClient.addContainerRequest(
+        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+    amClient.addContainerRequest(
+        new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
+            true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true)));
+    amClient.addContainerRequest(
+        new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
+            true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true)));
+
+    int containersRequestedNode = amClient.getTable(0).get(priority,
+        node, ExecutionType.GUARANTEED, profileCapability).remoteRequest
+        .getNumContainers();
+    int containersRequestedRack = amClient.getTable(0).get(priority,
+        rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest
+        .getNumContainers();
+    int containersRequestedAny = amClient.getTable(0).get(priority,
+        ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
+        .remoteRequest.getNumContainers();
+    int oppContainersRequestedAny =
+        amClient.getTable(0).get(priority2, ResourceRequest.ANY,
+            ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest
+            .getNumContainers();
+
+    assertEquals(4, containersRequestedNode);
+    assertEquals(4, containersRequestedRack);
+    assertEquals(4, containersRequestedAny);
+    assertEquals(2, oppContainersRequestedAny);
+
+    assertEquals(4, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+
+    amClient.removeContainerRequest(
+        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+    amClient.removeContainerRequest(
+        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+    amClient.removeContainerRequest(
+        new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
+            true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true)));
+
+    containersRequestedNode = amClient.getTable(0).get(priority,
+        node, ExecutionType.GUARANTEED, profileCapability).remoteRequest
+        .getNumContainers();
+    containersRequestedRack = amClient.getTable(0).get(priority,
+        rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest
+        .getNumContainers();
+    containersRequestedAny = amClient.getTable(0).get(priority,
+        ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
+        .remoteRequest.getNumContainers();
+    oppContainersRequestedAny =
+        amClient.getTable(0).get(priority2, ResourceRequest.ANY,
+            ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest
+            .getNumContainers();
+
+    assertEquals(2, containersRequestedNode);
+    assertEquals(2, containersRequestedRack);
+    assertEquals(2, containersRequestedAny);
+    assertEquals(1, oppContainersRequestedAny);
+
+    assertEquals(4, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+
+    // RM should allocate container within 2 calls to allocate()
+    int allocatedContainerCount = 0;
+    int allocatedOpportContainerCount = 0;
+    int iterationsLeft = 50;
+    Set<ContainerId> releases = new TreeSet<>();
+
+    amClient.getNMTokenCache().clearCache();
+    Assert.assertEquals(0,
+        amClient.getNMTokenCache().numberOfTokensInCache());
+    HashMap<String, Token> receivedNMTokens = new HashMap<>();
+
+    while (allocatedContainerCount <
+        containersRequestedAny + oppContainersRequestedAny
+        && iterationsLeft-- > 0) {
+      AllocateResponse allocResponse = amClient.allocate(0.1f);
+      assertEquals(0, amClient.ask.size());
+      assertEquals(0, amClient.release.size());
+
+      allocatedContainerCount +=
+          allocResponse.getAllocatedContainers().size();
+      for (Container container : allocResponse.getAllocatedContainers()) {
+        if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
+          allocatedOpportContainerCount++;
+        }
+        ContainerId rejectContainerId = container.getId();
+        releases.add(rejectContainerId);
+      }
+
+      for (NMToken token : allocResponse.getNMTokens()) {
+        String nodeID = token.getNodeId().toString();
+        receivedNMTokens.put(nodeID, token.getToken());
+      }
+
+      if (allocatedContainerCount < containersRequestedAny) {
+        // sleep to let NM's heartbeat to RM and trigger allocations
+        sleep(100);
+      }
+    }
+
+    assertEquals(containersRequestedAny + oppContainersRequestedAny,
+        allocatedContainerCount);
+    assertEquals(oppContainersRequestedAny, allocatedOpportContainerCount);
+    for (ContainerId rejectContainerId : releases) {
+      amClient.releaseAssignedContainer(rejectContainerId);
+    }
+    assertEquals(3, amClient.release.size());
+    assertEquals(0, amClient.ask.size());
+
+    // need to tell the AMRMClient that we don't need these resources anymore
+    amClient.removeContainerRequest(
+        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+    amClient.removeContainerRequest(
+        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+    amClient.removeContainerRequest(
+        new AMRMClient.ContainerRequest(capability, nodes, racks, priority2, 0,
+            true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true)));
+    assertEquals(4, amClient.ask.size());
+
+    iterationsLeft = 3;
+    // do a few iterations to ensure RM is not going to send new containers
+    while (iterationsLeft-- > 0) {
+      // inform RM of rejection
+      AllocateResponse allocResponse = amClient.allocate(0.1f);
+      // RM did not send new containers because AM does not need any
+      assertEquals(0, allocResponse.getAllocatedContainers().size());
+      if (allocResponse.getCompletedContainersStatuses().size() > 0) {
+        for (ContainerStatus cStatus : allocResponse
+            .getCompletedContainersStatuses()) {
+          if (releases.contains(cStatus.getContainerId())) {
+            assertEquals(cStatus.getState(), ContainerState.COMPLETE);
+            assertEquals(-100, cStatus.getExitStatus());
+            releases.remove(cStatus.getContainerId());
+          }
+        }
+      }
+      if (iterationsLeft > 0) {
+        // sleep to make sure NM's heartbeat
+        sleep(100);
+      }
+    }
+    assertEquals(0, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+  }
+
+  /**
+   * Tests allocation with requests comprising only opportunistic containers.
+   */
+  @Test(timeout = 60000)
+  public void testOpportunisticAllocation() throws YarnException, IOException {
+    // setup container request
+    assertEquals(0, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+
+    amClient.addContainerRequest(
+        new AMRMClient.ContainerRequest(capability, null, null, priority3, 0,
+            true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true)));
+    amClient.addContainerRequest(
+        new AMRMClient.ContainerRequest(capability, null, null, priority3, 0,
+            true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true)));
+
+    int oppContainersRequestedAny = amClient.getTable(0)
+        .get(priority3, ResourceRequest.ANY, ExecutionType.OPPORTUNISTIC,
+            profileCapability).remoteRequest.getNumContainers();
+
+    assertEquals(2, oppContainersRequestedAny);
+
+    assertEquals(1, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+
+    // RM should allocate container within 2 calls to allocate()
+    int allocatedContainerCount = 0;
+    int iterationsLeft = 10;
+    Set<ContainerId> releases = new TreeSet<>();
+
+    amClient.getNMTokenCache().clearCache();
+    Assert.assertEquals(0,
+        amClient.getNMTokenCache().numberOfTokensInCache());
+    HashMap<String, Token> receivedNMTokens = new HashMap<>();
+
+    while (allocatedContainerCount < oppContainersRequestedAny
+        && iterationsLeft-- > 0) {
+      AllocateResponse allocResponse = amClient.allocate(0.1f);
+      assertEquals(0, amClient.ask.size());
+      assertEquals(0, amClient.release.size());
+
+      for (Container container : allocResponse.getAllocatedContainers()) {
+        allocatedContainerCount++;
+        ContainerId rejectContainerId = container.getId();
+        releases.add(rejectContainerId);
+      }
+
+      for (NMToken token : allocResponse.getNMTokens()) {
+        String nodeID = token.getNodeId().toString();
+        receivedNMTokens.put(nodeID, token.getToken());
+      }
+
+      if (allocatedContainerCount < oppContainersRequestedAny) {
+        // sleep to let NM's heartbeat to RM and trigger allocations
+        sleep(100);
+      }
+    }
+
+    assertEquals(oppContainersRequestedAny, allocatedContainerCount);
+    assertEquals(1, receivedNMTokens.values().size());
+  }
+
+  private void removeCR(Container container) {
+    List<? extends Collection<AMRMClient.ContainerRequest>>
+        matchingRequests = amClient.getMatchingRequests(container
+            .getPriority(),
+        ResourceRequest.ANY, ExecutionType.OPPORTUNISTIC,
+        container.getResource());
+    Set<AMRMClient.ContainerRequest> toRemove = new HashSet<>();
+    for (Collection<AMRMClient.ContainerRequest> rc : matchingRequests) {
+      for (AMRMClient.ContainerRequest cr : rc) {
+        toRemove.add(cr);
+      }
+    }
+    for (AMRMClient.ContainerRequest cr : toRemove) {
+      amClient.removeContainerRequest(cr);
+    }
+  }
+
+  private void updateMetrics(String msg) {
+    AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler)yarnCluster.getResourceManager()
+            .getResourceScheduler();
+    availMB = scheduler.getRootQueueMetrics().getAvailableMB();
+    availVCores = scheduler.getRootQueueMetrics().getAvailableVirtualCores();
+    allocMB = scheduler.getRootQueueMetrics().getAllocatedMB();
+    allocVCores = scheduler.getRootQueueMetrics().getAllocatedVirtualCores();
+    System.out.println("## METRICS (" + msg + ")==>");
+    System.out.println(" : availMB=" + availMB + ", " +
+        "availVCores=" +availVCores + ", " +
+        "allocMB=" + allocMB + ", " +
+        "allocVCores=" + allocVCores + ", ");
+    System.out.println("<== ##");
+  }
+
+  private void sleep(int sleepTime) {
+    try {
+      Thread.sleep(sleepTime);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+}
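
The test file above exercises the essential AM-side flow end to end: ask for
a container with an OPPORTUNISTIC ExecutionTypeRequest, then promote it once
acquired. A minimal sketch of just that flow, assuming a started and
registered AMRMClient named amClient and an already-acquired Container named
container (allocate() polling and error handling omitted):

    // Request one OPPORTUNISTIC container; the "true" flag in
    // ExecutionTypeRequest.newInstance enforces the execution type.
    amClient.addContainerRequest(
        new AMRMClient.ContainerRequest(
            Resource.newInstance(512, 1), null, null, Priority.newInstance(2),
            0, true, null,
            ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true)));

    // ... poll amClient.allocate(progress) until the container arrives ...

    // Promote the acquired container. The target type must be GUARANTEED;
    // anything else makes requestContainerUpdate throw an
    // IllegalArgumentException, as testPromotionFromAcquired verifies.
    amClient.requestContainerUpdate(container,
        UpdateContainerRequest.newInstance(container.getVersion(),
            container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
            null, ExecutionType.GUARANTEED));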

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b733348d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java
index e403a12..f621aa2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java
@@ -47,6 +47,24 @@ public abstract class RemoteNode implements Comparable<RemoteNode> {
   }
 
   /**
+   * Create new Instance.
+   * @param nodeId NodeId.
+   * @param httpAddress Http address.
+   * @param rackName Rack Name.
+   * @return RemoteNode instance.
+   */
+  @Private
+  @Unstable
+  public static RemoteNode newInstance(NodeId nodeId, String httpAddress,
+      String rackName) {
+    RemoteNode remoteNode = Records.newRecord(RemoteNode.class);
+    remoteNode.setNodeId(nodeId);
+    remoteNode.setHttpAddress(httpAddress);
+    remoteNode.setRackName(rackName);
+    return remoteNode;
+  }
+
+  /**
    * Get {@link NodeId}.
    * @return NodeId.
    */
@@ -79,6 +97,22 @@ public abstract class RemoteNode implements Comparable<RemoteNode> {
   public abstract void setHttpAddress(String httpAddress);
 
   /**
+   * Get Rack Name.
+   * @return Rack Name.
+   */
+  @Private
+  @Unstable
+  public abstract String getRackName();
+
+  /**
+   * Set Rack Name.
+   * @param rackName Rack Name.
+   */
+  @Private
+  @Unstable
+  public abstract void setRackName(String rackName);
+
+  /**
    * Use the underlying {@link NodeId} comparator.
    * @param other RemoteNode.
    * @return Comparison.
@@ -92,6 +126,7 @@ public abstract class RemoteNode implements Comparable<RemoteNode> {
   public String toString() {
     return "RemoteNode{" +
         "nodeId=" + getNodeId() + ", " +
+        "rackName=" + getRackName() + ", " +
         "httpAddress=" + getHttpAddress() + "}";
   }
 }
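
With the new factory and rack accessors, rack-aware construction is a
one-liner. A quick sketch (host, port, and HTTP address are hypothetical):

    // Rack names conventionally start with "/", which is also how the
    // allocator change below tells racks apart from hosts.
    RemoteNode rNode = RemoteNode.newInstance(
        NodeId.newInstance("host1.example.com", 8041),  // hypothetical node
        "host1.example.com:8042",                       // hypothetical HTTP address
        "/rack-a");
    // toString() now reports the rack:
    // RemoteNode{nodeId=host1.example.com:8041, rackName=/rack-a,
    //            httpAddress=host1.example.com:8042}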

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b733348d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java
index 3e4fd4a..c2492cf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java
@@ -118,6 +118,25 @@ public class RemoteNodePBImpl extends RemoteNode {
   }
 
   @Override
+  public String getRackName() {
+    RemoteNodeProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasRackName()) {
+      return null;
+    }
+    return (p.getRackName());
+  }
+
+  @Override
+  public void setRackName(String rackName) {
+    maybeInitBuilder();
+    if (rackName == null) {
+      builder.clearRackName();
+      return;
+    }
+    builder.setRackName(rackName);
+  }
+
+  @Override
   public int hashCode() {
     return getProto().hashCode();
   }
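
The getter/setter pair follows the usual optional-field PB pattern: setting
null clears the field, and an unset field reads back as null. In brief, a
sketch using the record factory (as RemoteNode.newInstance does above):

    RemoteNode rNode = Records.newRecord(RemoteNode.class);
    // hasRackName() is false until the field is set, so null comes back.
    assert rNode.getRackName() == null;
    rNode.setRackName("/rack-a");
    rNode.setRackName(null);  // builder.clearRackName(): back to unset
    assert rNode.getRackName() == null;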

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b733348d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
index 782dc02..ede4958 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
@@ -45,11 +45,14 @@ import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -61,6 +64,10 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public class OpportunisticContainerAllocator {
 
+  private static final int NODE_LOCAL_LOOP = 0;
+  private static final int RACK_LOCAL_LOOP = 1;
+  private static final int OFF_SWITCH_LOOP = 2;
+
   /**
    * This class encapsulates application specific parameters used to build a
    * Container.
@@ -70,6 +77,7 @@ public class OpportunisticContainerAllocator {
     private Resource minResource;
     private Resource incrementResource;
     private int containerTokenExpiryInterval;
+    private int maxAllocationsPerSchedulerKeyPerRound = 1;
 
     /**
      * Return Max Resource.
@@ -135,6 +143,24 @@ public class OpportunisticContainerAllocator {
         int containerTokenExpiryInterval) {
       this.containerTokenExpiryInterval = containerTokenExpiryInterval;
     }
+
+    /**
+     * Get the Max Allocations per Scheduler Key per allocation round.
+     * @return maxAllocationsPerSchedulerKeyPerRound.
+     */
+    public int getMaxAllocationsPerSchedulerKeyPerRound() {
+      return maxAllocationsPerSchedulerKeyPerRound;
+    }
+
+    /**
+     * Set the Max Allocations per Scheduler Key per allocation round.
+     * @param maxAllocationsPerSchedulerKeyPerRound val.
+     */
+    public void setMaxAllocationsPerSchedulerKeyPerRound(
+        int maxAllocationsPerSchedulerKeyPerRound) {
+      this.maxAllocationsPerSchedulerKeyPerRound =
+          maxAllocationsPerSchedulerKeyPerRound;
+    }
   }
 
   /**
@@ -188,6 +214,72 @@ public class OpportunisticContainerAllocator {
 
   private final BaseContainerTokenSecretManager tokenSecretManager;
 
+  static class Allocation {
+    private final Container container;
+    private final String resourceName;
+
+    Allocation(Container container, String resourceName) {
+      this.container = container;
+      this.resourceName = resourceName;
+    }
+
+    Container getContainer() {
+      return container;
+    }
+
+    String getResourceName() {
+      return resourceName;
+    }
+  }
+
+  static class EnrichedResourceRequest {
+    private final Map<String, AtomicInteger> nodeLocations = new HashMap<>();
+    private final Map<String, AtomicInteger> rackLocations = new HashMap<>();
+    private final ResourceRequest request;
+
+    EnrichedResourceRequest(ResourceRequest request) {
+      this.request = request;
+    }
+
+    ResourceRequest getRequest() {
+      return request;
+    }
+
+    void addLocation(String location, int count) {
+      Map<String, AtomicInteger> m = rackLocations;
+      if (!location.startsWith("/")) {
+        m = nodeLocations;
+      }
+      if (count == 0) {
+        m.remove(location);
+      } else {
+        m.put(location, new AtomicInteger(count));
+      }
+    }
+
+    void removeLocation(String location) {
+      Map<String, AtomicInteger> m = rackLocations;
+      AtomicInteger count = m.get(location);
+      if (count == null) {
+        m = nodeLocations;
+        count = m.get(location);
+      }
+
+      if (count != null) {
+        if (count.decrementAndGet() == 0) {
+          m.remove(location);
+        }
+      }
+    }
+
+    Set<String> getNodeLocations() {
+      return nodeLocations.keySet();
+    }
+
+    Set<String> getRackLocations() {
+      return rackLocations.keySet();
+    }
+  }
   /**
    * Create a new Opportunistic Container Allocator.
    * @param tokenSecretManager TokenSecretManager
@@ -223,37 +315,55 @@ public class OpportunisticContainerAllocator {
     // Add OPPORTUNISTIC requests to the outstanding ones.
     opportContext.addToOutstandingReqs(oppResourceReqs);
 
-    // Satisfy the outstanding OPPORTUNISTIC requests.
+    Set<String> nodeBlackList = new HashSet<>(opportContext.getBlacklist());
     List<Container> allocatedContainers = new ArrayList<>();
-    for (SchedulerRequestKey schedulerKey :
-        opportContext.getOutstandingOpReqs().descendingKeySet()) {
-      // Allocated containers :
-      //  Key = Requested Capability,
-      //  Value = List of Containers of given cap (the actual container size
-      //          might be different than what is requested, which is why
-      //          we need the requested capability (key) to match against
-      //          the outstanding reqs)
-      Map<Resource, List<Container>> allocated = allocate(rmIdentifier,
-          opportContext, schedulerKey, applicationAttemptId, appSubmitter);
-      for (Map.Entry<Resource, List<Container>> e : allocated.entrySet()) {
-        opportContext.matchAllocationToOutstandingRequest(
-            e.getKey(), e.getValue());
-        allocatedContainers.addAll(e.getValue());
+
+    // Satisfy the outstanding OPPORTUNISTIC requests.
+    boolean continueLoop = true;
+    while (continueLoop) {
+      continueLoop = false;
+      List<Map<Resource, List<Allocation>>> allocations = new ArrayList<>();
+      for (SchedulerRequestKey schedulerKey :
+          opportContext.getOutstandingOpReqs().descendingKeySet()) {
+        // Allocated containers :
+        //  Key = Requested Capability,
+        //  Value = List of Containers of given cap (the actual container size
+        //          might be different than what is requested, which is why
+        //          we need the requested capability (key) to match against
+        //          the outstanding reqs)
+        Map<Resource, List<Allocation>> allocation = allocate(
+            rmIdentifier, opportContext, schedulerKey, applicationAttemptId,
+            appSubmitter, nodeBlackList);
+        if (allocation.size() > 0) {
+          allocations.add(allocation);
+          continueLoop = true;
+        }
+      }
+      for (Map<Resource, List<Allocation>> allocation : allocations) {
+        for (Map.Entry<Resource, List<Allocation>> e : allocation.entrySet()) {
+          opportContext.matchAllocationToOutstandingRequest(
+              e.getKey(), e.getValue());
+          for (Allocation alloc : e.getValue()) {
+            allocatedContainers.add(alloc.getContainer());
+          }
+        }
       }
     }
 
     return allocatedContainers;
   }
 
-  private Map<Resource, List<Container>> allocate(long rmIdentifier,
+  private Map<Resource, List<Allocation>> allocate(long rmIdentifier,
       OpportunisticContainerContext appContext, SchedulerRequestKey schedKey,
-      ApplicationAttemptId appAttId, String userName) throws YarnException {
-    Map<Resource, List<Container>> containers = new HashMap<>();
-    for (ResourceRequest anyAsk :
+      ApplicationAttemptId appAttId, String userName, Set<String> blackList)
+      throws YarnException {
+    Map<Resource, List<Allocation>> containers = new HashMap<>();
+    for (EnrichedResourceRequest enrichedAsk :
         appContext.getOutstandingOpReqs().get(schedKey).values()) {
       allocateContainersInternal(rmIdentifier, appContext.getAppParams(),
-          appContext.getContainerIdGenerator(), appContext.getBlacklist(),
-          appAttId, appContext.getNodeMap(), userName, containers, anyAsk);
+          appContext.getContainerIdGenerator(), blackList, appAttId,
+          appContext.getNodeMap(), userName, containers, enrichedAsk);
+      ResourceRequest anyAsk = enrichedAsk.getRequest();
       if (!containers.isEmpty()) {
         LOG.info("Opportunistic allocation requested for [priority={}, "
             + "allocationRequestId={}, num_containers={}, capability={}] "
@@ -269,43 +379,162 @@ public class OpportunisticContainerAllocator {
       AllocationParams appParams, ContainerIdGenerator idCounter,
       Set<String> blacklist, ApplicationAttemptId id,
       Map<String, RemoteNode> allNodes, String userName,
-      Map<Resource, List<Container>> containers, ResourceRequest anyAsk)
+      Map<Resource, List<Allocation>> allocations,
+      EnrichedResourceRequest enrichedAsk)
       throws YarnException {
+    if (allNodes.size() == 0) {
+      LOG.info("No nodes currently available to " +
+          "allocate OPPORTUNISTIC containers.");
+      return;
+    }
+    ResourceRequest anyAsk = enrichedAsk.getRequest();
     int toAllocate = anyAsk.getNumContainers()
-        - (containers.isEmpty() ? 0 :
-            containers.get(anyAsk.getCapability()).size());
-
-    List<RemoteNode> nodesForScheduling = new ArrayList<>();
-    for (Entry<String, RemoteNode> nodeEntry : allNodes.entrySet()) {
-      // Do not use blacklisted nodes for scheduling.
-      if (blacklist.contains(nodeEntry.getKey())) {
-        continue;
+        - (allocations.isEmpty() ? 0 :
+            allocations.get(anyAsk.getCapability()).size());
+    toAllocate = Math.min(toAllocate,
+        appParams.getMaxAllocationsPerSchedulerKeyPerRound());
+    int numAllocated = 0;
+    // Node Candidates are selected as follows:
+    // * Node local candidates selected in loop == 0
+    // * Rack local candidates selected in loop == 1
+    // * From loop == 2 onwards, we revert to off switch allocations.
+    int loopIndex = OFF_SWITCH_LOOP;
+    if (enrichedAsk.getNodeLocations().size() > 0) {
+      loopIndex = NODE_LOCAL_LOOP;
+    }
+    while (numAllocated < toAllocate) {
+      Collection<RemoteNode> nodeCandidates =
+          findNodeCandidates(loopIndex, allNodes, blacklist, enrichedAsk);
+      for (RemoteNode rNode : nodeCandidates) {
+        String rNodeHost = rNode.getNodeId().getHost();
+        // Ignore black list
+        if (blacklist.contains(rNodeHost)) {
+          LOG.info("Nodes for scheduling has a blacklisted node" +
+              " [" + rNodeHost + "]..");
+          continue;
+        }
+        String location = ResourceRequest.ANY;
+        if (loopIndex == NODE_LOCAL_LOOP) {
+          if (enrichedAsk.getNodeLocations().contains(rNodeHost)) {
+            location = rNodeHost;
+          } else {
+            continue;
+          }
+        }
+        if (loopIndex == RACK_LOCAL_LOOP) {
+          if (enrichedAsk.getRackLocations().contains(rNode.getRackName())) {
+            location = rNode.getRackName();
+          } else {
+            continue;
+          }
+        }
+        Container container = createContainer(rmIdentifier, appParams,
+            idCounter, id, userName, allocations, location,
+            anyAsk, rNode);
+        numAllocated++;
+        // Try to spread the allocations across the nodes.
+        // But don't add if it is a node local request.
+        if (loopIndex != NODE_LOCAL_LOOP) {
+          blacklist.add(rNode.getNodeId().getHost());
+        }
+        LOG.info("Allocated [" + container.getId() + "] as opportunistic at " +
+            "location [" + location + "]");
+        if (numAllocated >= toAllocate) {
+          break;
+        }
+      }
+      if (loopIndex == NODE_LOCAL_LOOP &&
+          enrichedAsk.getRackLocations().size() > 0) {
+        loopIndex = RACK_LOCAL_LOOP;
+      } else {
+        loopIndex++;
+      }
+      // Handle case where there are no nodes remaining after blacklist is
+      // considered.
+      if (loopIndex > OFF_SWITCH_LOOP && numAllocated == 0) {
+        LOG.warn("Unable to allocate any opportunistic containers.");
+        break;
       }
-      nodesForScheduling.add(nodeEntry.getValue());
     }
-    if (nodesForScheduling.isEmpty()) {
-    LOG.warn("No nodes available for allocating opportunistic containers. [" +
-          "allNodes={}, blacklist={}]", allNodes, blacklist);
-      return;
+  }
+
+  private Collection<RemoteNode> findNodeCandidates(int loopIndex,
+      Map<String, RemoteNode> allNodes, Set<String> blackList,
+      EnrichedResourceRequest enrichedRR) {
+    if (loopIndex > 1) {
+      return allNodes.values();
+    } else {
+      LinkedList<RemoteNode> retList = new LinkedList<>();
+      int numContainers = enrichedRR.getRequest().getNumContainers();
+      while (numContainers > 0) {
+        if (loopIndex == 0) {
+          // Node local candidates
+          numContainers = collectNodeLocalCandidates(
+              allNodes, enrichedRR, retList, numContainers);
+        } else {
+          // Rack local candidates
+          numContainers = collectRackLocalCandidates(
+              allNodes, enrichedRR, retList, blackList, numContainers);
+        }
+        if (numContainers == enrichedRR.getRequest().getNumContainers()) {
+          // If there is no change in numContainers, then there is no point
+          // in looping again.
+          break;
+        }
+      }
+      return retList;
     }
-    int numAllocated = 0;
-    int nextNodeToSchedule = 0;
-    for (int numCont = 0; numCont < toAllocate; numCont++) {
-      nextNodeToSchedule++;
-      nextNodeToSchedule %= nodesForScheduling.size();
-      RemoteNode node = nodesForScheduling.get(nextNodeToSchedule);
-      Container container = buildContainer(rmIdentifier, appParams, idCounter,
-          anyAsk, id, userName, node);
-      List<Container> cList = containers.get(anyAsk.getCapability());
-      if (cList == null) {
-        cList = new ArrayList<>();
-        containers.put(anyAsk.getCapability(), cList);
+  }
+
+  private int collectRackLocalCandidates(Map<String, RemoteNode> allNodes,
+      EnrichedResourceRequest enrichedRR, LinkedList<RemoteNode> retList,
+      Set<String> blackList, int numContainers) {
+    for (RemoteNode rNode : allNodes.values()) {
+      if (enrichedRR.getRackLocations().contains(rNode.getRackName())) {
+        if (blackList.contains(rNode.getNodeId().getHost())) {
+          retList.addLast(rNode);
+        } else {
+          retList.addFirst(rNode);
+          numContainers--;
+        }
+      }
+      if (numContainers == 0) {
+        break;
+      }
+    }
+    return numContainers;
+  }
+
+  private int collectNodeLocalCandidates(Map<String, RemoteNode> allNodes,
+      EnrichedResourceRequest enrichedRR, List<RemoteNode> retList,
+      int numContainers) {
+    for (String nodeName : enrichedRR.getNodeLocations()) {
+      RemoteNode remoteNode = allNodes.get(nodeName);
+      if (remoteNode != null) {
+        retList.add(remoteNode);
+        numContainers--;
       }
-      cList.add(container);
-      numAllocated++;
-      LOG.info("Allocated [{}] as opportunistic.", container.getId());
+      if (numContainers == 0) {
+        break;
+      }
+    }
+    return numContainers;
+  }
+
+  private Container createContainer(long rmIdentifier,
+      AllocationParams appParams, ContainerIdGenerator idCounter,
+      ApplicationAttemptId id, String userName,
+      Map<Resource, List<Allocation>> allocations, String location,
+      ResourceRequest anyAsk, RemoteNode rNode) throws YarnException {
+    Container container = buildContainer(rmIdentifier, appParams,
+        idCounter, anyAsk, id, userName, rNode);
+    List<Allocation> allocList = allocations.get(anyAsk.getCapability());
+    if (allocList == null) {
+      allocList = new ArrayList<>();
+      allocations.put(anyAsk.getCapability(), allocList);
     }
-    LOG.info("Allocated {} opportunistic containers.", numAllocated);
+    allocList.add(new Allocation(container, location));
+    return container;
   }
 
   private Container buildContainer(long rmIdentifier,
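
To summarize the new candidate-selection logic: a resource name starting
with "/" is classified as a rack (see EnrichedResourceRequest.addLocation),
and each scheduler key is served in up to three passes, node-local first,
then rack-local, then off-switch. A condensed, hypothetical restatement of
that control flow, not part of the patch itself:

    static final int NODE_LOCAL_LOOP = 0;  // pass 0: hosts named in the ask
    static final int RACK_LOCAL_LOOP = 1;  // pass 1: racks named in the ask
    static final int OFF_SWITCH_LOOP = 2;  // pass 2+: any non-blacklisted node

    static boolean isRack(String resourceName) {
      // Mirrors EnrichedResourceRequest.addLocation(): racks start with "/".
      return resourceName.startsWith("/");
    }

    static int nextLoop(int loopIndex, boolean askNamesRacks) {
      // After the node-local pass, take the rack-local pass only if the ask
      // actually named racks; otherwise fall straight through to off-switch.
      return (loopIndex == NODE_LOCAL_LOOP && askNamesRacks)
          ? RACK_LOCAL_LOOP
          : loopIndex + 1;
    }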

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b733348d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
index 1b1c5b9..246d450 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.yarn.server.scheduler;
 
-import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
@@ -35,8 +34,10 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 
+import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.Allocation;
 import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.AllocationParams;
 import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.ContainerIdGenerator;
+import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.EnrichedResourceRequest;
 
 /**
  * This encapsulates application specific information used by the
@@ -53,7 +54,8 @@ public class OpportunisticContainerContext {
       new ContainerIdGenerator();
 
   private volatile List<RemoteNode> nodeList = new LinkedList<>();
-  private final Map<String, RemoteNode> nodeMap = new LinkedHashMap<>();
+  private final LinkedHashMap<String, RemoteNode> nodeMap =
+      new LinkedHashMap<>();
 
   private final Set<String> blacklist = new HashSet<>();
 
@@ -61,7 +63,8 @@ public class OpportunisticContainerContext {
   // Resource Name (host/rack/any) and capability. This mapping is required
   // to match a received Container to an outstanding OPPORTUNISTIC
   // ResourceRequest (ask).
-  private final TreeMap<SchedulerRequestKey, Map<Resource, ResourceRequest>>
+  private final TreeMap
+      <SchedulerRequestKey, Map<Resource, EnrichedResourceRequest>>
       outstandingOpReqs = new TreeMap<>();
 
   public AllocationParams getAppParams() {
@@ -107,7 +110,7 @@ public class OpportunisticContainerContext {
     return blacklist;
   }
 
-  public TreeMap<SchedulerRequestKey, Map<Resource, ResourceRequest>>
+  public TreeMap<SchedulerRequestKey, Map<Resource, EnrichedResourceRequest>>
       getOutstandingOpReqs() {
     return outstandingOpReqs;
   }
@@ -125,36 +128,32 @@ public class OpportunisticContainerContext {
     for (ResourceRequest request : resourceAsks) {
       SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request);
 
-      // TODO: Extend for Node/Rack locality. We only handle ANY requests now
-      if (!ResourceRequest.isAnyLocation(request.getResourceName())) {
-        continue;
-      }
-
-      if (request.getNumContainers() == 0) {
-        continue;
-      }
-
-      Map<Resource, ResourceRequest> reqMap =
+      Map<Resource, EnrichedResourceRequest> reqMap =
           outstandingOpReqs.get(schedulerKey);
       if (reqMap == null) {
         reqMap = new HashMap<>();
         outstandingOpReqs.put(schedulerKey, reqMap);
       }
 
-      ResourceRequest resourceRequest = reqMap.get(request.getCapability());
-      if (resourceRequest == null) {
-        resourceRequest = request;
-        reqMap.put(request.getCapability(), request);
+      EnrichedResourceRequest eReq = reqMap.get(request.getCapability());
+      if (eReq == null) {
+        eReq = new EnrichedResourceRequest(request);
+        reqMap.put(request.getCapability(), eReq);
+      }
+      // Set numContainers only for ANY request
+      if (ResourceRequest.isAnyLocation(request.getResourceName())) {
+        eReq.getRequest().setResourceName(ResourceRequest.ANY);
+        eReq.getRequest().setNumContainers(request.getNumContainers());
       } else {
-        resourceRequest.setNumContainers(
-            resourceRequest.getNumContainers() + request.getNumContainers());
+        eReq.addLocation(request.getResourceName(), request.getNumContainers());
       }
       if (ResourceRequest.isAnyLocation(request.getResourceName())) {
         LOG.info("# of outstandingOpReqs in ANY (at "
             + "priority = " + schedulerKey.getPriority()
             + ", allocationReqId = " + schedulerKey.getAllocationRequestId()
             + ", with capability = " + request.getCapability() + " ) : "
-            + resourceRequest.getNumContainers());
+            + ", with location = " + request.getResourceName() + " ) : "
+            + ", numContainers = " + eReq.getRequest().getNumContainers());
       }
     }
   }
@@ -163,25 +162,34 @@ public class OpportunisticContainerContext {
    * This method matches a returned list of Container Allocations to any
    * outstanding OPPORTUNISTIC ResourceRequest.
    * @param capability Capability
-   * @param allocatedContainers Allocated Containers
+   * @param allocations Allocations.
    */
   public void matchAllocationToOutstandingRequest(Resource capability,
-      List<Container> allocatedContainers) {
-    for (Container c : allocatedContainers) {
+      List<Allocation> allocations) {
+    for (OpportunisticContainerAllocator.Allocation allocation : allocations) {
       SchedulerRequestKey schedulerKey =
-          SchedulerRequestKey.extractFrom(c);
-      Map<Resource, ResourceRequest> asks =
+          SchedulerRequestKey.extractFrom(allocation.getContainer());
+      Map<Resource, EnrichedResourceRequest> asks =
           outstandingOpReqs.get(schedulerKey);
 
       if (asks == null) {
         continue;
       }
 
-      ResourceRequest rr = asks.get(capability);
-      if (rr != null) {
-        rr.setNumContainers(rr.getNumContainers() - 1);
-        if (rr.getNumContainers() == 0) {
+      EnrichedResourceRequest err = asks.get(capability);
+      if (err != null) {
+        int numContainers = err.getRequest().getNumContainers();
+        numContainers--;
+        err.getRequest().setNumContainers(numContainers);
+        if (numContainers == 0) {
           asks.remove(capability);
+          if (asks.size() == 0) {
+            outstandingOpReqs.remove(schedulerKey);
+          }
+        } else {
+          if (!ResourceRequest.isAnyLocation(allocation.getResourceName())) {
+            err.removeLocation(allocation.getResourceName());
+          }
         }
       }
     }
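
The matching logic above keeps two counters in sync: the ANY ask's
numContainers, and the per-host/per-rack counts inside
EnrichedResourceRequest. A toy illustration of the location bookkeeping,
mirroring addLocation()/removeLocation() (names hypothetical):

    Map<String, AtomicInteger> rackLocations = new HashMap<>();
    rackLocations.put("/rack-a", new AtomicInteger(2));  // two rack-local asks
    // One container was allocated rack-locally on /rack-a:
    // decrement, and drop the entry once it reaches zero.
    if (rackLocations.get("/rack-a").decrementAndGet() == 0) {
      rackLocations.remove("/rack-a");
    }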

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b733348d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index e889cde..8e59f14 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -30,6 +30,7 @@ import "yarn_service_protos.proto";
 message RemoteNodeProto {
   optional NodeIdProto node_id = 1;
   optional string http_address = 2;
+  optional string rack_name = 3;
 }
 
 message RegisterDistributedSchedulingAMResponseProto {

