Author: andronat
Date: Mon Jan 27 09:44:58 2014
New Revision: 1561622
URL: http://svn.apache.org/r1561622
Log:
HAMA-860
Modified:
hama/trunk/CHANGES.txt
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Modified: hama/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1561622&r1=1561621&r2=1561622&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Mon Jan 27 09:44:58 2014
@@ -14,6 +14,7 @@ Release 0.7.0 (unreleased changes)
BUG FIXES
+ HAMA-860: Make aggregators start from the first superstep (Anastasis
Andronidis)
HAMA-857: Graph Combiners is wrongly implemented (edwardyoon)
HAMA-845: The size() of Spilling Queue returns always numMessagesWritten
(edwardyoon)
HAMA-834: Fix KMeans example (Martin Illecker)
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1561622&r1=1561621&r2=1561622&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Mon Jan 27 09:44:58 2014
@@ -45,7 +45,7 @@ import org.apache.hama.util.ReflectionUt
/**
* Fully generic graph job runner.
- *
+ *
* @param <V> the id type of a vertex.
* @param <E> the value type of an edge.
* @param <M> the value type of a vertex.
@@ -177,20 +177,8 @@ public final class GraphJobRunner<V exte
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException, SyncException, InterruptedException {
- if (isMasterTask(peer) && iteration == 1) {
- MapWritable updatedCnt = new MapWritable();
- updatedCnt.put(
- FLAG_VERTEX_TOTAL_VERTICES,
- new LongWritable((peer.getCounter(GraphJobCounter.INPUT_VERTICES)
- .getCounter())));
- // send the updates from the master tasks back to the slaves
- for (String peerName : peer.getAllPeerNames()) {
- peer.send(peerName, new GraphJobMessage(updatedCnt));
- }
- }
-
// this is only done in every second iteration
- if (isMasterTask(peer) && iteration > 1) {
+ if (isMasterTask(peer)) {
MapWritable updatedCnt = new MapWritable();
// send total number of vertices.
updatedCnt.put(
@@ -208,7 +196,7 @@ public final class GraphJobRunner<V exte
peer.send(peerName, new GraphJobMessage(updatedCnt));
}
}
- if (getAggregationRunner().isEnabled() && iteration > 1) {
+ if (getAggregationRunner().isEnabled()) {
// in case we need to sync, we need to replay the messages that already
// are added to the queue. This prevents loosing messages when using
// aggregators.
@@ -465,7 +453,7 @@ public final class GraphJobRunner<V exte
/**
* Add new vertex into memory of each peer.
- *
+ *
* @throws IOException
*/
private void addVertex(Vertex<V, E, M> vertex) throws IOException {
@@ -483,7 +471,7 @@ public final class GraphJobRunner<V exte
/**
* Remove vertex from this peer.
- *
+ *
* @throws IOException
*/
private void removeVertex(V vertexID) {
@@ -494,7 +482,7 @@ public final class GraphJobRunner<V exte
/**
* After all inserts are done, we must finalize the VertexInfo data
structure.
- *
+ *
* @throws IOException
*/
private void finishAdditions() throws IOException {
@@ -505,7 +493,7 @@ public final class GraphJobRunner<V exte
/**
* After all inserts are done, we must finalize the VertexInfo data
structure.
- *
+ *
* @throws IOException
*/
private void finishRemovals() throws IOException {
@@ -542,7 +530,7 @@ public final class GraphJobRunner<V exte
/**
* Parses the messages in every superstep and does actions according to flags
* in the messages.
- *
+ *
* @return the first vertex message, null if none received.
*/
@SuppressWarnings("unchecked")
@@ -647,7 +635,7 @@ public final class GraphJobRunner<V exte
/**
* Gets the last aggregated value at the given index. The index is dependend
* on how the aggregators were configured during job setup phase.
- *
+ *
* @return the value of the aggregator, or null if none was defined.
*/
public final Writable getLastAggregatedValue(int index) {
@@ -657,7 +645,7 @@ public final class GraphJobRunner<V exte
/**
* Gets the last aggregated number of vertices at the given index. The index
* is dependend on how the aggregators were configured during job setup
phase.
- *
+ *
* @return the value of the aggregator, or null if none was defined.
*/
public final IntWritable getNumLastAggregatedVertices(int index) {