Repository: giraph
Updated Branches:
  refs/heads/trunk 4f3551dfd -> 6f5a457fa


http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/ooc/DiskBackedPartitionStore.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/DiskBackedPartitionStore.java 
b/giraph-core/src/main/java/org/apache/giraph/ooc/DiskBackedPartitionStore.java
index b8a2dd5..cdafa3f 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/ooc/DiskBackedPartitionStore.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/ooc/DiskBackedPartitionStore.java
@@ -26,25 +26,26 @@ import com.google.common.hash.Hashing;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.conf.IntConfOption;
-import org.apache.giraph.edge.EdgeStore;
-import org.apache.giraph.edge.EdgeStoreFactory;
 import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.partition.Partition;
 import org.apache.giraph.partition.PartitionStore;
+import org.apache.giraph.utils.ByteArrayOneMessageToManyIds;
 import org.apache.giraph.utils.ByteArrayVertexIdEdges;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.ExtendedDataOutput;
 import org.apache.giraph.utils.PairList;
 import org.apache.giraph.utils.VertexIdEdges;
+import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.giraph.utils.VertexIterator;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.giraph.worker.BspServiceWorker;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Mapper.Context;
 import org.apache.log4j.Logger;
 
 import java.io.BufferedInputStream;
@@ -53,6 +54,7 @@ import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
+import java.io.EOFException;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
@@ -73,6 +75,10 @@ import static 
org.apache.giraph.conf.GiraphConstants.MAX_PARTITIONS_IN_MEMORY;
 import static org.apache.giraph.conf.GiraphConstants.ONE_MB;
 import static org.apache.giraph.conf.GiraphConstants.PARTITIONS_DIRECTORY;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
 /**
  * Disk-backed PartitionStore. An instance of this class can be coupled with an
  * out-of-core engine. Out-of-core engine is responsible to determine when to
@@ -142,13 +148,8 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
    * time (a read lock used for spilling), and cannot be overlapped with
    * change of data structure holding the data.
    */
-  private ReadWriteLock rwLock = new ReentrantReadWriteLock();
+  private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
 
-  /** Giraph configuration */
-  private final
-  ImmutableClassesGiraphConfiguration<I, V, E> conf;
-  /** Mapper context */
-  private final Context context;
   /** Base path where the partition files are written to */
   private final String[] basePaths;
   /** Used to hash partition Ids */
@@ -157,13 +158,9 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
   private final AtomicInteger maxPartitionsInMem = new AtomicInteger(-1);
   /** Number of slots used */
   private final AtomicInteger numPartitionsInMem = new AtomicInteger(0);
-  /** service worker reference */
-  private CentralizedServiceWorker<I, V, E> serviceWorker;
 
   /** Out-of-core engine */
   private final OutOfCoreEngine oocEngine;
-  /** Edge store for this worker */
-  private final EdgeStore<I, V, E> edgeStore;
   /** If moving of edges to vertices in INPUT_SUPERSTEP has been started */
   private volatile boolean movingEdges;
   /** Whether the partition store is initialized */
@@ -187,7 +184,7 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
   private final ConcurrentMap<Integer, Integer> numPendingInputVerticesOnDisk =
       Maps.newConcurrentMap();
   /** Lock to avoid overlap of addition and removal on pendingInputVertices */
-  private ReadWriteLock vertexBufferRWLock = new ReentrantReadWriteLock();
+  private final ReadWriteLock vertexBufferRWLock = new 
ReentrantReadWriteLock();
 
   /**
    * Similar to vertex buffer, but used for input edges (see comments for
@@ -199,7 +196,7 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
   private final ConcurrentMap<Integer, Integer> numPendingInputEdgesOnDisk =
       Maps.newConcurrentMap();
   /** Lock to avoid overlap of addition and removal on pendingInputEdges */
