I didn't look at the repair function closely. How does Pregel handle
dangling links? If it's possible to handle them on the user side, let's
get rid of it.
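
If we handle it on the user side, a pre-processing pass over the input
should be enough: emit an empty vertex record for every edge target that
has no record of its own. Rough sketch only; the VertexRecord type below
is made up for illustration and is not a Hama class:

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical user-side normalization: add an empty vertex record for
// every edge destination that is missing from the input, so the framework
// never sees dangling links and no repair step is needed.
public class DanglingLinkNormalizer {

  static class VertexRecord {
    final String id;
    final List<String> outEdges;

    VertexRecord(String id, List<String> outEdges) {
      this.id = id;
      this.outEdges = outEdges;
    }
  }

  static Map<String, VertexRecord> normalize(List<VertexRecord> input) {
    Map<String, VertexRecord> byId = new HashMap<String, VertexRecord>();
    for (VertexRecord r : input) {
      byId.put(r.id, r);
    }
    // Collect edge targets that have no vertex record of their own.
    Set<String> dangling = new HashSet<String>();
    for (VertexRecord r : input) {
      for (String target : r.outEdges) {
        if (!byId.containsKey(target)) {
          dangling.add(target);
        }
      }
    }
    // Add an empty record for each dangling target.
    for (String id : dangling) {
      byId.put(id, new VertexRecord(id, Collections.<String> emptyList()));
    }
    return byId;
  }
}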

On Tue, Dec 4, 2012 at 5:51 AM, Thomas Jungblut
<[email protected]> wrote:
> This is certainly a very crude bug. The list permits duplicates, as
> opposed to the map we had before.
> I have already said that we have to remove the repair capability, because
> it adds too much complexity (an O(n) membership check instead of O(1)). It
> will get especially slow once we have a disk-based graph or a random
> access file behind this VertexInfo class, which is a nice abstraction.
>
> So I would like to remove this and leave data normalization up to the
> user. The bug above will be solved along with it anyway, so this has few
> downsides (the major one, of course, being that the feature is already
> included in two releases).
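
Just to spell out the complexity point: with the old map a duplicate check
is a single hash lookup, with the plain list it is a linear scan over all
vertices. A rough sketch in plain Java, illustration only, not the
GraphJobRunner code:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration only: membership checks are O(1) expected against a HashMap
// keyed by vertex ID, but O(n) against an ArrayList, which has to be
// scanned element by element.
public class MembershipCostSketch {

  public static void main(String[] args) {
    Map<String, Integer> byId = new HashMap<String, Integer>();
    List<String> asList = new ArrayList<String>();

    for (int i = 0; i < 100000; i++) {
      byId.put("v" + i, i);
      asList.add("v" + i);
    }

    // Single hash lookup.
    boolean inMap = byId.containsKey("v99999");

    // Walks the list until a match is found or the end is reached.
    boolean inList = asList.contains("v99999");

    System.out.println(inMap + " " + inList);
  }
}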
>
> 2012/12/3 Suraj Menon <[email protected]>
>
>> Hi, I made similar changes to HAMA-632.patch-v2 and found that my unit
>> tests failed because there were cases where duplicate vertices were added
>> to the list. Are you seeing this problem with this change? I don't see a
>> uniqueness check before adding. I propose using the VerticesInfo class,
>> which hides the implementation detail of the vertex collection inside. But
>> we need to act if there is an issue here.
>>
>> -Suraj
>>
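
To make the VerticesInfo idea concrete, here is a minimal sketch of what
such an abstraction could look like, hiding whether the vertices live in a
map, a list or on disk, and enforcing uniqueness at the add site. The
interface and method names below are only illustrative, not an actual Hama
API:

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Illustrative sketch of a VerticesInfo-style abstraction: callers add and
// iterate vertices without knowing how the backing store is implemented.
interface VerticesInfoSketch<ID, V> extends Iterable<V> {
  /** Adds the vertex; returns false if a vertex with the same ID exists. */
  boolean addVertex(ID id, V vertex);

  int size();
}

class InMemoryVerticesInfo<ID, V> implements VerticesInfoSketch<ID, V> {
  private final Map<ID, V> vertices = new HashMap<ID, V>();

  @Override
  public boolean addVertex(ID id, V vertex) {
    // Uniqueness is enforced here, so callers cannot introduce duplicates.
    if (vertices.containsKey(id)) {
      return false;
    }
    vertices.put(id, vertex);
    return true;
  }

  @Override
  public int size() {
    return vertices.size();
  }

  @Override
  public Iterator<V> iterator() {
    return vertices.values().iterator();
  }
}

Swapping the in-memory implementation for a disk-based one would then not
touch GraphJobRunner at all.
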
>> On Wed, Nov 21, 2012 at 6:39 AM, <[email protected]> wrote:
>>
>> > Author: edwardyoon
>> > Date: Wed Nov 21 11:39:50 2012
>> > New Revision: 1412065
>> >
>> > URL: http://svn.apache.org/viewvc?rev=1412065&view=rev
>> > Log:
>> > Optimize memory use.
>> >
>> > Modified:
>> >
>> > hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
>> >
>> > Modified:
>> > hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
>> > URL:
>> >
>> http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1412065&r1=1412064&r2=1412065&view=diff
>> >
>> >
>> ==============================================================================
>> > ---
>> > hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
>> > (original)
>> > +++
>> > hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
>> > Wed Nov 21 11:39:50 2012
>> > @@ -24,7 +24,6 @@ import java.util.HashMap;
>> >  import java.util.List;
>> >  import java.util.Map;
>> >  import java.util.Map.Entry;
>> > -import java.util.Set;
>> >
>> >  import org.apache.commons.logging.Log;
>> >  import org.apache.commons.logging.LogFactory;
>> > @@ -72,7 +71,7 @@ public final class GraphJobRunner<V exte
>> >    private Combiner<M> combiner;
>> >    private Partitioner<V, M> partitioner;
>> >
>> > -  private Map<V, Vertex<V, E, M>> vertices = new HashMap<V, Vertex<V, E,
>> > M>>();
>> > +  private List<Vertex<V, E, M>> vertices = new ArrayList<Vertex<V, E,
>> > M>>();
>> >
>> >    private boolean updated = true;
>> >    private int globalUpdateCounts = 0;
>> > @@ -144,8 +143,8 @@ public final class GraphJobRunner<V exte
>> >    public final void cleanup(
>> >        BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage>
>> > peer)
>> >        throws IOException {
>> > -    for (Entry<V, Vertex<V, E, M>> e : vertices.entrySet()) {
>> > -      peer.write(e.getValue().getVertexID(), e.getValue().getValue());
>> > +    for (Vertex<V, E, M> e : vertices) {
>> > +      peer.write(e.getVertexID(), e.getValue());
>> >      }
>> >    }
>> >
>> > @@ -180,7 +179,7 @@ public final class GraphJobRunner<V exte
>> >        BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage>
>> > peer)
>> >        throws IOException {
>> >      int activeVertices = 0;
>> > -    for (Vertex<V, E, M> vertex : vertices.values()) {
>> > +    for (Vertex<V, E, M> vertex : vertices) {
>> >        List<M> msgs = messages.get(vertex.getVertexID());
>> >        // If there are newly received messages, restart.
>> >        if (vertex.isHalted() && msgs != null) {
>> > @@ -216,7 +215,7 @@ public final class GraphJobRunner<V exte
>> >    private void doInitialSuperstep(
>> >        BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage>
>> > peer)
>> >        throws IOException {
>> > -    for (Vertex<V, E, M> vertex : vertices.values()) {
>> > +    for (Vertex<V, E, M> vertex : vertices) {
>> >        List<M> singletonList =
>> > Collections.singletonList(vertex.getValue());
>> >        M lastValue = vertex.getValue();
>> >        vertex.compute(singletonList.iterator());
>> > @@ -341,7 +340,7 @@ public final class GraphJobRunner<V exte
>> >          peer.send(peer.getPeerName(partition), new
>> > GraphJobMessage(vertex));
>> >        } else {
>> >          vertex.setup(conf);
>> > -        vertices.put(vertex.getVertexID(), vertex);
>> > +        vertices.add(vertex);
>> >        }
>> >        vertex = newVertexInstance(vertexClass, conf);
>> >        vertex.runner = this;
>> > @@ -355,7 +354,7 @@ public final class GraphJobRunner<V exte
>> >              Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>)
>> > msg.getVertex();
>> >              messagedVertex.runner = this;
>> >              messagedVertex.setup(conf);
>> > -            vertices.put(messagedVertex.getVertexID(), messagedVertex);
>> > +            vertices.add(messagedVertex);
>> >            }
>> >            startPos = peer.getPos();
>> >          }
>> > @@ -370,7 +369,7 @@ public final class GraphJobRunner<V exte
>> >          Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>)
>> > msg.getVertex();
>> >          messagedVertex.runner = this;
>> >          messagedVertex.setup(conf);
>> > -        vertices.put(messagedVertex.getVertexID(), messagedVertex);
>> > +        vertices.add(messagedVertex);
>> >        }
>> >      }
>> >      LOG.debug("Loading finished at " + peer.getSuperstepCount() + "
>> > steps.");
>> > @@ -384,89 +383,77 @@ public final class GraphJobRunner<V exte
>> >       */
>> >      if (repairNeeded) {
>> >        LOG.debug("Starting repair of this graph!");
>> > +      repair(peer, partitioningSteps, selfReference);
>> > +    }
>> >
>> > -      int multiSteps = 0;
>> > -      MapWritable ssize = new MapWritable();
>> > -      ssize.put(new IntWritable(peer.getPeerIndex()),
>> > -          new IntWritable(vertices.size()));
>> > -      peer.send(getMasterTask(peer), new GraphJobMessage(ssize));
>> > -      ssize = null;
>> > -      peer.sync();
>> > +    LOG.debug("Starting Vertex processing!");
>> > +  }
>> >
>> > -      if (isMasterTask(peer)) {
>> > -        int minVerticesSize = Integer.MAX_VALUE;
>> > -        GraphJobMessage received = null;
>> > -        while ((received = peer.getCurrentMessage()) != null) {
>> > -          MapWritable x = received.getMap();
>> > -          for (Entry<Writable, Writable> e : x.entrySet()) {
>> > -            int curr = ((IntWritable) e.getValue()).get();
>> > -            if (minVerticesSize > curr) {
>> > -              minVerticesSize = curr;
>> > -            }
>> > -          }
>> > -        }
>> > +  @SuppressWarnings("unchecked")
>> > +  private void repair(
>> > +      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage>
>> > peer,
>> > +      int partitioningSteps, boolean selfReference) throws IOException,
>> > +      SyncException, InterruptedException {
>> >
>> > -        if (minVerticesSize < (partitioningSteps * 2)) {
>> > -          multiSteps = minVerticesSize;
>> > -        } else {
>> > -          multiSteps = (partitioningSteps * 2);
>> > -        }
>> > +    int multiSteps = 0;
>> > +    MapWritable ssize = new MapWritable();
>> > +    ssize.put(new IntWritable(peer.getPeerIndex()),
>> > +        new IntWritable(vertices.size()));
>> > +    peer.send(getMasterTask(peer), new GraphJobMessage(ssize));
>> > +    ssize = null;
>> > +    peer.sync();
>> >
>> > -        for (String peerName : peer.getAllPeerNames()) {
>> > -          MapWritable temp = new MapWritable();
>> > -          temp.put(new Text("steps"), new IntWritable(multiSteps));
>> > -          peer.send(peerName, new GraphJobMessage(temp));
>> > +    if (isMasterTask(peer)) {
>> > +      int minVerticesSize = Integer.MAX_VALUE;
>> > +      GraphJobMessage received = null;
>> > +      while ((received = peer.getCurrentMessage()) != null) {
>> > +        MapWritable x = received.getMap();
>> > +        for (Entry<Writable, Writable> e : x.entrySet()) {
>> > +          int curr = ((IntWritable) e.getValue()).get();
>> > +          if (minVerticesSize > curr) {
>> > +            minVerticesSize = curr;
>> > +          }
>> >          }
>> >        }
>> > -      peer.sync();
>> >
>> > -      GraphJobMessage received = peer.getCurrentMessage();
>> > -      MapWritable x = received.getMap();
>> > -      for (Entry<Writable, Writable> e : x.entrySet()) {
>> > -        multiSteps = ((IntWritable) e.getValue()).get();
>> > -      }
>> > -
>> > -      Set<V> keys = vertices.keySet();
>> > -      Map<V, Vertex<V, E, M>> tmp = new HashMap<V, Vertex<V, E, M>>();
>> > -
>> > -      int i = 0;
>> > -      int syncs = 0;
>> > -      for (V v : keys) {
>> > -        Vertex<V, E, M> vertex2 = vertices.get(v);
>> > -        for (Edge<V, E> e : vertices.get(v).getEdges()) {
>> > -          peer.send(vertex2.getDestinationPeerName(e),
>> > -              new GraphJobMessage(e.getDestinationVertexID()));
>> > -        }
>> > +      if (minVerticesSize < (partitioningSteps * 2)) {
>> > +        multiSteps = minVerticesSize;
>> > +      } else {
>> > +        multiSteps = (partitioningSteps * 2);
>> > +      }
>> >
>> > -        if (syncs < multiSteps && (i % (vertices.size() / multiSteps)) ==
>> > 0) {
>> > -          peer.sync();
>> > -          syncs++;
>> > -          GraphJobMessage msg = null;
>> > -          while ((msg = peer.getCurrentMessage()) != null) {
>> > -            V vertexName = (V) msg.getVertexId();
>> > -            if (!vertices.containsKey(vertexName)) {
>> > -              Vertex<V, E, M> newVertex = newVertexInstance(vertexClass,
>> > conf);
>> > -              newVertex.setVertexID(vertexName);
>> > -              newVertex.runner = this;
>> > -              if (selfReference) {
>> > -                newVertex.setEdges(Collections.singletonList(new Edge<V,
>> > E>(
>> > -                    newVertex.getVertexID(), null)));
>> > -              } else {
>> > -                newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
>> > -              }
>> > -              newVertex.setup(conf);
>> > -              tmp.put(vertexName, newVertex);
>> > -            }
>> > -          }
>> > -        }
>> > -        i++;
>> > +      for (String peerName : peer.getAllPeerNames()) {
>> > +        MapWritable temp = new MapWritable();
>> > +        temp.put(new Text("steps"), new IntWritable(multiSteps));
>> > +        peer.send(peerName, new GraphJobMessage(temp));
>> >        }
>> > +    }
>> > +    peer.sync();
>> > +
>> > +    GraphJobMessage received = peer.getCurrentMessage();
>> > +    MapWritable x = received.getMap();
>> > +    for (Entry<Writable, Writable> e : x.entrySet()) {
>> > +      multiSteps = ((IntWritable) e.getValue()).get();
>> > +    }
>> > +
>> > +    Map<V, Vertex<V, E, M>> tmp = new HashMap<V, Vertex<V, E, M>>();
>> > +
>> > +    int i = 0;
>> > +    int syncs = 0;
>> > +
>> > +    for (Vertex<V, E, M> v : vertices) {
>> > +      for (Edge<V, E> e : v.getEdges()) {
>> > +        peer.send(v.getDestinationPeerName(e),
>> > +            new GraphJobMessage(e.getDestinationVertexID()));
>> > +      }
>> > +
>> > +      if (syncs < multiSteps && (i % (vertices.size() / multiSteps)) ==
>> > 0) {
>> > +        peer.sync();
>> > +        syncs++;
>> > +        GraphJobMessage msg = null;
>> > +        while ((msg = peer.getCurrentMessage()) != null) {
>> > +          V vertexName = (V) msg.getVertexId();
>> >
>> > -      peer.sync();
>> > -      GraphJobMessage msg = null;
>> > -      while ((msg = peer.getCurrentMessage()) != null) {
>> > -        V vertexName = (V) msg.getVertexId();
>> > -        if (!vertices.containsKey(vertexName)) {
>> >            Vertex<V, E, M> newVertex = newVertexInstance(vertexClass,
>> > conf);
>> >            newVertex.setVertexID(vertexName);
>> >            newVertex.runner = this;
>> > @@ -477,18 +464,41 @@ public final class GraphJobRunner<V exte
>> >              newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
>> >            }
>> >            newVertex.setup(conf);
>> > -          vertices.put(vertexName, newVertex);
>> > -          newVertex = null;
>> > +          tmp.put(vertexName, newVertex);
>> > +
>> >          }
>> >        }
>> > +      i++;
>> > +    }
>> > +
>> > +    peer.sync();
>> > +    GraphJobMessage msg = null;
>> > +    while ((msg = peer.getCurrentMessage()) != null) {
>> > +      V vertexName = (V) msg.getVertexId();
>> >
>> > -      for (Map.Entry<V, Vertex<V, E, M>> e : tmp.entrySet()) {
>> > -        vertices.put(e.getKey(), e.getValue());
>> > +      Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, conf);
>> > +      newVertex.setVertexID(vertexName);
>> > +      newVertex.runner = this;
>> > +      if (selfReference) {
>> > +        newVertex.setEdges(Collections.singletonList(new Edge<V,
>> > E>(newVertex
>> > +            .getVertexID(), null)));
>> > +      } else {
>> > +        newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
>> >        }
>> > -      tmp.clear();
>> > +      newVertex.setup(conf);
>> > +      tmp.put(vertexName, newVertex);
>> > +      newVertex = null;
>> > +
>> >      }
>> >
>> > -    LOG.debug("Starting Vertex processing!");
>> > +    for (Vertex<V, E, M> e : vertices) {
>> > +      if (tmp.containsKey((e.getVertexID()))) {
>> > +        tmp.remove(e.getVertexID());
>> > +      }
>> > +    }
>> > +
>> > +    vertices.addAll(tmp.values());
>> > +    tmp.clear();
>> >    }
>> >
>> >    /**
>> >
>> >
>> >
>>



-- 
Best Regards, Edward J. Yoon
@eddieyoon
