Author: ddas
Date: Fri Feb 20 13:38:05 2009
New Revision: 746233
URL: http://svn.apache.org/viewvc?rev=746233&view=rev
Log:
HADOOP-5247. Introduces a broadcast of KillJobAction to all trackers when a job
finishes. This fixes a bunch of problems to do with NPE when a completed job is
not in memory and a tasktracker comes to the jobtracker with a status report of
a task belonging to that job. Contributed by Amar Kamat.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobStatus.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
Modified: hadoop/core/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=746233&r1=746232&r2=746233&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Feb 20 13:38:05 2009
@@ -813,6 +813,11 @@
(HADOOP-5234) and NPE in handling KillTaskAction of a cleanup task
(HADOOP-5235).
(Amareshwari Sriramadasu via ddas)
+ HADOOP-5247. Introduces a broadcast of KillJobAction to all trackers when
+ a job finishes. This fixes a bunch of problems to do with NPE when a completed
+ job is not in memory and a tasktracker comes to the jobtracker with a status
+ report of a task belonging to that job. (Amar Kamat via ddas)
+
Release 0.19.1 - Unreleased
IMPROVEMENTS
Modified:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=746233&r1=746232&r2=746233&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Feb 20 13:38:05 2009
@@ -2609,9 +2609,6 @@
}
boolean isComplete() {
- int runState = this.status.getRunState();
- return runState == JobStatus.SUCCEEDED
- || runState == JobStatus.FAILED
- || runState == JobStatus.KILLED;
+ return status.isJobComplete();
}
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobStatus.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobStatus.java?rev=746233&r1=746232&r2=746233&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobStatus.java Fri Feb 20 13:38:05 2009
@@ -270,6 +270,14 @@
priority = jp;
}
+ /**
+ * Returns true if the status is for a completed job.
+ */
+ public synchronized boolean isJobComplete() {
+ return (runState == JobStatus.SUCCEEDED || runState == JobStatus.FAILED
+ || runState == JobStatus.KILLED);
+ }
+
///////////////////////////////////////
// Writable
///////////////////////////////////////
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=746233&r1=746232&r2=746233&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Feb 20 13:38:05 2009
@@ -1219,6 +1219,10 @@
TreeMap<String, ArrayList<JobInProgress>> userToJobsMap =
new TreeMap<String, ArrayList<JobInProgress>>();
+ // (trackerID --> list of jobs to cleanup)
+ Map<String, Set<JobID>> trackerToJobsToCleanup =
+ new HashMap<String, Set<JobID>>();
+
// All the known TaskInProgress items, mapped to by taskids (taskid->TIP)
Map<TaskAttemptID, TaskInProgress> taskidToTIPMap =
new TreeMap<TaskAttemptID, TaskInProgress>();
@@ -1835,6 +1839,9 @@
long now = System.currentTimeMillis();
+ // mark the job for cleanup at all the trackers
+ addJobForCleanup(id);
+
// add the blacklisted trackers to potentially faulty list
if (job.getStatus().getRunState() == JobStatus.SUCCEEDED) {
if (job.getNoOfBlackListedTrackers() > 0) {
@@ -2320,6 +2327,12 @@
actions.addAll(killTasksList);
}
+ // Check for jobs to be killed/cleanedup
+ List<TaskTrackerAction> killJobsList = getJobsForCleanup(trackerName);
+ if (killJobsList != null) {
+ actions.addAll(killJobsList);
+ }
+
// Check for tasks whose outputs can be saved
List<TaskTrackerAction> commitTasksList = getTasksToSave(status);
if (commitTasksList != null) {
@@ -2496,27 +2509,58 @@
Set<TaskAttemptID> taskIds = trackerToTaskMap.get(taskTracker);
if (taskIds != null) {
List<TaskTrackerAction> killList = new ArrayList<TaskTrackerAction>();
- Set<JobID> killJobIds = new TreeSet<JobID>();
for (TaskAttemptID killTaskId : taskIds) {
TaskInProgress tip = taskidToTIPMap.get(killTaskId);
+ if (tip == null) {
+ continue;
+ }
if (tip.shouldClose(killTaskId)) {
//
// This is how the JobTracker ends a task at the TaskTracker.
// It may be successfully completed, or may be killed in
// mid-execution.
//
- if (tip.getJob().getStatus().getRunState() == JobStatus.RUNNING ||
- tip.getJob().getStatus().getRunState() == JobStatus.PREP) {
+ if (!tip.getJob().isComplete()) {
killList.add(new KillTaskAction(killTaskId));
LOG.debug(taskTracker + " -> KillTaskAction: " + killTaskId);
- } else {
- JobID killJobId = tip.getJob().getStatus().getJobID();
- killJobIds.add(killJobId);
}
}
}
- for (JobID killJobId : killJobIds) {
+ return killList;
+ }
+ return null;
+ }
+
+ /**
+ * Add a job to cleanup for the tracker.
+ */
+ private void addJobForCleanup(JobID id) {
+ for (String taskTracker : taskTrackers.keySet()) {
+ LOG.debug("Marking job " + id + " for cleanup by tracker " + taskTracker);
+ synchronized (trackerToJobsToCleanup) {
+ Set<JobID> jobsToKill = trackerToJobsToCleanup.get(taskTracker);
+ if (jobsToKill == null) {
+ jobsToKill = new HashSet<JobID>();
+ trackerToJobsToCleanup.put(taskTracker, jobsToKill);
+ }
+ jobsToKill.add(id);
+ }
+ }
+ }
+
+ /**
+ * A tracker wants to know if any job needs cleanup because the job completed.
+ */
+ private List<TaskTrackerAction> getJobsForCleanup(String taskTracker) {
+ Set<JobID> jobs = null;
+ synchronized (trackerToJobsToCleanup) {
+ jobs = trackerToJobsToCleanup.remove(taskTracker);
+ }
+ if (jobs != null) {
+ // prepare the actions list
+ List<TaskTrackerAction> killList = new ArrayList<TaskTrackerAction>();
+ for (JobID killJobId : jobs) {
killList.add(new KillJobAction(killJobId));
LOG.debug(taskTracker + " -> KillJobAction: " + killJobId);
}
@@ -2538,6 +2582,9 @@
if (taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING) {
TaskAttemptID taskId = taskStatus.getTaskID();
TaskInProgress tip = taskidToTIPMap.get(taskId);
+ if (tip == null) {
+ continue;
+ }
if (tip.shouldCommit(taskId)) {
saveList.add(new CommitTaskAction(taskId));
LOG.debug(tts.getTrackerName() +
@@ -3091,16 +3138,23 @@
for (TaskStatus report : status.getTaskReports()) {
report.setTaskTracker(trackerName);
TaskAttemptID taskId = report.getTaskID();
+
+ // expire it
+ expireLaunchingTasks.removeTask(taskId);
+
+ JobInProgress job = getJob(taskId.getJobID());
+ if (job == null) {
+ continue;
+ }
+
TaskInProgress tip = taskidToTIPMap.get(taskId);
// Check if the tip is known to the jobtracker. In case of a restarted
// jt, some tasks might join in later
if (tip != null || hasRestarted()) {
- JobInProgress job = getJob(taskId.getJobID());
if (tip == null) {
tip = job.getTaskInProgress(taskId.getTaskID());
job.addRunningTaskToTIP(tip, taskId, status, false);
}
- expireLaunchingTasks.removeTask(taskId);
// Update the job and inform the listeners if necessary
JobStatus prevStatus = (JobStatus)job.getStatus().clone();
@@ -3148,6 +3202,12 @@
*/
void lostTaskTracker(String trackerName) {
LOG.info("Lost tracker '" + trackerName + "'");
+
+ // remove the tracker from the local structures
+ synchronized (trackerToJobsToCleanup) {
+ trackerToJobsToCleanup.remove(trackerName);
+ }
+
Set<TaskAttemptID> lostTasks = trackerToTaskMap.get(trackerName);
trackerToTaskMap.remove(trackerName);