Modified: lucene/hadoop/branches/branch-0.9/src/java/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.9/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=487697&r1=487696&r2=487697 ============================================================================== --- lucene/hadoop/branches/branch-0.9/src/java/org/apache/hadoop/mapred/TaskTracker.java (original) +++ lucene/hadoop/branches/branch-0.9/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri Dec 15 14:35:51 2006 @@ -68,6 +68,9 @@ Server taskReportServer = null; InterTrackerProtocol jobClient; + + // last heartbeat response recieved + short heartbeatResponseId = -1; StatusHttpServer server = null; @@ -187,7 +190,7 @@ } } } - + static String getCacheSubdir() { return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.CACHEDIR; } @@ -451,15 +454,23 @@ } } - if (!transmitHeartBeat()) { + // Send the heartbeat and process the jobtracker's directives + HeartbeatResponse heartbeatResponse = transmitHeartBeat(); + TaskTrackerAction[] actions = heartbeatResponse.getActions(); + LOG.debug("Got heartbeatResponse from JobTracker with responseId: " + + heartbeatResponse.getResponseId() + " and " + + ((actions != null) ? 
actions.length : 0) + " actions"); + + if (reinitTaskTracker(actions)) { return State.STALE; } + lastHeartbeat = now; justStarted = false; - checkForNewTasks(); + checkAndStartNewTasks(actions); markUnresponsiveTasks(); - closeCompletedTasks(); + closeCompletedTasks(actions); killOverflowingTasks(); //we've cleaned up, resume normal operation @@ -491,56 +502,94 @@ * @return false if the tracker was unknown * @throws IOException */ - private boolean transmitHeartBeat() throws IOException { + private HeartbeatResponse transmitHeartBeat() throws IOException { // // Build the heartbeat information for the JobTracker // - List<TaskStatus> taskReports = new ArrayList(runningTasks.size()); + List<TaskStatus> taskReports = + new ArrayList<TaskStatus>(runningTasks.size()); synchronized (this) { - for (TaskInProgress tip: runningTasks.values()) { - taskReports.add(tip.createStatus()); - } + for (TaskInProgress tip: runningTasks.values()) { + taskReports.add(tip.createStatus()); + } } TaskTrackerStatus status = new TaskTrackerStatus(taskTrackerName, localHostname, - httpPort, taskReports, - failures); - + httpPort, taskReports, + failures); + + // + // Check if we should ask for a new Task + // + boolean askForNewTask = false; + if ((mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) && + acceptNewTasks) { + checkLocalDirs(fConf.getLocalDirs()); + + if (enoughFreeSpace(minSpaceStart)) { + askForNewTask = true; + } + } + // // Xmit the heartbeat // + HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, + justStarted, askForNewTask, + heartbeatResponseId); + heartbeatResponseId = heartbeatResponse.getResponseId(); - int resultCode = jobClient.emitHeartbeat(status, justStarted); synchronized (this) { - for (TaskStatus taskStatus: taskReports) { - if (taskStatus.getRunState() != TaskStatus.State.RUNNING) { - if (taskStatus.getIsMap()) { - mapTotal--; - } else { - reduceTotal--; - } - myMetrics.completeTask(); - runningTasks.remove(taskStatus.getTaskId()); + 
for (TaskStatus taskStatus : taskReports) { + if (taskStatus.getRunState() != TaskStatus.State.RUNNING) { + if (taskStatus.getIsMap()) { + mapTotal--; + } else { + reduceTotal--; } + myMetrics.completeTask(); + runningTasks.remove(taskStatus.getTaskId()); + } } } - return resultCode != InterTrackerProtocol.UNKNOWN_TASKTRACKER; + return heartbeatResponse; } /** + * Check if the jobtracker directed a 'reset' of the tasktracker. + * + * @param actions the directives of the jobtracker for the tasktracker. + * @return <code>true</code> if tasktracker is to be reset, + * <code>false</code> otherwise. + */ + private boolean reinitTaskTracker(TaskTrackerAction[] actions) { + if (actions != null) { + for (TaskTrackerAction action : actions) { + if (action.getActionId() == + TaskTrackerAction.ActionType.REINIT_TRACKER) { + LOG.info("Recieved RenitTrackerAction from JobTracker"); + return true; + } + } + } + return false; + } + + /** * Check to see if there are any new tasks that we should run. * @throws IOException */ - private void checkForNewTasks() throws IOException { - // - // Check if we should ask for a new Task - // - if ((mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) && - acceptNewTasks) { - checkLocalDirs(fConf.getLocalDirs()); - - if (enoughFreeSpace(minSpaceStart)) { - Task t = jobClient.pollForNewTask(taskTrackerName); + private void checkAndStartNewTasks(TaskTrackerAction[] actions) + throws IOException { + if (actions == null) { + return; + } + + for (TaskTrackerAction action : actions) { + if (action.getActionId() == + TaskTrackerAction.ActionType.LAUNCH_TASK) { + Task t = ((LaunchTaskAction)(action)).getTask(); + LOG.info("LaunchTaskAction: " + t.getTaskId()); if (t != null) { startNewTask(t); } @@ -573,24 +622,73 @@ * Ask the JobTracker if there are any tasks that we should clean up, * either because we don't need them any more or because the job is done. 
*/ - private void closeCompletedTasks() throws IOException { - String[] toCloseIds = jobClient.pollForTaskWithClosedJob(taskTrackerName); - if (toCloseIds != null) { - synchronized (this) { - for (int i = 0; i < toCloseIds.length; i++) { - TaskInProgress tip = tasks.get(toCloseIds[i]); - if (tip != null) { - // remove the task from running jobs, removing the job if - // it is the last task - removeTaskFromJob(tip.getTask().getJobId(), tip); - tasksToCleanup.put(tip); + private void closeCompletedTasks(TaskTrackerAction[] actions) + throws IOException { + if (actions == null) { + return; + } + + for (TaskTrackerAction action : actions) { + TaskTrackerAction.ActionType actionType = action.getActionId(); + + if (actionType == TaskTrackerAction.ActionType.KILL_JOB) { + String jobId = ((KillJobAction)action).getJobId(); + LOG.info("Received 'KillJobAction' for job: " + jobId); + synchronized (runningJobs) { + RunningJob rjob = runningJobs.get(jobId); + if (rjob == null) { + LOG.warn("Unknown job " + jobId + " being deleted."); } else { - LOG.info("Attempt to cleanup unknown tip " + toCloseIds[i]); + synchronized (rjob) { + int noJobTasks = rjob.tasks.size(); + int taskCtr = 0; + + // Add this tips of this job to queue of tasks to be purged + for (TaskInProgress tip : rjob.tasks) { + // Purge the job files for the last element in rjob.tasks + if (++taskCtr == noJobTasks) { + tip.setPurgeJobFiles(true); + } + + tasksToCleanup.put(tip); + } + + // Remove this job + rjob.tasks.clear(); + runningJobs.remove(jobId); + } } } + } else if(actionType == TaskTrackerAction.ActionType.KILL_TASK) { + String taskId = ((KillTaskAction)action).getTaskId(); + LOG.info("Received KillTaskAction for task: " + taskId); + purgeTask(tasks.get(taskId), false); } } } + + /** + * Remove the tip and update all relevant state. + * + * @param tip {@link TaskInProgress} to be removed. 
+ * @param purgeJobFiles <code>true</code> if the job files are to be + * purged, <code>false</code> otherwise. + */ + private void purgeTask(TaskInProgress tip, boolean purgeJobFiles) { + if (tip != null) { + LOG.info("About to purge task: " + tip.getTask().getTaskId()); + + // Cleanup the job files? + tip.setPurgeJobFiles(purgeJobFiles); + + // Remove the task from running jobs, + // removing the job if it's the last task + removeTaskFromJob(tip.getTask().getJobId(), tip); + + // Add this tip to queue of tasks to be purged + tasksToCleanup.put(tip); + } + } /** Check if we're dangerously low on disk space * If so, kill jobs to free up space and make sure @@ -822,6 +920,9 @@ private boolean alwaysKeepTaskFiles; private TaskStatus taskStatus ; private boolean keepJobFiles; + + /** Cleanup the job files when the job is complete (done/failed) */ + private boolean purgeJobFiles = false; /** */ @@ -886,6 +987,10 @@ keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles(); } + public void setPurgeJobFiles(boolean purgeJobFiles) { + this.purgeJobFiles = purgeJobFiles; + } + /** */ public synchronized TaskStatus createStatus() { @@ -1017,32 +1122,39 @@ * We no longer need anything from this task, as the job has * finished. If the task is still running, kill it (and clean up */ - public synchronized void jobHasFinished() throws IOException { - - if (getRunState() == TaskStatus.State.RUNNING) { + public void jobHasFinished() throws IOException { + boolean killTask = false; + synchronized(this){ + killTask = (getRunState() == TaskStatus.State.RUNNING); + if (killTask) { killAndCleanup(false); - } else { - cleanup(); - } - if (keepJobFiles) - return; - - // Delete temp directory in case any task used PhasedFileSystem. 
- try{ - String systemDir = task.getConf().get("mapred.system.dir"); - Path taskTempDir = new Path(systemDir + "/" + - task.getJobId() + "/" + task.getTipId()); - if( fs.exists(taskTempDir)){ - fs.delete(taskTempDir) ; } - }catch(IOException e){ - LOG.warn("Error in deleting reduce temporary output",e); + } + if (!killTask) { + cleanup(); + } + if (keepJobFiles) + return; + + synchronized(this){ + // Delete temp directory in case any task used PhasedFileSystem. + try{ + String systemDir = task.getConf().get("mapred.system.dir"); + Path taskTempDir = new Path(systemDir + "/" + + task.getJobId() + "/" + task.getTipId() + "/" + task.getTaskId()); + if( fs.exists(taskTempDir)){ + fs.delete(taskTempDir) ; + } + }catch(IOException e){ + LOG.warn("Error in deleting reduce temporary output",e); + } + } + // Delete the job directory for this + // task if the job is done/failed + if (purgeJobFiles) { + this.defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR + + JOBCACHE + Path.SEPARATOR + task.getJobId()); } - - // delete the job diretory for this task - // since the job is done/failed - this.defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR + - JOBCACHE + Path.SEPARATOR + task.getJobId()); } /** @@ -1090,6 +1202,9 @@ * We no longer need anything from this task. Either the * controlling job is all done and the files have been copied * away, or the task failed and we don't need the remains. + * Any calls to cleanup should not lock the tip first. + * cleanup does the right thing- updates tasks in Tasktracker + * by locking tasktracker first and then locks the tip. */ void cleanup() throws IOException { String taskId = task.getTaskId();
Modified: lucene/hadoop/branches/branch-0.9/src/site/src/documentation/content/xdocs/index.xml URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.9/src/site/src/documentation/content/xdocs/index.xml?view=diff&rev=487697&r1=487696&r2=487697 ============================================================================== --- lucene/hadoop/branches/branch-0.9/src/site/src/documentation/content/xdocs/index.xml (original) +++ lucene/hadoop/branches/branch-0.9/src/site/src/documentation/content/xdocs/index.xml Fri Dec 15 14:35:51 2006 @@ -15,6 +15,15 @@ <title>News</title> <section> + <title>15 December, 2006: release 0.9.2 available</title> + <p>This fixes critical bugs in 0.9.1. For details see the <a + href="http://tinyurl.com/ya8lfd">release notes</a>. The release can + be obtained from <a + href="http://www.apache.org/dyn/closer.cgi/lucene/hadoop/"> a + nearby mirror</a>. + </p> </section> + + <section> <title>6 December, 2006: release 0.9.1 available</title> <p>This fixes critical bugs in 0.9.0. For details see the <a href="http://tinyurl.com/y55d7p">release notes</a>. The release can