Author: ddas
Date: Thu Jun 4 08:34:00 2009
New Revision: 781683
URL: http://svn.apache.org/viewvc?rev=781683&view=rev
Log:
HADOOP-5170. Allows jobs to set max maps/reduces per-node and per-cluster.
Contributed by Matei Zaharia.
Added:
hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRunningTaskLimits.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/mapred-default.xml
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=781683&r1=781682&r2=781683&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Jun 4 08:34:00 2009
@@ -134,6 +134,9 @@
HADOOP-5844. Use mysqldump when connecting to local mysql instance in
Sqoop. (Aaron Kimball via tomwhite)
+ HADOOP-5170. Allows jobs to set max maps/reduces per-node and per-cluster.
+ (Matei Zaharia via ddas)
+
IMPROVEMENTS
HADOOP-4565. Added CombineFileInputFormat to use data locality information
Modified: hadoop/core/trunk/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/mapred-default.xml?rev=781683&r1=781682&r2=781683&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/mapred-default.xml (original)
+++ hadoop/core/trunk/src/mapred/mapred-default.xml Thu Jun 4 08:34:00 2009
@@ -941,4 +941,32 @@
</description>
</property>
+<property>
+ <name>mapred.max.maps.per.node</name>
+ <value>-1</value>
+ <description>Per-node limit on running map tasks for the job. A value
+ of -1 signifies no limit.</description>
+</property>
+
+<property>
+ <name>mapred.max.reduces.per.node</name>
+ <value>-1</value>
+ <description>Per-node limit on running reduce tasks for the job. A value
+ of -1 signifies no limit.</description>
+</property>
+
+<property>
+ <name>mapred.running.map.limit</name>
+ <value>-1</value>
+ <description>Cluster-wide limit on running map tasks for the job. A value
+ of -1 signifies no limit.</description>
+</property>
+
+<property>
+ <name>mapred.running.reduce.limit</name>
+ <value>-1</value>
+ <description>Cluster-wide limit on running reduce tasks for the job. A value
+ of -1 signifies no limit.</description>
+</property>
+
</configuration>
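
For reference, a job can override these cluster defaults in its own configuration. A minimal sketch, assuming an otherwise-configured JobConf (only the property keys are taken from the diff above):

    // Cap this job at 2 maps / 1 reduce per node and 5 maps / 3 reduces
    // cluster-wide; -1 (the mapred-default.xml default) means no limit.
    JobConf job = new JobConf();
    job.setInt("mapred.max.maps.per.node", 2);
    job.setInt("mapred.max.reduces.per.node", 1);
    job.setInt("mapred.running.map.limit", 5);
    job.setInt("mapred.running.reduce.limit", 3);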
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java?rev=781683&r1=781682&r2=781683&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java Thu Jun 4 08:34:00 2009
@@ -1430,6 +1430,78 @@
}
/**
+ * Get the per-node limit on running maps for the job
+ *
+ * @return per-node running map limit
+ */
+ public int getMaxMapsPerNode() {
+ return getInt("mapred.max.maps.per.node", -1);
+ }
+
+ /**
+ * Set the per-node limit on running maps for the job
+ *
+ * @param limit per-node running map limit
+ */
+ public void setMaxMapsPerNode(int limit) {
+ setInt("mapred.max.maps.per.node", limit);
+ }
+
+ /**
+ * Get the per-node limit on running reduces for the job
+ *
+ * @return per-node running reduce limit
+ */
+ public int getMaxReducesPerNode() {
+ return getInt("mapred.max.reduces.per.node", -1);
+ }
+
+ /**
+ * Set the per-node limit on running reduces for the job
+ *
+ * @param limit per-node running reduce limit
+ */
+ public void setMaxReducesPerNode(int limit) {
+ setInt("mapred.max.reduces.per.node", limit);
+ }
+
+ /**
+ * Get the cluster-wide limit on running maps for the job
+ *
+ * @return cluster-wide running map limit
+ */
+ public int getRunningMapLimit() {
+ return getInt("mapred.running.map.limit", -1);
+ }
+
+ /**
+ * Set the cluster-wide limit on running maps for the job
+ *
+ * @param limit cluster-wide running map limit
+ */
+ public void setRunningMapLimit(int limit) {
+ setInt("mapred.running.map.limit", limit);
+ }
+
+ /**
+ * Get the cluster-wide limit on running reduces for the job
+ *
+ * @return cluster-wide running reduce limit
+ */
+ public int getRunningReduceLimit() {
+ return getInt("mapred.running.reduce.limit", -1);
+ }
+
+ /**
+ * Set the cluster-wide limit on running reduces for the job
+ *
+ * @param limit cluster-wide running reduce limit
+ */
+ public void setRunningReduceLimit(int limit) {
+ setInt("mapred.running.reduce.limit", limit);
+ }
+
+ /**
* Normalize the negative values in configuration
*
* @param val
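
The accessors above wrap the same configuration keys one-for-one, so jobs need not hard-code property names. A minimal usage sketch (the surrounding job setup is assumed; only the limit accessors come from this patch):

    JobConf conf = new JobConf();
    conf.setMaxMapsPerNode(2);       // mapred.max.maps.per.node
    conf.setMaxReducesPerNode(1);    // mapred.max.reduces.per.node
    conf.setRunningMapLimit(5);      // mapred.running.map.limit
    conf.setRunningReduceLimit(3);   // mapred.running.reduce.limit
    // Unset limits read back as -1, i.e. unlimited:
    assert conf.getRunningReduceLimit() == 3;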
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=781683&r1=781682&r2=781683&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Thu Jun 4 08:34:00 2009
@@ -87,6 +87,12 @@
int speculativeMapTasks = 0;
int speculativeReduceTasks = 0;
+ // Limits on concurrent running tasks per-node and cluster-wide
+ private int maxMapsPerNode;
+ private int maxReducesPerNode;
+ private int runningMapLimit;
+ private int runningReduceLimit;
+
int mapFailuresPercent = 0;
int reduceFailuresPercent = 0;
int failedMapTIPs = 0;
@@ -257,6 +263,11 @@
this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
+
+ this.maxMapsPerNode = conf.getMaxMapsPerNode();
+ this.maxReducesPerNode = conf.getMaxReducesPerNode();
+ this.runningMapLimit = conf.getRunningMapLimit();
+ this.runningReduceLimit = conf.getRunningReduceLimit();
MetricsContext metricsContext = MetricsUtil.getContext("mapred");
this.jobMetrics = MetricsUtil.createRecord(metricsContext, "job");
@@ -1679,6 +1690,10 @@
//
this.clusterSize = clusterSize;
+ if (!belowRunningTaskLimit(tts, true)) {
+ return -1;
+ }
+
if (!shouldRunOnTaskTracker(taskTracker)) {
return -1;
}
@@ -1883,9 +1898,13 @@
String taskTracker = tts.getTrackerName();
TaskInProgress tip = null;
-
+
// Update the last-known clusterSize
this.clusterSize = clusterSize;
+
+ if (!belowRunningTaskLimit(tts, false)) {
+ return -1;
+ }
if (!shouldRunOnTaskTracker(taskTracker)) {
return -1;
@@ -1939,6 +1958,42 @@
}
return true;
}
+
+ /**
+ * Check whether we are below the running task limits (per node and cluster
+ * wide) for a given type of task on a given task tracker.
+ *
+ * @param tts task tracker to check on
+ * @param map true if looking at map tasks, false for reduce tasks
+ * @return true if we are below both the cluster-wide and the per-node
+ * running task limit for the given type of task
+ */
+ private boolean belowRunningTaskLimit(TaskTrackerStatus tts, boolean map) {
+ int runningTasks = map ? runningMapTasks : runningReduceTasks;
+ int clusterLimit = map ? runningMapLimit : runningReduceLimit;
+ int perNodeLimit = map ? maxMapsPerNode : maxReducesPerNode;
+
+ // Check cluster-wide limit
+ if (clusterLimit != -1 && runningTasks >= clusterLimit) {
+ return false;
+ }
+
+ // Check per-node limit
+ if (perNodeLimit != -1) {
+ int runningTasksOnNode = 0;
+ for (TaskStatus ts: tts.getTaskReports()) {
+ if (ts.getTaskID().getJobID().equals(jobId) && ts.getIsMap() == map &&
+ ts.getRunState().equals(TaskStatus.State.RUNNING)) {
+ runningTasksOnNode++;
+ }
+ }
+ if (runningTasksOnNode >= perNodeLimit) {
+ return false;
+ }
+ }
+
+ return true;
+ }
/**
* A taskid assigned to this JobInProgress has reported in successfully.
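
To make the check concrete: with mapred.running.map.limit=5 and mapred.max.maps.per.node=2, once the job has 5 maps running anywhere in the cluster, belowRunningTaskLimit returns false for maps on every heartbeat, so task assignment bails out with -1 before any other scheduling checks. Below that ceiling, a tracker whose status already reports 2 RUNNING maps belonging to this job is likewise refused, while trackers running fewer remain eligible.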
Added: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRunningTaskLimits.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRunningTaskLimits.java?rev=781683&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRunningTaskLimits.java (added)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRunningTaskLimits.java Thu Jun 4 08:34:00 2009
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
+import org.apache.hadoop.mapred.UtilsForTests.RandomInputFormat;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+/**
+ * Test the running task limits - mapred.max.maps.per.node,
+ * mapred.max.reduces.per.node, mapred.running.map.limit and
+ * mapred.running.reduce.limit.
+ */
+public class TestRunningTaskLimits extends TestCase {
+ private static final Log LOG =
+ LogFactory.getLog(TestRunningTaskLimits.class);
+ private static final Path TEST_DIR =
+ new Path(System.getProperty("test.build.data", "/tmp"),
+ "test-running-task-limits");
+
+ /**
+ * This test creates a cluster with 1 tasktracker with 3 map and 3 reduce
+ * slots. We then submit a job with a limit of 2 maps and 1 reduce per
+ * node, and check that these limits are obeyed in launching tasks.
+ */
+ public void testPerNodeLimits() throws Exception {
+ LOG.info("Running testPerNodeLimits");
+ FileSystem fs = FileSystem.get(new Configuration());
+ fs.delete(TEST_DIR, true); // cleanup test dir
+
+ // Create a cluster with 1 tasktracker with 3 map slots and 3 reduce slots
+ JobConf conf = new JobConf();
+ conf.setInt("mapred.tasktracker.map.tasks.maximum", 3);
+ conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 3);
+ MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
+
+ // Create a job with limits of 2 maps/node and 1 reduce/node
+ JobConf jobConf = createWaitJobConf(mr, "job1", 20, 20);
+ jobConf.setMaxMapsPerNode(2);
+ jobConf.setMaxReducesPerNode(1);
+
+ // Submit the job
+ RunningJob rJob = (new JobClient(jobConf)).submitJob(jobConf);
+
+ // Wait 20 seconds for it to start up
+ UtilsForTests.waitFor(20000);
+
+ // Check the number of running tasks
+ JobTracker jobTracker = mr.getJobTrackerRunner().getJobTracker();
+ JobInProgress jip = jobTracker.getJob(rJob.getID());
+ assertEquals(2, jip.runningMaps());
+ assertEquals(1, jip.runningReduces());
+
+ rJob.killJob();
+ mr.shutdown();
+ }
+
+ /**
+ * This test creates a cluster with 2 tasktrackers with 3 map and 3 reduce
+ * slots each. We then submit a job with a limit of 5 maps and 3 reduces
+ * cluster-wide, and check that these limits are obeyed in launching tasks.
+ */
+ public void testClusterWideLimits() throws Exception {
+ LOG.info("Running testClusterWideLimits");
+ FileSystem fs = FileSystem.get(new Configuration());
+ fs.delete(TEST_DIR, true); // cleanup test dir
+
+ // Create a cluster with 2 tasktrackers with 3 map and reduce slots each
+ JobConf conf = new JobConf();
+ conf.setInt("mapred.tasktracker.map.tasks.maximum", 3);
+ conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 3);
+ MiniMRCluster mr = new MiniMRCluster(2, "file:///", 1, null, null, conf);
+
+ // Create a job with limits of 5 maps and 3 reduces on the entire cluster
+ JobConf jobConf = createWaitJobConf(mr, "job1", 20, 20);
+ jobConf.setRunningMapLimit(5);
+ jobConf.setRunningReduceLimit(3);
+
+ // Submit the job
+ RunningJob rJob = (new JobClient(jobConf)).submitJob(jobConf);
+
+ // Wait 20 seconds for it to start up
+ UtilsForTests.waitFor(20000);
+
+ // Check the number of running tasks
+ JobTracker jobTracker = mr.getJobTrackerRunner().getJobTracker();
+ JobInProgress jip = jobTracker.getJob(rJob.getID());
+ assertEquals(5, jip.runningMaps());
+ assertEquals(3, jip.runningReduces());
+
+ rJob.killJob();
+ mr.shutdown();
+ }
+
+ /**
+ * This test creates a cluster with 2 tasktrackers with 3 map and 3 reduce
+ * slots each. We then submit a job with a limit of 5 maps and 3 reduces
+ * cluster-wide, and 2 maps and 2 reduces per node. We should end up with
+ * 4 maps and 3 reduces running: the maps hit the per-node limit first,
+ * while the reduces hit the cluster-wide limit.
+ */
+ public void testClusterWideAndPerNodeLimits() throws Exception {
+ LOG.info("Running testClusterWideAndPerNodeLimits");
+ FileSystem fs = FileSystem.get(new Configuration());
+ fs.delete(TEST_DIR, true); // cleanup test dir
+
+ // Create a cluster with 2 tasktrackers with 3 map and reduce slots each
+ JobConf conf = new JobConf();
+ conf.setInt("mapred.tasktracker.map.tasks.maximum", 3);
+ conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 3);
+ MiniMRCluster mr = new MiniMRCluster(2, "file:///", 1, null, null, conf);
+
+ // Create a job limited to 5 maps and 3 reduces cluster-wide, 2 maps and 2 reduces per node
+ JobConf jobConf = createWaitJobConf(mr, "job1", 20, 20);
+ jobConf.setRunningMapLimit(5);
+ jobConf.setRunningReduceLimit(3);
+ jobConf.setMaxMapsPerNode(2);
+ jobConf.setMaxReducesPerNode(2);
+
+ // Submit the job
+ RunningJob rJob = (new JobClient(jobConf)).submitJob(jobConf);
+
+ // Wait 20 seconds for it to start up
+ UtilsForTests.waitFor(20000);
+
+ // Check the number of running tasks
+ JobTracker jobTracker = mr.getJobTrackerRunner().getJobTracker();
+ JobInProgress jip = jobTracker.getJob(rJob.getID());
+ assertEquals(4, jip.runningMaps());
+ assertEquals(3, jip.runningReduces());
+
+ rJob.killJob();
+ mr.shutdown();
+ }
+
+ /**
+ * Create a JobConf for a job using the WaitingMapper and IdentityReducer,
+ * which will sleep until a signal file is created. In this test we never
+ * create the signal file so the job just occupies slots for the duration
+ * of the test as they are assigned to it.
+ */
+ JobConf createWaitJobConf(MiniMRCluster mr, String jobName,
+ int numMaps, int numRed)
+ throws IOException {
+ JobConf jobConf = mr.createJobConf();
+ Path inDir = new Path(TEST_DIR, "input");
+ Path outDir = new Path(TEST_DIR, "output-" + jobName);
+ String signalFile = new Path(TEST_DIR, "signal").toString();
+ jobConf.setJobName(jobName);
+ jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
+ jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+ FileInputFormat.setInputPaths(jobConf, inDir);
+ FileOutputFormat.setOutputPath(jobConf, outDir);
+ jobConf.setMapperClass(UtilsForTests.WaitingMapper.class);
+ jobConf.setReducerClass(IdentityReducer.class);
+ jobConf.setOutputKeyClass(BytesWritable.class);
+ jobConf.setOutputValueClass(BytesWritable.class);
+ jobConf.setInputFormat(RandomInputFormat.class);
+ jobConf.setNumMapTasks(numMaps);
+ jobConf.setNumReduceTasks(numRed);
+ jobConf.setJar("build/test/mapred/testjar/testjob.jar");
+ jobConf.set(UtilsForTests.getTaskSignalParameter(true), signalFile);
+ jobConf.set(UtilsForTests.getTaskSignalParameter(false), signalFile);
+ // Disable reduce slow start to begin reduces ASAP
+ jobConf.setFloat("mapred.reduce.slowstart.completed.maps", 0.0f);
+ return jobConf;
+ }
+}
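
Assuming the usual trunk build conventions of this period, the new test should be runnable in isolation with something like:

    ant test-core -Dtestcase=TestRunningTaskLimits

Note that all three cases are timing-based: each waits a fixed 20 seconds for the MiniMRCluster to reach steady state instead of synchronizing on a signal file, so the asserted task counts assume the scheduler fills the available slots within that window.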