Author: ddas
Date: Fri Jul 11 00:34:20 2008
New Revision: 675846
URL: http://svn.apache.org/viewvc?rev=675846&view=rev
Log:
HADOOP-3478. Improves the handling of map output fetching. The
randomization is now done over hosts rather than over the individual
map outputs. Contributed by Jothi Padmanabhan.
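In outline: instead of keeping a flat List<MapOutputLocation> and shuffling it,
the reducer now groups output locations by host and shuffles only the list of
unique hosts. A minimal, self-contained sketch of that grouping follows; the
MapOutputLocation below is a simplified stand-in for the real class (only
getHost() is modeled), and the class name HostGroupingSketch is illustrative:

  import java.util.*;

  public class HostGroupingSketch {
    // Simplified stand-in for org.apache.hadoop.mapred.MapOutputLocation.
    static class MapOutputLocation {
      final String mapId, host;
      MapOutputLocation(String mapId, String host) { this.mapId = mapId; this.host = host; }
      String getHost() { return host; }
    }

    public static void main(String[] args) {
      // Host -> map outputs served from that host (mirrors mapLocations in ReduceTask)
      Map<String, List<MapOutputLocation>> mapLocations =
        new HashMap<String, List<MapOutputLocation>>();
      // Unique hosts with known outputs (mirrors hostList in ReduceTask)
      List<String> hostList = new ArrayList<String>();

      for (MapOutputLocation loc : Arrays.asList(
          new MapOutputLocation("attempt_m_000000_0", "hostA"),
          new MapOutputLocation("attempt_m_000001_0", "hostA"),
          new MapOutputLocation("attempt_m_000002_0", "hostB"))) {
        List<MapOutputLocation> locList = mapLocations.get(loc.getHost());
        if (locList == null) {
          locList = new LinkedList<MapOutputLocation>();
          mapLocations.put(loc.getHost(), locList);
          hostList.add(loc.getHost());
        }
        locList.add(loc);
      }

      // The old code shuffled every known location; the patch shuffles hosts,
      // so each reducer picks a random host order, then one output per host.
      Collections.shuffle(hostList, new Random());
      System.out.println("fetch order over hosts: " + hostList);
    }
  }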
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=675846&r1=675845&r2=675846&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Jul 11 00:34:20 2008
@@ -73,6 +73,10 @@
permitting users to define a more efficient method for cloning values from
the reduce than serialization/deserialization. (Runping Qi via cdouglas)
+ HADOOP-3478. Improves the handling of map output fetching. The
+ randomization is now done over hosts rather than over the individual
+ map outputs. (Jothi Padmanabhan via ddas)
+
OPTIMIZATIONS
HADOOP-3556. Removed lock contention in MD5Hash by changing the
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=675846&r1=675845&r2=675846&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Fri Jul 11 00:34:20 2008
@@ -1433,8 +1433,9 @@
@SuppressWarnings("unchecked")
public boolean fetchOutputs() throws IOException {
- List<MapOutputLocation> knownOutputs =
- new ArrayList<MapOutputLocation>(numCopiers);
+ //The map for (Hosts, List of MapIds from this Host)
+ HashMap<String, List<MapOutputLocation>> mapLocations =
+ new HashMap<String, List<MapOutputLocation>>();
int totalFailures = 0;
int numInFlight = 0, numCopied = 0;
long bytesTransferred = 0;
@@ -1472,6 +1473,9 @@
long lastProgressTime = startTime;
long lastOutputTime = 0;
IntWritable fromEventId = new IntWritable(0);
+
+ //List of unique hosts containing map outputs
+ List<String> hostList = new ArrayList<String>();
// loop until we get all required outputs
while (copiedMapOutputs.size() < numMaps && mergeThrowable == null) {
@@ -1489,30 +1493,40 @@
}
try {
- // Put the hash entries for the failed fetches. Entries here
- // might be replaced by (mapId) hashkeys from new successful
- // Map executions, if the fetch failures were due to lost tasks.
- // The replacements, if at all, will happen when we query the
- // tasktracker and put the mapId hashkeys with new
- // MapOutputLocations as values
- knownOutputs.addAll(retryFetches);
+ // Put the hash entries for the failed fetches.
+ Iterator<MapOutputLocation> locItr = retryFetches.iterator();
+ while (locItr.hasNext()) {
+ MapOutputLocation loc = locItr.next();
+ List<MapOutputLocation> locList =
+ mapLocations.get(loc.getHost());
+ if (locList == null) {
+ locList = new LinkedList<MapOutputLocation>();
+ mapLocations.put(loc.getHost(), locList);
+ hostList.add(loc.getHost());
+ }
+ //Add to the beginning of the list so that this map is
+ //tried again before the others and we can hasten the
+ //re-execution of this map should there be a problem
+ locList.add(0, loc);
+ }
// The call getMapCompletionEvents will update fromEventId to be
// used for the next call to getMapCompletionEvents
- int currentNumKnownMaps = knownOutputs.size();
+
int currentNumObsoleteMapIds = obsoleteMapIds.size();
- getMapCompletionEvents(fromEventId, knownOutputs);
+ int numNewOutputs = getMapCompletionEvents(fromEventId,
+ mapLocations,
+ hostList);
- int numNewOutputs = knownOutputs.size()-currentNumKnownMaps;
if (numNewOutputs > 0 || logNow) {
LOG.info(reduceTask.getTaskID() + ": " +
"Got " + numNewOutputs +
- " new map-outputs & number of known map outputs is " +
- knownOutputs.size());
+ " new map-outputs");
}
int numNewObsoleteMaps =
obsoleteMapIds.size()-currentNumObsoleteMapIds;
+
if (numNewObsoleteMaps > 0) {
LOG.info(reduceTask.getTaskID() + ": " +
"Got " + numNewObsoleteMaps +
@@ -1534,55 +1548,77 @@
}
// now walk through the cache and schedule what we can
- int numKnown = knownOutputs.size(), numScheduled = 0;
+ int numScheduled = 0;
int numDups = 0;
synchronized (scheduledCopies) {
+
// Randomize the map output locations to prevent
// all reduce-tasks swamping the same tasktracker
- Collections.shuffle(knownOutputs, this.random);
+ Collections.shuffle(hostList, this.random);
- Iterator<MapOutputLocation> locIt = knownOutputs.iterator();
+ Iterator<String> hostsItr = hostList.iterator();
+ while (hostsItr.hasNext()) {
-
- while (locIt.hasNext()) {
-
- MapOutputLocation loc = locIt.next();
-
- // Do not schedule fetches from OBSOLETE maps
- if (obsoleteMapIds.contains(loc.getTaskAttemptId())) {
- locIt.remove();
- continue;
+ String host = hostsItr.next();
+
+ List<MapOutputLocation> knownOutputsByLoc =
+ mapLocations.get(host);
+
+ //Identify duplicate hosts here
+ if (uniqueHosts.contains(host)) {
+ numDups += knownOutputsByLoc.size() -1;
+ continue;
}
-
- Long penaltyEnd = penaltyBox.get(loc.getHost());
- boolean penalized = false, duplicate = false;
-
+
+ Long penaltyEnd = penaltyBox.get(host);
+ boolean penalized = false;
+
if (penaltyEnd != null) {
if (currentTime < penaltyEnd.longValue()) {
penalized = true;
} else {
- penaltyBox.remove(loc.getHost());
+ penaltyBox.remove(host);
}
}
- if (uniqueHosts.contains(loc.getHost())) {
- duplicate = true; numDups++;
- }
- if (!penalized && !duplicate) {
- uniqueHosts.add(loc.getHost());
+ if (penalized)
+ continue;
+
+ Iterator<MapOutputLocation> locItr =
+ knownOutputsByLoc.iterator();
+
+ while (locItr.hasNext()) {
+
+ MapOutputLocation loc = locItr.next();
+
+ // Do not schedule fetches from OBSOLETE maps
+ if (obsoleteMapIds.contains(loc.getTaskAttemptId())) {
+ locItr.remove();
+ continue;
+ }
+
+ uniqueHosts.add(host);
scheduledCopies.add(loc);
- locIt.remove(); // remove from knownOutputs
+ locItr.remove(); // remove from knownOutputs
numInFlight++; numScheduled++;
+
+ break; //we have a map from this host
+ }
+
+ if (knownOutputsByLoc.size() == 0) {
+ mapLocations.remove(host);
+ hostsItr.remove();
}
}
scheduledCopies.notifyAll();
}
if (numScheduled > 0 || logNow) {
LOG.info(reduceTask.getTaskID() + " Scheduled " + numScheduled +
- " of " + numKnown + " known outputs (" + penaltyBox.size() +
- " slow hosts and " + numDups + " dup hosts)");
+ " outputs (" + penaltyBox.size() +
+ " slow hosts and" + numDups + " dup hosts)");
}
+
if (penaltyBox.size() > 0 && logNow) {
LOG.info("Penalized(slow) Hosts: ");
for (String host : penaltyBox.keySet()) {
@@ -1732,20 +1768,6 @@
LOG.warn(reduceTask.getTaskID() + " adding host " +
cr.getHost() + " to penalty box, next contact in " +
(currentBackOff/1000) + " seconds");
-
- // other outputs from the failed host may be present in the
- // knownOutputs cache, purge them. This is important in case
- // the failure is due to a lost tasktracker (causes many
- // unnecessary backoffs). If not, we only take a small hit
- // polling the tasktracker a few more times
- Iterator<MapOutputLocation> locIt = knownOutputs.iterator();
- while (locIt.hasNext()) {
- MapOutputLocation loc = locIt.next();
- if (cr.getHost().equals(loc.getHost())) {
- retryFetches.add(loc);
- locIt.remove();
- }
- }
}
uniqueHosts.remove(cr.getHost());
numInFlight--;
@@ -1844,16 +1866,21 @@
*
* @param fromEventId the first event ID we want to start from, this is
* modified by the call to this method
- * @param jobClient the {@link JobTracker}
- * @return the set of map-completion events from the given event ID
+ * @param mapLocations the hash map of map locations by host
+ * @param hostsList the list of unique hosts that have map outputs;
+ * it is updated by this call
+ * @return the number of new map-completion events from the given event ID
* @throws IOException
*/
- private void getMapCompletionEvents(IntWritable fromEventId,
- List<MapOutputLocation> knownOutputs)
+ private int getMapCompletionEvents(IntWritable fromEventId,
+ HashMap<String,List<MapOutputLocation>> mapLocations,
+ List<String> hostsList)
throws IOException {
long currentTime = System.currentTimeMillis();
long pollTime = lastPollTime + MIN_POLL_INTERVAL;
+ int numNewMaps = 0;
while (currentTime < pollTime) {
try {
Thread.sleep(pollTime-currentTime);
@@ -1895,8 +1922,14 @@
"/mapOutput?job=" + taskId.getJobID() +
"&map=" + taskId +
"&reduce=" + getPartition());
- knownOutputs.add(new MapOutputLocation(taskId, host,
- mapOutputLocation));
+ List<MapOutputLocation> loc = mapLocations.get(host);
+ if (loc == null) {
+ loc = new LinkedList<MapOutputLocation>();
+ mapLocations.put(host, loc);
+ hostsList.add(host);
+ }
+ loc.add(new MapOutputLocation(taskId, host, mapOutputLocation));
+ numNewMaps++;
}
break;
case FAILED:
@@ -1917,7 +1950,7 @@
break;
}
}
-
+ return numNewMaps;
}
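
For reference, the scheduling pass that fetchOutputs() performs after this
patch can be reduced to the following self-contained sketch. The types and
signatures here are simplified assumptions, not the committed API: Loc stands
in for MapOutputLocation, and the real code also filters obsolete map attempts
and counts duplicate hosts for logging.

  import java.util.*;

  public class SchedulePassSketch {
    static class Loc {
      final String mapId, host;
      Loc(String mapId, String host) { this.mapId = mapId; this.host = host; }
    }

    // One scheduling pass over the known hosts (simplified).
    static List<Loc> schedulePass(Map<String, List<Loc>> mapLocations,
                                  List<String> hostList,
                                  Map<String, Long> penaltyBox,
                                  Set<String> uniqueHosts,
                                  long now, Random random) {
      List<Loc> scheduled = new ArrayList<Loc>();
      Collections.shuffle(hostList, random);           // randomize over hosts
      Iterator<String> hosts = hostList.iterator();
      while (hosts.hasNext()) {
        String host = hosts.next();
        if (uniqueHosts.contains(host)) continue;      // a copy is already in flight
        Long penaltyEnd = penaltyBox.get(host);
        if (penaltyEnd != null) {
          if (now < penaltyEnd.longValue()) continue;  // host is still penalized
          penaltyBox.remove(host);
        }
        List<Loc> outputs = mapLocations.get(host);
        Iterator<Loc> locItr = outputs.iterator();
        if (locItr.hasNext()) {
          scheduled.add(locItr.next());                // one map per host per pass
          locItr.remove();
          uniqueHosts.add(host);
        }
        if (outputs.isEmpty()) {                       // nothing left on this host
          mapLocations.remove(host);
          hosts.remove();
        }
      }
      return scheduled;
    }

    public static void main(String[] args) {
      Map<String, List<Loc>> mapLocations = new HashMap<String, List<Loc>>();
      mapLocations.put("hostA", new LinkedList<Loc>(
          Arrays.asList(new Loc("m0", "hostA"), new Loc("m1", "hostA"))));
      mapLocations.put("hostB", new LinkedList<Loc>(
          Arrays.asList(new Loc("m2", "hostB"))));
      List<String> hostList = new ArrayList<String>(Arrays.asList("hostA", "hostB"));

      List<Loc> pass = schedulePass(mapLocations, hostList,
          new HashMap<String, Long>(), new HashSet<String>(),
          System.currentTimeMillis(), new Random());
      System.out.println(pass.size() + " fetches scheduled, one per host");
    }
  }

Shuffling hosts and taking at most one output per host per pass spreads the
concurrent fetches of all reducers across tasktrackers, so no single
tasktracker is swamped, which is the point of the patch.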