Decouple out-of-core persistence infrastructure from out-of-core computation
Summary: This diff proposes the following: - The persistence layer is decoupled from the out-of-core infrastructure. This way one can simply implement different data accessors for various persistence resources. The persistence layer for reading/writing from/to the local file system is implemented in this diff. - Previously, out-of-core data were indexed by string literals. This has changed for more flexibility. Now, data are accessible by a more flexible data indexing mechanism, in which a chain of indices is used to address a particular piece of data. - With different implementations of the data accessor, now there may be more emphasis on having more IO threads. It is important that these IO threads are load-balanced. In this diff, the mechanism to assign partitions to IO threads has changed. - All the coolness of Kryo's (de)serialization and RandomAccessFile (in D59277) is included in this diff, all in one place. Test Plan: mvn clean verify out-of-core snapshot test passes Reviewers: dionysis.logothetis, maja.kabiljo, sergey.edunov Differential Revision: https://reviews.facebook.net/D59691 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/3793c9ef Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/3793c9ef Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/3793c9ef Branch: refs/heads/trunk Commit: 3793c9ef69993bf4b180b6f6b15bb2a5edde5530 Parents: 8eb1f76 Author: Hassan Eslami <[email protected]> Authored: Mon Jun 27 14:13:29 2016 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Mon Jun 27 14:13:34 2016 -0700 ---------------------------------------------------------------------- .gitignore | 2 +- .../java/org/apache/giraph/bsp/BspService.java | 2 +- .../java/org/apache/giraph/comm/ServerData.java | 6 +- .../apache/giraph/conf/GiraphConfiguration.java | 9 - .../org/apache/giraph/conf/GiraphConstants.java | 27 +- .../ImmutableClassesGiraphConfiguration.java | 19 + 
.../apache/giraph/graph/GraphTaskManager.java | 2 +- .../giraph/ooc/FixedPartitionsOracle.java | 139 ------ .../org/apache/giraph/ooc/OutOfCoreEngine.java | 32 +- .../apache/giraph/ooc/OutOfCoreIOCallable.java | 25 +- .../giraph/ooc/OutOfCoreIOCallableFactory.java | 64 +-- .../apache/giraph/ooc/OutOfCoreIOScheduler.java | 31 +- .../giraph/ooc/OutOfCoreIOStatistics.java | 2 +- .../org/apache/giraph/ooc/OutOfCoreOracle.java | 131 ------ .../giraph/ooc/SimpleGCMonitoringOracle.java | 355 --------------- .../apache/giraph/ooc/ThresholdBasedOracle.java | 364 ---------------- .../apache/giraph/ooc/command/IOCommand.java | 104 +++++ .../ooc/command/LoadPartitionIOCommand.java | 102 +++++ .../ooc/command/StoreDataBufferIOCommand.java | 99 +++++ .../command/StoreIncomingMessageIOCommand.java | 69 +++ .../ooc/command/StorePartitionIOCommand.java | 85 ++++ .../giraph/ooc/command/WaitIOCommand.java | 64 +++ .../apache/giraph/ooc/command/package-info.java | 21 + .../giraph/ooc/data/DiskBackedDataStore.java | 432 +++++++++++++++++++ .../giraph/ooc/data/DiskBackedEdgeStore.java | 92 ++-- .../giraph/ooc/data/DiskBackedMessageStore.java | 96 ++--- .../ooc/data/DiskBackedPartitionStore.java | 181 +++----- .../giraph/ooc/data/MetaPartitionManager.java | 59 ++- .../giraph/ooc/data/OutOfCoreDataManager.java | 401 ----------------- .../org/apache/giraph/ooc/io/IOCommand.java | 106 ----- .../giraph/ooc/io/LoadPartitionIOCommand.java | 102 ----- .../giraph/ooc/io/StoreDataBufferIOCommand.java | 99 ----- .../ooc/io/StoreIncomingMessageIOCommand.java | 69 --- .../giraph/ooc/io/StorePartitionIOCommand.java | 85 ---- .../org/apache/giraph/ooc/io/WaitIOCommand.java | 64 --- .../org/apache/giraph/ooc/io/package-info.java | 21 - .../giraph/ooc/persistence/DataIndex.java | 198 +++++++++ .../ooc/persistence/LocalDiskDataAccessor.java | 252 +++++++++++ .../ooc/persistence/OutOfCoreDataAccessor.java | 115 +++++ .../giraph/ooc/persistence/package-info.java | 22 + 
.../ooc/policy/FixedPartitionsOracle.java | 140 ++++++ .../giraph/ooc/policy/OutOfCoreOracle.java | 135 ++++++ .../ooc/policy/SimpleGCMonitoringOracle.java | 357 +++++++++++++++ .../giraph/ooc/policy/ThresholdBasedOracle.java | 365 ++++++++++++++++ .../apache/giraph/ooc/policy/package-info.java | 21 + .../org/apache/giraph/zk/ZooKeeperManager.java | 2 +- .../giraph/partition/TestPartitionStores.java | 10 + .../rexster/io/RexsterVertexOutputFormat.java | 2 +- 48 files changed, 2871 insertions(+), 2309 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 3ae52e2..03acf8e 100644 --- a/.gitignore +++ b/.gitignore @@ -7,7 +7,7 @@ # Build files: *.class target -Unknown Job* +UnknownJob* failed-profile.txt # IntelliJ IDEA files: http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java index fc0fa95..9545a25 100644 --- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java @@ -241,7 +241,7 @@ public abstract class BspService<I extends WritableComparable, this.context = context; this.graphTaskManager = graphTaskManager; this.conf = graphTaskManager.getConf(); - this.jobId = conf.get("mapred.job.id", "Unknown Job"); + this.jobId = conf.getJobId(); this.taskPartition = conf.getTaskPartition(); this.restartedSuperstep = conf.getLong( GiraphConstants.RESTART_SUPERSTEP, UNSET_SUPERSTEP); http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java 
---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java index 4156d8c..e926b6c 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java @@ -150,7 +150,7 @@ public class ServerData<I extends WritableComparable, oocEngine = new OutOfCoreEngine(conf, service); partitionStore = new DiskBackedPartitionStore<I, V, E>(inMemoryPartitionStore, - conf, context, service, oocEngine); + conf, context, oocEngine); edgeStore = new DiskBackedEdgeStore<I, V, E>(inMemoryEdgeStore, conf, oocEngine); } else { @@ -268,7 +268,7 @@ public class ServerData<I extends WritableComparable, nextCurrentMessageStore = messageStore; } else { nextCurrentMessageStore = new DiskBackedMessageStore<>( - conf, messageStore, + conf, oocEngine, messageStore, conf.getIncomingMessageClasses().useMessageCombiner(), serviceWorker.getSuperstep()); } @@ -280,7 +280,7 @@ public class ServerData<I extends WritableComparable, nextIncomingMessageStore = messageStore; } else { nextIncomingMessageStore = new DiskBackedMessageStore<>( - conf, messageStore, + conf, oocEngine, messageStore, conf.getOutgoingMessageClasses().useMessageCombiner(), serviceWorker.getSuperstep() + 1); } http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java index 7f1cb2b..4164c3a 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java @@ -1146,15 +1146,6 @@ public class 
GiraphConfiguration extends Configuration } /** - * Whether the application with change or not the graph topology. - * - * @return true if the graph is static, false otherwise. - */ - public boolean isStaticGraph() { - return STATIC_GRAPH.isTrue(this); - } - - /** * Get the output directory to write YourKit snapshots to * * @param context Map context http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index c592a12..ee67bed 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -71,8 +71,10 @@ import org.apache.giraph.mapping.translate.TranslateEdge; import org.apache.giraph.master.DefaultMasterCompute; import org.apache.giraph.master.MasterCompute; import org.apache.giraph.master.MasterObserver; -import org.apache.giraph.ooc.OutOfCoreOracle; -import org.apache.giraph.ooc.ThresholdBasedOracle; +import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor; +import org.apache.giraph.ooc.persistence.LocalDiskDataAccessor; +import org.apache.giraph.ooc.policy.OutOfCoreOracle; +import org.apache.giraph.ooc.policy.ThresholdBasedOracle; import org.apache.giraph.partition.GraphPartitionerFactory; import org.apache.giraph.partition.HashPartitionerFactory; import org.apache.giraph.partition.Partition; @@ -949,11 +951,32 @@ public interface GiraphConstants { "Comma-separated list of directories in the local filesystem for " + "out-of-core partitions."); + /** + * Number of IO threads used in out-of-core mechanism. If local disk is used + * for spilling data to and reading data from, this number should be equal to + * the number of available disks on each machine. 
In such case, one should + * use giraph.partitionsDirectory to specify directories mounted on different + * disks. + */ + IntConfOption NUM_OUT_OF_CORE_THREADS = + new IntConfOption("giraph.numOutOfCoreThreads", 1, "Number of IO " + + "threads used in out-of-core mechanism. If using local disk to " + + "spill data, this should be equal to the number of available " + + "disks. In such case, use giraph.partitionsDirectory to specify " + + "mount points on different disks."); + /** Enable out-of-core graph. */ BooleanConfOption USE_OUT_OF_CORE_GRAPH = new BooleanConfOption("giraph.useOutOfCoreGraph", false, "Enable out-of-core graph."); + /** Data accessor resource/object */ + ClassConfOption<OutOfCoreDataAccessor> OUT_OF_CORE_DATA_ACCESSOR = + ClassConfOption.create("giraph.outOfCoreDataAccessor", + LocalDiskDataAccessor.class, OutOfCoreDataAccessor.class, + "Data accessor used in out-of-core computation (local-disk, " + + "in-memory, HDFS, etc.)"); + /** * Out-of-core oracle that is to be used for adaptive out-of-core engine. 
If * the `MAX_PARTITIONS_IN_MEMORY` is already set, this will be over-written http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java index b9ecf2d..1b79cba 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java @@ -128,6 +128,8 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable, * extended data input/output classes for messages */ private final boolean useBigDataIOForMessages; + /** Is the graph static (meaning there is no mutation)? */ + private final boolean isStaticGraph; /** * Constructor. Takes the configuration and then gets the classes out of @@ -144,6 +146,7 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable, GiraphConstants.GRAPH_TYPE_LANGUAGES, conf); valueNeedsWrappers = PerGraphTypeBoolean.readFromConf( GiraphConstants.GRAPH_TYPES_NEEDS_WRAPPERS, conf); + isStaticGraph = GiraphConstants.STATIC_GRAPH.get(this); valueFactories = new ValueFactories<I, V, E>(this); } @@ -1326,4 +1329,20 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable, return null; } } + + /** + * Whether the application with change or not the graph topology. + * + * @return true if the graph is static, false otherwise. 
+ */ + public boolean isStaticGraph() { + return isStaticGraph; + } + + /** + * @return job id + */ + public String getJobId() { + return get("mapred.job.id", "UnknownJob"); + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java index 725d327..a1d8522 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java @@ -1054,7 +1054,7 @@ end[PURE_YARN]*/ * @return Time spent in GC recorder by the GC listener */ public long getSuperstepGCTime() { - return gcTimeMetric.count(); + return (gcTimeMetric == null) ? 0 : gcTimeMetric.count(); } /** http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/FixedPartitionsOracle.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/FixedPartitionsOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/FixedPartitionsOracle.java deleted file mode 100644 index f7badcb..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/FixedPartitionsOracle.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.ooc; - -import com.sun.management.GarbageCollectionNotificationInfo; -import org.apache.giraph.conf.GiraphConstants; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.ooc.io.IOCommand; -import org.apache.giraph.ooc.io.LoadPartitionIOCommand; -import org.apache.giraph.ooc.io.StorePartitionIOCommand; -import org.apache.log4j.Logger; - -import java.util.concurrent.atomic.AtomicInteger; - -/** Oracle for fixed out-of-core mechanism */ -public class FixedPartitionsOracle implements OutOfCoreOracle { - /** Class logger */ - private static final Logger LOG = - Logger.getLogger(FixedPartitionsOracle.class); - /** Maximum number of partitions to be kept in memory */ - private final int maxPartitionsInMemory; - /** - * Number of partitions to be added (loaded) or removed (stored) to/from - * memory. Each outstanding load partition counts +1 and each outstanding - * store partition counts -1 toward this counter. 
- */ - private final AtomicInteger deltaNumPartitionsInMemory = - new AtomicInteger(0); - /** Out-of-core engine */ - private final OutOfCoreEngine oocEngine; - - /** - * Constructor - * - * @param conf configuration - * @param oocEngine out-of-core engine - */ - public FixedPartitionsOracle(ImmutableClassesGiraphConfiguration conf, - OutOfCoreEngine oocEngine) { - this.maxPartitionsInMemory = - GiraphConstants.MAX_PARTITIONS_IN_MEMORY.get(conf); - this.oocEngine = oocEngine; - } - - @Override - public IOAction[] getNextIOActions() { - int numPartitionsInMemory = - oocEngine.getMetaPartitionManager().getNumInMemoryPartitions(); - if (LOG.isInfoEnabled()) { - LOG.info("getNextIOActions: calling with " + numPartitionsInMemory + - " partitions in memory, " + deltaNumPartitionsInMemory.get() + - " to be loaded"); - } - int numPartitions = - numPartitionsInMemory + deltaNumPartitionsInMemory.get(); - // Fixed out-of-core policy: - // - if the number of partitions in memory is less than the max number of - // partitions in memory, we should load a partition to memory. This - // basically means we are prefetching partition's data either for the - // current superstep, or for the next superstep. - // - if the number of partitions in memory is equal to the the max number - // of partitions in memory, we do a 'soft store', meaning, we store - // processed partition to disk only if there is an unprocessed partition - // on disk. This basically makes room for unprocessed partitions on disk - // to be prefetched. - // - if the number of partitions in memory is more than the max number of - // partitions in memory, we do a 'hard store', meaning we store a - // partition to disk, regardless of its processing state. 
- if (numPartitions < maxPartitionsInMemory) { - return new IOAction[]{ - IOAction.LOAD_PARTITION, - IOAction.STORE_MESSAGES_AND_BUFFERS}; - } else if (numPartitions > maxPartitionsInMemory) { - LOG.warn("getNextIOActions: number of partitions in memory passed the " + - "specified threshold!"); - return new IOAction[]{ - IOAction.STORE_PARTITION, - IOAction.STORE_MESSAGES_AND_BUFFERS}; - } else { - return new IOAction[]{ - IOAction.STORE_MESSAGES_AND_BUFFERS, - IOAction.LOAD_TO_SWAP_PARTITION}; - } - } - - @Override - public boolean approve(IOCommand command) { - int numPartitionsInMemory = oocEngine.getMetaPartitionManager() - .getNumInMemoryPartitions(); - // If loading a partition result in having more partition in memory, the - // command should be denied. Also, if number of partitions in memory is - // already less than the max number of partitions, any command for storing - // a partition should be denied. - if (command instanceof LoadPartitionIOCommand && - numPartitionsInMemory + deltaNumPartitionsInMemory.getAndIncrement() > - maxPartitionsInMemory) { - deltaNumPartitionsInMemory.getAndDecrement(); - return false; - - } else if (command instanceof StorePartitionIOCommand && - numPartitionsInMemory + deltaNumPartitionsInMemory.getAndDecrement() < - maxPartitionsInMemory) { - deltaNumPartitionsInMemory.getAndIncrement(); - return false; - } - return true; - } - - @Override - public void commandCompleted(IOCommand command) { - if (command instanceof LoadPartitionIOCommand) { - deltaNumPartitionsInMemory.getAndDecrement(); - } else if (command instanceof StorePartitionIOCommand) { - deltaNumPartitionsInMemory.getAndIncrement(); - } - } - - @Override - public void gcCompleted(GarbageCollectionNotificationInfo gcInfo) { } - - @Override - public void shutdown() { } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java 
---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java index 2037abe..3187468 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java @@ -30,8 +30,11 @@ import org.apache.giraph.metrics.GiraphMetrics; import org.apache.giraph.metrics.ResetSuperstepMetricsObserver; import org.apache.giraph.metrics.SuperstepMetricsRegistry; import org.apache.giraph.ooc.data.MetaPartitionManager; -import org.apache.giraph.ooc.io.IOCommand; -import org.apache.giraph.ooc.io.LoadPartitionIOCommand; +import org.apache.giraph.ooc.command.IOCommand; +import org.apache.giraph.ooc.command.LoadPartitionIOCommand; +import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor; +import org.apache.giraph.ooc.policy.FixedPartitionsOracle; +import org.apache.giraph.ooc.policy.OutOfCoreOracle; import org.apache.giraph.utils.AdjustableSemaphore; import org.apache.giraph.worker.BspServiceWorker; import org.apache.log4j.Logger; @@ -87,6 +90,8 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver { * with out-of-core operations (actual IO operations). */ private final ReadWriteLock superstepLock = new ReentrantReadWriteLock(); + /** Data accessor object (DAO) used as persistence layer in out-of-core */ + private final OutOfCoreDataAccessor dataAccessor; /** Callable factory for IO threads */ private final OutOfCoreIOCallableFactory oocIOCallableFactory; /** @@ -149,9 +154,20 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver { public OutOfCoreEngine(ImmutableClassesGiraphConfiguration<?, ?, ?> conf, CentralizedServiceWorker<?, ?, ?> service) { this.service = service; - this.oocIOCallableFactory = new OutOfCoreIOCallableFactory(conf, this); - /* How many disk (i.e. IO threads) do we have? 
*/ - int numIOThreads = oocIOCallableFactory.getNumDisks(); + Class<? extends OutOfCoreDataAccessor> accessorClass = + GiraphConstants.OUT_OF_CORE_DATA_ACCESSOR.get(conf); + try { + Constructor<?> constructor = accessorClass.getConstructor( + ImmutableClassesGiraphConfiguration.class); + this.dataAccessor = (OutOfCoreDataAccessor) constructor.newInstance(conf); + } catch (NoSuchMethodException | InstantiationException | + InvocationTargetException | IllegalAccessException e) { + throw new IllegalStateException("OutOfCoreEngine: caught exception " + + "while creating the data accessor instance!", e); + } + int numIOThreads = dataAccessor.getNumAccessorThreads(); + this.oocIOCallableFactory = + new OutOfCoreIOCallableFactory(this, numIOThreads); this.ioScheduler = new OutOfCoreIOScheduler(conf, this, numIOThreads); this.metaPartitionManager = new MetaPartitionManager(numIOThreads, this); this.statistics = new OutOfCoreIOStatistics(conf, numIOThreads); @@ -188,6 +204,7 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver { * Initialize/Start the out-of-core engine. 
*/ public void initialize() { + dataAccessor.initialize(); oocIOCallableFactory.createCallable(); } @@ -201,6 +218,7 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver { } ioScheduler.shutdown(); oocIOCallableFactory.shutdown(); + dataAccessor.shutdown(); } /** @@ -500,4 +518,8 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver { public void setFlowControl(FlowControl flowControl) { this.flowControl = flowControl; } + + public OutOfCoreDataAccessor getDataAccessor() { + return dataAccessor; + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java index 962bd6a..bea3994 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java @@ -23,9 +23,9 @@ import com.yammer.metrics.core.Histogram; import org.apache.giraph.metrics.GiraphMetrics; import org.apache.giraph.metrics.ResetSuperstepMetricsObserver; import org.apache.giraph.metrics.SuperstepMetricsRegistry; -import org.apache.giraph.ooc.io.IOCommand; -import org.apache.giraph.ooc.io.LoadPartitionIOCommand; -import org.apache.giraph.ooc.io.WaitIOCommand; +import org.apache.giraph.ooc.command.IOCommand; +import org.apache.giraph.ooc.command.LoadPartitionIOCommand; +import org.apache.giraph.ooc.command.WaitIOCommand; import org.apache.log4j.Logger; import java.util.concurrent.Callable; @@ -47,8 +47,6 @@ public class OutOfCoreIOCallable implements Callable<Void>, private static final Logger LOG = Logger.getLogger(OutOfCoreIOCallable.class); /** Out-of-core engine */ private final OutOfCoreEngine oocEngine; - /** Base path that this thread will write to/read from */ - 
private final String basePath; /** Thread id/Disk id */ private final int diskId; /** How many bytes of data is read from disk */ @@ -64,13 +62,10 @@ public class OutOfCoreIOCallable implements Callable<Void>, * Constructor * * @param oocEngine out-of-core engine - * @param basePath base path this thread will be using * @param diskId thread id/disk id */ - public OutOfCoreIOCallable(OutOfCoreEngine oocEngine, String basePath, - int diskId) { + public OutOfCoreIOCallable(OutOfCoreEngine oocEngine, int diskId) { this.oocEngine = oocEngine; - this.basePath = basePath; this.diskId = diskId; newSuperstep(GiraphMetrics.get().perSuperstep()); GiraphMetrics.get().addSuperstepResetObserver(this); @@ -98,15 +93,23 @@ public class OutOfCoreIOCallable implements Callable<Void>, long bytes; // CHECKSTYLE: stop IllegalCatch try { + long timeInGC = oocEngine.getServiceWorker().getGraphTaskManager() + .getSuperstepGCTime(); long startTime = System.currentTimeMillis(); - commandExecuted = command.execute(basePath); + commandExecuted = command.execute(); duration = System.currentTimeMillis() - startTime; + timeInGC = oocEngine.getServiceWorker().getGraphTaskManager() + .getSuperstepGCTime() - timeInGC; bytes = command.bytesTransferred(); if (LOG.isInfoEnabled()) { LOG.info("call: thread " + diskId + "'s command " + command + " completed: bytes= " + bytes + ", duration=" + duration + ", " + "bandwidth=" + String.format("%.2f", (double) bytes / duration * - 1000 / 1024 / 1024)); + 1000 / 1024 / 1024) + + ((command instanceof WaitIOCommand) ? 
"" : + (", bandwidth (excluding GC time)=" + String.format("%.2f", + (double) bytes / (duration - timeInGC) * + 1000 / 1024 / 1024)))); } } catch (Exception e) { oocEngine.failTheJob(); http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallableFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallableFactory.java index d4fea22..6aeb196 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallableFactory.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallableFactory.java @@ -18,13 +18,11 @@ package org.apache.giraph.ooc; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.utils.CallableFactory; import org.apache.giraph.utils.LogStacktraceCallable; import org.apache.giraph.utils.ThreadUtils; import org.apache.log4j.Logger; -import java.io.File; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; @@ -35,9 +33,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import static com.google.common.base.Preconditions.checkState; -import static org.apache.giraph.conf.GiraphConstants.PARTITIONS_DIRECTORY; - /** * Factory class to create IO threads for out-of-core engine. */ @@ -49,37 +44,22 @@ public class OutOfCoreIOCallableFactory { private final OutOfCoreEngine oocEngine; /** Result of IO threads at the end of the computation */ private final List<Future> results; - /** How many disks (i.e. IO threads) do we have? 
*/ - private int numDisks; - /** Path prefix for different disks */ - private final String[] basePaths; + /** Number of threads used for IO operations */ + private final int numIOThreads; /** Executor service for IO threads */ private ExecutorService outOfCoreIOExecutor; + /** * Constructor * - * @param conf Configuration * @param oocEngine Out-of-core engine + * @param numIOThreads Number of IO threads used */ - public OutOfCoreIOCallableFactory( - ImmutableClassesGiraphConfiguration<?, ?, ?> conf, - OutOfCoreEngine oocEngine) { + public OutOfCoreIOCallableFactory(OutOfCoreEngine oocEngine, + int numIOThreads) { this.oocEngine = oocEngine; - this.results = new ArrayList<>(); - // Take advantage of multiple disks - String[] userPaths = PARTITIONS_DIRECTORY.getArray(conf); - this.numDisks = userPaths.length; - this.basePaths = new String[numDisks]; - int ptr = 0; - for (String path : userPaths) { - File file = new File(path); - if (!file.exists()) { - checkState(file.mkdirs(), "OutOfCoreIOCallableFactory: cannot create " + - "directory " + file.getAbsolutePath()); - } - basePaths[ptr] = path + "/" + conf.get("mapred.job.id", "Unknown Job"); - ptr++; - } + this.numIOThreads = numIOThreads; + this.results = new ArrayList<>(numIOThreads); } /** @@ -90,11 +70,10 @@ public class OutOfCoreIOCallableFactory { new CallableFactory<Void>() { @Override public Callable<Void> newCallable(int callableId) { - return new OutOfCoreIOCallable(oocEngine, basePaths[callableId], - callableId); + return new OutOfCoreIOCallable(oocEngine, callableId); } }; - outOfCoreIOExecutor = new ThreadPoolExecutor(numDisks, numDisks, 0L, + outOfCoreIOExecutor = new ThreadPoolExecutor(numIOThreads, numIOThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), ThreadUtils.createThreadFactory("ooc-io-%d")) { @Override @@ -120,7 +99,7 @@ public class OutOfCoreIOCallableFactory { } }; - for (int i = 0; i < numDisks; ++i) { + for (int i = 0; i < numIOThreads; ++i) { Future<Void> future = 
outOfCoreIOExecutor.submit( new LogStacktraceCallable<>( outOfCoreIOCallableFactory.newCallable(i))); @@ -131,15 +110,6 @@ public class OutOfCoreIOCallableFactory { } /** - * How many disks do we have? - * - * @return number of disks (IO threads) - */ - public int getNumDisks() { - return numDisks; - } - - /** * Check whether all IO threads terminated gracefully. */ public void shutdown() { @@ -156,7 +126,7 @@ public class OutOfCoreIOCallableFactory { "InterruptedException while waiting for IO threads to finish"); } } - for (int i = 0; i < numDisks; ++i) { + for (int i = 0; i < numIOThreads; ++i) { try { // Check whether the tread terminated gracefully results.get(i).get(); @@ -170,15 +140,5 @@ public class OutOfCoreIOCallableFactory { throw new IllegalStateException(e); } } - for (String path : basePaths) { - File file = new File(path).getParentFile(); - for (String subFileName : file.list()) { - File subFile = new File(file.getPath(), subFileName); - checkState(subFile.delete(), "shutdown: cannot delete file %s", - subFile.getAbsoluteFile()); - } - checkState(file.delete(), "shutdown: cannot delete directory %s", - file.getAbsoluteFile()); - } } } http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java index 6428c30..906607d 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java @@ -18,15 +18,15 @@ package org.apache.giraph.ooc; -import com.google.common.hash.Hashing; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.conf.IntConfOption; -import org.apache.giraph.ooc.io.IOCommand; -import 
org.apache.giraph.ooc.io.LoadPartitionIOCommand; -import org.apache.giraph.ooc.io.StoreDataBufferIOCommand; -import org.apache.giraph.ooc.io.StoreIncomingMessageIOCommand; -import org.apache.giraph.ooc.io.StorePartitionIOCommand; -import org.apache.giraph.ooc.io.WaitIOCommand; +import org.apache.giraph.ooc.command.IOCommand; +import org.apache.giraph.ooc.command.LoadPartitionIOCommand; +import org.apache.giraph.ooc.command.StoreDataBufferIOCommand; +import org.apache.giraph.ooc.command.StoreIncomingMessageIOCommand; +import org.apache.giraph.ooc.command.StorePartitionIOCommand; +import org.apache.giraph.ooc.command.WaitIOCommand; +import org.apache.giraph.ooc.policy.OutOfCoreOracle; import org.apache.log4j.Logger; import java.util.ArrayList; @@ -57,8 +57,6 @@ public class OutOfCoreIOScheduler { private final OutOfCoreEngine oocEngine; /** How much an IO thread should wait if there is no IO command */ private final int waitInterval; - /** How many disks (i.e. IO threads) do we have? */ - private final int numDisks; /** * Queue of IO commands for loading partitions to memory. Load commands are * urgent and should be done once loading data is a viable IO command. @@ -77,7 +75,6 @@ public class OutOfCoreIOScheduler { OutOfCoreIOScheduler(final ImmutableClassesGiraphConfiguration conf, OutOfCoreEngine oocEngine, int numDisks) { this.oocEngine = oocEngine; - this.numDisks = numDisks; this.waitInterval = OOC_WAIT_INTERVAL.get(conf); threadLoadCommandQueue = new ArrayList<>(numDisks); for (int i = 0; i < numDisks; ++i) { @@ -88,17 +85,6 @@ public class OutOfCoreIOScheduler { } /** - * Get the thread id that is responsible for a particular partition - * - * @param partitionId id of the given partition - * @return id of the thread responsible for the given partition - */ - public int getOwnerThreadId(int partitionId) { - int result = Hashing.murmur3_32().hashInt(partitionId).asInt() % numDisks; - return (result >= 0) ? 
result : (result + numDisks); - } - - /** * Generate and return the next appropriate IO command for a given thread * * @param threadId id of the thread ready to execute the next IO command @@ -254,8 +240,9 @@ public class OutOfCoreIOScheduler { * @param ioCommand IO command to add to the scheduler */ public void addIOCommand(IOCommand ioCommand) { - int ownerThread = getOwnerThreadId(ioCommand.getPartitionId()); if (ioCommand instanceof LoadPartitionIOCommand) { + int ownerThread = oocEngine.getMetaPartitionManager() + .getOwnerThreadId(ioCommand.getPartitionId()); threadLoadCommandQueue.get(ownerThread).offer(ioCommand); } else { throw new IllegalStateException("addIOCommand: IO command type is not " + http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOStatistics.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOStatistics.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOStatistics.java index a225a4c..44a0d2f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOStatistics.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOStatistics.java @@ -22,7 +22,7 @@ import com.google.common.collect.Maps; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.conf.IntConfOption; -import org.apache.giraph.ooc.io.IOCommand.IOCommandType; +import org.apache.giraph.ooc.command.IOCommand.IOCommandType; import org.apache.log4j.Logger; import java.util.Map; http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreOracle.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreOracle.java 
b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreOracle.java deleted file mode 100644 index fa8e6bd..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreOracle.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.ooc; - -import com.sun.management.GarbageCollectionNotificationInfo; -import org.apache.giraph.ooc.io.IOCommand; - -/** - * Interface for any out-of-core oracle. An out-of-core oracle is the brain of - * the out-of-core mechanism, determining/deciding on out-of-core actions (load - * or store) that should happen. - */ -public interface OutOfCoreOracle { - /** - * Different types of IO actions that can potentially lead to a more desired - * state of computation for out-of-core mechanism. These actions are issued - * based on the status of the memory (memory pressure, rate of data transfer - * to memory, etc.) - */ - enum IOAction { - /** - * Either of: - * - storing incoming messages of any partition currently on disk, or - * - storing incoming messages' raw data buffer of any partition - * currently on disk, or - * - storing partitions' raw data buffer for those partitions that are - * currently on disk. 
- */ - STORE_MESSAGES_AND_BUFFERS, - /** - * Storing a partition that is *processed* in the current iteration cycle. - * This action is also known as "soft store" - */ - STORE_PROCESSED_PARTITION, - /** - * Storing a partition from memory on disk, prioritizing to *processed* - * partitions on memory. However, if there is no *processed* partition, - * store should happen at any cost, even if an *unprocessed* partition has - * to be stored. This action is also know as "hard store". - */ - STORE_PARTITION, - /** - * Loading an *unprocessed* partition from disk to memory, only if there are - * *processed* partitions in memory. This action basically initiates a swap - * operation. - */ - LOAD_TO_SWAP_PARTITION, - /** - * Loading an *unprocessed* partition from disk to memory. This action is - * also known as "soft load". - */ - LOAD_UNPROCESSED_PARTITION, - /** - * Loading a partition (prioritizing *unprocessed* over *processed*) from - * disk to memory. Loading a *processed* partition to memory is a prefetch - * of that partition to be processed in the next superstep. This action is - * also known as "hard load". - */ - LOAD_PARTITION, - /** - * Loading a partition regardless of the memory situation. An out-of-core - * mechanism may use this action to signal IO threads that it is allowed to - * load a partition that is specifically requested. - */ - URGENT_LOAD_PARTITION - } - - /** - * Get the next set of viable IO actions to help bring memory to a more - * desired state. - * - * @return an array of viable IO actions, sorted from highest priority to - * lowest priority - */ - IOAction[] getNextIOActions(); - - /** - * Whether a command is appropriate to bring the memory to a more desired - * state. A command is not executed unless it is approved by the oracle. This - * method is specially important where there are multiple IO threads - * performing IO operations for the out-of-core mechanism. 
The approval - * becomes significantly important to prevent all IO threads from performing - * identical command type, if that is a necessity. For instance, execution of - * a particular command type by only one thread may bring the memory to a - * desired state, and the rest of IO threads may perform other types of - * commands. - * - * @param command the IO command that is about to execute - * @return 'true' if the command is approved for execution. 'false' if the - * command should not be executed - */ - boolean approve(IOCommand command); - - /** - * Notification of command completion. Oracle may update its status and commit - * the changes a command may cause. - * - * @param command the IO command that is completed - */ - void commandCompleted(IOCommand command); - - /** - * Notification of GC completion. Oracle may take certain decisions based on - * GC information (such as amount of time it took, memory it reclaimed, etc.) - * - * @param gcInfo GC information - */ - void gcCompleted(GarbageCollectionNotificationInfo gcInfo); - - /** - * Shut down the out-of-core oracle. Necessary specifically for cases where - * out-of-core oracle is using additional monitoring threads. - */ - void shutdown(); -} http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/SimpleGCMonitoringOracle.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/SimpleGCMonitoringOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/SimpleGCMonitoringOracle.java deleted file mode 100644 index 0dfc9de..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/SimpleGCMonitoringOracle.java +++ /dev/null @@ -1,355 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.ooc; - -import com.google.common.collect.Maps; -import com.sun.management.GarbageCollectionNotificationInfo; -import org.apache.giraph.conf.FloatConfOption; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.ooc.io.IOCommand; -import org.apache.giraph.ooc.io.LoadPartitionIOCommand; -import org.apache.giraph.ooc.io.WaitIOCommand; -import org.apache.log4j.Logger; - -import java.lang.management.MemoryUsage; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -/** - * Out-of-core oracle to adaptively control data kept in memory, with the goal - * of keeping the memory state constantly at a desired state. This oracle - * monitors GC behavior to keep track of memory pressure. - * - * After each GC is done, this oracle retrieve statistics about the memory - * pressure (memory used, max memory, and how far away memory is compared to a - * max optimal pressure). Based on the the past 2 recent memory statistics, - * the oracle predicts the status of the memory, and sets the rate of load/store - * of data from/to disk. If the rate of loading data from disk is 'l', and the - * rate of storing data to disk is 's', the rate of data injection to memory - * from disk can be denoted as 'l-s'. 
This oracle determines what 'l-s' should - * be based on the prediction of memory status. - * - * Assume that based on the previous GC call the memory usage at time t_0 is - * m_0, and based on the most recent GC call the memory usage at time t_1 is - * m_1. So, the rate of memory increase is alpha = (m_1 - m_0) / (t_1 - t_0). - * Assume that the ideal memory pressure happens when the memory usage is - * m_ideal. So, at time 't_2 = t_1 + (t_1 - t_0)', we want m_ideal. That means - * the ideal rate would be beta = (m_ideal - m_1) / (t_2 - t_1). If the date - * injection rate to memory so far was i, the new injection rate should be: - * i_new = i - (alpha - beta) - */ -public class SimpleGCMonitoringOracle implements OutOfCoreOracle { - /** - * The optimal memory pressure at which GC behavior is close to ideal. This - * fraction may be dependant on the GC strategy used for running a job, but - * generally should not be dependent on the graph processing application. - */ - public static final FloatConfOption OPTIMAL_MEMORY_PRESSURE = - new FloatConfOption("giraph.optimalMemoryPressure", 0.8f, - "The memory pressure (fraction of used memory) at which the job " + - "shows the optimal GC behavior. 
This fraction may be dependent " + - "on the GC strategy used in running the job."); - - /** Class logger */ - private static final Logger LOG = - Logger.getLogger(SimpleGCMonitoringOracle.class); - /** Cached value for OPTIMAL_MEMORY_PRESSURE */ - private final float optimalMemoryPressure; - /** Out-of-core engine */ - private final OutOfCoreEngine oocEngine; - /** Status of memory from the last GC call */ - private GCObservation lastGCObservation; - /** Desired rate of data injection to memory */ - private final AtomicLong desiredDiskToMemoryDataRate = - new AtomicLong(0); - /** Number of on the fly (outstanding) IO commands for each command type */ - private final Map<IOCommand.IOCommandType, AtomicInteger> commandOccurrences = - Maps.newConcurrentMap(); - - /** - * Constructor - * - * @param conf configuration - * @param oocEngine out-of-core engine - */ - public SimpleGCMonitoringOracle(ImmutableClassesGiraphConfiguration conf, - OutOfCoreEngine oocEngine) { - this.optimalMemoryPressure = OPTIMAL_MEMORY_PRESSURE.get(conf); - this.oocEngine = oocEngine; - this.lastGCObservation = new GCObservation(-1, 0, 0); - for (IOCommand.IOCommandType type : IOCommand.IOCommandType.values()) { - commandOccurrences.put(type, new AtomicInteger(0)); - } - } - - @Override - public synchronized void gcCompleted(GarbageCollectionNotificationInfo - gcInfo) { - long time = System.currentTimeMillis(); - Map<String, MemoryUsage> memAfter = gcInfo.getGcInfo() - .getMemoryUsageAfterGc(); - long usedMemory = 0; - long maxMemory = 0; - for (MemoryUsage memDetail : memAfter.values()) { - usedMemory += memDetail.getUsed(); - maxMemory += memDetail.getMax(); - } - GCObservation observation = new GCObservation(time, usedMemory, maxMemory); - if (LOG.isInfoEnabled()) { - LOG.info("gcCompleted: GC completed with: " + observation); - } - // Whether this is not the first GC call in the application - if (lastGCObservation.isValid()) { - long deltaDataRate = - 
lastGCObservation.getDesiredDeltaDataRate(observation); - long diskBandwidthEstimate = - oocEngine.getIOStatistics().getDiskBandwidth(); - // Update the desired data injection rate to memory. The data injection - // rate cannot be less than -disk_bandwidth (the extreme case happens if - // we only do 'store'), and cannot be more than disk_bandwidth (the - // extreme case happens if we only do 'load'). - long dataInjectionRate = desiredDiskToMemoryDataRate.get(); - desiredDiskToMemoryDataRate.set(Math.max( - Math.min(desiredDiskToMemoryDataRate.get() - deltaDataRate, - diskBandwidthEstimate), -diskBandwidthEstimate)); - if (LOG.isInfoEnabled()) { - LOG.info("gcCompleted: changing data injection rate from " + - String.format("%.2f", dataInjectionRate / 1024.0 / 1024.0) + - " to " + String.format("%.2f", desiredDiskToMemoryDataRate.get() / - 1024.0 / 1024.0)); - } - } - lastGCObservation = observation; - } - - /** - * Get the current data injection rate to memory based on the commands ran - * in the history (retrieved from statistics collector), and outstanding - * commands issued by the IO scheduler. - * - * @return the current data injection rate to memory - */ - private long getCurrentDataInjectionRate() { - long effectiveBytesTransferred = 0; - long effectiveDuration = 0; - for (IOCommand.IOCommandType type : IOCommand.IOCommandType.values()) { - OutOfCoreIOStatistics.BytesDuration stats = - oocEngine.getIOStatistics().getCommandTypeStats(type); - int occurrence = commandOccurrences.get(type).get(); - long typeBytesTransferred = stats.getBytes(); - long typeDuration = stats.getDuration(); - // If there is an outstanding command, we still do not know how many bytes - // it will transfer, and how long it will take. So, we guesstimate these - // numbers based on other similar commands happened in the history. We - // simply take the average number of bytes transferred for the particular - // command, and we take average duration for the particular command. 
We - // should multiply these numbers by the number of outstanding commands of - // this particular command type. - if (stats.getOccurrence() != 0) { - typeBytesTransferred += stats.getBytes() / stats.getOccurrence() * - occurrence; - typeDuration += stats.getDuration() / stats.getOccurrence() * - occurrence; - } - if (type == IOCommand.IOCommandType.LOAD_PARTITION) { - effectiveBytesTransferred += typeBytesTransferred; - } else { - // Store (data going out of memory), or wait (no data transferred) - effectiveBytesTransferred -= typeBytesTransferred; - } - effectiveDuration += typeDuration; - } - if (effectiveDuration == 0) { - return 0; - } else { - return effectiveBytesTransferred / effectiveDuration; - } - } - - @Override - public IOAction[] getNextIOActions() { - long error = (long) (oocEngine.getIOStatistics().getDiskBandwidth() * 0.05); - long desiredRate = desiredDiskToMemoryDataRate.get(); - long currentRate = getCurrentDataInjectionRate(); - if (desiredRate > error) { - // 'l-s' is positive, we should do more load than store. - if (currentRate > desiredRate + error) { - // We should decrease 'l-s'. This can be done either by increasing 's' - // or issuing wait command. We prioritize wait over hard store. - return new IOAction[]{ - IOAction.STORE_MESSAGES_AND_BUFFERS, - IOAction.STORE_PROCESSED_PARTITION}; - } else if (currentRate < desiredRate - error) { - // We should increase 'l-s'. We can simply load partitions/data. - return new IOAction[]{IOAction.LOAD_PARTITION}; - } else { - // We are in a proper state and we should keep up with the rate. We can - // either soft store data or load data (hard load, since we desired rate - // is positive). - return new IOAction[]{ - IOAction.STORE_MESSAGES_AND_BUFFERS, - IOAction.STORE_PROCESSED_PARTITION, - IOAction.LOAD_PARTITION}; - } - } else if (desiredRate < -error) { - // 'l-s' is negative, we should do more store than load. 
- if (currentRate < desiredRate - error) { - // We should increase 'l-s', but we should be cautious. We only do soft - // load, or wait. - return new IOAction[]{IOAction.LOAD_UNPROCESSED_PARTITION}; - } else if (currentRate > desiredRate + error) { - // We should reduce 'l-s', we do hard store. - return new IOAction[]{ - IOAction.STORE_MESSAGES_AND_BUFFERS, - IOAction.STORE_PARTITION}; - } else { - // We should keep up with the rate. We can either soft store data, or - // soft load data. - return new IOAction[]{ - IOAction.STORE_MESSAGES_AND_BUFFERS, - IOAction.STORE_PROCESSED_PARTITION, - IOAction.LOAD_UNPROCESSED_PARTITION}; - } - } else { - // 'l-s' is almost zero. If current rate is over the desired rate, we do - // soft store. If the current rate is below the desired rate, we do soft - // load. - if (currentRate > desiredRate + error) { - return new IOAction[]{ - IOAction.STORE_MESSAGES_AND_BUFFERS, - IOAction.STORE_PROCESSED_PARTITION}; - } else if (currentRate < desiredRate - error) { - return new IOAction[]{IOAction.LOAD_UNPROCESSED_PARTITION}; - } else { - return new IOAction[]{ - IOAction.STORE_MESSAGES_AND_BUFFERS, - IOAction.STORE_PROCESSED_PARTITION, - IOAction.LOAD_UNPROCESSED_PARTITION}; - } - } - } - - @Override - public synchronized boolean approve(IOCommand command) { - long error = (long) (oocEngine.getIOStatistics().getDiskBandwidth() * 0.05); - long desiredRate = desiredDiskToMemoryDataRate.get(); - long currentRate = getCurrentDataInjectionRate(); - // The command is denied iff the current rate is above the desired rate and - // we are doing load (instead of store), or the current rate is below the - // desired rate and we are doing store (instead of loading). 
- if (currentRate > desiredRate + error && - command instanceof LoadPartitionIOCommand) { - return false; - } - if (currentRate < desiredRate - error && - !(command instanceof LoadPartitionIOCommand) && - !(command instanceof WaitIOCommand)) { - return false; - } - commandOccurrences.get(command.getType()).getAndIncrement(); - return true; - } - - @Override - public void commandCompleted(IOCommand command) { - commandOccurrences.get(command.getType()).getAndDecrement(); - } - - @Override - public void shutdown() { } - - /** Helper class to record memory status after GC calls */ - private class GCObservation { - /** The time at which the GC happened (in milliseconds) */ - private long time; - /** Amount of memory used after the GC call */ - private long usedMemory; - /** Maximum amounts of memory reported by GC listener */ - private long maxMemory; - - /** - * Constructor - * - * @param time time of GC - * @param usedMemory amount of used memory after GC - * @param maxMemory amount of all available memory based on GC observation - */ - public GCObservation(long time, long usedMemory, long maxMemory) { - this.time = time; - this.usedMemory = usedMemory; - this.maxMemory = maxMemory; - } - - /** - * Is this a valid observation? - * - * @return true iff it is a valid observation - */ - public boolean isValid() { - return time > 0; - } - - /** - * Considering a new observation of memory status after the most recent GC, - * what is the desired rate for data injection to memory. 
- * - * @param newObservation the most recent GC observation - * @return desired rate of data injection to memory - */ - public long getDesiredDeltaDataRate(GCObservation newObservation) { - long newUsedMemory = newObservation.usedMemory; - long newMaxMemory = newObservation.maxMemory; - long lastUsedMemory = usedMemory; - long lastMaxMemory = maxMemory; - // Scale the memory status of two GC observation to be the same - long scaledMaxMemory = Math.min(lastMaxMemory, newMaxMemory); - newUsedMemory = - (long) (((double) scaledMaxMemory / newMaxMemory) * newUsedMemory); - lastUsedMemory = - (long) (((double) scaledMaxMemory / lastMaxMemory) * lastUsedMemory); - long desiredUsedMemory = (long) (optimalMemoryPressure * scaledMaxMemory); - if (LOG.isInfoEnabled()) { - LOG.info("getDesiredDeltaDataRate: " + String.format("previous usage " + - "= %.2f MB, ", lastUsedMemory / 1024.0 / 1024.0) + String.format( - "current usage = %.2f MB, ", newUsedMemory / 1024.0 / 1024.0) + - String.format("ideal usage = %.2f MB", desiredUsedMemory / 1024.0 / - 1024.0)); - } - long interval = newObservation.time - time; - if (interval == 0) { - interval = 1; - LOG.warn("getDesiredDeltaRate: two GC happened almost at the same " + - "time!"); - } - long currentDataRate = (long) ((double) (newUsedMemory - - lastUsedMemory) / interval * 1000); - long desiredDataRate = (long) ((double) (desiredUsedMemory - - newUsedMemory) / interval * 1000); - return currentDataRate - desiredDataRate; - } - - @Override - public String toString() { - return String.format("(usedMemory: %.2f MB, maxMemory: %.2f MB at " + - "time: %d ms)", usedMemory / 1024.0 / 1024.0, - maxMemory / 1024.0 / 1024.0, time); - } - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/ThresholdBasedOracle.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/ThresholdBasedOracle.java 
b/giraph-core/src/main/java/org/apache/giraph/ooc/ThresholdBasedOracle.java deleted file mode 100644 index 3e05dce..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/ThresholdBasedOracle.java +++ /dev/null @@ -1,364 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.giraph.ooc; - -import com.sun.management.GarbageCollectionNotificationInfo; -import org.apache.giraph.comm.flow_control.CreditBasedFlowControl; -import org.apache.giraph.comm.flow_control.FlowControl; -import org.apache.giraph.comm.netty.NettyClient; -import org.apache.giraph.conf.FloatConfOption; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.conf.LongConfOption; -import org.apache.giraph.ooc.io.IOCommand; -import org.apache.giraph.utils.CallableFactory; -import org.apache.giraph.utils.LogStacktraceCallable; -import org.apache.giraph.utils.MemoryUtils; -import org.apache.giraph.utils.ThreadUtils; -import org.apache.log4j.Logger; - -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - -import static com.google.common.base.Preconditions.checkState; - -/** - * Out-of-core oracle to adaptively control data kept in memory, with the goal - * of keeping the memory usage at a desired state. Out-of-core policy in this - * oracle is based on several user-defined thresholds. Also, this oracle spawns - * a thread to periodically check the memory usage. This thread would issue - * manual GC calls if JVM fails to call major/full GC for a while and the amount - * of used memory is about to cause high-memory pressure. This oracle, also, - * monitors GC activities. The monitoring mechanism looks for major/full GC - * calls, and updates out-of-core decisions based on the amount of available - * memory after such GCs. 
There are three out-of-core decisions: - * - Which IO operations should be done (load/offload of partitions and - * messages) - * - What the incoming messages rate should be (updating credits announced by - * this worker in credit-based flow-control mechanism) - * - How many processing threads should remain active (tethering rate of - * data generation) - * - * The following table shows the relationship of these decisions and - * used-defined thresholds. - * -------------------------------------------------------------- - * Memory Pressure | Manual | IO | Credit | Active | - * (memory usage) | GC? | Action | | Threads | - * -------------------------------------------------------------- - * | Yes | hard | 0 | 0 | - * | | store | | | - * failPressure ------------------------------------------------- - * | Yes | hard | 0 | fraction | - * | | store | | | - * emergencyPressure -------------------------------------------- - * | Yes | hard | fraction | max | - * | | store | | | - * highPressure ------------------------------------------------- - * | No | soft | fraction | max | - * | | store | | | - * optimalPressure ---------------------------------------------- - * | No | soft | max | max | - * | | load | | | - * lowPressure -------------------------------------------------- - * | No | hard | max | max | - * | | load | | | - * -------------------------------------------------------------- - * - */ -public class ThresholdBasedOracle implements OutOfCoreOracle { - /** The memory pressure at/above which the job would fail */ - public static final FloatConfOption FAIL_MEMORY_PRESSURE = - new FloatConfOption("giraph.memory.failPressure", 0.975f, - "The memory pressure (fraction of used memory) at/above which the " + - "job would fail."); - /** - * The memory pressure at which the job is cloe to fail, even though we were - * using maximal disk bandwidth and minimal network rate. We should reduce - * job processing rate. 
- */ - public static final FloatConfOption EMERGENCY_MEMORY_PRESSURE = - new FloatConfOption("giraph.memory.emergencyPressure", 0.925f, - "The memory pressure (fraction of used memory) at which the job " + - "is close to fail, hence we should reduce its processing rate " + - "as much as possible."); - /** The memory pressure at which the job is suffering from GC overhead. */ - public static final FloatConfOption HIGH_MEMORY_PRESSURE = - new FloatConfOption("giraph.memory.highPressure", 0.875f, - "The memory pressure (fraction of used memory) at which the job " + - "is suffering from GC overhead."); - /** - * The memory pressure at which we expect GC to perform optimally for a - * memory intensive job. - */ - public static final FloatConfOption OPTIMAL_MEMORY_PRESSURE = - new FloatConfOption("giraph.memory.optimalPressure", 0.8f, - "The memory pressure (fraction of used memory) at which a " + - "memory-intensive job shows the optimal GC behavior."); - /** - * The memory pressure at/below which the job can use more memory without - * suffering from GC overhead. - */ - public static final FloatConfOption LOW_MEMORY_PRESSURE = - new FloatConfOption("giraph.memory.lowPressure", 0.7f, - "The memory pressure (fraction of used memory) at/below which the " + - "job can use more memory without suffering the performance."); - /** The interval at which memory observer thread wakes up. */ - public static final LongConfOption CHECK_MEMORY_INTERVAL = - new LongConfOption("giraph.checkMemoryInterval", 2500, - "The interval/period where memory observer thread wakes up and " + - "monitors memory footprint (in milliseconds)"); - /** - * Memory observer thread would manually call GC if major/full GC has not - * been called for a while. 
The period where we expect GC to be happened in - * past is specified in this parameter - */ - public static final LongConfOption LAST_GC_CALL_INTERVAL = - new LongConfOption("giraph.lastGcCallInterval", 10 * 1000, - "How long after last major/full GC should we call manual GC?"); - - /** Class logger */ - private static final Logger LOG = - Logger.getLogger(ThresholdBasedOracle.class); - /** Cached value for FAIL_MEMORY_PRESSURE */ - private final float failMemoryPressure; - /** Cached value for EMERGENCY_MEMORY_PRESSURE */ - private final float emergencyMemoryPressure; - /** Cached value for HIGH_MEMORY_PRESSURE */ - private final float highMemoryPressure; - /** Cached value for OPTIMAL_MEMORY_PRESSURE */ - private final float optimalMemoryPressure; - /** Cached value for LOW_MEMORY_PRESSURE */ - private final float lowMemoryPressure; - /** Cached value for CHECK_MEMORY_INTERVAL */ - private final long checkMemoryInterval; - /** Cached value for LAST_GC_CALL_INTERVAL */ - private final long lastGCCallInterval; - /** - * Cached value for NettyClient.MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER (max - * credit used for credit-based flow-control mechanism) - */ - private final short maxRequestsCredit; - /** - * Whether the job is shutting down. Used for terminating the memory - * observer thread. 
- */ - private final CountDownLatch shouldTerminate; - /** Result of memory observer thread */ - private final Future<Void> checkMemoryThreadResult; - /** Out-of-core engine */ - private final OutOfCoreEngine oocEngine; - /** Last time a major/full GC has been called (in milliseconds) */ - private volatile long lastMajorGCTime; - /** Last time a non major/full GC has been called (in milliseconds) */ - private volatile long lastMinorGCTime; - - /** - * Constructor - * - * @param conf configuration - * @param oocEngine out-of-core engine - */ - public ThresholdBasedOracle(ImmutableClassesGiraphConfiguration conf, - OutOfCoreEngine oocEngine) { - this.failMemoryPressure = FAIL_MEMORY_PRESSURE.get(conf); - this.emergencyMemoryPressure = EMERGENCY_MEMORY_PRESSURE.get(conf); - this.highMemoryPressure = HIGH_MEMORY_PRESSURE.get(conf); - this.optimalMemoryPressure = OPTIMAL_MEMORY_PRESSURE.get(conf); - this.lowMemoryPressure = LOW_MEMORY_PRESSURE.get(conf); - this.checkMemoryInterval = CHECK_MEMORY_INTERVAL.get(conf); - this.lastGCCallInterval = LAST_GC_CALL_INTERVAL.get(conf); - this.maxRequestsCredit = (short) - CreditBasedFlowControl.MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER.get(conf); - NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.setIfUnset(conf, true); - boolean useCredit = NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.get(conf); - checkState(useCredit, "ThresholdBasedOracle: credit-based flow control " + - "must be enabled. 
Use giraph.waitForPerWorkerRequests=true"); - this.shouldTerminate = new CountDownLatch(1); - this.oocEngine = oocEngine; - this.lastMajorGCTime = 0; - - CallableFactory<Void> callableFactory = new CallableFactory<Void>() { - @Override - public Callable<Void> newCallable(int callableId) { - return new Callable<Void>() { - @Override - public Void call() throws Exception { - while (true) { - boolean done = shouldTerminate.await(checkMemoryInterval, - TimeUnit.MILLISECONDS); - if (done) { - break; - } - double usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction(); - long time = System.currentTimeMillis(); - if ((usedMemoryFraction > highMemoryPressure && - time - lastMajorGCTime >= lastGCCallInterval) || - (usedMemoryFraction > optimalMemoryPressure && - time - lastMajorGCTime >= lastGCCallInterval && - time - lastMinorGCTime >= lastGCCallInterval)) { - if (LOG.isInfoEnabled()) { - LOG.info("call: last GC happened a while ago and the " + - "amount of used memory is high (used memory " + - "fraction is " + - String.format("%.2f", usedMemoryFraction) + "). " + - "Calling GC manually"); - } - System.gc(); - time = System.currentTimeMillis() - time; - usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction(); - if (LOG.isInfoEnabled()) { - LOG.info("call: manual GC is done. It took " + - String.format("%.2f", (double) time / 1000) + - " seconds. Used memory fraction is " + - String.format("%.2f", usedMemoryFraction)); - } - } - updateRates(usedMemoryFraction); - } - return null; - } - }; - } - }; - ExecutorService executor = Executors.newSingleThreadExecutor( - ThreadUtils.createThreadFactory("check-memory")); - this.checkMemoryThreadResult = executor.submit(new LogStacktraceCallable<>( - callableFactory.newCallable(0))); - executor.shutdown(); - } - - /** - * upon major/full GC calls. - */ - /** - * Update statistics and rate regarding communication credits and number of - * active threads. 
- * - * @param usedMemoryFraction the fraction of used memory over max memory - */ - public void updateRates(double usedMemoryFraction) { - // Update the fraction of processing threads that should remain active - if (usedMemoryFraction >= failMemoryPressure) { - oocEngine.updateActiveThreadsFraction(0); - } else if (usedMemoryFraction < emergencyMemoryPressure) { - oocEngine.updateActiveThreadsFraction(1); - } else { - oocEngine.updateActiveThreadsFraction(1 - - (usedMemoryFraction - emergencyMemoryPressure) / - (failMemoryPressure - emergencyMemoryPressure)); - } - - // Update the fraction of credit that should be used in credit-based flow- - // control - if (usedMemoryFraction >= emergencyMemoryPressure) { - updateRequestsCredit((short) 0); - } else if (usedMemoryFraction < optimalMemoryPressure) { - updateRequestsCredit(maxRequestsCredit); - } else { - updateRequestsCredit((short) (maxRequestsCredit * - (1 - (usedMemoryFraction - optimalMemoryPressure) / - (emergencyMemoryPressure - optimalMemoryPressure)))); - } - } - - @Override - public IOAction[] getNextIOActions() { - double usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction(); - if (LOG.isInfoEnabled()) { - LOG.info(String.format("getNextIOActions: usedMemoryFraction = %.2f", - usedMemoryFraction)); - } - if (usedMemoryFraction > highMemoryPressure) { - return new IOAction[]{ - IOAction.STORE_MESSAGES_AND_BUFFERS, - IOAction.STORE_PARTITION}; - } else if (usedMemoryFraction > optimalMemoryPressure) { - return new IOAction[]{ - IOAction.LOAD_UNPROCESSED_PARTITION, - IOAction.STORE_MESSAGES_AND_BUFFERS, - IOAction.STORE_PROCESSED_PARTITION}; - } else if (usedMemoryFraction > lowMemoryPressure) { - return new IOAction[]{ - IOAction.LOAD_UNPROCESSED_PARTITION, - IOAction.STORE_MESSAGES_AND_BUFFERS, - IOAction.LOAD_PARTITION}; - } else { - return new IOAction[]{IOAction.LOAD_PARTITION}; - } - } - - @Override - public boolean approve(IOCommand command) { - return true; - } - - @Override - public void 
commandCompleted(IOCommand command) { - // Do nothing - } - - @Override - public void gcCompleted(GarbageCollectionNotificationInfo gcInfo) { - String gcAction = gcInfo.getGcAction().toLowerCase(); - if (gcAction.contains("full") || gcAction.contains("major")) { - if (!gcInfo.getGcCause().contains("No GC")) { - lastMajorGCTime = System.currentTimeMillis(); - } - } else { - lastMinorGCTime = System.currentTimeMillis(); - } - } - - @Override - public void shutdown() { - shouldTerminate.countDown(); - try { - checkMemoryThreadResult.get(); - } catch (InterruptedException | ExecutionException e) { - LOG.error("shutdown: caught exception while waiting on check-memory " + - "thread to terminate!"); - throw new IllegalStateException(e); - } - if (LOG.isInfoEnabled()) { - LOG.info("shutdown: ThresholdBasedOracle shutdown complete!"); - } - } - - /** - * Update the credit announced for this worker in Netty. The lower the credit - * is, the lower rate incoming messages arrive at this worker. Thus, credit - * is an indirect way of controlling amount of memory incoming messages would - * take. 
- * - * @param newCredit the new credit to announce to other workers - */ - private void updateRequestsCredit(short newCredit) { - if (LOG.isInfoEnabled()) { - LOG.info("updateRequestsCredit: updating the credit to " + newCredit); - } - FlowControl flowControl = oocEngine.getFlowControl(); - if (flowControl != null) { - ((CreditBasedFlowControl) flowControl).updateCredit(newCredit); - } - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/command/IOCommand.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/command/IOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/command/IOCommand.java new file mode 100644 index 0000000..b6c986d --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/command/IOCommand.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.ooc.command; + +import org.apache.giraph.ooc.OutOfCoreEngine; + +import java.io.IOException; + +/** + * Representation of an IO command (moving data to disk/memory) used in + * out-of-core mechanism. 
+ */ +public abstract class IOCommand { + /** Type of IO command */ + public enum IOCommandType { + /** Loading a partition */ + LOAD_PARTITION, + /** Storing a partition */ + STORE_PARTITION, + /** Storing incoming messages of a partition */ + STORE_MESSAGE, + /** + * Storing message/buffer raw data buffer of a currently out-of-core + * partition + */ + STORE_BUFFER, + /** Doing nothing regarding IO */ + WAIT + } + + /** Id of the partition involved for the IO */ + protected final int partitionId; + /** Out-of-core engine */ + protected final OutOfCoreEngine oocEngine; + /** + * Number of bytes transferred to/from memory (loaded/stored) during the + * execution of the command + */ + protected long numBytesTransferred; + + /** + * Constructor + * + * @param oocEngine Out-of-core engine + * @param partitionId Id of the partition involved in the IO + */ + public IOCommand(OutOfCoreEngine oocEngine, int partitionId) { + this.oocEngine = oocEngine; + this.partitionId = partitionId; + this.numBytesTransferred = 0; + } + + /** + * Get the id of the partition involved in the IO + * + * @return id of the partition + */ + public int getPartitionId() { + return partitionId; + } + + /** + * Execute (load/store of data) the IO command, and change the data stores + * appropriately based on the data loaded/stored. Return true iff the command + * is actually executed (resulted in loading or storing data). + * + * @return whether the command is actually executed + * @throws IOException + */ + public abstract boolean execute() throws IOException; + + /** + * Get the type of the command. + * + * @return type of the command + */ + public abstract IOCommandType getType(); + + /** + * Get the number of bytes transferred (loaded/stored from/to disk). 
+ * + * @return number of bytes transferred during the execution of the command + */ + public long bytesTransferred() { + return numBytesTransferred; + } +} + http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/command/LoadPartitionIOCommand.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/command/LoadPartitionIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/command/LoadPartitionIOCommand.java new file mode 100644 index 0000000..ee12159 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/command/LoadPartitionIOCommand.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.ooc.command; + +import com.google.common.base.Preconditions; +import org.apache.giraph.bsp.BspService; +import org.apache.giraph.comm.messages.MessageStore; +import org.apache.giraph.ooc.OutOfCoreEngine; +import org.apache.giraph.ooc.data.DiskBackedEdgeStore; +import org.apache.giraph.ooc.data.DiskBackedMessageStore; +import org.apache.giraph.ooc.data.DiskBackedPartitionStore; + +import java.io.IOException; + +/** + * IOCommand to load partition data, edge data (if in INPUT_SUPERSTEP), and + * message data (if in compute supersteps). Also, this command can be used to + * prefetch a partition to be processed in the next superstep. + */ +public class LoadPartitionIOCommand extends IOCommand { + /** + * Which superstep this partition should be loaded for? (can be current + * superstep or next superstep -- in case of prefetching). + */ + private final long superstep; + + /** + * Constructor + * + * @param oocEngine out-of-core engine + * @param partitionId id of the partition to be loaded + * @param superstep superstep to load the partition for + */ + public LoadPartitionIOCommand(OutOfCoreEngine oocEngine, int partitionId, + long superstep) { + super(oocEngine, partitionId); + this.superstep = superstep; + } + + @Override + public boolean execute() throws IOException { + boolean executed = false; + if (oocEngine.getMetaPartitionManager() + .startLoadingPartition(partitionId, superstep)) { + long currentSuperstep = oocEngine.getSuperstep(); + DiskBackedPartitionStore partitionStore = + (DiskBackedPartitionStore) + oocEngine.getServerData().getPartitionStore(); + numBytesTransferred += + partitionStore.loadPartitionData(partitionId); + if (currentSuperstep == BspService.INPUT_SUPERSTEP && + superstep == currentSuperstep) { + DiskBackedEdgeStore edgeStore = + (DiskBackedEdgeStore) oocEngine.getServerData().getEdgeStore(); + numBytesTransferred += + edgeStore.loadPartitionData(partitionId); + } + MessageStore messageStore; + if 
(currentSuperstep == superstep) { + messageStore = oocEngine.getServerData().getCurrentMessageStore(); + } else { + Preconditions.checkState(superstep == currentSuperstep + 1); + messageStore = oocEngine.getServerData().getIncomingMessageStore(); + } + if (messageStore != null) { + numBytesTransferred += ((DiskBackedMessageStore) messageStore) + .loadPartitionData(partitionId); + } + oocEngine.getMetaPartitionManager() + .doneLoadingPartition(partitionId, superstep); + executed = true; + } + return executed; + } + + @Override + public IOCommandType getType() { + return IOCommandType.LOAD_PARTITION; + } + + @Override + public String toString() { + return "LoadPartitionIOCommand: (partitionId = " + partitionId + ", " + + "superstep = " + superstep + ")"; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/command/StoreDataBufferIOCommand.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/command/StoreDataBufferIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/command/StoreDataBufferIOCommand.java new file mode 100644 index 0000000..beda796 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/command/StoreDataBufferIOCommand.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.ooc.command; + +import org.apache.giraph.ooc.OutOfCoreEngine; +import org.apache.giraph.ooc.data.DiskBackedEdgeStore; +import org.apache.giraph.ooc.data.DiskBackedMessageStore; +import org.apache.giraph.ooc.data.DiskBackedPartitionStore; + +import java.io.IOException; + +/** + * IOCommand to store raw data buffers on disk. + */ +public class StoreDataBufferIOCommand extends IOCommand { + /** + * Types of raw data buffer to offload to disk (either vertices/edges buffer + * in INPUT_SUPERSTEP or incoming message buffer). + */ + public enum DataBufferType { PARTITION, MESSAGE }; + /** + * Type of the buffer to store on disk. 
+ */ + private final DataBufferType type; + + /** + * Constructor + * + * @param oocEngine out-of-core engine + * @param partitionId id of the partition to offload its buffers + * @param type type of the buffer to store on disk + */ + public StoreDataBufferIOCommand(OutOfCoreEngine oocEngine, + int partitionId, + DataBufferType type) { + super(oocEngine, partitionId); + this.type = type; + } + + @Override + public boolean execute() throws IOException { + boolean executed = false; + if (oocEngine.getMetaPartitionManager() + .startOffloadingBuffer(partitionId)) { + switch (type) { + case PARTITION: + DiskBackedPartitionStore partitionStore = + (DiskBackedPartitionStore) + oocEngine.getServerData().getPartitionStore(); + numBytesTransferred += + partitionStore.offloadBuffers(partitionId); + DiskBackedEdgeStore edgeStore = + (DiskBackedEdgeStore) oocEngine.getServerData().getEdgeStore(); + numBytesTransferred += edgeStore.offloadBuffers(partitionId); + break; + case MESSAGE: + DiskBackedMessageStore messageStore = + (DiskBackedMessageStore) + oocEngine.getServerData().getIncomingMessageStore(); + numBytesTransferred += + messageStore.offloadBuffers(partitionId); + break; + default: + throw new IllegalStateException("execute: requested data buffer type " + + "does not exist!"); + } + oocEngine.getMetaPartitionManager().doneOffloadingBuffer(partitionId); + executed = true; + } + return executed; + } + + @Override + public IOCommandType getType() { + return IOCommandType.STORE_BUFFER; + } + + @Override + public String toString() { + return "StoreDataBufferIOCommand: (partitionId = " + partitionId + ", " + + "type = " + type.name() + ")"; + } +}
