Author: cdouglas
Date: Sat Jan 19 18:39:10 2008
New Revision: 613499

URL: http://svn.apache.org/viewvc?rev=613499&view=rev
Log:
HADOOP-2367. Add ability to profile a subset of map/reduce tasks and fetch the
result to the local filesystem of the submitting application. Also includes a
general IntegerRanges extension to Configuration for setting positive, ranged
parameters. Contributed by Owen O'Malley.
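
For reference, a minimal sketch (not part of this commit) of how a submitting
application might turn this on, using the JobConf accessors added below. The
job class, job name, and input/output setup are placeholders; a real job would
also configure its mapper, reducer, and paths.

    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;

    public class ProfilingExample {
      public static void main(String[] args) throws Exception {
        // Placeholder job; mapper/reducer classes and paths are omitted here.
        JobConf job = new JobConf(ProfilingExample.class);
        job.setJobName("profiling-demo");

        // New in this patch: profile only a subset of tasks.
        job.setProfileEnabled(true);              // mapred.task.profile=true
        job.setProfileTaskRange(true, "0-2");     // map task ids 0, 1, 2
        job.setProfileTaskRange(false, "0-1,5");  // reduce task ids 0, 1 and 5

        // While polling completion events, JobClient.runJob downloads each
        // profiled task's output to <taskid>.profile in the client's
        // working directory.
        JobClient.runJob(job);
      }
    }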


Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/conf/TestConfiguration.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=613499&r1=613498&r2=613499&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Sat Jan 19 18:39:10 2008
@@ -101,6 +101,11 @@
     sequence files as BytesWritable/BytesWritable regardless of the
     key and value types used to write the file. (cdouglas via omalley)
 
+    HADOOP-2367. Add ability to profile a subset of map/reduce tasks and fetch
+    the result to the local filesystem of the submitting application. Also
+    includes a general IntegerRanges extension to Configuration for setting
+    positive, ranged parameters. (Owen O'Malley via cdouglas)
+
   IMPROVEMENTS
 
     HADOOP-2045.  Change committer list on website to a table, so that

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java?rev=613499&r1=613498&r2=613499&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java Sat Jan 19 18:39:10 2008
@@ -32,10 +32,12 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.StringTokenizer;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -461,6 +463,103 @@
    */
   public void setBoolean(String name, boolean value) {
     set(name, Boolean.toString(value));
+  }
+
+  /**
+   * A class that represents a set of positive integer ranges. It parses 
+   * strings of the form: "2-3,5,7-" where ranges are separated by comma and 
+   * the lower/upper bounds are separated by dash. Either the lower or upper 
+   * bound may be omitted, meaning all values up to or above the remaining 
+   * bound. So the string above means 2, 3, 5, and 7, 8, 9, ...
+   */
+  public static class IntegerRanges {
+    private static class Range {
+      int start;
+      int end;
+    }
+
+    List<Range> ranges = new ArrayList<Range>();
+    
+    public IntegerRanges() {
+    }
+    
+    public IntegerRanges(String newValue) {
+      StringTokenizer itr = new StringTokenizer(newValue, ",");
+      while (itr.hasMoreTokens()) {
+        String rng = itr.nextToken().trim();
+        String[] parts = rng.split("-", 3);
+        if (parts.length < 1 || parts.length > 2) {
+          throw new IllegalArgumentException("integer range badly formed: " + 
+                                             rng);
+        }
+        Range r = new Range();
+        r.start = convertToInt(parts[0], 0);
+        if (parts.length == 2) {
+          r.end = convertToInt(parts[1], Integer.MAX_VALUE);
+        } else {
+          r.end = r.start;
+        }
+        if (r.start > r.end) {
+          throw new IllegalArgumentException("IntegerRange from " + r.start + 
+                                             " to " + r.end + " is invalid");
+        }
+        ranges.add(r);
+      }
+    }
+
+    /**
+     * Convert a string to an int treating empty strings as the default value.
+     * @param value the string value
+     * @param defaultValue the value to return if the string is empty
+     * @return the desired integer
+     */
+    private static int convertToInt(String value, int defaultValue) {
+      String trim = value.trim();
+      if (trim.length() == 0) {
+        return defaultValue;
+      }
+      return Integer.parseInt(trim);
+    }
+
+    /**
+     * Is the given value in the set of ranges
+     * @param value the value to check
+     * @return is the value in the ranges?
+     */
+    public boolean isIncluded(int value) {
+      for(Range r: ranges) {
+        if (r.start <= value && value <= r.end) {
+          return true;
+        }
+      }
+      return false;
+    }
+    
+    public String toString() {
+      StringBuffer result = new StringBuffer();
+      boolean first = true;
+      for(Range r: ranges) {
+        if (first) {
+          first = false;
+        } else {
+          result.append(',');
+        }
+        result.append(r.start);
+        result.append('-');
+        result.append(r.end);
+      }
+      return result.toString();
+    }
+  }
+
+  /**
+   * Parse the given attribute as a set of integer ranges
+   * @param name the attribute name
+   * @param defaultValue the default value if it is not set
+   * @return a new set of ranges from the configured value
+   */
+  public IntegerRanges getRange(String name, String defaultValue) {
+    return new IntegerRanges(get(name, defaultValue));
   }
 
   /** 

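As a quick illustration of the new parser (not part of the patch): the snippet
below exercises the methods added above with the "2-3,5,7-" style string from
the javadoc. The property name is arbitrary.

    import org.apache.hadoop.conf.Configuration;

    public class IntegerRangesDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("my.example.ranges", "2-3,5,7-");  // arbitrary property name

        // Omitted bounds default to 0 (lower) and Integer.MAX_VALUE (upper),
        // per convertToInt above.
        Configuration.IntegerRanges ranges =
          conf.getRange("my.example.ranges", "0-2");

        System.out.println(ranges.isIncluded(3));   // true  (from "2-3")
        System.out.println(ranges.isIncluded(4));   // false
        System.out.println(ranges.isIncluded(5));   // true  (from "5")
        System.out.println(ranges.isIncluded(42));  // true  (from "7-")
        System.out.println(ranges);                 // 2-3,5-5,7-2147483647
      }
    }
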
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?rev=613499&r1=613498&r2=613499&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Sat Jan 19 18:39:10 2008
@@ -22,7 +22,9 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
@@ -50,6 +52,7 @@
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
@@ -755,7 +758,17 @@
   public JobStatus[] jobsToComplete() throws IOException {
     return jobSubmitClient.jobsToComplete();
   }
-    
+
+  private static void downloadProfile(TaskCompletionEvent e
+                                      ) throws IOException  {
+    URLConnection connection = new URL(e.getTaskTrackerHttp() + 
+                                       "&plaintext=true&filter=profile"
+                                       ).openConnection();
+    InputStream in = connection.getInputStream();
+    OutputStream out = new FileOutputStream(e.getTaskId() + ".profile");
+    IOUtils.copyBytes(in, out, 64 * 1024, true);
+  }
+
   /** 
    * Get the jobs that are submitted.
    * 
@@ -792,7 +805,10 @@
       running = jc.submitJob(job);
       String jobId = running.getJobID();
       LOG.info("Running job: " + jobId);
-      int eventCounter = 0; 
+      int eventCounter = 0;
+      boolean profiling = job.getProfileEnabled();
+      Configuration.IntegerRanges mapRanges = job.getProfileTaskRange(true);
+      Configuration.IntegerRanges reduceRanges = job.getProfileTaskRange(false);
         
       while (true) {
         try {
@@ -812,48 +828,56 @@
             lastReport = report;
           }
             
-          if (filter  != TaskStatusFilter.NONE){
-            TaskCompletionEvent[] events = 
-              running.getTaskCompletionEvents(eventCounter); 
-            eventCounter += events.length;
-            for(TaskCompletionEvent event : events){
-              switch(filter){
-              case SUCCEEDED:
-                if (event.getTaskStatus() == 
-                    TaskCompletionEvent.Status.SUCCEEDED){
-                  LOG.info(event.toString());
-                  displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp());
-                }
-                break; 
-              case FAILED:
-                if (event.getTaskStatus() == 
-                    TaskCompletionEvent.Status.FAILED){
-                  LOG.info(event.toString());
-                  // Displaying the task diagnostic information
-                  String taskId = event.getTaskId();
-                  String tipId = TaskInProgress.getTipId(taskId);
-                  String[] taskDiagnostics = 
-                         jc.jobSubmitClient.getTaskDiagnostics(jobId, tipId, 
-                                                                       taskId); 
-                  if (taskDiagnostics != null) {
-                    for(String diagnostics : taskDiagnostics){
-                         System.err.println(diagnostics);
-                    }
+          TaskCompletionEvent[] events = 
+            running.getTaskCompletionEvents(eventCounter); 
+          eventCounter += events.length;
+          for(TaskCompletionEvent event : events){
+            TaskCompletionEvent.Status status = event.getTaskStatus();
+            if (profiling && 
+                (status == TaskCompletionEvent.Status.SUCCEEDED ||
+                 status == TaskCompletionEvent.Status.FAILED) &&
+                (event.isMap ? mapRanges : reduceRanges).
+                   isIncluded(event.idWithinJob())) {
+              downloadProfile(event);
+            }
+            switch(filter){
+            case NONE:
+              break;
+            case SUCCEEDED:
+              if (event.getTaskStatus() == 
+                TaskCompletionEvent.Status.SUCCEEDED){
+                LOG.info(event.toString());
+                displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp());
+              }
+              break; 
+            case FAILED:
+              if (event.getTaskStatus() == 
+                TaskCompletionEvent.Status.FAILED){
+                LOG.info(event.toString());
+                // Displaying the task diagnostic information
+                String taskId = event.getTaskId();
+                String tipId = TaskInProgress.getTipId(taskId);
+                String[] taskDiagnostics = 
+                  jc.jobSubmitClient.getTaskDiagnostics(jobId, tipId, 
+                                                        taskId); 
+                if (taskDiagnostics != null) {
+                  for(String diagnostics : taskDiagnostics){
+                    System.err.println(diagnostics);
                   }
-                  // Displaying the task logs
-                  displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp());
-                }
-                break; 
-              case KILLED:
-                if (event.getTaskStatus() == TaskCompletionEvent.Status.KILLED){
-                  LOG.info(event.toString());
                 }
-                break; 
-              case ALL:
-                LOG.info(event.toString());
+                // Displaying the task logs
                 displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp());
-                break;
               }
+              break; 
+            case KILLED:
+              if (event.getTaskStatus() == TaskCompletionEvent.Status.KILLED){
+                LOG.info(event.toString());
+              }
+              break; 
+            case ALL:
+              LOG.info(event.toString());
+              displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp());
+              break;
             }
           }
           retries = MAX_RETRIES;

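To make the download path concrete: a standalone sketch (not part of the
patch) mirroring the private downloadProfile helper above. The taskTrackerHttp
argument stands for whatever URL TaskCompletionEvent.getTaskTrackerHttp()
returns; only the appended query parameters come from this change.

    import java.io.FileOutputStream;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.URL;
    import java.net.URLConnection;

    import org.apache.hadoop.io.IOUtils;

    public class ProfileFetchSketch {
      // Ask the task tracker's log servlet for the raw profile output and
      // save it as <taskId>.profile in the current working directory.
      static void fetch(String taskTrackerHttp, String taskId) throws Exception {
        URLConnection connection =
          new URL(taskTrackerHttp + "&plaintext=true&filter=profile").openConnection();
        InputStream in = connection.getInputStream();
        OutputStream out = new FileOutputStream(taskId + ".profile");
        IOUtils.copyBytes(in, out, 64 * 1024, true);  // true => close both streams
      }
    }
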
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?rev=613499&r1=613498&r2=613499&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Sat Jan 19 18:39:10 2008
@@ -1186,7 +1186,47 @@
     
     return JobPriority.valueOf(prio);
   }
-  
+
+  /**
+   * Get whether the task profiling is enabled.
+   * @return true if some tasks will be profiled
+   */
+  public boolean getProfileEnabled() {
+    return getBoolean("mapred.task.profile", false);
+  }
+
+  /**
+   * Set whether the system should collect profiler information for some of 
+   * the tasks in this job. The information is stored in the user log 
+   * directory.
+   * @param newValue true means it should be gathered
+   */
+  public void setProfileEnabled(boolean newValue) {
+    setBoolean("mapred.task.profile", newValue);
+  }
+
+  /**
+   * Get the range of maps or reduces to profile.
+   * @param isMap is the task a map?
+   * @return the task ranges
+   */
+  public IntegerRanges getProfileTaskRange(boolean isMap) {
+    return getRange((isMap ? "mapred.task.profile.maps" : 
+                       "mapred.task.profile.reduces"), "0-2");
+  }
+
+  /**
+   * Set the ranges of maps or reduces to profile. setProfileEnabled(true) 
+   * must also be called.
+   * @param newValue a set of integer ranges of the task ids
+   */
+  public void setProfileTaskRange(boolean isMap, String newValue) {
+    // parse the value to make sure it is legal
+    new Configuration.IntegerRanges(newValue);
+    set((isMap ? "mapred.task.profile.maps" : "mapred.task.profile.reduces"), 
+        newValue);
+  }
+
   /**
    * Set the debug script to run when the map tasks fail.
    * 

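A small illustration (not part of the patch) of the JobConf additions above:
the "0-2" defaults and the eager validation in setProfileTaskRange.

    import org.apache.hadoop.mapred.JobConf;

    public class ProfileRangeConfigDemo {
      public static void main(String[] args) {
        JobConf conf = new JobConf();

        // Defaults from getProfileTaskRange: "0-2" for both maps and reduces.
        System.out.println(conf.getProfileTaskRange(true));   // 0-2
        System.out.println(conf.getProfileTaskRange(false));  // 0-2

        // setProfileTaskRange parses the value immediately, so a malformed
        // range spec fails here rather than at task launch.
        try {
          conf.setProfileTaskRange(true, "5-3");  // lower bound above upper
        } catch (IllegalArgumentException expected) {
          System.out.println("rejected: " + expected.getMessage());
        }
      }
    }
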
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java?rev=613499&r1=613498&r2=613499&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java Sat Jan 19 18:39:10 2008
@@ -61,6 +61,9 @@
     /** Log on the map-reduce system logs of the task. */
     SYSLOG ("syslog"),
     
+    /** The java profiler information. */
+    PROFILE ("profile.out"),
+    
     /** Log the debug script's stdout  */
     DEBUGOUT ("debugout");
         

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java?rev=613499&r1=613498&r2=613499&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java Sat Jan 19 18:39:10 2008
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.mapred;
 
+import java.io.File;
 import java.io.InputStream;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -30,14 +31,67 @@
  * A servlet that is run by the TaskTrackers to provide the task logs via http.
  */
 public class TaskLogServlet extends HttpServlet {
+  
+  private boolean haveTaskLog(String taskId, TaskLog.LogName type) {
+    File f = TaskLog.getTaskLogFile(taskId, type);
+    return f.canRead();
+  }
+
+  /**
+   * Find the next quotable character in the given array.
+   * @param data the bytes to look in
+   * @param offset the first index to look in
+   * @param end the index after the last one to look in
+   * @return the index of the quotable character or end if none was found
+   */
+  private static int findFirstQuotable(byte[] data, int offset, int end) {
+    while (offset < end) {
+      switch (data[offset]) {
+      case '<':
+      case '>':
+      case '&':
+        return offset;
+      default:
+        offset += 1;
+      }
+    }
+    return offset;
+  }
+
+  private static void quotedWrite(OutputStream out, byte[] data, int offset,
+                                  int length) throws IOException {
+    int end = offset + length;
+    while (offset < end) {
+      int next = findFirstQuotable(data, offset, end);
+      out.write(data, offset, next - offset);
+      offset = next;
+      if (offset < end) {
+        switch (data[offset]) {
+        case '<':
+          out.write("&lt;".getBytes());
+          break;
+        case '>':
+          out.write("&gt;".getBytes());
+          break;
+        case '&':
+          out.write("&amp;".getBytes());
+          break;
+        default:
+          out.write(data[offset]);
+          break;
+        }
+        offset += 1;
+      }
+    }
+  }
+
   private void printTaskLog(HttpServletResponse response,
                             OutputStream out, String taskId, 
                             long start, long end, boolean plainText, 
                             TaskLog.LogName filter) throws IOException {
     if (!plainText) {
       out.write(("<br><b><u>" + filter + " logs</u></b><br>\n" +
-                 "<table border=2 cellpadding=\"2\">\n" +
-                 "<tr><td><pre>\n").getBytes());
+                 "<pre>\n").getBytes());
     }
 
     try {
@@ -48,7 +102,11 @@
       while (true) {
         result = taskLogReader.read(b);
         if (result > 0) {
-          out.write(b, 0, result);
+          if (plainText) {
+            out.write(b, 0, result); 
+          } else {
+            quotedWrite(out, b, 0, result);
+          }
         } else {
           break;
         }
@@ -60,7 +118,7 @@
     } catch (IOException ioe) {
       if (filter == TaskLog.LogName.DEBUGOUT) {
         if (!plainText) {
-           out.write("</pre></td></tr></table><hr><br>\n".getBytes());
+           out.write("</pre><hr><br>\n".getBytes());
          }
         // do nothing
       }
@@ -132,8 +190,14 @@
                      TaskLog.LogName.STDERR);
         printTaskLog(response, out, taskId, start, end, plainText, 
                      TaskLog.LogName.SYSLOG);
-        printTaskLog(response, out, taskId, start, end, plainText, 
-                TaskLog.LogName.DEBUGOUT);
+        if (haveTaskLog(taskId, TaskLog.LogName.DEBUGOUT)) {
+          printTaskLog(response, out, taskId, start, end, plainText, 
+                       TaskLog.LogName.DEBUGOUT);
+        }
+        if (haveTaskLog(taskId, TaskLog.LogName.PROFILE)) {
+          printTaskLog(response, out, taskId, start, end, plainText, 
+                       TaskLog.LogName.PROFILE);
+        }
       } else {
         printTaskLog(response, out, taskId, start, end, plainText, filter);
       }

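For readers curious about the escaping rule in quotedWrite: a standalone
re-implementation (illustration only; the servlet method itself is private).
Only '<', '>' and '&' are rewritten; every other byte passes through
unchanged.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    public class QuotedWriteDemo {
      // Same escaping rule as the servlet's quotedWrite, applied byte by byte.
      static void quotedWrite(OutputStream out, byte[] data) throws IOException {
        for (byte b : data) {
          switch (b) {
          case '<':  out.write("&lt;".getBytes());  break;
          case '>':  out.write("&gt;".getBytes());  break;
          case '&':  out.write("&amp;".getBytes()); break;
          default:   out.write(b);                  break;
          }
        }
      }

      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        quotedWrite(out, "x < y && y > z".getBytes());
        System.out.println(out);  // x &lt; y &amp;&amp; y &gt; z
      }
    }
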
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=613499&r1=613498&r2=613499&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Sat Jan 19 18:39:10 2008
@@ -336,6 +336,13 @@
       vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
       vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
 
+      if (conf.getProfileTaskRange(t.isMapTask()
+                                   ).isIncluded(t.getPartition())) {
+        File prof = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.PROFILE);
+        vargs.add("-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,"+
+                  "verbose=n,file="+prof.toString());
+      }
+
       // Add main class and its arguments 
       vargs.add(TaskTracker.Child.class.getName());  // main of Child
       // pass umbilical address

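To show what the added branch contributes to the child JVM command line, a
sketch (not part of the patch) that builds the same hprof agent flag. The
profile path is a made-up stand-in for the file returned by
TaskLog.getTaskLogFile(taskid, TaskLog.LogName.PROFILE).

    public class HprofFlagDemo {
      public static void main(String[] args) {
        // Hypothetical path; the real one is the task's profile.out under
        // its user log directory.
        String prof = "/tmp/hadoop/userlogs/task_0001_m_000002_0/profile.out";
        String flag = "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y," +
                      "verbose=n,file=" + prof;
        System.out.println(flag);
      }
    }
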
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/conf/TestConfiguration.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/conf/TestConfiguration.java?rev=613499&r1=613498&r2=613499&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/conf/TestConfiguration.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/conf/TestConfiguration.java Sat Jan 19 18:39:10 2008
@@ -201,6 +201,37 @@
 
   BufferedWriter out;
        
+  public void testIntegerRanges() {
+    Configuration conf = new Configuration();
+    conf.set("first", "-100");
+    conf.set("second", "4-6,9-10,27");
+    conf.set("third", "34-");
+    Configuration.IntegerRanges range = conf.getRange("first", null);
+    System.out.println("first = " + range);
+    assertEquals(true, range.isIncluded(0));
+    assertEquals(true, range.isIncluded(1));
+    assertEquals(true, range.isIncluded(100));
+    assertEquals(false, range.isIncluded(101));
+    range = conf.getRange("second", null);
+    System.out.println("second = " + range);
+    assertEquals(false, range.isIncluded(3));
+    assertEquals(true, range.isIncluded(4));
+    assertEquals(true, range.isIncluded(6));
+    assertEquals(false, range.isIncluded(7));
+    assertEquals(false, range.isIncluded(8));
+    assertEquals(true, range.isIncluded(9));
+    assertEquals(true, range.isIncluded(10));
+    assertEquals(false, range.isIncluded(11));
+    assertEquals(false, range.isIncluded(26));
+    assertEquals(true, range.isIncluded(27));
+    assertEquals(false, range.isIncluded(28));
+    range = conf.getRange("third", null);
+    System.out.println("third = " + range);
+    assertEquals(false, range.isIncluded(33));
+    assertEquals(true, range.isIncluded(34));
+    assertEquals(true, range.isIncluded(100000000));
+  }
+
   public static void main(String[] argv) throws Exception {
     junit.textui.TestRunner.main(new String[]{
       TestConfiguration.class.getName()

