Author: edwardyoon
Date: Thu Dec 8 11:01:07 2011
New Revision: 1211824
URL: http://svn.apache.org/viewvc?rev=1211824&view=rev
Log:
Move clearOutgoingQueues() call to bottom of sync() method.
Modified:
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java
Modified:
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1211824&r1=1211823&r2=1211824&view=diff
==============================================================================
---
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
(original)
+++
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
Thu Dec 8 11:01:07 2011
@@ -258,14 +258,14 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
messenger.transfer(addr, bundle);
}
- // Clear outgoing queues.
- messenger.clearOutgoingQueues();
leaveBarrier();
incrCounter(PeerCounter.SUPERSTEPS, 1);
currentTaskStatus.setCounters(counters);
umbilical.statusUpdate(taskId, currentTaskStatus);
+ // Clear outgoing queues.
+ messenger.clearOutgoingQueues();
}
private BSPMessageBundle combineMessages(Iterable<BSPMessage> messages) {
Modified:
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java?rev=1211824&r1=1211823&r2=1211824&view=diff
==============================================================================
---
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java
(original)
+++
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java
Thu Dec 8 11:01:07 2011
@@ -39,14 +39,11 @@ import org.apache.hama.HamaConfiguration
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPJob;
import org.apache.hama.bsp.BSPPeer;
-import org.apache.hama.bsp.BooleanMessage;
import org.apache.hama.bsp.HashPartitioner;
-import org.apache.hama.bsp.IntegerMessage;
import org.apache.hama.bsp.SequenceFileInputFormat;
import org.apache.hama.bsp.SequenceFileOutputFormat;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.util.KeyValuePair;
-import org.apache.zookeeper.KeeperException;
public class ShortestPaths extends
BSP<ShortestPathVertex, ShortestPathVertexArrayWritable, Text,
IntWritable> {
@@ -63,28 +60,52 @@ public class ShortestPaths extends
throws IOException, SyncException, InterruptedException {
boolean updated = true;
while (updated) {
- peer.sync();
-
int updatesMade = 0;
+ int globalUpdateCounts = 0;
ShortestPathVertexMessage msg = null;
Deque<ShortestPathVertex> updatedQueue = new
LinkedList<ShortestPathVertex>();
while ((msg = (ShortestPathVertexMessage) peer.getCurrentMessage()) !=
null) {
- int index = Collections.binarySearch(vertexLookup, msg.getTag());
- ShortestPathVertex vertex = vertexLookup.get(index);
+ if (msg.getTag().getName().startsWith("updatesMade")) {
+ if (msg.getData() == -1) {
+ updated = false;
+ } else {
+ globalUpdateCounts += msg.getData();
+ }
+ } else {
+ int index = Collections.binarySearch(vertexLookup, msg.getTag());
+ ShortestPathVertex vertex = vertexLookup.get(index);
+
+ // check if we need an distance update
+ if (vertex.getCost() > msg.getData()) {
+ updatesMade++;
+ updatedQueue.add(vertex);
+ vertex.setCost(msg.getData());
+ }
+ }
+ }
+
+ LOG.info(">> previous updates counts: " + globalUpdateCounts + " at "
+ + peer.getSuperstepCount());
- // check if we need an distance update
- if (vertex.getCost() > msg.getData()) {
- updatesMade++;
- updatedQueue.add(vertex);
- vertex.setCost(msg.getData());
+ if (globalUpdateCounts == 0 && peer.getPeerName().equals(masterTask)
+ && peer.getSuperstepCount() > 1) {
+ for (String peerName : peer.getAllPeerNames()) {
+ peer.send(peerName, new ShortestPathVertexMessage(
+ new ShortestPathVertex((int) peer.getSuperstepCount(),
+ "updatesMade-" + peer.getPeerName()), -1));
}
}
- updated = broadcastUpdatesMade(peer, updatesMade);
// send updates to the adjacents of the updated vertices
+ peer.send(masterTask, new ShortestPathVertexMessage(
+ new ShortestPathVertex((int) peer.getSuperstepCount(), "updatesMade-"
+ + peer.getPeerName()), updatesMade));
+
for (ShortestPathVertex vertex : updatedQueue) {
sendMessageToNeighbors(peer, vertex);
}
+
+ peer.sync();
}
}
@@ -112,6 +133,7 @@ public class ShortestPaths extends
if (startVertex != null) {
sendMessageToNeighbors(peer, startVertex);
}
+ peer.sync();
}
@Override
@@ -129,42 +151,6 @@ public class ShortestPaths extends
}
/**
- * This method broadcasts to a master groom how many updates were made. He
- * simply sums them up and sends a message back to the grooms if sum is
- * greater than zero.
- *
- * @param peer The peer we got through the BSP method.
- * @param master The assigned master groom name.
- * @param updates How many updates were made?
- * @return True if we need another iteration, False if no updates can be made
- * anymore.
- * @throws IOException
- * @throws KeeperException
- * @throws InterruptedException
- */
- private boolean broadcastUpdatesMade(
- BSPPeer<ShortestPathVertex, ShortestPathVertexArrayWritable, Text,
IntWritable> peer,
- int updates) throws IOException, SyncException, InterruptedException {
- peer.send(masterTask, new IntegerMessage(peer.getPeerName(), updates));
- peer.sync();
- if (peer.getPeerName().equals(masterTask)) {
- int count = 0;
- IntegerMessage message;
- while ((message = (IntegerMessage) peer.getCurrentMessage()) != null) {
- count += message.getData();
- }
-
- for (String name : peer.getAllPeerNames()) {
- peer.send(name, new BooleanMessage("", count > 0 ? true : false));
- }
- }
-
- peer.sync();
- BooleanMessage message = (BooleanMessage) peer.getCurrentMessage();
- return message.getData();
- }
-
- /**
* This method takes advantage of our partitioning: it uses the vertexID
* (simply hash of the name) to determine the host where the message belongs
* to. <br/>