http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/command/StoreIncomingMessageIOCommand.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/command/StoreIncomingMessageIOCommand.java
 
b/giraph-core/src/main/java/org/apache/giraph/ooc/command/StoreIncomingMessageIOCommand.java
new file mode 100644
index 0000000..b38f957
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/ooc/command/StoreIncomingMessageIOCommand.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.command;
+
+import org.apache.giraph.ooc.OutOfCoreEngine;
+import org.apache.giraph.ooc.data.DiskBackedMessageStore;
+
+import java.io.IOException;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * IOCommand to store incoming message of a particular partition.
+ */
+public class StoreIncomingMessageIOCommand extends IOCommand {
+  /**
+   * Constructor
+   *
+   * @param oocEngine out-of-core engine
+   * @param partitionId id of the partition to store its incoming messages
+   */
+  public StoreIncomingMessageIOCommand(OutOfCoreEngine oocEngine,
+                                       int partitionId) {
+    super(oocEngine, partitionId);
+  }
+
+  @Override
+  public boolean execute() throws IOException {
+    boolean executed = false;
+    if (oocEngine.getMetaPartitionManager()
+        .startOffloadingMessages(partitionId)) {
+      DiskBackedMessageStore messageStore =
+          (DiskBackedMessageStore)
+              oocEngine.getServerData().getIncomingMessageStore();
+      checkState(messageStore != null);
+      numBytesTransferred +=
+          messageStore.offloadPartitionData(partitionId);
+      oocEngine.getMetaPartitionManager().doneOffloadingMessages(partitionId);
+      executed = true;
+    }
+    return executed;
+  }
+
+  @Override
+  public IOCommandType getType() {
+    return IOCommandType.STORE_MESSAGE;
+  }
+
+  @Override
+  public String toString() {
+    return "StoreIncomingMessageIOCommand: (partitionId = " + partitionId + 
")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/command/StorePartitionIOCommand.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/command/StorePartitionIOCommand.java
 
b/giraph-core/src/main/java/org/apache/giraph/ooc/command/StorePartitionIOCommand.java
new file mode 100644
index 0000000..31fa345
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/ooc/command/StorePartitionIOCommand.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.command;
+
+import org.apache.giraph.bsp.BspService;
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.ooc.data.DiskBackedEdgeStore;
+import org.apache.giraph.ooc.data.DiskBackedMessageStore;
+import org.apache.giraph.ooc.data.DiskBackedPartitionStore;
+import org.apache.giraph.ooc.OutOfCoreEngine;
+
+import java.io.IOException;
+
+/**
+ * IOCommand to store partition data, edge data (if in INPUT_SUPERSTEP), and
+ * message data (if in compute supersteps).
+ */
+public class StorePartitionIOCommand extends IOCommand {
+  /**
+   * Constructor
+   *
+   * @param oocEngine out-of-core engine
+   * @param partitionId id of the partition to store its data
+   */
+  public StorePartitionIOCommand(OutOfCoreEngine oocEngine,
+                                 int partitionId) {
+    super(oocEngine, partitionId);
+  }
+
+  @Override
+  public boolean execute() throws IOException {
+    boolean executed = false;
+    if (oocEngine.getMetaPartitionManager()
+        .startOffloadingPartition(partitionId)) {
+      DiskBackedPartitionStore partitionStore =
+          (DiskBackedPartitionStore)
+              oocEngine.getServerData().getPartitionStore();
+      numBytesTransferred +=
+          partitionStore.offloadPartitionData(partitionId);
+      if (oocEngine.getSuperstep() != BspService.INPUT_SUPERSTEP) {
+        MessageStore messageStore =
+            oocEngine.getServerData().getCurrentMessageStore();
+        if (messageStore != null) {
+          numBytesTransferred += ((DiskBackedMessageStore) messageStore)
+              .offloadPartitionData(partitionId);
+        }
+      } else {
+        DiskBackedEdgeStore edgeStore =
+            (DiskBackedEdgeStore)
+                oocEngine.getServerData().getEdgeStore();
+        numBytesTransferred +=
+            edgeStore.offloadPartitionData(partitionId);
+      }
+      oocEngine.getMetaPartitionManager().doneOffloadingPartition(partitionId);
+      executed = true;
+    }
+    return executed;
+  }
+
+  @Override
+  public IOCommandType getType() {
+    return IOCommandType.STORE_PARTITION;
+  }
+
+  @Override
+  public String toString() {
+    return "StorePartitionIOCommand: (partitionId = " + partitionId + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/command/WaitIOCommand.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/command/WaitIOCommand.java 
b/giraph-core/src/main/java/org/apache/giraph/ooc/command/WaitIOCommand.java
new file mode 100644
index 0000000..83540c1
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/command/WaitIOCommand.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.command;
+
+import org.apache.giraph.ooc.OutOfCoreEngine;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * IOCommand to do nothing regarding moving data to/from disk.
+ */
+public class WaitIOCommand extends IOCommand {
+  /** How long should the disk be idle? (in milliseconds) */
+  private final long waitDuration;
+
+  /**
+   * Constructor
+   *
+   * @param oocEngine out-of-core engine
+   * @param waitDuration duration of wait
+   */
+  public WaitIOCommand(OutOfCoreEngine oocEngine, long waitDuration) {
+    super(oocEngine, -1);
+    this.waitDuration = waitDuration;
+  }
+
+  @Override
+  public boolean execute() throws IOException {
+    try {
+      TimeUnit.MILLISECONDS.sleep(waitDuration);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("execute: caught InterruptedException " +
+          "while IO thread is waiting!");
+    }
+    return true;
+  }
+
+  @Override
+  public IOCommandType getType() {
+    return IOCommandType.WAIT;
+  }
+
+  @Override
+  public String toString() {
+    return "WaitIOCommand: (duration = " + waitDuration + "ms)";
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/command/package-info.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/command/package-info.java 
b/giraph-core/src/main/java/org/apache/giraph/ooc/command/package-info.java
new file mode 100644
index 0000000..930b139
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/command/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of classes related to IO commands in out-of-core mechanism
+ */
+package org.apache.giraph.ooc.command;

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java 
b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java
new file mode 100644
index 0000000..7265410
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.data;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.IntConfOption;
+import org.apache.giraph.ooc.OutOfCoreEngine;
+import org.apache.giraph.ooc.persistence.DataIndex;
+import org.apache.giraph.ooc.persistence.DataIndex.NumericIndexEntry;
+import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
+import org.apache.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.giraph.conf.GiraphConstants.ONE_MB;
+
+/**
+ * This class provides basic operations for data structures that have to
+ * participate in out-of-core mechanism. Essential subclasses of this class 
are:
+ *  - DiskBackedPartitionStore (for partition data)
+ *  - DiskBackedMessageStore (for messages)
+ *  - DiskBackedEdgeStore (for edges read in INPUT_SUPERSTEP)
+ * Basically, any data structure that may cause OOM to happen can be 
implemented
+ * as a subclass of this class.
+ *
+ * There are two different terms used in the rest of this class:
+ *  - "data store" refers to in-memory representation of data. Usually this is
+ *    stored per-partition in in-memory implementations of data structures. For
+ *    instance, "data store" of a DiskBackedPartitionStore would collection of
+ *    all partitions kept in the in-memory partition store within the
+ *    DiskBackedPartitionStore.
+ *  - "raw data buffer" refers to raw data which were supposed to be
+ *    de-serialized and added to the data store, but they remain 'as is' in the
+ *    memory because their corresponding partition is offloaded to disk and is
+ *    not available in the data store.
+ *
+ * @param <T> raw data format of the data store subclassing this class
+ */
+public abstract class DiskBackedDataStore<T> {
+  /**
+   * Minimum size of a buffer (in bytes) to flush to disk. This is used to
+   * decide whether vertex/edge buffers are large enough to flush to disk.
+   */
+  public static final IntConfOption MINIMUM_BUFFER_SIZE_TO_FLUSH =
+      new IntConfOption("giraph.flushBufferSize", 8 * ONE_MB,
+          "Minimum size of a buffer (in bytes) to flush to disk.");
+
+  /** Class logger. */
+  private static final Logger LOG = Logger.getLogger(
+      DiskBackedDataStore.class);
+  /** Out-of-core engine */
+  protected final OutOfCoreEngine oocEngine;
+  /**
+   * Set containing ids of all partitions where the partition data is in some
+   * file on disk.
+   * Note that the out-of-core mechanism may decide to put the data for a
+   * partition on disk, while the partition data is empty. For instance, at the
+   * beginning of a superstep, out-of-core mechanism may decide to put incoming
+   * messages of a partition on disk, while the partition has not received any
+   * messages. In such scenarios, the "out-of-core mechanism" thinks that the
+   * partition data is on disk, while disk-backed data stores may want to
+   * optimize for IO/metadata accesses and decide not to create/write anything
+   * on files on disk.
+   * In summary, there is a subtle difference between this field and
+   * `hasPartitionOnDisk` field. Basically, this field is used for optimizing
+   * IO (mainly metadata) accesses by disk-backed stores, while
+   * `hasPartitionDataOnDisk` is the view that out-of-core mechanism has
+   * regarding partition storage statuses. Since out-of-core mechanism does not
+   * know about the actual data for a partition, these two fields have to be
+   * separate.
+   */
+  protected final Set<Integer> hasPartitionDataOnFile =
+      Sets.newConcurrentHashSet();
+  /** Cached value for MINIMUM_BUFFER_SIZE_TO_FLUSH */
+  private final int minBufferSizeToOffload;
+  /** Set containing ids of all out-of-core partitions */
+  private final Set<Integer> hasPartitionDataOnDisk =
+      Sets.newConcurrentHashSet();
+  /**
+   * Map of partition ids to list of raw data buffers. The map will have 
entries
+   * only for partitions that their in-memory data structures are currently
+   * offloaded to disk. We keep the aggregate size of buffers for each 
partition
+   * as part of the values in the map to estimate how much memory we can free 
up
+   * if we offload data buffers of a particular partition to disk.
+   */
+  private final ConcurrentMap<Integer, Pair<Integer, List<T>>> dataBuffers =
+      Maps.newConcurrentMap();
+  /**
+   * Map of partition ids to number of raw data buffers offloaded to disk for
+   * each partition. The map will have entries only for partitions that their
+   * in-memory data structures are currently out of core. It is necessary to
+   * know the number of data buffers on disk for a particular partition when we
+   * are loading all these buffers back in memory.
+   */
+  private final ConcurrentMap<Integer, Integer> numDataBuffersOnDisk =
+      Maps.newConcurrentMap();
+  /**
+   * Lock to avoid overlapping of read and write on data associated with each
+   * partition.
+   * */
+  private final ConcurrentMap<Integer, ReadWriteLock> locks =
+      Maps.newConcurrentMap();
+
+  /**
+   * Constructor.
+   *
+   * @param conf Configuration
+   * @param oocEngine Out-of-core engine
+   */
+  DiskBackedDataStore(ImmutableClassesGiraphConfiguration conf,
+                      OutOfCoreEngine oocEngine) {
+    this.minBufferSizeToOffload = MINIMUM_BUFFER_SIZE_TO_FLUSH.get(conf);
+    this.oocEngine = oocEngine;
+  }
+
+  /**
+   * Retrieves a lock for a given partition. If the lock for the given 
partition
+   * does not exist, creates a new lock.
+   *
+   * @param partitionId id of the partition the lock is needed for
+   * @return lock for a given partition
+   */
+  private ReadWriteLock getPartitionLock(int partitionId) {
+    ReadWriteLock readWriteLock = locks.get(partitionId);
+    if (readWriteLock == null) {
+      readWriteLock = new ReentrantReadWriteLock();
+      ReadWriteLock temp = locks.putIfAbsent(partitionId, readWriteLock);
+      if (temp != null) {
+        readWriteLock = temp;
+      }
+    }
+    return readWriteLock;
+  }
+
+  /**
+   * Adds a data entry for a given partition to the current data store. If data
+   * of a given partition in data store is already offloaded to disk, adds the
+   * data entry to appropriate raw data buffer list.
+   *
+   * @param partitionId id of the partition to add the data entry to
+   * @param entry data entry to add
+   */
+  protected void addEntry(int partitionId, T entry) {
+    // Addition of data entries to a data store is much more common than
+    // out-of-core operations. Besides, in-memory data store implementations
+    // existing in the code base already account for parallel addition to data
+    // stores. Therefore, using read lock would optimize for parallel addition
+    // to data stores, specially for cases where the addition should happen for
+    // partitions that are entirely in memory.
+    ReadWriteLock rwLock = getPartitionLock(partitionId);
+    rwLock.readLock().lock();
+    if (hasPartitionDataOnDisk.contains(partitionId)) {
+      List<T> entryList = new ArrayList<>();
+      entryList.add(entry);
+      int entrySize = entrySerializedSize(entry);
+      MutablePair<Integer, List<T>> newPair =
+          new MutablePair<>(entrySize, entryList);
+      Pair<Integer, List<T>> oldPair =
+          dataBuffers.putIfAbsent(partitionId, newPair);
+      if (oldPair != null) {
+        synchronized (oldPair) {
+          newPair = (MutablePair<Integer, List<T>>) oldPair;
+          newPair.setLeft(oldPair.getLeft() + entrySize);
+          newPair.getRight().add(entry);
+        }
+      }
+    } else {
+      addEntryToInMemoryPartitionData(partitionId, entry);
+    }
+    rwLock.readLock().unlock();
+  }
+
+  /**
+   * Loads and assembles all data for a given partition, and put it into the
+   * data store. Returns the number of bytes transferred from disk to memory in
+   * the loading process.
+   *
+   * @param partitionId id of the partition to load and assemble all data for
+   * @return number of bytes loaded from disk to memory
+   * @throws IOException
+   */
+  public abstract long loadPartitionData(int partitionId) throws IOException;
+
+  /**
+   * The proxy method that does the actual operation for `loadPartitionData`,
+   * but uses the data index given by the caller.
+   *
+   * @param partitionId id of the partition to load and assemble all data for
+   * @param index data index chain for the data to load
+   * @return number of bytes loaded from disk to memory
+   * @throws IOException
+   */
+  protected long loadPartitionDataProxy(int partitionId, DataIndex index)
+      throws IOException {
+    long numBytes = 0;
+    ReadWriteLock rwLock = getPartitionLock(partitionId);
+    rwLock.writeLock().lock();
+    if (hasPartitionDataOnDisk.contains(partitionId)) {
+      int ioThreadId =
+          oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId);
+      numBytes += loadInMemoryPartitionData(partitionId, ioThreadId,
+          index.addIndex(NumericIndexEntry.createPartitionEntry(partitionId)));
+      hasPartitionDataOnDisk.remove(partitionId);
+      // Loading raw data buffers from disk if there is any and applying those
+      // to already loaded in-memory data.
+      Integer numBuffers = numDataBuffersOnDisk.remove(partitionId);
+      if (numBuffers != null) {
+        checkState(numBuffers > 0);
+        index.addIndex(DataIndex.TypeIndexEntry.BUFFER);
+        OutOfCoreDataAccessor.DataInputWrapper inputWrapper =
+            oocEngine.getDataAccessor().prepareInput(ioThreadId, index.copy());
+        for (int i = 0; i < numBuffers; ++i) {
+          T entry = readNextEntry(inputWrapper.getDataInput());
+          addEntryToInMemoryPartitionData(partitionId, entry);
+        }
+        numBytes += inputWrapper.finalizeInput(true);
+        index.removeLastIndex();
+      }
+      index.removeLastIndex();
+      // Applying in-memory raw data buffers to in-memory partition data.
+      Pair<Integer, List<T>> pair = dataBuffers.remove(partitionId);
+      if (pair != null) {
+        for (T entry : pair.getValue()) {
+          addEntryToInMemoryPartitionData(partitionId, entry);
+        }
+      }
+    }
+    rwLock.writeLock().unlock();
+    return numBytes;
+  }
+
+  /**
+   * Offloads partition data of a given partition in the data store to disk, 
and
+   * returns the number of bytes offloaded from memory to disk.
+   *
+   * @param partitionId id of the partition to offload its data
+   * @return number of bytes offloaded from memory to disk
+   * @throws IOException
+   */
+  public abstract long offloadPartitionData(int partitionId) throws 
IOException;
+
+  /**
+   * The proxy method that does the actual operation for 
`offloadPartitionData`,
+   * but uses the data index given by the caller.
+   *
+   * @param partitionId id of the partition to offload its data
+   * @param index data index chain for the data to offload
+   * @return number of bytes offloaded from memory to disk
+   * @throws IOException
+   */
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+      "UL_UNRELEASED_LOCK_EXCEPTION_PATH")
+  protected long offloadPartitionDataProxy(
+      int partitionId, DataIndex index) throws IOException {
+    ReadWriteLock rwLock = getPartitionLock(partitionId);
+    rwLock.writeLock().lock();
+    hasPartitionDataOnDisk.add(partitionId);
+    rwLock.writeLock().unlock();
+    int ioThreadId =
+        oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId);
+    long numBytes = offloadInMemoryPartitionData(partitionId, ioThreadId,
+        index.addIndex(NumericIndexEntry.createPartitionEntry(partitionId)));
+    index.removeLastIndex();
+    return numBytes;
+  }
+
+  /**
+   * Offloads raw data buffers of a given partition to disk, and returns the
+   * number of bytes offloaded from memory to disk.
+   *
+   * @param partitionId id of the partition to offload its raw data buffers
+   * @return number of bytes offloaded from memory to disk
+   * @throws IOException
+   */
+  public abstract long offloadBuffers(int partitionId) throws IOException;
+
+  /**
+   * The proxy method that does the actual operation for `offloadBuffers`,
+   * but uses the data index given by the caller.
+   *
+   * @param partitionId id of the partition to offload its raw data buffers
+   * @param index data index chain for the data to offload its buffers
+   * @return number of bytes offloaded from memory to disk
+   * @throws IOException
+   */
+  protected long offloadBuffersProxy(int partitionId, DataIndex index)
+      throws IOException {
+    Pair<Integer, List<T>> pair = dataBuffers.get(partitionId);
+    if (pair == null || pair.getLeft() < minBufferSizeToOffload) {
+      return 0;
+    }
+    ReadWriteLock rwLock = getPartitionLock(partitionId);
+    rwLock.writeLock().lock();
+    pair = dataBuffers.remove(partitionId);
+    rwLock.writeLock().unlock();
+    checkNotNull(pair);
+    checkState(!pair.getRight().isEmpty());
+    int ioThreadId =
+        oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId);
+    index.addIndex(NumericIndexEntry.createPartitionEntry(partitionId))
+        .addIndex(DataIndex.TypeIndexEntry.BUFFER);
+    OutOfCoreDataAccessor.DataOutputWrapper outputWrapper =
+        oocEngine.getDataAccessor().prepareOutput(ioThreadId, index.copy(),
+            true);
+    for (T entry : pair.getRight()) {
+      writeEntry(entry, outputWrapper.getDataOutput());
+    }
+    long numBytes = outputWrapper.finalizeOutput();
+    index.removeLastIndex().removeLastIndex();
+    int numBuffers = pair.getRight().size();
+    Integer oldNumBuffersOnDisk =
+        numDataBuffersOnDisk.putIfAbsent(partitionId, numBuffers);
+    if (oldNumBuffersOnDisk != null) {
+      numDataBuffersOnDisk.replace(partitionId,
+          oldNumBuffersOnDisk + numBuffers);
+    }
+    return numBytes;
+  }
+
+  /**
+   * Looks through all partitions that their data is not in the data store (is
+   * offloaded to disk), and sees if any of them has enough raw data buffer in
+   * memory. If so, puts that partition in a list to return.
+   *
+   * @return Set of partition ids of all partition raw buffers where the
+   *         aggregate size of buffers are large enough and it is worth 
flushing
+   *         those buffers to disk
+   */
+  public Set<Integer> getCandidateBuffersToOffload() {
+    Set<Integer> result = new HashSet<>();
+    for (Map.Entry<Integer, Pair<Integer, List<T>>> entry :
+        dataBuffers.entrySet()) {
+      if (entry.getValue().getLeft() > minBufferSizeToOffload) {
+        result.add(entry.getKey());
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Writes a single raw entry to a given output stream.
+   *
+   * @param entry entry to write to output
+   * @param out output stream to write the entry to
+   * @throws IOException
+   */
+  protected abstract void writeEntry(T entry, DataOutput out)
+      throws IOException;
+
+  /**
+   * Reads the next available raw entry from a given input stream.
+   *
+   * @param in input stream to read the entry from
+   * @return entry read from an input stream
+   * @throws IOException
+   */
+  protected abstract T readNextEntry(DataInput in) throws IOException;
+
+  /**
+   * Loads data of a partition into data store. Returns number of bytes loaded.
+   *
+   * @param partitionId id of the partition to load its data
+   * @param ioThreadId id of the IO thread performing the load
+   * @param index data index chain for the data to load
+   * @return number of bytes loaded from disk to memory
+   * @throws IOException
+   */
+  protected abstract long loadInMemoryPartitionData(
+      int partitionId, int ioThreadId, DataIndex index) throws IOException;
+
+  /**
+   * Offloads data of a partition in data store to disk. Returns the number of
+   * bytes offloaded to disk
+   *
+   * @param partitionId id of the partition to offload to disk
+   * @param ioThreadId id of the IO thread performing the offload
+   * @param index data index chain for the data to offload
+   * @return number of bytes offloaded from memory to disk
+   * @throws IOException
+   */
+  protected abstract long offloadInMemoryPartitionData(
+      int partitionId, int ioThreadId, DataIndex index) throws IOException;
+
+  /**
+   * Gets the size of a given entry in bytes.
+   *
+   * @param entry input entry to find its size
+   * @return size of given input entry in bytes
+   */
+  protected abstract int entrySerializedSize(T entry);
+
+  /**
+   * Adds a single entry for a given partition to the in-memory data store.
+   *
+   * @param partitionId id of the partition to add the data to
+   * @param entry input entry to add to the data store
+   */
+  protected abstract void addEntryToInMemoryPartitionData(int partitionId,
+                                                          T entry);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedEdgeStore.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedEdgeStore.java 
b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedEdgeStore.java
index 53de52f..e727fbd 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedEdgeStore.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedEdgeStore.java
@@ -21,25 +21,18 @@ package org.apache.giraph.ooc.data;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.EdgeStore;
 import org.apache.giraph.ooc.OutOfCoreEngine;
+import org.apache.giraph.ooc.persistence.DataIndex;
+import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
 import org.apache.giraph.utils.ByteArrayVertexIdEdges;
 import org.apache.giraph.utils.VertexIdEdges;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
 
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
 import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 
-import static com.google.common.base.Preconditions.checkState;
-
 /**
  * Implementation of an edge-store used for out-of-core mechanism.
  *
@@ -49,7 +42,7 @@ import static com.google.common.base.Preconditions.checkState;
  */
 public class DiskBackedEdgeStore<I extends WritableComparable,
     V extends Writable, E extends Writable>
-    extends OutOfCoreDataManager<VertexIdEdges<I, E>>
+    extends DiskBackedDataStore<VertexIdEdges<I, E>>
     implements EdgeStore<I, V, E> {
   /** Class logger. */
   private static final Logger LOG = 
Logger.getLogger(DiskBackedEdgeStore.class);
@@ -57,8 +50,6 @@ public class DiskBackedEdgeStore<I extends WritableComparable,
   private final EdgeStore<I, V, E> edgeStore;
   /** Configuration */
   private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
-  /** Out-of-core engine */
-  private final OutOfCoreEngine oocEngine;
 
   /**
    * Constructor
@@ -72,10 +63,9 @@ public class DiskBackedEdgeStore<I extends 
WritableComparable,
       EdgeStore<I, V, E> edgeStore,
       ImmutableClassesGiraphConfiguration<I, V, E> conf,
       OutOfCoreEngine oocEngine) {
-    super(conf);
+    super(conf, oocEngine);
     this.edgeStore = edgeStore;
     this.conf = conf;
-    this.oocEngine = oocEngine;
   }
 
   @Override
@@ -114,32 +104,25 @@ public class DiskBackedEdgeStore<I extends 
WritableComparable,
         "should not be called for DiskBackedEdgeStore!");
   }
 
-  /**
-   * Gets the path that should be used specifically for edge data.
-   *
-   * @param basePath path prefix to build the actual path from
-   * @return path to files specific for edge data
-   */
-  private static String getPath(String basePath) {
-    return basePath + "_edge_store";
-  }
-
   @Override
-  public long loadPartitionData(int partitionId, String basePath)
+  public long loadPartitionData(int partitionId)
       throws IOException {
-    return super.loadPartitionData(partitionId, getPath(basePath));
+    return loadPartitionDataProxy(partitionId,
+        new DataIndex().addIndex(DataIndex.TypeIndexEntry.EDGE_STORE));
   }
 
   @Override
-  public long offloadPartitionData(int partitionId, String basePath)
+  public long offloadPartitionData(int partitionId)
       throws IOException {
-    return super.offloadPartitionData(partitionId, getPath(basePath));
+    return offloadPartitionDataProxy(partitionId,
+        new DataIndex().addIndex(DataIndex.TypeIndexEntry.EDGE_STORE));
   }
 
   @Override
-  public long offloadBuffers(int partitionId, String basePath)
+  public long offloadBuffers(int partitionId)
       throws IOException {
-    return super.offloadBuffers(partitionId, getPath(basePath));
+    return offloadBuffersProxy(partitionId,
+        new DataIndex().addIndex(DataIndex.TypeIndexEntry.EDGE_STORE));
   }
 
   @Override
@@ -157,44 +140,31 @@ public class DiskBackedEdgeStore<I extends 
WritableComparable,
   }
 
   @Override
-  protected long loadInMemoryPartitionData(int partitionId, String path)
-      throws IOException {
+  protected long loadInMemoryPartitionData(
+      int partitionId, int ioThreadId, DataIndex index) throws IOException {
     long numBytes = 0;
-    File file = new File(path);
-    if (file.exists()) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("loadInMemoryPartitionData: loading edge data for " +
-            "partition " + partitionId + " from " + file.getAbsolutePath());
-      }
-      FileInputStream fis = new FileInputStream(file);
-      BufferedInputStream bis = new BufferedInputStream(fis);
-      DataInputStream dis = new DataInputStream(bis);
-      edgeStore.readPartitionEdgeStore(partitionId, dis);
-      dis.close();
-      numBytes = file.length();
-      checkState(file.delete(), "loadInMemoryPartitionData: failed to delete " +
-          "%s.", file.getAbsoluteFile());
+    if (hasPartitionDataOnFile.remove(partitionId)) {
+      OutOfCoreDataAccessor.DataInputWrapper inputWrapper =
+          oocEngine.getDataAccessor().prepareInput(ioThreadId, index.copy());
+      edgeStore.readPartitionEdgeStore(partitionId,
+          inputWrapper.getDataInput());
+      numBytes = inputWrapper.finalizeInput(true);
     }
     return numBytes;
   }
 
   @Override
-  protected long offloadInMemoryPartitionData(int partitionId, String path)
-      throws IOException {
+  protected long offloadInMemoryPartitionData(
+      int partitionId, int ioThreadId, DataIndex index) throws IOException {
     long numBytes = 0;
     if (edgeStore.hasEdgesForPartition(partitionId)) {
-      File file = new File(path);
-      checkState(!file.exists(), "offloadInMemoryPartitionData: edge store " +
-          "file %s already exist", file.getAbsoluteFile());
-      checkState(file.createNewFile(),
-          "offloadInMemoryPartitionData: cannot create edge store file %s",
-          file.getAbsoluteFile());
-      FileOutputStream fos = new FileOutputStream(file);
-      BufferedOutputStream bos = new BufferedOutputStream(fos);
-      DataOutputStream dos = new DataOutputStream(bos);
-      edgeStore.writePartitionEdgeStore(partitionId, dos);
-      dos.close();
-      numBytes = dos.size();
+      OutOfCoreDataAccessor.DataOutputWrapper outputWrapper =
+          oocEngine.getDataAccessor().prepareOutput(ioThreadId, index.copy(),
+              false);
+      edgeStore.writePartitionEdgeStore(partitionId,
+          outputWrapper.getDataOutput());
+      numBytes = outputWrapper.finalizeOutput();
+      hasPartitionDataOnFile.add(partitionId);
     }
     return numBytes;
   }
@@ -205,7 +175,7 @@ public class DiskBackedEdgeStore<I extends 
WritableComparable,
   }
 
   @Override
-  protected void addEntryToImMemoryPartitionData(int partitionId,
+  protected void addEntryToInMemoryPartitionData(int partitionId,
                                                  VertexIdEdges<I, E> edges) {
     oocEngine.getMetaPartitionManager().addPartition(partitionId);
     edgeStore.addPartitionEdges(partitionId, edges);

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java
 
b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java
index 94ba83a..c8d0f79 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java
@@ -21,6 +21,10 @@ package org.apache.giraph.ooc.data;
 import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.ooc.OutOfCoreEngine;
+import org.apache.giraph.ooc.persistence.DataIndex;
+import org.apache.giraph.ooc.persistence.DataIndex.NumericIndexEntry;
+import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
 import org.apache.giraph.utils.ByteArrayOneMessageToManyIds;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.VertexIdMessages;
@@ -28,19 +32,10 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
 
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
 import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 
-import static com.google.common.base.Preconditions.checkState;
-
 /**
  * Implementation of a message store used for out-of-core mechanism.
  *
@@ -48,7 +43,7 @@ import static com.google.common.base.Preconditions.checkState;
  * @param <M> Message data
  */
 public class DiskBackedMessageStore<I extends WritableComparable,
-    M extends Writable> extends OutOfCoreDataManager<VertexIdMessages<I, M>>
+    M extends Writable> extends DiskBackedDataStore<VertexIdMessages<I, M>>
     implements MessageStore<I, M> {
   /** Class logger. */
   private static final Logger LOG =
@@ -82,6 +77,7 @@ public class DiskBackedMessageStore<I extends 
WritableComparable,
    * Constructor
    *
    * @param config Configuration
+   * @param oocEngine Out-of-core engine
    * @param messageStore In-memory message store for which out-of-core message
    *                     store would be wrapper
    * @param useMessageCombiner Whether message combiner is used for this 
message
@@ -90,9 +86,10 @@ public class DiskBackedMessageStore<I extends 
WritableComparable,
    */
   public DiskBackedMessageStore(ImmutableClassesGiraphConfiguration<I, ?, ?>
                                     config,
+                                OutOfCoreEngine oocEngine,
                                 MessageStore<I, M> messageStore,
                                 boolean useMessageCombiner, long superstep) {
-    super(config);
+    super(config, oocEngine);
     this.config = config;
     this.messageStore = messageStore;
     this.useMessageCombiner = useMessageCombiner;
@@ -140,43 +137,38 @@ public class DiskBackedMessageStore<I extends 
WritableComparable,
     }
   }
 
-  /**
-   * Gets the path that should be used specifically for message data.
-   *
-   * @param basePath path prefix to build the actual path from
-   * @param superstep superstep for which message data should be stored
-   * @return path to files specific for message data
-   */
-  private static String getPath(String basePath, long superstep) {
-    return basePath + "_messages-S" + superstep;
-  }
 
   @Override
-  public long loadPartitionData(int partitionId, String basePath)
+  public long loadPartitionData(int partitionId)
       throws IOException {
     if (!useMessageCombiner) {
-      return super.loadPartitionData(partitionId, getPath(basePath, superstep));
+      return loadPartitionDataProxy(partitionId,
+          new DataIndex().addIndex(DataIndex.TypeIndexEntry.MESSAGE)
+              .addIndex(NumericIndexEntry.createSuperstepEntry(superstep)));
     } else {
       return 0;
     }
   }
 
   @Override
-  public long offloadPartitionData(int partitionId, String basePath)
+  public long offloadPartitionData(int partitionId)
       throws IOException {
     if (!useMessageCombiner) {
-      return
-          super.offloadPartitionData(partitionId, getPath(basePath, superstep));
+      return offloadPartitionDataProxy(partitionId,
+          new DataIndex().addIndex(DataIndex.TypeIndexEntry.MESSAGE)
+              .addIndex(NumericIndexEntry.createSuperstepEntry(superstep)));
     } else {
       return 0;
     }
   }
 
   @Override
-  public long offloadBuffers(int partitionId, String basePath)
+  public long offloadBuffers(int partitionId)
       throws IOException {
     if (!useMessageCombiner) {
-      return super.offloadBuffers(partitionId, getPath(basePath, superstep));
+      return offloadBuffersProxy(partitionId,
+          new DataIndex().addIndex(DataIndex.TypeIndexEntry.MESSAGE)
+              .addIndex(NumericIndexEntry.createSuperstepEntry(superstep)));
     } else {
       return 0;
     }
@@ -250,45 +242,31 @@ public class DiskBackedMessageStore<I extends 
WritableComparable,
   }
 
   @Override
-  protected long loadInMemoryPartitionData(int partitionId, String basePath)
-      throws IOException {
+  protected long loadInMemoryPartitionData(int partitionId, int ioThreadId,
+                                           DataIndex index) throws IOException {
     long numBytes = 0;
-    File file = new File(basePath);
-    if (file.exists()) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("loadInMemoryPartitionData: loading message data for " +
-            "partition " + partitionId + " from " + file.getAbsolutePath());
-      }
-      FileInputStream fis = new FileInputStream(file);
-      BufferedInputStream bis = new BufferedInputStream(fis);
-      DataInputStream dis = new DataInputStream(bis);
-      messageStore.readFieldsForPartition(dis, partitionId);
-      dis.close();
-      numBytes = file.length();
-      checkState(file.delete(), "loadInMemoryPartitionData: failed to delete " +
-          "%s.", file.getAbsoluteFile());
+    if (hasPartitionDataOnFile.remove(partitionId)) {
+      OutOfCoreDataAccessor.DataInputWrapper inputWrapper =
+          oocEngine.getDataAccessor().prepareInput(ioThreadId, index.copy());
+      messageStore.readFieldsForPartition(inputWrapper.getDataInput(),
+          partitionId);
+      numBytes = inputWrapper.finalizeInput(true);
     }
     return numBytes;
   }
 
   @Override
-  protected long offloadInMemoryPartitionData(int partitionId, String basePath)
-      throws IOException {
+  protected long offloadInMemoryPartitionData(
+      int partitionId, int ioThreadId, DataIndex index) throws IOException {
     long numBytes = 0;
     if (messageStore.hasMessagesForPartition(partitionId)) {
-      File file = new File(basePath);
-      checkState(!file.exists(), "offloadInMemoryPartitionData: message store" 
+
-          " file %s already exist", file.getAbsoluteFile());
-      checkState(file.createNewFile(),
-          "offloadInMemoryPartitionData: cannot create message store file %s",
-          file.getAbsoluteFile());
-      FileOutputStream fileout = new FileOutputStream(file);
-      BufferedOutputStream bufferout = new BufferedOutputStream(fileout);
-      DataOutputStream outputStream = new DataOutputStream(bufferout);
-      messageStore.writePartition(outputStream, partitionId);
+      OutOfCoreDataAccessor.DataOutputWrapper outputWrapper =
+          oocEngine.getDataAccessor().prepareOutput(ioThreadId, index.copy(),
+              false);
+      messageStore.writePartition(outputWrapper.getDataOutput(), partitionId);
       messageStore.clearPartition(partitionId);
-      outputStream.close();
-      numBytes += outputStream.size();
+      numBytes = outputWrapper.finalizeOutput();
+      hasPartitionDataOnFile.add(partitionId);
     }
     return numBytes;
   }
@@ -299,7 +277,7 @@ public class DiskBackedMessageStore<I extends 
WritableComparable,
   }
 
   @Override
-  protected void addEntryToImMemoryPartitionData(int partitionId,
+  protected void addEntryToInMemoryPartitionData(int partitionId,
                                                  VertexIdMessages<I, M>
                                                      messages) {
     messageStore.addPartitionMessages(partitionId, messages);

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java
 
b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java
index 2a5e47a..6b7822f 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java
@@ -20,11 +20,12 @@ package org.apache.giraph.ooc.data;
 
 import com.google.common.collect.Maps;
 import org.apache.giraph.bsp.BspService;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.ooc.OutOfCoreEngine;
+import org.apache.giraph.ooc.persistence.DataIndex;
+import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
 import org.apache.giraph.partition.Partition;
 import org.apache.giraph.partition.PartitionStore;
 import org.apache.giraph.utils.ExtendedDataOutput;
@@ -35,25 +36,17 @@ import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
 
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
 import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.Map;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
 
 /**
  * Implementation of a partition-store used for out-of-core mechanism.
  * Partition store is responsible for partition data, as well as data buffers 
in
- * INPUT_SUPERSTEP ("raw data buffer" -- defined in OutOfCoreDataManager --
+ * INPUT_SUPERSTEP ("raw data buffer" -- defined in DiskBackedDataStore --
  * refers to vertex buffers in INPUT_SUPERSTEP).
  *
  * @param <I> Vertex id
@@ -62,7 +55,7 @@ import static com.google.common.base.Preconditions.checkState;
  */
 public class DiskBackedPartitionStore<I extends WritableComparable,
     V extends Writable, E extends Writable>
-    extends OutOfCoreDataManager<ExtendedDataOutput>
+    extends DiskBackedDataStore<ExtendedDataOutput>
     implements PartitionStore<I, V, E> {
   /** Class logger. */
   private static final Logger LOG =
@@ -71,10 +64,6 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
   private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
   /** Job context (for progress) */
   private final Mapper<?, ?, ?, ?>.Context context;
-  /** Service worker */
-  private final CentralizedServiceWorker<I, V, E> serviceWorker;
-  /** Out-of-core engine */
-  private final OutOfCoreEngine oocEngine;
   /** In-memory partition store */
   private final PartitionStore<I, V, E> partitionStore;
   /**
@@ -99,21 +88,17 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
    *                       partition store would be a wrapper
    * @param conf Configuration
    * @param context Job context
-   * @param serviceWorker Service worker
    * @param oocEngine Out-of-core engine
    */
   public DiskBackedPartitionStore(
       PartitionStore<I, V, E> partitionStore,
       ImmutableClassesGiraphConfiguration<I, V, E> conf,
       Mapper<?, ?, ?, ?>.Context context,
-      CentralizedServiceWorker<I, V, E> serviceWorker,
       OutOfCoreEngine oocEngine) {
-    super(conf);
+    super(conf, oocEngine);
     this.partitionStore = partitionStore;
     this.conf = conf;
     this.context = context;
-    this.serviceWorker = serviceWorker;
-    this.oocEngine = oocEngine;
   }
 
   @Override
@@ -222,36 +207,6 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
   }
 
   /**
-   * Gets the path that should be used specifically for partition data.
-   *
-   * @param basePath path prefix to build the actual path from
-   * @return path to files specific for partition data
-   */
-  private static String getPath(String basePath) {
-    return basePath + "_partition";
-  }
-
-  /**
-   * Get the path to the file where vertices are stored.
-   *
-   * @param basePath path prefix to build the actual path from
-   * @return The path to the vertices file
-   */
-  private static String getVerticesPath(String basePath) {
-    return basePath + "_vertices";
-  }
-
-  /**
-   * Get the path to the file where edges are stored.
-   *
-   * @param basePath path prefix to build the actual path from
-   * @return The path to the edges file
-   */
-  private static String getEdgesPath(String basePath) {
-    return basePath + "_edges";
-  }
-
-  /**
    * Read vertex data from an input and initialize the vertex.
    *
    * @param in     The input stream
@@ -295,54 +250,42 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
   }
 
   @Override
-  protected long loadInMemoryPartitionData(int partitionId, String path)
-      throws IOException {
+  protected long loadInMemoryPartitionData(int partitionId, int ioThreadId,
+                                           DataIndex index) throws IOException {
     long numBytes = 0;
     // Load vertices
-    File file = new File(getVerticesPath(path));
-    if (file.exists()) {
+    if (hasPartitionDataOnFile.remove(partitionId)) {
       Partition<I, V, E> partition = conf.createPartition(partitionId, 
context);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("loadInMemoryPartitionData: loading partition vertices " +
-            partitionId + " from " + file.getAbsolutePath());
-      }
-
-      FileInputStream fis = new FileInputStream(file);
-      BufferedInputStream bis = new BufferedInputStream(fis);
-      DataInputStream inputStream = new DataInputStream(bis);
-      long numVertices = inputStream.readLong();
+      OutOfCoreDataAccessor dataAccessor = oocEngine.getDataAccessor();
+      index.addIndex(DataIndex.TypeIndexEntry.PARTITION_VERTICES);
+      OutOfCoreDataAccessor.DataInputWrapper inputWrapper =
+          dataAccessor.prepareInput(ioThreadId, index.copy());
+      DataInput dataInput = inputWrapper.getDataInput();
+      long numVertices = dataInput.readLong();
       for (long i = 0; i < numVertices; ++i) {
         Vertex<I, V, E> vertex = conf.createVertex();
-        readVertexData(inputStream, vertex);
+        readVertexData(dataInput, vertex);
         partition.putVertex(vertex);
       }
-      inputStream.close();
-      numBytes += file.length();
-      checkState(file.delete(), "loadInMemoryPartitionData: failed to delete " +
-          "%s", file.getAbsolutePath());
+      numBytes += inputWrapper.finalizeInput(true);
 
       // Load edges
-      file = new File(getEdgesPath(path));
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("loadInMemoryPartitionData: loading partition edges " +
-            partitionId + " from " + file.getAbsolutePath());
-      }
-
-      fis = new FileInputStream(file);
-      bis = new BufferedInputStream(fis);
-      inputStream = new DataInputStream(bis);
+      index.removeLastIndex()
+          .addIndex(DataIndex.TypeIndexEntry.PARTITION_EDGES);
+      inputWrapper = dataAccessor.prepareInput(ioThreadId, index.copy());
+      dataInput = inputWrapper.getDataInput();
       for (int i = 0; i < numVertices; ++i) {
-        readOutEdges(inputStream, partition);
+        readOutEdges(dataInput, partition);
       }
-      inputStream.close();
-      numBytes += file.length();
       // If the graph is static and it is not INPUT_SUPERSTEP, keep the file
       // around.
+      boolean shouldDeleteEdges = false;
       if (!conf.isStaticGraph() ||
           oocEngine.getSuperstep() == BspService.INPUT_SUPERSTEP) {
-        checkState(file.delete(), "loadPartition: failed to delete %s",
-            file.getAbsolutePath());
+        shouldDeleteEdges = true;
       }
+      numBytes += inputWrapper.finalizeInput(shouldDeleteEdges);
+      index.removeLastIndex();
       partitionStore.addPartition(partition);
     }
     return numBytes;
@@ -354,7 +297,7 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
   }
 
   @Override
-  protected void addEntryToImMemoryPartitionData(int partitionId,
+  protected void addEntryToInMemoryPartitionData(int partitionId,
                                                  ExtendedDataOutput vertices) {
     if (!partitionStore.hasPartition(partitionId)) {
       oocEngine.getMetaPartitionManager().addPartition(partitionId);
@@ -363,15 +306,17 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
   }
 
   @Override
-  public long loadPartitionData(int partitionId, String basePath)
+  public long loadPartitionData(int partitionId)
       throws IOException {
-    return super.loadPartitionData(partitionId, getPath(basePath));
+    return loadPartitionDataProxy(partitionId,
+        new DataIndex().addIndex(DataIndex.TypeIndexEntry.PARTITION));
   }
 
   @Override
-  public long offloadPartitionData(int partitionId, String basePath)
+  public long offloadPartitionData(int partitionId)
       throws IOException {
-    return super.offloadPartitionData(partitionId, getPath(basePath));
+    return offloadPartitionDataProxy(partitionId,
+        new DataIndex().addIndex(DataIndex.TypeIndexEntry.PARTITION));
   }
 
   /**
@@ -409,61 +354,44 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
   }
 
   @Override
-  protected long offloadInMemoryPartitionData(int partitionId, String path)
-      throws IOException {
+  protected long offloadInMemoryPartitionData(
+      int partitionId, int ioThreadId, DataIndex index) throws IOException {
     long numBytes = 0;
     if (partitionStore.hasPartition(partitionId)) {
+      OutOfCoreDataAccessor dataAccessor = oocEngine.getDataAccessor();
       partitionVertexCount.put(partitionId,
           partitionStore.getPartitionVertexCount(partitionId));
       partitionEdgeCount.put(partitionId,
           partitionStore.getPartitionEdgeCount(partitionId));
       Partition<I, V, E> partition =
           partitionStore.removePartition(partitionId);
-      File file = new File(getVerticesPath(path));
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("offloadInMemoryPartitionData: writing partition vertices " +
-            partitionId + " to " + file.getAbsolutePath());
-      }
-      checkState(!file.exists(), "offloadInMemoryPartitionData: partition " +
-          "store file %s already exist", file.getAbsoluteFile());
-      checkState(file.createNewFile(),
-          "offloadInMemoryPartitionData: file %s already exists.",
-          file.getAbsolutePath());
-
-      FileOutputStream fileout = new FileOutputStream(file);
-      BufferedOutputStream bufferout = new BufferedOutputStream(fileout);
-      DataOutputStream outputStream = new DataOutputStream(bufferout);
-      outputStream.writeLong(partition.getVertexCount());
+      index.addIndex(DataIndex.TypeIndexEntry.PARTITION_VERTICES);
+      OutOfCoreDataAccessor.DataOutputWrapper outputWrapper =
+          dataAccessor.prepareOutput(ioThreadId, index.copy(), false);
+      DataOutput dataOutput = outputWrapper.getDataOutput();
+      dataOutput.writeLong(partition.getVertexCount());
       for (Vertex<I, V, E> vertex : partition) {
-        writeVertexData(outputStream, vertex);
+        writeVertexData(dataOutput, vertex);
       }
-      outputStream.close();
-      numBytes += outputStream.size();
-
+      numBytes += outputWrapper.finalizeOutput();
+      index.removeLastIndex();
       // Avoid writing back edges if we have already written them once and
       // the graph is not changing.
       // If we are in the input superstep, we need to write the files
       // at least the first time, even though the graph is static.
-      file = new File(getEdgesPath(path));
+      index.addIndex(DataIndex.TypeIndexEntry.PARTITION_EDGES);
       if (oocEngine.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP ||
-          partitionVertexCount.get(partitionId) == null ||
-          partitionVertexCount.get(partitionId) != partition.getVertexCount() 
||
-          !conf.isStaticGraph() || !file.exists()) {
-        checkState(file.createNewFile(), "offloadInMemoryPartitionData: file " 
+
-            "%s already exists.", file.getAbsolutePath());
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("offloadInMemoryPartitionData: writing partition edges " +
-              partitionId + " to " + file.getAbsolutePath());
-        }
-        fileout = new FileOutputStream(file);
-        bufferout = new BufferedOutputStream(fileout);
-        outputStream = new DataOutputStream(bufferout);
+          !conf.isStaticGraph() ||
+          !dataAccessor.dataExist(ioThreadId, index)) {
+        outputWrapper = dataAccessor.prepareOutput(ioThreadId, index.copy(),
+            false);
         for (Vertex<I, V, E> vertex : partition) {
-          writeOutEdges(outputStream, vertex);
+          writeOutEdges(outputWrapper.getDataOutput(), vertex);
         }
-        outputStream.close();
-        numBytes += outputStream.size();
+        numBytes += outputWrapper.finalizeOutput();
       }
+      index.removeLastIndex();
+      hasPartitionDataOnFile.add(partitionId);
     }
     return numBytes;
   }
@@ -475,9 +403,10 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
   }
 
   @Override
-  public long offloadBuffers(int partitionId, String basePath)
+  public long offloadBuffers(int partitionId)
       throws IOException {
-    return super.offloadBuffers(partitionId, getPath(basePath));
+    return offloadBuffersProxy(partitionId,
+        new DataIndex().addIndex(DataIndex.TypeIndexEntry.PARTITION));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
 
b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
index 1332a3a..64e3aed 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
@@ -99,6 +99,21 @@ public class MetaPartitionManager {
    */
   private final AtomicDouble lowestGraphFractionInMemory =
       new AtomicDouble(1);
+  /**
+   * Map of partition ids to their indices. The index of a partition is the
+   * order in which the partition was inserted. Partitions are indexed as 0, 1,
+   * 2, etc. This indexing is later used to find the id of the IO thread that is
+   * responsible for handling a partition. Partitions are assigned to IO threads
+   * in a round-robin fashion based on their indices.
+   */
+  private final ConcurrentMap<Integer, Integer> partitionIndex =
+      Maps.newConcurrentMap();
+  /**
+   * Sequential counter used to assign indices to partitions as they are added
+   */
+  private final AtomicInteger indexCounter = new AtomicInteger(0);
+  /** How many disks (i.e. IO threads) do we have? */
+  private final int numIOThreads;
 
   /**
    * Constructor
@@ -117,6 +132,7 @@ public class MetaPartitionManager {
     }
     this.oocEngine = oocEngine;
     this.randomGenerator = new Random();
+    this.numIOThreads = numIOThreads;
   }
 
   /**
@@ -131,7 +147,7 @@ public class MetaPartitionManager {
   /**
    * Get total number of partitions
    *
-   * @return total number of partition
+   * @return total number of partitions
    */
   public int getNumPartitions() {
     return partitions.size();
@@ -175,6 +191,18 @@ public class MetaPartitionManager {
   }
 
   /**
+   * Get the thread id that is responsible for a particular partition
+   *
+   * @param partitionId id of the given partition
+   * @return id of the thread responsible for the given partition
+   */
+  public int getOwnerThreadId(int partitionId) {
+    Integer index = partitionIndex.get(partitionId);
+    checkState(index != null);
+    return index % numIOThreads;
+  }
+
+  /**
    * Add a partition
    *
    * @param partitionId id of a partition to add
@@ -184,8 +212,9 @@ public class MetaPartitionManager {
     MetaPartition temp = partitions.putIfAbsent(partitionId, meta);
     // Check if the given partition is new
     if (temp == null) {
-      int ownerThread = oocEngine.getIOScheduler()
-          .getOwnerThreadId(partitionId);
+      int index = indexCounter.getAndIncrement();
+      checkState(partitionIndex.putIfAbsent(partitionId, index) == null);
+      int ownerThread = getOwnerThreadId(partitionId);
       perThreadPartitionDictionary.get(ownerThread).addPartition(meta);
       numInMemoryPartitions.getAndIncrement();
     }
@@ -199,7 +228,7 @@ public class MetaPartitionManager {
    */
   public void removePartition(Integer partitionId) {
     MetaPartition meta = partitions.remove(partitionId);
-    int ownerThread = oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
+    int ownerThread = getOwnerThreadId(partitionId);
     perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
     checkState(!meta.isOnDisk());
     numInMemoryPartitions.getAndDecrement();
@@ -424,7 +453,7 @@ public class MetaPartitionManager {
    */
   public void markPartitionAsInProcess(int partitionId) {
     MetaPartition meta = partitions.get(partitionId);
-    int ownerThread = oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
+    int ownerThread = getOwnerThreadId(partitionId);
     synchronized (meta) {
       perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
       meta.setProcessingState(ProcessingState.IN_PROCESS);
@@ -468,7 +497,7 @@ public class MetaPartitionManager {
    */
   public void setPartitionIsProcessed(int partitionId) {
     MetaPartition meta = partitions.get(partitionId);
-    int ownerThread = oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
+    int ownerThread = getOwnerThreadId(partitionId);
     synchronized (meta) {
       perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
       meta.setProcessingState(ProcessingState.PROCESSED);
@@ -508,7 +537,7 @@ public class MetaPartitionManager {
   public void doneLoadingPartition(int partitionId, long superstep) {
     MetaPartition meta = partitions.get(partitionId);
     numInMemoryPartitions.getAndIncrement();
-    int owner = oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
+    int owner = getOwnerThreadId(partitionId);
     synchronized (meta) {
       perThreadPartitionDictionary.get(owner).removePartition(meta);
       meta.setPartitionState(StorageState.IN_MEM);
@@ -535,8 +564,7 @@ public class MetaPartitionManager {
    */
   public boolean startOffloadingMessages(int partitionId) {
     MetaPartition meta = partitions.get(partitionId);
-    int ownerThread =
-        oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
+    int ownerThread = getOwnerThreadId(partitionId);
     synchronized (meta) {
       if (meta.getIncomingMessagesState() == StorageState.IN_MEM) {
         perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
@@ -558,8 +586,7 @@ public class MetaPartitionManager {
    */
   public void doneOffloadingMessages(int partitionId) {
     MetaPartition meta = partitions.get(partitionId);
-    int ownerThread =
-        oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
+    int ownerThread = getOwnerThreadId(partitionId);
     synchronized (meta) {
       perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
       meta.setIncomingMessagesState(StorageState.ON_DISK);
@@ -598,7 +625,7 @@ public class MetaPartitionManager {
    */
   public boolean startOffloadingPartition(int partitionId) {
     MetaPartition meta = partitions.get(partitionId);
-    int owner = oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
+    int owner = getOwnerThreadId(partitionId);
     synchronized (meta) {
       if (meta.getProcessingState() != ProcessingState.IN_PROCESS &&
           (meta.getPartitionState() == StorageState.IN_MEM ||
@@ -624,7 +651,7 @@ public class MetaPartitionManager {
     numInMemoryPartitions.getAndDecrement();
     updateGraphFractionInMemory();
     MetaPartition meta = partitions.get(partitionId);
-    int owner = oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
+    int owner = getOwnerThreadId(partitionId);
     synchronized (meta) {
       perThreadPartitionDictionary.get(owner).removePartition(meta);
       meta.setPartitionState(StorageState.ON_DISK);
@@ -639,8 +666,7 @@ public class MetaPartitionManager {
    */
   public void resetPartitions() {
     for (MetaPartition meta : partitions.values()) {
-      int owner =
-          oocEngine.getIOScheduler().getOwnerThreadId(meta.getPartitionId());
+      int owner = getOwnerThreadId(meta.getPartitionId());
       perThreadPartitionDictionary.get(owner).removePartition(meta);
       meta.resetPartition();
       perThreadPartitionDictionary.get(owner).addPartition(meta);
@@ -659,8 +685,7 @@ public class MetaPartitionManager {
    */
   public void resetMessages() {
     for (MetaPartition meta : partitions.values()) {
-      int owner =
-          oocEngine.getIOScheduler().getOwnerThreadId(meta.getPartitionId());
+      int owner = getOwnerThreadId(meta.getPartitionId());
       perThreadPartitionDictionary.get(owner).removePartition(meta);
       meta.resetMessages();
       if (meta.getPartitionState() == StorageState.IN_MEM &&

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java
 
b/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java
deleted file mode 100644
index 325850c..0000000
--- 
a/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java
+++ /dev/null
@@ -1,401 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.ooc.data;
-
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.commons.lang3.tuple.MutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.conf.IntConfOption;
-import org.apache.log4j.Logger;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-import static org.apache.giraph.conf.GiraphConstants.ONE_MB;
-
-/**
- * This class provides basic operations for data structures that have to
- * participate in out-of-core mechanism. Essential subclasses of this class 
are:
- *  - DiskBackedPartitionStore (for partition data)
- *  - DiskBackedMessageStore (for messages)
- *  - DiskBackedEdgeStore (for edges read in INPUT_SUPERSTEP)
- * Basically, any data structure that may cause OOM to happen can be 
implemented
- * as a subclass of this class.
- *
- * There are two different terms used in the rest of this class:
- *  - "data store" refers to in-memory representation of data. Usually this is
- *    stored per-partition in in-memory implementations of data structures. For
- *    instance, "data store" of a DiskBackedPartitionStore would collection of
- *    all partitions kept in the in-memory partition store within the
- *    DiskBackedPartitionStore.
- *  - "raw data buffer" refers to raw data which were supposed to be
- *    de-serialized and added to the data store, but they remain 'as is' in the
- *    memory because their corresponding partition is offloaded to disk and is
- *    not available in the data store.
- *
- * @param <T> raw data format of the data store subclassing this class
- */
-public abstract class OutOfCoreDataManager<T> {
-  /**
-   * Minimum size of a buffer (in bytes) to flush to disk. This is used to
-   * decide whether vertex/edge buffers are large enough to flush to disk.
-   */
-  public static final IntConfOption MINIMUM_BUFFER_SIZE_TO_FLUSH =
-      new IntConfOption("giraph.flushBufferSize", 8 * ONE_MB,
-          "Minimum size of a buffer (in bytes) to flush to disk.");
-
-  /** Class logger. */
-  private static final Logger LOG = Logger.getLogger(
-      OutOfCoreDataManager.class);
-  /** Cached value for MINIMUM_BUFFER_SIZE_TO_FLUSH */
-  private final int minBufferSizeToOffload;
-  /** Set containing ids of all out-of-core partitions */
-  private final Set<Integer> hasPartitionDataOnDisk =
-      Sets.newConcurrentHashSet();
-  /**
-   * Map of partition ids to list of raw data buffers. The map will have 
entries
-   * only for partitions that their in-memory data structures are currently
-   * offloaded to disk. We keep the aggregate size of buffers for each 
partition
-   * as part of the values in the map to estimate how much memory we can free 
up
-   * if we offload data buffers of a particular partition to disk.
-   */
-  private final ConcurrentMap<Integer, Pair<Integer, List<T>>> dataBuffers =
-      Maps.newConcurrentMap();
-  /**
-   * Map of partition ids to number of raw data buffers offloaded to disk for
-   * each partition. The map will have entries only for partitions that their
-   * in-memory data structures are currently out of core. It is necessary to
-   * know the number of data buffers on disk for a particular partition when we
-   * are loading all these buffers back in memory.
-   */
-  private final ConcurrentMap<Integer, Integer> numDataBuffersOnDisk =
-      Maps.newConcurrentMap();
-  /**
-   * Lock to avoid overlapping of read and write on data associated with each
-   * partition.
-   * */
-  private final ConcurrentMap<Integer, ReadWriteLock> locks =
-      Maps.newConcurrentMap();
-
-  /**
-   * Constructor.
-   *
-   * @param conf Configuration
-   */
-  OutOfCoreDataManager(ImmutableClassesGiraphConfiguration conf) {
-    this.minBufferSizeToOffload = MINIMUM_BUFFER_SIZE_TO_FLUSH.get(conf);
-  }
-
-  /**
-   * Retrieves a lock for a given partition. If the lock for the given 
partition
-   * does not exist, creates a new lock.
-   *
-   * @param partitionId id of the partition the lock is needed for
-   * @return lock for a given partition
-   */
-  private ReadWriteLock getPartitionLock(int partitionId) {
-    ReadWriteLock readWriteLock = locks.get(partitionId);
-    if (readWriteLock == null) {
-      readWriteLock = new ReentrantReadWriteLock();
-      ReadWriteLock temp = locks.putIfAbsent(partitionId, readWriteLock);
-      if (temp != null) {
-        readWriteLock = temp;
-      }
-    }
-    return readWriteLock;
-  }
-
-  /**
-   * Adds a data entry for a given partition to the current data store. If data
-   * of a given partition in data store is already offloaded to disk, adds the
-   * data entry to appropriate raw data buffer list.
-   *
-   * @param partitionId id of the partition to add the data entry to
-   * @param entry data entry to add
-   */
-  protected void addEntry(int partitionId, T entry) {
-    // Addition of data entries to a data store is much more common than
-    // out-of-core operations. Besides, in-memory data store implementations
-    // existing in the code base already account for parallel addition to data
-    // stores. Therefore, using read lock would optimize for parallel addition
-    // to data stores, specially for cases where the addition should happen for
-    // partitions that are entirely in memory.
-    ReadWriteLock rwLock = getPartitionLock(partitionId);
-    rwLock.readLock().lock();
-    if (hasPartitionDataOnDisk.contains(partitionId)) {
-      List<T> entryList = new ArrayList<>();
-      entryList.add(entry);
-      int entrySize = entrySerializedSize(entry);
-      MutablePair<Integer, List<T>> newPair =
-          new MutablePair<>(entrySize, entryList);
-      Pair<Integer, List<T>> oldPair =
-          dataBuffers.putIfAbsent(partitionId, newPair);
-      if (oldPair != null) {
-        synchronized (oldPair) {
-          newPair = (MutablePair<Integer, List<T>>) oldPair;
-          newPair.setLeft(oldPair.getLeft() + entrySize);
-          newPair.getRight().add(entry);
-        }
-      }
-    } else {
-      addEntryToImMemoryPartitionData(partitionId, entry);
-    }
-    rwLock.readLock().unlock();
-  }
-
-  /**
-   * Loads and assembles all data for a given partition, and put it into the
-   * data store. Returns the number of bytes transferred from disk to memory in
-   * the loading process.
-   *
-   * @param partitionId id of the partition to load ana assemble all data for
-   * @param basePath path to load the data from
-   * @return number of bytes loaded from disk to memory
-   * @throws IOException
-   */
-  public long loadPartitionData(int partitionId, String basePath)
-      throws IOException {
-    long numBytes = 0;
-    ReadWriteLock rwLock = getPartitionLock(partitionId);
-    rwLock.writeLock().lock();
-    if (hasPartitionDataOnDisk.contains(partitionId)) {
-      numBytes += loadInMemoryPartitionData(partitionId,
-          getPath(basePath, partitionId));
-      hasPartitionDataOnDisk.remove(partitionId);
-      // Loading raw data buffers from disk if there is any and applying those
-      // to already loaded in-memory data.
-      Integer numBuffers = numDataBuffersOnDisk.remove(partitionId);
-      if (numBuffers != null) {
-        checkState(numBuffers > 0);
-        File file = new File(getBuffersPath(basePath, partitionId));
-        checkState(file.exists());
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("loadPartitionData: loading " + numBuffers + " buffers of" 
+
-              " partition " + partitionId + " from " + file.getAbsolutePath());
-        }
-        FileInputStream fis = new FileInputStream(file);
-        BufferedInputStream bis = new BufferedInputStream(fis);
-        DataInputStream dis = new DataInputStream(bis);
-        for (int i = 0; i < numBuffers; ++i) {
-          T entry = readNextEntry(dis);
-          addEntryToImMemoryPartitionData(partitionId, entry);
-        }
-        dis.close();
-        numBytes +=  file.length();
-        checkState(file.delete(), "loadPartitionData: failed to delete %s.",
-            file.getAbsoluteFile());
-      }
-      // Applying in-memory raw data buffers to in-memory partition data.
-      Pair<Integer, List<T>> pair = dataBuffers.remove(partitionId);
-      if (pair != null) {
-        for (T entry : pair.getValue()) {
-          addEntryToImMemoryPartitionData(partitionId, entry);
-        }
-      }
-    }
-    rwLock.writeLock().unlock();
-    return numBytes;
-  }
-
-  /**
-   * Offloads partition data of a given partition in the data store to disk, 
and
-   * returns the number of bytes offloaded from memory to disk.
-   *
-   * @param partitionId id of the partition to offload its data
-   * @param basePath path to offload the data to
-   * @return number of bytes offloaded from memory to disk
-   * @throws IOException
-   */
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
-      "UL_UNRELEASED_LOCK_EXCEPTION_PATH")
-  public long offloadPartitionData(int partitionId, String basePath)
-      throws IOException {
-    ReadWriteLock rwLock = getPartitionLock(partitionId);
-    rwLock.writeLock().lock();
-    hasPartitionDataOnDisk.add(partitionId);
-    rwLock.writeLock().unlock();
-    return offloadInMemoryPartitionData(partitionId,
-        getPath(basePath, partitionId));
-  }
-
-  /**
-   * Offloads raw data buffers of a given partition to disk, and returns the
-   * number of bytes offloaded from memory to disk.
-   *
-   * @param partitionId id of the partition to offload its raw data buffers
-   * @param basePath path to offload the data to
-   * @return number of bytes offloaded from memory to disk
-   * @throws IOException
-   */
-  public long offloadBuffers(int partitionId, String basePath)
-      throws IOException {
-    Pair<Integer, List<T>> pair = dataBuffers.get(partitionId);
-    if (pair == null || pair.getLeft() < minBufferSizeToOffload) {
-      return 0;
-    }
-    ReadWriteLock rwLock = getPartitionLock(partitionId);
-    rwLock.writeLock().lock();
-    pair = dataBuffers.remove(partitionId);
-    rwLock.writeLock().unlock();
-    checkNotNull(pair);
-    checkState(!pair.getRight().isEmpty());
-    File file = new File(getBuffersPath(basePath, partitionId));
-    FileOutputStream fos = new FileOutputStream(file, true);
-    BufferedOutputStream bos = new BufferedOutputStream(fos);
-    DataOutputStream dos = new DataOutputStream(bos);
-    for (T entry : pair.getRight()) {
-      writeEntry(entry, dos);
-    }
-    dos.close();
-    long numBytes = dos.size();
-    int numBuffers = pair.getRight().size();
-    Integer oldNumBuffersOnDisk =
-        numDataBuffersOnDisk.putIfAbsent(partitionId, numBuffers);
-    if (oldNumBuffersOnDisk != null) {
-      numDataBuffersOnDisk.replace(partitionId,
-          oldNumBuffersOnDisk + numBuffers);
-    }
-    return numBytes;
-  }
-
-  /**
-   * Looks through all partitions that their data is not in the data store (is
-   * offloaded to disk), and sees if any of them has enough raw data buffer in
-   * memory. If so, puts that partition in a list to return.
-   *
-   * @return Set of partition ids of all partition raw buffers where the
-   *         aggregate size of buffers are large enough and it is worth 
flushing
-   *         those buffers to disk
-   */
-  public Set<Integer> getCandidateBuffersToOffload() {
-    Set<Integer> result = new HashSet<>();
-    for (Map.Entry<Integer, Pair<Integer, List<T>>> entry :
-        dataBuffers.entrySet()) {
-      if (entry.getValue().getLeft() > minBufferSizeToOffload) {
-        result.add(entry.getKey());
-      }
-    }
-    return result;
-  }
-
-  /**
-   * Creates the path to read/write partition data from/to for a given
-   * partition.
-   *
-   * @param basePath path prefix to create the actual path from
-   * @param partitionId id of the partition
-   * @return path to read/write data from/to
-   */
-  private static String getPath(String basePath, int partitionId) {
-    return basePath + "-P" + partitionId;
-  }
-
-  /**
-   * Creates the path to read/write raw data buffers of a given partition
-   * from/to.
-   *
-   * @param basePath path prefix to create the actual path from
-   * @param partitionId id of the partition
-   * @return path to read/write raw data buffer to/from
-   */
-  private static String getBuffersPath(String basePath, int partitionId) {
-    return getPath(basePath, partitionId) + "_buffers";
-  }
-
-  /**
-   * Writes a single raw entry to a given output stream.
-   *
-   * @param entry entry to write to output
-   * @param out output stream to write the entry to
-   * @throws IOException
-   */
-  protected abstract void writeEntry(T entry, DataOutput out)
-      throws IOException;
-
-  /**
-   * Reads the next available raw entry from a given input stream.
-   *
-   * @param in input stream to read the entry from
-   * @return entry read from an input stream
-   * @throws IOException
-   */
-  protected abstract T readNextEntry(DataInput in) throws IOException;
-
-  /**
-   * Loads data of a partition into data store. Returns number of bytes loaded.
-   *
-   * @param partitionId id of the partition to load its data
-   * @param path path from which data should be loaded
-   * @return number of bytes loaded from disk to memory
-   * @throws IOException
-   */
-  protected abstract long loadInMemoryPartitionData(int partitionId,
-                                                    String path)
-      throws IOException;
-
-  /**
-   * Offloads data of a partition in data store to disk. Returns the number of
-   * bytes offloaded to disk
-   *
-   * @param partitionId id of the partition to offload to disk
-   * @param path path to which data should be offloaded
-   * @return number of bytes offloaded from memory to disk
-   * @throws IOException
-   */
-  protected abstract long offloadInMemoryPartitionData(int partitionId,
-                                                       String path)
-      throws IOException;
-
-  /**
-   * Gets the size of a given entry in bytes.
-   *
-   * @param entry input entry to find its size
-   * @return size of given input entry in bytes
-   */
-  protected abstract int entrySerializedSize(T entry);
-
-  /**
-   * Adds a single entry for a given partition to the in-memory data store.
-   *
-   * @param partitionId id of the partition to add the data to
-   * @param entry input entry to add to the data store
-   */
-  protected abstract void addEntryToImMemoryPartitionData(int partitionId,
-                                                          T entry);
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java 
b/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java
deleted file mode 100644
index e84ad29..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.ooc.io;
-
-import org.apache.giraph.ooc.OutOfCoreEngine;
-
-import java.io.IOException;
-
-/**
- * Representation of an IO command (moving data to disk/memory) used in
- * out-of-core mechanism.
- */
-public abstract class IOCommand {
-  /** Type of IO command */
-  public enum IOCommandType {
-    /** Loading a partition */
-    LOAD_PARTITION,
-    /** Storing a partition */
-    STORE_PARTITION,
-    /** Storing incoming messages of a partition */
-    STORE_MESSAGE,
-    /**
-     * Storing message/buffer raw data buffer of a currently out-of-core
-     * partition
-     */
-    STORE_BUFFER,
-    /** Doing nothing regarding IO */
-    WAIT
-  }
-
-  /** Id of the partition involved for the IO */
-  protected final int partitionId;
-  /** Out-of-core engine */
-  protected final OutOfCoreEngine oocEngine;
-  /**
-   * Number of bytes transferred to/from memory (loaded/stored) during the
-   * execution of the command
-   */
-  protected long numBytesTransferred;
-
-  /**
-   * Constructor
-   *
-   * @param oocEngine Out-of-core engine
-   * @param partitionId Id of the partition involved in the IO
-   */
-  public IOCommand(OutOfCoreEngine oocEngine, int partitionId) {
-    this.oocEngine = oocEngine;
-    this.partitionId = partitionId;
-    this.numBytesTransferred = 0;
-  }
-
-  /**
-   * Get the id of the partition involved in the IO
-   *
-   * @return id of the partition
-   */
-  public int getPartitionId() {
-    return partitionId;
-  }
-
-  /**
-   * Execute (load/store of data) the IO command, and change the data stores
-   * appropriately based on the data loaded/stored. Return true iff the command
-   * is actually executed (resulted in loading or storing data).
-   *
-   * @param basePath the base path (prefix) to the files/folders IO command
-   *                 should read/write data from/to
-   * @return whether the command is actually executed
-   * @throws IOException
-   */
-  public abstract boolean execute(String basePath) throws IOException;
-
-  /**
-   * Get the type of the command.
-   *
-   * @return type of the command
-   */
-  public abstract IOCommandType getType();
-
-  /**
-   * Get the number of bytes transferred (loaded/stored from/to disk).
-   *
-   * @return number of bytes transferred during the execution of the command
-   */
-  public long bytesTransferred() {
-    return numBytesTransferred;
-  }
-}
-

Reply via email to