Repository: giraph Updated Branches: refs/heads/trunk 4f3551dfd -> 6f5a457fa
http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/ooc/DiskBackedPartitionStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/ooc/DiskBackedPartitionStore.java index b8a2dd5..cdafa3f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/DiskBackedPartitionStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/DiskBackedPartitionStore.java @@ -26,25 +26,26 @@ import com.google.common.hash.Hashing; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.giraph.bsp.CentralizedServiceWorker; +import org.apache.giraph.comm.messages.MessageStore; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.conf.IntConfOption; -import org.apache.giraph.edge.EdgeStore; -import org.apache.giraph.edge.EdgeStoreFactory; import org.apache.giraph.edge.OutEdges; import org.apache.giraph.graph.Vertex; import org.apache.giraph.partition.Partition; import org.apache.giraph.partition.PartitionStore; +import org.apache.giraph.utils.ByteArrayOneMessageToManyIds; import org.apache.giraph.utils.ByteArrayVertexIdEdges; +import org.apache.giraph.utils.ByteArrayVertexIdMessages; import org.apache.giraph.utils.ExtendedDataOutput; import org.apache.giraph.utils.PairList; import org.apache.giraph.utils.VertexIdEdges; +import org.apache.giraph.utils.VertexIdMessages; import org.apache.giraph.utils.VertexIterator; import org.apache.giraph.utils.WritableUtils; import org.apache.giraph.worker.BspServiceWorker; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Mapper.Context; import org.apache.log4j.Logger; import java.io.BufferedInputStream; @@ -53,6 +54,7 @@ 
import java.io.DataInput; import java.io.DataInputStream; import java.io.DataOutput; import java.io.DataOutputStream; +import java.io.EOFException; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; @@ -73,6 +75,10 @@ import static org.apache.giraph.conf.GiraphConstants.MAX_PARTITIONS_IN_MEMORY; import static org.apache.giraph.conf.GiraphConstants.ONE_MB; import static org.apache.giraph.conf.GiraphConstants.PARTITIONS_DIRECTORY; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + /** * Disk-backed PartitionStore. An instance of this class can be coupled with an * out-of-core engine. Out-of-core engine is responsible to determine when to @@ -142,13 +148,8 @@ public class DiskBackedPartitionStore<I extends WritableComparable, * time (a read lock used for spilling), and cannot be overlapped with * change of data structure holding the data. 
*/ - private ReadWriteLock rwLock = new ReentrantReadWriteLock(); + private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); - /** Giraph configuration */ - private final - ImmutableClassesGiraphConfiguration<I, V, E> conf; - /** Mapper context */ - private final Context context; /** Base path where the partition files are written to */ private final String[] basePaths; /** Used to hash partition Ids */ @@ -157,13 +158,9 @@ public class DiskBackedPartitionStore<I extends WritableComparable, private final AtomicInteger maxPartitionsInMem = new AtomicInteger(-1); /** Number of slots used */ private final AtomicInteger numPartitionsInMem = new AtomicInteger(0); - /** service worker reference */ - private CentralizedServiceWorker<I, V, E> serviceWorker; /** Out-of-core engine */ private final OutOfCoreEngine oocEngine; - /** Edge store for this worker */ - private final EdgeStore<I, V, E> edgeStore; /** If moving of edges to vertices in INPUT_SUPERSTEP has been started */ private volatile boolean movingEdges; /** Whether the partition store is initialized */ @@ -187,7 +184,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable, private final ConcurrentMap<Integer, Integer> numPendingInputVerticesOnDisk = Maps.newConcurrentMap(); /** Lock to avoid overlap of addition and removal on pendingInputVertices */ - private ReadWriteLock vertexBufferRWLock = new ReentrantReadWriteLock(); + private final ReadWriteLock vertexBufferRWLock = new ReentrantReadWriteLock(); /** * Similar to vertex buffer, but used for input edges (see comments for @@ -199,7 +196,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable, private final ConcurrentMap<Integer, Integer> numPendingInputEdgesOnDisk = Maps.newConcurrentMap(); /** Lock to avoid overlap of addition and removal on pendingInputEdges */ - private ReadWriteLock edgeBufferRWLock = new ReentrantReadWriteLock(); + private final ReadWriteLock edgeBufferRWLock = new ReentrantReadWriteLock(); /** * 
For each out-of-core partitions, whether its edge store is also @@ -209,6 +206,48 @@ public class DiskBackedPartitionStore<I extends WritableComparable, Maps.newConcurrentMap(); /** + * Type of VertexIdMessages class (container for serialized messages) received + * for a particular message. If we write the received messages to disk before + * adding them to message store, we need this type when we want to read the + * messages back from disk (so that we know how to read the messages from + * disk). + */ + private enum SerializedMessageClass { + /** ByteArrayVertexIdMessages */ + BYTE_ARRAY_VERTEX_ID_MESSAGES, + /** ByteArrayOneMessageToManyIds */ + BYTE_ARRAY_ONE_MESSAGE_TO_MANY_IDS + } + + /** + * Similar to vertex buffer and edge buffer, but used for messages (see + * comments for pendingInputVertices). + */ + private volatile ConcurrentMap<Integer, + Pair<Integer, List<VertexIdMessages<I, Writable>>>> + pendingIncomingMessages = Maps.newConcurrentMap(); + /** Whether a partition has any incoming message buffer on disk */ + private volatile ConcurrentMap<Integer, Boolean> incomingMessagesOnDisk = + Maps.newConcurrentMap(); + + /** + * Similar to pendingIncomingMessages, but is used for messages for current + * superstep instead. 
+ */ + private volatile ConcurrentMap<Integer, + Pair<Integer, List<VertexIdMessages<I, Writable>>>> + pendingCurrentMessages = Maps.newConcurrentMap(); + /** Similar to incomingMessagesOnDisk for messages for current superstep */ + private volatile ConcurrentMap<Integer, Boolean> currentMessagesOnDisk = + Maps.newConcurrentMap(); + + /** + * Lock to avoid overlap of addition and removal of pending message buffers + */ + private final ReadWriteLock messageBufferRWLock = + new ReentrantReadWriteLock(); + + /** * Constructor * * @param conf Configuration @@ -219,9 +258,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable, ImmutableClassesGiraphConfiguration<I, V, E> conf, Mapper<?, ?, ?, ?>.Context context, CentralizedServiceWorker<I, V, E> serviceWorker) { - this.conf = conf; - this.context = context; - this.serviceWorker = serviceWorker; + super(conf, context, serviceWorker); this.minBuffSize = MINIMUM_BUFFER_SIZE_TO_FLUSH.get(conf); int userMaxNumPartitions = MAX_PARTITIONS_IN_MEMORY.get(conf); if (userMaxNumPartitions > 0) { @@ -233,9 +270,6 @@ public class DiskBackedPartitionStore<I extends WritableComparable, this.oocEngine = new AdaptiveOutOfCoreEngine<I, V, E>(conf, serviceWorker); } - EdgeStoreFactory<I, V, E> edgeStoreFactory = conf.createEdgeStoreFactory(); - edgeStoreFactory.initialize(serviceWorker, conf, context); - this.edgeStore = edgeStoreFactory.newStore(); this.movingEdges = false; this.isInitialized = new AtomicBoolean(false); @@ -425,10 +459,9 @@ public class DiskBackedPartitionStore<I extends WritableComparable, partitionId + " to disk"); } MetaPartition swapOutPartition = partitions.get(partitionId); - if (swapOutPartition == null) { - throw new IllegalStateException("swapOnePartitionToDisk: the partition " + - "is not found to spill to disk (impossible)"); - } + checkNotNull(swapOutPartition, + "swapOnePartitionToDisk: the partition is not found to spill to disk " + + "(impossible)"); // Since the partition is popped from 
the maps, it is not going to be // processed (or change its process state) until spilling of the partition @@ -523,10 +556,8 @@ public class DiskBackedPartitionStore<I extends WritableComparable, } MetaPartition meta = partitions.get(partitionId); - if (meta == null) { - throw new IllegalStateException("getNextPartition: partition " + - partitionId + " does not exist (impossible)"); - } + checkNotNull(meta, "getNextPartition: partition " + partitionId + + " does not exist (impossible)"); // The only time we iterate through all partitions in INPUT_SUPERSTEP is // when we want to move @@ -582,6 +613,18 @@ public class DiskBackedPartitionStore<I extends WritableComparable, while (!partitionInMemory) { switch (meta.getState()) { case INACTIVE: + // Check if the message store for the current superstep is in memory, + // and if not, load it from the disk. + Boolean messagesOnDisk = currentMessagesOnDisk.get(partitionId); + if (messagesOnDisk != null && messagesOnDisk) { + try { + loadMessages(partitionId); + } catch (IOException e) { + throw new IllegalStateException("getPartition: failed while " + + "loading messages of current superstep for partition " + + partitionId); + } + } meta.setState(State.ACTIVE); partitionInMemory = true; break; @@ -617,7 +660,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable, LOG.info("getPartition: start reading partition " + partitionId + " from disk"); } - partition = loadPartition(partitionId, meta.getVertexCount()); + partition = loadPartition(meta); if (LOG.isInfoEnabled()) { LOG.info("getPartition: done reading partition " + partitionId + " from disk"); @@ -639,6 +682,221 @@ public class DiskBackedPartitionStore<I extends WritableComparable, } } + @Override + public void prepareSuperstep() { + rwLock.writeLock().lock(); + super.prepareSuperstep(); + pendingCurrentMessages = pendingIncomingMessages; + currentMessagesOnDisk = incomingMessagesOnDisk; + pendingIncomingMessages = Maps.newConcurrentMap(); + 
incomingMessagesOnDisk = Maps.newConcurrentMap(); + rwLock.writeLock().unlock(); + } + + /** + * Spill message buffers (either buffers for messages for current superstep, + * or buffers for incoming messages) of a given partition to disk. Note that + * the partition should be ON_DISK or IN_TRANSIT. + * + * @param partitionId Id of the partition to spill its message buffers + * @throws IOException + */ + @edu.umd.cs.findbugs.annotations.SuppressWarnings( + "UL_UNRELEASED_LOCK_EXCEPTION_PATH") + public void spillPartitionMessages(Integer partitionId) throws IOException { + rwLock.readLock().lock(); + spillMessages(partitionId, pendingCurrentMessages, + serviceWorker.getSuperstep()); + spillMessages(partitionId, pendingIncomingMessages, + serviceWorker.getSuperstep() + 1); + rwLock.readLock().unlock(); + } + + /** + * Spill message buffers of a particular type of message (current or incoming + * buffer) for a partition to disk. + * + * @param partitionId Id of the partition to spill the messages for + * @param pendingMessages The map to get the message buffers from + * @param superstep Superstep of which we want to offload messages. 
This is + * equal to current superstep number if we want to offload + * buffers for currentMessageStore, and is equal to next + * superstep number if we want to offload buffer for + * incomingMessageStore + * @throws IOException + */ + private void spillMessages(Integer partitionId, + ConcurrentMap<Integer, Pair<Integer, List<VertexIdMessages<I, Writable>>>> + pendingMessages, long superstep) throws IOException { + Pair<Integer, List<VertexIdMessages<I, Writable>>> entry; + messageBufferRWLock.writeLock().lock(); + entry = pendingMessages.remove(partitionId); + if (entry != null && entry.getLeft() < minBuffSize) { + pendingMessages.put(partitionId, entry); + entry = null; + } + messageBufferRWLock.writeLock().unlock(); + + if (entry == null) { + return; + } + + // Sanity check + checkState(!entry.getRight().isEmpty(), + "spillMessages: the message buffer that is supposed to be flushed to " + + "disk does not exist."); + + File file = new File(getPendingMessagesBufferPath(partitionId, superstep)); + + FileOutputStream fos = new FileOutputStream(file, true); + BufferedOutputStream bos = new BufferedOutputStream(fos); + DataOutputStream dos = new DataOutputStream(bos); + for (VertexIdMessages<I, Writable> messages : entry.getRight()) { + SerializedMessageClass messageClass; + if (messages instanceof ByteArrayVertexIdMessages) { + messageClass = SerializedMessageClass.BYTE_ARRAY_VERTEX_ID_MESSAGES; + } else if (messages instanceof ByteArrayOneMessageToManyIds) { + messageClass = + SerializedMessageClass.BYTE_ARRAY_ONE_MESSAGE_TO_MANY_IDS; + } else { + throw new IllegalStateException("spillMessages: serialized message " + + "type is not supported"); + } + dos.writeInt(messageClass.ordinal()); + messages.write(dos); + } + dos.close(); + } + + /** + * Looks through all partitions already on disk, and see if any of them has + * enough pending message in its buffer in memory. This can be message buffer + * of current superstep, or incoming superstep. 
If so, put that partition + * along with an approximate amount of memory it took (in bytes) in a list to + * return. + + * @return List of pairs (partitionId, sizeInByte) of the partitions where + * their pending messages are worth flushing to disk + */ + public PairList<Integer, Integer> getOocPartitionIdsWithPendingMessages() { + PairList<Integer, Integer> pairList = new PairList<>(); + pairList.initialize(); + Set<Integer> partitionIds = Sets.newHashSet(); + // First, iterating over pending incoming messages + if (pendingIncomingMessages != null) { + for (Entry<Integer, Pair<Integer, List<VertexIdMessages<I, Writable>>>> + entry : pendingIncomingMessages.entrySet()) { + if (entry.getValue().getLeft() > minBuffSize) { + pairList.add(entry.getKey(), entry.getValue().getLeft()); + partitionIds.add(entry.getKey()); + } + } + } + // Second, iterating over pending current messages (i.e. pending incoming + // messages of previous superstep) + if (pendingCurrentMessages != null) { + for (Entry<Integer, Pair<Integer, List<VertexIdMessages<I, Writable>>>> + entry : pendingCurrentMessages.entrySet()) { + if (entry.getValue().getLeft() > minBuffSize && + !partitionIds.contains(entry.getKey())) { + pairList.add(entry.getKey(), entry.getValue().getLeft()); + } + } + } + return pairList; + } + + @Override + public <M extends Writable> void addPartitionCurrentMessages( + int partitionId, VertexIdMessages<I, M> messages) throws IOException { + // Current messages are only added to the store in the event of partition + // migration. Presumably the partition has just migrated and its data is + // still available in memory. Note that partition migration only happens at + // the beginning of a superstep. 
+ ((MessageStore<I, M>) currentMessageStore) + .addPartitionMessages(partitionId, messages); + } + + @Override + public <M extends Writable> void addPartitionIncomingMessages( + int partitionId, VertexIdMessages<I, M> messages) throws IOException { + if (conf.getIncomingMessageClasses().useMessageCombiner()) { + ((MessageStore<I, M>) incomingMessageStore) + .addPartitionMessages(partitionId, messages); + } else { + MetaPartition meta = partitions.get(partitionId); + checkNotNull(meta, "addPartitionIncomingMessages: trying to add " + + "messages to partition " + partitionId + " which does not exist " + + "in the partition set of this worker!"); + + synchronized (meta) { + switch (meta.getState()) { + case INACTIVE: + case ACTIVE: + // A partition might be in memory, but its message store might still + // be on disk. This happens because while we are loading the partition + // to memory, we only load its current messages, not the incoming + // messages. If a new superstep has been started, while the partition + // is still in memory, the incoming message store in the previous + // superstep (which is the current messages in the current superstep) + // is on disk. + // This may also happen when a partition is offloaded to disk while + // it was unprocessed, and then again loaded in the same superstep for + // processing. 
+ Boolean isMsgOnDisk = incomingMessagesOnDisk.get(partitionId); + if (isMsgOnDisk == null || !isMsgOnDisk) { + ((MessageStore<I, M>) incomingMessageStore) + .addPartitionMessages(partitionId, messages); + break; + } + // Continue to IN_TRANSIT and ON_DISK cases as the partition is in + // memory, but its messages are not yet loaded + // CHECKSTYLE: stop FallThrough + case IN_TRANSIT: + case ON_DISK: + // CHECKSTYLE: resume FallThrough + List<VertexIdMessages<I, Writable>> newMessages = + new ArrayList<VertexIdMessages<I, Writable>>(); + newMessages.add((VertexIdMessages<I, Writable>) messages); + int length = messages.getSerializedSize(); + Pair<Integer, List<VertexIdMessages<I, Writable>>> newPair = + new MutablePair<>(length, newMessages); + messageBufferRWLock.readLock().lock(); + Pair<Integer, List<VertexIdMessages<I, Writable>>> oldPair = + pendingIncomingMessages.putIfAbsent(partitionId, newPair); + if (oldPair != null) { + synchronized (oldPair) { + MutablePair<Integer, List<VertexIdMessages<I, Writable>>> pair = + (MutablePair<Integer, List<VertexIdMessages<I, Writable>>>) + oldPair; + pair.setLeft(pair.getLeft() + length); + pair.getRight().add((VertexIdMessages<I, Writable>) messages); + } + } + messageBufferRWLock.readLock().unlock(); + // In the case that the number of partitions is asked to be fixed by + // the user, we should offload the message buffers as necessary. 
+ if (isNumPartitionsFixed && + pendingIncomingMessages.get(partitionId).getLeft() > + minBuffSize) { + try { + spillPartitionMessages(partitionId); + } catch (IOException e) { + throw new IllegalStateException("addPartitionIncomingMessages: " + + "spilling message buffers for partition " + partitionId + + " failed!", e); + } + } + break; + default: + throw new IllegalStateException("addPartitionIncomingMessages: " + + "illegal state " + meta.getState() + " for partition " + + meta.getId()); + } + } + } + } + /** * Spills edge store generated for specified partition in INPUT_SUPERSTEP * Note that the partition should be ON_DISK or IN_TRANSIT. @@ -667,18 +925,16 @@ public class DiskBackedPartitionStore<I extends WritableComparable, } // Sanity check - if (entry.getRight().isEmpty()) { - throw new IllegalStateException("spillPartitionInputEdgeStore: " + - "the edge buffer that is supposed to be flushed to disk does not" + - "exist."); - } + checkState(!entry.getRight().isEmpty(), + "spillPartitionInputEdgeStore: the edge buffer that is supposed to " + + "be flushed to disk does not exist."); List<VertexIdEdges<I, E>> bufferList = entry.getRight(); Integer numBuffers = numPendingInputEdgesOnDisk.putIfAbsent(partitionId, bufferList.size()); if (numBuffers != null) { - numPendingInputEdgesOnDisk.replace( - partitionId, numBuffers + bufferList.size()); + numPendingInputEdgesOnDisk.replace(partitionId, + numBuffers + bufferList.size()); } File file = new File(getPendingEdgesBufferPath(partitionId)); @@ -833,11 +1089,9 @@ public class DiskBackedPartitionStore<I extends WritableComparable, return; } // Sanity check - if (entry.getRight().isEmpty()) { - throw new IllegalStateException("spillPartitionInputVertexBuffer: " + - "the vertex buffer that is supposed to be flushed to disk does not" + - "exist."); - } + checkState(!entry.getRight().isEmpty(), + "spillPartitionInputVertexBuffer: the vertex buffer that is " + + "supposed to be flushed to disk does not exist."); 
List<ExtendedDataOutput> bufferList = entry.getRight(); Integer numBuffers = numPendingInputVerticesOnDisk.putIfAbsent(partitionId, @@ -968,23 +1222,16 @@ public class DiskBackedPartitionStore<I extends WritableComparable, @Override public void putPartition(Partition<I, V, E> partition) { - if (partition == null) { - throw new IllegalStateException("putPartition: partition to put is null" + - " (impossible)"); - } + checkArgument(partition != null); + Integer id = partition.getId(); MetaPartition meta = partitions.get(id); - if (meta == null) { - throw new IllegalStateException("putPartition: partition to put does" + - "not exist in the store (impossible)"); - } + checkNotNull(meta, "putPartition: partition to put does " + + "not exist in the store (impossible)"); synchronized (meta) { - if (meta.getState() != State.ACTIVE) { - String msg = "It is not possible to put back a partition which is " + - "not ACTIVE.\n" + meta.toString(); - LOG.error(msg); - throw new IllegalStateException(msg); - } + checkState(meta.getState() == State.ACTIVE, + "It is not possible to put back a partition which is not ACTIVE. " + + "meta = " + meta.toString()); meta.setState(State.INACTIVE); meta.setProcessed(true); @@ -1003,10 +1250,10 @@ public class DiskBackedPartitionStore<I extends WritableComparable, MetaPartition meta = partitions.remove(partitionId); // Since this method is called outside of the iteration cycle, all // partitions in the store should be in the processed state. 
- if (!processedPartitions.get(meta.getState()).remove(partitionId)) { - throw new IllegalStateException("removePartition: partition that is" + - "about to remove is not in processed list (impossible)"); - } + checkState(processedPartitions.get(meta.getState()).remove(partitionId), + "removePartition: partition that is about to remove is not in " + + "processed list (impossible)"); + getPartition(meta); numPartitionsInMem.getAndDecrement(); return meta.getPartition(); @@ -1028,7 +1275,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable, } if (LOG.isInfoEnabled()) { - LOG.info("addPartition: partition " + id + "is added to the store."); + LOG.info("addPartition: partition " + id + " is added to the store."); } meta.setPartition(partition); @@ -1050,13 +1297,11 @@ public class DiskBackedPartitionStore<I extends WritableComparable, @Override public void shutdown() { // Sanity check to check there is nothing left from previous superstep - if (!unProcessedPartitions.get(State.INACTIVE).isEmpty() || - !unProcessedPartitions.get(State.IN_TRANSIT).isEmpty() || - !unProcessedPartitions.get(State.ON_DISK).isEmpty()) { - throw new IllegalStateException("shutdown: There are some " + - "unprocessed partitions left from the " + - "previous superstep. This should not be possible"); - } + checkState(unProcessedPartitions.get(State.INACTIVE).isEmpty() && + unProcessedPartitions.get(State.IN_TRANSIT).isEmpty() && + unProcessedPartitions.get(State.ON_DISK).isEmpty(), + "shutdown: There are some unprocessed partitions left from the " + + "previous superstep. 
This should not be possible."); for (MetaPartition meta : partitions.values()) { synchronized (meta) { @@ -1092,30 +1337,28 @@ public class DiskBackedPartitionStore<I extends WritableComparable, } // Sanity check to make sure nothing left unprocessed from previous // superstep - if (!unProcessedPartitions.get(State.INACTIVE).isEmpty() || - !unProcessedPartitions.get(State.IN_TRANSIT).isEmpty() || - !unProcessedPartitions.get(State.ON_DISK).isEmpty()) { - throw new IllegalStateException("startIteration: There are some " + - "unprocessed and/or in-transition partitions left from the " + - "previous superstep. This should not be possible"); - } + checkState(unProcessedPartitions.get(State.INACTIVE).isEmpty() && + unProcessedPartitions.get(State.IN_TRANSIT).isEmpty() && + unProcessedPartitions.get(State.ON_DISK).isEmpty(), + "startIteration: There are some unprocessed and/or " + + "in-transition partitions left from the previous superstep. " + + "This should not be possible."); rwLock.writeLock().lock(); for (MetaPartition meta : partitions.values()) { // Sanity check - if (!meta.isProcessed()) { - throw new IllegalStateException("startIteration: meta-partition " + - meta + " has not been processed in the previous superstep."); - } + checkState(meta.isProcessed(), "startIteration: " + + "meta-partition " + meta + " has not been processed in the " + + "previous superstep."); + // The only case where a partition can be IN_TRANSIT is where it is still // being offloaded to disk, and it happens only in swapOnePartitionToDisk, // where we at least hold a read lock while transfer is in progress. Since // the write lock is held in this section, no partition should be // IN_TRANSIT. 
- if (meta.getState() == State.IN_TRANSIT) { - throw new IllegalStateException("startIteration: meta-partition " + - meta + " is still IN_TRANSIT (impossible)"); - } + checkState(meta.getState() != State.IN_TRANSIT, + "startIteration: meta-partition " + meta + " is still IN_TRANSIT " + + "(impossible)"); meta.setProcessed(false); } @@ -1255,21 +1498,109 @@ public class DiskBackedPartitionStore<I extends WritableComparable, } /** + * Load messages for a given partition for the current superstep to memory. + * + * @param partitionId Id of the partition to load the messages for + * @throws IOException + */ + private void loadMessages(int partitionId) throws IOException { + // Messages for current superstep + if (currentMessageStore != null && + !conf.getOutgoingMessageClasses().useMessageCombiner()) { + checkState(!currentMessageStore.hasMessagesForPartition(partitionId), + "loadMessages: partition " + partitionId + " is on disk, " + + "but its message store is in memory (impossible)"); + // First, reading the message store for the partition if there is any + File file = new File( + getMessagesPath(partitionId, serviceWorker.getSuperstep())); + if (file.exists()) { + if (LOG.isDebugEnabled()) { + LOG.debug("loadMessages: loading message store of partition " + + partitionId); + } + FileInputStream filein = new FileInputStream(file); + BufferedInputStream bufferin = new BufferedInputStream(filein); + DataInputStream inputStream = new DataInputStream(bufferin); + currentMessageStore.readFieldsForPartition(inputStream, partitionId); + inputStream.close(); + checkState(file.delete(), "loadMessages: failed to delete %s.", + file.getAbsolutePath()); + } + + messageBufferRWLock.writeLock().lock(); + Pair<Integer, List<VertexIdMessages<I, Writable>>> pendingMessages = + pendingCurrentMessages.remove(partitionId); + messageBufferRWLock.writeLock().unlock(); + + // Second, reading message buffers (incoming messages in previous + // superstep) + file = new 
File(getPendingMessagesBufferPath(partitionId, + serviceWorker.getSuperstep())); + if (file.exists()) { + FileInputStream filein = new FileInputStream(file); + BufferedInputStream bufferin = new BufferedInputStream(filein); + DataInputStream inputStream = new DataInputStream(bufferin); + while (true) { + int type; + try { + type = inputStream.readInt(); + } catch (EOFException e) { + // Reached end of file, so all the records are read. + break; + } + SerializedMessageClass messageClass = + SerializedMessageClass.values()[type]; + VertexIdMessages<I, Writable> vim; + switch (messageClass) { + case BYTE_ARRAY_VERTEX_ID_MESSAGES: + vim = new ByteArrayVertexIdMessages<>( + conf.createOutgoingMessageValueFactory()); + vim.setConf(conf); + break; + case BYTE_ARRAY_ONE_MESSAGE_TO_MANY_IDS: + vim = new ByteArrayOneMessageToManyIds<>( + conf.createOutgoingMessageValueFactory()); + vim.setConf(conf); + break; + default: + throw new IllegalStateException("loadMessages: unsupported " + + "serialized message type!"); + } + vim.readFields(inputStream); + currentMessageStore.addPartitionMessages(partitionId, vim); + } + inputStream.close(); + checkState(!file.delete(), "loadMessages: failed to delete %s", + file.getAbsolutePath()); + } + + // Third, applying message buffers already in memory + if (pendingMessages != null) { + for (VertexIdMessages<I, Writable> vim : pendingMessages.getRight()) { + currentMessageStore.addPartitionMessages(partitionId, vim); + } + } + currentMessagesOnDisk.put(partitionId, false); + } + } + + /** * Load a partition from disk. It deletes the files after the load, * except for the edges, if the graph is static. 
* - * @param id The id of the partition to load - * @param numVertices The number of vertices contained on disk + * @param meta meta partition to load the partition of * @return The partition * @throws IOException */ @SuppressWarnings("unchecked") - private Partition<I, V, E> loadPartition(int id, long numVertices) - throws IOException { - Partition<I, V, E> partition = conf.createPartition(id, context); + private Partition<I, V, E> loadPartition(MetaPartition meta) + throws IOException { + Integer partitionId = meta.getId(); + long numVertices = meta.getVertexCount(); + Partition<I, V, E> partition = conf.createPartition(partitionId, context); // Vertices - File file = new File(getVerticesPath(id)); + File file = new File(getVerticesPath(partitionId)); if (LOG.isDebugEnabled()) { LOG.debug("loadPartition: loading partition vertices " + partition.getId() + " from " + file.getAbsolutePath()); @@ -1284,14 +1615,11 @@ public class DiskBackedPartitionStore<I extends WritableComparable, partition.putVertex(vertex); } inputStream.close(); - if (!file.delete()) { - String msg = "loadPartition: failed to delete " + file.getAbsolutePath(); - LOG.error(msg); - throw new IllegalStateException(msg); - } + checkState(file.delete(), "loadPartition: failed to delete %s", + file.getAbsolutePath()); // Edges - file = new File(getEdgesPath(id)); + file = new File(getEdgesPath(partitionId)); if (LOG.isDebugEnabled()) { LOG.debug("loadPartition: loading partition edges " + @@ -1309,86 +1637,79 @@ public class DiskBackedPartitionStore<I extends WritableComparable, // around. 
if (!conf.isStaticGraph() || serviceWorker.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP) { - if (!file.delete()) { - String msg = - "loadPartition: failed to delete " + file.getAbsolutePath(); - LOG.error(msg); - throw new IllegalStateException(msg); - } + checkState(file.delete(), "loadPartition: failed to delete %s", + file.getAbsolutePath()); } - if (serviceWorker.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP) { - // Input vertex buffers - // First, applying vertex buffers on disk (since they came earlier) - Integer numBuffers = numPendingInputVerticesOnDisk.remove(id); - if (numBuffers != null) { - file = new File(getPendingVerticesBufferPath(id)); - if (LOG.isDebugEnabled()) { - LOG.debug("loadPartition: loading " + numBuffers + " input vertex " + - "buffers of partition " + id + " from " + file.getAbsolutePath()); - } - filein = new FileInputStream(file); - bufferin = new BufferedInputStream(filein); - inputStream = new DataInputStream(bufferin); - for (int i = 0; i < numBuffers; ++i) { - ExtendedDataOutput extendedDataOutput = - WritableUtils.readExtendedDataOutput(inputStream, conf); - partition.addPartitionVertices( - new VertexIterator<I, V, E>(extendedDataOutput, conf)); - } - inputStream.close(); - if (!file.delete()) { - String msg = - "loadPartition: failed to delete " + file.getAbsolutePath(); - LOG.error(msg); - throw new IllegalStateException(msg); - } + // Load message for the current superstep + loadMessages(partitionId); + + // Input vertex buffers + // First, applying vertex buffers on disk (since they came earlier) + Integer numBuffers = numPendingInputVerticesOnDisk.remove(partitionId); + if (numBuffers != null) { + file = new File(getPendingVerticesBufferPath(partitionId)); + if (LOG.isDebugEnabled()) { + LOG.debug("loadPartition: loading " + numBuffers + " input vertex " + + "buffers of partition " + partitionId + " from " + + file.getAbsolutePath()); } - // Second, applying vertex buffers already in memory - Pair<Integer, 
List<ExtendedDataOutput>> vertexPair; - vertexBufferRWLock.writeLock().lock(); - vertexPair = pendingInputVertices.remove(id); - vertexBufferRWLock.writeLock().unlock(); - if (vertexPair != null) { - for (ExtendedDataOutput extendedDataOutput : vertexPair.getRight()) { - partition.addPartitionVertices( - new VertexIterator<I, V, E>(extendedDataOutput, conf)); - } + filein = new FileInputStream(file); + bufferin = new BufferedInputStream(filein); + inputStream = new DataInputStream(bufferin); + for (int i = 0; i < numBuffers; ++i) { + ExtendedDataOutput extendedDataOutput = + WritableUtils.readExtendedDataOutput(inputStream, conf); + partition.addPartitionVertices( + new VertexIterator<I, V, E>(extendedDataOutput, conf)); } - - // Edge store - if (!hasEdgeStoreOnDisk.containsKey(id)) { - throw new IllegalStateException("loadPartition: partition is written" + - " to disk in INPUT_SUPERSTEP, but it is not clear whether its " + - "edge store is on disk or not (impossible)"); + inputStream.close(); + checkState(file.delete(), "loadPartition: failed to delete %s", + file.getAbsolutePath()); + } + // Second, applying vertex buffers already in memory + Pair<Integer, List<ExtendedDataOutput>> vertexPair; + vertexBufferRWLock.writeLock().lock(); + vertexPair = pendingInputVertices.remove(partitionId); + vertexBufferRWLock.writeLock().unlock(); + if (vertexPair != null) { + for (ExtendedDataOutput extendedDataOutput : vertexPair.getRight()) { + partition.addPartitionVertices( + new VertexIterator<I, V, E>(extendedDataOutput, conf)); } - if (hasEdgeStoreOnDisk.remove(id)) { - file = new File(getEdgeStorePath(id)); + } + + // Edge store + if (serviceWorker.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP) { + checkState(hasEdgeStoreOnDisk.containsKey(partitionId), + "loadPartition: partition is written to disk in INPUT_SUPERSTEP, " + + "but it is not clear whether its edge store is on disk or not " + + "(impossible)"); + + if (hasEdgeStoreOnDisk.remove(partitionId)) { + file 
= new File(getEdgeStorePath(partitionId)); if (LOG.isDebugEnabled()) { - LOG.debug("loadPartition: loading edge store of partition " + id + - " from " + file.getAbsolutePath()); + LOG.debug("loadPartition: loading edge store of partition " + + partitionId + " from " + file.getAbsolutePath()); } filein = new FileInputStream(file); bufferin = new BufferedInputStream(filein); inputStream = new DataInputStream(bufferin); - edgeStore.readPartitionEdgeStore(id, inputStream); + edgeStore.readPartitionEdgeStore(partitionId, inputStream); inputStream.close(); - if (!file.delete()) { - String msg = - "loadPartition: failed to delete " + file.getAbsolutePath(); - LOG.error(msg); - throw new IllegalStateException(msg); - } + checkState(file.delete(), "loadPartition: failed to delete %s", + file.getAbsolutePath()); } // Input edge buffers // First, applying edge buffers on disk (since they came earlier) - numBuffers = numPendingInputEdgesOnDisk.remove(id); + numBuffers = numPendingInputEdgesOnDisk.remove(partitionId); if (numBuffers != null) { - file = new File(getPendingEdgesBufferPath(id)); + file = new File(getPendingEdgesBufferPath(partitionId)); if (LOG.isDebugEnabled()) { LOG.debug("loadPartition: loading " + numBuffers + " input edge " + - "buffers of partition " + id + " from " + file.getAbsolutePath()); + "buffers of partition " + partitionId + " from " + + file.getAbsolutePath()); } filein = new FileInputStream(file); bufferin = new BufferedInputStream(filein); @@ -1398,24 +1719,20 @@ public class DiskBackedPartitionStore<I extends WritableComparable, new ByteArrayVertexIdEdges<I, E>(); vertexIdEdges.setConf(conf); vertexIdEdges.readFields(inputStream); - edgeStore.addPartitionEdges(id, vertexIdEdges); + edgeStore.addPartitionEdges(partitionId, vertexIdEdges); } inputStream.close(); - if (!file.delete()) { - String msg = - "loadPartition: failed to delete " + file.getAbsolutePath(); - LOG.error(msg); - throw new IllegalStateException(msg); - } + 
checkState(file.delete(), "loadPartition: failed to delete %s", + file.getAbsolutePath()); } // Second, applying edge buffers already in memory Pair<Integer, List<VertexIdEdges<I, E>>> edgePair = null; edgeBufferRWLock.writeLock().lock(); - edgePair = pendingInputEdges.remove(id); + edgePair = pendingInputEdges.remove(partitionId); edgeBufferRWLock.writeLock().unlock(); if (edgePair != null) { for (VertexIdEdges<I, E> vertexIdEdges : edgePair.getRight()) { - edgeStore.addPartitionEdges(id, vertexIdEdges); + edgeStore.addPartitionEdges(partitionId, vertexIdEdges); } } } @@ -1438,12 +1755,8 @@ public class DiskBackedPartitionStore<I extends WritableComparable, " already exists."); } - if (!file.createNewFile()) { - String msg = "offloadPartition: file " + parent.getAbsolutePath() + - " already exists."; - LOG.error(msg); - throw new IllegalStateException(msg); - } + checkState(file.createNewFile(), + "offloadPartition: file %s already exists.", parent.getAbsolutePath()); if (LOG.isDebugEnabled()) { LOG.debug("offloadPartition: writing partition vertices " + @@ -1488,6 +1801,19 @@ public class DiskBackedPartitionStore<I extends WritableComparable, outputStream.close(); } + if (currentMessageStore != null && + !conf.getOutgoingMessageClasses().useMessageCombiner() && + currentMessageStore.hasMessagesForPartition(partitionId)) { + writeMessageData(currentMessageStore, currentMessagesOnDisk, partitionId, + serviceWorker.getSuperstep()); + } + if (incomingMessageStore != null && + !conf.getIncomingMessageClasses().useMessageCombiner() && + incomingMessageStore.hasMessagesForPartition(partitionId)) { + writeMessageData(incomingMessageStore, incomingMessagesOnDisk, + partitionId, serviceWorker.getSuperstep() + 1); + } + // Writing edge store to disk in the input superstep if (serviceWorker.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP) { if (edgeStore.hasPartitionEdges(partitionId)) { @@ -1515,28 +1841,59 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable, } /** + * Offload message data of a particular type of store (current or incoming) to + * disk, and clear the in-memory copy once the file is closed. + * + * @param messageStore The message store to write to disk + * @param messagesOnDisk Map to update and let others know that this message + * store is on disk + * @param partitionId Id of the partition we want to offload the message store + * of + * @param superstep Superstep for which we want to offload message data for. + * It is equal to the current superstep number for offloading + * currentMessageStore, and is equal to next superstep + * number for offloading incomingMessageStore + * @throws IOException + */ + private void writeMessageData(MessageStore<I, Writable> messageStore, + ConcurrentMap<Integer, Boolean> messagesOnDisk, int partitionId, + long superstep) throws IOException { + File file = new File(getMessagesPath(partitionId, superstep)); + checkState(!file.exists(), + "writeMessageData: message store file for partition " + + partitionId + " for messages in superstep " + + superstep + " already exist (impossible)."); + + checkState(file.createNewFile(), + "writeMessageData: cannot create message store file for " + + "partition " + partitionId); + + FileOutputStream fileout = new FileOutputStream(file); + BufferedOutputStream bufferout = new BufferedOutputStream(fileout); + DataOutputStream outputStream = new DataOutputStream(bufferout); + messageStore.writePartition(outputStream, partitionId); + outputStream.close(); + messageStore.clearPartition(partitionId); + messagesOnDisk.put(partitionId, true); + } + + /** * Delete a partition's files. * * @param id The id of the partition owning the file.
*/ - public void deletePartitionFiles(Integer id) { + private void deletePartitionFiles(Integer id) { // File containing vertices File file = new File(getVerticesPath(id)); - if (file.exists() && !file.delete()) { - String msg = "deletePartitionFiles: Failed to delete file " + - file.getAbsolutePath(); - LOG.error(msg); - throw new IllegalStateException(msg); - } + checkState(!file.exists() || file.delete(), + "deletePartitionFiles: Failed to delete file " + + file.getAbsolutePath()); // File containing edges file = new File(getEdgesPath(id)); - if (file.exists() && !file.delete()) { - String msg = "deletePartitionFiles: Failed to delete file " + - file.getAbsolutePath(); - LOG.error(msg); - throw new IllegalStateException(msg); - } + checkState(!file.exists() || file.delete(), + "deletePartitionFiles: Failed to delete file " + + file.getAbsolutePath()); } /** @@ -1605,6 +1962,29 @@ public class DiskBackedPartitionStore<I extends WritableComparable, } /** + * Get the path to the file where pending incoming messages are stored. + * + * @param partitionId The partition + * @param superstep superstep number + * @return The path to the file + */ + private String getPendingMessagesBufferPath(Integer partitionId, + long superstep) { + return getPartitionPath(partitionId) + "_pending_messages_" + superstep; + } + + /** + * Get the path to the file where messages are stored. + * + * @param partitionId The partition + * @param superstep superstep number + * @return The path to the file + */ + private String getMessagesPath(Integer partitionId, long superstep) { + return getPartitionPath(partitionId) + "_messages_" + superstep; + } + + /** * Partition container holding additional meta data associated with each * partition. 
*/ http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreProcessorCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreProcessorCallable.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreProcessorCallable.java index d58ebe0..7c4d8df 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreProcessorCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreProcessorCallable.java @@ -92,6 +92,8 @@ public class OutOfCoreProcessorCallable<I extends WritableComparable, oocEngine.getPartitionsWithInputVertices(); BlockingQueue<Integer> partitionsWithInputEdges = oocEngine.getPartitionsWithInputEdges(); + BlockingQueue<Integer> partitionsWithPendingMessages = + oocEngine.getPartitionsWithPendingMessages(); AtomicInteger numPartitionsToSpill = oocEngine.getNumPartitionsToSpill(); @@ -100,7 +102,9 @@ public class OutOfCoreProcessorCallable<I extends WritableComparable, if (partitionId == null) { break; } - LOG.info("call: spilling vertex buffer of partition " + partitionId); + if (LOG.isInfoEnabled()) { + LOG.info("call: spilling vertex buffer of partition " + partitionId); + } try { partitionStore.spillPartitionInputVertexBuffer(partitionId); } catch (IOException e) { @@ -114,7 +118,9 @@ public class OutOfCoreProcessorCallable<I extends WritableComparable, if (partitionId == null) { break; } - LOG.info("call: spilling edge buffer of partition " + partitionId); + if (LOG.isInfoEnabled()) { + LOG.info("call: spilling edge buffer of partition " + partitionId); + } try { partitionStore.spillPartitionInputEdgeStore(partitionId); } catch (IOException e) { @@ -123,9 +129,28 @@ public class OutOfCoreProcessorCallable<I extends WritableComparable, } } + while (!partitionsWithPendingMessages.isEmpty()) { + Integer partitionId = partitionsWithPendingMessages.poll(); + if 
(partitionId == null) { + break; + } + if (LOG.isInfoEnabled()) { + LOG.info( + "call: spilling message buffers of partition " + partitionId); + } + try { + partitionStore.spillPartitionMessages(partitionId); + } catch (IOException e) { + throw new IllegalStateException("call: caught IOException while " + + "spilling message buffers to disk", e); + } + } + // Put partitions on disk while (numPartitionsToSpill.getAndDecrement() > 0) { - LOG.info("call: start offloading a partition"); + if (LOG.isInfoEnabled()) { + LOG.info("call: start offloading a partition"); + } partitionStore.spillOnePartition(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/partition/PartitionData.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionData.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionData.java new file mode 100644 index 0000000..d6e3a70 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionData.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.giraph.partition; + +import org.apache.giraph.utils.ExtendedDataOutput; +import org.apache.giraph.utils.VertexIdEdges; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Structure that keeps partition information. + * + * @param <I> Vertex id + * @param <V> Vertex value + * @param <E> Edge value + */ +public interface PartitionData<I extends WritableComparable, + V extends Writable, E extends Writable> { + /** + * Add a *new* partition to the store. If the partition already exists, + * it does not add the partition and returns false. + * + * @param partition Partition to add + * @return Whether the addition made any change in the partition store + */ + boolean addPartition(Partition<I, V, E> partition); + + /** + * Remove a partition and return it. Called from a single thread, *not* from + * within a scheduling cycle, and after INPUT_SUPERSTEP is complete. + * + * @param partitionId Partition id + * @return The removed partition + */ + Partition<I, V, E> removePartition(Integer partitionId); + + /** + * Whether a specific partition is present in the store. + * + * @param partitionId Partition id + * @return True iff the partition is present + */ + boolean hasPartition(Integer partitionId); + + /** + * Return the ids of all the stored partitions as an Iterable. + * + * @return The partition ids + */ + Iterable<Integer> getPartitionIds(); + + /** + * Return the number of stored partitions. + * + * @return The number of partitions + */ + int getNumPartitions(); + + /** + * Return the number of vertices in a partition. + * + * @param partitionId Partition id + * @return The number of vertices in the specified partition + */ + long getPartitionVertexCount(Integer partitionId); + + /** + * Return the number of edges in a partition.
+ * + * @param partitionId Partition id + * @return The number of edges in the specified partition + */ + long getPartitionEdgeCount(Integer partitionId); + + /** + * Whether the partition store is empty. + * + * @return True iff there are no partitions in the store + */ + boolean isEmpty(); + + /** + * Add vertices to a given partition from a given DataOutput instance. This + * method is called right after receipt of vertex request in INPUT_SUPERSTEP. + * + * @param partitionId Partition id + * @param extendedDataOutput Output containing serialized vertex data + */ + void addPartitionVertices(Integer partitionId, + ExtendedDataOutput extendedDataOutput); + + /** + * Add edges to a given partition from a given send edge request. This + * method is called right after receipt of edge request in INPUT_SUPERSTEP. + * + * @param partitionId Partition id + * @param edges Edges in the request + */ + void addPartitionEdges(Integer partitionId, VertexIdEdges<I, E> edges); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java index d3f3902..2facff8 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java @@ -18,10 +18,22 @@ package org.apache.giraph.partition; -import org.apache.giraph.utils.ExtendedDataOutput; -import org.apache.giraph.utils.VertexIdEdges; +import org.apache.giraph.bsp.CentralizedServiceWorker; +import org.apache.giraph.comm.messages.MessageData; +import org.apache.giraph.comm.messages.MessageStore; +import org.apache.giraph.comm.messages.MessageStoreFactory; +import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper; +import 
org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.edge.EdgeStore; +import org.apache.giraph.edge.EdgeStoreFactory; +import org.apache.giraph.utils.ReflectionUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Mapper; + +import java.io.IOException; + +import static org.apache.giraph.conf.GiraphConstants.MESSAGE_STORE_FACTORY_CLASS; /** * Structure that stores partitions for a worker. PartitionStore does not allow @@ -33,72 +45,116 @@ import org.apache.hadoop.io.WritableComparable; * @param <E> Edge data */ public abstract class PartitionStore<I extends WritableComparable, - V extends Writable, E extends Writable> { - /** - * Add a *new* partition to the store. If the partition is already existed, - * it does not add the partition and returns false. - * - * @param partition Partition to add - * @return Whether the addition made any change in the partition store - */ - public abstract boolean addPartition(Partition<I, V, E> partition); + V extends Writable, E extends Writable> + implements PartitionData<I, V, E>, MessageData<I> { + /** Configuration. */ + protected final ImmutableClassesGiraphConfiguration<I, V, E> conf; + /** Context used to report progress */ + protected final Mapper<?, ?, ?, ?>.Context context; + /** service worker reference */ + protected final CentralizedServiceWorker<I, V, E> serviceWorker; - /** - * Remove a partition and return it. Called from a single thread, *not* from - * within an iteration cycle, and after INPUT_SUPERSTEP is complete. 
- * - * @param partitionId Partition id - * @return The removed partition - */ - public abstract Partition<I, V, E> removePartition(Integer partitionId); + /** Edge store for this worker */ + protected final EdgeStore<I, V, E> edgeStore; + /** Message store factory */ + protected MessageStoreFactory<I, Writable, MessageStore<I, Writable>> + messageStoreFactory; /** - * Whether a specific partition is present in the store. - * - * @param partitionId Partition id - * @return True iff the partition is present + * Message store for incoming messages (messages which will be consumed + * in the next super step) */ - public abstract boolean hasPartition(Integer partitionId); - + protected volatile MessageStore<I, Writable> incomingMessageStore; /** - * Return the ids of all the stored partitions as an Iterable. - * - * @return The partition ids + * Message store for current messages (messages which we received in + * previous super step and which will be consumed in current super step) */ - public abstract Iterable<Integer> getPartitionIds(); + protected volatile MessageStore<I, Writable> currentMessageStore; /** - * Return the number of stored partitions. + * Constructor for abstract partition store * - * @return The number of partitions + * @param conf Job configuration + * @param context Mapper context + * @param serviceWorker Worker service */ - public abstract int getNumPartitions(); + public PartitionStore(ImmutableClassesGiraphConfiguration<I, V, E> conf, + Mapper<?, ?, ?, ?>.Context context, + CentralizedServiceWorker<I, V, E> serviceWorker) { + this.conf = conf; + this.context = context; + this.serviceWorker = serviceWorker; + this.messageStoreFactory = createMessageStoreFactory(); + EdgeStoreFactory<I, V, E> edgeStoreFactory = conf.createEdgeStoreFactory(); + edgeStoreFactory.initialize(serviceWorker, conf, context); + this.edgeStore = edgeStoreFactory.newStore(); + } /** - * Return the number of vertices in a partition. 
+ * Decide which message store should be used for current application, + * and create the factory for that store * - * @param partitionId Partition id - * @return The number of vertices in the specified partition + * @return Message store factory */ - public abstract long getPartitionVertexCount(Integer partitionId); + private MessageStoreFactory<I, Writable, MessageStore<I, Writable>> + createMessageStoreFactory() { + Class<? extends MessageStoreFactory> messageStoreFactoryClass = + MESSAGE_STORE_FACTORY_CLASS.get(conf); - /** - * Return the number of edges in a partition. - * - * @param partitionId Partition id - * @return The number of edges in the specified partition - */ - public abstract long getPartitionEdgeCount(Integer partitionId); + MessageStoreFactory messageStoreFactoryInstance = + ReflectionUtils.newInstance(messageStoreFactoryClass); + messageStoreFactoryInstance.initialize(serviceWorker, conf); - /** - * Whether the partition store is empty. - * - * @return True iff there are no partitions in the store - */ + return messageStoreFactoryInstance; + } + + @Override public boolean isEmpty() { return getNumPartitions() == 0; } + @Override + public <M extends Writable> MessageStore<I, M> getIncomingMessageStore() { + return (MessageStore<I, M>) incomingMessageStore; + } + + @Override + public <M extends Writable> MessageStore<I, M> getCurrentMessageStore() { + return (MessageStore<I, M>) currentMessageStore; + } + + @Override + public void resetMessageStores() throws IOException { + if (currentMessageStore != null) { + currentMessageStore.clearAll(); + currentMessageStore = null; + } + if (incomingMessageStore != null) { + incomingMessageStore.clearAll(); + incomingMessageStore = null; + } + prepareSuperstep(); + } + + /** Prepare for next super step */ + public void prepareSuperstep() { + if (currentMessageStore != null) { + try { + currentMessageStore.clearAll(); + } catch (IOException e) { + throw new IllegalStateException( + "Failed to clear previous 
message store", e); + } + } + currentMessageStore = incomingMessageStore != null ? + incomingMessageStore : + messageStoreFactory.newStore(conf.getIncomingMessageClasses()); + incomingMessageStore = + messageStoreFactory.newStore(conf.getOutgoingMessageClasses()); + // finalize current message-store before resolving mutations + currentMessageStore.finalizeStore(); + } + /** * Called at the end of the computation. Called from a single thread. */ @@ -160,28 +216,18 @@ public abstract class PartitionStore<I extends WritableComparable, public abstract void putPartition(Partition<I, V, E> partition); /** - * Add vertices to a given partition from a given DataOutput instance. This - * method is called right after receipt of vertex request in INPUT_SUPERSTEP. - * - * @param partitionId Partition id - * @param extendedDataOutput Output containing serialized vertex data - */ - public abstract void addPartitionVertices(Integer partitionId, - ExtendedDataOutput extendedDataOutput); - - /** - * Add edges to a given partition from a given send edge request. This - * method is called right after receipt of edge request in INPUT_SUPERSTEP. - * - * @param partitionId Partition id - * @param edges Edges in the request - */ - public abstract void addPartitionEdges(Integer partitionId, - VertexIdEdges<I, E> edges); - - /** * Move edges from edge store to partitions. This method is called from a * *single thread* once all vertices and edges are read in INPUT_SUPERSTEP. */ public abstract void moveEdgesToVertices(); + + /** + * In case of async message store we have to wait for all messages + * to be processed before going into next superstep.
+ */ + public void waitForComplete() { + if (incomingMessageStore instanceof AsyncMessageStoreWrapper) { + ((AsyncMessageStoreWrapper) incomingMessageStore).waitToComplete(); + } + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java index 8f34fed..9f0c408 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java @@ -20,20 +20,23 @@ package org.apache.giraph.partition; import com.google.common.collect.Maps; import org.apache.giraph.bsp.CentralizedServiceWorker; +import org.apache.giraph.comm.messages.MessageStore; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.edge.EdgeStore; -import org.apache.giraph.edge.EdgeStoreFactory; import org.apache.giraph.utils.ExtendedDataOutput; import org.apache.giraph.utils.VertexIdEdges; +import org.apache.giraph.utils.VertexIdMessages; import org.apache.giraph.utils.VertexIterator; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; +import java.io.IOException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentMap; +import static com.google.common.base.Preconditions.checkState; + /** * A simple in-memory partition store. * @@ -47,12 +50,6 @@ public class SimplePartitionStore<I extends WritableComparable, /** Map of stored partitions. */ private final ConcurrentMap<Integer, Partition<I, V, E>> partitions = Maps.newConcurrentMap(); - /** Edge store for this worker. 
*/ - private final EdgeStore<I, V, E> edgeStore; - /** Configuration. */ - private final ImmutableClassesGiraphConfiguration<I, V, E> conf; - /** Context used to report progress */ - private final Mapper<?, ?, ?, ?>.Context context; /** Queue of partitions to be precessed in a superstep */ private BlockingQueue<Partition<I, V, E>> partitionQueue; @@ -65,11 +62,7 @@ public class SimplePartitionStore<I extends WritableComparable, public SimplePartitionStore(ImmutableClassesGiraphConfiguration<I, V, E> conf, Mapper<?, ?, ?, ?>.Context context, CentralizedServiceWorker<I, V, E> serviceWorker) { - this.conf = conf; - this.context = context; - EdgeStoreFactory<I, V, E> edgeStoreFactory = conf.createEdgeStoreFactory(); - edgeStoreFactory.initialize(serviceWorker, conf, context); - edgeStore = edgeStoreFactory.newStore(); + super(conf, context, serviceWorker); } @Override @@ -119,11 +112,11 @@ public class SimplePartitionStore<I extends WritableComparable, @Override public void startIteration() { - if (partitionQueue != null && !partitionQueue.isEmpty()) { - throw new IllegalStateException("startIteration: It seems that some of " + + checkState(partitionQueue == null || partitionQueue.isEmpty(), + "startIteration: It seems that some " + "of the partitions from previous iteration over partition store are" + " not yet processed."); - } + partitionQueue = new ArrayBlockingQueue<Partition<I, V, E>>(getNumPartitions()); for (Partition<I, V, E> partition : partitions.values()) { @@ -178,4 +171,18 @@ public class SimplePartitionStore<I extends WritableComparable, public void moveEdgesToVertices() { edgeStore.moveEdgesToVertices(); } + + @Override + public <M extends Writable> void addPartitionCurrentMessages( + int partitionId, VertexIdMessages<I, M> messages) throws IOException { + ((MessageStore<I, M>) currentMessageStore) + .addPartitionMessages(partitionId, messages); + } + + @Override + public <M extends Writable> void addPartitionIncomingMessages( + int partitionId, 
VertexIdMessages<I, M> messages) throws IOException { + ((MessageStore<I, M>) incomingMessageStore) + .addPartitionMessages(partitionId, messages); + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index e515caf..5b754d6 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -910,7 +910,7 @@ else[HADOOP_NON_SECURE]*/ globalCommHandler.finishSuperstep(workerAggregatorRequestProcessor); MessageStore<I, Writable> incomingMessageStore = - getServerData().getIncomingMessageStore(); + getServerData().getPartitionStore().getIncomingMessageStore(); if (incomingMessageStore instanceof AsyncMessageStoreWrapper) { ((AsyncMessageStoreWrapper) incomingMessageStore).waitToComplete(); } @@ -1418,8 +1418,8 @@ else[HADOOP_NON_SECURE]*/ for (Integer partitionId : getPartitionStore().getPartitionIds()) { // write messages checkpointOutputStream.writeInt(partitionId); - getServerData().getCurrentMessageStore().writePartition( - checkpointOutputStream, partitionId); + getServerData().getPartitionStore().getCurrentMessageStore() + .writePartition(checkpointOutputStream, partitionId); getContext().progress(); } @@ -1668,8 +1668,8 @@ else[HADOOP_NON_SECURE]*/ for (int i = 0; i < partitions; i++) { int partitionId = checkpointStream.readInt(); - getServerData().getCurrentMessageStore().readFieldsForPartition( - checkpointStream, partitionId); + getServerData().getPartitionStore().getCurrentMessageStore() + .readFieldsForPartition(checkpointStream, partitionId); } List<Writable> w2wMessages = (List<Writable>) WritableUtils.readList( 
http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java index 572e290..0bea783 100644 --- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java +++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java @@ -106,14 +106,15 @@ public class RequestFailureTest { private void checkResult(int numRequests) throws IOException { // Check the output Iterable<IntWritable> vertices = - serverData.getIncomingMessageStore().getPartitionDestinationVertices(0); + serverData.getPartitionStore().getIncomingMessageStore() + .getPartitionDestinationVertices(0); int keySum = 0; int messageSum = 0; for (IntWritable vertexId : vertices) { keySum += vertexId.get(); Iterable<IntWritable> messages = - serverData.<IntWritable>getIncomingMessageStore().getVertexMessages( - vertexId); + serverData.getPartitionStore().<IntWritable>getIncomingMessageStore() + .getVertexMessages(vertexId); synchronized (messages) { for (IntWritable message : messages) { messageSum += message.get(); http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java index b7bec1c..aa3916c 100644 --- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java +++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java @@ -178,14 +178,15 @@ public class RequestTest { // Check the output Iterable<IntWritable> vertices = - 
serverData.getIncomingMessageStore().getPartitionDestinationVertices(0); + serverData.getPartitionStore().getIncomingMessageStore() + .getPartitionDestinationVertices(0); int keySum = 0; int messageSum = 0; for (IntWritable vertexId : vertices) { keySum += vertexId.get(); Iterable<IntWritable> messages = - serverData.<IntWritable>getIncomingMessageStore().getVertexMessages( - vertexId); + serverData.getPartitionStore().<IntWritable>getIncomingMessageStore() + .getVertexMessages(vertexId); synchronized (messages) { for (IntWritable message : messages) { messageSum += message.get(); @@ -223,14 +224,15 @@ public class RequestTest { // Check the output Iterable<IntWritable> vertices = - serverData.getIncomingMessageStore().getPartitionDestinationVertices(0); + serverData.getPartitionStore().getIncomingMessageStore() + .getPartitionDestinationVertices(0); int keySum = 0; int messageSum = 0; for (IntWritable vertexId : vertices) { keySum += vertexId.get(); Iterable<IntWritable> messages = - serverData.<IntWritable>getIncomingMessageStore().getVertexMessages( - vertexId); + serverData.getPartitionStore().<IntWritable>getIncomingMessageStore() + .getVertexMessages(vertexId); synchronized (messages) { for (IntWritable message : messages) { messageSum += message.get(); http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java index ca1031a..75edb09 100644 --- a/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java +++ b/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java @@ -96,6 +96,11 @@ public class 
AsyncMessageStoreWrapperTest { } @Override + public boolean hasMessagesForPartition(int partitionId) { + return false; + } + + @Override public void finalizeStore() { } http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java index 78e663d..249a337 100644 --- a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java +++ b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java @@ -30,6 +30,7 @@ import org.apache.giraph.edge.EdgeFactory; import org.apache.giraph.graph.BasicComputation; import org.apache.giraph.graph.Vertex; import org.apache.giraph.io.formats.IdWithValueTextOutputFormat; +import org.apache.giraph.io.formats.IntIntNullTextVertexInputFormat; import org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat; import org.apache.giraph.ooc.DiskBackedPartitionStore; import org.apache.giraph.utils.InternalVertexRunner; @@ -268,7 +269,6 @@ public class TestPartitionStores { testMultiThreaded(); } - @Test public void testDiskBackedPartitionStoreMTStatic() throws Exception { GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, NUM_PARTITIONS_IN_MEMORY); @@ -431,7 +431,7 @@ public class TestPartitionStores { /** * Internal checker to verify the correctness of the tests. 
- * @param results the actual results obtaind + * @param results the actual results obtained * @param expected expected results */ private void checkResults(Iterable<String> results, String[] expected) { @@ -560,4 +560,59 @@ public class TestPartitionStores { } } } + + @Test + public void testOutOfCoreMessages() throws Exception { + Iterable<String> results; + String[] graph = + { "1 0 2 3", "2 0 3 5", "3 0 1 2 4", "4 0 3", "5 0 6 7 1 2", + "6 0 10 8 7", "7 0 1 3", "8 0 1 10 9 4 6", "9 0 8 1 5 7", + "10 0 9" }; + + String[] expected = + { + "1\t32", "2\t9", "3\t14", "4\t11", "5\t11", + "6\t13", "7\t20", "8\t15", "9\t18", "10\t14" + }; + + GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true); + GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1); + GiraphConstants.USER_PARTITION_COUNT.set(conf, 10); + + File directory = Files.createTempDir(); + GiraphConstants.PARTITIONS_DIRECTORY.set(conf, + new File(directory, "giraph_partitions").toString()); + + GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true); + GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1); + conf.setComputationClass(TestOutOfCoreMessagesComputation.class); + conf.setVertexInputFormatClass(IntIntNullTextVertexInputFormat.class); + conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class); + + results = InternalVertexRunner.run(conf, graph); + checkResults(results, expected); + FileUtils.deleteDirectory(directory); + } + + public static class TestOutOfCoreMessagesComputation extends + BasicComputation<IntWritable, IntWritable, NullWritable, IntWritable> { + + @Override + public void compute( + Vertex<IntWritable, IntWritable, NullWritable> vertex, + Iterable<IntWritable> messages) throws IOException { + if (getSuperstep() == 0) { + // Send id to all neighbors + sendMessageToAllEdges(vertex, vertex.getId()); + } else { + // Add received messages and halt + int sum = 0; + for (IntWritable message : messages) { + sum += message.get(); + } + vertex.setValue(new IntWritable(sum)); + 
vertex.voteToHalt(); + } + } + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java index 5d8d478..0a1da49 100644 --- a/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java +++ b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java @@ -23,6 +23,7 @@ import org.apache.giraph.comm.ServerData; import org.apache.giraph.comm.WorkerClientRequestProcessor; import org.apache.giraph.comm.messages.ByteArrayMessagesPerVertexStore; import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.edge.ArrayListEdges; import org.apache.giraph.graph.Computation; @@ -190,10 +191,13 @@ public class MockUtils { ImmutableClassesGiraphConfiguration conf, Mapper.Context context) { CentralizedServiceWorker<IntWritable, IntWritable, IntWritable> serviceWorker = MockUtils.mockServiceGetVertexPartitionOwner(1); + GiraphConstants.MESSAGE_STORE_FACTORY_CLASS.set(conf, + ByteArrayMessagesPerVertexStore.newFactory(serviceWorker, conf) + .getClass()); + ServerData<IntWritable, IntWritable, IntWritable> serverData = new ServerData<IntWritable, IntWritable, IntWritable>( - serviceWorker, conf, ByteArrayMessagesPerVertexStore.newFactory( - serviceWorker, conf), context); + serviceWorker, conf, context); // Here we add a partition to simulate the case that there is one partition. 
serverData.getPartitionStore().addPartition(new SimplePartition()); return serverData; http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java b/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java index dd0fe13..ad9ba6f 100644 --- a/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java +++ b/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java @@ -19,9 +19,11 @@ package org.apache.giraph; import org.apache.giraph.bsp.CentralizedServiceWorker; +import org.apache.giraph.combiner.DoubleSumMessageCombiner; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.examples.GeneratedVertexReader; +import org.apache.giraph.examples.SimplePageRankComputation; import org.apache.giraph.examples.SimplePageRankComputation.SimplePageRankVertexInputFormat; import org.apache.giraph.examples.SimplePageRankComputation.SimplePageRankVertexOutputFormat; import org.apache.giraph.graph.BasicComputation; @@ -37,6 +39,7 @@ import org.junit.Test; import java.io.IOException; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; /** @@ -51,19 +54,6 @@ public class TestOutOfCore extends BspCase { super(TestOutOfCore.class.getName()); } - public static class EmptyComputation extends BasicComputation< - LongWritable, DoubleWritable, FloatWritable, DoubleWritable> { - - @Override - public void compute( - Vertex<LongWritable, DoubleWritable, FloatWritable> vertex, - Iterable<DoubleWritable> messages) throws IOException { - if (getSuperstep() > 5) { - vertex.voteToHalt(); - } - } - } - public static class TestMemoryEstimator implements MemoryEstimator { private DiskBackedPartitionStore partitionStore; @Override @@ -100,9 +90,13 @@ public 
class TestOutOfCore extends BspCase { public void testOutOfCore() throws IOException, InterruptedException, ClassNotFoundException { GiraphConfiguration conf = new GiraphConfiguration(); - conf.setComputationClass(EmptyComputation.class); + conf.setComputationClass(SimplePageRankComputation.class); conf.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class); conf.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class); + conf.setWorkerContextClass( + SimplePageRankComputation.SimplePageRankWorkerContext.class); + conf.setMasterComputeClass( + SimplePageRankComputation.SimplePageRankMasterCompute.class); GiraphConstants.USER_PARTITION_COUNT.set(conf, NUM_PARTITIONS); GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true); GiraphConstants.OUT_OF_CORE_MEM_ESTIMATOR @@ -115,7 +109,21 @@ public class TestOutOfCore extends BspCase { GiraphJob job = prepareJob(getCallingMethodName(), conf, getTempPath(getCallingMethodName())); // Overwrite the number of vertices set in BspCase - GeneratedVertexReader.READER_VERTICES.set(conf, 400); + GeneratedVertexReader.READER_VERTICES.set(conf, 200); assertTrue(job.run(true)); + if (!runningInDistributedMode()) { + double maxPageRank = + SimplePageRankComputation.SimplePageRankWorkerContext.getFinalMax(); + double minPageRank = + SimplePageRankComputation.SimplePageRankWorkerContext.getFinalMin(); + long numVertices = + SimplePageRankComputation.SimplePageRankWorkerContext.getFinalSum(); + System.out.println(getCallingMethodName() + ": maxPageRank=" + + maxPageRank + " minPageRank=" + + minPageRank + " numVertices=" + numVertices); + assertEquals(13591.5, maxPageRank, 0.01); + assertEquals(9.375e-5, minPageRank, 0.000000001); + assertEquals(8 * 200L, numVertices); + } } }
