Author: edwardyoon
Date: Wed Nov 21 11:39:50 2012
New Revision: 1412065
URL: http://svn.apache.org/viewvc?rev=1412065&view=rev
Log:
Optimize memory use: hold vertices in a List instead of a Map, and move graph repair into its own repair() method.
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1412065&r1=1412064&r2=1412065&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Wed Nov 21 11:39:50 2012
@@ -24,7 +24,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -72,7 +71,7 @@ public final class GraphJobRunner<V exte
private Combiner<M> combiner;
private Partitioner<V, M> partitioner;
- private Map<V, Vertex<V, E, M>> vertices = new HashMap<V, Vertex<V, E, M>>();
+ private List<Vertex<V, E, M>> vertices = new ArrayList<Vertex<V, E, M>>();
private boolean updated = true;
private int globalUpdateCounts = 0;
@@ -144,8 +143,8 @@ public final class GraphJobRunner<V exte
public final void cleanup(
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException {
- for (Entry<V, Vertex<V, E, M>> e : vertices.entrySet()) {
- peer.write(e.getValue().getVertexID(), e.getValue().getValue());
+ for (Vertex<V, E, M> e : vertices) {
+ peer.write(e.getVertexID(), e.getValue());
}
}
@@ -180,7 +179,7 @@ public final class GraphJobRunner<V exte
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException {
int activeVertices = 0;
- for (Vertex<V, E, M> vertex : vertices.values()) {
+ for (Vertex<V, E, M> vertex : vertices) {
List<M> msgs = messages.get(vertex.getVertexID());
// If there are newly received messages, restart.
if (vertex.isHalted() && msgs != null) {
@@ -216,7 +215,7 @@ public final class GraphJobRunner<V exte
private void doInitialSuperstep(
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException {
- for (Vertex<V, E, M> vertex : vertices.values()) {
+ for (Vertex<V, E, M> vertex : vertices) {
List<M> singletonList = Collections.singletonList(vertex.getValue());
M lastValue = vertex.getValue();
vertex.compute(singletonList.iterator());
@@ -341,7 +340,7 @@ public final class GraphJobRunner<V exte
peer.send(peer.getPeerName(partition), new GraphJobMessage(vertex));
} else {
vertex.setup(conf);
- vertices.put(vertex.getVertexID(), vertex);
+ vertices.add(vertex);
}
vertex = newVertexInstance(vertexClass, conf);
vertex.runner = this;
@@ -355,7 +354,7 @@ public final class GraphJobRunner<V exte
Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) msg.getVertex();
messagedVertex.runner = this;
messagedVertex.setup(conf);
- vertices.put(messagedVertex.getVertexID(), messagedVertex);
+ vertices.add(messagedVertex);
}
startPos = peer.getPos();
}
@@ -370,7 +369,7 @@ public final class GraphJobRunner<V exte
Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) msg.getVertex();
messagedVertex.runner = this;
messagedVertex.setup(conf);
- vertices.put(messagedVertex.getVertexID(), messagedVertex);
+ vertices.add(messagedVertex);
}
}
LOG.debug("Loading finished at " + peer.getSuperstepCount() + " steps.");
@@ -384,89 +383,77 @@ public final class GraphJobRunner<V exte
*/
if (repairNeeded) {
LOG.debug("Starting repair of this graph!");
+ repair(peer, partitioningSteps, selfReference);
+ }
- int multiSteps = 0;
- MapWritable ssize = new MapWritable();
- ssize.put(new IntWritable(peer.getPeerIndex()),
- new IntWritable(vertices.size()));
- peer.send(getMasterTask(peer), new GraphJobMessage(ssize));
- ssize = null;
- peer.sync();
+ LOG.debug("Starting Vertex processing!");
+ }
- if (isMasterTask(peer)) {
- int minVerticesSize = Integer.MAX_VALUE;
- GraphJobMessage received = null;
- while ((received = peer.getCurrentMessage()) != null) {
- MapWritable x = received.getMap();
- for (Entry<Writable, Writable> e : x.entrySet()) {
- int curr = ((IntWritable) e.getValue()).get();
- if (minVerticesSize > curr) {
- minVerticesSize = curr;
- }
- }
- }
+ @SuppressWarnings("unchecked")
+ private void repair(
+ BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
+ int partitioningSteps, boolean selfReference) throws IOException,
+ SyncException, InterruptedException {
- if (minVerticesSize < (partitioningSteps * 2)) {
- multiSteps = minVerticesSize;
- } else {
- multiSteps = (partitioningSteps * 2);
- }
+ int multiSteps = 0;
+ MapWritable ssize = new MapWritable();
+ ssize.put(new IntWritable(peer.getPeerIndex()),
+ new IntWritable(vertices.size()));
+ peer.send(getMasterTask(peer), new GraphJobMessage(ssize));
+ ssize = null;
+ peer.sync();
- for (String peerName : peer.getAllPeerNames()) {
- MapWritable temp = new MapWritable();
- temp.put(new Text("steps"), new IntWritable(multiSteps));
- peer.send(peerName, new GraphJobMessage(temp));
+ if (isMasterTask(peer)) {
+ int minVerticesSize = Integer.MAX_VALUE;
+ GraphJobMessage received = null;
+ while ((received = peer.getCurrentMessage()) != null) {
+ MapWritable x = received.getMap();
+ for (Entry<Writable, Writable> e : x.entrySet()) {
+ int curr = ((IntWritable) e.getValue()).get();
+ if (minVerticesSize > curr) {
+ minVerticesSize = curr;
+ }
}
}
- peer.sync();
- GraphJobMessage received = peer.getCurrentMessage();
- MapWritable x = received.getMap();
- for (Entry<Writable, Writable> e : x.entrySet()) {
- multiSteps = ((IntWritable) e.getValue()).get();
- }
-
- Set<V> keys = vertices.keySet();
- Map<V, Vertex<V, E, M>> tmp = new HashMap<V, Vertex<V, E, M>>();
-
- int i = 0;
- int syncs = 0;
- for (V v : keys) {
- Vertex<V, E, M> vertex2 = vertices.get(v);
- for (Edge<V, E> e : vertices.get(v).getEdges()) {
- peer.send(vertex2.getDestinationPeerName(e),
- new GraphJobMessage(e.getDestinationVertexID()));
- }
+ if (minVerticesSize < (partitioningSteps * 2)) {
+ multiSteps = minVerticesSize;
+ } else {
+ multiSteps = (partitioningSteps * 2);
+ }
- if (syncs < multiSteps && (i % (vertices.size() / multiSteps)) == 0) {
- peer.sync();
- syncs++;
- GraphJobMessage msg = null;
- while ((msg = peer.getCurrentMessage()) != null) {
- V vertexName = (V) msg.getVertexId();
- if (!vertices.containsKey(vertexName)) {
- Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, conf);
- newVertex.setVertexID(vertexName);
- newVertex.runner = this;
- if (selfReference) {
- newVertex.setEdges(Collections.singletonList(new Edge<V, E>(
- newVertex.getVertexID(), null)));
- } else {
- newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
- }
- newVertex.setup(conf);
- tmp.put(vertexName, newVertex);
- }
- }
- }
- i++;
+ for (String peerName : peer.getAllPeerNames()) {
+ MapWritable temp = new MapWritable();
+ temp.put(new Text("steps"), new IntWritable(multiSteps));
+ peer.send(peerName, new GraphJobMessage(temp));
}
+ }
+ peer.sync();
+
+ GraphJobMessage received = peer.getCurrentMessage();
+ MapWritable x = received.getMap();
+ for (Entry<Writable, Writable> e : x.entrySet()) {
+ multiSteps = ((IntWritable) e.getValue()).get();
+ }
+
+ Map<V, Vertex<V, E, M>> tmp = new HashMap<V, Vertex<V, E, M>>();
+
+ int i = 0;
+ int syncs = 0;
+
+ for (Vertex<V, E, M> v : vertices) {
+ for (Edge<V, E> e : v.getEdges()) {
+ peer.send(v.getDestinationPeerName(e),
+ new GraphJobMessage(e.getDestinationVertexID()));
+ }
+
+ if (syncs < multiSteps && (i % (vertices.size() / multiSteps)) == 0) {
+ peer.sync();
+ syncs++;
+ GraphJobMessage msg = null;
+ while ((msg = peer.getCurrentMessage()) != null) {
+ V vertexName = (V) msg.getVertexId();
- peer.sync();
- GraphJobMessage msg = null;
- while ((msg = peer.getCurrentMessage()) != null) {
- V vertexName = (V) msg.getVertexId();
- if (!vertices.containsKey(vertexName)) {
Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, conf);
newVertex.setVertexID(vertexName);
newVertex.runner = this;
@@ -477,18 +464,41 @@ public final class GraphJobRunner<V exte
newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
}
newVertex.setup(conf);
- vertices.put(vertexName, newVertex);
- newVertex = null;
+ tmp.put(vertexName, newVertex);
+
}
}
+ i++;
+ }
+
+ peer.sync();
+ GraphJobMessage msg = null;
+ while ((msg = peer.getCurrentMessage()) != null) {
+ V vertexName = (V) msg.getVertexId();
- for (Map.Entry<V, Vertex<V, E, M>> e : tmp.entrySet()) {
- vertices.put(e.getKey(), e.getValue());
+ Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, conf);
+ newVertex.setVertexID(vertexName);
+ newVertex.runner = this;
+ if (selfReference) {
+ newVertex.setEdges(Collections.singletonList(new Edge<V, E>(newVertex
+ .getVertexID(), null)));
+ } else {
+ newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
}
- tmp.clear();
+ newVertex.setup(conf);
+ tmp.put(vertexName, newVertex);
+ newVertex = null;
+
}
- LOG.debug("Starting Vertex processing!");
+ for (Vertex<V, E, M> e : vertices) {
+ if (tmp.containsKey((e.getVertexID()))) {
+ tmp.remove(e.getVertexID());
+ }
+ }
+
+ vertices.addAll(tmp.values());
+ tmp.clear();
}
/**