Author: omalley
Date: Wed May 7 15:41:34 2008
New Revision: 654313
URL: http://svn.apache.org/viewvc?rev=654313&view=rev
Log:
HADOOP-1915. Allow users to specify counters via strings instead
of enumerations. Contributed by Tom White.
Added:
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestUserDefinedCounters.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Counters.java
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Reporter.java
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
Modified: hadoop/core/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=654313&r1=654312&r2=654313&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed May 7 15:41:34 2008
@@ -68,6 +68,9 @@
HADOOP-3058. Add FSNamesystem status metrics.
(Lohit Vjayarenu via rangadi)
+ HADOOP-1915. Allow users to specify counters via strings instead
+ of enumerations. (tomwhite via omalley)
+
IMPROVEMENTS
HADOOP-2928. Remove deprecated FileSystem.getContentLength().
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Counters.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Counters.java?rev=654313&r1=654312&r2=654313&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Counters.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Counters.java Wed May
7 15:41:34 2008
@@ -26,7 +26,6 @@
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.MissingResourceException;
import java.util.ResourceBundle;
@@ -56,6 +55,7 @@
*/
public static class Counter implements Writable {
+ private String name;
private String displayName;
private long value;
@@ -63,7 +63,8 @@
value = 0L;
}
- Counter(String displayName, long value) {
+ Counter(String name, String displayName, long value) {
+ this.name = name;
this.displayName = displayName;
this.value = value;
}
@@ -72,7 +73,12 @@
* Read the binary representation of the counter
*/
public synchronized void readFields(DataInput in) throws IOException {
- displayName = Text.readString(in);
+ name = Text.readString(in);
+ if (in.readBoolean()) {
+ displayName = Text.readString(in);
+ } else {
+ displayName = name;
+ }
value = WritableUtils.readVLong(in);
}
@@ -80,15 +86,28 @@
* Write the binary representation of the counter
*/
public synchronized void write(DataOutput out) throws IOException {
- Text.writeString(out, displayName);
+ Text.writeString(out, name);
+ boolean distinctDisplayName = (! name.equals(displayName));
+ out.writeBoolean(distinctDisplayName);
+ if (distinctDisplayName) {
+ Text.writeString(out, displayName);
+ }
WritableUtils.writeVLong(out, value);
}
/**
+ * Get the internal name of the counter.
+ * @return the internal name of the counter
+ */
+ public synchronized String getName() {
+ return name;
+ }
+
+ /**
* Get the name of the counter.
* @return the user facing name of the counter
*/
- public String getDisplayName() {
+ public synchronized String getDisplayName() {
return displayName;
}
@@ -119,7 +138,7 @@
public static class Group implements Writable, Iterable<Counter> {
private String groupName;
private String displayName;
- private ArrayList<Counter> subcounters = new ArrayList<Counter>();
+ private Map<String, Counter> subcounters = new HashMap<String, Counter>();
// Optional ResourceBundle for localization of group and counter names.
private ResourceBundle bundle = null;
@@ -166,7 +185,7 @@
* not exist.
*/
public synchronized long getCounter(String counterName) {
- for(Counter counter: subcounters) {
+ for(Counter counter: subcounters.values()) {
if (counter != null && counter.displayName.equals(counterName)) {
return counter.value;
}
@@ -179,22 +198,24 @@
* @param id the numeric id of the counter within the group
* @param name the internal counter name
* @return the counter
+ * @deprecated use {@link #getCounter(String)} instead
*/
+ @Deprecated
public synchronized Counter getCounter(int id, String name) {
- Counter result = null;
- int size = subcounters.size();
- if (id < size) {
- result = subcounters.get(id);
- }
+ return getCounterForName(name);
+ }
+
+ /**
+ * Get the counter for the given name and create it if it doesn't exist.
+ * @param name the internal counter name
+ * @return the counter
+ */
+ public synchronized Counter getCounterForName(String name) {
+ Counter result = subcounters.get(name);
if (result == null) {
- LOG.debug("Adding " + name + " at " + id);
- result = new Counter(localize(name + ".name", name), 0L);
- // extend the list
- subcounters.ensureCapacity(id + 1);
- for(int i=size; i <= id; ++i) {
- subcounters.add(null);
- }
- subcounters.set(id, result);
+ LOG.debug("Adding " + name);
+ result = new Counter(name, localize(name + ".name", name), 0L);
+ subcounters.put(name, result);
}
return result;
}
@@ -203,13 +224,7 @@
* Returns the number of counters in this group.
*/
public synchronized int size() {
- int num = 0;
- for(Counter counter: subcounters) {
- if (counter != null) {
- num += 1;
- }
- }
- return num;
+ return subcounters.size();
}
/**
@@ -231,13 +246,8 @@
public synchronized void write(DataOutput out) throws IOException {
Text.writeString(out, displayName);
WritableUtils.writeVInt(out, subcounters.size());
- for(Counter counter: subcounters) {
- if (counter == null) {
- out.writeBoolean(false);
- } else {
- out.writeBoolean(true);
- counter.write(out);
- }
+ for(Counter counter: subcounters.values()) {
+ counter.write(out);
}
}
@@ -245,58 +255,15 @@
displayName = Text.readString(in);
subcounters.clear();
int size = WritableUtils.readVInt(in);
- subcounters.ensureCapacity(size);
for(int i=0; i < size; i++) {
- Counter counter = null;
- if (in.readBoolean()) {
- counter = new Counter();
- counter.readFields(in);
- }
- subcounters.add(counter);
- }
- }
-
- private class CounterIterator implements Iterator<Counter> {
- private int current = -1;
-
- CounterIterator() {
- getNext();
- }
-
- private void getNext() {
- synchronized (Group.this) {
- int len = subcounters.size();
- while (++current < len) {
- if (subcounters.get(current) != null) {
- return;
- }
- }
- }
- current = Integer.MAX_VALUE;
- }
-
- public boolean hasNext() {
- synchronized (Group.this) {
- return current < subcounters.size();
- }
- }
-
- public Counter next() {
- synchronized (Group.this) {
- int result = current;
- getNext();
- return subcounters.get(result);
- }
- }
-
- public void remove() {
- throw new UnsupportedOperationException
- ("NonNullIterator doesn't support remove");
+ Counter counter = new Counter();
+ counter.readFields(in);
+ subcounters.put(counter.getName(), counter);
}
}
- public Iterator<Counter> iterator() {
- return new CounterIterator();
+ public synchronized Iterator<Counter> iterator() {
+ return new ArrayList<Counter>(subcounters.values()).iterator();
}
}
@@ -347,7 +314,7 @@
Counter counter = cache.get(key);
if (counter == null) {
Group group = getGroup(key.getDeclaringClass().getName());
- counter = group.getCounter(key.ordinal(), key.toString());
+ counter = group.getCounterForName(key.toString());
cache.put(key, counter);
}
return counter;
@@ -359,9 +326,11 @@
* @param id the id of the counter within the group (0 to N-1)
* @param name the internal name of the counter
* @return the counter for that name
+ * @deprecated
*/
+ @Deprecated
public synchronized Counter findCounter(String group, int id, String name) {
- return getGroup(group).getCounter(id, name);
+ return getGroup(group).getCounterForName(name);
}
/**
@@ -376,6 +345,18 @@
}
/**
+ * Increments the specified counter by the specified amount, creating it if
+ * it didn't already exist.
+ * @param group the name of the group
+ * @param counter the internal name of the counter
+ * @param amount amount by which counter is to be incremented
+ */
+ @SuppressWarnings("unchecked")
+ public synchronized void incrCounter(String group, String counter, long amount) {
+ getGroup(group).getCounterForName(counter).value += amount;
+ }
+
+ /**
* Returns current value of the specified counter, or 0 if the counter
* does not exist.
*/
@@ -392,12 +373,11 @@
public synchronized void incrAllCounters(Counters other) {
for (Group otherGroup: other) {
Group group = getGroup(otherGroup.getName());
- for(int i=0; i < otherGroup.subcounters.size(); ++i) {
- Counter otherCounter = otherGroup.subcounters.get(i);
- if (otherCounter != null) {
- group.getCounter(i, otherCounter.displayName).value +=
- otherCounter.value;
- }
+ group.displayName = otherGroup.displayName;
+ for (Counter otherCounter : otherGroup) {
+ Counter counter = group.getCounterForName(otherCounter.getName());
+ counter.displayName = otherCounter.displayName;
+ counter.value += otherCounter.value;
}
}
}
@@ -436,7 +416,7 @@
*
* where each counter is of the form:
*
- * name value
+ * name (false | true displayName) value
*/
public synchronized void write(DataOutput out) throws IOException {
out.writeInt(counters.size());
Modified:
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=654313&r1=654312&r2=654313&view=diff
==============================================================================
---
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
(original)
+++
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
Wed May 7 15:41:34 2008
@@ -41,8 +41,9 @@
* version 9 changes the counter representation for HADOOP-2248
* version 10 changes the TaskStatus representation for HADOOP-2208
* version 11 changes string to JobID in getTaskCompletionEvents().
+ * version 12 changes the counters representation for HADOOP-1915
*/
- public static final long versionID = 11L;
+ public static final long versionID = 12L;
public final static int TRACKERS_OK = 0;
public final static int UNKNOWN_TASKTRACKER = 1;
Modified:
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java?rev=654313&r1=654312&r2=654313&view=diff
==============================================================================
---
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
(original)
+++
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
Wed May 7 15:41:34 2008
@@ -38,8 +38,9 @@
* Version 6: change the counters representation for HADOOP-2248
* Version 7: added getAllJobs for HADOOP-2487
* Version 8: change {job|task}id's to use corresponding objects rather that strings.
+ * Version 9: change the counter representation for HADOOP-1915
*/
- public static final long versionID = 8L;
+ public static final long versionID = 9L;
/**
* Allocate a name for the job.
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Reporter.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Reporter.java?rev=654313&r1=654312&r2=654313&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Reporter.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Reporter.java Wed May
7 15:41:34 2008
@@ -48,6 +48,8 @@
}
public void incrCounter(Enum key, long amount) {
}
+ public void incrCounter(String group, String counter, long amount) {
+ }
public InputSplit getInputSplit() throws UnsupportedOperationException {
throw new UnsupportedOperationException("NULL reporter has no input");
}
@@ -72,6 +74,17 @@
public abstract void incrCounter(Enum key, long amount);
/**
+ * Increments the counter identified by the group and counter name
+ * by the specified amount.
+ *
+ * @param group name to identify the group of the counter to be incremented.
+ * @param counter name to identify the counter within the group.
+ * @param amount A non-negative amount by which the counter is to
+ * be incremented.
+ */
+ public abstract void incrCounter(String group, String counter, long amount);
+
+ /**
* Get the {@link InputSplit} object for a map.
*
* @return the <code>InputSplit</code> that the map is reading from.
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=654313&r1=654312&r2=654313&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java Wed May 7
15:41:34 2008
@@ -361,6 +361,12 @@
}
setProgressFlag();
}
+ public void incrCounter(String group, String counter, long amount) {
+ if (counters != null) {
+ counters.incrCounter(group, counter, amount);
+ }
+ setProgressFlag();
+ }
public InputSplit getInputSplit() throws UnsupportedOperationException {
return Task.this.getInputSplit();
}
Modified:
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=654313&r1=654312&r2=654313&view=diff
==============================================================================
---
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
(original)
+++
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
Wed May 7 15:41:34 2008
@@ -42,9 +42,10 @@
* or not the task's output needs to be promoted.
* Version 8 changes {job|tip|task}id's to use their corresponding
* objects rather than strings.
+ * Version 9 changes the counter representation for HADOOP-1915
* */
- public static final long versionID = 8L;
+ public static final long versionID = 9L;
/** Called when a child task process starts, to get its task.*/
Task getTask(TaskAttemptID taskid) throws IOException;
Added:
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestUserDefinedCounters.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestUserDefinedCounters.java?rev=654313&view=auto
==============================================================================
---
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestUserDefinedCounters.java
(added)
+++
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestUserDefinedCounters.java
Wed May 7 15:41:34 2008
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+public class TestUserDefinedCounters extends ClusterMapReduceTestCase {
+
+ enum EnumCounter { MAP_RECORDS }
+
+ static class CountingMapper<K, V> extends IdentityMapper<K, V> {
+
+ public void map(K key, V value,
+ OutputCollector<K, V> output, Reporter reporter)
+ throws IOException {
+ output.collect(key, value);
+ reporter.incrCounter(EnumCounter.MAP_RECORDS, 1);
+ reporter.incrCounter("StringCounter", "MapRecords", 1);
+ }
+
+ }
+
+ public void testMapReduceJob() throws Exception {
+ OutputStream os = getFileSystem().create(new Path(getInputDir(), "text.txt"));
+ Writer wr = new OutputStreamWriter(os);
+ wr.write("hello1\n");
+ wr.write("hello2\n");
+ wr.write("hello3\n");
+ wr.write("hello4\n");
+ wr.close();
+
+ JobConf conf = createJobConf();
+ conf.setJobName("counters");
+
+ conf.setInputFormat(TextInputFormat.class);
+
+ conf.setMapOutputKeyClass(LongWritable.class);
+ conf.setMapOutputValueClass(Text.class);
+
+ conf.setOutputFormat(TextOutputFormat.class);
+ conf.setOutputKeyClass(LongWritable.class);
+ conf.setOutputValueClass(Text.class);
+
+ conf.setMapperClass(CountingMapper.class);
+ conf.setReducerClass(IdentityReducer.class);
+
+ FileInputFormat.setInputPaths(conf, getInputDir());
+
+ FileOutputFormat.setOutputPath(conf, getOutputDir());
+
+ RunningJob runningJob = JobClient.runJob(conf);
+
+ Path[] outputFiles = FileUtil.stat2Paths(
+ getFileSystem().listStatus(getOutputDir(),
+ new OutputLogFilter()));
+ if (outputFiles.length > 0) {
+ InputStream is = getFileSystem().open(outputFiles[0]);
+ BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+ String line = reader.readLine();
+ int counter = 0;
+ while (line != null) {
+ counter++;
+ assertTrue(line.contains("hello"));
+ line = reader.readLine();
+ }
+ reader.close();
+ assertEquals(4, counter);
+ }
+
+ assertEquals(4,
+ runningJob.getCounters().getCounter(EnumCounter.MAP_RECORDS));
+ assertEquals(4,
+ runningJob.getCounters().getGroup("StringCounter")
+ .getCounter("MapRecords"));
+ }
+
+}