-  private ReadWriteLock edgeBufferRWLock = new ReentrantReadWriteLock();
+  private final ReadWriteLock edgeBufferRWLock = new ReentrantReadWriteLock();
 
   /**
    * For each out-of-core partitions, whether its edge store is also
@@ -209,6 +206,48 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
       Maps.newConcurrentMap();
 
   /**
+   * Type of VertexIdMessage class (container for serialized messages) received
+   * for a particular message. If we write the received messages to disk before
+   * adding them to message store, we need this type when we want to read the
+   * messages back from disk (so that we know how to read the messages from
+   * disk).
+   */
+  private enum SerializedMessageClass {
+    /** ByteArrayVertexIdMessages */
+    BYTE_ARRAY_VERTEX_ID_MESSAGES,
+    /** ByteArrayOneMEssageToManyIds */
+    BYTE_ARRAY_ONE_MESSAGE_TO_MANY_IDS
+  }
+
+  /**
+   * Similar to vertex buffer and edge buffer, but used for messages (see
+   * comments for pendingInputVertices).
+   */
+  private volatile ConcurrentMap<Integer,
+      Pair<Integer, List<VertexIdMessages<I, Writable>>>>
+      pendingIncomingMessages = Maps.newConcurrentMap();
+  /** Whether a partition has any incoming message buffer on disk */
+  private volatile ConcurrentMap<Integer, Boolean> incomingMessagesOnDisk =
+      Maps.newConcurrentMap();
+
+  /**
+   * Similar to pendingIncomingMessages, but is used for messages for current
+   * superstep instead.
+   */
+  private volatile ConcurrentMap<Integer,
+      Pair<Integer, List<VertexIdMessages<I, Writable>>>>
+      pendingCurrentMessages = Maps.newConcurrentMap();
+  /** Similar to incomingMessagesOnDisk for messages for current superstep */
+  private volatile ConcurrentMap<Integer, Boolean> currentMessagesOnDisk =
+      Maps.newConcurrentMap();
+
+  /**
+   * Lock to avoid overlap of addition and removal of pending message buffers
+   */
+  private final ReadWriteLock messageBufferRWLock =
+      new ReentrantReadWriteLock();
+
+  /**
    * Constructor
    *
    * @param conf Configuration
@@ -219,9 +258,7 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
       ImmutableClassesGiraphConfiguration<I, V, E> conf,
       Mapper<?, ?, ?, ?>.Context context,
       CentralizedServiceWorker<I, V, E> serviceWorker) {
-    this.conf = conf;
-    this.context = context;
-    this.serviceWorker = serviceWorker;
+    super(conf, context, serviceWorker);
     this.minBuffSize = MINIMUM_BUFFER_SIZE_TO_FLUSH.get(conf);
     int userMaxNumPartitions = MAX_PARTITIONS_IN_MEMORY.get(conf);
     if (userMaxNumPartitions > 0) {
@@ -233,9 +270,6 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
       this.oocEngine =
           new AdaptiveOutOfCoreEngine<I, V, E>(conf, serviceWorker);
     }
-    EdgeStoreFactory<I, V, E> edgeStoreFactory = conf.createEdgeStoreFactory();
-    edgeStoreFactory.initialize(serviceWorker, conf, context);
-    this.edgeStore = edgeStoreFactory.newStore();
     this.movingEdges = false;
     this.isInitialized = new AtomicBoolean(false);
 
@@ -425,10 +459,9 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
           partitionId + " to disk");
     }
     MetaPartition swapOutPartition = partitions.get(partitionId);
-    if (swapOutPartition == null) {
-      throw new IllegalStateException("swapOnePartitionToDisk: the partition " 
+
-          "is not found to spill to disk (impossible)");
-    }
+    checkNotNull(swapOutPartition,
+        "swapOnePartitionToDisk: the partition is not found to spill to disk " 
+
+            "(impossible)");
 
     // Since the partition is popped from the maps, it is not going to be
     // processed (or change its process state) until spilling of the partition
@@ -523,10 +556,8 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
     }
 
     MetaPartition meta = partitions.get(partitionId);
-    if (meta == null) {
-      throw new IllegalStateException("getNextPartition: partition " +
-          partitionId + " does not exist (impossible)");
-    }
+    checkNotNull(meta, "getNextPartition: partition " + partitionId +
+        " does not exist (impossible)");
 
     // The only time we iterate through all partitions in INPUT_SUPERSTEP is
     // when we want to move
@@ -582,6 +613,18 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
       while (!partitionInMemory) {
         switch (meta.getState()) {
         case INACTIVE:
+          // Check if the message store for the current superstep is in memory,
+          // and if not, load it from the disk.
+          Boolean messagesOnDisk = currentMessagesOnDisk.get(partitionId);
+          if (messagesOnDisk != null && messagesOnDisk) {
+            try {
+              loadMessages(partitionId);
+            } catch (IOException e) {
+              throw new IllegalStateException("getPartition: failed while " +
+                  "loading messages of current superstep for partition " +
+                  partitionId, e);
+            }
+          }
           meta.setState(State.ACTIVE);
           partitionInMemory = true;
           break;
@@ -617,7 +660,7 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
                 LOG.info("getPartition: start reading partition " +
                     partitionId + " from disk");
               }
-              partition = loadPartition(partitionId, meta.getVertexCount());
+              partition = loadPartition(meta);
               if (LOG.isInfoEnabled()) {
                 LOG.info("getPartition: done reading partition " +
                     partitionId + " from disk");
@@ -639,6 +682,221 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
     }
   }
 
+  @Override
+  public void prepareSuperstep() {
+    rwLock.writeLock().lock();
+    super.prepareSuperstep();
+    pendingCurrentMessages = pendingIncomingMessages;
+    currentMessagesOnDisk = incomingMessagesOnDisk;
+    pendingIncomingMessages = Maps.newConcurrentMap();
+    incomingMessagesOnDisk = Maps.newConcurrentMap();
+    rwLock.writeLock().unlock();
+  }
+
+  /**
+   * Spill message buffers (either buffers for messages for current superstep,
+   * or buffers for incoming messages) of a given partition to disk. Note that
+   * the partition should be ON_DISK or IN_TRANSIT.
+   *
+   * @param partitionId Id of the partition to spill its message buffers
+   * @throws IOException
+   */
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+      "UL_UNRELEASED_LOCK_EXCEPTION_PATH")
+  public void spillPartitionMessages(Integer partitionId) throws IOException {
+    rwLock.readLock().lock();
+    spillMessages(partitionId, pendingCurrentMessages,
+        serviceWorker.getSuperstep());
+    spillMessages(partitionId, pendingIncomingMessages,
+        serviceWorker.getSuperstep() + 1);
+    rwLock.readLock().unlock();
+  }
+
+  /**
+   * Spill message buffers of a particular type of message (current or incoming
+   * buffer) for a partition to disk.
+   *
+   * @param partitionId Id of the partition to spill the messages for
+   * @param pendingMessages The map to get the message buffers from
+   * @param superstep Superstep of which we want to offload messages. This is
+   *                  equal to current superstep number if we want to offload
+   *                  buffers for currentMessageStore, and is equal to next
+   *                  superstep number if we want to offload buffer for
+   *                  incomingMessageStore
+   * @throws IOException
+   */
+  private void spillMessages(Integer partitionId,
+      ConcurrentMap<Integer, Pair<Integer, List<VertexIdMessages<I, 
Writable>>>>
+          pendingMessages, long superstep) throws IOException {
+    Pair<Integer, List<VertexIdMessages<I, Writable>>> entry;
+    messageBufferRWLock.writeLock().lock();
+    entry = pendingMessages.remove(partitionId);
+    if (entry != null && entry.getLeft() < minBuffSize) {
+      pendingMessages.put(partitionId, entry);
+      entry = null;
+    }
+    messageBufferRWLock.writeLock().unlock();
+
+    if (entry == null) {
+      return;
+    }
+
+    // Sanity check
+    checkState(!entry.getRight().isEmpty(),
+        "spillMessages: the message buffer that is supposed to be flushed to " 
+
+            "disk does not exist.");
+
+    File file = new File(getPendingMessagesBufferPath(partitionId, superstep));
+
+    FileOutputStream fos = new FileOutputStream(file, true);
+    BufferedOutputStream bos = new BufferedOutputStream(fos);
+    DataOutputStream dos = new DataOutputStream(bos);
+    for (VertexIdMessages<I, Writable> messages : entry.getRight()) {
+      SerializedMessageClass messageClass;
+      if (messages instanceof ByteArrayVertexIdMessages) {
+        messageClass = SerializedMessageClass.BYTE_ARRAY_VERTEX_ID_MESSAGES;
+      } else if (messages instanceof ByteArrayOneMessageToManyIds) {
+        messageClass =
+            SerializedMessageClass.BYTE_ARRAY_ONE_MESSAGE_TO_MANY_IDS;
+      } else {
+        throw new IllegalStateException("spillMessages: serialized message " +
+            "type is not supported");
+      }
+      dos.writeInt(messageClass.ordinal());
+      messages.write(dos);
+    }
+    dos.close();
+  }
+
+  /**
+   * Looks through all partitions already on disk, and see if any of them has
+   * enough pending message in its buffer in memory. This can be message buffer
+   * of current superstep, or incoming superstep. If so, put that partition
+   * along with an approximate amount of memory it took (in bytes) in a list to
+   * return.
+   *
+   * @return List of pairs (partitionId, sizeInByte) of the partitions where
+   *         their pending messages are worth flushing to disk
+   */
+  public PairList<Integer, Integer> getOocPartitionIdsWithPendingMessages() {
+    PairList<Integer, Integer> pairList = new PairList<>();
+    pairList.initialize();
+    Set<Integer> partitionIds = Sets.newHashSet();
+    // First, iterating over pending incoming messages
+    if (pendingIncomingMessages != null) {
+      for (Entry<Integer, Pair<Integer, List<VertexIdMessages<I, Writable>>>>
+           entry : pendingIncomingMessages.entrySet()) {
+        if (entry.getValue().getLeft() > minBuffSize) {
+          pairList.add(entry.getKey(), entry.getValue().getLeft());
+          partitionIds.add(entry.getKey());
+        }
+      }
+    }
+    // Second, iterating over pending current messages (i.e. pending incoming
+    // messages of previous superstep)
+    if (pendingCurrentMessages != null) {
+      for (Entry<Integer, Pair<Integer, List<VertexIdMessages<I, Writable>>>>
+           entry : pendingCurrentMessages.entrySet()) {
+        if (entry.getValue().getLeft() > minBuffSize &&
+            !partitionIds.contains(entry.getKey())) {
+          pairList.add(entry.getKey(), entry.getValue().getLeft());
+        }
+      }
+    }
+    return pairList;
+  }
+
+  @Override
+  public <M extends Writable> void addPartitionCurrentMessages(
+      int partitionId, VertexIdMessages<I, M> messages) throws IOException {
+    // Current messages are only added to the store in the event of partition
+    // migration. Presumably the partition has just migrated and its data is
+    // still available in memory. Note that partition migration only happens at
+    // the beginning of a superstep.
+    ((MessageStore<I, M>) currentMessageStore)
+        .addPartitionMessages(partitionId, messages);
+  }
+
+  @Override
+  public <M extends Writable> void addPartitionIncomingMessages(
+      int partitionId, VertexIdMessages<I, M> messages) throws IOException {
+    if (conf.getIncomingMessageClasses().useMessageCombiner()) {
+      ((MessageStore<I, M>) incomingMessageStore)
+          .addPartitionMessages(partitionId, messages);
+    } else {
+      MetaPartition meta = partitions.get(partitionId);
+      checkNotNull(meta, "addPartitionIncomingMessages: trying to add " +
+          "messages to partition " + partitionId + " which does not exist " +
+          "in the partition set of this worker!");
+
+      synchronized (meta) {
+        switch (meta.getState()) {
+        case INACTIVE:
+        case ACTIVE:
+          // A partition might be in memory, but its message store might still
+          // be on disk. This happens because while we are loading the 
partition
+          // to memory, we only load its current messages, not the incoming
+          // messages. If a new superstep has been started, while the partition
+          // is still in memory, the incoming message store in the previous
+          // superstep (which is the current messages in the current superstep)
+          // is on disk.
+          // This may also happen when a partition is offloaded to disk while
+          // it was unprocessed, and then again loaded in the same superstep 
for
+          // processing.
+          Boolean isMsgOnDisk = incomingMessagesOnDisk.get(partitionId);
+          if (isMsgOnDisk == null || !isMsgOnDisk) {
+            ((MessageStore<I, M>) incomingMessageStore)
+                .addPartitionMessages(partitionId, messages);
+            break;
+          }
+          // Continue to IN_TRANSIT and ON_DISK cases as the partition is in
+          // memory, but its messages are not yet loaded
+          // CHECKSTYLE: stop FallThrough
+        case IN_TRANSIT:
+        case ON_DISK:
+          // CHECKSTYLE: resume FallThrough
+          List<VertexIdMessages<I, Writable>> newMessages =
+              new ArrayList<VertexIdMessages<I, Writable>>();
+          newMessages.add((VertexIdMessages<I, Writable>) messages);
+          int length = messages.getSerializedSize();
+          Pair<Integer, List<VertexIdMessages<I, Writable>>> newPair =
+              new MutablePair<>(length, newMessages);
+          messageBufferRWLock.readLock().lock();
+          Pair<Integer, List<VertexIdMessages<I, Writable>>> oldPair =
+              pendingIncomingMessages.putIfAbsent(partitionId, newPair);
+          if (oldPair != null) {
+            synchronized (oldPair) {
+              MutablePair<Integer, List<VertexIdMessages<I, Writable>>> pair =
+                  (MutablePair<Integer, List<VertexIdMessages<I, Writable>>>)
+                      oldPair;
+              pair.setLeft(pair.getLeft() + length);
+              pair.getRight().add((VertexIdMessages<I, Writable>) messages);
+            }
+          }
+          messageBufferRWLock.readLock().unlock();
+          // In the case that the number of partitions is asked to be fixed by
+          // the user, we should offload the message buffers as necessary.
+          if (isNumPartitionsFixed &&
+              pendingIncomingMessages.get(partitionId).getLeft() >
+                  minBuffSize) {
+            try {
+              spillPartitionMessages(partitionId);
+            } catch (IOException e) {
+              throw new IllegalStateException("addPartitionIncomingMessages: " 
+
+                  "spilling message buffers for partition " + partitionId +
+                  " failed!", e);
+            }
+          }
+          break;
+        default:
+          throw new IllegalStateException("addPartitionIncomingMessages: " +
+              "illegal state " + meta.getState() + " for partition " +
+              meta.getId());
+        }
+      }
+    }
+  }
+
   /**
    * Spills edge store generated for specified partition in INPUT_SUPERSTEP
    * Note that the partition should be ON_DISK or IN_TRANSIT.
@@ -667,18 +925,16 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
     }
 
     // Sanity check
-    if (entry.getRight().isEmpty()) {
-      throw new IllegalStateException("spillPartitionInputEdgeStore: " +
-          "the edge buffer that is supposed to be flushed to disk does not" +
-          "exist.");
-    }
+    checkState(!entry.getRight().isEmpty(),
+        "spillPartitionInputEdgeStore: the edge buffer that is supposed to " +
+            "be flushed to disk does not exist.");
 
     List<VertexIdEdges<I, E>> bufferList = entry.getRight();
     Integer numBuffers = numPendingInputEdgesOnDisk.putIfAbsent(partitionId,
         bufferList.size());
     if (numBuffers != null) {
-      numPendingInputEdgesOnDisk.replace(
-          partitionId, numBuffers + bufferList.size());
+      numPendingInputEdgesOnDisk.replace(partitionId,
+          numBuffers + bufferList.size());
     }
 
     File file = new File(getPendingEdgesBufferPath(partitionId));
@@ -833,11 +1089,9 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
       return;
     }
     // Sanity check
-    if (entry.getRight().isEmpty()) {
-      throw new IllegalStateException("spillPartitionInputVertexBuffer: " +
-          "the vertex buffer that is supposed to be flushed to disk does not" +
-          "exist.");
-    }
+    checkState(!entry.getRight().isEmpty(),
+        "spillPartitionInputVertexBuffer: the vertex buffer that is " +
+            "supposed to be flushed to disk does not exist.");
 
     List<ExtendedDataOutput> bufferList = entry.getRight();
     Integer numBuffers = numPendingInputVerticesOnDisk.putIfAbsent(partitionId,
@@ -968,23 +1222,16 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
 
   @Override
   public void putPartition(Partition<I, V, E> partition) {
-    if (partition == null) {
-      throw new IllegalStateException("putPartition: partition to put is null" 
+
-          " (impossible)");
-    }
+    checkArgument(partition != null);
+
     Integer id = partition.getId();
     MetaPartition meta = partitions.get(id);
-    if (meta == null) {
-      throw new IllegalStateException("putPartition: partition to put does" +
-          "not exist in the store (impossible)");
-    }
+    checkNotNull(meta, "putPartition: partition to put does " +
+        "not exist in the store (impossible)");
     synchronized (meta) {
-      if (meta.getState() != State.ACTIVE) {
-        String msg = "It is not possible to put back a partition which is " +
-            "not ACTIVE.\n" + meta.toString();
-        LOG.error(msg);
-        throw new IllegalStateException(msg);
-      }
+      checkState(meta.getState() == State.ACTIVE,
+          "It is not possible to put back a partition which is not ACTIVE. " +
+              "meta = " + meta.toString());
 
       meta.setState(State.INACTIVE);
       meta.setProcessed(true);
@@ -1003,10 +1250,10 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
       MetaPartition meta = partitions.remove(partitionId);
       // Since this method is called outside of the iteration cycle, all
       // partitions in the store should be in the processed state.
-      if (!processedPartitions.get(meta.getState()).remove(partitionId)) {
-        throw new IllegalStateException("removePartition: partition that is" +
-            "about to remove is not in processed list (impossible)");
-      }
+      checkState(processedPartitions.get(meta.getState()).remove(partitionId),
+          "removePartition: partition that is about to remove is not in " +
+              "processed list (impossible)");
+
       getPartition(meta);
       numPartitionsInMem.getAndDecrement();
       return meta.getPartition();
@@ -1028,7 +1275,7 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
     }
 
     if (LOG.isInfoEnabled()) {
-      LOG.info("addPartition: partition " + id + "is  added to the store.");
+      LOG.info("addPartition: partition " + id + " is added to the store.");
     }
 
     meta.setPartition(partition);
@@ -1050,13 +1297,11 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
   @Override
   public void shutdown() {
     // Sanity check to check there is nothing left from previous superstep
-    if (!unProcessedPartitions.get(State.INACTIVE).isEmpty() ||
-        !unProcessedPartitions.get(State.IN_TRANSIT).isEmpty() ||
-        !unProcessedPartitions.get(State.ON_DISK).isEmpty()) {
-      throw new IllegalStateException("shutdown: There are some " +
-          "unprocessed partitions left from the " +
-          "previous superstep. This should not be possible");
-    }
+    checkState(unProcessedPartitions.get(State.INACTIVE).isEmpty() &&
+            unProcessedPartitions.get(State.IN_TRANSIT).isEmpty() &&
+            unProcessedPartitions.get(State.ON_DISK).isEmpty(),
+        "shutdown: There are some unprocessed partitions left from the " +
+            "previous superstep. This should not be possible.");
 
     for (MetaPartition meta : partitions.values()) {
       synchronized (meta) {
@@ -1092,30 +1337,28 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
     }
     // Sanity check to make sure nothing left unprocessed from previous
     // superstep
-    if (!unProcessedPartitions.get(State.INACTIVE).isEmpty() ||
-        !unProcessedPartitions.get(State.IN_TRANSIT).isEmpty() ||
-        !unProcessedPartitions.get(State.ON_DISK).isEmpty()) {
-      throw new IllegalStateException("startIteration: There are some " +
-          "unprocessed and/or in-transition partitions left from the " +
-          "previous superstep. This should not be possible");
-    }
+    checkState(unProcessedPartitions.get(State.INACTIVE).isEmpty() &&
+            unProcessedPartitions.get(State.IN_TRANSIT).isEmpty() &&
+            unProcessedPartitions.get(State.ON_DISK).isEmpty(),
+        "startIteration: There are some unprocessed and/or " +
+            "in-transition partitions left from the previous superstep. " +
+            "This should not be possible.");
 
     rwLock.writeLock().lock();
     for (MetaPartition meta : partitions.values()) {
       // Sanity check
-      if (!meta.isProcessed()) {
-        throw new IllegalStateException("startIteration: meta-partition " +
-            meta + " has not been processed in the previous superstep.");
-      }
+      checkState(meta.isProcessed(), "startIteration: " +
+          "meta-partition " + meta + " has not been processed in the " +
+          "previous superstep.");
+
       // The only case where a partition can be IN_TRANSIT is where it is still
       // being offloaded to disk, and it happens only in 
swapOnePartitionToDisk,
       // where we at least hold a read lock while transfer is in progress. 
Since
       // the write lock is held in this section, no partition should be
       // IN_TRANSIT.
-      if (meta.getState() == State.IN_TRANSIT) {
-        throw new IllegalStateException("startIteration: meta-partition " +
-            meta + " is still IN_TRANSIT (impossible)");
-      }
+      checkState(meta.getState() != State.IN_TRANSIT,
+          "startIteration: meta-partition " + meta + " is still IN_TRANSIT " +
+              "(impossible)");
 
       meta.setProcessed(false);
     }
@@ -1255,21 +1498,109 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
   }
 
   /**
+   * Load messages for a given partition for the current superstep to memory.
+   *
+   * @param partitionId Id of the partition to load the messages for
+   * @throws IOException
+   */
+  private void loadMessages(int partitionId) throws IOException {
+    // Messages for current superstep
+    if (currentMessageStore != null &&
+        !conf.getOutgoingMessageClasses().useMessageCombiner()) {
+      checkState(!currentMessageStore.hasMessagesForPartition(partitionId),
+          "loadMessages: partition " + partitionId + " is on disk, " +
+              "but its message store is in memory (impossible)");
+      // First, reading the message store for the partition if there is any
+      File file = new File(
+          getMessagesPath(partitionId, serviceWorker.getSuperstep()));
+      if (file.exists()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("loadMessages: loading message store of partition " +
+              partitionId);
+        }
+        FileInputStream filein = new FileInputStream(file);
+        BufferedInputStream bufferin = new BufferedInputStream(filein);
+        DataInputStream inputStream = new DataInputStream(bufferin);
+        currentMessageStore.readFieldsForPartition(inputStream, partitionId);
+        inputStream.close();
+        checkState(file.delete(), "loadMessages: failed to delete %s.",
+            file.getAbsolutePath());
+      }
+
+      messageBufferRWLock.writeLock().lock();
+      Pair<Integer, List<VertexIdMessages<I, Writable>>> pendingMessages =
+          pendingCurrentMessages.remove(partitionId);
+      messageBufferRWLock.writeLock().unlock();
+
+      // Second, reading message buffers (incoming messages in previous
+      // superstep)
+      file = new File(getPendingMessagesBufferPath(partitionId,
+          serviceWorker.getSuperstep()));
+      if (file.exists()) {
+        FileInputStream filein = new FileInputStream(file);
+        BufferedInputStream bufferin = new BufferedInputStream(filein);
+        DataInputStream inputStream = new DataInputStream(bufferin);
+        while (true) {
+          int type;
+          try {
+            type = inputStream.readInt();
+          } catch (EOFException e) {
+            // Reached end of file, so all the records are read.
+            break;
+          }
+          SerializedMessageClass messageClass =
+              SerializedMessageClass.values()[type];
+          VertexIdMessages<I, Writable> vim;
+          switch (messageClass) {
+          case BYTE_ARRAY_VERTEX_ID_MESSAGES:
+            vim = new ByteArrayVertexIdMessages<>(
+                conf.createOutgoingMessageValueFactory());
+            vim.setConf(conf);
+            break;
+          case BYTE_ARRAY_ONE_MESSAGE_TO_MANY_IDS:
+            vim = new ByteArrayOneMessageToManyIds<>(
+                conf.createOutgoingMessageValueFactory());
+            vim.setConf(conf);
+            break;
+          default:
+            throw new IllegalStateException("loadMessages: unsupported " +
+                "serialized message type!");
+          }
+          vim.readFields(inputStream);
+          currentMessageStore.addPartitionMessages(partitionId, vim);
+        }
+        inputStream.close();
+        checkState(file.delete(), "loadMessages: failed to delete %s",
+            file.getAbsolutePath());
+      }
+
+      // Third, applying message buffers already in memory
+      if (pendingMessages != null) {
+        for (VertexIdMessages<I, Writable> vim : pendingMessages.getRight()) {
+          currentMessageStore.addPartitionMessages(partitionId, vim);
+        }
+      }
+      currentMessagesOnDisk.put(partitionId, false);
+    }
+  }
+
+  /**
    * Load a partition from disk. It deletes the files after the load,
    * except for the edges, if the graph is static.
    *
-   * @param id The id of the partition to load
-   * @param numVertices The number of vertices contained on disk
+   * @param meta meta partition to load the partition of
    * @return The partition
    * @throws IOException
    */
   @SuppressWarnings("unchecked")
