Author: omalley
Date: Fri Mar 4 03:24:59 2011
New Revision: 1076948
URL: http://svn.apache.org/viewvc?rev=1076948&view=rev
Log:
commit a9d789e37205a8b111b98030d82aa722603fc441
Author: Lee Tucker <[email protected]>
Date: Thu Jul 30 17:40:31 2009 -0700
Applying patch 2808668.mr416.patch
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=1076948&r1=1076947&r2=1076948&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java
Fri Mar 4 03:24:59 2011
@@ -92,17 +92,79 @@ public class JobHistory {
public static final int JOB_NAME_TRIM_LENGTH = 50;
private static String JOBTRACKER_UNIQUE_STRING = null;
private static String LOG_DIR = null;
- private static Map<String, ArrayList<PrintWriter>> openJobs =
- new ConcurrentHashMap<String, ArrayList<PrintWriter>>();
private static boolean disableHistory = false;
private static final String SECONDARY_FILE_SUFFIX = ".recover";
private static long jobHistoryBlockSize = 0;
private static String jobtrackerHostname;
+ private static JobHistoryFilesManager fileManager =
+ new JobHistoryFilesManager();
final static FsPermission HISTORY_DIR_PERMISSION =
FsPermission.createImmutable((short) 0750); // rwxr-x---
final static FsPermission HISTORY_FILE_PERMISSION =
FsPermission.createImmutable((short) 0740); // rwxr-----
private static JobConf jtConf;
+ private static Path DONE = null; // folder for completed jobs
+ /**
+  * A class that manages all the files related to a job. For now
+  *   - writers : list of open files
+  *   - job history filename
+  *   - job conf filename
+  * Entries are keyed by job id and stay cached until purgeJob() is called
+  * for that id.
+  */
+ private static class JobHistoryFilesManager {
+   // a private (virtual) folder for all the files related to a running job
+   private static class FilesHolder {
+     // plain ArrayList: the list itself is not synchronized here
+     ArrayList<PrintWriter> writers = new ArrayList<PrintWriter>();
+     Path historyFilename; // path of job history file
+     Path confFilename; // path of job's conf
+   }
+
+   // cache from job-key to files associated with it. Declared with the
+   // concrete type so that the atomic putIfAbsent() is available.
+   private final ConcurrentHashMap<JobID, FilesHolder> fileCache =
+     new ConcurrentHashMap<JobID, FilesHolder>();
+
+   /**
+    * Returns the holder cached for the given job, creating it on first use.
+    * Creation goes through putIfAbsent() so that two threads asking for the
+    * same job at the same time share one holder; with a separate get() and
+    * put() the second put() would replace the first holder and whatever had
+    * already been stored in it.
+    */
+   private FilesHolder getFileHolder(JobID id) {
+     FilesHolder holder = fileCache.get(id);
+     if (holder == null) {
+       FilesHolder fresh = new FilesHolder();
+       holder = fileCache.putIfAbsent(id, fresh);
+       if (holder == null) {
+         // nobody beat us to it, our holder is the cached one
+         holder = fresh;
+       }
+     }
+     return holder;
+   }
+
+   /** Adds an open writer to the job's holder, creating the holder if needed. */
+   void addWriter(JobID id, PrintWriter writer) {
+     FilesHolder holder = getFileHolder(id);
+     holder.writers.add(writer);
+   }
+
+   /** Records the job history file path, creating the holder if needed. */
+   void setHistoryFile(JobID id, Path file) {
+     FilesHolder holder = getFileHolder(id);
+     holder.historyFilename = file;
+   }
+
+   /** Records the job conf file path, creating the holder if needed. */
+   void setConfFile(JobID id, Path file) {
+     FilesHolder holder = getFileHolder(id);
+     holder.confFilename = file;
+   }
+
+   /** @return the job's writer list, or null if the job is not cached. */
+   ArrayList<PrintWriter> getWriters(JobID id) {
+     FilesHolder holder = fileCache.get(id);
+     return holder == null ? null : holder.writers;
+   }
+
+   /**
+    * @return the job history file path, or null if the job is not cached
+    *         or no history file was recorded for it.
+    */
+   Path getHistoryFile(JobID id) {
+     FilesHolder holder = fileCache.get(id);
+     return holder == null ? null : holder.historyFilename;
+   }
+
+   /**
+    * Despite the name, this returns the path of the job's conf file (not
+    * writers). The name is kept as is because callers use it.
+    * @return the job conf file path, or null if the job is not cached or no
+    *         conf file was recorded for it.
+    */
+   Path getConfFileWriters(JobID id) {
+     FilesHolder holder = fileCache.get(id);
+     return holder == null ? null : holder.confFilename;
+   }
+
+   /** Drops everything cached for the given job. */
+   void purgeJob(JobID id) {
+     fileCache.remove(id);
+   }
+ }
/**
* Record types are identifiers for each line of log in history files.
* A record type appears as the first token in a single line of log.
@@ -152,6 +214,7 @@ public class JobHistory {
"file:///" + new File(
System.getProperty("hadoop.log.dir")).getAbsolutePath()
+ File.separator + "history");
+ DONE = new Path(LOG_DIR, "done");
JOBTRACKER_UNIQUE_STRING = hostname + "_" +
String.valueOf(jobTrackerStartTime) + "_";
jobtrackerHostname = hostname;
@@ -169,6 +232,9 @@ public class JobHistory {
conf.getLong("mapred.jobtracker.job.history.block.size",
3 * 1024 * 1024);
jtConf = conf;
+
+ // create the done folder with appropriate permission
+ fs.mkdirs(DONE, HISTORY_DIR_PERMISSION);
} catch(IOException e) {
LOG.error("Failed to initialize JobHistory log file", e);
disableHistory = true;
@@ -388,6 +454,13 @@ public class JobHistory {
}
/**
+ * Get the history location for completed jobs.
+ * @return the DONE path (the folder for completed jobs); this is null until
+ * DONE has been assigned
+ */
+ static Path getCompletedJobHistoryLocation() {
+ return DONE;
+ }
+
+ /**
* Base class contains utility stuff to manage types key value pairs with
enums.
*/
static class KeyValuePair{
@@ -639,6 +712,16 @@ public class JobHistory {
*/
public static synchronized String getJobHistoryFileName(JobConf jobConf,
JobID id)
+ throws IOException {
+ return getJobHistoryFileName(jobConf, id, new Path(LOG_DIR)); // delegate to the overload below, searching LOG_DIR
+ }
+
+ /**
+ * @param dir The directory where to search.
+ */
+ static synchronized String getJobHistoryFileName(JobConf jobConf,
+ JobID id,
+ Path dir)
throws IOException {
String user = getUserName(jobConf);
String jobName = trimJobName(getJobName(jobConf));
@@ -672,26 +755,33 @@ public class JobHistory {
}
};
- FileStatus[] statuses = fs.listStatus(new Path(LOG_DIR), filter);
+ FileStatus[] statuses = fs.listStatus(dir, filter);
String filename = null;
if (statuses.length == 0) {
LOG.info("Nothing to recover for job " + id);
} else {
// return filename considering that fact the name can be a
// secondary filename like filename.recover
- filename = decodeJobHistoryFileName(statuses[0].getPath().getName());
- // Remove the '.recover' suffix if it exists
- if (filename.endsWith(jobName + SECONDARY_FILE_SUFFIX)) {
- int newLength = filename.length() - SECONDARY_FILE_SUFFIX.length();
- filename = filename.substring(0, newLength);
- }
- filename = encodeJobHistoryFileName(filename);
+ filename = getPrimaryFilename(statuses[0].getPath().getName(),
jobName);
LOG.info("Recovered job history filename for job " + id + " is "
+ filename);
}
return filename;
}
+ // Strips any extra extension from a history filename and hands back the
+ // core/primary filename (re-encoded).
+ private static String getPrimaryFilename(String filename, String jobName)
+ throws IOException{
+   String decoded = decodeJobHistoryFileName(filename);
+   // A secondary file is the job name immediately followed by '.recover'
+   String secondaryTail = jobName + SECONDARY_FILE_SUFFIX;
+   String primary = decoded;
+   if (decoded.endsWith(secondaryTail)) {
+     // cut only the suffix, the job name stays
+     int keep = decoded.length() - SECONDARY_FILE_SUFFIX.length();
+     primary = decoded.substring(0, keep);
+   }
+   return encodeJobHistoryFileName(primary);
+ }
+
/** Since there was a restart, there should be a master file and
* a recovery file. Once the recovery is complete, the master should be
* deleted as an indication that the recovery file should be treated as
the
@@ -788,33 +878,33 @@ public class JobHistory {
/** Finalize the recovery and make one file in the end.
* This involves renaming the recover file to the master file.
+ * Note that this api should be invoked only if recovery is involved.
* @param id Job id
* @param conf the job conf
* @throws IOException
*/
- static synchronized void finalizeRecovery(JobID id, JobConf conf)
+ static synchronized void finalizeRecovery(JobID id, JobConf conf)
throws IOException {
- String masterLogFileName =
- JobHistory.JobInfo.getJobHistoryFileName(conf, id);
- if (masterLogFileName == null) {
- return;
- }
- Path masterLogPath =
- JobHistory.JobInfo.getJobHistoryLogLocation(masterLogFileName);
- String tmpLogFileName = getSecondaryJobHistoryFile(masterLogFileName);
- Path tmpLogPath =
- JobHistory.JobInfo.getJobHistoryLogLocation(tmpLogFileName);
- if (masterLogPath != null) {
- FileSystem fs = masterLogPath.getFileSystem(conf);
+ Path tmpLogPath = fileManager.getHistoryFile(id);
+ if (tmpLogPath == null) {
+ LOG.debug("No file for job with " + id + " found in cache!");
+ return;
+ }
+ String tmpLogFileName = tmpLogPath.getName();
+
+ // get the primary filename from the cached filename
+ String masterLogFileName =
+ getPrimaryFilename(tmpLogFileName, getJobName(conf));
+ Path masterLogPath = new Path(tmpLogPath.getParent(),
masterLogFileName);
+
+ // rename the tmp file to the master file. Note that this should be
+ // done only when the file is closed and handles are released.
+ LOG.info("Renaming " + tmpLogFileName + " to " + masterLogFileName);
+ FileSystem fs = tmpLogPath.getFileSystem(jtConf);
+ fs.rename(tmpLogPath, masterLogPath);
+ // update the cache
+ fileManager.setHistoryFile(id, masterLogPath);
- // rename the tmp file to the master file. Note that this should be
- // done only when the file is closed and handles are released.
- if(fs.exists(tmpLogPath)) {
- LOG.info("Renaming " + tmpLogFileName + " to " + masterLogFileName);
- fs.rename(tmpLogPath, masterLogPath);
- }
- }
-
// do the same for the user file too
masterLogPath =
JobHistory.JobInfo.getJobHistoryLogLocationForUser(masterLogFileName,
@@ -823,7 +913,7 @@ public class JobHistory {
JobHistory.JobInfo.getJobHistoryLogLocationForUser(tmpLogFileName,
conf);
if (masterLogPath != null) {
- FileSystem fs = masterLogPath.getFileSystem(conf);
+ fs = masterLogPath.getFileSystem(conf);
if (fs.exists(tmpLogPath)) {
LOG.info("Renaming " + tmpLogFileName + " to " + masterLogFileName
+ " in user directory");
@@ -846,6 +936,38 @@ public class JobHistory {
}
/**
+ * Move the completed job into the completed folder.
+ * This assumes that the jobhistory file is closed and all operations on the
+ * jobhistory file are complete.
+ * This *should* be the last call to jobhistory for a given job, so the job
+ * is purged from the file cache on every exit path: when a file is missing
+ * from the cache and when a move throws, as well as on success.
+ * @param id id of the completed job
+ * @throws IOException if the filesystem lookup or a rename throws
+ */
+ static void markCompleted(JobID id) throws IOException {
+   try {
+     Path path = fileManager.getHistoryFile(id);
+     if (path == null) {
+       LOG.info("No file for job-history with " + id + " found in cache!");
+       return;
+     }
+     Path newPath = new Path(DONE, path.getName());
+     LOG.info("Moving completed job from " + path + " to " + newPath);
+     FileSystem fs = path.getFileSystem(jtConf);
+     // rename() reports some failures through its return value only
+     if (!fs.rename(path, newPath)) {
+       LOG.warn("Failed to move " + path + " to " + newPath);
+     }
+
+     Path confPath = fileManager.getConfFileWriters(id);
+     if (confPath == null) {
+       LOG.info("No file for jobconf with " + id + " found in cache!");
+       return;
+     }
+     // move the conf too
+     newPath = new Path(DONE, confPath.getName());
+     LOG.info("Moving configuration of completed job from " + confPath
+              + " to " + newPath);
+     if (!fs.rename(confPath, newPath)) {
+       LOG.warn("Failed to move " + confPath + " to " + newPath);
+     }
+   } finally {
+     // purge the job from the cache. Doing it here (and not only after a
+     // successful move) keeps the early returns and failed renames from
+     // leaving the job's holder and its writers in the cache forever.
+     fileManager.purgeJob(id);
+   }
+ }
+
+ /**
* Log job submitted event to history. Creates a new file in history
* for the job. if history file creation fails, it disables history
* for all other events.
@@ -885,7 +1007,12 @@ public class JobHistory {
if (logFileName == null) {
logFileName =
encodeJobHistoryFileName(getNewJobHistoryFileName(jobConf,
jobId));
-
+ } else {
+ String parts[] = logFileName.split("_");
+ //TODO this is a hack :(
+ // jobtracker-hostname_jobtracker-identifier_
+ String jtUniqueString = parts[0] + "_" + parts[1] + "_";
+ jobUniqueString = jtUniqueString + jobId.toString();
}
} else {
logFileName =
@@ -900,7 +1027,6 @@ public class JobHistory {
getJobHistoryLogLocationForUser(logFileName, jobConf);
try{
- ArrayList<PrintWriter> writers = new ArrayList<PrintWriter>();
FSDataOutputStream out = null;
PrintWriter writer = null;
@@ -922,7 +1048,10 @@ public class JobHistory {
fs.getDefaultReplication(),
jobHistoryBlockSize, null);
writer = new PrintWriter(out);
- writers.add(writer);
+ fileManager.addWriter(jobId, writer);
+
+ // cache it ...
+ fileManager.setHistoryFile(jobId, logFile);
}
if (userLogFile != null) {
// Get the actual filename as recoverJobHistoryFile() might return
@@ -936,11 +1065,10 @@ public class JobHistory {
out = fs.create(userLogFile, true, 4096);
writer = new PrintWriter(out);
- writers.add(writer);
+ fileManager.addWriter(jobId, writer);
}
-
- openJobs.put(jobUniqueString, writers);
+ ArrayList<PrintWriter> writers = fileManager.getWriters(jobId);
// Log the history meta info
JobHistory.MetaInfoManager.logMetaInfo(writers);
@@ -985,6 +1113,7 @@ public class JobHistory {
if (LOG_DIR != null) {
jobFilePath = new Path(LOG_DIR + File.separator +
jobUniqueString + "_conf.xml");
+ fileManager.setConfFile(jobId, jobFilePath);
}
Path userJobFilePath = null;
if (userLogDir != null) {
@@ -1018,7 +1147,7 @@ public class JobHistory {
+ jobFilePath + "and" + userJobFilePath );
}
} catch (IOException ioe) {
- LOG.error("Failed to store job conf on the local filesystem ", ioe);
+ LOG.error("Failed to store job conf in the log dir", ioe);
} finally {
if (jobFileOut != null) {
try {
@@ -1041,8 +1170,7 @@ public class JobHistory {
public static void logInited(JobID jobId, long startTime,
int totalMaps, int totalReduces) {
if (!disableHistory){
- String logFileKey = JOBTRACKER_UNIQUE_STRING + jobId;
- ArrayList<PrintWriter> writer = openJobs.get(logFileKey);
+ ArrayList<PrintWriter> writer = fileManager.getWriters(jobId);
if (null != writer){
JobHistory.log(writer, RecordTypes.Job,
@@ -1078,8 +1206,7 @@ public class JobHistory {
*/
public static void logStarted(JobID jobId){
if (!disableHistory){
- String logFileKey = JOBTRACKER_UNIQUE_STRING + jobId;
- ArrayList<PrintWriter> writer = openJobs.get(logFileKey);
+ ArrayList<PrintWriter> writer = fileManager.getWriters(jobId);
if (null != writer){
JobHistory.log(writer, RecordTypes.Job,
@@ -1106,8 +1233,7 @@ public class JobHistory {
Counters counters){
if (!disableHistory){
// close job file for this job
- String logFileKey = JOBTRACKER_UNIQUE_STRING + jobId;
- ArrayList<PrintWriter> writer = openJobs.get(logFileKey);
+ ArrayList<PrintWriter> writer = fileManager.getWriters(jobId);
if (null != writer){
JobHistory.log(writer, RecordTypes.Job,
@@ -1126,7 +1252,6 @@ public class JobHistory {
for (PrintWriter out : writer) {
out.close();
}
- openJobs.remove(logFileKey);
}
Thread historyCleaner = new Thread(new HistoryCleaner());
historyCleaner.start();
@@ -1141,8 +1266,7 @@ public class JobHistory {
*/
public static void logFailed(JobID jobid, long timestamp, int
finishedMaps, int finishedReduces){
if (!disableHistory){
- String logFileKey = JOBTRACKER_UNIQUE_STRING + jobid;
- ArrayList<PrintWriter> writer = openJobs.get(logFileKey);
+ ArrayList<PrintWriter> writer = fileManager.getWriters(jobid);
if (null != writer){
JobHistory.log(writer, RecordTypes.Job,
@@ -1152,7 +1276,6 @@ public class JobHistory {
for (PrintWriter out : writer) {
out.close();
}
- openJobs.remove(logFileKey);
}
}
}
@@ -1171,8 +1294,7 @@ public class JobHistory {
public static void logKilled(JobID jobid, long timestamp, int finishedMaps,
int finishedReduces) {
if (!disableHistory) {
- String logFileKey = JOBTRACKER_UNIQUE_STRING + jobid;
- ArrayList<PrintWriter> writer = openJobs.get(logFileKey);
+ ArrayList<PrintWriter> writer = fileManager.getWriters(jobid);
if (null != writer) {
JobHistory.log(writer, RecordTypes.Job, new Keys[] { Keys.JOBID,
@@ -1183,7 +1305,6 @@ public class JobHistory {
for (PrintWriter out : writer) {
out.close();
}
- openJobs.remove(logFileKey);
}
}
}
@@ -1194,8 +1315,7 @@ public class JobHistory {
*/
public static void logJobPriority(JobID jobid, JobPriority priority){
if (!disableHistory){
- String logFileKey = JOBTRACKER_UNIQUE_STRING + jobid;
- ArrayList<PrintWriter> writer = openJobs.get(logFileKey);
+ ArrayList<PrintWriter> writer = fileManager.getWriters(jobid);
if (null != writer){
JobHistory.log(writer, RecordTypes.Job,
@@ -1220,8 +1340,7 @@ public class JobHistory {
public static void logJobInfo(JobID jobid, long submitTime, long
launchTime)
{
if (!disableHistory){
- String logFileKey = JOBTRACKER_UNIQUE_STRING + jobid;
- ArrayList<PrintWriter> writer = openJobs.get(logFileKey);
+ ArrayList<PrintWriter> writer = fileManager.getWriters(jobid);
if (null != writer){
JobHistory.log(writer, RecordTypes.Job,
@@ -1252,8 +1371,8 @@ public class JobHistory {
public static void logStarted(TaskID taskId, String taskType,
long startTime, String splitLocations) {
if (!disableHistory){
- ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
- + taskId.getJobID());
+ JobID id = taskId.getJobID();
+ ArrayList<PrintWriter> writer = fileManager.getWriters(id);
if (null != writer){
JobHistory.log(writer, RecordTypes.Task,
@@ -1274,8 +1393,8 @@ public class JobHistory {
public static void logFinished(TaskID taskId, String taskType,
long finishTime, Counters counters){
if (!disableHistory){
- ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
- + taskId.getJobID());
+ JobID id = taskId.getJobID();
+ ArrayList<PrintWriter> writer = fileManager.getWriters(id);
if (null != writer){
JobHistory.log(writer, RecordTypes.Task,
@@ -1296,8 +1415,8 @@ public class JobHistory {
*/
public static void logUpdates(TaskID taskId, long finishTime){
if (!disableHistory){
- ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
- + taskId.getJobID());
+ JobID id = taskId.getJobID();
+ ArrayList<PrintWriter> writer = fileManager.getWriters(id);
if (null != writer){
JobHistory.log(writer, RecordTypes.Task,
@@ -1326,8 +1445,8 @@ public class JobHistory {
String error,
TaskAttemptID failedDueToAttempt){
if (!disableHistory){
- ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
- + taskId.getJobID());
+ JobID id = taskId.getJobID();
+ ArrayList<PrintWriter> writer = fileManager.getWriters(id);
if (null != writer){
String failedAttempt = failedDueToAttempt == null
@@ -1388,8 +1507,8 @@ public class JobHistory {
String trackerName, int httpPort,
String taskType) {
if (!disableHistory){
- ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
- +
taskAttemptId.getJobID());
+ JobID id = taskAttemptId.getJobID();
+ ArrayList<PrintWriter> writer = fileManager.getWriters(id);
if (null != writer){
JobHistory.log(writer, RecordTypes.MapAttempt,
@@ -1438,8 +1557,8 @@ public class JobHistory {
String stateString,
Counters counter) {
if (!disableHistory){
- ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
- +
taskAttemptId.getJobID());
+ JobID id = taskAttemptId.getJobID();
+ ArrayList<PrintWriter> writer = fileManager.getWriters(id);
if (null != writer){
JobHistory.log(writer, RecordTypes.MapAttempt,
@@ -1487,8 +1606,8 @@ public class JobHistory {
long timestamp, String hostName,
String error, String taskType) {
if (!disableHistory){
- ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
- +
taskAttemptId.getJobID());
+ JobID id = taskAttemptId.getJobID();
+ ArrayList<PrintWriter> writer = fileManager.getWriters(id);
if (null != writer){
JobHistory.log(writer, RecordTypes.MapAttempt,
@@ -1533,8 +1652,8 @@ public class JobHistory {
long timestamp, String hostName,
String error, String taskType) {
if (!disableHistory){
- ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
- +
taskAttemptId.getJobID());
+ JobID id = taskAttemptId.getJobID();
+ ArrayList<PrintWriter> writer = fileManager.getWriters(id);
if (null != writer){
JobHistory.log(writer, RecordTypes.MapAttempt,
@@ -1585,8 +1704,8 @@ public class JobHistory {
int httpPort,
String taskType) {
if (!disableHistory){
- ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
- +
taskAttemptId.getJobID());
+ JobID id = taskAttemptId.getJobID();
+ ArrayList<PrintWriter> writer = fileManager.getWriters(id);
if (null != writer){
JobHistory.log(writer, RecordTypes.ReduceAttempt,
@@ -1640,8 +1759,8 @@ public class JobHistory {
String hostName, String taskType,
String stateString, Counters counter) {
if (!disableHistory){
- ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
- +
taskAttemptId.getJobID());
+ JobID id = taskAttemptId.getJobID();
+ ArrayList<PrintWriter> writer = fileManager.getWriters(id);
if (null != writer){
JobHistory.log(writer, RecordTypes.ReduceAttempt,
@@ -1691,8 +1810,8 @@ public class JobHistory {
String hostName, String error,
String taskType) {
if (!disableHistory){
- ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
- +
taskAttemptId.getJobID());
+ JobID id = taskAttemptId.getJobID();
+ ArrayList<PrintWriter> writer = fileManager.getWriters(id);
if (null != writer){
JobHistory.log(writer, RecordTypes.ReduceAttempt,
@@ -1737,8 +1856,8 @@ public class JobHistory {
String hostName, String error,
String taskType) {
if (!disableHistory){
- ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
- +
taskAttemptId.getJobID());
+ JobID id = taskAttemptId.getJobID();
+ ArrayList<PrintWriter> writer = fileManager.getWriters(id);
if (null != writer){
JobHistory.log(writer, RecordTypes.ReduceAttempt,
@@ -1800,9 +1919,8 @@ public class JobHistory {
lastRan = now;
isRunning = true;
try {
- Path logDir = new Path(LOG_DIR);
- FileSystem fs = logDir.getFileSystem(jtConf);
- FileStatus[] historyFiles = fs.listStatus(logDir);
+ FileSystem fs = DONE.getFileSystem(jtConf);
+ FileStatus[] historyFiles = fs.listStatus(DONE);
// delete if older than 30 days
if (historyFiles != null) {
for (FileStatus f : historyFiles) {
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1076948&r1=1076947&r2=1076948&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
Fri Mar 4 03:24:59 2011
@@ -1718,7 +1718,7 @@ public class JobTracker implements MRCon
String historyLogDir = null;
FileSystem historyFS = null;
if (historyInitialized) {
- historyLogDir = conf.get("hadoop.job.history.location");
+ historyLogDir = JobHistory.getCompletedJobHistoryLocation().toString();
infoServer.setAttribute("historyLogDir", historyLogDir);
historyFS = new Path(historyLogDir).getFileSystem(conf);
infoServer.setAttribute("fileSys", historyFS);
@@ -2197,6 +2197,13 @@ public class JobTracker implements MRCon
}
}
+ // mark the job as completed
+ try {
+ JobHistory.JobInfo.markCompleted(id);
+ } catch (IOException ioe) {
+ LOG.info("Failed to mark job " + id + " as completed!", ioe);
+ }
+
final JobTrackerInstrumentation metrics = getInstrumentation();
metrics.finalizeJob(conf, id);
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java?rev=1076948&r1=1076947&r2=1076948&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java
Fri Mar 4 03:24:59 2011
@@ -34,6 +34,7 @@ import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapred.JobHistory.*;
import org.apache.commons.logging.Log;
@@ -423,6 +424,20 @@ public class TestJobHistory extends Test
}
/**
+ * Returns the path of the conf file, inside the given directory, that
+ * goes with the passed jobhistory file.
+ * @param path path of the jobhistory file
+ * @param dir directory under which the conf file path is built
+ */
+ private static Path getPathForConf(Path path, Path dir) {
+   String[] tokens = path.getName().split("_");
+   //TODO this is a hack :(
+   // tokens 2..4 are taken as the job id; tokens 0..1 are the
+   // jobtracker-hostname_jobtracker-identifier_ prefix
+   String jobId = tokens[2] + "_" + tokens[3] + "_" + tokens[4];
+   String confName = tokens[0] + "_" + tokens[1] + "_" + jobId + "_conf.xml";
+   return new Path(dir, confName);
+ }
+
+ /**
* Validates the format of contents of history file
* (1) history file exists and in correct location
* (2) Verify if the history file is parsable
@@ -448,10 +463,12 @@ public class TestJobHistory extends Test
String status, boolean splitsCanBeEmpty) throws IOException {
// Get the history file name
- String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id);
+ Path dir = JobHistory.getCompletedJobHistoryLocation();
+ String logFileName =
+ JobHistory.JobInfo.getJobHistoryFileName(conf, id, dir);
// Framework history log file location
- Path logFile = JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
+ Path logFile = new Path(dir, logFileName);
FileSystem fileSys = logFile.getFileSystem(conf);
// Check if the history file exists
@@ -743,11 +760,13 @@ public class TestJobHistory extends Test
RunningJob job, JobConf conf) throws IOException
{
JobID id = job.getID();
+ Path doneDir = JobHistory.getCompletedJobHistoryLocation();
// Get the history file name
- String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id);
+ String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id,
+ doneDir);
// Framework history log file location
- Path logFile = JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
+ Path logFile = new Path(doneDir, logFileName);
FileSystem fileSys = logFile.getFileSystem(conf);
// Check if the history file exists
@@ -803,6 +822,43 @@ public class TestJobHistory extends Test
// Run a job that will be succeeded and validate its history file
RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
+
+ Path doneDir = JobHistory.getCompletedJobHistoryLocation();
+ JobID id = job.getID();
+ String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id,
+ doneDir);
+ // Framework history log file location
+ Path logFile = new Path(doneDir, logFileName);
+ FileSystem fileSys = logFile.getFileSystem(conf);
+
+ // Check if the history file exists
+ assertTrue("History file does not exist", fileSys.exists(logFile));
+
+ // check if the corresponding conf file exists
+ Path confFile = getPathForConf(logFile, doneDir);
+ assertTrue("Config for completed jobs doesnt exist",
+ fileSys.exists(confFile));
+
+ // check if the file exists in a done folder
+ assertTrue("Completed job config doesnt exist in the done folder",
+ "done".equals(confFile.getParent().getName()));
+
+ // check if the file exists in a done folder
+ assertTrue("Completed jobs doesnt exist in the done folder",
+ "done".equals(logFile.getParent().getName()));
+
+
+ // check if the job file is removed from the history location
+ Path runningJobsHistoryFolder = logFile.getParent().getParent();
+ Path runningJobHistoryFilename =
+ new Path(runningJobsHistoryFolder, logFile.getName());
+ Path runningJobConfFilename =
+ new Path(runningJobsHistoryFolder, confFile.getName());
+ assertFalse("History file not deleted from the running folder",
+ fileSys.exists(runningJobHistoryFilename));
+ assertFalse("Config for completed jobs not deleted from running folder",
+ fileSys.exists(runningJobConfFilename));
+
validateJobHistoryFileFormat(job.getID(), conf, "SUCCESS", false);
validateJobHistoryFileContent(mr, job, conf);
@@ -842,7 +898,9 @@ public class TestJobHistory extends Test
private static void validateJobHistoryUserLogLocation(JobID id, JobConf
conf)
throws IOException {
// Get the history file name
- String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id);
+ Path doneDir = JobHistory.getCompletedJobHistoryLocation();
+ String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id,
+ doneDir);
// User history log file location
Path logFile = JobHistory.JobInfo.getJobHistoryLogLocationForUser(
@@ -952,10 +1010,12 @@ public class TestJobHistory extends Test
String status) throws IOException {
// Get the history file name
- String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id);
+ Path doneDir = JobHistory.getCompletedJobHistoryLocation();
+ String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id,
+ doneDir);
// Framework history log file location
- Path logFile = JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
+ Path logFile = new Path(doneDir, logFileName);
FileSystem fileSys = logFile.getFileSystem(conf);
// Check if the history file exists
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java?rev=1076948&r1=1076947&r2=1076948&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java
Fri Mar 4 03:24:59 2011
@@ -329,7 +329,9 @@ public class TestJobTrackerRestart exten
private void testJobHistoryFiles(JobID id, JobConf conf)
throws IOException {
// Get the history files for users
- String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id);
+ Path dir = JobHistory.getCompletedJobHistoryLocation();
+ String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id,
+ dir);
String tempLogFileName =
JobHistory.JobInfo.getSecondaryJobHistoryFile(logFileName);
@@ -349,7 +351,7 @@ public class TestJobTrackerRestart exten
// II. Framework files
// Get the history file
- logFile = JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
+ logFile = new Path(dir, logFileName);
fileSys = logFile.getFileSystem(conf);
// Check if the history file exists