Author: edwardyoon
Date: Wed Nov 30 01:46:43 2011
New Revision: 1208208

URL: http://svn.apache.org/viewvc?rev=1208208&view=rev
Log:
Add Counters to Hama Jobs

Added:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Counters.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
    
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
    
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskStatus.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1208208&r1=1208207&r2=1208208&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Wed Nov 30 01:46:43 2011
@@ -4,6 +4,7 @@ Release 0.4 - Unreleased
 
   NEW FEATURES
    
+   HAMA-479: Add Counters to Hama Jobs (edwardyoon)
    HAMA-454: Add Zookeeper as synchronization service to YARN (tjungblut)
    HAMA-258: Add Input Output system (edwardyoon)
    HAMA-458: Add Message Combiner (edwardyoon)

Modified: 
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
URL: 
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java?rev=1208208&r1=1208207&r2=1208208&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java 
(original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java 
Wed Nov 30 01:46:43 2011
@@ -22,6 +22,7 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hama.Constants;
 import org.apache.hama.util.KeyValuePair;
+import org.apache.hama.bsp.Counters.Counter;
 
 /**
  * BSP communication interface.
@@ -129,4 +130,43 @@ public interface BSPPeer<KEYIN, VALUEIN,
    * @return the jobs configuration
    */
   public Configuration getConfiguration();
+  
+  /**
+   * Get the {@link Counter} of the given group with the given name.
+   * 
+   * @param name counter name
+   * @return the <code>Counter</code> of the given group/name.
+   */
+  public Counter getCounter(Enum<?> name);
+
+  /**
+   * Get the {@link Counter} of the given group with the given name.
+   * 
+   * @param group counter group
+   * @param name counter name
+   * @return the <code>Counter</code> of the given group/name.
+   */
+  public Counter getCounter(String group, String name);
+  
+  /**
+   * Increments the counter identified by the key, which can be of
+   * any {@link Enum} type, by the specified amount.
+   * 
+   * @param key key to identify the counter to be incremented. The key can be
+   *            be any <code>Enum</code>. 
+   * @param amount A non-negative amount by which the counter is to 
+   *               be incremented.
+   */
+  public void incrCounter(Enum<?> key, long amount);
+  
+  /**
+   * Increments the counter identified by the group and counter name
+   * by the specified amount.
+   * 
+   * @param group name to identify the group of the counter to be incremented.
+   * @param counter name to identify the counter within the group.
+   * @param amount A non-negative amount by which the counter is to 
+   *               be incremented.
+   */
+  public void incrCounter(String group, String counter, long amount);
 }

Modified: 
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1208208&r1=1208207&r2=1208208&view=diff
==============================================================================
--- 
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java 
(original)
+++ 
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java 
Wed Nov 30 01:46:43 2011
@@ -39,6 +39,7 @@ import org.apache.hama.bsp.sync.SyncClie
 import org.apache.hama.bsp.sync.SyncServiceFactory;
 import org.apache.hama.ipc.BSPPeerProtocol;
 import org.apache.hama.util.KeyValuePair;
+import org.apache.hama.bsp.Counters.Counter;
 
 /**
  * This class represents a BSP peer.
@@ -48,6 +49,10 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
 
   private static final Log LOG = LogFactory.getLog(BSPPeerImpl.class);
 
+  protected static enum PeerCounter {
+    SUPERSTEPS
+  }
+
   private final Configuration conf;
   private final FileSystem fs;
   private BSPJob bspJob;
@@ -72,6 +77,7 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
   private RecordWriter<KEYOUT, VALUEOUT> outWriter;
 
   private InetSocketAddress peerAddress;
+  private Counters counters;
 
   /**
    * Protected default constructor for LocalBSPRunner.
@@ -104,7 +110,7 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
    */
   public BSPPeerImpl(BSPJob job, Configuration conf, TaskAttemptID taskId,
       BSPPeerProtocol umbilical, int partition, String splitClass,
-      BytesWritable split) throws Exception {
+      BytesWritable split, Counters counters) throws Exception {
     this.conf = conf;
     this.taskId = taskId;
     this.umbilical = umbilical;
@@ -113,6 +119,7 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
     this.partition = partition;
     this.splitClass = splitClass;
     this.split = split;
+    this.counters = counters;
 
     this.fs = FileSystem.get(conf);
 
@@ -130,7 +137,7 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
     syncClient.leaveBarrier(taskId.getJobID(), taskId, -1);
     setCurrentTaskStatus(new TaskStatus(taskId.getJobID(), taskId, 0,
         TaskStatus.State.RUNNING, "running", peerAddress.getHostName(),
-        TaskStatus.Phase.STARTING));
+        TaskStatus.Phase.STARTING, counters));
 
     messenger = MessageManagerFactory.getMessageManager(conf);
     messenger.init(conf, peerAddress);
@@ -146,8 +153,8 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
 
     // just output something when the user configured it
     if (conf.get("bsp.output.dir") != null) {
-      Path outdir = new Path(conf.get("bsp.output.dir"),
-          Task.getOutputName(partition));
+      Path outdir = new Path(conf.get("bsp.output.dir"), Task
+          .getOutputName(partition));
       outWriter = bspJob.getOutputFormat().getRecordWriter(fs, bspJob,
           outdir.makeQualified(fs).toString());
       final RecordWriter<KEYOUT, VALUEOUT> finalOut = outWriter;
@@ -252,24 +259,24 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
       }
 
       leaveBarrier();
