Author: ddas
Date: Tue Mar 10 11:57:06 2009
New Revision: 752077
URL: http://svn.apache.org/viewvc?rev=752077&view=rev
Log:
Merge -r 752072:752073 from trunk onto 0.19 branch. Fixes HADOOP-5392.
Modified:
hadoop/core/branches/branch-0.19/ (props changed)
hadoop/core/branches/branch-0.19/CHANGES.txt (contents, props changed)
hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
Propchange: hadoop/core/branches/branch-0.19/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 10 11:57:06 2009
@@ -1 +1 @@
-/hadoop/core/trunk:697306,698176,699056,699098,699415,699424,699444,699490,699517,700163,700628,700923,701273,701398,703923,704203,704261,704701,704703,704707,704712,704732,704748,704989,705391,705420,705430,705762,706350,706707,706719,706796,706802,707258,707262,708623,708641,708710,709040,709303,712881,713888,720602,723013,723460,723831,723918,724883,727117,727212,727217,727228,727869,732572,732777,733887,734870,736426,738697,740077,741703,741762,743745,743892,745180,746902-746903
+/hadoop/core/trunk:697306,698176,699056,699098,699415,699424,699444,699490,699517,700163,700628,700923,701273,701398,703923,704203,704261,704701,704703,704707,704712,704732,704748,704989,705391,705420,705430,705762,706350,706707,706719,706796,706802,707258,707262,708623,708641,708710,709040,709303,712881,713888,720602,723013,723460,723831,723918,724883,727117,727212,727217,727228,727869,732572,732777,733887,734870,736426,738697,740077,741703,741762,743745,743892,745180,746902-746903,752073
Modified: hadoop/core/branches/branch-0.19/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/CHANGES.txt?rev=752077&r1=752076&r2=752077&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.19/CHANGES.txt Tue Mar 10 11:57:06 2009
@@ -49,6 +49,9 @@
references to completedJobStore outside the block where the JobTracker is
locked.
(ddas)
+ HADOOP-5392. Fixes a problem to do with JT crashing during recovery when
+ the job files are garbled. (Amar Kamat via ddas)
+
HADOOP-5421. Removes the test TestRecoveryManager.java from the 0.19 branch
as it has compilation issues. (ddas)
Propchange: hadoop/core/branches/branch-0.19/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 10 11:57:06 2009
@@ -1,2 +1,2 @@
/hadoop/core/branches/branch-0.18/CHANGES.txt:727226
-/hadoop/core/trunk/CHANGES.txt:697306,698176,699056,699098,699415,699424,699444,699490,699517,700163,700628,700923,701273,701398,703923,704203,704261,704701,704703,704707,704712,704732,704748,704989,705391,705420,705430,705762,706350,706707,706719,706796,706802,707258,707262,708623,708641,708710,708723,709040,709303,711717,712881,713888,720602,723013,723460,723831,723918,724883,727117,727212,727217,727228,727869,732572,732777,733887,734870,735082,736426,738697,740077,741703,741762,743296,743745,743892,745180,746902-746903
+/hadoop/core/trunk/CHANGES.txt:697306,698176,699056,699098,699415,699424,699444,699490,699517,700163,700628,700923,701273,701398,703923,704203,704261,704701,704703,704707,704712,704732,704748,704989,705391,705420,705430,705762,706350,706707,706719,706796,706802,707258,707262,708623,708641,708710,708723,709040,709303,711717,712881,713888,720602,723013,723460,723831,723918,724883,727117,727212,727217,727228,727869,732572,732777,733887,734870,735082,736426,738697,740077,741703,741762,743296,743745,743892,745180,746902-746903,752073
Modified:
hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=752077&r1=752076&r2=752077&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java Tue Mar 10 11:57:06 2009
@@ -545,11 +545,11 @@
LOG.info("Calling init from RM for job " +
jip.getJobID().toString());
try {
jip.initTasks();
- } catch (IOException ioe) {
+ } catch (Throwable t) {
LOG.error("Job initialization failed : \n"
- + StringUtils.stringifyException(ioe));
+ + StringUtils.stringifyException(t));
jip.fail(); // fail the job
- throw ioe;
+ throw new IOException(t);
}
}
}
@@ -820,19 +820,19 @@
expireLaunchingTasks.removeTask(attemptId);
}
- public void recover() throws IOException {
+ public void recover() {
// I. Init the jobs and cache the recovered job history filenames
Map<JobID, Path> jobHistoryFilenameMap = new HashMap<JobID, Path>();
Iterator<JobID> idIter = jobsToRecover.iterator();
while (idIter.hasNext()) {
JobID id = idIter.next();
- LOG.info("Trying to recover job " + id);
- // 1. Create the job object
- JobInProgress job = new JobInProgress(id, JobTracker.this, conf);
-
- String logFileName;
- Path jobHistoryFilePath;
+ LOG.info("Trying to recover details of job " + id);
try {
+ // 1. Create the job object
+ JobInProgress job = new JobInProgress(id, JobTracker.this, conf);
+ String logFileName;
+ Path jobHistoryFilePath;
+
// 2. Get the log file and the file path
logFileName =
JobHistory.JobInfo.getJobHistoryFileName(job.getJobConf(), id);
@@ -845,19 +845,19 @@
// This makes sure that the (master) file exists
JobHistory.JobInfo.recoverJobHistoryFile(job.getJobConf(),
jobHistoryFilePath);
- } catch (IOException ioe) {
- LOG.warn("Failed to recover job " + id + " history filename."
- + " Ignoring.", ioe);
+
+ // 4. Cache the history file name as it costs one dfs access
+ jobHistoryFilenameMap.put(job.getJobID(), jobHistoryFilePath);
+
+ // 5. Sumbit the job to the jobtracker
+ addJob(id, job);
+ } catch (Throwable t) {
+ LOG.warn("Failed to recover job " + id + " history details."
+ + " Ignoring.", t);
// TODO : remove job details from the system directory
idIter.remove();
continue;
}
-
- // 4. Cache the history file name as it costs one dfs access
- jobHistoryFilenameMap.put(job.getJobID(), jobHistoryFilePath);
-
- // 5. Sumbit the job to the jobtracker
- addJob(id, job);
}
long recoveryStartTime = System.currentTimeMillis();
@@ -871,7 +871,14 @@
Path jobHistoryFilePath = jobHistoryFilenameMap.get(pJob.getJobID());
String logFileName = jobHistoryFilePath.getName();
- FileSystem fs = jobHistoryFilePath.getFileSystem(conf);
+ FileSystem fs;
+ try {
+ fs = jobHistoryFilePath.getFileSystem(conf);
+ } catch (IOException ioe) {
+ LOG.warn("Failed to get the filesystem for job " + id + ". Ignoring.",
+ ioe);
+ continue;
+ }
// 2. Parse the history file
// Note that this also involves job update
@@ -879,9 +886,9 @@
try {
JobHistory.parseHistoryFromFS(jobHistoryFilePath.toString(),
listener, fs);
- } catch (IOException e) {
- LOG.info("JobTracker failed to recover job " + pJob.getJobID() + "."
- + " Ignoring it.", e);
+ } catch (Throwable t) {
+ LOG.info("JobTracker failed to recover job " + pJob.getJobID()
+ + " from history. Ignoring it.", t);
}
// 3. Close the listener
@@ -900,9 +907,9 @@
JobHistory.JobInfo.checkpointRecovery(logFileName,
pJob.getJobConf());
}
- } catch (IOException ioe) {
+ } catch (Throwable t) {
LOG.warn("Failed to delete log file (" + logFileName + ") for job "
- + id + ". Ignoring it.", ioe);
+ + id + ". Ignoring it.", t);
}
// 6. Inform the jobtracker as to how much of the data is recovered.
@@ -1197,7 +1204,12 @@
&& !JobHistory.isDisableHistory()
&& systemDirData != null) {
for (FileStatus status : systemDirData) {
- recoveryManager.checkAndAddJob(status);
+ try {
+ recoveryManager.checkAndAddJob(status);
+ } catch (Throwable t) {
+ LOG.warn("Failed to add the job " + status.getPath().getName(),
+ t);
+ }
}
// Check if there are jobs to be recovered
@@ -1313,7 +1325,11 @@
taskScheduler.start();
// Start the recovery after starting the scheduler
- recoveryManager.recover();
+ try {
+ recoveryManager.recover();
+ } catch (Throwable t) {
+ LOG.warn("Recovery manager crashed! Ignoring.", t);
+ }
this.expireTrackersThread = new Thread(this.expireTrackers,
"expireTrackers");
@@ -2738,6 +2754,10 @@
// if job is not there in the cleanup list ... add it
synchronized (trackerToJobsToCleanup) {
Set<JobID> jobs = trackerToJobsToCleanup.get(trackerName);
+ if (jobs == null) {
+ jobs = new HashSet<JobID>();
+ trackerToJobsToCleanup.put(trackerName, jobs);
+ }
jobs.add(taskId.getJobID());
}
continue;
Modified:
hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=752077&r1=752076&r2=752077&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Tue Mar 10 11:57:06 2009
@@ -61,6 +61,7 @@
*/
class JobTrackerRunner implements Runnable {
private JobTracker tracker = null;
+ private volatile boolean isActive = true;
JobConf jc = null;
@@ -72,6 +73,10 @@
return (tracker != null);
}
+ public boolean isActive() {
+ return isActive;
+ }
+
public int getJobTrackerPort() {
return tracker.getTrackerPort();
}
@@ -97,6 +102,7 @@
tracker.offerService();
} catch (Throwable e) {
LOG.error("Job tracker crashed", e);
+ isActive = false;
}
}
@@ -111,6 +117,7 @@
} catch (Throwable e) {
LOG.error("Problem shutting down job tracker", e);
}
+ isActive = false;
}
}
@@ -548,6 +555,21 @@
}
}
+ ClusterStatus status = jobTracker.getJobTracker().getClusterStatus(false);
+ while (jobTracker.isActive() && status.getJobTrackerState() == JobTracker.State.INITIALIZING) {
+ try {
+ LOG.info("JobTracker still initializing. Waiting.");
+ Thread.sleep(1000);
+ } catch(InterruptedException e) {}
+ status = jobTracker.getJobTracker().getClusterStatus(false);
+ }
+
+ if (!jobTracker.isActive()
+ || status.getJobTrackerState() != JobTracker.State.RUNNING) {
+ // return if jobtracker has crashed
+ return;
+ }
+
// Set the configuration for the task-trackers
this.jobTrackerPort = jobTracker.getJobTrackerPort();
this.jobTrackerInfoPort = jobTracker.getJobTrackerInfoPort();