Hi, I made similar changes in HAMA-632.patch-v2 and found that my unit tests failed because, in some cases, duplicate vertices were added to the list. Are you seeing this problem with this change? I don't see a uniqueness check before adding. The previous Map keyed by vertex ID silently deduplicated on put(), whereas the new List's add() does not. I propose using the VerticesInfo class to hide the implementation details of the vertex collection. Either way, we need to act if there is an issue here.
-Suraj On Wed, Nov 21, 2012 at 6:39 AM, <[email protected]> wrote: > Author: edwardyoon > Date: Wed Nov 21 11:39:50 2012 > New Revision: 1412065 > > URL: http://svn.apache.org/viewvc?rev=1412065&view=rev > Log: > Optimize memory use. > > Modified: > > hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java > > Modified: > hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java > URL: > http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1412065&r1=1412064&r2=1412065&view=diff > > ============================================================================== > --- > hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java > (original) > +++ > hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java > Wed Nov 21 11:39:50 2012 > @@ -24,7 +24,6 @@ import java.util.HashMap; > import java.util.List; > import java.util.Map; > import java.util.Map.Entry; > -import java.util.Set; > > import org.apache.commons.logging.Log; > import org.apache.commons.logging.LogFactory; > @@ -72,7 +71,7 @@ public final class GraphJobRunner<V exte > private Combiner<M> combiner; > private Partitioner<V, M> partitioner; > > - private Map<V, Vertex<V, E, M>> vertices = new HashMap<V, Vertex<V, E, > M>>(); > + private List<Vertex<V, E, M>> vertices = new ArrayList<Vertex<V, E, > M>>(); > > private boolean updated = true; > private int globalUpdateCounts = 0; > @@ -144,8 +143,8 @@ public final class GraphJobRunner<V exte > public final void cleanup( > BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> > peer) > throws IOException { > - for (Entry<V, Vertex<V, E, M>> e : vertices.entrySet()) { > - peer.write(e.getValue().getVertexID(), e.getValue().getValue()); > + for (Vertex<V, E, M> e : vertices) { > + peer.write(e.getVertexID(), e.getValue()); > } > } > > @@ -180,7 +179,7 @@ public final class GraphJobRunner<V exte > BSPPeer<Writable, Writable, Writable, 
Writable, GraphJobMessage> > peer) > throws IOException { > int activeVertices = 0; > - for (Vertex<V, E, M> vertex : vertices.values()) { > + for (Vertex<V, E, M> vertex : vertices) { > List<M> msgs = messages.get(vertex.getVertexID()); > // If there are newly received messages, restart. > if (vertex.isHalted() && msgs != null) { > @@ -216,7 +215,7 @@ public final class GraphJobRunner<V exte > private void doInitialSuperstep( > BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> > peer) > throws IOException { > - for (Vertex<V, E, M> vertex : vertices.values()) { > + for (Vertex<V, E, M> vertex : vertices) { > List<M> singletonList = > Collections.singletonList(vertex.getValue()); > M lastValue = vertex.getValue(); > vertex.compute(singletonList.iterator()); > @@ -341,7 +340,7 @@ public final class GraphJobRunner<V exte > peer.send(peer.getPeerName(partition), new > GraphJobMessage(vertex)); > } else { > vertex.setup(conf); > - vertices.put(vertex.getVertexID(), vertex); > + vertices.add(vertex); > } > vertex = newVertexInstance(vertexClass, conf); > vertex.runner = this; > @@ -355,7 +354,7 @@ public final class GraphJobRunner<V exte > Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) > msg.getVertex(); > messagedVertex.runner = this; > messagedVertex.setup(conf); > - vertices.put(messagedVertex.getVertexID(), messagedVertex); > + vertices.add(messagedVertex); > } > startPos = peer.getPos(); > } > @@ -370,7 +369,7 @@ public final class GraphJobRunner<V exte > Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) > msg.getVertex(); > messagedVertex.runner = this; > messagedVertex.setup(conf); > - vertices.put(messagedVertex.getVertexID(), messagedVertex); > + vertices.add(messagedVertex); > } > } > LOG.debug("Loading finished at " + peer.getSuperstepCount() + " > steps."); > @@ -384,89 +383,77 @@ public final class GraphJobRunner<V exte > */ > if (repairNeeded) { > LOG.debug("Starting repair of this graph!"); > + repair(peer, partitioningSteps, 
selfReference); > + } > > - int multiSteps = 0; > - MapWritable ssize = new MapWritable(); > - ssize.put(new IntWritable(peer.getPeerIndex()), > - new IntWritable(vertices.size())); > - peer.send(getMasterTask(peer), new GraphJobMessage(ssize)); > - ssize = null; > - peer.sync(); > + LOG.debug("Starting Vertex processing!"); > + } > > - if (isMasterTask(peer)) { > - int minVerticesSize = Integer.MAX_VALUE; > - GraphJobMessage received = null; > - while ((received = peer.getCurrentMessage()) != null) { > - MapWritable x = received.getMap(); > - for (Entry<Writable, Writable> e : x.entrySet()) { > - int curr = ((IntWritable) e.getValue()).get(); > - if (minVerticesSize > curr) { > - minVerticesSize = curr; > - } > - } > - } > + @SuppressWarnings("unchecked") > + private void repair( > + BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> > peer, > + int partitioningSteps, boolean selfReference) throws IOException, > + SyncException, InterruptedException { > > - if (minVerticesSize < (partitioningSteps * 2)) { > - multiSteps = minVerticesSize; > - } else { > - multiSteps = (partitioningSteps * 2); > - } > + int multiSteps = 0; > + MapWritable ssize = new MapWritable(); > + ssize.put(new IntWritable(peer.getPeerIndex()), > + new IntWritable(vertices.size())); > + peer.send(getMasterTask(peer), new GraphJobMessage(ssize)); > + ssize = null; > + peer.sync(); > > - for (String peerName : peer.getAllPeerNames()) { > - MapWritable temp = new MapWritable(); > - temp.put(new Text("steps"), new IntWritable(multiSteps)); > - peer.send(peerName, new GraphJobMessage(temp)); > + if (isMasterTask(peer)) { > + int minVerticesSize = Integer.MAX_VALUE; > + GraphJobMessage received = null; > + while ((received = peer.getCurrentMessage()) != null) { > + MapWritable x = received.getMap(); > + for (Entry<Writable, Writable> e : x.entrySet()) { > + int curr = ((IntWritable) e.getValue()).get(); > + if (minVerticesSize > curr) { > + minVerticesSize = curr; > + } > } > } > - 
peer.sync(); > > - GraphJobMessage received = peer.getCurrentMessage(); > - MapWritable x = received.getMap(); > - for (Entry<Writable, Writable> e : x.entrySet()) { > - multiSteps = ((IntWritable) e.getValue()).get(); > - } > - > - Set<V> keys = vertices.keySet(); > - Map<V, Vertex<V, E, M>> tmp = new HashMap<V, Vertex<V, E, M>>(); > - > - int i = 0; > - int syncs = 0; > - for (V v : keys) { > - Vertex<V, E, M> vertex2 = vertices.get(v); > - for (Edge<V, E> e : vertices.get(v).getEdges()) { > - peer.send(vertex2.getDestinationPeerName(e), > - new GraphJobMessage(e.getDestinationVertexID())); > - } > + if (minVerticesSize < (partitioningSteps * 2)) { > + multiSteps = minVerticesSize; > + } else { > + multiSteps = (partitioningSteps * 2); > + } > > - if (syncs < multiSteps && (i % (vertices.size() / multiSteps)) == > 0) { > - peer.sync(); > - syncs++; > - GraphJobMessage msg = null; > - while ((msg = peer.getCurrentMessage()) != null) { > - V vertexName = (V) msg.getVertexId(); > - if (!vertices.containsKey(vertexName)) { > - Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, > conf); > - newVertex.setVertexID(vertexName); > - newVertex.runner = this; > - if (selfReference) { > - newVertex.setEdges(Collections.singletonList(new Edge<V, > E>( > - newVertex.getVertexID(), null))); > - } else { > - newVertex.setEdges(new ArrayList<Edge<V, E>>(0)); > - } > - newVertex.setup(conf); > - tmp.put(vertexName, newVertex); > - } > - } > - } > - i++; > + for (String peerName : peer.getAllPeerNames()) { > + MapWritable temp = new MapWritable(); > + temp.put(new Text("steps"), new IntWritable(multiSteps)); > + peer.send(peerName, new GraphJobMessage(temp)); > } > + } > + peer.sync(); > + > + GraphJobMessage received = peer.getCurrentMessage(); > + MapWritable x = received.getMap(); > + for (Entry<Writable, Writable> e : x.entrySet()) { > + multiSteps = ((IntWritable) e.getValue()).get(); > + } > + > + Map<V, Vertex<V, E, M>> tmp = new HashMap<V, Vertex<V, E, M>>(); > + > 
+ int i = 0; > + int syncs = 0; > + > + for (Vertex<V, E, M> v : vertices) { > + for (Edge<V, E> e : v.getEdges()) { > + peer.send(v.getDestinationPeerName(e), > + new GraphJobMessage(e.getDestinationVertexID())); > + } > + > + if (syncs < multiSteps && (i % (vertices.size() / multiSteps)) == > 0) { > + peer.sync(); > + syncs++; > + GraphJobMessage msg = null; > + while ((msg = peer.getCurrentMessage()) != null) { > + V vertexName = (V) msg.getVertexId(); > > - peer.sync(); > - GraphJobMessage msg = null; > - while ((msg = peer.getCurrentMessage()) != null) { > - V vertexName = (V) msg.getVertexId(); > - if (!vertices.containsKey(vertexName)) { > Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, > conf); > newVertex.setVertexID(vertexName); > newVertex.runner = this; > @@ -477,18 +464,41 @@ public final class GraphJobRunner<V exte > newVertex.setEdges(new ArrayList<Edge<V, E>>(0)); > } > newVertex.setup(conf); > - vertices.put(vertexName, newVertex); > - newVertex = null; > + tmp.put(vertexName, newVertex); > + > } > } > + i++; > + } > + > + peer.sync(); > + GraphJobMessage msg = null; > + while ((msg = peer.getCurrentMessage()) != null) { > + V vertexName = (V) msg.getVertexId(); > > - for (Map.Entry<V, Vertex<V, E, M>> e : tmp.entrySet()) { > - vertices.put(e.getKey(), e.getValue()); > + Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, conf); > + newVertex.setVertexID(vertexName); > + newVertex.runner = this; > + if (selfReference) { > + newVertex.setEdges(Collections.singletonList(new Edge<V, > E>(newVertex > + .getVertexID(), null))); > + } else { > + newVertex.setEdges(new ArrayList<Edge<V, E>>(0)); > } > - tmp.clear(); > + newVertex.setup(conf); > + tmp.put(vertexName, newVertex); > + newVertex = null; > + > } > > - LOG.debug("Starting Vertex processing!"); > + for (Vertex<V, E, M> e : vertices) { > + if (tmp.containsKey((e.getVertexID()))) { > + tmp.remove(e.getVertexID()); > + } > + } > + > + vertices.addAll(tmp.values()); > + 
tmp.clear(); > } > > /** > > >
