This is certainly a very crude bug. The list does permit duplicates, unlike the map we had before. I have already said that we have to remove the repair capability, because it adds too much complexity (O(n) vs. O(1), you know). It will get especially slow once we have a disk-based graph or a random access file backing this VertexInfo class, which is a nice abstraction.
So I would like to remove this and leave the data normalization up to the user. The bug above will be solved with it anyway, so this has few downsides (the major is of course that it is included in two releases already). 2012/12/3 Suraj Menon <[email protected]> > Hi, I made similar changes to the HAMA-632.patch-v2 and found my unit tests > failed because there were instances where I added duplicates of vertices in > the list. Are you seeing this problem with this change because I don't see > an uniqueness check before adding? I propose using the VerticesInfo class > hiding the implementation detail of the vertex collection inside. But we > need to act if there is an issue here. > > -Suraj > > On Wed, Nov 21, 2012 at 6:39 AM, <[email protected]> wrote: > > > Author: edwardyoon > > Date: Wed Nov 21 11:39:50 2012 > > New Revision: 1412065 > > > > URL: http://svn.apache.org/viewvc?rev=1412065&view=rev > > Log: > > Optimize memory use. > > > > Modified: > > > > hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java > > > > Modified: > > hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java > > URL: > > > http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1412065&r1=1412064&r2=1412065&view=diff > > > > > ============================================================================== > > --- > > hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java > > (original) > > +++ > > hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java > > Wed Nov 21 11:39:50 2012 > > @@ -24,7 +24,6 @@ import java.util.HashMap; > > import java.util.List; > > import java.util.Map; > > import java.util.Map.Entry; > > -import java.util.Set; > > > > import org.apache.commons.logging.Log; > > import org.apache.commons.logging.LogFactory; > > @@ -72,7 +71,7 @@ public final class GraphJobRunner<V exte > > private Combiner<M> combiner; > > private Partitioner<V, M> 
partitioner; > > > > - private Map<V, Vertex<V, E, M>> vertices = new HashMap<V, Vertex<V, E, > > M>>(); > > + private List<Vertex<V, E, M>> vertices = new ArrayList<Vertex<V, E, > > M>>(); > > > > private boolean updated = true; > > private int globalUpdateCounts = 0; > > @@ -144,8 +143,8 @@ public final class GraphJobRunner<V exte > > public final void cleanup( > > BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> > > peer) > > throws IOException { > > - for (Entry<V, Vertex<V, E, M>> e : vertices.entrySet()) { > > - peer.write(e.getValue().getVertexID(), e.getValue().getValue()); > > + for (Vertex<V, E, M> e : vertices) { > > + peer.write(e.getVertexID(), e.getValue()); > > } > > } > > > > @@ -180,7 +179,7 @@ public final class GraphJobRunner<V exte > > BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> > > peer) > > throws IOException { > > int activeVertices = 0; > > - for (Vertex<V, E, M> vertex : vertices.values()) { > > + for (Vertex<V, E, M> vertex : vertices) { > > List<M> msgs = messages.get(vertex.getVertexID()); > > // If there are newly received messages, restart. 
> > if (vertex.isHalted() && msgs != null) { > > @@ -216,7 +215,7 @@ public final class GraphJobRunner<V exte > > private void doInitialSuperstep( > > BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> > > peer) > > throws IOException { > > - for (Vertex<V, E, M> vertex : vertices.values()) { > > + for (Vertex<V, E, M> vertex : vertices) { > > List<M> singletonList = > > Collections.singletonList(vertex.getValue()); > > M lastValue = vertex.getValue(); > > vertex.compute(singletonList.iterator()); > > @@ -341,7 +340,7 @@ public final class GraphJobRunner<V exte > > peer.send(peer.getPeerName(partition), new > > GraphJobMessage(vertex)); > > } else { > > vertex.setup(conf); > > - vertices.put(vertex.getVertexID(), vertex); > > + vertices.add(vertex); > > } > > vertex = newVertexInstance(vertexClass, conf); > > vertex.runner = this; > > @@ -355,7 +354,7 @@ public final class GraphJobRunner<V exte > > Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) > > msg.getVertex(); > > messagedVertex.runner = this; > > messagedVertex.setup(conf); > > - vertices.put(messagedVertex.getVertexID(), messagedVertex); > > + vertices.add(messagedVertex); > > } > > startPos = peer.getPos(); > > } > > @@ -370,7 +369,7 @@ public final class GraphJobRunner<V exte > > Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) > > msg.getVertex(); > > messagedVertex.runner = this; > > messagedVertex.setup(conf); > > - vertices.put(messagedVertex.getVertexID(), messagedVertex); > > + vertices.add(messagedVertex); > > } > > } > > LOG.debug("Loading finished at " + peer.getSuperstepCount() + " > > steps."); > > @@ -384,89 +383,77 @@ public final class GraphJobRunner<V exte > > */ > > if (repairNeeded) { > > LOG.debug("Starting repair of this graph!"); > > + repair(peer, partitioningSteps, selfReference); > > + } > > > > - int multiSteps = 0; > > - MapWritable ssize = new MapWritable(); > > - ssize.put(new IntWritable(peer.getPeerIndex()), > > - new IntWritable(vertices.size())); > > - 
peer.send(getMasterTask(peer), new GraphJobMessage(ssize)); > > - ssize = null; > > - peer.sync(); > > + LOG.debug("Starting Vertex processing!"); > > + } > > > > - if (isMasterTask(peer)) { > > - int minVerticesSize = Integer.MAX_VALUE; > > - GraphJobMessage received = null; > > - while ((received = peer.getCurrentMessage()) != null) { > > - MapWritable x = received.getMap(); > > - for (Entry<Writable, Writable> e : x.entrySet()) { > > - int curr = ((IntWritable) e.getValue()).get(); > > - if (minVerticesSize > curr) { > > - minVerticesSize = curr; > > - } > > - } > > - } > > + @SuppressWarnings("unchecked") > > + private void repair( > > + BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> > > peer, > > + int partitioningSteps, boolean selfReference) throws IOException, > > + SyncException, InterruptedException { > > > > - if (minVerticesSize < (partitioningSteps * 2)) { > > - multiSteps = minVerticesSize; > > - } else { > > - multiSteps = (partitioningSteps * 2); > > - } > > + int multiSteps = 0; > > + MapWritable ssize = new MapWritable(); > > + ssize.put(new IntWritable(peer.getPeerIndex()), > > + new IntWritable(vertices.size())); > > + peer.send(getMasterTask(peer), new GraphJobMessage(ssize)); > > + ssize = null; > > + peer.sync(); > > > > - for (String peerName : peer.getAllPeerNames()) { > > - MapWritable temp = new MapWritable(); > > - temp.put(new Text("steps"), new IntWritable(multiSteps)); > > - peer.send(peerName, new GraphJobMessage(temp)); > > + if (isMasterTask(peer)) { > > + int minVerticesSize = Integer.MAX_VALUE; > > + GraphJobMessage received = null; > > + while ((received = peer.getCurrentMessage()) != null) { > > + MapWritable x = received.getMap(); > > + for (Entry<Writable, Writable> e : x.entrySet()) { > > + int curr = ((IntWritable) e.getValue()).get(); > > + if (minVerticesSize > curr) { > > + minVerticesSize = curr; > > + } > > } > > } > > - peer.sync(); > > > > - GraphJobMessage received = peer.getCurrentMessage(); > > - 
MapWritable x = received.getMap(); > > - for (Entry<Writable, Writable> e : x.entrySet()) { > > - multiSteps = ((IntWritable) e.getValue()).get(); > > - } > > - > > - Set<V> keys = vertices.keySet(); > > - Map<V, Vertex<V, E, M>> tmp = new HashMap<V, Vertex<V, E, M>>(); > > - > > - int i = 0; > > - int syncs = 0; > > - for (V v : keys) { > > - Vertex<V, E, M> vertex2 = vertices.get(v); > > - for (Edge<V, E> e : vertices.get(v).getEdges()) { > > - peer.send(vertex2.getDestinationPeerName(e), > > - new GraphJobMessage(e.getDestinationVertexID())); > > - } > > + if (minVerticesSize < (partitioningSteps * 2)) { > > + multiSteps = minVerticesSize; > > + } else { > > + multiSteps = (partitioningSteps * 2); > > + } > > > > - if (syncs < multiSteps && (i % (vertices.size() / multiSteps)) > == > > 0) { > > - peer.sync(); > > - syncs++; > > - GraphJobMessage msg = null; > > - while ((msg = peer.getCurrentMessage()) != null) { > > - V vertexName = (V) msg.getVertexId(); > > - if (!vertices.containsKey(vertexName)) { > > - Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, > > conf); > > - newVertex.setVertexID(vertexName); > > - newVertex.runner = this; > > - if (selfReference) { > > - newVertex.setEdges(Collections.singletonList(new Edge<V, > > E>( > > - newVertex.getVertexID(), null))); > > - } else { > > - newVertex.setEdges(new ArrayList<Edge<V, E>>(0)); > > - } > > - newVertex.setup(conf); > > - tmp.put(vertexName, newVertex); > > - } > > - } > > - } > > - i++; > > + for (String peerName : peer.getAllPeerNames()) { > > + MapWritable temp = new MapWritable(); > > + temp.put(new Text("steps"), new IntWritable(multiSteps)); > > + peer.send(peerName, new GraphJobMessage(temp)); > > } > > + } > > + peer.sync(); > > + > > + GraphJobMessage received = peer.getCurrentMessage(); > > + MapWritable x = received.getMap(); > > + for (Entry<Writable, Writable> e : x.entrySet()) { > > + multiSteps = ((IntWritable) e.getValue()).get(); > > + } > > + > > + Map<V, Vertex<V, E, M>> 
tmp = new HashMap<V, Vertex<V, E, M>>(); > > + > > + int i = 0; > > + int syncs = 0; > > + > > + for (Vertex<V, E, M> v : vertices) { > > + for (Edge<V, E> e : v.getEdges()) { > > + peer.send(v.getDestinationPeerName(e), > > + new GraphJobMessage(e.getDestinationVertexID())); > > + } > > + > > + if (syncs < multiSteps && (i % (vertices.size() / multiSteps)) == > > 0) { > > + peer.sync(); > > + syncs++; > > + GraphJobMessage msg = null; > > + while ((msg = peer.getCurrentMessage()) != null) { > > + V vertexName = (V) msg.getVertexId(); > > > > - peer.sync(); > > - GraphJobMessage msg = null; > > - while ((msg = peer.getCurrentMessage()) != null) { > > - V vertexName = (V) msg.getVertexId(); > > - if (!vertices.containsKey(vertexName)) { > > Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, > > conf); > > newVertex.setVertexID(vertexName); > > newVertex.runner = this; > > @@ -477,18 +464,41 @@ public final class GraphJobRunner<V exte > > newVertex.setEdges(new ArrayList<Edge<V, E>>(0)); > > } > > newVertex.setup(conf); > > - vertices.put(vertexName, newVertex); > > - newVertex = null; > > + tmp.put(vertexName, newVertex); > > + > > } > > } > > + i++; > > + } > > + > > + peer.sync(); > > + GraphJobMessage msg = null; > > + while ((msg = peer.getCurrentMessage()) != null) { > > + V vertexName = (V) msg.getVertexId(); > > > > - for (Map.Entry<V, Vertex<V, E, M>> e : tmp.entrySet()) { > > - vertices.put(e.getKey(), e.getValue()); > > + Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, conf); > > + newVertex.setVertexID(vertexName); > > + newVertex.runner = this; > > + if (selfReference) { > > + newVertex.setEdges(Collections.singletonList(new Edge<V, > > E>(newVertex > > + .getVertexID(), null))); > > + } else { > > + newVertex.setEdges(new ArrayList<Edge<V, E>>(0)); > > } > > - tmp.clear(); > > + newVertex.setup(conf); > > + tmp.put(vertexName, newVertex); > > + newVertex = null; > > + > > } > > > > - LOG.debug("Starting Vertex processing!"); > > + for 
(Vertex<V, E, M> e : vertices) { > > + if (tmp.containsKey((e.getVertexID()))) { > > + tmp.remove(e.getVertexID()); > > + } > > + } > > + > > + vertices.addAll(tmp.values()); > > + tmp.clear(); > > } > > > > /** > > > > > > >
