Author: omalley
Date: Wed May 7 16:01:48 2008
New Revision: 654315
URL: http://svn.apache.org/viewvc?rev=654315&view=rev
Log:
HADOOP-3297. Fetch more task completion events from the job
tracker and task tracker. Contributed by Devaraj Das.
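For context, a stand-alone sketch (hypothetical names, not code from this patch) of the batching pattern the change adopts: request completion events in larger batches (ReduceTask now asks for up to 10000 at a time) and, on the tasktracker side, poll the jobtracker again right away whenever a full batch comes back instead of waiting out the normal polling interval.

import java.util.ArrayList;
import java.util.List;

/** Stand-alone sketch with hypothetical names; not code from this patch. */
public class BatchedEventFetch {

  /** Hypothetical stand-in for the jobtracker/tasktracker event source. */
  interface EventSource {
    /** Returns at most maxEvents completion events starting at fromEventId. */
    List<String> getEvents(int fromEventId, int maxEvents);
  }

  // ReduceTask now asks for up to 10000 events per call (MAX_EVENTS_TO_FETCH).
  static final int MAX_EVENTS_TO_FETCH = 10000;
  static final long POLL_INTERVAL_MS = 1000;

  static void pollLoop(EventSource source) throws InterruptedException {
    int fromEventId = 0;
    List<String> known = new ArrayList<String>();
    while (!Thread.currentThread().isInterrupted()) {
      List<String> batch = source.getEvents(fromEventId, MAX_EVENTS_TO_FETCH);
      known.addAll(batch);
      fromEventId += batch.size();
      if (batch.size() < MAX_EVENTS_TO_FETCH) {
        // Caught up: wait for the normal interval before the next poll.
        Thread.sleep(POLL_INTERVAL_MS);
      }
      // A full batch means more events are probably waiting, so loop around
      // and fetch again immediately instead of sleeping.
    }
  }
}

On the reduce side this replaces the old isBusy()/probe_sample_size throttling with a single large MAX_EVENTS_TO_FETCH request; on the tasktracker side a fetchAgain flag skips the heartbeat-interval wait whenever the previous poll returned a full probe_sample_size batch.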
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=654315&r1=654314&r2=654315&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed May 7 16:01:48 2008
@@ -125,6 +125,9 @@
HADOOP-3248. Optimization of saveFSImage. (Dhruba via shv)
+ HADOOP-3297. Fetch more task completion events from the job
+ tracker and task tracker. (ddas via omalley)
+
BUG FIXES
HADOOP-2905. 'fsck -move' triggers NPE in NameNode.
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=654315&r1=654314&r2=654315&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Wed May 7 16:01:48 2008
@@ -430,6 +430,9 @@
/** Number of ms before timing out a copy */
private static final int STALLED_COPY_TIMEOUT = 3 * 60 * 1000;
+ /** Max events to fetch in one go from the tasktracker */
+ private static final int MAX_EVENTS_TO_FETCH = 10000;
+
/**
* our reduce task instance
*/
@@ -528,11 +531,6 @@
private static final long MIN_POLL_INTERVAL = 1000;
/**
- * the number of map output locations to poll for at one time
- */
- private int probe_sample_size = 100;
-
- /**
* a list of map output locations for fetch retrials
*/
private List<MapOutputLocation> retryFetches =
@@ -1014,15 +1012,11 @@
new ArrayList<MapOutputLocation>(numCopiers);
int totalFailures = 0;
int numInFlight = 0, numCopied = 0;
- int lowThreshold = numCopiers*2;
long bytesTransferred = 0;
DecimalFormat mbpsFormat = new DecimalFormat("0.00");
final Progress copyPhase =
reduceTask.getProgress().phase();
- //tweak the probe sample size (make it a function of numCopiers)
- probe_sample_size = Math.max(numCopiers*5, 50);
-
for (int i = 0; i < numOutputs; i++) {
neededOutputs.add(i);
copyPhase.addPhase(); // add sub-phase per file
@@ -1067,30 +1061,20 @@
// MapOutputLocations as values
knownOutputs.addAll(retryFetches);
- // ensure we have enough to keep us busy
- boolean busy = isBusy(numInFlight, numCopiers, lowThreshold,
- uniqueHosts.size(), probe_sample_size,
- numOutputs - numCopied);
- if (!busy) {
- // The call getMapCompletionEvents will update fromEventId to
- // used for the next call to getMapCompletionEvents
- int currentNumKnownMaps = knownOutputs.size();
- int currentNumObsoleteMapIds = obsoleteMapIds.size();
+ // The call getMapCompletionEvents will update fromEventId to the value
+ // to be used for the next call to getMapCompletionEvents
+ int currentNumKnownMaps = knownOutputs.size();
+ int currentNumObsoleteMapIds = obsoleteMapIds.size();
getMapCompletionEvents(fromEventId, knownOutputs);
-
- LOG.info(reduceTask.getTaskID() + ": " +
- "Got " + (knownOutputs.size()-currentNumKnownMaps) +
- " new map-outputs & " +
- (obsoleteMapIds.size()-currentNumObsoleteMapIds) +
- " obsolete map-outputs from tasktracker and " +
- retryFetches.size() + " map-outputs from previous failures"
- );
- } else {
- LOG.info(" Busy enough - did not query the tasktracker for "
- + "new map outputs. Have "+ retryFetches.size()
- + " map outputs from previous failures");
- }
+
+ LOG.info(reduceTask.getTaskID() + ": " +
+ "Got " + (knownOutputs.size()-currentNumKnownMaps) +
+ " new map-outputs & " +
+ (obsoleteMapIds.size()-currentNumObsoleteMapIds) +
+ " obsolete map-outputs from tasktracker and " +
+ retryFetches.size() + " map-outputs from previous failures"
+ );
// clear the "failed" fetches hashmap
retryFetches.clear();
}
@@ -1418,24 +1402,6 @@
}
}
- /** Added a check for whether #uniqueHosts < #copiers, and if so conclude
- * we are not busy enough. The logic is that we fetch only one map output
- * at a time from any given host and uniqueHosts keep a track of that.
- * As soon as we add a host to uniqueHosts, a 'copy' from that is
- * scheduled as well. Thus, when the size of uniqueHosts is >= numCopiers,
- * it means that all copiers are busy. Although the converse is not true
- * (e.g. in the case where we have more copiers than the number of hosts
- * in the cluster), but it should generally be useful to do this check.
- **/
- private boolean isBusy(int numInFlight, int numCopiers, int lowThreshold,
- int uniqueHostsSize, int probeSampleSize,
- int remainCopy) {
- if ((numInFlight < lowThreshold && remainCopy > probeSampleSize) ||
- uniqueHostsSize < numCopiers) {
- return false;
- }
- return true;
- }
private CopyResult getCopyResult() {
synchronized (copyResults) {
@@ -1477,7 +1443,7 @@
TaskCompletionEvent events[] =
umbilical.getMapCompletionEvents(reduceTask.getJobID(),
- fromEventId.get(), probe_sample_size);
+ fromEventId.get(), MAX_EVENTS_TO_FETCH);
// Note the last successful poll time-stamp
lastPollTime = currentTime;
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=654315&r1=654314&r2=654315&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed May 7 16:01:48 2008
@@ -161,12 +161,11 @@
/**
* the minimum interval between jobtracker polls
*/
- private static final int MIN_POLL_INTERVAL = 5000;
private volatile int heartbeatInterval = HEARTBEAT_INTERVAL_MIN;
/**
* Number of maptask completion events locations to poll for at one time
*/
- private int probe_sample_size = 50;
+ private int probe_sample_size = 500;
private ShuffleServerMetrics shuffleServerMetrics;
/** This class contains the methods that should be used for metrics-reporting
@@ -412,9 +411,8 @@
this.minSpaceStart = this.fConf.getLong("mapred.local.dir.minspacestart",
0L);
this.minSpaceKill = this.fConf.getLong("mapred.local.dir.minspacekill",
0L);
- int numCopiers = this.fConf.getInt("mapred.reduce.parallel.copies", 5);
//tweak the probe sample size (make it a function of numCopiers)
- probe_sample_size = Math.max(numCopiers*5, 50);
+ probe_sample_size = this.fConf.getInt("mapred.tasktracker.events.batchsize", 500);
this.myMetrics = new TaskTrackerMetrics();
@@ -530,33 +528,15 @@
}
// now fetch all the map task events for all the reduce tasks
// possibly belonging to different jobs
+ boolean fetchAgain = false; //flag signifying whether we want to fetch
+ //immediately again.
for (FetchStatus f : fList) {
+ long currentTime = System.currentTimeMillis();
try {
- f.fetchMapCompletionEvents();
- long startWait;
- long endWait;
- // polling interval is heartbeat interval
- int waitTime = heartbeatInterval;
- // Thread will wait for a minumum of MIN_POLL_INTERVAL,
- // if it is notified before that, notification will be ignored.
- int minWait = MIN_POLL_INTERVAL;
- synchronized (waitingOn) {
- try {
- while (true) {
- startWait = System.currentTimeMillis();
- waitingOn.wait(waitTime);
- endWait = System.currentTimeMillis();
- int diff = (int)(endWait - startWait);
- if (diff >= minWait) {
- break;
- }
- minWait = minWait - diff;
- waitTime = minWait;
- }
- } catch (InterruptedException ie) {
- LOG.info("Shutting down: " + getName());
- return;
- }
+ //the method below will return true when we have not
+ //fetched all available events yet
+ if (f.fetchMapCompletionEvents(currentTime)) {
+ fetchAgain = true;
}
} catch (Exception e) {
LOG.warn(
@@ -565,6 +545,17 @@
StringUtils.stringifyException(e));
}
}
+ synchronized (waitingOn) {
+ try {
+ int waitTime;
+ if (!fetchAgain) {
+ waitingOn.wait(heartbeatInterval);
+ }
+ } catch (InterruptedException ie) {
+ LOG.info("Shutting down: " + getName());
+ return;
+ }
+ }
} catch (Exception e) {
LOG.info("Ignoring exception " + e.getMessage());
}
@@ -579,6 +570,8 @@
private List<TaskCompletionEvent> allMapEvents;
/** What jobid this fetchstatus object is for*/
private JobID jobId;
+ private long lastFetchTime;
+ private boolean fetchAgain;
public FetchStatus(JobID jobId, int numMaps) {
this.fromEventId = new IntWritable(0);
@@ -610,12 +603,26 @@
return mapEvents;
}
- public void fetchMapCompletionEvents() throws IOException {
+ public boolean fetchMapCompletionEvents(long currTime) throws IOException {
+ if (!fetchAgain && (currTime - lastFetchTime) < heartbeatInterval) {
+ return false;
+ }
+ int currFromEventId = fromEventId.get();
List <TaskCompletionEvent> recentMapEvents =
queryJobTracker(fromEventId, jobId, jobClient);
synchronized (allMapEvents) {
allMapEvents.addAll(recentMapEvents);
}
+ lastFetchTime = currTime;
+ if (fromEventId.get() - currFromEventId >= probe_sample_size) {
+ //return true when we have fetched the full payload, indicating
+ //that we should fetch again immediately (there might be more to
+ //fetch)
+ fetchAgain = true;
+ return true;
+ }
+ fetchAgain = false;
+ return false;
}
}
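Usage note (illustrative only, not part of the patch): the new tasktracker batch size is a plain configuration property, so a site can inspect or override it like any other mapred setting, e.g.:

import org.apache.hadoop.conf.Configuration;

public class EventBatchSizeLookup {
  public static void main(String[] args) {
    // Same lookup the TaskTracker now performs at startup: the number of
    // map-completion events fetched from the jobtracker per poll, default 500.
    Configuration conf = new Configuration();
    int batchSize = conf.getInt("mapred.tasktracker.events.batchsize", 500);
    System.out.println("map-completion events fetched per poll: " + batchSize);
  }
}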