Modified: hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml?rev=689064&r1=689063&r2=689064&view=diff ============================================================================== --- hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml (original) +++ hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml Tue Aug 26 06:07:43 2008 @@ -1082,7 +1082,9 @@ </p> <p>Users/admins can also specify the maximum virtual memory - of the launched child-task using <code>mapred.child.ulimit</code>. + of the launched child-task, and any sub-process it launches + recursively, using <code>mapred.child.ulimit</code>. Note that + the value set here is a per process limit. The value for <code>mapred.child.ulimit</code> should be specified in kilo bytes (KB). And also the value must be greater than or equal to the -Xmx passed to JavaVM, else the VM might not start. @@ -1094,6 +1096,27 @@ <a href="cluster_setup.html#Configuring+the+Environment+of+the+Hadoop+Daemons"> cluster_setup.html </a></p> + <p>There are two additional parameters that influence virtual memory + limits for tasks run on a tasktracker. The parameter + <code>mapred.tasktracker.tasks.maxmemory</code> is set by admins + to limit the total memory all tasks that it runs can use together. + Setting this enables the parameter <code>mapred.task.maxmemory</code> + that can be used to specify the maximum virtual memory the entire + process tree starting from the launched child-task requires. + This is a cumulative limit of all processes in the process tree. + By specifying this value, users can be assured that the system will + run their tasks only on tasktrackers that have at least this amount + of free memory available. If at any time during task execution, this + limit is exceeded, the task would be killed by the system. 
By default, + any task would get a share of + <code>mapred.tasktracker.tasks.maxmemory</code>, divided + equally among the number of slots. The user can thus verify if the + tasks need more memory than this, and specify it in + <code>mapred.task.maxmemory</code>. Specifically, this value must be + greater than any value specified for a maximum heap-size + of the child jvm via <code>mapred.child.java.opts</code>, or a ulimit + value in <code>mapred.child.ulimit</code>. </p> + <p>The task tracker has local directory, <code> ${mapred.local.dir}/taskTracker/</code> to create localized cache and localized job. It can define multiple local directories
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java?rev=689064&r1=689063&r2=689064&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java Tue Aug 26 06:07:43 2008 @@ -105,6 +105,12 @@ private static final Log LOG = LogFactory.getLog(JobConf.class); /** + * A value which if set for memory related configuration options, + * indicates that the options are turned off. + */ + public static final long DISABLED_VIRTUAL_MEMORY_LIMIT = -1L; + + /** * Construct a map/reduce job configuration. */ public JobConf() {} @@ -1285,6 +1291,67 @@ return get("job.local.dir"); } + /** + * The maximum amount of virtual memory all tasks running on a + * tasktracker, including sub-processes they launch, can use. + * + * This value is used to compute the amount of free memory + * available for tasks. Any task scheduled on this tasktracker is + * guaranteed and constrained to use a share of this amount. Any task + * exceeding its share will be killed. + * + * If set to {@link #DISABLED_VIRTUAL_MEMORY_LIMIT}, this functionality + * is disabled. + * + * @return maximum amount of virtual memory to divide among tasks + * @see #getMaxVirtualMemoryForTask() + */ + public long getMaxVirtualMemoryForTasks() { + return getLong("mapred.tasktracker.tasks.maxmemory", + DISABLED_VIRTUAL_MEMORY_LIMIT); + } + + /** + * Set the maximum amount of virtual memory all tasks running on a + * tasktracker, including sub-processes they launch, can use. + * + * @param vmem maximum amount of virtual memory that can be used. 
+ * @see #getMaxVirtualMemoryForTasks() + */ + public void setMaxVirtualMemoryForTasks(long vmem) { + setLong("mapred.tasktracker.tasks.maxmemory", vmem); + } + + /** + * The maximum amount of memory any task of this job will use. + * + * A task of this job will be scheduled on a tasktracker, only if the + * amount of free memory on the tasktracker is greater than + * or equal to this value. + * + * If set to {@link #DISABLED_VIRTUAL_MEMORY_LIMIT}, tasks are assured + * a memory limit on the tasktracker equal to + * mapred.tasktracker.tasks.maxmemory/number of slots. If the value of + * mapred.tasktracker.tasks.maxmemory is set to -1, this value is + * ignored. + * + * @return The maximum amount of memory any task of this job will use. + * @see #getMaxVirtualMemoryForTasks() + */ + public long getMaxVirtualMemoryForTask() { + return getLong("mapred.task.maxmemory", DISABLED_VIRTUAL_MEMORY_LIMIT); + } + + /** + * Set the maximum amount of memory any task of this job can use. + * + * @param vmem Maximum amount of memory any task of this job can use. + * @see #getMaxVirtualMemoryForTask() + */ + public void setMaxVirtualMemoryForTask(long vmem) { + setLong("mapred.task.maxmemory", vmem); + } + /** * Find a jar that contains a class of the same name, if any. 
* It will return a jar file, even if that is not the first thing Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=689064&r1=689063&r2=689064&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Tue Aug 26 06:07:43 2008 @@ -133,7 +133,8 @@ private boolean hasSpeculativeMaps; private boolean hasSpeculativeReduces; private long inputLength = 0; - + private long maxVirtualMemoryForTask; + // Per-job counters public static enum Counter { NUM_FAILED_MAPS, @@ -225,6 +226,7 @@ this.nonRunningReduces = new LinkedList<TaskInProgress>(); this.runningReduces = new LinkedHashSet<TaskInProgress>(); this.resourceEstimator = new ResourceEstimator(this); + this.maxVirtualMemoryForTask = conf.getMaxVirtualMemoryForTask(); } /** @@ -441,6 +443,11 @@ this.priority = priority; } } + + // Accessors for resources. + public long getMaxVirtualMemoryForTask() { + return maxVirtualMemoryForTask; + } long getInputLength() { return inputLength; @@ -1089,9 +1096,10 @@ long outSize = resourceEstimator.getEstimatedMapOutputSize(); - if(tts.getAvailableSpace() < outSize) { + long availSpace = tts.getResourceStatus().getAvailableSpace(); + if(availSpace < outSize) { LOG.warn("No room for map task. Node " + node + - " has " + tts.getAvailableSpace() + + " has " + availSpace + " bytes free; but we expect map to take " + outSize); return -1; //see if a different TIP might work better. @@ -1295,9 +1303,10 @@ } long outSize = resourceEstimator.getEstimatedReduceInputSize(); - if(tts.getAvailableSpace() < outSize) { + long availSpace = tts.getResourceStatus().getAvailableSpace(); + if(availSpace < outSize) { LOG.warn("No room for reduce task. 
Node " + taskTracker + " has " + - tts.getAvailableSpace() + + availSpace + " bytes free; but we expect reduce input to take " + outSize); return -1; //see if a different TIP might work better. Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=689064&r1=689063&r2=689064&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Tue Aug 26 06:07:43 2008 @@ -1653,7 +1653,15 @@ TaskInProgress tip = getTip(tipid); return (tip == null ? null : tip.getCounters()); } - + + /** + * Returns the configured task scheduler for this job tracker. + * @return the configured task scheduler + */ + TaskScheduler getTaskScheduler() { + return taskScheduler; + } + /** * Returns specified TaskInProgress, or null. 
*/ Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=689064&r1=689063&r2=689064&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Tue Aug 26 06:07:43 2008 @@ -177,6 +177,11 @@ private MapEventsFetcherThread mapEventsFetcher; int workerThreads; private CleanupQueue directoryCleanupThread; + private long maxVirtualMemoryForTasks + = JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT; + private long defaultMemoryPerTask = JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT; + + /** * the minimum interval between jobtracker polls */ @@ -432,6 +437,13 @@ "Map-events fetcher for all reduce tasks " + "on " + taskTrackerName); mapEventsFetcher.start(); + maxVirtualMemoryForTasks = fConf.getMaxVirtualMemoryForTasks(); + if (maxVirtualMemoryForTasks != + JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT) { + defaultMemoryPerTask = maxVirtualMemoryForTasks / + (maxCurrentMapTasks + + maxCurrentReduceTasks); + } this.running = true; } @@ -704,7 +716,11 @@ } launchTaskForJob(tip, new JobConf(rjob.jobFile)); } - + + private long getDefaultMemoryPerTask() { + return defaultMemoryPerTask; + } + private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) throws IOException{ synchronized (tip) { try { @@ -1034,7 +1050,12 @@ if (askForNewTask) { checkLocalDirs(fConf.getLocalDirs()); askForNewTask = enoughFreeSpace(localMinSpaceStart); - status.setAvailableSpace( getFreeSpace() ); + status.getResourceStatus().setAvailableSpace( getFreeSpace() ); + long freeVirtualMem = findFreeVirtualMemory(); + LOG.debug("Setting amount of free virtual memory for the new task: " + + freeVirtualMem); + status.getResourceStatus().setFreeVirtualMemory(freeVirtualMem); + 
status.getResourceStatus().setDefaultVirtualMemoryPerTask(getDefaultMemoryPerTask()); } // @@ -1080,6 +1101,68 @@ } /** + * Return the maximum amount of memory available for all tasks on + * this tracker + * @return maximum amount of virtual memory + */ + long getMaxVirtualMemoryForTasks() { + return maxVirtualMemoryForTasks; + } + + /** + * Find the minimum amount of virtual memory that would be + * available for a new task. + * + * The minimum amount of virtual memory is computed by looking + * at the maximum amount of virtual memory that is allowed for + * all tasks in the system, as per mapred.tasktracker.tasks.maxmemory, + * and the total amount of maximum virtual memory that can be + * used by all currently running tasks. + * + * @return amount of free virtual memory that can be assured for + * new tasks + */ + private synchronized long findFreeVirtualMemory() { + + if (maxVirtualMemoryForTasks == JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT) { + // this will disable picking up tasks based on free memory. + return JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT; + } + + long maxMemoryUsed = 0L; + for (TaskInProgress tip: runningTasks.values()) { + // the following task states are one in which the slot is + // still occupied and hence memory of the task should be + // accounted in used memory. + if ((tip.getRunState() == TaskStatus.State.RUNNING) + || (tip.getRunState() == TaskStatus.State.UNASSIGNED) + || (tip.getRunState() == TaskStatus.State.COMMIT_PENDING)) { + maxMemoryUsed += getMemoryForTask(tip); + } + } + + return (maxVirtualMemoryForTasks - maxMemoryUsed); + } + + /** + * Return the memory allocated for a TIP. + * + * If the TIP's job has a configured value for the max memory that is + * returned. Else, the default memory that would be assigned for the + * task is returned. + * @param tip The TaskInProgress + * @return the memory allocated for the TIP. 
+ */ + private long getMemoryForTask(TaskInProgress tip) { + long memForTask = tip.getJobConf().getMaxVirtualMemoryForTask(); + if (memForTask == JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT) { + memForTask = this.getDefaultMemoryPerTask(); + } + return memForTask; + } + + + /** * Check if the jobtracker directed a 'reset' of the tasktracker. * * @param actions the directives of the jobtracker for the tasktracker. Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=689064&r1=689063&r2=689064&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Tue Aug 26 06:07:43 2008 @@ -48,13 +48,98 @@ volatile long lastSeen; private int maxMapTasks; private int maxReduceTasks; - long availableSpace; //space available on this node + + /** + * Class representing a collection of resources on this tasktracker. + */ + static class ResourceStatus implements Writable { + + private long freeVirtualMemory; + private long defaultVirtualMemoryPerTask; + private long availableSpace; + + ResourceStatus() { + freeVirtualMemory = JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT; + defaultVirtualMemoryPerTask = JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT; + availableSpace = Long.MAX_VALUE; + } + + /** + * Set the amount of free virtual memory that is available for running + * a new task + * @param freeVMem amount of free virtual memory + */ + void setFreeVirtualMemory(long freeVmem) { + freeVirtualMemory = freeVmem; + } + + /** + * Get the amount of free virtual memory that will be available for + * running a new task. + * + * If this is [EMAIL PROTECTED] JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT}, it should + * be ignored and not used in computation. 
+ * + * @return amount of free virtual memory. + */ + long getFreeVirtualMemory() { + return freeVirtualMemory; + } + + /** + * Set the default amount of virtual memory per task. + * @param defaultVmem default amount of virtual memory per task. + */ + void setDefaultVirtualMemoryPerTask(long defaultVmem) { + defaultVirtualMemoryPerTask = defaultVmem; + } + + /** + * Get the default amount of virtual memory per task. + * + * This amount will be returned if a task's job does not specify any + * virtual memory itself. If this is + * {@link JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT}, it should be ignored + * and not used in any computation. + * + * @return default amount of virtual memory per task. + */ + long getDefaultVirtualMemoryPerTask() { + return defaultVirtualMemoryPerTask; + } + + void setAvailableSpace(long availSpace) { + availableSpace = availSpace; + } + /** + * Will return LONG_MAX if space hasn't been measured yet. + * @return bytes of available local disk space on this tasktracker. + */ + long getAvailableSpace() { + return availableSpace; + } + + public void write(DataOutput out) throws IOException { + WritableUtils.writeVLong(out, freeVirtualMemory); + WritableUtils.writeVLong(out, defaultVirtualMemoryPerTask); + WritableUtils.writeVLong(out, availableSpace); + } + + public void readFields(DataInput in) throws IOException { + freeVirtualMemory = WritableUtils.readVLong(in);; + defaultVirtualMemoryPerTask = WritableUtils.readVLong(in);; + availableSpace = WritableUtils.readVLong(in);; + } + } + + private ResourceStatus resStatus; + /** */ public TaskTrackerStatus() { taskReports = new ArrayList<TaskStatus>(); - this.availableSpace = Long.MAX_VALUE; //not measured by default. + resStatus = new ResourceStatus(); } /** @@ -71,7 +156,7 @@ this.failures = failures; this.maxMapTasks = maxMapTasks; this.maxReduceTasks = maxReduceTasks; - this.availableSpace = Long.MAX_VALUE; //not measured by default. 
+ this.resStatus = new ResourceStatus(); } /** @@ -171,18 +256,15 @@ } /** - * Will return LONG_MAX if space hasn't been measured yet. - * @return bytes of available local disk space on this tasktracker. + * Return the [EMAIL PROTECTED] ResourceStatus} object configured with this + * status. + * + * @return the resource status */ - public long getAvailableSpace() { - return availableSpace; - } - - public void setAvailableSpace(long a) { - availableSpace = a; + ResourceStatus getResourceStatus() { + return resStatus; } - /////////////////////////////////////////// // Writable /////////////////////////////////////////// @@ -193,8 +275,9 @@ out.writeInt(failures); out.writeInt(maxMapTasks); out.writeInt(maxReduceTasks); + resStatus.write(out); out.writeInt(taskReports.size()); - out.writeLong(availableSpace); + for (TaskStatus taskStatus : taskReports) { TaskStatus.writeTaskStatus(out, taskStatus); } @@ -207,9 +290,10 @@ this.failures = in.readInt(); this.maxMapTasks = in.readInt(); this.maxReduceTasks = in.readInt(); + resStatus.readFields(in); taskReports.clear(); int numTasks = in.readInt(); - this.availableSpace = in.readLong(); + for (int i = 0; i < numTasks; i++) { taskReports.add(TaskStatus.readTaskStatus(in)); } Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=689064&r1=689063&r2=689064&view=diff ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Tue Aug 26 06:07:43 2008 @@ -73,7 +73,11 @@ public int getJobTrackerInfoPort() { return tracker.getInfoPort(); } - + + public JobTracker getJobTracker() { + return tracker; + } + /** * Create the job tracker and run it. 
*/ @@ -116,12 +120,18 @@ volatile boolean isDead = false; int numDir; - TaskTrackerRunner(int trackerId, int numDir, String hostname) + TaskTrackerRunner(int trackerId, int numDir, String hostname, + JobConf cfg) throws IOException { this.trackerId = trackerId; this.numDir = numDir; localDirs = new String[numDir]; - JobConf conf = createJobConf(); + JobConf conf = null; + if (cfg == null) { + conf = createJobConf(); + } else { + conf = createJobConf(cfg); + } if (hostname != null) { conf.set("slave.host.name", hostname); } @@ -216,6 +226,10 @@ taskTrackerList.get(taskTracker)).getLocalDir(); } + public JobTrackerRunner getJobTrackerRunner() { + return jobTracker; + } + /** * Get the number of task trackers in the cluster */ @@ -413,7 +427,7 @@ } TaskTrackerRunner taskTracker; taskTracker = new TaskTrackerRunner(idx, numDir, - hosts == null ? null : hosts[idx]); + hosts == null ? null : hosts[idx], conf); Thread taskTrackerThread = new Thread(taskTracker); taskTrackerList.add(taskTracker); Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestHighRAMJobs.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestHighRAMJobs.java?rev=689064&view=auto ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestHighRAMJobs.java (added) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestHighRAMJobs.java Tue Aug 26 06:07:43 2008 @@ -0,0 +1,261 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.examples.SleepJob; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.util.ToolRunner; + +import junit.framework.TestCase; + +/** + * This test class tests the functionality related to configuring, reporting + * and computing memory related parameters in a Map/Reduce cluster. + * + * Each test sets up a {@link MiniMRCluster} with a locally defined + * {@link org.apache.hadoop.mapred.TaskScheduler}. This scheduler validates + * the memory related configuration is correctly computed and reported from + * the tasktracker in + * {@link org.apache.hadoop.mapred.TaskScheduler.assignTasks()}. 
+ * + */ +public class TestHighRAMJobs extends TestCase { + + private static final Log LOG = LogFactory.getLog(TestHighRAMJobs.class); + + private static final String DEFAULT_SLEEP_JOB_MAP_COUNT = "1"; + private static final String DEFAULT_SLEEP_JOB_REDUCE_COUNT = "1"; + private static final String DEFAULT_MAP_SLEEP_TIME = "1000"; + private static final String DEFAULT_REDUCE_SLEEP_TIME = "1000"; + private static final long DISABLED_VIRTUAL_MEMORY_LIMIT = -1L; + + private MiniDFSCluster miniDFSCluster; + private MiniMRCluster miniMRCluster; + + public static class FakeTaskScheduler extends JobQueueTaskScheduler { + + private boolean hasPassed = true; + private String message; + private boolean isFirstTime = true; + + public FakeTaskScheduler() { + super(); + } + + public boolean hasTestPassed() { + return hasPassed; + } + + public String getFailureMessage() { + return message; + } + + @Override + public List<Task> assignTasks(TaskTrackerStatus status) + throws IOException { + TestHighRAMJobs.LOG.info("status = " + status.getResourceStatus().getFreeVirtualMemory()); + + long initialFreeMemory = getConf().getLong("initialFreeMemory", 0L); + long memoryPerTaskOnTT = getConf().getLong("memoryPerTaskOnTT", 0L); + + if (isFirstTime) { + isFirstTime = false; + if (initialFreeMemory != status.getResourceStatus().getFreeVirtualMemory()) { + hasPassed = false; + message = "Initial memory expected = " + initialFreeMemory + + " reported = " + status.getResourceStatus().getFreeVirtualMemory(); + } + if (memoryPerTaskOnTT != status.getResourceStatus().getDefaultVirtualMemoryPerTask()) { + hasPassed = false; + message = "Memory per task on TT expected = " + memoryPerTaskOnTT + + " reported = " + + status.getResourceStatus().getDefaultVirtualMemoryPerTask(); + } + } else if (initialFreeMemory != DISABLED_VIRTUAL_MEMORY_LIMIT) { + + long memoryPerTask = memoryPerTaskOnTT; // by default + if (getConf().getLong("memoryPerTask", 0L) != + DISABLED_VIRTUAL_MEMORY_LIMIT) { + memoryPerTask 
= getConf().getLong("memoryPerTask", 0L); + } + + long expectedFreeMemory = 0; + int runningTaskCount = status.countMapTasks() + + status.countReduceTasks(); + expectedFreeMemory = initialFreeMemory - + (memoryPerTask * runningTaskCount); + + TestHighRAMJobs.LOG.info("expected free memory = " + + expectedFreeMemory + ", reported = " + + status.getResourceStatus().getFreeVirtualMemory()); + if (expectedFreeMemory != status.getResourceStatus().getFreeVirtualMemory()) { + hasPassed = false; + message = "Expected free memory after " + runningTaskCount + + " tasks are scheduled = " + expectedFreeMemory + + ", reported = " + status.getResourceStatus().getFreeVirtualMemory(); + } + } + return super.assignTasks(status); + } + } + + /* Test that verifies default values are configured and reported + * correctly. + */ + public void testDefaultValuesForHighRAMJobs() throws Exception { + long defaultMemoryLimit = DISABLED_VIRTUAL_MEMORY_LIMIT; + try { + setUpCluster(defaultMemoryLimit, defaultMemoryLimit, + defaultMemoryLimit, null); + runJob(defaultMemoryLimit, DEFAULT_MAP_SLEEP_TIME, + DEFAULT_REDUCE_SLEEP_TIME, DEFAULT_SLEEP_JOB_MAP_COUNT, + DEFAULT_SLEEP_JOB_REDUCE_COUNT); + verifyTestResults(); + } finally { + tearDownCluster(); + } + } + + /* Test that verifies default value for memory per task on TT + * when the number of slots is non-default. + */ + public void testDefaultMemoryPerTask() throws Exception { + long maxVmem = 1024*1024*1024L; + JobConf conf = new JobConf(); + conf.setInt("mapred.tasktracker.map.tasks.maximum", 1); + conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1); + // change number of slots to 2. 
+ long defaultMemPerTaskOnTT = maxVmem / 2; + try { + setUpCluster(maxVmem, defaultMemPerTaskOnTT, + DISABLED_VIRTUAL_MEMORY_LIMIT, conf); + runJob(DISABLED_VIRTUAL_MEMORY_LIMIT, DEFAULT_MAP_SLEEP_TIME, + DEFAULT_REDUCE_SLEEP_TIME, DEFAULT_SLEEP_JOB_MAP_COUNT, + DEFAULT_SLEEP_JOB_REDUCE_COUNT); + verifyTestResults(); + } finally { + tearDownCluster(); + } + } + + /* Test that verifies configured value for free memory is + * reported correctly. The test does NOT configure a value for + * memory per task. Hence, it also verifies that the default value + * per task on the TT is calculated correctly. + */ + public void testConfiguredValueForFreeMemory() throws Exception { + long maxVmem = 1024*1024*1024L; + long defaultMemPerTaskOnTT = maxVmem/4; // 4 = default number of slots. + try { + setUpCluster(maxVmem, defaultMemPerTaskOnTT, + DISABLED_VIRTUAL_MEMORY_LIMIT, null); + runJob(DISABLED_VIRTUAL_MEMORY_LIMIT, "10000", + DEFAULT_REDUCE_SLEEP_TIME, DEFAULT_SLEEP_JOB_MAP_COUNT, + DEFAULT_SLEEP_JOB_REDUCE_COUNT); + verifyTestResults(); + } finally { + tearDownCluster(); + } + } + + public void testHighRAMJob() throws Exception { + long maxVmem = 1024*1024*1024L; + long defaultMemPerTaskOnTT = maxVmem/4; // 4 = default number of slots. + /* Set a HIGH RAM requirement for a job. As 4 is the + * default number of slots, we set up the memory limit + * per task to be more than 25%. + */ + long maxVmemPerTask = maxVmem/3; + try { + setUpCluster(maxVmem, defaultMemPerTaskOnTT, + maxVmemPerTask, null); + /* set up sleep limits higher, so the scheduler will see varying + * number of running tasks at a time. Also modify the number of + * map tasks so we test the iteration over more than one task. 
+ */ + runJob(maxVmemPerTask, "10000", "10000", "2", + DEFAULT_SLEEP_JOB_REDUCE_COUNT); + verifyTestResults(); + } finally { + tearDownCluster(); + } + } + + private void setUpCluster(long initialFreeMemory, long memoryPerTaskOnTT, + long memoryPerTask, JobConf conf) + throws Exception { + if (conf == null) { + conf = new JobConf(); + } + conf.setClass("mapred.jobtracker.taskScheduler", + TestHighRAMJobs.FakeTaskScheduler.class, + TaskScheduler.class); + if (initialFreeMemory != -1L) { + conf.setMaxVirtualMemoryForTasks(initialFreeMemory); + } + conf.setLong("initialFreeMemory", initialFreeMemory); + conf.setLong("memoryPerTaskOnTT", memoryPerTaskOnTT); + conf.setLong("memoryPerTask", memoryPerTask); + miniDFSCluster = new MiniDFSCluster(conf, 1, true, null); + FileSystem fileSys = miniDFSCluster.getFileSystem(); + String namenode = fileSys.getUri().toString(); + miniMRCluster = new MiniMRCluster(1, namenode, 3, + null, null, conf); + } + + private void runJob(long memoryPerTask, String mapSleepTime, + String reduceSleepTime, String mapTaskCount, + String reduceTaskCount) + throws Exception { + Configuration sleepJobConf = new Configuration(); + sleepJobConf.set("mapred.job.tracker", "localhost:" + + miniMRCluster.getJobTrackerPort()); + if (memoryPerTask != -1L) { + sleepJobConf.setLong("mapred.task.maxmemory", memoryPerTask); + } + launchSleepJob(mapSleepTime, reduceSleepTime, + mapTaskCount, reduceTaskCount, sleepJobConf); + } + + private void launchSleepJob(String mapSleepTime, String reduceSleepTime, + String mapTaskCount, String reduceTaskCount, + Configuration conf) throws Exception { + String[] args = { "-m", mapTaskCount, "-r", reduceTaskCount, + "-mt", mapSleepTime, "-rt", reduceSleepTime }; + ToolRunner.run(conf, new SleepJob(), args); + } + + private void verifyTestResults() { + FakeTaskScheduler scheduler = + (FakeTaskScheduler)miniMRCluster.getJobTrackerRunner(). 
+ getJobTracker().getTaskScheduler(); + assertTrue(scheduler.getFailureMessage(), scheduler.hasTestPassed()); + } + + private void tearDownCluster() { + if (miniMRCluster != null) { miniMRCluster.shutdown(); } + if (miniDFSCluster != null) { miniDFSCluster.shutdown(); } + } +}
