Author: ddas
Date: Fri Jul 11 00:34:20 2008
New Revision: 675846

URL: http://svn.apache.org/viewvc?rev=675846&view=rev
Log:
HADOOP-3478. Improves the handling of map output fetching. Now the 
randomization is by the hosts (and not the map outputs themselves). Contributed 
by Jothi Padmanabhan.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=675846&r1=675845&r2=675846&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Jul 11 00:34:20 2008
@@ -73,6 +73,10 @@
     permitting users to define a more efficient method for cloning values from
     the reduce than serialization/deserialization. (Runping Qi via cdouglas)
 
+    HADOOP-3478. Improves the handling of map output fetching. Now the
+    randomization is by the hosts (and not the map outputs themselves). 
+    (Jothi Padmanabhan via ddas)
+
   OPTIMIZATIONS
 
     HADOOP-3556. Removed lock contention in MD5Hash by changing the 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: 
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=675846&r1=675845&r2=675846&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java 
(original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Fri 
Jul 11 00:34:20 2008
@@ -1433,8 +1433,9 @@
     
     @SuppressWarnings("unchecked")
     public boolean fetchOutputs() throws IOException {
-      List<MapOutputLocation> knownOutputs = 
-        new ArrayList<MapOutputLocation>(numCopiers);
+      //The map for (Hosts, List of MapIds from this Host)
+      HashMap<String, List<MapOutputLocation>> mapLocations = 
+        new HashMap<String, List<MapOutputLocation>>();
       int totalFailures = 0;
       int            numInFlight = 0, numCopied = 0;
       long           bytesTransferred = 0;
@@ -1472,6 +1473,9 @@
       long lastProgressTime = startTime;
       long lastOutputTime = 0;
       IntWritable fromEventId = new IntWritable(0);
+
+      //List of unique hosts containing map outputs
+      List<String> hostList = new ArrayList<String>();
       
         // loop until we get all required outputs
         while (copiedMapOutputs.size() < numMaps && mergeThrowable == null) {
@@ -1489,30 +1493,40 @@
           }
           
           try {
-            // Put the hash entries for the failed fetches. Entries here
-            // might be replaced by (mapId) hashkeys from new successful 
-            // Map executions, if the fetch failures were due to lost tasks.
-            // The replacements, if at all, will happen when we query the
-            // tasktracker and put the mapId hashkeys with new 
-            // MapOutputLocations as values
-            knownOutputs.addAll(retryFetches);
+            // Put the hash entries for the failed fetches.
+            Iterator<MapOutputLocation> locItr = retryFetches.iterator();
+            while (locItr.hasNext()) {
+              MapOutputLocation loc = locItr.next(); 
+              List<MapOutputLocation> locList = 
+                mapLocations.get(loc.getHost());
+              if (locList == null) {
+                locList = new LinkedList<MapOutputLocation>();
+                mapLocations.put(loc.getHost(), locList);
+                hostList.add(loc.getHost());
+              }
+              //Add to the beginning of the list so that this map is 
+              //tried again before the others and we can hasten the 
+              //re-execution of this map should there be a problem
+              locList.add(0, loc);
+            }
              
             // The call getMapCompletionEvents will update fromEventId to
             // used for the next call to getMapCompletionEvents
-            int currentNumKnownMaps = knownOutputs.size();
+
             int currentNumObsoleteMapIds = obsoleteMapIds.size();
-            getMapCompletionEvents(fromEventId, knownOutputs);
 
+            int numNewOutputs = getMapCompletionEvents(fromEventId, 
+                                                       mapLocations, 
+                                                       hostList);
 
-            int numNewOutputs = knownOutputs.size()-currentNumKnownMaps;
             if (numNewOutputs > 0 || logNow) {
               LOG.info(reduceTask.getTaskID() + ": " +  
                   "Got " + numNewOutputs + 
-                  " new map-outputs & number of known map outputs is " + 
-                  knownOutputs.size());
+                  " new map-outputs"); 
             }
             
             int numNewObsoleteMaps = 
obsoleteMapIds.size()-currentNumObsoleteMapIds;
+
             if (numNewObsoleteMaps > 0) {
               LOG.info(reduceTask.getTaskID() + ": " +  
                   "Got " + numNewObsoleteMaps + 
@@ -1534,55 +1548,77 @@
           }
           
           // now walk through the cache and schedule what we can
-          int numKnown = knownOutputs.size(), numScheduled = 0;
+          int numScheduled = 0;
           int numDups = 0;
           
           synchronized (scheduledCopies) {
+
             // Randomize the map output locations to prevent 
             // all reduce-tasks swamping the same tasktracker
-            Collections.shuffle(knownOutputs, this.random);
+            Collections.shuffle(hostList, this.random);
             
-            Iterator<MapOutputLocation> locIt = knownOutputs.iterator();
+            Iterator<String> hostsItr = hostList.iterator();
+            while (hostsItr.hasNext()) {
             
-            
-            while (locIt.hasNext()) {
-              
-              MapOutputLocation loc = locIt.next();
-              
-              // Do not schedule fetches from OBSOLETE maps
-              if (obsoleteMapIds.contains(loc.getTaskAttemptId())) {
-                locIt.remove();
-                continue;
+              String host = hostsItr.next();
+
+              List<MapOutputLocation> knownOutputsByLoc = 
+                mapLocations.get(host);
+
+              //Identify duplicate hosts here
+              if (uniqueHosts.contains(host)) {
+                 numDups += knownOutputsByLoc.size() -1; 
+                 continue;
               }
-              
-              Long penaltyEnd = penaltyBox.get(loc.getHost());
-              boolean penalized = false, duplicate = false;
-              
+
+              Long penaltyEnd = penaltyBox.get(host);
+              boolean penalized = false;
+            
               if (penaltyEnd != null) {
                 if (currentTime < penaltyEnd.longValue()) {
                   penalized = true;
                 } else {
-                  penaltyBox.remove(loc.getHost());
+                  penaltyBox.remove(host);
                 }
               }
-              if (uniqueHosts.contains(loc.getHost())) {
-                duplicate = true; numDups++;
-              }
               
-              if (!penalized && !duplicate) {
-                uniqueHosts.add(loc.getHost());
+              if (penalized)
+                continue;
+
+              Iterator<MapOutputLocation> locItr = 
+                knownOutputsByLoc.iterator();
+            
+              while (locItr.hasNext()) {
+              
+                MapOutputLocation loc = locItr.next();
+              
+                // Do not schedule fetches from OBSOLETE maps
+                if (obsoleteMapIds.contains(loc.getTaskAttemptId())) {
+                  locItr.remove();
+                  continue;
+                }
+
+                uniqueHosts.add(host);
                 scheduledCopies.add(loc);
-                locIt.remove();  // remove from knownOutputs
+                locItr.remove();  // remove from knownOutputs
                 numInFlight++; numScheduled++;
+
+                break; //we have a map from this host
+              }
+       
+              if (knownOutputsByLoc.size() == 0) {
+                mapLocations.remove(host);
+                hostsItr.remove();
               }
             }
             scheduledCopies.notifyAll();
           }
           if (numScheduled > 0 || logNow) {
             LOG.info(reduceTask.getTaskID() + " Scheduled " + numScheduled +
-                   " of " + numKnown + " known outputs (" + penaltyBox.size() +
-                   " slow hosts and " + numDups + " dup hosts)");
+                   " outputs (" + penaltyBox.size() +
+                   " slow hosts and " + numDups + " dup hosts)");
           }
+
           if (penaltyBox.size() > 0 && logNow) {
             LOG.info("Penalized(slow) Hosts: ");
             for (String host : penaltyBox.keySet()) {
@@ -1732,20 +1768,6 @@
               LOG.warn(reduceTask.getTaskID() + " adding host " +
                        cr.getHost() + " to penalty box, next contact in " +
                        (currentBackOff/1000) + " seconds");
-              
-              // other outputs from the failed host may be present in the
-              // knownOutputs cache, purge them. This is important in case
-              // the failure is due to a lost tasktracker (causes many
-              // unnecessary backoffs). If not, we only take a small hit
-              // polling the tasktracker a few more times
-              Iterator<MapOutputLocation> locIt = knownOutputs.iterator();
-              while (locIt.hasNext()) {
-                MapOutputLocation loc = locIt.next();
-                if (cr.getHost().equals(loc.getHost())) {
-                  retryFetches.add(loc);
-                  locIt.remove();
-                }
-              }
             }
             uniqueHosts.remove(cr.getHost());
             numInFlight--;
@@ -1844,16 +1866,21 @@
      * 
      * @param fromEventId the first event ID we want to start from, this is
      *                     modified by the call to this method
-     * @param jobClient the {@link JobTracker}
-     * @return the set of map-completion events from the given event ID 
+     * @param mapLocations the hash map of map locations by host
+     * @param hostsList    the list that contains unique hosts having 
+     *                     map outputs, will be updated on the return 
+     *                     of this method
+     * @return the number of new map-completion events from the given event ID 
      * @throws IOException
      */  
-    private void getMapCompletionEvents(IntWritable fromEventId, 
-                                        List<MapOutputLocation> knownOutputs)
+    private int getMapCompletionEvents(IntWritable fromEventId, 
+                        HashMap<String,List<MapOutputLocation>> mapLocations,
+                        List<String> hostsList)
     throws IOException {
       
       long currentTime = System.currentTimeMillis();    
       long pollTime = lastPollTime + MIN_POLL_INTERVAL;
+      int numNewMaps = 0;
       while (currentTime < pollTime) {
         try {
           Thread.sleep(pollTime-currentTime);
@@ -1895,8 +1922,14 @@
                                     "/mapOutput?job=" + taskId.getJobID() +
                                     "&map=" + taskId + 
                                     "&reduce=" + getPartition());
-            knownOutputs.add(new MapOutputLocation(taskId, host, 
-                                                   mapOutputLocation));
+            List<MapOutputLocation> loc = mapLocations.get(host);
+            if (loc == null) {
+              loc = new LinkedList<MapOutputLocation>();
+              mapLocations.put(host, loc);
+              hostsList.add(host);
+             }
+            loc.add(new MapOutputLocation(taskId, host, mapOutputLocation));
+            numNewMaps ++;
           }
           break;
           case FAILED:
@@ -1917,7 +1950,7 @@
           break;
         }
       }
-
+      return numNewMaps;
     }
     
     


Reply via email to