Author: cutting Date: Mon Jan 8 11:42:24 2007 New Revision: 494172 URL: http://svn.apache.org/viewvc?view=rev&rev=494172 Log: HADOOP-600. Fix a race condition in the JobTracker. Contributed by Arun.
Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=494172&r1=494171&r2=494172 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Mon Jan 8 11:42:24 2007 @@ -11,6 +11,9 @@ 3. HADOOP-815. Fix memory leaks in JobTracker. (Arun C Murthy via cutting) + 4. HADOOP-600. Fix a race condition in JobTracker. + (Arun C Murthy via cutting) + Release 0.10.0 - 2007-01-05 Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?view=diff&rev=494172&r1=494171&r2=494172 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Mon Jan 8 11:42:24 2007 @@ -221,36 +221,45 @@ // // Loop through all expired items in the queue // - synchronized (taskTrackers) { + // Need to lock the JobTracker here since we are + // manipulating its data-structures via + // ExpireTrackers.run -> JobTracker.lostTaskTracker -> + // JobInProgress.failedTask -> JobTracker.markCompleteTaskAttempt + // Also need to lock JobTracker before locking 'taskTrackers' & + // 'trackerExpiryQueue' to prevent deadlock: + // @see {@link JobTracker.processHeartbeat(TaskTrackerStatus, boolean)} + synchronized (JobTracker.this) { + synchronized (taskTrackers) { synchronized (trackerExpiryQueue) { - long now = System.currentTimeMillis(); - TaskTrackerStatus leastRecent = null; - while ((trackerExpiryQueue.size() > 0) && - ((leastRecent = (TaskTrackerStatus) trackerExpiryQueue.first()) != null) && - (now - 
leastRecent.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL)) { - - // Remove profile from head of queue - trackerExpiryQueue.remove(leastRecent); - String trackerName = leastRecent.getTrackerName(); - - // Figure out if last-seen time should be updated, or if tracker is dead - TaskTrackerStatus newProfile = (TaskTrackerStatus) taskTrackers.get(leastRecent.getTrackerName()); - // Items might leave the taskTracker set through other means; the - // status stored in 'taskTrackers' might be null, which means the - // tracker has already been destroyed. - if (newProfile != null) { - if (now - newProfile.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL) { - // Remove completely - updateTaskTrackerStatus(trackerName, null); - lostTaskTracker(leastRecent.getTrackerName(), - leastRecent.getHost()); - } else { - // Update time by inserting latest profile - trackerExpiryQueue.add(newProfile); - } - } + long now = System.currentTimeMillis(); + TaskTrackerStatus leastRecent = null; + while ((trackerExpiryQueue.size() > 0) && + ((leastRecent = (TaskTrackerStatus) trackerExpiryQueue.first()) != null) && + (now - leastRecent.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL)) { + + // Remove profile from head of queue + trackerExpiryQueue.remove(leastRecent); + String trackerName = leastRecent.getTrackerName(); + + // Figure out if last-seen time should be updated, or if tracker is dead + TaskTrackerStatus newProfile = (TaskTrackerStatus) taskTrackers.get(leastRecent.getTrackerName()); + // Items might leave the taskTracker set through other means; the + // status stored in 'taskTrackers' might be null, which means the + // tracker has already been destroyed. 
+ if (newProfile != null) { + if (now - newProfile.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL) { + // Remove completely + updateTaskTrackerStatus(trackerName, null); + lostTaskTracker(leastRecent.getTrackerName(), + leastRecent.getHost()); + } else { + // Update time by inserting latest profile + trackerExpiryQueue.add(newProfile); + } } + } } + } } } catch (Exception t) { LOG.error("Tracker Expiry Thread got exception: " +