This is an automated email from the ASF dual-hosted git repository. slfan1989 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 21ec686be3a YARN-11702: Fix Yarn over allocating containers (#6990) Contributed by Syed Shameerur Rahman. 21ec686be3a is described below commit 21ec686be3a741ae62925111c9d17253c183bcf6 Author: Syed Shameerur Rahman <rhma...@amazon.com> AuthorDate: Wed Sep 25 09:40:15 2024 +0530 YARN-11702: Fix Yarn over allocating containers (#6990) Contributed by Syed Shameerur Rahman. Reviewed-by: Akira Ajisaka <aajis...@apache.org> Signed-off-by: Shilun Fan <slfan1...@apache.org> --- .../apache/hadoop/yarn/conf/YarnConfiguration.java | 11 + .../src/main/resources/yarn-default.xml | 15 + .../scheduler/AbstractYarnScheduler.java | 191 +++++++++++ .../scheduler/SchedulerApplicationAttempt.java | 3 +- .../scheduler/capacity/CapacityScheduler.java | 4 + .../scheduler/fair/FairScheduler.java | 5 + .../scheduler/TestAbstractYarnScheduler.java | 355 +++++++++++++++++++++ .../scheduler/capacity/TestUtils.java | 14 + 8 files changed, 597 insertions(+), 1 deletion(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 164647b2d0e..f02ad15e3db 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1537,6 +1537,17 @@ public class YarnConfiguration extends Configuration { public static final int DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY = 10; + /** + * The configuration key for enabling or disabling the auto-correction of container allocation. + */ + public static final String RM_SCHEDULER_AUTOCORRECT_CONTAINER_ALLOCATION = RM_PREFIX + + "scheduler.autocorrect.container.allocation"; + + /** + * Default value: {@value}. + */ + public static final boolean DEFAULT_RM_SCHEDULER_AUTOCORRECT_CONTAINER_ALLOCATION = false; + /** Whether to enable log aggregation */ public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX + "log-aggregation-enable"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index f5081d645ca..9013627eb75 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -182,6 +182,21 @@ <name>yarn.resourcemanager.principal</name> </property> + <property> + <description> + This configuration key enables or disables the auto-correction of container allocation in + YARN. Due to the asynchronous nature of container request and allocation, YARN may sometimes + allocate more containers than requested. The auto-correction feature addresses this by + automatically adjusting the number of requested containers based on those already allocated, + preventing extra containers from being allocated. + Because the extra allocated containers will be released by the client within a few seconds, + this is usually not a concern in normal circumstances. However, if the user is worried about + resource contention due to over-allocation, enabling this feature can help avoid such cases.
+ </description> + <name>yarn.resourcemanager.scheduler.autocorrect.container.allocation</name> + <value>false</value> + </property> + <property> <description>The address of the scheduler interface.</description> <name>yarn.resourcemanager.scheduler.address</name> diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 4ccc30ce39f..6cdb85b466a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -22,8 +22,10 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -34,6 +36,10 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -151,6 +157,7 @@ public abstract class AbstractYarnScheduler Thread updateThread; private final Object updateThreadMonitor = new Object(); private Timer releaseCache; + private boolean autoCorrectContainerAllocation; /* * All schedulers which are inheriting AbstractYarnScheduler should use @@ -212,6 +219,9 @@ public abstract class AbstractYarnScheduler conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS); skipNodeInterval = YarnConfiguration.getSkipNodeInterval(conf); + autoCorrectContainerAllocation = + conf.getBoolean(YarnConfiguration.RM_SCHEDULER_AUTOCORRECT_CONTAINER_ALLOCATION, + YarnConfiguration.DEFAULT_RM_SCHEDULER_AUTOCORRECT_CONTAINER_ALLOCATION); long configuredMaximumAllocationWaitTime = conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS); @@ -624,6 +634,106 @@ public abstract class AbstractYarnScheduler } } + /** + * Autocorrect container resourceRequests by decrementing the number of newly allocated containers + * from the current container request. This also updates the newlyAllocatedContainers to be within + * the limits of the current container resourceRequests. + * The ResourceRequest locality/resourceName is not considered while autocorrecting the container + * request; hence, when two resourceRequests are the same except for the + * locality/resourceName, they are counted as the same {@link ContainerObjectType} and the container + * ask and the number of newly allocated containers are decremented accordingly.
+ * For example, when a client requests 4 containers with locality/resourceName + * as "node1", AMRMClient augments the resourceRequest into two requests, + * R1(numContainer=4,locality=*) and R2(numContainer=4,locality=node1); + * if Yarn previously allocated 6 containers, it will release 2 containers as well as + * update the container ask to 0. + * + * If a client directly calls Yarn (without AMRMClient) with two such requests, + * R1(numContainer=4,locality=*) and R2(numContainer=4,locality=node1), + * the autocorrection may not work as expected. The use case of such a client is very rare. + * + * <p> + * This method is called from the {@link AbstractYarnScheduler#allocate} method and is intended + * to be used within the scheduler package only. + * @param resourceRequests List of resources to be allocated + * @param application ApplicationAttempt + */ + @VisibleForTesting + protected void autoCorrectContainerAllocation(List<ResourceRequest> resourceRequests, + SchedulerApplicationAttempt application) { + + // if there are no resourceRequests for containers or no newly allocated containers from + // the previous request, there is nothing to do. + if (!autoCorrectContainerAllocation || resourceRequests.isEmpty() || + application.newlyAllocatedContainers.isEmpty()) { + return; + } + + // iterate newlyAllocatedContainers and form a mapping from container type + // to the list of containers allocated for it. + Map<ContainerObjectType, List<RMContainer>> allocatedContainerMap = new HashMap<>(); + for (RMContainer rmContainer : application.newlyAllocatedContainers) { + Container container = rmContainer.getContainer(); + ContainerObjectType containerObjectType = new ContainerObjectType( + container.getAllocationRequestId(), container.getPriority(), + container.getExecutionType(), container.getResource()); + allocatedContainerMap.computeIfAbsent(containerObjectType, + k -> new ArrayList<>()).add(rmContainer); + } + + Map<ContainerObjectType, Integer> extraContainerAllocatedMap = new HashMap<>(); + // iterate through resourceRequests and update the request by + // decrementing the already allocated containers. + for (ResourceRequest request : resourceRequests) { + ContainerObjectType containerObjectType = + new ContainerObjectType(request.getAllocationRequestId(), + request.getPriority(), request.getExecutionTypeRequest().getExecutionType(), + request.getCapability()); + int numContainerAllocated = allocatedContainerMap.getOrDefault(containerObjectType, + Collections.emptyList()).size(); + if (numContainerAllocated > 0) { + int numContainerAsk = request.getNumContainers(); + int updatedContainerRequest = numContainerAsk - numContainerAllocated; + if (updatedContainerRequest < 0) { + // add an entry to extra allocated map + extraContainerAllocatedMap.put(containerObjectType, Math.abs(updatedContainerRequest)); + LOG.debug("{} container of the resource type: {} will be released", + Math.abs(updatedContainerRequest), request); + // if the newlyAllocatedContainer count is more than the current container + // ask, reset the updated container request to 0.
+ updatedContainerRequest = 0; + } + + // update the request + LOG.debug("Updating container resourceRequests from {} to {} for the resource type: {}", + numContainerAsk, updatedContainerRequest, request); + request.setNumContainers(updatedContainerRequest); + } + } + + // Iterate over the entries in extraContainerAllocatedMap + for (Map.Entry<ContainerObjectType, Integer> entry : extraContainerAllocatedMap.entrySet()) { + ContainerObjectType containerObjectType = entry.getKey(); + int extraContainers = entry.getValue(); + + // Get the list of allocated containers for the current ContainerObjectType + List<RMContainer> allocatedContainers = allocatedContainerMap.get(containerObjectType); + if (allocatedContainers != null) { + for (RMContainer rmContainer : allocatedContainers) { + if (extraContainers > 0) { + // Change the state of the container from ALLOCATED to EXPIRED since it is not required. + LOG.debug("Removing extra container:{}", rmContainer.getContainer()); + completedContainer(rmContainer, SchedulerUtils.createAbnormalContainerStatus( + rmContainer.getContainerId(), SchedulerUtils.EXPIRED_CONTAINER), + RMContainerEventType.EXPIRE); + application.newlyAllocatedContainers.remove(rmContainer); + extraContainers--; + } + } + } + } + } + private RMContainer recoverAndCreateContainer(NMContainerStatus status, RMNode node, String queueName) { Container container = @@ -658,6 +768,14 @@ public abstract class AbstractYarnScheduler return; } + // when auto correct container allocation is enabled, there can be a case when extra containers + // go to expired state from allocated state. When such scenario happens do not re-attempt the + // container request since this is expected. + if (autoCorrectContainerAllocation && + RMContainerState.EXPIRED.equals(rmContainer.getState())) { + return; + } + // Add resource request back to Scheduler ApplicationAttempt. // We lookup the application-attempt here again using @@ -1678,4 +1796,77 @@ public abstract class AbstractYarnScheduler } return apps; } + + /** + * ContainerObjectType is a container object with the following properties. + * Namely allocationId, priority, executionType and resourceType. 
+ */ + protected class ContainerObjectType extends Object { + private final long allocationId; + private final Priority priority; + private final ExecutionType executionType; + private final Resource resource; + + public ContainerObjectType(long allocationId, Priority priority, + ExecutionType executionType, Resource resource) { + this.allocationId = allocationId; + this.priority = priority; + this.executionType = executionType; + this.resource = resource; + } + + public long getAllocationId() { + return allocationId; + } + + public Priority getPriority() { + return priority; + } + + public ExecutionType getExecutionType() { + return executionType; + } + + public Resource getResource() { + return resource; + } + + @Override + public int hashCode() { + return new HashCodeBuilder(17, 37) + .append(allocationId) + .append(priority) + .append(executionType) + .append(resource) + .toHashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (obj.getClass() != this.getClass()) { + return false; + } + + ContainerObjectType other = (ContainerObjectType) obj; + return new EqualsBuilder() + .append(allocationId, other.getAllocationId()) + .append(priority, other.getPriority()) + .append(executionType, other.getExecutionType()) + .append(resource, other.getResource()) + .isEquals(); + } + + @Override + public String toString() { + return "{ContainerObjectType: " + + ", Priority: " + getPriority() + + ", Allocation Id: " + getAllocationId() + + ", Execution Type: " + getExecutionType() + + ", Resource: " + getResource() + + "}"; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 5121453e395..420cfdf25af 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -862,7 +862,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { updateContainerErrors.add(error); } - protected synchronized void addToNewlyAllocatedContainers( + @VisibleForTesting + public synchronized void addToNewlyAllocatedContainers( SchedulerNode node, RMContainer rmContainer) { ContainerId matchedContainerId = getUpdateContext().matchContainerToOutstandingIncreaseReq( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index d4eff239fff..9f20d6ba685 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1363,6 +1363,10 @@ public class CapacityScheduler extends application.showRequests(); } + // update the current container ask by considering the already allocated + // containers from the previous allocation request, and update newlyAllocatedContainers accordingly. + autoCorrectContainerAllocation(ask, application); + // Update application requests if (application.updateResourceRequests(ask) || application .updateSchedulingRequests(schedulingRequests)) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index f72351e26c7..ab81f607075 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -943,6 +943,11 @@ public class FairScheduler extends } application.showRequests(); + // update the current container ask by considering the already allocated containers + // from the previous allocation request, as well as update the newlyAllocatedContainers + // list according to the current ask. + autoCorrectContainerAllocation(ask, application); + // Update application requests application.updateResourceRequests(ask); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java index 67b3dee2b80..595eb852ee8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils.createResourceRequest; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -65,17 +66,24 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; @@ -83,6 +91,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; @@ -268,6 +277,352 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase { Assert.assertEquals(0, scheduler.getNumClusterNodes()); } + /** + * Test for testing autocorrect container allocation feature. 
+ */ + @Test + public void testAutoCorrectContainerAllocation() { + Configuration conf = new Configuration(getConf()); + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_AUTOCORRECT_CONTAINER_ALLOCATION, true); + conf.setBoolean("yarn.scheduler.capacity.root.auto-create-child-queue.enabled", + true); + MockRM rm = new MockRM(conf); + rm.start(); + AbstractYarnScheduler scheduler = (AbstractYarnScheduler) rm.getResourceScheduler(); + + String host = "127.0.0.1"; + RMNode node = + MockNodes.newNodeInfo(0, MockNodes.newResource(4 * 1024), 1, host); + scheduler.handle(new NodeAddedSchedulerEvent(node)); + + //add app begin + ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1); + ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( + appId1, 1); + + RMAppAttemptMetrics attemptMetric1 = + new RMAppAttemptMetrics(appAttemptId, rm.getRMContext()); + RMAppImpl app1 = mock(RMAppImpl.class); + when(app1.getApplicationId()).thenReturn(appId1); + RMAppAttemptImpl attempt1 = mock(RMAppAttemptImpl.class); + Container container = mock(Container.class); + when(attempt1.getMasterContainer()).thenReturn(container); + ApplicationSubmissionContext submissionContext = mock( + ApplicationSubmissionContext.class); + when(attempt1.getSubmissionContext()).thenReturn(submissionContext); + when(attempt1.getAppAttemptId()).thenReturn(appAttemptId); + when(attempt1.getRMAppAttemptMetrics()).thenReturn(attemptMetric1); + when(app1.getCurrentAppAttempt()).thenReturn(attempt1); + + rm.getRMContext().getRMApps().put(appId1, app1); + + ApplicationPlacementContext apc = new ApplicationPlacementContext("user", + "root"); + SchedulerEvent addAppEvent1 = + new AppAddedSchedulerEvent(appId1, "user", "user", apc); + scheduler.handle(addAppEvent1); + SchedulerEvent addAttemptEvent1 = + new AppAttemptAddedSchedulerEvent(appAttemptId, false); + scheduler.handle(addAttemptEvent1); + + SchedulerApplicationAttempt application = scheduler.getApplicationAttempt(appAttemptId); + SchedulerNode schedulerNode = scheduler.getSchedulerNode(node.getNodeID()); + Priority priority = Priority.newInstance(0); + NodeId nodeId = NodeId.newInstance("foo.bar.org", 1234); + + // test different container ask and newly allocated container. + testContainerAskAndNewlyAllocatedContainerZero(scheduler, application, priority); + testContainerAskAndNewlyAllocatedContainerOne(scheduler, application, schedulerNode, + nodeId, priority, app1.getCurrentAppAttempt().getAppAttemptId()); + testContainerAskZeroAndNewlyAllocatedContainerOne(scheduler, application, schedulerNode, + nodeId, priority, app1.getCurrentAppAttempt().getAppAttemptId()); + testContainerAskFourAndNewlyAllocatedContainerEight(scheduler, application, schedulerNode, + nodeId, priority, app1.getCurrentAppAttempt().getAppAttemptId()); + testContainerAskFourAndNewlyAllocatedContainerSix(scheduler, application, schedulerNode, + nodeId, priority, app1.getCurrentAppAttempt().getAppAttemptId()); + } + + /** + * Creates a mock instance of {@link RMContainer} with the specified parameters. 
+ * + * @param containerId The ID of the container + * @param nodeId The NodeId of the node where the container is allocated + * @param appAttemptId The ApplicationAttemptId of the application attempt + * @param allocationId The allocation ID of the container + * @param memory The amount of memory (in MB) requested for the container + * @param priority The priority of the container request + * @param executionType The execution type of the container request + * @return A mock instance of RMContainer with the specified parameters + */ + private RMContainer createMockRMContainer(int containerId, NodeId nodeId, + ApplicationAttemptId appAttemptId, long allocationId, int memory, + Priority priority, ExecutionType executionType) { + // Create a mock instance of Container + Container container = mock(Container.class); + + // Mock the Container instance with the specified parameters + when(container.getResource()).thenReturn(Resource.newInstance(memory, 1)); + when(container.getPriority()).thenReturn(priority); + when(container.getId()).thenReturn(ContainerId.newContainerId(appAttemptId, containerId)); + when(container.getNodeId()).thenReturn(nodeId); + when(container.getAllocationRequestId()).thenReturn(allocationId); + when(container.getExecutionType()).thenReturn(executionType); + when(container.getContainerToken()).thenReturn(Token.newInstance(new byte[0], "kind", + new byte[0], "service")); + + // Create a mock instance of RMContainerImpl + RMContainer rmContainer = mock(RMContainerImpl.class); + + // Set up the behavior of the mock RMContainer + when(rmContainer.getContainer()).thenReturn(container); + when(rmContainer.getContainerId()).thenReturn( + ContainerId.newContainerId(appAttemptId, containerId)); + + return rmContainer; + } + + /** + * Tests the behavior when the container ask is 1 and there are no newly allocated containers. + * + * @param scheduler The AbstractYarnScheduler instance to test. + * @param application The SchedulerApplicationAttempt instance representing the application. + * @param priority The priority of the resource request. + */ + private void testContainerAskAndNewlyAllocatedContainerZero(AbstractYarnScheduler scheduler, + SchedulerApplicationAttempt application, Priority priority) { + // Create a resource request with 1 container, 1024 MB memory, and GUARANTEED execution type + ResourceRequest resourceRequest = createResourceRequest(1024, 1, 1, + priority, 0, + ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED), ResourceRequest.ANY); + + // Create a list with the resource request + List<ResourceRequest> containerAsk = new ArrayList<>(); + containerAsk.add(resourceRequest); + + // Call the autoCorrectContainerAllocation method + scheduler.autoCorrectContainerAllocation(containerAsk, application); + + // Assert that the container ask remains unchanged (1 container) + assertEquals(1, containerAsk.get(0).getNumContainers()); + + // Assert that there are no newly allocated containers + assertEquals(0, application.pullNewlyAllocatedContainers().size()); + } + + /** + * Tests the behavior when the container ask is 1 and there is one newly allocated container. 
+ * + * @param scheduler The AbstractYarnScheduler instance to test + * @param application The SchedulerApplicationAttempt instance representing the application + * @param schedulerNode The SchedulerNode instance representing the node + * @param nodeId The NodeId of the node + * @param priority The priority of the resource request + * @param appAttemptId The ApplicationAttemptId of the application attempt + */ + private void testContainerAskAndNewlyAllocatedContainerOne(AbstractYarnScheduler scheduler, + SchedulerApplicationAttempt application, + SchedulerNode schedulerNode, NodeId nodeId, + Priority priority, ApplicationAttemptId appAttemptId) { + // Create a resource request with 1 container, 1024 MB memory, and GUARANTEED execution type + ResourceRequest resourceRequest = createResourceRequest(1024, 1, 1, + priority, 0L, ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED), + ResourceRequest.ANY); + List<ResourceRequest> containerAsk = new ArrayList<>(); + containerAsk.add(resourceRequest); + + // Create an RMContainer with the specified parameters + RMContainer rmContainer = createMockRMContainer(1, nodeId, appAttemptId, + 0L, 1024, priority, ExecutionType.GUARANTEED); + + // Add the RMContainer to the newly allocated containers of the application + application.addToNewlyAllocatedContainers(schedulerNode, rmContainer); + + // Call the autoCorrectContainerAllocation method + scheduler.autoCorrectContainerAllocation(containerAsk, application); + + // Assert that the container ask is updated to 0 + assertEquals(0, containerAsk.get(0).getNumContainers()); + + // Assert that there is one newly allocated container + assertEquals(1, application.pullNewlyAllocatedContainers().size()); + } + + /** + * Tests the behavior when the container ask is 0 and there is one newly allocated container. 
+ * + * @param scheduler The AbstractYarnScheduler instance to test + * @param application The SchedulerApplicationAttempt instance representing the application + * @param schedulerNode The SchedulerNode instance representing the node + * @param nodeId The NodeId of the node + * @param priority The priority of the resource request + * @param appAttemptId The ApplicationAttemptId of the application attempt + */ + private void testContainerAskZeroAndNewlyAllocatedContainerOne(AbstractYarnScheduler scheduler, + SchedulerApplicationAttempt application, SchedulerNode schedulerNode, NodeId nodeId, + Priority priority, ApplicationAttemptId appAttemptId) { + // Create a resource request with 0 containers, 1024 MB memory, and GUARANTEED execution type + ResourceRequest resourceRequest = createResourceRequest(1024, 1, + 0, priority, 0L, + ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED), ResourceRequest.ANY); + List<ResourceRequest> containerAsk = new ArrayList<>(); + containerAsk.add(resourceRequest); + + // Create an RMContainer with the specified parameters + RMContainer rmContainer1 = createMockRMContainer(1, nodeId, appAttemptId, + 0L, 1024, priority, ExecutionType.GUARANTEED); + + // Add the RMContainer to the newly allocated containers of the application + application.addToNewlyAllocatedContainers(schedulerNode, rmContainer1); + + // Call the autoCorrectContainerAllocation method + scheduler.autoCorrectContainerAllocation(containerAsk, application); + + // Assert that the container ask remains 0 + assertEquals(0, resourceRequest.getNumContainers()); + + // Assert that there are no newly allocated containers + assertEquals(0, application.pullNewlyAllocatedContainers().size()); + } + + /** + * Tests the behavior when the container ask consists of four unique resource requests + * and there are eight newly allocated containers (two containers for each resource request type). 
+ * + * @param scheduler The AbstractYarnScheduler instance to test + * @param application The SchedulerApplicationAttempt instance representing the application + * @param schedulerNode The SchedulerNode instance representing the node + * @param nodeId The NodeId of the node + * @param priority The priority of the resource requests + * @param appAttemptId The ApplicationAttemptId of the application attempt + */ + private void testContainerAskFourAndNewlyAllocatedContainerEight(AbstractYarnScheduler scheduler, + SchedulerApplicationAttempt application, SchedulerNode schedulerNode, + NodeId nodeId, Priority priority, ApplicationAttemptId appAttemptId) { + // Create four unique resource requests + ResourceRequest resourceRequest1 = createResourceRequest(1024, 1, 1, + priority, 0L, + ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED), ResourceRequest.ANY); + ResourceRequest resourceRequest2 = createResourceRequest(2048, 1, 1, + priority, 0L, + ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED), ResourceRequest.ANY); + ResourceRequest resourceRequest3 = createResourceRequest(1024, 1, 1, + priority, 1L, + ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED), ResourceRequest.ANY); + ResourceRequest resourceRequest4 = createResourceRequest(1024, 1, 1, + priority, 0L, + ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC), ResourceRequest.ANY); + + // Add the resource requests to a list + List<ResourceRequest> ask4 = new ArrayList<>(); + ask4.add(resourceRequest1); + ask4.add(resourceRequest2); + ask4.add(resourceRequest3); + ask4.add(resourceRequest4); + + // Create eight RMContainers (two for each resource request type) + RMContainer rmContainer1 = createMockRMContainer(1, nodeId, appAttemptId, + 0L, 1024, priority, ExecutionType.GUARANTEED); + RMContainer rmContainer2 = createMockRMContainer(2, nodeId, appAttemptId, + 0L, 1024, priority, ExecutionType.GUARANTEED); + RMContainer rmContainer3 = createMockRMContainer(3, nodeId, appAttemptId, + 0L, 2048, priority, ExecutionType.GUARANTEED); + RMContainer rmContainer4 = createMockRMContainer(4, nodeId, appAttemptId, + 0L, 2048, priority, ExecutionType.GUARANTEED); + RMContainer rmContainer5 = createMockRMContainer(5, nodeId, appAttemptId, + 1L, 1024, priority, ExecutionType.GUARANTEED); + RMContainer rmContainer6 = createMockRMContainer(6, nodeId, appAttemptId, + 1L, 1024, priority, ExecutionType.GUARANTEED); + RMContainer rmContainer7 = createMockRMContainer(7, nodeId, appAttemptId, + 0L, 1024, priority, ExecutionType.OPPORTUNISTIC); + RMContainer rmContainer8 = createMockRMContainer(8, nodeId, appAttemptId, + 0L, 1024, priority, ExecutionType.OPPORTUNISTIC); + + // Add the RMContainers to the newly allocated containers of the application + application.addToNewlyAllocatedContainers(schedulerNode, rmContainer1); + application.addToNewlyAllocatedContainers(schedulerNode, rmContainer2); + application.addToNewlyAllocatedContainers(schedulerNode, rmContainer3); + application.addToNewlyAllocatedContainers(schedulerNode, rmContainer4); + application.addToNewlyAllocatedContainers(schedulerNode, rmContainer5); + application.addToNewlyAllocatedContainers(schedulerNode, rmContainer6); + application.addToNewlyAllocatedContainers(schedulerNode, rmContainer7); + application.addToNewlyAllocatedContainers(schedulerNode, rmContainer8); + + // Call the autoCorrectContainerAllocation method + scheduler.autoCorrectContainerAllocation(ask4, application); + + // Assert that all resource requests have 0 containers + for (ResourceRequest rr : 
ask4) { + assertEquals(0, rr.getNumContainers()); + } + + // Assert that there are four newly allocated containers + assertEquals(4, application.pullNewlyAllocatedContainers().size()); + } + + /** + * Tests the behavior when the container ask consists of two resource requests, + * i.e. one for any host and one for a specific host, + * each requesting four containers, and there are six newly allocated containers. + * + * @param scheduler The AbstractYarnScheduler instance to test + * @param application The SchedulerApplicationAttempt instance representing the application + * @param schedulerNode The SchedulerNode instance representing the node + * @param nodeId The NodeId of the node + * @param priority The priority of the resource requests + * @param appAttemptId The ApplicationAttemptId of the application attempt + */ + private void testContainerAskFourAndNewlyAllocatedContainerSix(AbstractYarnScheduler scheduler, + SchedulerApplicationAttempt application, SchedulerNode schedulerNode, + NodeId nodeId, Priority priority, ApplicationAttemptId appAttemptId) { + // Create a resource request for any host, requesting 4 containers + ResourceRequest resourceRequest1 = createResourceRequest(1024, 1, 4, + priority, 0L, + ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED), ResourceRequest.ANY); + + // Create a resource request for a specific host, requesting 4 containers + ResourceRequest resourceRequest2 = createResourceRequest(1024, 1, 4, + priority, 0L, + ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED), nodeId.getHost()); + + // Add the resource requests to a list + List<ResourceRequest> containerAsk = new ArrayList<>(); + containerAsk.add(resourceRequest1); + containerAsk.add(resourceRequest2); + + // Create six RMContainers with the specified parameters + RMContainer rmContainer1 = createMockRMContainer(1, nodeId, appAttemptId, + 0L, 1024, priority, ExecutionType.GUARANTEED); + RMContainer rmContainer2 = createMockRMContainer(2, nodeId, appAttemptId, + 0L, 1024, priority, ExecutionType.GUARANTEED); + RMContainer rmContainer3 = createMockRMContainer(3, nodeId, appAttemptId, + 0L, 1024, priority, ExecutionType.GUARANTEED); + RMContainer rmContainer4 = createMockRMContainer(4, nodeId, appAttemptId, + 0L, 1024, priority, ExecutionType.GUARANTEED); + RMContainer rmContainer5 = createMockRMContainer(5, nodeId, appAttemptId, + 0L, 1024, priority, ExecutionType.GUARANTEED); + RMContainer rmContainer6 = createMockRMContainer(6, nodeId, appAttemptId, + 0L, 1024, priority, ExecutionType.GUARANTEED); + + // Add the RMContainers to the newly allocated containers of the application + application.addToNewlyAllocatedContainers(schedulerNode, rmContainer1); + application.addToNewlyAllocatedContainers(schedulerNode, rmContainer2); + application.addToNewlyAllocatedContainers(schedulerNode, rmContainer3); + application.addToNewlyAllocatedContainers(schedulerNode, rmContainer4); + application.addToNewlyAllocatedContainers(schedulerNode, rmContainer5); + application.addToNewlyAllocatedContainers(schedulerNode, rmContainer6); + + // Call the autoCorrectContainerAllocation method + scheduler.autoCorrectContainerAllocation(containerAsk, application); + + // Assert that all resource requests have 0 containers + for (ResourceRequest resourceRequest : containerAsk) { + assertEquals(0, resourceRequest.getNumContainers()); + } + + // Assert that there are four newly allocated containers + assertEquals(4, application.pullNewlyAllocatedContainers().size()); + } + @Test public void 
testUpdateMaxAllocationUsesTotal() throws IOException { final int configuredMaxVCores = 20; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index 9f928482a75..06600e8710b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -195,6 +195,20 @@ public class TestUtils { request.setNodeLabelExpression(labelExpression); return request; } + + public static ResourceRequest createResourceRequest(int memory, int vcores, int numContainers, + Priority priority, long allocationId, ExecutionTypeRequest type, String resourceName) { + ResourceRequest request = + recordFactory.newRecordInstance(ResourceRequest.class); + Resource capability = Resources.createResource(memory, vcores); + request.setNumContainers(numContainers); + request.setCapability(capability); + request.setPriority(priority); + request.setAllocationRequestId(allocationId); + request.setExecutionTypeRequest(type); + request.setResourceName(resourceName); + return request; + } public static ResourceRequest createResourceRequest( String resourceName, int memory, int numContainers, boolean relaxLocality, --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org
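For reference, a minimal sketch (not part of the patch above) of how the new switch could be turned on through the constant this commit adds to YarnConfiguration. Only the property name and its default come from the commit; the class name AutoCorrectAllocationExample and the standalone main method are hypothetical illustration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class AutoCorrectAllocationExample {
      public static void main(String[] args) {
        // Picks up yarn-default.xml / yarn-site.xml from the classpath.
        Configuration conf = new YarnConfiguration();

        // Enable the auto-correction of container allocation added by YARN-11702.
        // The constant resolves to
        // "yarn.resourcemanager.scheduler.autocorrect.container.allocation" and defaults to false.
        conf.setBoolean(
            YarnConfiguration.RM_SCHEDULER_AUTOCORRECT_CONTAINER_ALLOCATION, true);

        // The flag is read when the ResourceManager initializes its scheduler (see the
        // AbstractYarnScheduler hunk above), so it must be set in the RM's configuration
        // before the scheduler starts.
        System.out.println("autocorrect enabled: " + conf.getBoolean(
            YarnConfiguration.RM_SCHEDULER_AUTOCORRECT_CONTAINER_ALLOCATION,
            YarnConfiguration.DEFAULT_RM_SCHEDULER_AUTOCORRECT_CONTAINER_ALLOCATION));
      }
    }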