Author: acmurthy
Date: Sat Oct 29 09:41:45 2011
New Revision: 1194854
URL: http://svn.apache.org/viewvc?rev=1194854&view=rev
Log:
MAPREDUCE-2355. Add a configuration knob
mapreduce.tasktracker.outofband.heartbeat.damper that limits out-of-band
heartbeats.
Modified:
hadoop/common/branches/branch-0.20-security/src/mapred/mapred-default.xml
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
Modified:
hadoop/common/branches/branch-0.20-security/src/mapred/mapred-default.xml
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/mapred-default.xml?rev=1194854&r1=1194853&r2=1194854&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/mapred-default.xml
(original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/mapred-default.xml
Sat Oct 29 09:41:45 2011
@@ -242,6 +242,24 @@
</property>
<property>
+ <name>mapreduce.tasktracker.outofband.heartbeat.damper</name>
+ <value>1000000</value>
+ <description>When out-of-band heartbeats are enabled, this setting provides
+ damping to avoid overwhelming the JobTracker if too many out-of-band
+ heartbeats would occur. The damping is calculated such that the
+ heartbeat interval is divided by (T*D + 1) where T is the number
+ of completed tasks and D is the damper value.
+
+ Setting this to a high value like the default provides effectively no damping --
+ as soon as any task finishes, a heartbeat will be sent. Setting this
+ parameter to 0 is equivalent to disabling the out-of-band heartbeat feature.
+ A value of 1 would indicate that, after one task has completed, the
+ time to wait before the next heartbeat would be 1/2 the usual time.
+ After two tasks have finished, it would be 1/3 the usual time, etc.
+ </description>
+</property>
+
+<property>
<name>mapred.jobtracker.restart.recover</name>
<value>false</value>
<description>"true" to enable (job) recovery upon restart,
Modified:
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1194854&r1=1194853&r2=1194854&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
(original)
+++
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
Sat Oct 29 09:41:45 2011
@@ -45,6 +45,7 @@ import java.util.Vector;
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import javax.crypto.SecretKey;
@@ -335,9 +336,13 @@ public class TaskTracker implements MRCo
static final String TT_OUTOFBAND_HEARBEAT =
"mapreduce.tasktracker.outofband.heartbeat";
private volatile boolean oobHeartbeatOnTaskCompletion;
+ static final String TT_OUTOFBAND_HEARTBEAT_DAMPER =
+ "mapreduce.tasktracker.outofband.heartbeat.damper";
+ static private final int DEFAULT_OOB_HEARTBEAT_DAMPER = 1000000;
+ private volatile int oobHeartbeatDamper;
// Track number of completed tasks to send an out-of-band heartbeat
- private IntWritable finishedCount = new IntWritable(0);
+ private AtomicInteger finishedCount = new AtomicInteger(0);
private MapEventsFetcherThread mapEventsFetcher;
final int workerThreads;
@@ -846,6 +851,9 @@ public class TaskTracker implements MRCo
oobHeartbeatOnTaskCompletion =
fConf.getBoolean(TT_OUTOFBAND_HEARBEAT, false);
+ oobHeartbeatDamper =
+ fConf.getInt(TT_OUTOFBAND_HEARTBEAT_DAMPER,
+ DEFAULT_OOB_HEARTBEAT_DAMPER);
}
private void startJettyBugMonitor() {
@@ -1546,25 +1554,39 @@ public class TaskTracker implements MRCo
return recentMapEvents;
}
+ /**
+  * Computes the wait before the next heartbeat, given the number of tasks
+  * that have finished since the last one. The regular heartbeatInterval is
+  * divided by (numFinishedTasks * oobHeartbeatDamper + 1), so more finished
+  * tasks or a larger damper shorten the wait; a damper of 0 leaves the
+  * interval unchanged.
+  *
+  * The product is computed in long arithmetic. In int arithmetic, the default
+  * damper (1000000) overflows once 2148 or more tasks are counted, which can
+  * yield a negative or zero divisor. Negative inputs are clamped to 0 for the
+  * same reason, so the divisor is always >= 1.
+  *
+  * @param numFinishedTasks tasks counted as finished since the last heartbeat
+  * @return the (possibly shortened) interval, in the same milliseconds unit
+  *         that offerService adds to lastHeartbeat
+  */
+ private long getHeartbeatInterval(int numFinishedTasks) {
+   long tasks = Math.max(0, numFinishedTasks);
+   long damper = Math.max(0, oobHeartbeatDamper);
+   // int-range * int-range fits in a long, so this cannot overflow.
+   return heartbeatInterval / (tasks * damper + 1);
+ }
+
/**
* Main service loop. Will stay in this loop forever.
*/
State offerService() throws Exception {
- long lastHeartbeat = 0;
+ long lastHeartbeat = System.currentTimeMillis();
while (running && !shuttingDown) {
try {
long now = System.currentTimeMillis();
-
- long waitTime = heartbeatInterval - (now - lastHeartbeat);
- if (waitTime > 0) {
+
+ // accelerate to account for multiple finished tasks up-front
+ long remaining =
+ (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
+ while (remaining > 0) {
// sleeps for the wait time or
- // until there are empty slots to schedule tasks
+ // until there are *enough* empty slots to schedule tasks
synchronized (finishedCount) {
- if (finishedCount.get() == 0) {
- finishedCount.wait(waitTime);
+ finishedCount.wait(remaining);
+
+ // Recompute
+ now = System.currentTimeMillis();
+ remaining =
+ (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) -
now;
+
+ if (remaining <= 0) {
+ // Reset count
+ finishedCount.set(0);
+ break;
}
- finishedCount.set(0);
}
}
@@ -2434,8 +2456,7 @@ public class TaskTracker implements MRCo
private void notifyTTAboutTaskCompletion() {
if (oobHeartbeatOnTaskCompletion) {
synchronized (finishedCount) {
- int value = finishedCount.get();
- finishedCount.set(value+1);
+ finishedCount.incrementAndGet();
finishedCount.notify();
}
}