I have added fixes to guard against null pointer exceptions (when you try to
get a dangling node that doesn't exist in the map, the vertex is null — hence
the exception).

How does Pregel handle dangling links?


No idea. Dangling links in the webgraph point to pages that returned 4xx/5xx
status codes, so they are probably ignored.

2012/12/4 Edward J. Yoon <[email protected]>

> I didn't look at the repair function closely. How does Pregel handle
> dangling links? If it's possible to handle them on the user side, let's
> get rid of it.
>
> On Tue, Dec 4, 2012 at 5:51 AM, Thomas Jungblut
> <[email protected]> wrote:
> > This is certainly a very crude bug. The list permits duplicates, as
> > opposed to the map used before (where putting a vertex with an existing
> > ID simply replaced the old entry).
> > I have already said that we have to remove the repair capability, because
> > it adds too much complexity (O(n) vs. O(1), you know). It will get
> > especially slow when we have a disk-based graph or a random access file
> > behind this VertexInfo class, which is a nice abstraction.
> >
> > So I would like to remove this and leave the data normalization up to the
> > user. The bug above will be solved by that anyway, so this has few
> > downsides (the major one, of course, is that the feature has already been
> > included in two releases).
> >
> > 2012/12/3 Suraj Menon <[email protected]>
> >
> >> Hi, I made similar changes to the HAMA-632.patch-v2 and found that my
> >> unit tests failed because there were instances where I added duplicate
> >> vertices to the list. Are you seeing this problem with this change? I
> >> don't see a uniqueness check before adding. I propose using the
> >> VerticesInfo class to hide the implementation detail of the vertex
> >> collection. But we need to act if there is an issue here.
> >>
> >> -Suraj
> >>
> >> On Wed, Nov 21, 2012 at 6:39 AM, <[email protected]> wrote:
> >>
> >> > Author: edwardyoon
> >> > Date: Wed Nov 21 11:39:50 2012
> >> > New Revision: 1412065
> >> >
> >> > URL: http://svn.apache.org/viewvc?rev=1412065&view=rev
> >> > Log:
> >> > Optimize memory use.
> >> >
> >> > Modified:
> >> >
> >> >
> hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
> >> >
> >> > Modified:
> >> >
> hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
> >> > URL:
> >> >
> >>
> http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1412065&r1=1412064&r2=1412065&view=diff
> >> >
> >> >
> >>
> ==============================================================================
> >> > ---
> >> >
> hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
> >> > (original)
> >> > +++
> >> >
> hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
> >> > Wed Nov 21 11:39:50 2012
> >> > @@ -24,7 +24,6 @@ import java.util.HashMap;
> >> >  import java.util.List;
> >> >  import java.util.Map;
> >> >  import java.util.Map.Entry;
> >> > -import java.util.Set;
> >> >
> >> >  import org.apache.commons.logging.Log;
> >> >  import org.apache.commons.logging.LogFactory;
> >> > @@ -72,7 +71,7 @@ public final class GraphJobRunner<V exte
> >> >    private Combiner<M> combiner;
> >> >    private Partitioner<V, M> partitioner;
> >> >
> >> > -  private Map<V, Vertex<V, E, M>> vertices = new HashMap<V,
> Vertex<V, E,
> >> > M>>();
> >> > +  private List<Vertex<V, E, M>> vertices = new ArrayList<Vertex<V, E,
> >> > M>>();
> >> >
> >> >    private boolean updated = true;
> >> >    private int globalUpdateCounts = 0;
> >> > @@ -144,8 +143,8 @@ public final class GraphJobRunner<V exte
> >> >    public final void cleanup(
> >> >        BSPPeer<Writable, Writable, Writable, Writable,
> GraphJobMessage>
> >> > peer)
> >> >        throws IOException {
> >> > -    for (Entry<V, Vertex<V, E, M>> e : vertices.entrySet()) {
> >> > -      peer.write(e.getValue().getVertexID(),
> e.getValue().getValue());
> >> > +    for (Vertex<V, E, M> e : vertices) {
> >> > +      peer.write(e.getVertexID(), e.getValue());
> >> >      }
> >> >    }
> >> >
> >> > @@ -180,7 +179,7 @@ public final class GraphJobRunner<V exte
> >> >        BSPPeer<Writable, Writable, Writable, Writable,
> GraphJobMessage>
> >> > peer)
> >> >        throws IOException {
> >> >      int activeVertices = 0;
> >> > -    for (Vertex<V, E, M> vertex : vertices.values()) {
> >> > +    for (Vertex<V, E, M> vertex : vertices) {
> >> >        List<M> msgs = messages.get(vertex.getVertexID());
> >> >        // If there are newly received messages, restart.
> >> >        if (vertex.isHalted() && msgs != null) {
> >> > @@ -216,7 +215,7 @@ public final class GraphJobRunner<V exte
> >> >    private void doInitialSuperstep(
> >> >        BSPPeer<Writable, Writable, Writable, Writable,
> GraphJobMessage>
> >> > peer)
> >> >        throws IOException {
> >> > -    for (Vertex<V, E, M> vertex : vertices.values()) {
> >> > +    for (Vertex<V, E, M> vertex : vertices) {
> >> >        List<M> singletonList =
> >> > Collections.singletonList(vertex.getValue());
> >> >        M lastValue = vertex.getValue();
> >> >        vertex.compute(singletonList.iterator());
> >> > @@ -341,7 +340,7 @@ public final class GraphJobRunner<V exte
> >> >          peer.send(peer.getPeerName(partition), new
> >> > GraphJobMessage(vertex));
> >> >        } else {
> >> >          vertex.setup(conf);
> >> > -        vertices.put(vertex.getVertexID(), vertex);
> >> > +        vertices.add(vertex);
> >> >        }
> >> >        vertex = newVertexInstance(vertexClass, conf);
> >> >        vertex.runner = this;
> >> > @@ -355,7 +354,7 @@ public final class GraphJobRunner<V exte
> >> >              Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>)
> >> > msg.getVertex();
> >> >              messagedVertex.runner = this;
> >> >              messagedVertex.setup(conf);
> >> > -            vertices.put(messagedVertex.getVertexID(),
> messagedVertex);
> >> > +            vertices.add(messagedVertex);
> >> >            }
> >> >            startPos = peer.getPos();
> >> >          }
> >> > @@ -370,7 +369,7 @@ public final class GraphJobRunner<V exte
> >> >          Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>)
> >> > msg.getVertex();
> >> >          messagedVertex.runner = this;
> >> >          messagedVertex.setup(conf);
> >> > -        vertices.put(messagedVertex.getVertexID(), messagedVertex);
> >> > +        vertices.add(messagedVertex);
> >> >        }
> >> >      }
> >> >      LOG.debug("Loading finished at " + peer.getSuperstepCount() + "
> >> > steps.");
> >> > @@ -384,89 +383,77 @@ public final class GraphJobRunner<V exte
> >> >       */
> >> >      if (repairNeeded) {
> >> >        LOG.debug("Starting repair of this graph!");
> >> > +      repair(peer, partitioningSteps, selfReference);
> >> > +    }
> >> >
> >> > -      int multiSteps = 0;
> >> > -      MapWritable ssize = new MapWritable();
> >> > -      ssize.put(new IntWritable(peer.getPeerIndex()),
> >> > -          new IntWritable(vertices.size()));
> >> > -      peer.send(getMasterTask(peer), new GraphJobMessage(ssize));
> >> > -      ssize = null;
> >> > -      peer.sync();
> >> > +    LOG.debug("Starting Vertex processing!");
> >> > +  }
> >> >
> >> > -      if (isMasterTask(peer)) {
> >> > -        int minVerticesSize = Integer.MAX_VALUE;
> >> > -        GraphJobMessage received = null;
> >> > -        while ((received = peer.getCurrentMessage()) != null) {
> >> > -          MapWritable x = received.getMap();
> >> > -          for (Entry<Writable, Writable> e : x.entrySet()) {
> >> > -            int curr = ((IntWritable) e.getValue()).get();
> >> > -            if (minVerticesSize > curr) {
> >> > -              minVerticesSize = curr;
> >> > -            }
> >> > -          }
> >> > -        }
> >> > +  @SuppressWarnings("unchecked")
> >> > +  private void repair(
> >> > +      BSPPeer<Writable, Writable, Writable, Writable,
> GraphJobMessage>
> >> > peer,
> >> > +      int partitioningSteps, boolean selfReference) throws
> IOException,
> >> > +      SyncException, InterruptedException {
> >> >
> >> > -        if (minVerticesSize < (partitioningSteps * 2)) {
> >> > -          multiSteps = minVerticesSize;
> >> > -        } else {
> >> > -          multiSteps = (partitioningSteps * 2);
> >> > -        }
> >> > +    int multiSteps = 0;
> >> > +    MapWritable ssize = new MapWritable();
> >> > +    ssize.put(new IntWritable(peer.getPeerIndex()),
> >> > +        new IntWritable(vertices.size()));
> >> > +    peer.send(getMasterTask(peer), new GraphJobMessage(ssize));
> >> > +    ssize = null;
> >> > +    peer.sync();
> >> >
> >> > -        for (String peerName : peer.getAllPeerNames()) {
> >> > -          MapWritable temp = new MapWritable();
> >> > -          temp.put(new Text("steps"), new IntWritable(multiSteps));
> >> > -          peer.send(peerName, new GraphJobMessage(temp));
> >> > +    if (isMasterTask(peer)) {
> >> > +      int minVerticesSize = Integer.MAX_VALUE;
> >> > +      GraphJobMessage received = null;
> >> > +      while ((received = peer.getCurrentMessage()) != null) {
> >> > +        MapWritable x = received.getMap();
> >> > +        for (Entry<Writable, Writable> e : x.entrySet()) {
> >> > +          int curr = ((IntWritable) e.getValue()).get();
> >> > +          if (minVerticesSize > curr) {
> >> > +            minVerticesSize = curr;
> >> > +          }
> >> >          }
> >> >        }
> >> > -      peer.sync();
> >> >
> >> > -      GraphJobMessage received = peer.getCurrentMessage();
> >> > -      MapWritable x = received.getMap();
> >> > -      for (Entry<Writable, Writable> e : x.entrySet()) {
> >> > -        multiSteps = ((IntWritable) e.getValue()).get();
> >> > -      }
> >> > -
> >> > -      Set<V> keys = vertices.keySet();
> >> > -      Map<V, Vertex<V, E, M>> tmp = new HashMap<V, Vertex<V, E,
> M>>();
> >> > -
> >> > -      int i = 0;
> >> > -      int syncs = 0;
> >> > -      for (V v : keys) {
> >> > -        Vertex<V, E, M> vertex2 = vertices.get(v);
> >> > -        for (Edge<V, E> e : vertices.get(v).getEdges()) {
> >> > -          peer.send(vertex2.getDestinationPeerName(e),
> >> > -              new GraphJobMessage(e.getDestinationVertexID()));
> >> > -        }
> >> > +      if (minVerticesSize < (partitioningSteps * 2)) {
> >> > +        multiSteps = minVerticesSize;
> >> > +      } else {
> >> > +        multiSteps = (partitioningSteps * 2);
> >> > +      }
> >> >
> >> > -        if (syncs < multiSteps && (i % (vertices.size() /
> multiSteps))
> >> ==
> >> > 0) {
> >> > -          peer.sync();
> >> > -          syncs++;
> >> > -          GraphJobMessage msg = null;
> >> > -          while ((msg = peer.getCurrentMessage()) != null) {
> >> > -            V vertexName = (V) msg.getVertexId();
> >> > -            if (!vertices.containsKey(vertexName)) {
> >> > -              Vertex<V, E, M> newVertex =
> newVertexInstance(vertexClass,
> >> > conf);
> >> > -              newVertex.setVertexID(vertexName);
> >> > -              newVertex.runner = this;
> >> > -              if (selfReference) {
> >> > -                newVertex.setEdges(Collections.singletonList(new
> Edge<V,
> >> > E>(
> >> > -                    newVertex.getVertexID(), null)));
> >> > -              } else {
> >> > -                newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
> >> > -              }
> >> > -              newVertex.setup(conf);
> >> > -              tmp.put(vertexName, newVertex);
> >> > -            }
> >> > -          }
> >> > -        }
> >> > -        i++;
> >> > +      for (String peerName : peer.getAllPeerNames()) {
> >> > +        MapWritable temp = new MapWritable();
> >> > +        temp.put(new Text("steps"), new IntWritable(multiSteps));
> >> > +        peer.send(peerName, new GraphJobMessage(temp));
> >> >        }
> >> > +    }
> >> > +    peer.sync();
> >> > +
> >> > +    GraphJobMessage received = peer.getCurrentMessage();
> >> > +    MapWritable x = received.getMap();
> >> > +    for (Entry<Writable, Writable> e : x.entrySet()) {
> >> > +      multiSteps = ((IntWritable) e.getValue()).get();
> >> > +    }
> >> > +
> >> > +    Map<V, Vertex<V, E, M>> tmp = new HashMap<V, Vertex<V, E, M>>();
> >> > +
> >> > +    int i = 0;
> >> > +    int syncs = 0;
> >> > +
> >> > +    for (Vertex<V, E, M> v : vertices) {
> >> > +      for (Edge<V, E> e : v.getEdges()) {
> >> > +        peer.send(v.getDestinationPeerName(e),
> >> > +            new GraphJobMessage(e.getDestinationVertexID()));
> >> > +      }
> >> > +
> >> > +      if (syncs < multiSteps && (i % (vertices.size() / multiSteps))
> ==
> >> > 0) {
> >> > +        peer.sync();
> >> > +        syncs++;
> >> > +        GraphJobMessage msg = null;
> >> > +        while ((msg = peer.getCurrentMessage()) != null) {
> >> > +          V vertexName = (V) msg.getVertexId();
> >> >
> >> > -      peer.sync();
> >> > -      GraphJobMessage msg = null;
> >> > -      while ((msg = peer.getCurrentMessage()) != null) {
> >> > -        V vertexName = (V) msg.getVertexId();
> >> > -        if (!vertices.containsKey(vertexName)) {
> >> >            Vertex<V, E, M> newVertex = newVertexInstance(vertexClass,
> >> > conf);
> >> >            newVertex.setVertexID(vertexName);
> >> >            newVertex.runner = this;
> >> > @@ -477,18 +464,41 @@ public final class GraphJobRunner<V exte
> >> >              newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
> >> >            }
> >> >            newVertex.setup(conf);
> >> > -          vertices.put(vertexName, newVertex);
> >> > -          newVertex = null;
> >> > +          tmp.put(vertexName, newVertex);
> >> > +
> >> >          }
> >> >        }
> >> > +      i++;
> >> > +    }
> >> > +
> >> > +    peer.sync();
> >> > +    GraphJobMessage msg = null;
> >> > +    while ((msg = peer.getCurrentMessage()) != null) {
> >> > +      V vertexName = (V) msg.getVertexId();
> >> >
> >> > -      for (Map.Entry<V, Vertex<V, E, M>> e : tmp.entrySet()) {
> >> > -        vertices.put(e.getKey(), e.getValue());
> >> > +      Vertex<V, E, M> newVertex = newVertexInstance(vertexClass,
> conf);
> >> > +      newVertex.setVertexID(vertexName);
> >> > +      newVertex.runner = this;
> >> > +      if (selfReference) {
> >> > +        newVertex.setEdges(Collections.singletonList(new Edge<V,
> >> > E>(newVertex
> >> > +            .getVertexID(), null)));
> >> > +      } else {
> >> > +        newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
> >> >        }
> >> > -      tmp.clear();
> >> > +      newVertex.setup(conf);
> >> > +      tmp.put(vertexName, newVertex);
> >> > +      newVertex = null;
> >> > +
> >> >      }
> >> >
> >> > -    LOG.debug("Starting Vertex processing!");
> >> > +    for (Vertex<V, E, M> e : vertices) {
> >> > +      if (tmp.containsKey((e.getVertexID()))) {
> >> > +        tmp.remove(e.getVertexID());
> >> > +      }
> >> > +    }
> >> > +
> >> > +    vertices.addAll(tmp.values());
> >> > +    tmp.clear();
> >> >    }
> >> >
> >> >    /**
> >> >
> >> >
> >> >
> >>
>
>
>
> --
> Best Regards, Edward J. Yoon
> @eddieyoon
>

Reply via email to