Repository: hadoop
Updated Branches:
refs/heads/branch-2 013951781 -> 2d8f9e3fd
MAPREDUCE-6302. Incorrect headroom can lead to a deadlock between map and
reduce allocations. (kasha)
(cherry picked from commit 4aa9b3e75ca86917125e56e1b438668273a5d87f)
Conflicts:
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2d8f9e3f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2d8f9e3f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2d8f9e3f
Branch: refs/heads/branch-2
Commit: 2d8f9e3fd9b68bde292adbfb39f988f6964617da
Parents: 0139517
Author: Karthik Kambatla <[email protected]>
Authored: Sun Oct 4 23:49:02 2015 -0700
Committer: Karthik Kambatla <[email protected]>
Committed: Fri Oct 9 07:54:47 2015 -0700
----------------------------------------------------------------------
hadoop-mapreduce-project/CHANGES.txt | 3 +
.../v2/app/rm/RMContainerAllocator.java | 159 +++++++++++--------
.../v2/app/rm/RMContainerRequestor.java | 3 +-
.../v2/app/rm/TestRMContainerAllocator.java | 86 +++++++++-
.../apache/hadoop/mapreduce/MRJobConfig.java | 16 +-
.../src/main/resources/mapred-default.xml | 31 ++--
.../resourcemanager/scheduler/Allocation.java | 8 +-
7 files changed, 217 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d8f9e3f/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt
b/hadoop-mapreduce-project/CHANGES.txt
index e952be3..2157e17 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -328,6 +328,9 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-6503. archive-logs tool should use HADOOP_PREFIX instead
of HADOOP_HOME (rkanter)
+ MAPREDUCE-6302. Preempt reducers after a configurable timeout irrespective
+ of headroom. (kasha)
+
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d8f9e3f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 82a3eec..fadab46 100644
---
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -158,11 +158,13 @@ public class RMContainerAllocator extends
RMContainerRequestor
private boolean reduceStarted = false;
private float maxReduceRampupLimit = 0;
private float maxReducePreemptionLimit = 0;
- /**
- * after this threshold, if the container request is not allocated, it is
- * considered delayed.
- */
- private long allocationDelayThresholdMs = 0;
+
+ // Mapper allocation timeout, after which a reducer is forcibly preempted
+ private long reducerUnconditionalPreemptionDelayMs;
+
+ // Delay (ms) before preempting a reducer when there is no headroom
+ private long reducerNoHeadroomPreemptionDelayMs = 0;
+
private float reduceSlowStart = 0;
private int maxRunningMaps = 0;
private int maxRunningReduces = 0;
@@ -198,7 +200,10 @@ public class RMContainerAllocator extends
RMContainerRequestor
maxReducePreemptionLimit = conf.getFloat(
MRJobConfig.MR_AM_JOB_REDUCE_PREEMPTION_LIMIT,
MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT);
- allocationDelayThresholdMs = conf.getInt(
+ reducerUnconditionalPreemptionDelayMs = 1000 * conf.getInt(
+ MRJobConfig.MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC,
+ MRJobConfig.DEFAULT_MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC);
+ reducerNoHeadroomPreemptionDelayMs = conf.getInt(
MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC,
MRJobConfig.DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC) * 1000;//sec ->
ms
maxRunningMaps = conf.getInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT,
@@ -462,59 +467,89 @@ public class RMContainerAllocator extends
RMContainerRequestor
if (reduceResourceRequest.equals(Resources.none())) {
return; // no reduces
}
- //check if reduces have taken over the whole cluster and there are
- //unassigned maps
- if (scheduledRequests.maps.size() > 0) {
- Resource resourceLimit = getResourceLimit();
- Resource availableResourceForMap =
- Resources.subtract(
- resourceLimit,
- Resources.multiply(reduceResourceRequest,
- assignedRequests.reduces.size()
- - assignedRequests.preemptionWaitingReduces.size()));
- // availableMemForMap must be sufficient to run at least 1 map
- if
(ResourceCalculatorUtils.computeAvailableContainers(availableResourceForMap,
- mapResourceRequest, getSchedulerResourceTypes()) <= 0) {
- // to make sure new containers are given to maps and not reduces
- // ramp down all scheduled reduces if any
- // (since reduces are scheduled at higher priority than maps)
- LOG.info("Ramping down all scheduled reduces:"
- + scheduledRequests.reduces.size());
- for (ContainerRequest req : scheduledRequests.reduces.values()) {
- pendingReduces.add(req);
- }
- scheduledRequests.reduces.clear();
-
- //do further checking to find the number of map requests that were
- //hanging around for a while
- int hangingMapRequests =
getNumOfHangingRequests(scheduledRequests.maps);
- if (hangingMapRequests > 0) {
- // preempt for making space for at least one map
- int preemptionReduceNumForOneMap =
-
ResourceCalculatorUtils.divideAndCeilContainers(mapResourceRequest,
- reduceResourceRequest, getSchedulerResourceTypes());
- int preemptionReduceNumForPreemptionLimit =
- ResourceCalculatorUtils.divideAndCeilContainers(
- Resources.multiply(resourceLimit, maxReducePreemptionLimit),
- reduceResourceRequest, getSchedulerResourceTypes());
- int preemptionReduceNumForAllMaps =
- ResourceCalculatorUtils.divideAndCeilContainers(
- Resources.multiply(mapResourceRequest, hangingMapRequests),
- reduceResourceRequest, getSchedulerResourceTypes());
- int toPreempt =
- Math.min(Math.max(preemptionReduceNumForOneMap,
- preemptionReduceNumForPreemptionLimit),
- preemptionReduceNumForAllMaps);
- LOG.info("Going to preempt " + toPreempt
- + " due to lack of space for maps");
- assignedRequests.preemptReduce(toPreempt);
- }
+ if (assignedRequests.maps.size() > 0) {
+ // there are assigned mappers
+ return;
+ }
+
+ if (scheduledRequests.maps.size() <= 0) {
+ // there are no pending requests for mappers
+ return;
+ }
+ // At this point:
+ // we have pending mappers and all assigned resources are taken by reducers
+
+ if (reducerUnconditionalPreemptionDelayMs >= 0) {
+ // Unconditional preemption is enabled.
+ // If mappers are pending for longer than the configured threshold,
+ // preempt reducers irrespective of what the headroom is.
+ if (preemptReducersForHangingMapRequests(
+ reducerUnconditionalPreemptionDelayMs)) {
+ return;
}
}
+
+ // The pending mappers haven't been waiting for too long. Let us see if
+ // the headroom can fit a mapper.
+ Resource availableResourceForMap = getAvailableResources();
+ if
(ResourceCalculatorUtils.computeAvailableContainers(availableResourceForMap,
+ mapResourceRequest, getSchedulerResourceTypes()) > 0) {
+ // the available headroom is enough to run a mapper
+ return;
+ }
+
+ // Available headroom is not enough to run mapper. See if we should hold
+ // off before preempting reducers and preempt if okay.
+ preemptReducersForHangingMapRequests(reducerNoHeadroomPreemptionDelayMs);
+ }
+
+ private boolean preemptReducersForHangingMapRequests(long pendingThreshold) {
+ int hangingMapRequests = getNumHangingRequests(
+ pendingThreshold, scheduledRequests.maps);
+ if (hangingMapRequests > 0) {
+ preemptReducer(hangingMapRequests);
+ return true;
+ }
+ return false;
+ }
+
+ private void clearAllPendingReduceRequests() {
+ LOG.info("Ramping down all scheduled reduces:"
+ + scheduledRequests.reduces.size());
+ for (ContainerRequest req : scheduledRequests.reduces.values()) {
+ pendingReduces.add(req);
+ }
+ scheduledRequests.reduces.clear();
}
-
- private int getNumOfHangingRequests(Map<TaskAttemptId, ContainerRequest>
requestMap) {
+
+ private void preemptReducer(int hangingMapRequests) {
+ clearAllPendingReduceRequests();
+
+ // preempt for making space for at least one map
+ int preemptionReduceNumForOneMap =
+ ResourceCalculatorUtils.divideAndCeilContainers(mapResourceRequest,
+ reduceResourceRequest, getSchedulerResourceTypes());
+ int preemptionReduceNumForPreemptionLimit =
+ ResourceCalculatorUtils.divideAndCeilContainers(
+ Resources.multiply(getResourceLimit(), maxReducePreemptionLimit),
+ reduceResourceRequest, getSchedulerResourceTypes());
+ int preemptionReduceNumForAllMaps =
+ ResourceCalculatorUtils.divideAndCeilContainers(
+ Resources.multiply(mapResourceRequest, hangingMapRequests),
+ reduceResourceRequest, getSchedulerResourceTypes());
+ int toPreempt =
+ Math.min(Math.max(preemptionReduceNumForOneMap,
+ preemptionReduceNumForPreemptionLimit),
+ preemptionReduceNumForAllMaps);
+
+ LOG.info("Going to preempt " + toPreempt
+ + " due to lack of space for maps");
+ assignedRequests.preemptReduce(toPreempt);
+ }
+
+ private int getNumHangingRequests(long allocationDelayThresholdMs,
+ Map<TaskAttemptId, ContainerRequest> requestMap) {
if (allocationDelayThresholdMs <= 0)
return requestMap.size();
int hangingRequests = 0;
@@ -542,9 +577,6 @@ public class RMContainerAllocator extends
RMContainerRequestor
// get available resources for this job
Resource headRoom = getAvailableResources();
- if (headRoom == null) {
- headRoom = Resources.none();
- }
LOG.info("Recalculating schedule, headroom=" + headRoom);
@@ -671,9 +703,7 @@ public class RMContainerAllocator extends
RMContainerRequestor
applyConcurrentTaskLimits();
// will be null the first time
- Resource headRoom =
- getAvailableResources() == null ? Resources.none() :
- Resources.clone(getAvailableResources());
+ Resource headRoom = Resources.clone(getAvailableResources());
AllocateResponse response;
/*
* If contact with RM is lost, the AM will wait
MR_AM_TO_RM_WAIT_INTERVAL_MS
@@ -714,9 +744,7 @@ public class RMContainerAllocator extends
RMContainerRequestor
// continue to attempt to contact the RM.
throw e;
}
- Resource newHeadRoom =
- getAvailableResources() == null ? Resources.none()
- : getAvailableResources();
+ Resource newHeadRoom = getAvailableResources();
List<Container> newContainers = response.getAllocatedContainers();
// Setting NMTokens
if (response.getNMTokens() != null) {
@@ -876,9 +904,6 @@ public class RMContainerAllocator extends
RMContainerRequestor
@Private
public Resource getResourceLimit() {
Resource headRoom = getAvailableResources();
- if (headRoom == null) {
- headRoom = Resources.none();
- }
Resource assignedMapResource =
Resources.multiply(mapResourceRequest, assignedRequests.maps.size());
Resource assignedReduceResource =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d8f9e3f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
----------------------------------------------------------------------
diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
index d612126..a639d55 100644
---
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
+++
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.util.resource.Resources;
/**
@@ -386,7 +387,7 @@ public abstract class RMContainerRequestor extends
RMCommunicator {
}
protected Resource getAvailableResources() {
- return availableResources;
+ return availableResources == null ? Resources.none() : availableResources;
}
protected void addContainerReq(ContainerRequest req) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d8f9e3f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
----------------------------------------------------------------------
diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index 2a7d1c8..ab1e629 100644
---
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -430,7 +430,7 @@ public class TestRMContainerAllocator {
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
final MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
- appAttemptId, mockJob);
+ appAttemptId, mockJob, new SystemClock());
// add resources to scheduler
dispatcher.await();
@@ -565,6 +565,69 @@ public class TestRMContainerAllocator {
}
@Test(timeout = 30000)
+ public void testUnconditionalPreemptReducers() throws Exception {
+ LOG.info("Running testForcePreemptReducers");
+
+ int forcePreemptThresholdSecs = 2;
+ Configuration conf = new Configuration();
+ conf.setInt(MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC,
+ 2 * forcePreemptThresholdSecs);
+ conf.setInt(MRJobConfig.MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC,
+ forcePreemptThresholdSecs);
+
+ MyResourceManager rm = new MyResourceManager(conf);
+ rm.start();
+ rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(8192, 8));
+ DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+ .getDispatcher();
+
+ // Submit the application
+ RMApp app = rm.submitApp(1024);
+ dispatcher.await();
+
+ MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+ amNodeManager.nodeHeartbeat(true);
+ dispatcher.await();
+
+ ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+ .getAppAttemptId();
+ rm.sendAMLaunched(appAttemptId);
+ dispatcher.await();
+
+ JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+ Job mockJob = mock(Job.class);
+ when(mockJob.getReport()).thenReturn(
+ MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+ 0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+ ControlledClock clock = new ControlledClock(null);
+ clock.setTime(1);
+ MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+ appAttemptId, mockJob, clock);
+ allocator.setMapResourceRequest(BuilderUtils.newResource(1024, 1));
+ allocator.setReduceResourceRequest(BuilderUtils.newResource(1024, 1));
+ RMContainerAllocator.AssignedRequests assignedRequests =
+ allocator.getAssignedRequests();
+ RMContainerAllocator.ScheduledRequests scheduledRequests =
+ allocator.getScheduledRequests();
+ ContainerRequestEvent event1 =
+ createReq(jobId, 1, 2048, new String[] { "h1" }, false, false);
+ scheduledRequests.maps.put(mock(TaskAttemptId.class),
+ new RMContainerRequestor.ContainerRequest(event1, null,
clock.getTime()));
+ assignedRequests.reduces.put(mock(TaskAttemptId.class),
+ mock(Container.class));
+
+ clock.setTime(clock.getTime() + 1);
+ allocator.preemptReducesIfNeeded();
+ Assert.assertEquals("The reducer is preeempted too soon", 0,
+ assignedRequests.preemptionWaitingReduces.size());
+
+ clock.setTime(clock.getTime() + 1000 * forcePreemptThresholdSecs);
+ allocator.preemptReducesIfNeeded();
+ Assert.assertEquals("The reducer is not preeempted", 1,
+ assignedRequests.preemptionWaitingReduces.size());
+ }
+
+ @Test(timeout = 30000)
public void testExcessReduceContainerAssign() throws Exception {
final Configuration conf = new Configuration();
conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.0f);
@@ -589,7 +652,7 @@ public class TestRMContainerAllocator {
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
final MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
- appAttemptId, mockJob);
+ appAttemptId, mockJob, new SystemClock());
// request to allocate two reduce priority containers
final String[] locations = new String[] { host };
@@ -633,7 +696,8 @@ public class TestRMContainerAllocator {
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
MyContainerAllocator allocator =
- new MyContainerAllocator(null, conf, appAttemptId, mockJob) {
+ new MyContainerAllocator(null, conf, appAttemptId, mockJob,
+ new SystemClock()) {
@Override
protected void register() {
}
@@ -725,7 +789,7 @@ public class TestRMContainerAllocator {
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
- appAttemptId, mockJob);
+ appAttemptId, mockJob, new SystemClock());
// add resources to scheduler
MockNM nodeManager1 = rm.registerNode("h1:1234", 1024);
@@ -1628,6 +1692,7 @@ public class TestRMContainerAllocator {
List<ContainerId> lastRelease = null;
List<String> lastBlacklistAdditions;
List<String> lastBlacklistRemovals;
+ Resource forceResourceLimit = null;
// override this to copy the objects otherwise FifoScheduler updates the
// numContainers in same objects as kept by RMContainerAllocator
@@ -1650,9 +1715,18 @@ public class TestRMContainerAllocator {
lastRelease = release;
lastBlacklistAdditions = blacklistAdditions;
lastBlacklistRemovals = blacklistRemovals;
- return super.allocate(
+ Allocation allocation = super.allocate(
applicationAttemptId, askCopy, release, blacklistAdditions,
blacklistRemovals, increaseRequests, decreaseRequests);
+ if (forceResourceLimit != null) {
+ // Test wants to force the non-default resource limit
+ allocation.setResourceLimit(forceResourceLimit);
+ }
+ return allocation;
+ }
+
+ public void forceResourceLimit(Resource resource) {
+ this.forceResourceLimit = resource;
}
}
@@ -2672,7 +2746,7 @@ public class TestRMContainerAllocator {
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
MyContainerAllocator allocator = new MyContainerAllocator(null, conf,
- appAttemptId, mockJob) {
+ appAttemptId, mockJob, new SystemClock()) {
@Override
protected void register() {
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d8f9e3f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
----------------------------------------------------------------------
diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 202f860..401aedb 100644
---
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -703,10 +703,18 @@ public interface MRJobConfig {
10 * 1000l;
/**
- * The threshold in terms of seconds after which an unsatisfied mapper
request
- * triggers reducer preemption to free space. Default 0 implies that the
reduces
- * should be preempted immediately after allocation if there is currently no
- * room for newly allocated mappers.
+ * Duration (in seconds) to wait before forcibly preempting a reducer to
+ * allow allocating new mappers, even when YARN reports positive headroom.
+ */
+ public static final String MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC =
+ "mapreduce.job.reducer.unconditional-preempt.delay.sec";
+
+ public static final int
+ DEFAULT_MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC = 5 * 60;
+
+ /**
+ * Duration (in seconds) to wait before preempting a reducer, when there is
+ * no headroom to allocate new mappers.
*/
public static final String MR_JOB_REDUCER_PREEMPT_DELAY_SEC =
"mapreduce.job.reducer.preempt.delay.sec";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d8f9e3f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index ae9bf8f..0d15ca8 100644
---
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -98,17 +98,28 @@
</description>
</property>
-<property>
- <name>mapreduce.job.reducer.preempt.delay.sec</name>
- <value>0</value>
- <description>The threshold in terms of seconds after which an unsatisfied
mapper
- request triggers reducer preemption to free space. Default 0 implies that
the
- reduces should be preempted immediately after allocation if there is
currently no
- room for newly allocated mappers.
- </description>
-</property>
+ <property>
+ <name>mapreduce.job.reducer.preempt.delay.sec</name>
+ <value>0</value>
+ <description>The threshold (in seconds) after which an unsatisfied
+ mapper request triggers reducer preemption when there is no anticipated
+ headroom. If set to 0 or a negative value, the reducer is preempted as
+ soon as lack of headroom is detected. Default is 0.
+ </description>
+ </property>
-<property>
+ <property>
+ <name>mapreduce.job.reducer.unconditional-preempt.delay.sec</name>
+ <value>300</value>
+ <description>The threshold (in seconds) after which an unsatisfied
+ mapper request triggers a forced reducer preemption irrespective of the
+ anticipated headroom. By default, it is set to 5 mins. Setting it to 0
+ leads to immediate reducer preemption. Setting it to a negative value
+ disables this preemption altogether.
+ </description>
+ </property>
+
+ <property>
<name>mapreduce.job.max.split.locations</name>
<value>10</value>
<description>The max number of block locations to store for each split for
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d8f9e3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
----------------------------------------------------------------------
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
index 0654560..2edaf9a 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
@@ -21,6 +21,7 @@ package
org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.List;
import java.util.Set;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NMToken;
@@ -30,13 +31,13 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
public class Allocation {
final List<Container> containers;
- final Resource resourceLimit;
final Set<ContainerId> strictContainers;
final Set<ContainerId> fungibleContainers;
final List<ResourceRequest> fungibleResources;
final List<NMToken> nmTokens;
final List<Container> increasedContainers;
final List<Container> decreasedContainers;
+ private Resource resourceLimit;
public Allocation(List<Container> containers, Resource resourceLimit,
@@ -98,4 +99,9 @@ public class Allocation {
public List<Container> getDecreasedContainers() {
return decreasedContainers;
}
+
+ @VisibleForTesting
+ public void setResourceLimit(Resource resource) {
+ this.resourceLimit = resource;
+ }
}