Author: cdouglas Date: Sat Jan 19 18:39:10 2008 New Revision: 613499 URL: http://svn.apache.org/viewvc?rev=613499&view=rev Log: HADOOP-2367. Add ability to profile a subset of map/reduce tasks and fetch the result to the local filesystem of the submitting application. Also includes a general IntegerRanges extension to Configuration for setting positive, ranged parameters. Contributed by Owen O'Malley.
Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java lucene/hadoop/trunk/src/test/org/apache/hadoop/conf/TestConfiguration.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=613499&r1=613498&r2=613499&view=diff ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Sat Jan 19 18:39:10 2008 @@ -101,6 +101,11 @@ sequence files as BytesWritable/BytesWritable regardless of the key and value types used to write the file. (cdouglas via omalley) + HADOOP-2367. Add ability to profile a subset of map/reduce tasks and fetch + the result to the local filesystem of the submitting application. Also + includes a general IntegerRanges extension to Configuration for setting + positive, ranged parameters. (Owen O'Malley via cdouglas) + IMPROVEMENTS HADOOP-2045. 
Change committer list on website to a table, so that Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java?rev=613499&r1=613498&r2=613499&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java Sat Jan 19 18:39:10 2008 @@ -32,10 +32,12 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.ListIterator; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.StringTokenizer; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -461,6 +463,103 @@ */ public void setBoolean(String name, boolean value) { set(name, Boolean.toString(value)); + } + + /** + * A class that represents a set of positive integer ranges. It parses + * strings of the form: "2-3,5,7-" where ranges are separated by comma and + * the lower/upper bounds are separated by dash. Either the lower or upper + * bound may be omitted meaning all values up to or over. So the string + * above means 2, 3, 5, and 7, 8, 9, ... 
+ */ + public static class IntegerRanges { + private static class Range { + int start; + int end; + } + + List<Range> ranges = new ArrayList<Range>(); + + public IntegerRanges() { + } + + public IntegerRanges(String newValue) { + StringTokenizer itr = new StringTokenizer(newValue, ","); + while (itr.hasMoreTokens()) { + String rng = itr.nextToken().trim(); + String[] parts = rng.split("-", 3); + if (parts.length < 1 || parts.length > 2) { + throw new IllegalArgumentException("integer range badly formed: " + + rng); + } + Range r = new Range(); + r.start = convertToInt(parts[0], 0); + if (parts.length == 2) { + r.end = convertToInt(parts[1], Integer.MAX_VALUE); + } else { + r.end = r.start; + } + if (r.start > r.end) { + throw new IllegalArgumentException("IntegerRange from " + r.start + + " to " + r.end + " is invalid"); + } + ranges.add(r); + } + } + + /** + * Convert a string to an int treating empty strings as the default value. + * @param value the string value + * @param defaultValue the value for if the string is empty + * @return the desired integer + */ + private static int convertToInt(String value, int defaultValue) { + String trim = value.trim(); + if (trim.length() == 0) { + return defaultValue; + } + return Integer.parseInt(trim); + } + + /** + * Is the given value in the set of ranges + * @param value the value to check + * @return is the value in the ranges? 
+ */ + public boolean isIncluded(int value) { + for(Range r: ranges) { + if (r.start <= value && value <= r.end) { + return true; + } + } + return false; + } + + public String toString() { + StringBuffer result = new StringBuffer(); + boolean first = true; + for(Range r: ranges) { + if (first) { + first = false; + } else { + result.append(','); + } + result.append(r.start); + result.append('-'); + result.append(r.end); + } + return result.toString(); + } + } + + /** + * Parse the given attribute as a set of integer ranges + * @param name the attribute name + * @param defaultValue the default value if it is not set + * @return a new set of ranges from the configured value + */ + public IntegerRanges getRange(String name, String defaultValue) { + return new IntegerRanges(get(name, defaultValue)); } /** Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?rev=613499&r1=613498&r2=613499&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Sat Jan 19 18:39:10 2008 @@ -22,7 +22,9 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.FileNotFoundException; +import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.OutputStreamWriter; @@ -50,6 +52,7 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; @@ -755,7 +758,17 @@ public JobStatus[] jobsToComplete() throws IOException { return 
jobSubmitClient.jobsToComplete(); } - + + private static void downloadProfile(TaskCompletionEvent e + ) throws IOException { + URLConnection connection = new URL(e.getTaskTrackerHttp() + + "&plaintext=true&filter=profile" + ).openConnection(); + InputStream in = connection.getInputStream(); + OutputStream out = new FileOutputStream(e.getTaskId() + ".profile"); + IOUtils.copyBytes(in, out, 64 * 1024, true); + } + /** * Get the jobs that are submitted. * @@ -792,7 +805,10 @@ running = jc.submitJob(job); String jobId = running.getJobID(); LOG.info("Running job: " + jobId); - int eventCounter = 0; + int eventCounter = 0; + boolean profiling = job.getProfileEnabled(); + Configuration.IntegerRanges mapRanges = job.getProfileTaskRange(true); + Configuration.IntegerRanges reduceRanges = job.getProfileTaskRange(false); while (true) { try { @@ -812,48 +828,56 @@ lastReport = report; } - if (filter != TaskStatusFilter.NONE){ - TaskCompletionEvent[] events = - running.getTaskCompletionEvents(eventCounter); - eventCounter += events.length; - for(TaskCompletionEvent event : events){ - switch(filter){ - case SUCCEEDED: - if (event.getTaskStatus() == - TaskCompletionEvent.Status.SUCCEEDED){ - LOG.info(event.toString()); - displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp()); - } - break; - case FAILED: - if (event.getTaskStatus() == - TaskCompletionEvent.Status.FAILED){ - LOG.info(event.toString()); - // Displaying the task diagnostic information - String taskId = event.getTaskId(); - String tipId = TaskInProgress.getTipId(taskId); - String[] taskDiagnostics = - jc.jobSubmitClient.getTaskDiagnostics(jobId, tipId, - taskId); - if (taskDiagnostics != null) { - for(String diagnostics : taskDiagnostics){ - System.err.println(diagnostics); - } + TaskCompletionEvent[] events = + running.getTaskCompletionEvents(eventCounter); + eventCounter += events.length; + for(TaskCompletionEvent event : events){ + TaskCompletionEvent.Status status = event.getTaskStatus(); + if (profiling 
&& + (status == TaskCompletionEvent.Status.SUCCEEDED || + status == TaskCompletionEvent.Status.FAILED) && + (event.isMap ? mapRanges : reduceRanges). + isIncluded(event.idWithinJob())) { + downloadProfile(event); + } + switch(filter){ + case NONE: + break; + case SUCCEEDED: + if (event.getTaskStatus() == + TaskCompletionEvent.Status.SUCCEEDED){ + LOG.info(event.toString()); + displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp()); + } + break; + case FAILED: + if (event.getTaskStatus() == + TaskCompletionEvent.Status.FAILED){ + LOG.info(event.toString()); + // Displaying the task diagnostic information + String taskId = event.getTaskId(); + String tipId = TaskInProgress.getTipId(taskId); + String[] taskDiagnostics = + jc.jobSubmitClient.getTaskDiagnostics(jobId, tipId, + taskId); + if (taskDiagnostics != null) { + for(String diagnostics : taskDiagnostics){ + System.err.println(diagnostics); } - // Displaying the task logs - displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp()); - } - break; - case KILLED: - if (event.getTaskStatus() == TaskCompletionEvent.Status.KILLED){ - LOG.info(event.toString()); } - break; - case ALL: - LOG.info(event.toString()); + // Displaying the task logs displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp()); - break; } + break; + case KILLED: + if (event.getTaskStatus() == TaskCompletionEvent.Status.KILLED){ + LOG.info(event.toString()); + } + break; + case ALL: + LOG.info(event.toString()); + displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp()); + break; } } retries = MAX_RETRIES; Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?rev=613499&r1=613498&r2=613499&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original) +++ 
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Sat Jan 19 18:39:10 2008 @@ -1186,7 +1186,47 @@ return JobPriority.valueOf(prio); } - + + /** + * Get whether the task profiling is enabled. + * @return true if some tasks will be profiled + */ + public boolean getProfileEnabled() { + return getBoolean("mapred.task.profile", false); + } + + /** + * Set whether the system should collect profiler information for some of + * the tasks in this job. The information is stored in the user log + * directory. + * @param newValue true means it should be gathered + */ + public void setProfileEnabled(boolean newValue) { + setBoolean("mapred.task.profile", newValue); + } + + /** + * Get the range of maps or reduces to profile. + * @param isMap is the task a map? + * @return the task ranges + */ + public IntegerRanges getProfileTaskRange(boolean isMap) { + return getRange((isMap ? "mapred.task.profile.maps" : + "mapred.task.profile.reduces"), "0-2"); + } + + /** + * Set the ranges of maps or reduces to profile. setProfileEnabled(true) + * must also be called. + * @param newValue a set of integer ranges of the map ids + */ + public void setProfileTaskRange(boolean isMap, String newValue) { + // parse the value to make sure it is legal + new Configuration.IntegerRanges(newValue); + set((isMap ? "mapred.task.profile.maps" : "mapred.task.profile.reduces"), + newValue); + } + /** * Set the debug script to run when the map tasks fail. 
* Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java?rev=613499&r1=613498&r2=613499&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java Sat Jan 19 18:39:10 2008 @@ -61,6 +61,9 @@ /** Log on the map-reduce system logs of the task. */ SYSLOG ("syslog"), + /** The java profiler information. */ + PROFILE ("profile.out"), + /** Log the debug script's stdout */ DEBUGOUT ("debugout"); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java?rev=613499&r1=613498&r2=613499&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java Sat Jan 19 18:39:10 2008 @@ -17,6 +17,7 @@ */ package org.apache.hadoop.mapred; +import java.io.File; import java.io.InputStream; import java.io.IOException; import java.io.OutputStream; @@ -30,14 +31,67 @@ * A servlet that is run by the TaskTrackers to provide the task logs via http. */ public class TaskLogServlet extends HttpServlet { + + private boolean haveTaskLog(String taskId, TaskLog.LogName type) { + File f = TaskLog.getTaskLogFile(taskId, type); + return f.canRead(); + } + + /** + * Find the next quotable character in the given array. 
+ * @param data the bytes to look in + * @param offset the first index to look in + * @param end the index after the last one to look in + * @return the index of the quotable character or end if none was found + */ + private static int findFirstQuotable(byte[] data, int offset, int end) { + while (offset < end) { + switch (data[offset]) { + case '<': + case '>': + case '&': + return offset; + default: + offset += 1; + } + } + return offset; + } + + private static void quotedWrite(OutputStream out, byte[] data, int offset, + int length) throws IOException { + int end = offset + length; + while (offset < end) { + int next = findFirstQuotable(data, offset, end); + out.write(data, offset, next - offset); + offset = next; + if (offset < end) { + switch (data[offset]) { + case '<': + out.write("&lt;".getBytes()); + break; + case '>': + out.write("&gt;".getBytes()); + break; + case '&': + out.write("&amp;".getBytes()); + break; + default: + out.write(data[offset]); + break; + } + offset += 1; + } + } + } + private void printTaskLog(HttpServletResponse response, OutputStream out, String taskId, long start, long end, boolean plainText, TaskLog.LogName filter) throws IOException { if (!plainText) { out.write(("<br><b><u>" + filter + " logs</u></b><br>\n" + - "<table border=2 cellpadding=\"2\">\n" + - "<tr><td><pre>\n").getBytes()); + "<pre>\n").getBytes()); } try { @@ -48,7 +102,11 @@ while (true) { result = taskLogReader.read(b); if (result > 0) { - out.write(b, 0, result); + if (plainText) { + out.write(b, 0, result); + } else { + quotedWrite(out, b, 0, result); + } } else { break; } @@ -60,7 +118,7 @@ } catch (IOException ioe) { if (filter == TaskLog.LogName.DEBUGOUT) { if (!plainText) { - out.write("</pre></td></tr></table><hr><br>\n".getBytes()); + out.write("</pre><hr><br>\n".getBytes()); } // do nothing } @@ -132,8 +190,14 @@ TaskLog.LogName.STDERR); printTaskLog(response, out, taskId, start, end, plainText, TaskLog.LogName.SYSLOG); - printTaskLog(response, out, taskId, start, 
end, plainText, - TaskLog.LogName.DEBUGOUT); + if (haveTaskLog(taskId, TaskLog.LogName.DEBUGOUT)) { + printTaskLog(response, out, taskId, start, end, plainText, + TaskLog.LogName.DEBUGOUT); + } + if (haveTaskLog(taskId, TaskLog.LogName.PROFILE)) { + printTaskLog(response, out, taskId, start, end, plainText, + TaskLog.LogName.PROFILE); + } } else { printTaskLog(response, out, taskId, start, end, plainText, filter); } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=613499&r1=613498&r2=613499&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Sat Jan 19 18:39:10 2008 @@ -336,6 +336,13 @@ vargs.add("-Dhadoop.tasklog.taskid=" + taskid); vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize); + if (conf.getProfileTaskRange(t.isMapTask() + ).isIncluded(t.getPartition())) { + File prof = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.PROFILE); + vargs.add("-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,"+ + "verbose=n,file="+prof.toString()); + } + // Add main class and its arguments vargs.add(TaskTracker.Child.class.getName()); // main of Child // pass umbilical address Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/conf/TestConfiguration.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/conf/TestConfiguration.java?rev=613499&r1=613498&r2=613499&view=diff ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/conf/TestConfiguration.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/conf/TestConfiguration.java Sat Jan 19 18:39:10 2008 @@ -201,6 +201,37 @@ BufferedWriter out; + public void 
testIntegerRanges() { + Configuration conf = new Configuration(); + conf.set("first", "-100"); + conf.set("second", "4-6,9-10,27"); + conf.set("third", "34-"); + Configuration.IntegerRanges range = conf.getRange("first", null); + System.out.println("first = " + range); + assertEquals(true, range.isIncluded(0)); + assertEquals(true, range.isIncluded(1)); + assertEquals(true, range.isIncluded(100)); + assertEquals(false, range.isIncluded(101)); + range = conf.getRange("second", null); + System.out.println("second = " + range); + assertEquals(false, range.isIncluded(3)); + assertEquals(true, range.isIncluded(4)); + assertEquals(true, range.isIncluded(6)); + assertEquals(false, range.isIncluded(7)); + assertEquals(false, range.isIncluded(8)); + assertEquals(true, range.isIncluded(9)); + assertEquals(true, range.isIncluded(10)); + assertEquals(false, range.isIncluded(11)); + assertEquals(false, range.isIncluded(26)); + assertEquals(true, range.isIncluded(27)); + assertEquals(false, range.isIncluded(28)); + range = conf.getRange("third", null); + System.out.println("third = " + range); + assertEquals(false, range.isIncluded(33)); + assertEquals(true, range.isIncluded(34)); + assertEquals(true, range.isIncluded(100000000)); + } + public static void main(String[] argv) throws Exception { junit.textui.TestRunner.main(new String[]{ TestConfiguration.class.getName()