Author: cutting
Date: Thu Mar  2 15:05:43 2006
New Revision: 382545

URL: http://svn.apache.org/viewcvs?rev=382545&view=rev
Log:
Fix for HADOOP-16.  Splitting and other job planning is now performed in a 
separate thread.

Modified:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=382545&r1=382544&r2=382545&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Thu Mar  2 15:05:43 2006
@@ -585,7 +585,7 @@
                     // Connection failed.  Let's wait a little bit and retry
                     try {
                         if (System.currentTimeMillis() - start > 5000) {
-                            LOG.info("Waiting to find target node");
+                            LOG.info("Waiting to find target node: " + target);
                         }
                         Thread.sleep(6000);
                     } catch (InterruptedException iex) {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=382545&r1=382544&r2=382545&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Thu Mar  2 15:05:43 2006
@@ -550,6 +550,10 @@
         int endBlock = -1;
         Block blocks[] = dir.getFile(src);
 
+        if (blocks == null) {                     // no blocks
+            return new UTF8[0][];
+        }
+
         //
         // First, figure out where the range falls in
         // the blocklist.
@@ -579,7 +583,7 @@
         if (startBlock < 0 || endBlock < 0) {
             return new UTF8[0][];
         } else {
-            UTF8 hosts[][] = new UTF8[endBlock - startBlock + 1][];
+            UTF8 hosts[][] = new UTF8[(endBlock - startBlock) + 1][];
             for (int i = startBlock; i <= endBlock; i++) {
                 TreeSet containingNodes = (TreeSet) blocksMap.get(blocks[i]);
                 Vector v = new Vector();
@@ -587,7 +591,7 @@
                     DatanodeInfo cur = (DatanodeInfo) it.next();
                     v.add(cur.getHost());
                 }
-                hosts[i] = (UTF8[]) v.toArray(new UTF8[v.size()]);
+                hosts[i-startBlock] = (UTF8[]) v.toArray(new UTF8[v.size()]);
             }
             return hosts;
         }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=382545&r1=382544&r2=382545&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Thu Mar  2 15:05:43 2006
@@ -44,6 +44,7 @@
     int numReduceTasks = 0;
 
     JobTracker jobtracker = null;
+    TreeMap cachedHints = new TreeMap();
 
     long startTime;
     long finishTime;
@@ -62,7 +63,7 @@
         this.conf = conf;
         this.jobtracker = jobtracker;
         this.profile = new JobProfile(jobid, jobFile, url);
-        this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.RUNNING);
+        this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP);
         this.startTime = System.currentTimeMillis();
 
         this.localJobFile = new JobConf(conf).getLocalFile(JobTracker.SUBDIR, jobid + ".xml");
@@ -92,9 +93,10 @@
     }
 
     /**
-     * Construct the splits, etc
+     * Construct the splits, etc.  This is invoked from an async
+     * thread so that split-computation doesn't block anyone.
      */
-    void initTasks() throws IOException {
+    public void initTasks() throws IOException {
         if (tasksInited) {
             return;
         }
@@ -153,9 +155,33 @@
             reduces[i] = new TaskInProgress(jobFile, maps, i, jobtracker, conf, this);
         }
 
+        //
+        // Obtain some tasktracker-cache information for the map task splits.
+        //
+        for (int i = 0; i < maps.length; i++) {
+            String hints[][] = fs.getFileCacheHints(splits[i].getFile(), splits[i].getStart(), splits[i].getLength());
+            cachedHints.put(maps[i].getTIPId(), hints);
+        }
+
+        this.status = new JobStatus(status.getJobId(), 0.0f, 0.0f, JobStatus.RUNNING);
         tasksInited = true;
     }
 
+    /**
+     * This is called by TaskInProgress objects.  The JobInProgress
+     * prefetches and caches a lot of these hints.  If the hint is
+     * not available, then we pass it through to the filesystem.
+     */
+    String[][] getFileCacheHints(String tipID, File f, long start, long len) throws IOException {
+        String results[][] = (String[][]) cachedHints.get(tipID);
+        if (tipID == null) {
+            FileSystem fs = FileSystem.get(conf);
+            results = fs.getFileCacheHints(f, start, len);
+            cachedHints.put(tipID, results);
+        }
+        return results;
+    }
+
     /////////////////////////////////////////////////////
     // Accessors for the JobInProgress
     /////////////////////////////////////////////////////
