Author: cutting
Date: Mon May 7 13:44:14 2007
New Revision: 535982

URL: http://svn.apache.org/viewvc?view=rev&rev=535982
Log:
HADOOP-1270. Randomize the fetch of map outputs, speeding the shuffle. Contributed by Arun.

Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=535982&r1=535981&r2=535982 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Mon May 7 13:44:14 2007 @@ -345,6 +345,9 @@ 102. HADOOP-1326. Change JobClient#RunJob() to return the job. (omalley via cutting) +103. HADOOP-1270. Randomize the fetch of map outputs, speeding the + shuffle. (Arun C Murthy via cutting) + Release 0.12.3 - 2007-04-06 Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=535982&r1=535981&r2=535982 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Mon May 7 13:44:14 2007 @@ -29,7 +29,6 @@ import java.text.NumberFormat; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.Hashtable; import java.util.Iterator; @@ -484,10 +483,10 @@ private int probe_sample_size = 100; /** - * a hashmap from mapId to MapOutputLocation for retrials + * a list of map output locations for fetch retrials */ - private Map<Integer, MapOutputLocation> retryFetches = - new HashMap<Integer, MapOutputLocation>(); + private List<MapOutputLocation> retryFetches = + new ArrayList<MapOutputLocation>(); /** * a TreeSet for needed map outputs @@ -495,6 +494,8 @@ private Set <Integer> neededOutputs = Collections.synchronizedSet(new TreeSet<Integer>()); + private Random random = null; + /** Represents the result of an attempt to copy a map 
output */ private class CopyResult { @@ -783,12 +784,19 @@ this.shuffleMetrics = MetricsUtil.createRecord(metricsContext, "shuffleInput"); this.shuffleMetrics.setTag("user", conf.getUser()); + + // Seed the random number generator with a reasonably globally unique seed + long randomSeed = System.nanoTime() + + (long)Math.pow(this.reduceTask.getPartition(), + (this.reduceTask.getPartition()%10) + ); + this.random = new Random(randomSeed); } public boolean fetchOutputs() throws IOException { final int numOutputs = reduceTask.getNumMaps(); - Map<Integer, MapOutputLocation> knownOutputs = - new HashMap<Integer, MapOutputLocation>(); + List<MapOutputLocation> knownOutputs = + new ArrayList<MapOutputLocation>(numCopiers); int numInFlight = 0, numCopied = 0; int lowThreshold = numCopiers*2; long bytesTransferred = 0; @@ -834,15 +842,14 @@ // The replacements, if at all, will happen when we query the // tasktracker and put the mapId hashkeys with new // MapOutputLocations as values - knownOutputs.putAll(retryFetches); + knownOutputs.addAll(retryFetches); // The call getsMapCompletionEvents will modify fromEventId to a val // that it should be for the next call to getSuccessMapEvents List <MapOutputLocation> locs = getMapCompletionEvents(fromEventId); // put discovered them on the known list for (int i=0; i < locs.size(); i++) { - knownOutputs.put(new Integer(locs.get(i).getMapId()), - locs.get(i)); + knownOutputs.add(locs.get(i)); } LOG.info(reduceTask.getTaskId() + " Got " + locs.size() + @@ -865,7 +872,11 @@ " known map output location(s); scheduling..."); synchronized (scheduledCopies) { - Iterator locIt = knownOutputs.values().iterator(); + // Randomize the map output locations to prevent + // all reduce-tasks swamping the same tasktracker + Collections.shuffle(knownOutputs, this.random); + + Iterator locIt = knownOutputs.iterator(); currentTime = System.currentTimeMillis(); while (locIt.hasNext()) { @@ -928,7 +939,7 @@ cr.getLocation().getMapTaskId() + " from host: " 
+ cr.getHost()); } else { - retryFetches.put(new Integer(cr.getMapId()), cr.getLocation()); + retryFetches.add(cr.getLocation()); // wait a random amount of time for next contact currentTime = System.currentTimeMillis(); @@ -944,11 +955,11 @@ // the failure is due to a lost tasktracker (causes many // unnecessary backoffs). If not, we only take a small hit // polling the tasktracker a few more times - Iterator locIt = knownOutputs.values().iterator(); + Iterator locIt = knownOutputs.iterator(); while (locIt.hasNext()) { MapOutputLocation loc = (MapOutputLocation)locIt.next(); if (cr.getHost().equals(loc.getHost())) { - retryFetches.put(new Integer(loc.getMapId()), loc); + retryFetches.add(loc); locIt.remove(); } }