-      currentTaskStatus.incrementSuperstepCount();
+      incrCounter(PeerCounter.SUPERSTEPS, 1);
+      currentTaskStatus.setCounters(counters);
       umbilical.statusUpdate(taskId, currentTaskStatus);
 
       // Clear outgoing queues.
       messenger.clearOutgoingQueues();
 
     } catch (Exception e) {
-      LOG.fatal(
-          "Caught exception during superstep "
-              + currentTaskStatus.getSuperstepCount() + "!", e);
+      LOG.fatal("Caught exception during superstep "
+          + currentTaskStatus.getSuperstepCount() + "!", e);
     }
   }
 
   private BSPMessageBundle combineMessages(Iterable<BSPMessage> messages) {
     if (!conf.getClass("bsp.combiner.class", Combiner.class).equals(
         Combiner.class)) {
-      Combiner combiner = (Combiner) ReflectionUtils.newInstance(
-          conf.getClass("bsp.combiner.class", Combiner.class), conf);
+      Combiner combiner = (Combiner) ReflectionUtils.newInstance(conf.getClass(
+          "bsp.combiner.class", Combiner.class), conf);
 
       return combiner.combine(messages);
     } else {
@@ -282,13 +289,13 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
   }
 
   protected void enterBarrier() throws Exception {
-    syncClient.enterBarrier(taskId.getJobID(), taskId,
-        currentTaskStatus.getSuperstepCount());
+    syncClient.enterBarrier(taskId.getJobID(), taskId, currentTaskStatus
+        .getSuperstepCount());
   }
 
   protected void leaveBarrier() throws Exception {
-    syncClient.leaveBarrier(taskId.getJobID(), taskId,
-        currentTaskStatus.getSuperstepCount());
+    syncClient.leaveBarrier(taskId.getJobID(), taskId, currentTaskStatus
+        .getSuperstepCount());
   }
 
   public void close() throws Exception {
@@ -404,4 +411,31 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
     initInput();
   }
 
+  @Override
+  public Counter getCounter(Enum<?> name) {
+    return counters == null ? null : counters.findCounter(name);
+  }
+
+  @Override
+  public Counter getCounter(String group, String name) {
+    Counters.Counter counter = null;
+    if (counters != null) {
+      counter = counters.findCounter(group, name);
+    }
+    return counter;
+  }
+
+  @Override
+  public void incrCounter(Enum<?> key, long amount) {
+    if (counters != null) {
+      counters.incrCounter(key, amount);
+    }
+  }
+
+  @Override
+  public void incrCounter(String group, String counter, long amount) {
+    if (counters != null) {
+      counters.incrCounter(group, counter, amount);
+    }
+  }
 }

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Counters.java
URL: 
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Counters.java?rev=1208208&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Counters.java 
(added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Counters.java 
Wed Nov 30 01:46:43 2011
@@ -0,0 +1,772 @@
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.MissingResourceException;
+import java.util.ResourceBundle;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.StringUtils;
+
+public class Counters implements Writable, Iterable<Counters.Group> {
+  private static final Log LOG = LogFactory.getLog(Counters.class);
+  private static final char GROUP_OPEN = '{';
+  private static final char GROUP_CLOSE = '}';
+  private static final char COUNTER_OPEN = '[';
+  private static final char COUNTER_CLOSE = ']';
+  private static final char UNIT_OPEN = '(';
+  private static final char UNIT_CLOSE = ')';
+  private static char[] charsToEscape = { GROUP_OPEN, GROUP_CLOSE,
+      COUNTER_OPEN, COUNTER_CLOSE, UNIT_OPEN, UNIT_CLOSE };
+
+  // private static Log log = LogFactory.getLog("Counters.class");
+
+  /**
+   * A counter record, comprising its name and value.
+   */
+  public static class Counter implements Writable {
+    private String name;
+    private String displayName;
+    private long value = 0;
+
+    public Counter() {
+    }
+
+    public Counter(String name, String displayName, long value) {
+      this(name, displayName);
+      increment(value);
+    }
+
+    public Counter(String name, String displayName) {
+      this.name = name;
+      this.displayName = displayName;
+    }
+
+    public synchronized void setDisplayName(String displayName) {
+      this.displayName = displayName;
+    }
+
+    /**
+     * Read the binary representation of the counter
+     */
+    @Override
+    public synchronized void readFields(DataInput in) throws IOException {
+      name = Text.readString(in);
+      if (in.readBoolean()) {
+        displayName = Text.readString(in);
+      } else {
+        displayName = name;
+      }
+      value = WritableUtils.readVLong(in);
+    }
+
+    /**
+     * Write the binary representation of the counter
+     */
+    @Override
+    public synchronized void write(DataOutput out) throws IOException {
+      Text.writeString(out, name);
+      boolean distinctDisplayName = !name.equals(displayName);
+      out.writeBoolean(distinctDisplayName);
+      if (distinctDisplayName) {
+        Text.writeString(out, displayName);
+      }
+      WritableUtils.writeVLong(out, value);
+    }
+
+    public synchronized String getName() {
+      return name;
+    }
+
+    /**
+     * Get the name of the counter.
+     * 
+     * @return the user facing name of the counter
+     */
+    public synchronized String getDisplayName() {
+      return displayName;
+    }
+
+    /**
+     * What is the current value of this counter?
+     * 
+     * @return the current value
+     */
+    public synchronized long getValue() {
+      return value;
+    }
+
+    /**
+     * Increment this counter by the given value
+     * 
+     * @param incr the value to increase this counter by
+     */
+    public synchronized void increment(long incr) {
+      value += incr;
+    }
+
+    @Override
+    public synchronized boolean equals(Object genericRight) {
+      if (genericRight instanceof Counter) {
+        synchronized (genericRight) {
+          Counter right = (Counter) genericRight;
+          return name.equals(right.name)
+              && displayName.equals(right.displayName) && value == right.value;
+        }
+      }
+      return false;
+    }
+
+    @Override
+    public synchronized int hashCode() {
+      return name.hashCode() + displayName.hashCode();
+    }
+
+    /**
+     * Returns the compact stringified version of the counter in the format
+     * [(actual-name)(display-name)(value)]
+     */
+    public synchronized String makeEscapedCompactString() {
+      StringBuffer buf = new StringBuffer();
+      buf.append(COUNTER_OPEN);
+
+      // Add the counter name
+      buf.append(UNIT_OPEN);
+      buf.append(escape(getName()));
+      buf.append(UNIT_CLOSE);
+
+      // Add the display name
+      buf.append(UNIT_OPEN);
+      buf.append(escape(getDisplayName()));
+      buf.append(UNIT_CLOSE);
+
+      // Add the value
+      buf.append(UNIT_OPEN);
+      buf.append(this.getValue());
+      buf.append(UNIT_CLOSE);
+
+      buf.append(COUNTER_CLOSE);
+
+      return buf.toString();
+    }
+
+    // Checks for (content) equality of two (basic) counters
+    synchronized boolean contentEquals(Counter c) {
+      return this.equals(c);
+    }
+
+    /**
+     * What is the current value of this counter?
+     * 
+     * @return the current value
+     */
+    public synchronized long getCounter() {
+      return getValue();
+    }
+
+  }
+
+  /**
+   * <code>Group</code> of counters, comprising of counters from a particular
+   * counter {@link Enum} class.
+   * 
+   * <p>
+   * <code>Group</code>handles localization of the class name and the counter
+   * names.
+   * </p>
+   */
+  public static class Group implements Writable, Iterable<Counter> {
+    private String groupName;
+    private String displayName;
+    private Map<String, Counter> subcounters = new HashMap<String, Counter>();
+
+    // Optional ResourceBundle for localization of group and counter names.
+    private ResourceBundle bundle = null;
+
+    Group(String groupName) {
+      try {
+        bundle = getResourceBundle(groupName);
+      } catch (MissingResourceException neverMind) {
+      }
+      this.groupName = groupName;
+      this.displayName = localize("CounterGroupName", groupName);
+      LOG.debug("Creating group " + groupName + " with "
+          + (bundle == null ? "nothing" : "bundle"));
+    }
+
+    /**
+     * Returns the specified resource bundle, or throws an exception.
+     * 
+     * @throws MissingResourceException if the bundle isn't found
+     */
+    private static ResourceBundle getResourceBundle(String enumClassName) {
+      String bundleName = enumClassName.replace('$', '_');
+      return ResourceBundle.getBundle(bundleName);
+    }
+
+    /**
+     * Returns raw name of the group. This is the name of the enum class for
+     * this group of counters.
+     */
+    public String getName() {
+      return groupName;
+    }
+
+    /**
+     * Returns localized name of the group. This is the same as getName() by
+     * default, but different if an appropriate ResourceBundle is found.
+     */
+    public String getDisplayName() {
+      return displayName;
+    }
+
+    /**
+     * Set the display name
+     */
+    public void setDisplayName(String displayName) {
+      this.displayName = displayName;
+    }
+
+    /**
+     * Returns the compact stringified version of the group in the format
+     * {(actual-name)(display-name)(value)[][][]} where [] are compact strings
+     * for the counters within.
+     */
+    public String makeEscapedCompactString() {
+      StringBuffer buf = new StringBuffer();
+      buf.append(GROUP_OPEN); // group start
+
+      // Add the group name
+      buf.append(UNIT_OPEN);
+      buf.append(escape(getName()));
+      buf.append(UNIT_CLOSE);
+
+      // Add the display name
+      buf.append(UNIT_OPEN);
+      buf.append(escape(getDisplayName()));
+      buf.append(UNIT_CLOSE);
+
+      // write the value
+      for (Counter counter : subcounters.values()) {
+        buf.append(counter.makeEscapedCompactString());
+      }
+
+      buf.append(GROUP_CLOSE); // group end
+      return buf.toString();
+    }
+
+    @Override
+    public int hashCode() {
+      return subcounters.hashCode();
+    }
+
+    /**
+     * Checks for (content) equality of Groups
+     */
+    @Override
+    public synchronized boolean equals(Object obj) {
+      boolean isEqual = false;
+      if (obj != null && obj instanceof Group) {
+        Group g = (Group) obj;
+        if (size() == g.size()) {
+          isEqual = true;
+          for (Map.Entry<String, Counter> entry : subcounters.entrySet()) {
+            String key = entry.getKey();
+            Counter c1 = entry.getValue();
+            Counter c2 = g.getCounterForName(key);
+            if (!c1.contentEquals(c2)) {
+              isEqual = false;
+              break;
+            }
+          }
+        }
+      }
+      return isEqual;
+    }
+
+    /**
+     * Returns the value of the specified counter, or 0 if the counter does not
+     * exist.
+     */
+    public synchronized long getCounter(String counterName) {
+      for (Counter counter : subcounters.values()) {
+        if (counter != null && counter.getDisplayName().equals(counterName)) {
+          return counter.getValue();
+        }
+      }
+      return 0L;
+    }
+
+    /**
+     * Get the counter for the given id and create it if it doesn't exist.
+     * 
+     * @param id the numeric id of the counter within the group
+     * @param name the internal counter name
+     * @return the counter
+     * @deprecated use {@link #getCounter(String)} instead
+     */
+    @Deprecated
+    public synchronized Counter getCounter(int id, String name) {
+      return getCounterForName(name);
+    }
+
+    /**
+     * Get the counter for the given name and create it if it doesn't exist.
+     * 
+     * @param name the internal counter name
+     * @return the counter
+     */
+    public synchronized Counter getCounterForName(String name) {
+      Counter result = subcounters.get(name);
+      if (result == null) {
+        LOG.debug("Adding " + name);
+        result = new Counter(name, localize(name + ".name", name), 0L);
+        subcounters.put(name, result);
+      }
+      return result;
+    }
+
+    /**
+     * Returns the number of counters in this group.
+     */
+    public synchronized int size() {
+      return subcounters.size();
+    }
+
+    /**
+     * Looks up key in the ResourceBundle and returns the corresponding value.
+     * If the bundle or the key doesn't exist, returns the default value.
+     */
+    private String localize(String key, String defaultValue) {
+      String result = defaultValue;
+      if (bundle != null) {
+        try {
+          result = bundle.getString(key);
+        } catch (MissingResourceException mre) {
+        }
+      }
+      return result;
+    }
+
+    public synchronized void write(DataOutput out) throws IOException {
+      Text.writeString(out, displayName);
+      WritableUtils.writeVInt(out, subcounters.size());
+      for (Counter counter : subcounters.values()) {
+        counter.write(out);
+      }
+    }
+
+    public synchronized void readFields(DataInput in) throws IOException {
+      displayName = Text.readString(in);
+      subcounters.clear();
+      int size = WritableUtils.readVInt(in);
+      for (int i = 0; i < size; i++) {
+        Counter counter = new Counter();
+        counter.readFields(in);
+        subcounters.put(counter.getName(), counter);
+      }
+    }
+
+    public synchronized Iterator<Counter> iterator() {
+      return new ArrayList<Counter>(subcounters.values()).iterator();
+    }
+  }
+
+  // Map from group name (enum class name) to map of int (enum ordinal) to
+  // counter record (name-value pair).
+  private Map<String, Group> counters = new HashMap<String, Group>();
+
+  /**
+   * A cache from enum values to the associated counter. Dramatically speeds up
+   * typical usage.
+   */
+  private Map<Enum, Counter> cache = new IdentityHashMap<Enum, Counter>();
+
+  /**
+   * Returns the names of all counter classes.
+   * 
+   * @return Set of counter names.
+   */
+  public synchronized Collection<String> getGroupNames() {
+    return counters.keySet();
+  }
+
+  public synchronized Iterator<Group> iterator() {
+    return counters.values().iterator();
+  }
+
+  /**
+   * Returns the named counter group, or an empty group if there is none with
+   * the specified name.
+   */
+  public synchronized Group getGroup(String groupName) {
+    Group result = counters.get(groupName);
+    if (result == null) {
+      result = new Group(groupName);
+      counters.put(groupName, result);
+    }
+    return result;
+  }
+
+  /**
+   * Find the counter for the given enum. The same enum will always return the
+   * same counter.
+   * 
+   * @param key the counter key
+   * @return the matching counter object
+   */
+  public synchronized Counter findCounter(Enum key) {
+    Counter counter = cache.get(key);
+    if (counter == null) {
+      Group group = getGroup(key.getDeclaringClass().getName());
+      counter = group.getCounterForName(key.toString());
+      cache.put(key, counter);
+    }
+    return counter;
+  }
+
+  /**
+   * Find a counter given the group and the name.
+   * 
+   * @param group the name of the group
+   * @param name the internal name of the counter
+   * @return the counter for that name
+   */
+  public synchronized Counter findCounter(String group, String name) {
+    return getGroup(group).getCounterForName(name);
+  }
+
+  /**
+   * Find a counter by using strings
+   * 
+   * @param group the name of the group
+   * @param id the id of the counter within the group (0 to N-1)
+   * @param name the internal name of the counter
+   * @return the counter for that name
+   * @deprecated
+   */
+  @Deprecated
+  public synchronized Counter findCounter(String group, int id, String name) {
+    return getGroup(group).getCounterForName(name);
+  }
+
+  /**
+   * Increments the specified counter by the specified amount, creating it if 
it
+   * didn't already exist.
+   * 
+   * @param key identifies a counter
+   * @param amount amount by which counter is to be incremented
+   */
+  public synchronized void incrCounter(Enum key, long amount) {
+    findCounter(key).increment(amount);
+  }
+
+  /**
+   * Increments the specified counter by the specified amount, creating it if 
it
+   * didn't already exist.
+   * 
+   * @param group the name of the group
+   * @param counter the internal name of the counter
+   * @param amount amount by which counter is to be incremented
+   */
+  public synchronized void incrCounter(String group, String counter, long 
amount) {
+    getGroup(group).getCounterForName(counter).increment(amount);
+  }
+
+  /**
+   * Returns current value of the specified counter, or 0 if the counter does
+   * not exist.
+   */
+  public synchronized long getCounter(Enum key) {
+    return findCounter(key).getValue();
+  }
+
+  /**
+   * Increments multiple counters by their amounts in another Counters 
instance.
+   * 
+   * @param other the other Counters instance
+   */
+  public synchronized void incrAllCounters(Counters other) {
+    for (Group otherGroup : other) {
+      Group group = getGroup(otherGroup.getName());
+      group.displayName = otherGroup.displayName;
+      for (Counter otherCounter : otherGroup) {
+        Counter counter = group.getCounterForName(otherCounter.getName());
+        counter.setDisplayName(otherCounter.getDisplayName());
+        counter.increment(otherCounter.getValue());
+      }
+    }
+  }
+
+  /**
+   * Convenience method for computing the sum of two sets of counters.
+   */
+  public static Counters sum(Counters a, Counters b) {
+    Counters counters = new Counters();
+    counters.incrAllCounters(a);
+    counters.incrAllCounters(b);
+    return counters;
+  }
+
+  /**
+   * Returns the total number of counters, by summing the number of counters in
+   * each group.
+   */
+  public synchronized int size() {
+    int result = 0;
+    for (Group group : this) {
+      result += group.size();
+    }
+    return result;
+  }
+
+  /**
+   * Write the set of groups. The external format is: #groups (groupName 
group)*
+   * 
+   * i.e. the number of groups followed by 0 or more groups, where each group 
is
+   * of the form:
+   * 
+   * groupDisplayName #counters (false | true counter)*
+   * 
+   * where each counter is of the form:
+   * 
+   * name (false | true displayName) value
+   */
+  public synchronized void write(DataOutput out) throws IOException {
+    out.writeInt(counters.size());
+    for (Group group : counters.values()) {
+      Text.writeString(out, group.getName());
+      group.write(out);
+    }
+  }
+
+  /**
+   * Read a set of groups.
+   */
+  public synchronized void readFields(DataInput in) throws IOException {
+    int numClasses = in.readInt();
+    counters.clear();
+    while (numClasses-- > 0) {
+      String groupName = Text.readString(in);
+      Group group = new Group(groupName);
+      group.readFields(in);
+      counters.put(groupName, group);
+    }
+  }
+
+  /**
+   * Logs the current counter values.
+   * 
+   * @param log The log to use.
+   */
+  public void log(Log log) {
+    log.info("Counters: " + size());
+    for (Group group : this) {
+      log.info("  " + group.getDisplayName());
+      for (Counter counter : group) {
+        log
+            .info("    " + counter.getDisplayName() + "="
+                + counter.getCounter());
+      }
+    }
+  }
+
+  /**
+   * Return textual representation of the counter values.
+   */
+  public synchronized String toString() {
+    StringBuilder sb = new StringBuilder("Counters: " + size());
+    for (Group group : this) {
+      sb.append("\n\t" + group.getDisplayName());
+      for (Counter counter : group) {
+        sb.append("\n\t\t" + counter.getDisplayName() + "="
+            + counter.getCounter());
+      }
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Convert a counters object into a single line that is easy to parse.
+   * 
+   * @return the string with "name=value" for each counter and separated by ","
+   */
+  public synchronized String makeCompactString() {
+    StringBuffer buffer = new StringBuffer();
+    boolean first = true;
+    for (Group group : this) {
+      for (Counter counter : group) {
+        if (first) {
+          first = false;
+        } else {
+          buffer.append(',');
+        }
+        buffer.append(group.getDisplayName());
+        buffer.append('.');
+        buffer.append(counter.getDisplayName());
+        buffer.append(':');
+        buffer.append(counter.getCounter());
+      }
+    }
+    return buffer.toString();
+  }
+
+  /**
+   * Represent the counter in a textual format that can be converted back to 
its
+   * object form
+   * 
+   * @return the string in the following format
+   *         {(groupname)(group-displayname)[(
+   *         countername)(displayname)(value)][][]}{}{}
+   */
+  public synchronized String makeEscapedCompactString() {
+    StringBuffer buffer = new StringBuffer();
+    for (Group group : this) {
+      buffer.append(group.makeEscapedCompactString());
+    }
+    return buffer.toString();
+  }
+
+  // Extracts a block (data enclosed within delimeters) ignoring escape
+  // sequences. Throws ParseException if an incomplete block is found else
+  // returns null.
+  private static String getBlock(String str, char open, char close,
+      IntWritable index) throws ParseException {
+    StringBuilder split = new StringBuilder();
+    int next = StringUtils.findNext(str, open, StringUtils.ESCAPE_CHAR, index
+        .get(), split);
+    split.setLength(0); // clear the buffer
+    if (next >= 0) {
+      ++next; // move over '('
+
+      next = StringUtils.findNext(str, close, StringUtils.ESCAPE_CHAR, next,
+          split);
+      if (next >= 0) {
+        ++next; // move over ')'
+        index.set(next);
+        return split.toString(); // found a block
+      } else {
+        throw new ParseException("Unexpected end of block", next);
+      }
+    }
+    return null; // found nothing
+  }
+
+  /**
+   * Convert a stringified counter representation into a counter object. Note
+   * that the counter can be recovered if its stringified using
+   * {@link #makeEscapedCompactString()}.
+   * 
+   * @return a Counter
+   */
+  public static Counters fromEscapedCompactString(String compactString)
+      throws ParseException {
+    Counters counters = new Counters();
+    IntWritable index = new IntWritable(0);
+
+    // Get the group to work on
+    String groupString = getBlock(compactString, GROUP_OPEN, GROUP_CLOSE, 
index);
+
+    while (groupString != null) {
+      IntWritable groupIndex = new IntWritable(0);
+
+      // Get the actual name
+      String groupName = getBlock(groupString, UNIT_OPEN, UNIT_CLOSE,
+          groupIndex);
+      groupName = unescape(groupName);
+
+      // Get the display name
+      String groupDisplayName = getBlock(groupString, UNIT_OPEN, UNIT_CLOSE,
+          groupIndex);
+      groupDisplayName = unescape(groupDisplayName);
+
+      // Get the counters
+      Group group = counters.getGroup(groupName);
+      group.setDisplayName(groupDisplayName);
+
+      String counterString = getBlock(groupString, COUNTER_OPEN, COUNTER_CLOSE,
+          groupIndex);
+
+      while (counterString != null) {
+        IntWritable counterIndex = new IntWritable(0);
+
+        // Get the actual name
+        String counterName = getBlock(counterString, UNIT_OPEN, UNIT_CLOSE,
+            counterIndex);
+        counterName = unescape(counterName);
+
+        // Get the display name
+        String counterDisplayName = getBlock(counterString, UNIT_OPEN,
+            UNIT_CLOSE, counterIndex);
+        counterDisplayName = unescape(counterDisplayName);
+
+        // Get the value
+        long value = Long.parseLong(getBlock(counterString, UNIT_OPEN,
+            UNIT_CLOSE, counterIndex));
+
+        // Add the counter
+        Counter counter = group.getCounterForName(counterName);
+        counter.setDisplayName(counterDisplayName);
+        counter.increment(value);
+
+        // Get the next counter
+        counterString = getBlock(groupString, COUNTER_OPEN, COUNTER_CLOSE,
+            groupIndex);
+      }
+
+      groupString = getBlock(compactString, GROUP_OPEN, GROUP_CLOSE, index);
+    }
+    return counters;
+  }
+
+  // Escapes all the delimiters for counters i.e {,[,(,),],}
+  private static String escape(String string) {
+    return StringUtils.escapeString(string, StringUtils.ESCAPE_CHAR,
+        charsToEscape);
+  }
+
+  // Unescapes all the delimiters for counters i.e {,[,(,),],}
+  private static String unescape(String string) {
+    return StringUtils.unEscapeString(string, StringUtils.ESCAPE_CHAR,
+        charsToEscape);
+  }
+
+  @Override
+  public synchronized int hashCode() {
+    return counters.hashCode();
+  }
+
+  @Override
+  public synchronized boolean equals(Object obj) {
+    boolean isEqual = false;
+    if (obj != null && obj instanceof Counters) {
+      Counters other = (Counters) obj;
+      if (size() == other.size()) {
+        isEqual = true;
+        for (Map.Entry<String, Group> entry : this.counters.entrySet()) {
+          String key = entry.getKey();
+          Group sourceGroup = entry.getValue();
+          Group targetGroup = other.getGroup(key);
+          if (!sourceGroup.equals(targetGroup)) {
+            isEqual = false;
+            break;
+          }
+        }
+      }
+    }
+    return isEqual;
+  }
+}

Modified: 
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL: 
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1208208&r1=1208207&r2=1208208&view=diff
==============================================================================
--- 
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java 
(original)
+++ 
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java 
Wed Nov 30 01:46:43 2011
@@ -740,7 +740,7 @@ public class GroomServer implements Runn
       this.localJobConf = null;
       this.taskStatus = new TaskStatus(task.getJobID(), task.getTaskID(), 0,
           TaskStatus.State.UNASSIGNED, "init", groomServer,
-          TaskStatus.Phase.STARTING);
+          TaskStatus.Phase.STARTING, task.getCounters());
     }
 
     private void localizeTask(Task task) throws IOException {
@@ -920,7 +920,7 @@ public class GroomServer implements Runn
         // instantiate and init our peer
         @SuppressWarnings("rawtypes")
         BSPPeerImpl<?, ?, ?, ?> bspPeer = new BSPPeerImpl(job, defaultConf,
-            taskid, umbilical, task.partition, task.splitClass, task.split);
+            taskid, umbilical, task.partition, task.splitClass, task.split, 
task.getCounters());
 
         task.run(job, bspPeer, umbilical); // run the task
 

Modified: 
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
URL: 
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java?rev=1208208&r1=1208207&r2=1208208&view=diff
==============================================================================
--- 
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java 
(original)
+++ 
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java 
Wed Nov 30 01:46:43 2011
@@ -83,8 +83,8 @@ class JobInProgress {
     this.jobFile = jobFile;
     this.master = master;
 
-    this.status = new JobStatus(jobId, null, 0L, 0L,
-        JobStatus.State.PREP.value());
+    this.status = new JobStatus(jobId, null, 0L, 0L, JobStatus.State.PREP
+        .value());
     this.startTime = System.currentTimeMillis();
     this.superstepCounter = 0;
     this.restartCount = 0;
@@ -99,12 +99,11 @@ class JobInProgress {
     fs.copyToLocalFile(jobFile, localJobFile);
     BSPJob job = new BSPJob(jobId, localJobFile.toString());
     this.jobSplit = job.getConf().get("bsp.job.split.file");
-    
-    this.numBSPTasks = job.getNumBspTask();
 
+    this.numBSPTasks = job.getNumBspTask();
 
-    this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(),
-        job.getJobName());
+    this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(), job
+        .getJobName());
 
     this.setJobName(job.getJobName());
 
@@ -269,9 +268,9 @@ class JobInProgress {
     }
 
     if (allDone) {
-      this.status = new JobStatus(this.status.getJobID(),
-          this.profile.getUser(), superstepCounter, superstepCounter,
-          superstepCounter, JobStatus.SUCCEEDED, superstepCounter);
+      this.status = new JobStatus(this.status.getJobID(), this.profile
+          .getUser(), superstepCounter, superstepCounter, superstepCounter,
+          JobStatus.SUCCEEDED, superstepCounter);
       this.finishTime = System.currentTimeMillis();
       this.status.setFinishTime(this.finishTime);
 
@@ -303,9 +302,8 @@ class JobInProgress {
     }
 
     if (allDone) {
-      this.status = new JobStatus(this.status.getJobID(),
-          this.profile.getUser(), superstepCounter, superstepCounter,
-          superstepCounter, JobStatus.FAILED, superstepCounter);
+      this.status = new JobStatus(this.status.getJobID(), this.profile
+          .getUser(), 0L, 0L, 0L, JobStatus.FAILED, superstepCounter);
       this.finishTime = System.currentTimeMillis();
       this.status.setFinishTime(this.finishTime);
 

Modified: 
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: 
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1208208&r1=1208207&r2=1208208&view=diff
==============================================================================
--- 
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java 
(original)
+++ 
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java 
Wed Nov 30 01:46:43 2011
@@ -204,7 +204,7 @@ public class LocalBSPRunner implements J
   }
 
   // this class will spawn a new thread and executes the BSP
-  @SuppressWarnings({ "deprecation", "rawtypes" })
+  @SuppressWarnings( { "deprecation", "rawtypes" })
   class BSPRunner implements Callable<BSP> {
 
     private Configuration conf;
@@ -212,6 +212,7 @@ public class LocalBSPRunner implements J
     private int id;
     private BSP bsp;
     private RawSplit[] splits;
+    private Counters counters = new Counters();
 
     public BSPRunner(Configuration conf, BSPJob job, int id, RawSplit[] 
splits) {
       super();
@@ -224,8 +225,8 @@ public class LocalBSPRunner implements J
       conf.setInt(Constants.PEER_PORT, id);
       conf.set(Constants.PEER_HOST, "local");
 
-      bsp = (BSP) ReflectionUtils.newInstance(
-          job.getConf().getClass("bsp.work.class", BSP.class), job.getConf());
+      bsp = (BSP) ReflectionUtils.newInstance(job.getConf().getClass(
+          "bsp.work.class", BSP.class), job.getConf());
 
     }
 
@@ -242,7 +243,7 @@ public class LocalBSPRunner implements J
 
       BSPPeerImpl peer = new BSPPeerImpl(job, conf, new TaskAttemptID(
           new TaskID(job.getJobID(), id), id), new LocalUmbilical(), id,
-          splitname, realBytes);
+          splitname, realBytes, counters);
 
       bsp.setConf(conf);
       try {
@@ -333,7 +334,8 @@ public class LocalBSPRunner implements J
         inetSocketAddress = BSPNetUtils.getAddress(peerName);
         socketCache.put(peerName, inetSocketAddress);
       }
-      LinkedList<BSPMessage> msgs = 
localOutgoingMessages.get(inetSocketAddress);
+      LinkedList<BSPMessage> msgs = localOutgoingMessages
+          .get(inetSocketAddress);
       if (msgs == null) {
         msgs = new LinkedList<BSPMessage>();
       }
@@ -478,7 +480,5 @@ public class LocalBSPRunner implements J
     public void close() throws Exception {
 
     }
-
   }
-
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java
URL: 
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java?rev=1208208&r1=1208207&r2=1208208&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java 
(original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java Wed 
Nov 30 01:46:43 2011
@@ -45,6 +45,9 @@ public abstract class Task implements Wr
 
   protected LocalDirAllocator lDirAlloc;
 
+  // Current counters
+  private transient Counters counters = new Counters();
+  
   public Task() {
     jobId = new BSPJobID();
     taskId = new TaskAttemptID();
@@ -149,5 +152,7 @@ public abstract class Task implements Wr
   
   public abstract BSPJob getConf();
   public abstract void setConf(BSPJob localJobConf);
+
+  Counters getCounters() { return counters; }
   
 }

Modified: 
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskStatus.java
URL: 
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskStatus.java?rev=1208208&r1=1208207&r2=1208208&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskStatus.java 
(original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskStatus.java 
Wed Nov 30 01:46:43 2011
@@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hama.bsp.BSPPeerImpl.PeerCounter;
 
 /**
  * Describes the current status of a task. This is not intended to be a
@@ -50,24 +51,24 @@ public class TaskStatus implements Writa
   private volatile State runState;
   private String stateString;
   private String groomServer;
-  private long superstepCount;
 
   private long startTime;
   private long finishTime;
 
   private volatile Phase phase = Phase.STARTING;
 
+  private Counters counters;
+  
   /**
    * 
    */
   public TaskStatus() {
     jobId = new BSPJobID();
     taskId = new TaskAttemptID();
-    this.superstepCount = 0;
   }
 
   public TaskStatus(BSPJobID jobId, TaskAttemptID taskId, float progress,
-      State runState, String stateString, String groomServer, Phase phase) {
+      State runState, String stateString, String groomServer, Phase phase, 
Counters counters) {
     this.jobId = jobId;
     this.taskId = taskId;
     this.progress = progress;
@@ -75,7 +76,7 @@ public class TaskStatus implements Writa
     this.stateString = stateString;
     this.groomServer = groomServer;
     this.phase = phase;
-    this.superstepCount = 0;
+    this.counters = counters;
   }
 
   // //////////////////////////////////////////////////
@@ -94,10 +95,6 @@ public class TaskStatus implements Writa
     return progress;
   }
 
-  public void setSuperstepCount(long superstepCount) {
-    this.superstepCount = superstepCount;  
-  }
-  
   public void setProgress(float progress) {
     this.progress = progress;
   }
@@ -171,6 +168,20 @@ public class TaskStatus implements Writa
   }
 
   /**
+   * Get task's counters.
+   */
+  public Counters getCounters() {
+    return counters;
+  }
+  /**
+   * Set the task's counters.
+   * @param counters
+   */
+  public void setCounters(Counters counters) {
+    this.counters = counters;
+  }
+  
+  /**
    * Update the status of the task.
    * 
    * This update is done by ping thread before sending the status.
@@ -179,9 +190,10 @@ public class TaskStatus implements Writa
    * @param state
    * @param counters
    */
-  synchronized void statusUpdate(float progress, String state) {
+  synchronized void statusUpdate(float progress, String state, Counters 
counters) {
     setProgress(progress);
     setStateString(state);
+    setCounters(counters);
   }
 
   /**
@@ -190,7 +202,8 @@ public class TaskStatus implements Writa
    * @param status updated status
    */
   synchronized void statusUpdate(TaskStatus status) {
-    this.superstepCount = status.getSuperstepCount();
+    this.counters = status.getCounters();
+    
     this.progress = status.getProgress();
     this.runState = status.getRunState();
     this.stateString = status.getStateString();
@@ -211,16 +224,14 @@ public class TaskStatus implements Writa
    * This update is done in BSPMaster when a cleanup attempt of task reports 
its
    * status. Then update only specific fields, not all.
    * 
-   * @param superstepCount
    * @param runState
    * @param progress
    * @param state
    * @param phase
    * @param finishTime
    */
-  synchronized void statusUpdate(long superstepCount, State runState, float 
progress, String state,
+  synchronized void statusUpdate(State runState, float progress, String state,
       Phase phase, long finishTime) {
-    setSuperstepCount(superstepCount);
     setRunState(runState);
     setProgress(progress);
     setStateString(state);
@@ -234,14 +245,7 @@ public class TaskStatus implements Writa
    * @return The number of BSP super steps executed by the task.
    */
   public long getSuperstepCount() {
-    return superstepCount;
-  }
-
-  /**
-   * Increments the number of BSP super steps executed by the task.
-   */
-  public void incrementSuperstepCount() {
-    superstepCount += 1;
+    return counters.getCounter(PeerCounter.SUPERSTEPS);
   }
 
   @Override
@@ -268,7 +272,9 @@ public class TaskStatus implements Writa
     this.phase = WritableUtils.readEnum(in, Phase.class);
     this.startTime = in.readLong();
     this.finishTime = in.readLong();
-    this.superstepCount = in.readLong();
+    
+    counters = new Counters();
+    this.counters.readFields(in);
   }
 
   @Override
@@ -281,6 +287,7 @@ public class TaskStatus implements Writa
     WritableUtils.writeEnum(out, phase);
     out.writeLong(startTime);
     out.writeLong(finishTime);
-    out.writeLong(superstepCount);
+    
+    counters.write(out);
   }
 }


Reply via email to