Repository: giraph Updated Branches: refs/heads/trunk 97e26e65f -> 24bed1a9b
GIRAPH-1046: Add a way to synchronize full GC calls across workers Summary: In applications which use memory more heavily, we can see full GC pauses happening on different workers at different times, and each of these is causing some delay because other workers are often waiting on something from the worker in GC (closing open requests, finishing superstep, etc). Having a way to coordinate when full GCs are called could help them have less effect on job performance. Test Plan: Ran some memory heavy jobs where I observed overall better performance from using this feature. Differential Revision: https://reviews.facebook.net/D55347 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/24bed1a9 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/24bed1a9 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/24bed1a9 Branch: refs/heads/trunk Commit: 24bed1a9bdbe3a1edd46bc3b2af83ee385e041a9 Parents: 97e26e6 Author: Maja Kabiljo <[email protected]> Authored: Thu Mar 10 14:30:11 2016 -0800 Committer: Maja Kabiljo <[email protected]> Committed: Mon Apr 4 16:42:26 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/giraph/bsp/BspService.java | 5 + .../apache/giraph/worker/BspServiceWorker.java | 13 +- .../apache/giraph/worker/MemoryObserver.java | 157 +++ src/site/xdoc/options.xml | 1071 ++++++++++++++++++ 4 files changed, 1242 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/24bed1a9/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java index 15e4dbe..c73f441 100644 --- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java @@ -111,6 +111,8 @@ public abstract class BspService<I extends WritableComparable, public static final String SUPERSTEP_FINISHED_NODE = "/_superstepFinished"; /** Denotes that computation should be halted */ public static final String HALT_COMPUTATION_NODE = "/_haltComputation"; + /** Memory observer dir */ + public static final String MEMORY_OBSERVER_DIR = "/_memoryObserver"; /** User sets this flag to checkpoint and stop the job */ public static final String FORCE_CHECKPOINT_USER_FLAG = "/_checkpointAndStop"; /** Denotes which workers have been cleaned up */ @@ -160,6 +162,8 @@ public abstract class BspService<I extends WritableComparable, protected final String masterElectionPath; /** If this path exists computation will be halted */ protected final String haltComputationPath; + /** Path where memory observer stores data */ + protected final String memoryObserverPath; /** Private ZooKeeper instance that implements the service */ private final ZooKeeperExt zk; /** Has the Connection occurred? */ @@ -273,6 +277,7 @@ public abstract class BspService<I extends WritableComparable, masterElectionPath = basePath + MASTER_ELECTION_DIR; String serverPortList = conf.getZookeeperList(); haltComputationPath = basePath + HALT_COMPUTATION_NODE; + memoryObserverPath = basePath + MEMORY_OBSERVER_DIR; getContext().getCounter(GiraphConstants.ZOOKEEPER_HALT_NODE_COUNTER_GROUP, haltComputationPath); if (LOG.isInfoEnabled()) { http://git-wip-us.apache.org/repos/asf/giraph/blob/24bed1a9/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index 37876d4..678d99e 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -178,6 +178,9 @@ public class BspServiceWorker<I extends WritableComparable, /** InputSplit handlers used in INPUT_SUPERSTEP */ private WorkerInputSplitsHandler inputSplitsHandler; + /** Memory observer */ + private final MemoryObserver memoryObserver; + /** * Constructor for setting up the worker. * @@ -237,6 +240,8 @@ public class BspServiceWorker<I extends WritableComparable, inputSplitsHandler = new WorkerInputSplitsHandler( workerInfo, masterInfo.getTaskId(), workerClient); + + memoryObserver = new MemoryObserver(getZkExt(), memoryObserverPath, conf); } @Override @@ -1010,16 +1015,13 @@ else[HADOOP_NON_SECURE]*/ long nextPrintMsecs = System.currentTimeMillis() + 15000; int partitionIndex = 0; int numPartitions = getPartitionStore().getNumPartitions(); - LOG.info("Write thread started!"); while (true) { Partition<I, V, E> partition = getPartitionStore().getNextPartition(); - LOG.info("partition is : " + partition); if (partition == null) { break; } - LOG.info("start to write a partition"); long verticesWritten = 0; for (Vertex<I, V, E> vertex : partition) { vertexWriter.writeVertex(vertex); @@ -1036,7 +1038,6 @@ else[HADOOP_NON_SECURE]*/ nextPrintMsecs = System.currentTimeMillis() + 15000; nextPrintVertices = verticesWritten + 250000; } - LOG.info("done writing vertices"); if (verticesWritten >= nextUpdateProgressVertices) { WorkerProgress.get().addVerticesStored( @@ -1761,6 +1762,10 @@ else[HADOOP_NON_SECURE]*/ } partitionExchangeChildrenChanged.signal(); foundEvent = true; + } else if (event.getPath().contains(MEMORY_OBSERVER_DIR) && + event.getType() == EventType.NodeChildrenChanged) { + memoryObserver.callGc(); + foundEvent = true; } return foundEvent; http://git-wip-us.apache.org/repos/asf/giraph/blob/24bed1a9/giraph-core/src/main/java/org/apache/giraph/worker/MemoryObserver.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/MemoryObserver.java b/giraph-core/src/main/java/org/apache/giraph/worker/MemoryObserver.java new file mode 100644 index 0000000..50fae81 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/worker/MemoryObserver.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.worker; + +import org.apache.giraph.conf.BooleanConfOption; +import org.apache.giraph.conf.FloatConfOption; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.IntConfOption; +import org.apache.giraph.utils.MemoryUtils; +import org.apache.giraph.zk.ZooKeeperExt; +import org.apache.log4j.Logger; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * Memory observer to help synchronize when full gcs are happening across all + * the workers + */ +public class MemoryObserver { + /** Whether or not to use memory observer */ + public static final BooleanConfOption USE_MEMORY_OBSERVER = + new BooleanConfOption("giraph.memoryObserver.enabled", false, + "Whether or not to use memory observer"); + /** For which fraction of free memory will we issue manual gc calls */ + public static final FloatConfOption FREE_MEMORY_FRACTION_FOR_GC = + new FloatConfOption("giraph.memoryObserver.freeMemoryFractionForGc", 0.1f, + "For which fraction of free memory will we issue manual gc calls"); + /** Minimum milliseconds between two manual gc calls */ + public static final IntConfOption MIN_MS_BETWEEN_FULL_GCS = + new IntConfOption("giraph.memoryObserver.minMsBetweenFullGcs", 60 * 1000, + "Minimum milliseconds between two manual gc calls"); + + /** Logger */ + private static final Logger LOG = Logger.getLogger(MemoryObserver.class); + /** How long does memory observer thread sleep for */ + private static final int MEMORY_OBSERVER_SLEEP_MS = 1000; + + /** When was the last manual gc call */ + private final AtomicLong lastManualGc = new AtomicLong(); + /** Zookeeper */ + private final ZooKeeperExt zk; + /** Path on zookeeper for memory observer files */ + private final String zkPath; + /** Value of conf setting MIN_MS_BETWEEN_FULL_GCS */ + private final int minMsBetweenFullGcs; + + /** + * Constructor + * + * @param zk Zookeeper + * @param zkPath Path on zookeeper for memory observer files + * @param conf Configration + */ + public MemoryObserver(final ZooKeeperExt zk, + final String zkPath, GiraphConfiguration conf) { + this.zk = zk; + this.zkPath = zkPath; + minMsBetweenFullGcs = MIN_MS_BETWEEN_FULL_GCS.get(conf); + + if (!USE_MEMORY_OBSERVER.get(conf)) { + return; + } + + try { + // Create base path for memory observer nodes + zk.createOnceExt(zkPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT, true); + } catch (KeeperException | InterruptedException e) { + LOG.info("Exception occurred", e); + } + setWatcher(); + + final float freeMemoryFractionForGc = + FREE_MEMORY_FRACTION_FOR_GC.get(conf); + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + + while (true) { + double freeMemoryFraction = MemoryUtils.freeMemoryFraction(); + long msFromLastGc = System.currentTimeMillis() - lastManualGc.get(); + if (msFromLastGc > minMsBetweenFullGcs && + freeMemoryFraction < freeMemoryFractionForGc) { + try { + if (LOG.isInfoEnabled()) { + LOG.info("Notifying others about low memory (" + + freeMemoryFraction + "% free)"); + } + zk.createExt( + zkPath + "/" + System.currentTimeMillis(), + new byte[0], + ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.EPHEMERAL, + false); + } catch (KeeperException | InterruptedException e) { + LOG.warn("Exception occurred", e); + } + } + try { + Thread.sleep(MEMORY_OBSERVER_SLEEP_MS); + } catch (InterruptedException e) { + LOG.warn("Exception occurred", e); + return; + } + } + } + }); + thread.setName("memory-observer"); + thread.setDaemon(true); + thread.start(); + } + + /** Set watcher on memory observer folder */ + private void setWatcher() { + try { + // Set a watcher on this path + zk.getChildrenExt(zkPath, true, false, false); + } catch (KeeperException | InterruptedException e) { + LOG.warn("Exception occurred", e); + } + } + + /** Manually call gc, if enough time from last call has passed */ + public void callGc() { + long last = lastManualGc.get(); + if (System.currentTimeMillis() - last > minMsBetweenFullGcs && + lastManualGc.compareAndSet(last, System.currentTimeMillis())) { + if (LOG.isInfoEnabled()) { + LOG.info("Calling gc manually"); + } + System.gc(); + if (LOG.isInfoEnabled()) { + LOG.info("Manual gc call done"); + } + } + setWatcher(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/24bed1a9/src/site/xdoc/options.xml ---------------------------------------------------------------------- diff --git a/src/site/xdoc/options.xml b/src/site/xdoc/options.xml new file mode 100644 index 0000000..687d30f --- /dev/null +++ b/src/site/xdoc/options.xml @@ -0,0 +1,1071 @@ +<?xml version='1.0' encoding='UTF-8'?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +'License'); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +<document xmlns='http://maven.apache.org/XDOC/2.0' + xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance' + xsi:schemaLocation='http://maven.apache.org/XDOC/2.0 http://maven.apache.org/xsd/xdoc-2.0.xsd'> + <properties> + <title>Giraph Options</title> + </properties> + <body> + <section name='Giraph Options'> + <table border='0' style='width:110%; max-width:110%'> + <tr> + <th>label</th> + <th>type</th> + <th>default value</th> + <th>description</th> + </tr> + <tr> + <td>giraph.SplitMasterWorker</td> + <td>boolean</td> + <td>true</td> + <td>Separate the workers and the master tasks. This is required to support dynamic recovery. (boolean)</td> + </tr> + <tr> + <td>giraph.ZkSkipAcl</td> + <td>boolean</td> + <td>true</td> + <td>ZooKeeper skip ACLs</td> + </tr> + <tr> + <td>giraph.authenticate</td> + <td>boolean</td> + <td>false</td> + <td>Whether to use SASL with DIGEST and Hadoop Job Tokens to authenticate and authorize Netty BSP Clients to Servers.</td> + </tr> + <tr> + <td>giraph.cleanupCheckpointsAfterSuccess</td> + <td>boolean</td> + <td>true</td> + <td>Delete checkpoints after a successful job run?</td> + </tr> + <tr> + <td>giraph.createEdgeSourceVertices</td> + <td>boolean</td> + <td>true</td> + <td>Create a source vertex if present in edge input but not necessarily in vertex input</td> + </tr> + <tr> + <td>giraph.doOutputDuringComputation</td> + <td>boolean</td> + <td>false</td> + <td>If you use this option, instead of having saving vertices in the end of application, saveVertex will be called right after each vertex.compute() is called.NOTE: This feature doesn't work well with checkpointing - if you restart from a checkpoint you won't have any ouptut from previous supresteps.</td> + </tr> + <tr> + <td>giraph.failFirstIpcPortBindAttempt</td> + <td>boolean</td> + <td>false</td> + <td>Fail first IPC port binding attempt, simulate binding failure on real grid testing</td> + </tr> + <tr> + <td>giraph.heap.enableReactiveJmapDumping</td> + <td>boolean</td> + <td>false</td> + <td>Option to enable dumping jmap histogram reactively based on free memory on heap</td> + </tr> + <tr> + <td>giraph.isStaticGraph</td> + <td>boolean</td> + <td>false</td> + <td>The application will not mutate the graph topology (the edges). It is used to optimise out-of-core graph, by not writing back edges every time.</td> + </tr> + <tr> + <td>giraph.jmap.histo.enable</td> + <td>boolean</td> + <td>false</td> + <td>Configuration key to enable jmap printing</td> + </tr> + <tr> + <td>giraph.jmap.histo.live</td> + <td>boolean</td> + <td>false</td> + <td>Only print live objects in jmap?</td> + </tr> + <tr> + <td>giraph.jython.type.wrappers.edge.value</td> + <td>boolean</td> + <td>false</td> + <td>Whether user graph types (IVEMM) need Jython wrappers</td> + </tr> + <tr> + <td>giraph.jython.type.wrappers.outgoing.message</td> + <td>boolean</td> + <td>false</td> + <td>Whether user graph types (IVEMM) need Jython wrappers</td> + </tr> + <tr> + <td>giraph.jython.type.wrappers.vertex.id</td> + <td>boolean</td> + <td>false</td> + <td>Whether user graph types (IVEMM) need Jython wrappers</td> + </tr> + <tr> + <td>giraph.jython.type.wrappers.vertex.value</td> + <td>boolean</td> + <td>false</td> + <td>Whether user graph types (IVEMM) need Jython wrappers</td> + </tr> + <tr> + <td>giraph.keepZooKeeperData</td> + <td>boolean</td> + <td>false</td> + <td>Keep the zookeeper output for debugging? Default is to remove it.</td> + </tr> + <tr> + <td>giraph.localTestMode</td> + <td>boolean</td> + <td>false</td> + <td>Indicates whether this job is run in an internal unit test</td> + </tr> + <tr> + <td>giraph.logThreadLayout</td> + <td>boolean</td> + <td>false</td> + <td>Use thread level debugging?</td> + </tr> + <tr> + <td>giraph.metrics.enable</td> + <td>boolean</td> + <td>false</td> + <td>Enable the Metrics system</td> + </tr> + <tr> + <td>giraph.nettyClientUseExecutionHandler</td> + <td>boolean</td> + <td>true</td> + <td>Use the execution handler in netty on the client?</td> + </tr> + <tr> + <td>giraph.nettyServerUseExecutionHandler</td> + <td>boolean</td> + <td>true</td> + <td>Use the execution handler in netty on the server?</td> + </tr> + <tr> + <td>giraph.nettySimulateFirstRequestClosed</td> + <td>boolean</td> + <td>false</td> + <td>Netty simulate a first request closed</td> + </tr> + <tr> + <td>giraph.nettySimulateFirstResponseFailed</td> + <td>boolean</td> + <td>false</td> + <td>Netty simulate a first response failed</td> + </tr> + <tr> + <td>giraph.preferIP</td> + <td>boolean</td> + <td>false</td> + <td>Prefer IP addresses instead of host names</td> + </tr> + <tr> + <td>giraph.pure.yarn.job</td> + <td>boolean</td> + <td>false</td> + <td>Is this a pure YARN job (i.e. no MapReduce layer managing Giraph tasks)</td> + </tr> + <tr> + <td>giraph.textoutputformat.reverse</td> + <td>boolean</td> + <td>false</td> + <td>Reverse values in the output</td> + </tr> + <tr> + <td>giraph.trackJobProgressOnClient</td> + <td>boolean</td> + <td>false</td> + <td>Whether to track job progress on client or not</td> + </tr> + <tr> + <td>giraph.useBigDataIOForMessages</td> + <td>boolean</td> + <td>false</td> + <td>Use BigDataIO for messages?</td> + </tr> + <tr> + <td>giraph.useInputSplitLocality</td> + <td>boolean</td> + <td>true</td> + <td>To minimize network usage when reading input splits, each worker can prioritize splits that reside on its host. This, however, comes at the cost of increased load on ZooKeeper. Hence, users with a lot of splits and input threads (or with configurations that can't exploit locality) may want to disable it.</td> + </tr> + <tr> + <td>giraph.useMessageSizeEncoding</td> + <td>boolean</td> + <td>false</td> + <td>Use message size encoding (typically better for complex objects, not meant for primitive wrapped messages)</td> + </tr> + <tr> + <td>giraph.useNettyDirectMemory</td> + <td>boolean</td> + <td>false</td> + <td>Should netty use direct memory buffers</td> + </tr> + <tr> + <td>giraph.useNettyPooledAllocator</td> + <td>boolean</td> + <td>false</td> + <td>Should netty use pooled memory allocator?</td> + </tr> + <tr> + <td>giraph.useOutOfCoreGraph</td> + <td>boolean</td> + <td>false</td> + <td>Enable out-of-core graph.</td> + </tr> + <tr> + <td>giraph.useSuperstepCounters</td> + <td>boolean</td> + <td>true</td> + <td>Use superstep counters? (boolean)</td> + </tr> + <tr> + <td>giraph.useUnsafeSerialization</td> + <td>boolean</td> + <td>true</td> + <td>Use unsafe serialization?</td> + </tr> + <tr> + <td>giraph.vertex.resolver.create.on.msgs</td> + <td>boolean</td> + <td>true</td> + <td>Option of whether to create vertexes that were not existent before but received messages</td> + </tr> + <tr> + <td>giraph.vertexOutputFormatThreadSafe</td> + <td>boolean</td> + <td>false</td> + <td>Vertex output format thread-safe - if your VertexOutputFormat allows several vertexWriters to be created and written to in parallel, you should set this to true.</td> + </tr> + <tr> + <td>giraph.zKForceSync</td> + <td>boolean</td> + <td>false</td> + <td>ZooKeeper force sync</td> + </tr> + <tr> + <td>giraph.zkIsExternal</td> + <td>boolean</td> + <td>true</td> + <td>Zookeeper List will always hold a value during the computation while this option provides information regarding whether the zookeeper was internally started or externally provided.</td> + </tr> + <tr> + <td>giraph.zkRunsInProcess</td> + <td>boolean</td> + <td>true</td> + <td>If true run zookeeper in master process, if false starts separate process for zookeeper</td> + </tr> + <tr> + <td>giraph.aggregatorWriterClass</td> + <td>class</td> + <td>TextAggregatorWriter</td> + <td>AggregatorWriter class - optional</td> + </tr> + <tr> + <td>giraph.checkpoint.supported.checker</td> + <td>class</td> + <td>DefaultCheckpointSupportedChecker</td> + <td>This is the way to specify if checkpointing is supported by the job</td> + </tr> + <tr> + <td>giraph.computation.factory.class</td> + <td>class</td> + <td>DefaultComputationFactory</td> + <td>Computation factory class - optional</td> + </tr> + <tr> + <td>giraph.computationClass</td> + <td>class</td> + <td>null</td> + <td>Computation class - required</td> + </tr> + <tr> + <td>giraph.edgeInputFilterClass</td> + <td>class</td> + <td>DefaultEdgeInputFilter</td> + <td>EdgeInputFilter class</td> + </tr> + <tr> + <td>giraph.edgeInputFormatClass</td> + <td>class</td> + <td>null</td> + <td>EdgeInputFormat class</td> + </tr> + <tr> + <td>giraph.edgeOutputFormatClass</td> + <td>class</td> + <td>null</td> + <td>EdgeOutputFormat class</td> + </tr> + <tr> + <td>giraph.edgeStoreFactoryClass</td> + <td>class</td> + <td>InMemoryEdgeStoreFactory</td> + <td>Edge Store Factory class to use for creating edgeStore</td> + </tr> + <tr> + <td>giraph.edgeTranslationClass</td> + <td>class</td> + <td>null</td> + <td>Class used to conduct expensive edge translation during vertex input phase</td> + </tr> + <tr> + <td>giraph.edgeValueClass</td> + <td>class</td> + <td>null</td> + <td>Edge value class</td> + </tr> + <tr> + <td>giraph.edgeValueFactoryClass</td> + <td>class</td> + <td>DefaultEdgeValueFactory</td> + <td>Edge value factory class - optional</td> + </tr> + <tr> + <td>giraph.graphPartitionerFactoryClass</td> + <td>class</td> + <td>HashPartitionerFactory</td> + <td>Graph partitioner factory class - optional</td> + </tr> + <tr> + <td>giraph.hadoopOutputFormatClass</td> + <td>class</td> + <td>BspOutputFormat</td> + <td>Output format class for hadoop to use (for committing)</td> + </tr> + <tr> + <td>giraph.haltInstructionsWriter</td> + <td>class</td> + <td>DefaultHaltInstructionsWriter</td> + <td>Class used to write instructions on how to halt the application</td> + </tr> + <tr> + <td>giraph.inputOutEdgesClass</td> + <td>class</td> + <td>ByteArrayEdges</td> + <td>Vertex edges class to be used during edge input only - optional</td> + </tr> + <tr> + <td>giraph.jobObserverClass</td> + <td>class</td> + <td>DefaultJobObserver</td> + <td>Observer class to watch over job status - optional</td> + </tr> + <tr> + <td>giraph.jobRetryCheckerClass</td> + <td>class</td> + <td>DefaultGiraphJobRetryChecker</td> + <td>Class which decides whether a failed job should be retried - optional</td> + </tr> + <tr> + <td>giraph.mappingInputFormatClass</td> + <td>class</td> + <td>null</td> + <td>MappingInputFormat class</td> + </tr> + <tr> + <td>giraph.mappingStoreClass</td> + <td>class</td> + <td>null</td> + <td>MappingStore Class</td> + </tr> + <tr> + <td>giraph.mappingStoreOpsClass</td> + <td>class</td> + <td>null</td> + <td>MappingStoreOps class</td> + </tr> + <tr> + <td>giraph.master.observers</td> + <td>class</td> + <td>null</td> + <td>Classes for Master Observer - optional</td> + </tr> + <tr> + <td>giraph.masterComputeClass</td> + <td>class</td> + <td>DefaultMasterCompute</td> + <td>Class for Master - optional</td> + </tr> + <tr> + <td>giraph.messageCombinerClass</td> + <td>class</td> + <td>null</td> + <td>Message combiner class - optional</td> + </tr> + <tr> + <td>giraph.messageStoreFactoryClass</td> + <td>class</td> + <td>InMemoryMessageStoreFactory</td> + <td>Message Store Factory Class that is to be used</td> + </tr> + <tr> + <td>giraph.outEdgesClass</td> + <td>class</td> + <td>ByteArrayEdges</td> + <td>Vertex edges class - optional</td> + </tr> + <tr> + <td>giraph.outgoingMessageValueClass</td> + <td>class</td> + <td>null</td> + <td>Outgoing message value class</td> + </tr> + <tr> + <td>giraph.outgoingMessageValueFactoryClass</td> + <td>class</td> + <td>DefaultMessageValueFactory</td> + <td>Outgoing message value factory class - optional</td> + </tr> + <tr> + <td>giraph.partitionClass</td> + <td>class</td> + <td>SimplePartition</td> + <td>Partition class - optional</td> + </tr> + <tr> + <td>giraph.typesHolder</td> + <td>class</td> + <td>null</td> + <td>TypesHolder, used if Computation not set - optional</td> + </tr> + <tr> + <td>giraph.vertexClass</td> + <td>class</td> + <td>DefaultVertex</td> + <td>Vertex class</td> + </tr> + <tr> + <td>giraph.vertexIdClass</td> + <td>class</td> + <td>null</td> + <td>Vertex index class</td> + </tr> + <tr> + <td>giraph.vertexIdFactoryClass</td> + <td>class</td> + <td>DefaultVertexIdFactory</td> + <td>Vertex ID factory class - optional</td> + </tr> + <tr> + <td>giraph.vertexInputFilterClass</td> + <td>class</td> + <td>DefaultVertexInputFilter</td> + <td>VertexInputFilter class</td> + </tr> + <tr> + <td>giraph.vertexInputFormatClass</td> + <td>class</td> + <td>null</td> + <td>VertexInputFormat class (at least one of the input format classes is required)</td> + </tr> + <tr> + <td>giraph.vertexOutputFormatClass</td> + <td>class</td> + <td>null</td> + <td>VertexOutputFormat class</td> + </tr> + <tr> + <td>giraph.vertexResolverClass</td> + <td>class</td> + <td>DefaultVertexResolver</td> + <td>Vertex resolver class - optional</td> + </tr> + <tr> + <td>giraph.vertexValueClass</td> + <td>class</td> + <td>null</td> + <td>Vertex value class</td> + </tr> + <tr> + <td>giraph.vertexValueCombinerClass</td> + <td>class</td> + <td>DefaultVertexValueCombiner</td> + <td>Vertex value combiner class - optional</td> + </tr> + <tr> + <td>giraph.vertexValueFactoryClass</td> + <td>class</td> + <td>DefaultVertexValueFactory</td> + <td>Vertex value factory class - optional</td> + </tr> + <tr> + <td>giraph.worker.observers</td> + <td>class</td> + <td>null</td> + <td>Classes for Worker Observer - optional</td> + </tr> + <tr> + <td>giraph.workerContextClass</td> + <td>class</td> + <td>DefaultWorkerContext</td> + <td>Worker contextclass</td> + </tr> + <tr> + <td>giraph.computation.language</td> + <td>enum</td> + <td>JAVA</td> + <td>Which language computation is implemented in</td> + </tr> + <tr> + <td>giraph.messageEncodeAndStoreType</td> + <td>enum</td> + <td>BYTEARRAY_PER_PARTITION</td> + <td>Select the message_encode_and_store_type to use</td> + </tr> + <tr> + <td>giraph.types.language.edge.value</td> + <td>enum</td> + <td>JAVA</td> + <td>Language user graph types (IVEMM) are implemented in</td> + </tr> + <tr> + <td>giraph.types.language.outgoing.message</td> + <td>enum</td> + <td>JAVA</td> + <td>Language user graph types (IVEMM) are implemented in</td> + </tr> + <tr> + <td>giraph.types.language.vertex.id</td> + <td>enum</td> + <td>JAVA</td> + <td>Language user graph types (IVEMM) are implemented in</td> + </tr> + <tr> + <td>giraph.types.language.vertex.value</td> + <td>enum</td> + <td>JAVA</td> + <td>Language user graph types (IVEMM) are implemented in</td> + </tr> + <tr> + <td>giraph.async.message.store.threads</td> + <td>integer</td> + <td>0</td> + <td>Number of threads to be used in async message store.</td> + </tr> + <tr> + <td>giraph.channelsPerServer</td> + <td>integer</td> + <td>1</td> + <td>Number of channels used per server</td> + </tr> + <tr> + <td>giraph.checkpoint.io.threads</td> + <td>integer</td> + <td>8</td> + <td>Number of threads for writing and reading checkpoints</td> + </tr> + <tr> + <td>giraph.checkpointFrequency</td> + <td>integer</td> + <td>0</td> + <td>How often to checkpoint (i.e. 0, means no checkpoint, 1 means every superstep, 2 is every two supersteps, etc.).</td> + </tr> + <tr> + <td>giraph.clientReceiveBufferSize</td> + <td>integer</td> + <td>32768</td> + <td>Client receive buffer size</td> + </tr> + <tr> + <td>giraph.clientSendBufferSize</td> + <td>integer</td> + <td>524288</td> + <td>Client send buffer size</td> + </tr> + <tr> + <td>giraph.edgeRequestSize</td> + <td>integer</td> + <td>524288</td> + <td>Maximum size of edges (in bytes) per peer before flush</td> + </tr> + <tr> + <td>giraph.eventWaitMsecs</td> + <td>integer</td> + <td>30000</td> + <td>Millseconds to wait for an event before continuing</td> + </tr> + <tr> + <td>giraph.hdfs.file.creation.retries</td> + <td>integer</td> + <td>10</td> + <td>Retries to create an HDFS file before failing</td> + </tr> + <tr> + <td>giraph.hdfs.file.creation.retry.wait.ms</td> + <td>integer</td> + <td>30000</td> + <td>Milliseconds to wait prior to retrying creation of an HDFS file</td> + </tr> + <tr> + <td>giraph.heap.minFreeMb</td> + <td>integer</td> + <td>128</td> + <td>Option used by worker and master observers to check for imminent OOM exception</td> + </tr> + <tr> + <td>giraph.ipcInitialPort</td> + <td>integer</td> + <td>30000</td> + <td>Initial port to start using for the IPC communication</td> + </tr> + <tr> + <td>giraph.jmap.histo.msec</td> + <td>integer</td> + <td>30000</td> + <td>Configuration key for msec to sleep between calls</td> + </tr> + <tr> + <td>giraph.jmap.histo.print_lines</td> + <td>integer</td> + <td>30</td> + <td>Configuration key for how many lines to print</td> + </tr> + <tr> + <td>giraph.lbMappingStoreLower</td> + <td>integer</td> + <td>-1</td> + <td>'lower' value used by lbMappingstore</td> + </tr> + <tr> + <td>giraph.lbMappingStoreUpper</td> + <td>integer</td> + <td>-1</td> + <td>'upper' value used by lbmappingstore</td> + </tr> + <tr> + <td>giraph.maxIpcPortBindAttempts</td> + <td>integer</td> + <td>20</td> + <td>Maximum bind attempts for different IPC ports</td> + </tr> + <tr> + <td>giraph.maxMasterSuperstepWaitMsecs</td> + <td>integer</td> + <td>600000</td> + <td>Maximum milliseconds to wait before giving up trying to get the minimum number of workers before a superstep (int).</td> + </tr> + <tr> + <td>giraph.maxMessagesInMemory</td> + <td>integer</td> + <td>1000000</td> + <td>If using out-of-core messaging, it tells how much messages do we keep in memory.</td> + </tr> + <tr> + <td>giraph.maxMutationsPerRequest</td> + <td>integer</td> + <td>100</td> + <td>Maximum number of mutations per partition before flush</td> + </tr> + <tr> + <td>giraph.maxNumberOfSupersteps</td> + <td>integer</td> + <td>1</td> + <td>The application will halt after this many supersteps is completed. For instance, if it is set to 3, the application will run at most 0, 1, and 2 supersteps and then go into the shutdown superstep.</td> + </tr> + <tr> + <td>giraph.maxPartitionsInMemory</td> + <td>integer</td> + <td>0</td> + <td>Maximum number of partitions to hold in memory for each worker. By default it is set to 0 (for adaptive out-of-core mechanism</td> + </tr> + <tr> + <td>giraph.maxRequestMilliseconds</td> + <td>integer</td> + <td>600000</td> + <td>Milliseconds for a request to complete (or else resend)</td> + </tr> + <tr> + <td>giraph.maxResolveAddressAttempts</td> + <td>integer</td> + <td>5</td> + <td>Max resolve address attempts</td> + </tr> + <tr> + <td>giraph.messagesBufferSize</td> + <td>integer</td> + <td>8192</td> + <td>Size of buffer when reading and writing messages out-of-core.</td> + </tr> + <tr> + <td>giraph.minPartitionsPerComputeThread</td> + <td>integer</td> + <td>1</td> + <td>Minimum number of partitions to have per compute thread</td> + </tr> + <tr> + <td>giraph.msgRequestSize</td> + <td>integer</td> + <td>524288</td> + <td>Maximum size of messages (in bytes) per peer before flush</td> + </tr> + <tr> + <td>giraph.nettyClientExecutionThreads</td> + <td>integer</td> + <td>8</td> + <td>Netty client execution threads (execution handler)</td> + </tr> + <tr> + <td>giraph.nettyClientThreads</td> + <td>integer</td> + <td>4</td> + <td>Netty client threads</td> + </tr> + <tr> + <td>giraph.nettyMaxConnectionFailures</td> + <td>integer</td> + <td>1000</td> + <td>Netty max connection failures</td> + </tr> + <tr> + <td>giraph.nettyRequestEncoderBufferSize</td> + <td>integer</td> + <td>32768</td> + <td>How big to make the encoder buffer?</td> + </tr> + <tr> + <td>giraph.nettyServerExecutionThreads</td> + <td>integer</td> + <td>8</td> + <td>Netty server execution threads (execution handler)</td> + </tr> + <tr> + <td>giraph.nettyServerThreads</td> + <td>integer</td> + <td>16</td> + <td>Netty server threads</td> + </tr> + <tr> + <td>giraph.numComputeThreads</td> + <td>integer</td> + <td>1</td> + <td>Number of threads for vertex computation</td> + </tr> + <tr> + <td>giraph.numInputThreads</td> + <td>integer</td> + <td>1</td> + <td>Number of threads for input split loading</td> + </tr> + <tr> + <td>giraph.numOutOfCoreThreads</td> + <td>integer</td> + <td>1</td> + <td>Number of threads participating in swapping data to disk.</td> + </tr> + <tr> + <td>giraph.numOutputThreads</td> + <td>integer</td> + <td>1</td> + <td>Number of threads for writing output in the end of the application</td> + </tr> + <tr> + <td>giraph.partitionLongTailMinPrint</td> + <td>integer</td> + <td>1</td> + <td>Minimum stragglers of the superstep before printing them out</td> + </tr> + <tr> + <td>giraph.serverReceiveBufferSize</td> + <td>integer</td> + <td>524288</td> + <td>Server receive buffer size</td> + </tr> + <tr> + <td>giraph.serverSendBufferSize</td> + <td>integer</td> + <td>32768</td> + <td>Server send buffer size</td> + </tr> + <tr> + <td>giraph.tcpBacklog</td> + <td>integer</td> + <td>1</td> + <td>TCP backlog (defaults to number of workers)</td> + </tr> + <tr> + <td>giraph.userPartitionCount</td> + <td>integer</td> + <td>-1</td> + <td>Overrides default partition count calculation if not -1</td> + </tr> + <tr> + <td>giraph.vertexRequestSize</td> + <td>integer</td> + <td>524288</td> + <td>Maximum size of vertices (in bytes) per peer before flush</td> + </tr> + <tr> + <td>giraph.waitTaskDoneTimeoutMs</td> + <td>integer</td> + <td>900000</td> + <td>Maximum timeout (in ms) for waiting for all all tasks to complete</td> + </tr> + <tr> + <td>giraph.waitingRequestMsecs</td> + <td>integer</td> + <td>15000</td> + <td>Msecs to wait between waiting for all requests to finish</td> + </tr> + <tr> + <td>giraph.yarn.task.heap.mb</td> + <td>integer</td> + <td>1024</td> + <td>Name of Giraph property for user-configurable heap memory per worker</td> + </tr> + <tr> + <td>giraph.zKMinSessionTimeout</td> + <td>integer</td> + <td>600000</td> + <td>ZooKeeper minimum session timeout</td> + </tr> + <tr> + <td>giraph.zkConnectionAttempts</td> + <td>integer</td> + <td>10</td> + <td>Number of ZooKeeper client connection attempts before giving up.</td> + </tr> + <tr> + <td>giraph.zkMaxSessionTimeout</td> + <td>integer</td> + <td>900000</td> + <td>ZooKeeper maximum session timeout</td> + </tr> + <tr> + <td>giraph.zkOpsMaxAttempts</td> + <td>integer</td> + <td>3</td> + <td>Max attempts for handling ZooKeeper connection loss</td> + </tr> + <tr> + <td>giraph.zkOpsRetryWaitMsecs</td> + <td>integer</td> + <td>5000</td> + <td>Msecs to wait before retrying a failed ZooKeeper op due to connection loss.</td> + </tr> + <tr> + <td>giraph.zkServerCount</td> + <td>integer</td> + <td>1</td> + <td>Number of nodes (not tasks) to run Zookeeper on</td> + </tr> + <tr> + <td>giraph.zkServerPort</td> + <td>integer</td> + <td>22181</td> + <td>ZooKeeper port to use</td> + </tr> + <tr> + <td>giraph.zkServerlistPollMsecs</td> + <td>integer</td> + <td>3000</td> + <td>Polling interval to check for the ZooKeeper server data</td> + </tr> + <tr> + <td>giraph.zkSessionMsecTimeout</td> + <td>integer</td> + <td>60000</td> + <td>ZooKeeper session millisecond timeout</td> + </tr> + <tr> + <td>mapred.map.max.attempts</td> + <td>integer</td> + <td>-1</td> + <td>Maximum number of attempts a master/worker will retry before killing the job. This directly maps to the number of map task attempts in Hadoop.</td> + </tr> + <tr> + <td>giraph.additionalEdgeRequestSize</td> + <td>float</td> + <td>0.2</td> + <td>Additional size (expressed as a ratio) of each per-partition buffer on top of the average size.</td> + </tr> + <tr> + <td>giraph.additionalMsgRequestSize</td> + <td>float</td> + <td>0.2</td> + <td>How much bigger than the average per partition size to make initial per partition buffers. If this value is A, message request size is M, and a worker has P partitions, than its initial partition buffer size will be (M / P) * (1 + A).</td> + </tr> + <tr> + <td>giraph.additionalVertexRequestSize</td> + <td>float</td> + <td>0.2</td> + <td>Additional size (expressed as a ratio) of each per-partition buffer on top of the average size.</td> + </tr> + <tr> + <td>giraph.inputSplitSamplePercent</td> + <td>float</td> + <td>100.0</td> + <td>Input split sample percent - Used only for sampling and testing, rather than an actual job. The idea is that to test, you might only want a fraction of the actual input splits from your VertexInputFormat to load (values should be [0, 100]).</td> + </tr> + <tr> + <td>giraph.masterPartitionCountMultiplier</td> + <td>float</td> + <td>1.0</td> + <td>Multiplier for the current workers squared</td> + </tr> + <tr> + <td>giraph.minPercentResponded</td> + <td>float</td> + <td>100.0</td> + <td>Minimum percent of the maximum number of workers that have responded in order to continue progressing. (float)</td> + </tr> + <tr> + <td>giraph.msgRequestWarningThreshold</td> + <td>float</td> + <td>2.0</td> + <td>If request sizes are bigger than the buffer size by this factor warnings are printed to the log and to the command line</td> + </tr> + <tr> + <td>giraph.InputSplitMaxEdges</td> + <td>long</td> + <td>-1</td> + <td>To limit outlier vertex input splits from producing too many vertices or to help with testing, the number of edges loaded from an input split can be limited. By default, everything is loaded.</td> + </tr> + <tr> + <td>giraph.InputSplitMaxVertices</td> + <td>long</td> + <td>-1</td> + <td>To limit outlier vertex input splits from producing too many vertices or to help with testing, the number of vertices loaded from an input split can be limited. By default, everything is loaded.</td> + </tr> + <tr> + <td>giraph.maxAllowedJobTimeMilliseconds</td> + <td>long</td> + <td>-1</td> + <td>Maximum allowed time for job to run after getting all resources before it will be killed, in milliseconds (-1 if it has no limit)</td> + </tr> + <tr> + <td>giraph.checkpoint.compression.codec</td> + <td>string</td> + <td>.deflate</td> + <td>Defines compression algorithm we will be using for storing checkpoint. Available options include but not restricted to: .deflate, .gz, .bz2, .lzo</td> + </tr> + <tr> + <td>giraph.checkpointDirectory</td> + <td>string</td> + <td>_bsp/_checkpoints/</td> + <td>This directory has/stores the available checkpoint files in HDFS.</td> + </tr> + <tr> + <td>giraph.dns.interface</td> + <td>string</td> + <td>default</td> + <td>Interface to use for hostname resolution</td> + </tr> + <tr> + <td>giraph.dns.nameserver</td> + <td>string</td> + <td>default</td> + <td>Server for hostname resolution</td> + </tr> + <tr> + <td>giraph.edge.output.subdir</td> + <td>string</td> + <td></td> + <td>EdgeOutputFormat sub-directory</td> + </tr> + <tr> + <td>giraph.logLevel</td> + <td>string</td> + <td>info</td> + <td>Override the Hadoop log level and set the desired log level.</td> + </tr> + <tr> + <td>giraph.messagesDirectory</td> + <td>string</td> + <td>_bsp/_messages/</td> + <td>Comma-separated list of directories in the local file system for out-of-core messages.</td> + </tr> + <tr> + <td>giraph.metrics.directory</td> + <td>string</td> + <td></td> + <td>Directory in HDFS to write master metrics to, instead of stderr</td> + </tr> + <tr> + <td>giraph.nettyClientExecutionAfterHandler</td> + <td>string</td> + <td>request-encoder</td> + <td>Where to place the netty client execution handle?</td> + </tr> + <tr> + <td>giraph.nettyCompressionAlgorithm</td> + <td>string</td> + <td></td> + <td>Which compression algorithm to use in netty</td> + </tr> + <tr> + <td>giraph.nettyServerExecutionAfterHandler</td> + <td>string</td> + <td>requestFrameDecoder</td> + <td>Where to place the netty server execution handle?</td> + </tr> + <tr> + <td>giraph.partitionsDirectory</td> + <td>string</td> + <td>_bsp/_partitions</td> + <td>Comma-separated list of directories in the local filesystem for out-of-core partitions.</td> + </tr> + <tr> + <td>giraph.restart.jobId</td> + <td>string</td> + <td>null</td> + <td>Which job ID should I try to restart?</td> + </tr> + <tr> + <td>giraph.textoutputformat.separator</td> + <td>string</td> + <td> </td> + <td>GiraphTextOuputFormat Separator</td> + </tr> + <tr> + <td>giraph.vertex.output.subdir</td> + <td>string</td> + <td></td> + <td>VertexOutputFormat sub-directory</td> + </tr> + <tr> + <td>giraph.yarn.libjars</td> + <td>string</td> + <td></td> + <td>conf key for comma-separated list of jars to export to YARN workers</td> + </tr> + <tr> + <td>giraph.zkJavaOpts</td> + <td>string</td> + <td>-Xmx512m -XX:ParallelGCThreads=4 -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxGCPauseMillis=100</td> + <td>Java opts passed to ZooKeeper startup</td> + </tr> + <tr> + <td>giraph.zkList</td> + <td>string</td> + <td></td> + <td>ZooKeeper comma-separated list (if not set, will start up ZooKeeper locally). Consider that after locally-starting zookeeper, this parameter will updated the configuration with the corrent configuration value.</td> + </tr> + <tr> + <td>giraph.zkManagerDirectory</td> + <td>string</td> + <td>_bsp/_defaultZkManagerDir</td> + <td>If ZOOKEEPER_LIST is not set, then use this directory to manage ZooKeeper</td> + </tr> + </table> + </section> + </body> +</document>
