http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/FixedOutOfCoreIOScheduler.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/FixedOutOfCoreIOScheduler.java
 
b/giraph-core/src/main/java/org/apache/giraph/ooc/FixedOutOfCoreIOScheduler.java
new file mode 100644
index 0000000..2cb002f
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/ooc/FixedOutOfCoreIOScheduler.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.ooc.io.IOCommand;
+import org.apache.giraph.ooc.io.LoadPartitionIOCommand;
+import org.apache.giraph.ooc.io.StoreDataBufferIOCommand;
+import org.apache.giraph.ooc.io.StoreIncomingMessageIOCommand;
+import org.apache.giraph.ooc.io.StorePartitionIOCommand;
+import org.apache.giraph.ooc.io.WaitIOCommand;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * IO Scheduler for out-of-core mechanism with fixed number of partitions in
+ * memory
+ */
+public class FixedOutOfCoreIOScheduler extends OutOfCoreIOScheduler {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(FixedOutOfCoreIOScheduler.class);
+  /** Maximum number of partitions to be kept in memory */
+  private final int maxPartitionsInMemory;
+  /**
+   * Number of partitions to be added (loaded) or removed (stored) to/from
+   * memory. Each outstanding load partition counts +1 and each outstanding
+   * store partition counts -1 toward this counter.
+   */
+  private final AtomicInteger deltaNumPartitionsInMemory =
+      new AtomicInteger(0);
+  /** Queue of IO commands for loading partitions to memory */
+  private final List<Queue<IOCommand>> threadLoadCommandQueue;
+  /** Queue of IO commands for storing partition on disk */
+  private final List<Queue<IOCommand>> threadStoreCommandQueue;
+  /** Whether IO threads should terminate */
+  private volatile boolean shouldTerminate;
+
+  /**
+   * Constructor
+   *  @param maxPartitionsInMemory maximum number of partitions that can be kept in
+   *                              memory
+   * @param numThreads number of available IO threads (i.e. disks)
+   * @param oocEngine out-of-core engine
+   * @param conf configuration
+   */
+  public FixedOutOfCoreIOScheduler(int maxPartitionsInMemory, int numThreads,
+                                   OutOfCoreEngine oocEngine,
+                                   ImmutableClassesGiraphConfiguration conf) {
+    super(conf, oocEngine, numThreads);
+    this.maxPartitionsInMemory = maxPartitionsInMemory;
+    threadLoadCommandQueue = new ArrayList<>(numThreads);
+    threadStoreCommandQueue = new ArrayList<>(numThreads);
+    for (int i = 0; i < numThreads; ++i) {
+      threadLoadCommandQueue.add(
+          new ConcurrentLinkedQueue<IOCommand>());
+      threadStoreCommandQueue.add(
+          new ConcurrentLinkedQueue<IOCommand>());
+    }
+    shouldTerminate = false;
+  }
+
+  @Override
+  public IOCommand getNextIOCommand(int threadId) {
+    if (shouldTerminate) {
+      return null;
+    }
+    int numPartitionsInMemory =
+        oocEngine.getMetaPartitionManager().getNumInMemoryPartitions();
+    IOCommand command = null;
+    if (LOG.isInfoEnabled()) {
+      LOG.info("getNextIOCommand with " + numPartitionsInMemory +
+          " partitions in memory, " + deltaNumPartitionsInMemory.get() +
+          " on the fly");
+    }
+    // Check if we have to store a partition on disk
+    if (numPartitionsInMemory + deltaNumPartitionsInMemory.getAndDecrement() >
+        maxPartitionsInMemory) {
+      command = threadStoreCommandQueue.get(threadId).poll();
+      if (command == null) {
+        Integer partitionId = oocEngine.getMetaPartitionManager()
+            .getOffloadPartitionId(threadId);
+        if (partitionId != null) {
+          command = new StorePartitionIOCommand(oocEngine, partitionId);
+        } else {
+          deltaNumPartitionsInMemory.getAndIncrement();
+        }
+      } else {
+        checkState(command instanceof StorePartitionIOCommand,
+            "getNextIOCommand: Illegal command type in store command queue!");
+      }
+    } else {
+      // Roll back the decrement in delta counter.
+      deltaNumPartitionsInMemory.getAndIncrement();
+    }
+
+    // Check if there is any buffers/messages of current out-of-core partitions
+    // in memory
+    if (command == null) {
+      Integer partitionId = oocEngine.getMetaPartitionManager()
+          .getOffloadPartitionBufferId(threadId);
+      if (partitionId != null) {
+        command = new StoreDataBufferIOCommand(oocEngine, partitionId,
+            StoreDataBufferIOCommand.DataBufferType.PARTITION);
+      } else {
+        partitionId = oocEngine.getMetaPartitionManager()
+            .getOffloadMessageBufferId(threadId);
+        if (partitionId != null) {
+          command = new StoreDataBufferIOCommand(oocEngine, partitionId,
+              StoreDataBufferIOCommand.DataBufferType.MESSAGE);
+        } else {
+          partitionId = oocEngine.getMetaPartitionManager()
+              .getOffloadMessageId(threadId);
+          if (partitionId != null) {
+            command = new StoreIncomingMessageIOCommand(oocEngine, 
partitionId);
+          }
+        }
+      }
+    }
+
+    // Check if we can load/prefetch a partition to memory
+    if (command == null) {
+      if (numPartitionsInMemory +
+          deltaNumPartitionsInMemory.getAndIncrement() <=
+          maxPartitionsInMemory) {
+        command = threadLoadCommandQueue.get(threadId).poll();
+        if (command == null) {
+          Integer partitionId = oocEngine.getMetaPartitionManager()
+              .getPrefetchPartitionId(threadId);
+          if (partitionId != null) {
+            command = new LoadPartitionIOCommand(oocEngine, partitionId,
+                oocEngine.getServiceWorker().getSuperstep());
+          } else {
+            deltaNumPartitionsInMemory.getAndDecrement();
+          }
+        }
+      } else {
+        // Roll back the increment in delta counter.
+        deltaNumPartitionsInMemory.getAndDecrement();
+      }
+    }
+
+    // Check if no appropriate IO command is found
+    if (command == null) {
+      command = new WaitIOCommand(oocEngine, waitInterval);
+    }
+    return command;
+  }
+
+  @Override
+  public void ioCommandCompleted(IOCommand command) {
+    if (command instanceof LoadPartitionIOCommand) {
+      deltaNumPartitionsInMemory.getAndDecrement();
+    } else if (command instanceof StorePartitionIOCommand) {
+      deltaNumPartitionsInMemory.getAndIncrement();
+    }
+    oocEngine.ioCommandCompleted(command);
+  }
+
+  @Override
+  public void addIOCommand(IOCommand ioCommand) {
+    int ownerThread = getOwnerThreadId(ioCommand.getPartitionId());
+    if (ioCommand instanceof LoadPartitionIOCommand) {
+      threadLoadCommandQueue.get(ownerThread).offer(ioCommand);
+    } else if (ioCommand instanceof StorePartitionIOCommand) {
+      threadStoreCommandQueue.get(ownerThread).offer(ioCommand);
+    } else {
+      throw new IllegalStateException("addIOCommand: IO command type is not " +
+          "supported for addition");
+    }
+  }
+
+  @Override
+  public void shutdown() {
+    super.shutdown();
+    shouldTerminate = true;
+  }
+
+  /**
+   * Clear store command queue (should happen at the beginning of each 
iteration
+   * to eliminate eager store commands generated by OOC engine)
+   */
+  public void clearStoreCommandQueue() {
+    for (int i = 0; i < threadStoreCommandQueue.size(); ++i) {
+      threadStoreCommandQueue.get(i).clear();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/JVMMemoryEstimator.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/JVMMemoryEstimator.java 
b/giraph-core/src/main/java/org/apache/giraph/ooc/JVMMemoryEstimator.java
deleted file mode 100644
index 46d989a..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/JVMMemoryEstimator.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.ooc;
-
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.utils.MemoryUtils;
-
-/**
- * Memory estimator class using JVM runtime methods to estimate the
- * free/available memory.
- */
-public class JVMMemoryEstimator implements MemoryEstimator {
-  /**
-   * Constructor for reflection
-   */
-  public JVMMemoryEstimator() { }
-
-  @Override
-  public void initialize(CentralizedServiceWorker serviceWorker) { }
-
-  @Override
-  public double freeMemoryMB() {
-    return MemoryUtils.freePlusUnallocatedMemoryMB();
-  }
-
-  @Override public double maxMemoryMB() {
-    return MemoryUtils.maxMemoryMB();
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/MemoryEstimator.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/MemoryEstimator.java 
b/giraph-core/src/main/java/org/apache/giraph/ooc/MemoryEstimator.java
deleted file mode 100644
index 74a2f7b..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/MemoryEstimator.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.ooc;
-
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-
-/**
- * Interface for memory estimator. Estimated memory is used in adaptive
- * out-of-core mechanism.
- */
-public interface MemoryEstimator {
-  /**
-   * Initialize the memory estimator.
-   *
-   * @param serviceWorker Worker service
-   */
-  void initialize(CentralizedServiceWorker serviceWorker);
-
-  /**
-   * @return amount of free memory in MB
-   */
-  double freeMemoryMB();
-
-  /**
-   * @return amount of available memory in MB
-   */
-  double maxMemoryMB();
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java 
b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
index 218a10b..bc0ece4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
@@ -18,26 +18,189 @@
 
 package org.apache.giraph.ooc;
 
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.ooc.data.MetaPartitionManager;
+import org.apache.giraph.ooc.io.IOCommand;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static com.google.common.base.Preconditions.checkState;
 
 /**
- * Engine for out-of-core mechanism. The engine may attach to a partition store
- * capable of handling disk transfer of partitions.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
+ * Class to represent an out-of-core engine.
  */
-public interface OutOfCoreEngine<I extends WritableComparable,
-    V extends Writable, E extends Writable> {
+public abstract class OutOfCoreEngine {
+  /** Class logger. */
+  private static final Logger LOG = Logger.getLogger(OutOfCoreEngine.class);
+  /** Service worker */
+  protected final CentralizedServiceWorker<?, ?, ?> service;
+  /** Scheduler for IO threads */
+  protected OutOfCoreIOScheduler ioScheduler;
+  /** Data structure to keep meta partition information */
+  protected final MetaPartitionManager metaPartitionManager;
+  /** How many disk (i.e. IO threads) do we have? */
+  protected final int numIOThreads;
+  /**
+   * Whether the job should fail due to IO threads terminating because of
+   * exceptions
+   */
+  protected volatile boolean jobFailed = false;
+  /** Whether the out-of-core engine has initialized */
+  private final AtomicBoolean isInitialized = new AtomicBoolean(false);
+  /**
+   * Global lock for entire superstep. This lock helps to avoid overlapping of
+   * out-of-core decisions (what to do next to help the out-of-core mechanism)
+   * with out-of-core operations (actual IO operations).
+   */
+  private final ReadWriteLock superstepLock = new ReentrantReadWriteLock();
+  /** Callable factory for IO threads */
+  private final OutOfCoreIOCallableFactory oocIOCallableFactory;
+
+  /**
+   * Constructor
+   *
+   * @param conf Configuration
+   * @param service Service worker
+   */
+  public OutOfCoreEngine(ImmutableClassesGiraphConfiguration<?, ?, ?> conf,
+                         CentralizedServiceWorker<?, ?, ?> service) {
+    this.service = service;
+    this.oocIOCallableFactory = new OutOfCoreIOCallableFactory(conf, this);
+    this.numIOThreads = oocIOCallableFactory.getNumDisks();
+    this.metaPartitionManager = new MetaPartitionManager(numIOThreads, this);
+    oocIOCallableFactory.createCallable();
+  }
+
   /**
    * Initialize/Start the out-of-core engine.
    */
-  void initialize();
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+      "JLM_JSR166_UTILCONCURRENT_MONITORENTER")
+  public void initialize() {
+    synchronized (isInitialized) {
+      isInitialized.set(true);
+      isInitialized.notifyAll();
+    }
+  }
 
   /**
    * Shutdown/Stop the out-of-core engine.
    */
-  void shutdown();
+  public void shutdown() {
+    if (LOG.isInfoEnabled()) {
+      LOG.info("shutdown: out-of-core engine shutting down, signalling IO " +
+          "threads to shutdown");
+    }
+    ioScheduler.shutdown();
+    oocIOCallableFactory.shutdown();
+  }
+
+  /**
+   * Get a reference to the server data
+   *
+   * @return ServerData
+   */
+  public ServerData getServerData() {
+    return service.getServerData();
+  }
+
+  /**
+   * Get a reference to the service worker
+   *
+   * @return CentralizedServiceWorker
+   */
+  public CentralizedServiceWorker getServiceWorker() {
+    return service;
+  }
+
+  /**
+   * Get a reference to IO scheduler
+   *
+   * @return OutOfCoreIOScheduler
+   */
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+      "JLM_JSR166_UTILCONCURRENT_MONITORENTER")
+  public OutOfCoreIOScheduler getIOScheduler() {
+    synchronized (isInitialized) {
+      while (!isInitialized.get()) {
+        try {
+          isInitialized.wait();
+        } catch (InterruptedException e) {
+          throw new IllegalStateException("getIOScheduler: " +
+              "InterruptedException while waiting for out-of-core engine to " +
+              "be initialized!");
+        }
+      }
+    }
+    return ioScheduler;
+  }
+
+  /**
+   * Get a reference to meta partition information
+   *
+   * @return MetaPartitionManager
+   */
+  public MetaPartitionManager getMetaPartitionManager() {
+    checkState(isInitialized.get());
+    return metaPartitionManager;
+  }
+
+  /**
+   * Get a reference to superstep lock
+   *
+   * @return read/write lock used for global superstep lock
+   */
+  public ReadWriteLock getSuperstepLock() {
+    return superstepLock;
+  }
+
+  /**
+   * Get the id of the next partition to process in the current iteration over
+   * all the partitions. If all partitions are already processed, this method
+   * returns null.
+   *
+   * @return id of a partition to process. 'null' if all partitions are
+   *         processed in current iteration over partitions.
+   */
+  public abstract Integer getNextPartition();
+
+  /**
+   * Notify out-of-core engine that processing of a particular partition is 
done
+   *
+   * @param partitionId id of the partition that its processing is done
+   */
+  public abstract void doneProcessingPartition(int partitionId);
+
+  /**
+   * Notify out-of-core engine that iteration cycle over all partitions is 
about
+   * to begin.
+   */
+  public abstract void startIteration();
+
+  /**
+   * Retrieve a particular partition. After this method is complete the
+   * requested partition should be in memory.
+   *
+   * @param partitionId id of the partition to retrieve
+   */
+  public abstract void retrievePartition(int partitionId);
+
+  /**
+   * Notify out-of-core engine that an IO command is completed by an IO thread
+   *
+   * @param command the IO command that is completed
+   */
+  public abstract void ioCommandCompleted(IOCommand command);
+
+  /**
+   * Set a flag to fail the job.
+   */
+  public void failTheJob() {
+    jobFailed = true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java 
b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java
new file mode 100644
index 0000000..6c6e4ff
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc;
+
+import org.apache.giraph.ooc.io.IOCommand;
+import org.apache.giraph.ooc.io.WaitIOCommand;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.Callable;
+
+/**
+ * IO threads for out-of-core mechanism.
+ */
+public class OutOfCoreIOCallable implements Callable<Void> {
+  /** Class logger. */
+  private static final Logger LOG = 
Logger.getLogger(OutOfCoreIOCallable.class);
+  /** Out-of-core engine */
+  private final OutOfCoreEngine oocEngine;
+  /** Base path that this thread will write to/read from */
+  private final String basePath;
+  /** Thread id/Disk id */
+  private final int diskId;
+
+  /**
+   * Constructor
+   *
+   * @param oocEngine out-of-core engine
+   * @param basePath base path this thread will be using
+   * @param diskId thread id/disk id
+   */
+  public OutOfCoreIOCallable(OutOfCoreEngine oocEngine, String basePath,
+                             int diskId) {
+    this.oocEngine = oocEngine;
+    this.basePath = basePath;
+    this.diskId = diskId;
+  }
+
+  @Override
+  public Void call() {
+    while (true) {
+      oocEngine.getSuperstepLock().readLock().lock();
+      IOCommand command = oocEngine.getIOScheduler().getNextIOCommand(diskId);
+      if (LOG.isInfoEnabled()) {
+        LOG.info("call: thread " + diskId + "'s next IO command is: " +
+            command);
+      }
+      if (command == null) {
+        oocEngine.getSuperstepLock().readLock().unlock();
+        break;
+      }
+      if (command instanceof WaitIOCommand) {
+        oocEngine.getSuperstepLock().readLock().unlock();
+      }
+      // CHECKSTYLE: stop IllegalCatch
+      try {
+        command.execute(basePath);
+      } catch (Exception e) {
+        oocEngine.failTheJob();
+        LOG.info("call: execution of IO command " + command + " failed!");
+        throw new RuntimeException(e);
+      }
+      // CHECKSTYLE: resume IllegalCatch
+      if (!(command instanceof WaitIOCommand)) {
+        oocEngine.getSuperstepLock().readLock().unlock();
+      }
+      oocEngine.getIOScheduler().ioCommandCompleted(command);
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("call: out-of-core IO thread " + diskId + " terminating!");
+    }
+    return null;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallableFactory.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallableFactory.java
 
b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallableFactory.java
new file mode 100644
index 0000000..d4fea22
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallableFactory.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.CallableFactory;
+import org.apache.giraph.utils.LogStacktraceCallable;
+import org.apache.giraph.utils.ThreadUtils;
+import org.apache.log4j.Logger;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.giraph.conf.GiraphConstants.PARTITIONS_DIRECTORY;
+
+/**
+ * Factory class to create IO threads for out-of-core engine.
+ */
+public class OutOfCoreIOCallableFactory {
+  /** Class logger. */
+  private static final Logger LOG =
+      Logger.getLogger(OutOfCoreIOCallableFactory.class);
+  /** Out-of-core engine */
+  private final OutOfCoreEngine oocEngine;
+  /** Result of IO threads at the end of the computation */
+  private final List<Future> results;
+  /** How many disks (i.e. IO threads) do we have? */
+  private int numDisks;
+  /** Path prefix for different disks */
+  private final String[] basePaths;
+  /** Executor service for IO threads */
+  private ExecutorService outOfCoreIOExecutor;
+  /**
+   * Constructor
+   *
+   * @param conf Configuration
+   * @param oocEngine Out-of-core engine
+   */
+  public OutOfCoreIOCallableFactory(
+      ImmutableClassesGiraphConfiguration<?, ?, ?> conf,
+      OutOfCoreEngine oocEngine) {
+    this.oocEngine = oocEngine;
+    this.results = new ArrayList<>();
+    // Take advantage of multiple disks
+    String[] userPaths = PARTITIONS_DIRECTORY.getArray(conf);
+    this.numDisks = userPaths.length;
+    this.basePaths = new String[numDisks];
+    int ptr = 0;
+    for (String path : userPaths) {
+      File file = new File(path);
+      if (!file.exists()) {
+        checkState(file.mkdirs(), "OutOfCoreIOCallableFactory: cannot create " 
+
+            "directory " + file.getAbsolutePath());
+      }
+      basePaths[ptr] = path + "/" + conf.get("mapred.job.id", "Unknown Job");
+      ptr++;
+    }
+  }
+
+  /**
+   * Creates/Launches IO threads
+   */
+  public void createCallable() {
+    CallableFactory<Void> outOfCoreIOCallableFactory =
+      new CallableFactory<Void>() {
+        @Override
+        public Callable<Void> newCallable(int callableId) {
+          return new OutOfCoreIOCallable(oocEngine, basePaths[callableId],
+              callableId);
+        }
+      };
+    outOfCoreIOExecutor = new ThreadPoolExecutor(numDisks, numDisks, 0L,
+        TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
+        ThreadUtils.createThreadFactory("ooc-io-%d")) {
+      @Override
+      protected void afterExecute(Runnable r, Throwable t) {
+        super.afterExecute(r, t);
+        if (t == null && r instanceof Future<?>) {
+          try {
+            Future<?> future = (Future<?>) r;
+            if (future.isDone()) {
+              future.get();
+            }
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+          } catch (ExecutionException e) {
+            t = e;
+          }
+          if (t != null) {
+            LOG.info("afterExecute: an out-of-core thread terminated " +
+                "unexpectedly with " + t);
+            oocEngine.failTheJob();
+          }
+        }
+      }
+    };
+
+    for (int i = 0; i < numDisks; ++i) {
+      Future<Void> future = outOfCoreIOExecutor.submit(
+          new LogStacktraceCallable<>(
+              outOfCoreIOCallableFactory.newCallable(i)));
+      results.add(future);
+    }
+    // Notify executor to not accept any more tasks
+    outOfCoreIOExecutor.shutdown();
+  }
+
+  /**
+   * How many disks do we have?
+   *
+   * @return number of disks (IO threads)
+   */
+  public int getNumDisks() {
+    return numDisks;
+  }
+
+  /**
+   * Check whether all IO threads terminated gracefully.
+   */
+  public void shutdown() {
+    boolean threadsTerminated = false;
+    while (!threadsTerminated) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("shutdown: waiting for IO threads to finish!");
+      }
+      try {
+        threadsTerminated =
+            outOfCoreIOExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        throw new IllegalStateException("shutdown: caught " +
+            "InterruptedException while waiting for IO threads to finish");
+      }
+    }
+    for (int i = 0; i < numDisks; ++i) {
+      try {
+        // Check whether the thread terminated gracefully
+        results.get(i).get();
+      } catch (InterruptedException e) {
+        LOG.error("shutdown: IO thread " + i + " was interrupted during its " +
+            "execution");
+        throw new IllegalStateException(e);
+      } catch (ExecutionException e) {
+        LOG.error("shutdown: IO thread " + i + " threw an exception during " +
+            "its execution");
+        throw new IllegalStateException(e);
+      }
+    }
+    for (String path : basePaths) {
+      File file = new File(path).getParentFile();
+      for (String subFileName : file.list()) {
+        File subFile = new File(file.getPath(), subFileName);
+        checkState(subFile.delete(), "shutdown: cannot delete file %s",
+            subFile.getAbsoluteFile());
+      }
+      checkState(file.delete(), "shutdown: cannot delete directory %s",
+          file.getAbsoluteFile());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java 
b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java
new file mode 100644
index 0000000..dee632d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc;
+
+import com.google.common.hash.Hashing;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.IntConfOption;
+import org.apache.giraph.ooc.io.IOCommand;
+import org.apache.log4j.Logger;
+
+/**
+ * Representation of IO thread scheduler for out-of-core mechanism
+ */
+public abstract class OutOfCoreIOScheduler {
+  /**
+   * If an IO thread does not have any command to do, it waits for certain a
+   * period and check back again to see if there exist any command to perform.
+   * This constant determines this wait period in milliseconds.
+   */
+  public static final IntConfOption OOC_WAIT_INTERVAL =
+      new IntConfOption("giraph.oocWaitInterval", 1000,
+          "Duration (in milliseconds) which IO threads in out-of-core " +
+              "mechanism would wait until a command becomes available");
+  /** Class logger. */
+  private static final Logger LOG =
+      Logger.getLogger(OutOfCoreIOScheduler.class);
+  /** Out-of-core engine */
+  protected final OutOfCoreEngine oocEngine;
+  /** How much an IO thread should wait if there is no IO command */
+  protected final int waitInterval;
+  /** How many disks (i.e. IO threads) do we have? */
+  private final int numDisks;
+
+  /**
+   * Constructor
+   *
+   * @param conf configuration
+   * @param oocEngine out-of-core engine
+   * @param numDisks number of disks (IO threads)
+   */
+  OutOfCoreIOScheduler(final ImmutableClassesGiraphConfiguration conf,
+                       OutOfCoreEngine oocEngine, int numDisks) {
+    this.oocEngine = oocEngine;
+    this.numDisks = numDisks;
+    this.waitInterval = OOC_WAIT_INTERVAL.get(conf);
+  }
+
+  /**
+   * Get the thread id that is responsible for a particular partition
+   *
+   * @param partitionId id of the given partition
+   * @return id of the thread responsible for the given partition
+   */
+  public int getOwnerThreadId(int partitionId) {
+    int result = Hashing.murmur3_32().hashInt(partitionId).asInt() % numDisks;
+    return (result >= 0) ? result : (result + numDisks);
+  }
+
+  /**
+   * Generate and return the next appropriate IO command for a given thread
+   *
+   * @param threadId id of the thread ready to execute the next IO command
+   * @return next IO command to be executed by the given thread
+   */
+  public abstract IOCommand getNextIOCommand(int threadId);
+
+  /**
+   * Notify IO scheduler that the IO command is completed
+   *
+   * @param command completed command
+   */
+  public abstract void ioCommandCompleted(IOCommand command);
+
+  /**
+   * Add an IO command to the scheduling queue of the IO scheduler
+   *
+   * @param ioCommand IO command to add to the scheduler
+   */
+  public abstract void addIOCommand(IOCommand ioCommand);
+
+  /**
+   * Shutdown/Terminate the IO scheduler, and notify all IO threads to halt
+   */
+  public void shutdown() {
+    if (LOG.isInfoEnabled()) {
+      LOG.info("shutdown: OutOfCoreIOScheduler shutting down!");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreProcessorCallable.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreProcessorCallable.java
 
b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreProcessorCallable.java
deleted file mode 100644
index 7c4d8df..0000000
--- 
a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreProcessorCallable.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.ooc;
-
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Class to implement slaves for adaptive out-of-core brain. Basically the 
brain
- * decides on when to offload and what data to offload to disk and generates
- * commands for offloading. Slaves just execute the commands. Commands can be:
- *   1) offloading vertex buffer of partitions in INPUT_SUPERSTEP,
- *   2) offloading edge buffer of partitions in INPUT_SUPERSTEP,
- *   3) offloading partitions.
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- */
-public class OutOfCoreProcessorCallable<I extends WritableComparable,
-    V extends Writable, E extends Writable> implements Callable<Void> {
-  /** Class logger */
-  private static final Logger LOG =
-      Logger.getLogger(OutOfCoreProcessorCallable.class);
-  /** Partition store */
-  private final DiskBackedPartitionStore<I, V, E> partitionStore;
-  /** Adaptive out-of-core engine */
-  private final AdaptiveOutOfCoreEngine<I, V, E> oocEngine;
-
-  /**
-   * Constructor for out-of-core processor threads.
-   *
-   * @param oocEngine out-of-core engine
-   * @param serviceWorker worker service
-   */
-  public OutOfCoreProcessorCallable(AdaptiveOutOfCoreEngine<I, V, E> oocEngine,
-      CentralizedServiceWorker<I, V, E> serviceWorker) {
-    this.oocEngine = oocEngine;
-    this.partitionStore =
-        (DiskBackedPartitionStore<I, V, E>) serviceWorker.getPartitionStore();
-  }
-
-  @Override
-  public Void call() {
-    while (true) {
-      // First wait on a gate to be opened by memory-check thread. Memory-check
-      // thread opens the gate once there are data available to be spilled to
-      // disk.
-      try {
-        oocEngine.waitOnGate();
-      } catch (InterruptedException e) {
-        throw new IllegalStateException("call: Caught InterruptedException " +
-            "while waiting on memory check thread signal on available " +
-            "partitions to put on disk");
-      } catch (BrokenBarrierException e) {
-        throw new IllegalStateException("call Caught BrokenBarrierException. " 
+
-            "Looks like some other threads broke while waiting on barrier");
-      }
-
-      // The value of 'done' is true iff it is set right before the gate
-      // at end of check-memory thread. In such case, the computation is done
-      // and OOC processing threads should terminate gracefully.
-      if (oocEngine.isDone()) {
-        break;
-      }
-
-      BlockingQueue<Integer> partitionsWithInputVertices =
-          oocEngine.getPartitionsWithInputVertices();
-      BlockingQueue<Integer> partitionsWithInputEdges =
-          oocEngine.getPartitionsWithInputEdges();
-      BlockingQueue<Integer> partitionsWithPendingMessages =
-          oocEngine.getPartitionsWithPendingMessages();
-      AtomicInteger numPartitionsToSpill =
-          oocEngine.getNumPartitionsToSpill();
-
-      while (!partitionsWithInputVertices.isEmpty()) {
-        Integer partitionId = partitionsWithInputVertices.poll();
-        if (partitionId == null) {
-          break;
-        }
-        if (LOG.isInfoEnabled()) {
-          LOG.info("call: spilling vertex buffer of partition " + partitionId);
-        }
-        try {
-          partitionStore.spillPartitionInputVertexBuffer(partitionId);
-        } catch (IOException e) {
-          throw new IllegalStateException("call: caught IOException while " +
-              "spilling vertex buffers to disk");
-        }
-      }
-
-      while (!partitionsWithInputEdges.isEmpty()) {
-        Integer partitionId = partitionsWithInputEdges.poll();
-        if (partitionId == null) {
-          break;
-        }
-        if (LOG.isInfoEnabled()) {
-          LOG.info("call: spilling edge buffer of partition " + partitionId);
-        }
-        try {
-          partitionStore.spillPartitionInputEdgeStore(partitionId);
-        } catch (IOException e) {
-          throw new IllegalStateException("call: caught IOException while " +
-              "spilling edge buffers/store to disk");
-        }
-      }
-
-      while (!partitionsWithPendingMessages.isEmpty()) {
-        Integer partitionId = partitionsWithPendingMessages.poll();
-        if (partitionId == null) {
-          break;
-        }
-        if (LOG.isInfoEnabled()) {
-          LOG.info(
-              "call: spilling message buffers of partition " + partitionId);
-        }
-        try {
-          partitionStore.spillPartitionMessages(partitionId);
-        } catch (IOException e) {
-          throw new IllegalStateException("call: caught IOException while " +
-              "spilling edge buffers/store to disk");
-        }
-      }
-
-      // Put partitions on disk
-      while (numPartitionsToSpill.getAndDecrement() > 0) {
-        if (LOG.isInfoEnabled()) {
-          LOG.info("call: start offloading a partition");
-        }
-        partitionStore.spillOnePartition();
-      }
-
-      // Signal memory check thread that I am done putting data on disk
-      try {
-        oocEngine.waitOnOocSignal();
-      } catch (InterruptedException e) {
-        throw new IllegalStateException("call: Caught InterruptedException " +
-            "while waiting to notify memory check thread that I am done");
-      } catch (BrokenBarrierException e) {
-        throw new IllegalStateException("call: Caught BrokenBarrierException " 
+
-            "while waiting to notify memory check thread that I am done");
-      }
-    }
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedEdgeStore.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedEdgeStore.java 
b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedEdgeStore.java
new file mode 100644
index 0000000..7909100
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedEdgeStore.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.data;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.EdgeStore;
+import org.apache.giraph.ooc.OutOfCoreEngine;
+import org.apache.giraph.utils.ByteArrayVertexIdEdges;
+import org.apache.giraph.utils.VertexIdEdges;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Implementation of an edge-store used for out-of-core mechanism.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ */
+public class DiskBackedEdgeStore<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    extends OutOfCoreDataManager<VertexIdEdges<I, E>>
+    implements EdgeStore<I, V, E> {
+  /** Class logger. */
+  private static final Logger LOG = 
Logger.getLogger(DiskBackedEdgeStore.class);
+  /** In-memory message store */
+  private final EdgeStore<I, V, E> edgeStore;
+  /** Configuration */
+  private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
+  /** Out-of-core engine */
+  private final OutOfCoreEngine oocEngine;
+
+  /**
+   * Constructor
+   *
+   * @param edgeStore In-memory edge store for which out-of-core edge store
+   *                  would be a wrapper
+   * @param conf Configuration
+   * @param oocEngine Out-of-core engine
+   */
+  public DiskBackedEdgeStore(
+      EdgeStore<I, V, E> edgeStore,
+      ImmutableClassesGiraphConfiguration<I, V, E> conf,
+      OutOfCoreEngine oocEngine) {
+    super(conf);
+    this.edgeStore = edgeStore;
+    this.conf = conf;
+    this.oocEngine = oocEngine;
+  }
+
+  @Override
+  public void addPartitionEdges(int partitionId, VertexIdEdges<I, E> edges) {
+    addEntry(partitionId, edges);
+  }
+
+  @Override
+  public void moveEdgesToVertices() {
+    edgeStore.moveEdgesToVertices();
+  }
+
+  @Override
+  public void writePartitionEdgeStore(int partitionId, DataOutput output)
+      throws IOException {
+    // This method is only called (should only be called) on in-memory edge
+    // stores
+    throw new IllegalStateException("writePartitionEdgeStore: this method " +
+        "should not be called for DiskBackedEdgeStore!");
+  }
+
+  @Override
+  public void readPartitionEdgeStore(int partitionId, DataInput input)
+      throws IOException {
+    // This method is only called (should only be called) on in-memory edge
+    // stores
+    throw new IllegalStateException("readPartitionEdgeStore: this method " +
+        "should not be called for DiskBackedEdgeStore!");
+  }
+
+  @Override
+  public boolean hasEdgesForPartition(int partitionId) {
+    // This method is only called (should only be called) on in-memory edge
+    // stores
+    throw new IllegalStateException("hasEdgesForPartition: this method " +
+        "should not be called for DiskBackedEdgeStore!");
+  }
+
+  /**
+   * Gets the path that should be used specifically for edge data.
+   *
+   * @param basePath path prefix to build the actual path from
+   * @return path to files specific for edge data
+   */
+  private static String getPath(String basePath) {
+    return basePath + "_edge_store";
+  }
+
+  @Override
+  public void loadPartitionData(int partitionId, String basePath)
+      throws IOException {
+    super.loadPartitionData(partitionId, getPath(basePath));
+  }
+
+  @Override
+  public void offloadPartitionData(int partitionId, String basePath)
+      throws IOException {
+    super.offloadPartitionData(partitionId, getPath(basePath));
+  }
+
+  @Override
+  public void offloadBuffers(int partitionId, String basePath)
+      throws IOException {
+    super.offloadBuffers(partitionId, getPath(basePath));
+  }
+
+  @Override
+  protected void writeEntry(VertexIdEdges<I, E> edges, DataOutput out)
+      throws IOException {
+    edges.write(out);
+  }
+
+  @Override
+  protected VertexIdEdges<I, E> readNextEntry(DataInput in) throws IOException 
{
+    VertexIdEdges<I, E> vertexIdEdges = new ByteArrayVertexIdEdges<>();
+    vertexIdEdges.setConf(conf);
+    vertexIdEdges.readFields(in);
+    return vertexIdEdges;
+  }
+
+  @Override
+  protected void loadInMemoryPartitionData(int partitionId, String path)
+      throws IOException {
+    File file = new File(path);
+    if (file.exists()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("loadInMemoryPartitionData: loading edge data for " +
+            "partition " + partitionId + " from " + file.getAbsolutePath());
+      }
+      FileInputStream fis = new FileInputStream(file);
+      BufferedInputStream bis = new BufferedInputStream(fis);
+      DataInputStream dis = new DataInputStream(bis);
+      edgeStore.readPartitionEdgeStore(partitionId, dis);
+      dis.close();
+      checkState(file.delete(), "loadInMemoryPartitionData: failed to delete " 
+
+          "%s.", file.getAbsoluteFile());
+    }
+  }
+
+  @Override
+  protected void offloadInMemoryPartitionData(int partitionId, String path)
+      throws IOException {
+    if (edgeStore.hasEdgesForPartition(partitionId)) {
+      File file = new File(path);
+      checkState(!file.exists(), "offloadInMemoryPartitionData: edge store " +
+          "file %s already exist", file.getAbsoluteFile());
+      checkState(file.createNewFile(),
+          "offloadInMemoryPartitionData: cannot create edge store file %s",
+          file.getAbsoluteFile());
+      FileOutputStream fos = new FileOutputStream(file);
+      BufferedOutputStream bos = new BufferedOutputStream(fos);
+      DataOutputStream dos = new DataOutputStream(bos);
+      edgeStore.writePartitionEdgeStore(partitionId, dos);
+      dos.close();
+    }
+  }
+
+  @Override
+  protected int entrySerializedSize(VertexIdEdges<I, E> edges) {
+    return edges.getSerializedSize();
+  }
+
+  @Override
+  protected void addEntryToImMemoryPartitionData(int partitionId,
+                                                 VertexIdEdges<I, E> edges) {
+    oocEngine.getMetaPartitionManager().addPartition(partitionId);
+    edgeStore.addPartitionEdges(partitionId, edges);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java
 
b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java
new file mode 100644
index 0000000..2a40a58
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.data;
+
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.ByteArrayOneMessageToManyIds;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.VertexIdMessages;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Implementation of a message store used for out-of-core mechanism.
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ */
+public class DiskBackedMessageStore<I extends WritableComparable,
+    M extends Writable> extends OutOfCoreDataManager<VertexIdMessages<I, M>>
+    implements MessageStore<I, M> {
+  /** Class logger. */
+  private static final Logger LOG =
+      Logger.getLogger(DiskBackedMessageStore.class);
+  /** Configuration */
+  private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
+  /** In-memory message store */
+  private final MessageStore<I, M> messageStore;
+  /** Whether the message store uses message combiner or not */
+  private final boolean useMessageCombiner;
+  /** Which superstep this message store is used for */
+  private final long superstep;
+
+  /**
+   * Type of VertexIdMessage class (container for serialized messages) received
+   * for a particular message. If we write the received messages to disk before
+   * adding them to message store, we need this type when we want to read the
+   * messages back from disk (so that we know how to read the messages from
+   * disk).
+   */
+  private enum SerializedMessageClass {
+    /** ByteArrayVertexIdMessages */
+    BYTE_ARRAY_VERTEX_ID_MESSAGES,
+    /** ByteArrayOneMEssageToManyIds */
+    BYTE_ARRAY_ONE_MESSAGE_TO_MANY_IDS
+  }
+
+  /**
+   * Constructor
+   *
+   * @param config Configuration
+   * @param messageStore In-memory message store for which out-of-core message
+   *                     store would be wrapper
+   * @param useMessageCombiner Whether message combiner is used for this 
message
+   *                           store
+   * @param superstep superstep number this messages store is used for
+   */
+  public DiskBackedMessageStore(ImmutableClassesGiraphConfiguration<I, ?, ?>
+                                    config,
+                                MessageStore<I, M> messageStore,
+                                boolean useMessageCombiner, long superstep) {
+    super(config);
+    this.config = config;
+    this.messageStore = messageStore;
+    this.useMessageCombiner = useMessageCombiner;
+    this.superstep = superstep;
+  }
+
+  @Override
+  public boolean isPointerListEncoding() {
+    return messageStore.isPointerListEncoding();
+  }
+
+  @Override
+  public Iterable<M> getVertexMessages(I vertexId) throws IOException {
+    return messageStore.getVertexMessages(vertexId);
+  }
+
+  @Override
+  public void clearVertexMessages(I vertexId) throws IOException {
+    messageStore.clearVertexMessages(vertexId);
+  }
+
+  @Override
+  public void clearAll() throws IOException {
+    messageStore.clearAll();
+  }
+
+  @Override
+  public boolean hasMessagesForVertex(I vertexId) {
+    return messageStore.hasMessagesForVertex(vertexId);
+  }
+
+  @Override
+  public boolean hasMessagesForPartition(int partitionId) {
+    return messageStore.hasMessagesForPartition(partitionId);
+  }
+
+  @Override
+  public void addPartitionMessages(
+      int partitionId, VertexIdMessages<I, M> messages) throws IOException {
+    if (useMessageCombiner) {
+      messageStore.addPartitionMessages(partitionId, messages);
+    } else {
+      addEntry(partitionId, messages);
+    }
+  }
+
+  /**
+   * Gets the path that should be used specifically for message data.
+   *
+   * @param basePath path prefix to build the actual path from
+   * @param superstep superstep for which message data should be stored
+   * @return path to files specific for message data
+   */
+  private static String getPath(String basePath, long superstep) {
+    return basePath + "_messages-S" + superstep;
+  }
+
+  @Override
+  public void loadPartitionData(int partitionId, String basePath)
+      throws IOException {
+    if (!useMessageCombiner) {
+      super.loadPartitionData(partitionId, getPath(basePath, superstep));
+    }
+  }
+
+  @Override
+  public void offloadPartitionData(int partitionId, String basePath)
+      throws IOException {
+    if (!useMessageCombiner) {
+      super.offloadPartitionData(partitionId, getPath(basePath, superstep));
+    }
+  }
+
+  @Override
+  public void offloadBuffers(int partitionId, String basePath)
+      throws IOException {
+    if (!useMessageCombiner) {
+      super.offloadBuffers(partitionId, getPath(basePath, superstep));
+    }
+  }
+
+  @Override
+  public void finalizeStore() {
+    messageStore.finalizeStore();
+  }
+
+  @Override
+  public Iterable<I> getPartitionDestinationVertices(int partitionId) {
+    return messageStore.getPartitionDestinationVertices(partitionId);
+  }
+
+  @Override
+  public void clearPartition(int partitionId) throws IOException {
+    messageStore.clearPartition(partitionId);
+  }
+
+  @Override
+  public void writePartition(DataOutput out, int partitionId)
+      throws IOException {
+    messageStore.writePartition(out, partitionId);
+  }
+
+  @Override
+  public void readFieldsForPartition(DataInput in, int partitionId)
+      throws IOException {
+    messageStore.readFieldsForPartition(in, partitionId);
+  }
+
+  @Override
+  protected void writeEntry(VertexIdMessages<I, M> messages, DataOutput out)
+      throws IOException {
+    SerializedMessageClass messageClass;
+    if (messages instanceof ByteArrayVertexIdMessages) {
+      messageClass = SerializedMessageClass.BYTE_ARRAY_VERTEX_ID_MESSAGES;
+    } else if (messages instanceof ByteArrayOneMessageToManyIds) {
+      messageClass = SerializedMessageClass.BYTE_ARRAY_ONE_MESSAGE_TO_MANY_IDS;
+    } else {
+      throw new IllegalStateException("writeEntry: serialized message " +
+          "type is not supported");
+    }
+    out.writeInt(messageClass.ordinal());
+    messages.write(out);
+  }
+
+  @Override
+  protected VertexIdMessages<I, M> readNextEntry(DataInput in)
+      throws IOException {
+    int messageType = in.readInt();
+    SerializedMessageClass messageClass =
+        SerializedMessageClass.values()[messageType];
+    VertexIdMessages<I, M> vim;
+    switch (messageClass) {
+    case BYTE_ARRAY_VERTEX_ID_MESSAGES:
+      vim = new ByteArrayVertexIdMessages<>(
+          config.<M>createOutgoingMessageValueFactory());
+      vim.setConf(config);
+      break;
+    case BYTE_ARRAY_ONE_MESSAGE_TO_MANY_IDS:
+      vim = new ByteArrayOneMessageToManyIds<>(
+          config.<M>createOutgoingMessageValueFactory());
+      vim.setConf(config);
+      break;
+    default:
+      throw new IllegalStateException("readNextEntry: unsupported " +
+          "serialized message type!");
+    }
+    vim.readFields(in);
+    return vim;
+  }
+
+  @Override
+  protected void loadInMemoryPartitionData(int partitionId, String basePath)
+      throws IOException {
+    File file = new File(basePath);
+    if (file.exists()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("loadInMemoryPartitionData: loading message data for " +
+            "partition " + partitionId + " from " + file.getAbsolutePath());
+      }
+      FileInputStream fis = new FileInputStream(file);
+      BufferedInputStream bis = new BufferedInputStream(fis);
+      DataInputStream dis = new DataInputStream(bis);
+      messageStore.readFieldsForPartition(dis, partitionId);
+      dis.close();
+      checkState(file.delete(), "loadInMemoryPartitionData: failed to delete " 
+
+          "%s.", file.getAbsoluteFile());
+    }
+  }
+
+  @Override
+  protected void offloadInMemoryPartitionData(int partitionId, String basePath)
+      throws IOException {
+    if (messageStore.hasMessagesForPartition(partitionId)) {
+      File file = new File(basePath);
+      checkState(!file.exists(), "offloadInMemoryPartitionData: message store" 
+
+          " file %s already exist", file.getAbsoluteFile());
+      checkState(file.createNewFile(),
+          "offloadInMemoryPartitionData: cannot create message store file %s",
+          file.getAbsoluteFile());
+      FileOutputStream fileout = new FileOutputStream(file);
+      BufferedOutputStream bufferout = new BufferedOutputStream(fileout);
+      DataOutputStream outputStream = new DataOutputStream(bufferout);
+      messageStore.writePartition(outputStream, partitionId);
+      messageStore.clearPartition(partitionId);
+      outputStream.close();
+    }
+  }
+
+  @Override
+  protected int entrySerializedSize(VertexIdMessages<I, M> messages) {
+    return messages.getSerializedSize();
+  }
+
+  @Override
+  protected void addEntryToImMemoryPartitionData(int partitionId,
+                                                 VertexIdMessages<I, M>
+                                                     messages) {
+    try {
+      messageStore.addPartitionMessages(partitionId, messages);
+    } catch (IOException e) {
+      throw new IllegalStateException("Caught IOException while adding a new " 
+
+          "message to in-memory message store");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java
 
b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java
new file mode 100644
index 0000000..5854c8d
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.data;
+
+import com.google.common.collect.Maps;
+import org.apache.giraph.bsp.BspService;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.OutEdges;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.ooc.OutOfCoreEngine;
+import org.apache.giraph.partition.Partition;
+import org.apache.giraph.partition.PartitionStore;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.worker.BspServiceWorker;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Implementation of a partition-store used for out-of-core mechanism.
+ * Partition store is responsible for partition data, as well as data buffers in
+ * INPUT_SUPERSTEP ("raw data buffer" -- defined in OutOfCoreDataManager --
+ * refers to vertex buffers in INPUT_SUPERSTEP).
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ */
+public class DiskBackedPartitionStore<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    extends OutOfCoreDataManager<ExtendedDataOutput>
+    implements PartitionStore<I, V, E> {
+  /** Class logger. */
+  private static final Logger LOG =
+      Logger.getLogger(DiskBackedPartitionStore.class);
+  /** Configuration */
+  private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
+  /** Job context (for progress) */
+  private final Mapper<?, ?, ?, ?>.Context context;
+  /** Service worker */
+  private final CentralizedServiceWorker<I, V, E> serviceWorker;
+  /** Out-of-core engine */
+  private final OutOfCoreEngine oocEngine;
+  /** In-memory partition store */
+  private final PartitionStore<I, V, E> partitionStore;
+  /**
+   * Keeps number of vertices in partitions, right when they are last spilled
+   * to the disk. This value may be inaccurate for in-memory partitions, but
+   * is accurate for out-of-core partitions.
+   */
+  private final Map<Integer, Long> partitionVertexCount =
+      Maps.newConcurrentMap();
+  /**
+   * Keeps number of edges in partitions, right when they are last spilled
+   * to the disk. This value may be inaccurate for in-memory partitions, but
+   * is accurate for out-of-core partitions.
+   */
+  private final Map<Integer, Long> partitionEdgeCount =
+      Maps.newConcurrentMap();
+
+  /**
+   * Constructor.
+   *
+   * @param partitionStore In-memory partition store for which out-of-code
+   *                       partition store would be a wrapper
+   * @param conf Configuration
+   * @param context Job context
+   * @param serviceWorker Service worker
+   * @param oocEngine Out-of-core engine
+   */
+  public DiskBackedPartitionStore(
+      PartitionStore<I, V, E> partitionStore,
+      ImmutableClassesGiraphConfiguration<I, V, E> conf,
+      Mapper<?, ?, ?, ?>.Context context,
+      CentralizedServiceWorker<I, V, E> serviceWorker,
+      OutOfCoreEngine oocEngine) {
+    super(conf);
+    this.partitionStore = partitionStore;
+    this.conf = conf;
+    this.context = context;
+    this.serviceWorker = serviceWorker;
+    this.oocEngine = oocEngine;
+  }
+
+  @Override
+  public boolean addPartition(Partition<I, V, E> partition) {
+    boolean added = partitionStore.addPartition(partition);
+    if (added) {
+      oocEngine.getMetaPartitionManager()
+          .addPartition(partition.getId());
+    }
+    return added;
+  }
+
+  @Override
+  public Partition<I, V, E> removePartition(Integer partitionId) {
+    // Set the partition as 'in process' so its data and messages do not get
+    // spilled to disk until the remove is complete.
+    oocEngine.getMetaPartitionManager().makePartitionInaccessible(partitionId);
+    oocEngine.retrievePartition(partitionId);
+    Partition<I, V, E> partition = partitionStore.removePartition(partitionId);
+    checkNotNull(partition, "removePartition: partition " + partitionId +
+        " is not in memory for removal!");
+    oocEngine.getMetaPartitionManager().removePartition(partitionId);
+    return partition;
+  }
+
+  @Override
+  public boolean hasPartition(Integer partitionId) {
+    return oocEngine.getMetaPartitionManager().hasPartition(partitionId);
+  }
+
+  @Override
+  public Iterable<Integer> getPartitionIds() {
+    return oocEngine.getMetaPartitionManager().getPartitionIds();
+  }
+
+  @Override
+  public int getNumPartitions() {
+    return oocEngine.getMetaPartitionManager().getNumPartitions();
+  }
+
+  @Override
+  public long getPartitionVertexCount(Integer partitionId) {
+    if (partitionStore.hasPartition(partitionId)) {
+      return partitionStore.getPartitionVertexCount(partitionId);
+    } else {
+      return partitionVertexCount.get(partitionId);
+    }
+  }
+
+  @Override
+  public long getPartitionEdgeCount(Integer partitionId) {
+    if (partitionStore.hasPartition(partitionId)) {
+      return partitionStore.getPartitionEdgeCount(partitionId);
+    } else {
+      return partitionEdgeCount.get(partitionId);
+    }
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return getNumPartitions() == 0;
+  }
+
+  @Override
+  public void startIteration() {
+    oocEngine.startIteration();
+  }
+
+  @Override
+  public Partition<I, V, E> getNextPartition() {
+    Integer partitionId = oocEngine.getNextPartition();
+    if (partitionId == null) {
+      return null;
+    }
+    Partition<I, V, E> partition = partitionStore.removePartition(partitionId);
+    if (partition == null) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("getNextPartition: partition " + partitionId + " is not in " +
+            "the partition store. Creating an empty partition for it.");
+      }
+      partition = conf.createPartition(partitionId, context);
+    }
+    partitionStore.addPartition(partition);
+    return partition;
+  }
+
+  @Override
+  public void putPartition(Partition<I, V, E> partition) {
+    oocEngine.doneProcessingPartition(partition.getId());
+  }
+
+  @Override
+  public void addPartitionVertices(Integer partitionId,
+                                   ExtendedDataOutput extendedDataOutput) {
+    addEntry(partitionId, extendedDataOutput);
+  }
+
+  @Override
+  public void shutdown() {
+    oocEngine.shutdown();
+  }
+
+  @Override
+  public void initialize() {
+    oocEngine.initialize();
+  }
+
+  /**
+   * Gets the path that should be used specifically for partition data.
+   *
+   * @param basePath path prefix to build the actual path from
+   * @return path to files specific for partition data
+   */
+  private static String getPath(String basePath) {
+    return basePath + "_partition";
+  }
+
+  /**
+   * Get the path to the file where vertices are stored.
+   *
+   * @param basePath path prefix to build the actual path from
+   * @return The path to the vertices file
+   */
+  private static String getVerticesPath(String basePath) {
+    return basePath + "_vertices";
+  }
+
+  /**
+   * Get the path to the file where edges are stored.
+   *
+   * @param basePath path prefix to build the actual path from
+   * @return The path to the edges file
+   */
+  private static String getEdgesPath(String basePath) {
+    return basePath + "_edges";
+  }
+
+  /**
+   * Read vertex data from an input and initialize the vertex.
+   *
+   * @param in     The input stream
+   * @param vertex The vertex to initialize
+   * @throws IOException
+   */
+  private void readVertexData(DataInput in, Vertex<I, V, E> vertex)
+      throws IOException {
+    I id = conf.createVertexId();
+    id.readFields(in);
+    V value = conf.createVertexValue();
+    value.readFields(in);
+    OutEdges<I, E> edges = conf.createAndInitializeOutEdges(0);
+    vertex.initialize(id, value, edges);
+    if (in.readBoolean()) {
+      vertex.voteToHalt();
+    } else {
+      vertex.wakeUp();
+    }
+  }
+
+  /**
+   * Read vertex edges from an input and set them to the vertex.
+   *
+   * @param in        The input stream
+   * @param partition The partition owning the vertex
+   * @throws IOException
+   */
+  private void readOutEdges(DataInput in, Partition<I, V, E> partition)
+      throws IOException {
+    I id = conf.createVertexId();
+    id.readFields(in);
+    Vertex<I, V, E> v = partition.getVertex(id);
+    OutEdges<I, E> edges = (OutEdges<I, E>) v.getEdges();
+    edges.readFields(in);
+    partition.saveVertex(v);
+  }
+
+  @Override
+  protected void loadInMemoryPartitionData(int partitionId, String path)
+      throws IOException {
+    // Load vertices
+    File file = new File(getVerticesPath(path));
+    if (file.exists()) {
+      Partition<I, V, E> partition = conf.createPartition(partitionId, 
context);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("loadInMemoryPartitionData: loading partition vertices " +
+            partitionId + " from " + file.getAbsolutePath());
+      }
+
+      FileInputStream fis = new FileInputStream(file);
+      BufferedInputStream bis = new BufferedInputStream(fis);
+      DataInputStream inputStream = new DataInputStream(bis);
+      long numVertices = inputStream.readLong();
+      for (long i = 0; i < numVertices; ++i) {
+        Vertex<I, V, E> vertex = conf.createVertex();
+        readVertexData(inputStream, vertex);
+        partition.putVertex(vertex);
+      }
+      inputStream.close();
+      checkState(file.delete(), "loadInMemoryPartitionData: failed to delete " 
+
+          "%s", file.getAbsolutePath());
+
+      // Load edges
+      file = new File(getEdgesPath(path));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("loadInMemoryPartitionData: loading partition edges " +
+            partitionId + " from " + file.getAbsolutePath());
+      }
+
+      fis = new FileInputStream(file);
+      bis = new BufferedInputStream(fis);
+      inputStream = new DataInputStream(bis);
+      for (int i = 0; i < numVertices; ++i) {
+        readOutEdges(inputStream, partition);
+      }
+      inputStream.close();
+      // If the graph is static and it is not INPUT_SUPERSTEP, keep the file
+      // around.
+      if (!conf.isStaticGraph() ||
+          serviceWorker.getSuperstep() == BspService.INPUT_SUPERSTEP) {
+        checkState(file.delete(), "loadPartition: failed to delete %s",
+            file.getAbsolutePath());
+      }
+      partitionStore.addPartition(partition);
+    }
+  }
+
+  @Override
+  protected ExtendedDataOutput readNextEntry(DataInput in) throws IOException {
+    return WritableUtils.readExtendedDataOutput(in, conf);
+  }
+
+  @Override
+  protected void addEntryToImMemoryPartitionData(int partitionId,
+                                                 ExtendedDataOutput vertices) {
+    if (!partitionStore.hasPartition(partitionId)) {
+      oocEngine.getMetaPartitionManager().addPartition(partitionId);
+    }
+    partitionStore.addPartitionVertices(partitionId, vertices);
+  }
+
+  @Override
+  public void loadPartitionData(int partitionId, String basePath)
+      throws IOException {
+    super.loadPartitionData(partitionId, getPath(basePath));
+  }
+
+  @Override
+  public void offloadPartitionData(int partitionId, String basePath)
+      throws IOException {
+    super.offloadPartitionData(partitionId, getPath(basePath));
+  }
+
+  /**
+   * Writes vertex data (Id, value and halted state) to stream.
+   *
+   * @param output The output stream
+   * @param vertex The vertex to serialize
+   * @throws IOException
+   */
+  private void writeVertexData(DataOutput output, Vertex<I, V, E> vertex)
+      throws IOException {
+    vertex.getId().write(output);
+    vertex.getValue().write(output);
+    output.writeBoolean(vertex.isHalted());
+  }
+
+  /**
+   * Writes vertex edges (Id, edges) to stream.
+   *
+   * @param output The output stream
+   * @param vertex The vertex to serialize
+   * @throws IOException
+   */
+  private void writeOutEdges(DataOutput output, Vertex<I, V, E> vertex)
+      throws IOException {
+    vertex.getId().write(output);
+    OutEdges<I, E> edges = (OutEdges<I, E>) vertex.getEdges();
+    edges.write(output);
+  }
+
+  @Override
+  protected void offloadInMemoryPartitionData(int partitionId, String path)
+      throws IOException {
+    if (partitionStore.hasPartition(partitionId)) {
+      partitionVertexCount.put(partitionId,
+          partitionStore.getPartitionVertexCount(partitionId));
+      partitionEdgeCount.put(partitionId,
+          partitionStore.getPartitionEdgeCount(partitionId));
+      Partition<I, V, E> partition =
+          partitionStore.removePartition(partitionId);
+      File file = new File(getVerticesPath(path));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("offloadInMemoryPartitionData: writing partition vertices " +
+            partitionId + " to " + file.getAbsolutePath());
+      }
+      checkState(!file.exists(), "offloadInMemoryPartitionData: partition " +
+          "store file %s already exist", file.getAbsoluteFile());
+      checkState(file.createNewFile(),
+          "offloadInMemoryPartitionData: file %s already exists.",
+          file.getAbsolutePath());
+
+      FileOutputStream fileout = new FileOutputStream(file);
+      BufferedOutputStream bufferout = new BufferedOutputStream(fileout);
+      DataOutputStream outputStream = new DataOutputStream(bufferout);
+      outputStream.writeLong(partition.getVertexCount());
+      for (Vertex<I, V, E> vertex : partition) {
+        writeVertexData(outputStream, vertex);
+      }
+      outputStream.close();
+
+      // Avoid writing back edges if we have already written them once and
+      // the graph is not changing.
+      // If we are in the input superstep, we need to write the files
+      // at least the first time, even though the graph is static.
+      file = new File(getEdgesPath(path));
+      if (serviceWorker.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP ||
+          partitionVertexCount.get(partitionId) == null ||
+          partitionVertexCount.get(partitionId) != partition.getVertexCount() 
||
+          !conf.isStaticGraph() || !file.exists()) {
+        checkState(file.createNewFile(), "offloadInMemoryPartitionData: file " 
+
+            "%s already exists.", file.getAbsolutePath());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("offloadInMemoryPartitionData: writing partition edges " +
+              partitionId + " to " + file.getAbsolutePath());
+        }
+        fileout = new FileOutputStream(file);
+        bufferout = new BufferedOutputStream(fileout);
+        outputStream = new DataOutputStream(bufferout);
+        for (Vertex<I, V, E> vertex : partition) {
+          writeOutEdges(outputStream, vertex);
+        }
+        outputStream.close();
+      }
+    }
+  }
+
+  @Override
+  protected void writeEntry(ExtendedDataOutput vertices, DataOutput out)
+      throws IOException {
+    WritableUtils.writeExtendedDataOutput(vertices, out);
+  }
+
+  @Override
+  public void offloadBuffers(int partitionId, String basePath)
+      throws IOException {
+    super.offloadBuffers(partitionId, getPath(basePath));
+  }
+
+  @Override
+  protected int entrySerializedSize(ExtendedDataOutput vertices) {
+    return vertices.getPos();
+  }
+}

Reply via email to