Author: ddas
Date: Fri Jul 25 09:59:16 2008
New Revision: 679859
URL: http://svn.apache.org/viewvc?rev=679859&view=rev
Log:
HADOOP-3747. Adds counter support for MultipleOutputs. Contributed by Alejandro
Abdelnur.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/MultipleOutputs.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestMultipleOutputs.java
Modified: hadoop/core/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=679859&r1=679858&r2=679859&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Jul 25 09:59:16 2008
@@ -107,6 +107,9 @@
shuffle and the backoff logic is dependent on the type of timeout.
(Jothi Padmanabhan via ddas)
+ HADOOP-3747. Adds counter support for MultipleOutputs.
+ (Alejandro Abdelnur via ddas)
+
OPTIMIZATIONS
HADOOP-3556. Removed lock contention in MD5Hash by changing the
Modified:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/MultipleOutputs.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/MultipleOutputs.java?rev=679859&r1=679858&r2=679859&view=diff
==============================================================================
---
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/MultipleOutputs.java
(original)
+++
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/MultipleOutputs.java
Fri Jul 25 09:59:16 2008
@@ -45,6 +45,13 @@
* key/values written to the job <code>OutputCollector</code> are part of the
* reduce phase.
* <p/>
+ * MultipleOutputs supports counters; by default they are disabled. The counters
+ * group is the {@link MultipleOutputs} class name.
+ * <p/>
+ * The names of the counters are the same as the named outputs. For multi
+ * named outputs the name of the counter is the concatenation of the named
+ * output, an underscore '_' and the multiname.
+ * <p/>
* Job configuration usage pattern is:
* <pre>
*
@@ -115,6 +122,13 @@
private static final String VALUE = ".value";
private static final String MULTI = ".multi";
+ private static final String COUNTERS_ENABLED = "mo.counters";
+
+ /**
+ * Counters group used by the counters of MultipleOutputs.
+ */
+ private static final String COUNTERS_GROUP = MultipleOutputs.class.getName();
+
/**
* Checks if a named output is alreadyDefined or not.
*
@@ -319,12 +333,52 @@
conf.setBoolean(MO_PREFIX + namedOutput + MULTI, multi);
}
+ /**
+ * Enables or disables counters for the named outputs.
+ * <p/>
+ * By default these counters are disabled.
+ * <p/>
+ * MultipleOutputs supports counters; by default they are disabled.
+ * The counters group is the {@link MultipleOutputs} class name.
+ * <p/>
+ * The names of the counters are the same as the named outputs. For multi
+ * named outputs the name of the counter is the concatenation of the named
+ * output, an underscore '_' and the multiname.
+ *
+ * @param conf job conf in which to enable or disable the counters.
+ * @param enabled indicates if the counters will be enabled or not.
+ */
+ public static void setCountersEnabled(JobConf conf, boolean enabled) {
+ conf.setBoolean(COUNTERS_ENABLED, enabled);
+ }
+
+ /**
+ * Returns if the counters for the named outputs are enabled or not.
+ * <p/>
+ * By default these counters are disabled.
+ * <p/>
+ * MultipleOutputs supports counters; by default they are disabled.
+ * The counters group is the {@link MultipleOutputs} class name.
+ * <p/>
+ * The names of the counters are the same as the named outputs. For multi
+ * named outputs the name of the counter is the concatenation of the named
+ * output, an underscore '_' and the multiname.
+ *
+ *
+ * @param conf job conf from which the counters setting is read.
+ * @return TRUE if the counters are enabled, FALSE if they are disabled.
+ */
+ public static boolean getCountersEnabled(JobConf conf) {
+ return conf.getBoolean(COUNTERS_ENABLED, false);
+ }
+
// instance code, to be used from Mapper/Reducer code
private JobConf conf;
private OutputFormat outputFormat;
private Set<String> namedOutputs;
private Map<String, RecordWriter> recordWriters;
+ private boolean countersEnabled;
/**
* Creates and initializes multiple named outputs support, it should be
@@ -338,6 +392,7 @@
namedOutputs = Collections.unmodifiableSet(
new HashSet<String>(MultipleOutputs.getNamedOutputsList(job)));
recordWriters = new HashMap<String, RecordWriter>();
+ countersEnabled = getCountersEnabled(job);
}
/**
@@ -354,20 +409,56 @@
// MultithreaderMapRunner.
private synchronized RecordWriter getRecordWriter(String namedOutput,
String baseFileName,
- Reporter reporter)
+ final Reporter reporter)
throws IOException {
RecordWriter writer = recordWriters.get(baseFileName);
if (writer == null) {
+ if (countersEnabled && reporter == null) {
+ throw new IllegalArgumentException(
+ "Counters are enabled, Reporter cannot be NULL");
+ }
JobConf jobConf = new JobConf(conf);
jobConf.set(InternalFileOutputFormat.CONFIG_NAMED_OUTPUT, namedOutput);
FileSystem fs = FileSystem.get(conf);
writer =
outputFormat.getRecordWriter(fs, jobConf, baseFileName, reporter);
+
+ if (countersEnabled) {
+ if (reporter == null) {
+ throw new IllegalArgumentException(
+ "Counters are enabled, Reporter cannot be NULL");
+ }
+ writer = new RecordWriterWithCounter(writer, baseFileName, reporter);
+ }
+
recordWriters.put(baseFileName, writer);
}
return writer;
}
+ private static class RecordWriterWithCounter implements RecordWriter {
+ private RecordWriter writer;
+ private String counterName;
+ private Reporter reporter;
+
+ public RecordWriterWithCounter(RecordWriter writer, String counterName,
+ Reporter reporter) {
+ this.writer = writer;
+ this.counterName = counterName;
+ this.reporter = reporter;
+ }
+
+ @SuppressWarnings({"unchecked"})
+ public void write(Object key, Object value) throws IOException {
+ reporter.incrCounter(COUNTERS_GROUP, counterName, 1);
+ writer.write(key, value);
+ }
+
+ public void close(Reporter reporter) throws IOException {
+ writer.close(reporter);
+ }
+ }
+
/**
* Gets the output collector for a named output.
* <p/>
Modified:
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestMultipleOutputs.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestMultipleOutputs.java?rev=679859&r1=679858&r2=679859&view=diff
==============================================================================
---
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestMultipleOutputs.java
(original)
+++
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestMultipleOutputs.java
Fri Jul 25 09:59:16 2008
@@ -37,8 +37,16 @@
super(HadoopTestCase.LOCAL_MR, HadoopTestCase.LOCAL_FS, 1, 1);
}
+ public void testWithoutCounters() throws Exception {
+ _testMultipleOutputs(false);
+ }
+
+ public void testWithCounters() throws Exception {
+ _testMultipleOutputs(true);
+ }
+
@SuppressWarnings({"unchecked"})
- public void testMultipleOutputs() throws Exception {
+ protected void _testMultipleOutputs(boolean withCounters) throws Exception {
Path inDir = new Path("testing/mo/input");
Path outDir = new Path("testing/mo/output");
@@ -85,6 +93,8 @@
MultipleOutputs.addMultiNamedOutput(conf, "sequence",
SequenceFileOutputFormat.class, LongWritable.class, Text.class);
+ MultipleOutputs.setCountersEnabled(conf, withCounters);
+
conf.setMapperClass(MOMap.class);
conf.setReducerClass(MOReduce.class);
@@ -147,6 +157,20 @@
reader.close();
assertFalse(count == 0);
+ Counters.Group counters =
+ job.getCounters().getGroup(MultipleOutputs.class.getName());
+ if (!withCounters) {
+ assertEquals(0, counters.size());
+ }
+ else {
+ assertEquals(4, counters.size());
+ assertEquals(4, counters.getCounter("text"));
+ assertEquals(2, counters.getCounter("sequence_A"));
+ assertEquals(4, counters.getCounter("sequence_B"));
+ assertEquals(2, counters.getCounter("sequence_C"));
+
+ }
+
}
@SuppressWarnings({"unchecked"})