Author: edwardyoon
Date: Tue Jan 8 01:39:19 2013
New Revision: 1430108
URL: http://svn.apache.org/viewvc?rev=1430108&view=rev
Log:
HAMA-709: Set tasks to 1
Modified:
hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Modified:
hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
URL:
http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java?rev=1430108&r1=1430107&r2=1430108&view=diff
==============================================================================
---
hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
(original)
+++
hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
Tue Jan 8 01:39:19 2013
@@ -59,7 +59,7 @@ public class MindistSearchTest extends T
public void testMindistSearch() throws Exception {
generateTestData();
try {
- MindistSearch.main(new String[] { INPUT, OUTPUT, "30", "2" });
+ MindistSearch.main(new String[] { INPUT, OUTPUT, "30", "1" });
verifyResult();
} finally {
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1430108&r1=1430107&r2=1430108&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Tue Jan 8 01:39:19 2013
@@ -105,6 +105,8 @@ public final class GraphJobRunner<V exte
}
+ Map<V, List<M>> messages = null;
+
@Override
public final void bsp(
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
@@ -118,7 +120,7 @@ public final class GraphJobRunner<V exte
peer.sync();
// note that the messages must be parsed here
- final Map<V, List<M>> messages = parseMessages(peer);
+ messages = parseMessages(peer);
// master needs to update
doMasterUpdates(peer);
// if aggregators say we don't have updates anymore, break
@@ -126,7 +128,7 @@ public final class GraphJobRunner<V exte
break;
}
// loop over vertices and do their computation
- doSuperstep(messages, peer);
+ doSuperstep(peer);
if (isMasterTask(peer)) {
peer.getCounter(GraphJobCounter.ITERATIONS).increment(1);
@@ -175,7 +177,7 @@ public final class GraphJobRunner<V exte
* Do the main logic of a superstep, namely checking if vertices are active,
* feeding compute with messages and controlling combiners/aggregators.
*/
- private void doSuperstep(Map<V, List<M>> messages,
+ private void doSuperstep(
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException {
int activeVertices = 0;
@@ -202,10 +204,14 @@ public final class GraphJobRunner<V exte
activeVertices++;
}
}
+
+ msgs = null;
+ messages.remove(vertex.getVertexID());
}
aggregationRunner.sendAggregatorValues(peer, activeVertices);
iteration++;
+ messages = new HashMap<V, List<M>>();
}
/**
@@ -331,8 +337,8 @@ public final class GraphJobRunner<V exte
@SuppressWarnings("unchecked")
private void repair(
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
- boolean selfReference) throws IOException,
- SyncException, InterruptedException {
+ boolean selfReference) throws IOException, SyncException,
+ InterruptedException {
Map<V, Vertex<V, E, M>> tmp = new HashMap<V, Vertex<V, E, M>>();
@@ -408,7 +414,8 @@ public final class GraphJobRunner<V exte
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException {
GraphJobMessage msg = null;
- final Map<V, List<M>> msgMap = new HashMap<V, List<M>>();
+ Map<V, List<M>> msgMap = new HashMap<V, List<M>>();
+
while ((msg = peer.getCurrentMessage()) != null) {
// either this is a vertex message or a directive that must be read
// as map
@@ -445,6 +452,7 @@ public final class GraphJobRunner<V exte
}
}
+
return msgMap;
}