Author: tjungblut
Date: Wed Sep 5 17:12:01 2012
New Revision: 1381252
URL: http://svn.apache.org/viewvc?rev=1381252&view=rev
Log:
[HAMA-635]: Number of vertices value is inconsistent among tasks by Yuesheng Hu
Modified:
hama/trunk/CHANGES.txt
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Modified: hama/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1381252&r1=1381251&r2=1381252&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Wed Sep 5 17:12:01 2012
@@ -6,6 +6,7 @@ Release 0.6 (unreleased changes)
BUG FIXES
+ HAMA-635: Number of vertices value is inconsistent among tasks (Yuesheng Hu via tjungblut)
HAMA-633: Fix CI Failures (tjungblut)
HAMA-631: Add "commons-httpclient-3.1.jar" (Paul Gyuho Song via edwardyoon)
HAMA-608: LocalRunner should honor the configured queues (tjungblut)
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1381252&r1=1381251&r2=1381252&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java Wed Sep 5 17:12:01 2012
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
@@ -38,6 +39,7 @@ public final class GraphJobMessage imple
public static final int VERTEX_FLAG = 0x02;
public static final int REPAIR_FLAG = 0x04;
public static final int PARTITION_FLAG = 0x08;
+ public static final int VERTICES_SIZE_FLAG = 0x10;
// staticly defined because it is process-wide information, therefore in caps
// considered as a constant
@@ -52,6 +54,7 @@ public final class GraphJobMessage imple
private Writable vertexId;
private Writable vertexValue;
private Vertex<?, ?, ?> vertex;
+ private IntWritable vertices_size;
public GraphJobMessage() {
}
@@ -77,6 +80,11 @@ public final class GraphJobMessage imple
this.vertex = vertex;
}
+ public GraphJobMessage(IntWritable size) {
+ this.flag = VERTICES_SIZE_FLAG;
+ this.vertices_size = size;
+ }
+
@Override
public void write(DataOutput out) throws IOException {
out.writeByte(this.flag);
@@ -108,6 +116,8 @@ public final class GraphJobMessage imple
out.writeBoolean(false);
}
}
+ } else if (isVerticesSizeMessage()) {
+ vertices_size.write(out);
} else {
vertexId.write(out);
}
@@ -153,6 +163,9 @@ public final class GraphJobMessage imple
new Edge<Writable, Writable>(edgeVertexID, destination,
edgeValue));
}
this.vertex = vertex;
+ } else if (isVerticesSizeMessage()) {
+ vertices_size = new IntWritable();
+ vertices_size.readFields(in);
} else {
vertexId = ReflectionUtils.newInstance(VERTEX_ID_CLASS, null);
vertexId.readFields(in);
@@ -175,6 +188,10 @@ public final class GraphJobMessage imple
return vertex;
}
+ public IntWritable getVerticesSize() {
+ return vertices_size;
+ }
+
public boolean isMapMessage() {
return flag == MAP_FLAG;
}
@@ -191,6 +208,10 @@ public final class GraphJobMessage imple
return flag == PARTITION_FLAG;
}
+ public boolean isVerticesSizeMessage() {
+ return flag == VERTICES_SIZE_FLAG;
+ }
+
@Override
public String toString() {
return "GraphJobMessage [flag=" + flag + ", map=" + map + ", vertexId="
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1381252&r1=1381251&r2=1381252&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Wed Sep 5 17:12:01 2012
@@ -87,7 +87,7 @@ public final class GraphJobRunner<V exte
private boolean updated = true;
private int globalUpdateCounts = 0;
- private long numberVertices;
+ private long numberVertices = 0;
// -1 is deactivated
private int maxIteration = -1;
private long iteration;
@@ -169,7 +169,19 @@ public final class GraphJobRunner<V exte
VertexInputReader.class), conf);
loadVertices(peer, repairNeeded, runtimePartitioning, partitioner, reader);
- numberVertices = vertices.size() * peer.getNumPeers();
+
+ for (String peerName : peer.getAllPeerNames()) {
+ peer.send(peerName, new GraphJobMessage(new IntWritable(vertices.size())));
+ }
+
+ peer.sync();
+
+ GraphJobMessage msg = null;
+ while ((msg = peer.getCurrentMessage()) != null) {
+ if (msg.isVerticesSizeMessage()) {
+ numberVertices += msg.getVerticesSize().get();
+ }
+ }
// TODO refactor this to a single step
for (Entry<V, Vertex<V, E, M>> e : vertices.entrySet()) {
LinkedList<M> msgIterator = new LinkedList<M>();