http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java index 6c08dfd..784d578 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java @@ -20,6 +20,7 @@ package org.apache.giraph.ooc.data; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.apache.giraph.bsp.BspService; import org.apache.giraph.ooc.OutOfCoreEngine; import org.apache.giraph.worker.BspServiceWorker; import org.apache.log4j.Logger; @@ -27,7 +28,6 @@ import org.apache.log4j.Logger; import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentMap; @@ -59,11 +59,11 @@ public class MetaPartitionManager { /** Number of in-memory partitions */ private final AtomicInteger numInMemoryPartitions = new AtomicInteger(0); - /** Map of partitions to their meta information */ + /** Map (dictionary) of partitions to their meta information */ private final ConcurrentMap<Integer, MetaPartition> partitions = Maps.newConcurrentMap(); - /** List of partitions assigned to each IO threads */ - private final List<PerThreadPartitionStatus> perThreadPartitions; + /** Reverse dictionaries of partitions assigned to each IO thread */ + private final List<MetaPartitionDictionary> perThreadPartitionDictionary; /** For each IO thread, set of partition ids that are on-disk and have * 'large enough' vertex/edge buffers to be offloaded on disk */ @@ -95,11 +95,11 @@ public class MetaPartitionManager { * @param oocEngine out-of-core engine */ public MetaPartitionManager(int numIOThreads, OutOfCoreEngine oocEngine) { - perThreadPartitions = new ArrayList<>(numIOThreads); + perThreadPartitionDictionary = new ArrayList<>(numIOThreads); perThreadVertexEdgeBuffers = new ArrayList<>(numIOThreads); perThreadMessageBuffers = new ArrayList<>(numIOThreads); for (int i = 0; i < numIOThreads; ++i) { - perThreadPartitions.add(new PerThreadPartitionStatus()); + perThreadPartitionDictionary.add(new MetaPartitionDictionary()); perThreadMessageBuffers.add(Sets.<Integer>newConcurrentHashSet()); perThreadVertexEdgeBuffers.add(Sets.<Integer>newConcurrentHashSet()); } @@ -156,11 +156,7 @@ public class MetaPartitionManager { if (temp == null) { int ownerThread = oocEngine.getIOScheduler() .getOwnerThreadId(partitionId); - Set<MetaPartition> partitionSet = - perThreadPartitions.get(ownerThread).getInMemoryProcessed(); - synchronized (partitionSet) { - partitionSet.add(meta); - } + perThreadPartitionDictionary.get(ownerThread).addPartition(meta); numInMemoryPartitions.getAndIncrement(); } } @@ -173,6 +169,8 @@ public class MetaPartitionManager { */ public void removePartition(Integer partitionId) { MetaPartition meta = partitions.remove(partitionId); + int ownerThread = oocEngine.getIOScheduler().getOwnerThreadId(partitionId); + perThreadPartitionDictionary.get(ownerThread).removePartition(meta); checkState(!meta.isOnDisk()); numInMemoryPartitions.getAndDecrement(); } @@ -216,20 +214,46 @@ public class MetaPartitionManager { * @return id of the partition to offload on disk */ public Integer getOffloadPartitionId(int threadId) { - Set<MetaPartition> partitionSet = perThreadPartitions.get(threadId) - .getInMemoryProcessed(); - synchronized (partitionSet) { - MetaPartition meta = peekFromSet(partitionSet); - if (meta != null) { - return meta.getPartitionId(); - } - } - partitionSet = perThreadPartitions.get(threadId).getInMemoryUnprocessed(); - synchronized (partitionSet) { - MetaPartition meta = peekFromSet(partitionSet); - if (meta != null) { - return meta.getPartitionId(); - } + MetaPartition meta = perThreadPartitionDictionary.get(threadId).lookup( + ProcessingState.PROCESSED, + StorageState.IN_MEM, + StorageState.IN_MEM, + null); + if (meta != null) { + return meta.getPartitionId(); + } + meta = perThreadPartitionDictionary.get(threadId).lookup( + ProcessingState.PROCESSED, + StorageState.IN_MEM, + null, + null); + if (meta != null) { + return meta.getPartitionId(); + } + meta = perThreadPartitionDictionary.get(threadId).lookup( + ProcessingState.PROCESSED, + null, + StorageState.IN_MEM, + null); + if (meta != null) { + return meta.getPartitionId(); + } + + meta = perThreadPartitionDictionary.get(threadId).lookup( + ProcessingState.UNPROCESSED, + StorageState.IN_MEM, + null, + null); + if (meta != null) { + return meta.getPartitionId(); + } + meta = perThreadPartitionDictionary.get(threadId).lookup( + ProcessingState.UNPROCESSED, + null, + StorageState.IN_MEM, + null); + if (meta != null) { + return meta.getPartitionId(); } return null; } @@ -241,8 +265,7 @@ public class MetaPartitionManager { * @return id of the partition to offload its vertex/edge buffers on disk */ public Integer getOffloadPartitionBufferId(int threadId) { - if (oocEngine.getServiceWorker().getSuperstep() == - BspServiceWorker.INPUT_SUPERSTEP) { + if (oocEngine.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP) { Integer partitionId = popFromSet(perThreadVertexEdgeBuffers.get(threadId)); if (partitionId == null) { @@ -270,8 +293,7 @@ public class MetaPartitionManager { * @return id of the partition to offload its message buffer on disk */ public Integer getOffloadMessageBufferId(int threadId) { - if (oocEngine.getServiceWorker().getSuperstep() != - BspServiceWorker.INPUT_SUPERSTEP) { + if (oocEngine.getSuperstep() != BspServiceWorker.INPUT_SUPERSTEP) { Integer partitionId = popFromSet(perThreadMessageBuffers.get(threadId)); if (partitionId == null) { @@ -297,53 +319,115 @@ public class MetaPartitionManager { * @return id of the partition to offload its message on disk */ public Integer getOffloadMessageId(int threadId) { - Set<MetaPartition> partitionSet = perThreadPartitions.get(threadId) - .getInDiskProcessed(); - synchronized (partitionSet) { - for (MetaPartition meta : partitionSet) { - if (meta.getIncomingMessagesState() == StorageState.IN_MEM) { - return meta.getPartitionId(); - } - } - } - partitionSet = perThreadPartitions.get(threadId).getInDiskUnprocessed(); - synchronized (partitionSet) { - for (MetaPartition meta : partitionSet) { - if (meta.getIncomingMessagesState() == StorageState.IN_MEM) { - return meta.getPartitionId(); - } - } + if (oocEngine.getSuperstep() == BspService.INPUT_SUPERSTEP) { + return null; + } + MetaPartition meta = perThreadPartitionDictionary.get(threadId).lookup( + ProcessingState.PROCESSED, + StorageState.ON_DISK, + null, + StorageState.IN_MEM); + if (meta != null) { + return meta.getPartitionId(); + } + + meta = perThreadPartitionDictionary.get(threadId).lookup( + ProcessingState.UNPROCESSED, + StorageState.ON_DISK, + null, + StorageState.IN_MEM); + if (meta != null) { + return meta.getPartitionId(); } return null; } /** - * Get id of a partition to prefetch its data to memory + * Get id of a partition to load its data to memory * * @param threadId id of the thread who is going to load the partition data * @return id of the partition to load its data to memory */ - public Integer getPrefetchPartitionId(int threadId) { - Set<MetaPartition> partitionSet = - perThreadPartitions.get(threadId).getInDiskUnprocessed(); - synchronized (partitionSet) { - MetaPartition meta = peekFromSet(partitionSet); - return (meta != null) ? meta.getPartitionId() : null; + public Integer getLoadPartitionId(int threadId) { + MetaPartition meta = perThreadPartitionDictionary.get(threadId).lookup( + ProcessingState.UNPROCESSED, + StorageState.IN_MEM, + StorageState.ON_DISK, + null); + if (meta != null) { + return meta.getPartitionId(); + } + + meta = perThreadPartitionDictionary.get(threadId).lookup( + ProcessingState.UNPROCESSED, + StorageState.ON_DISK, + null, + null); + if (meta != null) { + return meta.getPartitionId(); + } + + meta = perThreadPartitionDictionary.get(threadId).lookup( + ProcessingState.PROCESSED, + StorageState.ON_DISK, + null, + null); + if (meta != null) { + meta.getPartitionId(); + } + + meta = perThreadPartitionDictionary.get(threadId).lookup( + ProcessingState.PROCESSED, + null, + StorageState.ON_DISK, + null); + if (meta != null) { + meta.getPartitionId(); } + return null; } /** - * Mark a partition inaccessible to IO and compute threads + * Mark a partition as being 'IN_PROCESS' * * @param partitionId id of the partition to mark */ - public void makePartitionInaccessible(int partitionId) { + public void markPartitionAsInProcess(int partitionId) { MetaPartition meta = partitions.get(partitionId); - perThreadPartitions.get(oocEngine.getIOScheduler() - .getOwnerThreadId(partitionId)) - .remove(meta); + int ownerThread = oocEngine.getIOScheduler().getOwnerThreadId(partitionId); synchronized (meta) { + perThreadPartitionDictionary.get(ownerThread).removePartition(meta); meta.setProcessingState(ProcessingState.IN_PROCESS); + perThreadPartitionDictionary.get(ownerThread).addPartition(meta); + } + } + + /** + * Whether there is any processed partition stored in memory (excluding those + * that are prefetched to execute in the next superstep). + * + * @return true iff there is any processed partition in memory + */ + public boolean hasProcessedOnMemory() { + for (MetaPartitionDictionary dictionary : perThreadPartitionDictionary) { + if (dictionary.hasProcessedOnMemory()) { + return true; + } + } + return false; + } + + /** + * Whether a partition is *processed* in the current iteration cycle over + * partitions. + * + * @param partitionId id of the partition to check + * @return true iff processing the given partition is done + */ + public boolean isPartitionProcessed(Integer partitionId) { + MetaPartition meta = partitions.get(partitionId); + synchronized (meta) { + return meta.getProcessingState() == ProcessingState.PROCESSED; } } @@ -354,14 +438,11 @@ public class MetaPartitionManager { */ public void setPartitionIsProcessed(int partitionId) { MetaPartition meta = partitions.get(partitionId); + int ownerThread = oocEngine.getIOScheduler().getOwnerThreadId(partitionId); synchronized (meta) { + perThreadPartitionDictionary.get(ownerThread).removePartition(meta); meta.setProcessingState(ProcessingState.PROCESSED); - } - Set<MetaPartition> partitionSet = perThreadPartitions - .get(oocEngine.getIOScheduler().getOwnerThreadId(partitionId)) - .getInMemoryProcessed(); - synchronized (partitionSet) { - partitionSet.add(meta); + perThreadPartitionDictionary.get(ownerThread).addPartition(meta); } numPartitionsProcessed.getAndIncrement(); } @@ -378,7 +459,7 @@ public class MetaPartitionManager { MetaPartition meta = partitions.get(partitionId); synchronized (meta) { boolean shouldLoad = meta.getPartitionState() == StorageState.ON_DISK; - if (superstep == oocEngine.getServiceWorker().getSuperstep()) { + if (superstep == oocEngine.getSuperstep()) { shouldLoad |= meta.getCurrentMessagesState() == StorageState.ON_DISK; } else { shouldLoad |= meta.getIncomingMessagesState() == StorageState.ON_DISK; @@ -391,25 +472,27 @@ public class MetaPartitionManager { * Notify this meta store that load of a partition for a specific superstep * is completed * - * @param partitionId id of a the partition that load is completed + * @param partitionId id of the partition for which the load is completed * @param superstep superstep in which the partition is loaded for */ public void doneLoadingPartition(int partitionId, long superstep) { MetaPartition meta = partitions.get(partitionId); numInMemoryPartitions.getAndIncrement(); int owner = oocEngine.getIOScheduler().getOwnerThreadId(partitionId); - boolean removed = perThreadPartitions.get(owner) - .remove(meta, StorageState.ON_DISK); - if (removed || meta.getProcessingState() == ProcessingState.IN_PROCESS) { - synchronized (meta) { - meta.setPartitionState(StorageState.IN_MEM); - if (superstep == oocEngine.getServiceWorker().getSuperstep()) { - meta.setCurrentMessagesState(StorageState.IN_MEM); - } else { - meta.setIncomingMessagesState(StorageState.IN_MEM); - } + synchronized (meta) { + perThreadPartitionDictionary.get(owner).removePartition(meta); + meta.setPartitionState(StorageState.IN_MEM); + if (superstep == oocEngine.getSuperstep()) { + meta.setCurrentMessagesState(StorageState.IN_MEM); + } else { + meta.setIncomingMessagesState(StorageState.IN_MEM); + } + // Check whether load was to prefetch a partition from disk to memory for + // the next superstep + if (meta.getProcessingState() == ProcessingState.PROCESSED) { + perThreadPartitionDictionary.get(owner).increaseNumPrefetch(); } - perThreadPartitions.get(owner).add(meta, StorageState.IN_MEM); + perThreadPartitionDictionary.get(owner).addPartition(meta); } } @@ -422,9 +505,13 @@ public class MetaPartitionManager { */ public boolean startOffloadingMessages(int partitionId) { MetaPartition meta = partitions.get(partitionId); + int ownerThread = + oocEngine.getIOScheduler().getOwnerThreadId(partitionId); synchronized (meta) { if (meta.getIncomingMessagesState() == StorageState.IN_MEM) { + perThreadPartitionDictionary.get(ownerThread).removePartition(meta); meta.setIncomingMessagesState(StorageState.IN_TRANSIT); + perThreadPartitionDictionary.get(ownerThread).addPartition(meta); return true; } else { return false; @@ -441,8 +528,12 @@ public class MetaPartitionManager { */ public void doneOffloadingMessages(int partitionId) { MetaPartition meta = partitions.get(partitionId); + int ownerThread = + oocEngine.getIOScheduler().getOwnerThreadId(partitionId); synchronized (meta) { + perThreadPartitionDictionary.get(ownerThread).removePartition(meta); meta.setIncomingMessagesState(StorageState.ON_DISK); + perThreadPartitionDictionary.get(ownerThread).addPartition(meta); } } @@ -478,17 +569,18 @@ public class MetaPartitionManager { public boolean startOffloadingPartition(int partitionId) { MetaPartition meta = partitions.get(partitionId); int owner = oocEngine.getIOScheduler().getOwnerThreadId(partitionId); - boolean removed = perThreadPartitions.get(owner) - .remove(meta, StorageState.IN_MEM); - if (removed) { - synchronized (meta) { + synchronized (meta) { + if (meta.getProcessingState() != ProcessingState.IN_PROCESS && + (meta.getPartitionState() == StorageState.IN_MEM || + meta.getCurrentMessagesState() == StorageState.IN_MEM)) { + perThreadPartitionDictionary.get(owner).removePartition(meta); meta.setPartitionState(StorageState.IN_TRANSIT); meta.setCurrentMessagesState(StorageState.IN_TRANSIT); + perThreadPartitionDictionary.get(owner).addPartition(meta); + return true; + } else { + return false; } - perThreadPartitions.get(owner).add(meta, StorageState.IN_TRANSIT); - return true; - } else { - return false; } } @@ -502,14 +594,11 @@ public class MetaPartitionManager { numInMemoryPartitions.getAndDecrement(); MetaPartition meta = partitions.get(partitionId); int owner = oocEngine.getIOScheduler().getOwnerThreadId(partitionId); - boolean removed = perThreadPartitions.get(owner) - .remove(meta, StorageState.IN_TRANSIT); - if (removed) { - synchronized (meta) { - meta.setPartitionState(StorageState.ON_DISK); - meta.setCurrentMessagesState(StorageState.ON_DISK); - } - perThreadPartitions.get(owner).add(meta, StorageState.ON_DISK); + synchronized (meta) { + perThreadPartitionDictionary.get(owner).removePartition(meta); + meta.setPartitionState(StorageState.ON_DISK); + meta.setCurrentMessagesState(StorageState.ON_DISK); + perThreadPartitionDictionary.get(owner).addPartition(meta); } } @@ -517,15 +606,17 @@ public class MetaPartitionManager { * Reset the meta store for a new iteration cycle over all partitions. * Note: this is not thread-safe and should be called from a single thread. */ - public void resetPartition() { + public void resetPartitions() { for (MetaPartition meta : partitions.values()) { + int owner = + oocEngine.getIOScheduler().getOwnerThreadId(meta.getPartitionId()); + perThreadPartitionDictionary.get(owner).removePartition(meta); meta.resetPartition(); + perThreadPartitionDictionary.get(owner).addPartition(meta); } - int numPartition = 0; - for (PerThreadPartitionStatus status : perThreadPartitions) { - numPartition += status.reset(); + for (MetaPartitionDictionary dictionary : perThreadPartitionDictionary) { + dictionary.reset(); } - checkState(numPartition == partitions.size()); numPartitionsProcessed.set(0); } @@ -535,41 +626,22 @@ public class MetaPartitionManager { */ public void resetMessages() { for (MetaPartition meta : partitions.values()) { + int owner = + oocEngine.getIOScheduler().getOwnerThreadId(meta.getPartitionId()); + perThreadPartitionDictionary.get(owner).removePartition(meta); meta.resetMessages(); - } - // After swapping incoming messages and current messages, it may be the case - // that a partition has data in memory (partitionState == IN_MEM), but now - // its current messages are on disk (currentMessageState == ON_DISK). So, we - // have to mark the partition as ON_DISK, and load its messages once it is - // about to be processed. - for (PerThreadPartitionStatus status : perThreadPartitions) { - Set<MetaPartition> partitionSet = status.getInMemoryUnprocessed(); - Iterator<MetaPartition> it = partitionSet.iterator(); - while (it.hasNext()) { - MetaPartition meta = it.next(); - if (meta.getCurrentMessagesState() == StorageState.ON_DISK) { - it.remove(); - status.getInDiskUnprocessed().add(meta); - numInMemoryPartitions.getAndDecrement(); - } - } - partitionSet = status.getInMemoryProcessed(); - it = partitionSet.iterator(); - while (it.hasNext()) { - MetaPartition meta = it.next(); - if (meta.getCurrentMessagesState() == StorageState.ON_DISK) { - it.remove(); - status.getInDiskProcessed().add(meta); - numInMemoryPartitions.getAndDecrement(); - } + if (meta.getPartitionState() == StorageState.IN_MEM && + meta.getCurrentMessagesState() == StorageState.ON_DISK) { + numInMemoryPartitions.getAndDecrement(); } + perThreadPartitionDictionary.get(owner).addPartition(meta); } } /** * Return the id of an unprocessed partition in memory. If all partitions are * processed, return an appropriate 'finisher signal'. If there are - * unprocessed partitions, but none are is memory, return null. + * unprocessed partitions, but none are in memory, return null. * * @return id of the partition to be processed next. */ @@ -577,20 +649,40 @@ public class MetaPartitionManager { if (numPartitionsProcessed.get() >= partitions.size()) { return NO_PARTITION_TO_PROCESS; } - int numThreads = perThreadPartitions.size(); + int numThreads = perThreadPartitionDictionary.size(); int index = randomGenerator.nextInt(numThreads); int startIndex = index; + MetaPartition meta; do { - Set<MetaPartition> partitionSet = - perThreadPartitions.get(index).getInMemoryUnprocessed(); - MetaPartition meta; - synchronized (partitionSet) { - meta = popFromSet(partitionSet); - } - if (meta != null) { - synchronized (meta) { - meta.setProcessingState(ProcessingState.IN_PROCESS); - return meta.getPartitionId(); + // We first look up a partition in the reverse dictionary. If there is a + // partition with the given properties, we then check whether we can + // return it as the next partition to process. If we cannot, there may + // still be other partitions in the dictionary, so we will continue + // looping through all of them. If all the partitions with our desired + // properties has been examined, we will break the loop. + while (true) { + meta = perThreadPartitionDictionary.get(index).lookup( + ProcessingState.UNPROCESSED, + StorageState.IN_MEM, + StorageState.IN_MEM, + null); + if (meta != null) { + // Here we should check if the 'meta' still has the same property as + // when it was looked up in the dictionary. There may be a case where + // meta changes from the time it is looked up until the moment the + // synchronize block is granted to progress. + synchronized (meta) { + if (meta.getProcessingState() == ProcessingState.UNPROCESSED && + meta.getPartitionState() == StorageState.IN_MEM && + meta.getCurrentMessagesState() == StorageState.IN_MEM) { + perThreadPartitionDictionary.get(index).removePartition(meta); + meta.setProcessingState(ProcessingState.IN_PROCESS); + perThreadPartitionDictionary.get(index).addPartition(meta); + return meta.getPartitionId(); + } + } + } else { + break; } } index = (index + 1) % numThreads; @@ -607,7 +699,9 @@ public class MetaPartitionManager { */ public boolean isPartitionOnDisk(int partitionId) { MetaPartition meta = partitions.get(partitionId); - return meta.isOnDisk(); + synchronized (meta) { + return meta.isOnDisk(); + } } /** @@ -623,18 +717,19 @@ public class MetaPartitionManager { /** Storage state of partition data */ private StorageState partitionState; /** Processing state of a partition */ - private volatile ProcessingState processingState; + private ProcessingState processingState; /** * Constructor * * @param partitionId id of the partition */ - MetaPartition(int partitionId) { + public MetaPartition(int partitionId) { this.partitionId = partitionId; - this.processingState = ProcessingState.PROCESSED; + this.processingState = ProcessingState.UNPROCESSED; this.partitionState = StorageState.IN_MEM; this.currentMessagesState = StorageState.IN_MEM; + this.incomingMessagesState = StorageState.IN_MEM; } @Override @@ -649,65 +744,30 @@ public class MetaPartitionManager { return sb.toString(); } - /** - * Get id of the partition - * - * @return id of the partition - */ public int getPartitionId() { return partitionId; } - /** - * Get storage state of incoming messages of the partition - * - * @return storage state of incoming messages - */ public StorageState getIncomingMessagesState() { return incomingMessagesState; } - /** - * Set storage state of incoming messages of the partition - * - * @param incomingMessagesState storage state of incoming messages - */ public void setIncomingMessagesState(StorageState incomingMessagesState) { this.incomingMessagesState = incomingMessagesState; } - /** - * Get storage state of current messages of the partition - * - * @return storage state of current messages - */ public StorageState getCurrentMessagesState() { return currentMessagesState; } - /** - * Set storage state of current messages of the partition - * - * @param currentMessagesState storage state of current messages - */ public void setCurrentMessagesState(StorageState currentMessagesState) { this.currentMessagesState = currentMessagesState; } - /** - * Get storage state of the partition - * - * @return storage state of the partition - */ public StorageState getPartitionState() { return partitionState; } - /** - * Set storage state of the partition - * - * @param state storage state of the partition - */ public void setPartitionState(StorageState state) { this.partitionState = state; } @@ -748,200 +808,167 @@ public class MetaPartitionManager { } /** - * Representation of partitions' state per IO thread + * Class representing reverse dictionary for partitions. The main operation + * of the reverse dictionary is to lookup for a partition with certain + * properties. The responsibility of keeping the dictionary consistent + * when partition property changes in on the code that changes the property. + * One can simply remove a partition from the dictionary, change the property + * (or properties), and then add the partition to the dictionary. */ - private static class PerThreadPartitionStatus { + private static class MetaPartitionDictionary { /** - * Contains partitions that has been processed in the current iteration - * cycle, and are not in use by any thread. + * Sets of partitions for each possible combination of properties. Each + * partition can have 4 properties, and each property can have any of 3 + * different values. The properties are as follows (in the order in which + * it is used as the dimensions of the following 4-D array): + * - processing status (PROCESSED, UN_PROCESSED, or IN_PROCESS) + * - partition storage status (IN_MEM, IN_TRANSIT, ON_DISK) + * - current messages storage status (IN_MEM, IN_TRANSIT, ON_DISK) + * - incoming messages storage status (IN_MEM, IN_TRANSIT, ON_DISK) */ - private Map<StorageState, Set<MetaPartition>> - processedPartitions = Maps.newConcurrentMap(); + private final Set<MetaPartition>[][][][] partitions = + (Set<MetaPartition>[][][][]) new Set<?>[3][3][3][3]; /** - * Contains partitions that has *NOT* been processed in the current - * iteration cycle, and are not in use by any thread. + * Number of partitions that has been prefetched to be computed in the + * next superstep */ - private Map<StorageState, Set<MetaPartition>> - unprocessedPartitions = Maps.newConcurrentMap(); + private final AtomicInteger numPrefetch = new AtomicInteger(0); /** * Constructor */ - public PerThreadPartitionStatus() { - processedPartitions.put(StorageState.IN_MEM, - Sets.<MetaPartition>newLinkedHashSet()); - processedPartitions.put(StorageState.ON_DISK, - Sets.<MetaPartition>newLinkedHashSet()); - processedPartitions.put(StorageState.IN_TRANSIT, - Sets.<MetaPartition>newLinkedHashSet()); - - unprocessedPartitions.put(StorageState.IN_MEM, - Sets.<MetaPartition>newLinkedHashSet()); - unprocessedPartitions.put(StorageState.ON_DISK, - Sets.<MetaPartition>newLinkedHashSet()); - unprocessedPartitions.put(StorageState.IN_TRANSIT, - Sets.<MetaPartition>newLinkedHashSet()); - } - - @Override - public String toString() { - StringBuffer sb = new StringBuffer(); - sb.append("\nProcessed Partitions: " + processedPartitions + "; "); - sb.append("\nUnprocessedPartitions: " + unprocessedPartitions); - return sb.toString(); - } - - /** - * Get set of partitions that are in memory and are processed - * - * @return set of partition that are in memory are are processed - */ - public Set<MetaPartition> getInMemoryProcessed() { - return processedPartitions.get(StorageState.IN_MEM); - } - - /** - * Get set of partitions that are in memory are are not processed - * - * @return set of partitions that are in memory and are not processed - */ - public Set<MetaPartition> getInMemoryUnprocessed() { - return unprocessedPartitions.get(StorageState.IN_MEM); + public MetaPartitionDictionary() { + for (int i = 0; i < 3; ++i) { + for (int j = 0; j < 3; ++j) { + for (int k = 0; k < 3; ++k) { + for (int t = 0; t < 3; ++t) { + partitions[i][j][k][t] = Sets.newLinkedHashSet(); + } + } + } + } } /** - * Get set of partitions that are on disk and are processed + * Get a partition set associated with property combination that a given + * partition has * - * @return set of partitions that are on disk and are processed + * @param meta meta partition containing properties of a partition + * @return partition set with the same property combination as the given + * meta partition */ - public Set<MetaPartition> getInDiskProcessed() { - return processedPartitions.get(StorageState.ON_DISK); + private Set<MetaPartition> getSet(MetaPartition meta) { + return partitions[meta.getProcessingState().ordinal()] + [meta.getPartitionState().ordinal()] + [meta.getCurrentMessagesState().ordinal()] + [meta.getIncomingMessagesState().ordinal()]; } /** - * Get set of partitions that are on disk and are not processed + * Add a partition to the dictionary * - * @return set of partitions that are on disk and are not processed + * @param meta meta information of the partition to add */ - public Set<MetaPartition> getInDiskUnprocessed() { - return unprocessedPartitions.get(StorageState.ON_DISK); + public void addPartition(MetaPartition meta) { + Set<MetaPartition> partitionSet = getSet(meta); + synchronized (partitionSet) { + partitionSet.add(meta); + } } /** - * Remove a partition from meta information + * Remove a partition to the dictionary * - * @param meta meta-information of a partition to be removed + * @param meta meta infomation of the partition to remove */ - public void remove(MetaPartition meta) { - Set<MetaPartition> partitionSet; - partitionSet = processedPartitions.get(StorageState.IN_MEM); - synchronized (partitionSet) { - if (partitionSet.remove(meta)) { - return; - } - } - partitionSet = unprocessedPartitions.get(StorageState.IN_MEM); - synchronized (partitionSet) { - if (partitionSet.remove(meta)) { - return; - } - } - partitionSet = processedPartitions.get(StorageState.IN_TRANSIT); - synchronized (partitionSet) { - if (partitionSet.remove(meta)) { - return; - } - } - partitionSet = unprocessedPartitions.get(StorageState.IN_TRANSIT); - synchronized (partitionSet) { - if (partitionSet.remove(meta)) { - return; - } - } - partitionSet = processedPartitions.get(StorageState.ON_DISK); - synchronized (partitionSet) { - if (partitionSet.remove(meta)) { - return; - } - } - partitionSet = unprocessedPartitions.get(StorageState.ON_DISK); + public void removePartition(MetaPartition meta) { + Set<MetaPartition> partitionSet = getSet(meta); synchronized (partitionSet) { partitionSet.remove(meta); } } /** - * Reset meta-information for the next iteration cycle over all partitions + * Lookup for a partition with given properties. One can use wildcard as + * a property in lookup operation (by passing null as the property). * - * @return total number of partitions kept for this thread + * @param processingState processing state property + * @param partitionStorageState partition storage property + * @param currentMessagesState current messages storage property + * @param incomingMessagesState incoming messages storage property + * @return a meta partition in the dictionary with the given combination of + * properties. If there is no such partition, return null */ - public int reset() { - checkState(unprocessedPartitions.get(StorageState.IN_MEM).size() == 0); - checkState(unprocessedPartitions.get(StorageState.IN_TRANSIT).size() == - 0); - checkState(unprocessedPartitions.get(StorageState.ON_DISK).size() == 0); - unprocessedPartitions.clear(); - unprocessedPartitions.putAll(processedPartitions); - processedPartitions.clear(); - processedPartitions.put(StorageState.IN_MEM, - Sets.<MetaPartition>newLinkedHashSet()); - processedPartitions.put(StorageState.ON_DISK, - Sets.<MetaPartition>newLinkedHashSet()); - processedPartitions.put(StorageState.IN_TRANSIT, - Sets.<MetaPartition>newLinkedHashSet()); - return unprocessedPartitions.get(StorageState.IN_MEM).size() + - unprocessedPartitions.get(StorageState.IN_TRANSIT).size() + - unprocessedPartitions.get(StorageState.ON_DISK).size(); + public MetaPartition lookup(ProcessingState processingState, + StorageState partitionStorageState, + StorageState currentMessagesState, + StorageState incomingMessagesState) { + int iStart = + (processingState == null) ? 0 : processingState.ordinal(); + int iEnd = + (processingState == null) ? 3 : (processingState.ordinal() + 1); + int jStart = + (partitionStorageState == null) ? 0 : partitionStorageState.ordinal(); + int jEnd = (partitionStorageState == null) ? 3 : + (partitionStorageState.ordinal() + 1); + int kStart = + (currentMessagesState == null) ? 0 : currentMessagesState.ordinal(); + int kEnd = (currentMessagesState == null) ? 3 : + (currentMessagesState.ordinal() + 1); + int tStart = + (incomingMessagesState == null) ? 0 : incomingMessagesState.ordinal(); + int tEnd = (incomingMessagesState == null) ? 3 : + (incomingMessagesState.ordinal() + 1); + for (int i = iStart; i < iEnd; ++i) { + for (int j = jStart; j < jEnd; ++j) { + for (int k = kStart; k < kEnd; ++k) { + for (int t = tStart; t < tEnd; ++t) { + Set<MetaPartition> partitionSet = partitions[i][j][k][t]; + synchronized (partitionSet) { + MetaPartition meta = peekFromSet(partitionSet); + if (meta != null) { + return meta; + } + } + } + } + } + } + return null; } /** - * Remove a partition from partition set of a given state + * Whether there is an in-memory partition that is processed already, + * excluding those partitions that are prefetched * - * @param meta meta partition to remove - * @param state state from which the partition should be removed - * @return true iff the partition is actually removed + * @return true if there is a processed in-memory partition */ - public boolean remove(MetaPartition meta, StorageState state) { - boolean removed = false; - Set<MetaPartition> partitionSet = null; - if (meta.getProcessingState() == ProcessingState.UNPROCESSED) { - partitionSet = unprocessedPartitions.get(state); - } else if (meta.getProcessingState() == ProcessingState.PROCESSED) { - partitionSet = processedPartitions.get(state); - } else { - LOG.info("remove: partition " + meta.getPartitionId() + " is " + - "already being processed! This should happen only if partition " + - "removal is done before start of an iteration over all partitions"); - } - if (partitionSet != null) { - synchronized (partitionSet) { - removed = partitionSet.remove(meta); + public boolean hasProcessedOnMemory() { + int count = 0; + for (int i = 0; i < 3; ++i) { + for (int j = 0; j < 3; ++j) { + Set<MetaPartition> partitionSet = + partitions[ProcessingState.PROCESSED.ordinal()] + [StorageState.IN_MEM.ordinal()][i][j]; + synchronized (partitionSet) { + count += partitionSet.size(); + } } } - return removed; + return count - numPrefetch.get() != 0; + } + + /** Increase number of prefetch-ed partition by 1 */ + public void increaseNumPrefetch() { + numPrefetch.getAndIncrement(); } /** - * Add a partition to partition set of a given state - * - * @param meta meta partition to add - * @param state state to which the partition should be added + * Reset the dictionary preparing it for the next iteration cycle over + * partitions */ - public void add(MetaPartition meta, StorageState state) { - Set<MetaPartition> partitionSet = null; - if (meta.getProcessingState() == ProcessingState.UNPROCESSED) { - partitionSet = unprocessedPartitions.get(state); - } else if (meta.getProcessingState() == ProcessingState.PROCESSED) { - partitionSet = processedPartitions.get(state); - } else { - LOG.info("add: partition " + meta.getPartitionId() + " is already " + - "being processed!"); - } - if (partitionSet != null) { - synchronized (partitionSet) { - partitionSet.add(meta); - } - } + public void reset() { + numPrefetch.set(0); } } }
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java index 7d97e51..325850c 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java @@ -181,18 +181,22 @@ public abstract class OutOfCoreDataManager<T> { /** * Loads and assembles all data for a given partition, and put it into the - * data store. + * data store. Returns the number of bytes transferred from disk to memory in + * the loading process. * * @param partitionId id of the partition to load ana assemble all data for * @param basePath path to load the data from + * @return number of bytes loaded from disk to memory * @throws IOException */ - public void loadPartitionData(int partitionId, String basePath) + public long loadPartitionData(int partitionId, String basePath) throws IOException { + long numBytes = 0; ReadWriteLock rwLock = getPartitionLock(partitionId); rwLock.writeLock().lock(); if (hasPartitionDataOnDisk.contains(partitionId)) { - loadInMemoryPartitionData(partitionId, getPath(basePath, partitionId)); + numBytes += loadInMemoryPartitionData(partitionId, + getPath(basePath, partitionId)); hasPartitionDataOnDisk.remove(partitionId); // Loading raw data buffers from disk if there is any and applying those // to already loaded in-memory data. @@ -213,6 +217,7 @@ public abstract class OutOfCoreDataManager<T> { addEntryToImMemoryPartitionData(partitionId, entry); } dis.close(); + numBytes += file.length(); checkState(file.delete(), "loadPartitionData: failed to delete %s.", file.getAbsoluteFile()); } @@ -225,38 +230,44 @@ public abstract class OutOfCoreDataManager<T> { } } rwLock.writeLock().unlock(); + return numBytes; } /** - * Offloads partition data of a given partition in the data store to disk + * Offloads partition data of a given partition in the data store to disk, and + * returns the number of bytes offloaded from memory to disk. * * @param partitionId id of the partition to offload its data * @param basePath path to offload the data to + * @return number of bytes offloaded from memory to disk * @throws IOException */ @edu.umd.cs.findbugs.annotations.SuppressWarnings( "UL_UNRELEASED_LOCK_EXCEPTION_PATH") - public void offloadPartitionData(int partitionId, String basePath) + public long offloadPartitionData(int partitionId, String basePath) throws IOException { ReadWriteLock rwLock = getPartitionLock(partitionId); rwLock.writeLock().lock(); hasPartitionDataOnDisk.add(partitionId); rwLock.writeLock().unlock(); - offloadInMemoryPartitionData(partitionId, getPath(basePath, partitionId)); + return offloadInMemoryPartitionData(partitionId, + getPath(basePath, partitionId)); } /** - * Offloads raw data buffers of a given partition to disk + * Offloads raw data buffers of a given partition to disk, and returns the + * number of bytes offloaded from memory to disk. * * @param partitionId id of the partition to offload its raw data buffers * @param basePath path to offload the data to + * @return number of bytes offloaded from memory to disk * @throws IOException */ - public void offloadBuffers(int partitionId, String basePath) + public long offloadBuffers(int partitionId, String basePath) throws IOException { Pair<Integer, List<T>> pair = dataBuffers.get(partitionId); if (pair == null || pair.getLeft() < minBufferSizeToOffload) { - return; + return 0; } ReadWriteLock rwLock = getPartitionLock(partitionId); rwLock.writeLock().lock(); @@ -272,6 +283,7 @@ public abstract class OutOfCoreDataManager<T> { writeEntry(entry, dos); } dos.close(); + long numBytes = dos.size(); int numBuffers = pair.getRight().size(); Integer oldNumBuffersOnDisk = numDataBuffersOnDisk.putIfAbsent(partitionId, numBuffers); @@ -279,6 +291,7 @@ public abstract class OutOfCoreDataManager<T> { numDataBuffersOnDisk.replace(partitionId, oldNumBuffersOnDisk + numBuffers); } + return numBytes; } /** @@ -345,24 +358,27 @@ public abstract class OutOfCoreDataManager<T> { protected abstract T readNextEntry(DataInput in) throws IOException; /** - * Loads data of a partition into data store. + * Loads data of a partition into data store. Returns number of bytes loaded. * * @param partitionId id of the partition to load its data * @param path path from which data should be loaded + * @return number of bytes loaded from disk to memory * @throws IOException */ - protected abstract void loadInMemoryPartitionData(int partitionId, + protected abstract long loadInMemoryPartitionData(int partitionId, String path) throws IOException; /** - * Offloads data of a partition in data store to disk. + * Offloads data of a partition in data store to disk. Returns the number of + * bytes offloaded to disk * * @param partitionId id of the partition to offload to disk * @param path path to which data should be offloaded + * @return number of bytes offloaded from memory to disk * @throws IOException */ - protected abstract void offloadInMemoryPartitionData(int partitionId, + protected abstract long offloadInMemoryPartitionData(int partitionId, String path) throws IOException; http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java index eb6d2c9..e84ad29 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java @@ -27,10 +27,32 @@ import java.io.IOException; * out-of-core mechanism. */ public abstract class IOCommand { + /** Type of IO command */ + public enum IOCommandType { + /** Loading a partition */ + LOAD_PARTITION, + /** Storing a partition */ + STORE_PARTITION, + /** Storing incoming messages of a partition */ + STORE_MESSAGE, + /** + * Storing message/buffer raw data buffer of a currently out-of-core + * partition + */ + STORE_BUFFER, + /** Doing nothing regarding IO */ + WAIT + } + /** Id of the partition involved for the IO */ protected final int partitionId; /** Out-of-core engine */ protected final OutOfCoreEngine oocEngine; + /** + * Number of bytes transferred to/from memory (loaded/stored) during the + * execution of the command + */ + protected long numBytesTransferred; /** * Constructor @@ -41,10 +63,11 @@ public abstract class IOCommand { public IOCommand(OutOfCoreEngine oocEngine, int partitionId) { this.oocEngine = oocEngine; this.partitionId = partitionId; + this.numBytesTransferred = 0; } /** - * GEt the id of the partition involved in the IO + * Get the id of the partition involved in the IO * * @return id of the partition */ @@ -54,12 +77,30 @@ public abstract class IOCommand { /** * Execute (load/store of data) the IO command, and change the data stores - * appropriately based on the data loaded/stored. + * appropriately based on the data loaded/stored. Return true iff the command + * is actually executed (resulted in loading or storing data). * * @param basePath the base path (prefix) to the files/folders IO command * should read/write data from/to + * @return whether the command is actually executed * @throws IOException */ - public abstract void execute(String basePath) throws IOException; + public abstract boolean execute(String basePath) throws IOException; + + /** + * Get the type of the command. + * + * @return type of the command + */ + public abstract IOCommandType getType(); + + /** + * Get the number of bytes transferred (loaded/stored from/to disk). + * + * @return number of bytes transferred during the execution of the command + */ + public long bytesTransferred() { + return numBytesTransferred; + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/io/LoadPartitionIOCommand.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/LoadPartitionIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/LoadPartitionIOCommand.java index c28a0da..ce24fe2 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/LoadPartitionIOCommand.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/io/LoadPartitionIOCommand.java @@ -54,19 +54,22 @@ public class LoadPartitionIOCommand extends IOCommand { } @Override - public void execute(String basePath) throws IOException { + public boolean execute(String basePath) throws IOException { + boolean executed = false; if (oocEngine.getMetaPartitionManager() .startLoadingPartition(partitionId, superstep)) { - long currentSuperstep = oocEngine.getServiceWorker().getSuperstep(); + long currentSuperstep = oocEngine.getSuperstep(); DiskBackedPartitionStore partitionStore = (DiskBackedPartitionStore) oocEngine.getServerData().getPartitionStore(); - partitionStore.loadPartitionData(partitionId, basePath); + numBytesTransferred += + partitionStore.loadPartitionData(partitionId, basePath); if (currentSuperstep == BspService.INPUT_SUPERSTEP && superstep == currentSuperstep) { DiskBackedEdgeStore edgeStore = (DiskBackedEdgeStore) oocEngine.getServerData().getEdgeStore(); - edgeStore.loadPartitionData(partitionId, basePath); + numBytesTransferred += + edgeStore.loadPartitionData(partitionId, basePath); } MessageStore messageStore; if (currentSuperstep == superstep) { @@ -76,12 +79,19 @@ public class LoadPartitionIOCommand extends IOCommand { messageStore = oocEngine.getServerData().getIncomingMessageStore(); } if (messageStore != null) { - ((DiskBackedMessageStore) messageStore) + numBytesTransferred += ((DiskBackedMessageStore) messageStore) .loadPartitionData(partitionId, basePath); } oocEngine.getMetaPartitionManager() .doneLoadingPartition(partitionId, superstep); + executed = true; } + return executed; + } + + @Override + public IOCommandType getType() { + return IOCommandType.LOAD_PARTITION; } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreDataBufferIOCommand.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreDataBufferIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreDataBufferIOCommand.java index 41a0682..f1769dd 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreDataBufferIOCommand.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreDataBufferIOCommand.java @@ -54,7 +54,8 @@ public class StoreDataBufferIOCommand extends IOCommand { } @Override - public void execute(String basePath) throws IOException { + public boolean execute(String basePath) throws IOException { + boolean executed = false; if (oocEngine.getMetaPartitionManager() .startOffloadingBuffer(partitionId)) { switch (type) { @@ -62,23 +63,32 @@ public class StoreDataBufferIOCommand extends IOCommand { DiskBackedPartitionStore partitionStore = (DiskBackedPartitionStore) oocEngine.getServerData().getPartitionStore(); - partitionStore.offloadBuffers(partitionId, basePath); + numBytesTransferred += + partitionStore.offloadBuffers(partitionId, basePath); DiskBackedEdgeStore edgeStore = (DiskBackedEdgeStore) oocEngine.getServerData().getEdgeStore(); - edgeStore.offloadBuffers(partitionId, basePath); + numBytesTransferred += edgeStore.offloadBuffers(partitionId, basePath); break; case MESSAGE: DiskBackedMessageStore messageStore = (DiskBackedMessageStore) oocEngine.getServerData().getIncomingMessageStore(); - messageStore.offloadBuffers(partitionId, basePath); + numBytesTransferred += + messageStore.offloadBuffers(partitionId, basePath); break; default: throw new IllegalStateException("execute: requested data buffer type " + "does not exist!"); } oocEngine.getMetaPartitionManager().doneOffloadingBuffer(partitionId); + executed = true; } + return executed; + } + + @Override + public IOCommandType getType() { + return IOCommandType.STORE_BUFFER; } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreIncomingMessageIOCommand.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreIncomingMessageIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreIncomingMessageIOCommand.java index 9c1c0a2..c9d8829 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreIncomingMessageIOCommand.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreIncomingMessageIOCommand.java @@ -41,16 +41,25 @@ public class StoreIncomingMessageIOCommand extends IOCommand { } @Override - public void execute(String basePath) throws IOException { + public boolean execute(String basePath) throws IOException { + boolean executed = false; if (oocEngine.getMetaPartitionManager() .startOffloadingMessages(partitionId)) { DiskBackedMessageStore messageStore = (DiskBackedMessageStore) oocEngine.getServerData().getIncomingMessageStore(); checkState(messageStore != null); - messageStore.offloadPartitionData(partitionId, basePath); + numBytesTransferred += + messageStore.offloadPartitionData(partitionId, basePath); oocEngine.getMetaPartitionManager().doneOffloadingMessages(partitionId); + executed = true; } + return executed; + } + + @Override + public IOCommandType getType() { + return IOCommandType.STORE_MESSAGE; } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/io/StorePartitionIOCommand.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StorePartitionIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StorePartitionIOCommand.java index 77955dc..797ac9d 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StorePartitionIOCommand.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StorePartitionIOCommand.java @@ -44,29 +44,38 @@ public class StorePartitionIOCommand extends IOCommand { } @Override - public void execute(String basePath) throws IOException { + public boolean execute(String basePath) throws IOException { + boolean executed = false; if (oocEngine.getMetaPartitionManager() .startOffloadingPartition(partitionId)) { DiskBackedPartitionStore partitionStore = (DiskBackedPartitionStore) oocEngine.getServerData().getPartitionStore(); - partitionStore.offloadPartitionData(partitionId, basePath); - if (oocEngine.getServiceWorker().getSuperstep() != - BspService.INPUT_SUPERSTEP) { + numBytesTransferred += + partitionStore.offloadPartitionData(partitionId, basePath); + if (oocEngine.getSuperstep() != BspService.INPUT_SUPERSTEP) { MessageStore messageStore = oocEngine.getServerData().getCurrentMessageStore(); if (messageStore != null) { - ((DiskBackedMessageStore) messageStore) + numBytesTransferred += ((DiskBackedMessageStore) messageStore) .offloadPartitionData(partitionId, basePath); } } else { DiskBackedEdgeStore edgeStore = (DiskBackedEdgeStore) oocEngine.getServerData().getEdgeStore(); - edgeStore.offloadPartitionData(partitionId, basePath); + numBytesTransferred += + edgeStore.offloadPartitionData(partitionId, basePath); } oocEngine.getMetaPartitionManager().doneOffloadingPartition(partitionId); + executed = true; } + return executed; + } + + @Override + public IOCommandType getType() { + return IOCommandType.STORE_PARTITION; } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/io/WaitIOCommand.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/WaitIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/WaitIOCommand.java index b6e0546..74e72eb 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/WaitIOCommand.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/io/WaitIOCommand.java @@ -42,13 +42,19 @@ public class WaitIOCommand extends IOCommand { } @Override - public void execute(String basePath) throws IOException { + public boolean execute(String basePath) throws IOException { try { TimeUnit.MILLISECONDS.sleep(waitDuration); } catch (InterruptedException e) { throw new IllegalStateException("execute: caught InterruptedException " + "while IO thread is waiting!"); } + return true; + } + + @Override + public IOCommandType getType() { + return IOCommandType.WAIT; } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/utils/AdjustableSemaphore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/AdjustableSemaphore.java b/giraph-core/src/main/java/org/apache/giraph/utils/AdjustableSemaphore.java index 09c1bdf..43a92a8 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/AdjustableSemaphore.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/AdjustableSemaphore.java @@ -38,6 +38,12 @@ public final class AdjustableSemaphore extends Semaphore { maxPermits = permits; } + @edu.umd.cs.findbugs.annotations.SuppressWarnings( + "UG_SYNC_SET_UNSYNC_GET") + public int getMaxPermits() { + return maxPermits; + } + /** * Adjusts the maximum number of available permits. * http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index d29e46d..bf48ea8 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -75,6 +75,7 @@ import org.apache.giraph.metrics.GiraphTimerContext; import org.apache.giraph.metrics.ResetSuperstepMetricsObserver; import org.apache.giraph.metrics.SuperstepMetricsRegistry; import org.apache.giraph.metrics.WorkerSuperstepMetrics; +import org.apache.giraph.ooc.OutOfCoreEngine; import org.apache.giraph.partition.Partition; import org.apache.giraph.partition.PartitionExchange; import org.apache.giraph.partition.PartitionOwner; @@ -213,6 +214,10 @@ public class BspServiceWorker<I extends WritableComparable, workerClient = new NettyWorkerClient<I, V, E>(context, conf, this, graphTaskManager.createUncaughtExceptionHandler()); workerServer.setFlowControl(workerClient.getFlowControl()); + OutOfCoreEngine oocEngine = workerServer.getServerData().getOocEngine(); + if (oocEngine != null) { + oocEngine.setFlowControl(workerClient.getFlowControl()); + } workerAggregatorRequestProcessor = new NettyWorkerAggregatorRequestProcessor(getContext(), conf, this); http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java index b7f1eb6..87583ed 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java @@ -27,6 +27,7 @@ import org.apache.giraph.io.EdgeInputFormat; import org.apache.giraph.io.EdgeReader; import org.apache.giraph.io.filters.EdgeInputFilter; import org.apache.giraph.io.InputType; +import org.apache.giraph.ooc.OutOfCoreEngine; import org.apache.giraph.utils.LoggerUtils; import org.apache.giraph.utils.MemoryUtils; import org.apache.hadoop.io.Writable; @@ -154,7 +155,16 @@ public class EdgeInputSplitsCallable<I extends WritableComparable, long inputSplitEdgesLoaded = 0; long inputSplitEdgesFiltered = 0; + int count = 0; + OutOfCoreEngine oocEngine = bspServiceWorker.getServerData().getOocEngine(); while (edgeReader.nextEdge()) { + // If out-of-core mechanism is used, check whether this thread + // can stay active or it should temporarily suspend and stop + // processing and generating more data for the moment. + if (oocEngine != null && + (++count & OutOfCoreEngine.CHECK_IN_INTERVAL) == 0) { + oocEngine.activeThreadCheckIn(); + } I sourceId = edgeReader.getCurrentSourceId(); Edge<I, E> readerEdge = edgeReader.getCurrentEdge(); if (sourceId == null) { http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java index 92b23bd..40a3bb0 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java @@ -23,6 +23,7 @@ import java.io.DataInputStream; import java.io.IOException; import java.util.concurrent.Callable; +import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.comm.WorkerClientRequestProcessor; import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; @@ -33,6 +34,7 @@ import org.apache.giraph.metrics.GiraphMetrics; import org.apache.giraph.metrics.GiraphMetricsRegistry; import org.apache.giraph.metrics.MeterDesc; import org.apache.giraph.metrics.MetricNames; +import org.apache.giraph.ooc.OutOfCoreEngine; import org.apache.giraph.time.SystemTime; import org.apache.giraph.time.Time; import org.apache.giraph.time.Times; @@ -78,8 +80,9 @@ public abstract class InputSplitsCallable<I extends WritableComparable, private final long startNanos = TIME.getNanoseconds(); /** Whether to prioritize local input splits. */ private final boolean useLocality; + /** Service worker */ + private final CentralizedServiceWorker<I, V, E> serviceWorker; - // CHECKSTYLE: stop ParameterNumberCheck /** * Constructor. * @@ -101,8 +104,8 @@ public abstract class InputSplitsCallable<I extends WritableComparable, this.useLocality = configuration.useInputSplitLocality(); this.splitsHandler = splitsHandler; this.configuration = configuration; + this.serviceWorker = bspServiceWorker; } - // CHECKSTYLE: resume ParameterNumberCheck /** * Get input format @@ -208,13 +211,26 @@ public abstract class InputSplitsCallable<I extends WritableComparable, byte[] serializedInputSplit; int inputSplitsProcessed = 0; try { + OutOfCoreEngine oocEngine = serviceWorker.getServerData().getOocEngine(); + if (oocEngine != null) { + oocEngine.processingThreadStart(); + } while ((serializedInputSplit = splitsHandler.reserveInputSplit(getInputType())) != null) { + // If out-of-core mechanism is used, check whether this thread + // can stay active or it should temporarily suspend and stop + // processing and generating more data for the moment. + if (oocEngine != null) { + oocEngine.activeThreadCheckIn(); + } vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount( loadInputSplit(serializedInputSplit)); context.progress(); ++inputSplitsProcessed; } + if (oocEngine != null) { + oocEngine.processingThreadFinish(); + } } catch (InterruptedException e) { throw new IllegalStateException("call: InterruptedException", e); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java index 540a6b4..a3f1300 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java @@ -31,6 +31,7 @@ import org.apache.giraph.io.VertexReader; import org.apache.giraph.io.filters.VertexInputFilter; import org.apache.giraph.mapping.translate.TranslateEdge; import org.apache.giraph.io.InputType; +import org.apache.giraph.ooc.OutOfCoreEngine; import org.apache.giraph.partition.PartitionOwner; import org.apache.giraph.utils.LoggerUtils; import org.apache.giraph.utils.MemoryUtils; @@ -168,7 +169,16 @@ public class VertexInputSplitsCallable<I extends WritableComparable, long edgesSinceLastUpdate = 0; long inputSplitEdgesLoaded = 0; + int count = 0; + OutOfCoreEngine oocEngine = bspServiceWorker.getServerData().getOocEngine(); while (vertexReader.nextVertex()) { + // If out-of-core mechanism is used, check whether this thread + // can stay active or it should temporarily suspend and stop + // processing and generating more data for the moment. + if (oocEngine != null && + (++count & OutOfCoreEngine.CHECK_IN_INTERVAL) == 0) { + oocEngine.activeThreadCheckIn(); + } Vertex<I, V, E> readerVertex = vertexReader.getCurrentVertex(); if (readerVertex.getId() == null) { throw new IllegalArgumentException( http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java index 7893940..3bb35eb 100644 --- a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java +++ b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java @@ -24,6 +24,7 @@ import org.apache.commons.io.FileUtils; import org.apache.giraph.bsp.BspService; import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.comm.ServerData; +import org.apache.giraph.comm.netty.NettyClient; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; @@ -279,13 +280,14 @@ public class TestPartitionStores { GiraphConstants.STATIC_GRAPH.set(conf, true); testMultiThreaded(); } -/* + @Test public void testDiskBackedPartitionStoreAdaptiveOOC() throws Exception { GiraphConstants.STATIC_GRAPH.set(conf, true); + NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.set(conf, true); testMultiThreaded(); } -*/ + private void testMultiThreaded() throws Exception { final AtomicInteger vertexCounter = new AtomicInteger(0); ExecutorService pool = Executors.newFixedThreadPool(NUM_OF_THREADS); http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java b/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java index 6fdfc75..397605d 100644 --- a/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java +++ b/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java @@ -18,6 +18,7 @@ package org.apache.giraph; +import org.apache.giraph.comm.netty.NettyClient; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.examples.GeneratedVertexReader; @@ -62,12 +63,13 @@ public class TestOutOfCore extends BspCase { SimplePageRankComputation.SimplePageRankWorkerContext.class); conf.setMasterComputeClass( SimplePageRankComputation.SimplePageRankMasterCompute.class); + GiraphConstants.METRICS_ENABLE.set(conf, true); GiraphConstants.USER_PARTITION_COUNT.set(conf, NUM_PARTITIONS); GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true); + NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.set(conf, true); GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, NUM_PARTITIONS_IN_MEMORY); GiraphConstants.NUM_COMPUTE_THREADS.set(conf, 8); GiraphConstants.NUM_INPUT_THREADS.set(conf, 8); - GiraphConstants.NUM_OOC_THREADS.set(conf, 4); GiraphConstants.NUM_OUTPUT_THREADS.set(conf, 8); GiraphConstants.PARTITIONS_DIRECTORY.set(conf, "disk0,disk1,disk2"); GiraphJob job = prepareJob(getCallingMethodName(), conf,