-  private Partition<I, V, E> loadPartition(int id, long numVertices)
-    throws IOException {
-    Partition<I, V, E> partition = conf.createPartition(id, context);
+  private Partition<I, V, E> loadPartition(MetaPartition meta)
+      throws IOException {
+    Integer partitionId = meta.getId();
+    long numVertices = meta.getVertexCount();
+    Partition<I, V, E> partition = conf.createPartition(partitionId, context);
 
     // Vertices
-    File file = new File(getVerticesPath(id));
+    File file = new File(getVerticesPath(partitionId));
     if (LOG.isDebugEnabled()) {
       LOG.debug("loadPartition: loading partition vertices " +
         partition.getId() + " from " + file.getAbsolutePath());
@@ -1284,14 +1615,11 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
       partition.putVertex(vertex);
     }
     inputStream.close();
-    if (!file.delete()) {
-      String msg = "loadPartition: failed to delete " + file.getAbsolutePath();
-      LOG.error(msg);
-      throw new IllegalStateException(msg);
-    }
+    checkState(file.delete(), "loadPartition: failed to delete %s",
+        file.getAbsolutePath());
 
     // Edges
-    file = new File(getEdgesPath(id));
+    file = new File(getEdgesPath(partitionId));
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("loadPartition: loading partition edges " +
@@ -1309,86 +1637,79 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
     // around.
     if (!conf.isStaticGraph() ||
         serviceWorker.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP) {
-      if (!file.delete()) {
-        String msg =
-            "loadPartition: failed to delete " + file.getAbsolutePath();
-        LOG.error(msg);
-        throw new IllegalStateException(msg);
-      }
+      checkState(file.delete(), "loadPartition: failed to delete %s",
+          file.getAbsolutePath());
     }
 
-    if (serviceWorker.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP) {
-      // Input vertex buffers
-      // First, applying vertex buffers on disk (since they came earlier)
-      Integer numBuffers = numPendingInputVerticesOnDisk.remove(id);
-      if (numBuffers != null) {
-        file = new File(getPendingVerticesBufferPath(id));
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("loadPartition: loading " + numBuffers + " input vertex " +
-              "buffers of partition " + id + " from " + 
file.getAbsolutePath());
-        }
-        filein = new FileInputStream(file);
-        bufferin = new BufferedInputStream(filein);
-        inputStream = new DataInputStream(bufferin);
-        for (int i = 0; i < numBuffers; ++i) {
-          ExtendedDataOutput extendedDataOutput =
-              WritableUtils.readExtendedDataOutput(inputStream, conf);
-          partition.addPartitionVertices(
-              new VertexIterator<I, V, E>(extendedDataOutput, conf));
-        }
-        inputStream.close();
-        if (!file.delete()) {
-          String msg =
-              "loadPartition: failed to delete " + file.getAbsolutePath();
-          LOG.error(msg);
-          throw new IllegalStateException(msg);
-        }
+    // Load message for the current superstep
+    loadMessages(partitionId);
+
+    // Input vertex buffers
+    // First, applying vertex buffers on disk (since they came earlier)
+    Integer numBuffers = numPendingInputVerticesOnDisk.remove(partitionId);
+    if (numBuffers != null) {
+      file = new File(getPendingVerticesBufferPath(partitionId));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("loadPartition: loading " + numBuffers + " input vertex " +
+            "buffers of partition " + partitionId + " from " +
+            file.getAbsolutePath());
       }
-      // Second, applying vertex buffers already in memory
-      Pair<Integer, List<ExtendedDataOutput>> vertexPair;
-      vertexBufferRWLock.writeLock().lock();
-      vertexPair = pendingInputVertices.remove(id);
-      vertexBufferRWLock.writeLock().unlock();
-      if (vertexPair != null) {
-        for (ExtendedDataOutput extendedDataOutput : vertexPair.getRight()) {
-          partition.addPartitionVertices(
-              new VertexIterator<I, V, E>(extendedDataOutput, conf));
-        }
+      filein = new FileInputStream(file);
+      bufferin = new BufferedInputStream(filein);
+      inputStream = new DataInputStream(bufferin);
+      for (int i = 0; i < numBuffers; ++i) {
+        ExtendedDataOutput extendedDataOutput =
+            WritableUtils.readExtendedDataOutput(inputStream, conf);
+        partition.addPartitionVertices(
+            new VertexIterator<I, V, E>(extendedDataOutput, conf));
       }
-
-      // Edge store
-      if (!hasEdgeStoreOnDisk.containsKey(id)) {
-        throw new IllegalStateException("loadPartition: partition is written" +
-            " to disk in INPUT_SUPERSTEP, but it is not clear whether its " +
-            "edge store is on disk or not (impossible)");
+      inputStream.close();
+      checkState(file.delete(), "loadPartition: failed to delete %s",
+          file.getAbsolutePath());
+    }
+    // Second, applying vertex buffers already in memory
+    Pair<Integer, List<ExtendedDataOutput>> vertexPair;
+    vertexBufferRWLock.writeLock().lock();
+    vertexPair = pendingInputVertices.remove(partitionId);
+    vertexBufferRWLock.writeLock().unlock();
+    if (vertexPair != null) {
+      for (ExtendedDataOutput extendedDataOutput : vertexPair.getRight()) {
+        partition.addPartitionVertices(
+            new VertexIterator<I, V, E>(extendedDataOutput, conf));
       }
-      if (hasEdgeStoreOnDisk.remove(id)) {
-        file = new File(getEdgeStorePath(id));
+    }
+
+    // Edge store
+    if (serviceWorker.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP) {
+      checkState(hasEdgeStoreOnDisk.containsKey(partitionId),
+          "loadPartition: partition is written to disk in INPUT_SUPERSTEP, " +
+              "but it is not clear whether its edge store is on disk or not " +
+              "(impossible)");
+
+      if (hasEdgeStoreOnDisk.remove(partitionId)) {
+        file = new File(getEdgeStorePath(partitionId));
         if (LOG.isDebugEnabled()) {
-          LOG.debug("loadPartition: loading edge store of partition " + id +
-              " from " + file.getAbsolutePath());
+          LOG.debug("loadPartition: loading edge store of partition " +
+              partitionId + " from " + file.getAbsolutePath());
         }
         filein = new FileInputStream(file);
         bufferin = new BufferedInputStream(filein);
         inputStream = new DataInputStream(bufferin);
-        edgeStore.readPartitionEdgeStore(id, inputStream);
+        edgeStore.readPartitionEdgeStore(partitionId, inputStream);
         inputStream.close();
-        if (!file.delete()) {
-          String msg =
-              "loadPartition: failed to delete " + file.getAbsolutePath();
-          LOG.error(msg);
-          throw new IllegalStateException(msg);
-        }
+        checkState(file.delete(), "loadPartition: failed to delete %s",
+            file.getAbsolutePath());
       }
 
       // Input edge buffers
       // First, applying edge buffers on disk (since they came earlier)
-      numBuffers = numPendingInputEdgesOnDisk.remove(id);
+      numBuffers = numPendingInputEdgesOnDisk.remove(partitionId);
       if (numBuffers != null) {
-        file = new File(getPendingEdgesBufferPath(id));
+        file = new File(getPendingEdgesBufferPath(partitionId));
         if (LOG.isDebugEnabled()) {
           LOG.debug("loadPartition: loading " + numBuffers + " input edge " +
-              "buffers of partition " + id + " from " + 
file.getAbsolutePath());
+              "buffers of partition " + partitionId + " from " +
+              file.getAbsolutePath());
         }
         filein = new FileInputStream(file);
         bufferin = new BufferedInputStream(filein);
@@ -1398,24 +1719,20 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
               new ByteArrayVertexIdEdges<I, E>();
           vertexIdEdges.setConf(conf);
           vertexIdEdges.readFields(inputStream);
-          edgeStore.addPartitionEdges(id, vertexIdEdges);
+          edgeStore.addPartitionEdges(partitionId, vertexIdEdges);
         }
         inputStream.close();
-        if (!file.delete()) {
-          String msg =
-              "loadPartition: failed to delete " + file.getAbsolutePath();
-          LOG.error(msg);
-          throw new IllegalStateException(msg);
-        }
+        checkState(file.delete(), "loadPartition: failed to delete %s",
+            file.getAbsolutePath());
       }
       // Second, applying edge buffers already in memory
       Pair<Integer, List<VertexIdEdges<I, E>>> edgePair = null;
       edgeBufferRWLock.writeLock().lock();
-      edgePair = pendingInputEdges.remove(id);
+      edgePair = pendingInputEdges.remove(partitionId);
       edgeBufferRWLock.writeLock().unlock();
       if (edgePair != null) {
         for (VertexIdEdges<I, E> vertexIdEdges : edgePair.getRight()) {
-          edgeStore.addPartitionEdges(id, vertexIdEdges);
+          edgeStore.addPartitionEdges(partitionId, vertexIdEdges);
         }
       }
     }
@@ -1438,12 +1755,8 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
         " already exists.");
     }
 
-    if (!file.createNewFile()) {
-      String msg = "offloadPartition: file " + parent.getAbsolutePath() +
-        " already exists.";
-      LOG.error(msg);
-      throw new IllegalStateException(msg);
-    }
+    checkState(file.createNewFile(),
+        "offloadPartition: file %s already exists.", parent.getAbsolutePath());
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("offloadPartition: writing partition vertices " +
@@ -1488,6 +1801,19 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
       outputStream.close();
     }
 
+    if (currentMessageStore != null &&
+        !conf.getOutgoingMessageClasses().useMessageCombiner() &&
+        currentMessageStore.hasMessagesForPartition(partitionId)) {
+      writeMessageData(currentMessageStore, currentMessagesOnDisk, partitionId,
+          serviceWorker.getSuperstep());
+    }
+    if (incomingMessageStore != null &&
+        !conf.getIncomingMessageClasses().useMessageCombiner() &&
+        incomingMessageStore.hasMessagesForPartition(partitionId)) {
+      writeMessageData(incomingMessageStore, incomingMessagesOnDisk,
+          partitionId, serviceWorker.getSuperstep() + 1);
+    }
+
     // Writing edge store to disk in the input superstep
     if (serviceWorker.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP) {
       if (edgeStore.hasPartitionEdges(partitionId)) {
@@ -1515,28 +1841,59 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
   }
 
   /**
+   * Offload message data of a particular type of store (current or incoming) 
to
+   * disk.
+   *
+   * @param messageStore The message store to write to disk
+   * @param messagesOnDisk Map to update and let others know that this message
+   *                       store is on disk
+   * @param partitionId Id of the partition we want to offload the message 
store
+   *                    of
+   * @param superstep Superstep for which we want to offload message data.
+   *                  It is equal to the current superstep number for
+   *                  offloading currentMessageStore, and is equal to the next
+   *                  superstep number for offloading incomingMessageStore
+   * @throws IOException
+   */
+  private void writeMessageData(MessageStore<I, Writable> messageStore,
+      ConcurrentMap<Integer, Boolean> messagesOnDisk, int partitionId,
+      long superstep) throws IOException {
+    File file = new File(getMessagesPath(partitionId, superstep));
+    checkState(!file.exists(),
+        "writeMessageData: message store file for partition " +
+            partitionId + " for messages in superstep " +
+            superstep + " already exists (impossible).");
+
+    checkState(file.createNewFile(),
+        "writeMessageData: cannot create message store file for " +
+            "partition " + partitionId);
+
+    FileOutputStream fileout = new FileOutputStream(file);
+    BufferedOutputStream bufferout = new BufferedOutputStream(fileout);
+    DataOutputStream outputStream = new DataOutputStream(bufferout);
+    messageStore.writePartition(outputStream, partitionId);
+    messageStore.clearPartition(partitionId);
+    outputStream.close();
+    messagesOnDisk.put(partitionId, true);
+  }
+
+  /**
    * Delete a partition's files.
    *
    * @param id The id of the partition owning the file.
    */
-  public void deletePartitionFiles(Integer id) {
+  private void deletePartitionFiles(Integer id) {
     // File containing vertices
     File file = new File(getVerticesPath(id));
-    if (file.exists() && !file.delete()) {
-      String msg = "deletePartitionFiles: Failed to delete file " +
-        file.getAbsolutePath();
-      LOG.error(msg);
-      throw new IllegalStateException(msg);
-    }
+    checkState(!file.exists() || file.delete(),
+        "deletePartitionFiles: Failed to delete file " +
+            file.getAbsolutePath());
 
     // File containing edges
     file = new File(getEdgesPath(id));
-    if (file.exists() && !file.delete()) {
-      String msg = "deletePartitionFiles: Failed to delete file " +
-        file.getAbsolutePath();
-      LOG.error(msg);
-      throw new IllegalStateException(msg);
-    }
+    checkState(!file.exists() || file.delete(),
+        "deletePartitionFiles: Failed to delete file " +
+            file.getAbsolutePath());
   }
 
   /**
@@ -1605,6 +1962,29 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
   }
 
   /**
+   * Get the path to the file where pending incoming messages are stored.
+   *
+   * @param partitionId The partition
+   * @param superstep superstep number
+   * @return The path to the file
+   */
+  private String getPendingMessagesBufferPath(Integer partitionId,
+      long superstep) {
+    return getPartitionPath(partitionId) + "_pending_messages_" + superstep;
+  }
+
+  /**
+   * Get the path to the file where messages are stored.
+   *
+   * @param partitionId The partition
+   * @param superstep superstep number
+   * @return The path to the file
+   */
+  private String getMessagesPath(Integer partitionId, long superstep) {
+    return getPartitionPath(partitionId) + "_messages_" + superstep;
+  }
+
+  /**
    * Partition container holding additional meta data associated with each
    * partition.
    */

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreProcessorCallable.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreProcessorCallable.java
 
b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreProcessorCallable.java
index d58ebe0..7c4d8df 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreProcessorCallable.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreProcessorCallable.java
@@ -92,6 +92,8 @@ public class OutOfCoreProcessorCallable<I extends 
WritableComparable,
           oocEngine.getPartitionsWithInputVertices();
       BlockingQueue<Integer> partitionsWithInputEdges =
           oocEngine.getPartitionsWithInputEdges();
+      BlockingQueue<Integer> partitionsWithPendingMessages =
+          oocEngine.getPartitionsWithPendingMessages();
       AtomicInteger numPartitionsToSpill =
           oocEngine.getNumPartitionsToSpill();
 
@@ -100,7 +102,9 @@ public class OutOfCoreProcessorCallable<I extends 
WritableComparable,
         if (partitionId == null) {
           break;
         }
-        LOG.info("call: spilling vertex buffer of partition " + partitionId);
+        if (LOG.isInfoEnabled()) {
+          LOG.info("call: spilling vertex buffer of partition " + partitionId);
+        }
         try {
           partitionStore.spillPartitionInputVertexBuffer(partitionId);
         } catch (IOException e) {
@@ -114,7 +118,9 @@ public class OutOfCoreProcessorCallable<I extends 
WritableComparable,
         if (partitionId == null) {
           break;
         }
-        LOG.info("call: spilling edge buffer of partition " + partitionId);
+        if (LOG.isInfoEnabled()) {
+          LOG.info("call: spilling edge buffer of partition " + partitionId);
+        }
         try {
           partitionStore.spillPartitionInputEdgeStore(partitionId);
         } catch (IOException e) {
@@ -123,9 +129,28 @@ public class OutOfCoreProcessorCallable<I extends 
WritableComparable,
         }
       }
 
+      while (!partitionsWithPendingMessages.isEmpty()) {
+        Integer partitionId = partitionsWithPendingMessages.poll();
+        if (partitionId == null) {
+          break;
+        }
+        if (LOG.isInfoEnabled()) {
+          LOG.info(
+              "call: spilling message buffers of partition " + partitionId);
+        }
+        try {
+          partitionStore.spillPartitionMessages(partitionId);
+        } catch (IOException e) {
+          throw new IllegalStateException("call: caught IOException while " +
+              "spilling message buffers to disk", e);
+        }
+      }
+
       // Put partitions on disk
       while (numPartitionsToSpill.getAndDecrement() > 0) {
-        LOG.info("call: start offloading a partition");
+        if (LOG.isInfoEnabled()) {
+          LOG.info("call: start offloading a partition");
+        }
         partitionStore.spillOnePartition();
       }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/partition/PartitionData.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionData.java 
b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionData.java
new file mode 100644
index 0000000..d6e3a70
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionData.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.VertexIdEdges;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Structure that keeps partition information.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+public interface PartitionData<I extends WritableComparable,
+    V extends Writable, E extends Writable> {
+  /**
+   * Add a *new* partition to the store. If the partition already exists,
+   * it does not add the partition and returns false.
+   *
+   * @param partition Partition to add
+   * @return Whether the addition made any change in the partition store
+   */
+  boolean addPartition(Partition<I, V, E> partition);
+
+  /**
+   * Remove a partition and return it. Called from a single thread, *not* from
+   * within a scheduling cycle, and after INPUT_SUPERSTEP is complete.
+   *
+   * @param partitionId Partition id
+   * @return The removed partition
+   */
+  Partition<I, V, E> removePartition(Integer partitionId);
+
+  /**
+   * Whether a specific partition is present in the store.
+   *
+   * @param partitionId Partition id
+   * @return True iff the partition is present
+   */
+  boolean hasPartition(Integer partitionId);
+
+  /**
+   * Return the ids of all the stored partitions as an Iterable.
+   *
+   * @return The partition ids
+   */
+  Iterable<Integer> getPartitionIds();
+
+  /**
+   * Return the number of stored partitions.
+   *
+   * @return The number of partitions
+   */
+  int getNumPartitions();
+
+  /**
+   * Return the number of vertices in a partition.
+   *
+   * @param partitionId Partition id
+   * @return The number of vertices in the specified partition
+   */
+  long getPartitionVertexCount(Integer partitionId);
+
+  /**
+   * Return the number of edges in a partition.
+   *
+   * @param partitionId Partition id
+   * @return The number of edges in the specified partition
+   */
+  long getPartitionEdgeCount(Integer partitionId);
+
+  /**
+   * Whether the partition store is empty.
+   *
+   * @return True iff there are no partitions in the store
+   */
+  boolean isEmpty();
+
+  /**
+   * Add vertices to a given partition from a given DataOutput instance. This
+   * method is called right after receipt of vertex request in INPUT_SUPERSTEP.
+   *
+   * @param partitionId Partition id
+   * @param extendedDataOutput Output containing serialized vertex data
+   */
+  void addPartitionVertices(Integer partitionId,
+      ExtendedDataOutput extendedDataOutput);
+
+  /**
+   * Add edges to a given partition from a given send edge request. This
+   * method is called right after receipt of edge request in INPUT_SUPERSTEP.
+   *
+   * @param partitionId Partition id
+   * @param edges Edges in the request
+   */
+  void addPartitionEdges(Integer partitionId, VertexIdEdges<I, E> edges);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java 
b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
index d3f3902..2facff8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
@@ -18,10 +18,22 @@
 
 package org.apache.giraph.partition;
 
-import org.apache.giraph.utils.ExtendedDataOutput;
-import org.apache.giraph.utils.VertexIdEdges;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.messages.MessageData;
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.comm.messages.MessageStoreFactory;
+import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.EdgeStore;
+import org.apache.giraph.edge.EdgeStoreFactory;
+import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import java.io.IOException;
+
+import static 
org.apache.giraph.conf.GiraphConstants.MESSAGE_STORE_FACTORY_CLASS;
 
 /**
  * Structure that stores partitions for a worker. PartitionStore does not allow
@@ -33,72 +45,116 @@ import org.apache.hadoop.io.WritableComparable;
  * @param <E> Edge data
  */
 public abstract class PartitionStore<I extends WritableComparable,
-    V extends Writable, E extends Writable> {
-  /**
-   * Add a *new* partition to the store. If the partition is already existed,
-   * it does not add the partition and returns false.
-   *
-   * @param partition Partition to add
-   * @return Whether the addition made any change in the partition store
-   */
-  public abstract boolean addPartition(Partition<I, V, E> partition);
+    V extends Writable, E extends Writable>
+    implements PartitionData<I, V, E>, MessageData<I> {
+  /** Configuration. */
+  protected final ImmutableClassesGiraphConfiguration<I, V, E> conf;
+  /** Context used to report progress */
+  protected final Mapper<?, ?, ?, ?>.Context context;
+  /** service worker reference */
+  protected final CentralizedServiceWorker<I, V, E> serviceWorker;
 
-  /**
-   * Remove a partition and return it. Called from a single thread, *not* from
-   * within an iteration cycle, and after INPUT_SUPERSTEP is complete.
-   *
-   * @param partitionId Partition id
-   * @return The removed partition
-   */
-  public abstract Partition<I, V, E> removePartition(Integer partitionId);
+  /** Edge store for this worker */
+  protected final EdgeStore<I, V, E> edgeStore;
 
+  /** Message store factory */
+  protected MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
+      messageStoreFactory;
   /**
-   * Whether a specific partition is present in the store.
-   *
-   * @param partitionId Partition id
-   * @return True iff the partition is present
+   * Message store for incoming messages (messages which will be consumed
+   * in the next super step)
    */
-  public abstract boolean hasPartition(Integer partitionId);
-
+  protected volatile MessageStore<I, Writable> incomingMessageStore;
   /**
-   * Return the ids of all the stored partitions as an Iterable.
-   *
-   * @return The partition ids
+   * Message store for current messages (messages which we received in
+   * previous super step and which will be consumed in current super step)
    */
-  public abstract Iterable<Integer> getPartitionIds();
+  protected volatile MessageStore<I, Writable> currentMessageStore;
 
   /**
-   * Return the number of stored partitions.
+   * Constructor for abstract partition store
    *
-   * @return The number of partitions
+   * @param conf Job configuration
+   * @param context Mapper context
+   * @param serviceWorker Worker service
    */
-  public abstract int getNumPartitions();
+  public PartitionStore(ImmutableClassesGiraphConfiguration<I, V, E> conf,
+      Mapper<?, ?, ?, ?>.Context context,
+      CentralizedServiceWorker<I, V, E> serviceWorker) {
+    this.conf = conf;
+    this.context = context;
+    this.serviceWorker = serviceWorker;
+    this.messageStoreFactory = createMessageStoreFactory();
+    EdgeStoreFactory<I, V, E> edgeStoreFactory = conf.createEdgeStoreFactory();
+    edgeStoreFactory.initialize(serviceWorker, conf, context);
+    this.edgeStore = edgeStoreFactory.newStore();
+  }
 
   /**
-   * Return the number of vertices in a partition.
+   * Decide which message store should be used for current application,
+   * and create the factory for that store
    *
-   * @param partitionId Partition id
-   * @return The number of vertices in the specified partition
+   * @return Message store factory
    */
-  public abstract long getPartitionVertexCount(Integer partitionId);
+  private MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
+  createMessageStoreFactory() {
+    Class<? extends MessageStoreFactory> messageStoreFactoryClass =
+        MESSAGE_STORE_FACTORY_CLASS.get(conf);
 
-  /**
-   * Return the number of edges in a partition.
-   *
-   * @param partitionId Partition id
-   * @return The number of edges in the specified partition
-   */
-  public abstract long getPartitionEdgeCount(Integer partitionId);
+    MessageStoreFactory messageStoreFactoryInstance =
+        ReflectionUtils.newInstance(messageStoreFactoryClass);
+    messageStoreFactoryInstance.initialize(serviceWorker, conf);
 
-  /**
-   * Whether the partition store is empty.
-   *
-   * @return True iff there are no partitions in the store
-   */
+    return messageStoreFactoryInstance;
+  }
+
+  @Override
   public boolean isEmpty() {
     return getNumPartitions() == 0;
   }
 
+  @Override
+  public <M extends Writable> MessageStore<I, M> getIncomingMessageStore() {
+    return (MessageStore<I, M>) incomingMessageStore;
+  }
+
+  @Override
+  public <M extends Writable> MessageStore<I, M> getCurrentMessageStore() {
+    return (MessageStore<I, M>) currentMessageStore;
+  }
+
+  @Override
+  public void resetMessageStores() throws IOException {
+    if (currentMessageStore != null) {
+      currentMessageStore.clearAll();
+      currentMessageStore = null;
+    }
+    if (incomingMessageStore != null) {
+      incomingMessageStore.clearAll();
+      incomingMessageStore = null;
+    }
+    prepareSuperstep();
+  }
+
+  /** Prepare for next super step */
+  public void prepareSuperstep() {
+    if (currentMessageStore != null) {
+      try {
+        currentMessageStore.clearAll();
+      } catch (IOException e) {
+        throw new IllegalStateException(
+            "Failed to clear previous message store", e);
+      }
+    }
+    currentMessageStore = incomingMessageStore != null ?
+        incomingMessageStore :
+        messageStoreFactory.newStore(conf.getIncomingMessageClasses());
+    incomingMessageStore =
+        messageStoreFactory.newStore(conf.getOutgoingMessageClasses());
+    // finalize current message-store before resolving mutations
+    currentMessageStore.finalizeStore();
+  }
+
   /**
    * Called at the end of the computation. Called from a single thread.
    */
@@ -160,28 +216,18 @@ public abstract class PartitionStore<I extends 
WritableComparable,
   public abstract void putPartition(Partition<I, V, E> partition);
 
   /**
-   * Add vertices to a given partition from a given DataOutput instance. This
-   * method is called right after receipt of vertex request in INPUT_SUPERSTEP.
-   *
-   * @param partitionId Partition id
-   * @param extendedDataOutput Output containing serialized vertex data
-   */
-  public abstract void addPartitionVertices(Integer partitionId,
-      ExtendedDataOutput extendedDataOutput);
-
-  /**
-   * Add edges to a given partition from a given send edge request. This
-   * method is called right after receipt of edge request in INPUT_SUPERSTEP.
-   *
-   * @param partitionId Partition id
-   * @param edges Edges in the request
-   */
-  public abstract void addPartitionEdges(Integer partitionId,
-      VertexIdEdges<I, E> edges);
-
-  /**
    * Move edges from edge store to partitions. This method is called from a
    * *single thread* once all vertices and edges are read in INPUT_SUPERSTEP.
    */
   public abstract void moveEdgesToVertices();
+
+  /**
+   * In case of async message store we have to wait for all messages
+   * to be processed before going into next superstep.
+   */
+  public void waitForComplete() {
+    if (incomingMessageStore instanceof AsyncMessageStoreWrapper) {
+      ((AsyncMessageStoreWrapper) incomingMessageStore).waitToComplete();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
 
b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
index 8f34fed..9f0c408 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
@@ -20,20 +20,23 @@ package org.apache.giraph.partition;
 
 import com.google.common.collect.Maps;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.edge.EdgeStore;
-import org.apache.giraph.edge.EdgeStoreFactory;
 import org.apache.giraph.utils.ExtendedDataOutput;
 import org.apache.giraph.utils.VertexIdEdges;
+import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.giraph.utils.VertexIterator;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 
+import java.io.IOException;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentMap;
 
+import static com.google.common.base.Preconditions.checkState;
+
 /**
  * A simple in-memory partition store.
  *
@@ -47,12 +50,6 @@ public class SimplePartitionStore<I extends 
WritableComparable,
   /** Map of stored partitions. */
   private final ConcurrentMap<Integer, Partition<I, V, E>> partitions =
       Maps.newConcurrentMap();
-  /** Edge store for this worker. */
-  private final EdgeStore<I, V, E> edgeStore;
-  /** Configuration. */
-  private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
-  /** Context used to report progress */
-  private final Mapper<?, ?, ?, ?>.Context context;
   /** Queue of partitions to be processed in a superstep */
   private BlockingQueue<Partition<I, V, E>> partitionQueue;
 
@@ -65,11 +62,7 @@ public class SimplePartitionStore<I extends 
WritableComparable,
   public SimplePartitionStore(ImmutableClassesGiraphConfiguration<I, V, E> 
conf,
       Mapper<?, ?, ?, ?>.Context context,
       CentralizedServiceWorker<I, V, E> serviceWorker) {
-    this.conf = conf;
-    this.context = context;
-    EdgeStoreFactory<I, V, E> edgeStoreFactory = conf.createEdgeStoreFactory();
-    edgeStoreFactory.initialize(serviceWorker, conf, context);
-    edgeStore = edgeStoreFactory.newStore();
+    super(conf, context, serviceWorker);
   }
 
   @Override
@@ -119,11 +112,11 @@ public class SimplePartitionStore<I extends 
WritableComparable,
 
   @Override
   public void startIteration() {
-    if (partitionQueue != null && !partitionQueue.isEmpty()) {
-      throw new IllegalStateException("startIteration: It seems that some of " 
+
+    checkState(partitionQueue == null || partitionQueue.isEmpty(),
+        "startIteration: It seems that some " +
           "of the partitions from previous iteration over partition store are" 
+
           " not yet processed.");
-    }
+
     partitionQueue =
         new ArrayBlockingQueue<Partition<I, V, E>>(getNumPartitions());
     for (Partition<I, V, E> partition : partitions.values()) {
@@ -178,4 +171,18 @@ public class SimplePartitionStore<I extends 
WritableComparable,
   public void moveEdgesToVertices() {
     edgeStore.moveEdgesToVertices();
   }
+
+  @Override
+  public <M extends Writable> void addPartitionCurrentMessages(
+      int partitionId, VertexIdMessages<I, M> messages) throws IOException {
+    ((MessageStore<I, M>) currentMessageStore)
+        .addPartitionMessages(partitionId, messages);
+  }
+
+  @Override
+  public <M extends Writable> void addPartitionIncomingMessages(
+      int partitionId, VertexIdMessages<I, M> messages) throws IOException {
+    ((MessageStore<I, M>) incomingMessageStore)
+        .addPartitionMessages(partitionId, messages);
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java 
b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index e515caf..5b754d6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -910,7 +910,7 @@ else[HADOOP_NON_SECURE]*/
     globalCommHandler.finishSuperstep(workerAggregatorRequestProcessor);
 
     MessageStore<I, Writable> incomingMessageStore =
-        getServerData().getIncomingMessageStore();
+        getServerData().getPartitionStore().getIncomingMessageStore();
     if (incomingMessageStore instanceof AsyncMessageStoreWrapper) {
       ((AsyncMessageStoreWrapper) incomingMessageStore).waitToComplete();
     }
@@ -1418,8 +1418,8 @@ else[HADOOP_NON_SECURE]*/
     for (Integer partitionId : getPartitionStore().getPartitionIds()) {
       // write messages
       checkpointOutputStream.writeInt(partitionId);
-      getServerData().getCurrentMessageStore().writePartition(
-          checkpointOutputStream, partitionId);
+      getServerData().getPartitionStore().getCurrentMessageStore()
+          .writePartition(checkpointOutputStream, partitionId);
       getContext().progress();
 
     }
@@ -1668,8 +1668,8 @@ else[HADOOP_NON_SECURE]*/
 
       for (int i = 0; i < partitions; i++) {
         int partitionId = checkpointStream.readInt();
-        getServerData().getCurrentMessageStore().readFieldsForPartition(
-            checkpointStream, partitionId);
+        getServerData().getPartitionStore().getCurrentMessageStore()
+            .readFieldsForPartition(checkpointStream, partitionId);
       }
 
       List<Writable> w2wMessages = (List<Writable>) WritableUtils.readList(

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java 
b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
index 572e290..0bea783 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
@@ -106,14 +106,15 @@ public class RequestFailureTest {
   private void checkResult(int numRequests) throws IOException {
     // Check the output
     Iterable<IntWritable> vertices =
-        
serverData.getIncomingMessageStore().getPartitionDestinationVertices(0);
+        serverData.getPartitionStore().getIncomingMessageStore()
+            .getPartitionDestinationVertices(0);
     int keySum = 0;
     int messageSum = 0;
     for (IntWritable vertexId : vertices) {
       keySum += vertexId.get();
       Iterable<IntWritable> messages =
-          serverData.<IntWritable>getIncomingMessageStore().getVertexMessages(
-              vertexId);
+          serverData.getPartitionStore().<IntWritable>getIncomingMessageStore()
+              .getVertexMessages(vertexId);
       synchronized (messages) {
         for (IntWritable message : messages) {
           messageSum += message.get();

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java 
b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
index b7bec1c..aa3916c 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
@@ -178,14 +178,15 @@ public class RequestTest {
 
     // Check the output
     Iterable<IntWritable> vertices =
-        
serverData.getIncomingMessageStore().getPartitionDestinationVertices(0);
+        serverData.getPartitionStore().getIncomingMessageStore()
+            .getPartitionDestinationVertices(0);
     int keySum = 0;
     int messageSum = 0;
     for (IntWritable vertexId : vertices) {
       keySum += vertexId.get();
       Iterable<IntWritable> messages =
-          serverData.<IntWritable>getIncomingMessageStore().getVertexMessages(
-              vertexId);
+          serverData.getPartitionStore().<IntWritable>getIncomingMessageStore()
+              .getVertexMessages(vertexId);
       synchronized (messages) {
         for (IntWritable message : messages) {
           messageSum += message.get();
@@ -223,14 +224,15 @@ public class RequestTest {
 
     // Check the output
     Iterable<IntWritable> vertices =
-        
serverData.getIncomingMessageStore().getPartitionDestinationVertices(0);
+        serverData.getPartitionStore().getIncomingMessageStore()
+            .getPartitionDestinationVertices(0);
     int keySum = 0;
     int messageSum = 0;
     for (IntWritable vertexId : vertices) {
       keySum += vertexId.get();
       Iterable<IntWritable> messages =
-          serverData.<IntWritable>getIncomingMessageStore().getVertexMessages(
-              vertexId);
+          serverData.getPartitionStore().<IntWritable>getIncomingMessageStore()
+              .getVertexMessages(vertexId);
       synchronized (messages) {
         for (IntWritable message : messages) {
           messageSum += message.get();

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java
 
b/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java
index ca1031a..75edb09 100644
--- 
a/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java
+++ 
b/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java
@@ -96,6 +96,11 @@ public class AsyncMessageStoreWrapperTest {
     }
 
     @Override
+    public boolean hasMessagesForPartition(int partitionId) {
+      return false;
+    }
+
+    @Override
     public void finalizeStore() {
 
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
 
b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
index 78e663d..249a337 100644
--- 
a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
+++ 
b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
@@ -30,6 +30,7 @@ import org.apache.giraph.edge.EdgeFactory;
 import org.apache.giraph.graph.BasicComputation;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
+import org.apache.giraph.io.formats.IntIntNullTextVertexInputFormat;
 import org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat;
 import org.apache.giraph.ooc.DiskBackedPartitionStore;
 import org.apache.giraph.utils.InternalVertexRunner;
@@ -268,7 +269,6 @@ public class TestPartitionStores {
     testMultiThreaded();
   }
 
-
   @Test
   public void testDiskBackedPartitionStoreMTStatic() throws Exception {
     GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 
NUM_PARTITIONS_IN_MEMORY);
@@ -431,7 +431,7 @@ public class TestPartitionStores {
 
   /**
    * Internal checker to verify the correctness of the tests.
-   * @param results   the actual results obtaind
+   * @param results   the actual results obtained
    * @param expected  expected results
    */
   private void checkResults(Iterable<String> results, String[] expected) {
@@ -560,4 +560,59 @@ public class TestPartitionStores {
       }
     }
   }
+
+  @Test
+  public void testOutOfCoreMessages() throws Exception {
+    Iterable<String> results;
+    String[] graph =
+        { "1 0 2 3", "2 0 3 5", "3 0 1 2 4", "4 0 3", "5 0 6 7 1 2",
+            "6 0 10 8 7", "7 0 1 3", "8 0 1 10 9 4 6", "9 0 8 1 5 7",
+            "10 0 9" };
+
+    String[] expected =
+        {
+            "1\t32", "2\t9", "3\t14", "4\t11", "5\t11",
+            "6\t13", "7\t20", "8\t15", "9\t18", "10\t14"
+        };
+
+    GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
+    GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
+    GiraphConstants.USER_PARTITION_COUNT.set(conf, 10);
+
+    File directory = Files.createTempDir();
+    GiraphConstants.PARTITIONS_DIRECTORY.set(conf,
+        new File(directory, "giraph_partitions").toString());
+
+    GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
+    GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
+    conf.setComputationClass(TestOutOfCoreMessagesComputation.class);
+    conf.setVertexInputFormatClass(IntIntNullTextVertexInputFormat.class);
+    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+
+    results = InternalVertexRunner.run(conf, graph);
+    checkResults(results, expected);
+    FileUtils.deleteDirectory(directory);
+  }
+
+  public static class TestOutOfCoreMessagesComputation extends
+      BasicComputation<IntWritable, IntWritable, NullWritable, IntWritable> {
+
+    @Override
+    public void compute(
+        Vertex<IntWritable, IntWritable, NullWritable> vertex,
+        Iterable<IntWritable> messages) throws IOException {
+      if (getSuperstep() == 0) {
+        // Send id to all neighbors
+        sendMessageToAllEdges(vertex, vertex.getId());
+      } else {
+        // Add received messages and halt
+        int sum = 0;
+        for (IntWritable message : messages) {
+          sum += message.get();
+        }
+        vertex.setValue(new IntWritable(sum));
+        vertex.voteToHalt();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java 
b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
index 5d8d478..0a1da49 100644
--- a/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
+++ b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
@@ -23,6 +23,7 @@ import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.WorkerClientRequestProcessor;
 import org.apache.giraph.comm.messages.ByteArrayMessagesPerVertexStore;
 import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.ArrayListEdges;
 import org.apache.giraph.graph.Computation;
@@ -190,10 +191,13 @@ public class MockUtils {
     ImmutableClassesGiraphConfiguration conf, Mapper.Context context) {
     CentralizedServiceWorker<IntWritable, IntWritable, IntWritable> 
serviceWorker =
       MockUtils.mockServiceGetVertexPartitionOwner(1);
+    GiraphConstants.MESSAGE_STORE_FACTORY_CLASS.set(conf,
+        ByteArrayMessagesPerVertexStore.newFactory(serviceWorker, conf)
+            .getClass());
+
     ServerData<IntWritable, IntWritable, IntWritable> serverData =
       new ServerData<IntWritable, IntWritable, IntWritable>(
-      serviceWorker, conf, ByteArrayMessagesPerVertexStore.newFactory(
-          serviceWorker, conf), context);
+          serviceWorker, conf, context);
     // Here we add a partition to simulate the case that there is one 
partition.
     serverData.getPartitionStore().addPartition(new SimplePartition());
     return serverData;

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java 
b/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java
index dd0fe13..ad9ba6f 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java
@@ -19,9 +19,11 @@
 package org.apache.giraph;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.combiner.DoubleSumMessageCombiner;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.examples.GeneratedVertexReader;
+import org.apache.giraph.examples.SimplePageRankComputation;
 import 
org.apache.giraph.examples.SimplePageRankComputation.SimplePageRankVertexInputFormat;
 import 
org.apache.giraph.examples.SimplePageRankComputation.SimplePageRankVertexOutputFormat;
 import org.apache.giraph.graph.BasicComputation;
@@ -37,6 +39,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -51,19 +54,6 @@ public class TestOutOfCore extends BspCase {
       super(TestOutOfCore.class.getName());
   }
 
-  public static class EmptyComputation extends BasicComputation<
-      LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
-
-    @Override
-    public void compute(
-        Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
-        Iterable<DoubleWritable> messages) throws IOException {
-      if (getSuperstep() > 5) {
-        vertex.voteToHalt();
-      }
-    }
-  }
-
   public static class TestMemoryEstimator implements MemoryEstimator {
     private DiskBackedPartitionStore partitionStore;
     @Override
@@ -100,9 +90,13 @@ public class TestOutOfCore extends BspCase {
   public void testOutOfCore()
           throws IOException, InterruptedException, ClassNotFoundException {
     GiraphConfiguration conf = new GiraphConfiguration();
-    conf.setComputationClass(EmptyComputation.class);
+    conf.setComputationClass(SimplePageRankComputation.class);
     conf.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
     conf.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+    conf.setWorkerContextClass(
+        SimplePageRankComputation.SimplePageRankWorkerContext.class);
+    conf.setMasterComputeClass(
+        SimplePageRankComputation.SimplePageRankMasterCompute.class);
     GiraphConstants.USER_PARTITION_COUNT.set(conf, NUM_PARTITIONS);
     GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
     GiraphConstants.OUT_OF_CORE_MEM_ESTIMATOR
@@ -115,7 +109,21 @@ public class TestOutOfCore extends BspCase {
     GiraphJob job = prepareJob(getCallingMethodName(), conf,
         getTempPath(getCallingMethodName()));
     // Overwrite the number of vertices set in BspCase
-    GeneratedVertexReader.READER_VERTICES.set(conf, 400);
+    GeneratedVertexReader.READER_VERTICES.set(conf, 200);
     assertTrue(job.run(true));
+    if (!runningInDistributedMode()) {
+      double maxPageRank =
+          SimplePageRankComputation.SimplePageRankWorkerContext.getFinalMax();
+      double minPageRank =
+          SimplePageRankComputation.SimplePageRankWorkerContext.getFinalMin();
+      long numVertices =
+          SimplePageRankComputation.SimplePageRankWorkerContext.getFinalSum();
+      System.out.println(getCallingMethodName() + ": maxPageRank=" +
+          maxPageRank + " minPageRank=" +
+          minPageRank + " numVertices=" + numVertices);
+      assertEquals(13591.5, maxPageRank, 0.01);
+      assertEquals(9.375e-5, minPageRank, 0.000000001);
+      assertEquals(8 * 200L, numVertices);
+    }
   }
 }

Reply via email to