This is an automated email from the ASF dual-hosted git repository.

slfan1989 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 21ec686be3a YARN-11702: Fix Yarn over allocating containers (#6990) 
Contributed by Syed Shameerur Rahman.
21ec686be3a is described below

commit 21ec686be3a741ae62925111c9d17253c183bcf6
Author: Syed Shameerur Rahman <rhma...@amazon.com>
AuthorDate: Wed Sep 25 09:40:15 2024 +0530

    YARN-11702: Fix Yarn over allocating containers (#6990) Contributed by Syed 
Shameerur Rahman.
    
    Reviewed-by: Akira Ajisaka <aajis...@apache.org>
    Signed-off-by: Shilun Fan <slfan1...@apache.org>
---
 .../apache/hadoop/yarn/conf/YarnConfiguration.java |  11 +
 .../src/main/resources/yarn-default.xml            |  15 +
 .../scheduler/AbstractYarnScheduler.java           | 191 +++++++++++
 .../scheduler/SchedulerApplicationAttempt.java     |   3 +-
 .../scheduler/capacity/CapacityScheduler.java      |   4 +
 .../scheduler/fair/FairScheduler.java              |   5 +
 .../scheduler/TestAbstractYarnScheduler.java       | 355 +++++++++++++++++++++
 .../scheduler/capacity/TestUtils.java              |  14 +
 8 files changed, 597 insertions(+), 1 deletion(-)

diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 164647b2d0e..f02ad15e3db 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1537,6 +1537,17 @@ public class YarnConfiguration extends Configuration {
   public static final int DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY 
=
       10;
 
+  /**
+   * Key that turns on (true) or off (false) the ResourceManager scheduler's
+   * auto-correction of container asks against already allocated containers.
+   */
+  public static final String RM_SCHEDULER_AUTOCORRECT_CONTAINER_ALLOCATION =
+      RM_PREFIX + "scheduler.autocorrect.container.allocation";
+
+  /**
+   * Default value: {@value}.
+   */
+  public static final boolean DEFAULT_RM_SCHEDULER_AUTOCORRECT_CONTAINER_ALLOCATION =
+      false;
+
   /** Whether to enable log aggregation */
   public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX
       + "log-aggregation-enable";
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index f5081d645ca..9013627eb75 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -182,6 +182,21 @@
     <name>yarn.resourcemanager.principal</name>
   </property>
 
+  <property>
+    <description>
+      Enables or disables auto-correction of container allocation in YARN. Because container
+      requests and allocations are asynchronous, YARN may sometimes allocate more containers
+      than were requested. When enabled, the scheduler reduces each container request by the
+      number of matching containers already allocated, and releases any containers allocated
+      beyond the request, so that extra containers are not allocated.
+      Without this feature the client normally releases such extra containers within a few
+      seconds, which is usually not a concern. Enable it if over-allocation causes resource
+      contention in your cluster.
+    </description>
+    
<name>yarn.resourcemanager.scheduler.autocorrect.container.allocation</name>
+    <value>false</value>
+  </property>
+
   <property>
     <description>The address of the scheduler interface.</description>
     <name>yarn.resourcemanager.scheduler.address</name>
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 4ccc30ce39f..6cdb85b466a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -22,8 +22,10 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.EnumSet;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -34,6 +36,10 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -151,6 +157,7 @@ public abstract class AbstractYarnScheduler
   Thread updateThread;
   private final Object updateThreadMonitor = new Object();
   private Timer releaseCache;
+  /**
+   * Value of {@link YarnConfiguration#RM_SCHEDULER_AUTOCORRECT_CONTAINER_ALLOCATION},
+   * read from the scheduler configuration; when false, container asks are not
+   * auto-corrected.
+   */
+  private boolean autoCorrectContainerAllocation;
 
   /*
    * All schedulers which are inheriting AbstractYarnScheduler should use
@@ -212,6 +219,9 @@ public abstract class AbstractYarnScheduler
         conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
             YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
     skipNodeInterval = YarnConfiguration.getSkipNodeInterval(conf);
+    autoCorrectContainerAllocation =
+        
conf.getBoolean(YarnConfiguration.RM_SCHEDULER_AUTOCORRECT_CONTAINER_ALLOCATION,
+            
YarnConfiguration.DEFAULT_RM_SCHEDULER_AUTOCORRECT_CONTAINER_ALLOCATION);
     long configuredMaximumAllocationWaitTime =
         
conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
           
YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);
@@ -624,6 +634,106 @@ public abstract class AbstractYarnScheduler
     }
   }
 
+  /**
+   * Auto-corrects the container ask of an allocate call by subtracting the
+   * containers already allocated to the attempt for previous requests (its
+   * newly allocated containers), and releases allocated containers that exceed
+   * the current ask.
+   *
+   * <p>Containers and requests are matched by {@link ContainerObjectType}, i.e.
+   * by allocation request id, priority, execution type and capability. The
+   * locality/resourceName of a request is ignored, so requests that differ only
+   * in resourceName are all decremented by the same allocated count. For
+   * example, when a client asks for 4 containers on "node1", AMRMClient
+   * augments this into R1(numContainers=4, resourceName=*) and
+   * R2(numContainers=4, resourceName=node1). If 6 such containers were already
+   * allocated, both asks are reset to 0 and 2 containers are released.
+   *
+   * <p>The number of containers released per type is the smallest surplus seen
+   * across all matching requests. When the ANY request is part of the ask, a
+   * smaller node- or rack-level ask therefore never causes containers that the
+   * ANY ask still needs to be released. Clients that bypass AMRMClient and send
+   * inconsistent locality asks may still see imprecise correction.
+   *
+   * <p>This is a no-op unless
+   * {@link YarnConfiguration#RM_SCHEDULER_AUTOCORRECT_CONTAINER_ALLOCATION} is
+   * enabled. It is invoked from the schedulers' allocate implementations.
+   *
+   * @param resourceRequests the container ask of the current allocate call;
+   *                         request counts are updated in place
+   * @param application the application attempt whose newly allocated
+   *                    containers are inspected and possibly released
+   */
+  @VisibleForTesting
+  protected void autoCorrectContainerAllocation(List<ResourceRequest> resourceRequests,
+      SchedulerApplicationAttempt application) {
+
+    // Nothing to do when the feature is disabled, there is no container ask,
+    // or no containers were allocated for previous requests.
+    if (!autoCorrectContainerAllocation || resourceRequests.isEmpty() ||
+        application.newlyAllocatedContainers.isEmpty()) {
+      return;
+    }
+
+    // Group the newly allocated containers by container type.
+    Map<ContainerObjectType, List<RMContainer>> allocatedContainerMap = new HashMap<>();
+    for (RMContainer rmContainer : application.newlyAllocatedContainers) {
+      Container container = rmContainer.getContainer();
+      ContainerObjectType containerObjectType = new ContainerObjectType(
+          container.getAllocationRequestId(), container.getPriority(),
+          container.getExecutionType(), container.getResource());
+      allocatedContainerMap.computeIfAbsent(containerObjectType,
+          k -> new ArrayList<>()).add(rmContainer);
+    }
+
+    // Decrement every matching request by the containers already allocated and
+    // record how many allocated containers exceed the ask of each type.
+    Map<ContainerObjectType, Integer> extraContainerAllocatedMap = new HashMap<>();
+    for (ResourceRequest request : resourceRequests) {
+      // Guard against requests that carry no ExecutionTypeRequest.
+      ExecutionType executionType = request.getExecutionTypeRequest() == null
+          ? ExecutionType.GUARANTEED
+          : request.getExecutionTypeRequest().getExecutionType();
+      ContainerObjectType containerObjectType = new ContainerObjectType(
+          request.getAllocationRequestId(), request.getPriority(),
+          executionType, request.getCapability());
+      int numContainerAllocated = allocatedContainerMap.getOrDefault(
+          containerObjectType, Collections.emptyList()).size();
+      if (numContainerAllocated > 0) {
+        int numContainerAsk = request.getNumContainers();
+        int updatedContainerRequest = numContainerAsk - numContainerAllocated;
+        if (updatedContainerRequest < 0) {
+          // ANY, rack and node requests can share one container type; keep the
+          // smallest surplus so containers still needed by the broadest ask
+          // are never released. (A plain put would let the last request win.)
+          extraContainerAllocatedMap.merge(containerObjectType,
+              -updatedContainerRequest, Math::min);
+          // The allocation already covers this ask completely.
+          updatedContainerRequest = 0;
+        }
+
+        LOG.debug("Updating container resourceRequests from {} to {} for the resource type: {}",
+            numContainerAsk, updatedContainerRequest, request);
+        request.setNumContainers(updatedContainerRequest);
+      }
+    }
+
+    // Release exactly the surplus of each container type, no more.
+    for (Map.Entry<ContainerObjectType, Integer> entry :
+        extraContainerAllocatedMap.entrySet()) {
+      List<RMContainer> allocatedContainers = allocatedContainerMap.getOrDefault(
+          entry.getKey(), Collections.emptyList());
+      int extraContainers = Math.min(entry.getValue(), allocatedContainers.size());
+      LOG.debug("{} container of the resource type: {} will be released",
+          extraContainers, entry.getKey());
+      for (RMContainer rmContainer : allocatedContainers.subList(0, extraContainers)) {
+        // Move the unneeded container from ALLOCATED to EXPIRED.
+        LOG.debug("Removing extra container:{}", rmContainer.getContainer());
+        completedContainer(rmContainer, SchedulerUtils.createAbnormalContainerStatus(
+            rmContainer.getContainerId(), SchedulerUtils.EXPIRED_CONTAINER),
+            RMContainerEventType.EXPIRE);
+        application.newlyAllocatedContainers.remove(rmContainer);
+      }
+    }
+  }
+
   private RMContainer recoverAndCreateContainer(NMContainerStatus status,
       RMNode node, String queueName) {
     Container container =
@@ -658,6 +768,14 @@ public abstract class AbstractYarnScheduler
       return;
     }
 
+    // when auto correct container allocation is enabled, there can be a case 
when extra containers
+    // go to expired state from allocated state. When such scenario happens do 
not re-attempt the
+    // container request since this is expected.
+    if (autoCorrectContainerAllocation &&
+        RMContainerState.EXPIRED.equals(rmContainer.getState())) {
+      return;
+    }
+
     // Add resource request back to Scheduler ApplicationAttempt.
 
     // We lookup the application-attempt here again using
@@ -1678,4 +1796,77 @@ public abstract class AbstractYarnScheduler
     }
     return apps;
   }
+
+  /**
+   * Key identifying a "type" of container for auto-correction of container
+   * allocation: allocation request id, priority, execution type and resource
+   * capability. Locality/resourceName is deliberately not part of the key.
+   * Fields are final and equals/hashCode are value based, so instances can be
+   * used as hash map keys. Declared static because it needs no access to the
+   * enclosing scheduler instance.
+   */
+  protected static class ContainerObjectType {
+    private final long allocationId;
+    private final Priority priority;
+    private final ExecutionType executionType;
+    private final Resource resource;
+
+    public ContainerObjectType(long allocationId, Priority priority,
+        ExecutionType executionType, Resource resource) {
+      this.allocationId = allocationId;
+      this.priority = priority;
+      this.executionType = executionType;
+      this.resource = resource;
+    }
+
+    public long getAllocationId() {
+      return allocationId;
+    }
+
+    public Priority getPriority() {
+      return priority;
+    }
+
+    public ExecutionType getExecutionType() {
+      return executionType;
+    }
+
+    public Resource getResource() {
+      return resource;
+    }
+
+    @Override
+    public int hashCode() {
+      return new HashCodeBuilder(17, 37)
+          .append(allocationId)
+          .append(priority)
+          .append(executionType)
+          .append(resource)
+          .toHashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null || obj.getClass() != this.getClass()) {
+        return false;
+      }
+
+      ContainerObjectType other = (ContainerObjectType) obj;
+      return new EqualsBuilder()
+          .append(allocationId, other.getAllocationId())
+          .append(priority, other.getPriority())
+          .append(executionType, other.getExecutionType())
+          .append(resource, other.getResource())
+          .isEquals();
+    }
+
+    @Override
+    public String toString() {
+      // No separator before the first field (the original emitted ": , ").
+      return "{ContainerObjectType: "
+          + "Priority: " + getPriority()
+          + ", Allocation Id: " + getAllocationId()
+          + ", Execution Type: " + getExecutionType()
+          + ", Resource: " + getResource()
+          + "}";
+    }
+  }
 }
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 5121453e395..420cfdf25af 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -862,7 +862,8 @@ public class SchedulerApplicationAttempt implements 
SchedulableEntity {
     updateContainerErrors.add(error);
   }
 
