Author: edwardyoon
Date: Wed Nov 30 01:46:43 2011
New Revision: 1208208
URL: http://svn.apache.org/viewvc?rev=1208208&view=rev
Log:
Add Counters to Hama Jobs
Added:
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Counters.java
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskStatus.java
Modified: incubator/hama/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1208208&r1=1208207&r2=1208208&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Wed Nov 30 01:46:43 2011
@@ -4,6 +4,7 @@ Release 0.4 - Unreleased
NEW FEATURES
+ HAMA-479: Add Counters to Hama Jobs (edwardyoon)
HAMA-454: Add Zookeeper as synchronization service to YARN (tjungblut)
HAMA-258: Add Input Output system (edwardyoon)
HAMA-458: Add Message Combiner (edwardyoon)
Modified:
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java?rev=1208208&r1=1208207&r2=1208208&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
(original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
Wed Nov 30 01:46:43 2011
@@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hama.Constants;
import org.apache.hama.util.KeyValuePair;
+import org.apache.hama.bsp.Counters.Counter;
/**
* BSP communication interface.
@@ -129,4 +130,43 @@ public interface BSPPeer<KEYIN, VALUEIN,
* @return the jobs configuration
*/
public Configuration getConfiguration();
+
+ /**
+ * Get the {@link Counter} identified by the given enum constant.
+ *
+ * @param name enum constant that identifies the counter
+ * @return the <code>Counter</code> for the given enum constant; an
+ * implementation may return <code>null</code> when it tracks no counters
+ * (BSPPeerImpl does so).
+ */
+ public Counter getCounter(Enum<?> name);
+
+ /**
+ * Get the {@link Counter} of the given group with the given name.
+ *
+ * @param group counter group
+ * @param name counter name
+ * @return the <code>Counter</code> of the given group/name; an
+ * implementation may return <code>null</code> when it tracks no counters
+ * (BSPPeerImpl does so).
+ */
+ public Counter getCounter(String group, String name);
+
+ /**
+ * Increments the counter identified by the key, which can be of
+ * any {@link Enum} type, by the specified amount.
+ *
+ * @param key key to identify the counter to be incremented. The key can
+ * be any <code>Enum</code>.
+ * @param amount A non-negative amount by which the counter is to
+ * be incremented.
+ */
+ public void incrCounter(Enum<?> key, long amount);
+
+ /**
+ * Increments the counter identified by the group and counter name
+ * by the specified amount.
+ *
+ * @param group name to identify the group of the counter to be incremented.
+ * @param counter name to identify the counter within the group.
+ * @param amount A non-negative amount by which the counter is to
+ * be incremented.
+ */
+ public void incrCounter(String group, String counter, long amount);
}
Modified:
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1208208&r1=1208207&r2=1208208&view=diff
==============================================================================
---
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
(original)
+++
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
Wed Nov 30 01:46:43 2011
@@ -39,6 +39,7 @@ import org.apache.hama.bsp.sync.SyncClie
import org.apache.hama.bsp.sync.SyncServiceFactory;
import org.apache.hama.ipc.BSPPeerProtocol;
import org.apache.hama.util.KeyValuePair;
+import org.apache.hama.bsp.Counters.Counter;
/**
* This class represents a BSP peer.
@@ -48,6 +49,10 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
private static final Log LOG = LogFactory.getLog(BSPPeerImpl.class);
+ /**
+ * Counters maintained by the peer itself. SUPERSTEPS is incremented by one
+ * each time the peer has left the synchronization barrier.
+ */
+ protected static enum PeerCounter {
+ SUPERSTEPS
+ }
+
private final Configuration conf;
private final FileSystem fs;
private BSPJob bspJob;
@@ -72,6 +77,7 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
private RecordWriter<KEYOUT, VALUEOUT> outWriter;
private InetSocketAddress peerAddress;
+ private Counters counters;
/**
* Protected default constructor for LocalBSPRunner.
@@ -104,7 +110,7 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
*/
public BSPPeerImpl(BSPJob job, Configuration conf, TaskAttemptID taskId,
BSPPeerProtocol umbilical, int partition, String splitClass,
- BytesWritable split) throws Exception {
+ BytesWritable split, Counters counters) throws Exception {
this.conf = conf;
this.taskId = taskId;
this.umbilical = umbilical;
@@ -113,6 +119,7 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
this.partition = partition;
this.splitClass = splitClass;
this.split = split;
+ this.counters = counters;
this.fs = FileSystem.get(conf);
@@ -130,7 +137,7 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
syncClient.leaveBarrier(taskId.getJobID(), taskId, -1);
setCurrentTaskStatus(new TaskStatus(taskId.getJobID(), taskId, 0,
TaskStatus.State.RUNNING, "running", peerAddress.getHostName(),
- TaskStatus.Phase.STARTING));
+ TaskStatus.Phase.STARTING, counters));
messenger = MessageManagerFactory.getMessageManager(conf);
messenger.init(conf, peerAddress);
@@ -146,8 +153,8 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
// just output something when the user configured it
if (conf.get("bsp.output.dir") != null) {
- Path outdir = new Path(conf.get("bsp.output.dir"),
- Task.getOutputName(partition));
+ Path outdir = new Path(conf.get("bsp.output.dir"), Task
+ .getOutputName(partition));
outWriter = bspJob.getOutputFormat().getRecordWriter(fs, bspJob,
outdir.makeQualified(fs).toString());
final RecordWriter<KEYOUT, VALUEOUT> finalOut = outWriter;
@@ -252,24 +259,24 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
}
leaveBarrier();
- currentTaskStatus.incrementSuperstepCount();
+ incrCounter(PeerCounter.SUPERSTEPS, 1);
+ currentTaskStatus.setCounters(counters);
umbilical.statusUpdate(taskId, currentTaskStatus);
// Clear outgoing queues.
messenger.clearOutgoingQueues();
} catch (Exception e) {
- LOG.fatal(
- "Caught exception during superstep "
- + currentTaskStatus.getSuperstepCount() + "!", e);
+ LOG.fatal("Caught exception during superstep "
+ + currentTaskStatus.getSuperstepCount() + "!", e);
}
}
private BSPMessageBundle combineMessages(Iterable<BSPMessage> messages) {
if (!conf.getClass("bsp.combiner.class", Combiner.class).equals(
Combiner.class)) {
- Combiner combiner = (Combiner) ReflectionUtils.newInstance(
- conf.getClass("bsp.combiner.class", Combiner.class), conf);
+ Combiner combiner = (Combiner) ReflectionUtils.newInstance(conf.getClass(
+ "bsp.combiner.class", Combiner.class), conf);
return combiner.combine(messages);
} else {
@@ -282,13 +289,13 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
}
protected void enterBarrier() throws Exception {
- syncClient.enterBarrier(taskId.getJobID(), taskId,
- currentTaskStatus.getSuperstepCount());
+ syncClient.enterBarrier(taskId.getJobID(), taskId, currentTaskStatus
+ .getSuperstepCount());
}
protected void leaveBarrier() throws Exception {
- syncClient.leaveBarrier(taskId.getJobID(), taskId,
- currentTaskStatus.getSuperstepCount());
+ syncClient.leaveBarrier(taskId.getJobID(), taskId, currentTaskStatus
+ .getSuperstepCount());
}
public void close() throws Exception {
@@ -404,4 +411,31 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
initInput();
}
+ /**
+ * Looks up the counter for the given enum constant.
+ *
+ * @return the counter, or null when this peer was created without counters
+ */
+ @Override
+ public Counter getCounter(Enum<?> name) {
+ if (counters == null) {
+ return null;
+ }
+ return counters.findCounter(name);
+ }
+
+ /**
+ * Looks up the counter with the given group and name.
+ *
+ * @return the counter, or null when this peer was created without counters
+ */
+ @Override
+ public Counter getCounter(String group, String name) {
+ return counters == null ? null : counters.findCounter(group, name);
+ }
+
+ /**
+ * Adds the amount to the counter for the given enum constant; does nothing
+ * when this peer was created without counters.
+ */
+ @Override
+ public void incrCounter(Enum<?> key, long amount) {
+ if (counters == null) {
+ return;
+ }
+ counters.incrCounter(key, amount);
+ }
+
+ /**
+ * Adds the amount to the counter with the given group and name; does
+ * nothing when this peer was created without counters.
+ */
+ @Override
+ public void incrCounter(String group, String counter, long amount) {
+ if (counters == null) {
+ return;
+ }
+ counters.incrCounter(group, counter, amount);
+ }
}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Counters.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Counters.java?rev=1208208&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Counters.java
(added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Counters.java
Wed Nov 30 01:46:43 2011
@@ -0,0 +1,772 @@
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.MissingResourceException;
+import java.util.ResourceBundle;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.StringUtils;
+
+public class Counters implements Writable, Iterable<Counters.Group> {
+ private static final Log LOG = LogFactory.getLog(Counters.class);
+
+ // Delimiters of the escaped compact string format: a group is wrapped in
+ // {}, a counter in [], and every name/value unit in ().
+ private static final char GROUP_OPEN = '{';
+ private static final char GROUP_CLOSE = '}';
+ private static final char COUNTER_OPEN = '[';
+ private static final char COUNTER_CLOSE = ']';
+ private static final char UNIT_OPEN = '(';
+ private static final char UNIT_CLOSE = ')';
+
+ // Every delimiter above must be escaped when it occurs inside a name.
+ // The reference is final so the shared array cannot be swapped out.
+ private static final char[] charsToEscape = { GROUP_OPEN, GROUP_CLOSE,
+ COUNTER_OPEN, COUNTER_CLOSE, UNIT_OPEN, UNIT_CLOSE };
+
+ /**
+ * A counter record, comprising its name and value.
+ */
+ public static class Counter implements Writable {
+ private String name;
+ private String displayName;
+ private long value = 0;
+
+ /**
+ * Creates an empty counter whose fields are filled in afterwards, e.g. by
+ * {@link #readFields(DataInput)}.
+ */
+ public Counter() {
+ }
+
+ /**
+ * Creates a counter with the given names and initial value.
+ *
+ * @param name the internal counter name
+ * @param displayName the user facing name
+ * @param value the initial value
+ */
+ public Counter(String name, String displayName, long value) {
+ this(name, displayName);
+ increment(value);
+ }
+
+ /**
+ * Creates a counter with the given names and a value of 0.
+ *
+ * @param name the internal counter name
+ * @param displayName the user facing name
+ */
+ public Counter(String name, String displayName) {
+ this.name = name;
+ this.displayName = displayName;
+ }
+
+ /**
+ * Sets the user facing name of the counter.
+ */
+ public synchronized void setDisplayName(String displayName) {
+ this.displayName = displayName;
+ }
+
+ /**
+ * Read the binary representation of the counter: the name, a flag telling
+ * whether a distinct display name follows, optionally that display name,
+ * and the value as a variable-length long.
+ */
+ @Override
+ public synchronized void readFields(DataInput in) throws IOException {
+ name = Text.readString(in);
+ if (in.readBoolean()) {
+ displayName = Text.readString(in);
+ } else {
+ displayName = name;
+ }
+ value = WritableUtils.readVLong(in);
+ }
+
+ /**
+ * Write the binary representation of the counter. The display name is only
+ * written when it differs from the name.
+ */
+ @Override
+ public synchronized void write(DataOutput out) throws IOException {
+ Text.writeString(out, name);
+ // A missing display name is serialized like one equal to the name, so
+ // the reader falls back to the name instead of the writer failing.
+ boolean distinctDisplayName = displayName != null
+ && !name.equals(displayName);
+ out.writeBoolean(distinctDisplayName);
+ if (distinctDisplayName) {
+ Text.writeString(out, displayName);
+ }
+ WritableUtils.writeVLong(out, value);
+ }
+
+ /**
+ * Get the internal name of the counter.
+ *
+ * @return the internal name of the counter
+ */
+ public synchronized String getName() {
+ return name;
+ }
+
+ /**
+ * Get the display name of the counter.
+ *
+ * @return the user facing name of the counter
+ */
+ public synchronized String getDisplayName() {
+ return displayName;
+ }
+
+ /**
+ * What is the current value of this counter?
+ *
+ * @return the current value
+ */
+ public synchronized long getValue() {
+ return value;
+ }
+
+ /**
+ * Increment this counter by the given value
+ *
+ * @param incr the value to increase this counter by
+ */
+ public synchronized void increment(long incr) {
+ value += incr;
+ }
+
+ /**
+ * Two counters are equal when name, display name and value all match.
+ */
+ @Override
+ public boolean equals(Object genericRight) {
+ if (this == genericRight) {
+ return true;
+ }
+ if (!(genericRight instanceof Counter)) {
+ return false;
+ }
+ Counter right = (Counter) genericRight;
+ // Snapshot the other counter under its own lock first and compare under
+ // ours afterwards. The two monitors are never held together, so
+ // a.equals(b) racing with b.equals(a) cannot deadlock.
+ String rightName;
+ String rightDisplayName;
+ long rightValue;
+ synchronized (right) {
+ rightName = right.name;
+ rightDisplayName = right.displayName;
+ rightValue = right.value;
+ }
+ synchronized (this) {
+ return value == rightValue && sameText(name, rightName)
+ && sameText(displayName, rightDisplayName);
+ }
+ }
+
+ // Null-safe string equality; names are null on a counter that was built
+ // with the no-argument constructor and not yet read.
+ private static boolean sameText(String left, String right) {
+ return left == null ? right == null : left.equals(right);
+ }
+
+ /**
+ * Hash of name and display name. The value is left out on purpose so the
+ * hash stays stable while the counter is incremented.
+ */
+ @Override
+ public synchronized int hashCode() {
+ int nameHash = name == null ? 0 : name.hashCode();
+ int displayNameHash = displayName == null ? 0 : displayName.hashCode();
+ return nameHash + displayNameHash;
+ }
+
+ /**
+ * Returns the compact stringified version of the counter in the format
+ * [(actual-name)(display-name)(value)]
+ */
+ public synchronized String makeEscapedCompactString() {
+ StringBuilder buf = new StringBuilder();
+ buf.append(COUNTER_OPEN);
+
+ // Add the counter name
+ buf.append(UNIT_OPEN);
+ buf.append(escape(getName()));
+ buf.append(UNIT_CLOSE);
+
+ // Add the display name
+ buf.append(UNIT_OPEN);
+ buf.append(escape(getDisplayName()));
+ buf.append(UNIT_CLOSE);
+
+ // Add the value
+ buf.append(UNIT_OPEN);
+ buf.append(this.getValue());
+ buf.append(UNIT_CLOSE);
+
+ buf.append(COUNTER_CLOSE);
+
+ return buf.toString();
+ }
+
+ // Checks for (content) equality of two (basic) counters. Not synchronized:
+ // equals() does its own locking and must not be entered with this
+ // counter's monitor already held while it locks the other counter.
+ boolean contentEquals(Counter c) {
+ return this.equals(c);
+ }
+
+ /**
+ * What is the current value of this counter? Same as {@link #getValue()}.
+ *
+ * @return the current value
+ */
+ public synchronized long getCounter() {
+ return getValue();
+ }
+
+ }
+
+ /**
+ * <code>Group</code> of counters, comprising of counters from a particular
+ * counter {@link Enum} class.
+ *
+ * <p>
+ * <code>Group</code> handles localization of the class name and the counter
+ * names.
+ * </p>
+ */
+ public static class Group implements Writable, Iterable<Counter> {
+ private String groupName;
+ private String displayName;
+ private Map<String, Counter> subcounters = new HashMap<String, Counter>();
+
+ // Optional ResourceBundle for localization of group and counter names.
+ private ResourceBundle bundle = null;
+
+ Group(String groupName) {
+ try {
+ bundle = getResourceBundle(groupName);
+ } catch (MissingResourceException neverMind) {
+ // Localization is optional: without a bundle the raw names are used.
+ }
+ this.groupName = groupName;
+ this.displayName = localize("CounterGroupName", groupName);
+ LOG.debug("Creating group " + groupName + " with "
+ + (bundle == null ? "nothing" : "bundle"));
+ }
+
+ /**
+ * Returns the specified resource bundle, or throws an exception. The
+ * bundle name is the given class name with every '$' replaced by '_'.
+ *
+ * @throws MissingResourceException if the bundle isn't found
+ */
+ private static ResourceBundle getResourceBundle(String enumClassName) {
+ String bundleName = enumClassName.replace('$', '_');
+ return ResourceBundle.getBundle(bundleName);
+ }
+
+ /**
+ * Returns raw name of the group. This is the name of the enum class for
+ * this group of counters.
+ */
+ public String getName() {
+ return groupName;
+ }
+
+ /**
+ * Returns localized name of the group. This is the same as getName() by
+ * default, but different if an appropriate ResourceBundle is found.
+ */
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ /**
+ * Set the display name
+ */
+ public void setDisplayName(String displayName) {
+ this.displayName = displayName;
+ }
+
+ /**
+ * Returns the compact stringified version of the group in the format
+ * {(actual-name)(display-name)[][][]} where [] are compact strings for the
+ * counters within.
+ */
+ public String makeEscapedCompactString() {
+ StringBuilder buf = new StringBuilder();
+ buf.append(GROUP_OPEN); // group start
+
+ // Add the group name
+ buf.append(UNIT_OPEN);
+ buf.append(escape(getName()));
+ buf.append(UNIT_CLOSE);
+
+ // Add the display name
+ buf.append(UNIT_OPEN);
+ buf.append(escape(getDisplayName()));
+ buf.append(UNIT_CLOSE);
+
+ // write the counters
+ for (Counter counter : subcounters.values()) {
+ buf.append(counter.makeEscapedCompactString());
+ }
+
+ buf.append(GROUP_CLOSE); // group end
+ return buf.toString();
+ }
+
+ @Override
+ public int hashCode() {
+ return subcounters.hashCode();
+ }
+
+ /**
+ * Checks for (content) equality of Groups: both groups must hold the same
+ * counter names and the counters under each name must be equal. The group
+ * names themselves are not compared. Neither group is modified.
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof Group)) {
+ return false;
+ }
+ Group g = (Group) obj;
+ // Copy the other group's counters under its own lock. Looking them up
+ // through getCounterForName() would create every missing counter in the
+ // other group, changing it and making groups with different counter
+ // names compare as equal.
+ Map<String, Counter> theirs;
+ synchronized (g) {
+ theirs = new HashMap<String, Counter>(g.subcounters);
+ }
+ synchronized (this) {
+ if (subcounters.size() != theirs.size()) {
+ return false;
+ }
+ for (Map.Entry<String, Counter> entry : subcounters.entrySet()) {
+ Counter c1 = entry.getValue();
+ Counter c2 = theirs.get(entry.getKey());
+ if (c2 == null || !c1.contentEquals(c2)) {
+ return false;
+ }
+ }
+ return true;
+ }
+ }
+
+ /**
+ * Returns the value of the counter whose display name equals the given
+ * name, or 0 if there is no such counter.
+ */
+ public synchronized long getCounter(String counterName) {
+ for (Counter counter : subcounters.values()) {
+ if (counter != null && counter.getDisplayName().equals(counterName)) {
+ return counter.getValue();
+ }
+ }
+ return 0L;
+ }
+
+ /**
+ * Get the counter for the given name and create it if it doesn't exist.
+ * The id is ignored.
+ *
+ * @param id the numeric id of the counter within the group (unused)
+ * @param name the internal counter name
+ * @return the counter
+ * @deprecated use {@link #getCounterForName(String)} instead
+ */
+ @Deprecated
+ public synchronized Counter getCounter(int id, String name) {
+ return getCounterForName(name);
+ }
+
+ /**
+ * Get the counter for the given name and create it if it doesn't exist.
+ * A newly created counter starts at 0 and takes its display name from the
+ * resource bundle key "&lt;name&gt;.name", falling back to the name.
+ *
+ * @param name the internal counter name
+ * @return the counter
+ */
+ public synchronized Counter getCounterForName(String name) {
+ Counter result = subcounters.get(name);
+ if (result == null) {
+ LOG.debug("Adding " + name);
+ result = new Counter(name, localize(name + ".name", name), 0L);
+ subcounters.put(name, result);
+ }
+ return result;
+ }
+
+ /**
+ * Returns the number of counters in this group.
+ */
+ public synchronized int size() {
+ return subcounters.size();
+ }
+
+ /**
+ * Looks up key in the ResourceBundle and returns the corresponding value.
+ * If the bundle or the key doesn't exist, returns the default value.
+ */
+ private String localize(String key, String defaultValue) {
+ String result = defaultValue;
+ if (bundle != null) {
+ try {
+ result = bundle.getString(key);
+ } catch (MissingResourceException mre) {
+ // No translation for this key: keep the default value.
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Writes the display name, the number of counters and then every counter.
+ * The group name is not part of this record.
+ */
+ @Override
+ public synchronized void write(DataOutput out) throws IOException {
+ Text.writeString(out, displayName);
+ WritableUtils.writeVInt(out, subcounters.size());
+ for (Counter counter : subcounters.values()) {
+ counter.write(out);
+ }
+ }
+
+ /**
+ * Replaces the display name and all counters of this group with the ones
+ * read from the input.
+ */
+ @Override
+ public synchronized void readFields(DataInput in) throws IOException {
+ displayName = Text.readString(in);
+ subcounters.clear();
+ int size = WritableUtils.readVInt(in);
+ for (int i = 0; i < size; i++) {
+ Counter counter = new Counter();
+ counter.readFields(in);
+ subcounters.put(counter.getName(), counter);
+ }
+ }
+
+ /**
+ * Iterates over a copy of the counter list taken at call time.
+ */
+ @Override
+ public synchronized Iterator<Counter> iterator() {
+ return new ArrayList<Counter>(subcounters.values()).iterator();
+ }
+ }
+
+ // Map from group name (the enum class name for enum-keyed counters) to the
+ // group that holds the counters of that name.
+ private Map<String, Group> counters = new HashMap<String, Group>();
+
+ /**
+ * A cache from enum values to the associated counter, so that repeated
+ * lookups of the same enum skip the group and counter name lookups.
+ */
+ private Map<Enum<?>, Counter> cache = new IdentityHashMap<Enum<?>, Counter>();
+
+ /**
+ * Returns the names of all counter groups.
+ *
+ * @return a copy of the group names taken at call time. A copy is returned
+ * because the live key set of the internal map would be read by the
+ * caller outside this object's lock.
+ */
+ public synchronized Collection<String> getGroupNames() {
+ return new ArrayList<String>(counters.keySet());
+ }
+
+ /**
+ * Iterates over a copy of the group list taken at call time, like
+ * {@link Group#iterator()} does for counters. A live iterator over the
+ * internal map would be advanced outside this object's lock and fail as
+ * soon as another thread adds a group.
+ */
+ @Override
+ public synchronized Iterator<Group> iterator() {
+ return new ArrayList<Group>(counters.values()).iterator();
+ }
+
+ /**
+ * Returns the counter group with the given name. When no such group exists
+ * yet, an empty one is created, registered under that name and returned.
+ */
+ public synchronized Group getGroup(String groupName) {
+ Group existing = counters.get(groupName);
+ if (existing != null) {
+ return existing;
+ }
+ Group created = new Group(groupName);
+ counters.put(groupName, created);
+ return created;
+ }
+
+ /**
+ * Find the counter for the given enum, creating it if necessary. The same
+ * enum will always return the same counter. The group is named after the
+ * enum's declaring class and the counter after the enum constant.
+ *
+ * @param key the counter key
+ * @return the matching counter object
+ */
+ public synchronized Counter findCounter(Enum<?> key) {
+ Counter counter = cache.get(key);
+ if (counter == null) {
+ Group group = getGroup(key.getDeclaringClass().getName());
+ counter = group.getCounterForName(key.toString());
+ cache.put(key, counter);
+ }
+ return counter;
+ }
+
+ /**
+ * Find a counter given the group and the name.
+ *
+ * @param group the name of the group
+ * @param name the internal name of the counter
+ * @return the counter for that name
+ */
+ public synchronized Counter findCounter(String group, String name) {
+ return getGroup(group).getCounterForName(name);
+ }
+
+ /**
+ * Find a counter by using strings
+ *
+ * @param group the name of the group
+ * @param id the id of the counter within the group (0 to N-1)
+ * @param name the internal name of the counter
+ * @return the counter for that name
+ * @deprecated
+ */
+ @Deprecated
+ public synchronized Counter findCounter(String group, int id, String name) {
+ return getGroup(group).getCounterForName(name);
+ }
+
+ /**
+ * Increments the specified counter by the specified amount, creating it if
+ * it didn't already exist.
+ *
+ * @param key identifies a counter
+ * @param amount amount by which counter is to be incremented
+ */
+ public synchronized void incrCounter(Enum<?> key, long amount) {
+ findCounter(key).increment(amount);
+ }
+
+ /**
+ * Increments the specified counter by the specified amount, creating it if
it
+ * didn't already exist.
+ *
+ * @param group the name of the group
+ * @param counter the internal name of the counter
+ * @param amount amount by which counter is to be incremented
+ */
+ public synchronized void incrCounter(String group, String counter, long
amount) {
+ getGroup(group).getCounterForName(counter).increment(amount);
+ }
+
+ /**
+ * Returns current value of the specified counter. A counter that does not
+ * exist yet is created by this call, so its reported value is 0.
+ *
+ * @param key identifies a counter
+ * @return the current value of the counter
+ */
+ public synchronized long getCounter(Enum<?> key) {
+ return findCounter(key).getValue();
+ }
+
+ /**
+ * Adds every counter of another Counters instance to this one. Groups and
+ * counters missing here are created, and the display names of the other
+ * instance overwrite the ones held here.
+ *
+ * @param other the other Counters instance
+ */
+ public synchronized void incrAllCounters(Counters other) {
+ Iterator<Group> otherGroups = other.iterator();
+ while (otherGroups.hasNext()) {
+ Group source = otherGroups.next();
+ Group target = getGroup(source.getName());
+ target.displayName = source.displayName;
+ Iterator<Counter> sourceCounters = source.iterator();
+ while (sourceCounters.hasNext()) {
+ Counter from = sourceCounters.next();
+ Counter into = target.getCounterForName(from.getName());
+ into.setDisplayName(from.getDisplayName());
+ into.increment(from.getValue());
+ }
+ }
+ }
+
+ /**
+ * Builds a new Counters instance holding the sum of the two given ones.
+ * Neither argument is changed.
+ */
+ public static Counters sum(Counters a, Counters b) {
+ Counters total = new Counters();
+ for (Counters part : new Counters[] { a, b }) {
+ total.incrAllCounters(part);
+ }
+ return total;
+ }
+
+ /**
+ * Counts the counters of all groups together.
+ *
+ * @return the total number of counters
+ */
+ public synchronized int size() {
+ int total = 0;
+ Iterator<Group> groups = iterator();
+ while (groups.hasNext()) {
+ total += groups.next().size();
+ }
+ return total;
+ }
+
+ /**
+ * Write the set of groups. The external format is: #groups (groupName
group)*
+ *
+ * i.e. the number of groups followed by 0 or more groups, where each group
is
+ * of the form:
+ *
+ * groupDisplayName #counters (false | true counter)*
+ *
+ * where each counter is of the form:
+ *
+ * name (false | true displayName) value
+ */
+ public synchronized void write(DataOutput out) throws IOException {
+ out.writeInt(counters.size());
+ for (Group group : counters.values()) {
+ Text.writeString(out, group.getName());
+ group.write(out);
+ }
+ }
+
+ /**
+ * Read a set of groups, replacing everything held so far. The input is
+ * expected in the format produced by {@link #write(DataOutput)}.
+ */
+ public synchronized void readFields(DataInput in) throws IOException {
+ int numClasses = in.readInt();
+ counters.clear();
+ // The cache still points at counters of the groups just dropped. Without
+ // clearing it, later enum lookups would return and increment those
+ // orphaned counters instead of the ones read below.
+ cache.clear();
+ while (numClasses-- > 0) {
+ String groupName = Text.readString(in);
+ Group group = new Group(groupName);
+ group.readFields(in);
+ counters.put(groupName, group);
+ }
+ }
+
+ /**
+ * Logs the current counter values.
+ *
+ * @param log The log to use.
+ */
+ public void log(Log log) {
+ log.info("Counters: " + size());
+ for (Group group : this) {
+ log.info(" " + group.getDisplayName());
+ for (Counter counter : group) {
+ log
+ .info(" " + counter.getDisplayName() + "="
+ + counter.getCounter());
+ }
+ }
+ }
+
+ /**
+ * Multi-line text form: a "Counters: n" header, then one line per group
+ * display name and one "displayName=value" line per counter below it.
+ */
+ public synchronized String toString() {
+ StringBuilder text = new StringBuilder("Counters: ").append(size());
+ for (Group group : this) {
+ text.append("\n\t").append(group.getDisplayName());
+ for (Counter counter : group) {
+ text.append("\n\t\t").append(counter.getDisplayName()).append("=")
+ .append(counter.getCounter());
+ }
+ }
+ return text.toString();
+ }
+
+ /**
+ * Convert a counters object into a single line that is easy to parse.
+ *
+ * @return the string with "groupDisplayName.counterDisplayName:value" for
+ * each counter, separated by ","
+ */
+ public synchronized String makeCompactString() {
+ StringBuilder line = new StringBuilder();
+ for (Group group : this) {
+ for (Counter counter : group) {
+ // Every entry is non-empty, so a non-empty buffer means that at least
+ // one entry precedes this one and a separator is due.
+ if (line.length() > 0) {
+ line.append(',');
+ }
+ line.append(group.getDisplayName()).append('.').append(
+ counter.getDisplayName()).append(':').append(counter.getCounter());
+ }
+ }
+ return line.toString();
+ }
+
+ /**
+ * Represent the counters in a textual format that can be converted back to
+ * object form with {@link #fromEscapedCompactString(String)}.
+ *
+ * @return the concatenated escaped compact strings of all groups, i.e.
+ * {(groupname)(group-displayname)[(countername)(displayname)(value)][][]}{}{}
+ */
+ public synchronized String makeEscapedCompactString() {
+ StringBuilder text = new StringBuilder();
+ Iterator<Group> groups = iterator();
+ while (groups.hasNext()) {
+ text.append(groups.next().makeEscapedCompactString());
+ }
+ return text.toString();
+ }
+
+ // Extracts the next block (data enclosed within the open and close
+ // delimiters) of str, searching from index and skipping escaped delimiters.
+ // On success index is moved just past the closing delimiter and the block
+ // content is returned. Returns null when no opening delimiter is found.
+ // Throws ParseException, carrying the offset of the opening delimiter, when
+ // the block is opened but never closed.
+ private static String getBlock(String str, char open, char close,
+ IntWritable index) throws ParseException {
+ StringBuilder split = new StringBuilder();
+ int openPos = StringUtils.findNext(str, open, StringUtils.ESCAPE_CHAR, index
+ .get(), split);
+ if (openPos < 0) {
+ return null; // found nothing
+ }
+ split.setLength(0); // drop whatever preceded the opening delimiter
+
+ // Start behind the opening delimiter and collect up to the closing one.
+ int closePos = StringUtils.findNext(str, close, StringUtils.ESCAPE_CHAR,
+ openPos + 1, split);
+ if (closePos < 0) {
+ // Report where the unterminated block starts; the failed search result
+ // itself is always -1 and says nothing about the position.
+ throw new ParseException("Unexpected end of block", openPos);
+ }
+ index.set(closePos + 1); // move over the closing delimiter
+ return split.toString(); // found a block
+ }
+
+ /**
+ * Rebuilds a Counters object from the text produced by
+ * {@link #makeEscapedCompactString()}.
+ *
+ * @param compactString the escaped compact string to parse
+ * @return the Counters object holding the parsed groups and counters
+ * @throws ParseException if a block is opened but not closed
+ */
+ public static Counters fromEscapedCompactString(String compactString)
+ throws ParseException {
+ Counters parsed = new Counters();
+ IntWritable cursor = new IntWritable(0);
+
+ // One pass per {...} group block of the input.
+ for (String groupText = getBlock(compactString, GROUP_OPEN, GROUP_CLOSE,
+ cursor); groupText != null; groupText = getBlock(compactString,
+ GROUP_OPEN, GROUP_CLOSE, cursor)) {
+ IntWritable groupCursor = new IntWritable(0);
+
+ // The first two units are the group name and the group display name.
+ String groupName = unescape(getBlock(groupText, UNIT_OPEN, UNIT_CLOSE,
+ groupCursor));
+ String groupDisplayName = unescape(getBlock(groupText, UNIT_OPEN,
+ UNIT_CLOSE, groupCursor));
+
+ Group group = parsed.getGroup(groupName);
+ group.setDisplayName(groupDisplayName);
+
+ // One pass per [...] counter block of the group.
+ for (String counterText = getBlock(groupText, COUNTER_OPEN,
+ COUNTER_CLOSE, groupCursor); counterText != null; counterText = getBlock(
+ groupText, COUNTER_OPEN, COUNTER_CLOSE, groupCursor)) {
+ IntWritable counterCursor = new IntWritable(0);
+
+ // The three units are the name, the display name and the value.
+ String counterName = unescape(getBlock(counterText, UNIT_OPEN,
+ UNIT_CLOSE, counterCursor));
+ String counterDisplayName = unescape(getBlock(counterText, UNIT_OPEN,
+ UNIT_CLOSE, counterCursor));
+ long value = Long.parseLong(getBlock(counterText, UNIT_OPEN,
+ UNIT_CLOSE, counterCursor));
+
+ Counter counter = group.getCounterForName(counterName);
+ counter.setDisplayName(counterDisplayName);
+ counter.increment(value);
+ }
+ }
+ return parsed;
+ }
+
+ // Escapes all the delimiters for counters i.e {,[,(,),],}
+ private static String escape(String string) {
+ return StringUtils.escapeString(string, StringUtils.ESCAPE_CHAR,
+ charsToEscape);
+ }
+
+ // Unescapes all the delimiters for counters i.e {,[,(,),],}
+ private static String unescape(String string) {
+ return StringUtils.unEscapeString(string, StringUtils.ESCAPE_CHAR,
+ charsToEscape);
+ }
+
+ /**
+ * Returns the hash code of the map from group names to groups.
+ */
+ @Override
+ public synchronized int hashCode() {
+ return counters.hashCode();
+ }
+
+ /**
+ * Checks for (content) equality of two Counters objects: both must hold the
+ * same total number of counters and every group of this object must equal
+ * the group of the same name in the other one. A group missing in the other
+ * object is treated like an empty group. Neither object is modified.
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof Counters)) {
+ return false;
+ }
+ Counters other = (Counters) obj;
+ // Take what is needed from the other object under its own lock first.
+ // Asking it through getGroup() would register a new empty group there for
+ // every name it lacks, and would take its lock while ours is held.
+ Map<String, Group> otherGroups;
+ int otherSize;
+ synchronized (other) {
+ otherGroups = new HashMap<String, Group>(other.counters);
+ otherSize = other.size();
+ }
+ synchronized (this) {
+ if (size() != otherSize) {
+ return false;
+ }
+ for (Map.Entry<String, Group> entry : this.counters.entrySet()) {
+ Group sourceGroup = entry.getValue();
+ Group targetGroup = otherGroups.get(entry.getKey());
+ if (targetGroup == null) {
+ // Missing on the other side: only an empty group matches that.
+ if (sourceGroup.size() != 0) {
+ return false;
+ }
+ } else if (!sourceGroup.equals(targetGroup)) {
+ return false;
+ }
+ }
+ return true;
+ }
+ }
+}
Modified:
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1208208&r1=1208207&r2=1208208&view=diff
==============================================================================
---
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
(original)
+++
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
Wed Nov 30 01:46:43 2011
@@ -740,7 +740,7 @@ public class GroomServer implements Runn
this.localJobConf = null;
this.taskStatus = new TaskStatus(task.getJobID(), task.getTaskID(), 0,
TaskStatus.State.UNASSIGNED, "init", groomServer,
- TaskStatus.Phase.STARTING);
+ TaskStatus.Phase.STARTING, task.getCounters());
}
private void localizeTask(Task task) throws IOException {
@@ -920,7 +920,7 @@ public class GroomServer implements Runn
// instantiate and init our peer
@SuppressWarnings("rawtypes")
BSPPeerImpl<?, ?, ?, ?> bspPeer = new BSPPeerImpl(job, defaultConf,
- taskid, umbilical, task.partition, task.splitClass, task.split);
+ taskid, umbilical, task.partition, task.splitClass, task.split,
task.getCounters());
task.run(job, bspPeer, umbilical); // run the task
Modified:
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java?rev=1208208&r1=1208207&r2=1208208&view=diff
==============================================================================
---
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
(original)
+++
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
Wed Nov 30 01:46:43 2011
@@ -83,8 +83,8 @@ class JobInProgress {
this.jobFile = jobFile;
this.master = master;
- this.status = new JobStatus(jobId, null, 0L, 0L,
- JobStatus.State.PREP.value());
+ this.status = new JobStatus(jobId, null, 0L, 0L, JobStatus.State.PREP
+ .value());
this.startTime = System.currentTimeMillis();
this.superstepCounter = 0;
this.restartCount = 0;
@@ -99,12 +99,11 @@ class JobInProgress {
fs.copyToLocalFile(jobFile, localJobFile);
BSPJob job = new BSPJob(jobId, localJobFile.toString());
this.jobSplit = job.getConf().get("bsp.job.split.file");
-
- this.numBSPTasks = job.getNumBspTask();
+ this.numBSPTasks = job.getNumBspTask();
- this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(),
- job.getJobName());
+ this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(), job
+ .getJobName());
this.setJobName(job.getJobName());
@@ -269,9 +268,9 @@ class JobInProgress {
}
if (allDone) {
- this.status = new JobStatus(this.status.getJobID(),
- this.profile.getUser(), superstepCounter, superstepCounter,
- superstepCounter, JobStatus.SUCCEEDED, superstepCounter);
+ this.status = new JobStatus(this.status.getJobID(), this.profile
+ .getUser(), superstepCounter, superstepCounter, superstepCounter,
+ JobStatus.SUCCEEDED, superstepCounter);
this.finishTime = System.currentTimeMillis();
this.status.setFinishTime(this.finishTime);
@@ -303,9 +302,8 @@ class JobInProgress {
}
if (allDone) {
- this.status = new JobStatus(this.status.getJobID(),
- this.profile.getUser(), superstepCounter, superstepCounter,
- superstepCounter, JobStatus.FAILED, superstepCounter);
+ this.status = new JobStatus(this.status.getJobID(), this.profile
+ .getUser(), 0L, 0L, 0L, JobStatus.FAILED, superstepCounter);
this.finishTime = System.currentTimeMillis();
this.status.setFinishTime(this.finishTime);
Modified:
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1208208&r1=1208207&r2=1208208&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Wed Nov 30 01:46:43 2011
@@ -204,7 +204,7 @@ public class LocalBSPRunner implements J
}
// this class will spawn a new thread and executes the BSP
- @SuppressWarnings({ "deprecation", "rawtypes" })
+ @SuppressWarnings( { "deprecation", "rawtypes" })
class BSPRunner implements Callable<BSP> {
private Configuration conf;
@@ -212,6 +212,7 @@ public class LocalBSPRunner implements J
private int id;
private BSP bsp;
private RawSplit[] splits;
+ private Counters counters = new Counters();
public BSPRunner(Configuration conf, BSPJob job, int id, RawSplit[] splits) {
super();
@@ -224,8 +225,8 @@ public class LocalBSPRunner implements J
conf.setInt(Constants.PEER_PORT, id);
conf.set(Constants.PEER_HOST, "local");
- bsp = (BSP) ReflectionUtils.newInstance(
- job.getConf().getClass("bsp.work.class", BSP.class), job.getConf());
+ bsp = (BSP) ReflectionUtils.newInstance(job.getConf().getClass(
+ "bsp.work.class", BSP.class), job.getConf());
}
@@ -242,7 +243,7 @@ public class LocalBSPRunner implements J
BSPPeerImpl peer = new BSPPeerImpl(job, conf, new TaskAttemptID(
new TaskID(job.getJobID(), id), id), new LocalUmbilical(), id,
- splitname, realBytes);
+ splitname, realBytes, counters);
bsp.setConf(conf);
try {
@@ -333,7 +334,8 @@ public class LocalBSPRunner implements J
inetSocketAddress = BSPNetUtils.getAddress(peerName);
socketCache.put(peerName, inetSocketAddress);
}
- LinkedList<BSPMessage> msgs = localOutgoingMessages.get(inetSocketAddress);
+ LinkedList<BSPMessage> msgs = localOutgoingMessages
+ .get(inetSocketAddress);
if (msgs == null) {
msgs = new LinkedList<BSPMessage>();
}
@@ -478,7 +480,5 @@ public class LocalBSPRunner implements J
public void close() throws Exception {
}
-
}
-
}
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java?rev=1208208&r1=1208207&r2=1208208&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java Wed Nov 30 01:46:43 2011
@@ -45,6 +45,9 @@ public abstract class Task implements Wr
protected LocalDirAllocator lDirAlloc;
+ // Current counters
+ private transient Counters counters = new Counters();
+
public Task() {
jobId = new BSPJobID();
taskId = new TaskAttemptID();
@@ -149,5 +152,7 @@ public abstract class Task implements Wr
public abstract BSPJob getConf();
public abstract void setConf(BSPJob localJobConf);
+
+ Counters getCounters() { return counters; }
}
Modified:
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskStatus.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskStatus.java?rev=1208208&r1=1208207&r2=1208208&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskStatus.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskStatus.java Wed Nov 30 01:46:43 2011
@@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
+import org.apache.hama.bsp.BSPPeerImpl.PeerCounter;
/**
* Describes the current status of a task. This is not intended to be a
@@ -50,24 +51,24 @@ public class TaskStatus implements Writa
private volatile State runState;
private String stateString;
private String groomServer;
- private long superstepCount;
private long startTime;
private long finishTime;
private volatile Phase phase = Phase.STARTING;
+ private Counters counters;
+
/**
*
*/
public TaskStatus() {
jobId = new BSPJobID();
taskId = new TaskAttemptID();
- this.superstepCount = 0;
}
public TaskStatus(BSPJobID jobId, TaskAttemptID taskId, float progress,
- State runState, String stateString, String groomServer, Phase phase) {
+ State runState, String stateString, String groomServer, Phase phase, Counters counters) {
this.jobId = jobId;
this.taskId = taskId;
this.progress = progress;
@@ -75,7 +76,7 @@ public class TaskStatus implements Writa
this.stateString = stateString;
this.groomServer = groomServer;
this.phase = phase;
- this.superstepCount = 0;
+ this.counters = counters;
}
// //////////////////////////////////////////////////
@@ -94,10 +95,6 @@ public class TaskStatus implements Writa
return progress;
}
- public void setSuperstepCount(long superstepCount) {
- this.superstepCount = superstepCount;
- }
-
public void setProgress(float progress) {
this.progress = progress;
}
@@ -171,6 +168,20 @@ public class TaskStatus implements Writa
}
/**
+ * Get task's counters.
+ */
+ public Counters getCounters() {
+ return counters;
+ }
+ /**
+ * Set the task's counters.
+ * @param counters
+ */
+ public void setCounters(Counters counters) {
+ this.counters = counters;
+ }
+
+ /**
* Update the status of the task.
*
* This update is done by ping thread before sending the status.
@@ -179,9 +190,10 @@ public class TaskStatus implements Writa
* @param state
* @param counters
*/
- synchronized void statusUpdate(float progress, String state) {
+ synchronized void statusUpdate(float progress, String state, Counters counters) {
setProgress(progress);
setStateString(state);
+ setCounters(counters);
}
/**
@@ -190,7 +202,8 @@ public class TaskStatus implements Writa
* @param status updated status
*/
synchronized void statusUpdate(TaskStatus status) {
- this.superstepCount = status.getSuperstepCount();
+ this.counters = status.getCounters();
+
this.progress = status.getProgress();
this.runState = status.getRunState();
this.stateString = status.getStateString();
@@ -211,16 +224,14 @@ public class TaskStatus implements Writa
* This update is done in BSPMaster when a cleanup attempt of task reports its
* status. Then update only specific fields, not all.
*
- * @param superstepCount
* @param runState
* @param progress
* @param state
* @param phase
* @param finishTime
*/
- synchronized void statusUpdate(long superstepCount, State runState, float progress, String state,
+ synchronized void statusUpdate(State runState, float progress, String state,
Phase phase, long finishTime) {
- setSuperstepCount(superstepCount);
setRunState(runState);
setProgress(progress);
setStateString(state);
@@ -234,14 +245,7 @@ public class TaskStatus implements Writa
* @return The number of BSP super steps executed by the task.
*/
public long getSuperstepCount() {
- return superstepCount;
- }
-
- /**
- * Increments the number of BSP super steps executed by the task.
- */
- public void incrementSuperstepCount() {
- superstepCount += 1;
+ return counters.getCounter(PeerCounter.SUPERSTEPS);
}
@Override
@@ -268,7 +272,9 @@ public class TaskStatus implements Writa
this.phase = WritableUtils.readEnum(in, Phase.class);
this.startTime = in.readLong();
this.finishTime = in.readLong();
- this.superstepCount = in.readLong();
+
+ counters = new Counters();
+ this.counters.readFields(in);
}
@Override
@@ -281,6 +287,7 @@ public class TaskStatus implements Writa
WritableUtils.writeEnum(out, phase);
out.writeLong(startTime);
out.writeLong(finishTime);
- out.writeLong(superstepCount);
+
+ counters.write(out);
}
}