@@ -252,12 +278,8 @@
      */
     public Task obtainNewMapTask(String taskTracker, TaskTrackerStatus tts) {
         if (! tasksInited) {
-            try {
-                initTasks();
-            } catch (IOException ie) {
-                ie.printStackTrace();
-                LOG.info("Cannot create task split for " + profile.getJobId());
-            }
+            LOG.info("Cannot create task split for " + profile.getJobId());
+            return null;
         }
 
         Task t = null;
@@ -271,30 +293,62 @@
         // we call obtainNewMapTask() really fast, twice in a row.
         // There's not enough time for the "recentTasks"
         //
+
+        //
+        // Compute avg progress through the map tasks
+        //
+        for (int i = 0; i < maps.length; i++) {        
+            totalProgress += maps[i].getProgress();
+        }
+        double avgProgress = totalProgress / maps.length;
+
+        //
+        // See if there is a split over a block that is stored on
+        // the TaskTracker checking in.  That means the block
+        // doesn't have to be transmitted from another node.
+        //
         for (int i = 0; i < maps.length; i++) {
             if (maps[i].hasTaskWithCacheHit(taskTracker, tts)) {
                 if (cacheTarget < 0) {
                     cacheTarget = i;
                     break;
                 }
-            } else if (maps[i].hasTask()) {
-                if (stdTarget < 0) {
-                    stdTarget = i;
-                    break;
+            }
+        }
+
+        //
+        // If there's no cached target, see if there's
+        // a std. task to run.
+        //
+        if (cacheTarget < 0) {
+            for (int i = 0; i < maps.length; i++) {
+                if (maps[i].hasTask()) {
+                    if (stdTarget < 0) {
+                        stdTarget = i;
+                        break;
+                    }
                 }
             }
-            totalProgress += maps[i].getProgress();
         }
-        double avgProgress = totalProgress / maps.length;
 
-        for (int i = 0; i < maps.length; i++) {        
-            if (maps[i].hasSpeculativeTask(avgProgress)) {
-                if (specTarget < 0) {
-                    specTarget = i;
+        //
+        // If no cached-target and no std target, see if
+        // there's a speculative task to run.
+        //
+        if (cacheTarget < 0 && stdTarget < 0) {
+            for (int i = 0; i < maps.length; i++) {        
+                if (maps[i].hasSpeculativeTask(avgProgress)) {
+                    if (specTarget < 0) {
+                        specTarget = i;
+                        break;
+                    }
                 }
             }
         }
-        
+
+        //
+        // Run whatever we found
+        //
         if (cacheTarget >= 0) {
             t = maps[cacheTarget].getTaskToRun(taskTracker, tts, avgProgress);
         } else if (stdTarget >= 0) {
@@ -312,12 +366,8 @@
      */
     public Task obtainNewReduceTask(String taskTracker, TaskTrackerStatus tts) {
         if (! tasksInited) {
-            try {
-                initTasks();
-            } catch (IOException ie) {
-                ie.printStackTrace();
-                LOG.info("Cannot create task split for " + profile.getJobId());
-            }
+            LOG.info("Cannot create task split for " + profile.getJobId());
+            return null;
         }
 
         Task t = null;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java?rev=382545&r1=382544&r2=382545&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java Thu Mar  2 15:05:43 2006
@@ -39,6 +39,7 @@
     public static final int RUNNING = 1;
     public static final int SUCCEEDED = 2;
     public static final int FAILED = 3;
+    public static final int PREP = 4;
 
     String jobid;
     float mapProgress;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=382545&r1=382544&r2=382545&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Thu Mar  2 15:05:43 2006
@@ -33,6 +33,7 @@
  * @author Mike Cafarella
  *******************************************************/
 public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmissionProtocol {
+    static long JOBINIT_SLEEP_INTERVAL = 2000;
     static long RETIRE_JOB_INTERVAL;
     static long RETIRE_JOB_CHECK_INTERVAL;
     static float TASK_ALLOC_EPSILON;
@@ -156,14 +157,21 @@
                 }
                 
                 synchronized (jobs) {
-                    for (Iterator it = jobs.keySet().iterator(); it.hasNext(); ) {
-                        String jobid = (String) it.next();
-                        JobInProgress job = (JobInProgress) jobs.get(jobid);
-
-                        if (job.getStatus().getRunState() != JobStatus.RUNNING &&
-                            (job.getFinishTime() + RETIRE_JOB_INTERVAL < System.currentTimeMillis())) {
-                            it.remove();
-                            jobsByArrival.remove(job);
+                    synchronized (jobInitQueue) {
+                        synchronized (jobsByArrival) {
+                            for (Iterator it = jobs.keySet().iterator(); it.hasNext(); ) {
+                                String jobid = (String) it.next();
+                                JobInProgress job = (JobInProgress) jobs.get(jobid);
+
+                                if (job.getStatus().getRunState() != JobStatus.RUNNING &&
+                                    job.getStatus().getRunState() != JobStatus.PREP &&
+                                    (job.getFinishTime() + RETIRE_JOB_INTERVAL < System.currentTimeMillis())) {
+                                    it.remove();
+                            
+                                    jobInitQueue.remove(job);
+                                    jobsByArrival.remove(job);
+                                }
+                            }
                         }
                     }
                 }
@@ -175,6 +183,43 @@
     }
 
     /////////////////////////////////////////////////////////////////
+    //  Used to init new jobs that have just been created
+    /////////////////////////////////////////////////////////////////
+    class JobInitThread implements Runnable {
+        boolean shouldRun = true;
+        public JobInitThread() {
+        }
+        public void run() {
+            while (shouldRun) {
+                JobInProgress job = null;
+                synchronized (jobInitQueue) {
+                    if (jobInitQueue.size() > 0) {
+                        job = (JobInProgress) jobInitQueue.elementAt(0);
+                        jobInitQueue.remove(job);
+                    } else {
+                        try {
+                            jobInitQueue.wait(JOBINIT_SLEEP_INTERVAL);
+                        } catch (InterruptedException iex) {
+                        }
+                    }
+                }
+                try {
+                    if (job != null) {
+                        job.initTasks();
+                    }
+                } catch (Exception e) {
+                    LOG.log(Level.WARNING, "job init failed", e);
+                    job.kill();
+                }
+            }
+        }
+        public void stopIniter() {
+            shouldRun = false;
+        }
+    }
+
+
+    /////////////////////////////////////////////////////////////////
     // The real JobTracker
     ////////////////////////////////////////////////////////////////
     int port;
@@ -221,8 +266,10 @@
     int totalMaps = 0;
     int totalReduces = 0;
     TreeMap taskTrackers = new TreeMap();
+    Vector jobInitQueue = new Vector();
     ExpireTrackers expireTrackers = new ExpireTrackers();
     RetireJobs retireJobs = new RetireJobs();
+    JobInitThread initJobs = new JobInitThread();
 
     /**
      * It might seem like a bug to maintain a TreeSet of status objects,
@@ -307,6 +354,7 @@
 
         new Thread(this.expireTrackers).start();
         new Thread(this.retireJobs).start();
+        new Thread(this.initJobs).start();
     }
 
     public static InetSocketAddress getAddress(Configuration conf) {
@@ -521,67 +569,69 @@
         // has not yet been removed from the pool, making capacity seem
         // larger than it really is.)
         //
-        if ((numMaps < maxCurrentTasks) &&
-            (numMaps <= (avgMaps + TASK_ALLOC_EPSILON))) {
-
-            int totalNeededMaps = 0;
-            for (Iterator it = jobsByArrival.iterator(); it.hasNext(); ) {
-                JobInProgress job = (JobInProgress) it.next();
-                if (job.getStatus().getRunState() != JobStatus.RUNNING) {
-                    continue;
-                }
+        synchronized (jobsByArrival) {
+            if ((numMaps < maxCurrentTasks) &&
+                (numMaps <= (avgMaps + TASK_ALLOC_EPSILON))) {
+
+                int totalNeededMaps = 0;
+                for (Iterator it = jobsByArrival.iterator(); it.hasNext(); ) {
+                    JobInProgress job = (JobInProgress) it.next();
+                    if (job.getStatus().getRunState() != JobStatus.RUNNING) {
+                        continue;
+                    }
 
-                Task t = job.obtainNewMapTask(taskTracker, tts);
-                if (t != null) {
-                    return t;
-                }
+                    Task t = job.obtainNewMapTask(taskTracker, tts);
+                    if (t != null) {
+                        return t;
+                    }
 
-                //
-                // Beyond the highest-priority task, reserve a little 
-                // room for failures and speculative executions; don't 
-                // schedule tasks to the hilt.
-                //
-                totalNeededMaps += job.desiredMaps();
-                double padding = 0;
-                if (totalCapacity > MIN_SLOTS_FOR_PADDING) {
-                    padding = Math.min(maxCurrentTasks, totalNeededMaps * PAD_FRACTION);
-                }
-                if (totalNeededMaps + padding >= totalCapacity) {
-                    break;
+                    //
+                    // Beyond the highest-priority task, reserve a little 
+                    // room for failures and speculative executions; don't 
+                    // schedule tasks to the hilt.
+                    //
+                    totalNeededMaps += job.desiredMaps();
+                    double padding = 0;
+                    if (totalCapacity > MIN_SLOTS_FOR_PADDING) {
+                        padding = Math.min(maxCurrentTasks, totalNeededMaps * PAD_FRACTION);
+                    }
+                    if (totalNeededMaps + padding >= totalCapacity) {
+                        break;
+                    }
                 }
             }
-        }
-
-        //
-        // Same thing, but for reduce tasks
-        //
-        if ((numReduces < maxCurrentTasks) &&
-            (numReduces <= (avgReduces + TASK_ALLOC_EPSILON))) {
 
-            int totalNeededReduces = 0;
-            for (Iterator it = jobsByArrival.iterator(); it.hasNext(); ) {
-                JobInProgress job = (JobInProgress) it.next();
-                if (job.getStatus().getRunState() != JobStatus.RUNNING) {
-                    continue;
-                }
+            //
+            // Same thing, but for reduce tasks
+            //
+            if ((numReduces < maxCurrentTasks) &&
+                (numReduces <= (avgReduces + TASK_ALLOC_EPSILON))) {
+
+                int totalNeededReduces = 0;
+                for (Iterator it = jobsByArrival.iterator(); it.hasNext(); ) {
+                    JobInProgress job = (JobInProgress) it.next();
+                    if (job.getStatus().getRunState() != JobStatus.RUNNING) {
+                        continue;
+                    }
 
-                Task t = job.obtainNewReduceTask(taskTracker, tts);
-                if (t != null) {
-                    return t;
-                }
+                    Task t = job.obtainNewReduceTask(taskTracker, tts);
+                    if (t != null) {
+                        return t;
+                    }
 
-                //
-                // Beyond the highest-priority task, reserve a little 
-                // room for failures and speculative executions; don't 
-                // schedule tasks to the hilt.
-                //
-                totalNeededReduces += job.desiredReduces();
-                double padding = 0;
-                if (totalCapacity > MIN_SLOTS_FOR_PADDING) {
-                    padding = Math.min(maxCurrentTasks, totalNeededReduces * PAD_FRACTION);
-                }
-                if (totalNeededReduces + padding >= totalCapacity) {
-                    break;
+                    //
+                    // Beyond the highest-priority task, reserve a little 
+                    // room for failures and speculative executions; don't 
+                    // schedule tasks to the hilt.
+                    //
+                    totalNeededReduces += job.desiredReduces();
+                    double padding = 0;
+                    if (totalCapacity > MIN_SLOTS_FOR_PADDING) {
+                        padding = Math.min(maxCurrentTasks, totalNeededReduces * PAD_FRACTION);
+                    }
+                    if (totalNeededReduces + padding >= totalCapacity) {
+                        break;
+                    }
                 }
             }
         }
@@ -645,9 +695,31 @@
     ////////////////////////////////////////////////////
     // JobSubmissionProtocol
     ////////////////////////////////////////////////////
+    /**
+     * JobTracker.submitJob() kicks off a new job.  
+     *
+     * Create a 'JobInProgress' object, which contains both JobProfile
+     * and JobStatus.  Those two sub-objects are sometimes shipped outside
+     * of the JobTracker.  But JobInProgress adds info that's useful for
+     * the JobTracker alone.
+     *
+     * We add the JIP to the jobInitQueue, which is processed 
+     * asynchronously to handle split-computation and build up
+     * the right TaskTracker/Block mapping.
+     */
     public synchronized JobStatus submitJob(String jobFile) throws IOException {
         totalSubmissions++;
-        JobInProgress job = createJob(jobFile);
+        JobInProgress job = new JobInProgress(jobFile, this, this.conf);
+        synchronized (jobs) {
+            synchronized (jobsByArrival) {
+                synchronized (jobInitQueue) {
+                    jobs.put(job.getProfile().getJobId(), job);
+                    jobsByArrival.add(job);
+                    jobInitQueue.add(job);
+                    jobInitQueue.notifyAll();
+                }
+            }
+        }
         return job.getStatus();
     }
 
@@ -730,25 +802,6 @@
      */
     String createUniqueId() {
         return "" + Integer.toString(Math.abs(r.nextInt()),36);
-    }
-
-    /**
-     * JobProfile createJob() kicks off a new job.  
-     * This function creates a job profile and also decomposes it into
-     * tasks.  The tasks are added to the unassignedTasks structure.  
-     * (The precise structure will change as we get more sophisticated about 
-     * task allocation.)
-     *
-     * Create a 'JobInProgress' object, which contains both JobProfile
-     * and JobStatus.  Those two sub-objects are sometimes shipped outside
-     * of the JobTracker.  But JobInProgress adds info that's useful for
-     * the JobTracker alone.
-     */
-    JobInProgress createJob(String jobFile) throws IOException {
-        JobInProgress job = new JobInProgress(jobFile, this, this.conf);
-        jobs.put(job.getProfile().getJobId(), job);
-        jobsByArrival.add(job);
-        return job;
     }
 
     ////////////////////////////////////////////////////

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=382545&r1=382544&r2=382545&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Thu Mar  2 15:05:43 2006
@@ -360,8 +360,7 @@
             try {
                 if (isMapTask()) {
                     if (hints == null) {
-                        FileSystem fs = FileSystem.get(conf);
-                        hints = fs.getFileCacheHints(split.getFile(), split.getStart(), split.getLength());
+                        hints = job.getFileCacheHints(getTIPId(), split.getFile(), split.getStart(), split.getLength());
                     }
                     if (hints != null) {
                         for (int i = 0; i < hints.length; i++) {


Reply via email to