Author: cutting Date: Thu Jan 4 11:00:49 2007 New Revision: 492681 URL: http://svn.apache.org/viewvc?view=rev&rev=492681 Log: HADOOP-840. Queue task cleanups. Contributed by Owen & Mahadev.
Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=492681&r1=492680&r2=492681 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Thu Jan 4 11:00:49 2007 @@ -185,6 +185,9 @@ intermediate outputs may happen at any time, potentially causing task timeouts. (Devaraj Das via cutting) +53. HADOOP-840. In task tracker, queue task cleanups and perform them + in a separate thread. (omalley & Mahadev Konar via cutting) + Release 0.9.2 - 2006-12-15 Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=492681&r1=492680&r2=492681 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Thu Jan 4 11:00:49 2007 @@ -21,7 +21,6 @@ import org.apache.hadoop.fs.*; import org.apache.hadoop.ipc.*; -import org.apache.hadoop.io.*; import org.apache.hadoop.metrics.Metrics; import org.apache.hadoop.util.*; import org.apache.hadoop.util.DiskChecker.DiskErrorException; @@ -29,6 +28,8 @@ import java.io.*; import java.net.*; import java.util.*; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.regex.Pattern; import javax.servlet.ServletContext; @@ -77,7 +78,7 @@ boolean shuttingDown = false; - Map<String, TaskInProgress> tasks = null; + Map<String, TaskInProgress> tasks = new HashMap(); /** * Map from taskId -> TaskInProgress. */ @@ -134,7 +135,8 @@ /** * A list of tips that should be cleaned up. */ - private BlockingQueue tasksToCleanup = new BlockingQueue(); + private BlockingQueue<TaskTrackerAction> tasksToCleanup = + new LinkedBlockingQueue(); /** * A daemon-thread that pulls tips off the list of things to cleanup. @@ -144,8 +146,22 @@ public void run() { while (true) { try { - TaskInProgress tip = (TaskInProgress) tasksToCleanup.take(); - tip.jobHasFinished(); + TaskTrackerAction action = tasksToCleanup.take(); + if (action instanceof KillJobAction) { + purgeJob((KillJobAction) action); + } else if (action instanceof KillTaskAction) { + TaskInProgress tip; + KillTaskAction killAction = (KillTaskAction) action; + synchronized (TaskTracker.this) { + tip = tasks.get(killAction.getTaskId()); + } + LOG.info("Received KillTaskAction for task: " + + killAction.getTaskId()); + purgeTask(tip); + } else { + LOG.error("Non-delete action given to cleanup thread: " + + action); + } } catch (Throwable except) { LOG.warn(StringUtils.stringifyException(except)); } @@ -163,7 +179,7 @@ synchronized (runningJobs) { RunningJob rJob = null; if (!runningJobs.containsKey(jobId)) { - rJob = new RunningJob(localJobFile); + rJob = new RunningJob(jobId, localJobFile); rJob.localized = false; rJob.tasks = new HashSet(); rJob.jobFile = localJobFile; @@ -227,7 +243,7 @@ fConf.deleteLocalFiles(SUBDIR); // Clear out state tables - this.tasks = new TreeMap(); + this.tasks.clear(); this.runningTasks = new TreeMap(); this.runningJobs = new TreeMap(); this.mapTotal = 0; @@ -324,6 +340,8 @@ } RunJar.unJar(new File(localJarFile.toString()), workDir); } + rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) || + localJobConf.getKeepFailedTaskFiles()); rjob.localized = true; } } @@ -475,10 +493,16 @@ lastHeartbeat = now; justStarted = false; - - checkAndStartNewTasks(actions); + if (actions != null){ + for(TaskTrackerAction action: actions) { + if (action instanceof LaunchTaskAction) { + startNewTask((LaunchTaskAction) action); + } else { + tasksToCleanup.put(action); + } + } + } markUnresponsiveTasks(); - closeCompletedTasks(actions); killOverflowingTasks(); //we've cleaned up, resume normal operation @@ -584,28 +608,6 @@ } /** - * Check to see if there are any new tasks that we should run. - * @throws IOException - */ - private void checkAndStartNewTasks(TaskTrackerAction[] actions) - throws IOException { - if (actions == null) { - return; - } - - for (TaskTrackerAction action : actions) { - if (action.getActionId() == - TaskTrackerAction.ActionType.LAUNCH_TASK) { - Task t = ((LaunchTaskAction)(action)).getTask(); - LOG.info("LaunchTaskAction: " + t.getTaskId()); - if (t != null) { - startNewTask(t); - } - } - } - } - - /** * Kill any tasks that have not reported progress in the last X seconds. */ private synchronized void markUnresponsiveTasks() throws IOException { @@ -621,59 +623,48 @@ LOG.info(tip.getTask().getTaskId() + ": " + msg); ReflectionUtils.logThreadInfo(LOG, "lost task", 30); tip.reportDiagnosticInfo(msg); - purgeTask(tip, false); + purgeTask(tip); } } } /** - * Ask the JobTracker if there are any tasks that we should clean up, - * either because we don't need them any more or because the job is done. + * The task tracker is done with this job, so we need to clean up. + * @param action The action with the job + * @throws IOException */ - private void closeCompletedTasks(TaskTrackerAction[] actions) - throws IOException { - if (actions == null) { - return; + private void purgeJob(KillJobAction action) throws IOException { + String jobId = action.getJobId(); + LOG.info("Received 'KillJobAction' for job: " + jobId); + RunningJob rjob = null; + synchronized (runningJobs) { + rjob = runningJobs.get(jobId); } - for (TaskTrackerAction action : actions) { - TaskTrackerAction.ActionType actionType = action.getActionId(); - - if (actionType == TaskTrackerAction.ActionType.KILL_JOB) { - String jobId = ((KillJobAction)action).getJobId(); - LOG.info("Received 'KillJobAction' for job: " + jobId); - synchronized (runningJobs) { - RunningJob rjob = runningJobs.get(jobId); - if (rjob == null) { - LOG.warn("Unknown job " + jobId + " being deleted."); - } else { - synchronized (rjob) { - int noJobTasks = rjob.tasks.size(); - int taskCtr = 0; - - // Add this tips of this job to queue of tasks to be purged - for (TaskInProgress tip : rjob.tasks) { - // Purge the job files for the last element in rjob.tasks - if (++taskCtr == noJobTasks) { - tip.setPurgeJobFiles(true); - } - - tasksToCleanup.put(tip); - } - - // Remove this job - rjob.tasks.clear(); - runningJobs.remove(jobId); - } - } + if (rjob == null) { + LOG.warn("Unknown job " + jobId + " being deleted."); + } else { + synchronized (rjob) { + // Add this tips of this job to queue of tasks to be purged + for (TaskInProgress tip : rjob.tasks) { + tip.jobHasFinished(); } - } else if(actionType == TaskTrackerAction.ActionType.KILL_TASK) { - String taskId = ((KillTaskAction)action).getTaskId(); - LOG.info("Received KillTaskAction for task: " + taskId); - purgeTask(tasks.get(taskId), false); + // Delete the job directory for this + // task if the job is done/failed + if (!rjob.keepJobFiles){ + fConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR + JOBCACHE + + Path.SEPARATOR + rjob.getJobId()); + } + // Remove this job + rjob.tasks.clear(); } } - } + + synchronized(runningJobs) { + runningJobs.remove(jobId); + } + } + /** * Remove the tip and update all relevant state. @@ -682,19 +673,14 @@ * @param purgeJobFiles <code>true</code> if the job files are to be * purged, <code>false</code> otherwise. */ - private void purgeTask(TaskInProgress tip, boolean purgeJobFiles) { + private void purgeTask(TaskInProgress tip) throws IOException { if (tip != null) { LOG.info("About to purge task: " + tip.getTask().getTaskId()); - // Cleanup the job files? - tip.setPurgeJobFiles(purgeJobFiles); - // Remove the task from running jobs, // removing the job if it's the last task removeTaskFromJob(tip.getTask().getJobId(), tip); - - // Add this tip to queue of tasks to be purged - tasksToCleanup.put(tip); + tip.jobHasFinished(); } } @@ -718,7 +704,7 @@ " Killing task."; LOG.info(killMe.getTask().getTaskId() + ": " + msg); killMe.reportDiagnosticInfo(msg); - purgeTask(killMe, false); + purgeTask(killMe); } } } @@ -793,7 +779,9 @@ * All exceptions are handled locally, so that we don't mess up the * task tracker. */ - private void startNewTask(Task t) { + private void startNewTask(LaunchTaskAction action) { + Task t = action.getTask(); + LOG.info("LaunchTaskAction: " + t.getTaskId()); TaskInProgress tip = new TaskInProgress(t, this.fConf); synchronized (this) { tasks.put(t.getTaskId(), tip); @@ -862,51 +850,6 @@ return; } } - - /** - * This class implements a queue that is put between producer and - * consumer threads. It will grow without bound. - * @author Owen O'Malley - */ - static private class BlockingQueue { - private List queue; - - /** - * Create an empty queue. - */ - public BlockingQueue() { - queue = new ArrayList(); - } - - /** - * Put the given object at the back of the queue. - * @param obj - */ - public void put(Object obj) { - synchronized (queue) { - queue.add(obj); - queue.notify(); - } - } - - /** - * Take the object at the front of the queue. - * It blocks until there is an object available. - * @return the head of the queue - */ - public Object take() { - synchronized (queue) { - while (queue.isEmpty()) { - try { - queue.wait(); - } catch (InterruptedException ie) {} - } - Object result = queue.get(0); - queue.remove(0); - return result; - } - } - } /////////////////////////////////////////////////////// // TaskInProgress maintains all the info for a Task that @@ -929,9 +872,6 @@ private TaskStatus taskStatus ; private boolean keepJobFiles; - /** Cleanup the job files when the job is complete (done/failed) */ - private boolean purgeJobFiles = false; - /** */ public TaskInProgress(Task task, JobConf conf) { @@ -995,10 +935,6 @@ keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles(); } - public void setPurgeJobFiles(boolean purgeJobFiles) { - this.purgeJobFiles = purgeJobFiles; - } - /** */ public synchronized TaskStatus createStatus() { @@ -1158,12 +1094,6 @@ LOG.warn("Error in deleting reduce temporary output",e); } } - // Delete the job directory for this - // task if the job is done/failed - if (purgeJobFiles) { - this.defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR + - JOBCACHE + Path.SEPARATOR + task.getJobId()); - } } /** @@ -1225,17 +1155,20 @@ keepFailedTaskFiles)) { return; } - synchronized (this) { - try { - runner.close(); - } catch (Throwable ie) { - } - } } - this.defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR + - JOBCACHE + Path.SEPARATOR + task.getJobId() + Path.SEPARATOR + - taskId); + synchronized (this) { + try { + runner.close(); + defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR + + JOBCACHE + Path.SEPARATOR + + task.getJobId() + + Path.SEPARATOR + taskId); + } catch (Throwable ie) { + LOG.info("Error cleaning up task runner: " + + StringUtils.stringifyException(ie)); + } } + } public boolean equals(Object obj) { return (obj instanceof TaskInProgress) && @@ -1355,15 +1288,26 @@ * The datastructure for initializing a job */ static class RunningJob{ - Path jobFile; + private String jobid; + private Path jobFile; // keep this for later use Set<TaskInProgress> tasks; boolean localized; - - RunningJob(Path jobFile) { + boolean keepJobFiles; + RunningJob(String jobid, Path jobFile) { + this.jobid = jobid; localized = false; tasks = new HashSet(); this.jobFile = jobFile; + keepJobFiles = false; + } + + Path getJobFile() { + return jobFile; + } + + String getJobId() { + return jobid; } }