Author: omalley
Date: Fri Mar 4 03:31:50 2011
New Revision: 1077031
URL: http://svn.apache.org/viewvc?rev=1077031&view=rev
Log:
commit ebf9b7a20a18ec8b7b6c86a26becdb7020726cee
Author: Hemanth Yamijala <[email protected]>
Date: Wed Oct 21 23:06:57 2009 +0530
MAPREDUCE:1105 from
https://issues.apache.org/jira/secure/attachment/12422823/MAPREDUCE-1105-yahoo-version20-5.patch
+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1105. Remove max limit configuration in capacity scheduler in
+ favor of max capacity percentage thus allowing the limit to go over
+ queue capacity. (Rahul Kumar Singh via yhemanth)
+
Modified:
hadoop/common/branches/branch-0.20-security-patches/conf/capacity-scheduler.xml.template
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java
hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/capacity_scheduler.xml
Modified:
hadoop/common/branches/branch-0.20-security-patches/conf/capacity-scheduler.xml.template
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/conf/capacity-scheduler.xml.template?rev=1077031&r1=1077030&r2=1077031&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/conf/capacity-scheduler.xml.template
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/conf/capacity-scheduler.xml.template
Fri Mar 4 03:31:50 2011
@@ -16,6 +16,25 @@
</property>
<property>
+ <name>mapred.capacity-scheduler.queue.default.maximum-capacity</name>
+ <value>-1</value>
+ <description>
+ maximum-capacity defines a limit beyond which a queue cannot use the
capacity of the cluster.
+ This provides a means to limit how much excess capacity a queue can
use. By default, there is no limit.
+ The maximum-capacity of a queue can only be greater than or equal to
its minimum capacity.
+ Default value of -1 implies a queue can use complete capacity of the
cluster.
+
+ This property could be used to curtail certain jobs which are long running
in nature from occupying more than a
+ certain percentage of the cluster, which in the absence of
pre-emption, could lead to capacity guarantees of
+ other queues being affected.
+
+ One important thing to note is that maximum-capacity is a percentage,
so based on the cluster's capacity
+ the max capacity would change. So if a large number of nodes or racks get
added to the cluster, max capacity in
+ absolute terms would increase accordingly.
+ </description>
+ </property>
+
+ <property>
<name>mapred.capacity-scheduler.queue.default.supports-priority</name>
<value>false</value>
<description>If true, priorities of jobs will be taken into
@@ -46,44 +65,6 @@
</description>
</property>
-<property>
- <name>mapred.capacity-scheduler.queue.default.max.map.slots</name>
- <value>-1</value>
- <description>
- This value is the maximum map slots that can be used in a
- queue at any point of time. So for example assuming above config value
- is 100 , not more than 100 tasks would be in the queue at any point of
- time, assuming each task takes one slot.
-
- Default value of -1 would disable this capping feature
-
- Typically the queue capacity should be equal to this limit.
- If queue capacity is more than this limit, excess capacity will be
- used by the other queues. If queue capacity is less than the above
- limit , then the limit would be the queue capacity - as in the current
- implementation
- </description>
-</property>
-
-<property>
- <name>mapred.capacity-scheduler.queue.default.max.reduce.slots</name>
- <value>-1</value>
- <description>
- This value is the maximum reduce slots that can be used in a
- queue at any point of time. So for example assuming above config value
- is 100 , not more than 100 reduce tasks would be in the queue at any
point
- of time, assuming each task takes one slot.
-
- Default value of -1 would disable this capping feature
-
- Typically the queue capacity should be equal to this limit.
- If queue capacity is more than this limit, excess capacity will be
- used by the other queues. If queue capacity is less than the above
- limit , then the limit would be the queue capacity - as in the current
- implementation
- </description>
-</property>
-
<!-- The default configuration settings for the capacity task scheduler -->
<!-- The default values would be applied to all the queues which don't have
-->
<!-- the appropriate property for the particular queue -->
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java?rev=1077031&r1=1077030&r2=1077031&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
Fri Mar 4 03:31:50 2011
@@ -75,17 +75,14 @@ class CapacitySchedulerConf {
static final String UPPER_LIMIT_ON_TASK_PMEM_PROPERTY =
"mapred.capacity-scheduler.task.limit.maxpmem";
- /**
- * Configuration that provides the maximum cap for the map task in a queue
- * at any given point of time.
- */
- static final String MAX_MAP_CAP_PROPERTY = "max.map.slots";
+
+ private static final String CAPACITY_PROPERTY = "capacity";
/**
- * Configuration that provides the maximum cap for the reduce task in a
queue
- * at any given point of time.
+ * A maximum capacity defines a limit beyond which a queue
+ * cannot expand.
*/
- static final String MAX_REDUCE_CAP_PROPERTY = "max.reduce.slots";
+ static final String MAX_CAPACITY_PROPERTY ="maximum-capacity";
/**
* The constant which defines the default initialization thread
@@ -104,9 +101,9 @@ class CapacitySchedulerConf {
private int defaultMaxJobsPerUsersToInitialize;
/**
- * Create a new ResourceManagerConf.
+ * Create a new Capacity scheduler conf.
* This method reads from the default configuration file mentioned in
- * {@link RM_CONF_FILE}, that must be present in the classpath of the
+ * {@link SCHEDULER_CONF_FILE}, that must be present in the classpath of the
* application.
*/
public CapacitySchedulerConf() {
@@ -116,7 +113,7 @@ class CapacitySchedulerConf {
}
/**
- * Create a new ResourceManagerConf reading the specified configuration
+ * Create a new Capacity scheduler conf reading the specified configuration
* file.
*
* @param configFile {@link Path} to the configuration file containing
@@ -163,16 +160,15 @@ class CapacitySchedulerConf {
//In case of both capacity and default capacity not configured.
//Last check is if the configuration is specified and is marked as
//negative we throw exception
- String raw = rmConf.getRaw(toFullPropertyName(queue,
- "capacity"));
+ String raw = rmConf.getRaw(toFullPropertyName(queue, CAPACITY_PROPERTY));
if(raw == null) {
return -1;
}
- float result = rmConf.getFloat(toFullPropertyName(queue,
- "capacity"),
- -1);
+ float result = rmConf.getFloat(
+ toFullPropertyName(queue, CAPACITY_PROPERTY), -1);
if (result < 0.0 || result > 100.0) {
- throw new IllegalArgumentException("Illegal capacity for queue " + queue
+
+ throw new IllegalArgumentException(
+ "Illegal capacity for queue " + queue +
" of " + result);
}
return result;
@@ -185,7 +181,53 @@ class CapacitySchedulerConf {
* @param capacity percent of the cluster for the queue.
*/
public void setCapacity(String queue,float capacity) {
- rmConf.setFloat(toFullPropertyName(queue, "capacity"),capacity);
+ rmConf.setFloat(toFullPropertyName(queue, CAPACITY_PROPERTY),capacity);
+ }
+
+ /**
+ * Return the maximum percentage of the cluster capacity that can be used by
+ * the given queue.
+ * This percentage defines a limit beyond which a
+ * queue cannot use the capacity of cluster.
+ * This provides a means to limit how much excess capacity a
+ * queue can use. By default, there is no limit.
+ *
+ * The maximum-capacity of a queue can only be
+ * greater than or equal to its minimum capacity.
+ *
+ * @param queue name of the queue.
+ * @return maximum-capacity for the given queue
+ */
+ public float getMaxCapacity(String queue) {
+ float result = rmConf.getFloat(
+ toFullPropertyName(queue, MAX_CAPACITY_PROPERTY), -1);
+
+ //if result is 0 or less than 0 set it to -1
+ result = (result <= 0) ? -1 : result;
+
+ if (result > 100.0) {
+ throw new IllegalArgumentException(
+ "Illegal " + MAX_CAPACITY_PROPERTY +
+ " for queue " + queue + " of " + result);
+ }
+
+ if ((result != -1) && (result < getCapacity(queue))) {
+ throw new IllegalArgumentException(
+ MAX_CAPACITY_PROPERTY + " " + result +
+ " for a queue should be greater than or equal to capacity ");
+ }
+ return result;
+ }
+
+ /**
+ * Sets the maxCapacity of the given queue.
+ *
+ * @param queue name of the queue
+ * @param maxCapacity percent of the cluster for the queue.
+ */
+ public void setMaxCapacity(String queue,float maxCapacity) {
+ rmConf.setFloat(
+ toFullPropertyName(queue, MAX_CAPACITY_PROPERTY), maxCapacity);
}
/**
@@ -369,40 +411,4 @@ class CapacitySchedulerConf {
rmConf.setInt(
"mapred.capacity-scheduler.init-worker-threads", poolSize);
}
-
- /**
- * get the max map slots cap
- * @param queue
- * @return
- */
- public int getMaxMapCap(String queue) {
- return rmConf.getInt(toFullPropertyName(queue,MAX_MAP_CAP_PROPERTY),-1);
- }
-
- /**
- * Used for testing
- * @param queue
- * @param val
- */
- public void setMaxMapCap(String queue,int val) {
- rmConf.setInt(toFullPropertyName(queue,MAX_MAP_CAP_PROPERTY),val);
- }
-
- /**
- * get the max reduce slots cap
- * @param queue
- * @return
- */
- public int getMaxReduceCap(String queue) {
- return
rmConf.getInt(toFullPropertyName(queue,MAX_REDUCE_CAP_PROPERTY),-1);
- }
-
- /**
- * Used for testing
- * @param queue
- * @param val
- */
- public void setMaxReduceCap(String queue,int val) {
- rmConf.setInt(toFullPropertyName(queue,MAX_REDUCE_CAP_PROPERTY),val);
- }
}
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=1077031&r1=1077030&r2=1077031&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
Fri Mar 4 03:31:50 2011
@@ -31,8 +31,6 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.JobTracker.IllegalStateException;
-import org.apache.hadoop.mapred.TaskTrackerStatus;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
@@ -77,10 +75,7 @@ class CapacityTaskScheduler extends Task
**********************************************************************/
private static class TaskSchedulingInfo {
-
- private static final String LIMIT_NORMALIZED_CAPACITY_STRING
- = "(Capacity is restricted to max limit of %d slots.\n" +
- "Remaining %d slots will be used by other queues.)\n";
+
/**
* the actual capacity, which depends on how many slots are available
* in the cluster at any given time.
@@ -91,14 +86,9 @@ class CapacityTaskScheduler extends Task
// number of slots occupied by running tasks
int numSlotsOccupied = 0;
- /**
- * max task limit
- * This value is the maximum slots that can be used in a
- * queue at any point of time. So for example assuming above config value
- * is 100 , not more than 100 tasks would be in the queue at any point of
- * time, assuming each task takes one slot.
- */
- private int maxTaskLimit = -1;
+ //the actual maximum capacity which depends on how many slots are available
+ //in cluster at any given time.
+ private int maxCapacity = -1;
/**
* for each user, we need to keep track of number of slots occupied by
@@ -119,26 +109,19 @@ class CapacityTaskScheduler extends Task
}
- int getMaxTaskLimit() {
- return maxTaskLimit;
- }
-
- void setMaxTaskLimit(int maxTaskCap) {
- this.maxTaskLimit = maxTaskCap;
- }
-
/**
- * This method checks for maxTaskLimit and sends minimum of maxTaskLimit
and
+ * Returns the actual capacity.
* capacity.
+ *
* @return
*/
int getCapacity() {
- return ((maxTaskLimit >= 0) && (maxTaskLimit < capacity)) ? maxTaskLimit
:
- capacity;
+ return capacity;
}
/**
* Mutator method for capacity
+ *
* @param capacity
*/
void setCapacity(int capacity) {
@@ -157,13 +140,9 @@ class CapacityTaskScheduler extends Task
StringBuffer sb = new StringBuffer();
sb.append("Capacity: " + capacity + " slots\n");
- //If maxTaskLimit is less than the capacity
- if (maxTaskLimit >= 0 && maxTaskLimit < capacity) {
- sb.append(String.format(LIMIT_NORMALIZED_CAPACITY_STRING,
- maxTaskLimit, (capacity-maxTaskLimit)));
- }
- if (maxTaskLimit >= 0) {
- sb.append(String.format("Maximum Slots Limit: %d\n", maxTaskLimit));
+
+ if(getMaxCapacity() >= 0) {
+ sb.append("Maximum capacity: " + getMaxCapacity() +" slots\n");
}
sb.append(String.format("Used capacity: %d (%.1f%% of Capacity)\n",
Integer.valueOf(numSlotsOccupied), Float
@@ -189,21 +168,42 @@ class CapacityTaskScheduler extends Task
}
return sb.toString();
}
+
+ int getMaxCapacity() {
+ return maxCapacity;
+ }
+
+ void setMaxCapacity(int maxCapacity) {
+ this.maxCapacity = maxCapacity;
+ }
}
private static class QueueSchedulingInfo {
String queueName;
- /** capacity(%) is set in the config */
+ /**
+ * capacity(%) is set in the config
+ */
float capacityPercent = 0;
+
+ /**
+ * maxCapacityPercent(%) is set in config as
+ * mapred.capacity-scheduler.queue.<queue-name>.maximum-capacity
+ * maximum-capacity percent defines a limit beyond which a queue
+ * cannot expand. Remember this limit is dynamic and changes w.r.t
+ * cluster size.
+ */
+ float maxCapacityPercent = -1;
/**
* to handle user limits, we need to know how many users have jobs in
* the queue.
*/
Map<String, Integer> numJobsByUser = new HashMap<String, Integer>();
- /** min value of user limit (same for all users) */
+ /**
+ * min value of user limit (same for all users)
+ */
int ulMin;
/**
@@ -218,21 +218,22 @@ class CapacityTaskScheduler extends Task
TaskSchedulingInfo mapTSI;
TaskSchedulingInfo reduceTSI;
- public QueueSchedulingInfo(String queueName, float capacityPercent,
- int ulMin, JobQueuesManager jobQueuesManager,
- int mapCap, int reduceCap) {
+ public QueueSchedulingInfo(
+ String queueName, float capacityPercent,
+ float maxCapacityPercent, int ulMin, JobQueuesManager jobQueuesManager
+ ) {
this.queueName = new String(queueName);
this.capacityPercent = capacityPercent;
+ this.maxCapacityPercent = maxCapacityPercent;
this.ulMin = ulMin;
this.jobQueuesManager = jobQueuesManager;
this.mapTSI = new TaskSchedulingInfo();
this.reduceTSI = new TaskSchedulingInfo();
- this.mapTSI.setMaxTaskLimit(mapCap);
- this.reduceTSI.setMaxTaskLimit(reduceCap);
}
/**
* return information about the queue
+ *
* @return a String representing the information about the queue.
*/
@Override
@@ -508,6 +509,10 @@ class CapacityTaskScheduler extends Task
if (j.getStatus().getRunState() != JobStatus.RUNNING) {
continue;
}
+ //Check if queue is over maximum-capacity
+
if(this.areTasksInQueueOverMaxCapacity(qsi,j.getNumSlotsPerTask(type))) {
+ continue;
+ }
// check if the job's user is over limit
if (isUserOverLimit(j, qsi)) {
continue;
@@ -572,6 +577,12 @@ class CapacityTaskScheduler extends Task
if (j.getStatus().getRunState() != JobStatus.RUNNING) {
continue;
}
+ //Check if queue is over maximum-capacity
+ if (this.areTasksInQueueOverMaxCapacity(
+ qsi, j.getNumSlotsPerTask(type))) {
+ continue;
+ }
+
if (scheduler.memoryMatcher.matchesMemoryRequirements(j, type,
taskTrackerStatus)) {
// We found a suitable job. Get task from it.
@@ -657,8 +668,10 @@ class CapacityTaskScheduler extends Task
if (0 == getTSI(qsi).getCapacity()) {
continue;
}
-
- if(this.areTasksInQueueOverLimit(qsi)) {
+
+ //This call is for optimization if we are already over the
+ //maximum-capacity we avoid traversing the queues.
+ if(this.areTasksInQueueOverMaxCapacity(qsi,1)) {
continue;
}
TaskLookupResult tlr = getTaskFromQueue(taskTracker, qsi);
@@ -685,22 +698,32 @@ class CapacityTaskScheduler extends Task
/**
- * Check if the max task limit is set for this queue
- * if set , ignore this qsi if current num of occupied
- * slots of a TYPE in the queue is >= getMaxTaskCap().
+ * Check if maximum-capacity is set for this queue.
+ * If set and greater than 0 ,
+ * check if numofslotsoccupied+numSlotsPerTask is greater than
+ * maximum-capacity , if yes , implies this queue is over limit.
+ *
+ * In case numSlotsOccupied is less than maximum-capacity, but
+ * numSlotsOccupied + numSlotsPerTask is more than maximum-capacity, we
+ * still don't assign the task. This may lead to under-utilization of a very
+ * small set of slots. But this is ok, as we strictly respect the
+ * maximum-capacity limit.
+ *
* @param qsi
- * @return
+ * @return true if queue is over limit.
*/
- private boolean areTasksInQueueOverLimit(QueueSchedulingInfo qsi) {
+ private boolean areTasksInQueueOverMaxCapacity(
+ QueueSchedulingInfo qsi, int numSlotsPerTask) {
TaskSchedulingInfo tsi = getTSI(qsi);
- if (tsi.getMaxTaskLimit() >= 0) {
- if (tsi.numSlotsOccupied >= tsi.getCapacity()) {
+ if (tsi.getMaxCapacity() >= 0) {
+ if ((tsi.numSlotsOccupied + numSlotsPerTask) > tsi.getMaxCapacity()) {
if (LOG.isDebugEnabled()) {
LOG.debug(
- "Queue " + qsi.queueName + " has reached its max " + type +
- " limit ");
+ "Queue " + qsi.queueName + " " + "has reached its max " + type +
+ "Capacity");
LOG.debug("Current running tasks " + tsi.getCapacity());
+
}
return true;
}
@@ -719,12 +742,12 @@ class CapacityTaskScheduler extends Task
s.append(
String.format(
" Queue '%s'(%s): runningTasks=%d, "
- + "occupiedSlots=%d, capacity=%d, runJobs=%d maxTaskLimit=%d
",
+ + "occupiedSlots=%d, capacity=%d, runJobs=%d maxCapacity=%d ",
qsi.queueName,
this.type, Integer.valueOf(tsi.numRunningTasks), Integer
.valueOf(tsi.numSlotsOccupied), Integer
.valueOf(tsi.getCapacity()), Integer.valueOf(runJobs.size()),
- Integer.valueOf(tsi.getMaxTaskLimit())));
+ Integer.valueOf(tsi.getMaxCapacity())));
}
LOG.debug(s);
}
@@ -792,7 +815,7 @@ class CapacityTaskScheduler extends Task
@Override
int getSlotsPerTask(JobInProgress job) {
return
-
job.getJobConf().computeNumSlotsPerMap(scheduler.getMemSizeForMapSlot());
+
job.getJobConf().computeNumSlotsPerMap(scheduler.getMemSizeForMapSlot());
}
@Override
@@ -1051,19 +1074,20 @@ class CapacityTaskScheduler extends Task
}
Set<String> queuesWithoutConfiguredCapacity = new HashSet<String>();
- float totalCapacity = 0.0f;
+ float totalCapacityPercent = 0.0f;
for (String queueName: queues) {
- float capacity = schedConf.getCapacity(queueName);
- if(capacity == -1.0) {
+ float capacityPercent = schedConf.getCapacity(queueName);
+ if (capacityPercent == -1.0) {
queuesWithoutConfiguredCapacity.add(queueName);
}else {
- totalCapacity += capacity;
+ totalCapacityPercent += capacityPercent;
}
+
+ float maxCapacityPercent = schedConf.getMaxCapacity(queueName);
int ulMin = schedConf.getMinimumUserLimitPercent(queueName);
// create our QSI and add to our hashmap
QueueSchedulingInfo qsi = new QueueSchedulingInfo(
- queueName, capacity, ulMin, jobQueuesManager, schedConf.getMaxMapCap(
- queueName), schedConf.getMaxReduceCap(queueName));
+ queueName, capacityPercent, maxCapacityPercent ,ulMin,
jobQueuesManager);
queueInfoMap.put(queueName, qsi);
// create the queues of job objects
@@ -1075,18 +1099,27 @@ class CapacityTaskScheduler extends Task
queueManager.setSchedulerInfo(queueName, schedulingInfo);
}
- float remainingQuantityToAllocate = 100 - totalCapacity;
+ float remainingQuantityToAllocate = 100 - totalCapacityPercent;
float quantityToAllocate =
remainingQuantityToAllocate/queuesWithoutConfiguredCapacity.size();
for(String queue: queuesWithoutConfiguredCapacity) {
QueueSchedulingInfo qsi = queueInfoMap.get(queue);
qsi.capacityPercent = quantityToAllocate;
+ if(qsi.maxCapacityPercent >= 0) {
+ if(qsi.capacityPercent > qsi.maxCapacityPercent) {
+ throw new IllegalStateException(
+ " Allocated capacity of " + qsi.capacityPercent +
+ " to unconfigured queue " + qsi.queueName +
+ " is greater than maximum Capacity " + qsi.maxCapacityPercent);
+ }
+ }
schedConf.setCapacity(queue, quantityToAllocate);
}
- if (totalCapacity > 100.0) {
- throw new IllegalArgumentException("Sum of queue capacities over 100% at
"
- + totalCapacity);
+ if (totalCapacityPercent > 100.0) {
+ throw new IllegalArgumentException(
+ "Sum of queue capacities over 100% at "
+ + totalCapacityPercent);
}
// let our mgr objects know about the queues
@@ -1151,22 +1184,37 @@ class CapacityTaskScheduler extends Task
* to make scheduling decisions. For example, we don't need an exact count
* of numRunningTasks. Once we count upto the grid capacity, any
* number beyond that will make no difference.
- *
- **/
- private synchronized void updateQSIObjects(int mapClusterCapacity,
+ */
+ private synchronized void updateQSIObjects(
+ int mapClusterCapacity,
int reduceClusterCapacity) {
// if # of slots have changed since last time, update.
// First, compute whether the total number of TT slots have changed
for (QueueSchedulingInfo qsi: queueInfoMap.values()) {
// compute new capacities, if TT slots have changed
if (mapClusterCapacity != prevMapClusterCapacity) {
- qsi.mapTSI.setCapacity((int)
+ qsi.mapTSI.setCapacity(
+ (int)
(qsi.capacityPercent*mapClusterCapacity/100));
+
+ //compute new max map capacities
+ if(qsi.maxCapacityPercent > 0) {
+ qsi.mapTSI.setMaxCapacity(
+ (int) (qsi.maxCapacityPercent * mapClusterCapacity / 100));
+ }
}
if (reduceClusterCapacity != prevReduceClusterCapacity) {
- qsi.reduceTSI.setCapacity((int)
- (qsi.capacityPercent*reduceClusterCapacity/100));
+ qsi.reduceTSI.setCapacity(
+ (int)
+ (qsi.capacityPercent * reduceClusterCapacity / 100));
+
+ //compute new max reduce capacities
+ if (qsi.maxCapacityPercent > 0) {
+ qsi.reduceTSI.setMaxCapacity(
+ (int) (qsi.maxCapacityPercent * reduceClusterCapacity / 100));
+ }
}
+
// reset running/pending tasks, tasks per user
qsi.mapTSI.resetTaskVars();
qsi.reduceTSI.resetTaskVars();
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=1077031&r1=1077030&r2=1077031&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
Fri Mar 4 03:31:50 2011
@@ -36,7 +36,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.TaskType;
@@ -864,83 +863,45 @@ public class TestCapacityScheduler exten
}
/**
- * Test the max map limit.
+ * Test the max Capacity for map and reduce
* @throws IOException
*/
- public void testMaxMapCap() throws IOException {
+ public void testMaxCapacities() throws IOException {
this.setUp(4,1,1);
taskTrackerManager.addQueues(new String[] {"default"});
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 100.0f, false, 1));
- resConf.setFakeQueues(queues);
- resConf.setMaxMapCap("default",2);
- resConf.setMaxReduceCap("default",-1);
- scheduler.setResourceManagerConf(resConf);
- scheduler.start();
-
- //submit the Job
- FakeJobInProgress fjob1 =
- submitJobAndInit(JobStatus.PREP,3,1,"default","user");
-
- List<Task> task1 = scheduler.assignTasks(tracker("tt1"));
- List<Task> task2 = scheduler.assignTasks(tracker("tt2"));
-
- //Once the 2 tasks are running the third assigment should be reduce.
- checkAssignment("tt3", "attempt_test_0001_r_000001_0 on tt3");
- //This should fail.
- List<Task> task4 = scheduler.assignTasks(tracker("tt4"));
- assertNull(task4);
- //Now complete the task 1.
- // complete the job
- taskTrackerManager.finishTask("tt1", task1.get(0).getTaskID().toString(),
- fjob1);
- //We have completed the tt1 task which was a map task so we expect one map
- //task to be picked up
- checkAssignment("tt4","attempt_test_0001_m_000003_0 on tt4");
- }
+ queues.add(new FakeQueueInfo("default", 25.0f, false, 1));
- /**
- * Test max reduce limit
- * @throws IOException
- */
- public void testMaxReduceCap() throws IOException {
- this.setUp(4, 1, 1);
- taskTrackerManager.addQueues(new String[]{"default"});
- ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 100.0f, false, 1));
resConf.setFakeQueues(queues);
- resConf.setMaxMapCap("default", -1);
- resConf.setMaxReduceCap("default", 2);
+ resConf.setMaxCapacity("default", 50.0f);
scheduler.setResourceManagerConf(resConf);
+ scheduler.setAssignMultipleTasks(true);
scheduler.start();
//submit the Job
FakeJobInProgress fjob1 =
- submitJobAndInit(JobStatus.PREP, 1, 3, "default", "user");
+ submitJobAndInit(JobStatus.PREP, 4, 4, "default", "user");
- List<Task> task1 = scheduler.assignTasks(tracker("tt1"));
- List<Task> task2 = scheduler.assignTasks(tracker("tt2"));
- List<Task> task3 = scheduler.assignTasks(tracker("tt3"));
-
- //This should fail. 1 map, 2 reduces , we have reached the limit.
- List<Task> task4 = scheduler.assignTasks(tracker("tt4"));
- assertNull(task4);
- //Now complete the task 1 i.e map task.
- // complete the job
- taskTrackerManager.finishTask(
- "tt1", task1.get(0).getTaskID().toString(),
- fjob1);
-
- //This should still fail as only map task is done
- task4 = scheduler.assignTasks(tracker("tt4"));
- assertNull(task4);
-
- //Complete the reduce task
- taskTrackerManager.finishTask(
- "tt2", task2.get(0).getTaskID().toString(), fjob1);
+ //default queue has min capacity of 1 and max capacity of 2
- //One reduce is done hence assign the new reduce.
- checkAssignment("tt4","attempt_test_0001_r_000003_0 on tt4");
+ //first call of assign task should give task from default queue.
+ //default uses 1 map and 1 reduce slots are used
+ checkMultipleAssignment(
+ "tt1", "attempt_test_0001_m_000001_0 on tt1",
+ "attempt_test_0001_r_000001_0 on tt1");
+
+ //second call of assign task
+ //default uses 2 map and 2 reduce slots
+ checkMultipleAssignment(
+ "tt2", "attempt_test_0001_m_000002_0 on tt2",
+ "attempt_test_0001_r_000002_0 on tt2");
+
+
+ //Now we have reached the max capacity limit for default ,
+ //no further tasks would be assigned to this queue.
+ checkMultipleAssignment(
+ "tt3", null,
+ null);
}
// test if the queue reflects the changes
@@ -1295,6 +1256,27 @@ public class TestCapacityScheduler exten
assertEquals(18.75f, resConf.getCapacity("q4"));
}
+ public void testCapacityAllocFailureWithLowerMaxCapacity()
+ throws Exception {
+ String[] qs = {"default", "q1"};
+ taskTrackerManager.addQueues(qs);
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ queues.add(new FakeQueueInfo("default", 50.0f, true, 50));
+ queues.add(new FakeQueueInfo("q1", -1.0f, true, 50));
+ resConf.setFakeQueues(queues);
+ resConf.setMaxCapacity("q1", 40.0f);
+ scheduler.setResourceManagerConf(resConf);
+ try {
+ scheduler.start();
+ fail("Scheduler start should fail ");
+ } catch (IllegalStateException ise) {
+ assertEquals(
+ ise.getMessage(),
+ " Allocated capacity of " + 50.0f + " to unconfigured queue " +
+ "q1" + " is greater than maximum Capacity " + 40.0f);
+ }
+ }
+
// Tests how capacity is computed and assignment of tasks done
// on the basis of the capacity.
public void testCapacityBasedAllocation() throws Exception {
@@ -1377,26 +1359,26 @@ public class TestCapacityScheduler exten
}
/**
- * Creates a queue with max task limit of 2
+ * Creates a queue with max capacity of 50%
* submit 1 job in the queue which is high ram(2 slots) . As 2 slots are
* given to high ram job and are reserved , no other tasks are accepted .
*
* @throws IOException
*/
- public void testHighMemoryBlockingWithMaxLimit()
+ public void testHighMemoryBlockingWithMaxCapacity()
throws IOException {
- // 2 map and 1 reduce slots
- taskTrackerManager = new FakeTaskTrackerManager(2, 2, 1);
+ taskTrackerManager = new FakeTaskTrackerManager(2, 2, 2);
taskTrackerManager.addQueues(new String[] { "defaultXYZ" });
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("defaultXYZ", 100.0f, true, 25));
+ queues.add(new FakeQueueInfo("defaultXYZ", 25.0f, true, 50));
resConf.setFakeQueues(queues);
- resConf.setMaxMapCap("defaultXYZ",2);
+
+ //defaultXYZ can go up to 2 map and 2 reduce slots
+ resConf.setMaxCapacity("defaultXYZ", 50.0f);
+
scheduler.setTaskTrackerManager(taskTrackerManager);
- // enabled memory-based scheduling
- // Normal job in the cluster would be 1GB maps/reduces
scheduler.getConf().setLong(
JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
2 * 1024);
@@ -1404,74 +1386,92 @@ public class TestCapacityScheduler exten
JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
scheduler.getConf().setLong(
JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
- 1 * 1024);
+ 2 * 1024);
scheduler.getConf().setLong(
JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
+ scheduler.setAssignMultipleTasks(true);
- // The situation : Submit 2 jobs with high memory map task
- //Set the max limit for queue to 2 ,
- // try submitting more map tasks to the queue , it should not happen
-
- LOG.debug("Submit one high memory(2GB maps, 0MB reduces) job of "
- + "2 map tasks");
JobConf jConf = new JobConf(conf);
jConf.setMemoryForMapTask(2 * 1024);
- jConf.setMemoryForReduceTask(0);
+ jConf.setMemoryForReduceTask(1 * 1024);
jConf.setNumMapTasks(2);
- jConf.setNumReduceTasks(0);
+ jConf.setNumReduceTasks(1);
jConf.setQueueName("defaultXYZ");
jConf.setUser("u1");
FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
- LOG.debug("Submit another regular memory(1GB vmem maps/reduces) job of "
- + "2 map/red tasks");
jConf = new JobConf(conf);
jConf.setMemoryForMapTask(1 * 1024);
- jConf.setMemoryForReduceTask(1 * 1024);
- jConf.setNumMapTasks(2);
+ jConf.setMemoryForReduceTask(2 * 1024);
+ jConf.setNumMapTasks(1);
jConf.setNumReduceTasks(2);
jConf.setQueueName("defaultXYZ");
jConf.setUser("u1");
FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
- // first, a map from j1 will run this is a high memory job so it would
- // occupy the 2 slots
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
- checkOccupiedSlots("defaultXYZ", TaskType.MAP, 1,2, 100.0f,3,1);
- checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+ //high ram map from job 1 and normal reduce task from job 1
+ List<Task> tasks = checkMultipleAssignment(
+ "tt1", "attempt_test_0001_m_000001_0 on tt1",
+ "attempt_test_0001_r_000001_0 on tt1");
- // at this point, the scheduler tries to schedule another map from j1.
- // there isn't enough space. The second job's reduce should be scheduled.
- checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
-
- checkOccupiedSlots("defaultXYZ", TaskType.MAP, 1,2, 100.0f,3,1);
+ checkOccupiedSlots("defaultXYZ", TaskType.MAP, 1, 2, 200.0f,1,0);
+ checkOccupiedSlots("defaultXYZ", TaskType.REDUCE, 1, 1, 100.0f,0,2);
checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
- //at this point , the scheduler tries to schedule another map from j2 for
- //another task tracker.
- // This should not happen as all the map slots are taken
- //by the first task itself.hence reduce task from the second job is given
-
- checkAssignment("tt2","attempt_test_0002_r_000002_0 on tt2");
+ //we have reached the maximum limit for maps, so no more map tasks.
+ //we have used 1 reduce slot already and only 1 more reduce slot is left
+ //before we reach the max capacity for reduces.
+ // But the current 1 slot + 2 slots for the high ram reduce would
+ //mean we are crossing the maximum capacity. Hence nothing would be
+ //assigned in this call
+ checkMultipleAssignment("tt2",null,null);
+
+ //complete the high ram job on tt1.
+ for (Task task : tasks) {
+ taskTrackerManager.finishTask(
+ "tt1", task.getTaskID().toString(),
+ job1);
+ }
+
+ //At this point we have 1 high ram map and 1 high ram reduce.
+ List<Task> t2 = checkMultipleAssignment(
+ "tt2", "attempt_test_0001_m_000002_0 on tt2",
+ "attempt_test_0002_r_000001_0 on tt2");
+
+ checkOccupiedSlots("defaultXYZ", TaskType.MAP, 1, 2, 200.0f,1,0);
+ checkOccupiedSlots("defaultXYZ", TaskType.REDUCE, 1, 2, 200.0f,0,2);
+ checkMemReservedForTasksOnTT("tt2", 2 * 1024L, 2 * 1024L);
+
+ //complete the high ram tasks on tt2.
+ for (Task task : t2) {
+ taskTrackerManager.finishTask(
+ "tt2", task.getTaskID().toString(),
+ job2);
+ }
+
+ //1st map & 2nd reduce from job2
+ checkMultipleAssignment(
+ "tt2", "attempt_test_0002_m_000001_0 on tt2",
+ "attempt_test_0002_r_000002_0 on tt2");
}
/**
* test if user limits automatically adjust to max map or reduce limit
*/
- public void testUserLimitsWithMaxLimits() throws Exception {
- setUp(4, 4, 4);
+ public void testUserLimitsWithMaxCapacities() throws Exception {
+ setUp(2, 2, 2);
// set up some queues
String[] qs = {"default"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 100.0f, true, 50));
+ queues.add(new FakeQueueInfo("default", 50.0f, true, 50));
resConf.setFakeQueues(queues);
- resConf.setMaxMapCap("default", 2);
- resConf.setMaxReduceCap("default", 2);
+ resConf.setMaxCapacity("default", 75.0f);
scheduler.setResourceManagerConf(resConf);
+ scheduler.setAssignMultipleTasks(true);
scheduler.start();
// submit a job
@@ -1480,37 +1480,27 @@ public class TestCapacityScheduler exten
FakeJobInProgress fjob2 =
submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u2");
- // for queue 'default', the capacity for maps is 2.
- // But the max map limit is 2
- // hence user should be getting not more than 1 as it is the 50%.
- Task t1 = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-
- //Now we should get the task from the other job. As the
- //first user has reached his max map limit.
- checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
-
- //Now we are done with map limit , now if we ask for task we should
- // get reduce from 1st job
- checkAssignment("tt3", "attempt_test_0001_r_000001_0 on tt3");
- // Now we're at full capacity for maps. 1 done with reduces for job 1 so
- // now we should get 1 reduces for job 2
- Task t4 = checkAssignment("tt4", "attempt_test_0002_r_000001_0 on tt4");
-
- taskTrackerManager.finishTask(
- "tt1", t1.getTaskID().toString(),
- fjob1);
-
- //tt1 completed the task so we have 1 map slot for u1
- // we are assigning the 2nd map task from fjob1
- checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
-
- taskTrackerManager.finishTask(
- "tt4", t4.getTaskID().toString(),
- fjob2);
- //tt4 completed the task , so we have 1 reduce slot for u2
- //we are assigning the 2nd reduce from fjob2
- checkAssignment("tt4", "attempt_test_0002_r_000002_0 on tt4");
+ // for queue 'default', the max capacity for maps and reduces is 3.
+ // the initial user limit at 50%, assuming there are 2 users per queue,
+ // is 1 map and 1 reduce.
+ // after applying max capacity it is 1.5 each.
+
+ //the first job would be given 1 map and 1 reduce task.
+ List<Task> t1 = this.checkMultipleAssignment(
+ "tt1", "attempt_test_0001_m_000001_0 on tt1",
+ "attempt_test_0001_r_000001_0 on tt1");
+
+ //for user u1 we have reached the limit, that is 1 map and 1 reduce.
+ //u2 now gets 1 more map and 1 more reduce task.
+ List<Task> t2 = this.checkMultipleAssignment(
+ "tt1", "attempt_test_0002_m_000001_0 on tt1",
+ "attempt_test_0002_r_000001_0 on tt1");
+
+ t1 = this.checkMultipleAssignment(
+ "tt2", "attempt_test_0001_m_000002_0 on tt2",
+ "attempt_test_0001_r_000002_0 on tt2");
+ t1 = this.checkMultipleAssignment("tt2", null,null);
}
@@ -3085,20 +3075,22 @@ public class TestCapacityScheduler exten
private void checkMemReservedForTasksOnTT(String taskTracker,
Long expectedMemForMapsOnTT, Long expectedMemForReducesOnTT) {
Long observedMemForMapsOnTT =
-
scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker).getStatus(),
+ scheduler.memoryMatcher.getMemReservedForTasks(
+ tracker(taskTracker).getStatus(),
TaskType.MAP);
Long observedMemForReducesOnTT =
-
scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker).getStatus(),
+ scheduler.memoryMatcher.getMemReservedForTasks(
+ tracker(taskTracker).getStatus(),
TaskType.REDUCE);
if (expectedMemForMapsOnTT == null) {
- assertTrue(observedMemForMapsOnTT == null);
+ assertEquals(observedMemForMapsOnTT, null);
} else {
- assertTrue(observedMemForMapsOnTT.equals(expectedMemForMapsOnTT));
+ assertEquals(observedMemForMapsOnTT, (expectedMemForMapsOnTT));
}
if (expectedMemForReducesOnTT == null) {
- assertTrue(observedMemForReducesOnTT == null);
+ assertEquals(observedMemForReducesOnTT, null);
} else {
- assertTrue(observedMemForReducesOnTT.equals(expectedMemForReducesOnTT));
+ assertEquals(observedMemForReducesOnTT, (expectedMemForReducesOnTT));
}
}
@@ -3187,4 +3179,48 @@ public class TestCapacityScheduler exten
assertEquals(scheduler.getLimitMaxMemForMapSlot(),3);
assertEquals(scheduler.getLimitMaxMemForReduceSlot(),3);
}
+
+ /**
+ * Checks for multiple assignment.
+ *
+ * @param taskTrackerName
+ * @param mapAttempt
+ * @param reduceAttempt
+ * @return
+ * @throws IOException
+ */
+ private List<Task> checkMultipleAssignment(
+ String taskTrackerName, String mapAttempt, String reduceAttempt)
+ throws IOException {
+ List<Task> tasks = scheduler.assignTasks(tracker(taskTrackerName));
+ LOG.info(
+ " mapAttempt " + mapAttempt + " reduceAttempt " + reduceAttempt +
+ " assignTasks result " + tasks);
+
+ if (tasks == null || tasks.isEmpty()) {
+ if (mapAttempt != null || reduceAttempt != null ) {
+ fail(
+ " improper attempt " + tasks + " expected attempts are map : " +
+ mapAttempt + " reduce : " + reduceAttempt);
+ } else {
+ return tasks;
+ }
+ }
+
+ if (tasks.size() == 1 && (mapAttempt != null && reduceAttempt != null)) {
+ fail(
+ " improper attempt " + tasks + " expected attempts are map : " +
+ mapAttempt + " reduce : " + reduceAttempt);
+ }
+ for (Task task : tasks) {
+ if (task.toString().contains("_m_")) {
+ assertEquals(task.toString(), mapAttempt);
+ }
+
+ if (task.toString().contains("_r")) {
+ assertEquals(task.toString(), reduceAttempt);
+ }
+ }
+ return tasks;
+ }
}
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java?rev=1077031&r1=1077030&r2=1077031&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java
Fri Mar 4 03:31:50 2011
@@ -278,6 +278,26 @@ public class TestCapacitySchedulerConf e
}
}
+ public void testInvalidMaxCapacity() throws IOException {
+ openFile();
+ startConfig();
+ writeProperty(
+ "mapred.capacity-scheduler.queue.default.capacity", "70");
+ writeProperty(
+ "mapred.capacity-scheduler.queue.default.maximum-capacity", "50");
+ endConfig();
+ testConf = new CapacitySchedulerConf(new Path(testConfFile));
+
+ try {
+ testConf.getMaxCapacity("default");
+ fail(" getMaxCapacity worked " + testConf.getCapacity("default"));
+ } catch (IllegalArgumentException e) {
+ assertEquals(
+ CapacitySchedulerConf.MAX_CAPACITY_PROPERTY + " 50.0"+
+ " for a queue should be greater than or equal to capacity ",
e.getMessage());
+ }
+ }
+
public void testInitializationPollerProperties()
throws Exception {
/*
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/capacity_scheduler.xml
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/capacity_scheduler.xml?rev=1077031&r1=1077030&r2=1077031&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/capacity_scheduler.xml
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/capacity_scheduler.xml
Fri Mar 4 03:31:50 2011
@@ -204,36 +204,28 @@
users, no user can use more than 25% of the queue's resources.
A
value of 100 implies no user limits are imposed.</td>
</tr>
-
<tr><td>mapred.capacity-scheduler.queue.<queue-name>.max.map.slots</td>
+
<tr><td>mapred.capacity-scheduler.queue.<queue-name>.maximum-capacity</td>
<td>
- This value is the maximum max slots that can be used in a
- queue at any point of time. So for example assuming above
config value
- is 100 , not more than 100 tasks would be in the queue at
any point of
- time, assuming each task takes one slot.
+ maximum-capacity defines a limit beyond which a queue cannot
+ use the capacity of the cluster. This provides a means to
+ limit how much excess capacity a queue can use. By default,
+ there is no limit.
+ The maximum-capacity of a queue can only be greater than or
+ equal to its minimum capacity.
+ Default value of -1 implies a queue can use the complete
+ capacity of the cluster.
- Default value of -1 would disable this capping feature
+ This property could be used to curtail certain jobs which are
+ long running in nature from occupying more than a certain
+ percentage of the cluster, which in the absence of
+ pre-emption, could lead to capacity guarantees of other
+ queues being affected.
- Typically the queue capacity should be equal to this limit.
- If queue capacity is more than this limit, excess capacity
will be
- used by the other queues. If queue capacity is less than
the above
- limit , then the limit would be the queue capacity - as in
the current
- implementation
- </td>
- </tr>
-
<tr><td>mapred.capacity-scheduler.queue.<queue-name>.max.reduce.slots</td>
- <td>
- This value is the maximum reduce slots that can be used in a
- queue at any point of time. So for example assuming above
config value
- is 100 , not more than 100 tasks would be in the queue at
any point of
- time, assuming each task takes one slot.
-
- Default value of -1 would disable this capping feature
-
- Typically the queue capacity should be equal to this limit.
- If queue capacity is more than this limit, excess capacity
will be
- used by the other queues. If queue capacity is less than
the above
- limit , then the limit would be the queue capacity - as in
the current
- implementation
+ One important thing to note is that maximum-capacity is a
+ percentage, so based on the cluster's capacity
+ it would change. So if a large number of nodes or racks get
+ added to the cluster, the maximum capacity in
+ absolute terms would increase accordingly.
</td>
</tr>
</table>