Author: edwardyoon
Date: Tue Feb  3 00:28:07 2015
New Revision: 1656609

URL: http://svn.apache.org/r1656609
Log:
HAMA-919: Manage messages per Vertex

Added:
    hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
Removed:
    hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/TestDiskVerticesInfo.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java
Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/pom.xml
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java
    hama/trunk/graph/pom.xml
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1656609&r1=1656608&r2=1656609&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Tue Feb  3 00:28:07 2015
@@ -19,6 +19,7 @@ Release 0.7.0 (unreleased changes)
 
   IMPROVEMENTS
 
+   HAMA-919: Manage messages per Vertex (edwardyoon)
    HAMA-923: add a toString() method for FloatArrayWritable and TextArrayWritable classes (edwardyoon)
    HAMA-921: Polish doSuperstep() function and VertexMessageIterable class (Anastasis Andronidis)
    HAMA-913: Add RPC implementation using netty(bsmin)  

Modified: hama/trunk/core/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/core/pom.xml?rev=1656609&r1=1656608&r2=1656609&view=diff
==============================================================================
--- hama/trunk/core/pom.xml (original)
+++ hama/trunk/core/pom.xml Tue Feb  3 00:28:07 2015
@@ -136,8 +136,14 @@
       <artifactId>zookeeper</artifactId>
     </dependency>
     <dependency>
-      <groupId>com.esotericsoftware.kryo</groupId>
-      <artifactId>kryo</artifactId>
+      <groupId>org.apache.directmemory</groupId>
+      <artifactId>directmemory-cache</artifactId>
+      <version>0.2</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.directmemory</groupId>
+      <artifactId>directmemory-kryo</artifactId>
+      <version>0.2</version>
     </dependency>
     <dependency>
       <groupId>io.netty</groupId>

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java?rev=1656609&r1=1656608&r2=1656609&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java Tue Feb  3 00:28:07 2015
@@ -100,7 +100,9 @@ public class TestPersistQueue extends Te
       }
 
       int cnt = 0;
