Repository: hadoop
Updated Branches:
  refs/heads/branch-2.6 783c99d29 -> 74aca3488
MAPREDUCE-6302. Preempt reducers after a configurable timeout irrespective of
headroom. (kasha via wangda)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/74aca348
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/74aca348
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/74aca348

Branch: refs/heads/branch-2.6
Commit: 74aca348820778f3ee88e3948a9984cfd3f6f46b
Parents: 783c99d
Author: Wangda Tan <[email protected]>
Authored: Fri Apr 8 14:34:15 2016 -0700
Committer: Wangda Tan <[email protected]>
Committed: Fri Apr 8 14:34:15 2016 -0700

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |   3 +
 .../v2/app/rm/RMContainerAllocator.java         | 159 +++++++++++--------
 .../v2/app/rm/RMContainerRequestor.java         |   3 +-
 .../v2/app/rm/TestRMContainerAllocator.java     |  80 +++++++++-
 .../apache/hadoop/mapreduce/MRJobConfig.java    |  16 +-
 .../src/main/resources/mapred-default.xml       |  31 ++--
 .../resourcemanager/scheduler/Allocation.java   |   7 +-
 7 files changed, 213 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
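The two properties this patch introduces can be set in mapred-site.xml or programmatically. A minimal sketch of the latter (the property keys come from this diff; the surrounding scaffolding is illustrative only):

import org.apache.hadoop.conf.Configuration;

public class ReducerPreemptionConfig {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Preempt reducers only after a mapper request has starved for 60s
    // with no headroom (0, the default, preempts immediately).
    conf.setInt("mapreduce.job.reducer.preempt.delay.sec", 60);
    // Force-preempt reducers after 5 minutes regardless of reported
    // headroom; -1 disables this unconditional path entirely.
    conf.setInt("mapreduce.job.reducer.unconditional-preempt.delay.sec", 300);
    System.out.println(
        conf.getInt("mapreduce.job.reducer.unconditional-preempt.delay.sec", -1));
  }
}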
http://git-wip-us.apache.org/repos/asf/hadoop/blob/74aca348/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 7260470..e590269 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -27,6 +27,9 @@ Release 2.6.5 - UNRELEASED
     MAPREDUCE-6656. [NNBench] OP_DELETE operation isn't working after
     MAPREDUCE-6363. (J.Andreina via aajisaka)
 
+    MAPREDUCE-6302. Preempt reducers after a configurable timeout
+    irrespective of headroom. (kasha via wangda)
+
 Release 2.6.4 - 2016-02-11
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/74aca348/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 46f3f42..6b62af4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -158,11 +158,13 @@ public class RMContainerAllocator extends RMContainerRequestor
   private boolean reduceStarted = false;
   private float maxReduceRampupLimit = 0;
   private float maxReducePreemptionLimit = 0;
-  /**
-   * after this threshold, if the container request is not allocated, it is
-   * considered delayed.
-   */
-  private long allocationDelayThresholdMs = 0;
+
+  // Mapper allocation timeout, after which a reducer is forcibly preempted
+  private long reducerUnconditionalPreemptionDelayMs;
+
+  // Duration to wait before preempting a reducer when there is NO room
+  private long reducerNoHeadroomPreemptionDelayMs = 0;
+
   private float reduceSlowStart = 0;
   private long retryInterval;
   private long retrystartTime;
@@ -192,7 +194,10 @@ public class RMContainerAllocator extends RMContainerRequestor
     maxReducePreemptionLimit = conf.getFloat(
         MRJobConfig.MR_AM_JOB_REDUCE_PREEMPTION_LIMIT,
         MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT);
-    allocationDelayThresholdMs = conf.getInt(
+    reducerUnconditionalPreemptionDelayMs = 1000 * conf.getInt(
+        MRJobConfig.MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC,
+        MRJobConfig.DEFAULT_MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC);
+    reducerNoHeadroomPreemptionDelayMs = conf.getInt(
         MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC,
         MRJobConfig.DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC) * 1000;//sec -> ms
     RackResolver.init(conf);
@@ -448,59 +453,89 @@
     if (reduceResourceRequest.equals(Resources.none())) {
       return; // no reduces
     }
-    //check if reduces have taken over the whole cluster and there are
-    //unassigned maps
-    if (scheduledRequests.maps.size() > 0) {
-      Resource resourceLimit = getResourceLimit();
-      Resource availableResourceForMap =
-          Resources.subtract(
-              resourceLimit,
-              Resources.multiply(reduceResourceRequest,
-                  assignedRequests.reduces.size() -
-                  assignedRequests.preemptionWaitingReduces.size()));
-      // availableMemForMap must be sufficient to run at least 1 map
-      if (ResourceCalculatorUtils.computeAvailableContainers(availableResourceForMap,
-          mapResourceRequest, getSchedulerResourceTypes()) <= 0) {
-        // to make sure new containers are given to maps and not reduces
-        // ramp down all scheduled reduces if any
-        // (since reduces are scheduled at higher priority than maps)
-        LOG.info("Ramping down all scheduled reduces:"
-            + scheduledRequests.reduces.size());
-        for (ContainerRequest req : scheduledRequests.reduces.values()) {
-          pendingReduces.add(req);
-        }
-        scheduledRequests.reduces.clear();
-
-        //do further checking to find the number of map requests that were
-        //hanging around for a while
-        int hangingMapRequests = getNumOfHangingRequests(scheduledRequests.maps);
-        if (hangingMapRequests > 0) {
-          // preempt for making space for at least one map
-          int preemptionReduceNumForOneMap =
-              ResourceCalculatorUtils.divideAndCeilContainers(mapResourceRequest,
-                  reduceResourceRequest, getSchedulerResourceTypes());
-          int preemptionReduceNumForPreemptionLimit =
-              ResourceCalculatorUtils.divideAndCeilContainers(
-                  Resources.multiply(resourceLimit, maxReducePreemptionLimit),
-                  reduceResourceRequest, getSchedulerResourceTypes());
-          int preemptionReduceNumForAllMaps =
-              ResourceCalculatorUtils.divideAndCeilContainers(
-                  Resources.multiply(mapResourceRequest, hangingMapRequests),
-                  reduceResourceRequest, getSchedulerResourceTypes());
-          int toPreempt =
-              Math.min(Math.max(preemptionReduceNumForOneMap,
-                  preemptionReduceNumForPreemptionLimit),
-                  preemptionReduceNumForAllMaps);
-          LOG.info("Going to preempt " + toPreempt
-              + " due to lack of space for maps");
-          assignedRequests.preemptReduce(toPreempt);
-        }
+    if (assignedRequests.maps.size() > 0) {
+      // there are assigned mappers
+      return;
+    }
+
+    if (scheduledRequests.maps.size() <= 0) {
+      // there are no pending requests for mappers
+      return;
+    }
+    // At this point:
+    // we have pending mappers and all assigned resources are taken by reducers
+
+    if (reducerUnconditionalPreemptionDelayMs >= 0) {
+      // Unconditional preemption is enabled.
+      // If mappers are pending for longer than the configured threshold,
+      // preempt reducers irrespective of what the headroom is.
+      if (preemptReducersForHangingMapRequests(
+          reducerUnconditionalPreemptionDelayMs)) {
+        return;
       }
     }
+
+    // The pending mappers haven't been waiting for too long. Let us see if
+    // the headroom can fit a mapper.
+    Resource availableResourceForMap = getAvailableResources();
+    if (ResourceCalculatorUtils.computeAvailableContainers(availableResourceForMap,
+        mapResourceRequest, getSchedulerResourceTypes()) > 0) {
+      // the available headroom is enough to run a mapper
+      return;
+    }
+
+    // Available headroom is not enough to run mapper. See if we should hold
+    // off before preempting reducers and preempt if okay.
+    preemptReducersForHangingMapRequests(reducerNoHeadroomPreemptionDelayMs);
+  }
+
+  private boolean preemptReducersForHangingMapRequests(long pendingThreshold) {
+    int hangingMapRequests = getNumHangingRequests(
+        pendingThreshold, scheduledRequests.maps);
+    if (hangingMapRequests > 0) {
+      preemptReducer(hangingMapRequests);
+      return true;
+    }
+    return false;
+  }
+
+  private void clearAllPendingReduceRequests() {
+    LOG.info("Ramping down all scheduled reduces:"
+        + scheduledRequests.reduces.size());
+    for (ContainerRequest req : scheduledRequests.reduces.values()) {
+      pendingReduces.add(req);
+    }
+    scheduledRequests.reduces.clear();
   }
-
-  private int getNumOfHangingRequests(Map<TaskAttemptId, ContainerRequest> requestMap) {
+
+  private void preemptReducer(int hangingMapRequests) {
+    clearAllPendingReduceRequests();
+
+    // preempt for making space for at least one map
+    int preemptionReduceNumForOneMap =
+        ResourceCalculatorUtils.divideAndCeilContainers(mapResourceRequest,
+            reduceResourceRequest, getSchedulerResourceTypes());
+    int preemptionReduceNumForPreemptionLimit =
+        ResourceCalculatorUtils.divideAndCeilContainers(
+            Resources.multiply(getResourceLimit(), maxReducePreemptionLimit),
+            reduceResourceRequest, getSchedulerResourceTypes());
+    int preemptionReduceNumForAllMaps =
+        ResourceCalculatorUtils.divideAndCeilContainers(
+            Resources.multiply(mapResourceRequest, hangingMapRequests),
+            reduceResourceRequest, getSchedulerResourceTypes());
+    int toPreempt =
+        Math.min(Math.max(preemptionReduceNumForOneMap,
+            preemptionReduceNumForPreemptionLimit),
+            preemptionReduceNumForAllMaps);
+
+    LOG.info("Going to preempt " + toPreempt
+        + " due to lack of space for maps");
+    assignedRequests.preemptReduce(toPreempt);
+  }
+
+  private int getNumHangingRequests(long allocationDelayThresholdMs,
+      Map<TaskAttemptId, ContainerRequest> requestMap) {
     if (allocationDelayThresholdMs <= 0)
       return requestMap.size();
     int hangingRequests = 0;
@@ -528,9 +563,6 @@ public class RMContainerAllocator extends RMContainerRequestor
 
     // get available resources for this job
     Resource headRoom = getAvailableResources();
-    if (headRoom == null) {
-      headRoom = Resources.none();
-    }
 
     LOG.info("Recalculating schedule, headroom=" + headRoom);
 
@@ -655,9 +687,7 @@ public class RMContainerAllocator extends RMContainerRequestor
   @SuppressWarnings("unchecked")
   private List<Container> getResources() throws Exception {
     // will be null the first time
-    Resource headRoom =
-        getAvailableResources() == null ? Resources.none() :
-        Resources.clone(getAvailableResources());
+    Resource headRoom = Resources.clone(getAvailableResources());
     AllocateResponse response;
     /*
      * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS
@@ -698,9 +728,7 @@ public class RMContainerAllocator extends RMContainerRequestor
       // continue to attempt to contact the RM.
       throw e;
     }
-    Resource newHeadRoom =
-        getAvailableResources() == null ? Resources.none()
-        : getAvailableResources();
+    Resource newHeadRoom = getAvailableResources();
     List<Container> newContainers = response.getAllocatedContainers();
     // Setting NMTokens
     if (response.getNMTokens() != null) {
@@ -823,9 +851,6 @@ public class RMContainerAllocator extends RMContainerRequestor
   @Private
   public Resource getResourceLimit() {
     Resource headRoom = getAvailableResources();
-    if (headRoom == null) {
-      headRoom = Resources.none();
-    }
     Resource assignedMapResource =
         Resources.multiply(mapResourceRequest, assignedRequests.maps.size());
     Resource assignedReduceResource =
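The rewritten preemptReducesIfNeeded() above reduces to a short chain of early returns. A standalone sketch of that decision order, with plain primitives standing in for YARN's Resource bookkeeping (every name here is an illustrative stand-in, not the patch's API):

// Sketch of the decision order in preemptReducesIfNeeded(); illustrative only.
public class PreemptionDecisionSketch {
  long unconditionalDelayMs = 5 * 60 * 1000; // negative would disable this path
  long noHeadroomDelayMs = 0;                // <= 0 means preempt immediately

  boolean shouldPreempt(int assignedMaps, int pendingMaps,
      long oldestPendingMapAgeMs, boolean headroomFitsAMap) {
    if (assignedMaps > 0 || pendingMaps == 0) {
      return false; // mappers are running, or nothing is waiting
    }
    // 1) Unconditional path: a mapper has starved past the hard timeout,
    //    so preempt regardless of what the headroom says.
    if (unconditionalDelayMs >= 0
        && (unconditionalDelayMs == 0
            || oldestPendingMapAgeMs > unconditionalDelayMs)) {
      return true;
    }
    // 2) Headroom path: only preempt when no mapper can fit anyway.
    if (headroomFitsAMap) {
      return false;
    }
    return noHeadroomDelayMs <= 0 || oldestPendingMapAgeMs > noHeadroomDelayMs;
  }

  public static void main(String[] args) {
    PreemptionDecisionSketch s = new PreemptionDecisionSketch();
    // Starved for 6 minutes with positive headroom: still preempted.
    System.out.println(s.shouldPreempt(0, 3, 6 * 60 * 1000, true)); // true
  }
}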
http://git-wip-us.apache.org/repos/asf/hadoop/blob/74aca348/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
index bb9ad02..9ce245f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.util.resource.Resources;
 
 /**
@@ -333,7 +334,7 @@
   }
 
   protected Resource getAvailableResources() {
-    return availableResources;
+    return availableResources == null ? Resources.none() : availableResources;
   }
 
   protected void addContainerReq(ContainerRequest req) {
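The getAvailableResources() change above is a small null-object refactor: by returning Resources.none() instead of null, every caller (three of them in RMContainerAllocator) can drop its own defensive null check. A generic sketch of the same idiom, with invented types:

// Illustrative null-object refactor mirroring getAvailableResources().
// "Headroom" and "Requestor" are invented names, not Hadoop classes.
final class Headroom {
  static final Headroom NONE = new Headroom(0);
  final long memoryMb;
  Headroom(long memoryMb) { this.memoryMb = memoryMb; }
}

class Requestor {
  private Headroom available; // null until the first RM response arrives

  // Callers never see null, so they no longer need their own checks.
  Headroom getAvailable() {
    return available == null ? Headroom.NONE : available;
  }

  public static void main(String[] args) {
    System.out.println(new Requestor().getAvailable().memoryMb); // 0
  }
}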
http://git-wip-us.apache.org/repos/asf/hadoop/blob/74aca348/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index 025e555..ce101c5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -412,7 +412,7 @@ public class TestRMContainerAllocator {
         MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
             0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
     final MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
-        appAttemptId, mockJob);
+        appAttemptId, mockJob, new SystemClock());
 
     // add resources to scheduler
     dispatcher.await();
@@ -546,6 +546,69 @@ public class TestRMContainerAllocator {
         assignedRequests.preemptionWaitingReduces.size());
   }
 
+  @Test(timeout = 30000)
+  public void testUnconditionalPreemptReducers() throws Exception {
+    LOG.info("Running testUnconditionalPreemptReducers");
+
+    int forcePreemptThresholdSecs = 2;
+    Configuration conf = new Configuration();
+    conf.setInt(MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC,
+        2 * forcePreemptThresholdSecs);
+    conf.setInt(MRJobConfig.MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC,
+        forcePreemptThresholdSecs);
+
+    MyResourceManager rm = new MyResourceManager(conf);
+    rm.start();
+    rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(8192, 8));
+    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+        .getDispatcher();
+
+    // Submit the application
+    RMApp app = rm.submitApp(1024);
+    dispatcher.await();
+
+    MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+    amNodeManager.nodeHeartbeat(true);
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+        .getAppAttemptId();
+    rm.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+    ControlledClock clock = new ControlledClock(null);
+    clock.setTime(1);
+    MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+        appAttemptId, mockJob, clock);
+    allocator.setMapResourceRequest(BuilderUtils.newResource(1024, 1));
+    allocator.setReduceResourceRequest(BuilderUtils.newResource(1024, 1));
+    RMContainerAllocator.AssignedRequests assignedRequests =
+        allocator.getAssignedRequests();
+    RMContainerAllocator.ScheduledRequests scheduledRequests =
+        allocator.getScheduledRequests();
+    ContainerRequestEvent event1 =
+        createReq(jobId, 1, 2048, new String[] { "h1" }, false, false);
+    scheduledRequests.maps.put(mock(TaskAttemptId.class),
+        new RMContainerRequestor.ContainerRequest(event1, null, clock.getTime()));
+    assignedRequests.reduces.put(mock(TaskAttemptId.class),
+        mock(Container.class));
+
+    clock.setTime(clock.getTime() + 1);
+    allocator.preemptReducesIfNeeded();
+    Assert.assertEquals("The reducer is preempted too soon", 0,
+        assignedRequests.preemptionWaitingReduces.size());
+
+    clock.setTime(clock.getTime() + 1000 * forcePreemptThresholdSecs);
+    allocator.preemptReducesIfNeeded();
+    Assert.assertEquals("The reducer is not preempted", 1,
+        assignedRequests.preemptionWaitingReduces.size());
+  }
+
   @Test
   public void testMapReduceScheduling() throws Exception {
 
@@ -576,7 +639,7 @@ public class TestRMContainerAllocator {
         MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
             0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
     MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
-        appAttemptId, mockJob);
+        appAttemptId, mockJob, new SystemClock());
 
     // add resources to scheduler
     MockNM nodeManager1 = rm.registerNode("h1:1234", 1024);
@@ -1333,6 +1396,7 @@ public class TestRMContainerAllocator {
     when(mockJob.getReport()).thenReturn(
         MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
             0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+
     MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
         appAttemptId, mockJob);
 
@@ -1468,6 +1532,7 @@ public class TestRMContainerAllocator {
     List<ContainerId> lastRelease = null;
     List<String> lastBlacklistAdditions;
    List<String> lastBlacklistRemovals;
+    Resource forceResourceLimit = null;
 
     // override this to copy the objects otherwise FifoScheduler updates the
     // numContainers in same objects as kept by RMContainerAllocator
@@ -1487,9 +1552,18 @@ public class TestRMContainerAllocator {
       lastRelease = release;
       lastBlacklistAdditions = blacklistAdditions;
       lastBlacklistRemovals = blacklistRemovals;
-      return super.allocate(
+      Allocation allocation = super.allocate(
           applicationAttemptId, askCopy, release, blacklistAdditions,
           blacklistRemovals);
+      if (forceResourceLimit != null) {
+        // Test wants to force the non-default resource limit
+        allocation.setResourceLimit(forceResourceLimit);
+      }
+      return allocation;
+    }
+
+    public void forceResourceLimit(Resource resource) {
+      this.forceResourceLimit = resource;
+    }
   }
 }
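The new test drives time with Hadoop's ControlledClock instead of sleeping, which keeps the 2-second thresholds deterministic. A minimal hand-rolled version of that clock-injection pattern (the types below are invented for illustration; only the technique matches the test):

// Sketch of the clock-injection pattern used by
// testUnconditionalPreemptReducers(): advance time explicitly, never sleep.
interface Clock {
  long getTime();
}

class ManualClock implements Clock {
  private long now = 1;
  public long getTime() { return now; }
  public void advance(long ms) { now += ms; }
}

class StarvationCheck {
  static boolean isHanging(Clock clock, long requestedAt, long thresholdMs) {
    return clock.getTime() - requestedAt > thresholdMs;
  }

  public static void main(String[] args) {
    ManualClock clock = new ManualClock();
    long requestedAt = clock.getTime();
    clock.advance(1);
    System.out.println(isHanging(clock, requestedAt, 2000)); // false: too soon
    clock.advance(2000);
    System.out.println(isHanging(clock, requestedAt, 2000)); // true: starved
  }
}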
http://git-wip-us.apache.org/repos/asf/hadoop/blob/74aca348/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 1614bc6..67832f5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -598,10 +598,18 @@ public interface MRJobConfig {
       50;
 
   /**
-   * The threshold in terms of seconds after which an unsatisfied mapper request
-   * triggers reducer preemption to free space. Default 0 implies that the reduces
-   * should be preempted immediately after allocation if there is currently no
-   * room for newly allocated mappers.
+   * Duration to wait before forcibly preempting a reducer to allow
+   * allocating new mappers, even when YARN reports positive headroom.
+   */
+  public static final String MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC =
+      "mapreduce.job.reducer.unconditional-preempt.delay.sec";
+
+  public static final int
+      DEFAULT_MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC = 5 * 60;
+
+  /**
+   * Duration to wait before preempting a reducer, when there is no headroom
+   * to allocate new mappers.
    */
   public static final String MR_JOB_REDUCER_PREEMPT_DELAY_SEC =
       "mapreduce.job.reducer.preempt.delay.sec";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/74aca348/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 6eb73ac..47b59c4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -285,17 +285,28 @@
 </property>
 
-<property>
-  <name>mapreduce.job.reducer.preempt.delay.sec</name>
-  <value>0</value>
-  <description>The threshold in terms of seconds after which an unsatisfied mapper
-  request triggers reducer preemption to free space. Default 0 implies that the
-  reduces should be preempted immediately after allocation if there is currently no
-  room for newly allocated mappers.
-  </description>
-</property>
+  <property>
+    <name>mapreduce.job.reducer.preempt.delay.sec</name>
+    <value>0</value>
+    <description>The threshold (in seconds) after which an unsatisfied
+      mapper request triggers reducer preemption when there is no anticipated
+      headroom. If set to 0 or a negative value, the reducer is preempted as
+      soon as lack of headroom is detected. Default is 0.
+    </description>
+  </property>
 
-<property>
+  <property>
+    <name>mapreduce.job.reducer.unconditional-preempt.delay.sec</name>
+    <value>300</value>
+    <description>The threshold (in seconds) after which an unsatisfied
+      mapper request triggers a forced reducer preemption irrespective of the
+      anticipated headroom. By default, it is set to 5 mins. Setting it to 0
+      leads to immediate reducer preemption. Setting to -1 disables this
+      preemption altogether.
+    </description>
+  </property>
+
+  <property>
     <name>mapreduce.job.max.split.locations</name>
     <value>10</value>
     <description>The max number of block locations to store for each split for

http://git-wip-us.apache.org/repos/asf/hadoop/blob/74aca348/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
index 0cd336c..b81e585 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 import java.util.List;
 import java.util.Set;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NMToken;
@@ -30,11 +31,11 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 
 public class Allocation {
 
   final List<Container> containers;
-  final Resource resourceLimit;
   final Set<ContainerId> strictContainers;
   final Set<ContainerId> fungibleContainers;
   final List<ResourceRequest> fungibleResources;
   final List<NMToken> nmTokens;
+  private Resource resourceLimit;
 
   public Allocation(List<Container> containers, Resource resourceLimit,
       Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers,
@@ -78,4 +79,8 @@ public class Allocation {
     return nmTokens;
   }
 
+  @VisibleForTesting
+  public void setResourceLimit(Resource resource) {
+    this.resourceLimit = resource;
+  }
 }
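The Allocation change above is a common test-seam trade-off: resourceLimit loses its final modifier so a @VisibleForTesting setter can override it, which is what MyFifoScheduler.forceResourceLimit() relies on in the test. A generic sketch of the same idiom, with invented names:

// Illustrative test-seam idiom mirroring Allocation.setResourceLimit():
// a final field becomes mutable solely so tests can override it.
class Quota {
  final long memoryMb;
  Quota(long memoryMb) { this.memoryMb = memoryMb; }
}

class Grant {
  private Quota limit; // was final before the test hook was added

  Grant(Quota limit) { this.limit = limit; }

  Quota getLimit() { return limit; }

  // Production code never calls this; tests use it to force a limit.
  void setLimitForTesting(Quota limit) { this.limit = limit; }

  public static void main(String[] args) {
    Grant g = new Grant(new Quota(1024));
    g.setLimitForTesting(new Quota(8192));
    System.out.println(g.getLimit().memoryMb); // 8192
  }
}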
