Author: ddas
Date: Sat Dec 15 07:16:55 2007
New Revision: 604440

URL: http://svn.apache.org/viewvc?rev=604440&view=rev
Log:
HADOOP-2248. Speeds up the framework w.r.t Counters. Also has API updates
to the Counters part. Contributed by Owen O'Malley.
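The core of the speedup is that counters become first-class Counter objects that callers can look up once and cache, instead of re-resolving the enum's declaring class and ordinal on every incrCounter() call; the MapTask.java and ReduceTask.java changes below switch to that pattern. A minimal sketch of the two styles against the revised API follows; the class name and the MyCounter enum are made up for illustration (real tasks use enums such as Task.Counter.MAP_INPUT_RECORDS), and only the Counters methods shown in this patch are assumed.

import org.apache.hadoop.mapred.Counters;

public class CounterCacheSketch {
  // Hypothetical counter enum, for illustration only.
  private enum MyCounter { RECORDS_SEEN }

  public static void main(String[] args) {
    Counters counters = new Counters();

    // Old style: each call resolves the group name and ordinal from the enum.
    counters.incrCounter(MyCounter.RECORDS_SEEN, 1);

    // New style: findCounter() resolves (and caches) the Counter object once;
    // the hot loop then increments the cached object directly.
    Counters.Counter records = counters.findCounter(MyCounter.RECORDS_SEEN);
    for (int i = 0; i < 1000; i++) {
      records.increment(1);
    }

    System.out.println(records.getDisplayName() + " = " + records.getCounter());
  }
}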
Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=604440&r1=604439&r2=604440&view=diff ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Sat Dec 15 07:16:55 2007 @@ -120,6 +120,9 @@ It now uses the map-reduce framework for load generation. (Mukund Madhugiri via dhruba) + HADOOP-2248. Speeds up the framework w.r.t Counters. Also has API + updates to the Counters part. (Owen O'Malley via ddas) + OPTIMIZATIONS HADOOP-1898. Release the lock protecting the last time of the last stack Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java?rev=604440&r1=604439&r2=604440&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java Sat Dec 15 07:16:55 2007 @@ -23,16 +23,18 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; -import java.util.LinkedHashMap; +import java.util.HashMap; +import java.util.IdentityHashMap; +import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.MissingResourceException; import java.util.ResourceBundle; -import java.util.TreeMap; import org.apache.commons.logging.*; -import org.apache.hadoop.io.UTF8; +import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; /** * A set of named counters. @@ -44,24 +46,68 @@ * <p><code>Counters</code> are bunched into [EMAIL PROTECTED] Group}s, each comprising of * counters from a particular <code>Enum</code> class. */ -public class Counters implements Writable { +public class Counters implements Writable, Iterable<Counters.Group> { + private static final Log LOG = LogFactory.getLog(Counters.class); //private static Log log = LogFactory.getLog("Counters.class"); /** * A counter record, comprising its name and value. 
*/ - private static class CounterRec { - - public String name; - public long value; + public static class Counter implements Writable { + + private String displayName; + private long value; - public CounterRec(String name, long value) { - this.name = name; + Counter() { + value = 0L; + } + + Counter(String displayName, long value) { + this.displayName = displayName; this.value = value; } - } // end class CounterRec + /** + * Read the binary representation of the counter + */ + public synchronized void readFields(DataInput in) throws IOException { + displayName = Text.readString(in); + value = WritableUtils.readVLong(in); + } + + /** + * Write the binary representation of the counter + */ + public synchronized void write(DataOutput out) throws IOException { + Text.writeString(out, displayName); + WritableUtils.writeVLong(out, value); + } + + /** + * Get the name of the counter. + * @return the user facing name of the counter + */ + public String getDisplayName() { + return displayName; + } + + /** + * What is the current value of this counter? + * @return the current value + */ + public synchronized long getCounter() { + return value; + } + + /** + * Increment this counter by the given value + * @param incr the value to increase this counter by + */ + public synchronized void increment(long incr) { + value += incr; + } + } /** * <code>Group</code> of counters, comprising of counters from a particular @@ -70,31 +116,24 @@ * <p><code>Group</code>handles localization of the class name and the * counter names.</p> */ - public static class Group { - - // The group name is the fully qualified enum class name. + public static class Group implements Writable, Iterable<Counter> { private String groupName; + private String displayName; + private ArrayList<Counter> subcounters = new ArrayList<Counter>(); // Optional ResourceBundle for localization of group and counter names. - private ResourceBundle bundle = null; - - // Maps counter names to their current values. Note that the iteration - // order of this Map is the same as the ordering of the Enum class in which - // these counter names were defined. - private Map<String,Long> groupCounters = new LinkedHashMap<String,Long>(); - + private ResourceBundle bundle = null; - Group(String groupName, Collection<CounterRec> counters) { - this.groupName = groupName; + Group(String groupName) { try { bundle = getResourceBundle(groupName); } catch (MissingResourceException neverMind) { } - - for (CounterRec counter : counters) { - groupCounters.put(counter.name, counter.value); - } + this.groupName = groupName; + this.displayName = localize("CounterGroupName", groupName); + LOG.debug("Creating group " + groupName + " with " + + (bundle == null ? "nothing" : "bundle")); } /** @@ -119,37 +158,81 @@ * default, but different if an appropriate ResourceBundle is found. */ public String getDisplayName() { - return localize("CounterGroupName", groupName); + return displayName; } /** * Returns localized name of the specified counter. + * @deprecated get the counter directly */ public String getDisplayName(String counter) { - return localize(counter + ".name", counter); + return counter; } /** * Returns the counters for this group, with their names localized. 
+ * @deprecated iterate through the group instead */ - public Collection<String> getCounterNames() { - return groupCounters.keySet(); + public synchronized Collection<String> getCounterNames() { + List<String> result = new ArrayList<String>(); + for (Counter counter:subcounters) { + if (counter != null) { + result.add(counter.displayName); + } + } + return result; } /** * Returns the value of the specified counter, or 0 if the counter does * not exist. + * @deprecated */ - public long getCounter(String counter) { - Long result = groupCounters.get(counter); - return (result == null ? 0L : result); + public synchronized long getCounter(String counterName) { + for(Counter counter: subcounters) { + if (counter != null && counter.displayName.equals(counterName)) { + return counter.value; + } + } + return 0L; + } + + /** + * Get the counter for the given id and create it if it doesn't exist. + * @param id the numeric id of the counter within the group + * @param name the internal counter name + * @return the counter + */ + public synchronized Counter getCounter(int id, String name) { + Counter result = null; + int size = subcounters.size(); + if (id < size) { + result = subcounters.get(id); + } + if (result == null) { + LOG.debug("Adding " + name + " at " + id); + result = new Counter(localize(name + ".name", name), 0L); + // extend the list + subcounters.ensureCapacity(id + 1); + for(int i=size; i <= id; ++i) { + subcounters.add(null); + } + subcounters.set(id, result); + } + return result; } /** * Returns the number of counters in this group. */ - public int size() { - return groupCounters.size(); + public synchronized int size() { + int num = 0; + for(Counter counter: subcounters) { + if (counter != null) { + num += 1; + } + } + return num; } /** @@ -168,110 +251,160 @@ return result; } + public synchronized void write(DataOutput out) throws IOException { + Text.writeString(out, displayName); + WritableUtils.writeVInt(out, subcounters.size()); + for(Counter counter: subcounters) { + if (counter == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + counter.write(out); + } + } + } - } // end class Group - + public synchronized void readFields(DataInput in) throws IOException { + displayName = Text.readString(in); + subcounters.clear(); + int size = WritableUtils.readVInt(in); + subcounters.ensureCapacity(size); + for(int i=0; i < size; i++) { + Counter counter = null; + if (in.readBoolean()) { + counter = new Counter(); + counter.readFields(in); + } + subcounters.add(counter); + } + } + + private class CounterIterator implements Iterator<Counter> { + private int current = -1; + + CounterIterator() { + getNext(); + } + + private void getNext() { + synchronized (Group.this) { + int len = subcounters.size(); + while (++current < len) { + if (subcounters.get(current) != null) { + return; + } + } + } + current = Integer.MAX_VALUE; + } + + public boolean hasNext() { + synchronized (Group.this) { + return current < subcounters.size(); + } + } + + public Counter next() { + synchronized (Group.this) { + int result = current; + getNext(); + return subcounters.get(result); + } + } + + public void remove() { + throw new UnsupportedOperationException + ("NonNullIterator doesn't support remove"); + } + } + + public Iterator<Counter> iterator() { + return new CounterIterator(); + } + } // Map from group name (enum class name) to map of int (enum ordinal) to // counter record (name-value pair). 
- private Map<String,Map<Integer,CounterRec>> counters = - new TreeMap<String,Map<Integer,CounterRec>>(); + private Map<String,Group> counters = new HashMap<String, Group>(); + + /** + * A cache from enum values to the associated counter. Dramatically speeds up + * typical usage. + */ + @SuppressWarnings("unchecked") + private Map<Enum, Counter> cache = new IdentityHashMap<Enum, Counter>(); /** * Returns the names of all counter classes. * @return Set of counter names. */ public synchronized Collection<String> getGroupNames() { - return new ArrayList<String>(counters.keySet()); + return counters.keySet(); } - + + public synchronized Iterator<Group> iterator() { + return counters.values().iterator(); + } + /** * Returns the named counter group, or an empty group if there is none * with the specified name. */ public synchronized Group getGroup(String groupName) { - Map<Integer,CounterRec> counterMap = counters.get(groupName); - Collection<CounterRec> groupCounters; - if (counterMap == null) { - groupCounters = Collections.emptySet(); + Group result = counters.get(groupName); + if (result == null) { + result = new Group(groupName); + counters.put(groupName, result); } - else { - groupCounters = counterMap.values(); + return result; + } + + /** + * Find the counter for the given enum. The same enum will always return the + * same counter. + * @param key the counter key + * @return the matching counter object + */ + @SuppressWarnings("unchecked") + public synchronized Counter findCounter(Enum key) { + Counter counter = cache.get(key); + if (counter == null) { + Group group = getGroup(key.getDeclaringClass().getName()); + counter = group.getCounter(key.ordinal(), key.toString()); + cache.put(key, counter); } - return new Group(groupName, groupCounters); + return counter; } - + + /** + * Find a counter by using strings + * @param group the name of the group + * @param id the id of the counter within the group (0 to N-1) + * @param name the internal name of the counter + * @return the counter for that name + */ + public synchronized Counter findCounter(String group, int id, String name) { + return getGroup(group).getCounter(id, name); + } + /** * Increments the specified counter by the specified amount, creating it if * it didn't already exist. * @param key identifies a counter * @param amount amount by which counter is to be incremented */ + @SuppressWarnings("unchecked") public synchronized void incrCounter(Enum key, long amount) { - int ordinal = key.ordinal(); - String counterName = key.toString(); - String groupName = key.getDeclaringClass().getName(); - Map<Integer,CounterRec> counterMap = getCounterMap(groupName); - CounterRec counter = getCounter(counterMap, counterName, ordinal); - counter.value += amount; + findCounter(key).value += amount; } /** * Returns current value of the specified counter, or 0 if the counter * does not exist. */ + @SuppressWarnings("unchecked") public synchronized long getCounter(Enum key) { - long result = 0L; - String groupName = key.getDeclaringClass().getName(); - Map<Integer,CounterRec> counterMap = counters.get(groupName); - if (counterMap != null) { - int ordinal = key.ordinal(); - String name = key.toString(); - CounterRec counter = counterMap.get(ordinal); - if (counter != null && counter.name.equals(name)) { - result = counter.value; - } - else { - // ordinal lookup failed, but maybe there is a matching name; this - // could happen if e.g. a client has a different version of the Enum class. 
- for (CounterRec ctr : counterMap.values()) { - if (ctr.name.equals(name)) { - result = ctr.value; - break; - } - } - } - } - return result; - } - - /** - * Returns the counters for the specified counter class. The counters are - * returned as a map from ordinal number, so that their ordering in the - * enum class declaration is preserved. - */ - private Map<Integer,CounterRec> getCounterMap(String groupName) { - Map<Integer,CounterRec> map = counters.get(groupName); - if (map == null) { - map = new TreeMap<Integer,CounterRec>(); - counters.put(groupName, map); - } - return map; - } - - /** - * Returns the counter record with the specified name and ordinal by - * finding or creating it in the specified counterMap. - */ - private CounterRec getCounter(Map<Integer,CounterRec> counterMap, - String counterName, int ordinal) - { - CounterRec result = counterMap.get(ordinal); - if (result == null) { - result = new CounterRec(counterName, 0L); - counterMap.put(ordinal, result); - } - return result; + return findCounter(key).value; } /** @@ -280,13 +413,14 @@ * @param other the other Counters instance */ public synchronized void incrAllCounters(Counters other) { - for (String groupName : other.counters.keySet()) { - Map<Integer,CounterRec> otherCounters = other.counters.get(groupName); - Map<Integer,CounterRec> myCounters = getCounterMap(groupName); - for (int i : otherCounters.keySet()) { - CounterRec otherCounter = otherCounters.get(i); - CounterRec counter = getCounter(myCounters, otherCounter.name, i); - counter.value += otherCounter.value; + for (Group otherGroup: other) { + Group group = getGroup(otherGroup.getName()); + for(int i=0; i < otherGroup.subcounters.size(); ++i) { + Counter otherCounter = otherGroup.subcounters.get(i); + if (otherCounter != null) { + group.getCounter(i, otherCounter.displayName).value += + otherCounter.value; + } } } } @@ -307,55 +441,45 @@ */ public synchronized int size() { int result = 0; - for (String groupName : counters.keySet()) { - result += counters.get(groupName).size(); + for (Group group : this) { + result += group.size(); } return result; } - // Writable. The external format is: - // - // #groups group* - // - // i.e. the number of groups followed by 0 or more groups, where each - // group is of the form: - // - // groupName #counters counter* - // - // where each counter is of the form: - // - // ordinal name value - // - + /** + * Write the set of groups. + * The external format is: + * #groups (groupName group)* + * + * i.e. the number of groups followed by 0 or more groups, where each + * group is of the form: + * + * groupDisplayName #counters (false | true counter)* + * + * where each counter is of the form: + * + * name value + */ public synchronized void write(DataOutput out) throws IOException { out.writeInt(counters.size()); - for (Map.Entry<String, Map<Integer, CounterRec>> e1 : counters.entrySet()) { - String groupName = e1.getKey(); - Map<Integer, CounterRec> map = e1.getValue(); - UTF8.writeString(out, groupName); - out.writeInt(map.size()); - for (Map.Entry<Integer, CounterRec> e2 : map.entrySet()) { - Integer ordinal = e2.getKey(); - CounterRec counter = e2.getValue(); - out.writeInt(ordinal); - UTF8.writeString(out, counter.name); - out.writeLong(counter.value); - } + for (Group group: counters.values()) { + Text.writeString(out, group.getName()); + group.write(out); } } + /** + * Read a set of groups. 
+ */ public synchronized void readFields(DataInput in) throws IOException { int numClasses = in.readInt(); + counters.clear(); while (numClasses-- > 0) { - String groupName = UTF8.readString(in); - Map<Integer,CounterRec> counters = getCounterMap(groupName); - int numCounters = in.readInt(); - while (numCounters-- > 0) { - int index = in.readInt(); - String counterName = UTF8.readString(in); - long value = in.readLong(); - counters.put(index, new CounterRec(counterName, value)); - } + String groupName = Text.readString(in); + Group group = new Group(groupName); + group.readFields(in); + counters.put(groupName, group); } } @@ -365,29 +489,25 @@ */ public void log(Log log) { log.info("Counters: " + size()); - Collection<String> groupNames = getGroupNames(); - for (String groupName : groupNames) { - Group group = getGroup(groupName); + for(Group group: this) { log.info(" " + group.getDisplayName()); - for (String counterName : group.getCounterNames()) { - log.info(" " + group.getDisplayName(counterName) + "=" + - group.getCounter(counterName)); - } + for (Counter counter: group) { + log.info(" " + counter.getDisplayName() + "=" + + counter.getCounter()); + } } } /** * Return textual representation of the counter values. */ - public String toString() { + public synchronized String toString() { StringBuilder sb = new StringBuilder("Counters: " + size()); - Collection<String> groupNames = getGroupNames(); - for (String groupName : groupNames) { - Group group = getGroup(groupName); + for (Group group: this) { sb.append("\n\t" + group.getDisplayName()); - for (String counterName : group.getCounterNames()) { - sb.append("\n\t\t" + group.getDisplayName(counterName) + "=" + - group.getCounter(counterName)); + for (Counter counter: group) { + sb.append("\n\t\t" + counter.getDisplayName() + "=" + + counter.getCounter()); } } return sb.toString(); @@ -397,22 +517,21 @@ * Convert a counters object into a single line that is easy to parse. * @return the string with "name=value" for each counter and separated by "," */ - public String makeCompactString() { + public synchronized String makeCompactString() { StringBuffer buffer = new StringBuffer(); - for(String groupName: getGroupNames()){ - Counters.Group group = getGroup(groupName); + for(Group group: this){ boolean first = true; - for(String counterName: group.getCounterNames()) { + for(Counter counter: group) { if (first) { first = false; } else { buffer.append(','); } - buffer.append(groupName); + buffer.append(group.getDisplayName()); buffer.append('.'); - buffer.append(counterName); + buffer.append(counter.getDisplayName()); buffer.append('='); - buffer.append(group.getCounter(counterName)); + buffer.append(counter.getCounter()); } } return buffer.toString(); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=604440&r1=604439&r2=604440&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java Sat Dec 15 07:16:55 2007 @@ -38,8 +38,9 @@ * version 7 replaces maxTasks by maxMapTasks and maxReduceTasks in * TaskTrackerStatus for HADOOP-1274 * Version 8: HeartbeatResponse is added with the next heartbeat interval. 
+ * version 9 changes the counter representation for HADOOP-2248 */ - public static final long versionID = 8L; + public static final long versionID = 9L; public final static int TRACKERS_OK = 0; public final static int UNKNOWN_TASKTRACKER = 1; Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=604440&r1=604439&r2=604440&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Sat Dec 15 07:16:55 2007 @@ -175,14 +175,12 @@ */ public void updateMetrics() { Counters counters = getCounters(); - for (String groupName : counters.getGroupNames()) { - Counters.Group group = counters.getGroup(groupName); + for (Counters.Group group : counters) { jobMetrics.setTag("group", group.getDisplayName()); - for (String counter : group.getCounterNames()) { - long value = group.getCounter(counter); - jobMetrics.setTag("counter", group.getDisplayName(counter)); - jobMetrics.setMetric("value", (float) value); + for (Counters.Counter counter : group) { + jobMetrics.setTag("counter", counter.getDisplayName()); + jobMetrics.setMetric("value", (float) counter.getCounter()); jobMetrics.update(); } } @@ -504,14 +502,14 @@ * Returns map phase counters by summing over all map tasks in progress. */ public synchronized Counters getMapCounters() { - return sumTaskCounters(maps); + return incrementTaskCounters(new Counters(), maps); } /** * Returns map phase counters by summing over all map tasks in progress. */ public synchronized Counters getReduceCounters() { - return sumTaskCounters(reduces); + return incrementTaskCounters(new Counters(), reduces); } /** @@ -519,16 +517,20 @@ * the map and the reduce counters. */ public Counters getCounters() { - return Counters.sum(getJobCounters(), - Counters.sum(getMapCounters(), getReduceCounters())); + Counters result = new Counters(); + result.incrAllCounters(getJobCounters()); + incrementTaskCounters(result, maps); + return incrementTaskCounters(result, reduces); } /** - * Returns a Counters instance representing the sum of all the counters in - * the array of tasks in progress. + * Increments the counters with the counters from each task. 
+ * @param counters the counters to increment + * @param tips the tasks to add in to counters + * @return counters the same object passed in as counters */ - private Counters sumTaskCounters(TaskInProgress[] tips) { - Counters counters = new Counters(); + private Counters incrementTaskCounters(Counters counters, + TaskInProgress[] tips) { for (TaskInProgress tip : tips) { counters.incrAllCounters(tip.getCounters()); } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java?rev=604440&r1=604439&r2=604440&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java Sat Dec 15 07:16:55 2007 @@ -35,8 +35,9 @@ *Version 4: added jobtracker state to ClusterStatus *Version 5: max_tasks in ClusterStatus is replaced by * max_map_tasks and max_reduce_tasks for HADOOP-1274 + * Version 6: change the counters representation for HADOOP-2248 */ - public static final long versionID = 5L; + public static final long versionID = 6L; /** * Allocate a name for the job. Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=604440&r1=604439&r2=604440&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Sat Dec 15 07:16:55 2007 @@ -49,7 +49,6 @@ import org.apache.hadoop.io.compress.DefaultCodec; import org.apache.hadoop.mapred.ReduceTask.ValuesIterator; import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.util.StringUtils; import static org.apache.hadoop.mapred.Task.Counter.*; @@ -114,6 +113,51 @@ return instantiatedSplit; } + /** + * This class wraps the user's record reader to update the counters and progress + * as records are read. 
+ * @param <K> + * @param <V> + */ + class TrackedRecordReader<K extends WritableComparable, V extends Writable> + implements RecordReader<K,V> { + private RecordReader<K,V> rawIn; + private Counters.Counter inputByteCounter; + private Counters.Counter inputRecordCounter; + + TrackedRecordReader(RecordReader<K,V> raw, Counters counters) { + rawIn = raw; + inputRecordCounter = counters.findCounter(MAP_INPUT_RECORDS); + inputByteCounter = counters.findCounter(MAP_INPUT_BYTES); + } + + public K createKey() { + return rawIn.createKey(); + } + + public V createValue() { + return rawIn.createValue(); + } + + public synchronized boolean next(K key, V value) + throws IOException { + + setProgress(getProgress()); + long beforePos = getPos(); + boolean ret = rawIn.next(key, value); + if (ret) { + inputRecordCounter.increment(1); + inputByteCounter.increment(getPos() - beforePos); + } + return ret; + } + public long getPos() throws IOException { return rawIn.getPos(); } + public void close() throws IOException { rawIn.close(); } + public float getProgress() throws IOException { + return rawIn.getProgress(); + } + }; + @SuppressWarnings("unchecked") public void run(final JobConf job, final TaskUmbilicalProtocol umbilical) throws IOException { @@ -153,37 +197,9 @@ job.setLong("map.input.length", fileSplit.getLength()); } - final RecordReader rawIn = // open input + RecordReader rawIn = // open input job.getInputFormat().getRecordReader(instantiatedSplit, job, reporter); - - RecordReader in = new RecordReader() { // wrap in progress reporter - - public WritableComparable createKey() { - return rawIn.createKey(); - } - - public Writable createValue() { - return rawIn.createValue(); - } - - public synchronized boolean next(WritableComparable key, Writable value) - throws IOException { - - setProgress(getProgress()); - long beforePos = getPos(); - boolean ret = rawIn.next(key, value); - if (ret) { - reporter.incrCounter(MAP_INPUT_RECORDS, 1); - reporter.incrCounter(MAP_INPUT_BYTES, (getPos() - beforePos)); - } - return ret; - } - public long getPos() throws IOException { return rawIn.getPos(); } - public void close() throws IOException { rawIn.close(); } - public float getProgress() throws IOException { - return rawIn.getProgress(); - } - }; + RecordReader in = new TrackedRecordReader(rawIn, getCounters()); MapRunnable runner = (MapRunnable)ReflectionUtils.newInstance(job.getMapRunnerClass(), job); @@ -251,7 +267,6 @@ private Partitioner partitioner; private JobConf job; private Reporter reporter; - final private TaskUmbilicalProtocol umbilical; private DataOutputBuffer keyValBuffer; //the buffer where key/val will //be stored before they are @@ -271,6 +286,11 @@ private FSDataOutputStream out; private FSDataOutputStream indexOut; private long segmentStart; + private Counters.Counter mapOutputByteCounter; + private Counters.Counter mapOutputRecordCounter; + private Counters.Counter combineInputCounter; + private Counters.Counter combineOutputCounter; + public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job, Reporter reporter) throws IOException { this.partitions = job.getNumReduceTasks(); @@ -281,7 +301,6 @@ this.job = job; this.reporter = reporter; - this.umbilical = umbilical; this.comparator = job.getOutputKeyComparator(); this.keyClass = job.getMapOutputKeyClass(); this.valClass = job.getMapOutputValueClass(); @@ -299,6 +318,11 @@ ReflectionUtils.newInstance(codecClass, job); } sortImpl = new BufferSorter[partitions]; + Counters counters = getCounters(); + mapOutputByteCounter = 
counters.findCounter(MAP_OUTPUT_BYTES); + mapOutputRecordCounter = counters.findCounter(MAP_OUTPUT_RECORDS); + combineInputCounter = getCounters().findCounter(COMBINE_INPUT_RECORDS); + combineOutputCounter = counters.findCounter(COMBINE_OUTPUT_RECORDS); for (int i = 0; i < partitions; i++) sortImpl[i] = (BufferSorter)ReflectionUtils.newInstance( job.getClass("map.sort.class", MergeSorter.class, @@ -352,9 +376,8 @@ int partNumber = partitioner.getPartition(key, value, partitions); sortImpl[partNumber].addKeyValue(keyOffset, keyLength, valLength); - reporter.incrCounter(MAP_OUTPUT_RECORDS, 1); - reporter.incrCounter(MAP_OUTPUT_BYTES, - (keyValBuffer.getLength() - keyOffset)); + mapOutputRecordCounter.increment(1); + mapOutputByteCounter.increment(keyValBuffer.getLength() - keyOffset); //now check whether we need to spill to disk long totalMem = 0; @@ -438,7 +461,7 @@ while (values.more()) { combiner.reduce(values.getKey(), values, combineCollector, reporter); values.nextKey(); - reporter.incrCounter(COMBINE_OUTPUT_RECORDS, 1); + combineOutputCounter.increment(1); // indicate we're making progress reporter.progress(); } @@ -589,7 +612,7 @@ } public Object next() { - reporter.incrCounter(COMBINE_INPUT_RECORDS, 1); + combineInputCounter.increment(1); return super.next(); } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=604440&r1=604439&r2=604440&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Sat Dec 15 07:16:55 2007 @@ -89,6 +89,12 @@ private Progress copyPhase = getProgress().addPhase("copy"); private Progress sortPhase = getProgress().addPhase("sort"); private Progress reducePhase = getProgress().addPhase("reduce"); + private Counters.Counter reduceInputKeyCounter = + getCounters().findCounter(REDUCE_INPUT_GROUPS); + private Counters.Counter reduceInputValueCounter = + getCounters().findCounter(REDUCE_INPUT_RECORDS); + private Counters.Counter reduceOutputCounter = + getCounters().findCounter(REDUCE_OUTPUT_RECORDS); public ReduceTask() { super(); @@ -232,7 +238,7 @@ reporter.progress(); } public Object next() { - reporter.incrCounter(REDUCE_INPUT_RECORDS, 1); + reduceInputValueCounter.increment(1); return super.next(); } } @@ -306,7 +312,7 @@ public void collect(WritableComparable key, Writable value) throws IOException { out.write(key, value); - reporter.incrCounter(REDUCE_OUTPUT_RECORDS, 1); + reduceOutputCounter.increment(1); // indicate that progress update needs to be sent reporter.progress(); } @@ -322,7 +328,7 @@ job, reporter); values.informReduceProgress(); while (values.more()) { - reporter.incrCounter(REDUCE_INPUT_GROUPS, 1); + reduceInputKeyCounter.increment(1); reducer.reduce(values.getKey(), values, collector, reporter); values.nextKey(); values.informReduceProgress(); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=604440&r1=604439&r2=604440&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Sat Dec 15 07:16:55 2007 @@ -340,7 +340,6 
@@ setProgressFlag(); } public void incrCounter(Enum key, long amount) { - Counters counters = getCounters(); if (counters != null) { counters.incrCounter(key, amount); } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=604440&r1=604439&r2=604440&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Sat Dec 15 07:16:55 2007 @@ -36,8 +36,9 @@ * TaskUmbilicalProtocol.progress(String, float, String, * org.apache.hadoop.mapred.TaskStatus.Phase, Counters) * with [EMAIL PROTECTED] #statusUpdate(String, TaskStatus)} + * Version 5 changed counters representation for HADOOP-2248 * */ - public static final long versionID = 4L; + public static final long versionID = 5L; /** Called when a child task process starts, to get its task.*/ Task getTask(String taskid) throws IOException; Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?rev=604440&r1=604439&r2=604440&view=diff ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Sat Dec 15 07:16:55 2007 @@ -69,6 +69,11 @@ assertEquals("number of maps", 1, reports.length); reports = client.getReduceTaskReports(jobid); assertEquals("number of reduces", 1, reports.length); + Counters counters = ret.job.getCounters(); + assertEquals("number of map inputs", 3, + counters.getCounter(Task.Counter.MAP_INPUT_RECORDS)); + assertEquals("number of reduce outputs", 9, + counters.getCounter(Task.Counter.REDUCE_OUTPUT_RECORDS)); runCustomFormats(mr); } finally { if (mr != null) { mr.shutdown(); }
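With this patch, Counters is Iterable<Counters.Group> and each Group is Iterable<Counters.Counter>, and the wire format moves from UTF8 strings with fixed-width ints and longs to Text strings with vints and vlongs. The sketch below exercises that new surface with a serialize/deserialize round trip; it is a self-contained illustration, not code from the patch, and the MyCounter enum is again hypothetical.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.mapred.Counters;

public class CountersRoundTripSketch {
  // Hypothetical counter enum, for illustration only.
  private enum MyCounter { RECORDS_SEEN, BYTES_SEEN }

  public static void main(String[] args) throws IOException {
    Counters counters = new Counters();
    counters.incrCounter(MyCounter.RECORDS_SEEN, 3);
    counters.incrCounter(MyCounter.BYTES_SEEN, 1024);

    // Serialize with the new compact wire format (Text strings and
    // vints/vlongs instead of UTF8 and fixed-width values).
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    counters.write(new DataOutputStream(buf));

    Counters copy = new Counters();
    copy.readFields(new DataInputStream(
        new ByteArrayInputStream(buf.toByteArray())));

    // Walk groups and counters through the new Iterable API instead of
    // the older getGroupNames()/getCounterNames() string lookups.
    for (Counters.Group group : copy) {
      for (Counters.Counter counter : group) {
        System.out.println(group.getDisplayName() + "." +
            counter.getDisplayName() + " = " + counter.getCounter());
      }
    }
  }
}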