Author: ddas
Date: Fri May 8 08:33:13 2009
New Revision: 772884
URL: http://svn.apache.org/viewvc?rev=772884&view=rev
Log:
HADOOP-4372. Improves the way history filenames are obtained and manipulated.
Contributed by Amar Kamat.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
Modified: hadoop/core/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=772884&r1=772883&r2=772884&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri May 8 08:33:13 2009
@@ -320,6 +320,9 @@
HADOOP-5080. Add new test cases to TestMRCLI and TestHDFSCLI
(V.Karthikeyan via nigel)
+ HADOOP-4372. Improves the way history filenames are obtained and
manipulated.
+ (Amar Kamat via ddas)
+
OPTIMIZATIONS
HADOOP-5595. NameNode does not need to run a replicator to choose a
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=772884&r1=772883&r2=772884&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
(original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java Fri
May 8 08:33:13 2009
@@ -665,12 +665,9 @@
};
FileStatus[] statuses = fs.listStatus(new Path(LOG_DIR), filter);
- String filename;
+ String filename = null;
if (statuses.length == 0) {
- filename =
- encodeJobHistoryFileName(getNewJobHistoryFileName(jobConf, id));
- LOG.info("Nothing to recover! Generating a new filename " + filename
- + " for job " + id);
+ LOG.info("Nothing to recover for job " + id);
} else {
// return filename considering that fact the name can be a
// secondary filename like filename.recover
@@ -791,6 +788,9 @@
throws IOException {
String masterLogFileName =
JobHistory.JobInfo.getJobHistoryFileName(conf, id);
+ if (masterLogFileName == null) {
+ return;
+ }
Path masterLogPath =
JobHistory.JobInfo.getJobHistoryLogLocation(masterLogFileName);
String tmpLogFileName = getSecondaryJobHistoryFile(masterLogFileName);
@@ -833,10 +833,19 @@
* @param jobConfPath path to job conf xml file in HDFS.
* @param submitTime time when job tracker received the job
* @throws IOException
+ * @deprecated Use
+ * {@link #logSubmitted(JobID, JobConf, String, long, boolean)} instead.
*/
public static void logSubmitted(JobID jobId, JobConf jobConf,
String jobConfPath, long submitTime)
throws IOException {
+ logSubmitted(jobId, jobConf, jobConfPath, submitTime, true);
+ }
+
+ public static void logSubmitted(JobID jobId, JobConf jobConf,
+ String jobConfPath, long submitTime,
+ boolean restarted)
+ throws IOException {
FileSystem fs = null;
String userLogDir = null;
String jobUniqueString = JOBTRACKER_UNIQUE_STRING + jobId;
@@ -849,8 +858,13 @@
String user = getUserName(jobConf);
// get the history filename
- String logFileName =
- getJobHistoryFileName(jobConf, jobId);
+ String logFileName = null;
+ if (restarted) {
+ logFileName = getJobHistoryFileName(jobConf, jobId);
+ } else {
+ logFileName =
+ encodeJobHistoryFileName(getNewJobHistoryFileName(jobConf, jobId));
+ }
// setup the history log file for this job
Path logFile = getJobHistoryLogLocation(logFileName);
@@ -868,8 +882,10 @@
// create output stream for logging in hadoop.job.history.location
fs = new Path(LOG_DIR).getFileSystem(jobConf);
- logFile = recoverJobHistoryFile(jobConf, logFile);
- logFileName = logFile.getName();
+ if (restarted) {
+ logFile = recoverJobHistoryFile(jobConf, logFile);
+ logFileName = logFile.getName();
+ }
int defaultBufferSize =
fs.getConf().getInt("io.file.buffer.size", 4096);
Modified:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=772884&r1=772883&r2=772884&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
(original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
Fri May 8 08:33:13 2009
@@ -365,6 +365,10 @@
return tasksInited.get();
}
+ boolean hasRestarted() {
+ return restartCount > 0;
+ }
+
/**
* Construct the splits, etc. This is invoked from an async
* thread so that split-computation doesn't block anyone.
@@ -384,7 +388,7 @@
// log job info
JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile.toString(),
- this.startTime);
+ this.startTime, hasRestarted());
// log the job priority
setPriority(this.priority);
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=772884&r1=772883&r2=772884&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
(original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri
May 8 08:33:13 2009
@@ -1249,18 +1249,23 @@
// 3. Get the log file and the file path
String logFileName =
JobHistory.JobInfo.getJobHistoryFileName(job.getJobConf(), id);
- Path jobHistoryFilePath =
- JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
-
- // 4. Recover the history file. This involved
- // - deleting file.recover if file exists
- // - renaming file.recover to file if file doesnt exist
- // This makes sure that the (master) file exists
- JobHistory.JobInfo.recoverJobHistoryFile(job.getJobConf(),
- jobHistoryFilePath);
+ if (logFileName != null) {
+ Path jobHistoryFilePath =
+ JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
+
+ // 4. Recover the history file. This involved
+ // - deleting file.recover if file exists
+ // - renaming file.recover to file if file doesnt exist
+ // This makes sure that the (master) file exists
+ JobHistory.JobInfo.recoverJobHistoryFile(job.getJobConf(),
+ jobHistoryFilePath);
- // 5. Cache the history file name as it costs one dfs access
- jobHistoryFilenameMap.put(job.getJobID(), jobHistoryFilePath);
+ // 5. Cache the history file name as it costs one dfs access
+ jobHistoryFilenameMap.put(job.getJobID(), jobHistoryFilePath);
+ } else {
+ LOG.info("No history file found for job " + id);
+ idIter.remove(); // remove from recovery list
+ }
// 6. Sumbit the job to the jobtracker
addJob(id, job);
@@ -2045,10 +2050,12 @@
// start the merge of log files
JobID id = job.getStatus().getJobID();
- try {
- JobHistory.JobInfo.finalizeRecovery(id, job.getJobConf());
- } catch (IOException ioe) {
- LOG.info("Failed to finalize the log file recovery for job " + id, ioe);
+ if (job.hasRestarted()) {
+ try {
+ JobHistory.JobInfo.finalizeRecovery(id, job.getJobConf());
+ } catch (IOException ioe) {
+ LOG.info("Failed to finalize the log file recovery for job " + id, ioe);
+ }
}
final JobTrackerInstrumentation metrics = getInstrumentation();