Author: cutting Date: Thu Mar 1 12:14:21 2007 New Revision: 513478 URL: http://svn.apache.org/viewvc?view=rev&rev=513478 Log: HADOOP-1000. Fix so that log messages in task subprocesses are not written to a task's standard error. Contributed by Arun.
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLogAppender.java Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/conf/log4j.properties lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java lucene/hadoop/trunk/src/webapps/task/tasklog.jsp Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=513478&r1=513477&r2=513478 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Thu Mar 1 12:14:21 2007 @@ -171,6 +171,9 @@ 51. HADOOP-941. Enhance record facility. (Milind Bhandarkar via cutting) +52. HADOOP-1000. Fix so that log messages in task subprocesses are + not written to a task's standard error. (Arun C Murthy via cutting) + Release 0.11.2 - 2007-02-16 Modified: lucene/hadoop/trunk/conf/log4j.properties URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/log4j.properties?view=diff&rev=513478&r1=513477&r2=513478 ============================================================================== --- lucene/hadoop/trunk/conf/log4j.properties (original) +++ lucene/hadoop/trunk/conf/log4j.properties Thu Mar 1 12:14:21 2007 @@ -40,6 +40,27 @@ log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n # +# TaskLog Appender +# + +#Default values +hadoop.tasklog.taskid=null +hadoop.tasklog.noKeepSplits=4 +hadoop.tasklog.totalLogFileSize=100 +hadoop.tasklog.purgeLogSplits=true +hadoop.tasklog.logsRetainHours=12 + +log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender +log4j.appender.TLA.taskId=${hadoop.tasklog.taskid} +log4j.appender.TLA.noKeepSplits=${hadoop.tasklog.noKeepSplits} +log4j.appender.TLA.totalLogFileSize=${hadoop.tasklog.totalLogFileSize} 
+log4j.appender.TLA.purgeLogSplits=${hadoop.tasklog.purgeLogSplits} +log4j.appender.TLA.logsRetainHours=${hadoop.tasklog.logsRetainHours} + +log4j.appender.TLA.layout=org.apache.log4j.PatternLayout +log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n + +# # Rolling File Appender # Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java?view=diff&rev=513478&r1=513477&r2=513478 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java Thu Mar 1 12:14:21 2007 @@ -45,6 +45,10 @@ } } + private static File getTaskLogDir(String taskid, LogFilter filter) { + return new File(new File(LOG_DIR, taskid), filter.getPrefix()); + } + /** * The filter for userlogs. */ @@ -53,7 +57,10 @@ STDOUT ("stdout"), /** Log on the stderr of the task. */ - STDERR ("stderr"); + STDERR ("stderr"), + + /** Log on the map-reduce system logs of the task. */ + SYSLOG ("syslog"); private String prefix; @@ -102,20 +109,17 @@ * @param taskId taskid of the task * @param filter the {@link LogFilter} to apply on userlogs. 
*/ - Writer(JobConf conf, String taskId, LogFilter filter) { - this.conf = conf; + Writer(String taskId, LogFilter filter, + int noKeepSplits, long totalLogSize, boolean purgeLogSplits, int logsRetainHours) { this.taskId = taskId; this.filter = filter; - this.taskLogDir = new File(new File(LOG_DIR, this.taskId), - this.filter.getPrefix()); + this.taskLogDir = getTaskLogDir(this.taskId, this.filter); - noKeepSplits = this.conf.getInt("mapred.userlog.num.splits", 4); - splitFileSize = - (this.conf.getInt("mapred.userlog.limit.kb", 100) * 1024) / noKeepSplits; - purgeLogSplits = this.conf.getBoolean("mapred.userlog.purgesplits", - true); - logsRetainHours = this.conf.getInt("mapred.userlog.retain.hours", 12); + this.noKeepSplits = noKeepSplits; + this.splitFileSize = (totalLogSize / noKeepSplits); + this.purgeLogSplits = purgeLogSplits; + this.logsRetainHours = logsRetainHours; } private static class TaskLogsPurgeFilter implements FileFilter { @@ -220,7 +224,7 @@ * * @throws IOException */ - public void close() throws IOException { + public synchronized void close() throws IOException { // Close the final split if (out != null) { out.close(); @@ -306,8 +310,7 @@ this.taskId = taskId; this.filter = filter; - this.taskLogDir = new File(new File(LOG_DIR, this.taskId), - this.filter.getPrefix()); + this.taskLogDir = getTaskLogDir(this.taskId, this.filter); } private static class IndexRecord { Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLogAppender.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLogAppender.java?view=auto&rev=513478 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLogAppender.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLogAppender.java Thu Mar 1 12:14:21 2007 @@ -0,0 +1,120 @@ +package org.apache.hadoop.mapred; + +import java.io.IOException; + +import 
org.apache.hadoop.util.StringUtils; +import org.apache.log4j.AppenderSkeleton; +import org.apache.log4j.spi.ErrorCode; +import org.apache.log4j.spi.LoggingEvent; + +/** + * A simple log4j-appender for the task child's + * map-reduce system logs. + * + * @author Arun C Murthy + */ +public class TaskLogAppender extends AppenderSkeleton { + private TaskLog.Writer taskLogWriter = null; + private String taskId; + private int noKeepSplits; + private long totalLogFileSize; + private boolean purgeLogSplits; + private int logsRetainHours; + + public void activateOptions() { + taskLogWriter = + new TaskLog.Writer(taskId, TaskLog.LogFilter.SYSLOG, + noKeepSplits, totalLogFileSize, purgeLogSplits, logsRetainHours); + try { + taskLogWriter.init(); + } catch (IOException ioe) { + taskLogWriter = null; + errorHandler.error("Failed to initialize the task's logging " + + "infrastructure: " + StringUtils.stringifyException(ioe)); + } + } + + protected synchronized void append(LoggingEvent event) { + if (taskLogWriter == null) { + errorHandler.error("Calling 'append' on uninitialize/closed logger"); + return; + } + + if (this.layout == null) { + errorHandler.error("No layout for appender " + name , + null, ErrorCode.MISSING_LAYOUT ); + } + + // Log the message to the task's log + String logMessage = this.layout.format(event); + try { + taskLogWriter.write(logMessage.getBytes(), 0, logMessage.length()); + } catch (IOException ioe) { + errorHandler.error("Failed to log: '" + logMessage + + "' to the task's logging infrastructure with the exception: " + + StringUtils.stringifyException(ioe)); + } + } + + public boolean requiresLayout() { + return true; + } + + public synchronized void close() { + if (taskLogWriter != null) { + try { + taskLogWriter.close(); + } catch (IOException ioe) { + errorHandler.error("Failed to close the task's log with the exception: " + + StringUtils.stringifyException(ioe)); + } + } else { + errorHandler.error("Calling 'close' on uninitialize/closed logger"); 
+ } + } + + /** + * Getter/Setter methods for log4j. + */ + + public String getTaskId() { + return taskId; + } + + public void setTaskId(String taskId) { + this.taskId = taskId; + } + + public int getNoKeepSplits() { + return noKeepSplits; + } + + public void setNoKeepSplits(int noKeepSplits) { + this.noKeepSplits = noKeepSplits; + } + + public int getLogsRetainHours() { + return logsRetainHours; + } + + public void setLogsRetainHours(int logsRetainHours) { + this.logsRetainHours = logsRetainHours; + } + + public boolean isPurgeLogSplits() { + return purgeLogSplits; + } + + public void setPurgeLogSplits(boolean purgeLogSplits) { + this.purgeLogSplits = purgeLogSplits; + } + + public long getTotalLogFileSize() { + return totalLogFileSize; + } + + public void setTotalLogFileSize(long splitFileSize) { + this.totalLogFileSize = splitFileSize; + } + +} Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?view=diff&rev=513478&r1=513477&r2=513478 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Thu Mar 1 12:14:21 2007 @@ -49,9 +49,17 @@ this.tracker = tracker; this.conf = conf; this.taskStdOutLogWriter = - new TaskLog.Writer(conf, t.getTaskId(), TaskLog.LogFilter.STDOUT); + new TaskLog.Writer(t.getTaskId(), TaskLog.LogFilter.STDOUT, + this.conf.getInt("mapred.userlog.num.splits", 4), + this.conf.getInt("mapred.userlog.limit.kb", 100) * 1024, + this.conf.getBoolean("mapred.userlog.purgesplits", true), + this.conf.getInt("mapred.userlog.retain.hours", 12)); this.taskStdErrLogWriter = - new TaskLog.Writer(conf, t.getTaskId(), TaskLog.LogFilter.STDERR); + new TaskLog.Writer(t.getTaskId(), TaskLog.LogFilter.STDERR, + this.conf.getInt("mapred.userlog.num.splits", 
4), + this.conf.getInt("mapred.userlog.limit.kb", 100) * 1024, + this.conf.getBoolean("mapred.userlog.purgesplits", true), + this.conf.getInt("mapred.userlog.retain.hours", 12)); } public Task getTask() { return t; } @@ -166,7 +174,7 @@ classPath.append(sep); classPath.append(workDir); // Build exec child jmv args. - Vector vargs = new Vector(8); + Vector<String> vargs = new Vector<String>(8); File jvm = // use same jvm as parent new File(new File(System.getProperty("java.home"), "bin"), "java"); @@ -209,6 +217,15 @@ // Add classpath. vargs.add("-classpath"); vargs.add(classPath.toString()); + + // Setup the log4j prop + vargs.add("-Dhadoop.log.dir=" + System.getProperty("hadoop.log.dir")); + vargs.add("-Dhadoop.root.logger=INFO,TLA"); + vargs.add("-Dhadoop.tasklog.taskid=" + t.getTaskId()); + vargs.add("-Dhadoop.tasklog.noKeepSplits=" + conf.getInt("mapred.userlog.num.splits", 4)); + vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + (conf.getInt("mapred.userlog.limit.kb", 100) * 1024)); + vargs.add("-Dhadoop.tasklog.purgeLogSplits=" + conf.getBoolean("mapred.userlog.purgesplits", true)); + vargs.add("-Dhadoop.tasklog.logsRetainHours=" + conf.getInt("mapred.userlog.retain.hours", 12)); // Add java.library.path; necessary for native-hadoop libraries String libraryPath = System.getProperty("java.library.path"); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=513478&r1=513477&r2=513478 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Thu Mar 1 12:14:21 2007 @@ -63,6 +63,7 @@ import org.apache.hadoop.util.RunJar; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.DiskChecker.DiskErrorException; +import 
org.apache.log4j.LogManager; /******************************************************* * TaskTracker is a process that starts and tracks MR Tasks @@ -1451,6 +1452,11 @@ ByteArrayOutputStream baos = new ByteArrayOutputStream(); throwable.printStackTrace(new PrintStream(baos)); umbilical.reportDiagnosticInfo(taskid, baos.toString()); + } finally { + // Shutting down log4j of the child-vm... + // This assumes that on return from Task.run() + // there is no more logging done. + LogManager.shutdown(); } } Modified: lucene/hadoop/trunk/src/webapps/task/tasklog.jsp URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/task/tasklog.jsp?view=diff&rev=513478&r1=513477&r2=513478 ============================================================================== --- lucene/hadoop/trunk/src/webapps/task/tasklog.jsp (original) +++ lucene/hadoop/trunk/src/webapps/task/tasklog.jsp Thu Mar 1 12:14:21 2007 @@ -4,7 +4,8 @@ import="javax.servlet.http.*" import="java.io.*" %> -<% +<%! + String taskId = null; long logOffset = -1, logLength = -1; boolean tailLog = false; long tailSize = 1024; @@ -12,8 +13,93 @@ boolean entireLog = false; boolean plainText = false; TaskLog.LogFilter filter = null; + + private void printTaskLog(JspWriter out, TaskLog.LogFilter filter) + throws IOException { + if (!plainText) { + out.println("<br><b><u>" + filter + " logs</u></b><br>"); + out.println("<table border=2 cellpadding=\"2\">"); + } + + boolean gotRequiredData = true; + try { + TaskLog.Reader taskLogReader = new TaskLog.Reader(taskId, filter); + byte[] b = null; + int bytesRead = 0; + int targetLength = 0; + + if (entireLog) { + b = taskLogReader.fetchAll(); + targetLength = bytesRead = b.length; + } else { + if (tailLog) { + b = new byte[(int)tailSize]; + targetLength = (int)tailSize; + bytesRead = taskLogReader.tail(b, 0, b.length, tailSize, tailWindow); + } else { + b = new byte[(int)logLength]; + targetLength = (int)logLength; + bytesRead = taskLogReader.read(b, 0, b.length, logOffset, 
logLength); + } + } + + if (bytesRead != targetLength && + targetLength <= taskLogReader.getTotalLogSize()) { + if( !plainText) { + out.println("<b>Warning: Could not fetch " + targetLength + + " bytes from the task-logs; probably purged!</b><br/>"); + }else{ + out.println("Warning: Could not fetch " + targetLength + + " bytes from the task-logs; probably purged!"); + } + gotRequiredData = false; + } + String logData = new String(b, 0, bytesRead); + if (!plainText) { + out.print("<tr><td><pre>" + logData + "</pre></td></tr>"); + } else { + out.print(logData); + } + } catch (IOException ioe) { + out.println("Failed to retrieve '" + filter + "' logs for task: " + taskId); + } - String taskId = request.getParameter("taskid"); + if( !plainText ) { + out.println("</table>\n"); + } + + if (!entireLog && !plainText) { + if (tailLog) { + if (gotRequiredData) { + out.println("<a href='/tasklog.jsp?taskid=" + taskId + + "&tail=true&tailsize=" + tailSize + "&tailwindow=" + (tailWindow+1) + + "&filter=" + filter + "'>Earlier</a>"); + } + if (tailWindow > 1) { + out.println("<a href='/tasklog.jsp?taskid=" + taskId + + "&tail=true&tailsize=" + tailSize + "&tailwindow=" + (tailWindow-1) + + "&filter=" + filter + "'>Later</a>"); + } + } else { + if (gotRequiredData) { + out.println("<a href='/tasklog.jsp?taskid=" + taskId + + "&tail=false&off=" + Math.max(0, (logOffset-logLength)) + + "&len=" + logLength + "&filter=" + filter + "'>Earlier</a>"); + } + out.println("<a href='/tasklog.jsp?taskid=" + taskId + + "&tail=false&off=" + (logOffset+logLength) + + "&len=" + logLength + "&filter=" + filter + "'>Later</a>"); + } + } + + if (!plainText) { + out.println("<hr><br>"); + } + } +%> + +<% + taskId = request.getParameter("taskid"); if (taskId == null) { out.println("<h2>Missing 'taskid' for fetching logs!</h2>"); return; @@ -78,91 +164,18 @@ if( !plainText ) { out.println("<html>"); - out.println("<title>Task Logs: '" + taskId + "' (" + logFilter + ")</title>"); + 
out.println("<title>Task Logs: '" + taskId + "'</title>"); out.println("<body>"); - out.println("<h1>Task Logs from " + taskId + "'s " + logFilter + "</h1><br>"); - out.println("<h2>'" + logFilter + "':</h2>"); - out.println("<pre>"); - - } -%> - -<% - boolean gotRequiredData = true; - try { - TaskLog.Reader taskLogReader = new TaskLog.Reader(taskId, filter); - byte[] b = null; - int bytesRead = 0; - int targetLength = 0; - - if (entireLog) { - b = taskLogReader.fetchAll(); - targetLength = bytesRead = b.length; - } else { - if (tailLog) { - b = new byte[(int)tailSize]; - targetLength = (int)tailSize; - bytesRead = taskLogReader.tail(b, 0, b.length, tailSize, tailWindow); - } else { - b = new byte[(int)logLength]; - targetLength = (int)logLength; - bytesRead = taskLogReader.read(b, 0, b.length, logOffset, logLength); - } - } - - if (bytesRead != targetLength && - targetLength <= taskLogReader.getTotalLogSize()) { - if( !plainText) { - out.println("<b>Warning: Could not fetch " + targetLength + - " bytes from the task-logs; probably purged!</b><br/>"); - }else{ - out.println("Warning: Could not fetch " + targetLength + - " bytes from the task-logs; probably purged!"); - } - gotRequiredData = false; - } - String logData = new String(b, 0, bytesRead); - out.println(logData); - } catch (IOException ioe) { - out.println("Failed to retrieve '" + logFilter + "' logs for task: " + taskId); - } - - if( !plainText ) { - out.println("</pre>"); - } -%> -<% - if (!entireLog && !plainText) { - if (tailLog) { - if (gotRequiredData) { - out.println("<a href='/tasklog.jsp?taskid=" + taskId + - "&tail=true&tailsize=" + tailSize + "&tailwindow=" + (tailWindow+1) + - "&filter=" + logFilter + "'>Earlier</a>"); - } - if (tailWindow > 1) { - out.println("<a href='/tasklog.jsp?taskid=" + taskId + - "&tail=true&tailsize=" + tailSize + "&tailwindow=" + (tailWindow-1) - + "&filter=" + logFilter + "'>Later</a>"); - } - } else { - if (gotRequiredData) { - out.println("<a 
href='/tasklog.jsp?taskid=" + taskId + - "&tail=false&off=" + Math.max(0, (logOffset-logLength)) + - "&len=" + logLength + "&filter=" + logFilter + "'>Earlier</a>"); - } - out.println("<a href='/tasklog.jsp?taskid=" + taskId + - "&tail=false&off=" + (logOffset+logLength) + - "&len=" + logLength + "&filter=" + logFilter + "'>Later</a>"); - } - } - if( !plainText ) { - String otherFilter = (logFilter.equals("stdout") ? "stderr" : "stdout"); - out.println("<br><br>See <a href='/tasklog.jsp?taskid=" + taskId + - "&all=true&filter=" + otherFilter + "'>" + otherFilter + "</a>" + - " logs of this task"); - out.println("<hr>"); + out.println("<h1>Task Logs: '" + taskId + "'</h1><br>"); + + printTaskLog(out, TaskLog.LogFilter.STDOUT); + printTaskLog(out, TaskLog.LogFilter.STDERR); + printTaskLog(out, TaskLog.LogFilter.SYSLOG); + out.println("<a href='http://lucene.apache.org/hadoop'>Hadoop</a>, 2006.<br>"); out.println("</body>"); out.println("</html>"); - } + } else { + printTaskLog(out, filter); + } %>