Author: cdouglas
Date: Fri Jun 19 05:42:53 2009
New Revision: 786377
URL: http://svn.apache.org/viewvc?rev=786377&view=rev
Log:
HADOOP-4687. Merge mapred/contrib changes -r 776174:785643
Modified:
hadoop/core/branches/HADOOP-4687/mapred/src/contrib/build-contrib.xml
(props changed)
hadoop/core/branches/HADOOP-4687/mapred/src/contrib/build.xml (contents,
props changed)
hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/
(props changed)
hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/ivy.xml
hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java
hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
hadoop/core/branches/HADOOP-4687/mapred/src/contrib/data_join/ (props
changed)
hadoop/core/branches/HADOOP-4687/mapred/src/contrib/dynamic-scheduler/
(props changed)
hadoop/core/branches/HADOOP-4687/mapred/src/contrib/fairscheduler/ (props
changed)
hadoop/core/branches/HADOOP-4687/mapred/src/contrib/index/ (props changed)
hadoop/core/branches/HADOOP-4687/mapred/src/contrib/mrunit/ (props
changed)
hadoop/core/branches/HADOOP-4687/mapred/src/contrib/sqoop/ (props changed)
hadoop/core/branches/HADOOP-4687/mapred/src/contrib/streaming/ (props
changed)
hadoop/core/branches/HADOOP-4687/mapred/src/contrib/vaidya/ (props
changed)
Propchange:
hadoop/core/branches/HADOOP-4687/mapred/src/contrib/build-contrib.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Fri Jun 19 05:42:53 2009
@@ -0,0 +1,2 @@
+/hadoop/core/branches/branch-0.19/mapred/src/contrib/build-contrib.xml:713112
+/hadoop/core/trunk/src/contrib/build-contrib.xml:776175-786373
Modified: hadoop/core/branches/HADOOP-4687/mapred/src/contrib/build.xml
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/contrib/build.xml?rev=786377&r1=786376&r2=786377&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/contrib/build.xml (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/contrib/build.xml Fri Jun 19
05:42:53 2009
@@ -54,6 +54,7 @@
<fileset dir="." includes="fairscheduler/build.xml"/>
<fileset dir="." includes="capacity-scheduler/build.xml"/>
<fileset dir="." includes="mrunit/build.xml"/>
+ <fileset dir="." includes="dynamic-scheduler/build.xml"/>
</subant>
<available file="${build.contrib.dir}/testsfailed" property="testsfailed"/>
<fail if="testsfailed">Tests failed!</fail>
Propchange: hadoop/core/branches/HADOOP-4687/mapred/src/contrib/build.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Fri Jun 19 05:42:53 2009
@@ -0,0 +1,2 @@
+/hadoop/core/branches/branch-0.19/mapred/src/contrib/build.xml:713112
+/hadoop/core/trunk/src/contrib/build.xml:776175-786373
Propchange:
hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Fri Jun 19 05:42:53 2009
@@ -0,0 +1,2 @@
+/hadoop/core/branches/branch-0.19/mapred/src/contrib/capacity-scheduler:713112
+/hadoop/core/trunk/src/contrib/capacity-scheduler:776175-786373
Modified:
hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/ivy.xml
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/ivy.xml?rev=786377&r1=786376&r2=786377&view=diff
==============================================================================
---
hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/ivy.xml
(original)
+++
hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/ivy.xml
Fri Jun 19 05:42:53 2009
@@ -24,7 +24,7 @@
<artifact conf="master"/>
</publications>
<dependencies>
- <dependency org="commons-cli"
+ <dependency org="commons-cli"
name="commons-cli"
rev="${commons-cli.version}"
conf="common->default"/>
Modified:
hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java?rev=786377&r1=786376&r2=786377&view=diff
==============================================================================
---
hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
(original)
+++
hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
Fri Jun 19 05:42:53 2009
@@ -164,10 +164,10 @@
* Sets the capacity of the given queue.
*
* @param queue name of the queue
- * @param gc percent of the cluster for the queue.
+ * @param capacity percent of the cluster for the queue.
*/
- public void setCapacity(String queue,float gc) {
- rmConf.setFloat(toFullPropertyName(queue, "capacity"),gc);
+ public void setCapacity(String queue,float capacity) {
+ rmConf.setFloat(toFullPropertyName(queue, "capacity"),capacity);
}
/**
@@ -351,44 +351,4 @@
rmConf.setInt(
"mapred.capacity-scheduler.init-worker-threads", poolSize);
}
-
- /**
- * Get the upper limit on the maximum physical memory that can be specified by
- * a job.
- *
- * @return upper limit for max pmem for tasks.
- */
- public long getLimitMaxPmemForTasks() {
- return rmConf.getLong(UPPER_LIMIT_ON_TASK_PMEM_PROPERTY,
- JobConf.DISABLED_MEMORY_LIMIT);
- }
-
- /**
- * Get the upper limit on the maximum physical memory that can be specified by
- * a job.
- *
- * @param value
- */
- public void setLimitMaxPmemForTasks(long value) {
- rmConf.setLong(UPPER_LIMIT_ON_TASK_PMEM_PROPERTY, value);
- }
-
- /**
- * Get cluster-wide default percentage of pmem in vmem.
- *
- * @return cluster-wide default percentage of pmem in vmem.
- */
- public float getDefaultPercentOfPmemInVmem() {
- return rmConf.getFloat(DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY,
- JobConf.DISABLED_MEMORY_LIMIT);
- }
-
- /**
- * Set cluster-wide default percentage of pmem in vmem.
- *
- * @param value
- */
- public void setDefaultPercentOfPmemInVmem(float value) {
- rmConf.setFloat(DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY, value);
- }
}
Modified:
hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=786377&r1=786376&r2=786377&view=diff
==============================================================================
---
hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
(original)
+++
hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
Fri Jun 19 05:42:53 2009
@@ -24,8 +24,6 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -34,8 +32,6 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobTracker.IllegalStateException;
-import org.apache.hadoop.util.StringUtils;
-
/**
* A {@link TaskScheduler} that implements the requirements in HADOOP-3421
@@ -78,14 +74,20 @@
private static class TaskSchedulingInfo {
/**
- * the actual gc, which depends on how many slots are available
+ * the actual capacity, which depends on how many slots are available
* in the cluster at any given time.
*/
int capacity = 0;
// number of running tasks
int numRunningTasks = 0;
- /** for each user, we need to keep track of number of running tasks */
- Map<String, Integer> numRunningTasksByUser =
+ // number of slots occupied by running tasks
+ int numSlotsOccupied = 0;
+
+ /**
+ * for each user, we need to keep track of number of slots occupied by
+ * running tasks
+ */
+ Map<String, Integer> numSlotsOccupiedByUser =
new HashMap<String, Integer>();
/**
@@ -93,32 +95,41 @@
*/
void resetTaskVars() {
numRunningTasks = 0;
- for (String s: numRunningTasksByUser.keySet()) {
- numRunningTasksByUser.put(s, 0);
+ numSlotsOccupied = 0;
+ for (String s: numSlotsOccupiedByUser.keySet()) {
+ numSlotsOccupiedByUser.put(s, Integer.valueOf(0));
}
}
/**
* return information about the tasks
*/
- public String toString(){
- float runningTasksAsPercent = capacity!= 0 ?
- ((float)numRunningTasks * 100/capacity):0;
+ @Override
+ public String toString() {
+ float occupiedSlotsAsPercent =
+ capacity != 0 ? ((float) numSlotsOccupied * 100 / capacity) : 0;
StringBuffer sb = new StringBuffer();
- sb.append("Capacity: " + capacity + "\n");
- sb.append(String.format("Running tasks: %.1f%% of Capacity\n",
- runningTasksAsPercent));
+ sb.append("Capacity: " + capacity + " slots\n");
+ sb.append(String.format("Used capacity: %d (%.1f%% of Capacity)\n",
+ Integer.valueOf(numSlotsOccupied), Float
+ .valueOf(occupiedSlotsAsPercent)));
+ sb.append(String.format("Running tasks: %d\n", Integer
+ .valueOf(numRunningTasks)));
// include info on active users
- if (numRunningTasks != 0) {
+ if (numSlotsOccupied != 0) {
sb.append("Active users:\n");
- for (Map.Entry<String, Integer> entry: numRunningTasksByUser.entrySet()) {
+ for (Map.Entry<String, Integer> entry : numSlotsOccupiedByUser
+ .entrySet()) {
if ((entry.getValue() == null) || (entry.getValue().intValue() <= 0)) {
// user has no tasks running
continue;
}
- sb.append("User '" + entry.getKey()+ "': ");
- float p = (float)entry.getValue().intValue()*100/numRunningTasks;
- sb.append(String.format("%.1f%% of running tasks\n", p));
+ sb.append("User '" + entry.getKey() + "': ");
+ int numSlotsOccupiedByThisUser = entry.getValue().intValue();
+ float p =
+ (float) numSlotsOccupiedByThisUser * 100 / numSlotsOccupied;
+ sb.append(String.format("%d (%.1f%% of used capacity)\n", Long
+ .valueOf(numSlotsOccupiedByThisUser), Float.valueOf(p)));
}
}
return sb.toString();
@@ -152,10 +163,10 @@
TaskSchedulingInfo mapTSI;
TaskSchedulingInfo reduceTSI;
- public QueueSchedulingInfo(String queueName, float gcPercent,
+ public QueueSchedulingInfo(String queueName, float capacityPercent,
int ulMin, JobQueuesManager jobQueuesManager) {
this.queueName = new String(queueName);
- this.capacityPercent = gcPercent;
+ this.capacityPercent = capacityPercent;
this.ulMin = ulMin;
this.jobQueuesManager = jobQueuesManager;
this.mapTSI = new TaskSchedulingInfo();
@@ -164,13 +175,14 @@
/**
* return information about the queue
+ * @return a String representing the information about the queue.
*/
+ @Override
public String toString(){
// We print out the queue information first, followed by info
// on map and reduce tasks and job info
StringBuffer sb = new StringBuffer();
sb.append("Queue configuration\n");
- //sb.append("Name: " + queueName + "\n");
sb.append("Capacity Percentage: ");
sb.append(capacityPercent);
sb.append("%\n");
@@ -278,16 +290,29 @@
/** our TaskScheduler object */
protected CapacityTaskScheduler scheduler;
- // can be replaced with a global type, if we have one
- protected static enum TYPE {
- MAP, REDUCE
- }
- protected TYPE type = null;
+ protected CapacityTaskScheduler.TYPE type = null;
abstract Task obtainNewTask(TaskTrackerStatus taskTracker,
- JobInProgress job) throws IOException;
+ JobInProgress job) throws IOException;
+
+ int getSlotsOccupied(JobInProgress job) {
+ return getRunningTasks(job) * getSlotsPerTask(job);
+ }
+
+ abstract int getClusterCapacity();
+ abstract int getSlotsPerTask(JobInProgress job);
+ abstract int getRunningTasks(JobInProgress job);
abstract int getPendingTasks(JobInProgress job);
abstract TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi);
+ /**
+ * To check if job has a speculative task on the particular tracker.
+ *
+ * @param job job to check for speculative tasks.
+ * @param tts task tracker on which speculative task would run.
+ * @return true if there is a speculative task to run on the tracker.
+ */
+ abstract boolean hasSpeculativeTask(JobInProgress job,
+ TaskTrackerStatus tts);
/**
* List of QSIs for assigning tasks.
@@ -308,12 +333,12 @@
public int compare(QueueSchedulingInfo q1, QueueSchedulingInfo q2) {
TaskSchedulingInfo t1 = getTSI(q1);
TaskSchedulingInfo t2 = getTSI(q2);
- // look at how much capacity they've filled. Treat a queue with gc=0
- // equivalent to a queue running at capacity
+ // look at how much capacity they've filled. Treat a queue with
+ // capacity=0 equivalent to a queue running at capacity
double r1 = (0 == t1.capacity)? 1.0f:
- (double)t1.numRunningTasks/(double)t1.capacity;
+ (double)t1.numSlotsOccupied/(double)t1.capacity;
double r2 = (0 == t2.capacity)? 1.0f:
- (double)t2.numRunningTasks/(double)t2.capacity;
+ (double)t2.numSlotsOccupied/(double)t2.capacity;
if (r1<r2) return -1;
else if (r1>r2) return 1;
else return 0;
@@ -335,7 +360,17 @@
protected final static ReduceQueueComparator reduceComparator = new ReduceQueueComparator();
// and this is the comparator to use
protected QueueComparator queueComparator;
-
+
+ // Returns queues sorted according to the QueueComparator.
+ // Mainly for testing purposes.
+ String[] getOrderedQueues() {
+ List<String> queues = new ArrayList<String>(qsiForAssigningTasks.size());
+ for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
+ queues.add(qsi.queueName);
+ }
+ return queues.toArray(new String[queues.size()]);
+ }
+
TaskSchedulingMgr(CapacityTaskScheduler sched) {
scheduler = sched;
}
@@ -352,24 +387,26 @@
}
- private boolean isUserOverLimit(String user, QueueSchedulingInfo qsi) {
- // what is our current capacity? It's capacity if we're running below capacity.
- // If we're running over capacity, then its #running plus 1 (which is the
- // extra slot we're getting).
+ private boolean isUserOverLimit(JobInProgress j, QueueSchedulingInfo qsi) {
+ // what is our current capacity? It is equal to the queue-capacity if
+ // we're running below capacity. If we're running over capacity, then its
+ // #running plus slotPerTask of the job (which is the number of extra
+ // slots we're getting).
int currentCapacity;
TaskSchedulingInfo tsi = getTSI(qsi);
- if (tsi.numRunningTasks < tsi.capacity) {
+ if (tsi.numSlotsOccupied < tsi.capacity) {
currentCapacity = tsi.capacity;
}
else {
- currentCapacity = tsi.numRunningTasks+1;
+ currentCapacity = tsi.numSlotsOccupied + getSlotsPerTask(j);
}
int limit = Math.max((int)(Math.ceil((double)currentCapacity/
(double)qsi.numJobsByUser.size())),
(int)(Math.ceil((double)(qsi.ulMin*currentCapacity)/100.0)));
- if (tsi.numRunningTasksByUser.get(user) >= limit) {
- LOG.debug("User " + user + " is over limit, num running tasks = " +
- tsi.numRunningTasksByUser.get(user) + ", limit = " + limit);
+ String user = j.getProfile().getUser();
+ if (tsi.numSlotsOccupiedByUser.get(user) >= limit) {
+ LOG.debug("User " + user + " is over limit, num slots occupied = " +
+ tsi.numSlotsOccupiedByUser.get(user) + ", limit = " + limit);
return true;
}
else {
@@ -398,31 +435,37 @@
continue;
}
// check if the job's user is over limit
- if (isUserOverLimit(j.getProfile().getUser(), qsi)) {
+ if (isUserOverLimit(j, qsi)) {
continue;
- }
- if (getPendingTasks(j) != 0) {
- // Not accurate TODO:
- // check if the job's memory requirements are met
- if (scheduler.memoryMatcher.matchesMemoryRequirements(j, taskTracker)) {
- // We found a suitable job. Get task from it.
- Task t = obtainNewTask(taskTracker, j);
- if (t != null) {
- // we're successful in getting a task
- return TaskLookupResult.getTaskFoundResult(t);
- }
+ }
+ //If this job meets memory requirements. Ask the JobInProgress for
+ //a task to be scheduled on the task tracker.
+ //if we find a job then we pass it on.
+ if (scheduler.memoryMatcher.matchesMemoryRequirements(j, type,
+ taskTracker)) {
+ // We found a suitable job. Get task from it.
+ Task t = obtainNewTask(taskTracker, j);
+ //if there is a task return it immediately.
+ if (t != null) {
+ // we're successful in getting a task
+ return TaskLookupResult.getTaskFoundResult(t);
+ } else {
+ //skip to the next job in the queue.
+ LOG.debug("Job " + j.getJobID().toString()
+ + " returned no tasks of type " + type);
+ continue;
}
- else {
- // mem requirements not met or could not be computed for this TT
- // Rather than look at the next job,
- // we return nothing to the TT, with the hope that we improve
- // chances of finding a suitable TT for this job. This lets us
- // avoid starving jobs with high mem requirements.
+ } else {
+ //if memory requirements don't match then we check if the
+ //job has either pending or speculative task. If the job
+ //has pending or speculative task we block till this job
+ //tasks get scheduled. So that high memory jobs are not starved
+ if (getPendingTasks(j) != 0 || hasSpeculativeTask(j, taskTracker)) {
return TaskLookupResult.getMemFailedResult();
- }
- }
+ }
+ }//end of memory check block
// if we're here, this job has no task to run. Look at the next job.
- }
+ }//end of for loop
// if we're here, we haven't found any task to run among all jobs in
// the queue. This could be because there is nothing to run, or that
@@ -444,24 +487,29 @@
if (j.getStatus().getRunState() != JobStatus.RUNNING) {
continue;
}
- if (getPendingTasks(j) != 0) {
- // Not accurate TODO:
- // check if the job's memory requirements are met
- if (scheduler.memoryMatcher.matchesMemoryRequirements(j, taskTracker)) {
- // We found a suitable job. Get task from it.
- Task t = obtainNewTask(taskTracker, j);
- if (t != null) {
- // we're successful in getting a task
- return TaskLookupResult.getTaskFoundResult(t);
- }
+ if (scheduler.memoryMatcher.matchesMemoryRequirements(j, type,
+ taskTracker)) {
+ // We found a suitable job. Get task from it.
+ Task t = obtainNewTask(taskTracker, j);
+ //if there is a task return it immediately.
+ if (t != null) {
+ // we're successful in getting a task
+ return TaskLookupResult.getTaskFoundResult(t);
+ } else {
+ //skip to the next job in the queue.
+ continue;
}
- else {
- // mem requirements not met.
+ } else {
+ //if memory requirements don't match then we check if the
+ //job has either pending or speculative task. If the job
+ //has pending or speculative task we block till this job
+ //tasks get scheduled, so that high memory jobs are not
+ //starved
+ if (getPendingTasks(j) != 0 || hasSpeculativeTask(j, taskTracker)) {
return TaskLookupResult.getMemFailedResult();
- }
- }
- // if we're here, this job has no task to run. Look at the next job.
- }
+ }
+ }//end of memory check block
+ }//end of for loop
// found nothing for this queue, look at the next one.
String msg = "Found no task from the queue " + qsi.queueName;
@@ -473,8 +521,11 @@
// The caller is responsible for ensuring that the QSI objects and the
// collections are up-to-date.
private TaskLookupResult assignTasks(TaskTrackerStatus taskTracker) throws IOException {
+
+ printQSIs();
+
for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
- // we may have queues with gc=0. We shouldn't look at jobs from
+ // we may have queues with capacity=0. We shouldn't look at jobs from
// these queues
if (0 == getTSI(qsi).capacity) {
continue;
@@ -500,20 +551,23 @@
// nothing to give
return TaskLookupResult.getNoTaskFoundResult();
}
-
+
// for debugging.
private void printQSIs() {
- StringBuffer s = new StringBuffer();
- for (QueueSchedulingInfo qsi: qsiForAssigningTasks) {
- TaskSchedulingInfo tsi = getTSI(qsi);
- Collection<JobInProgress> runJobs =
- scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName);
- s.append(" Queue '" + qsi.queueName + "'(" + this.type + "): run=" +
- tsi.numRunningTasks + ", gc=" + tsi.capacity
- + ", run jobs="+ runJobs.size() +
- "*** ");
+ if (LOG.isDebugEnabled()) {
+ StringBuffer s = new StringBuffer();
+ for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
+ TaskSchedulingInfo tsi = getTSI(qsi);
+ Collection<JobInProgress> runJobs =
+ scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName);
+ s.append(String.format(" Queue '%s'(%s): runningTasks=%d, "
+ + "occupiedSlots=%d, capacity=%d, runJobs=%d", qsi.queueName,
+ this.type, Integer.valueOf(tsi.numRunningTasks), Integer
+ .valueOf(tsi.numSlotsOccupied), Integer
+ .valueOf(tsi.capacity), Integer.valueOf(runJobs.size())));
+ }
+ LOG.debug(s);
}
- LOG.debug(s);
}
/**
@@ -543,11 +597,14 @@
* The scheduling algorithms for map tasks.
*/
private static class MapSchedulingMgr extends TaskSchedulingMgr {
- MapSchedulingMgr(CapacityTaskScheduler dad) {
- super(dad);
- type = TaskSchedulingMgr.TYPE.MAP;
+
+ MapSchedulingMgr(CapacityTaskScheduler schedulr) {
+ super(schedulr);
+ type = CapacityTaskScheduler.TYPE.MAP;
queueComparator = mapComparator;
}
+
+ @Override
Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job)
throws IOException {
ClusterStatus clusterStatus =
@@ -556,31 +613,57 @@
return job.obtainNewMapTask(taskTracker, numTaskTrackers,
scheduler.taskTrackerManager.getNumberOfUniqueHosts());
}
+
+ @Override
int getClusterCapacity() {
return scheduler.taskTrackerManager.getClusterStatus().getMaxMapTasks();
}
+
+ @Override
int getRunningTasks(JobInProgress job) {
return job.runningMaps();
}
+
+ @Override
int getPendingTasks(JobInProgress job) {
return job.pendingMaps();
}
+ @Override
+ int getSlotsPerTask(JobInProgress job) {
+ long myVmem = job.getJobConf().getMemoryForMapTask();
+ return (int) (Math.ceil((float) myVmem
+ / (float) scheduler.getMemSizeForMapSlot()));
+ }
+
+ @Override
TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) {
return qsi.mapTSI;
}
+ @Override
+ boolean hasSpeculativeTask(JobInProgress job, TaskTrackerStatus tts) {
+ //Check if job supports speculative map execution first then
+ //check if job has speculative maps.
+ return (job.getJobConf().getMapSpeculativeExecution())&& (
+ hasSpeculativeTask(job.getMapTasks(),
+ job.getStatus().mapProgress(), tts));
+ }
+
}
/**
* The scheduling algorithms for reduce tasks.
*/
private static class ReduceSchedulingMgr extends TaskSchedulingMgr {
- ReduceSchedulingMgr(CapacityTaskScheduler dad) {
- super(dad);
- type = TaskSchedulingMgr.TYPE.REDUCE;
+
+ ReduceSchedulingMgr(CapacityTaskScheduler schedulr) {
+ super(schedulr);
+ type = CapacityTaskScheduler.TYPE.REDUCE;
queueComparator = reduceComparator;
}
+
+ @Override
Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job)
throws IOException {
ClusterStatus clusterStatus =
@@ -589,19 +672,44 @@
return job.obtainNewReduceTask(taskTracker, numTaskTrackers,
scheduler.taskTrackerManager.getNumberOfUniqueHosts());
}
+
+ @Override
int getClusterCapacity() {
- return scheduler.taskTrackerManager.getClusterStatus().getMaxReduceTasks();
+ return scheduler.taskTrackerManager.getClusterStatus()
+ .getMaxReduceTasks();
}
+
+ @Override
int getRunningTasks(JobInProgress job) {
return job.runningReduces();
}
+
+ @Override
int getPendingTasks(JobInProgress job) {
return job.pendingReduces();
}
+ @Override
+ int getSlotsPerTask(JobInProgress job) {
+ long myVmem = job.getJobConf().getMemoryForReduceTask();
+ return (int) (Math.ceil((float) myVmem
+ / (float) scheduler.getMemSizeForReduceSlot()));
+ }
+
+ @Override
TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) {
return qsi.reduceTSI;
}
+
+ @Override
+ boolean hasSpeculativeTask(JobInProgress job, TaskTrackerStatus tts) {
+ //check if the job supports reduce speculative execution first then
+ //check if the job has speculative tasks.
+ return (job.getJobConf().getReduceSpeculativeExecution()) && (
+ hasSpeculativeTask(job.getReduceTasks(),
+ job.getStatus().reduceProgress(), tts));
+ }
+
}
/** the scheduling mgrs for Map and Reduce tasks */
@@ -620,7 +728,10 @@
protected CapacitySchedulerConf schedConf;
/** whether scheduler has started or not */
private boolean started = false;
-
+
+ static String JOB_SCHEDULING_INFO_FORMAT_STRING =
+ "%s running map tasks using %d map slots,"
+ + " %s running reduce tasks using %d reduce slots.";
/**
* A clock class - can be mocked out for testing.
*/
@@ -629,13 +740,19 @@
return System.currentTimeMillis();
}
}
+
+ // can be replaced with a global type, if we have one
+ protected static enum TYPE {
+ MAP, REDUCE
+ }
+
private Clock clock;
private JobInitializationPoller initializationPoller;
- long limitMaxVmemForTasks;
- long limitMaxPmemForTasks;
- long defaultMaxVmPerTask;
- float defaultPercentOfPmemInVmem;
+ private long memSizeForMapSlotOnJT;
+ private long memSizeForReduceSlotOnJT;
+ private long limitMaxMemForMapTasks;
+ private long limitMaxMemForReduceTasks;
public CapacityTaskScheduler() {
this(new Clock());
@@ -652,37 +769,55 @@
this.schedConf = conf;
}
- /**
- * Normalize the negative values in configuration
- *
- * @param val
- * @return normalized value
- */
- private long normalizeMemoryConfigValue(long val) {
- if (val < 0) {
- val = JobConf.DISABLED_MEMORY_LIMIT;
- }
- return val;
- }
-
private void initializeMemoryRelatedConf() {
- limitMaxVmemForTasks =
- normalizeMemoryConfigValue(conf.getLong(
- JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
+ memSizeForMapSlotOnJT =
+ JobConf.normalizeMemoryConfigValue(conf.getLong(
+ JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
+ JobConf.DISABLED_MEMORY_LIMIT));
+ memSizeForReduceSlotOnJT =
+ JobConf.normalizeMemoryConfigValue(conf.getLong(
+ JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
JobConf.DISABLED_MEMORY_LIMIT));
+ limitMaxMemForMapTasks =
+ JobConf.normalizeMemoryConfigValue(conf.getLong(
+ JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+ JobConf.DISABLED_MEMORY_LIMIT));
+ limitMaxMemForReduceTasks =
+ JobConf.normalizeMemoryConfigValue(conf.getLong(
+ JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+ JobConf.DISABLED_MEMORY_LIMIT));
+ LOG.info(String.format("Scheduler configured with "
+ + "(memSizeForMapSlotOnJT, memSizeForReduceSlotOnJT, "
+ + "limitMaxMemForMapTasks, limitMaxMemForReduceTasks)"
+ + " (%d,%d,%d,%d)", Long.valueOf(memSizeForMapSlotOnJT), Long
+ .valueOf(memSizeForReduceSlotOnJT), Long
+ .valueOf(limitMaxMemForMapTasks), Long
+ .valueOf(limitMaxMemForReduceTasks)));
+ }
- limitMaxPmemForTasks =
- normalizeMemoryConfigValue(schedConf.getLimitMaxPmemForTasks());
+ long getMemSizeForMapSlot() {
+ return memSizeForMapSlotOnJT;
+ }
- defaultMaxVmPerTask =
- normalizeMemoryConfigValue(conf.getLong(
- JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
- JobConf.DISABLED_MEMORY_LIMIT));
+ long getMemSizeForReduceSlot() {
+ return memSizeForReduceSlotOnJT;
+ }
+
+ long getLimitMaxMemForMapSlot() {
+ return limitMaxMemForMapTasks;
+ }
+
+ long getLimitMaxMemForReduceSlot() {
+ return limitMaxMemForReduceTasks;
+ }
- defaultPercentOfPmemInVmem = schedConf.getDefaultPercentOfPmemInVmem();
- if (defaultPercentOfPmemInVmem < 0) {
- defaultPercentOfPmemInVmem = JobConf.DISABLED_MEMORY_LIMIT;
+ String[] getOrderedQueues(CapacityTaskScheduler.TYPE type) {
+ if (type.equals(CapacityTaskScheduler.TYPE.MAP)) {
+ return mapScheduler.getOrderedQueues();
+ } else if (type.equals(CapacityTaskScheduler.TYPE.REDUCE)) {
+ return reduceScheduler.getOrderedQueues();
}
+ return null;
}
@Override
@@ -707,15 +842,15 @@
Set<String> queuesWithoutConfiguredCapacity = new HashSet<String>();
float totalCapacity = 0.0f;
for (String queueName: queues) {
- float gc = schedConf.getCapacity(queueName);
- if(gc == -1.0) {
+ float capacity = schedConf.getCapacity(queueName);
+ if(capacity == -1.0) {
queuesWithoutConfiguredCapacity.add(queueName);
}else {
- totalCapacity += gc;
+ totalCapacity += capacity;
}
int ulMin = schedConf.getMinimumUserLimitPercent(queueName);
// create our QSI and add to our hashmap
- QueueSchedulingInfo qsi = new QueueSchedulingInfo(queueName, gc,
+ QueueSchedulingInfo qsi = new QueueSchedulingInfo(queueName, capacity,
ulMin, jobQueuesManager);
queueInfoMap.put(queueName, qsi);
@@ -829,29 +964,49 @@
if (j.getStatus().getRunState() != JobStatus.RUNNING) {
continue;
}
- int runningMaps = j.runningMaps();
- int runningReduces = j.runningReduces();
- qsi.mapTSI.numRunningTasks += runningMaps;
- qsi.reduceTSI.numRunningTasks += runningReduces;
+
+ int numMapsRunningForThisJob = mapScheduler.getRunningTasks(j);
+ int numReducesRunningForThisJob = reduceScheduler.getRunningTasks(j);
+ int numMapSlotsForThisJob = mapScheduler.getSlotsOccupied(j);
+ int numReduceSlotsForThisJob = reduceScheduler.getSlotsOccupied(j);
+ j.setSchedulingInfo(String.format(JOB_SCHEDULING_INFO_FORMAT_STRING,
+ Integer.valueOf(numMapsRunningForThisJob), Integer
+ .valueOf(numMapSlotsForThisJob), Integer
+ .valueOf(numReducesRunningForThisJob), Integer
+ .valueOf(numReduceSlotsForThisJob)));
+ qsi.mapTSI.numRunningTasks += numMapsRunningForThisJob;
+ qsi.reduceTSI.numRunningTasks += numReducesRunningForThisJob;
+ qsi.mapTSI.numSlotsOccupied += numMapSlotsForThisJob;
+ qsi.reduceTSI.numSlotsOccupied += numReduceSlotsForThisJob;
Integer i =
- qsi.mapTSI.numRunningTasksByUser.get(j.getProfile().getUser());
- qsi.mapTSI.numRunningTasksByUser.put(j.getProfile().getUser(),
- i+runningMaps);
- i = qsi.reduceTSI.numRunningTasksByUser.get(j.getProfile().getUser());
- qsi.reduceTSI.numRunningTasksByUser.put(j.getProfile().getUser(),
- i+runningReduces);
- LOG.debug("updateQSI: job " + j.getJobID().toString() + ": run(m) = " +
- j.runningMaps() + ", run(r) = " + j.runningReduces() +
- ", finished(m) = " + j.finishedMaps() + ", finished(r)= " +
- j.finishedReduces() + ", failed(m) = " + j.failedMapTasks +
- ", failed(r) = " + j.failedReduceTasks + ", spec(m) = " +
- j.speculativeMapTasks + ", spec(r) = " + j.speculativeReduceTasks
- + ", total(m) = " + j.numMapTasks + ", total(r) = " +
- j.numReduceTasks);
+ qsi.mapTSI.numSlotsOccupiedByUser.get(j.getProfile().getUser());
+ qsi.mapTSI.numSlotsOccupiedByUser.put(j.getProfile().getUser(),
+ Integer.valueOf(i.intValue() + numMapSlotsForThisJob));
+ i = qsi.reduceTSI.numSlotsOccupiedByUser.get(j.getProfile().getUser());
+ qsi.reduceTSI.numSlotsOccupiedByUser.put(j.getProfile().getUser(),
+ Integer.valueOf(i.intValue() + numReduceSlotsForThisJob));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("updateQSI: job %s: run(m)=%d, "
+ + "occupied(m)=%d, run(r)=%d, occupied(r)=%d, finished(m)=%d,"
+ + " finished(r)=%d, failed(m)=%d, failed(r)=%d, "
+ + "spec(m)=%d, spec(r)=%d, total(m)=%d, total(r)=%d", j
+ .getJobID().toString(), Integer
+ .valueOf(numMapsRunningForThisJob), Integer
+ .valueOf(numMapSlotsForThisJob), Integer
+ .valueOf(numReducesRunningForThisJob), Integer
+ .valueOf(numReduceSlotsForThisJob), Integer.valueOf(j
+ .finishedMaps()), Integer.valueOf(j.finishedReduces()), Integer
+ .valueOf(j.failedMapTasks),
+ Integer.valueOf(j.failedReduceTasks), Integer
+ .valueOf(j.speculativeMapTasks), Integer
+ .valueOf(j.speculativeReduceTasks), Integer
+ .valueOf(j.numMapTasks), Integer.valueOf(j.numReduceTasks)));
+ }
+
/*
* it's fine walking down the entire list of running jobs - there
* probably will not be many, plus, we may need to go through the
- * list to compute numRunningTasksByUser. If this is expensive, we
+ * list to compute numSlotsOccupiedByUser. If this is expensive, we
* can keep a list of running jobs per user. Then we only need to
* consider the first few jobs per user.
*/
@@ -866,10 +1021,8 @@
* The grand plan for assigning a task.
* First, decide whether a Map or Reduce task should be given to a TT
* (if the TT can accept either).
- * Next, pick a queue. We only look at queues that need a slot. Among
- * these, we first look at queues whose ac is less than gc (queues that
- * gave up capacity in the past). Next, we look at any other queue that
- * needs a slot.
+ * Next, pick a queue. We only look at queues that need a slot. Among these,
+ * we first look at queues whose (# of running tasks)/capacity is the least.
* Next, pick a job in a queue. we pick the job at the front of the queue
* unless its user is over the user limit.
* Finally, given a job, pick a task from the job.
@@ -920,14 +1073,12 @@
// found a task; return
return Collections.singletonList(tlr.getTask());
}
- else if (TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT ==
- tlr.getLookUpStatus()) {
- // return no task
- return null;
- }
// if we didn't get any, look at map tasks, if TT has space
- else if ((TaskLookupResult.LookUpStatus.NO_TASK_FOUND ==
- tlr.getLookUpStatus()) && (maxMapTasks > currentMapTasks)) {
+ else if ((TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT
+ == tlr.getLookUpStatus() ||
+ TaskLookupResult.LookUpStatus.NO_TASK_FOUND
+ == tlr.getLookUpStatus())
+ && (maxMapTasks > currentMapTasks)) {
mapScheduler.updateCollectionOfQSIs();
tlr = mapScheduler.assignTasks(taskTracker);
if (TaskLookupResult.LookUpStatus.TASK_FOUND ==
@@ -945,13 +1096,12 @@
// found a task; return
return Collections.singletonList(tlr.getTask());
}
- else if (TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT ==
- tlr.getLookUpStatus()) {
- return null;
- }
// if we didn't get any, look at reduce tasks, if TT has space
- else if ((TaskLookupResult.LookUpStatus.NO_TASK_FOUND ==
- tlr.getLookUpStatus()) && (maxReduceTasks > currentReduceTasks)) {
+ else if ((TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT
+ == tlr.getLookUpStatus()
+ || TaskLookupResult.LookUpStatus.NO_TASK_FOUND
+ == tlr.getLookUpStatus())
+ && (maxReduceTasks > currentReduceTasks)) {
reduceScheduler.updateCollectionOfQSIs();
tlr = reduceScheduler.assignTasks(taskTracker);
if (TaskLookupResult.LookUpStatus.TASK_FOUND ==
@@ -964,38 +1114,6 @@
return null;
}
- /**
- * Kill the job if it has invalid requirements and return why it is killed
- *
- * @param job
- * @return string mentioning why the job is killed. Null if the job has valid
- * requirements.
- */
- private String killJobIfInvalidRequirements(JobInProgress job) {
- if (!memoryMatcher.isSchedulingBasedOnVmemEnabled()) {
- return null;
- }
- if ((job.getMaxVirtualMemoryForTask() > limitMaxVmemForTasks)
- || (memoryMatcher.isSchedulingBasedOnPmemEnabled() && (job
- .getMaxPhysicalMemoryForTask() > limitMaxPmemForTasks))) {
- String msg =
- job.getJobID() + " (" + job.getMaxVirtualMemoryForTask() + "vmem, "
- + job.getMaxPhysicalMemoryForTask()
- + "pmem) exceeds the cluster's max-memory-limits ("
- + limitMaxVmemForTasks + "vmem, " + limitMaxPmemForTasks
- + "pmem). Cannot run in this cluster, so killing it.";
- LOG.warn(msg);
- try {
- taskTrackerManager.killJob(job.getJobID());
- return msg;
- } catch (IOException ioe) {
- LOG.warn("Failed to kill the job " + job.getJobID() + ". Reason : "
- + StringUtils.stringifyException(ioe));
- }
- }
- return null;
- }
-
// called when a job is added
synchronized void jobAdded(JobInProgress job) throws IOException {
QueueSchedulingInfo qsi =
@@ -1006,8 +1124,10 @@
if (null == i) {
i = 1;
// set the count for running tasks to 0
- qsi.mapTSI.numRunningTasksByUser.put(job.getProfile().getUser(), 0);
- qsi.reduceTSI.numRunningTasksByUser.put(job.getProfile().getUser(), 0);
+ qsi.mapTSI.numSlotsOccupiedByUser.put(job.getProfile().getUser(),
+ Integer.valueOf(0));
+ qsi.reduceTSI.numSlotsOccupiedByUser.put(job.getProfile().getUser(),
+ Integer.valueOf(0));
}
else {
i++;
@@ -1015,13 +1135,6 @@
qsi.numJobsByUser.put(job.getProfile().getUser(), i);
LOG.debug("Job " + job.getJobID().toString() + " is added under user "
+ job.getProfile().getUser() + ", user now has " + i + " jobs");
-
- // Kill the job if it cannot run in the cluster because of invalid
- // resource requirements.
- String statusMsg = killJobIfInvalidRequirements(job);
- if (statusMsg != null) {
- throw new IOException(statusMsg);
- }
}
// called when a job completes
@@ -1036,8 +1149,8 @@
if (0 == i.intValue()) {
qsi.numJobsByUser.remove(job.getProfile().getUser());
// remove job footprint from our TSIs
- qsi.mapTSI.numRunningTasksByUser.remove(job.getProfile().getUser());
- qsi.reduceTSI.numRunningTasksByUser.remove(job.getProfile().getUser());
+ qsi.mapTSI.numSlotsOccupiedByUser.remove(job.getProfile().getUser());
+ qsi.reduceTSI.numSlotsOccupiedByUser.remove(job.getProfile().getUser());
LOG.debug("No more jobs for user, number of users = " +
qsi.numJobsByUser.size());
}
else {
Modified: hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java?rev=786377&r1=786376&r2=786377&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java Fri Jun 19 05:42:53 2009
@@ -30,111 +30,32 @@
this.scheduler = capacityTaskScheduler;
}
- boolean isSchedulingBasedOnVmemEnabled() {
- LOG.debug("defaultMaxVmPerTask : " + scheduler.defaultMaxVmPerTask
- + " limitMaxVmemForTasks : " + scheduler.limitMaxVmemForTasks);
- if (scheduler.defaultMaxVmPerTask == JobConf.DISABLED_MEMORY_LIMIT
- || scheduler.limitMaxVmemForTasks == JobConf.DISABLED_MEMORY_LIMIT) {
+ boolean isSchedulingBasedOnMemEnabled() {
+ if (scheduler.getLimitMaxMemForMapSlot()
+ == JobConf.DISABLED_MEMORY_LIMIT
+ || scheduler.getLimitMaxMemForReduceSlot()
+ == JobConf.DISABLED_MEMORY_LIMIT
+ || scheduler.getMemSizeForMapSlot()
+ == JobConf.DISABLED_MEMORY_LIMIT
+ || scheduler.getMemSizeForReduceSlot()
+ == JobConf.DISABLED_MEMORY_LIMIT) {
return false;
}
return true;
}
- boolean isSchedulingBasedOnPmemEnabled() {
- LOG.debug("defaultPercentOfPmemInVmem : "
- + scheduler.defaultPercentOfPmemInVmem + " limitMaxPmemForTasks : "
- + scheduler.limitMaxPmemForTasks);
- if (scheduler.defaultPercentOfPmemInVmem == JobConf.DISABLED_MEMORY_LIMIT
- || scheduler.limitMaxPmemForTasks == JobConf.DISABLED_MEMORY_LIMIT) {
- return false;
- }
- return true;
- }
-
- /**
- * Obtain the virtual memory allocated for a job's tasks.
- *
- * If the job has a configured value for the max-virtual memory, that will be
- * returned. Else, the cluster-wide default max-virtual memory for tasks is
- * returned.
- *
- * This method can only be called after
- * {@link CapacityTaskScheduler#initializeMemoryRelatedConf()} is invoked.
- *
- * @param jConf JobConf of the job
- * @return the virtual memory allocated for the job's tasks.
- */
- private long getVirtualMemoryForTask(JobConf jConf) {
- long vMemForTask = jConf.getMaxVirtualMemoryForTask();
- if (vMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
- vMemForTask =
- new JobConf().getLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
- scheduler.defaultMaxVmPerTask);
- }
- return vMemForTask;
- }
-
- /**
- * Obtain the physical memory allocated for a job's tasks.
- *
- * If the job has a configured value for the max physical memory, that
- * will be returned. Else, the cluster-wide default physical memory for
- * tasks is returned.
- *
- * This method can only be called after
- * {@link CapacityTaskScheduler#initializeMemoryRelatedConf()} is invoked.
- *
- * @param jConf JobConf of the job
- * @return the physical memory allocated for the job's tasks
- */
- private long getPhysicalMemoryForTask(JobConf jConf) {
- long pMemForTask = jConf.getMaxPhysicalMemoryForTask();
- if (pMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
- pMemForTask =
- Math.round(getVirtualMemoryForTask(jConf)
- * scheduler.defaultPercentOfPmemInVmem);
- }
- return pMemForTask;
- }
-
- static class Memory {
- long vmem;
- long pmem;
-
- Memory(long vm, long pm) {
- this.vmem = vm;
- this.pmem = pm;
- }
- }
-
/**
* Find the memory that is already used by all the running tasks
* residing on the given TaskTracker.
*
* @param taskTracker
+ * @param taskType
* @return amount of memory that is used by the residing tasks,
* null if memory cannot be computed for some reason.
*/
- private synchronized Memory getMemReservedForTasks(
- TaskTrackerStatus taskTracker) {
- boolean disabledVmem = false;
- boolean disabledPmem = false;
-
- if (scheduler.defaultMaxVmPerTask == JobConf.DISABLED_MEMORY_LIMIT) {
- disabledVmem = true;
- }
-
- if (scheduler.defaultPercentOfPmemInVmem == JobConf.DISABLED_MEMORY_LIMIT) {
- disabledPmem = true;
- }
-
- if (disabledVmem && disabledPmem) {
- return new Memory(JobConf.DISABLED_MEMORY_LIMIT,
- JobConf.DISABLED_MEMORY_LIMIT);
- }
-
+ synchronized Long getMemReservedForTasks(
+ TaskTrackerStatus taskTracker, CapacityTaskScheduler.TYPE taskType) {
long vmem = 0;
- long pmem = 0;
for (TaskStatus task : taskTracker.getTaskReports()) {
// the following task states are one in which the slot is
@@ -142,12 +63,12 @@
// accounted in used memory.
if ((task.getRunState() == TaskStatus.State.RUNNING)
|| (task.getRunState() == TaskStatus.State.COMMIT_PENDING)) {
- JobInProgress job = scheduler.taskTrackerManager.getJob(
- task.getTaskID().getJobID());
+ JobInProgress job =
+ scheduler.taskTrackerManager.getJob(task.getTaskID().getJobID());
if (job == null) {
// This scenario can happen if a job was completed/killed
- // and retired from JT's memory. In this state, we can ignore
- // the running task status and compute memory for the rest of
+ // and retired from JT's memory. In this state, we can ignore
+ // the running task status and compute memory for the rest of
// the tasks. However, any scheduling done with this computation
// could result in over-subscribing of memory for tasks on this
// TT (as the unaccounted for task is still running).
@@ -155,123 +76,99 @@
// One of the ways of doing that is to return null from here
// and check for null in the calling method.
LOG.info("Task tracker: " + taskTracker.getHost() + " is reporting "
- + "a running / commit pending task: " + task.getTaskID()
- + " but no corresponding job was found. "
- + "Maybe job was retired. Not computing "
- + "memory values for this TT.");
+ + "a running / commit pending task: " + task.getTaskID()
+ + " but no corresponding job was found. "
+ + "Maybe job was retired. Not computing "
+ + "memory values for this TT.");
return null;
}
-
- JobConf jConf =
- scheduler.taskTrackerManager.getJob(task.getTaskID().getJobID())
- .getJobConf();
- if (!disabledVmem) {
- vmem += getVirtualMemoryForTask(jConf);
- }
- if (!disabledPmem) {
- pmem += getPhysicalMemoryForTask(jConf);
+
+ JobConf jConf = job.getJobConf();
+
+ // Get the memory "allotted" for this task by rounding off the job's
+ // tasks' memory limits to the nearest multiple of the slot-memory-size
+ // set on JT. This essentially translates to tasks of a high memory job
+ // using multiple slots.
+ long myVmem = 0;
+ if (task.getIsMap() && taskType.equals(CapacityTaskScheduler.TYPE.MAP)) {
+ myVmem = jConf.getMemoryForMapTask();
+ myVmem =
+ (long) (scheduler.getMemSizeForMapSlot() * Math
+ .ceil((float) myVmem
+ / (float) scheduler.getMemSizeForMapSlot()));
+ } else if (!task.getIsMap()
+ && taskType.equals(CapacityTaskScheduler.TYPE.REDUCE)) {
+ myVmem = jConf.getMemoryForReduceTask();
+ myVmem =
+ (long) (scheduler.getMemSizeForReduceSlot() * Math
+ .ceil((float) myVmem
+ / (float) scheduler.getMemSizeForReduceSlot()));
}
+ vmem += myVmem;
}
}
- return new Memory(vmem, pmem);
+ return Long.valueOf(vmem);
}
/**
- * Check if a TT has enough pmem and vmem to run this job.
+ * Check if a TT has enough memory to run of task specified from this job.
* @param job
+ * @param taskType
* @param taskTracker
* @return true if this TT has enough memory for this job. False otherwise.
*/
boolean matchesMemoryRequirements(JobInProgress job,
- TaskTrackerStatus taskTracker) {
+ CapacityTaskScheduler.TYPE taskType, TaskTrackerStatus taskTracker) {
- // ////////////// vmem based scheduling
- if (!isSchedulingBasedOnVmemEnabled()) {
- LOG.debug("One of the configuration parameters defaultMaxVmPerTask "
- + "and limitMaxVmemPerTasks is not configured. Scheduling based "
- + "on job's memory requirements is disabled, ignoring any value "
- + "set by job.");
- return true;
- }
-
- TaskTrackerStatus.ResourceStatus resourceStatus =
- taskTracker.getResourceStatus();
- long totalVMemOnTT = resourceStatus.getTotalVirtualMemory();
- long reservedVMemOnTT = resourceStatus.getReservedTotalMemory();
-
- if (totalVMemOnTT == JobConf.DISABLED_MEMORY_LIMIT
- || reservedVMemOnTT == JobConf.DISABLED_MEMORY_LIMIT) {
- return true;
- }
+ LOG.debug("Matching memory requirements of " + job.getJobID().toString()
+ + " for scheduling on " + taskTracker.trackerName);
- if (reservedVMemOnTT > totalVMemOnTT) {
+ if (!isSchedulingBasedOnMemEnabled()) {
+ LOG.debug("Scheduling based on job's memory requirements is disabled."
+ + " Ignoring any value set by job.");
return true;
}
- long jobVMemForTask = job.getMaxVirtualMemoryForTask();
- if (jobVMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
- jobVMemForTask = scheduler.defaultMaxVmPerTask;
- }
-
- Memory memReservedForTasks = getMemReservedForTasks(taskTracker);
- if (memReservedForTasks == null) {
+ Long memUsedOnTT = getMemReservedForTasks(taskTracker, taskType);
+ if (memUsedOnTT == null) {
// For some reason, maybe because we could not find the job
// corresponding to a running task (as can happen if the job
// is retired in between), we could not compute the memory state
// on this TT. Treat this as an error, and fail memory
// requirements.
- LOG.info("Could not compute memory for taskTracker: "
- + taskTracker.getHost() + ". Failing memory requirements.");
+ LOG.info("Could not compute memory for taskTracker: "
+ + taskTracker.getHost() + ". Failing memory requirements.");
return false;
}
- long vmemUsedOnTT = memReservedForTasks.vmem;
- long pmemUsedOnTT = memReservedForTasks.pmem;
- long freeVmemUsedOnTT = totalVMemOnTT - vmemUsedOnTT - reservedVMemOnTT;
+ long totalMemUsableOnTT = 0;
- if (jobVMemForTask > freeVmemUsedOnTT) {
+ long memForThisTask = 0;
+ if (taskType.equals(CapacityTaskScheduler.TYPE.MAP)) {
+ memForThisTask = job.getJobConf().getMemoryForMapTask();
+ totalMemUsableOnTT =
+ scheduler.getMemSizeForMapSlot() * taskTracker.getMaxMapTasks();
+ } else if (taskType.equals(CapacityTaskScheduler.TYPE.REDUCE)) {
+ memForThisTask = job.getJobConf().getMemoryForReduceTask();
+ totalMemUsableOnTT =
+ scheduler.getMemSizeForReduceSlot()
+ * taskTracker.getMaxReduceTasks();
+ }
+
+ long freeMemOnTT = totalMemUsableOnTT - memUsedOnTT.longValue();
+ if (memForThisTask > freeMemOnTT) {
+ LOG.debug("memForThisTask (" + memForThisTask + ") > freeMemOnTT ("
+ + freeMemOnTT + "). A " + taskType + " task from "
+ + job.getJobID().toString() + " cannot be scheduled on TT "
+ + taskTracker.trackerName);
return false;
}
- // ////////////// pmem based scheduling
-
- long totalPmemOnTT = resourceStatus.getTotalPhysicalMemory();
- long reservedPmemOnTT = resourceStatus.getReservedPhysicalMemory();
- long jobPMemForTask = job.getMaxPhysicalMemoryForTask();
- long freePmemUsedOnTT = 0;
-
- if (isSchedulingBasedOnPmemEnabled()) {
- if (totalPmemOnTT == JobConf.DISABLED_MEMORY_LIMIT
- || reservedPmemOnTT == JobConf.DISABLED_MEMORY_LIMIT) {
- return true;
- }
-
- if (reservedPmemOnTT > totalPmemOnTT) {
- return true;
- }
-
- if (jobPMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
- jobPMemForTask =
- Math.round(jobVMemForTask * scheduler.defaultPercentOfPmemInVmem);
- }
-
- freePmemUsedOnTT = totalPmemOnTT - pmemUsedOnTT - reservedPmemOnTT;
-
- if (jobPMemForTask > freePmemUsedOnTT) {
- return false;
- }
- } else {
- LOG.debug("One of the configuration parameters "
- + "defaultPercentOfPmemInVmem and limitMaxPmemPerTasks is not "
- + "configured. Scheduling based on job's physical memory "
- + "requirements is disabled, ignoring any value set by job.");
- }
-
- LOG.debug("freeVMemOnTT = " + freeVmemUsedOnTT + " totalVMemOnTT = "
- + totalVMemOnTT + " freePMemOnTT = " + freePmemUsedOnTT
- + " totalPMemOnTT = " + totalPmemOnTT + " jobVMemForTask = "
- + jobVMemForTask + " jobPMemForTask = " + jobPMemForTask);
+ LOG.debug("memForThisTask = " + memForThisTask + ". freeMemOnTT = "
+ + freeMemOnTT + ". A " + taskType.toString() + " task from "
+ + job.getJobID().toString() + " matches memory requirements on TT "
+ + taskTracker.trackerName);
return true;
}
}