Author: edwardyoon Date: Tue Feb 28 05:01:58 2012 New Revision: 1294462 URL: http://svn.apache.org/viewvc?rev=1294462&view=rev Log: Exit if there's no update made
Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1294462&r1=1294461&r2=1294462&view=diff ============================================================================== --- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original) +++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Tue Feb 28 05:01:58 2012 @@ -28,6 +28,7 @@ import java.util.Map.Entry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; @@ -41,6 +42,8 @@ import org.apache.hama.util.KeyValuePair public class GraphJobRunner extends BSP { public static final Log LOG = LogFactory.getLog(GraphJobRunner.class); private Map<String, Vertex> vertices = new HashMap<String, Vertex>(); + private String masterTask; + private String FLAG_MESSAGE = "hama.graph.msg.counts"; @SuppressWarnings("unchecked") @Override @@ -52,6 +55,7 @@ public class GraphJobRunner extends BSP boolean updated = true; int iteration = 0; while (updated && iteration < maxIteration) { + int globalUpdateCounts = 0; peer.sync(); MapWritable msg = null; @@ -60,27 +64,50 @@ public class GraphJobRunner extends BSP for (Entry<Writable, Writable> e : msg.entrySet()) { String vertexID = ((Text) e.getKey()).toString(); - Writable value = e.getValue(); - if (msgMap.containsKey(vertexID)) { - LinkedList<Writable> msgs = msgMap.get(vertexID); - msgs.add(value); - msgMap.put(vertexID, msgs); + if (vertexID.toString().equals(FLAG_MESSAGE)) { + if (((IntWritable) e.getValue()).get() == Integer.MIN_VALUE) { + updated = false; + } else { + globalUpdateCounts += ((IntWritable) e.getValue()).get(); + } } else { - LinkedList<Writable> msgs = new LinkedList<Writable>(); - msgs.add(value); - msgMap.put(vertexID, msgs); - } + Writable value = e.getValue(); + if (msgMap.containsKey(vertexID)) { + LinkedList<Writable> msgs = msgMap.get(vertexID); + msgs.add(value); + msgMap.put(vertexID, msgs); + } else { + LinkedList<Writable> msgs = new LinkedList<Writable>(); + msgs.add(value); + msgMap.put(vertexID, msgs); + } + } } } - if (msgMap.size() < 1) { - updated = false; + // exit if there's no update made + if (globalUpdateCounts == 0 && peer.getPeerName().equals(masterTask) + && peer.getSuperstepCount() > 1) { + MapWritable updatedCnt = new MapWritable(); + updatedCnt.put(new Text(FLAG_MESSAGE), new IntWritable( + Integer.MIN_VALUE)); + + for (String peerName : peer.getAllPeerNames()) { + peer.send(peerName, updatedCnt); + } } + // send msgCounts to the master task + MapWritable updatedCnt = new MapWritable(); + updatedCnt.put(new Text(FLAG_MESSAGE), new IntWritable(msgMap.size())); + peer.send(masterTask, updatedCnt); + for (Map.Entry<String, LinkedList<Writable>> e : msgMap.entrySet()) { - vertices.get(e.getKey()).compute(e.getValue().iterator()); + if (e.getValue().size() > 0) { + vertices.get(e.getKey()).compute(e.getValue().iterator()); + } } iteration++; } @@ -90,6 +117,8 @@ public class GraphJobRunner extends BSP public void setup(BSPPeer peer) throws IOException, SyncException, InterruptedException { Configuration conf = peer.getConfiguration(); + // Choose one as a master to collect global updates + masterTask = peer.getPeerName(0); LOG.debug("vertex class: " + conf.get("hama.graph.vertex.class")); KeyValuePair<? extends VertexWritable, ? extends VertexArrayWritable> next = null;