Author: cutting Date: Fri Mar 2 11:56:22 2007 New Revision: 513917 URL: http://svn.apache.org/viewvc?view=rev&rev=513917 Log: HADOOP-1041. Optimize mapred counter implementation. Contributed by David Bowen.
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task_Counter.properties Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/build.xml lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp lucene/hadoop/trunk/src/webapps/job/jobtracker.jsp lucene/hadoop/trunk/src/webapps/job/taskstats.jsp Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=513917&r1=513916&r2=513917 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Fri Mar 2 11:56:22 2007 @@ -179,6 +179,10 @@ 54. HADOOP-1046. Clean up tmp from partially received stale block files. (ab) +55. HADOOP-1041. Optimize mapred counter implementation. Also group + counters by their declaring Enum. (David Bowen via cutting) + + Release 0.11.2 - 2007-02-16 1. HADOOP-1009. Fix an infinite loop in the HDFS namenode. 
Modified: lucene/hadoop/trunk/build.xml URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/build.xml?view=diff&rev=513917&r1=513916&r2=513917 ============================================================================== --- lucene/hadoop/trunk/build.xml (original) +++ lucene/hadoop/trunk/build.xml Fri Mar 2 11:56:22 2007 @@ -216,6 +216,14 @@ <compilerarg line="${javac.args}" /> <classpath refid="classpath"/> </javac> + + <copy todir="${build.classes}"> + <fileset + dir="${src.dir}" + includes="**/*.properties" + /> + </copy> + </target> Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java?view=diff&rev=513917&r1=513916&r2=513917 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java Fri Mar 2 11:56:22 2007 @@ -21,11 +21,16 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; import java.util.Map; -import java.util.Set; +import java.util.MissingResourceException; +import java.util.ResourceBundle; +import java.util.SortedMap; import java.util.TreeMap; -import org.apache.commons.logging.Log; +import org.apache.commons.logging.*; import org.apache.hadoop.io.UTF8; import org.apache.hadoop.io.Writable; @@ -34,34 +39,231 @@ */ public class Counters implements Writable { - private Map<String,Long> counters = new TreeMap<String,Long>(); - + //private static Log log = LogFactory.getLog("Counters.class"); + + /** + * A counter record, comprising its name and value. 
+ */ + private static class CounterRec { + + public String name; + public long value; + + public CounterRec(String name, long value) { + this.name = name; + this.value = value; + } + + } // end class CounterRec + + /** + * Represents a group of counters, comprising the counters from a particular + * counter enum class. + * + * This class handles localization of the class name and the counter names. + */ + public static class Group { + + // The group name is the fully qualified enum class name. + private String groupName; + + // Optional ResourceBundle for localization of group and counter names. + private ResourceBundle bundle = null; + + // Maps counter names to their current values. Note that the iteration + // order of this Map is the same as the ordering of the Enum class in which + // these counter names were defined. + private Map<String,Long> groupCounters = new LinkedHashMap<String,Long>(); + + + Group(String groupName, Collection<CounterRec> counters) { + this.groupName = groupName; + try { + bundle = getResourceBundle(groupName); + } + catch (MissingResourceException neverMind) { + } + + for (CounterRec counter : counters) { + groupCounters.put(counter.name, counter.value); + } + } + + /** + * Returns the specified resource bundle, or throws an exception. + * @throws MissingResourceException if the bundle isn't found + */ + private static ResourceBundle getResourceBundle(String enumClassName) { + String bundleName = enumClassName.replace('$','_'); + return ResourceBundle.getBundle(bundleName); + } + + /** + * Returns raw name of the group. This is the name of the enum class + * for this group of counters. + */ + public String getName() { + return groupName; + } + + /** + * Returns localized name of the group. This is the same as getName() by + * default, but different if an appropriate ResourceBundle is found. 
+ */ + public String getDisplayName() { + return localize("CounterGroupName", groupName); + } + + /** + * Returns localized name of the specified counter. + */ + public String getDisplayName(String counter) { + return localize(counter + ".name", counter); + } + + /** + * Returns the raw (non-localized) names of the counters in this group; use getDisplayName(String) to localize a name. + */ + public Collection<String> getCounterNames() { + return groupCounters.keySet(); + } + + /** + * Returns the value of the specified counter, or 0 if the counter does + * not exist. + */ + public long getCounter(String counter) { + Long result = groupCounters.get(counter); + return (result == null ? 0L : result); + } + + /** + * Returns the number of counters in this group. + */ + public int size() { + return groupCounters.size(); + } + + /** + * Looks up key in the ResourceBundle and returns the corresponding value. + * If the bundle or the key doesn't exist, returns the default value. + */ + private String localize(String key, String defaultValue) { + String result = defaultValue; + if (bundle != null) { + try { + result = bundle.getString(key); + } + catch (MissingResourceException mre) { + } + } + return result; + } + + + } // end class Group + + + // Map from group name (enum class name) to map of int (enum ordinal) to + // counter record (name-value pair). + private Map<String,Map<Integer,CounterRec>> counters = + new TreeMap<String,Map<Integer,CounterRec>>(); + /** - * Returns the names of all counters. + * Returns the names of all counter classes. * @return Set of counter names. */ - public synchronized Set<String> getCounterNames() { + public synchronized Collection<String> getGroupNames() { return counters.keySet(); } /** - * Returns the value of the named counter, or 0 if counter doesn't exist. - * @param name name of a counter - * @return value of the counter + * Returns the named counter group, or an empty group if there is none + * with the specified name. 
*/ - public synchronized long getCounter(String name) { - Long result = counters.get(name); - return (result == null ? 0L : result); + public synchronized Group getGroup(String groupName) { + Map<Integer,CounterRec> counterMap = counters.get(groupName); + Collection<CounterRec> groupCounters; + if (counterMap == null) { + groupCounters = Collections.emptySet(); + } + else { + groupCounters = counterMap.values(); + } + return new Group(groupName, groupCounters); } /** - * Increments the named counter by the specified amount, creating it if + * Increments the specified counter by the specified amount, creating it if * it didn't already exist. - * @param name of a counter + * @param key identifies a counter * @param amount amount by which counter is to be incremented */ - public synchronized void incrCounter(String name, long amount) { - counters.put(name, amount + getCounter(name)); + public synchronized void incrCounter(Enum key, long amount) { + int ordinal = key.ordinal(); + String counterName = key.toString(); + String groupName = key.getDeclaringClass().getName(); + Map<Integer,CounterRec> counterMap = getCounterMap(groupName); + CounterRec counter = getCounter(counterMap, counterName, ordinal); + counter.value += amount; + } + + /** + * Returns current value of the specified counter, or 0 if the counter + * does not exist. + */ + public synchronized long getCounter(Enum key) { + long result = 0L; + String groupName = key.getDeclaringClass().getName(); + Map<Integer,CounterRec> counterMap = counters.get(groupName); + if (counterMap != null) { + int ordinal = key.ordinal(); + String name = key.toString(); + CounterRec counter = counterMap.get(ordinal); + if (counter != null && counter.name.equals(name)) { + result = counter.value; + } + else { + // ordinal lookup failed, but maybe there is a matching name; this + // could happen if e.g. a client has a different version of the Enum class. 
+ for (CounterRec ctr : counterMap.values()) { + if (ctr.name.equals(name)) { + result = ctr.value; + break; + } + } + } + } + return result; + } + + /** + * Returns the counters for the specified counter class. The counters are + * returned as a map from ordinal number, so that their ordering in the + * enum class declaration is preserved. + */ + private Map<Integer,CounterRec> getCounterMap(String groupName) { + Map<Integer,CounterRec> map = counters.get(groupName); + if (map == null) { + map = new TreeMap<Integer,CounterRec>(); + counters.put(groupName, map); + } + return map; + } + + /** + * Returns the counter record with the specified name and ordinal by + * finding or creating it in the specified counterMap. + */ + private CounterRec getCounter(Map<Integer,CounterRec> counterMap, + String counterName, int ordinal) + { + CounterRec result = counterMap.get(ordinal); + if (result == null) { + result = new CounterRec(counterName, 0L); + counterMap.put(ordinal, result); + } + return result; } /** @@ -70,34 +272,80 @@ * @param other the other Counters instance */ public synchronized void incrAllCounters(Counters other) { - for (String name : other.getCounterNames()) { - incrCounter(name, other.getCounter(name)); + for (String groupName : other.counters.keySet()) { + Map<Integer,CounterRec> otherCounters = other.counters.get(groupName); + Map<Integer,CounterRec> myCounters = getCounterMap(groupName); + for (int i : otherCounters.keySet()) { + CounterRec otherCounter = otherCounters.get(i); + CounterRec counter = getCounter(myCounters, otherCounter.name, i); + counter.value += otherCounter.value; + } } } + + /** + * Convenience method for computing the sum of two sets of counters. + */ + public static Counters sum(Counters a, Counters b) { + Counters counters = new Counters(); + counters.incrAllCounters(a); + counters.incrAllCounters(b); + return counters; + } /** - * Returns the number of counters. 
+ * Returns the total number of counters, by summing the number of counters + * in each group. */ - public synchronized int size() { - return counters.size(); + public synchronized int size() { + int result = 0; + for (String groupName : counters.keySet()) { + result += counters.get(groupName).size(); + } + return result; } - // Writable + // Writable. The external format is: + // + // #groups group* + // + // i.e. the number of groups followed by 0 or more groups, where each + // group is of the form: + // + // groupName #counters counter* + // + // where each counter is of the form: + // + // ordinal name value + // public synchronized void write(DataOutput out) throws IOException { out.writeInt(counters.size()); - for (String name : counters.keySet()) { - UTF8.writeString(out, name); - out.writeLong(counters.get(name)); + for (String groupName : counters.keySet()) { + UTF8.writeString(out, groupName); + Map<Integer,CounterRec> map = counters.get(groupName); + out.writeInt(map.size()); + for (Integer ordinal : map.keySet()) { + CounterRec counter = map.get(ordinal); + out.writeInt(ordinal); + UTF8.writeString(out, counter.name); + out.writeLong(counter.value); + } } } public synchronized void readFields(DataInput in) throws IOException { - int n = in.readInt(); - while (n-- > 0) { - String name = UTF8.readString(in); - long value = in.readLong(); - counters.put(name, value); + int numClasses = in.readInt(); + while (numClasses-- > 0) { + String groupName = UTF8.readString(in); + Map<Integer,CounterRec> counters = getCounterMap(groupName); + int numCounters = in.readInt(); + while (numCounters-- > 0) { + int index = in.readInt(); + String counterName = UTF8.readString(in); + long value = in.readLong(); + counters.put(index, new CounterRec(counterName,value)); + } } } @@ -106,10 +354,17 @@ * @param log The log to use. 
*/ public void log(Log log) { - log.info("Counters: " + getCounterNames().size()); - for (String counterName : getCounterNames()) { - log.info(" " + counterName + "=" + getCounter(counterName)); + log.info("Counters: " + size()); + Collection<String> groupNames = getGroupNames(); + for (String groupName : groupNames) { + Group group = getGroup(groupName); + log.info(" " + group.getDisplayName()); + for (String counterName : group.getCounterNames()) { + log.info(" " + group.getDisplayName(counterName) + "=" + + group.getCounter(counterName)); + } } } + } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?view=diff&rev=513917&r1=513916&r2=513917 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Fri Mar 2 11:56:22 2007 @@ -176,8 +176,11 @@ "reduce() completion: " + status.reduceProgress(); } - public Counters getCounters() { - return status.getCounters(); + /** + * Returns the counters for this job + */ + public Counters getCounters() throws IOException { + return jobSubmitClient.getJobCounters(getJobID()); } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=513917&r1=513916&r2=513917 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Fri Mar 2 11:56:22 2007 @@ -84,7 +84,8 @@ private LocalFileSystem localFs; private String uniqueString; - private Counters counters = new Counters(); + private Counters mapCounters = 
new Counters(); + private Counters reduceCounters = new Counters(); private MetricsRecord jobMetrics; /** @@ -132,6 +133,25 @@ } /** + * Called periodically by JobTrackerMetrics to update the metrics for + * this job. + */ + public void updateMetrics() { + Counters counters = getCounters(); + for (String groupName : counters.getGroupNames()) { + Counters.Group group = counters.getGroup(groupName); + jobMetrics.setTag("group", group.getDisplayName()); + + for (String counter : group.getCounterNames()) { + long value = group.getCounter(counter); + jobMetrics.setTag("counter", group.getDisplayName(counter)); + jobMetrics.setMetric("value", (float) value); + jobMetrics.update(); + } + } + } + + /** * Construct the splits, etc. This is invoked from an async * thread so that split-computation doesn't block anyone. */ @@ -354,29 +374,41 @@ (progressDelta / reduces.length))); } } - - // - // Update counters by summing over all tasks in progress - // - Counters newCounters = new Counters(); - for (TaskInProgress mapTask : maps) { - newCounters.incrAllCounters(mapTask.getCounters()); - } - for (TaskInProgress reduceTask : reduces) { - newCounters.incrAllCounters(reduceTask.getCounters()); - } - this.status.setCounters(newCounters); - - // - // Send counter data to the metrics package. - // - for (String counter : newCounters.getCounterNames()) { - long value = newCounters.getCounter(counter); - jobMetrics.setTag("counter", counter); - jobMetrics.setMetric("value", (float) value); - jobMetrics.update(); - } - } + } + + /** + * Returns map phase counters by summing over all map tasks in progress. + */ + public synchronized Counters getMapCounters() { + return sumTaskCounters(maps); + } + + /** + * Returns reduce phase counters by summing over all reduce tasks in progress. + */ + public synchronized Counters getReduceCounters() { + return sumTaskCounters(reduces); + } + + /** + * Returns the total job counters, by adding together the map and the + * reduce counters. 
+ */ + public Counters getCounters() { + return Counters.sum(getMapCounters(), getReduceCounters()); + } + + /** + * Returns a Counters instance representing the sum of all the counters in + * the array of tasks in progress. + */ + private Counters sumTaskCounters(TaskInProgress[] tips) { + Counters counters = new Counters(); + for (TaskInProgress tip : tips) { + counters.incrAllCounters(tip.getCounters()); + } + return counters; + } ///////////////////////////////////////////////////// // Create/manage tasks Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java?view=diff&rev=513917&r1=513916&r2=513917 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java Fri Mar 2 11:56:22 2007 @@ -54,7 +54,7 @@ private int runState; private long startTime; private String user; - private Counters counters = new Counters(); + /** */ public JobStatus() { @@ -118,7 +118,7 @@ public void setRunState(int state) { this.runState = state; } - + /** * Set the start time of the job * @param startTime The startTime of the job @@ -140,15 +140,6 @@ */ public String getUsername() { return this.user;}; - /** - * @param counters Counters for the job. 
- */ - void setCounters(Counters counters) { this.counters = counters; } - /** - * @return the counters for the job - */ - public Counters getCounters() { return counters; } - /////////////////////////////////////// // Writable /////////////////////////////////////// @@ -159,8 +150,8 @@ out.writeInt(runState); out.writeLong(startTime); UTF8.writeString(out, user); - counters.write(out); } + public void readFields(DataInput in) throws IOException { this.jobid = UTF8.readString(in); this.mapProgress = in.readFloat(); @@ -168,6 +159,5 @@ this.runState = in.readInt(); this.startTime = in.readLong(); this.user = UTF8.readString(in); - counters.readFields(in); } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java?view=diff&rev=513917&r1=513916&r2=513917 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java Fri Mar 2 11:56:22 2007 @@ -32,7 +32,7 @@ *Changing the versionID to 2L since the getTaskCompletionEvents method has *changed */ - public static final long versionID = 2L; + public static final long versionID = 3L; /** * Submit a Job for execution. Returns the latest profile for * that job. 
@@ -60,6 +60,11 @@ */ public JobStatus getJobStatus(String jobid) throws IOException; + /** + * Grab the current job counters + */ + public Counters getJobCounters(String jobid) throws IOException; + /** * Grab a bunch of info on the map tasks that make up the job */ Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?view=diff&rev=513917&r1=513916&r2=513917 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Fri Mar 2 11:56:22 2007 @@ -424,6 +424,12 @@ numJobsCompleted = 0; } metricsRecord.update(); + + if (tracker != null) { + for (JobInProgress jip : tracker.getRunningJobs()) { + jip.updateMetrics(); + } + } } synchronized void launchMap() { @@ -929,6 +935,15 @@ } return v; } + /** + * Version that is called from a timer thread, and therefore needs to be + * careful to synchronize. 
+ */ + public synchronized List<JobInProgress> getRunningJobs() { + synchronized (jobs) { + return (List<JobInProgress>) runningJobs(); + } + } public Vector failedJobs() { Vector v = new Vector(); for (Iterator it = jobs.values().iterator(); it.hasNext(); ) { @@ -1422,6 +1437,14 @@ return null; } } + public synchronized Counters getJobCounters(String jobid) { + JobInProgress job = (JobInProgress) jobs.get(jobid); + if (job != null) { + return job.getCounters(); + } else { + return null; + } + } public synchronized TaskReport[] getMapTaskReports(String jobid) { JobInProgress job = (JobInProgress) jobs.get(jobid); if (job == null) { @@ -1545,7 +1568,7 @@ JobStatus status = jip.getStatus(); if (status.getRunState() == JobStatus.RUNNING || status.getRunState() == JobStatus.PREP) { - status.setStartTime(jip.getStartTime()); + status.setStartTime(jip.getStartTime()); status.setUsername(jip.getProfile().getUser()); v.add(status); } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?view=diff&rev=513917&r1=513916&r2=513917 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Fri Mar 2 11:56:22 2007 @@ -62,9 +62,12 @@ private Path localFile; private FileSystem localFs; - // Contains the counters summed over all the tasks which + // Counters summed over all the map/reduce tasks which // have successfully completed - private Counters counters = new Counters(); + private Counters completedTaskCounters = new Counters(); + + // Current counters, including incomplete task(s) + private Counters currentCounters = new Counters(); public long getProtocolVersion(String protocol, long clientVersion) { return TaskUmbilicalProtocol.versionID; @@ -181,7 +184,7 
@@ public Task getTask(String taskid) { return null; } public void progress(String taskId, float progress, String state, - TaskStatus.Phase phase, Counters taskStats) { + TaskStatus.Phase phase, Counters taskCounters) { LOG.info(state); float taskIndex = mapIds.indexOf(taskId); if (taskIndex >= 0) { // mapping @@ -190,9 +193,9 @@ } else { status.setReduceProgress(progress); } + currentCounters = Counters.sum(completedTaskCounters, taskCounters); // ignore phase - updateStatusCounters(taskStats); } /** @@ -201,21 +204,7 @@ * successfully completed */ private void updateCounters(Task task) { - counters.incrAllCounters(task.getCounters()); - status.setCounters(counters); - } - - /** - * Sets status counters to the sum of (1) the counters from - * all completed tasks, and (2) the counters from a particular - * task in progress. - * @param taskCounters Counters from a task that is in progress - */ - private void updateStatusCounters(Counters taskCounters) { - Counters newStats = new Counters(); - newStats.incrAllCounters(counters); - newStats.incrAllCounters(taskCounters); - status.setCounters(newStats); + completedTaskCounters.incrAllCounters(task.getCounters()); } public void reportDiagnosticInfo(String taskid, String trace) { @@ -272,6 +261,11 @@ public JobStatus getJobStatus(String id) { Job job = (Job)jobs.get(id); return job.status; + } + + public Counters getJobCounters(String id) { + Job job = (Job)jobs.get(id); + return job.currentCounters; } public String getFilesystemName() throws IOException { Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=513917&r1=513916&r2=513917 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Fri Mar 2 11:56:22 
2007 @@ -50,6 +50,7 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; +import static org.apache.hadoop.mapred.Task.Counter.*; /** A Map task. */ class MapTask extends Task { @@ -64,13 +65,6 @@ { // set phase for this task setPhase(TaskStatus.Phase.MAP); } - - private enum Counter { - INPUT_RECORDS, - INPUT_BYTES, - OUTPUT_RECORDS, - OUTPUT_BYTES - } public MapTask() {} @@ -161,8 +155,8 @@ setProgress(getProgress()); long beforePos = getPos(); boolean ret = rawIn.next(key, value); - reporter.incrCounter(Counter.INPUT_RECORDS, 1); - reporter.incrCounter(Counter.INPUT_BYTES, (getPos() - beforePos)); + reporter.incrCounter(MAP_INPUT_RECORDS, 1); + reporter.incrCounter(MAP_INPUT_BYTES, (getPos() - beforePos)); return ret; } public long getPos() throws IOException { return rawIn.getPos(); } @@ -324,8 +318,8 @@ int partNumber = partitioner.getPartition(key, value, partitions); sortImpl[partNumber].addKeyValue(keyOffset, keyLength, valLength); - reporter.incrCounter(Counter.OUTPUT_RECORDS, 1); - reporter.incrCounter(Counter.OUTPUT_BYTES, + reporter.incrCounter(MAP_OUTPUT_RECORDS, 1); + reporter.incrCounter(MAP_OUTPUT_BYTES, (keyValBuffer.getLength() - keyOffset)); //now check whether we need to spill to disk Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=513917&r1=513916&r2=513917 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Fri Mar 2 11:56:22 2007 @@ -44,6 +44,8 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; +import static org.apache.hadoop.mapred.Task.Counter.*; + /** A Reduce task. 
*/ class ReduceTask extends Task { @@ -54,8 +56,6 @@ public Writable newInstance() { return new ReduceTask(); } }); } - - private enum Counter { INPUT_RECORDS, OUTPUT_RECORDS } private int numMaps; private boolean sortComplete; @@ -296,7 +296,7 @@ public void collect(WritableComparable key, Writable value) throws IOException { out.write(key, value); - reporter.incrCounter(Counter.OUTPUT_RECORDS, 1); + reporter.incrCounter(REDUCE_OUTPUT_RECORDS, 1); reportProgress(umbilical); } }; @@ -309,7 +309,7 @@ keyClass, valClass, umbilical, job); values.informReduceProgress(); while (values.more()) { - reporter.incrCounter(Counter.INPUT_RECORDS, 1); + reporter.incrCounter(REDUCE_INPUT_RECORDS, 1); reducer.reduce(values.getKey(), values, collector, reporter); values.nextKey(); values.informReduceProgress(); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java?view=diff&rev=513917&r1=513916&r2=513917 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java Fri Mar 2 11:56:22 2007 @@ -80,5 +80,8 @@ public TaskCompletionEvent[] getTaskCompletionEvents( int startFrom) throws IOException; - public Counters getCounters(); + /** + * Gets the counters for this job. 
+ */ + public Counters getCounters() throws IOException; } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?view=diff&rev=513917&r1=513916&r2=513917 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Fri Mar 2 11:56:22 2007 @@ -35,6 +35,17 @@ private static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.TaskRunner"); + // Counters used by Task subclasses + protected static enum Counter { + MAP_INPUT_RECORDS, + MAP_OUTPUT_RECORDS, + MAP_INPUT_BYTES, + MAP_OUTPUT_BYTES, + REDUCE_INPUT_RECORDS, + REDUCE_OUTPUT_RECORDS + } + + //////////////////////////////////////////// // Fields //////////////////////////////////////////// @@ -152,6 +163,7 @@ private transient long nextProgressTime = System.currentTimeMillis() + PROGRESS_INTERVAL; + // Current counters private transient Counters counters = new Counters(); public abstract boolean isMapTask(); @@ -174,8 +186,7 @@ public void incrCounter(Enum key, long amount) { Counters counters = getCounters(); if (counters != null) { - String name = key.getDeclaringClass().getName()+"#"+key.toString(); - counters.incrCounter(name, amount); + counters.incrCounter(key, amount); } } }; Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task_Counter.properties URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task_Counter.properties?view=auto&rev=513917 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task_Counter.properties (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task_Counter.properties Fri Mar 2 11:56:22 2007 @@ -0,0 +1,11 @@ +# ResourceBundle properties file for 
Map-Reduce counters + +CounterGroupName= Map-Reduce Framework + +MAP_INPUT_RECORDS.name= Map input records +MAP_INPUT_BYTES.name= Map input bytes +MAP_OUTPUT_RECORDS.name= Map output records +MAP_OUTPUT_BYTES.name= Map output bytes +REDUCE_INPUT_RECORDS.name= Reduce input records +REDUCE_OUTPUT_RECORDS.name= Reduce output records + Modified: lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp?view=diff&rev=513917&r1=513916&r2=513917 ============================================================================== --- lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp (original) +++ lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp Fri Mar 2 11:56:22 2007 @@ -3,6 +3,7 @@ import="javax.servlet.*" import="javax.servlet.http.*" import="java.io.*" + import="java.text.*" import="java.util.*" import="java.text.DecimalFormat" import="org.apache.hadoop.mapred.*" @@ -10,6 +11,7 @@ %> <%! + JobTracker tracker = JobTracker.getTracker(); String trackerName = StringUtils.simpleHostname(tracker.getJobTrackerMachine()); @@ -37,21 +39,53 @@ failures += task.numTaskFailures(); } out.print("<tr><th><a href=\"/jobtasks.jsp?jobid=" + jobId + - "&type="+ kind + "&pagenum=1\">" + kind + "</a></th><td>" + + "&type="+ kind + "&pagenum=1\">" + kind + + "</a></th><td align=\"right\">" + StringUtils.formatPercent(completePercent, 2) + - "</td><td>" + totalTasks + "</td><td>" + + "</td><td align=\"right\">" + + totalTasks + + "</td><td align=\"right\">" + (totalTasks - runningTasks - finishedTasks - killedTasks) + - "</td><td>" + - runningTasks + "</td><td>" + - finishedTasks + "</td><td>" + + "</td><td align=\"right\">" + + runningTasks + + "</td><td align=\"right\">" + + finishedTasks + + "</td><td align=\"right\">" + killedTasks + - "</td><td><a href=\"/jobfailures.jsp?jobid=" + jobId + + "</td><td align=\"right\"><a href=\"/jobfailures.jsp?jobid=" + jobId + "&kind=" + kind + "\">" + failures + 
"</a></td></tr>\n"); } - - private void printJobStatus(JspWriter out, - String jobId) throws IOException { +%> +<% + String jobId = request.getParameter("jobid"); + String refreshParam = request.getParameter("refresh"); + + int refresh = 60; // refresh every 60 seconds by default + if (refreshParam != null) { + try { + refresh = Integer.parseInt(refreshParam); + } + catch (NumberFormatException ignored) { + } + } +%> + +<html> +<head> + <% + if (refresh != 0) { + %> + <meta http-equiv="refresh" content="<%=refresh%>"> + <% + } + %> +<title>Hadoop <%=jobId%> on <%=trackerName%></title> +</head> +<body> +<h1>Hadoop <%=jobId%> on <a href="/jobtracker.jsp"><%=trackerName%></a></h1> + +<% JobInProgress job = (JobInProgress) tracker.getJob(jobId); if (job == null) { out.print("<b>Job " + jobId + " not found.</b><br>\n"); @@ -99,33 +133,53 @@ job.getReduceTasks()); out.print("</table>\n"); - Counters counters = status.getCounters(); - out.println("<p/>"); - out.println("<table border=2 cellpadding=\"5\" cellspacing=\"2\">"); - out.println("<tr><th>Counter</th><th>Value</th></tr>"); - for (String counter : counters.getCounterNames()) { - out.print("<tr><td>" + counter + "</td><td>" + counters.getCounter(counter) + - "</td></tr>\n"); + %> + <p/> + <table border=2 cellpadding="5" cellspacing="2"> + <tr> + <th><br/></th> + <th>Counter</th> + <th>Map</th> + <th>Reduce</th> + <th>Total</th> + </tr> + <% + Counters mapCounters = job.getMapCounters(); + Counters reduceCounters = job.getReduceCounters(); + Counters totalCounters = Counters.sum(mapCounters,reduceCounters); + + for (String groupName : totalCounters.getGroupNames()) { + Counters.Group totalGroup = totalCounters.getGroup(groupName); + Counters.Group mapGroup = mapCounters.getGroup(groupName); + Counters.Group reduceGroup = reduceCounters.getGroup(groupName); + + Format decimal = new DecimalFormat(); + + boolean isFirst = true; + for (String counter : totalGroup.getCounterNames()) { + String mapValue = 
decimal.format(mapGroup.getCounter(counter)); + String reduceValue = decimal.format(reduceGroup.getCounter(counter)); + String totalValue = decimal.format(totalGroup.getCounter(counter)); + %> + <tr> + <% + if (isFirst) { + isFirst = false; + %> + <td rowspan="<%=totalGroup.size()%>"><%=totalGroup.getDisplayName()%></td> + <% + } + %> + <td><%=totalGroup.getDisplayName(counter)%></td> + <td align="right"><%=mapValue%></td> + <td align="right"><%=reduceValue%></td> + <td align="right"><%=totalValue%></td> + </tr> + <% + } } - out.print("</table>\n"); - } -%> - -<% - String jobid = request.getParameter("jobid"); -%> - -<html> -<head> -<meta http-equiv="refresh" content=60> -<title>Hadoop <%=jobid%> on <%=trackerName%></title> -</head> -<body> -<h1>Hadoop <%=jobid%> on <a href="/jobtracker.jsp"><%=trackerName%></a></h1> - -<% - printJobStatus(out, jobid); -%> + %> + </table> <hr> <a href="/jobtracker.jsp">Go back to JobTracker</a><br> Modified: lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp?view=diff&rev=513917&r1=513916&r2=513917 ============================================================================== --- lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp (original) +++ lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp Fri Mar 2 11:56:22 2007 @@ -36,7 +36,9 @@ %> <html> -<title>Hadoop <%=type%> task list for <%=jobid%> on <%=trackerLabel%></title> + <head> + <title>Hadoop <%=type%> task list for <%=jobid%> on <%=trackerLabel%></title> + </head> <body> <h1>Hadoop <%=type%> task list for <a href="/jobdetails.jsp?jobid=<%=jobid%>"><%=jobid%></a> on @@ -67,10 +69,10 @@ report.getTaskId() + "</a></td>"); out.print("<td>" + StringUtils.formatPercent(report.getProgress(),2) + "</td>"); - out.print("<td>" + report.getState() + "</td>"); - out.println("<td>" + StringUtils.getFormattedTimeWithDiff(dateFormat, report.getStartTime(),0) + "</td>"); + out.print("<td>" + report.getState() + 
"<br/></td>"); + out.println("<td>" + StringUtils.getFormattedTimeWithDiff(dateFormat, report.getStartTime(),0) + "<br/></td>"); out.println("<td>" + StringUtils.getFormattedTimeWithDiff(dateFormat, - report.getFinishTime(), report.getStartTime()) + "</td>"); + report.getFinishTime(), report.getStartTime()) + "<br/></td>"); String[] diagnostics = report.getDiagnostics(); out.print("<td><pre>"); for (int j = 0; j < diagnostics.length ; j++) { Modified: lucene/hadoop/trunk/src/webapps/job/jobtracker.jsp URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobtracker.jsp?view=diff&rev=513917&r1=513916&r2=513917 ============================================================================== --- lucene/hadoop/trunk/src/webapps/job/jobtracker.jsp (original) +++ lucene/hadoop/trunk/src/webapps/job/jobtracker.jsp Fri Mar 2 11:56:22 2007 @@ -14,7 +14,7 @@ StringUtils.simpleHostname(tracker.getJobTrackerMachine()); private static DecimalFormat percentFormat = new DecimalFormat("##0.00"); - public void generateJobTable(JspWriter out, String label, Vector jobs) throws IOException { + public void generateJobTable(JspWriter out, String label, Vector jobs, int refresh) throws IOException { out.print("<center>\n"); out.print("<table border=\"2\" cellpadding=\"5\" cellspacing=\"2\">\n"); out.print("<tr><td align=\"center\" colspan=\"9\"><b>" + label + " Jobs </b></td></tr>\n"); @@ -39,7 +39,8 @@ int completedReduces = job.finishedReduces(); String name = profile.getJobName(); - out.print( "<tr><td><a href=\"jobdetails.jsp?jobid=" + jobid + "\">" + + out.print("<tr><td><a href=\"jobdetails.jsp?jobid=" + jobid + + "&refresh=" + refresh + "\">" + jobid + "</a></td>" + "<td>" + profile.getUser() + "</td>" + "<td>" + ("".equals(name) ? 
" " : name) + "</td>" + @@ -95,20 +96,20 @@ <h2>Running Jobs</h2> <% - generateJobTable(out, "Running", tracker.runningJobs()); + generateJobTable(out, "Running", tracker.runningJobs(), 10); %> <hr> <h2>Completed Jobs</h2> <% - generateJobTable(out, "Completed", tracker.completedJobs()); + generateJobTable(out, "Completed", tracker.completedJobs(), 0); %> <hr> <h2>Failed Jobs</h2> <% - generateJobTable(out, "Failed", tracker.failedJobs()); + generateJobTable(out, "Failed", tracker.failedJobs(), 0); %> <hr> Modified: lucene/hadoop/trunk/src/webapps/job/taskstats.jsp URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/taskstats.jsp?view=diff&rev=513917&r1=513916&r2=513917 ============================================================================== --- lucene/hadoop/trunk/src/webapps/job/taskstats.jsp (original) +++ lucene/hadoop/trunk/src/webapps/job/taskstats.jsp Fri Mar 2 11:56:22 2007 @@ -4,6 +4,7 @@ import="javax.servlet.http.*" import="java.io.*" import="java.lang.String" + import="java.text.*" import="java.util.*" import="org.apache.hadoop.mapred.*" import="org.apache.hadoop.util.*" @@ -16,44 +17,60 @@ JobInProgress job = (JobInProgress) tracker.getJob(jobid); String tipid = request.getParameter("tipid"); String taskid = request.getParameter("taskid"); + Format decimal = new DecimalFormat(); Counters counters; if (taskid == null) { - counters = tracker.getTipCounters(jobid, tipid); - taskid = tipid; // for page title etc + counters = tracker.getTipCounters(jobid, tipid); + taskid = tipid; // for page title etc } else { - TaskStatus taskStatus = tracker.getTaskStatus(jobid, tipid, taskid); - counters = taskStatus.getCounters(); + TaskStatus taskStatus = tracker.getTaskStatus(jobid, tipid, taskid); + counters = taskStatus.getCounters(); } %> <html> -<title>Counters for <%=taskid%></title> + <head> + <title>Counters for <%=taskid%></title> + </head> <body> <h1>Counters for <%=taskid%></h1> <hr> <% - if( counters == null ) { + if ( counters == 
null ) { %> - <h3>No counter information found for this task</h3> + <h3>No counter information found for this task</h3> <% - }else{ + } else { %> - <table border=2 cellpadding="5" cellspacing="2"> - <tr><td align="center">Counter</td><td>Value</td></tr> - <% - for (String counter : counters.getCounterNames()) { - long value = counters.getCounter(counter); - %> - <tr><td><%=counter%></td><td><%=value%></td></tr> - <% - } - %> - </table> + <table> <% - } + for (String groupName : counters.getGroupNames()) { + Counters.Group group = counters.getGroup(groupName); + String displayGroupName = group.getDisplayName(); +%> + <tr> + <td colspan="3"><br/><b><%=displayGroupName%></b></td> + </tr> +<% + for (String counter : group.getCounterNames()) { + String displayCounterName = group.getDisplayName(counter); + long value = group.getCounter(counter); +%> + <tr> + <td width="50"></td> + <td><%=displayCounterName%></td> + <td align="right"><%=decimal.format(value)%></td> + </tr> +<% + } + } +%> + </table> +<% + } %> <hr>