Author: ddas
Date: Wed Apr 1 05:01:11 2009
New Revision: 760783
URL: http://svn.apache.org/viewvc?rev=760783&view=rev
Log:
HADOOP-4374. Installs a shutdown hook in the Task JVM so that log.index is
updated before the JVM exits. Also makes the update to log.index atomic.
Contributed by Ravi Gummadi.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskFail.java
Modified: hadoop/core/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=760783&r1=760782&r2=760783&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Apr 1 05:01:11 2009
@@ -1157,6 +1157,10 @@
HADOOP-5576. Fix LocalRunner to work with the new context object API in
mapreduce. (Tom White via omalley)
+ HADOOP-4374. Installs a shutdown hook in the Task JVM so that log.index is
+ updated before the JVM exits. Also makes the update to log.index atomic.
+ (Ravi Gummadi via ddas)
+
Release 0.19.2 - Unreleased
BUG FIXES
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java?rev=760783&r1=760782&r2=760783&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java Wed Apr 1 05:01:11 2009
@@ -37,6 +37,7 @@
import org.apache.hadoop.metrics.jvm.JvmMetrics;
import org.apache.hadoop.util.Shell;
import org.apache.log4j.LogManager;
+import org.apache.hadoop.util.StringUtils;
/**
* The main() for child processes.
@@ -68,6 +69,16 @@
defaultConf);
int numTasksToExecute = -1; //-1 signifies "no limit"
int numTasksExecuted = 0;
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ public void run() {
+ try {
+ if (taskid != null) {
+ TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
+ }
+ } catch (Throwable throwable) {
+ }
+ }
+ });
Thread t = new Thread() {
public void run() {
//every so often wake up and syncLogs so that we can track
@@ -192,10 +203,6 @@
// This assumes that on return from Task.run()
// there is no more logging done.
LogManager.shutdown();
- // do synclogs
- if (taskid != null) {
- TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
- }
}
}
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java?rev=760783&r1=760782&r2=760783&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java Wed Apr 1 05:01:11 2009
@@ -33,7 +33,12 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ProcessTree;
import org.apache.hadoop.util.Shell;
import org.apache.log4j.Appender;
@@ -53,7 +58,13 @@
new File(System.getProperty("hadoop.log.dir"),
"userlogs").getAbsoluteFile();
+ static LocalFileSystem localFS = null;
static {
+ try {
+ localFS = FileSystem.getLocal(new Configuration());
+ } catch (IOException ioe) {
+ LOG.warn("Getting local file system failed.");
+ }
if (!LOG_DIR.exists()) {
LOG_DIR.mkdirs();
}
@@ -128,6 +139,9 @@
return l;
}
+ private static File getTmpIndexFile(String taskid) {
+ return new File(getBaseDir(taskid), "log.tmp");
+ }
public static File getIndexFile(String taskid) {
return getIndexFile(taskid, false);
}
@@ -150,9 +164,12 @@
private static void writeToIndexFile(TaskAttemptID firstTaskid,
boolean isCleanup)
throws IOException {
- File indexFile = getIndexFile(currentTaskid.toString(), isCleanup);
+ // To ensure atomicity of updates to index file, write to temporary index
+ // file first and then rename.
+ File tmpIndexFile = getTmpIndexFile(currentTaskid.toString());
+
BufferedOutputStream bos =
- new BufferedOutputStream(new FileOutputStream(indexFile,false));
+ new BufferedOutputStream(new FileOutputStream(tmpIndexFile,false));
DataOutputStream dos = new DataOutputStream(bos);
//the format of the index file is
//LOG_DIR: <the dir where the task logs are really stored>
@@ -171,6 +188,11 @@
dos.writeBytes(Long.toString(getTaskLogFile(firstTaskid, LogName.SYSLOG)
.length() - prevLogLength)+"\n");
dos.close();
+
+ File indexFile = getIndexFile(currentTaskid.toString(), isCleanup);
+ Path indexFilePath = new Path(indexFile.getAbsolutePath());
+ Path tmpIndexFilePath = new Path(tmpIndexFile.getAbsolutePath());
+ localFS.rename (tmpIndexFilePath, indexFilePath);
}
private static void resetPrevLengths(TaskAttemptID firstTaskid) {
prevOutLength = getTaskLogFile(firstTaskid, LogName.STDOUT).length();
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java?rev=760783&r1=760782&r2=760783&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java Wed Apr 1 05:01:11 2009
@@ -62,20 +62,23 @@
/**
* Reads tasklog and returns it as string after trimming it.
- * @param filter Task log filer; can be STDOUT, STDERR,
+ * @param filter Task log filter; can be STDOUT, STDERR,
* SYSLOG, DEBUGOUT, DEBUGERR
* @param taskId The task id for which the log has to collected
+ * @param isCleanup whether the task is a cleanup attempt or not.
* @return task log as string
* @throws IOException
*/
- public static String readTaskLog(TaskLog.LogName filter, TaskAttemptID taskId)
+ public static String readTaskLog(TaskLog.LogName filter,
+ TaskAttemptID taskId,
+ boolean isCleanup)
throws IOException {
// string buffer to store task log
StringBuffer result = new StringBuffer();
int res;
// reads the whole tasklog into inputstream
- InputStream taskLogReader = new TaskLog.Reader(taskId, filter, 0, -1);
+ InputStream taskLogReader = new TaskLog.Reader(taskId, filter, 0, -1, isCleanup);
// construct string log from inputstream.
byte[] b = new byte[65536];
while (true) {
@@ -168,7 +171,7 @@
while (!job.isComplete()) ;
// return the output of debugout log.
- return readTaskLog(TaskLog.LogName.DEBUGOUT,taskId);
+ return readTaskLog(TaskLog.LogName.DEBUGOUT,taskId, false);
}
/**
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskFail.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskFail.java?rev=760783&r1=760782&r2=760783&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskFail.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskFail.java Wed Apr 1 05:01:11 2009
@@ -32,6 +32,9 @@
import org.apache.hadoop.mapred.lib.IdentityReducer;
public class TestTaskFail extends TestCase {
+ private static String taskLog = "Task attempt log";
+ private static String cleanupLog = "cleanup attempt log";
+
public static class MapperClass extends MapReduceBase
implements Mapper<LongWritable, Text, Text, IntWritable> {
String taskid;
@@ -41,6 +44,7 @@
public void map (LongWritable key, Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
+ System.err.println(taskLog);
if (taskid.endsWith("_0")) {
throw new IOException();
} else if (taskid.endsWith("_1")) {
@@ -49,14 +53,23 @@
}
}
+ static class CommitterWithLogs extends FileOutputCommitter {
+ public void abortTask(TaskAttemptContext context) throws IOException {
+ System.err.println(cleanupLog);
+ super.abortTask(context);
+ }
+ }
+
static class CommitterWithFailTaskCleanup extends FileOutputCommitter {
public void abortTask(TaskAttemptContext context) throws IOException {
+ System.err.println(cleanupLog);
System.exit(-1);
}
}
static class CommitterWithFailTaskCleanup2 extends FileOutputCommitter {
public void abortTask(TaskAttemptContext context) throws IOException {
+ System.err.println(cleanupLog);
throw new IOException();
}
}
@@ -109,6 +122,12 @@
mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
assertTrue(ts != null);
assertEquals(TaskStatus.State.FAILED, ts.getRunState());
+ // validate task logs: tasklog should contain both task logs
+ // and cleanup logs
+ String log = TestMiniMRMapRedDebugScript.readTaskLog(
+ TaskLog.LogName.STDERR, attemptId, false);
+ assertTrue(log.contains(taskLog));
+ assertTrue(log.contains(cleanupLog));
attemptId = new TaskAttemptID(new TaskID(jobId, true, 0), 1);
// this should be cleanup attempt since the second attempt fails
@@ -117,6 +136,15 @@
ts = mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
assertTrue(ts != null);
assertEquals(TaskStatus.State.FAILED, ts.getRunState());
+ // validate tasklogs for task attempt
+ log = TestMiniMRMapRedDebugScript.readTaskLog(
+ TaskLog.LogName.STDERR, attemptId, false);
+ assertTrue(log.contains(taskLog));
+
+ // validate tasklogs for cleanup attempt
+ log = TestMiniMRMapRedDebugScript.readTaskLog(
+ TaskLog.LogName.STDERR, attemptId, true);
+ assertTrue(log.contains(cleanupLog));
}
public void testWithDFS() throws IOException {
@@ -134,11 +162,12 @@
final Path outDir = new Path("./output");
String input = "The quick brown fox\nhas many silly\nred fox sox\n";
// launch job with fail tasks
- RunningJob rJob = launchJob(mr.createJobConf(), inDir, outDir, input);
+ JobConf jobConf = mr.createJobConf();
+ jobConf.setOutputCommitter(CommitterWithLogs.class);
+ RunningJob rJob = launchJob(jobConf, inDir, outDir, input);
rJob.waitForCompletion();
validateJob(rJob, mr);
// launch job with fail tasks and fail-cleanups
- JobConf jobConf = mr.createJobConf();
fileSys.delete(outDir, true);
jobConf.setOutputCommitter(CommitterWithFailTaskCleanup.class);
rJob = launchJob(jobConf, inDir, outDir, input);