I have added checks to the repair code to guard against null pointer exceptions: when you look up a dangling node that doesn't exist in the map, the returned vertex is null, and dereferencing it then throws the exception.
How does Pregel handle dangling links? I don't know. In the web graph, dangling links come from pages that returned 4xx/5xx status codes, so they are probably just ignored. 2012/12/4 Edward J. Yoon <[email protected]> > I didn't look at repair function closely. How pregel handles dangling > links? If it's possible to handle them on user side, let's get rid of > it. > > On Tue, Dec 4, 2012 at 5:51 AM, Thomas Jungblut > <[email protected]> wrote: > > This is certainly a very crude bug. The list does permit duplicates as > > opposed to the map before. > > I have already told that we have to remove the repair capability, because > > this will add too much complexity (O(n) vs. O(1) you know). It will get > > especially slow when we have a disk based graph or a random access file > in > > the back of this VertexInfo class which is a nice abstraction. > > > > So I would like to remove this and leave the data normalization up to the > > user. The bug above will be solved with it anyway, so this has few > > downsides (the major is of course that it is included in two releases > > already). > > > > 2012/12/3 Suraj Menon <[email protected]> > > > >> Hi, I made similar changes to the HAMA-632.patch-v2 and found my unit > tests > >> failed because there were instances where I added duplicates of > vertices in > >> the list. Are you seeing this problem with this change because I don't > see > >> an uniqueness check before adding? I propose using the VerticesInfo > class > >> hiding the implementation detail of the vertex collection inside. But we > >> need to act if there is an issue here. > >> > >> -Suraj > >> > >> On Wed, Nov 21, 2012 at 6:39 AM, <[email protected]> wrote: > >> > >> > Author: edwardyoon > >> > Date: Wed Nov 21 11:39:50 2012 > >> > New Revision: 1412065 > >> > > >> > URL: http://svn.apache.org/viewvc?rev=1412065&view=rev > >> > Log: > >> > Optimize memory use. 
> >> > > >> > Modified: > >> > > >> > > hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java > >> > > >> > Modified: > >> > > hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java > >> > URL: > >> > > >> > http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1412065&r1=1412064&r2=1412065&view=diff > >> > > >> > > >> > ============================================================================== > >> > --- > >> > > hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java > >> > (original) > >> > +++ > >> > > hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java > >> > Wed Nov 21 11:39:50 2012 > >> > @@ -24,7 +24,6 @@ import java.util.HashMap; > >> > import java.util.List; > >> > import java.util.Map; > >> > import java.util.Map.Entry; > >> > -import java.util.Set; > >> > > >> > import org.apache.commons.logging.Log; > >> > import org.apache.commons.logging.LogFactory; > >> > @@ -72,7 +71,7 @@ public final class GraphJobRunner<V exte > >> > private Combiner<M> combiner; > >> > private Partitioner<V, M> partitioner; > >> > > >> > - private Map<V, Vertex<V, E, M>> vertices = new HashMap<V, > Vertex<V, E, > >> > M>>(); > >> > + private List<Vertex<V, E, M>> vertices = new ArrayList<Vertex<V, E, > >> > M>>(); > >> > > >> > private boolean updated = true; > >> > private int globalUpdateCounts = 0; > >> > @@ -144,8 +143,8 @@ public final class GraphJobRunner<V exte > >> > public final void cleanup( > >> > BSPPeer<Writable, Writable, Writable, Writable, > GraphJobMessage> > >> > peer) > >> > throws IOException { > >> > - for (Entry<V, Vertex<V, E, M>> e : vertices.entrySet()) { > >> > - peer.write(e.getValue().getVertexID(), > e.getValue().getValue()); > >> > + for (Vertex<V, E, M> e : vertices) { > >> > + peer.write(e.getVertexID(), e.getValue()); > >> > } > >> > } > >> > > >> > @@ -180,7 +179,7 @@ public final class GraphJobRunner<V exte > >> > 
BSPPeer<Writable, Writable, Writable, Writable, > GraphJobMessage> > >> > peer) > >> > throws IOException { > >> > int activeVertices = 0; > >> > - for (Vertex<V, E, M> vertex : vertices.values()) { > >> > + for (Vertex<V, E, M> vertex : vertices) { > >> > List<M> msgs = messages.get(vertex.getVertexID()); > >> > // If there are newly received messages, restart. > >> > if (vertex.isHalted() && msgs != null) { > >> > @@ -216,7 +215,7 @@ public final class GraphJobRunner<V exte > >> > private void doInitialSuperstep( > >> > BSPPeer<Writable, Writable, Writable, Writable, > GraphJobMessage> > >> > peer) > >> > throws IOException { > >> > - for (Vertex<V, E, M> vertex : vertices.values()) { > >> > + for (Vertex<V, E, M> vertex : vertices) { > >> > List<M> singletonList = > >> > Collections.singletonList(vertex.getValue()); > >> > M lastValue = vertex.getValue(); > >> > vertex.compute(singletonList.iterator()); > >> > @@ -341,7 +340,7 @@ public final class GraphJobRunner<V exte > >> > peer.send(peer.getPeerName(partition), new > >> > GraphJobMessage(vertex)); > >> > } else { > >> > vertex.setup(conf); > >> > - vertices.put(vertex.getVertexID(), vertex); > >> > + vertices.add(vertex); > >> > } > >> > vertex = newVertexInstance(vertexClass, conf); > >> > vertex.runner = this; > >> > @@ -355,7 +354,7 @@ public final class GraphJobRunner<V exte > >> > Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) > >> > msg.getVertex(); > >> > messagedVertex.runner = this; > >> > messagedVertex.setup(conf); > >> > - vertices.put(messagedVertex.getVertexID(), > messagedVertex); > >> > + vertices.add(messagedVertex); > >> > } > >> > startPos = peer.getPos(); > >> > } > >> > @@ -370,7 +369,7 @@ public final class GraphJobRunner<V exte > >> > Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) > >> > msg.getVertex(); > >> > messagedVertex.runner = this; > >> > messagedVertex.setup(conf); > >> > - vertices.put(messagedVertex.getVertexID(), messagedVertex); > >> > + 
vertices.add(messagedVertex); > >> > } > >> > } > >> > LOG.debug("Loading finished at " + peer.getSuperstepCount() + " > >> > steps."); > >> > @@ -384,89 +383,77 @@ public final class GraphJobRunner<V exte > >> > */ > >> > if (repairNeeded) { > >> > LOG.debug("Starting repair of this graph!"); > >> > + repair(peer, partitioningSteps, selfReference); > >> > + } > >> > > >> > - int multiSteps = 0; > >> > - MapWritable ssize = new MapWritable(); > >> > - ssize.put(new IntWritable(peer.getPeerIndex()), > >> > - new IntWritable(vertices.size())); > >> > - peer.send(getMasterTask(peer), new GraphJobMessage(ssize)); > >> > - ssize = null; > >> > - peer.sync(); > >> > + LOG.debug("Starting Vertex processing!"); > >> > + } > >> > > >> > - if (isMasterTask(peer)) { > >> > - int minVerticesSize = Integer.MAX_VALUE; > >> > - GraphJobMessage received = null; > >> > - while ((received = peer.getCurrentMessage()) != null) { > >> > - MapWritable x = received.getMap(); > >> > - for (Entry<Writable, Writable> e : x.entrySet()) { > >> > - int curr = ((IntWritable) e.getValue()).get(); > >> > - if (minVerticesSize > curr) { > >> > - minVerticesSize = curr; > >> > - } > >> > - } > >> > - } > >> > + @SuppressWarnings("unchecked") > >> > + private void repair( > >> > + BSPPeer<Writable, Writable, Writable, Writable, > GraphJobMessage> > >> > peer, > >> > + int partitioningSteps, boolean selfReference) throws > IOException, > >> > + SyncException, InterruptedException { > >> > > >> > - if (minVerticesSize < (partitioningSteps * 2)) { > >> > - multiSteps = minVerticesSize; > >> > - } else { > >> > - multiSteps = (partitioningSteps * 2); > >> > - } > >> > + int multiSteps = 0; > >> > + MapWritable ssize = new MapWritable(); > >> > + ssize.put(new IntWritable(peer.getPeerIndex()), > >> > + new IntWritable(vertices.size())); > >> > + peer.send(getMasterTask(peer), new GraphJobMessage(ssize)); > >> > + ssize = null; > >> > + peer.sync(); > >> > > >> > - for (String peerName : 
peer.getAllPeerNames()) { > >> > - MapWritable temp = new MapWritable(); > >> > - temp.put(new Text("steps"), new IntWritable(multiSteps)); > >> > - peer.send(peerName, new GraphJobMessage(temp)); > >> > + if (isMasterTask(peer)) { > >> > + int minVerticesSize = Integer.MAX_VALUE; > >> > + GraphJobMessage received = null; > >> > + while ((received = peer.getCurrentMessage()) != null) { > >> > + MapWritable x = received.getMap(); > >> > + for (Entry<Writable, Writable> e : x.entrySet()) { > >> > + int curr = ((IntWritable) e.getValue()).get(); > >> > + if (minVerticesSize > curr) { > >> > + minVerticesSize = curr; > >> > + } > >> > } > >> > } > >> > - peer.sync(); > >> > > >> > - GraphJobMessage received = peer.getCurrentMessage(); > >> > - MapWritable x = received.getMap(); > >> > - for (Entry<Writable, Writable> e : x.entrySet()) { > >> > - multiSteps = ((IntWritable) e.getValue()).get(); > >> > - } > >> > - > >> > - Set<V> keys = vertices.keySet(); > >> > - Map<V, Vertex<V, E, M>> tmp = new HashMap<V, Vertex<V, E, > M>>(); > >> > - > >> > - int i = 0; > >> > - int syncs = 0; > >> > - for (V v : keys) { > >> > - Vertex<V, E, M> vertex2 = vertices.get(v); > >> > - for (Edge<V, E> e : vertices.get(v).getEdges()) { > >> > - peer.send(vertex2.getDestinationPeerName(e), > >> > - new GraphJobMessage(e.getDestinationVertexID())); > >> > - } > >> > + if (minVerticesSize < (partitioningSteps * 2)) { > >> > + multiSteps = minVerticesSize; > >> > + } else { > >> > + multiSteps = (partitioningSteps * 2); > >> > + } > >> > > >> > - if (syncs < multiSteps && (i % (vertices.size() / > multiSteps)) > >> == > >> > 0) { > >> > - peer.sync(); > >> > - syncs++; > >> > - GraphJobMessage msg = null; > >> > - while ((msg = peer.getCurrentMessage()) != null) { > >> > - V vertexName = (V) msg.getVertexId(); > >> > - if (!vertices.containsKey(vertexName)) { > >> > - Vertex<V, E, M> newVertex = > newVertexInstance(vertexClass, > >> > conf); > >> > - newVertex.setVertexID(vertexName); > >> > 
- newVertex.runner = this; > >> > - if (selfReference) { > >> > - newVertex.setEdges(Collections.singletonList(new > Edge<V, > >> > E>( > >> > - newVertex.getVertexID(), null))); > >> > - } else { > >> > - newVertex.setEdges(new ArrayList<Edge<V, E>>(0)); > >> > - } > >> > - newVertex.setup(conf); > >> > - tmp.put(vertexName, newVertex); > >> > - } > >> > - } > >> > - } > >> > - i++; > >> > + for (String peerName : peer.getAllPeerNames()) { > >> > + MapWritable temp = new MapWritable(); > >> > + temp.put(new Text("steps"), new IntWritable(multiSteps)); > >> > + peer.send(peerName, new GraphJobMessage(temp)); > >> > } > >> > + } > >> > + peer.sync(); > >> > + > >> > + GraphJobMessage received = peer.getCurrentMessage(); > >> > + MapWritable x = received.getMap(); > >> > + for (Entry<Writable, Writable> e : x.entrySet()) { > >> > + multiSteps = ((IntWritable) e.getValue()).get(); > >> > + } > >> > + > >> > + Map<V, Vertex<V, E, M>> tmp = new HashMap<V, Vertex<V, E, M>>(); > >> > + > >> > + int i = 0; > >> > + int syncs = 0; > >> > + > >> > + for (Vertex<V, E, M> v : vertices) { > >> > + for (Edge<V, E> e : v.getEdges()) { > >> > + peer.send(v.getDestinationPeerName(e), > >> > + new GraphJobMessage(e.getDestinationVertexID())); > >> > + } > >> > + > >> > + if (syncs < multiSteps && (i % (vertices.size() / multiSteps)) > == > >> > 0) { > >> > + peer.sync(); > >> > + syncs++; > >> > + GraphJobMessage msg = null; > >> > + while ((msg = peer.getCurrentMessage()) != null) { > >> > + V vertexName = (V) msg.getVertexId(); > >> > > >> > - peer.sync(); > >> > - GraphJobMessage msg = null; > >> > - while ((msg = peer.getCurrentMessage()) != null) { > >> > - V vertexName = (V) msg.getVertexId(); > >> > - if (!vertices.containsKey(vertexName)) { > >> > Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, > >> > conf); > >> > newVertex.setVertexID(vertexName); > >> > newVertex.runner = this; > >> > @@ -477,18 +464,41 @@ public final class GraphJobRunner<V exte > >> > 
newVertex.setEdges(new ArrayList<Edge<V, E>>(0)); > >> > } > >> > newVertex.setup(conf); > >> > - vertices.put(vertexName, newVertex); > >> > - newVertex = null; > >> > + tmp.put(vertexName, newVertex); > >> > + > >> > } > >> > } > >> > + i++; > >> > + } > >> > + > >> > + peer.sync(); > >> > + GraphJobMessage msg = null; > >> > + while ((msg = peer.getCurrentMessage()) != null) { > >> > + V vertexName = (V) msg.getVertexId(); > >> > > >> > - for (Map.Entry<V, Vertex<V, E, M>> e : tmp.entrySet()) { > >> > - vertices.put(e.getKey(), e.getValue()); > >> > + Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, > conf); > >> > + newVertex.setVertexID(vertexName); > >> > + newVertex.runner = this; > >> > + if (selfReference) { > >> > + newVertex.setEdges(Collections.singletonList(new Edge<V, > >> > E>(newVertex > >> > + .getVertexID(), null))); > >> > + } else { > >> > + newVertex.setEdges(new ArrayList<Edge<V, E>>(0)); > >> > } > >> > - tmp.clear(); > >> > + newVertex.setup(conf); > >> > + tmp.put(vertexName, newVertex); > >> > + newVertex = null; > >> > + > >> > } > >> > > >> > - LOG.debug("Starting Vertex processing!"); > >> > + for (Vertex<V, E, M> e : vertices) { > >> > + if (tmp.containsKey((e.getVertexID()))) { > >> > + tmp.remove(e.getVertexID()); > >> > + } > >> > + } > >> > + > >> > + vertices.addAll(tmp.values()); > >> > + tmp.clear(); > >> > } > >> > > >> > /** > >> > > >> > > >> > > >> > > > > -- > Best Regards, Edward J. Yoon > @eddieyoon >