-  protected synchronized void addToNewlyAllocatedContainers(
+  @VisibleForTesting
+  public synchronized void addToNewlyAllocatedContainers(
       SchedulerNode node, RMContainer rmContainer) {
     ContainerId matchedContainerId =
         getUpdateContext().matchContainerToOutstandingIncreaseReq(
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index d4eff239fff..9f20d6ba685 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -1363,6 +1363,10 @@ public class CapacityScheduler extends
           application.showRequests();
         }
 
+        // Update the current container ask by subtracting containers already allocated for
+        // previous requests; when auto-correction is enabled, containers allocated beyond
+        // the ask are released.
+        autoCorrectContainerAllocation(ask, application);
+
         // Update application requests
         if (application.updateResourceRequests(ask) || application
             .updateSchedulingRequests(schedulingRequests)) {
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index f72351e26c7..ab81f607075 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -943,6 +943,11 @@ public class FairScheduler extends
         }
         application.showRequests();
 
+        // Update the current container ask by subtracting containers already allocated for
+        // previous requests, and trim the newly allocated containers to fit the current ask.
+        autoCorrectContainerAllocation(ask, application);
+
         // Update application requests
         application.updateResourceRequests(ask);
 
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
index 67b3dee2b80..595eb852ee8 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import static 
org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
+import static 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils.createResourceRequest;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -65,17 +66,24 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
 import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import 
org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
 import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
@@ -83,6 +91,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import org.junit.Assert;
@@ -268,6 +277,352 @@ public class TestAbstractYarnScheduler extends 
ParameterizedSchedulerTestBase {
     Assert.assertEquals(0, scheduler.getNumClusterNodes());
   }
 
+  /**
+   * Tests the autocorrect container allocation feature. Registers a single
+   * mocked application attempt with the scheduler started by MockRM and runs
+   * several container ask / newly allocated container scenarios against it.
+   * The MockRM is always stopped, even when an assertion fails, so it does not
+   * leak into other tests.
+   */
+  @Test
+  public void testAutoCorrectContainerAllocation() {
+    Configuration conf = new Configuration(getConf());
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_AUTOCORRECT_CONTAINER_ALLOCATION, true);
+    conf.setBoolean("yarn.scheduler.capacity.root.auto-create-child-queue.enabled",
+        true);
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    try {
+      AbstractYarnScheduler scheduler = (AbstractYarnScheduler) rm.getResourceScheduler();
+
+      String host = "127.0.0.1";
+      RMNode node =
+          MockNodes.newNodeInfo(0, MockNodes.newResource(4 * 1024), 1, host);
+      scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+      //add app begin
+      ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1);
+      ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+          appId1, 1);
+
+      RMAppAttemptMetrics attemptMetric1 =
+          new RMAppAttemptMetrics(appAttemptId, rm.getRMContext());
+      RMAppImpl app1 = mock(RMAppImpl.class);
+      when(app1.getApplicationId()).thenReturn(appId1);
+      RMAppAttemptImpl attempt1 = mock(RMAppAttemptImpl.class);
+      Container container = mock(Container.class);
+      when(attempt1.getMasterContainer()).thenReturn(container);
+      ApplicationSubmissionContext submissionContext = mock(
+          ApplicationSubmissionContext.class);
+      when(attempt1.getSubmissionContext()).thenReturn(submissionContext);
+      when(attempt1.getAppAttemptId()).thenReturn(appAttemptId);
+      when(attempt1.getRMAppAttemptMetrics()).thenReturn(attemptMetric1);
+      when(app1.getCurrentAppAttempt()).thenReturn(attempt1);
+
+      rm.getRMContext().getRMApps().put(appId1, app1);
+
+      ApplicationPlacementContext apc = new ApplicationPlacementContext("user",
+          "root");
+      SchedulerEvent addAppEvent1 =
+          new AppAddedSchedulerEvent(appId1, "user", "user", apc);
+      scheduler.handle(addAppEvent1);
+      SchedulerEvent addAttemptEvent1 =
+          new AppAttemptAddedSchedulerEvent(appAttemptId, false);
+      scheduler.handle(addAttemptEvent1);
+
+      SchedulerApplicationAttempt application = scheduler.getApplicationAttempt(appAttemptId);
+      SchedulerNode schedulerNode = scheduler.getSchedulerNode(node.getNodeID());
+      Priority priority = Priority.newInstance(0);
+      NodeId nodeId = NodeId.newInstance("foo.bar.org", 1234);
+
+      // Test different container asks and newly allocated containers. The
+      // mocked app reports appAttemptId as its current attempt id.
+      testContainerAskAndNewlyAllocatedContainerZero(scheduler, application, priority);
+      testContainerAskAndNewlyAllocatedContainerOne(scheduler, application, schedulerNode,
+          nodeId, priority, appAttemptId);
+      testContainerAskZeroAndNewlyAllocatedContainerOne(scheduler, application, schedulerNode,
+          nodeId, priority, appAttemptId);
+      testContainerAskFourAndNewlyAllocatedContainerEight(scheduler, application, schedulerNode,
+          nodeId, priority, appAttemptId);
+      testContainerAskFourAndNewlyAllocatedContainerSix(scheduler, application, schedulerNode,
+          nodeId, priority, appAttemptId);
+    } finally {
+      rm.stop();
+    }
+  }
+
+  /**
+   * Builds a Mockito-backed {@link RMContainer} whose {@link Container} reports
+   * the given id, node, allocation request id, resource (memory in MB, 1 vcore),
+   * priority and execution type, together with a dummy container token.
+   *
+   * @param containerId   numeric id of the container within the attempt
+   * @param nodeId        node the container reports as its location
+   * @param appAttemptId  application attempt owning the container
+   * @param allocationId  allocation request id reported by the container
+   * @param memory        memory (MB) of the container resource
+   * @param priority      priority reported by the container
+   * @param executionType execution type reported by the container
+   * @return the mocked RMContainer
+   */
+  private RMContainer createMockRMContainer(int containerId, NodeId nodeId,
+      ApplicationAttemptId appAttemptId, long allocationId, int memory,
+      Priority priority, ExecutionType executionType) {
+    ContainerId id = ContainerId.newContainerId(appAttemptId, containerId);
+    Token dummyToken = Token.newInstance(new byte[0], "kind", new byte[0], "service");
+
+    Container mockContainer = mock(Container.class);
+    when(mockContainer.getId()).thenReturn(id);
+    when(mockContainer.getNodeId()).thenReturn(nodeId);
+    when(mockContainer.getAllocationRequestId()).thenReturn(allocationId);
+    when(mockContainer.getPriority()).thenReturn(priority);
+    when(mockContainer.getExecutionType()).thenReturn(executionType);
+    when(mockContainer.getResource()).thenReturn(Resource.newInstance(memory, 1));
+    when(mockContainer.getContainerToken()).thenReturn(dummyToken);
+
+    RMContainer mockRmContainer = mock(RMContainerImpl.class);
+    when(mockRmContainer.getContainerId()).thenReturn(id);
+    when(mockRmContainer.getContainer()).thenReturn(mockContainer);
+    return mockRmContainer;
+  }
+
+  /**
+   * Asks for one container while nothing has been newly allocated yet: the
+   * ask must be left at one and no allocated container may appear.
+   *
+   * @param scheduler   scheduler under test
+   * @param application application attempt the ask belongs to
+   * @param priority    priority of the resource request
+   */
+  private void testContainerAskAndNewlyAllocatedContainerZero(
+      AbstractYarnScheduler scheduler, SchedulerApplicationAttempt application,
+      Priority priority) {
+    ResourceRequest ask = createResourceRequest(1024, 1, 1, priority, 0,
+        ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED), ResourceRequest.ANY);
+    List<ResourceRequest> containerAsk = new ArrayList<>();
+    containerAsk.add(ask);
+
+    scheduler.autoCorrectContainerAllocation(containerAsk, application);
+
+    // Nothing allocated yet: the single-container ask is untouched ...
+    assertEquals(1, ask.getNumContainers());
+    // ... and there is nothing to hand out.
+    assertEquals(0, application.pullNewlyAllocatedContainers().size());
+  }
+
+  /**
+   * Asks for one container while one matching container is already newly
+   * allocated: the ask must drop to zero and the allocated container is kept.
+   *
+   * @param scheduler     scheduler under test
+   * @param application   application attempt the ask belongs to
+   * @param schedulerNode node the mocked container is added on
+   * @param nodeId        node id reported by the mocked container
+   * @param priority      priority of the request and the container
+   * @param appAttemptId  attempt id used to build the container id
+   */
+  private void testContainerAskAndNewlyAllocatedContainerOne(
+      AbstractYarnScheduler scheduler, SchedulerApplicationAttempt application,
+      SchedulerNode schedulerNode, NodeId nodeId, Priority priority,
+      ApplicationAttemptId appAttemptId) {
+    RMContainer allocated = createMockRMContainer(1, nodeId, appAttemptId,
+        0L, 1024, priority, ExecutionType.GUARANTEED);
+    application.addToNewlyAllocatedContainers(schedulerNode, allocated);
+
+    ResourceRequest ask = createResourceRequest(1024, 1, 1, priority, 0L,
+        ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED), ResourceRequest.ANY);
+    List<ResourceRequest> containerAsk = new ArrayList<>();
+    containerAsk.add(ask);
+
+    scheduler.autoCorrectContainerAllocation(containerAsk, application);
+
+    // The existing allocation satisfies the ask completely ...
+    assertEquals(0, ask.getNumContainers());
+    // ... and that allocation is not released.
+    assertEquals(1, application.pullNewlyAllocatedContainers().size());
+  }
+
+  /**
+   * Asks for zero containers while one matching container is newly allocated:
+   * the ask stays at zero and the surplus container is released.
+   *
+   * @param scheduler     scheduler under test
+   * @param application   application attempt the ask belongs to
+   * @param schedulerNode node the mocked container is added on
+   * @param nodeId        node id reported by the mocked container
+   * @param priority      priority of the request and the container
+   * @param appAttemptId  attempt id used to build the container id
+   */
+  private void testContainerAskZeroAndNewlyAllocatedContainerOne(
+      AbstractYarnScheduler scheduler, SchedulerApplicationAttempt application,
+      SchedulerNode schedulerNode, NodeId nodeId, Priority priority,
+      ApplicationAttemptId appAttemptId) {
+    RMContainer surplus = createMockRMContainer(1, nodeId, appAttemptId,
+        0L, 1024, priority, ExecutionType.GUARANTEED);
+    application.addToNewlyAllocatedContainers(schedulerNode, surplus);
+
+    ResourceRequest ask = createResourceRequest(1024, 1, 0, priority, 0L,
+        ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED), ResourceRequest.ANY);
+    List<ResourceRequest> containerAsk = new ArrayList<>();
+    containerAsk.add(ask);
+
+    scheduler.autoCorrectContainerAllocation(containerAsk, application);
+
+    // A zero ask cannot go lower ...
+    assertEquals(0, ask.getNumContainers());
+    // ... and the unneeded container was taken away again.
+    assertEquals(0, application.pullNewlyAllocatedContainers().size());
+  }
+
+  /**
+   * Tests the behavior when the container ask consists of four unique resource requests
+   * and there are eight newly allocated containers (two containers for each resource request type).
+   *
+   * @param scheduler      The AbstractYarnScheduler instance to test
+   * @param application    The SchedulerApplicationAttempt instance representing the application
+   * @param schedulerNode  The SchedulerNode instance representing the node
+   * @param nodeId         The NodeId of the node
+   * @param priority       The priority of the resource requests
+   * @param appAttemptId   The ApplicationAttemptId of the application attempt
+   */
+  private void testContainerAskFourAndNewlyAllocatedContainerEight(AbstractYarnScheduler scheduler,
+      SchedulerApplicationAttempt application, SchedulerNode schedulerNode,
+      NodeId nodeId, Priority priority, ApplicationAttemptId appAttemptId) {
+    // Create four unique resource requests
+    ResourceRequest resourceRequest1 = createResourceRequest(1024, 1, 1,
+        priority, 0L,
+        ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED), ResourceRequest.ANY);
+    ResourceRequest resourceRequest2 = createResourceRequest(2048, 1, 1,
+        priority, 0L,
+        ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED), ResourceRequest.ANY);
+    ResourceRequest resourceRequest3 = createResourceRequest(1024, 1, 1,
+        priority, 1L,
+        ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED), ResourceRequest.ANY);
+    ResourceRequest resourceRequest4 = createResourceRequest(1024, 1, 1,
+        priority, 0L,
+        ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC), ResourceRequest.ANY);
+
+    // Add the resource requests to a list
+    List<ResourceRequest> ask4 = new ArrayList<>();
+    ask4.add(resourceRequest1);
+    ask4.add(resourceRequest2);
+    ask4.add(resourceRequest3);
+    ask4.add(resourceRequest4);
+
+    // Create eight RMContainers (two for each resource request type)
+    RMContainer rmContainer1 = createMockRMContainer(1, nodeId, appAttemptId,
+        0L, 1024, priority, ExecutionType.GUARANTEED);
+    RMContainer rmContainer2 = createMockRMContainer(2, nodeId, appAttemptId,
+        0L, 1024, priority, ExecutionType.GUARANTEED);
+    RMContainer rmContainer3 = createMockRMContainer(3, nodeId, appAttemptId,
+        0L, 2048, priority, ExecutionType.GUARANTEED);
+    RMContainer rmContainer4 = createMockRMContainer(4, nodeId, appAttemptId,
+        0L, 2048, priority, ExecutionType.GUARANTEED);
+    RMContainer rmContainer5 = createMockRMContainer(5, nodeId, appAttemptId,
+        1L, 1024, priority, ExecutionType.GUARANTEED);
+    RMContainer rmContainer6 = createMockRMContainer(6, nodeId, appAttemptId,
+        1L, 1024, priority, ExecutionType.GUARANTEED);
+    RMContainer rmContainer7 = createMockRMContainer(7, nodeId, appAttemptId,
+        0L, 1024, priority, ExecutionType.OPPORTUNISTIC);
+    RMContainer rmContainer8 = createMockRMContainer(8, nodeId, appAttemptId,
+        0L, 1024, priority, ExecutionType.OPPORTUNISTIC);
+
+    // Add the RMContainers to the newly allocated containers of the application
+    application.addToNewlyAllocatedContainers(schedulerNode, rmContainer1);
+    application.addToNewlyAllocatedContainers(schedulerNode, rmContainer2);
+    application.addToNewlyAllocatedContainers(schedulerNode, rmContainer3);
+    application.addToNewlyAllocatedContainers(schedulerNode, rmContainer4);
+    application.addToNewlyAllocatedContainers(schedulerNode, rmContainer5);
+    application.addToNewlyAllocatedContainers(schedulerNode, rmContainer6);
+    application.addToNewlyAllocatedContainers(schedulerNode, rmContainer7);
+    application.addToNewlyAllocatedContainers(schedulerNode, rmContainer8);
+
+    // Call the autoCorrectContainerAllocation method
+    scheduler.autoCorrectContainerAllocation(ask4, application);
+
+    // Assert that all resource requests have 0 containers
+    for (ResourceRequest rr : ask4) {
+      assertEquals(0, rr.getNumContainers());
+    }
+
+    // Assert that there are four newly allocated containers
+    assertEquals(4, application.pullNewlyAllocatedContainers().size());
+  }
+
+  /**
+   * Tests the behavior when the container ask consists of two resource requests.
+   * i.e one for any host and one for a specific host ,
+   * each requesting four containers, and there are six newly allocated containers.
+   *
+   * @param scheduler      The AbstractYarnScheduler instance to test
+   * @param application    The SchedulerApplicationAttempt instance representing the application
+   * @param schedulerNode  The SchedulerNode instance representing the node
+   * @param nodeId         The NodeId of the node
+   * @param priority       The priority of the resource requests
+   * @param appAttemptId   The ApplicationAttemptId of the application attempt
+   */
+  private void testContainerAskFourAndNewlyAllocatedContainerSix(AbstractYarnScheduler scheduler,
+      SchedulerApplicationAttempt application, SchedulerNode schedulerNode,
+      NodeId nodeId, Priority priority, ApplicationAttemptId appAttemptId) {
+    // Create a resource request for any host, requesting 4 containers
+    ResourceRequest resourceRequest1 = createResourceRequest(1024, 1, 4,
+        priority, 0L,
+        ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED), ResourceRequest.ANY);
+
+    // Create a resource request for a specific host, requesting 4 containers
+    ResourceRequest resourceRequest2 = createResourceRequest(1024, 1, 4,
+        priority, 0L,
+        ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED), nodeId.getHost());
+
+    // Add the resource requests to a list
+    List<ResourceRequest> containerAsk = new ArrayList<>();
+    containerAsk.add(resourceRequest1);
+    containerAsk.add(resourceRequest2);
+
+    // Create six RMContainers with the specified parameters
+    RMContainer rmContainer1 = createMockRMContainer(1, nodeId, appAttemptId,
+        0L, 1024, priority, ExecutionType.GUARANTEED);
+    RMContainer rmContainer2 = createMockRMContainer(2, nodeId, appAttemptId,
+        0L, 1024, priority, ExecutionType.GUARANTEED);
+    RMContainer rmContainer3 = createMockRMContainer(3, nodeId, appAttemptId,
+        0L, 1024, priority, ExecutionType.GUARANTEED);
+    RMContainer rmContainer4 = createMockRMContainer(4, nodeId, appAttemptId,
+        0L, 1024, priority, ExecutionType.GUARANTEED);
+    RMContainer rmContainer5 = createMockRMContainer(5, nodeId, appAttemptId,
+        0L, 1024, priority, ExecutionType.GUARANTEED);
+    RMContainer rmContainer6 = createMockRMContainer(6, nodeId, appAttemptId,
+        0L, 1024, priority, ExecutionType.GUARANTEED);
+
+    // Add the RMContainers to the newly allocated containers of the application
+    application.addToNewlyAllocatedContainers(schedulerNode, rmContainer1);
+    application.addToNewlyAllocatedContainers(schedulerNode, rmContainer2);
+    application.addToNewlyAllocatedContainers(schedulerNode, rmContainer3);
+    application.addToNewlyAllocatedContainers(schedulerNode, rmContainer4);
+    application.addToNewlyAllocatedContainers(schedulerNode, rmContainer5);
+    application.addToNewlyAllocatedContainers(schedulerNode, rmContainer6);
+
+    // Call the autoCorrectContainerAllocation method
+    scheduler.autoCorrectContainerAllocation(containerAsk, application);
+
+    // Assert that all resource requests have 0 containers
+    for (ResourceRequest resourceRequest : containerAsk) {
+      assertEquals(0, resourceRequest.getNumContainers());
+    }
+
+    // Assert that there are four newly allocated containers
+    assertEquals(4, application.pullNewlyAllocatedContainers().size());
+  }
+
   @Test
   public void testUpdateMaxAllocationUsesTotal() throws IOException {
     final int configuredMaxVCores = 20;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
index 9f928482a75..06600e8710b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
@@ -195,6 +195,20 @@ public class TestUtils {
     request.setNodeLabelExpression(labelExpression);
     return request;
   }
+
+  public static ResourceRequest createResourceRequest(int memory, int vcores, int numContainers,
+      Priority priority, long allocationId, ExecutionTypeRequest type, String resourceName) {
+    ResourceRequest request =
+        recordFactory.newRecordInstance(ResourceRequest.class);
+    Resource capability = Resources.createResource(memory, vcores);
+    request.setNumContainers(numContainers);
+    request.setCapability(capability);
+    request.setPriority(priority);
+    request.setAllocationRequestId(allocationId);
+    request.setExecutionTypeRequest(type);
+    request.setResourceName(resourceName);
+    return request;
+  }
   
   public static ResourceRequest createResourceRequest(
       String resourceName, int memory, int numContainers, boolean 
relaxLocality,


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org


Reply via email to