Author: ddas
Date: Wed Apr 1 05:21:08 2009
New Revision: 760785
URL: http://svn.apache.org/viewvc?rev=760785&view=rev
Log:
Merge -r 760782:760783 from trunk onto 0.20 branch. Fixes HADOOP-4374.
Modified:
hadoop/core/branches/branch-0.20/ (props changed)
hadoop/core/branches/branch-0.20/CHANGES.txt (contents, props changed)
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Child.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskLog.java
hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java
hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestTaskFail.java
Propchange: hadoop/core/branches/branch-0.20/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Apr 1 05:21:08 2009
@@ -1,2 +1,2 @@
/hadoop/core/branches/branch-0.19:713112
-/hadoop/core/trunk:727001,727117,727191,727212,727217,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,736426,738328,738697,740077,740157,741703,741762,743745,743816,743892,744894,745180,746010,746206,746227,746233,746274,746338,746902-746903,746925,746944,746968,746970,747279,747289,747802,748084,748090,748783,749262,749318,749863,750533,752073,752609,752834,752836,752913,752932,753112-753113,753346,754645,754847,754927,755035,755226,755348,755370,755418,755426,755790,755905,755938,755960,755986,755998,756352,757448,757624,757849,758156,759398,759932,760502
+/hadoop/core/trunk:727001,727117,727191,727212,727217,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,736426,738328,738697,740077,740157,741703,741762,743745,743816,743892,744894,745180,746010,746206,746227,746233,746274,746338,746902-746903,746925,746944,746968,746970,747279,747289,747802,748084,748090,748783,749262,749318,749863,750533,752073,752609,752834,752836,752913,752932,753112-753113,753346,754645,754847,754927,755035,755226,755348,755370,755418,755426,755790,755905,755938,755960,755986,755998,756352,757448,757624,757849,758156,759398,759932,760502,760783
Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=760785&r1=760784&r2=760785&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Wed Apr 1 05:21:08 2009
@@ -824,6 +824,10 @@
HADOOP-5576. Fix LocalRunner to work with the new context object API in
mapreduce. (Tom White via omalley)
+ HADOOP-4374. Installs a shutdown hook in the Task JVM so that log.index is
+ updated before the JVM exits. Also makes the update to log.index atomic.
+ (Ravi Gummadi via ddas)
+
Release 0.19.2 - Unreleased
BUG FIXES
Propchange: hadoop/core/branches/branch-0.20/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Apr 1 05:21:08 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.18/CHANGES.txt:727226
/hadoop/core/branches/branch-0.19/CHANGES.txt:713112
-/hadoop/core/trunk/CHANGES.txt:727001,727117,727191,727212,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,735082,736426,738602,738697,739416,740077,740157,741703,741762,743296,743745,743816,743892,744894,745180,745268,746010,746193,746206,746227,746233,746274,746902-746903,746925,746944,746968,746970,747279,747289,747802,748084,748090,748783,749262,749318,749863,750533,752073,752514,752555,752590,752609,752834,752836,752913,752932,753112-753113,753346,754645,754847,754927,755035,755226,755348,755370,755418,755426,755790,755905,755938,755986,755998,756352,757448,757624,757849,758156,759398,759932,760502
+/hadoop/core/trunk/CHANGES.txt:727001,727117,727191,727212,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,735082,736426,738602,738697,739416,740077,740157,741703,741762,743296,743745,743816,743892,744894,745180,745268,746010,746193,746206,746227,746233,746274,746902-746903,746925,746944,746968,746970,747279,747289,747802,748084,748090,748783,749262,749318,749863,750533,752073,752514,752555,752590,752609,752834,752836,752913,752932,753112-753113,753346,754645,754847,754927,755035,755226,755348,755370,755418,755426,755790,755905,755938,755986,755998,756352,757448,757624,757849,758156,759398,759932,760502,760783
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Child.java?rev=760785&r1=760784&r2=760785&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Child.java Wed Apr 1 05:21:08 2009
@@ -36,6 +36,7 @@
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.jvm.JvmMetrics;
import org.apache.log4j.LogManager;
+import org.apache.hadoop.util.StringUtils;
/**
* The main() for child processes.
@@ -67,6 +68,16 @@
defaultConf);
int numTasksToExecute = -1; //-1 signifies "no limit"
int numTasksExecuted = 0;
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ public void run() {
+ try {
+ if (taskid != null) {
+ TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
+ }
+ } catch (Throwable throwable) {
+ }
+ }
+ });
Thread t = new Thread() {
public void run() {
//every so often wake up and syncLogs so that we can track
@@ -193,10 +204,6 @@
// This assumes that on return from Task.run()
// there is no more logging done.
LogManager.shutdown();
- // do synclogs
- if (taskid != null) {
- TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
- }
}
}
}
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskLog.java?rev=760785&r1=760784&r2=760785&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskLog.java Wed Apr 1 05:21:08 2009
@@ -33,7 +33,12 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
import org.apache.log4j.Appender;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -51,7 +56,13 @@
new File(System.getProperty("hadoop.log.dir"),
"userlogs").getAbsoluteFile();
+ static LocalFileSystem localFS = null;
static {
+ try {
+ localFS = FileSystem.getLocal(new Configuration());
+ } catch (IOException ioe) {
+ LOG.warn("Getting local file system failed.");
+ }
if (!LOG_DIR.exists()) {
LOG_DIR.mkdirs();
}
@@ -126,6 +137,9 @@
return l;
}
+ private static File getTmpIndexFile(String taskid) {
+ return new File(getBaseDir(taskid), "log.tmp");
+ }
public static File getIndexFile(String taskid) {
return getIndexFile(taskid, false);
}
@@ -148,9 +162,12 @@
private static void writeToIndexFile(TaskAttemptID firstTaskid,
boolean isCleanup)
throws IOException {
- File indexFile = getIndexFile(currentTaskid.toString(), isCleanup);
+ // To ensure atomicity of updates to index file, write to temporary index
+ // file first and then rename.
+ File tmpIndexFile = getTmpIndexFile(currentTaskid.toString());
+
BufferedOutputStream bos =
- new BufferedOutputStream(new FileOutputStream(indexFile,false));
+ new BufferedOutputStream(new FileOutputStream(tmpIndexFile,false));
DataOutputStream dos = new DataOutputStream(bos);
//the format of the index file is
//LOG_DIR: <the dir where the task logs are really stored>
@@ -169,6 +186,11 @@
dos.writeBytes(Long.toString(getTaskLogFile(firstTaskid, LogName.SYSLOG)
.length() - prevLogLength)+"\n");
dos.close();
+
+ File indexFile = getIndexFile(currentTaskid.toString(), isCleanup);
+ Path indexFilePath = new Path(indexFile.getAbsolutePath());
+ Path tmpIndexFilePath = new Path(tmpIndexFile.getAbsolutePath());
+ localFS.rename (tmpIndexFilePath, indexFilePath);
}
private static void resetPrevLengths(TaskAttemptID firstTaskid) {
prevOutLength = getTaskLogFile(firstTaskid, LogName.STDOUT).length();
Modified: hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java?rev=760785&r1=760784&r2=760785&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java (original)
+++ hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java Wed Apr 1 05:21:08 2009
@@ -62,20 +62,23 @@
/**
* Reads tasklog and returns it as string after trimming it.
- * @param filter Task log filer; can be STDOUT, STDERR,
+ * @param filter Task log filter; can be STDOUT, STDERR,
* SYSLOG, DEBUGOUT, DEBUGERR
* @param taskId The task id for which the log has to collected
+ * @param isCleanup whether the task is a cleanup attempt or not.
* @return task log as string
* @throws IOException
*/
- public static String readTaskLog(TaskLog.LogName filter, TaskAttemptID taskId)
+ public static String readTaskLog(TaskLog.LogName filter,
+ TaskAttemptID taskId,
+ boolean isCleanup)
throws IOException {
// string buffer to store task log
StringBuffer result = new StringBuffer();
int res;
// reads the whole tasklog into inputstream
- InputStream taskLogReader = new TaskLog.Reader(taskId, filter, 0, -1);
- InputStream taskLogReader = new TaskLog.Reader(taskId, filter, 0, -1, isCleanup);
// construct string log from inputstream.
byte[] b = new byte[65536];
while (true) {
@@ -168,7 +171,7 @@
while (!job.isComplete()) ;
// return the output of debugout log.
- return readTaskLog(TaskLog.LogName.DEBUGOUT,taskId);
+ return readTaskLog(TaskLog.LogName.DEBUGOUT,taskId, false);
}
/**
Modified: hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestTaskFail.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestTaskFail.java?rev=760785&r1=760784&r2=760785&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestTaskFail.java (original)
+++ hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestTaskFail.java Wed Apr 1 05:21:08 2009
@@ -32,6 +32,9 @@
import org.apache.hadoop.mapred.lib.IdentityReducer;
public class TestTaskFail extends TestCase {
+ private static String taskLog = "Task attempt log";
+ private static String cleanupLog = "cleanup attempt log";
+
public static class MapperClass extends MapReduceBase
implements Mapper<LongWritable, Text, Text, IntWritable> {
String taskid;
@@ -41,6 +44,7 @@
public void map (LongWritable key, Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
+ System.err.println(taskLog);
if (taskid.endsWith("_0")) {
throw new IOException();
} else if (taskid.endsWith("_1")) {
@@ -49,14 +53,23 @@
}
}
+ static class CommitterWithLogs extends FileOutputCommitter {
+ public void abortTask(TaskAttemptContext context) throws IOException {
+ System.err.println(cleanupLog);
+ super.abortTask(context);
+ }
+ }
+
static class CommitterWithFailTaskCleanup extends FileOutputCommitter {
public void abortTask(TaskAttemptContext context) throws IOException {
+ System.err.println(cleanupLog);
System.exit(-1);
}
}
static class CommitterWithFailTaskCleanup2 extends FileOutputCommitter {
public void abortTask(TaskAttemptContext context) throws IOException {
+ System.err.println(cleanupLog);
throw new IOException();
}
}
@@ -109,6 +122,12 @@
mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
assertTrue(ts != null);
assertEquals(TaskStatus.State.FAILED, ts.getRunState());
+ // validate task logs: tasklog should contain both task logs
+ // and cleanup logs
+ String log = TestMiniMRMapRedDebugScript.readTaskLog(
+ TaskLog.LogName.STDERR, attemptId, false);
+ assertTrue(log.contains(taskLog));
+ assertTrue(log.contains(cleanupLog));
attemptId = new TaskAttemptID(new TaskID(jobId, true, 0), 1);
// this should be cleanup attempt since the second attempt fails
@@ -117,6 +136,15 @@
ts = mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
assertTrue(ts != null);
assertEquals(TaskStatus.State.FAILED, ts.getRunState());
+ // validate tasklogs for task attempt
+ log = TestMiniMRMapRedDebugScript.readTaskLog(
+ TaskLog.LogName.STDERR, attemptId, false);
+ assertTrue(log.contains(taskLog));
+
+ // validate tasklogs for cleanup attempt
+ log = TestMiniMRMapRedDebugScript.readTaskLog(
+ TaskLog.LogName.STDERR, attemptId, true);
+ assertTrue(log.contains(cleanupLog));
}
public void testWithDFS() throws IOException {
@@ -134,11 +162,12 @@
final Path outDir = new Path("./output");
String input = "The quick brown fox\nhas many silly\nred fox sox\n";
// launch job with fail tasks
- RunningJob rJob = launchJob(mr.createJobConf(), inDir, outDir, input);
+ JobConf jobConf = mr.createJobConf();
+ jobConf.setOutputCommitter(CommitterWithLogs.class);
+ RunningJob rJob = launchJob(jobConf, inDir, outDir, input);
rJob.waitForCompletion();
validateJob(rJob, mr);
// launch job with fail tasks and fail-cleanups
- JobConf jobConf = mr.createJobConf();
fileSys.delete(outDir, true);
jobConf.setOutputCommitter(CommitterWithFailTaskCleanup.class);
rJob = launchJob(jobConf, inDir, outDir, input);