Author: omalley
Date: Mon Aug 18 16:16:36 2008
New Revision: 686893
URL: http://svn.apache.org/viewvc?rev=686893&view=rev
Log:
HADOOP-1480. Add counters to the C++ Pipes API. (acmurthy via omalley)
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/c++/pipes/api/hadoop/Pipes.hh
hadoop/core/trunk/src/c++/pipes/impl/HadoopPipes.cc
hadoop/core/trunk/src/examples/pipes/impl/wordcount-nopipe.cc
hadoop/core/trunk/src/examples/pipes/impl/wordcount-part.cc
hadoop/core/trunk/src/examples/pipes/impl/wordcount-simple.cc
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Counters.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Reporter.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/BinaryProtocol.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/OutputHandler.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/UpwardProtocol.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=686893&r1=686892&r2=686893&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Aug 18 16:16:36 2008
@@ -92,6 +92,8 @@
HADOOP-3585. FailMon package for hardware failure monitoring and
analysis of anomalies. (Ioannis Koltsidas via dhruba)
+ HADOOP-1480. Add counters to the C++ Pipes API. (acmurthy via omalley)
+
IMPROVEMENTS
HADOOP-3732. Delay intialization of datanode block verification till
Modified: hadoop/core/trunk/src/c++/pipes/api/hadoop/Pipes.hh
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/c%2B%2B/pipes/api/hadoop/Pipes.hh?rev=686893&r1=686892&r2=686893&view=diff
==============================================================================
--- hadoop/core/trunk/src/c++/pipes/api/hadoop/Pipes.hh (original)
+++ hadoop/core/trunk/src/c++/pipes/api/hadoop/Pipes.hh Mon Aug 18 16:16:36 2008
@@ -57,6 +57,19 @@
class TaskContext {
public:
/**
+ * Counter to keep track of a property and its value.
+ */
+ class Counter {
+ private:
+ int id;
+ public:
+ Counter(int counterId) : id(counterId) {}
+ Counter(const Counter& counter) : id(counter.id) {}
+
+ int getId() const { return id; }
+ };
+
+ /**
* Get the JobConf for the current task.
*/
virtual const JobConf* getJobConf() = 0;
@@ -89,6 +102,17 @@
*/
virtual void setStatus(const std::string& status) = 0;
+ /**
+ * Register a counter with the given group and name.
+ */
+ virtual Counter*
+ getCounter(const std::string& group, const std::string& name) = 0;
+
+ /**
+ * Increment the value of the counter with the given amount.
+ */
+ virtual void incrementCounter(const Counter* counter, uint64_t amount) = 0;
+
virtual ~TaskContext() {}
};
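For reference, a minimal sketch (not part of this commit) of how a Pipes task can use the new counter API; the group and counter names below are illustrative and mirror the example programs updated later in this change:

#include "hadoop/Pipes.hh"

// Illustrative group/counter names; any strings may be used.
const std::string GROUP = "WORDCOUNT";
const std::string RECORDS = "INPUT_RECORDS";

class CountingMap: public HadoopPipes::Mapper {
public:
  HadoopPipes::TaskContext::Counter* records;

  // getCounter() registers the counter with the framework once;
  // the returned handle is then passed to incrementCounter().
  CountingMap(HadoopPipes::TaskContext& context) {
    records = context.getCounter(GROUP, RECORDS);
  }

  void map(HadoopPipes::MapContext& context) {
    context.emit(context.getInputKey(), context.getInputValue());
    context.incrementCounter(records, 1);  // one input record processed
  }
};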
Modified: hadoop/core/trunk/src/c++/pipes/impl/HadoopPipes.cc
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/c%2B%2B/pipes/impl/HadoopPipes.cc?rev=686893&r1=686892&r2=686893&view=diff
==============================================================================
--- hadoop/core/trunk/src/c++/pipes/impl/HadoopPipes.cc (original)
+++ hadoop/core/trunk/src/c++/pipes/impl/HadoopPipes.cc Mon Aug 18 16:16:36 2008
@@ -99,6 +99,10 @@
virtual void status(const string& message) = 0;
virtual void progress(float progress) = 0;
virtual void done() = 0;
+ virtual void registerCounter(int id, const string& group,
+ const string& name) = 0;
+ virtual void
+ incrementCounter(const TaskContext::Counter* counter, uint64_t amount) = 0;
virtual ~UpwardProtocol() {}
};
@@ -150,6 +154,19 @@
lineSeparator);
}
+ virtual void registerCounter(int id, const string& group,
+ const string& name) {
+ fprintf(stream, "registerCounter%c%d%c%s%c%s%c", fieldSeparator, id,
+ fieldSeparator, group.c_str(), fieldSeparator, name.c_str(),
+ lineSeparator);
+ }
+
+ virtual void incrementCounter(const TaskContext::Counter* counter,
+ uint64_t amount) {
+ fprintf(stream, "incrCounter%c%d%c%ld%c", fieldSeparator,
counter->getId(),
+ fieldSeparator, (long)amount, lineSeparator);
+ }
+
virtual void done() {
fprintf(stream, "done%c", lineSeparator);
}
@@ -272,8 +289,9 @@
enum MESSAGE_TYPE {START_MESSAGE, SET_JOB_CONF, SET_INPUT_TYPES, RUN_MAP,
MAP_ITEM, RUN_REDUCE, REDUCE_KEY, REDUCE_VALUE,
- CLOSE, ABORT,
- OUTPUT=50, PARTITIONED_OUTPUT, STATUS, PROGRESS, DONE};
+ CLOSE, ABORT,
+ OUTPUT=50, PARTITIONED_OUTPUT, STATUS, PROGRESS, DONE,
+ REGISTER_COUNTER, INCREMENT_COUNTER};
class BinaryUpwardProtocol: public UpwardProtocol {
private:
@@ -313,6 +331,21 @@
serializeInt(DONE, *stream);
}
+ virtual void registerCounter(int id, const string& group,
+ const string& name) {
+ serializeInt(REGISTER_COUNTER, *stream);
+ serializeInt(id, *stream);
+ serializeString(group, *stream);
+ serializeString(name, *stream);
+ }
+
+ virtual void incrementCounter(const TaskContext::Counter* counter,
+ uint64_t amount) {
+ serializeInt(INCREMENT_COUNTER, *stream);
+ serializeInt(counter->getId(), *stream);
+ serializeLong(amount, *stream);
+ }
+
~BinaryUpwardProtocol() {
delete stream;
}
@@ -505,6 +538,14 @@
return valueItr != endValueItr;
}
+ virtual Counter* getCounter(const std::string& group,
+ const std::string& name) {
+ return baseContext->getCounter(group, name);
+ }
+
+ virtual void incrementCounter(const Counter* counter, uint64_t amount) {
+ baseContext->incrementCounter(counter, amount);
+ }
};
/**
@@ -586,6 +627,7 @@
int numReduces;
const Factory* factory;
pthread_mutex_t mutexDone;
+ std::vector<int> registeredCounterIds;
public:
@@ -838,6 +880,24 @@
}
}
+ /**
+ * Register a counter with the given group and name.
+ */
+ virtual Counter* getCounter(const std::string& group,
+ const std::string& name) {
+ int id = registeredCounterIds.size();
+ registeredCounterIds.push_back(id);
+ uplink->registerCounter(id, group, name);
+ return new Counter(id);
+ }
+
+ /**
+ * Increment the value of the counter with the given amount.
+ */
+ virtual void incrementCounter(const Counter* counter, uint64_t amount) {
+ uplink->incrementCounter(counter, amount);
+ }
+
void closeAll() {
if (reader) {
reader->close();
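As context for the hunks above, a small standalone sketch of the framing the text protocol writes for the two new upward commands; the tab field separator and newline line separator are assumptions here (TextUpwardProtocol's actual fieldSeparator/lineSeparator are defined outside this diff):

#include <cstdio>
#include <string>

int main() {
  // Assumed separators; the real values live in TextUpwardProtocol.
  const char FIELD_SEP = '\t';
  const char LINE_SEP = '\n';

  int id = 0;
  std::string group = "WORDCOUNT";
  std::string name = "INPUT_WORDS";

  // registerCounter <id> <group> <name>
  std::printf("registerCounter%c%d%c%s%c%s%c", FIELD_SEP, id, FIELD_SEP,
              group.c_str(), FIELD_SEP, name.c_str(), LINE_SEP);
  // incrCounter <id> <amount>
  std::printf("incrCounter%c%d%c%ld%c", FIELD_SEP, id, FIELD_SEP, 7L, LINE_SEP);
  return 0;
}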
Modified: hadoop/core/trunk/src/examples/pipes/impl/wordcount-nopipe.cc
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/pipes/impl/wordcount-nopipe.cc?rev=686893&r1=686892&r2=686893&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/pipes/impl/wordcount-nopipe.cc (original)
+++ hadoop/core/trunk/src/examples/pipes/impl/wordcount-nopipe.cc Mon Aug 18 16:16:36 2008
@@ -24,27 +24,43 @@
#include <sys/types.h>
#include <sys/stat.h>
+const std::string WORDCOUNT = "WORDCOUNT";
+const std::string INPUT_WORDS = "INPUT_WORDS";
+const std::string OUTPUT_WORDS = "OUTPUT_WORDS";
+
class WordCountMap: public HadoopPipes::Mapper {
public:
- WordCountMap(HadoopPipes::MapContext& context){}
+ HadoopPipes::TaskContext::Counter* inputWords;
+
+ WordCountMap(HadoopPipes::TaskContext& context) {
+ inputWords = context.getCounter(WORDCOUNT, INPUT_WORDS);
+ }
+
void map(HadoopPipes::MapContext& context) {
std::vector<std::string> words =
HadoopUtils::splitString(context.getInputValue(), " ");
for(unsigned int i=0; i < words.size(); ++i) {
context.emit(words[i], "1");
}
+ context.incrementCounter(inputWords, words.size());
}
};
class WordCountReduce: public HadoopPipes::Reducer {
public:
- WordCountReduce(HadoopPipes::ReduceContext& context){}
+ HadoopPipes::TaskContext::Counter* outputWords;
+
+ WordCountReduce(HadoopPipes::TaskContext& context) {
+ outputWords = context.getCounter(WORDCOUNT, OUTPUT_WORDS);
+ }
+
void reduce(HadoopPipes::ReduceContext& context) {
int sum = 0;
while (context.nextValue()) {
sum += HadoopUtils::toInt(context.getInputValue());
}
context.emit(context.getInputKey(), HadoopUtils::toString(sum));
+ context.incrementCounter(outputWords, 1);
}
};
Modified: hadoop/core/trunk/src/examples/pipes/impl/wordcount-part.cc
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/pipes/impl/wordcount-part.cc?rev=686893&r1=686892&r2=686893&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/pipes/impl/wordcount-part.cc (original)
+++ hadoop/core/trunk/src/examples/pipes/impl/wordcount-part.cc Mon Aug 18 16:16:36 2008
@@ -20,27 +20,43 @@
#include "hadoop/TemplateFactory.hh"
#include "hadoop/StringUtils.hh"
+const std::string WORDCOUNT = "WORDCOUNT";
+const std::string INPUT_WORDS = "INPUT_WORDS";
+const std::string OUTPUT_WORDS = "OUTPUT_WORDS";
+
class WordCountMap: public HadoopPipes::Mapper {
public:
- WordCountMap(HadoopPipes::TaskContext& context){}
+ HadoopPipes::TaskContext::Counter* inputWords;
+
+ WordCountMap(HadoopPipes::TaskContext& context) {
+ inputWords = context.getCounter(WORDCOUNT, INPUT_WORDS);
+ }
+
void map(HadoopPipes::MapContext& context) {
std::vector<std::string> words =
HadoopUtils::splitString(context.getInputValue(), " ");
for(unsigned int i=0; i < words.size(); ++i) {
context.emit(words[i], "1");
}
+ context.incrementCounter(inputWords, words.size());
}
};
class WordCountReduce: public HadoopPipes::Reducer {
public:
- WordCountReduce(HadoopPipes::TaskContext& context){}
+ HadoopPipes::TaskContext::Counter* outputWords;
+
+ WordCountReduce(HadoopPipes::TaskContext& context) {
+ outputWords = context.getCounter(WORDCOUNT, OUTPUT_WORDS);
+ }
+
void reduce(HadoopPipes::ReduceContext& context) {
int sum = 0;
while (context.nextValue()) {
sum += HadoopUtils::toInt(context.getInputValue());
}
context.emit(context.getInputKey(), HadoopUtils::toString(sum));
+ context.incrementCounter(outputWords, 1);
}
};
Modified: hadoop/core/trunk/src/examples/pipes/impl/wordcount-simple.cc
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/pipes/impl/wordcount-simple.cc?rev=686893&r1=686892&r2=686893&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/pipes/impl/wordcount-simple.cc (original)
+++ hadoop/core/trunk/src/examples/pipes/impl/wordcount-simple.cc Mon Aug 18 16:16:36 2008
@@ -20,27 +20,43 @@
#include "hadoop/TemplateFactory.hh"
#include "hadoop/StringUtils.hh"
+const std::string WORDCOUNT = "WORDCOUNT";
+const std::string INPUT_WORDS = "INPUT_WORDS";
+const std::string OUTPUT_WORDS = "OUTPUT_WORDS";
+
class WordCountMap: public HadoopPipes::Mapper {
public:
- WordCountMap(HadoopPipes::TaskContext& context){}
+ HadoopPipes::TaskContext::Counter* inputWords;
+
+ WordCountMap(HadoopPipes::TaskContext& context) {
+ inputWords = context.getCounter(WORDCOUNT, INPUT_WORDS);
+ }
+
void map(HadoopPipes::MapContext& context) {
std::vector<std::string> words =
HadoopUtils::splitString(context.getInputValue(), " ");
for(unsigned int i=0; i < words.size(); ++i) {
context.emit(words[i], "1");
}
+ context.incrementCounter(inputWords, words.size());
}
};
class WordCountReduce: public HadoopPipes::Reducer {
public:
- WordCountReduce(HadoopPipes::TaskContext& context){}
+ HadoopPipes::TaskContext::Counter* outputWords;
+
+ WordCountReduce(HadoopPipes::TaskContext& context) {
+ outputWords = context.getCounter(WORDCOUNT, OUTPUT_WORDS);
+ }
+
void reduce(HadoopPipes::ReduceContext& context) {
int sum = 0;
while (context.nextValue()) {
sum += HadoopUtils::toInt(context.getInputValue());
}
context.emit(context.getInputKey(), HadoopUtils::toString(sum));
+ context.incrementCounter(outputWords, 1);
}
};
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Counters.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Counters.java?rev=686893&r1=686892&r2=686893&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Counters.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Counters.java Mon Aug 18 16:16:36 2008
@@ -319,6 +319,16 @@
}
/**
+ * Find a counter given the group and the name.
+ * @param group the name of the group
+ * @param name the internal name of the counter
+ * @return the counter for that name
+ */
+ public synchronized Counter findCounter(String group, String name) {
+ return getGroup(group).getCounterForName(name);
+ }
+
+ /**
* Find a counter by using strings
* @param group the name of the group
* @param id the id of the counter within the group (0 to N-1)
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Reporter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Reporter.java?rev=686893&r1=686892&r2=686893&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Reporter.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Reporter.java Mon Aug 18 16:16:36 2008
@@ -18,6 +18,7 @@
package org.apache.hadoop.mapred;
+import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.util.Progressable;
/**
@@ -46,6 +47,9 @@
}
public void progress() {
}
+ public Counter getCounter(String group, String name) {
+ return null;
+ }
public void incrCounter(Enum key, long amount) {
}
public void incrCounter(String group, String counter, long amount) {
@@ -63,6 +67,15 @@
public abstract void setStatus(String status);
/**
+ * Get the {@link Counter} of the given group with the given name.
+ *
+ * @param group counter group
+ * @param name counter name
+ * @return the <code>Counter</code> of the given group/name.
+ */
+ public abstract Counter getCounter(String group, String name);
+
+ /**
* Increments the counter identified by the key, which can be of
* any {@link Enum} type, by the specified amount.
*
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java?rev=686893&r1=686892&r2=686893&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java Mon Aug 18 16:16:36 2008
@@ -48,6 +48,7 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.IFile.Writer;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Progress;
@@ -408,6 +409,13 @@
// indicate that progress update needs to be sent
setProgressFlag();
}
+ public Counters.Counter getCounter(String group, String name) {
+ Counters.Counter counter = null;
+ if (counters != null) {
+ counter = counters.findCounter(group, name);
+ }
+ return counter;
+ }
public void incrCounter(Enum key, long amount) {
if (counters != null) {
counters.incrCounter(key, amount);
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/BinaryProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/BinaryProtocol.java?rev=686893&r1=686892&r2=686893&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/BinaryProtocol.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/BinaryProtocol.java Mon Aug 18 16:16:36 2008
@@ -73,7 +73,9 @@
PARTITIONED_OUTPUT(51),
STATUS(52),
PROGRESS(53),
- DONE(54);
+ DONE(54),
+ REGISTER_COUNTER(55),
+ INCREMENT_COUNTER(56);
final int code;
MessageType(int code) {
this.code = code;
@@ -124,6 +126,15 @@
handler.status(Text.readString(inStream));
} else if (cmd == MessageType.PROGRESS.code) {
handler.progress(inStream.readFloat());
+ } else if (cmd == MessageType.REGISTER_COUNTER.code) {
+ int id = WritableUtils.readVInt(inStream);
+ String group = Text.readString(inStream);
+ String name = Text.readString(inStream);
+ handler.registerCounter(id, group, name);
+ } else if (cmd == MessageType.INCREMENT_COUNTER.code) {
+ int id = WritableUtils.readVInt(inStream);
+ long amount = WritableUtils.readVLong(inStream);
+ handler.incrementCounter(id, amount);
} else if (cmd == MessageType.DONE.code) {
LOG.debug("Pipe child done");
handler.done();
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/OutputHandler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/OutputHandler.java?rev=686893&r1=686892&r2=686893&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/OutputHandler.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/OutputHandler.java Mon Aug 18 16:16:36 2008
@@ -19,9 +19,14 @@
package org.apache.hadoop.mapred.pipes;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
@@ -37,7 +42,9 @@
private float progressValue = 0.0f;
private boolean done = false;
private Throwable exception = null;
-
+ private Map<Integer, Counters.Counter> registeredCounters =
+ new HashMap<Integer, Counters.Counter>();
+
/**
* Create a handler that will handle any records output from the application.
* @param collector the "real" collector that takes the output
@@ -121,4 +128,19 @@
}
return done;
}
+
+ public void registerCounter(int id, String group, String name) throws IOException {
+ Counters.Counter counter = reporter.getCounter(group, name);
+ registeredCounters.put(id, counter);
+ }
+
+ public void incrementCounter(int id, long amount) throws IOException {
+ if (id < registeredCounters.size()) {
+ Counters.Counter counter = registeredCounters.get(id);
+ counter.increment(amount);
+ } else {
+ throw new IOException("Invalid counter with id: " + id);
+ }
+ }
+
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/UpwardProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/UpwardProtocol.java?rev=686893&r1=686892&r2=686893&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/UpwardProtocol.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/UpwardProtocol.java Mon Aug 18 16:16:36 2008
@@ -72,4 +72,20 @@
* @param e
*/
void failed(Throwable e);
+
+ /**
+ * Register a counter with the given id and group/name.
+ * @param group counter group
+ * @param name counter name
+ * @throws IOException
+ */
+ void registerCounter(int id, String group, String name) throws IOException;
+
+ /**
+ * Increment the value of a registered counter.
+ * @param id counter id of the registered counter
+ * @param amount increment for the counter value
+ * @throws IOException
+ */
+ void incrementCounter(int id, long amount) throws IOException;
}
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java?rev=686893&r1=686892&r2=686893&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java Mon Aug 18 16:16:36 2008
@@ -30,6 +30,7 @@
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
@@ -37,6 +38,7 @@
import org.apache.hadoop.mapred.OutputLogFilter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TestMiniMRWithDFS;
+import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.util.StringUtils;
import junit.framework.TestCase;
@@ -164,6 +166,15 @@
rJob = Submitter.runJob(job);
}
assertTrue("pipes job failed", rJob.isSuccessful());
+
+ Counters counters = rJob.getCounters();
+ Counters.Group wordCountCounters = counters.getGroup("WORDCOUNT");
+ int numCounters = 0;
+ for (Counter c : wordCountCounters) {
+ System.out.println(c);
+ ++numCounters;
+ }
+ assertTrue("No counters found!", (numCounters > 0));
}
List<String> results = new ArrayList<String>();