Author: ddas
Date: Wed Jun 3 03:51:21 2009
New Revision: 781255
URL: http://svn.apache.org/viewvc?rev=781255&view=rev
Log:
HADOOP-5924. Fixes a corner case problem to do with job recovery with empty
history files. Also, after a JT restart, sends KillTaskAction to tasks that
report back but the corresponding job hasn't been initialized yet. Contributed
by Amar Kamat.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java
Modified: hadoop/core/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=781255&r1=781254&r2=781255&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Jun 3 03:51:21 2009
@@ -863,6 +863,11 @@
HADOOP-5908. Fixes a problem to do with ArithmeticException in the
JobTracker when there are jobs with 0 maps. (Amar Kamat via ddas)
+ HADOOP-5924. Fixes a corner case problem to do with job recovery with
+ empty history files. Also, after a JT restart, sends KillTaskAction to
+ tasks that report back but the corresponding job hasn't been initialized
+ yet. (Amar Kamat via ddas)
+
Release 0.20.0 - 2009-04-15
INCOMPATIBLE CHANGES
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=781255&r1=781254&r2=781255&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
(original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java Wed
Jun 3 03:51:21 2009
@@ -936,19 +936,16 @@
throws IOException {
FileStatus[] contents = fs.listStatus(jobDirPath);
int matchCount = 0;
- if (contents != null && contents.length >=3) {
+ if (contents != null && contents.length >=2) {
for (FileStatus status : contents) {
if ("job.xml".equals(status.getPath().getName())) {
++matchCount;
}
- if ("job.jar".equals(status.getPath().getName())) {
- ++matchCount;
- }
if ("job.split".equals(status.getPath().getName())) {
++matchCount;
}
}
- if (matchCount == 3) {
+ if (matchCount == 2) {
return true;
}
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=781255&r1=781254&r2=781255&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
(original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java Wed
Jun 3 03:51:21 2009
@@ -875,6 +875,11 @@
String logFileName = null;
if (restarted) {
logFileName = getJobHistoryFileName(jobConf, jobId);
+ if (logFileName == null) {
+ logFileName =
+ encodeJobHistoryFileName(getNewJobHistoryFileName(jobConf,
jobId));
+
+ }
} else {
logFileName =
encodeJobHistoryFileName(getNewJobHistoryFileName(jobConf, jobId));
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=781255&r1=781254&r2=781255&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
(original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Wed
Jun 3 03:51:21 2009
@@ -1439,6 +1439,10 @@
Map<String, Set<JobID>> trackerToJobsToCleanup =
new HashMap<String, Set<JobID>>();
+ // (trackerID --> list of tasks to cleanup)
+ Map<String, Set<TaskAttemptID>> trackerToTasksToCleanup =
+ new HashMap<String, Set<TaskAttemptID>>();
+
// All the known TaskInProgress items, mapped to by taskids (taskid->TIP)
Map<TaskAttemptID, TaskInProgress> taskidToTIPMap =
new TreeMap<TaskAttemptID, TaskInProgress>();
@@ -2823,8 +2827,8 @@
String
taskTracker) {
Set<TaskAttemptID> taskIds = trackerToTaskMap.get(taskTracker);
+ List<TaskTrackerAction> killList = new ArrayList<TaskTrackerAction>();
if (taskIds != null) {
- List<TaskTrackerAction> killList = new ArrayList<TaskTrackerAction>();
for (TaskAttemptID killTaskId : taskIds) {
TaskInProgress tip = taskidToTIPMap.get(killTaskId);
if (tip == null) {
@@ -2842,10 +2846,18 @@
}
}
}
-
- return killList;
}
- return null;
+
+ // add the stray attempts for uninited jobs
+ synchronized (trackerToTasksToCleanup) {
+ Set<TaskAttemptID> set = trackerToTasksToCleanup.remove(taskTracker);
+ if (set != null) {
+ for (TaskAttemptID id : set) {
+ killList.add(new KillTaskAction(id));
+ }
+ }
+ }
+ return killList;
}
/**
@@ -3521,6 +3533,19 @@
continue;
}
+ if (!job.inited()) {
+ // if job is not yet initialized ... kill the attempt
+ synchronized (trackerToTasksToCleanup) {
+ Set<TaskAttemptID> tasks = trackerToTasksToCleanup.get(trackerName);
+ if (tasks == null) {
+ tasks = new HashSet<TaskAttemptID>();
+ trackerToTasksToCleanup.put(trackerName, tasks);
+ }
+ tasks.add(taskId);
+ }
+ continue;
+ }
+
TaskInProgress tip = taskidToTIPMap.get(taskId);
// Check if the tip is known to the jobtracker. In case of a restarted
// jt, some tasks might join in later
@@ -3585,6 +3610,10 @@
trackerToJobsToCleanup.remove(trackerName);
}
+ synchronized (trackerToTasksToCleanup) {
+ trackerToTasksToCleanup.remove(trackerName);
+ }
+
// Inform the recovery manager
recoveryManager.unMarkTracker(trackerName);
Modified:
hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java?rev=781255&r1=781254&r2=781255&view=diff
==============================================================================
---
hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java
(original)
+++
hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java
Wed Jun 3 03:51:21 2009
@@ -34,11 +34,11 @@
* recover previosuly submitted jobs.
*/
public class TestJobTrackerRestart extends TestCase {
- final Path testDir =
+ static final Path testDir =
new Path(System.getProperty("test.build.data","/tmp"),
"jt-restart-testing");
final Path inDir = new Path(testDir, "input");
- final Path shareDir = new Path(testDir, "share");
+ static final Path shareDir = new Path(testDir, "share");
final Path outputDir = new Path(testDir, "output");
private static int numJobsSubmitted = 0;
@@ -400,6 +400,115 @@
&& status.getReduceTasks() == 0;
}
+ /** Committer with setup waiting
+ */
+ static class CommitterWithDelaySetup extends FileOutputCommitter {
+ @Override
+ public void setupJob(JobContext context) throws IOException {
+ FileSystem fs = FileSystem.get(context.getConfiguration());
+ while (true) {
+ if (fs.exists(shareDir)) {
+ break;
+ }
+ UtilsForTests.waitFor(100);
+ }
+ super.cleanupJob(context);
+ }
+ }
+
+ /** Tests a job on jobtracker with restart-recovery turned on and empty
+ * jobhistory file.
+ * Preparation :
+ * - Configure a job with
+ * - num-maps : 0 (long waiting setup)
+ * - num-reducers : 0
+ *
+ * Check if the job succeeds after restart.
+ *
+ * Assumption that map slots are given first for setup.
+ */
+ public void testJobRecoveryWithEmptyHistory(MiniDFSCluster dfs,
+ MiniMRCluster mr)
+ throws IOException {
+ mr.startTaskTracker(null, null, 1, 1);
+ FileSystem fileSys = dfs.getFileSystem();
+
+ cleanUp(fileSys, shareDir);
+ cleanUp(fileSys, inDir);
+ cleanUp(fileSys, outputDir);
+
+ JobConf conf = mr.createJobConf();
+ conf.setNumReduceTasks(0);
+ conf.setOutputCommitter(TestEmptyJob.CommitterWithDelayCleanup.class);
+ fileSys.delete(outputDir, false);
+ RunningJob job1 =
+ UtilsForTests.runJob(conf, inDir, outputDir, 30, 0);
+
+ conf.setNumReduceTasks(0);
+ conf.setOutputCommitter(CommitterWithDelaySetup.class);
+ Path inDir2 = new Path(testDir, "input2");
+ fileSys.mkdirs(inDir2);
+ Path outDir2 = new Path(testDir, "output2");
+ fileSys.delete(outDir2, false);
+ JobConf newConf = getJobs(mr.createJobConf(),
+ new JobPriority[] {JobPriority.NORMAL},
+ new int[] {10}, new int[] {0},
+ outDir2, inDir2,
+ getMapSignalFile(shareDir),
+ getReduceSignalFile(shareDir))[0];
+
+ JobClient jobClient = new JobClient(newConf);
+ RunningJob job2 = jobClient.submitJob(newConf);
+ JobID id = job2.getID();
+
+ /*RunningJob job2 =
+ UtilsForTests.runJob(mr.createJobConf(), inDir2, outDir2, 0);
+
+ JobID id = job2.getID();*/
+ JobInProgress jip = mr.getJobTrackerRunner().getJobTracker().getJob(id);
+
+ jip.initTasks();
+
+ // find out the history filename
+ String history =
+ JobHistory.JobInfo.getJobHistoryFileName(jip.getJobConf(), id);
+ Path historyPath = JobHistory.JobInfo.getJobHistoryLogLocation(history);
+
+ // make sure that setup is launched
+ while (jip.runningMaps() == 0) {
+ UtilsForTests.waitFor(100);
+ }
+
+ id = job1.getID();
+ jip = mr.getJobTrackerRunner().getJobTracker().getJob(id);
+
+ jip.initTasks();
+
+ // make sure that cleanup is launched and is waiting
+ while (!jip.isCleanupLaunched()) {
+ UtilsForTests.waitFor(100);
+ }
+
+ mr.stopJobTracker();
+
+ // delete the history file .. just to be safe.
+ FileSystem historyFS = historyPath.getFileSystem(conf);
+ historyFS.delete(historyPath, false);
+ historyFS.create(historyPath).close(); // create an empty file
+
+
+ UtilsForTests.signalTasks(dfs, fileSys, getMapSignalFile(shareDir),
getReduceSignalFile(shareDir), (short)1);
+
+ // Turn on the recovery
+ mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover",
+ true);
+
+ mr.startJobTracker();
+
+ job1.waitForCompletion();
+ job2.waitForCompletion();
+ }
+
public void testJobTrackerRestart() throws IOException {
String namenode = null;
MiniDFSCluster dfs = null;
@@ -450,6 +559,9 @@
// Test jobtracker with restart-recovery turned off
testRestartWithoutRecovery(dfs, mr);
+
+ // test recovery with empty file
+ testJobRecoveryWithEmptyHistory(dfs, mr);
} finally {
if (mr != null) {
try {