[GIRAPH-1022] Adaptive out-of-core mechanism for input superstep and graph Summary: This code adds the ability to adaptively control the out-of-core mechanism for graph data structure at run-time during input/output superstep and computation superstep. Basically, the implemented mechanism monitors the amount of available free memory in a separate thread. If there is not enough memory, the code adjusts the number of partitions in memory, and spills a series of partitions/buffers to disk. Also, if the amount of free memory is more than expected, some of the on-disk partitions are brought back to memory. Additionally, if amount of free memory is marginal, the mechanism mocks the memory usage by gradually bringing partitions to memory.
Test Plan: mvn clean verify Unit tests added to giraph-core End-to-end test added to giraph-example Running the code on PageRank on a large graph and not getting OOM failures. Reviewers: maja.kabiljo, sergey.edunov, avery.ching, dionysis.logothetis Reviewed By: dionysis.logothetis Differential Revision: https://reviews.facebook.net/D40563 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/03ade425 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/03ade425 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/03ade425 Branch: refs/heads/trunk Commit: 03ade425dd5a65d3a713d5e7d85aa7605956fbd2 Parents: 1ca3222 Author: Hassan Eslami <[email protected]> Authored: Mon Jul 27 11:59:21 2015 -0700 Committer: Avery Ching <[email protected]> Committed: Mon Jul 27 12:20:38 2015 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 + findbugs-exclude.xml | 8 - .../giraph/bsp/CentralizedServiceWorker.java | 7 + .../java/org/apache/giraph/comm/ServerData.java | 15 +- .../long_id/LongAbstractListMessageStore.java | 18 +- .../comm/requests/SendWorkerEdgesRequest.java | 4 +- .../requests/SendWorkerVerticesRequest.java | 16 +- .../apache/giraph/conf/GiraphConfiguration.java | 4 + .../org/apache/giraph/conf/GiraphConstants.java | 36 +- .../apache/giraph/edge/AbstractEdgeStore.java | 74 +- .../java/org/apache/giraph/edge/EdgeStore.java | 39 + .../org/apache/giraph/edge/SimpleEdgeStore.java | 15 + .../giraph/edge/primitives/IntEdgeStore.java | 15 + .../giraph/edge/primitives/LongEdgeStore.java | 14 + .../apache/giraph/graph/ComputeCallable.java | 36 +- .../apache/giraph/graph/GraphTaskManager.java | 13 +- .../giraph/ooc/AdaptiveOutOfCoreEngine.java | 270 +++ .../apache/giraph/ooc/CheckMemoryCallable.java | 466 +++++ .../giraph/ooc/DiskBackedPartitionStore.java | 1769 ++++++++++++++++++ .../apache/giraph/ooc/JVMMemoryEstimator.java | 45 + 
.../org/apache/giraph/ooc/MemoryEstimator.java | 44 + .../org/apache/giraph/ooc/OutOfCoreEngine.java | 43 + .../giraph/ooc/OutOfCoreProcessorCallable.java | 145 ++ .../org/apache/giraph/ooc/package-info.java | 21 + .../partition/DiskBackedPartitionStore.java | 1300 ------------- .../apache/giraph/partition/PartitionStore.java | 132 +- .../giraph/partition/SimplePartitionStore.java | 117 +- .../apache/giraph/worker/BspServiceWorker.java | 61 +- .../org/apache/giraph/comm/RequestTest.java | 2 +- .../TestIntFloatPrimitiveMessageStores.java | 4 +- .../TestLongDoublePrimitiveMessageStores.java | 4 +- .../giraph/partition/TestPartitionStores.java | 250 +-- .../java/org/apache/giraph/TestOutOfCore.java | 121 ++ 33 files changed, 3492 insertions(+), 1618 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index fd31545..c844f61 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -2,6 +2,8 @@ Giraph Change Log Release 1.2.0 - unreleased ======= + GIRAPH-1022: Out-of-core mechanism for input superstep and graph data (heslami via aching) + GIRAPH-1021: Missing progress report for graph mutations. (heslami via aching) GIRAPH-1020: TaskInfo equality condition. 
(heslami via aching) http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/findbugs-exclude.xml b/findbugs-exclude.xml index afdf041..0ab2c73 100644 --- a/findbugs-exclude.xml +++ b/findbugs-exclude.xml @@ -89,14 +89,6 @@ <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE"/> </Match> <Match> - <Class name="org.apache.giraph.partition.DiskBackedPartitionStore$AddPartition"/> - <Bug pattern="UL_UNRELEASED_LOCK"/> - </Match> - <Match> - <Class name="org.apache.giraph.partition.DiskBackedPartitionStore$GetPartition"/> - <Bug pattern="UL_UNRELEASED_LOCK"/> - </Match> - <Match> <Class name="~org.apache.giraph.function.primitive.PrimitiveRefs.*Ref"/> <Bug pattern="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD"/> </Match> http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java index 37aed45..f6d77d0 100644 --- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java @@ -245,4 +245,11 @@ public interface CentralizedServiceWorker<I extends WritableComparable, * previous superstep. 
*/ GlobalStats getGlobalStats(); + + /** + * Get the number of partitions owned by this worker + * + * @return number of partitions owned + */ + int getNumPartitionsOwned(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java index 8269998..eddfbc6 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java @@ -36,12 +36,10 @@ import org.apache.giraph.comm.messages.MessageStoreFactory; import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.edge.EdgeStore; -import org.apache.giraph.edge.EdgeStoreFactory; import org.apache.giraph.graph.Vertex; import org.apache.giraph.graph.VertexMutations; import org.apache.giraph.graph.VertexResolver; -import org.apache.giraph.partition.DiskBackedPartitionStore; +import org.apache.giraph.ooc.DiskBackedPartitionStore; import org.apache.giraph.partition.Partition; import org.apache.giraph.partition.PartitionStore; import org.apache.giraph.partition.SimplePartitionStore; @@ -67,8 +65,6 @@ public class ServerData<I extends WritableComparable, private final ImmutableClassesGiraphConfiguration<I, V, E> conf; /** Partition store for this worker. */ private volatile PartitionStore<I, V, E> partitionStore; - /** Edge store for this worker. 
*/ - private final EdgeStore<I, V, E> edgeStore; /** Message store factory */ private final MessageStoreFactory<I, Writable, MessageStore<I, Writable>> messageStoreFactory; @@ -144,20 +140,13 @@ public class ServerData<I extends WritableComparable, getServiceWorker()); } else { partitionStore = - new SimplePartitionStore<I, V, E>(conf, context); + new SimplePartitionStore<I, V, E>(conf, context, getServiceWorker()); } - EdgeStoreFactory<I, V, E> edgeStoreFactory = conf.createEdgeStoreFactory(); - edgeStoreFactory.initialize(service, conf, context); - edgeStore = edgeStoreFactory.newStore(); ownerAggregatorData = new OwnerAggregatorServerData(context); allAggregatorData = new AllAggregatorServerData(context, conf); this.context = context; } - public EdgeStore<I, V, E> getEdgeStore() { - return edgeStore; - } - /** * Return the partition store for this worker. * http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListMessageStore.java index ae61de4..d1c33be 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListMessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListMessageStore.java @@ -78,13 +78,19 @@ public abstract class LongAbstractListMessageStore<M extends Writable, */ private void populateMap() { // TODO - can parallelize? 
// populate with vertex ids already known - for (int partitionId : service.getPartitionStore().getPartitionIds()) { - Partition<LongWritable, ?, ?> partition = service.getPartitionStore() - .getOrCreatePartition(partitionId); - Long2ObjectOpenHashMap<L> partitionMap = map.get(partitionId); - for (Vertex<LongWritable, ?, ?> vertex : partition) { - partitionMap.put(vertex.getId().get(), createList()); + service.getPartitionStore().startIteration(); + while (true) { + Partition partition = service.getPartitionStore().getNextPartition(); + if (partition == null) { + break; + } + Long2ObjectOpenHashMap<L> partitionMap = map.get(partition.getId()); + for (Object obj : partition) { + Vertex vertex = (Vertex) obj; + LongWritable vertexId = (LongWritable) vertex.getId(); + partitionMap.put(vertexId.get(), createList()); } + service.getPartitionStore().putPartition(partition); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java index 510743f..aeda197 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java @@ -69,8 +69,8 @@ public class SendWorkerEdgesRequest<I extends WritableComparable, iterator = partitionVertexData.getIterator(); while (iterator.hasNext()) { iterator.next(); - serverData.getEdgeStore(). 
- addPartitionEdges(iterator.getCurrentFirst(), + serverData.getPartitionStore() + .addPartitionEdges(iterator.getCurrentFirst(), iterator.getCurrentSecond()); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerVerticesRequest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerVerticesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerVerticesRequest.java index fb93dae..fde6e7f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerVerticesRequest.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerVerticesRequest.java @@ -22,13 +22,11 @@ import org.apache.giraph.comm.ServerData; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.utils.ExtendedDataOutput; import org.apache.giraph.utils.PairList; -import org.apache.giraph.utils.VertexIterator; import org.apache.giraph.utils.WritableUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import org.apache.giraph.partition.PartitionStore; -import org.apache.giraph.partition.Partition; import org.apache.log4j.Logger; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -105,15 +103,9 @@ public class SendWorkerVerticesRequest<I extends WritableComparable, iterator = workerPartitions.getIterator(); while (iterator.hasNext()) { iterator.next(); - VertexIterator<I, V, E> vertexIterator = - new VertexIterator<I, V, E>( - iterator.getCurrentSecond(), getConf()); - - Partition<I, V, E> partition; - PartitionStore store = serverData.getPartitionStore(); - partition = store.getOrCreatePartition(iterator.getCurrentFirst()); - partition.addPartitionVertices(vertexIterator); - store.putPartition(partition); + serverData.getPartitionStore() + 
.addPartitionVertices(iterator.getCurrentFirst(), + iterator.getCurrentSecond()); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java index a395244..e6931de 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java @@ -936,6 +936,10 @@ public class GiraphConfiguration extends Configuration return NUM_COMPUTE_THREADS.get(this); } + public int getNumOocThreads() { + return NUM_OOC_THREADS.get(this); + } + /** * Set the number of input split threads * http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index 72d913d..2804192 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -71,6 +71,8 @@ import org.apache.giraph.mapping.translate.TranslateEdge; import org.apache.giraph.master.DefaultMasterCompute; import org.apache.giraph.master.MasterCompute; import org.apache.giraph.master.MasterObserver; +import org.apache.giraph.ooc.JVMMemoryEstimator; +import org.apache.giraph.ooc.MemoryEstimator; import org.apache.giraph.partition.GraphPartitionerFactory; import org.apache.giraph.partition.HashPartitionerFactory; import org.apache.giraph.partition.Partition; @@ -89,6 +91,8 @@ import org.apache.hadoop.mapreduce.OutputFormat; public interface 
GiraphConstants { /** 1KB in bytes */ int ONE_KB = 1024; + /** 1MB in bytes */ + int ONE_MB = 1024 * 1024; /** Mapping related information */ ClassConfOption<MappingStore> MAPPING_STORE_CLASS = @@ -967,20 +971,32 @@ public interface GiraphConstants { new BooleanConfOption("giraph.useOutOfCoreGraph", false, "Enable out-of-core graph."); - /** Directory to write YourKit snapshots to */ - String YOURKIT_OUTPUT_DIR = "giraph.yourkit.outputDir"; - /** Default directory to write YourKit snapshots to */ - String YOURKIT_OUTPUT_DIR_DEFAULT = "/tmp/giraph/%JOB_ID%/%TASK_ID%"; + /** + * Memory estimator class used in adaptive out-of-core mechanism for deciding + * when data should go to disk. + */ + ClassConfOption<MemoryEstimator> OUT_OF_CORE_MEM_ESTIMATOR = + ClassConfOption.create("giraph.outOfCoreMemoryEstimator", + JVMMemoryEstimator.class, MemoryEstimator.class, + "Memory estimator class used for out-of-core decisions"); + + /** Number of threads participating in swapping graph/messages to disk. */ + IntConfOption NUM_OOC_THREADS = + new IntConfOption("giraph.numOutOfCoreThreads", 1, + "Number of threads participating in swapping data to disk."); /** Maximum number of partitions to hold in memory for each worker. */ IntConfOption MAX_PARTITIONS_IN_MEMORY = - new IntConfOption("giraph.maxPartitionsInMemory", 10, - "Maximum number of partitions to hold in memory for each worker."); + new IntConfOption("giraph.maxPartitionsInMemory", 0, + "Maximum number of partitions to hold in memory for each worker. By" + + " default it is set to 0 (for adaptive out-of-core mechanism"); - /** Set number of sticky partitions if sticky mode is enabled. 
*/ - IntConfOption MAX_STICKY_PARTITIONS = - new IntConfOption("giraph.stickyPartitions", 0, - "Set number of sticky partitions if sticky mode is enabled."); + + + /** Directory to write YourKit snapshots to */ + String YOURKIT_OUTPUT_DIR = "giraph.yourkit.outputDir"; + /** Default directory to write YourKit snapshots to */ + String YOURKIT_OUTPUT_DIR_DEFAULT = "/tmp/giraph/%JOB_ID%/%TASK_ID%"; /** Keep the zookeeper output for debugging? Default is to remove it. */ BooleanConfOption KEEP_ZOOKEEPER_DATA = http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java index 5d15707..9609047 100644 --- a/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java @@ -34,10 +34,11 @@ import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.util.Progressable; import org.apache.log4j.Logger; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; import java.util.Iterator; import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; @@ -130,6 +131,23 @@ public abstract class AbstractEdgeStore<I extends WritableComparable, protected abstract OutEdges<I, E> getPartitionEdges(Et entry); /** + * Writes the given key to the output + * + * @param key input key to be written + * @param output output to write the key to + */ + protected abstract void writeVertexKey(K key, DataOutput output) + throws IOException; + + /** + * Reads the given key from the input + * + * @param input input to read the key from + * @return Key 
read from the input + */ + protected abstract K readVertexKey(DataInput input) throws IOException; + + /** * Get iterator for partition edges * * @param partitionEdges map of out-edges for vertices in a partition @@ -138,6 +156,40 @@ public abstract class AbstractEdgeStore<I extends WritableComparable, protected abstract Iterator<Et> getPartitionEdgesIterator(Map<K, OutEdges<I, E>> partitionEdges); + @Override + public boolean hasPartitionEdges(int partitionId) { + return transientEdges.containsKey(partitionId); + } + + @Override + public void writePartitionEdgeStore(int partitionId, DataOutput output) + throws IOException { + Map<K, OutEdges<I, E>> edges = transientEdges.remove(partitionId); + output.writeInt(edges.size()); + for (Map.Entry<K, OutEdges<I, E>> edge : edges.entrySet()) { + writeVertexKey(edge.getKey(), output); + edge.getValue().write(output); + } + } + + @Override + public void readPartitionEdgeStore(int partitionId, DataInput input) + throws IOException { + if (transientEdges.containsKey(partitionId)) { + throw new IllegalStateException("readPartitionEdgeStore: reading a " + + "partition that is already there in the partition store " + + "(impossible)"); + } + Map<K, OutEdges<I, E>> partitionEdges = getPartitionEdges(partitionId); + int numEntries = input.readInt(); + for (int i = 0; i < numEntries; ++i) { + K vertexKey = readVertexKey(input); + OutEdges<I, E> edges = configuration.createAndInitializeInputOutEdges(); + edges.readFields(input); + partitionEdges.put(vertexKey, edges); + } + } + /** * Get out-edges for a given vertex * @@ -199,9 +251,7 @@ public abstract class AbstractEdgeStore<I extends WritableComparable, LOG.info("moveEdgesToVertices: Moving incoming edges to vertices."); } - final BlockingQueue<Integer> partitionIdQueue = - new ArrayBlockingQueue<>(transientEdges.size()); - partitionIdQueue.addAll(transientEdges.keySet()); + service.getPartitionStore().startIteration(); int numThreads = configuration.getNumInputSplitsThreads(); 
CallableFactory<Void> callableFactory = new CallableFactory<Void>() { @@ -212,11 +262,19 @@ public abstract class AbstractEdgeStore<I extends WritableComparable, public Void call() throws Exception { Integer partitionId; I representativeVertexId = configuration.createVertexId(); - while ((partitionId = partitionIdQueue.poll()) != null) { + while (true) { Partition<I, V, E> partition = - service.getPartitionStore().getOrCreatePartition(partitionId); + service.getPartitionStore().getNextPartition(); + if (partition == null) { + break; + } Map<K, OutEdges<I, E>> partitionEdges = - transientEdges.remove(partitionId); + transientEdges.remove(partition.getId()); + if (partitionEdges == null) { + service.getPartitionStore().putPartition(partition); + continue; + } + Iterator<Et> iterator = getPartitionEdgesIterator(partitionEdges); // process all vertices in given partition http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java index 912e25c..1c9d85f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java @@ -22,6 +22,10 @@ import org.apache.giraph.utils.VertexIdEdges; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + /** * Collects incoming edges for vertices owned by this worker. * @@ -45,4 +49,39 @@ public interface EdgeStore<I extends WritableComparable, * Note: this method is not thread-safe. */ void moveEdgesToVertices(); + + /** + * Whether the store contains edges for the given partition. 
+ * Note: This method is thread-safe + * + * @param partitionId Partition id under query + * @return true if the store has any edge for the given partition, false + * otherwise + */ + boolean hasPartitionEdges(int partitionId); + + /** + * Serialize the edges of a given partition, and removes the associated data + * from the store. + * Note: This method is not thread-safe (i.e. should not be called for the + * same partition at the same time). + * + * @param partitionId Id of partition to serialize + * @param output Output to write the edge store to + */ + void writePartitionEdgeStore(int partitionId, DataOutput output) + throws IOException; + + /** + * Deserialize the edges of a given partition, and adds it to the partition + * store (assumes that the edge store does not have any edge from the + * partition already). + * Note: This method is not thread-safe (i.e. should not be called for the + * same partition at the same time). + * + * @param partitionId Id of partition to deserialize + * @param input Input to read the partition from + */ + void readPartitionEdgeStore(int partitionId, DataInput input) + throws IOException; } http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/edge/SimpleEdgeStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/SimpleEdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/SimpleEdgeStore.java index 3eb97d6..19ddc07 100644 --- a/giraph-core/src/main/java/org/apache/giraph/edge/SimpleEdgeStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/edge/SimpleEdgeStore.java @@ -27,6 +27,9 @@ import org.apache.hadoop.util.Progressable; import com.google.common.collect.MapMaker; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentMap; @@ -93,6 +96,18 @@ public class 
SimpleEdgeStore<I extends WritableComparable, } @Override + protected void writeVertexKey(I key, DataOutput output) throws IOException { + key.write(output); + } + + @Override + protected I readVertexKey(DataInput input) throws IOException { + I id = configuration.createVertexId(); + id.readFields(input); + return id; + } + + @Override protected Iterator<Map.Entry<I, OutEdges<I, E>>> getPartitionEdgesIterator(Map<I, OutEdges<I, E>> partitionEdges) { return partitionEdges.entrySet().iterator(); http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/edge/primitives/IntEdgeStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/primitives/IntEdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/primitives/IntEdgeStore.java index b138f49..253c68c 100644 --- a/giraph-core/src/main/java/org/apache/giraph/edge/primitives/IntEdgeStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/edge/primitives/IntEdgeStore.java @@ -31,6 +31,9 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMaps; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; import java.util.Iterator; import java.util.Map; @@ -81,6 +84,18 @@ public class IntEdgeStore<V extends Writable, E extends Writable> } @Override + protected void writeVertexKey(Integer key, DataOutput output) + throws IOException { + output.writeInt(key); + } + + @Override + protected Integer readVertexKey(DataInput input) + throws IOException { + return input.readInt(); + } + + @Override protected Iterator<Int2ObjectMap.Entry<OutEdges<IntWritable, E>>> getPartitionEdgesIterator( Map<Integer, OutEdges<IntWritable, E>> partitionEdges) { 
http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/edge/primitives/LongEdgeStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/primitives/LongEdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/primitives/LongEdgeStore.java index 61f908a..db3ebe5 100644 --- a/giraph-core/src/main/java/org/apache/giraph/edge/primitives/LongEdgeStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/edge/primitives/LongEdgeStore.java @@ -31,6 +31,9 @@ import it.unimi.dsi.fastutil.longs.Long2ObjectMap; import it.unimi.dsi.fastutil.longs.Long2ObjectMaps; import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; import java.util.Iterator; import java.util.Map; @@ -82,6 +85,17 @@ public class LongEdgeStore<V extends Writable, E extends Writable> } @Override + protected void writeVertexKey(Long key, DataOutput output) + throws IOException { + output.writeLong(key); + } + + @Override + protected Long readVertexKey(DataInput input) throws IOException { + return input.readLong(); + } + + @Override protected Iterator<Long2ObjectMap.Entry<OutEdges<LongWritable, E>>> getPartitionEdgesIterator( Map<Long, OutEdges<LongWritable, E>> partitionEdges) { http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java index e44a794..923e427 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java @@ -20,7 +20,6 @@ package org.apache.giraph.graph; import 
java.io.IOException; import java.util.Collection; import java.util.List; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import org.apache.giraph.bsp.CentralizedServiceWorker; @@ -34,6 +33,7 @@ import org.apache.giraph.metrics.MetricNames; import org.apache.giraph.metrics.SuperstepMetricsRegistry; import org.apache.giraph.partition.Partition; import org.apache.giraph.partition.PartitionStats; +import org.apache.giraph.partition.PartitionStore; import org.apache.giraph.time.SystemTime; import org.apache.giraph.time.Time; import org.apache.giraph.time.Times; @@ -61,9 +61,9 @@ import com.yammer.metrics.core.Histogram; * when using the out-of-core graph partition store. We should only load on * demand. * - * @param <I> Vertex index value - * @param <V> Vertex value - * @param <E> Edge value + * @param <I> Vertex index value + * @param <V> Vertex value + * @param <E> Edge value * @param <M1> Incoming message type * @param <M2> Outgoing message type */ @@ -80,8 +80,6 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable, private final Mapper<?, ?, ?, ?>.Context context; /** Graph state */ private final GraphState graphState; - /** Thread-safe queue of all partition ids */ - private final BlockingQueue<Integer> partitionIdQueue; /** Message store */ private final MessageStore<I, M1> messageStore; /** Configuration */ @@ -105,23 +103,18 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable, /** * Constructor - * * @param context Context * @param graphState Current graph state (use to create own graph state) * @param messageStore Message store - * @param partitionIdQueue Queue of partition ids (thread-safe) * @param configuration Configuration * @param serviceWorker Service worker */ - public ComputeCallable( - Mapper<?, ?, ?, ?>.Context context, GraphState graphState, - MessageStore<I, M1> messageStore, - BlockingQueue<Integer> partitionIdQueue, + public ComputeCallable(Mapper<?, ?, ?, 
?>.Context context, + GraphState graphState, MessageStore<I, M1> messageStore, ImmutableClassesGiraphConfiguration<I, V, E> configuration, CentralizedServiceWorker<I, V, E> serviceWorker) { this.context = context; this.configuration = configuration; - this.partitionIdQueue = partitionIdQueue; this.messageStore = messageStore; this.serviceWorker = serviceWorker; this.graphState = graphState; @@ -155,16 +148,14 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable, computation.preSuperstep(); List<PartitionStats> partitionStatsList = Lists.newArrayList(); - while (!partitionIdQueue.isEmpty()) { - Integer partitionId = partitionIdQueue.poll(); - if (partitionId == null) { + PartitionStore<I, V, E> partitionStore = serviceWorker.getPartitionStore(); + while (true) { + long startTime = System.currentTimeMillis(); + Partition<I, V, E> partition = partitionStore.getNextPartition(); + if (partition == null) { break; } - long startTime = System.currentTimeMillis(); - Partition<I, V, E> partition = - serviceWorker.getPartitionStore().getOrCreatePartition(partitionId); - try { serviceWorker.getServerData().resolvePartitionMutation(partition); PartitionStats partitionStats = @@ -179,7 +170,7 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable, messageBytesSentCounter.inc(partitionMsgBytes); timedLogger.info("call: Completed " + partitionStatsList.size() + " partitions, " + - partitionIdQueue.size() + " remaining " + + partitionStore.getNumPartitions() + " remaining " + MemoryUtils.getRuntimeMemoryStats()); } catch (IOException e) { throw new IllegalStateException("call: Caught unexpected IOException," + @@ -188,9 +179,8 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable, throw new IllegalStateException("call: Caught unexpected " + "InterruptedException, failing.", e); } finally { - serviceWorker.getPartitionStore().putPartition(partition); + partitionStore.putPartition(partition); } - 
histogramComputePerPartition.update( System.currentTimeMillis() - startTime); } http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java index 5c80297..0844858 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java @@ -25,8 +25,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Enumeration; import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -346,7 +344,7 @@ end[PURE_YARN]*/ // execute the current superstep if (numPartitions > 0) { processGraphPartitions(context, partitionStatsList, graphState, - messageStore, numPartitions, numThreads); + messageStore, numThreads); } finishedSuperstepStats = completeSuperstepAndCollectStats( partitionStatsList, superstepTimerContext); @@ -723,26 +721,22 @@ end[PURE_YARN]*/ * @param partitionStatsList to pick up this superstep's processing stats * @param graphState the BSP graph state * @param messageStore the messages to be processed in this superstep - * @param numPartitions the number of data partitions (vertices) to process * @param numThreads number of concurrent threads to do processing */ private void processGraphPartitions(final Mapper<?, ?, ?, ?>.Context context, List<PartitionStats> partitionStatsList, final GraphState graphState, final MessageStore<I, Writable> messageStore, - int numPartitions, int numThreads) { - final BlockingQueue<Integer> computePartitionIdQueue = - new 
ArrayBlockingQueue<Integer>(numPartitions); - long verticesToCompute = 0; PartitionStore<I, V, E> partitionStore = serviceWorker.getPartitionStore(); + long verticesToCompute = 0; for (Integer partitionId : partitionStore.getPartitionIds()) { - computePartitionIdQueue.add(partitionId); verticesToCompute += partitionStore.getPartitionVertexCount(partitionId); } WorkerProgress.get().startSuperstep( serviceWorker.getSuperstep(), verticesToCompute, serviceWorker.getPartitionStore().getNumPartitions()); + partitionStore.startIteration(); GiraphTimerContext computeAllTimerContext = computeAll.time(); timeToFirstMessageTimerContext = timeToFirstMessage.time(); @@ -756,7 +750,6 @@ end[PURE_YARN]*/ context, graphState, messageStore, - computePartitionIdQueue, conf, serviceWorker); } http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/ooc/AdaptiveOutOfCoreEngine.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/AdaptiveOutOfCoreEngine.java b/giraph-core/src/main/java/org/apache/giraph/ooc/AdaptiveOutOfCoreEngine.java new file mode 100644 index 0000000..8d3cab6 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/AdaptiveOutOfCoreEngine.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.ooc; + +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.giraph.bsp.CentralizedServiceWorker; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.utils.CallableFactory; +import org.apache.giraph.utils.LogStacktraceCallable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.log4j.Logger; + +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Adaptive out-of-core mechanism. This mechanism spawns two types of threads: + * 1) check-memory thread, which periodically monitors the amount of available + * memory and decides whether data should go on disk. This thread is + * basically the brain behind the out-of-core mechanism, and commands + * "out-of-core processor threads" (type 2 thread below) to move + * appropriate data to disk, + * 2) out-of-core processor threads. This is a team of threads responsible for + * offloading appropriate data to disk. 
"check-memory thread" decides on + * which data should go to disk, and "out-of-core processor threads" do the + * offloading. + * + * @param <I> Vertex id + * @param <V> Vertex value + * @param <E> Edge data + */ +public class AdaptiveOutOfCoreEngine<I extends WritableComparable, + V extends Writable, E extends Writable> implements + OutOfCoreEngine<I, V, E> { + /** Class logger. */ + private static final Logger LOG = + Logger.getLogger(AdaptiveOutOfCoreEngine.class); + + // ---- Synchronization Variables ---- + /** Barrier to coordinate check-memory and OOC-processing threads */ + private final CyclicBarrier gate; + /** + * Signal to determine whether OOC processing threads are done processing OOC + * requests + */ + private final CyclicBarrier doneOocSignal; + /** Signal to determine whether the computation is terminated */ + private final CountDownLatch doneCompute; + /** Finisher signal to OOC processing threads */ + private volatile boolean done; + + // ---- OOC Commands ---- + /** + * List of partitions that are on disk, and their loaded *vertices* during + * INPUT_SUPERSTEP are ready to flush to disk + */ + private final BlockingQueue<Integer> partitionsWithInputVertices; + /** + * List of partitions that are on disk, and their loaded *edges* during + * INPUT_SUPERSTEP are ready to flush to disk + */ + private final BlockingQueue<Integer> partitionsWithInputEdges; + /** Number of partitions to be written to the disk */ + private final AtomicInteger numPartitionsToSpill; + + /** Executor service for check memory thread */ + private ExecutorService checkMemoryExecutor; + /** Executor service for out-of-core processor threads */ + private ExecutorService outOfCoreProcessorExecutor; + + /** Configuration */ + private ImmutableClassesGiraphConfiguration<I, V, E> conf; + /** Worker */ + private final CentralizedServiceWorker<I, V, E> serviceWorker; + + /** Cached value for number of out-of-core threads specified by user */ + private int numOocThreads; + + /** 
Result of check-memory thread (to be checked for graceful termination) */ + private Future<Void> checkMemoryResult; + /** + * Results of out-of-core processor threads (to be checked for graceful + * termination) + */ + private List<Future<Void>> oocProcessorResults; + + /** + * Creates an instance of adaptive mechanism + * @param conf Configuration + * @param serviceWorker Worker service + */ + public AdaptiveOutOfCoreEngine(ImmutableClassesGiraphConfiguration conf, + CentralizedServiceWorker<I, V, E> serviceWorker) { + this.conf = conf; + this.serviceWorker = serviceWorker; + + this.numOocThreads = conf.getNumOocThreads(); + this.gate = new CyclicBarrier(numOocThreads + 1); + this.doneOocSignal = new CyclicBarrier(numOocThreads + 1); + this.doneCompute = new CountDownLatch(1); + this.done = false; + this.partitionsWithInputVertices = new ArrayBlockingQueue<Integer>(100); + this.partitionsWithInputEdges = new ArrayBlockingQueue<Integer>(100); + this.numPartitionsToSpill = new AtomicInteger(0); + } + + @Override + public void initialize() { + if (LOG.isInfoEnabled()) { + LOG.info("initialize: initializing out-of-core engine"); + } + CallableFactory<Void> checkMemoryCallableFactory = + new CallableFactory<Void>() { + @Override + public Callable<Void> newCallable(int callableId) { + return new CheckMemoryCallable<I, V, E>( + AdaptiveOutOfCoreEngine.this, conf, serviceWorker); + } + }; + checkMemoryExecutor = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setNameFormat("check-memory").build()); + checkMemoryResult = checkMemoryExecutor.submit(new LogStacktraceCallable<>( + checkMemoryCallableFactory.newCallable(0))); + + CallableFactory<Void> outOfCoreProcessorCallableFactory = + new CallableFactory<Void>() { + @Override + public Callable<Void> newCallable(int callableId) { + return new OutOfCoreProcessorCallable<I, V, E>( + AdaptiveOutOfCoreEngine.this, serviceWorker); + } + }; + outOfCoreProcessorExecutor = Executors + 
.newFixedThreadPool(numOocThreads, + new ThreadFactoryBuilder().setNameFormat("ooc-%d").build()); + oocProcessorResults = Lists.newArrayListWithCapacity(numOocThreads); + for (int i = 0; i < numOocThreads; ++i) { + Future<Void> future = outOfCoreProcessorExecutor.submit( + new LogStacktraceCallable<>( + outOfCoreProcessorCallableFactory.newCallable(i))); + oocProcessorResults.add(future); + } + } + + @Override + public void shutdown() { + doneCompute.countDown(); + checkMemoryExecutor.shutdown(); + if (checkMemoryResult.isCancelled()) { + throw new IllegalStateException( + "shutdown: memory check thread did not " + "terminate gracefully!"); + } + outOfCoreProcessorExecutor.shutdown(); + for (int i = 0; i < numOocThreads; ++i) { + if (oocProcessorResults.get(i).isCancelled()) { + throw new IllegalStateException("shutdown: out-of-core processor " + + "thread " + i + " did not terminate gracefully."); + } + } + } + + /** + * @return the latch that signals whether the whole computation is done + */ + public CountDownLatch getDoneCompute() { + return doneCompute; + } + + /** + * @return whether the computation is done + */ + public boolean isDone() { + return done; + } + + /** + * @return list of partitions that have large enough buffers of vertices read + * in INPUT_SUPERSTEP. + */ + public BlockingQueue<Integer> getPartitionsWithInputVertices() { + return partitionsWithInputVertices; + } + + /** + * @return list of partitions that have large enough buffers of edges read + * in INPUT_SUPERSTEP. + */ + public BlockingQueue<Integer> getPartitionsWithInputEdges() { + return partitionsWithInputEdges; + } + + /** + * @return number of partitions to spill to disk + */ + public AtomicInteger getNumPartitionsToSpill() { + return numPartitionsToSpill; + } + + /** + * Wait on gate with which OOC processor threads are notified to execute + * commands provided by brain (memory-check thread). 
+ * + * @throws BrokenBarrierException + * @throws InterruptedException + */ + public void waitOnGate() throws BrokenBarrierException, InterruptedException { + gate.await(); + } + + /** + * Reset the gate for reuse. + */ + public void resetGate() { + gate.reset(); + } + + /** + * Wait on signal from all OOC processor threads that the offloading of data + * is complete. + * + * @throws BrokenBarrierException + * @throws InterruptedException + */ + public void waitOnOocSignal() + throws BrokenBarrierException, InterruptedException { + doneOocSignal.await(); + } + + /** + * Reset the completion signal of OOC processor threads for reuse. + */ + public void resetOocSignal() { + doneOocSignal.reset(); + } + + /** + * Set the computation as done (i.e. setting the state that determines the + * whole computation is done). + */ + public void setDone() { + done = true; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/ooc/CheckMemoryCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/CheckMemoryCallable.java b/giraph-core/src/main/java/org/apache/giraph/ooc/CheckMemoryCallable.java new file mode 100644 index 0000000..7f52490 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/CheckMemoryCallable.java @@ -0,0 +1,466 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.ooc; + +import org.apache.giraph.bsp.BspService; +import org.apache.giraph.bsp.CentralizedServiceWorker; +import org.apache.giraph.conf.FloatConfOption; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.conf.IntConfOption; +import org.apache.giraph.utils.PairList; +import org.apache.giraph.utils.ReflectionUtils; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.log4j.Logger; + +import java.util.Stack; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Adaptive out-of-core mechanism brain. This class provides one thread per + * worker that periodically checks the free memory on the worker and compares it + * with total amount of memory given to that worker to run the job. The period + * at which the thread checks for the memory is specified by the user. Also, + * user can specify the fraction of memory where anytime free memory is less + * than that fraction of total memory, actions would be taken to free up space + * in memory (this fraction is called LOW_FREE_MEMORY_FRACTION). 
Also, user can + * specify another fraction of available memory where memory pressure is fair + * and some of the data on disk (if there is any) can be brought back to memory + * again (this fraction is called FAIR_FREE_MEMORY_FRACTION). + * + * In the adaptive out-of-core mechanism, if amount of free memory becomes less + * than LOW_FREE_MEMORY_FRACTION, some data are being considered as potentials + * to transfer to disk. These data can be in the following categories: + * 1) Vertex buffers read in INPUT_SUPERSTEP. These are vertex input splits + * read for a partition that is out-of-core and PartitionStore holds these + * vertex buffers in in-memory buffers (and postpone their merge with the + * actual partition until the partition is loaded back in memory). + * 2) Edge buffers read in INPUT_SUPERSTEP. These are similar buffers to + * vertex buffers, but they keep edge data in INPUT_SUPERSTEP. + * 3) Partitions. + * + * This brain prefers the first two categories in INPUT_SUPERSTEP as long as + * size of buffers are large enough that it is worth writing them to disk. In + * case where brain decides on spilling partitions to disk, the brain decides + * only on the "number of partitions" to spill to disk. It is "out-of-core + * processor threads" responsibility to find that many partitions to spill to + * disk. The number of partitions to spill is a fraction of number of partitions + * currently in memory. It is recommended that this fraction be equal to + * subtraction of LOW_FREE_MEMORY_FRACTION from FAIR_FREE_MEMORY_FRACTION. Here + * is an example to clarify on this recommendation. Assume + * LOW_FREE_MEMORY_FRACTION is 5% and FAIR_FREE_MEMORY_FRACTION is 15%. Also + * assume that the partitions are similar in their memory footprint (which is a + * valid assumption for most of the partitioning techniques). 
If free memory is + * a bit less than 5% of total available memory, if we offload 10% + * (15% - 5% = 10%), then the amount of free memory will increase to a bit less + * than 15% of total available memory. + * + * @param <I> Vertex id + * @param <V> Vertex value + * @param <E> Edge value + */ +public class CheckMemoryCallable<I extends WritableComparable, + V extends Writable, E extends Writable> implements Callable<Void> { + /** + * Lowest free memory fraction to start doing necessary actions to go + * out-of-core. + */ + public static final FloatConfOption LOW_FREE_MEMORY_FRACTION = + new FloatConfOption("giraph.lowFreeMemoryFraction", 0.1f, + "If free memory fraction goes below this value, GC is called " + + "manually and necessary actions are taken if we have to go " + + "out-of-core"); + /** + * Expected memory fraction to achieve after detecting that the job is running + * low in memory. Basically, this memory fraction is the target to achieve + * once we decide to offload data on disk. + */ + public static final FloatConfOption MID_FREE_MEMORY_FRACTION = + new FloatConfOption("giraph.midFreeMemoryFraction", 0.15f, + "Once out-of-core mechanism decides to offload data on disk, it " + + "offloads data on disk until free memory fraction reaches this " + + "fraction."); + /** + * Memory fraction at which the job gets the best performance considering the + * choice of GC strategy. It means, if the amount of free memory is more than + * this fraction we will not see severe amount of GC calls. + */ + public static final FloatConfOption FAIR_FREE_MEMORY_FRACTION = + new FloatConfOption("giraph.fairFreeMemoryFraction", 0.3f, + "The fraction of free memory at which the job shows the best GC " + + "performance. 
This fraction might be dependent on GC strategy " + + "used in running the job, but generally 0.3 is a reasonable " + + "fraction for most strategies."); + /** + * Memory fraction at which the job has enough space so we can back off from + * the last out-of-core decision, i.e. lazily bringing the last bunch of data + * spilled to disk. + */ + public static final FloatConfOption HIGH_FREE_MEMORY_FRACTION = + new FloatConfOption("giraph.highFreeMemoryFraction", 0.4f, + "Once free memory reaches at this fraction, last out-of-core " + + "decision is lazily rolled back, i.e. we back off from " + + "out-of-core."); + /** Time interval at which checking memory is done periodically. */ + public static final IntConfOption CHECK_MEMORY_INTERVAL = + new IntConfOption("giraph.checkMemoryInterval", 5000, + "Time interval (in milliseconds) at which checking memory is done" + + " to decide if there should be any out-of-core action."); + /** Coefficient by which the number of partitions in memory changes. 
*/ + public static final FloatConfOption OOC_GRAPH_MODIFICATION_COEFFICIENT = + new FloatConfOption("giraph.graphPartitionModificationCoefficient", 0.3f, + "If we decide to go out-of-core or back-off from out-of-core, this " + + "is the multiplier by which the number of in-memory partitions" + + "will change."); + + /** Class logger */ + private static final Logger LOG = Logger.getLogger(CheckMemoryCallable.class); + + /** Worker */ + private final CentralizedServiceWorker<I, V, E> serviceWorker; + /** Partition store */ + private final DiskBackedPartitionStore<I, V, E> partitionStore; + + // ---- Cached Config Values ---- + /** Cached value of LOW_FREE_MEMORY_FRACTION */ + private float lowFreeMemoryFraction; + /** Cached value for MID_FREE_MEMORY_FRACTION */ + private float midFreeMemoryFraction; + /** Cached value of FAIR_FREE_MEMORY_FRACTION */ + private float fairFreeMemoryFraction; + /** Cached value for HIGH_FREE_MEMORY_FRACTION */ + private float highFreeMemoryFraction; + /** Cached value of CHECK_MEMORY_INTERVAL */ + private int checkInterval; + /** Cached value for OOC_GRAPH_MODIFICATION_COEFFICIENT */ + private float modificationCoefficient; + + /** List of counts of number of partitions every time we shrink the store */ + private Stack<Integer> oocPartitionCounts; + /** Memory estimator instance */ + private final MemoryEstimator memoryEstimator; + /** Adaptive out-of-core engine */ + private final AdaptiveOutOfCoreEngine<I, V, E> oocEngine; + + /** + * Constructor for check-memory thread. 
+ * + * @param oocEngine out-of-core engine + * @param conf job configuration + * @param serviceWorker worker service + */ + public CheckMemoryCallable(AdaptiveOutOfCoreEngine<I, V, E> oocEngine, + ImmutableClassesGiraphConfiguration<I, V, E> conf, + CentralizedServiceWorker<I, V, E> serviceWorker) { + this.oocEngine = oocEngine; + this.serviceWorker = serviceWorker; + this.partitionStore = + (DiskBackedPartitionStore<I, V, E>) serviceWorker.getPartitionStore(); + + this.oocPartitionCounts = new Stack<>(); + + this.lowFreeMemoryFraction = LOW_FREE_MEMORY_FRACTION.get(conf); + this.midFreeMemoryFraction = MID_FREE_MEMORY_FRACTION.get(conf); + this.fairFreeMemoryFraction = FAIR_FREE_MEMORY_FRACTION.get(conf); + this.highFreeMemoryFraction = HIGH_FREE_MEMORY_FRACTION.get(conf); + this.checkInterval = CHECK_MEMORY_INTERVAL.get(conf); + this.modificationCoefficient = OOC_GRAPH_MODIFICATION_COEFFICIENT.get(conf); + + memoryEstimator = ReflectionUtils + .newInstance(GiraphConstants.OUT_OF_CORE_MEM_ESTIMATOR.get(conf)); + } + + /** + * Checks whether the available free memory is enough for an efficient + * execution. If memory is limited, offload partitions to disk. + * Also, if available memory is more than a threshold, loads partitions from + * disk (if there is any) to memory. + */ + @Override + @edu.umd.cs.findbugs.annotations.SuppressWarnings("DM_GC") + public Void call() { + if (LOG.isInfoEnabled()) { + LOG.info("call: check-memory thread started."); + } + memoryEstimator.initialize(serviceWorker); + CountDownLatch doneCompute = oocEngine.getDoneCompute(); + while (doneCompute.getCount() != 0) { + double maxMemory = memoryEstimator.maxMemoryMB(); + double freeMemory = memoryEstimator.freeMemoryMB(); + boolean gcDone = false; + if (freeMemory < lowFreeMemoryFraction * maxMemory) { + // This is typically a bad scenario where previous GCs were not + // successful to free up enough memory. 
If we keep staying in this + // situation, usually, either the computation slows down dramatically, + // or the computation throws OOM error. So, we do GC manually, and + // make sure that out-of-core is the solution to get out of this + // situation. + if (LOG.isInfoEnabled()) { + LOG.info("call: Memory is very limited now. Calling GC manually. " + + String.format("freeMemory = %.2fMB", freeMemory)); + } + long gcStartTime = System.currentTimeMillis(); + System.gc(); + gcDone = true; + freeMemory = memoryEstimator.freeMemoryMB(); + if (LOG.isInfoEnabled()) { + LOG.info("call: GC is done. " + String + .format("GC time = %.2f sec, and freeMemory = %.2fMB", + (System.currentTimeMillis() - gcStartTime) / 1000.0, + freeMemory)); + } + } + + // If we have enough memory, we roll back the latest shrink in number of + // partition slots. + // If we do not have enough memory, but we are not in a bad scenario + // either, we gradually increase the number of partition slots in memory. + // If we are low in free memory, we first push unnecessary data to disk + // and then push some partitions to disk if necessary. 
+ int numInMemory = partitionStore.getNumPartitionInMemory(); + int maxInMemory = partitionStore.getNumPartitionSlots(); + int numInTotal = partitionStore.getNumPartitions(); + if (freeMemory > highFreeMemoryFraction * maxMemory) { + if (numInMemory >= maxInMemory && !oocPartitionCounts.isEmpty()) { + partitionStore.increasePartitionSlots(oocPartitionCounts.pop()); + } + } else if (freeMemory > fairFreeMemoryFraction * maxMemory) { + // Only gradually increase the number of partition slots if all slots + // are already used, and we have things out-of-core + if (!oocPartitionCounts.isEmpty() || maxInMemory < numInTotal) { + if (numInMemory >= maxInMemory) { + partitionStore.increasePartitionSlots(1); + if (!oocPartitionCounts.isEmpty()) { + int num = oocPartitionCounts.pop(); + if (num > 1) { + oocPartitionCounts.push(num - 1); + } + } + } + } + } else if (gcDone && freeMemory < midFreeMemoryFraction * maxMemory) { + BlockingQueue<Integer> partitionsWithInputVertices = + oocEngine.getPartitionsWithInputVertices(); + BlockingQueue<Integer> partitionsWithInputEdges = + oocEngine.getPartitionsWithInputEdges(); + AtomicInteger numPartitionsToSpill = + oocEngine.getNumPartitionsToSpill(); + + while (freeMemory < midFreeMemoryFraction * maxMemory) { + // Offload input vertex buffer of OOC partitions if we are in + // INPUT_SUPERSTEP + if (serviceWorker.getSuperstep() == BspService.INPUT_SUPERSTEP) { + // List of pairs (partitionId, approximate memory footprint of + // vertex buffers of that partition). 
+ PairList<Integer, Integer> pairs = + partitionStore.getOocPartitionIdsWithPendingInputVertices(); + freeMemory -= createCommand(pairs, partitionsWithInputVertices); + } + + // Offload edge store of OOC partitions if we are in INPUT_SUPERSTEP + if (freeMemory < midFreeMemoryFraction * maxMemory && + serviceWorker.getSuperstep() == BspService.INPUT_SUPERSTEP) { + PairList<Integer, Integer> pairs = + partitionStore.getOocPartitionIdsWithPendingInputEdges(); + freeMemory -= createCommand(pairs, partitionsWithInputEdges); + } + + // Offload partitions if we are still low in free memory + if (freeMemory < midFreeMemoryFraction * maxMemory) { + numPartitionsToSpill + .set(getNextOocPartitionCount(freeMemory, maxMemory)); + } + + if (!partitionsWithInputVertices.isEmpty() || + !partitionsWithInputEdges.isEmpty() || + numPartitionsToSpill.get() != 0) { + if (LOG.isInfoEnabled()) { + LOG.info("call: signal out-of-core processor threads to start " + + "offloading. These threads will spill vertex buffer of " + + partitionsWithInputVertices.size() + " partitions, edge " + + "buffers of " + partitionsWithInputEdges.size() + + " partitions, and " + numPartitionsToSpill.get() + " whole " + + "partition"); + } + // Opening the gate for OOC processing threads to start spilling + // data on disk + try { + oocEngine.waitOnGate(); + } catch (InterruptedException e) { + throw new IllegalStateException("call: Caught " + + "InterruptedException while opening the gate for OOC " + + "processing threads"); + } catch (BrokenBarrierException e) { + throw new IllegalStateException("call: Caught " + + "BrokenBarrierException while opening the gate for OOC " + + "processing threads"); + } + oocEngine.resetGate(); + + if (LOG.isInfoEnabled()) { + LOG.info("call: waiting on OOC processors to finish offloading " + + "data to disk"); + } + // Wait until all OOC processing threads are done swapping data to + // disk + try { + oocEngine.waitOnOocSignal(); + } catch (InterruptedException e) { + throw 
new IllegalStateException("call: Caught " + + "InterruptedException. Looks like memory check thread is " + + "interrupted while waiting on OOC processing threads."); + } catch (BrokenBarrierException e) { + throw new IllegalStateException("call: Caught " + + "BrokenBarrierException. Looks like some OOC processing " + + "threads broke while writing data on disk."); + } + oocEngine.resetOocSignal(); + } + + gcDone = false; + long gcStartTime = 0; + if (freeMemory < midFreeMemoryFraction * maxMemory) { + // Calling GC manually to actually free up the memory for data that + // is offloaded to disk + if (LOG.isInfoEnabled()) { + LOG.info("call: calling GC manually to free up space for " + + "recently offloaded data."); + } + gcStartTime = System.currentTimeMillis(); + System.gc(); + gcDone = true; + } + freeMemory = memoryEstimator.freeMemoryMB(); + if (LOG.isInfoEnabled()) { + LOG.info("call: " + + (gcDone ? + ("GC is done. " + String.format("GC time = %.2f sec.", + (System.currentTimeMillis() - gcStartTime) / 1000.0)) : + "") + + "Finished offloading data to disk. " + + String.format("freeMemory = %.2fMB", freeMemory)); + } + } + } + + // Either wait for the computation to be done, or the time interval passes + try { + doneCompute.await(checkInterval, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new IllegalStateException("call: Caught InterruptedException " + + "while waiting for computation to be done and/or " + checkInterval + + "milliseconds passes."); + } + } + + // Setting 'done' before the gate here and checking 'done' in OOC processing + // threads after the gate, guarantees that OOC processing threads see the + // new value of done and terminate gracefully. 
+ oocEngine.setDone(); + try { + oocEngine.waitOnGate(); + } catch (InterruptedException e) { + throw new IllegalStateException("call: Caught InterruptedException " + + "while waiting for the last time on gate in the current superstep"); + } catch (BrokenBarrierException e) { + throw new IllegalStateException("call: Caught BrokenBarrierException " + + "while waiting for the last time on gate in the current superstep"); + } + return null; + } + + /** + * Returns the number of partitions that should go out-of-core at this point. + * + * @return number of partitions that should go out-of-core + * @param freeMemory amount of free memory (in MB) + * @param maxMemory amount of max memory (in MB) + */ + private int getNextOocPartitionCount(double freeMemory, double maxMemory) { + int numSlots = partitionStore.getNumPartitionSlots(); + if (numSlots == Integer.MAX_VALUE) { + numSlots = partitionStore.getNumPartitions(); + partitionStore.setNumPartitionSlots(numSlots); + } + + double freeFraction = freeMemory / maxMemory; + double multiplier = Math.min( + // User-specified favorable size to spill to disk + modificationCoefficient, + // Approximate fraction of current data to spill in order to reach the + // fair fraction of free memory + (fairFreeMemoryFraction - freeFraction) / (1 - freeFraction)); + int count = Math.max((int) (numSlots * multiplier), 1); + if (count >= numSlots) { + LOG.warn("getNextOocPartitionCount: Memory capacity is " + + numSlots + " partitions, and OOC mechanism is " + + "trying to put " + count + " partitions to disk. This is not " + + "possible"); + // We should have at least one partition in memory + count = numSlots - 1; + if (count == 0) { + LOG.warn("It seems that size of one partition is too large for the " + + "available memory. 
Try to run the job with more partitions!"); + } + } + if (count != 0) { + oocPartitionCounts.push(count); + } + return count; + } + + /** + * Generates the command for a particular type of data we want to offload to + * disk. + * + * @param pairs list of pair(partitionId, approximate foot-print that is going + * of be reduced by offloading the particular data of a + * partition) + * @param commands list of partitionIds for which we want to execute the + * command + * @return approximate amount of memory (in MB) that is going to be freed up + * after executing the generated commands + */ + private double createCommand(PairList<Integer, Integer> pairs, + BlockingQueue<Integer> commands) { + double usedMemory = 0; + if (pairs.getSize() != 0) { + PairList<Integer, Integer>.Iterator iterator = pairs.getIterator(); + // Generating commands for out-of-core processor threads to + // offload data as long as command queue has space. + while (iterator.hasNext() && + commands.remainingCapacity() > 0) { + iterator.next(); + commands.add(iterator.getCurrentFirst()); + // Having an approximation on the memory foot-print of data to offload + // helps us to know how much memory is going to become available by + // offloading the data without using internal functions to estimate + // free memory again. + usedMemory += iterator.getCurrentSecond() / 1024.0 / 1024.0; + } + } + return usedMemory; + } +}