-      while ((peer.getCurrentMessage()) != null) {
+      IntWritable result = null;
+      while ((result = peer.getCurrentMessage()) != null) {
+        System.out.println(result);
         cnt++;
       }
 

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java?rev=1656609&r1=1656608&r2=1656609&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java Tue Feb  3 00:28:07 2015
@@ -33,7 +33,6 @@ import org.apache.hama.bsp.TextOutputFor
 import org.apache.hama.graph.Edge;
 import org.apache.hama.graph.GraphJob;
 import org.apache.hama.graph.GraphJobRunner.GraphJobCounter;
-import org.apache.hama.graph.MapVerticesInfo;
 import org.apache.hama.graph.Vertex;
 import org.apache.hama.graph.VertexInputReader;
 
@@ -142,10 +141,6 @@ public class DynamicGraph {
   private static GraphJob createJob(String[] args, HamaConfiguration conf)
       throws IOException {
 
-    // NOTE: Graph modification APIs can be used only with MapVerticesInfo.
-    conf.set("hama.graph.vertices.info",
-        "org.apache.hama.graph.MapVerticesInfo");
-
     GraphJob graphJob = new GraphJob(conf, DynamicGraph.class);
     graphJob.setJobName("Dynamic Graph");
     graphJob.setVertexClass(GraphVertex.class);

Modified: hama/trunk/graph/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/graph/pom.xml?rev=1656609&r1=1656608&r2=1656609&view=diff
==============================================================================
--- hama/trunk/graph/pom.xml (original)
+++ hama/trunk/graph/pom.xml Tue Feb  3 00:28:07 2015
@@ -42,16 +42,6 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.apache.directmemory</groupId>
-      <artifactId>directmemory-cache</artifactId>
-      <version>0.2</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.directmemory</groupId>
-      <artifactId>directmemory-kryo</artifactId>
-      <version>0.2</version>
-    </dependency>
-    <dependency>
       <groupId>org.apache.hama</groupId>
       <artifactId>hama-core</artifactId>
       <version>${project.version}</version>

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1656609&r1=1656608&r2=1656609&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Tue Feb  3 00:28:07 2015
@@ -33,7 +33,6 @@ import org.apache.hama.bsp.PartitioningR
 import org.apache.hama.bsp.message.MessageManager;
 import org.apache.hama.bsp.message.OutgoingMessageManager;
 import org.apache.hama.bsp.message.queue.MessageQueue;
-import org.apache.hama.bsp.message.queue.SortedMemoryQueue;
 
 import com.google.common.base.Preconditions;
 
@@ -58,8 +57,8 @@ public class GraphJob extends BSPJob {
       throws IOException {
     super(conf);
     conf.setClass(MessageManager.OUTGOING_MESSAGE_MANAGER_CLASS,
-        OutgoingVertexMessagesManager.class, OutgoingMessageManager.class);
-    
+        OutgoingVertexMessageManager.class, OutgoingMessageManager.class);
+
     this.setBoolean(Constants.PARTITION_SORT_BY_KEY, true);
     this.setBspClass(GraphJobRunner.class);
     this.setJarByClass(exampleClass);
@@ -128,8 +127,8 @@ public class GraphJob extends BSPJob {
   /**
    * Sets the input reader for parsing the input to vertices.
    */
-  public void setVertexInputReaderClass(@SuppressWarnings("rawtypes")
-  Class<? extends VertexInputReader> cls) {
+  public void setVertexInputReaderClass(
+      @SuppressWarnings("rawtypes") Class<? extends VertexInputReader> cls) {
     ensureState(JobState.DEFINE);
     conf.setClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER, cls,
         RecordConverter.class);
@@ -140,8 +139,8 @@ public class GraphJob extends BSPJob {
    * Sets the output writer for materializing vertices to the output sink. If
    * not set, the default DefaultVertexOutputWriter will be used.
    */
-  public void setVertexOutputWriterClass(@SuppressWarnings("rawtypes")
-  Class<? extends VertexOutputWriter> cls) {
+  public void setVertexOutputWriterClass(
+      @SuppressWarnings("rawtypes") Class<? extends VertexOutputWriter> cls) {
     ensureState(JobState.DEFINE);
     conf.setClass(VERTEX_OUTPUT_WRITER_CLASS_ATTR, cls,
         VertexOutputWriter.class);
@@ -154,8 +153,8 @@ public class GraphJob extends BSPJob {
   }
 
   @Override
-  public void setPartitioner(@SuppressWarnings("rawtypes")
-  Class<? extends Partitioner> theClass) {
+  public void setPartitioner(
+      @SuppressWarnings("rawtypes") Class<? extends Partitioner> theClass) {
     super.setPartitioner(theClass);
     conf.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, true);
   }
@@ -201,9 +200,8 @@ public class GraphJob extends BSPJob {
       this.setVertexOutputWriterClass(DefaultVertexOutputWriter.class);
     }
 
-    // add the default message queue to the sorted one
     this.getConfiguration().setClass(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
-        SortedMemoryQueue.class, MessageQueue.class);
+        IncomingVertexMessageManager.class, MessageQueue.class);
 
     super.submit();
   }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1656609&r1=1656608&r2=1656609&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java Tue Feb  3 00:28:07 2015
@@ -17,9 +17,15 @@
  */
 package org.apache.hama.graph;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.IntWritable;
@@ -46,10 +52,14 @@ public final class GraphJobMessage imple
   private MapWritable map;
   @SuppressWarnings("rawtypes")
   private WritableComparable vertexId;
-  private Writable vertexValue;
-  private IntWritable verticesSize;
+  private IntWritable integerMessage;
   private static GraphJobMessageComparator comparator;
 
+  private int numOfValues = 0;
+
+  private final ByteArrayOutputStream byteBuffer = new ByteArrayOutputStream();
+  private final DataOutputStream bufferDos = new DataOutputStream(byteBuffer);
+
   static {
     if (comparator == null) {
       comparator = new GraphJobMessageComparator();
@@ -61,6 +71,13 @@ public final class GraphJobMessage imple
   public GraphJobMessage() {
   }
 
+  public byte[] serialize(Writable message) throws IOException {
+    ByteArrayOutputStream mbos = new ByteArrayOutputStream();
+    DataOutputStream mdos = new DataOutputStream(mbos);
+    message.write(mdos);
+    return mbos.toByteArray();
+  }
+
   public GraphJobMessage(MapWritable map) {
     this.flag = MAP_FLAG;
     this.map = map;
@@ -69,12 +86,20 @@ public final class GraphJobMessage imple
   public GraphJobMessage(WritableComparable<?> vertexId, Writable vertexValue) {
     this.flag = VERTEX_FLAG;
     this.vertexId = vertexId;
-    this.vertexValue = vertexValue;
+
+    add(vertexValue);
+  }
+
+  public GraphJobMessage(WritableComparable<?> vertexId, List<Writable> values) {
+    this.flag = VERTEX_FLAG;
+    this.vertexId = vertexId;
+
+    addAll(values);
   }
 
   public GraphJobMessage(IntWritable size) {
     this.flag = VERTICES_SIZE_FLAG;
-    this.verticesSize = size;
+    this.integerMessage = size;
   }
 
   @Override
@@ -84,11 +109,14 @@ public final class GraphJobMessage imple
       // we don't need to write the classes because the other side has the same
       // classes for the two entities.
       vertexId.write(out);
-      vertexValue.write(out);
+
+      out.writeInt(numOfValues);
+      out.writeInt(byteBuffer.size());
+      out.write(byteBuffer.toByteArray());
     } else if (isMapMessage()) {
       map.write(out);
     } else if (isVerticesSizeMessage()) {
-      verticesSize.write(out);
+      integerMessage.write(out);
     } else {
       vertexId.write(out);
     }
@@ -107,8 +135,8 @@ public final class GraphJobMessage imple
       map = new MapWritable();
       map.readFields(in);
     } else if (isVerticesSizeMessage()) {
-      verticesSize = new IntWritable();
-      verticesSize.readFields(in);
+      integerMessage = new IntWritable();
+      integerMessage.readFields(in);
     } else {
       vertexId = ReflectionUtils.newInstance(GraphJobRunner.VERTEX_ID_CLASS,
           null);
@@ -122,14 +150,19 @@ public final class GraphJobMessage imple
     if (isVertexMessage()) {
       vertexId = GraphJobRunner.createVertexIDObject();
       vertexId.readFields(in);
-      vertexValue = GraphJobRunner.createVertexValue();
-      vertexValue.readFields(in);
+
+      this.numOfValues = in.readInt();
+      int bytesLength = in.readInt();
+      byte[] temp = new byte[bytesLength];
+      in.readFully(temp);
+      bufferDos.write(temp);
+      bufferDos.flush();
     } else if (isMapMessage()) {
       map = new MapWritable();
       map.readFields(in);
     } else if (isVerticesSizeMessage()) {
-      verticesSize = new IntWritable();
-      verticesSize.readFields(in);
+      integerMessage = new IntWritable();
+      integerMessage.readFields(in);
     } else {
       vertexId = ReflectionUtils.newInstance(GraphJobRunner.VERTEX_ID_CLASS,
           null);
@@ -161,12 +194,53 @@ public final class GraphJobMessage imple
     return vertexId;
   }
 
-  public Writable getVertexValue() {
-    return vertexValue;
+  private ByteArrayInputStream bis = null;
+  private DataInputStream dis = null;
+
+  public List<Writable> getVertexValue() {
+    bis = new ByteArrayInputStream(byteBuffer.toByteArray());
+    dis = new DataInputStream(bis);
+
+    List<Writable> valuesCache = new ArrayList<Writable>();
+
+    for (int i = 0; i < numOfValues; i++) {
+      try {
+        Writable v = GraphJobRunner.createVertexValue();
+        v.readFields(dis);
+        valuesCache.add(v);
+      } catch (IOException e) {
+        System.out.println(i + ", " + numOfValues);
+        e.printStackTrace();
+      }
+    }
+
+    return valuesCache;
+  }
+
+  public void add(Writable value) {
+    try {
+      bufferDos.write(serialize(value));
+      numOfValues++;
+    } catch (IOException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+  }
+
+  public void addAll(List<Writable> values) {
+    for (Writable v : values)
+      add(v);
+  }
+
+  /**
+   * @return the number of values
+   */
+  public int size() {
+    return this.numOfValues;
   }
 
   public IntWritable getVerticesSize() {
-    return verticesSize;
+    return integerMessage;
   }
 
   public boolean isMapMessage() {
@@ -184,14 +258,14 @@ public final class GraphJobMessage imple
   @Override
   public String toString() {
     if (isVertexMessage()) {
-      return "ID: " + vertexId + " Val: " + vertexValue;
+      return "ID: " + vertexId + " Val: " + numOfValues;
     } else if (isMapMessage()) {
       return "Map: " + map;
     } else if (isVerticesSizeMessage()) {
-      return "#Vertices: " + verticesSize;
+      return "#Vertices: " + integerMessage;
     } else {
       return "GraphJobMessage [flag=" + flag + ", map=" + map + ", vertexId="
-          + vertexId + ", vertexValue=" + vertexValue + "]";
+          + vertexId + ", vertexValue=" + numOfValues + "]";
     }
   }
 
@@ -233,4 +307,5 @@ public final class GraphJobMessage imple
       return compare(key1, key2); // compare them
     }
   }
+
 }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1656609&r1=1656608&r2=1656609&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Tue Feb  3 00:28:07 2015
@@ -19,7 +19,12 @@ package org.apache.hama.graph;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map.Entry;
+import java.util.Set;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -39,12 +44,11 @@ import org.apache.hama.bsp.PartitioningR
 import org.apache.hama.bsp.PartitioningRunner.RecordConverter;
 import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.commons.util.KeyValuePair;
-import org.apache.hama.graph.IDSkippingIterator.Strategy;
 import org.apache.hama.util.ReflectionUtils;
 
 /**
  * Fully generic graph job runner.
- *
+ * 
  * @param <V> the id type of a vertex.
  * @param <E> the value type of an edge.
  * @param <M> the value type of a vertex.
@@ -89,6 +93,7 @@ public final class GraphJobRunner<V exte
   public static Class<Vertex<?, ?, ?>> vertexClass;
 
   private VerticesInfo<V, E, M> vertices;
+
   private boolean updated = true;
   private int globalUpdateCounts = 0;
   private int changedVertexCnt = 0;
@@ -134,7 +139,7 @@ public final class GraphJobRunner<V exte
       // note that the messages must be parsed here
       GraphJobMessage firstVertexMessage = parseMessages(peer);
       // master/slaves needs to update
-      firstVertexMessage = doAggregationUpdates(firstVertexMessage, peer);
+      doAggregationUpdates(peer);
       // check if updated changed by our aggregators
       if (!updated) {
         break;
@@ -153,6 +158,7 @@ public final class GraphJobRunner<V exte
   /**
    * Just write <ID as Writable, Value as Writable> pair as a result. Note that
    * this will also be executed when failure happened.
+   * 
    * @param peer
    * @throws java.io.IOException
    */
@@ -161,11 +167,11 @@ public final class GraphJobRunner<V exte
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
     vertexOutputWriter.setup(conf);
-    IDSkippingIterator<V, E, M> skippingIterator = vertices.skippingIterator();
-    while (skippingIterator.hasNext()) {
-      vertexOutputWriter.write(skippingIterator.next(), peer);
+    Iterator<Vertex<V, E, M>> iterator = vertices.iterator();
+    while (iterator.hasNext()) {
+      vertexOutputWriter.write(iterator.next(), peer);
     }
-    vertices.cleanup(conf, peer.getTaskId());
+    vertices.clear();
   }
 
   /**
@@ -173,8 +179,7 @@ public final class GraphJobRunner<V exte
    * master aggregation. In case of no aggregators defined, we save a sync by
    * reading multiple typed messages.
    */
-  private GraphJobMessage doAggregationUpdates(
-      GraphJobMessage firstVertexMessage,
+  private void doAggregationUpdates(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException, SyncException, InterruptedException {
 
@@ -197,114 +202,92 @@ public final class GraphJobRunner<V exte
         peer.send(peerName, new GraphJobMessage(updatedCnt));
       }
     }
+
     if (getAggregationRunner().isEnabled()) {
-      // in case we need to sync, we need to replay the messages that already
-      // are added to the queue. This prevents loosing messages when using
-      // aggregators.
-      if (firstVertexMessage != null) {
-        peer.send(peer.getPeerName(), firstVertexMessage);
-      }
-      GraphJobMessage msg;
-      while ((msg = peer.getCurrentMessage()) != null) {
-        peer.send(peer.getPeerName(), msg);
-      }
-      // now sync
       peer.sync();
       // now the map message must be read that might be send from the master
       updated = getAggregationRunner().receiveAggregatedValues(
           peer.getCurrentMessage().getMap(), iteration);
-      // set the first vertex message back to the message it had before sync
-      firstVertexMessage = peer.getCurrentMessage();
     }
-    return firstVertexMessage;
   }
 
+  private Set<V> notComputedVertices;
+
   /**
    * Do the main logic of a superstep, namely checking if vertices are active,
-   * feeding compute with messages and controlling combiners/aggregators.
-   * We iterate over our messages and vertices in sorted order. That means
-   * that we need to seek the first vertex that has the same ID as the
-   * iterated message.
+   * feeding compute with messages and controlling combiners/aggregators. We
+   * iterate over our messages and vertices in sorted order. That means that we
+   * need to seek the first vertex that has the same ID as the iterated message.
    */
+  @SuppressWarnings("unchecked")
   private void doSuperstep(GraphJobMessage currentMessage,
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
     int activeVertices = 0;
     this.changedVertexCnt = 0;
-    this.vertices.startSuperstep();
-
-    IDSkippingIterator<V, E, M> iterator = this.vertices.skippingIterator();
-    VertexMessages<V, M> queueMessages = new VertexMessages<V, M>(peer);
-    queueMessages.prependMessage(currentMessage);
-
-    // note that can't skip inactive vertices because we have to rewrite the
-    // complete vertex file in each iteration
-    V firstVID = currentMessage == null ? null : (V) currentMessage.getVertexId();
-    while (iterator.hasNext(firstVID, Strategy.ALL)) {
-      Vertex<V, E, M> vertex = iterator.next();
-      boolean msgsExist = queueMessages.continueWith(vertex.getVertexID());
-
-      if (!msgsExist) checkMsgOrder(vertex.getVertexID(), queueMessages);
-
-      if (msgsExist && vertex.isHalted()) {
+    vertices.startSuperstep();
+    
+    notComputedVertices = new HashSet();
+    notComputedVertices.addAll(vertices.keySet());
+
+    List<M> msgs = null;
+    Vertex<V, E, M> vertex = null;
+
+    while (currentMessage != null) {
+      vertex = vertices.get((V) currentMessage.getVertexId());
+      
+      msgs = (List<M>) currentMessage.getVertexValue();
+      if (vertex.isHalted()) {
         vertex.setActive();
       }
 
       if (!vertex.isHalted()) {
-        vertex.compute(queueMessages);
+        vertex.compute(msgs);
+        notComputedVertices.remove(vertex.getVertexID());
         activeVertices++;
       }
 
-      // Dump remaining messages
-      queueMessages.dumpRest();
+      currentMessage = peer.getCurrentMessage();
+      vertices.finishVertexComputation(vertex);
+    }
 
-      // note that we even need to rewrite the vertex if it is halted for
-      // consistency reasons
-      this.vertices.finishVertexComputation(vertex);
+    for (V v : notComputedVertices) {
+      vertex = vertices.get(v);
+      if (!vertex.isHalted()) {
+        vertex.compute(Collections.<M> emptyList());
+        vertices.finishVertexComputation(vertex);
+        activeVertices++;
+      }
     }
-    this.vertices.finishSuperstep();
 
+    vertices.finishSuperstep();
     getAggregationRunner().sendAggregatorValues(peer, activeVertices,
         this.changedVertexCnt);
     this.iteration++;
   }
 
   /**
-   * Utility that ensures that the incoming messages have a target vertex.
-   */
-  private void checkMsgOrder(V vid, VertexMessages<V, M> vm) {
-    // When the vid is greater than the current message, it means that a vertex
-    // has sent a message to an other vertex that doesn't exist
-    if (vm.getMessageVID() != null && vm.getMessageVID().compareTo(vid) < 0) {
-      if (conf.getBoolean("hama.check.missing.vertex", true)) {
-        throw new IllegalArgumentException(
-                "A message has recieved with a destination ID: " + vm.getMessageVID()
-                + " that does not exist! (Vertex iterator is at" + vid + " ID)");
-      } else {
-        // Skip all unrecognized messages until we find a match
-        vm.continueUntil(vid);
-      }
-    }
-  }
-
-  /**
    * Seed the vertices first with their own values in compute. This is the first
    * superstep after the vertices have been loaded.
    */
   private void doInitialSuperstep(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
-    vertices.startSuperstep();
     this.changedVertexCnt = 0;
-    IDSkippingIterator<V, E, M> skippingIterator = vertices.skippingIterator();
-    while (skippingIterator.hasNext()) {
-      Vertex<V, E, M> vertex = skippingIterator.next();
+    vertices.startSuperstep();
+    
+    Iterator<Vertex<V, E, M>> iterator = vertices.iterator();
+
+    while (iterator.hasNext()) {
+      Vertex<V, E, M> vertex = iterator.next();
 
       // Calls setup method.
       vertex.setup(conf);
+      
       vertex.compute(Collections.singleton(vertex.getValue()));
       vertices.finishVertexComputation(vertex);
     }
+    
     vertices.finishSuperstep();
     getAggregationRunner().sendAggregatorValues(peer, 1, this.changedVertexCnt);
     iteration++;
@@ -335,7 +318,7 @@ public final class GraphJobRunner<V exte
     getAggregationRunner().setupAggregators(peer);
 
     Class<? extends VerticesInfo<V, E, M>> verticesInfoClass = (Class<? extends VerticesInfo<V, E, M>>) conf
-        .getClass("hama.graph.vertices.info", ListVerticesInfo.class,
+        .getClass("hama.graph.vertices.info", MapVerticesInfo.class,
             VerticesInfo.class);
     vertices = ReflectionUtils.newInstance(verticesInfoClass);
     vertices.init(this, conf, peer.getTaskId());
@@ -396,14 +379,15 @@ public final class GraphJobRunner<V exte
           }
         } else {
           if (vertex.compareTo(currentVertex) > 0) {
-            throw new IOException("The records of split aren't in order by vertex ID.");
+            throw new IOException(
+                "The records of split aren't in order by vertex ID.");
           }
 
           if (selfReference) {
             vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
           }
 
-          vertices.addVertex(vertex);
+          addVertex(vertex);
           vertex = currentVertex;
         }
       }
@@ -412,11 +396,7 @@ public final class GraphJobRunner<V exte
     if (selfReference) {
       vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
     }
-    vertices.addVertex(vertex);
-
-    vertices.finishAdditions();
-    // finish the "superstep" because we have written a new file here
-    vertices.finishSuperstep();
+    addVertex(vertex);
 
     LOG.info(vertices.size() + " vertices are loaded into "
         + peer.getPeerName());
@@ -425,56 +405,32 @@ public final class GraphJobRunner<V exte
 
   /**
    * Add new vertex into memory of each peer.
-   *
+   * 
    * @throws IOException
    */
   private void addVertex(Vertex<V, E, M> vertex) throws IOException {
-    vertex.setRunner(this);
-    vertex.setup(conf);
-
     if (conf.getBoolean("hama.graph.self.ref", false)) {
       vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
     }
-
+    vertices.put(vertex);
+    
     LOG.debug("Added VertexID: " + vertex.getVertexID() + " in peer "
         + peer.getPeerName());
-    vertices.addVertex(vertex);
   }
 
   /**
    * Remove vertex from this peer.
-   *
+   * 
    * @throws IOException
    */
   private void removeVertex(V vertexID) {
-    vertices.removeVertex(vertexID);
+    vertices.remove(vertexID);
+
     LOG.debug("Removed VertexID: " + vertexID + " in peer "
         + peer.getPeerName());
   }
 
   /**
-   * After all inserts are done, we must finalize the VertexInfo data structure.
-   *
-   * @throws IOException
-   */
-  private void finishAdditions() throws IOException {
-    vertices.finishAdditions();
-    // finish the "superstep" because we have written a new file here
-    vertices.finishSuperstep();
-  }
-
-  /**
-   * After all inserts are done, we must finalize the VertexInfo data structure.
-   *
-   * @throws IOException
-   */
-  private void finishRemovals() throws IOException {
-    vertices.finishRemovals();
-    // finish the "superstep" because we have written a new file here
-    vertices.finishSuperstep();
-  }
-
-  /**
    * Counts vertices globally by sending the count of vertices in the map to the
    * other peers.
    */
@@ -502,7 +458,7 @@ public final class GraphJobRunner<V exte
   /**
    * Parses the messages in every superstep and does actions according to flags
    * in the messages.
-   *
+   * 
    * @return the first vertex message, null if none received.
    */
   @SuppressWarnings("unchecked")
@@ -512,7 +468,7 @@ public final class GraphJobRunner<V exte
     GraphJobMessage msg = null;
     boolean dynamicAdditions = false;
     boolean dynamicRemovals = false;
-
+    
     while ((msg = peer.getCurrentMessage()) != null) {
       // either this is a vertex message or a directive that must be read
       // as map
@@ -563,7 +519,7 @@ public final class GraphJobRunner<V exte
       }
 
     }
-
+    
     // If we applied any changes to vertices, we need to call finishAdditions
     // and finishRemovals in the end.
     if (dynamicAdditions) {
@@ -576,6 +532,18 @@ public final class GraphJobRunner<V exte
     return msg;
   }
 
+  private void finishRemovals() throws IOException {
+    vertices.finishRemovals();
+    // finish the "superstep" because we have written a new file here
+    vertices.finishSuperstep();
+  }
+
+  private void finishAdditions() throws IOException {
+    vertices.finishAdditions();
+    // finish the "superstep" because we have written a new file here
+    vertices.finishSuperstep();
+  }
+
   /**
    * @return the number of vertices, globally accumulated.
    */
@@ -607,7 +575,7 @@ public final class GraphJobRunner<V exte
   /**
    * Gets the last aggregated value at the given index. The index is dependend
    * on how the aggregators were configured during job setup phase.
-   *
+   * 
    * @return the value of the aggregator, or null if none was defined.
    */
   public final Writable getLastAggregatedValue(int index) {
@@ -617,7 +585,7 @@ public final class GraphJobRunner<V exte
   /**
    * Gets the last aggregated number of vertices at the given index. The index
    * is dependend on how the aggregators were configured during job setup phase.
-   *
+   * 
    * @return the value of the aggregator, or null if none was defined.
    */
   public final IntWritable getNumLastAggregatedVertices(int index) {

Added: hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java?rev=1656609&view=auto
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java (added)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java Tue Feb  3 00:28:07 2015
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.message.queue.MessageQueue;
+import org.apache.hama.bsp.message.queue.SynchronizedQueue;
+
+public class IncomingVertexMessageManager<M extends WritableComparable<M>>
+    implements SynchronizedQueue<GraphJobMessage> {
+
+  private Configuration conf;
+
+  private final MessagePerVertex msgPerVertex = new MessagePerVertex();
+  private final ConcurrentLinkedQueue<GraphJobMessage> mapMessages = new ConcurrentLinkedQueue<GraphJobMessage>();
+
+  @Override
+  public Iterator<GraphJobMessage> iterator() {
+    return msgPerVertex.iterator();
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void addAll(Iterable<GraphJobMessage> col) {
+    for (GraphJobMessage m : col)
+      add(m);
+  }
+
+  @Override
+  public void addAll(MessageQueue<GraphJobMessage> otherqueue) {
+    GraphJobMessage poll = null;
+    while ((poll = otherqueue.poll()) != null) {
+      add(poll);
+    }
+  }
+
+  @Override
+  public void add(GraphJobMessage item) {
+    if (item.isVertexMessage()) {
+      msgPerVertex.add(item.getVertexId(), item.getVertexValue());
+    } else if (item.isMapMessage() || item.isVerticesSizeMessage()) {
+      mapMessages.add(item);
+    }
+  }
+
+  @Override
+  public void clear() {
+    msgPerVertex.clear();
+  }
+
+  @Override
+  public GraphJobMessage poll() {
+    if (mapMessages.size() > 0) {
+      return mapMessages.poll();
+    } else {
+      return msgPerVertex.pollFirstEntry();
+    }
+  }
+
+  @Override
+  public int size() {
+    return msgPerVertex.size();
+  }
+
+  // empty, not needed to implement
+
+  @Override
+  public void init(Configuration conf, TaskAttemptID id) {
+
+  }
+
+  @Override
+  public void close() {
+    this.clear();
+  }
+
+  @Override
+  public void prepareRead() {
+
+  }
+
+  @Override
+  public void prepareWrite() {
+
+  }
+
+  @Override
+  public boolean isMessageSerialized() {
+    return false;
+  }
+
+  @Override
+  public boolean isMemoryBasedQueue() {
+    return true;
+  }
+
+  @Override
+  public MessageQueue<GraphJobMessage> getMessageQueue() {
+    return this;
+  }
+
+}

Modified: 
hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
URL: 
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java?rev=1656609&r1=1656608&r2=1656609&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java 
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java 
Tue Feb  3 00:28:07 2015
@@ -18,15 +18,15 @@
 package org.apache.hama.graph;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Iterator;
-import java.util.SortedMap;
-import java.util.TreeMap;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.TaskAttemptID;
-import org.apache.hama.util.KryoSerializer;
 
 /**
  * Stores the vertices into a memory-based tree map. This implementation allows
@@ -41,9 +41,8 @@ import org.apache.hama.util.KryoSerializ
 public final class MapVerticesInfo<V extends WritableComparable<V>, E extends Writable, M extends Writable>
     implements VerticesInfo<V, E, M> {
   private GraphJobRunner<V, E, M> runner;
-  Vertex<V, E, M> v;
 
-  private final SortedMap<V, byte[]> verticesMap = new TreeMap<V, byte[]>();
+  private final Map<V, Vertex<V, E, M>> vertices = new HashMap<V, Vertex<V, E, M>>();
 
   @Override
   public void init(GraphJobRunner<V, E, M> runner, HamaConfiguration conf,
@@ -52,98 +51,67 @@ public final class MapVerticesInfo<V ext
   }
 
   @Override
-  public void addVertex(Vertex<V, E, M> vertex) throws IOException {
-    if (verticesMap.containsKey(vertex.getVertexID())) {
-      throw new UnsupportedOperationException("Vertex with ID: "
-          + vertex.getVertexID() + " already exists!");
-    } else {
-      verticesMap.put(vertex.getVertexID(), serialize(vertex));
-    }
+  public void put(Vertex<V, E, M> vertex) throws IOException {
+    vertices.put(vertex.getVertexID(), vertex);
   }
 
   @Override
-  public void removeVertex(V vertexID) throws UnsupportedOperationException {
-    if (verticesMap.containsKey(vertexID)) {
-      verticesMap.remove(vertexID);
-    } else {
-      throw new UnsupportedOperationException("Vertex with ID: " + vertexID
-          + " not found on this peer.");
-    }
+  public void remove(V vertexID) throws UnsupportedOperationException {
+    vertices.remove(vertexID);
   }
 
   public void clear() {
-    verticesMap.clear();
+    vertices.clear();
   }
 
   @Override
   public int size() {
-    return this.verticesMap.size();
+    return vertices.size();
   }
 
   @Override
-  public IDSkippingIterator<V, E, M> skippingIterator() {
-    return new IDSkippingIterator<V, E, M>() {
-      Iterator<V> it = verticesMap.keySet().iterator();
+  public Vertex<V, E, M> get(V vertexID) {
+    Vertex<V, E, M> vertex = vertices.get(vertexID);
+    vertex.setRunner(runner);
+    return vertex;
+  }
+
+  @Override
+  public Iterator<Vertex<V, E, M>> iterator() {
+
+    final Iterator<Vertex<V, E, M>> vertexIterator = vertices.values().iterator();
+    
+    return new Iterator<Vertex<V, E, M>>() {
 
       @Override
-      public boolean hasNext(V msgId,
-          org.apache.hama.graph.IDSkippingIterator.Strategy strat)
-          throws IOException {
-
-        if (it.hasNext()) {
-          V vertexID = it.next();
-          v = deserialize(vertexID, verticesMap.get(vertexID));
-
-          while (!strat.accept(v, msgId)) {
-            if (it.hasNext()) {
-              vertexID = it.next();
-              v = deserialize(vertexID, verticesMap.get(vertexID));
-            } else {
-              return false;
-            }
-          }
-
-          return true;
-        } else {
-          v = null;
-          return false;
-        }
+      public boolean hasNext() {
+        return vertexIterator.hasNext();
       }
 
       @Override
       public Vertex<V, E, M> next() {
-        if (v == null) {
-          throw new UnsupportedOperationException(
-              "You must invoke hasNext before ask for the next vertex.");
-        }
-
-        Vertex<V, E, M> tmp = v;
-        v = null;
-        return tmp;
+        Vertex<V, E, M> vertex = vertexIterator.next();
+        vertex.setRunner(runner);
+        return vertex;
       }
 
-    };
-  }
+      @Override
+      public void remove() {
+        // TODO Auto-generated method stub
+      }
 
-  private final KryoSerializer kryo = new KryoSerializer(GraphJobRunner.VERTEX_CLASS);
-  
-  public byte[] serialize(Vertex<V, E, M> vertex) throws IOException {
-    return kryo.serialize(vertex);
+    };
   }
 
-  @SuppressWarnings("unchecked")
-  public Vertex<V, E, M> deserialize(V vertexID, byte[] serialized)
-      throws IOException {
-    v = (Vertex<V, E, M>) kryo.deserialize(serialized);
-    v.setRunner(runner);
-    v.setVertexID(vertexID);
-    return v;
+  @Override
+  public Set<V> keySet() {
+    return vertices.keySet();
   }
 
   @Override
   public void finishVertexComputation(Vertex<V, E, M> vertex)
       throws IOException {
-    verticesMap.put(vertex.getVertexID(), serialize(vertex));
+    vertices.put(vertex.getVertexID(), vertex);
   }
 
   @Override
@@ -155,17 +123,11 @@ public final class MapVerticesInfo<V ext
   }
 
   @Override
-  public void finishSuperstep() {
+  public void startSuperstep() throws IOException {
   }
 
   @Override
-  public void cleanup(HamaConfiguration conf, TaskAttemptID attempt)
-      throws IOException {
-
+  public void finishSuperstep() throws IOException {
   }
 
-  @Override
-  public void startSuperstep() throws IOException {
-
-  }
 }

Added: 
hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java
URL: 
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java?rev=1656609&view=auto
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java 
(added)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java 
Tue Feb  3 00:28:07 2015
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+public class MessagePerVertex {
+
+  @SuppressWarnings("rawtypes")
+  private final ConcurrentNavigableMap<WritableComparable, GraphJobMessage> storage = new ConcurrentSkipListMap<WritableComparable, GraphJobMessage>();
+
+  public int size() {
+    return storage.size();
+  }
+
+  public void clear() {
+    storage.clear();
+  }
+
+  @SuppressWarnings("rawtypes")
+  public void put(WritableComparable vertexId, GraphJobMessage graphJobMessage) {
+    storage.put(vertexId, graphJobMessage);
+  }
+
+  @SuppressWarnings("rawtypes")
+  public void add(WritableComparable vertexID, List<Writable> values) {
+    if (storage.containsKey(vertexID)) {
+      storage.get(vertexID).addAll(values);
+    } else {
+      put(vertexID, new GraphJobMessage(vertexID, values));
+    }
+  }
+
+  @SuppressWarnings("rawtypes")
+  public boolean containsKey(WritableComparable vertexID) {
+    return storage.containsKey(vertexID);
+  }
+
+  @SuppressWarnings("rawtypes")
+  public GraphJobMessage get(WritableComparable vertexID) {
+    return storage.get(vertexID);
+  }
+
+  public Iterator<GraphJobMessage> iterator() {
+    return storage.values().iterator();
+  }
+
+  public GraphJobMessage pollFirstEntry() {
+    return (storage.size() > 0) ? storage.pollFirstEntry().getValue() : null;
+  }
+
+}

Modified: 
hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java
URL: 
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java?rev=1656609&r1=1656608&r2=1656609&view=diff
==============================================================================
--- 
hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java 
(original)
+++ 
hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java 
Tue Feb  3 00:28:07 2015
@@ -31,6 +31,7 @@ import org.apache.hama.util.ReflectionUt
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListMap;
 
 /**
@@ -65,7 +66,7 @@ public class OffHeapVerticesInfo<V exten
             ReflectionUtils.newInstance(conf.getClass(DM_SERIALIZER,
                 KryoSerializer.class, Serializer.class)))
         .setDisposalTime(conf.getInt(DM_DISPOSAL_TIME, 3600000));
-    if (conf.getBoolean(DM_SORTED, true)) {
+    if (conf.getBoolean(DM_SORTED, false)) {
       dm.setMap(new ConcurrentSkipListMap<V, Pointer<Vertex<V, E, M>>>());
     } else {
       dm.setInitialCapacity(conf.getInt(DM_CAPACITY, 1000))
@@ -77,85 +78,73 @@ public class OffHeapVerticesInfo<V exten
   }
 
   @Override
-  public void cleanup(HamaConfiguration conf, TaskAttemptID attempt)
-      throws IOException {
-    vertices.dump();
-  }
-
-  @Override
-  public void addVertex(Vertex<V, E, M> vertex) {
+  public void put(Vertex<V, E, M> vertex) {
     vertices.put(vertex.getVertexID(), vertex);
   }
 
-  @Override
-  public void finishAdditions() {
+  public void clear() {
+    vertices.clear();
   }
 
   @Override
-  public void startSuperstep() throws IOException {
+  public int size() {
+    return (int) this.vertices.entries();
   }
 
   @Override
-  public void finishSuperstep() throws IOException {
+  public void remove(V vertexID) {
+    vertices.free(vertexID);
   }
 
   @Override
-  public void finishVertexComputation(Vertex<V, E, M> vertex)
-      throws IOException {
-    vertices.put(vertex.getVertexID(), vertex);
-  }
-
-  public void clear() {
-    vertices.clear();
-  }
+  public Vertex<V, E, M> get(V vertexID) {
+    Vertex<V, E, M> vertex = vertices.retrieve(vertexID);
+    vertex.setRunner(runner);
 
-  @Override
-  public int size() {
-    return (int) this.vertices.entries();
+    return vertex;
   }
 
   @Override
-  public IDSkippingIterator<V, E, M> skippingIterator() {
+  public Iterator<Vertex<V, E, M>> iterator() {
     final Iterator<Vertex<V, E, M>> vertexIterator = new CacheValuesIterable<V, Vertex<V, E, M>>(
         vertices, strict).iterator();
 
-    return new IDSkippingIterator<V, E, M>() {
-      int currentIndex = 0;
-
-      Vertex<V, E, M> currentVertex = null;
+    return new Iterator<Vertex<V, E, M>>() {
 
       @Override
-      public boolean hasNext(V e,
-          org.apache.hama.graph.IDSkippingIterator.Strategy strat) {
-        if (currentIndex < vertices.entries()) {
-
-          Vertex<V, E, M> next = vertexIterator.next();
-          while (!strat.accept(next, e)) {
-            currentIndex++;
-          }
-          currentVertex = next;
-          return true;
-        } else {
-          return false;
-        }
+      public boolean hasNext() {
+        return vertexIterator.hasNext();
       }
 
       @Override
       public Vertex<V, E, M> next() {
-        currentIndex++;
-        if (currentVertex != null && currentVertex.getRunner() == null) {
-          currentVertex.setRunner(runner);
-        }
-        return currentVertex;
+        Vertex<V, E, M> vertex = vertexIterator.next();
+        vertex.setRunner(runner);
+
+        return vertex;
+      }
+
+      @Override
+      public void remove() {
+        // TODO Auto-generated method stub
       }
 
     };
+  }
 
+  @Override
+  public Set<V> keySet() {
+    return vertices.getMap().keySet();
   }
 
   @Override
-  public void removeVertex(V vertexID) {
-    vertices.free(vertexID);
+  public void finishVertexComputation(Vertex<V, E, M> vertex)
+      throws IOException {
+    vertices.put(vertex.getVertexID(), vertex);
+  }
+
+  @Override
+  public void finishAdditions() {
   }
 
   @Override
@@ -163,4 +152,12 @@ public class OffHeapVerticesInfo<V exten
     vertices.collectExpired();
   }
 
+  @Override
+  public void startSuperstep() throws IOException {
+  }
+
+  @Override
+  public void finishSuperstep() throws IOException {
+  }
+
 }

Added: 
hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
URL: 
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java?rev=1656609&view=auto
==============================================================================
--- 
hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
 (added)
+++ 
hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
 Tue Feb  3 00:28:07 2015
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.Combiner;
+import org.apache.hama.bsp.message.OutgoingMessageManager;
+import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
+import org.apache.hama.util.BSPNetUtils;
+
+public class OutgoingVertexMessageManager<M extends Writable> implements
+    OutgoingMessageManager<GraphJobMessage> {
+  protected static final Log LOG = LogFactory
+      .getLog(OutgoingVertexMessageManager.class);
+
+  private HamaConfiguration conf;
+  private BSPMessageCompressor<GraphJobMessage> compressor;
+  private Combiner<Writable> combiner;
+  private final HashMap<String, InetSocketAddress> peerSocketCache = new HashMap<String, InetSocketAddress>();
+  private HashMap<InetSocketAddress, BSPMessageBundle<GraphJobMessage>> outgoingBundles = new HashMap<InetSocketAddress, BSPMessageBundle<GraphJobMessage>>();
+
+  private HashMap<InetSocketAddress, MessagePerVertex> storage = new HashMap<InetSocketAddress, MessagePerVertex>();
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void init(HamaConfiguration conf,
+      BSPMessageCompressor<GraphJobMessage> compressor) {
+    this.conf = conf;
+    this.compressor = compressor;
+    if (!conf.getClass(Constants.COMBINER_CLASS, Combiner.class).equals(
+        Combiner.class)) {
+      LOG.debug("Combiner class: " + conf.get(Constants.COMBINER_CLASS));
+
+      combiner = (Combiner<Writable>) org.apache.hadoop.util.ReflectionUtils
+          .newInstance(conf.getClass(Constants.COMBINER_CLASS, Combiner.class),
+              conf);
+    }
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public void addMessage(String peerName, GraphJobMessage msg) {
+    InetSocketAddress targetPeerAddress = getSocketAddress(peerName);
+
+    if (msg.isVertexMessage()) {
+      WritableComparable vertexID = msg.getVertexId();
+
+      if (!storage.containsKey(targetPeerAddress)) {
+        storage.put(targetPeerAddress, new MessagePerVertex());
+      }
+
+      MessagePerVertex msgPerVertex = storage.get(targetPeerAddress);
+      msgPerVertex.add(vertexID, msg.getVertexValue());
+
+      // Combining messages
+      if (combiner != null
+          && msgPerVertex.get(vertexID).getVertexValue().size() > 1) {
+        storage.get(targetPeerAddress).put(
+            vertexID,
+            new GraphJobMessage(vertexID, combiner.combine(msgPerVertex.get(
+                vertexID).getVertexValue())));
+      }
+
+    } else {
+      outgoingBundles.get(targetPeerAddress).addMessage(msg);
+    }
+  }
+
+  private InetSocketAddress getSocketAddress(String peerName) {
+    InetSocketAddress targetPeerAddress = null;
+    // Get socket for target peer.
+    if (peerSocketCache.containsKey(peerName)) {
+      targetPeerAddress = peerSocketCache.get(peerName);
+    } else {
+      targetPeerAddress = BSPNetUtils.getAddress(peerName);
+      peerSocketCache.put(peerName, targetPeerAddress);
+    }
+
+    if (!outgoingBundles.containsKey(targetPeerAddress)) {
+      BSPMessageBundle<GraphJobMessage> bundle = new BSPMessageBundle<GraphJobMessage>();
+      if (conf.getBoolean("hama.messenger.runtime.compression", false)) {
+        bundle.setCompressor(compressor,
+            conf.getLong("hama.messenger.compression.threshold", 128));
+      }
+      outgoingBundles.put(targetPeerAddress, bundle);
+    }
+    return targetPeerAddress;
+  }
+
+  @Override
+  public void clear() {
+    outgoingBundles.clear();
+    storage.clear();
+  }
+
+  @Override
+  public Iterator<Entry<InetSocketAddress, BSPMessageBundle<GraphJobMessage>>> getBundleIterator() {
+    return new Iterator<Entry<InetSocketAddress, BSPMessageBundle<GraphJobMessage>>>() {
+      final Iterator<Entry<InetSocketAddress, BSPMessageBundle<GraphJobMessage>>> bundles = outgoingBundles
+          .entrySet().iterator();
+
+      @Override
+      public boolean hasNext() {
+        return bundles.hasNext();
+      }
+
+      @Override
+      public Entry<InetSocketAddress, BSPMessageBundle<GraphJobMessage>> next() {
+        Entry<InetSocketAddress, BSPMessageBundle<GraphJobMessage>> bundle = bundles
+            .next();
+
+        MessagePerVertex msgStorage = storage.get(bundle.getKey());
+        if (msgStorage != null) {
+          Iterator<GraphJobMessage> it = msgStorage.iterator();
+          while (it.hasNext()) {
+            bundle.getValue().addMessage(it.next());
+          }
+        }
+
+        storage.remove(bundle.getKey());
+        return bundle;
+      }
+
+      @Override
+      public void remove() {
+        // TODO Auto-generated method stub
+      }
+
+    };
+  }
+
+}

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
URL: 
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java?rev=1656609&r1=1656608&r2=1656609&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java 
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java Tue 
Feb  3 00:28:07 2015
@@ -18,6 +18,8 @@
 package org.apache.hama.graph;
 
 import java.io.IOException;
+import java.util.Iterator;
+import java.util.Set;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -41,22 +43,43 @@ public interface VerticesInfo<V extends
       TaskAttemptID attempt) throws IOException;
 
   /**
-   * Cleanup of internal structures.
+   * Add a vertex to the underlying structure.
    */
-  public void cleanup(HamaConfiguration conf, TaskAttemptID attempt)
-      throws IOException;
+  public void put(Vertex<V, E, M> vertex) throws IOException;
 
+  public Vertex<V, E, M> get(V vertexID);
+  
   /**
-   * Add a vertex to the underlying structure.
+   * Remove a vertex from the underlying structure.
    */
-  public void addVertex(Vertex<V, E, M> vertex) throws IOException;
+  public void remove(V vertexID) throws UnsupportedOperationException;
 
   /**
-   * Remove a vertex to the underlying structure.
+   * @return the number of vertices added to the underlying structure.
+   *         Implementations should take care this is a constant time operation.
    */
-  public void removeVertex(V vertexID) throws UnsupportedOperationException;
+  public int size();
 
   /**
+   * Must be called once a vertex is guaranteed not to change any more and can
+   * safely be persisted to a secondary storage.
+   */
+  public void finishVertexComputation(Vertex<V, E, M> vertex)
+      throws IOException;
+  
+  /**
+   * @return the iterator.
+   */
+  public Iterator<Vertex<V, E, M>> iterator();
+
+  public void clear();
+
+  /**
+   * @return the set of vertex IDs
+   */
+  public Set<V> keySet();
+  
+  /**
    * Finish the additions, from this point on the implementations should close
    * the adds and throw exceptions in case something is added after this call.
    */
@@ -78,18 +101,4 @@ public interface VerticesInfo<V extends
    */
   public void finishSuperstep() throws IOException;
 
-  /**
-   * Must be called once a vertex is guaranteed not to change any more and can
-   * safely be persisted to a secondary storage.
-   */
-  public void finishVertexComputation(Vertex<V, E, M> vertex)
-      throws IOException;
-
-  /**
-   * @return the number of vertices added to the underlying structure.
-   *         Implementations should take care this is a constant time operation.
-   */
-  public int size();
-
-  public IDSkippingIterator<V, E, M> skippingIterator();
 }

Modified: 
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
URL: 
http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1656609&r1=1656608&r2=1656609&view=diff
==============================================================================
--- 
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java 
(original)
+++ 
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java 
Tue Feb  3 00:28:07 2015
@@ -61,8 +61,7 @@ public class TestSubmitGraphJob extends
   @Before
   public void setUp() throws Exception {
     super.setUp();
-    vi.add(ListVerticesInfo.class);
-    vi.add(DiskVerticesInfo.class);
+    vi.add(MapVerticesInfo.class);
     vi.add(OffHeapVerticesInfo.class);
   }
 
@@ -120,7 +119,7 @@ public class TestSubmitGraphJob extends
   @SuppressWarnings("rawtypes")
   protected void injectVerticesInfo() {
     Class<? extends VerticesInfo> verticesInfoClass = vi.get(Math
-        .abs(new Random().nextInt() % 3));
+        .abs(new Random().nextInt() % 2));
     LOG.info("using vertices info of type : " + verticesInfoClass.getName());
   }
 


Reply via email to