http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
deleted file mode 100644
index 7368420..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
+++ /dev/null
@@ -1,1300 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.partition;
-
-import static org.apache.giraph.conf.GiraphConstants.MAX_PARTITIONS_IN_MEMORY;
-import static org.apache.giraph.conf.GiraphConstants.MAX_STICKY_PARTITIONS;
-import static org.apache.giraph.conf.GiraphConstants.NUM_COMPUTE_THREADS;
-import static org.apache.giraph.conf.GiraphConstants.NUM_INPUT_THREADS;
-import static org.apache.giraph.conf.GiraphConstants.NUM_OUTPUT_THREADS;
-import static org.apache.giraph.conf.GiraphConstants.PARTITIONS_DIRECTORY;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.edge.OutEdges;
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
-
-/**
- * Disk-backed PartitionStore. Partitions are stored in memory on a LRU basis.
- * The operations are thread-safe, and guarantees safety for concurrent
- * execution of different operations on each partition.<br />
- * <br />
- * The algorithm implemented by this class is quite intricate due to the
- * interaction of several locks to guarantee performance. For this reason, here
- * follows an overview of the implemented algorithm.<br />
- * <b>ALGORITHM</b>:
- * In general, the partition store keeps N partitions in memory. To improve
- * I/O performance, part of the N partitions are kept in memory in a sticky
- * manner while preserving the capability of each thread to swap partitions on
- * the disk. This means that, for T threads, at least T partitions must remain
- * non-sticky. The number of sicky partitions can also be specified manually.
- * <b>CONCURRENCY</b>:
- * <ul>
- *   <li>
- *     <b>Meta Partition Lock</b>.All the partitions are held in a
- *     container, called the MetaPartition. This object contains also meta
- *     information about the partition. All these objects are used to
- *     atomically operate on partitions. In fact, each time a thread accesses
- *     a partition, it will firstly acquire a lock on the container,
- *     guaranteeing exclusion in managing the partition. Besides, this
- *     partition-based lock allows the threads to concurrently operate on
- *     different partitions, guaranteeing performance.
- *   </li>
- *   <li>
- *     <b>Meta Partition Container</b>. All the references to the meta
- *     partition objects are kept in a concurrent hash map. This ADT guarantees
- *     performance and atomic access to each single reference, which is then
- *     use for atomic operations on partitions, as previously described.
- *   </li>
- *   <li>
- *     <b>LRU Lock</b>. Finally, an additional ADT is used to keep the LRU
- *     order of unused partitions. In this case, the ADT is not thread safe and
- *     to guarantee safety, we use its intrinsic lock. Additionally, this lock
- *     is used for all in memory count changes. In fact, the amount of elements
- *     in memory and the inactive partitions are very strictly related and this
- *     justifies the sharing of the same lock.
- *   </li>
- * </ul>
- * <b>XXX</b>:<br/>
- * while most of the concurrent behaviors are gracefully handled, the
- * concurrent call of {@link #getOrCreatePartition(Integer partitionId)} and
- * {@link #deletePartition(Integer partitionId)} need yet to be handled. Since
- * the usage of this class does not currently incure in this type of concurrent
- * behavior, it has been left as a future work.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- */
-@SuppressWarnings("rawtypes")
-public class DiskBackedPartitionStore<I extends WritableComparable,
-    V extends Writable, E extends Writable>
-    extends PartitionStore<I, V, E> {
-  /** Class logger. */
-  private static final Logger LOG =
-      Logger.getLogger(DiskBackedPartitionStore.class);
-  /** States the partition can be found in */
-  private enum State { INIT, ACTIVE, INACTIVE, ONDISK };
-
-  /** Hash map containing all the partitions  */
-  private final ConcurrentMap<Integer, MetaPartition> partitions =
-    Maps.newConcurrentMap();
-  /** Inactive partitions to re-activate or spill to disk to make space */
-  private final Map<Integer, MetaPartition> lru = Maps.newLinkedHashMap();
-
-  /** Giraph configuration */
-  private final
-  ImmutableClassesGiraphConfiguration<I, V, E> conf;
-  /** Mapper context */
-  private final Context context;
-  /** Base path where the partition files are written to */
-  private final String[] basePaths;
-  /** Used to hash partition Ids */
-  private final HashFunction hasher = Hashing.murmur3_32();
-  /** Maximum number of slots. Read-only value, no need for concurrency
-      safety. */
-  private final int maxPartitionsInMem;
-  /** Number of slots used */
-  private AtomicInteger numPartitionsInMem;
-  /** service worker reference */
-  private CentralizedServiceWorker<I, V, E> serviceWorker;
-  /** mumber of slots that are always kept in memory */
-  private AtomicLong numOfStickyPartitions;
-  /** counter */
-  private long passedThroughEdges;
-
-  /**
-   * Constructor
-   *
-   * @param conf Configuration
-   * @param context Context
-   * @param serviceWorker service worker reference
-   */
-  public DiskBackedPartitionStore(
-      ImmutableClassesGiraphConfiguration<I, V, E> conf,
-      Mapper<?, ?, ?, ?>.Context context,
-      CentralizedServiceWorker<I, V, E> serviceWorker) {
-    this.conf = conf;
-    this.context = context;
-    this.serviceWorker = serviceWorker;
-
-    this.passedThroughEdges = 0;
-    this.numPartitionsInMem = new AtomicInteger(0);
-
-    // We must be able to hold at least one partition in memory
-    this.maxPartitionsInMem = Math.max(MAX_PARTITIONS_IN_MEMORY.get(conf), 1);
-
-    int numInputThreads = NUM_INPUT_THREADS.get(conf);
-    int numComputeThreads = NUM_COMPUTE_THREADS.get(conf);
-    int numOutputThreads = NUM_OUTPUT_THREADS.get(conf);
-
-    int maxThreads =
-      Math.max(numInputThreads,
-        Math.max(numComputeThreads, numOutputThreads));
-
-    // check if the sticky partition option is set and, if so, set the
-    long maxSticky = MAX_STICKY_PARTITIONS.get(conf);
-
-    // number of sticky partitions
-    if (maxSticky > 0 && maxPartitionsInMem - maxSticky >= maxThreads) {
-      this.numOfStickyPartitions = new AtomicLong(maxSticky);
-    } else {
-      if (maxPartitionsInMem - maxSticky >= maxThreads) {
-        if (LOG.isInfoEnabled()) {
-          LOG.info("giraph.maxSticky parameter unset or improperly set " +
-            "resetting to automatically computed value.");
-        }
-      }
-
-      if (maxPartitionsInMem == 1 || maxThreads >= maxPartitionsInMem) {
-        this.numOfStickyPartitions = new AtomicLong(0);
-      } else {
-        this.numOfStickyPartitions =
-          new AtomicLong(maxPartitionsInMem - maxThreads);
-      }
-    }
-
-    // Take advantage of multiple disks
-    String[] userPaths = PARTITIONS_DIRECTORY.getArray(conf);
-    basePaths = new String[userPaths.length];
-    int i = 0;
-    for (String path : userPaths) {
-      basePaths[i++] = path + "/" + conf.get("mapred.job.id", "Unknown Job");
-    }
-    if (LOG.isInfoEnabled()) {
-      LOG.info("DiskBackedPartitionStore with maxInMemoryPartitions=" +
-        maxPartitionsInMem + ", isStaticGraph=" + conf.isStaticGraph());
-    }
-  }
-
-  @Override
-  public Iterable<Integer> getPartitionIds() {
-    return Iterables.unmodifiableIterable(partitions.keySet());
-  }
-
-  @Override
-  public boolean hasPartition(final Integer id) {
-    return partitions.containsKey(id);
-  }
-
-  @Override
-  public int getNumPartitions() {
-    return partitions.size();
-  }
-
-  @Override
-  public long getPartitionVertexCount(int partitionId) {
-    MetaPartition meta = partitions.get(partitionId);
-    if (meta.getState() == State.ONDISK) {
-      return meta.getVertexCount();
-    } else {
-      return meta.getPartition().getVertexCount();
-    }
-  }
-
-  @Override
-  public long getPartitionEdgeCount(int partitionId) {
-    MetaPartition meta = partitions.get(partitionId);
-    if (meta.getState() == State.ONDISK) {
-      return meta.getEdgeCount();
-    } else {
-      return meta.getPartition().getEdgeCount();
-    }
-  }
-
-  @Override
-  public Partition<I, V, E> getOrCreatePartition(Integer id) {
-    MetaPartition meta = new MetaPartition(id);
-    MetaPartition temp;
-
-    temp = partitions.putIfAbsent(id, meta);
-    if (temp != null) {
-      meta = temp;
-    }
-
-    synchronized (meta) {
-      if (temp == null && numOfStickyPartitions.getAndDecrement() > 0) {
-        meta.setSticky();
-      }
-      getPartition(meta);
-      if (meta.getPartition() == null) {
-        Partition<I, V, E> partition = conf.createPartition(id, context);
-        meta.setPartition(partition);
-        addPartition(meta, partition);
-        if (meta.getState() == State.INIT) {
-          LOG.warn("Partition was still INIT after ADD (not possibile).");
-        }
-        // This get is necessary. When creating a new partition, it will be
-        // placed by default as INACTIVE. However, here the user will retrieve
-        // a reference to it, and hence there is the need to update the
-        // reference count, as well as the state of the object.
-        getPartition(meta);
-        if (meta.getState() == State.INIT) {
-          LOG.warn("Partition was still INIT after GET (not possibile).");
-        }
-      }
-
-      if (meta.getState() == State.INIT) {
-        String msg = "Getting a partition which is in INIT state is " +
-                     "not allowed. " + meta;
-        LOG.error(msg);
-        throw new IllegalStateException(msg);
-      }
-
-      return meta.getPartition();
-    }
-  }
-
-  @Override
-  public void putPartition(Partition<I, V, E> partition) {
-    Integer id = partition.getId();
-    MetaPartition meta = partitions.get(id);
-    putPartition(meta);
-  }
-
-  @Override
-  public void deletePartition(Integer id) {
-    if (hasPartition(id)) {
-      MetaPartition meta = partitions.get(id);
-      deletePartition(meta);
-    }
-  }
-
-  @Override
-  public Partition<I, V, E> removePartition(Integer id) {
-    if (hasPartition(id)) {
-      MetaPartition meta;
-
-      meta = partitions.get(id);
-      synchronized (meta) {
-        getPartition(meta);
-        putPartition(meta);
-        deletePartition(id);
-      }
-      return meta.getPartition();
-    }
-    return null;
-  }
-
-  @Override
-  public void addPartition(Partition<I, V, E> partition) {
-    Integer id = partition.getId();
-    MetaPartition meta = new MetaPartition(id);
-    MetaPartition temp;
-
-    meta.setPartition(partition);
-    temp = partitions.putIfAbsent(id, meta);
-    if (temp != null) {
-      meta = temp;
-    }
-
-    synchronized (meta) {
-      if (temp == null && numOfStickyPartitions.getAndDecrement() > 0) {
-        meta.setSticky();
-      }
-      addPartition(meta, partition);
-    }
-  }
-
-  @Override
-  public void shutdown() {
-    for (MetaPartition e : partitions.values()) {
-      if (e.getState() == State.ONDISK) {
-        deletePartitionFiles(e.getId());
-      }
-    }
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    for (MetaPartition e : partitions.values()) {
-      sb.append(e.toString() + "\n");
-    }
-    return sb.toString();
-  }
-
-  /**
-   * Writes vertex data (Id, value and halted state) to stream.
-   *
-   * @param output The output stream
-   * @param vertex The vertex to serialize
-   * @throws IOException
-   */
-  private void writeVertexData(DataOutput output, Vertex<I, V, E> vertex)
-    throws IOException {
-
-    vertex.getId().write(output);
-    vertex.getValue().write(output);
-    output.writeBoolean(vertex.isHalted());
-  }
-
-  /**
-   * Writes vertex edges (Id, edges) to stream.
-   *
-   * @param output The output stream
-   * @param vertex The vertex to serialize
-   * @throws IOException
-   */
-  @SuppressWarnings("unchecked")
-  private void writeOutEdges(DataOutput output, Vertex<I, V, E> vertex)
-    throws IOException {
-
-    vertex.getId().write(output);
-    OutEdges<I, E> edges = (OutEdges<I, E>) vertex.getEdges();
-    edges.write(output);
-  }
-
-  /**
-   * Read vertex data from an input and initialize the vertex.
-   *
-   * @param in The input stream
-   * @param vertex The vertex to initialize
-   * @throws IOException
-   */
-  private void readVertexData(DataInput in, Vertex<I, V, E> vertex)
-    throws IOException {
-
-    I id = conf.createVertexId();
-    id.readFields(in);
-    V value = conf.createVertexValue();
-    value.readFields(in);
-    OutEdges<I, E> edges = conf.createAndInitializeOutEdges(0);
-    vertex.initialize(id, value, edges);
-    if (in.readBoolean()) {
-      vertex.voteToHalt();
-    } else {
-      vertex.wakeUp();
-    }
-  }
-
-  /**
-   * Read vertex edges from an input and set them to the vertex.
-   *
-   * @param in The input stream
-   * @param partition The partition owning the vertex
-   * @throws IOException
-   */
-  @SuppressWarnings("unchecked")
-  private void readOutEdges(DataInput in, Partition<I, V, E> partition)
-    throws IOException {
-
-    I id = conf.createVertexId();
-    id.readFields(in);
-    Vertex<I, V, E> v = partition.getVertex(id);
-    OutEdges<I, E> edges = (OutEdges<I, E>) v.getEdges();
-    edges.readFields(in);
-    partition.saveVertex(v);
-  }
-
-  /**
-   * Load a partition from disk. It deletes the files after the load,
-   * except for the edges, if the graph is static.
-   *
-   * @param id The id of the partition to load
-   * @param numVertices The number of vertices contained on disk
-   * @return The partition
-   * @throws IOException
-   */
-  private Partition<I, V, E> loadPartition(int id, long numVertices)
-    throws IOException {
-
-    Partition<I, V, E> partition = conf.createPartition(id, context);
-
-    // Vertices
-    File file = new File(getVerticesPath(id));
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("loadPartition: loading partition vertices " +
-        partition.getId() + " from " + file.getAbsolutePath());
-    }
-
-    FileInputStream filein = new FileInputStream(file);
-    BufferedInputStream bufferin = new BufferedInputStream(filein);
-    DataInputStream inputStream  = new DataInputStream(bufferin);
-    for (int i = 0; i < numVertices; ++i) {
-      Vertex<I, V , E> vertex = conf.createVertex();
-      readVertexData(inputStream, vertex);
-      partition.putVertex(vertex);
-    }
-    inputStream.close();
-    if (!file.delete()) {
-      String msg = "loadPartition: failed to delete " + file.getAbsolutePath();
-      LOG.error(msg);
-      throw new IllegalStateException(msg);
-    }
-
-    // Edges
-    file = new File(getEdgesPath(id));
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("loadPartition: loading partition edges " +
-        partition.getId() + " from " + file.getAbsolutePath());
-    }
-
-    filein = new FileInputStream(file);
-    bufferin = new BufferedInputStream(filein);
-    inputStream  = new DataInputStream(bufferin);
-    for (int i = 0; i < numVertices; ++i) {
-      readOutEdges(inputStream, partition);
-    }
-    inputStream.close();
-    // If the graph is static, keep the file around.
-    if (!conf.isStaticGraph() && !file.delete()) {
-      String msg = "loadPartition: failed to delete " + file.getAbsolutePath();
-      LOG.error(msg);
-      throw new IllegalStateException(msg);
-    }
-    return partition;
-  }
-
-  /**
-   * Write a partition to disk.
-   *
-   * @param meta meta partition containing the partition to offload
-   * @throws IOException
-   */
-  private void offloadPartition(MetaPartition meta) throws IOException {
-
-    Partition<I, V, E> partition = meta.getPartition();
-    File file = new File(getVerticesPath(partition.getId()));
-    File parent = file.getParentFile();
-    if (!parent.exists() && !parent.mkdirs() && LOG.isDebugEnabled()) {
-      LOG.debug("offloadPartition: directory " + parent.getAbsolutePath() +
-        " already exists.");
-    }
-
-    if (!file.createNewFile()) {
-      String msg = "offloadPartition: file " + parent.getAbsolutePath() +
-        " already exists.";
-      LOG.error(msg);
-      throw new IllegalStateException(msg);
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("offloadPartition: writing partition vertices " +
-        partition.getId() + " to " + file.getAbsolutePath());
-    }
-
-    FileOutputStream fileout = new FileOutputStream(file);
-    BufferedOutputStream bufferout = new BufferedOutputStream(fileout);
-    DataOutputStream outputStream  = new DataOutputStream(bufferout);
-    for (Vertex<I, V, E> vertex : partition) {
-      writeVertexData(outputStream, vertex);
-    }
-    outputStream.close();
-
-    // Avoid writing back edges if we have already written them once and
-    // the graph is not changing.
-    // If we are in the input superstep, we need to write the files
-    // at least the first time, even though the graph is static.
-    file = new File(getEdgesPath(partition.getId()));
-    if (meta.getPrevVertexCount() != partition.getVertexCount() ||
-        !conf.isStaticGraph() || !file.exists()) {
-
-      meta.setPrevVertexCount(partition.getVertexCount());
-
-      if (!file.createNewFile() && LOG.isDebugEnabled()) {
-        LOG.debug("offloadPartition: file " + file.getAbsolutePath() +
-            " already exists.");
-      }
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("offloadPartition: writing partition edges " +
-          partition.getId() + " to " + file.getAbsolutePath());
-      }
-
-      fileout = new FileOutputStream(file);
-      bufferout = new BufferedOutputStream(fileout);
-      outputStream = new DataOutputStream(bufferout);
-      for (Vertex<I, V, E> vertex : partition) {
-        writeOutEdges(outputStream, vertex);
-      }
-      outputStream.close();
-    }
-  }
-
-  /**
-   * Append a partition on disk at the end of the file. Expects the caller
-   * to hold the global lock.
-   *
-   * @param meta      meta partition container for the partitiont to save
-   *                  to disk
-   * @param partition The partition
-   * @throws IOException
-   */
-  private void addToOOCPartition(MetaPartition meta,
-    Partition<I, V, E> partition) throws IOException {
-
-    Integer id = partition.getId();
-    File file = new File(getVerticesPath(id));
-    DataOutputStream outputStream = null;
-
-    FileOutputStream fileout = new FileOutputStream(file, true);
-    BufferedOutputStream bufferout = new BufferedOutputStream(fileout);
-    outputStream = new DataOutputStream(bufferout);
-    for (Vertex<I, V, E> vertex : partition) {
-      writeVertexData(outputStream, vertex);
-    }
-    outputStream.close();
-
-    file = new File(getEdgesPath(id));
-    fileout = new FileOutputStream(file, true);
-    bufferout = new BufferedOutputStream(fileout);
-    outputStream = new DataOutputStream(bufferout);
-    for (Vertex<I, V, E> vertex : partition) {
-      writeOutEdges(outputStream, vertex);
-    }
-    outputStream.close();
-  }
-
-  /**
-   * Delete a partition's files.
-   *
-   * @param id The id of the partition owning the file.
-   */
-  public void deletePartitionFiles(Integer id) {
-    // File containing vertices
-    File file = new File(getVerticesPath(id));
-    if (file.exists() && !file.delete()) {
-      String msg = "deletePartitionFiles: Failed to delete file " +
-        file.getAbsolutePath();
-      LOG.error(msg);
-      throw new IllegalStateException(msg);
-    }
-
-    // File containing edges
-    file = new File(getEdgesPath(id));
-    if (file.exists() && !file.delete()) {
-      String msg = "deletePartitionFiles: Failed to delete file " +
-        file.getAbsolutePath();
-      LOG.error(msg);
-      throw new IllegalStateException(msg);
-    }
-  }
-
-  /**
-   * Get the path and basename of the storage files.
-   *
-   * @param partitionId The partition
-   * @return The path to the given partition
-   */
-  private String getPartitionPath(Integer partitionId) {
-    int hash = hasher.hashInt(partitionId).asInt();
-    int idx = Math.abs(hash % basePaths.length);
-    return basePaths[idx] + "/partition-" + partitionId;
-  }
-
-  /**
-   * Get the path to the file where vertices are stored.
-   *
-   * @param partitionId The partition
-   * @return The path to the vertices file
-   */
-  private String getVerticesPath(Integer partitionId) {
-    return getPartitionPath(partitionId) + "_vertices";
-  }
-
-  /**
-   * Get the path to the file where edges are stored.
-   *
-   * @param partitionId The partition
-   * @return The path to the edges file
-   */
-  private String getEdgesPath(Integer partitionId) {
-    return getPartitionPath(partitionId) + "_edges";
-  }
-
-  /**
-   * Removes and returns the last recently used entry.
-   *
-   * @return The last recently used entry.
-   */
-  private MetaPartition getLRUPartition() {
-    synchronized (lru) {
-      Iterator<Entry<Integer, MetaPartition>> i =
-          lru.entrySet().iterator();
-      Entry<Integer, MetaPartition> entry = i.next();
-      i.remove();
-      return entry.getValue();
-    }
-  }
-
-  /**
-   * Method that gets a partition from the store.
-   * The partition is produced as a side effect of the computation and is
-   * reflected inside the META object provided as parameter.
-   * This function is thread-safe since it locks the whole computation
-   * on the metapartition provided.<br />
-   * <br />
-   * <b>ONDISK case</b><br />
-   * When a thread tries to access an element on disk, it waits until it
-   * space in memory and inactive data to swap resources.
-   * It is possible that multiple threads wait for a single
-   * partition to be restored from disk. The semantic of this
-   * function is that only the first thread interested will be
-   * in charge to load it back to memory, hence waiting on 'lru'.
-   * The others, will be waiting on the lock to be available,
-   * preventing consistency issues.<br />
-   * <br />
-   * <b>Deadlock</b><br />
-   * The code of this method can in principle lead to a deadlock, due to the
-   * fact that two locks are held together while running the "wait" method.
-   * However, this problem does not occur. The two locks held are:
-   * <ol>
-   *  <li><b>Meta Object</b>, which is the object the thread is trying to
-   *  acquire and is currently stored on disk.</li>
-   *  <li><b>LRU data structure</b>, which keeps track of the objects which are
-   *  inactive and hence swappable.</li>
-   * </ol>
-   * It is not possible that two getPartition calls cross because this means
-   * that LRU objects are both INACTIVE and ONDISK at the same time, which is
-   * not possible.
-   *
-   * @param meta meta partition container with the partition itself
-   */
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
-    value = "TLW_TWO_LOCK_WAIT",
-    justification = "The two locks held do not produce a deadlock")
-  private void getPartition(MetaPartition meta) {
-    synchronized (meta) {
-      boolean isNotDone = true;
-
-      if (meta.getState() != State.INIT) {
-        State state;
-
-        while (isNotDone) {
-          state = meta.getState();
-          switch (state) {
-          case ONDISK:
-            MetaPartition swapOutPartition = null;
-            long numVertices = meta.getVertexCount();
-
-          synchronized (lru) {
-            try {
-              while (numPartitionsInMem.get() >= maxPartitionsInMem &&
-                     lru.isEmpty()) {
-                // notification for threads waiting on this are
-                // required in two cases:
-                // a) when an element is added to the LRU (hence
-                //    a new INACTIVE partition is added).
-                // b) when additioanl space is available in memory (hence
-                //    in memory counter is decreased).
-                lru.wait();
-              }
-            } catch (InterruptedException e) {
-              LOG.error("getPartition: error while waiting on " +
-                "LRU data structure: " + e.getMessage());
-              throw new IllegalStateException(e);
-            }
-
-            // We have to make some space first, by removing the least used
-            // partition (hence the first in the LRU data structure).
-            //
-            // NB: In case the LRU is not empty, we are _swapping_ elements.
-            //     This, means that there is no need to "make space" by
-            //     changing the in-memory counter. Differently, if the element
-            //     can directly be placed into memory, the memory usage
-            //     increases by one.
-            if (numPartitionsInMem.get() >= maxPartitionsInMem &&
-                !lru.isEmpty()) {
-              swapOutPartition = getLRUPartition();
-            } else if (numPartitionsInMem.get() < maxPartitionsInMem) {
-              numPartitionsInMem.getAndIncrement();
-            } else {
-              String msg = "lru is empty and there is not space in memory, " +
-                           "hence the partition cannot be loaded.";
-              LOG.error(msg);
-              throw new IllegalStateException(msg);
-            }
-          }
-
-            if (swapOutPartition != null) {
-              synchronized (swapOutPartition) {
-                if (swapOutPartition.isSticky()) {
-                  String msg = "Partition " + meta.getId() + " is sticky " +
-                    " and cannot be offloaded.";
-                  LOG.error(msg);
-                  throw new IllegalStateException(msg);
-                }
-                // safety check
-                if (swapOutPartition.getState() != State.INACTIVE) {
-                  String msg = "Someone is holding the partition with id " +
-                    swapOutPartition.getId() + " but is supposed to be " +
-                    "inactive.";
-                  LOG.error(msg);
-                  throw new IllegalStateException(msg);
-                }
-
-                try {
-                  offloadPartition(swapOutPartition);
-                  Partition<I, V, E> p = swapOutPartition.getPartition();
-                  swapOutPartition.setOnDisk(p);
-                  // notify all the threads waiting to the offloading process,
-                  // that they are allowed again to access the
-                  // swapped-out object.
-                  swapOutPartition.notifyAll();
-                } catch (IOException e)  {
-                  LOG.error("getPartition: Failed while Offloading " +
-                    "New Partition: " + e.getMessage());
-                  throw new IllegalStateException(e);
-                }
-              }
-            }
-
-            // If it was needed, the partition to be swpped out is on disk.
-            // Additionally, we are guaranteed that we have a free spot in
-            // memory, in fact,
-            // a) either there was space in memory, and hence the in memory
-            //    counter was incremented reserving the space for this element.
-            // b) or the space has been created by swapping out the partition
-            //    that was inactive in the LRU.
-            // This means that, even in the case that concurrently swapped
-            // element is restored back to memory, there must have been
-            // place for only an additional partition.
-            Partition<I, V, E> partition;
-            try {
-              partition = loadPartition(meta.getId(), numVertices);
-            } catch (IOException e)  {
-              LOG.error("getPartition: Failed while Loading Partition from " +
-                "disk: " + e.getMessage());
-              throw new IllegalStateException(e);
-            }
-            meta.setActive(partition);
-
-            isNotDone = false;
-            break;
-          case INACTIVE:
-            MetaPartition p = null;
-
-            if (meta.isSticky()) {
-              meta.setActive();
-              isNotDone = false;
-              break;
-            }
-
-          synchronized (lru) {
-            p = lru.remove(meta.getId());
-          }
-            if (p == meta && p.getState() == State.INACTIVE) {
-              meta.setActive();
-              isNotDone = false;
-            } else {
-              try {
-                // A thread could wait here when an inactive partition is
-                // concurrently swapped to disk. In fact, the meta object is
-                // locked but, even though the object is inactive, it is not
-                // present in the LRU ADT.
-                // The thread need to be signaled when the partition is
-                // finally swapped out of the disk.
-                meta.wait();
-              } catch (InterruptedException e) {
-                LOG.error("getPartition: error while waiting on " +
-                  "previously Inactive Partition: " + e.getMessage());
-                throw new IllegalStateException(e);
-              }
-              isNotDone = true;
-            }
-            break;
-          case ACTIVE:
-            meta.incrementReferences();
-            isNotDone = false;
-            break;
-          default:
-            throw new IllegalStateException("illegal state " + meta.getState() +
-              " for partition " + meta.getId());
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Method that puts a partition back to the store. This function is
-   * thread-safe using meta partition intrinsic lock.
-   *
-   * @param meta meta partition container with the partition itself
-   */
-  private void putPartition(MetaPartition meta) {
-    synchronized (meta) {
-      if (meta.getState() != State.ACTIVE) {
-        String msg = "It is not possible to put back a partition which is " +
-          "not ACTIVE.\n" + meta.toString();
-        LOG.error(msg);
-        throw new IllegalStateException(msg);
-      }
-
-      if (meta.decrementReferences() == 0) {
-        meta.setState(State.INACTIVE);
-        if (!meta.isSticky()) {
-          synchronized (lru) {
-            lru.put(meta.getId(), meta);
-            lru.notifyAll();  // notify every waiting process about the fact
-                              // that the LRU is not empty anymore
-          }
-        }
-        meta.notifyAll(); // needed for the threads that are waiting for the
-                          // partition to become inactive, when
-                          // trying to delete the partition
-      }
-    }
-  }
-
-  /**
-   * Task that adds a partition to the store.
-   * This function is thread-safe since it locks using the intrinsic lock of
-   * the meta partition.
-   *
-   * @param meta meta partition container with the partition itself
-   * @param partition partition to be added
-   */
-  private void addPartition(MetaPartition meta, Partition<I, V, E> partition) {
-    synchronized (meta) {
-      // If the state of the partition is INIT, this means that the META
-      // object was just created, and hence the partition is new.
-      if (meta.getState() == State.INIT) {
-        // safety check to guarantee that the partition was set.
-        if (partition == null) {
-          String msg = "No partition was provided.";
-          LOG.error(msg);
-          throw new IllegalStateException(msg);
-        }
-
-        // safety check to guarantee that the partition provided is the one
-        // that is also set in the meta partition.
-        if (partition != meta.getPartition()) {
-          String msg = "Partition and Meta-Partition should " +
-            "contain the same data";
-          LOG.error(msg);
-          throw new IllegalStateException(msg);
-        }
-
-        synchronized (lru) {
-          if (numPartitionsInMem.get() < maxPartitionsInMem || meta.isSticky) {
-            meta.setState(State.INACTIVE);
-            numPartitionsInMem.getAndIncrement();
-            if (!meta.isSticky) {
-              lru.put(meta.getId(), meta);
-              lru.notifyAll();  // signaling that at least one element is
-                                // present in the LRU ADT.
-            }
-            return; // this is used to exit the function and avoid using the
-                    // else clause. This is required to keep only this part of
-                    // the code under lock and avoid keeping the lock when
-                    // performing the Offload I/O
-          }
-        }
-
-        // ELSE
-        try {
-          offloadPartition(meta);
-          meta.setOnDisk(partition);
-        } catch (IOException e)  {
-          LOG.error("addPartition: Failed while Offloading New Partition: " +
-            e.getMessage());
-          throw new IllegalStateException(e);
-        }
-      } else {
-        Partition<I, V, E> existing = null;
-        boolean isOOC = false;
-        boolean isNotDone = true;
-        State state;
-
-        while (isNotDone) {
-          state = meta.getState();
-          switch (state) {
-          case ONDISK:
-            isOOC = true;
-            isNotDone = false;
-            meta.addToVertexCount(partition.getVertexCount());
-            break;
-          case INACTIVE:
-            MetaPartition p = null;
-
-            if (meta.isSticky()) {
-              existing = meta.getPartition();
-              isNotDone = false;
-              break;
-            }
-
-          synchronized (lru) {
-            p = lru.get(meta.getId());
-          }
-            // this check is safe because, even though we are out of the LRU
-            // lock, we still hold the lock on the partition. This means that
-            // a) if the partition was removed from the LRU, p will be null
-            //    and the current thread will wait.
-            // b) if the partition was not removed, its state cannot be
-            //    modified since the lock is held on meta, which refers to
-            //    the same object.
-            if (p == meta) {
-              existing = meta.getPartition();
-              isNotDone = false;
-            } else {
-              try {
-                // A thread could wait here when an inactive partition is
-                // concurrently swapped to disk. In fact, the meta object is
-                // locked but, even though the object is inactive, it is not
-                // present in the LRU ADT.
-                // The thread needs to be signaled when the partition is finally
-                // swapped out of the disk.
-                meta.wait();
-              } catch (InterruptedException e) {
-                LOG.error("addPartition: error while waiting on " +
-                  "previously inactive partition: " + e.getMessage());
-                throw new IllegalStateException(e);
-              }
-
-              isNotDone = true;
-            }
-            break;
-          case ACTIVE:
-            existing = meta.getPartition();
-            isNotDone = false;
-            break;
-          default:
-            throw new IllegalStateException("illegal state " + state +
-              " for partition " + meta.getId());
-          }
-        }
-
-        if (isOOC) {
-          try {
-            addToOOCPartition(meta, partition);
-          } catch (IOException e) {
-            LOG.error("addPartition: Failed while Adding to OOC Partition: " +
-              e.getMessage());
-            throw new IllegalStateException(e);
-          }
-        } else {
-          existing.addPartition(partition);
-        }
-      }
-    }
-  }
-
-  /**
-   * Task that deletes a partition from the store
-   * This function is thread-safe using the intrinsic lock of the meta
-   * partition object
-   *
-   * @param meta meta partition container with the partition itself
-   */
-  private void deletePartition(MetaPartition meta) {
-    synchronized (meta) {
-      boolean isDone = false;
-      int id = meta.getId();
-
-      State state;
-      while (!isDone) {
-        state = meta.getState();
-        switch (state) {
-        case ONDISK:
-          deletePartitionFiles(id);
-          isDone = true;
-          break;
-        case INACTIVE:
-          MetaPartition p;
-
-          if (meta.isSticky()) {
-            isDone = true;
-            numPartitionsInMem.getAndDecrement();
-            break;
-          }
-
-        synchronized (lru) {
-          p = lru.remove(id);
-          if (p == meta && p.getState() == State.INACTIVE) {
-            isDone = true;
-            numPartitionsInMem.getAndDecrement();
-            lru.notifyAll(); // notify all waiting processes that there is now
-                             // at least one new free place in memory.
-            // XXX: attention, here a race condition with getPartition is
-            //      possible, since changing lru is separated by the removal
-            //      of the element from the partitions ADT.
-            break;
-          }
-        }
-          try {
-            // A thread could wait here when an inactive partition is
-            // concurrently swapped to disk. In fact, the meta object is
-            // locked but, even though the object is inactive, it is not
-            // present in the LRU ADT.
-            // The thread needs to be signaled when the partition is
-            // finally swapped out of the disk.
-            meta.wait();
-          } catch (InterruptedException e) {
-            LOG.error("deletePartition: error while waiting on " +
-              "previously inactive partition: " + e.getMessage());
-            throw new IllegalStateException(e);
-          }
-          isDone = false;
-          break;
-        case ACTIVE:
-          try {
-            // the thread waits that the object to be deleted becomes inactive,
-            // otherwise the deletion is not possible.
-            // The thread needs to be signaled when the partition becomes
-            // inactive.
-            meta.wait();
-          } catch (InterruptedException e) {
-            LOG.error("deletePartition: error while waiting on " +
-              "active partition: " + e.getMessage());
-            throw new IllegalStateException(e);
-          }
-          break;
-        default:
-          throw new IllegalStateException("illegal state " + state +
-            " for partition " + id);
-        }
-      }
-      partitions.remove(id);
-    }
-  }
-
-  /**
-   * Partition container holding additional meta data associated with each
-   * partition.
-   */
-  private class MetaPartition {
-    // ---- META INFORMATION ----
-    /** ID of the partition */
-    private int id;
-    /** State in which the partition is */
-    private State state;
-    /**
-     * Counter used to keep track of the number of references retained by
-     * user-threads
-     */
-    private int references;
-    /** Number of vertices contained in the partition */
-    private long vertexCount;
-    /** Previous number of vertices contained in the partition */
-    private long prevVertexCount;
-    /** Number of edges contained in the partition */
-    private long edgeCount;
-    /**
-     * Sticky bit; if set, this partition is never supposed to be
-     * written to disk
-     */
-    private boolean isSticky;
-
-    // ---- PARTITION ----
-    /** the actual partition. Depending on the state of the partition,
-        this object could be empty. */
-    private Partition<I, V, E> partition;
-
-    /**
-     * Initialization of the metadata enriched partition.
-     *
-     * @param id id of the partition
-     */
-    public MetaPartition(int id) {
-      this.id = id;
-      this.state = State.INIT;
-      this.references = 0;
-      this.vertexCount = 0;
-      this.prevVertexCount = 0;
-      this.edgeCount = 0;
-      this.isSticky = false;
-
-      this.partition = null;
-    }
-
-    /**
-     * @return the id
-     */
-    public int getId() {
-      return id;
-    }
-
-    /**
-     * @return the state
-     */
-    public State getState() {
-      return state;
-    }
-
-    /**
-     * This function sets the metadata for on-disk partition.
-     *
-     * @param partition partition related to this container
-     */
-    public void setOnDisk(Partition<I, V, E> partition) {
-      this.state = State.ONDISK;
-      this.partition = null;
-      this.vertexCount = partition.getVertexCount();
-      this.edgeCount = partition.getEdgeCount();
-    }
-
-    /**
-     *
-     */
-    public void setActive() {
-      this.setActive(null);
-    }
-
-    /**
-     *
-     * @param partition the partition associate to this container
-     */
-    public void setActive(Partition<I, V, E> partition) {
-      if (partition != null) {
-        this.partition = partition;
-      }
-      this.state = State.ACTIVE;
-      this.prevVertexCount = this.vertexCount;
-      this.vertexCount = 0;
-      this.incrementReferences();
-    }
-
-    /**
-     * @param state the state to set
-     */
-    public void setState(State state) {
-      this.state = state;
-    }
-
-    /**
-     * @return decremented references
-     */
-    public int decrementReferences() {
-      if (references > 0) {
-        references -= 1;
-      }
-      return references;
-    }
-
-    /**
-     * @return incremented references
-     */
-    public int incrementReferences() {
-      return ++references;
-    }
-
-    /**
-     * set previous number of vertexes
-     * @param vertexCount number of vertexes
-     */
-    public void setPrevVertexCount(long vertexCount) {
-      this.prevVertexCount = vertexCount;
-    }
-
-    /**
-     * @return the vertexCount
-     */
-    public long getPrevVertexCount() {
-      return prevVertexCount;
-    }
-
-    /**
-     * @return the vertexCount
-     */
-    public long getVertexCount() {
-      return vertexCount;
-    }
-
-    /**
-     * @return the edgeCount
-     */
-    public long getEdgeCount() {
-      return edgeCount;
-    }
-
-    /**
-     * @param inc amount to add to the vertex count
-     */
-    public void addToVertexCount(long inc) {
-      this.vertexCount += inc;
-      this.prevVertexCount = vertexCount;
-    }
-
-    /**
-     * @return the partition
-     */
-    public Partition<I, V, E> getPartition() {
-      return partition;
-    }
-
-    /**
-     * @param partition the partition to set
-     */
-    public void setPartition(Partition<I, V, E> partition) {
-      this.partition = partition;
-    }
-
-    /**
-     * Set sticky bit to this partition
-     */
-    public void setSticky() {
-      this.isSticky = true;
-    }
-
-    /**
-     * Get sticky bit to this partition
-     * @return boolean ture iff the sticky bit is set
-     */
-    public boolean isSticky() {
-      return this.isSticky;
-    }
-
-    @Override
-    public String toString() {
-      StringBuffer sb = new StringBuffer();
-
-      sb.append("Meta Data: { ");
-      sb.append("ID: " + id + "; ");
-      sb.append("State: " + state + "; ");
-      sb.append("Number of References: " + references + "; ");
-      sb.append("Number of Vertices: " + vertexCount + "; ");
-      sb.append("Previous number of Vertices: " + prevVertexCount + "; ");
-      sb.append("Number of edges: " + edgeCount + "; ");
-      sb.append("Is Sticky: " + isSticky + "; ");
-      sb.append("Partition: " + partition + "; }");
-
-      return sb.toString();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java 
b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
index bbcdcba..d3f3902 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
@@ -18,11 +18,15 @@
 
 package org.apache.giraph.partition;
 
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.VertexIdEdges;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
 /**
- * Structure that stores partitions for a worker.
+ * Structure that stores partitions for a worker. PartitionStore does not allow
+ * random accesses to partitions except upon removal.
+ * This structure is thread-safe.
  *
  * @param <I> Vertex id
  * @param <V> Vertex data
@@ -31,33 +35,17 @@ import org.apache.hadoop.io.WritableComparable;
 public abstract class PartitionStore<I extends WritableComparable,
     V extends Writable, E extends Writable> {
   /**
-   * Add a new partition to the store or just the vertices from the partition
-   * to the old partition.
+   * Add a *new* partition to the store. If the partition already exists,
+   * it does not add the partition and returns false.
    *
    * @param partition Partition to add
+   * @return Whether the addition made any change in the partition store
    */
-  public abstract void addPartition(Partition<I, V, E> partition);
+  public abstract boolean addPartition(Partition<I, V, E> partition);
 
   /**
-   * Get or create a partition. Note: user has to put back
-   * it to the store through {@link #putPartition(Partition)} after use.
-   *
-   * @param partitionId Partition id
-   * @return The requested partition (never null)
-   */
-  public abstract Partition<I, V, E> getOrCreatePartition(Integer partitionId);
-
-  /**
-   * Put a partition back to the store. Use this method to be put a partition
-   * back after it has been retrieved through
-   * {@link #getOrCreatePartition(Integer)}.
-   *
-   * @param partition Partition
-   */
-  public abstract void putPartition(Partition<I, V, E> partition);
-
-  /**
-   * Remove a partition and return it.
+   * Remove a partition and return it. Called from a single thread, *not* from
+   * within an iteration cycle, and after INPUT_SUPERSTEP is complete.
    *
    * @param partitionId Partition id
    * @return The removed partition
@@ -65,15 +53,6 @@ public abstract class PartitionStore<I extends 
WritableComparable,
   public abstract Partition<I, V, E> removePartition(Integer partitionId);
 
   /**
-   * Just delete a partition
-   * (more efficient than {@link #removePartition(Integer partitionID)} if the
-   * partition is out of core).
-   *
-   * @param partitionId Partition id
-   */
-  public abstract void deletePartition(Integer partitionId);
-
-  /**
    * Whether a specific partition is present in the store.
    *
    * @param partitionId Partition id
@@ -97,17 +76,19 @@ public abstract class PartitionStore<I extends 
WritableComparable,
 
   /**
    * Return the number of vertices in a partition.
+   *
    * @param partitionId Partition id
    * @return The number of vertices in the specified partition
    */
-  public abstract long getPartitionVertexCount(int partitionId);
+  public abstract long getPartitionVertexCount(Integer partitionId);
 
   /**
    * Return the number of edges in a partition.
+   *
    * @param partitionId Partition id
    * @return The number of edges in the specified partition
    */
-  public abstract long getPartitionEdgeCount(int partitionId);
+  public abstract long getPartitionEdgeCount(Integer partitionId);
 
   /**
    * Whether the partition store is empty.
@@ -119,7 +100,88 @@ public abstract class PartitionStore<I extends 
WritableComparable,
   }
 
   /**
-   * Called at the end of the computation.
+   * Called at the end of the computation. Called from a single thread.
    */
   public void shutdown() { }
+
+  /**
+   * Called at the beginning of the computation. Called from a single thread.
+   */
+  public void initialize() { }
+
+  /**
+   * Start the iteration cycle to iterate over partitions. Note that each
+   * iteration cycle *must* iterate over *all* partitions. Usually an iteration
+   * cycle is necessary for
+   *   1) moving edges (from edge store) to vertices after edge input splits 
are
+   *      loaded in INPUT_SUPERSTEP,
+   *   2) computing partitions in each superstep (called once per superstep),
+   *   3) saving vertices/edges in the output superstep.
+   *   4) any sort of populating a data-structure based on the partitions in
+   *      this store.
+   *
+   * After an iteration is started, multiple threads can access the partition
+   * store using {@link #getNextPartition()} to iterate over the partitions.
+   * Each time {@link #getNextPartition()} is called an unprocessed partition 
in
+   * the current iteration is returned. After processing of the partition is
+   * done, partition should be put back in the store using
+   * {@link #putPartition(Partition)}. Here is an example of the entire
+   * workflow:
+   *
+   * In the main thread:
+   *   partitionStore.startIteration();
+   *
+   * In multiple threads iterating over the partitions:
+   *   Partition partition = partitionStore.getNextPartition();
+   *   ... do stuff with partition ...
+   *   partitionStore.putPartition(partition);
+   *
+   * Called from a single thread.
+   */
+  public abstract void startIteration();
+
+  /**
+   * Return the next partition in iteration for the current superstep.
+   * Note: user has to put back the partition to the store through
+   * {@link #putPartition(Partition)} after use. Look at comments on
+   * {@link #startIteration()} for more detail.
+   *
+   * @return The next partition to process
+   */
+  public abstract Partition<I, V, E> getNextPartition();
+
+  /**
+   * Put a partition back to the store. Use this method to put a partition
+   * back after it has been retrieved through {@link #getNextPartition()}.
+   * Look at comments on {@link #startIteration()} for more detail.
+   *
+   * @param partition Partition
+   */
+  public abstract void putPartition(Partition<I, V, E> partition);
+
+  /**
+   * Add vertices to a given partition from a given DataOutput instance. This
+   * method is called right after receipt of vertex request in INPUT_SUPERSTEP.
+   *
+   * @param partitionId Partition id
+   * @param extendedDataOutput Output containing serialized vertex data
+   */
+  public abstract void addPartitionVertices(Integer partitionId,
+      ExtendedDataOutput extendedDataOutput);
+
+  /**
+   * Add edges to a given partition from a given send edge request. This
+   * method is called right after receipt of edge request in INPUT_SUPERSTEP.
+   *
+   * @param partitionId Partition id
+   * @param edges Edges in the request
+   */
+  public abstract void addPartitionEdges(Integer partitionId,
+      VertexIdEdges<I, E> edges);
+
+  /**
+   * Move edges from edge store to partitions. This method is called from a
+   * *single thread* once all vertices and edges are read in INPUT_SUPERSTEP.
+   */
+  public abstract void moveEdgesToVertices();
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
 
b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
index 8ed6081..8f34fed 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
@@ -19,11 +19,19 @@
 package org.apache.giraph.partition;
 
 import com.google.common.collect.Maps;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.EdgeStore;
+import org.apache.giraph.edge.EdgeStoreFactory;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.VertexIdEdges;
+import org.apache.giraph.utils.VertexIterator;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentMap;
 
 /**
@@ -39,49 +47,34 @@ public class SimplePartitionStore<I extends 
WritableComparable,
   /** Map of stored partitions. */
   private final ConcurrentMap<Integer, Partition<I, V, E>> partitions =
       Maps.newConcurrentMap();
+  /** Edge store for this worker. */
+  private final EdgeStore<I, V, E> edgeStore;
   /** Configuration. */
   private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
   /** Context used to report progress */
   private final Mapper<?, ?, ?, ?>.Context context;
+  /** Queue of partitions to be processed in a superstep */
+  private BlockingQueue<Partition<I, V, E>> partitionQueue;
 
   /**
    * Constructor.
-   *
    * @param conf Configuration
    * @param context Mapper context
+   * @param serviceWorker Service worker
    */
-  public SimplePartitionStore(
-      ImmutableClassesGiraphConfiguration<I, V, E> conf,
-      Mapper<?, ?, ?, ?>.Context context) {
+  public SimplePartitionStore(ImmutableClassesGiraphConfiguration<I, V, E> 
conf,
+      Mapper<?, ?, ?, ?>.Context context,
+      CentralizedServiceWorker<I, V, E> serviceWorker) {
     this.conf = conf;
     this.context = context;
+    EdgeStoreFactory<I, V, E> edgeStoreFactory = conf.createEdgeStoreFactory();
+    edgeStoreFactory.initialize(serviceWorker, conf, context);
+    edgeStore = edgeStoreFactory.newStore();
   }
 
   @Override
-  public void addPartition(Partition<I, V, E> partition) {
-    Partition<I, V, E> oldPartition = partitions.get(partition.getId());
-    if (oldPartition == null) {
-      oldPartition = partitions.putIfAbsent(partition.getId(), partition);
-      if (oldPartition == null) {
-        return;
-      }
-    }
-    // This is thread-safe
-    oldPartition.addPartition(partition);
-  }
-
-  @Override
-  public Partition<I, V, E> getOrCreatePartition(Integer partitionId) {
-    Partition<I, V, E> oldPartition = partitions.get(partitionId);
-    if (oldPartition == null) {
-      Partition<I, V, E> newPartition =
-          conf.createPartition(partitionId, context);
-      oldPartition = partitions.putIfAbsent(partitionId, newPartition);
-      if (oldPartition == null) {
-        return newPartition;
-      }
-    }
-    return oldPartition;
+  public boolean addPartition(Partition<I, V, E> partition) {
+    return partitions.putIfAbsent(partition.getId(), partition) == null;
   }
 
   @Override
@@ -90,11 +83,6 @@ public class SimplePartitionStore<I extends 
WritableComparable,
   }
 
   @Override
-  public void deletePartition(Integer partitionId) {
-    partitions.remove(partitionId);
-  }
-
-  @Override
   public boolean hasPartition(Integer partitionId) {
     return partitions.containsKey(partitionId);
   }
@@ -110,7 +98,7 @@ public class SimplePartitionStore<I extends 
WritableComparable,
   }
 
   @Override
-  public long getPartitionVertexCount(int partitionId) {
+  public long getPartitionVertexCount(Integer partitionId) {
     Partition partition = partitions.get(partitionId);
     if (partition == null) {
       return 0;
@@ -120,7 +108,7 @@ public class SimplePartitionStore<I extends 
WritableComparable,
   }
 
   @Override
-  public long getPartitionEdgeCount(int partitionId) {
+  public long getPartitionEdgeCount(Integer partitionId) {
     Partition partition = partitions.get(partitionId);
     if (partition == null) {
       return 0;
@@ -130,5 +118,64 @@ public class SimplePartitionStore<I extends 
WritableComparable,
   }
 
   @Override
+  public void startIteration() {
+    if (partitionQueue != null && !partitionQueue.isEmpty()) {
+      throw new IllegalStateException("startIteration: It seems that some of " 
+
+          "of the partitions from previous iteration over partition store are" 
+
+          " not yet processed.");
+    }
+    partitionQueue =
+        new ArrayBlockingQueue<Partition<I, V, E>>(getNumPartitions());
+    for (Partition<I, V, E> partition : partitions.values()) {
+      partitionQueue.add(partition);
+    }
+  }
+
+  @Override
+  public Partition<I, V, E> getNextPartition() {
+    return partitionQueue.poll();
+  }
+
+  @Override
   public void putPartition(Partition<I, V, E> partition) { }
+
+  /**
+   * Get or create a partition.
+   * @param partitionId Partition Id
+   * @return The requested partition (never null)
+   */
+  private Partition<I, V, E> getOrCreatePartition(Integer partitionId) {
+    Partition<I, V, E> oldPartition = partitions.get(partitionId);
+    if (oldPartition == null) {
+      Partition<I, V, E> newPartition =
+          conf.createPartition(partitionId, context);
+      oldPartition = partitions.putIfAbsent(partitionId, newPartition);
+      if (oldPartition == null) {
+        return newPartition;
+      }
+    }
+    return oldPartition;
+  }
+
+  @Override
+  public void addPartitionVertices(Integer partitionId,
+      ExtendedDataOutput extendedDataOutput) {
+    VertexIterator<I, V, E> vertexIterator =
+        new VertexIterator<I, V, E>(extendedDataOutput, conf);
+
+    Partition<I, V, E> partition = getOrCreatePartition(partitionId);
+    partition.addPartitionVertices(vertexIterator);
+    putPartition(partition);
+  }
+
+  @Override
+  public void addPartitionEdges(Integer partitionId,
+      VertexIdEdges<I, E> edges) {
+    edgeStore.addPartitionEdges(partitionId, edges);
+  }
+
+  @Override
+  public void moveEdgesToVertices() {
+    edgeStore.moveEdgesToVertices();
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java 
b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index ed9a492..e515caf 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -26,13 +26,11 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
@@ -116,7 +114,6 @@ import org.json.JSONArray;
 import org.json.JSONException;
 import org.json.JSONObject;
 
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
 /**
@@ -584,6 +581,7 @@ public class BspServiceWorker<I extends WritableComparable,
         startSuperstep();
     workerGraphPartitioner.updatePartitionOwners(
         getWorkerInfo(), masterSetPartitionOwners);
+    getPartitionStore().initialize();
 
 /*if[HADOOP_NON_SECURE]
     workerClient.setup();
@@ -697,7 +695,7 @@ else[HADOOP_NON_SECURE]*/
 
     if (getConfiguration().hasEdgeInputFormat()) {
       // Move edges from temporary storage to their source vertices.
-      getServerData().getEdgeStore().moveEdgesToVertices();
+      getServerData().getPartitionStore().moveEdgesToVertices();
     }
 
     // Generate the partition stats for the input superstep and process
@@ -1111,10 +1109,7 @@ else[HADOOP_NON_SECURE]*/
     final VertexOutputFormat<I, V, E> vertexOutputFormat =
         getConfiguration().createWrappedVertexOutputFormat();
 
-    final Queue<Integer> partitionIdQueue =
-        (numPartitions == 0) ? new LinkedList<Integer>() :
-            new ArrayBlockingQueue<Integer>(numPartitions);
-    Iterables.addAll(partitionIdQueue, getPartitionStore().getPartitionIds());
+    getPartitionStore().startIteration();
 
     long verticesToStore = 0;
     PartitionStore<I, V, E> partitionStore = getPartitionStore();
@@ -1142,14 +1137,13 @@ else[HADOOP_NON_SECURE]*/
             long nextPrintMsecs = System.currentTimeMillis() + 15000;
             int partitionIndex = 0;
             int numPartitions = getPartitionStore().getNumPartitions();
-            while (!partitionIdQueue.isEmpty()) {
-              Integer partitionId = partitionIdQueue.poll();
-              if (partitionId == null) {
+            while (true) {
+              Partition<I, V, E> partition =
+                  getPartitionStore().getNextPartition();
+              if (partition == null) {
                 break;
               }
 
-              Partition<I, V, E> partition =
-                  getPartitionStore().getOrCreatePartition(partitionId);
               long verticesWritten = 0;
               for (Vertex<I, V, E> vertex : partition) {
                 vertexWriter.writeVertex(vertex);
@@ -1239,10 +1233,7 @@ else[HADOOP_NON_SECURE]*/
     final EdgeOutputFormat<I, V, E> edgeOutputFormat =
         conf.createWrappedEdgeOutputFormat();
 
-    final Queue<Integer> partitionIdQueue =
-        (numPartitions == 0) ? new LinkedList<Integer>() :
-            new ArrayBlockingQueue<Integer>(numPartitions);
-    Iterables.addAll(partitionIdQueue, getPartitionStore().getPartitionIds());
+    getPartitionStore().startIteration();
 
     CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
       @Override
@@ -1259,14 +1250,13 @@ else[HADOOP_NON_SECURE]*/
             long nextPrintMsecs = System.currentTimeMillis() + 15000;
             int partitionIndex = 0;
             int numPartitions = getPartitionStore().getNumPartitions();
-            while (!partitionIdQueue.isEmpty()) {
-              Integer partitionId = partitionIdQueue.poll();
-              if (partitionId == null) {
+            while (true) {
+              Partition<I, V, E> partition =
+                  getPartitionStore().getNextPartition();
+              if (partition == null) {
                 break;
               }
 
-              Partition<I, V, E> partition =
-                  getPartitionStore().getOrCreatePartition(partitionId);
               long vertices = 0;
               long edges = 0;
               long partitionEdgeCount = partition.getEdgeCount();
@@ -1505,10 +1495,7 @@ else[HADOOP_NON_SECURE]*/
         GiraphConstants.NUM_CHECKPOINT_IO_THREADS.get(getConfiguration()),
         numPartitions);
 
-    final Queue<Integer> partitionIdQueue =
-        (numPartitions == 0) ? new LinkedList<Integer>() :
-            new ArrayBlockingQueue<Integer>(numPartitions);
-    Iterables.addAll(partitionIdQueue, getPartitionStore().getPartitionIds());
+    getPartitionStore().startIteration();
 
     final CompressionCodec codec =
         new CompressionCodecFactory(getConfiguration())
@@ -1525,13 +1512,14 @@ else[HADOOP_NON_SECURE]*/
 
           @Override
           public Void call() throws Exception {
-            while (!partitionIdQueue.isEmpty()) {
-              Integer partitionId = partitionIdQueue.poll();
-              if (partitionId == null) {
+            while (true) {
+              Partition<I, V, E> partition =
+                  getPartitionStore().getNextPartition();
+              if (partition == null) {
                 break;
               }
               Path path =
-                  createCheckpointFilePathSafe("_" + partitionId +
+                  createCheckpointFilePathSafe("_" + partition.getId() +
                       CheckpointingUtils.CHECKPOINT_VERTICES_POSTFIX);
 
               FSDataOutputStream uncompressedStream =
@@ -1542,8 +1530,6 @@ else[HADOOP_NON_SECURE]*/
                   new DataOutputStream(
                       codec.createOutputStream(uncompressedStream));
 
-              Partition<I, V, E> partition =
-                  getPartitionStore().getOrCreatePartition(partitionId);
 
               partition.write(stream);
 
@@ -1966,4 +1952,15 @@ else[HADOOP_NON_SECURE]*/
     }
     return globalStats;
   }
+
+  @Override
+  public int getNumPartitionsOwned() {
+    int count = 0;
+    for (PartitionOwner partitionOwner : getPartitionOwners()) {
+      if (partitionOwner.getWorkerInfo().equals(getWorkerInfo())) {
+        count++;
+      }
+    }
+    return count;
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java 
b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
index b9ef1e3..b7bec1c 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
@@ -133,7 +133,7 @@ public class RequestTest {
     assertTrue(partitionStore.hasPartition(partitionId));
     int total = 0;
     Partition<IntWritable, IntWritable, IntWritable> partition2 =
-        partitionStore.getOrCreatePartition(partitionId);
+        partitionStore.removePartition(partitionId);
     for (Vertex<IntWritable, IntWritable, IntWritable> vertex : partition2) {
       total += vertex.getId().get();
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java
 
b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java
index b8137c0..611b021 100644
--- 
a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java
+++ 
b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java
@@ -75,8 +75,8 @@ public class TestIntFloatPrimitiveMessageStores {
         Lists.newArrayList(0, 1));
     Partition partition = Mockito.mock(Partition.class);
     Mockito.when(partition.getVertexCount()).thenReturn(Long.valueOf(1));
-    Mockito.when(partitionStore.getOrCreatePartition(0)).thenReturn(partition);
-    Mockito.when(partitionStore.getOrCreatePartition(1)).thenReturn(partition);
+    Mockito.when(partitionStore.getNextPartition()).thenReturn(partition);
+    Mockito.when(partitionStore.getNextPartition()).thenReturn(partition);
 
     GiraphConfiguration initConf = new GiraphConfiguration();
     initConf.setComputationClass(IntFloatNoOpComputation.class);

http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
 
b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
index 5903eb8..2027628 100644
--- 
a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
+++ 
b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
@@ -72,8 +72,8 @@ public class TestLongDoublePrimitiveMessageStores {
         Lists.newArrayList(0, 1));
     Partition partition = Mockito.mock(Partition.class);
     Mockito.when(partition.getVertexCount()).thenReturn(Long.valueOf(1));
-    Mockito.when(partitionStore.getOrCreatePartition(0)).thenReturn(partition);
-    Mockito.when(partitionStore.getOrCreatePartition(1)).thenReturn(partition);
+    Mockito.when(partitionStore.getNextPartition()).thenReturn(partition);
+    Mockito.when(partitionStore.getNextPartition()).thenReturn(partition);
   }
 
   private static class LongDoubleNoOpComputation extends

http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
 
b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
index 88e66a6..78e663d 100644
--- 
a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
+++ 
b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
@@ -18,23 +18,8 @@
 
 package org.apache.giraph.partition;
 
-import static org.apache.giraph.conf.GiraphConstants.MAX_PARTITIONS_IN_MEMORY;
-import static org.apache.giraph.conf.GiraphConstants.PARTITIONS_DIRECTORY;
-import static org.apache.giraph.conf.GiraphConstants.USER_PARTITION_COUNT;
-import static org.apache.giraph.conf.GiraphConstants.USE_OUT_OF_CORE_GRAPH;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Random;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
-
+import com.google.common.collect.Iterables;
+import com.google.common.io.Files;
 import org.apache.commons.io.FileUtils;
 import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
@@ -46,6 +31,7 @@ import org.apache.giraph.graph.BasicComputation;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
 import org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat;
+import org.apache.giraph.ooc.DiskBackedPartitionStore;
 import org.apache.giraph.utils.InternalVertexRunner;
 import org.apache.giraph.utils.NoOpComputation;
 import org.apache.giraph.utils.UnsafeByteArrayInputStream;
@@ -60,8 +46,20 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
-import com.google.common.collect.Iterables;
-import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test case for partition stores.
@@ -73,10 +71,11 @@ public class TestPartitionStores {
   private Mapper<?, ?, ?, ?>.Context context;
 
   /* these static variables are used for the multithreaded tests */
-  private static final int NUM_OF_VERTEXES_PER_THREAD = 10;
+  private static final int NUM_OF_VERTEXES_PER_PARTITION = 20;
   private static final int NUM_OF_EDGES_PER_VERTEX = 5;
-  private static final int NUM_OF_THREADS = 10;
-  private static final int NUM_OF_PARTITIONS = 3;
+  private static final int NUM_OF_THREADS = 8;
+  private static final int NUM_OF_PARTITIONS = 30;
+  private static final int NUM_PARTITIONS_IN_MEMORY = 12;
 
   public static class MyComputation extends NoOpComputation<IntWritable,
       IntWritable, NullWritable, IntWritable> { }
@@ -105,9 +104,11 @@ public class TestPartitionStores {
 
   @Test
   public void testSimplePartitionStore() {
+    CentralizedServiceWorker<IntWritable, IntWritable, NullWritable>
+    serviceWorker = Mockito.mock(CentralizedServiceWorker.class);
     PartitionStore<IntWritable, IntWritable, NullWritable>
-        partitionStore = new SimplePartitionStore<IntWritable, IntWritable,
-                NullWritable>(conf, context);
+    partitionStore = new SimplePartitionStore<IntWritable, IntWritable,
+                NullWritable>(conf, context, serviceWorker);
     testReadWrite(partitionStore, conf);
     partitionStore.shutdown();
   }
@@ -149,11 +150,10 @@ public class TestPartitionStores {
     assertEquals(0, deserializatedPartition.getEdgeCount());
     assertEquals(7, deserializatedPartition.getVertexCount());
   }
-  
+
   @Test
   public void testDiskBackedPartitionStoreWithByteArrayPartition()
     throws IOException {
-
     File directory = Files.createTempDir();
     GiraphConstants.PARTITIONS_DIRECTORY.set(
         conf, new File(directory, "giraph_partitions").toString());
@@ -164,7 +164,7 @@ public class TestPartitionStores {
     CentralizedServiceWorker<IntWritable, IntWritable, NullWritable>
       serviceWorker = Mockito.mock(CentralizedServiceWorker.class);
     Mockito.when(serviceWorker.getSuperstep()).thenReturn(
-      BspService.INPUT_SUPERSTEP);
+        BspService.INPUT_SUPERSTEP);
 
     PartitionStore<IntWritable, IntWritable, NullWritable> partitionStore =
         new DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>(
@@ -186,7 +186,7 @@ public class TestPartitionStores {
     serviceWorker = Mockito.mock(CentralizedServiceWorker.class);
 
     Mockito.when(serviceWorker.getSuperstep()).thenReturn(
-      BspService.INPUT_SUPERSTEP);
+        BspService.INPUT_SUPERSTEP);
 
     PartitionStore<IntWritable, IntWritable, NullWritable> partitionStore =
         new DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>(
@@ -194,18 +194,43 @@ public class TestPartitionStores {
     testReadWrite(partitionStore, conf);
     partitionStore.shutdown();
 
-    GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 2);
-    partitionStore = new DiskBackedPartitionStore<IntWritable,
-            IntWritable, NullWritable>(conf, context, serviceWorker);
-    testReadWrite(partitionStore, conf);
-    partitionStore.shutdown();
+    FileUtils.deleteDirectory(directory);
+  }
+
+  @Test
+  public void testDiskBackedPartitionStoreComputation() throws Exception {
+    Iterable<String> results;
+    String[] graph =
+        {
+            "[1,0,[]]", "[2,0,[]]", "[3,0,[]]", "[4,0,[]]", "[5,0,[]]",
+            "[6,0,[]]", "[7,0,[]]", "[8,0,[]]", "[9,0,[]]", "[10,0,[]]"
+        };
+    String[] expected =
+        {
+            "1\t0", "2\t0", "3\t0", "4\t0", "5\t0",
+            "6\t0", "7\t0", "8\t0", "9\t0", "10\t0"
+        };
+
+    GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
+    GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
+    GiraphConstants.USER_PARTITION_COUNT.set(conf, 10);
+
+    File directory = Files.createTempDir();
+    GiraphConstants.PARTITIONS_DIRECTORY.set(conf,
+        new File(directory, "giraph_partitions").toString());
+
+    conf.setComputationClass(EmptyComputation.class);
+    
conf.setVertexInputFormatClass(JsonLongDoubleFloatDoubleVertexInputFormat.class);
+    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+
+    results = InternalVertexRunner.run(conf, graph);
+    checkResults(results, expected);
     FileUtils.deleteDirectory(directory);
   }
 
   @Test
   public void testDiskBackedPartitionStoreWithByteArrayComputation()
     throws Exception {
-
     Iterable<String> results;
     String[] graph =
     {
@@ -218,12 +243,12 @@ public class TestPartitionStores {
       "6\t0", "7\t0", "8\t0", "9\t0", "10\t0"
     };
 
-    USE_OUT_OF_CORE_GRAPH.set(conf, true);
-    MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
-    USER_PARTITION_COUNT.set(conf, 10);
+    GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
+    GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
+    GiraphConstants.USER_PARTITION_COUNT.set(conf, 10);
 
     File directory = Files.createTempDir();
-    PARTITIONS_DIRECTORY.set(conf,
+    GiraphConstants.PARTITIONS_DIRECTORY.set(conf,
       new File(directory, "giraph_partitions").toString());
 
     conf.setPartitionClass(ByteArrayPartition.class);
@@ -238,17 +263,24 @@ public class TestPartitionStores {
 
   @Test
   public void testDiskBackedPartitionStoreMT() throws Exception {
+    GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 
NUM_PARTITIONS_IN_MEMORY);
     GiraphConstants.STATIC_GRAPH.set(conf, false);
     testMultiThreaded();
   }
 
-  /*
+
   @Test
   public void testDiskBackedPartitionStoreMTStatic() throws Exception {
+    GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 
NUM_PARTITIONS_IN_MEMORY);
+    GiraphConstants.STATIC_GRAPH.set(conf, true);
+    testMultiThreaded();
+  }
+
+  @Test
+  public void testDiskBackedPartitionStoreAdaptiveOOC() throws Exception {
     GiraphConstants.STATIC_GRAPH.set(conf, true);
     testMultiThreaded();
   }
-  */
 
   private void testMultiThreaded() throws Exception {
     final AtomicInteger vertexCounter = new AtomicInteger(0);
@@ -260,23 +292,26 @@ public class TestPartitionStores {
     GiraphConstants.PARTITIONS_DIRECTORY.set(
         conf, new File(directory, "giraph_partitions").toString());
     GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
-    GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
 
     CentralizedServiceWorker<IntWritable, IntWritable, NullWritable>
     serviceWorker = Mockito.mock(CentralizedServiceWorker.class);
 
     Mockito.when(serviceWorker.getSuperstep()).thenReturn(
-      BspService.INPUT_SUPERSTEP);
+        BspService.INPUT_SUPERSTEP);
 
     PartitionStore<IntWritable, IntWritable, NullWritable> store =
         new DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>(
             conf, context, serviceWorker);
+    store.initialize();
 
     // Create a new Graph in memory using multiple threads
     for (int i = 0; i < NUM_OF_THREADS; ++i) {
-      int partitionId = i % NUM_OF_PARTITIONS;
+      List<Integer> partitionIds = new ArrayList<Integer>();
+      for (int id = i; id < NUM_OF_PARTITIONS; id += NUM_OF_THREADS) {
+        partitionIds.add(id);
+      }
       Worker worker =
-        new Worker(vertexCounter, store, partitionId, conf);
+        new Worker(vertexCounter, store, partitionIds, conf);
       executor.submit(worker, new Boolean(true));
     }
     for (int i = 0; i < NUM_OF_THREADS; ++i)
@@ -291,19 +326,22 @@ public class TestPartitionStores {
       totalVertexes += store.getPartitionVertexCount(i);
       totalEdges += store.getPartitionEdgeCount(i);
     }
-    assert vertexCounter.get() == NUM_OF_THREADS * NUM_OF_VERTEXES_PER_THREAD;
-    assert totalVertexes == NUM_OF_THREADS * NUM_OF_VERTEXES_PER_THREAD;
+
+    assert vertexCounter.get() == NUM_OF_PARTITIONS * 
NUM_OF_VERTEXES_PER_PARTITION;
+    assert totalVertexes == NUM_OF_PARTITIONS * NUM_OF_VERTEXES_PER_PARTITION;
     assert totalEdges == totalVertexes * NUM_OF_EDGES_PER_VERTEX;
 
     // Check the content of the vertices
     int expected = 0;
-    for (int i = 0; i < NUM_OF_VERTEXES_PER_THREAD * 
NUM_OF_VERTEXES_PER_THREAD; ++i) {
+    for (int i = 0; i < NUM_OF_VERTEXES_PER_PARTITION * NUM_OF_PARTITIONS; 
++i) {
       expected += i;
     }
     int totalValues = 0;
+    store.startIteration();
     for (int i = 0; i < NUM_OF_PARTITIONS; ++i) {
-      partition = store.getOrCreatePartition(i);
-      Iterator<Vertex<IntWritable, IntWritable, NullWritable>> vertexes = 
+      partition = store.getNextPartition();
+      assert partition != null;
+      Iterator<Vertex<IntWritable, IntWritable, NullWritable>> vertexes =
         partition.iterator();
 
       while (vertexes.hasNext()) {
@@ -313,39 +351,23 @@ public class TestPartitionStores {
       store.putPartition(partition);
     }
     assert totalValues == expected;
-    
+
     store.shutdown();
   }
 
-  @Test
-  public void testDiskBackedPartitionStoreComputation() throws Exception {
-    Iterable<String> results;
-    String[] graph =
-    {
-      "[1,0,[]]", "[2,0,[]]", "[3,0,[]]", "[4,0,[]]", "[5,0,[]]",
-      "[6,0,[]]", "[7,0,[]]", "[8,0,[]]", "[9,0,[]]", "[10,0,[]]"
-    };
-    String[] expected =
-    {
-      "1\t0", "2\t0", "3\t0", "4\t0", "5\t0",
-      "6\t0", "7\t0", "8\t0", "9\t0", "10\t0"
-    };
-
-    USE_OUT_OF_CORE_GRAPH.set(conf, true);
-    MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
-    USER_PARTITION_COUNT.set(conf, 10);
-
-    File directory = Files.createTempDir();
-    PARTITIONS_DIRECTORY.set(conf,
-      new File(directory, "giraph_partitions").toString());
-
-    conf.setComputationClass(EmptyComputation.class);
-    
conf.setVertexInputFormatClass(JsonLongDoubleFloatDoubleVertexInputFormat.class);
-    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
-
-    results = InternalVertexRunner.run(conf, graph);
-    checkResults(results, expected);
-    FileUtils.deleteDirectory(directory);
+  private Partition<IntWritable, IntWritable, NullWritable>
+  getPartition(PartitionStore<IntWritable, IntWritable,
+      NullWritable> partitionStore, int partitionId) {
+    partitionStore.startIteration();
+    Partition p;
+    Partition result = null;
+    while ((p = partitionStore.getNextPartition()) != null) {
+      if (p.getId() == partitionId) {
+        result = p;
+      }
+      partitionStore.putPartition(p);
+    }
+    return result;
   }
 
   /**
@@ -376,35 +398,26 @@ public class TestPartitionStores {
     v7.addEdge(EdgeFactory.create(new IntWritable(1)));
     v7.addEdge(EdgeFactory.create(new IntWritable(2)));
 
-    partitionStore.addPartition(createPartition(conf, 1, v1, v2));
-    partitionStore.addPartition(createPartition(conf, 2, v3));
-    partitionStore.addPartition(createPartition(conf, 2, v4));
+    partitionStore.addPartition(createPartition(conf, 1, v1, v2, v6));
+    partitionStore.addPartition(createPartition(conf, 2, v3, v4));
     partitionStore.addPartition(createPartition(conf, 3, v5));
-    partitionStore.addPartition(createPartition(conf, 1, v6));
     partitionStore.addPartition(createPartition(conf, 4, v7));
 
-    Partition<IntWritable, IntWritable, NullWritable> partition1 =
-        partitionStore.getOrCreatePartition(1);
-    partitionStore.putPartition(partition1);
-    Partition<IntWritable, IntWritable, NullWritable> partition2 =
-        partitionStore.getOrCreatePartition(2);
-    partitionStore.putPartition(partition2);
-    Partition<IntWritable, IntWritable, NullWritable> partition3 =
-        partitionStore.removePartition(3);
-    Partition<IntWritable, IntWritable, NullWritable> partition4 =
-        partitionStore.getOrCreatePartition(4);
-    partitionStore.putPartition(partition4);
+    getPartition(partitionStore, 1);
+    getPartition(partitionStore, 2);
+    partitionStore.removePartition(3);
+    getPartition(partitionStore, 4);
 
     assertEquals(3, partitionStore.getNumPartitions());
     assertEquals(3, Iterables.size(partitionStore.getPartitionIds()));
     int partitionsNumber = 0;
-    for (Integer partitionId : partitionStore.getPartitionIds()) {
-      Partition<IntWritable, IntWritable, NullWritable> p =
-          partitionStore.getOrCreatePartition(partitionId);
+
+    partitionStore.startIteration();
+    Partition<IntWritable, IntWritable, NullWritable> p;
+    while ((p = partitionStore.getNextPartition()) != null) {
       partitionStore.putPartition(p);
       partitionsNumber++;
     }
-    Partition<IntWritable, IntWritable, NullWritable> partition;
     assertEquals(3, partitionsNumber);
     assertTrue(partitionStore.hasPartition(1));
     assertTrue(partitionStore.hasPartition(2));
@@ -414,8 +427,6 @@ public class TestPartitionStores {
     assertEquals(2, partitionStore.getPartitionVertexCount(2));
     assertEquals(1, partitionStore.getPartitionVertexCount(4));
     assertEquals(2, partitionStore.getPartitionEdgeCount(4));
-    partitionStore.deletePartition(2);
-    assertEquals(2, partitionStore.getNumPartitions());
   }
 
   /**
@@ -463,12 +474,12 @@ public class TestPartitionStores {
   public void testEdgeCombineWithSimplePartition() throws IOException {
     testEdgeCombine(SimplePartition.class);
   }
- 
+
   @Test
   public void testEdgeCombineWithByteArrayPartition() throws IOException {
     testEdgeCombine(ByteArrayPartition.class);
   }
- 
+
   private void testEdgeCombine(Class<? extends Partition> partitionClass)
       throws IOException {
     Vertex<IntWritable, IntWritable, NullWritable> v1 = conf.createVertex();
@@ -512,39 +523,40 @@ public class TestPartitionStores {
     private final AtomicInteger vertexCounter;
     private final PartitionStore<IntWritable, IntWritable, NullWritable>
       partitionStore;
-    private final int partitionId;
+    private final List<Integer> partitionIds;
     private final ImmutableClassesGiraphConfiguration<IntWritable, IntWritable,
             NullWritable> conf;
 
     public Worker(AtomicInteger vertexCounter,
         PartitionStore<IntWritable, IntWritable, NullWritable> partitionStore,
-        int partitionId,
+        List<Integer> partitionIds,
         ImmutableClassesGiraphConfiguration<IntWritable, IntWritable,
           NullWritable> conf) {
 
       this.vertexCounter = vertexCounter;
       this.partitionStore = partitionStore;
-      this.partitionId = partitionId;
+      this.partitionIds = partitionIds;
       this.conf = conf;
     }
 
     public void run() {
-      for (int i = 0; i < NUM_OF_VERTEXES_PER_THREAD; ++i) {
-        int id = vertexCounter.getAndIncrement();
-        Vertex<IntWritable, IntWritable, NullWritable> v = conf.createVertex();
-        v.initialize(new IntWritable(id), new IntWritable(id));
-
+      for (int partitionId : partitionIds) {
         Partition<IntWritable, IntWritable, NullWritable> partition =
-          partitionStore.getOrCreatePartition(partitionId);
-
-        Random rand = new Random(id);
-        for (int j = 0; j < NUM_OF_EDGES_PER_VERTEX; ++j) {
-          int dest = rand.nextInt(id + 1);
-          v.addEdge(EdgeFactory.create(new IntWritable(dest)));
+            conf.createPartition(partitionId, context);
+        for (int i = 0; i < NUM_OF_VERTEXES_PER_PARTITION; ++i) {
+          int id = vertexCounter.getAndIncrement();
+          Vertex<IntWritable, IntWritable, NullWritable> v = 
conf.createVertex();
+          v.initialize(new IntWritable(id), new IntWritable(id));
+
+          Random rand = new Random(id);
+          for (int j = 0; j < NUM_OF_EDGES_PER_VERTEX; ++j) {
+            int dest = rand.nextInt(id + 1);
+            v.addEdge(EdgeFactory.create(new IntWritable(dest)));
+          }
+
+          partition.putVertex(v);
         }
-
-        partition.putVertex(v);
-        partitionStore.putPartition(partition);
+        partitionStore.addPartition(partition);
       }
     }
   }

Reply via email to