Author: omalley
Date: Fri Mar 4 03:30:02 2011
New Revision: 1077009
URL: http://svn.apache.org/viewvc?rev=1077009&view=rev
Log:
commit 1f2a5df1b2a40a3aebca212904165f3fd6d9f85c
Author: Arun C Murthy <[email protected]>
Date: Mon Sep 28 14:28:46 2009 -0700
MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band
heartbeat on task-completion for better job-latency. Contributed by Arun C.
Murthy
Configuration changes:
add mapreduce.tasktracker.outofband.heartbeat
from:
https://issues.apache.org/jira/secure/attachment/12420718/MAPREDUCE-270_yhadoop20.patch
+++ b/YAHOO-CHANGES.txt
+60. MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band
+ heartbeat on task-completion for better job-latency. Contributed by
+ Arun C. Murthy
+ Configuration changes:
+ add mapreduce.tasktracker.outofband.heartbeat
+
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMapredHeartbeat.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml?rev=1077009&r1=1077008&r2=1077009&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml Fri Mar 4 03:30:02 2011
@@ -223,6 +223,14 @@
</property>
<property>
+ <name>mapreduce.tasktracker.outofband.heartbeat</name>
+ <value>false</value>
+ <description>Expert: Set this to true to let the tasktracker send an
+ out-of-band heartbeat on task-completion for better latency.
+ </description>
+</property>
+
+<property>
<name>mapred.jobtracker.restart.recover</name>
<value>false</value>
<description>"true" to enable (job) recovery upon restart,
@@ -611,7 +619,7 @@
<name>mapred.heartbeats.in.second</name>
<value>100</value>
<description>Expert: Approximate number of heart-beats that could arrive
- JobTracker in a second. Assuming each RPC can be processed
+ at JobTracker in a second. Assuming each RPC can be processed
in 10msec, the default value is made 100 RPCs in a second.
</description>
</property>
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077009&r1=1077008&r2=1077009&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar 4 03:30:02 2011
@@ -133,9 +133,21 @@ public class JobTracker implements MRCon
// The maximum number of blacklists for a tracker after which the
// tracker could be blacklisted across all jobs
private int MAX_BLACKLISTS_PER_TRACKER = 4;
+
// Approximate number of heartbeats that could arrive JobTracker
// in a second
- private int NUM_HEARTBEATS_IN_SECOND = 100;
+ static final String JT_HEARTBEATS_IN_SECOND = "mapred.heartbeats.in.second";
+ private int NUM_HEARTBEATS_IN_SECOND;
+ private final int DEFAULT_NUM_HEARTBEATS_IN_SECOND = 100;
+ private final int MIN_NUM_HEARTBEATS_IN_SECOND = 1;
+
+ // Scaling factor for heartbeats, used for testing only
+ static final String JT_HEARTBEATS_SCALING_FACTOR =
+ "mapreduce.jobtracker.heartbeats.scaling.factor";
+ private float HEARTBEATS_SCALING_FACTOR;
+ private final float MIN_HEARTBEATS_SCALING_FACTOR = 0.01f;
+ private final float DEFAULT_HEARTBEATS_SCALING_FACTOR = 1.0f;
+
public static enum State { INITIALIZING, RUNNING }
State state = State.INITIALIZING;
private static final int FS_ACCESS_RETRY_PERIOD = 10000;
@@ -1908,8 +1920,19 @@ public class JobTracker implements MRCon
MAX_COMPLETE_USER_JOBS_IN_MEMORY =
conf.getInt("mapred.jobtracker.completeuserjobs.maximum", 100);
MAX_BLACKLISTS_PER_TRACKER =
conf.getInt("mapred.max.tracker.blacklists", 4);
+
NUM_HEARTBEATS_IN_SECOND =
- conf.getInt("mapred.heartbeats.in.second", 100);
+ conf.getInt(JT_HEARTBEATS_IN_SECOND, DEFAULT_NUM_HEARTBEATS_IN_SECOND);
+ if (NUM_HEARTBEATS_IN_SECOND < MIN_NUM_HEARTBEATS_IN_SECOND) {
+ NUM_HEARTBEATS_IN_SECOND = DEFAULT_NUM_HEARTBEATS_IN_SECOND;
+ }
+
+ HEARTBEATS_SCALING_FACTOR =
+ conf.getFloat(JT_HEARTBEATS_SCALING_FACTOR,
+ DEFAULT_HEARTBEATS_SCALING_FACTOR);
+ if (HEARTBEATS_SCALING_FACTOR < MIN_HEARTBEATS_SCALING_FACTOR) {
+ HEARTBEATS_SCALING_FACTOR = DEFAULT_HEARTBEATS_SCALING_FACTOR;
+ }
//This configuration is there solely for tuning purposes and
//once this feature has been tested in real clusters and an appropriate
@@ -2979,15 +3002,16 @@ public class JobTracker implements MRCon
/**
* Calculates next heartbeat interval using cluster size.
- * Heartbeat interval is incremented 1second for every 50 nodes.
+ * Heartbeat interval is incremented by 1 second for every 100 nodes by default.
* @return next heartbeat interval.
*/
public int getNextHeartbeatInterval() {
// get the no of task trackers
int clusterSize = getClusterStatus().getTaskTrackers();
int heartbeatInterval = Math.max(
- (int)(1000 * Math.ceil((double)clusterSize /
- NUM_HEARTBEATS_IN_SECOND)),
+ (int)(1000 * HEARTBEATS_SCALING_FACTOR *
+ Math.ceil((double)clusterSize /
+ NUM_HEARTBEATS_IN_SECOND)),
HEARTBEAT_INTERVAL_MIN) ;
return heartbeatInterval;
}
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=1077009&r1=1077008&r2=1077009&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Mar 4 03:30:02 2011
@@ -538,7 +538,12 @@ abstract class TaskRunner extends Thread
}catch(IOException ie){
LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
}
- tip.reportTaskFinished();
+
+ // It is safe to call TaskTracker.TaskInProgress.reportTaskFinished with
+ // *false* since the task has either
+ // a) SUCCEEDED - which means commit has been done
+ // b) FAILED - which means we do not need to commit
+ tip.reportTaskFinished(false);
}
}
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077009&r1=1077008&r2=1077009&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar 4 03:30:02 2011
@@ -205,6 +205,16 @@ public class TaskTracker
private int maxMapSlots;
private int maxReduceSlots;
private int failures;
+
+ // Performance-related config knob to send an out-of-band heartbeat
+ // on task completion
+ static final String TT_OUTOFBAND_HEARBEAT =
+ "mapreduce.tasktracker.outofband.heartbeat";
+ private volatile boolean oobHeartbeatOnTaskCompletion;
+
+ // Track number of completed tasks to send an out-of-band heartbeat
+ private IntWritable finishedCount = new IntWritable(0);
+
private MapEventsFetcherThread mapEventsFetcher;
int workerThreads;
private CleanupQueue directoryCleanupThread;
@@ -565,6 +575,9 @@ public class TaskTracker
if (shouldStartHealthMonitor(this.fConf)) {
startHealthMonitor(this.fConf);
}
+
+ oobHeartbeatOnTaskCompletion =
+ fConf.getBoolean(TT_OUTOFBAND_HEARBEAT, false);
}
public static Class<? extends TaskTrackerInstrumentation>
getInstrumentationClass(
@@ -1041,8 +1054,14 @@ public class TaskTracker
long waitTime = heartbeatInterval - (now - lastHeartbeat);
if (waitTime > 0) {
- // sleeps for the wait time
- Thread.sleep(waitTime);
+ // sleeps for the wait time or
+ // until there are empty slots to schedule tasks
+ synchronized (finishedCount) {
+ if (finishedCount.get() == 0) {
+ finishedCount.wait(waitTime);
+ }
+ finishedCount.set(0);
+ }
}
// If the TaskTracker is just starting up:
@@ -1758,6 +1777,19 @@ public class TaskTracker
}
}
+ /**
+ * Notify the tasktracker to send an out-of-band heartbeat.
+ */
+ private void notifyTTAboutTaskCompletion() {
+ if (oobHeartbeatOnTaskCompletion) {
+ synchronized (finishedCount) {
+ int value = finishedCount.get();
+ finishedCount.set(value+1);
+ finishedCount.notify();
+ }
+ }
+ }
+
/**
* The server retry loop.
* This while-loop attempts to connect to the JobTracker. It only
@@ -2100,9 +2132,21 @@ public class TaskTracker
return wasKilled;
}
- void reportTaskFinished() {
- taskFinished();
- releaseSlot();
+ /**
+ * A task is reporting in as 'done'.
+ *
+ * We need to notify the tasktracker to send an out-of-band heartbeat.
+ * If the task isn't <code>commitPending</code>, we need to finalize the task
+ * and release the slot it occupied.
+ *
+ * @param commitPending is the task-commit pending?
+ */
+ void reportTaskFinished(boolean commitPending) {
+ if (!commitPending) {
+ taskFinished();
+ releaseSlot();
+ }
+ notifyTTAboutTaskCompletion();
}
/* State changes:
@@ -2403,6 +2447,7 @@ public class TaskTracker
}
removeFromMemoryManager(task.getTaskID());
releaseSlot();
+ notifyTTAboutTaskCompletion();
}
private synchronized void releaseSlot() {
@@ -2714,9 +2759,7 @@ public class TaskTracker
tip = tasks.get(taskid);
}
if (tip != null) {
- if (!commitPending) {
- tip.reportTaskFinished();
- }
+ tip.reportTaskFinished(commitPending);
} else {
LOG.warn("Unknown child task finished: "+taskid+". Ignored.");
}
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMapredHeartbeat.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMapredHeartbeat.java?rev=1077009&r1=1077008&r2=1077009&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMapredHeartbeat.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMapredHeartbeat.java Fri Mar 4 03:30:02 2011
@@ -21,6 +21,9 @@ import java.io.IOException;
import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobConf;
public class TestMapredHeartbeat extends TestCase {
@@ -42,7 +45,7 @@ public class TestMapredHeartbeat extends
// test configured heartbeat interval
taskTrackers = 5;
- conf.setInt("mapred.heartbeats.in.second", 1);
+ conf.setInt(JobTracker.JT_HEARTBEATS_IN_SECOND, 1);
mr = new MiniMRCluster(taskTrackers, "file:///", 3,
null, null, conf);
jc = new JobClient(mr.createJobConf());
@@ -55,7 +58,7 @@ public class TestMapredHeartbeat extends
// test configured heartbeat interval is capped with min value
taskTrackers = 5;
- conf.setInt("mapred.heartbeats.in.second", 10);
+ conf.setInt(JobTracker.JT_HEARTBEATS_IN_SECOND, 10);
mr = new MiniMRCluster(taskTrackers, "file:///", 3,
null, null, conf);
jc = new JobClient(mr.createJobConf());
@@ -68,6 +71,37 @@ public class TestMapredHeartbeat extends
if (mr != null) { mr.shutdown(); }
}
}
+
+ public void testOutOfBandHeartbeats() throws Exception {
+ MiniDFSCluster dfs = null;
+ MiniMRCluster mr = null;
+ try {
+ Configuration conf = new Configuration();
+ dfs = new MiniDFSCluster(conf, 4, true, null);
+
+ int taskTrackers = 1;
+ JobConf jobConf = new JobConf();
+ jobConf.setFloat(JobTracker.JT_HEARTBEATS_SCALING_FACTOR, 30.0f);
+ jobConf.setBoolean(TaskTracker.TT_OUTOFBAND_HEARBEAT, true);
+ mr = new MiniMRCluster(taskTrackers,
+ dfs.getFileSystem().getUri().toString(), 3,
+ null, null, jobConf);
+ long start = System.currentTimeMillis();
+ TestMiniMRDFSSort.runRandomWriter(mr.createJobConf(), new Path("rw"));
+ long end = System.currentTimeMillis();
+
+ final int expectedRuntimeSecs = 120;
+ final int runTimeSecs = (int)((end-start) / 1000);
+ System.err.println("Runtime is " + runTimeSecs);
+ assertEquals("Actual runtime " + runTimeSecs + "s not less than expected " +
+ "runtime of " + expectedRuntimeSecs + "s!",
+ true, (runTimeSecs <= 120));
+ } finally {
+ if (mr != null) { mr.shutdown(); }
+ if (dfs != null) { dfs.shutdown(); }
+ }
+ }
+
}
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java?rev=1077009&r1=1077008&r2=1077009&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java Fri Mar 4 03:30:02 2011
@@ -71,7 +71,7 @@ public class TestMiniMRDFSSort extends T
return setup;
}
- private static void runRandomWriter(JobConf job, Path sortInput)
+ public static void runRandomWriter(JobConf job, Path sortInput)
throws Exception {
// Scale down the default settings for RandomWriter for the test-case
// Generates NUM_HADOOP_SLAVES * RW_MAPS_PER_HOST * RW_BYTES_PER_MAP