I didn't look at the repair function closely. How does Pregel handle dangling links? If it's possible to handle them on the user side, let's get rid of it.
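For example, a user could normalize the input before submitting the job, so
that every edge target also exists as a vertex. A rough sketch on a plain
in-memory adjacency list (this helper is hypothetical, not part of Hama's
API; a real job would do the same in a preprocessing pass over its input):

  import java.util.ArrayList;
  import java.util.HashSet;
  import java.util.List;
  import java.util.Map;
  import java.util.Set;

  public class DanglingLinkNormalizer {
    /**
     * Ensures every edge destination has an entry in the graph by adding
     * an empty adjacency list (a sink vertex) for each missing target.
     */
    public static void normalize(Map<String, List<String>> graph) {
      Set<String> missing = new HashSet<String>();
      for (List<String> targets : graph.values()) {
        for (String target : targets) {
          if (!graph.containsKey(target)) {
            missing.add(target);
          }
        }
      }
      for (String id : missing) {
        graph.put(id, new ArrayList<String>(0));
      }
    }
  }

If the runtime no longer repairs the graph, the user has to guarantee this
invariant themselves; otherwise messages sent to a missing vertex are lost.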
On Tue, Dec 4, 2012 at 5:51 AM, Thomas Jungblut <[email protected]> wrote:

> This is certainly a very crude bug. The list permits duplicates, as
> opposed to the map before.
> I have already said that we have to remove the repair capability, because
> it adds too much complexity (O(n) vs. O(1), you know). It will get
> especially slow when we have a disk-based graph or a random-access file
> behind this VertexInfo class, which is a nice abstraction.
>
> So I would like to remove this and leave the data normalization up to the
> user. The bug above will be solved with it anyway, so this has few
> downsides (the major one, of course, is that it is already included in
> two releases).
>
> 2012/12/3 Suraj Menon <[email protected]>
>
>> Hi, I made similar changes to the HAMA-632.patch-v2 and found that my
>> unit tests failed because there were instances where duplicate vertices
>> were added to the list. Are you seeing this problem with this change? I
>> don't see a uniqueness check before adding. I propose using a
>> VerticesInfo class that hides the implementation details of the vertex
>> collection inside it. But we need to act if there is an issue here.
>>
>> -Suraj
>>
>> On Wed, Nov 21, 2012 at 6:39 AM, <[email protected]> wrote:
>>
>> > Author: edwardyoon
>> > Date: Wed Nov 21 11:39:50 2012
>> > New Revision: 1412065
>> >
>> > URL: http://svn.apache.org/viewvc?rev=1412065&view=rev
>> > Log:
>> > Optimize memory use.
>> >
>> > Modified:
>> >     hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
>> >
>> > Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
>> > URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1412065&r1=1412064&r2=1412065&view=diff
>> > ==============================================================================
>> > --- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
>> > +++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Wed Nov 21 11:39:50 2012
>> > @@ -24,7 +24,6 @@ import java.util.HashMap;
>> >  import java.util.List;
>> >  import java.util.Map;
>> >  import java.util.Map.Entry;
>> > -import java.util.Set;
>> >
>> >  import org.apache.commons.logging.Log;
>> >  import org.apache.commons.logging.LogFactory;
>> > @@ -72,7 +71,7 @@ public final class GraphJobRunner<V exte
>> >    private Combiner<M> combiner;
>> >    private Partitioner<V, M> partitioner;
>> >
>> > -  private Map<V, Vertex<V, E, M>> vertices = new HashMap<V, Vertex<V, E, M>>();
>> > +  private List<Vertex<V, E, M>> vertices = new ArrayList<Vertex<V, E, M>>();
>> >
>> >    private boolean updated = true;
>> >    private int globalUpdateCounts = 0;
>> > @@ -144,8 +143,8 @@ public final class GraphJobRunner<V exte
>> >    public final void cleanup(
>> >        BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
>> >        throws IOException {
>> > -    for (Entry<V, Vertex<V, E, M>> e : vertices.entrySet()) {
>> > -      peer.write(e.getValue().getVertexID(), e.getValue().getValue());
>> > +    for (Vertex<V, E, M> e : vertices) {
>> > +      peer.write(e.getVertexID(), e.getValue());
>> >      }
>> >    }
>> >
>> > @@ -180,7 +179,7 @@ public final class GraphJobRunner<V exte
>> >        BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
>> >        throws IOException {
>> >      int activeVertices = 0;
>> > -    for (Vertex<V, E, M> vertex : vertices.values()) {
>> > +    for (Vertex<V, E, M> vertex : vertices) {
>> >        List<M> msgs = messages.get(vertex.getVertexID());
>> >        // If there are newly received messages, restart.
>> >        if (vertex.isHalted() && msgs != null) {
>> > @@ -216,7 +215,7 @@ public final class GraphJobRunner<V exte
>> >    private void doInitialSuperstep(
>> >        BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
>> >        throws IOException {
>> > -    for (Vertex<V, E, M> vertex : vertices.values()) {
>> > +    for (Vertex<V, E, M> vertex : vertices) {
>> >        List<M> singletonList = Collections.singletonList(vertex.getValue());
>> >        M lastValue = vertex.getValue();
>> >        vertex.compute(singletonList.iterator());
>> > @@ -341,7 +340,7 @@ public final class GraphJobRunner<V exte
>> >          peer.send(peer.getPeerName(partition), new GraphJobMessage(vertex));
>> >        } else {
>> >          vertex.setup(conf);
>> > -        vertices.put(vertex.getVertexID(), vertex);
>> > +        vertices.add(vertex);
>> >        }
>> >        vertex = newVertexInstance(vertexClass, conf);
>> >        vertex.runner = this;
>> > @@ -355,7 +354,7 @@ public final class GraphJobRunner<V exte
>> >          Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) msg.getVertex();
>> >          messagedVertex.runner = this;
>> >          messagedVertex.setup(conf);
>> > -        vertices.put(messagedVertex.getVertexID(), messagedVertex);
>> > +        vertices.add(messagedVertex);
>> >        }
>> >        startPos = peer.getPos();
>> >      }
>> > @@ -370,7 +369,7 @@ public final class GraphJobRunner<V exte
>> >          Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) msg.getVertex();
>> >          messagedVertex.runner = this;
>> >          messagedVertex.setup(conf);
>> > -        vertices.put(messagedVertex.getVertexID(), messagedVertex);
>> > +        vertices.add(messagedVertex);
>> >        }
>> >      }
>> >      LOG.debug("Loading finished at " + peer.getSuperstepCount() + " steps.");
>> > @@ -384,89 +383,77 @@ public final class GraphJobRunner<V exte
>> >       */
>> >      if (repairNeeded) {
>> >        LOG.debug("Starting repair of this graph!");
>> > +      repair(peer, partitioningSteps, selfReference);
>> > +    }
>> >
>> > -      int multiSteps = 0;
>> > -      MapWritable ssize = new MapWritable();
>> > -      ssize.put(new IntWritable(peer.getPeerIndex()),
>> > -          new IntWritable(vertices.size()));
>> > -      peer.send(getMasterTask(peer), new GraphJobMessage(ssize));
>> > -      ssize = null;
>> > -      peer.sync();
>> > +    LOG.debug("Starting Vertex processing!");
>> > +  }
>> >
>> > -      if (isMasterTask(peer)) {
>> > -        int minVerticesSize = Integer.MAX_VALUE;
>> > -        GraphJobMessage received = null;
>> > -        while ((received = peer.getCurrentMessage()) != null) {
>> > -          MapWritable x = received.getMap();
>> > -          for (Entry<Writable, Writable> e : x.entrySet()) {
>> > -            int curr = ((IntWritable) e.getValue()).get();
>> > -            if (minVerticesSize > curr) {
>> > -              minVerticesSize = curr;
>> > -            }
>> > -          }
>> > -        }
>> > +  @SuppressWarnings("unchecked")
>> > +  private void repair(
>> > +      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
>> > +      int partitioningSteps, boolean selfReference) throws IOException,
>> > +      SyncException, InterruptedException {
>> >
>> > -        if (minVerticesSize < (partitioningSteps * 2)) {
>> > -          multiSteps = minVerticesSize;
>> > -        } else {
>> > -          multiSteps = (partitioningSteps * 2);
>> > -        }
>> > +    int multiSteps = 0;
>> > +    MapWritable ssize = new MapWritable();
>> > +    ssize.put(new IntWritable(peer.getPeerIndex()),
>> > +        new IntWritable(vertices.size()));
>> > +    peer.send(getMasterTask(peer), new GraphJobMessage(ssize));
>> > +    ssize = null;
>> > +    peer.sync();
>> >
>> > -        for (String peerName : peer.getAllPeerNames()) {
>> > -          MapWritable temp = new MapWritable();
>> > -          temp.put(new Text("steps"), new IntWritable(multiSteps));
>> > -          peer.send(peerName, new GraphJobMessage(temp));
>> > +    if (isMasterTask(peer)) {
>> > +      int minVerticesSize = Integer.MAX_VALUE;
>> > +      GraphJobMessage received = null;
>> > +      while ((received = peer.getCurrentMessage()) != null) {
>> > +        MapWritable x = received.getMap();
>> > +        for (Entry<Writable, Writable> e : x.entrySet()) {
>> > +          int curr = ((IntWritable) e.getValue()).get();
>> > +          if (minVerticesSize > curr) {
>> > +            minVerticesSize = curr;
>> > +          }
>> >          }
>> >        }
>> > -      peer.sync();
>> >
>> > -      GraphJobMessage received = peer.getCurrentMessage();
>> > -      MapWritable x = received.getMap();
>> > -      for (Entry<Writable, Writable> e : x.entrySet()) {
>> > -        multiSteps = ((IntWritable) e.getValue()).get();
>> > -      }
>> > -
>> > -      Set<V> keys = vertices.keySet();
>> > -      Map<V, Vertex<V, E, M>> tmp = new HashMap<V, Vertex<V, E, M>>();
>> > -
>> > -      int i = 0;
>> > -      int syncs = 0;
>> > -      for (V v : keys) {
>> > -        Vertex<V, E, M> vertex2 = vertices.get(v);
>> > -        for (Edge<V, E> e : vertices.get(v).getEdges()) {
>> > -          peer.send(vertex2.getDestinationPeerName(e),
>> > -              new GraphJobMessage(e.getDestinationVertexID()));
>> > -        }
>> > +      if (minVerticesSize < (partitioningSteps * 2)) {
>> > +        multiSteps = minVerticesSize;
>> > +      } else {
>> > +        multiSteps = (partitioningSteps * 2);
>> > +      }
>> >
>> > -        if (syncs < multiSteps && (i % (vertices.size() / multiSteps)) == 0) {
>> > -          peer.sync();
>> > -          syncs++;
>> > -          GraphJobMessage msg = null;
>> > -          while ((msg = peer.getCurrentMessage()) != null) {
>> > -            V vertexName = (V) msg.getVertexId();
>> > -            if (!vertices.containsKey(vertexName)) {
>> > -              Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, conf);
>> > -              newVertex.setVertexID(vertexName);
>> > -              newVertex.runner = this;
>> > -              if (selfReference) {
>> > -                newVertex.setEdges(Collections.singletonList(new Edge<V, E>(
>> > -                    newVertex.getVertexID(), null)));
>> > -              } else {
>> > -                newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
>> > -              }
>> > -              newVertex.setup(conf);
>> > -              tmp.put(vertexName, newVertex);
>> > -            }
>> > -          }
>> > -        }
>> > -        i++;
>> > +      for (String peerName : peer.getAllPeerNames()) {
>> > +        MapWritable temp = new MapWritable();
>> > +        temp.put(new Text("steps"), new IntWritable(multiSteps));
>> > +        peer.send(peerName, new GraphJobMessage(temp));
>> >        }
>> > +    }
>> > +    peer.sync();
>> > +
>> > +    GraphJobMessage received = peer.getCurrentMessage();
>> > +    MapWritable x = received.getMap();
>> > +    for (Entry<Writable, Writable> e : x.entrySet()) {
>> > +      multiSteps = ((IntWritable) e.getValue()).get();
>> > +    }
>> > +
>> > +    Map<V, Vertex<V, E, M>> tmp = new HashMap<V, Vertex<V, E, M>>();
>> > +
>> > +    int i = 0;
>> > +    int syncs = 0;
>> > +
>> > +    for (Vertex<V, E, M> v : vertices) {
>> > +      for (Edge<V, E> e : v.getEdges()) {
>> > +        peer.send(v.getDestinationPeerName(e),
>> > +            new GraphJobMessage(e.getDestinationVertexID()));
>> > +      }
>> > +
>> > +      if (syncs < multiSteps && (i % (vertices.size() / multiSteps)) == 0) {
>> > +        peer.sync();
>> > +        syncs++;
>> > +        GraphJobMessage msg = null;
>> > +        while ((msg = peer.getCurrentMessage()) != null) {
>> > +          V vertexName = (V) msg.getVertexId();
>> >
>> > -      peer.sync();
>> > -      GraphJobMessage msg = null;
>> > -      while ((msg = peer.getCurrentMessage()) != null) {
>> > -        V vertexName = (V) msg.getVertexId();
>> > -        if (!vertices.containsKey(vertexName)) {
>> >            Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, conf);
>> >            newVertex.setVertexID(vertexName);
>> >            newVertex.runner = this;
>> > @@ -477,18 +464,41 @@ public final class GraphJobRunner<V exte
>> >              newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
>> >            }
>> >            newVertex.setup(conf);
>> > -          vertices.put(vertexName, newVertex);
>> > -          newVertex = null;
>> > +          tmp.put(vertexName, newVertex);
>> > +
>> >          }
>> >        }
>> > +      i++;
>> > +    }
>> > +
>> > +    peer.sync();
>> > +    GraphJobMessage msg = null;
>> > +    while ((msg = peer.getCurrentMessage()) != null) {
>> > +      V vertexName = (V) msg.getVertexId();
>> >
>> > -      for (Map.Entry<V, Vertex<V, E, M>> e : tmp.entrySet()) {
>> > -        vertices.put(e.getKey(), e.getValue());
>> > +      Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, conf);
>> > +      newVertex.setVertexID(vertexName);
>> > +      newVertex.runner = this;
>> > +      if (selfReference) {
>> > +        newVertex.setEdges(Collections.singletonList(new Edge<V, E>(newVertex
>> > +            .getVertexID(), null)));
>> > +      } else {
>> > +        newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
>> >        }
>> > -      tmp.clear();
>> > +      newVertex.setup(conf);
>> > +      tmp.put(vertexName, newVertex);
>> > +      newVertex = null;
>> > +
>> >      }
>> >
>> > -    LOG.debug("Starting Vertex processing!");
>> > +    for (Vertex<V, E, M> e : vertices) {
>> > +      if (tmp.containsKey((e.getVertexID()))) {
>> > +        tmp.remove(e.getVertexID());
>> > +      }
>> > +    }
>> > +
>> > +    vertices.addAll(tmp.values());
>> > +    tmp.clear();
>> >    }
>> >
>> >    /**
>> >
>> >
>>

--
Best Regards, Edward J. Yoon
@eddieyoon
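For reference, a minimal sketch of the VerticesInfo abstraction Suraj
proposes above. The interface and class names here are made up for
illustration (and kept generic so the sketch compiles standalone); this is
not Hama's actual class:

  import java.util.Iterator;
  import java.util.LinkedHashMap;
  import java.util.Map;

  /** Hides how vertices are stored (in-memory list, map, or disk-backed). */
  interface VerticesInfo<ID, VERTEX> extends Iterable<VERTEX> {
    void add(ID id, VERTEX vertex); // contract: at most one vertex per ID
    boolean contains(ID id);        // O(1) for a map, O(n) for a plain list
    int size();
  }

  /**
   * In-memory implementation backed by a LinkedHashMap: keeps insertion
   * order like the list does, but restores the map's O(1) uniqueness
   * check, so duplicate adds overwrite instead of piling up.
   */
  class InMemoryVerticesInfo<ID, VERTEX> implements VerticesInfo<ID, VERTEX> {
    private final Map<ID, VERTEX> vertices = new LinkedHashMap<ID, VERTEX>();

    public void add(ID id, VERTEX vertex) {
      vertices.put(id, vertex);
    }

    public boolean contains(ID id) {
      return vertices.containsKey(id);
    }

    public int size() {
      return vertices.size();
    }

    public Iterator<VERTEX> iterator() {
      return vertices.values().iterator();
    }
  }

GraphJobRunner could then iterate over the collection exactly as it
iterates over the list today, while a disk-backed implementation could be
swapped in behind the same interface without touching the runner.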
