http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java deleted file mode 100644 index 7368420..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java +++ /dev/null @@ -1,1300 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.giraph.partition; - -import static org.apache.giraph.conf.GiraphConstants.MAX_PARTITIONS_IN_MEMORY; -import static org.apache.giraph.conf.GiraphConstants.MAX_STICKY_PARTITIONS; -import static org.apache.giraph.conf.GiraphConstants.NUM_COMPUTE_THREADS; -import static org.apache.giraph.conf.GiraphConstants.NUM_INPUT_THREADS; -import static org.apache.giraph.conf.GiraphConstants.NUM_OUTPUT_THREADS; -import static org.apache.giraph.conf.GiraphConstants.PARTITIONS_DIRECTORY; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.DataInput; -import java.io.DataInputStream; -import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.util.Iterator; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.giraph.bsp.CentralizedServiceWorker; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.edge.OutEdges; -import org.apache.giraph.graph.Vertex; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Mapper.Context; -import org.apache.log4j.Logger; - -import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; -import com.google.common.hash.HashFunction; -import com.google.common.hash.Hashing; - -/** - * Disk-backed PartitionStore. Partitions are stored in memory on a LRU basis. 
- * The operations are thread-safe, and guarantees safety for concurrent - * execution of different operations on each partition.<br /> - * <br /> - * The algorithm implemented by this class is quite intricate due to the - * interaction of several locks to guarantee performance. For this reason, here - * follows an overview of the implemented algorithm.<br /> - * <b>ALGORITHM</b>: - * In general, the partition store keeps N partitions in memory. To improve - * I/O performance, part of the N partitions are kept in memory in a sticky - * manner while preserving the capability of each thread to swap partitions on - * the disk. This means that, for T threads, at least T partitions must remain - * non-sticky. The number of sicky partitions can also be specified manually. - * <b>CONCURRENCY</b>: - * <ul> - * <li> - * <b>Meta Partition Lock</b>.All the partitions are held in a - * container, called the MetaPartition. This object contains also meta - * information about the partition. All these objects are used to - * atomically operate on partitions. In fact, each time a thread accesses - * a partition, it will firstly acquire a lock on the container, - * guaranteeing exclusion in managing the partition. Besides, this - * partition-based lock allows the threads to concurrently operate on - * different partitions, guaranteeing performance. - * </li> - * <li> - * <b>Meta Partition Container</b>. All the references to the meta - * partition objects are kept in a concurrent hash map. This ADT guarantees - * performance and atomic access to each single reference, which is then - * use for atomic operations on partitions, as previously described. - * </li> - * <li> - * <b>LRU Lock</b>. Finally, an additional ADT is used to keep the LRU - * order of unused partitions. In this case, the ADT is not thread safe and - * to guarantee safety, we use its intrinsic lock. Additionally, this lock - * is used for all in memory count changes. 
In fact, the amount of elements - * in memory and the inactive partitions are very strictly related and this - * justifies the sharing of the same lock. - * </li> - * </ul> - * <b>XXX</b>:<br/> - * while most of the concurrent behaviors are gracefully handled, the - * concurrent call of {@link #getOrCreatePartition(Integer partitionId)} and - * {@link #deletePartition(Integer partitionId)} need yet to be handled. Since - * the usage of this class does not currently incure in this type of concurrent - * behavior, it has been left as a future work. - * - * @param <I> Vertex id - * @param <V> Vertex data - * @param <E> Edge data - */ -@SuppressWarnings("rawtypes") -public class DiskBackedPartitionStore<I extends WritableComparable, - V extends Writable, E extends Writable> - extends PartitionStore<I, V, E> { - /** Class logger. */ - private static final Logger LOG = - Logger.getLogger(DiskBackedPartitionStore.class); - /** States the partition can be found in */ - private enum State { INIT, ACTIVE, INACTIVE, ONDISK }; - - /** Hash map containing all the partitions */ - private final ConcurrentMap<Integer, MetaPartition> partitions = - Maps.newConcurrentMap(); - /** Inactive partitions to re-activate or spill to disk to make space */ - private final Map<Integer, MetaPartition> lru = Maps.newLinkedHashMap(); - - /** Giraph configuration */ - private final - ImmutableClassesGiraphConfiguration<I, V, E> conf; - /** Mapper context */ - private final Context context; - /** Base path where the partition files are written to */ - private final String[] basePaths; - /** Used to hash partition Ids */ - private final HashFunction hasher = Hashing.murmur3_32(); - /** Maximum number of slots. Read-only value, no need for concurrency - safety. 
*/ - private final int maxPartitionsInMem; - /** Number of slots used */ - private AtomicInteger numPartitionsInMem; - /** service worker reference */ - private CentralizedServiceWorker<I, V, E> serviceWorker; - /** mumber of slots that are always kept in memory */ - private AtomicLong numOfStickyPartitions; - /** counter */ - private long passedThroughEdges; - - /** - * Constructor - * - * @param conf Configuration - * @param context Context - * @param serviceWorker service worker reference - */ - public DiskBackedPartitionStore( - ImmutableClassesGiraphConfiguration<I, V, E> conf, - Mapper<?, ?, ?, ?>.Context context, - CentralizedServiceWorker<I, V, E> serviceWorker) { - this.conf = conf; - this.context = context; - this.serviceWorker = serviceWorker; - - this.passedThroughEdges = 0; - this.numPartitionsInMem = new AtomicInteger(0); - - // We must be able to hold at least one partition in memory - this.maxPartitionsInMem = Math.max(MAX_PARTITIONS_IN_MEMORY.get(conf), 1); - - int numInputThreads = NUM_INPUT_THREADS.get(conf); - int numComputeThreads = NUM_COMPUTE_THREADS.get(conf); - int numOutputThreads = NUM_OUTPUT_THREADS.get(conf); - - int maxThreads = - Math.max(numInputThreads, - Math.max(numComputeThreads, numOutputThreads)); - - // check if the sticky partition option is set and, if so, set the - long maxSticky = MAX_STICKY_PARTITIONS.get(conf); - - // number of sticky partitions - if (maxSticky > 0 && maxPartitionsInMem - maxSticky >= maxThreads) { - this.numOfStickyPartitions = new AtomicLong(maxSticky); - } else { - if (maxPartitionsInMem - maxSticky >= maxThreads) { - if (LOG.isInfoEnabled()) { - LOG.info("giraph.maxSticky parameter unset or improperly set " + - "resetting to automatically computed value."); - } - } - - if (maxPartitionsInMem == 1 || maxThreads >= maxPartitionsInMem) { - this.numOfStickyPartitions = new AtomicLong(0); - } else { - this.numOfStickyPartitions = - new AtomicLong(maxPartitionsInMem - maxThreads); - } - } - - // Take 
advantage of multiple disks - String[] userPaths = PARTITIONS_DIRECTORY.getArray(conf); - basePaths = new String[userPaths.length]; - int i = 0; - for (String path : userPaths) { - basePaths[i++] = path + "/" + conf.get("mapred.job.id", "Unknown Job"); - } - if (LOG.isInfoEnabled()) { - LOG.info("DiskBackedPartitionStore with maxInMemoryPartitions=" + - maxPartitionsInMem + ", isStaticGraph=" + conf.isStaticGraph()); - } - } - - @Override - public Iterable<Integer> getPartitionIds() { - return Iterables.unmodifiableIterable(partitions.keySet()); - } - - @Override - public boolean hasPartition(final Integer id) { - return partitions.containsKey(id); - } - - @Override - public int getNumPartitions() { - return partitions.size(); - } - - @Override - public long getPartitionVertexCount(int partitionId) { - MetaPartition meta = partitions.get(partitionId); - if (meta.getState() == State.ONDISK) { - return meta.getVertexCount(); - } else { - return meta.getPartition().getVertexCount(); - } - } - - @Override - public long getPartitionEdgeCount(int partitionId) { - MetaPartition meta = partitions.get(partitionId); - if (meta.getState() == State.ONDISK) { - return meta.getEdgeCount(); - } else { - return meta.getPartition().getEdgeCount(); - } - } - - @Override - public Partition<I, V, E> getOrCreatePartition(Integer id) { - MetaPartition meta = new MetaPartition(id); - MetaPartition temp; - - temp = partitions.putIfAbsent(id, meta); - if (temp != null) { - meta = temp; - } - - synchronized (meta) { - if (temp == null && numOfStickyPartitions.getAndDecrement() > 0) { - meta.setSticky(); - } - getPartition(meta); - if (meta.getPartition() == null) { - Partition<I, V, E> partition = conf.createPartition(id, context); - meta.setPartition(partition); - addPartition(meta, partition); - if (meta.getState() == State.INIT) { - LOG.warn("Partition was still INIT after ADD (not possibile)."); - } - // This get is necessary. 
When creating a new partition, it will be - // placed by default as INACTIVE. However, here the user will retrieve - // a reference to it, and hence there is the need to update the - // reference count, as well as the state of the object. - getPartition(meta); - if (meta.getState() == State.INIT) { - LOG.warn("Partition was still INIT after GET (not possibile)."); - } - } - - if (meta.getState() == State.INIT) { - String msg = "Getting a partition which is in INIT state is " + - "not allowed. " + meta; - LOG.error(msg); - throw new IllegalStateException(msg); - } - - return meta.getPartition(); - } - } - - @Override - public void putPartition(Partition<I, V, E> partition) { - Integer id = partition.getId(); - MetaPartition meta = partitions.get(id); - putPartition(meta); - } - - @Override - public void deletePartition(Integer id) { - if (hasPartition(id)) { - MetaPartition meta = partitions.get(id); - deletePartition(meta); - } - } - - @Override - public Partition<I, V, E> removePartition(Integer id) { - if (hasPartition(id)) { - MetaPartition meta; - - meta = partitions.get(id); - synchronized (meta) { - getPartition(meta); - putPartition(meta); - deletePartition(id); - } - return meta.getPartition(); - } - return null; - } - - @Override - public void addPartition(Partition<I, V, E> partition) { - Integer id = partition.getId(); - MetaPartition meta = new MetaPartition(id); - MetaPartition temp; - - meta.setPartition(partition); - temp = partitions.putIfAbsent(id, meta); - if (temp != null) { - meta = temp; - } - - synchronized (meta) { - if (temp == null && numOfStickyPartitions.getAndDecrement() > 0) { - meta.setSticky(); - } - addPartition(meta, partition); - } - } - - @Override - public void shutdown() { - for (MetaPartition e : partitions.values()) { - if (e.getState() == State.ONDISK) { - deletePartitionFiles(e.getId()); - } - } - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - for (MetaPartition e : 
      partitions.values()) {
      sb.append(e.toString() + "\n");
    }
    return sb.toString();
  }

  /**
   * Writes vertex data (Id, value and halted state) to stream.
   *
   * @param output The output stream
   * @param vertex The vertex to serialize
   * @throws IOException
   */
  private void writeVertexData(DataOutput output, Vertex<I, V, E> vertex)
    throws IOException {

    vertex.getId().write(output);
    vertex.getValue().write(output);
    // Trailing boolean records the halted flag; readVertexData consumes it.
    output.writeBoolean(vertex.isHalted());
  }

  /**
   * Writes vertex edges (Id, edges) to stream.
   *
   * @param output The output stream
   * @param vertex The vertex to serialize
   * @throws IOException
   */
  @SuppressWarnings("unchecked")
  private void writeOutEdges(DataOutput output, Vertex<I, V, E> vertex)
    throws IOException {

    vertex.getId().write(output);
    OutEdges<I, E> edges = (OutEdges<I, E>) vertex.getEdges();
    edges.write(output);
  }

  /**
   * Read vertex data from an input and initialize the vertex.
   * The vertex is created with empty out-edges; edges are restored
   * separately by readOutEdges.
   *
   * @param in The input stream
   * @param vertex The vertex to initialize
   * @throws IOException
   */
  private void readVertexData(DataInput in, Vertex<I, V, E> vertex)
    throws IOException {

    I id = conf.createVertexId();
    id.readFields(in);
    V value = conf.createVertexValue();
    value.readFields(in);
    OutEdges<I, E> edges = conf.createAndInitializeOutEdges(0);
    vertex.initialize(id, value, edges);
    // The boolean written by writeVertexData decides the halted state.
    if (in.readBoolean()) {
      vertex.voteToHalt();
    } else {
      vertex.wakeUp();
    }
  }

  /**
   * Read vertex edges from an input and set them to the vertex.
   * The vertex must already exist in the partition (written there by
   * readVertexData); its edges object is filled in place.
   *
   * @param in The input stream
   * @param partition The partition owning the vertex
   * @throws IOException
   */
  @SuppressWarnings("unchecked")
  private void readOutEdges(DataInput in, Partition<I, V, E> partition)
    throws IOException {

    I id = conf.createVertexId();
    id.readFields(in);
    Vertex<I, V, E> v = partition.getVertex(id);
    OutEdges<I, E> edges = (OutEdges<I, E>) v.getEdges();
    edges.readFields(in);
    partition.saveVertex(v);
  }

  /**
   * Load a partition from disk. It deletes the files after the load,
   * except for the edges, if the graph is static.
   *
   * @param id The id of the partition to load
   * @param numVertices The number of vertices contained on disk
   * @return The partition
   * @throws IOException
   */
  private Partition<I, V, E> loadPartition(int id, long numVertices)
    throws IOException {

    Partition<I, V, E> partition = conf.createPartition(id, context);

    // Vertices
    File file = new File(getVerticesPath(id));
    if (LOG.isDebugEnabled()) {
      LOG.debug("loadPartition: loading partition vertices " +
          partition.getId() + " from " + file.getAbsolutePath());
    }

    FileInputStream filein = new FileInputStream(file);
    BufferedInputStream bufferin = new BufferedInputStream(filein);
    DataInputStream inputStream = new DataInputStream(bufferin);
    for (int i = 0; i < numVertices; ++i) {
      Vertex<I, V , E> vertex = conf.createVertex();
      readVertexData(inputStream, vertex);
      partition.putVertex(vertex);
    }
    inputStream.close();
    // The vertices file is always rewritten on offload, so it is removed
    // after a successful load.
    if (!file.delete()) {
      String msg = "loadPartition: failed to delete " + file.getAbsolutePath();
      LOG.error(msg);
      throw new IllegalStateException(msg);
    }

    // Edges
    file = new File(getEdgesPath(id));

    if (LOG.isDebugEnabled()) {
      LOG.debug("loadPartition: loading partition edges " +
          partition.getId() + " from " + file.getAbsolutePath());
    }

    // The stream variables are reused for the edges file.
    filein = new FileInputStream(file);
    bufferin = new BufferedInputStream(filein);
    inputStream = new DataInputStream(bufferin);
for (int i = 0; i < numVertices; ++i) { - readOutEdges(inputStream, partition); - } - inputStream.close(); - // If the graph is static, keep the file around. - if (!conf.isStaticGraph() && !file.delete()) { - String msg = "loadPartition: failed to delete " + file.getAbsolutePath(); - LOG.error(msg); - throw new IllegalStateException(msg); - } - return partition; - } - - /** - * Write a partition to disk. - * - * @param meta meta partition containing the partition to offload - * @throws IOException - */ - private void offloadPartition(MetaPartition meta) throws IOException { - - Partition<I, V, E> partition = meta.getPartition(); - File file = new File(getVerticesPath(partition.getId())); - File parent = file.getParentFile(); - if (!parent.exists() && !parent.mkdirs() && LOG.isDebugEnabled()) { - LOG.debug("offloadPartition: directory " + parent.getAbsolutePath() + - " already exists."); - } - - if (!file.createNewFile()) { - String msg = "offloadPartition: file " + parent.getAbsolutePath() + - " already exists."; - LOG.error(msg); - throw new IllegalStateException(msg); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("offloadPartition: writing partition vertices " + - partition.getId() + " to " + file.getAbsolutePath()); - } - - FileOutputStream fileout = new FileOutputStream(file); - BufferedOutputStream bufferout = new BufferedOutputStream(fileout); - DataOutputStream outputStream = new DataOutputStream(bufferout); - for (Vertex<I, V, E> vertex : partition) { - writeVertexData(outputStream, vertex); - } - outputStream.close(); - - // Avoid writing back edges if we have already written them once and - // the graph is not changing. - // If we are in the input superstep, we need to write the files - // at least the first time, even though the graph is static. 
- file = new File(getEdgesPath(partition.getId())); - if (meta.getPrevVertexCount() != partition.getVertexCount() || - !conf.isStaticGraph() || !file.exists()) { - - meta.setPrevVertexCount(partition.getVertexCount()); - - if (!file.createNewFile() && LOG.isDebugEnabled()) { - LOG.debug("offloadPartition: file " + file.getAbsolutePath() + - " already exists."); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("offloadPartition: writing partition edges " + - partition.getId() + " to " + file.getAbsolutePath()); - } - - fileout = new FileOutputStream(file); - bufferout = new BufferedOutputStream(fileout); - outputStream = new DataOutputStream(bufferout); - for (Vertex<I, V, E> vertex : partition) { - writeOutEdges(outputStream, vertex); - } - outputStream.close(); - } - } - - /** - * Append a partition on disk at the end of the file. Expects the caller - * to hold the global lock. - * - * @param meta meta partition container for the partitiont to save - * to disk - * @param partition The partition - * @throws IOException - */ - private void addToOOCPartition(MetaPartition meta, - Partition<I, V, E> partition) throws IOException { - - Integer id = partition.getId(); - File file = new File(getVerticesPath(id)); - DataOutputStream outputStream = null; - - FileOutputStream fileout = new FileOutputStream(file, true); - BufferedOutputStream bufferout = new BufferedOutputStream(fileout); - outputStream = new DataOutputStream(bufferout); - for (Vertex<I, V, E> vertex : partition) { - writeVertexData(outputStream, vertex); - } - outputStream.close(); - - file = new File(getEdgesPath(id)); - fileout = new FileOutputStream(file, true); - bufferout = new BufferedOutputStream(fileout); - outputStream = new DataOutputStream(bufferout); - for (Vertex<I, V, E> vertex : partition) { - writeOutEdges(outputStream, vertex); - } - outputStream.close(); - } - - /** - * Delete a partition's files. - * - * @param id The id of the partition owning the file. 
- */ - public void deletePartitionFiles(Integer id) { - // File containing vertices - File file = new File(getVerticesPath(id)); - if (file.exists() && !file.delete()) { - String msg = "deletePartitionFiles: Failed to delete file " + - file.getAbsolutePath(); - LOG.error(msg); - throw new IllegalStateException(msg); - } - - // File containing edges - file = new File(getEdgesPath(id)); - if (file.exists() && !file.delete()) { - String msg = "deletePartitionFiles: Failed to delete file " + - file.getAbsolutePath(); - LOG.error(msg); - throw new IllegalStateException(msg); - } - } - - /** - * Get the path and basename of the storage files. - * - * @param partitionId The partition - * @return The path to the given partition - */ - private String getPartitionPath(Integer partitionId) { - int hash = hasher.hashInt(partitionId).asInt(); - int idx = Math.abs(hash % basePaths.length); - return basePaths[idx] + "/partition-" + partitionId; - } - - /** - * Get the path to the file where vertices are stored. - * - * @param partitionId The partition - * @return The path to the vertices file - */ - private String getVerticesPath(Integer partitionId) { - return getPartitionPath(partitionId) + "_vertices"; - } - - /** - * Get the path to the file where edges are stored. - * - * @param partitionId The partition - * @return The path to the edges file - */ - private String getEdgesPath(Integer partitionId) { - return getPartitionPath(partitionId) + "_edges"; - } - - /** - * Removes and returns the last recently used entry. - * - * @return The last recently used entry. - */ - private MetaPartition getLRUPartition() { - synchronized (lru) { - Iterator<Entry<Integer, MetaPartition>> i = - lru.entrySet().iterator(); - Entry<Integer, MetaPartition> entry = i.next(); - i.remove(); - return entry.getValue(); - } - } - - /** - * Method that gets a partition from the store. 
- * The partition is produced as a side effect of the computation and is - * reflected inside the META object provided as parameter. - * This function is thread-safe since it locks the whole computation - * on the metapartition provided.<br /> - * <br /> - * <b>ONDISK case</b><br /> - * When a thread tries to access an element on disk, it waits until it - * space in memory and inactive data to swap resources. - * It is possible that multiple threads wait for a single - * partition to be restored from disk. The semantic of this - * function is that only the first thread interested will be - * in charge to load it back to memory, hence waiting on 'lru'. - * The others, will be waiting on the lock to be available, - * preventing consistency issues.<br /> - * <br /> - * <b>Deadlock</b><br /> - * The code of this method can in principle lead to a deadlock, due to the - * fact that two locks are held together while running the "wait" method. - * However, this problem does not occur. The two locks held are: - * <ol> - * <li><b>Meta Object</b>, which is the object the thread is trying to - * acquire and is currently stored on disk.</li> - * <li><b>LRU data structure</b>, which keeps track of the objects which are - * inactive and hence swappable.</li> - * </ol> - * It is not possible that two getPartition calls cross because this means - * that LRU objects are both INACTIVE and ONDISK at the same time, which is - * not possible. 
- * - * @param meta meta partition container with the partition itself - */ - @edu.umd.cs.findbugs.annotations.SuppressWarnings( - value = "TLW_TWO_LOCK_WAIT", - justification = "The two locks held do not produce a deadlock") - private void getPartition(MetaPartition meta) { - synchronized (meta) { - boolean isNotDone = true; - - if (meta.getState() != State.INIT) { - State state; - - while (isNotDone) { - state = meta.getState(); - switch (state) { - case ONDISK: - MetaPartition swapOutPartition = null; - long numVertices = meta.getVertexCount(); - - synchronized (lru) { - try { - while (numPartitionsInMem.get() >= maxPartitionsInMem && - lru.isEmpty()) { - // notification for threads waiting on this are - // required in two cases: - // a) when an element is added to the LRU (hence - // a new INACTIVE partition is added). - // b) when additioanl space is available in memory (hence - // in memory counter is decreased). - lru.wait(); - } - } catch (InterruptedException e) { - LOG.error("getPartition: error while waiting on " + - "LRU data structure: " + e.getMessage()); - throw new IllegalStateException(e); - } - - // We have to make some space first, by removing the least used - // partition (hence the first in the LRU data structure). - // - // NB: In case the LRU is not empty, we are _swapping_ elements. - // This, means that there is no need to "make space" by - // changing the in-memory counter. Differently, if the element - // can directly be placed into memory, the memory usage - // increases by one. 
- if (numPartitionsInMem.get() >= maxPartitionsInMem && - !lru.isEmpty()) { - swapOutPartition = getLRUPartition(); - } else if (numPartitionsInMem.get() < maxPartitionsInMem) { - numPartitionsInMem.getAndIncrement(); - } else { - String msg = "lru is empty and there is not space in memory, " + - "hence the partition cannot be loaded."; - LOG.error(msg); - throw new IllegalStateException(msg); - } - } - - if (swapOutPartition != null) { - synchronized (swapOutPartition) { - if (swapOutPartition.isSticky()) { - String msg = "Partition " + meta.getId() + " is sticky " + - " and cannot be offloaded."; - LOG.error(msg); - throw new IllegalStateException(msg); - } - // safety check - if (swapOutPartition.getState() != State.INACTIVE) { - String msg = "Someone is holding the partition with id " + - swapOutPartition.getId() + " but is supposed to be " + - "inactive."; - LOG.error(msg); - throw new IllegalStateException(msg); - } - - try { - offloadPartition(swapOutPartition); - Partition<I, V, E> p = swapOutPartition.getPartition(); - swapOutPartition.setOnDisk(p); - // notify all the threads waiting to the offloading process, - // that they are allowed again to access the - // swapped-out object. - swapOutPartition.notifyAll(); - } catch (IOException e) { - LOG.error("getPartition: Failed while Offloading " + - "New Partition: " + e.getMessage()); - throw new IllegalStateException(e); - } - } - } - - // If it was needed, the partition to be swpped out is on disk. - // Additionally, we are guaranteed that we have a free spot in - // memory, in fact, - // a) either there was space in memory, and hence the in memory - // counter was incremented reserving the space for this element. - // b) or the space has been created by swapping out the partition - // that was inactive in the LRU. - // This means that, even in the case that concurrently swapped - // element is restored back to memory, there must have been - // place for only an additional partition. 
- Partition<I, V, E> partition; - try { - partition = loadPartition(meta.getId(), numVertices); - } catch (IOException e) { - LOG.error("getPartition: Failed while Loading Partition from " + - "disk: " + e.getMessage()); - throw new IllegalStateException(e); - } - meta.setActive(partition); - - isNotDone = false; - break; - case INACTIVE: - MetaPartition p = null; - - if (meta.isSticky()) { - meta.setActive(); - isNotDone = false; - break; - } - - synchronized (lru) { - p = lru.remove(meta.getId()); - } - if (p == meta && p.getState() == State.INACTIVE) { - meta.setActive(); - isNotDone = false; - } else { - try { - // A thread could wait here when an inactive partition is - // concurrently swapped to disk. In fact, the meta object is - // locked but, even though the object is inactive, it is not - // present in the LRU ADT. - // The thread need to be signaled when the partition is - // finally swapped out of the disk. - meta.wait(); - } catch (InterruptedException e) { - LOG.error("getPartition: error while waiting on " + - "previously Inactive Partition: " + e.getMessage()); - throw new IllegalStateException(e); - } - isNotDone = true; - } - break; - case ACTIVE: - meta.incrementReferences(); - isNotDone = false; - break; - default: - throw new IllegalStateException("illegal state " + meta.getState() + - " for partition " + meta.getId()); - } - } - } - } - } - - /** - * Method that puts a partition back to the store. This function is - * thread-safe using meta partition intrinsic lock. 
- * - * @param meta meta partition container with the partition itself - */ - private void putPartition(MetaPartition meta) { - synchronized (meta) { - if (meta.getState() != State.ACTIVE) { - String msg = "It is not possible to put back a partition which is " + - "not ACTIVE.\n" + meta.toString(); - LOG.error(msg); - throw new IllegalStateException(msg); - } - - if (meta.decrementReferences() == 0) { - meta.setState(State.INACTIVE); - if (!meta.isSticky()) { - synchronized (lru) { - lru.put(meta.getId(), meta); - lru.notifyAll(); // notify every waiting process about the fact - // that the LRU is not empty anymore - } - } - meta.notifyAll(); // needed for the threads that are waiting for the - // partition to become inactive, when - // trying to delete the partition - } - } - } - - /** - * Task that adds a partition to the store. - * This function is thread-safe since it locks using the intrinsic lock of - * the meta partition. - * - * @param meta meta partition container with the partition itself - * @param partition partition to be added - */ - private void addPartition(MetaPartition meta, Partition<I, V, E> partition) { - synchronized (meta) { - // If the state of the partition is INIT, this means that the META - // object was just created, and hence the partition is new. - if (meta.getState() == State.INIT) { - // safety check to guarantee that the partition was set. - if (partition == null) { - String msg = "No partition was provided."; - LOG.error(msg); - throw new IllegalStateException(msg); - } - - // safety check to guarantee that the partition provided is the one - // that is also set in the meta partition. 
- if (partition != meta.getPartition()) { - String msg = "Partition and Meta-Partition should " + - "contain the same data"; - LOG.error(msg); - throw new IllegalStateException(msg); - } - - synchronized (lru) { - if (numPartitionsInMem.get() < maxPartitionsInMem || meta.isSticky) { - meta.setState(State.INACTIVE); - numPartitionsInMem.getAndIncrement(); - if (!meta.isSticky) { - lru.put(meta.getId(), meta); - lru.notifyAll(); // signaling that at least one element is - // present in the LRU ADT. - } - return; // this is used to exit the function and avoid using the - // else clause. This is required to keep only this part of - // the code under lock and aoivd keeping the lock when - // performing the Offload I/O - } - } - - // ELSE - try { - offloadPartition(meta); - meta.setOnDisk(partition); - } catch (IOException e) { - LOG.error("addPartition: Failed while Offloading New Partition: " + - e.getMessage()); - throw new IllegalStateException(e); - } - } else { - Partition<I, V, E> existing = null; - boolean isOOC = false; - boolean isNotDone = true; - State state; - - while (isNotDone) { - state = meta.getState(); - switch (state) { - case ONDISK: - isOOC = true; - isNotDone = false; - meta.addToVertexCount(partition.getVertexCount()); - break; - case INACTIVE: - MetaPartition p = null; - - if (meta.isSticky()) { - existing = meta.getPartition(); - isNotDone = false; - break; - } - - synchronized (lru) { - p = lru.get(meta.getId()); - } - // this check is safe because, even though we are out of the LRU - // lock, we still hold the lock on the partition. This means that - // a) if the partition was removed from the LRU, p will be null - // and the current thread will wait. - // b) if the partition was not removed, its state cannot be - // modified since the lock is held on meta, which refers to - // the same object. 
- if (p == meta) { - existing = meta.getPartition(); - isNotDone = false; - } else { - try { - // A thread could wait here when an inactive partition is - // concurrently swapped to disk. In fact, the meta object is - // locked but, even though the object is inactive, it is not - // present in the LRU ADT. - // The thread need to be signaled when the partition is finally - // swapped out of the disk. - meta.wait(); - } catch (InterruptedException e) { - LOG.error("addPartition: error while waiting on " + - "previously inactive partition: " + e.getMessage()); - throw new IllegalStateException(e); - } - - isNotDone = true; - } - break; - case ACTIVE: - existing = meta.getPartition(); - isNotDone = false; - break; - default: - throw new IllegalStateException("illegal state " + state + - " for partition " + meta.getId()); - } - } - - if (isOOC) { - try { - addToOOCPartition(meta, partition); - } catch (IOException e) { - LOG.error("addPartition: Failed while Adding to OOC Partition: " + - e.getMessage()); - throw new IllegalStateException(e); - } - } else { - existing.addPartition(partition); - } - } - } - } - - /** - * Task that deletes a partition to the store - * This function is thread-safe using the intrinsic lock of the meta - * partition object - * - * @param meta meta partition container with the partition itself - */ - private void deletePartition(MetaPartition meta) { - synchronized (meta) { - boolean isDone = false; - int id = meta.getId(); - - State state; - while (!isDone) { - state = meta.getState(); - switch (state) { - case ONDISK: - deletePartitionFiles(id); - isDone = true; - break; - case INACTIVE: - MetaPartition p; - - if (meta.isSticky()) { - isDone = true; - numPartitionsInMem.getAndDecrement(); - break; - } - - synchronized (lru) { - p = lru.remove(id); - if (p == meta && p.getState() == State.INACTIVE) { - isDone = true; - numPartitionsInMem.getAndDecrement(); - lru.notifyAll(); // notify all waiting processes that there is now - // at least 
one new free place in memory. - // XXX: attention, here a race condition with getPartition is - // possible, since changing lru is separated by the removal - // of the element from the parittions ADT. - break; - } - } - try { - // A thread could wait here when an inactive partition is - // concurrently swapped to disk. In fact, the meta object is - // locked but, even though the object is inactive, it is not - // present in the LRU ADT. - // The thread need to be signaled when the partition is - // finally swapped out of the disk. - meta.wait(); - } catch (InterruptedException e) { - LOG.error("deletePartition: error while waiting on " + - "previously inactive partition: " + e.getMessage()); - throw new IllegalStateException(e); - } - isDone = false; - break; - case ACTIVE: - try { - // the thread waits that the object to be deleted becomes inactive, - // otherwise the deletion is not possible. - // The thread needs to be signaled when the partition becomes - // inactive. - meta.wait(); - } catch (InterruptedException e) { - LOG.error("deletePartition: error while waiting on " + - "active partition: " + e.getMessage()); - throw new IllegalStateException(e); - } - break; - default: - throw new IllegalStateException("illegal state " + state + - " for partition " + id); - } - } - partitions.remove(id); - } - } - - /** - * Partition container holding additional meta data associated with each - * partition. 
- */ - private class MetaPartition { - // ---- META INFORMATION ---- - /** ID of the partition */ - private int id; - /** State in which the partition is */ - private State state; - /** - * Counter used to keep track of the number of references retained by - * user-threads - */ - private int references; - /** Number of vertices contained in the partition */ - private long vertexCount; - /** Previous number of vertices contained in the partition */ - private long prevVertexCount; - /** Number of edges contained in the partition */ - private long edgeCount; - /** - * Sticky bit; if set, this partition is never supposed to be - * written to disk - */ - private boolean isSticky; - - // ---- PARTITION ---- - /** the actual partition. Depending on the state of the partition, - this object could be empty. */ - private Partition<I, V, E> partition; - - /** - * Initialization of the metadata enriched partition. - * - * @param id id of the partition - */ - public MetaPartition(int id) { - this.id = id; - this.state = State.INIT; - this.references = 0; - this.vertexCount = 0; - this.prevVertexCount = 0; - this.edgeCount = 0; - this.isSticky = false; - - this.partition = null; - } - - /** - * @return the id - */ - public int getId() { - return id; - } - - /** - * @return the state - */ - public State getState() { - return state; - } - - /** - * This function sets the metadata for on-disk partition. 
- * - * @param partition partition related to this container - */ - public void setOnDisk(Partition<I, V, E> partition) { - this.state = State.ONDISK; - this.partition = null; - this.vertexCount = partition.getVertexCount(); - this.edgeCount = partition.getEdgeCount(); - } - - /** - * - */ - public void setActive() { - this.setActive(null); - } - - /** - * - * @param partition the partition associate to this container - */ - public void setActive(Partition<I, V, E> partition) { - if (partition != null) { - this.partition = partition; - } - this.state = State.ACTIVE; - this.prevVertexCount = this.vertexCount; - this.vertexCount = 0; - this.incrementReferences(); - } - - /** - * @param state the state to set - */ - public void setState(State state) { - this.state = state; - } - - /** - * @return decremented references - */ - public int decrementReferences() { - if (references > 0) { - references -= 1; - } - return references; - } - - /** - * @return incremented references - */ - public int incrementReferences() { - return ++references; - } - - /** - * set previous number of vertexes - * @param vertexCount number of vertexes - */ - public void setPrevVertexCount(long vertexCount) { - this.prevVertexCount = vertexCount; - } - - /** - * @return the vertexCount - */ - public long getPrevVertexCount() { - return prevVertexCount; - } - - /** - * @return the vertexCount - */ - public long getVertexCount() { - return vertexCount; - } - - /** - * @return the edgeCount - */ - public long getEdgeCount() { - return edgeCount; - } - - /** - * @param inc amount to add to the vertex count - */ - public void addToVertexCount(long inc) { - this.vertexCount += inc; - this.prevVertexCount = vertexCount; - } - - /** - * @return the partition - */ - public Partition<I, V, E> getPartition() { - return partition; - } - - /** - * @param partition the partition to set - */ - public void setPartition(Partition<I, V, E> partition) { - this.partition = partition; - } - - /** - * Set sticky bit 
to this partition - */ - public void setSticky() { - this.isSticky = true; - } - - /** - * Get sticky bit to this partition - * @return boolean ture iff the sticky bit is set - */ - public boolean isSticky() { - return this.isSticky; - } - - @Override - public String toString() { - StringBuffer sb = new StringBuffer(); - - sb.append("Meta Data: { "); - sb.append("ID: " + id + "; "); - sb.append("State: " + state + "; "); - sb.append("Number of References: " + references + "; "); - sb.append("Number of Vertices: " + vertexCount + "; "); - sb.append("Previous number of Vertices: " + prevVertexCount + "; "); - sb.append("Number of edges: " + edgeCount + "; "); - sb.append("Is Sticky: " + isSticky + "; "); - sb.append("Partition: " + partition + "; }"); - - return sb.toString(); - } - } -}
http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java index bbcdcba..d3f3902 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java @@ -18,11 +18,15 @@ package org.apache.giraph.partition; +import org.apache.giraph.utils.ExtendedDataOutput; +import org.apache.giraph.utils.VertexIdEdges; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; /** - * Structure that stores partitions for a worker. + * Structure that stores partitions for a worker. PartitionStore does not allow + * random accesses to partitions except upon removal. + * This structure is thread-safe. * * @param <I> Vertex id * @param <V> Vertex data @@ -31,33 +35,17 @@ import org.apache.hadoop.io.WritableComparable; public abstract class PartitionStore<I extends WritableComparable, V extends Writable, E extends Writable> { /** - * Add a new partition to the store or just the vertices from the partition - * to the old partition. + * Add a *new* partition to the store. If the partition is already existed, + * it does not add the partition and returns false. * * @param partition Partition to add + * @return Whether the addition made any change in the partition store */ - public abstract void addPartition(Partition<I, V, E> partition); + public abstract boolean addPartition(Partition<I, V, E> partition); /** - * Get or create a partition. Note: user has to put back - * it to the store through {@link #putPartition(Partition)} after use. 
- * - * @param partitionId Partition id - * @return The requested partition (never null) - */ - public abstract Partition<I, V, E> getOrCreatePartition(Integer partitionId); - - /** - * Put a partition back to the store. Use this method to be put a partition - * back after it has been retrieved through - * {@link #getOrCreatePartition(Integer)}. - * - * @param partition Partition - */ - public abstract void putPartition(Partition<I, V, E> partition); - - /** - * Remove a partition and return it. + * Remove a partition and return it. Called from a single thread, *not* from + * within an iteration cycle, and after INPUT_SUPERSTEP is complete. * * @param partitionId Partition id * @return The removed partition @@ -65,15 +53,6 @@ public abstract class PartitionStore<I extends WritableComparable, public abstract Partition<I, V, E> removePartition(Integer partitionId); /** - * Just delete a partition - * (more efficient than {@link #removePartition(Integer partitionID)} if the - * partition is out of core). - * - * @param partitionId Partition id - */ - public abstract void deletePartition(Integer partitionId); - - /** * Whether a specific partition is present in the store. * * @param partitionId Partition id @@ -97,17 +76,19 @@ public abstract class PartitionStore<I extends WritableComparable, /** * Return the number of vertices in a partition. + * * @param partitionId Partition id * @return The number of vertices in the specified partition */ - public abstract long getPartitionVertexCount(int partitionId); + public abstract long getPartitionVertexCount(Integer partitionId); /** * Return the number of edges in a partition. + * * @param partitionId Partition id * @return The number of edges in the specified partition */ - public abstract long getPartitionEdgeCount(int partitionId); + public abstract long getPartitionEdgeCount(Integer partitionId); /** * Whether the partition store is empty. 
@@ -119,7 +100,88 @@ public abstract class PartitionStore<I extends WritableComparable, } /** - * Called at the end of the computation. + * Called at the end of the computation. Called from a single thread. */ public void shutdown() { } + + /** + * Called at the beginning of the computation. Called from a single thread. + */ + public void initialize() { } + + /** + * Start the iteration cycle to iterate over partitions. Note that each + * iteration cycle *must* iterate over *all* partitions. Usually an iteration + * cycle is necessary for + * 1) moving edges (from edge store) to vertices after edge input splits are + * loaded in INPUT_SUPERSTEP, + * 2) computing partitions in each superstep (called once per superstep), + * 3) saving vertices/edges in the output superstep. + * 4) any sort of populating a data-structure based on the partitions in + * this store. + * + * After an iteration is started, multiple threads can access the partition + * store using {@link #getNextPartition()} to iterate over the partitions. + * Each time {@link #getNextPartition()} is called an unprocessed partition in + * the current iteration is returned. After processing of the partition is + * done, partition should be put back in the store using + * {@link #putPartition(Partition)}. Here is an example of the entire + * workflow: + * + * In the main thread: + * partitionStore.startIteration(); + * + * In multiple threads iterating over the partitions: + * Partition partition = partitionStore.getNextPartition(); + * ... do stuff with partition ... + * partitionStore.putPartition(partition); + * + * Called from a single thread. + */ + public abstract void startIteration(); + + /** + * Return the next partition in iteration for the current superstep. + * Note: user has to put back the partition to the store through + * {@link #putPartition(Partition)} after use. Look at comments on + * {@link #startIteration()} for more detail. 
+ * + * @return The next partition to process + */ + public abstract Partition<I, V, E> getNextPartition(); + + /** + * Put a partition back to the store. Use this method to put a partition + * back after it has been retrieved through {@link #getNextPartition()}. + * Look at comments on {@link #startIteration()} for more detail. + * + * @param partition Partition + */ + public abstract void putPartition(Partition<I, V, E> partition); + + /** + * Add vertices to a given partition from a given DataOutput instance. This + * method is called right after receipt of vertex request in INPUT_SUPERSTEP. + * + * @param partitionId Partition id + * @param extendedDataOutput Output containing serialized vertex data + */ + public abstract void addPartitionVertices(Integer partitionId, + ExtendedDataOutput extendedDataOutput); + + /** + * Add edges to a given partition from a given send edge request. This + * method is called right after receipt of edge request in INPUT_SUPERSTEP. + * + * @param partitionId Partition id + * @param edges Edges in the request + */ + public abstract void addPartitionEdges(Integer partitionId, + VertexIdEdges<I, E> edges); + + /** + * Move edges from edge store to partitions. This method is called from a + * *single thread* once all vertices and edges are read in INPUT_SUPERSTEP. 
+ */ + public abstract void moveEdgesToVertices(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java index 8ed6081..8f34fed 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java @@ -19,11 +19,19 @@ package org.apache.giraph.partition; import com.google.common.collect.Maps; +import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.edge.EdgeStore; +import org.apache.giraph.edge.EdgeStoreFactory; +import org.apache.giraph.utils.ExtendedDataOutput; +import org.apache.giraph.utils.VertexIdEdges; +import org.apache.giraph.utils.VertexIterator; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentMap; /** @@ -39,49 +47,34 @@ public class SimplePartitionStore<I extends WritableComparable, /** Map of stored partitions. */ private final ConcurrentMap<Integer, Partition<I, V, E>> partitions = Maps.newConcurrentMap(); + /** Edge store for this worker. */ + private final EdgeStore<I, V, E> edgeStore; /** Configuration. */ private final ImmutableClassesGiraphConfiguration<I, V, E> conf; /** Context used to report progress */ private final Mapper<?, ?, ?, ?>.Context context; + /** Queue of partitions to be precessed in a superstep */ + private BlockingQueue<Partition<I, V, E>> partitionQueue; /** * Constructor. 
- * * @param conf Configuration * @param context Mapper context + * @param serviceWorker Service worker */ - public SimplePartitionStore( - ImmutableClassesGiraphConfiguration<I, V, E> conf, - Mapper<?, ?, ?, ?>.Context context) { + public SimplePartitionStore(ImmutableClassesGiraphConfiguration<I, V, E> conf, + Mapper<?, ?, ?, ?>.Context context, + CentralizedServiceWorker<I, V, E> serviceWorker) { this.conf = conf; this.context = context; + EdgeStoreFactory<I, V, E> edgeStoreFactory = conf.createEdgeStoreFactory(); + edgeStoreFactory.initialize(serviceWorker, conf, context); + edgeStore = edgeStoreFactory.newStore(); } @Override - public void addPartition(Partition<I, V, E> partition) { - Partition<I, V, E> oldPartition = partitions.get(partition.getId()); - if (oldPartition == null) { - oldPartition = partitions.putIfAbsent(partition.getId(), partition); - if (oldPartition == null) { - return; - } - } - // This is thread-safe - oldPartition.addPartition(partition); - } - - @Override - public Partition<I, V, E> getOrCreatePartition(Integer partitionId) { - Partition<I, V, E> oldPartition = partitions.get(partitionId); - if (oldPartition == null) { - Partition<I, V, E> newPartition = - conf.createPartition(partitionId, context); - oldPartition = partitions.putIfAbsent(partitionId, newPartition); - if (oldPartition == null) { - return newPartition; - } - } - return oldPartition; + public boolean addPartition(Partition<I, V, E> partition) { + return partitions.putIfAbsent(partition.getId(), partition) == null; } @Override @@ -90,11 +83,6 @@ public class SimplePartitionStore<I extends WritableComparable, } @Override - public void deletePartition(Integer partitionId) { - partitions.remove(partitionId); - } - - @Override public boolean hasPartition(Integer partitionId) { return partitions.containsKey(partitionId); } @@ -110,7 +98,7 @@ public class SimplePartitionStore<I extends WritableComparable, } @Override - public long getPartitionVertexCount(int partitionId) { + 
public long getPartitionVertexCount(Integer partitionId) { Partition partition = partitions.get(partitionId); if (partition == null) { return 0; @@ -120,7 +108,7 @@ public class SimplePartitionStore<I extends WritableComparable, } @Override - public long getPartitionEdgeCount(int partitionId) { + public long getPartitionEdgeCount(Integer partitionId) { Partition partition = partitions.get(partitionId); if (partition == null) { return 0; @@ -130,5 +118,64 @@ public class SimplePartitionStore<I extends WritableComparable, } @Override + public void startIteration() { + if (partitionQueue != null && !partitionQueue.isEmpty()) { + throw new IllegalStateException("startIteration: It seems that some of " + + "of the partitions from previous iteration over partition store are" + + " not yet processed."); + } + partitionQueue = + new ArrayBlockingQueue<Partition<I, V, E>>(getNumPartitions()); + for (Partition<I, V, E> partition : partitions.values()) { + partitionQueue.add(partition); + } + } + + @Override + public Partition<I, V, E> getNextPartition() { + return partitionQueue.poll(); + } + + @Override public void putPartition(Partition<I, V, E> partition) { } + + /** + * Get or create a partition. 
+ * @param partitionId Partition Id + * @return The requested partition (never null) + */ + private Partition<I, V, E> getOrCreatePartition(Integer partitionId) { + Partition<I, V, E> oldPartition = partitions.get(partitionId); + if (oldPartition == null) { + Partition<I, V, E> newPartition = + conf.createPartition(partitionId, context); + oldPartition = partitions.putIfAbsent(partitionId, newPartition); + if (oldPartition == null) { + return newPartition; + } + } + return oldPartition; + } + + @Override + public void addPartitionVertices(Integer partitionId, + ExtendedDataOutput extendedDataOutput) { + VertexIterator<I, V, E> vertexIterator = + new VertexIterator<I, V, E>(extendedDataOutput, conf); + + Partition<I, V, E> partition = getOrCreatePartition(partitionId); + partition.addPartitionVertices(vertexIterator); + putPartition(partition); + } + + @Override + public void addPartitionEdges(Integer partitionId, + VertexIdEdges<I, E> edges) { + edgeStore.addPartitionEdges(partitionId, edges); + } + + @Override + public void moveEdgesToVertices() { + edgeStore.moveEdgesToVertices(); + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index ed9a492..e515caf 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -26,13 +26,11 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Queue; import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; import 
java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; @@ -116,7 +114,6 @@ import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; /** @@ -584,6 +581,7 @@ public class BspServiceWorker<I extends WritableComparable, startSuperstep(); workerGraphPartitioner.updatePartitionOwners( getWorkerInfo(), masterSetPartitionOwners); + getPartitionStore().initialize(); /*if[HADOOP_NON_SECURE] workerClient.setup(); @@ -697,7 +695,7 @@ else[HADOOP_NON_SECURE]*/ if (getConfiguration().hasEdgeInputFormat()) { // Move edges from temporary storage to their source vertices. - getServerData().getEdgeStore().moveEdgesToVertices(); + getServerData().getPartitionStore().moveEdgesToVertices(); } // Generate the partition stats for the input superstep and process @@ -1111,10 +1109,7 @@ else[HADOOP_NON_SECURE]*/ final VertexOutputFormat<I, V, E> vertexOutputFormat = getConfiguration().createWrappedVertexOutputFormat(); - final Queue<Integer> partitionIdQueue = - (numPartitions == 0) ? 
new LinkedList<Integer>() : - new ArrayBlockingQueue<Integer>(numPartitions); - Iterables.addAll(partitionIdQueue, getPartitionStore().getPartitionIds()); + getPartitionStore().startIteration(); long verticesToStore = 0; PartitionStore<I, V, E> partitionStore = getPartitionStore(); @@ -1142,14 +1137,13 @@ else[HADOOP_NON_SECURE]*/ long nextPrintMsecs = System.currentTimeMillis() + 15000; int partitionIndex = 0; int numPartitions = getPartitionStore().getNumPartitions(); - while (!partitionIdQueue.isEmpty()) { - Integer partitionId = partitionIdQueue.poll(); - if (partitionId == null) { + while (true) { + Partition<I, V, E> partition = + getPartitionStore().getNextPartition(); + if (partition == null) { break; } - Partition<I, V, E> partition = - getPartitionStore().getOrCreatePartition(partitionId); long verticesWritten = 0; for (Vertex<I, V, E> vertex : partition) { vertexWriter.writeVertex(vertex); @@ -1239,10 +1233,7 @@ else[HADOOP_NON_SECURE]*/ final EdgeOutputFormat<I, V, E> edgeOutputFormat = conf.createWrappedEdgeOutputFormat(); - final Queue<Integer> partitionIdQueue = - (numPartitions == 0) ? 
new LinkedList<Integer>() : - new ArrayBlockingQueue<Integer>(numPartitions); - Iterables.addAll(partitionIdQueue, getPartitionStore().getPartitionIds()); + getPartitionStore().startIteration(); CallableFactory<Void> callableFactory = new CallableFactory<Void>() { @Override @@ -1259,14 +1250,13 @@ else[HADOOP_NON_SECURE]*/ long nextPrintMsecs = System.currentTimeMillis() + 15000; int partitionIndex = 0; int numPartitions = getPartitionStore().getNumPartitions(); - while (!partitionIdQueue.isEmpty()) { - Integer partitionId = partitionIdQueue.poll(); - if (partitionId == null) { + while (true) { + Partition<I, V, E> partition = + getPartitionStore().getNextPartition(); + if (partition == null) { break; } - Partition<I, V, E> partition = - getPartitionStore().getOrCreatePartition(partitionId); long vertices = 0; long edges = 0; long partitionEdgeCount = partition.getEdgeCount(); @@ -1505,10 +1495,7 @@ else[HADOOP_NON_SECURE]*/ GiraphConstants.NUM_CHECKPOINT_IO_THREADS.get(getConfiguration()), numPartitions); - final Queue<Integer> partitionIdQueue = - (numPartitions == 0) ? 
new LinkedList<Integer>() : - new ArrayBlockingQueue<Integer>(numPartitions); - Iterables.addAll(partitionIdQueue, getPartitionStore().getPartitionIds()); + getPartitionStore().startIteration(); final CompressionCodec codec = new CompressionCodecFactory(getConfiguration()) @@ -1525,13 +1512,14 @@ else[HADOOP_NON_SECURE]*/ @Override public Void call() throws Exception { - while (!partitionIdQueue.isEmpty()) { - Integer partitionId = partitionIdQueue.poll(); - if (partitionId == null) { + while (true) { + Partition<I, V, E> partition = + getPartitionStore().getNextPartition(); + if (partition == null) { break; } Path path = - createCheckpointFilePathSafe("_" + partitionId + + createCheckpointFilePathSafe("_" + partition.getId() + CheckpointingUtils.CHECKPOINT_VERTICES_POSTFIX); FSDataOutputStream uncompressedStream = @@ -1542,8 +1530,6 @@ else[HADOOP_NON_SECURE]*/ new DataOutputStream( codec.createOutputStream(uncompressedStream)); - Partition<I, V, E> partition = - getPartitionStore().getOrCreatePartition(partitionId); partition.write(stream); @@ -1966,4 +1952,15 @@ else[HADOOP_NON_SECURE]*/ } return globalStats; } + + @Override + public int getNumPartitionsOwned() { + int count = 0; + for (PartitionOwner partitionOwner : getPartitionOwners()) { + if (partitionOwner.getWorkerInfo().equals(getWorkerInfo())) { + count++; + } + } + return count; + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java index b9ef1e3..b7bec1c 100644 --- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java +++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java @@ -133,7 +133,7 @@ public class RequestTest { assertTrue(partitionStore.hasPartition(partitionId)); int total 
= 0; Partition<IntWritable, IntWritable, IntWritable> partition2 = - partitionStore.getOrCreatePartition(partitionId); + partitionStore.removePartition(partitionId); for (Vertex<IntWritable, IntWritable, IntWritable> vertex : partition2) { total += vertex.getId().get(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java index b8137c0..611b021 100644 --- a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java +++ b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java @@ -75,8 +75,8 @@ public class TestIntFloatPrimitiveMessageStores { Lists.newArrayList(0, 1)); Partition partition = Mockito.mock(Partition.class); Mockito.when(partition.getVertexCount()).thenReturn(Long.valueOf(1)); - Mockito.when(partitionStore.getOrCreatePartition(0)).thenReturn(partition); - Mockito.when(partitionStore.getOrCreatePartition(1)).thenReturn(partition); + Mockito.when(partitionStore.getNextPartition()).thenReturn(partition); + Mockito.when(partitionStore.getNextPartition()).thenReturn(partition); GiraphConfiguration initConf = new GiraphConfiguration(); initConf.setComputationClass(IntFloatNoOpComputation.class); http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java 
b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java index 5903eb8..2027628 100644 --- a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java +++ b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java @@ -72,8 +72,8 @@ public class TestLongDoublePrimitiveMessageStores { Lists.newArrayList(0, 1)); Partition partition = Mockito.mock(Partition.class); Mockito.when(partition.getVertexCount()).thenReturn(Long.valueOf(1)); - Mockito.when(partitionStore.getOrCreatePartition(0)).thenReturn(partition); - Mockito.when(partitionStore.getOrCreatePartition(1)).thenReturn(partition); + Mockito.when(partitionStore.getNextPartition()).thenReturn(partition); + Mockito.when(partitionStore.getNextPartition()).thenReturn(partition); } private static class LongDoubleNoOpComputation extends http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java index 88e66a6..78e663d 100644 --- a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java +++ b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java @@ -18,23 +18,8 @@ package org.apache.giraph.partition; -import static org.apache.giraph.conf.GiraphConstants.MAX_PARTITIONS_IN_MEMORY; -import static org.apache.giraph.conf.GiraphConstants.PARTITIONS_DIRECTORY; -import static org.apache.giraph.conf.GiraphConstants.USER_PARTITION_COUNT; -import static org.apache.giraph.conf.GiraphConstants.USE_OUT_OF_CORE_GRAPH; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static 
org.junit.Assert.assertTrue; - -import java.io.File; -import java.io.IOException; -import java.util.Iterator; -import java.util.Random; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicInteger; - +import com.google.common.collect.Iterables; +import com.google.common.io.Files; import org.apache.commons.io.FileUtils; import org.apache.giraph.bsp.BspService; import org.apache.giraph.bsp.CentralizedServiceWorker; @@ -46,6 +31,7 @@ import org.apache.giraph.graph.BasicComputation; import org.apache.giraph.graph.Vertex; import org.apache.giraph.io.formats.IdWithValueTextOutputFormat; import org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat; +import org.apache.giraph.ooc.DiskBackedPartitionStore; import org.apache.giraph.utils.InternalVertexRunner; import org.apache.giraph.utils.NoOpComputation; import org.apache.giraph.utils.UnsafeByteArrayInputStream; @@ -60,8 +46,20 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; -import com.google.common.collect.Iterables; -import com.google.common.io.Files; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; /** * Test case for partition stores. 
@@ -73,10 +71,11 @@ public class TestPartitionStores { private Mapper<?, ?, ?, ?>.Context context; /* these static variables are used for the multithreaded tests */ - private static final int NUM_OF_VERTEXES_PER_THREAD = 10; + private static final int NUM_OF_VERTEXES_PER_PARTITION = 20; private static final int NUM_OF_EDGES_PER_VERTEX = 5; - private static final int NUM_OF_THREADS = 10; - private static final int NUM_OF_PARTITIONS = 3; + private static final int NUM_OF_THREADS = 8; + private static final int NUM_OF_PARTITIONS = 30; + private static final int NUM_PARTITIONS_IN_MEMORY = 12; public static class MyComputation extends NoOpComputation<IntWritable, IntWritable, NullWritable, IntWritable> { } @@ -105,9 +104,11 @@ public class TestPartitionStores { @Test public void testSimplePartitionStore() { + CentralizedServiceWorker<IntWritable, IntWritable, NullWritable> + serviceWorker = Mockito.mock(CentralizedServiceWorker.class); PartitionStore<IntWritable, IntWritable, NullWritable> - partitionStore = new SimplePartitionStore<IntWritable, IntWritable, - NullWritable>(conf, context); + partitionStore = new SimplePartitionStore<IntWritable, IntWritable, + NullWritable>(conf, context, serviceWorker); testReadWrite(partitionStore, conf); partitionStore.shutdown(); } @@ -149,11 +150,10 @@ public class TestPartitionStores { assertEquals(0, deserializatedPartition.getEdgeCount()); assertEquals(7, deserializatedPartition.getVertexCount()); } - + @Test public void testDiskBackedPartitionStoreWithByteArrayPartition() throws IOException { - File directory = Files.createTempDir(); GiraphConstants.PARTITIONS_DIRECTORY.set( conf, new File(directory, "giraph_partitions").toString()); @@ -164,7 +164,7 @@ public class TestPartitionStores { CentralizedServiceWorker<IntWritable, IntWritable, NullWritable> serviceWorker = Mockito.mock(CentralizedServiceWorker.class); Mockito.when(serviceWorker.getSuperstep()).thenReturn( - BspService.INPUT_SUPERSTEP); + BspService.INPUT_SUPERSTEP); 
PartitionStore<IntWritable, IntWritable, NullWritable> partitionStore = new DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>( @@ -186,7 +186,7 @@ public class TestPartitionStores { serviceWorker = Mockito.mock(CentralizedServiceWorker.class); Mockito.when(serviceWorker.getSuperstep()).thenReturn( - BspService.INPUT_SUPERSTEP); + BspService.INPUT_SUPERSTEP); PartitionStore<IntWritable, IntWritable, NullWritable> partitionStore = new DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>( @@ -194,18 +194,43 @@ public class TestPartitionStores { testReadWrite(partitionStore, conf); partitionStore.shutdown(); - GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 2); - partitionStore = new DiskBackedPartitionStore<IntWritable, - IntWritable, NullWritable>(conf, context, serviceWorker); - testReadWrite(partitionStore, conf); - partitionStore.shutdown(); + FileUtils.deleteDirectory(directory); + } + + @Test + public void testDiskBackedPartitionStoreComputation() throws Exception { + Iterable<String> results; + String[] graph = + { + "[1,0,[]]", "[2,0,[]]", "[3,0,[]]", "[4,0,[]]", "[5,0,[]]", + "[6,0,[]]", "[7,0,[]]", "[8,0,[]]", "[9,0,[]]", "[10,0,[]]" + }; + String[] expected = + { + "1\t0", "2\t0", "3\t0", "4\t0", "5\t0", + "6\t0", "7\t0", "8\t0", "9\t0", "10\t0" + }; + + GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true); + GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1); + GiraphConstants.USER_PARTITION_COUNT.set(conf, 10); + + File directory = Files.createTempDir(); + GiraphConstants.PARTITIONS_DIRECTORY.set(conf, + new File(directory, "giraph_partitions").toString()); + + conf.setComputationClass(EmptyComputation.class); + conf.setVertexInputFormatClass(JsonLongDoubleFloatDoubleVertexInputFormat.class); + conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class); + + results = InternalVertexRunner.run(conf, graph); + checkResults(results, expected); FileUtils.deleteDirectory(directory); } @Test public void 
testDiskBackedPartitionStoreWithByteArrayComputation() throws Exception { - Iterable<String> results; String[] graph = { @@ -218,12 +243,12 @@ public class TestPartitionStores { "6\t0", "7\t0", "8\t0", "9\t0", "10\t0" }; - USE_OUT_OF_CORE_GRAPH.set(conf, true); - MAX_PARTITIONS_IN_MEMORY.set(conf, 1); - USER_PARTITION_COUNT.set(conf, 10); + GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true); + GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1); + GiraphConstants.USER_PARTITION_COUNT.set(conf, 10); File directory = Files.createTempDir(); - PARTITIONS_DIRECTORY.set(conf, + GiraphConstants.PARTITIONS_DIRECTORY.set(conf, new File(directory, "giraph_partitions").toString()); conf.setPartitionClass(ByteArrayPartition.class); @@ -238,17 +263,24 @@ public class TestPartitionStores { @Test public void testDiskBackedPartitionStoreMT() throws Exception { + GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, NUM_PARTITIONS_IN_MEMORY); GiraphConstants.STATIC_GRAPH.set(conf, false); testMultiThreaded(); } - /* + @Test public void testDiskBackedPartitionStoreMTStatic() throws Exception { + GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, NUM_PARTITIONS_IN_MEMORY); + GiraphConstants.STATIC_GRAPH.set(conf, true); + testMultiThreaded(); + } + + @Test + public void testDiskBackedPartitionStoreAdaptiveOOC() throws Exception { GiraphConstants.STATIC_GRAPH.set(conf, true); testMultiThreaded(); } - */ private void testMultiThreaded() throws Exception { final AtomicInteger vertexCounter = new AtomicInteger(0); @@ -260,23 +292,26 @@ public class TestPartitionStores { GiraphConstants.PARTITIONS_DIRECTORY.set( conf, new File(directory, "giraph_partitions").toString()); GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true); - GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1); CentralizedServiceWorker<IntWritable, IntWritable, NullWritable> serviceWorker = Mockito.mock(CentralizedServiceWorker.class); Mockito.when(serviceWorker.getSuperstep()).thenReturn( - 
BspService.INPUT_SUPERSTEP); + BspService.INPUT_SUPERSTEP); PartitionStore<IntWritable, IntWritable, NullWritable> store = new DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>( conf, context, serviceWorker); + store.initialize(); // Create a new Graph in memory using multiple threads for (int i = 0; i < NUM_OF_THREADS; ++i) { - int partitionId = i % NUM_OF_PARTITIONS; + List<Integer> partitionIds = new ArrayList<Integer>(); + for (int id = i; id < NUM_OF_PARTITIONS; id += NUM_OF_THREADS) { + partitionIds.add(id); + } Worker worker = - new Worker(vertexCounter, store, partitionId, conf); + new Worker(vertexCounter, store, partitionIds, conf); executor.submit(worker, new Boolean(true)); } for (int i = 0; i < NUM_OF_THREADS; ++i) @@ -291,19 +326,22 @@ public class TestPartitionStores { totalVertexes += store.getPartitionVertexCount(i); totalEdges += store.getPartitionEdgeCount(i); } - assert vertexCounter.get() == NUM_OF_THREADS * NUM_OF_VERTEXES_PER_THREAD; - assert totalVertexes == NUM_OF_THREADS * NUM_OF_VERTEXES_PER_THREAD; + + assert vertexCounter.get() == NUM_OF_PARTITIONS * NUM_OF_VERTEXES_PER_PARTITION; + assert totalVertexes == NUM_OF_PARTITIONS * NUM_OF_VERTEXES_PER_PARTITION; assert totalEdges == totalVertexes * NUM_OF_EDGES_PER_VERTEX; // Check the content of the vertices int expected = 0; - for (int i = 0; i < NUM_OF_VERTEXES_PER_THREAD * NUM_OF_VERTEXES_PER_THREAD; ++i) { + for (int i = 0; i < NUM_OF_VERTEXES_PER_PARTITION * NUM_OF_PARTITIONS; ++i) { expected += i; } int totalValues = 0; + store.startIteration(); for (int i = 0; i < NUM_OF_PARTITIONS; ++i) { - partition = store.getOrCreatePartition(i); - Iterator<Vertex<IntWritable, IntWritable, NullWritable>> vertexes = + partition = store.getNextPartition(); + assert partition != null; + Iterator<Vertex<IntWritable, IntWritable, NullWritable>> vertexes = partition.iterator(); while (vertexes.hasNext()) { @@ -313,39 +351,23 @@ public class TestPartitionStores { 
store.putPartition(partition); } assert totalValues == expected; - + store.shutdown(); } - @Test - public void testDiskBackedPartitionStoreComputation() throws Exception { - Iterable<String> results; - String[] graph = - { - "[1,0,[]]", "[2,0,[]]", "[3,0,[]]", "[4,0,[]]", "[5,0,[]]", - "[6,0,[]]", "[7,0,[]]", "[8,0,[]]", "[9,0,[]]", "[10,0,[]]" - }; - String[] expected = - { - "1\t0", "2\t0", "3\t0", "4\t0", "5\t0", - "6\t0", "7\t0", "8\t0", "9\t0", "10\t0" - }; - - USE_OUT_OF_CORE_GRAPH.set(conf, true); - MAX_PARTITIONS_IN_MEMORY.set(conf, 1); - USER_PARTITION_COUNT.set(conf, 10); - - File directory = Files.createTempDir(); - PARTITIONS_DIRECTORY.set(conf, - new File(directory, "giraph_partitions").toString()); - - conf.setComputationClass(EmptyComputation.class); - conf.setVertexInputFormatClass(JsonLongDoubleFloatDoubleVertexInputFormat.class); - conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class); - - results = InternalVertexRunner.run(conf, graph); - checkResults(results, expected); - FileUtils.deleteDirectory(directory); + private Partition<IntWritable, IntWritable, NullWritable> + getPartition(PartitionStore<IntWritable, IntWritable, + NullWritable> partitionStore, int partitionId) { + partitionStore.startIteration(); + Partition p; + Partition result = null; + while ((p = partitionStore.getNextPartition()) != null) { + if (p.getId() == partitionId) { + result = p; + } + partitionStore.putPartition(p); + } + return result; } /** @@ -376,35 +398,26 @@ public class TestPartitionStores { v7.addEdge(EdgeFactory.create(new IntWritable(1))); v7.addEdge(EdgeFactory.create(new IntWritable(2))); - partitionStore.addPartition(createPartition(conf, 1, v1, v2)); - partitionStore.addPartition(createPartition(conf, 2, v3)); - partitionStore.addPartition(createPartition(conf, 2, v4)); + partitionStore.addPartition(createPartition(conf, 1, v1, v2, v6)); + partitionStore.addPartition(createPartition(conf, 2, v3, v4)); 
partitionStore.addPartition(createPartition(conf, 3, v5)); - partitionStore.addPartition(createPartition(conf, 1, v6)); partitionStore.addPartition(createPartition(conf, 4, v7)); - Partition<IntWritable, IntWritable, NullWritable> partition1 = - partitionStore.getOrCreatePartition(1); - partitionStore.putPartition(partition1); - Partition<IntWritable, IntWritable, NullWritable> partition2 = - partitionStore.getOrCreatePartition(2); - partitionStore.putPartition(partition2); - Partition<IntWritable, IntWritable, NullWritable> partition3 = - partitionStore.removePartition(3); - Partition<IntWritable, IntWritable, NullWritable> partition4 = - partitionStore.getOrCreatePartition(4); - partitionStore.putPartition(partition4); + getPartition(partitionStore, 1); + getPartition(partitionStore, 2); + partitionStore.removePartition(3); + getPartition(partitionStore, 4); assertEquals(3, partitionStore.getNumPartitions()); assertEquals(3, Iterables.size(partitionStore.getPartitionIds())); int partitionsNumber = 0; - for (Integer partitionId : partitionStore.getPartitionIds()) { - Partition<IntWritable, IntWritable, NullWritable> p = - partitionStore.getOrCreatePartition(partitionId); + + partitionStore.startIteration(); + Partition<IntWritable, IntWritable, NullWritable> p; + while ((p = partitionStore.getNextPartition()) != null) { partitionStore.putPartition(p); partitionsNumber++; } - Partition<IntWritable, IntWritable, NullWritable> partition; assertEquals(3, partitionsNumber); assertTrue(partitionStore.hasPartition(1)); assertTrue(partitionStore.hasPartition(2)); @@ -414,8 +427,6 @@ public class TestPartitionStores { assertEquals(2, partitionStore.getPartitionVertexCount(2)); assertEquals(1, partitionStore.getPartitionVertexCount(4)); assertEquals(2, partitionStore.getPartitionEdgeCount(4)); - partitionStore.deletePartition(2); - assertEquals(2, partitionStore.getNumPartitions()); } /** @@ -463,12 +474,12 @@ public class TestPartitionStores { public void 
testEdgeCombineWithSimplePartition() throws IOException { testEdgeCombine(SimplePartition.class); } - + @Test public void testEdgeCombineWithByteArrayPartition() throws IOException { testEdgeCombine(ByteArrayPartition.class); } - + private void testEdgeCombine(Class<? extends Partition> partitionClass) throws IOException { Vertex<IntWritable, IntWritable, NullWritable> v1 = conf.createVertex(); @@ -512,39 +523,40 @@ public class TestPartitionStores { private final AtomicInteger vertexCounter; private final PartitionStore<IntWritable, IntWritable, NullWritable> partitionStore; - private final int partitionId; + private final List<Integer> partitionIds; private final ImmutableClassesGiraphConfiguration<IntWritable, IntWritable, NullWritable> conf; public Worker(AtomicInteger vertexCounter, PartitionStore<IntWritable, IntWritable, NullWritable> partitionStore, - int partitionId, + List<Integer> partitionIds, ImmutableClassesGiraphConfiguration<IntWritable, IntWritable, NullWritable> conf) { this.vertexCounter = vertexCounter; this.partitionStore = partitionStore; - this.partitionId = partitionId; + this.partitionIds = partitionIds; this.conf = conf; } public void run() { - for (int i = 0; i < NUM_OF_VERTEXES_PER_THREAD; ++i) { - int id = vertexCounter.getAndIncrement(); - Vertex<IntWritable, IntWritable, NullWritable> v = conf.createVertex(); - v.initialize(new IntWritable(id), new IntWritable(id)); - + for (int partitionId : partitionIds) { Partition<IntWritable, IntWritable, NullWritable> partition = - partitionStore.getOrCreatePartition(partitionId); - - Random rand = new Random(id); - for (int j = 0; j < NUM_OF_EDGES_PER_VERTEX; ++j) { - int dest = rand.nextInt(id + 1); - v.addEdge(EdgeFactory.create(new IntWritable(dest))); + conf.createPartition(partitionId, context); + for (int i = 0; i < NUM_OF_VERTEXES_PER_PARTITION; ++i) { + int id = vertexCounter.getAndIncrement(); + Vertex<IntWritable, IntWritable, NullWritable> v = conf.createVertex(); + v.initialize(new 
IntWritable(id), new IntWritable(id)); + + Random rand = new Random(id); + for (int j = 0; j < NUM_OF_EDGES_PER_VERTEX; ++j) { + int dest = rand.nextInt(id + 1); + v.addEdge(EdgeFactory.create(new IntWritable(dest))); + } + + partition.putVertex(v); } - - partition.putVertex(v); - partitionStore.putPartition(partition); + partitionStore.addPartition(partition); } } }
