Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Thu Feb 16 22:12:31 2012 @@ -33,570 +33,571 @@ import java.io.IOException; * for our needs. For instance, our job should not have any reduce tasks. */ public class GiraphJob extends Job { - static { - Configuration.addDefaultResource("giraph-site.xml"); - } - - /** Vertex class - required */ - public static final String VERTEX_CLASS = "giraph.vertexClass"; - /** VertexInputFormat class - required */ - public static final String VERTEX_INPUT_FORMAT_CLASS = - "giraph.vertexInputFormatClass"; - - /** VertexOutputFormat class - optional */ - public static final String VERTEX_OUTPUT_FORMAT_CLASS = - "giraph.vertexOutputFormatClass"; - /** Vertex combiner class - optional */ - public static final String VERTEX_COMBINER_CLASS = - "giraph.combinerClass"; - /** Vertex resolver class - optional */ - public static final String VERTEX_RESOLVER_CLASS = - "giraph.vertexResolverClass"; - /** Graph partitioner factory class - optional */ - public static final String GRAPH_PARTITIONER_FACTORY_CLASS = - "giraph.graphPartitionerFactoryClass"; - - /** Vertex index class */ - public static final String VERTEX_INDEX_CLASS = "giraph.vertexIndexClass"; - /** Vertex value class */ - public static final String VERTEX_VALUE_CLASS = "giraph.vertexValueClass"; - /** Edge value class */ - public static final String EDGE_VALUE_CLASS = "giraph.edgeValueClass"; - /** Message value class */ - public static final String MESSAGE_VALUE_CLASS = "giraph.messageValueClass"; - /** Worker context class */ - public static 
final String WORKER_CONTEXT_CLASS = - "giraph.workerContextClass"; - /** AggregatorWriter class - optional */ - public static final String AGGREGATOR_WRITER_CLASS = - "giraph.aggregatorWriterClass"; - - /** - * Minimum number of simultaneous workers before this job can run (int) - */ - public static final String MIN_WORKERS = "giraph.minWorkers"; - /** - * Maximum number of simultaneous worker tasks started by this job (int). - */ - public static final String MAX_WORKERS = "giraph.maxWorkers"; - - /** - * Separate the workers and the master tasks. This is required - * to support dynamic recovery. (boolean) - */ - public static final String SPLIT_MASTER_WORKER = - "giraph.SplitMasterWorker"; - /** - * Default on whether to separate the workers and the master tasks. - * Needs to be "true" to support dynamic recovery. - */ - public static final boolean SPLIT_MASTER_WORKER_DEFAULT = true; - - /** Indicates whether this job is run in an internal unit test */ - public static final String LOCAL_TEST_MODE = - "giraph.localTestMode"; - - /** not in local test mode per default */ - public static final boolean LOCAL_TEST_MODE_DEFAULT = false; - - /** - * Minimum percent of the maximum number of workers that have responded - * in order to continue progressing. 
(float) - */ - public static final String MIN_PERCENT_RESPONDED = - "giraph.minPercentResponded"; - /** Default 100% response rate for workers */ - public static final float MIN_PERCENT_RESPONDED_DEFAULT = 100.0f; - - /** Polling timeout to check on the number of responded tasks (int) */ - public static final String POLL_MSECS = "giraph.pollMsecs"; - /** Default poll msecs (30 seconds) */ - public static final int POLL_MSECS_DEFAULT = 30*1000; - - /** - * ZooKeeper comma-separated list (if not set, - * will start up ZooKeeper locally) - */ - public static final String ZOOKEEPER_LIST = "giraph.zkList"; - - /** ZooKeeper session millisecond timeout */ - public static final String ZOOKEEPER_SESSION_TIMEOUT = - "giraph.zkSessionMsecTimeout"; - /** Default Zookeeper session millisecond timeout */ - public static final int ZOOKEEPER_SESSION_TIMEOUT_DEFAULT = 60*1000; - - /** Polling interval to check for the final ZooKeeper server data */ - public static final String ZOOKEEPER_SERVERLIST_POLL_MSECS = - "giraph.zkServerlistPollMsecs"; - /** Default polling interval to check for the final ZooKeeper server data */ - public static final int ZOOKEEPER_SERVERLIST_POLL_MSECS_DEFAULT = - 3*1000; - - /** Number of nodes (not tasks) to run Zookeeper on */ - public static final String ZOOKEEPER_SERVER_COUNT = - "giraph.zkServerCount"; - /** Default number of nodes to run Zookeeper on */ - public static final int ZOOKEEPER_SERVER_COUNT_DEFAULT = 1; - - /** ZooKeeper port to use */ - public static final String ZOOKEEPER_SERVER_PORT = - "giraph.zkServerPort"; - /** Default ZooKeeper port to use */ - public static final int ZOOKEEPER_SERVER_PORT_DEFAULT = 22181; - - /** Location of the ZooKeeper jar - Used internally, not meant for users */ - public static final String ZOOKEEPER_JAR = "giraph.zkJar"; - - /** Local ZooKeeper directory to use */ - public static final String ZOOKEEPER_DIR = "giraph.zkDir"; - - /** Initial port to start using for the RPC communication */ - public static 
final String RPC_INITIAL_PORT = "giraph.rpcInitialPort"; - /** Default port to start using for the RPC communication */ - public static final int RPC_INITIAL_PORT_DEFAULT = 30000; - - /** Maximum bind attempts for different RPC ports */ - public static final String MAX_RPC_PORT_BIND_ATTEMPTS = - "giraph.maxRpcPortBindAttempts"; - /** Default maximum bind attempts for different RPC ports */ - public static final int MAX_RPC_PORT_BIND_ATTEMPTS_DEFAULT = 20; - - /** Maximum number of RPC handlers */ - public static final String RPC_NUM_HANDLERS = "giraph.rpcNumHandlers"; - /** Default maximum number of RPC handlers */ - public static final int RPC_NUM_HANDLERS_DEFAULT = 100; - - /** - * Maximum number of vertices per partition before sending. - * (input superstep only). - */ - public static final String MAX_VERTICES_PER_PARTITION = - "giraph.maxVerticesPerPartition"; - /** Default maximum number of vertices per partition before sending. */ - public static final int MAX_VERTICES_PER_PARTITION_DEFAULT = 100000; - - /** Maximum number of messages per peer before flush */ - public static final String MSG_SIZE = "giraph.msgSize"; - /** Default maximum number of messages per peer before flush */ - public static final int MSG_SIZE_DEFAULT = 1000; - - /** Maximum number of messages that can be bulk sent during a flush */ - public static final String MAX_MESSAGES_PER_FLUSH_PUT = - "giraph.maxMessagesPerFlushPut"; - /** Default number of messages that can be bulk sent during a flush */ - public static final int DEFAULT_MAX_MESSAGES_PER_FLUSH_PUT = 5000; - - /** Number of flush threads per peer */ - public static final String MSG_NUM_FLUSH_THREADS = - "giraph.msgNumFlushThreads"; - - /** Number of poll attempts prior to failing the job (int) */ - public static final String POLL_ATTEMPTS = "giraph.pollAttempts"; - /** Default poll attempts */ - public static final int POLL_ATTEMPTS_DEFAULT = 10; - - /** Number of minimum vertices in each vertex range */ - public static final 
String MIN_VERTICES_PER_RANGE = - "giraph.minVerticesPerRange"; - /** Default number of minimum vertices in each vertex range */ - public static final long MIN_VERTICES_PER_RANGE_DEFAULT = 3; - - /** Minimum stragglers of the superstep before printing them out */ - public static final String PARTITION_LONG_TAIL_MIN_PRINT = - "giraph.partitionLongTailMinPrint"; - /** Only print stragglers with one as a default */ - public static final int PARTITION_LONG_TAIL_MIN_PRINT_DEFAULT = 1; - - /** Use superstep counters? (boolean) */ - public static final String USE_SUPERSTEP_COUNTERS = - "giraph.useSuperstepCounters"; - /** Default is to use the superstep counters */ - public static final boolean USE_SUPERSTEP_COUNTERS_DEFAULT = true; - - /** - * Set the multiplicative factor of how many partitions to create from - * a single InputSplit based on the number of total InputSplits. For - * example, if there are 10 total InputSplits and this is set to 0.5, then - * you will get 0.5 * 10 = 5 partitions for every InputSplit (given that the - * minimum size is met). - */ - public static final String TOTAL_INPUT_SPLIT_MULTIPLIER = - "giraph.totalInputSplitMultiplier"; - /** Default total input split multiplier */ - public static final float TOTAL_INPUT_SPLIT_MULTIPLIER_DEFAULT = 0.5f; - - /** - * Input split sample percent - Used only for sampling and testing, rather - * than an actual job. The idea is that to test, you might only want a - * fraction of the actual input splits from your VertexInputFormat to - * load (values should be [0, 100]). - */ - public static final String INPUT_SPLIT_SAMPLE_PERCENT = - "giraph.inputSplitSamplePercent"; - /** Default is to use all the input splits */ - public static final float INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT = 100f; - - /** - * To limit outlier input splits from producing too many vertices or to - * help with testing, the number of vertices loaded from an input split can - * be limited. By default, everything is loaded. 
- */ - public static final String INPUT_SPLIT_MAX_VERTICES = - "giraph.InputSplitMaxVertices"; - /** - * Default is that all the vertices are to be loaded from the input - * split - */ - public static final long INPUT_SPLIT_MAX_VERTICES_DEFAULT = -1; - - /** Java opts passed to ZooKeeper startup */ - public static final String ZOOKEEPER_JAVA_OPTS = - "giraph.zkJavaOpts"; - /** Default java opts passed to ZooKeeper startup */ - public static final String ZOOKEEPER_JAVA_OPTS_DEFAULT = - "-Xmx512m -XX:ParallelGCThreads=4 -XX:+UseConcMarkSweepGC " + - "-XX:CMSInitiatingOccupancyFraction=70 -XX:MaxGCPauseMillis=100"; - - /** - * How often to checkpoint (i.e. 0, means no checkpoint, - * 1 means every superstep, 2 is every two supersteps, etc.). - */ - public static final String CHECKPOINT_FREQUENCY = - "giraph.checkpointFrequency"; - - /** Default checkpointing frequency of every 2 supersteps. */ - public static final int CHECKPOINT_FREQUENCY_DEFAULT = 2; - - /** - * Delete checkpoints after a successful job run? - */ - public static final String CLEANUP_CHECKPOINTS_AFTER_SUCCESS = - "giraph.cleanupCheckpointsAfterSuccess"; - /** Default is to clean up the checkponts after a successful job */ - public static final boolean CLEANUP_CHECKPOINTS_AFTER_SUCCESS_DEFAULT = - true; - - /** - * An application can be restarted manually by selecting a superstep. The - * corresponding checkpoint must exist for this to work. The user should - * set a long value. Default is start from scratch. - */ - public static final String RESTART_SUPERSTEP = "giraph.restartSuperstep"; - - /** - * Base ZNode for Giraph's state in the ZooKeeper cluster. 
Must be a root - * znode on the cluster beginning with "/" - */ - public static final String BASE_ZNODE_KEY = "giraph.zkBaseZNode"; - - /** - * If ZOOKEEPER_LIST is not set, then use this directory to manage - * ZooKeeper - */ - public static final String ZOOKEEPER_MANAGER_DIRECTORY = - "giraph.zkManagerDirectory"; - /** - * Default ZooKeeper manager directory (where determining the servers in - * HDFS files will go). Final directory path will also have job number - * for uniqueness. - */ - public static final String ZOOKEEPER_MANAGER_DIR_DEFAULT = - "_bsp/_defaultZkManagerDir"; - - /** This directory has/stores the available checkpoint files in HDFS. */ - public static final String CHECKPOINT_DIRECTORY = - "giraph.checkpointDirectory"; - /** - * Default checkpoint directory (where checkpoing files go in HDFS). Final - * directory path will also have the job number for uniqueness - */ - public static final String CHECKPOINT_DIRECTORY_DEFAULT = - "_bsp/_checkpoints/"; - - /** Keep the zookeeper output for debugging? Default is to remove it. */ - public static final String KEEP_ZOOKEEPER_DATA = - "giraph.keepZooKeeperData"; - /** Default is to remove ZooKeeper data. */ - public static final Boolean KEEP_ZOOKEEPER_DATA_DEFAULT = false; - - /** Default ZooKeeper tick time. */ - public static final int DEFAULT_ZOOKEEPER_TICK_TIME = 6000; - /** Default ZooKeeper init limit (in ticks). */ - public static final int DEFAULT_ZOOKEEPER_INIT_LIMIT = 10; - /** Default ZooKeeper sync limit (in ticks). */ - public static final int DEFAULT_ZOOKEEPER_SYNC_LIMIT = 5; - /** Default ZooKeeper snap count. */ - public static final int DEFAULT_ZOOKEEPER_SNAP_COUNT = 50000; - /** Default ZooKeeper maximum client connections. */ - public static final int DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS = 10000; - /** Default ZooKeeper minimum session timeout of 5 minutes (in msecs). 
*/ - public static final int DEFAULT_ZOOKEEPER_MIN_SESSION_TIMEOUT = 300*1000; - /** Default ZooKeeper maximum session timeout of 10 minutes (in msecs). */ - public static final int DEFAULT_ZOOKEEPER_MAX_SESSION_TIMEOUT = 600*1000; - - /** Class logger */ - private static final Logger LOG = Logger.getLogger(GiraphJob.class); - - /** - * Constructor that will instantiate the configuration - * - * @param jobName User-defined job name - * @throws IOException - */ - public GiraphJob(String jobName) throws IOException { - super(new Configuration(), jobName); - } - - /** - * Constructor. - * - * @param conf User-defined configuration - * @param jobName User-defined job name - * @throws IOException - */ - public GiraphJob(Configuration conf, String jobName) throws IOException { - super(conf, jobName); - } - - /** - * Make sure the configuration is set properly by the user prior to - * submitting the job. - */ - private void checkConfiguration() { - if (conf.getInt(MAX_WORKERS, -1) < 0) { - throw new RuntimeException("No valid " + MAX_WORKERS); - } - if (conf.getFloat(MIN_PERCENT_RESPONDED, - MIN_PERCENT_RESPONDED_DEFAULT) <= 0.0f || - conf.getFloat(MIN_PERCENT_RESPONDED, - MIN_PERCENT_RESPONDED_DEFAULT) > 100.0f) { - throw new IllegalArgumentException( - "Invalid " + - conf.getFloat(MIN_PERCENT_RESPONDED, - MIN_PERCENT_RESPONDED_DEFAULT) + " for " + - MIN_PERCENT_RESPONDED); - } - if (conf.getInt(MIN_WORKERS, -1) < 0) { - throw new IllegalArgumentException("No valid " + MIN_WORKERS); - } - if (BspUtils.getVertexClass(getConfiguration()) == null) { - throw new IllegalArgumentException("GiraphJob: Null VERTEX_CLASS"); - } - if (BspUtils.getVertexInputFormatClass(getConfiguration()) == null) { - throw new IllegalArgumentException( - "GiraphJob: Null VERTEX_INPUT_FORMAT_CLASS"); - } - if (BspUtils.getVertexResolverClass(getConfiguration()) == null) { - setVertexResolverClass(VertexResolver.class); - if (LOG.isInfoEnabled()) { - LOG.info("GiraphJob: No class found for " + - 
VERTEX_RESOLVER_CLASS + ", defaulting to " + - VertexResolver.class.getCanonicalName()); - } - } - } - - /** - * Set the vertex class (required) - * - * @param vertexClass Runs vertex computation - */ - final public void setVertexClass(Class<?> vertexClass) { - getConfiguration().setClass(VERTEX_CLASS, vertexClass, BasicVertex.class); - } - - /** - * Set the vertex input format class (required) - * - * @param vertexInputFormatClass Determines how graph is input - */ - final public void setVertexInputFormatClass( - Class<?> vertexInputFormatClass) { - getConfiguration().setClass(VERTEX_INPUT_FORMAT_CLASS, - vertexInputFormatClass, - VertexInputFormat.class); - } - - /** - * Set the vertex output format class (optional) - * - * @param vertexOutputFormatClass Determines how graph is output - */ - final public void setVertexOutputFormatClass( - Class<?> vertexOutputFormatClass) { - getConfiguration().setClass(VERTEX_OUTPUT_FORMAT_CLASS, - vertexOutputFormatClass, - VertexOutputFormat.class); - } - - /** - * Set the vertex combiner class (optional) - * - * @param vertexCombinerClass Determines how vertex messages are combined - */ - final public void setVertexCombinerClass(Class<?> vertexCombinerClass) { - getConfiguration().setClass(VERTEX_COMBINER_CLASS, - vertexCombinerClass, - VertexCombiner.class); - } - - /** - * Set the graph partitioner class (optional) - * - * @param graphPartitionerFactoryClass Determines how the graph is partitioned - */ - final public void setGraphPartitionerFactoryClass( - Class<?> graphPartitionerFactoryClass) { - getConfiguration().setClass(GRAPH_PARTITIONER_FACTORY_CLASS, - graphPartitionerFactoryClass, - GraphPartitionerFactory.class); - } - - /** - * Set the vertex resolver class (optional) - * - * @param vertexResolverClass Determines how vertex mutations are resolved - */ - final public void setVertexResolverClass(Class<?> vertexResolverClass) { - getConfiguration().setClass(VERTEX_RESOLVER_CLASS, - vertexResolverClass, - 
VertexResolver.class); - } - - /** - * Set the worker context class (optional) - * - * @param workerContextClass Determines what code is executed on a each - * worker before and after each superstep and computation - */ - final public void setWorkerContextClass(Class<?> workerContextClass) { - getConfiguration().setClass(WORKER_CONTEXT_CLASS, - workerContextClass, - WorkerContext.class); - } - - /** - * Set the aggregator writer class (optional) - * - * @param aggregatorWriterClass Determines how the aggregators are - * written to file at the end of the job - */ - final public void setAggregatorWriterClass( - Class<?> aggregatorWriterClass) { - getConfiguration().setClass(AGGREGATOR_WRITER_CLASS, - aggregatorWriterClass, - AggregatorWriter.class); - } - - /** - * Set worker configuration for determining what is required for - * a superstep. - * - * @param minWorkers Minimum workers to do a superstep - * @param maxWorkers Maximum workers to do a superstep - * (max map tasks in job) - * @param minPercentResponded 0 - 100 % of the workers required to - * have responded before continuing the superstep - */ - final public void setWorkerConfiguration(int minWorkers, - int maxWorkers, - float minPercentResponded) { - conf.setInt(MIN_WORKERS, minWorkers); - conf.setInt(MAX_WORKERS, maxWorkers); - conf.setFloat(MIN_PERCENT_RESPONDED, minPercentResponded); - } - - /** - * Utilize an existing ZooKeeper service. If this is not set, ZooKeeper - * will be dynamically started by Giraph for this job. - * - * @param serverList Comma separated list of servers and ports - * (i.e. zk1:2221,zk2:2221) - */ - final public void setZooKeeperConfiguration(String serverList) { - conf.set(ZOOKEEPER_LIST, serverList); - } - - /** - * Check if the configuration is local. If it is local, do additional - * checks due to the restrictions of LocalJobRunner. 
- * - * @param conf Configuration - */ - private static void checkLocalJobRunnerConfiguration( - Configuration conf) { - String jobTracker = conf.get("mapred.job.tracker", null); - if (!jobTracker.equals("local")) { - // Nothing to check - return; - } - - int maxWorkers = conf.getInt(MAX_WORKERS, -1); - if (maxWorkers != 1) { - throw new IllegalArgumentException( - "checkLocalJobRunnerConfiguration: When using " + - "LocalJobRunner, must have only one worker since " + - "only 1 task at a time!"); - } - if (conf.getBoolean(SPLIT_MASTER_WORKER, - SPLIT_MASTER_WORKER_DEFAULT)) { - throw new IllegalArgumentException( - "checkLocalJobRunnerConfiguration: When using " + - "LocalJobRunner, you cannot run in split master / worker " + - "mode since there is only 1 task at a time!"); - } - } - - /** - * Check whether a specified int conf value is set and if not, set it. - * - * @param param Conf value to check - * @param defaultValue Assign to value if not set - */ - private void setIntConfIfDefault(String param, int defaultValue) { - if (conf.getInt(param, Integer.MIN_VALUE) == Integer.MIN_VALUE) { - conf.setInt(param, defaultValue); - } - } - - /** - * Runs the actual graph application through Hadoop Map-Reduce. - * - * @param verbose If true, provide verbose output, false otherwise - * @throws ClassNotFoundException - * @throws InterruptedException - * @throws IOException - */ - final public boolean run(boolean verbose) - throws IOException, InterruptedException, ClassNotFoundException { - checkConfiguration(); - checkLocalJobRunnerConfiguration(conf); - setNumReduceTasks(0); - // Most users won't hit this hopefully and can set it higher if desired - setIntConfIfDefault("mapreduce.job.counters.limit", 512); - - // Capacity scheduler-specific settings. 
These should be enough for - // a reasonable Giraph job - setIntConfIfDefault("mapred.job.map.memory.mb", 1024); - setIntConfIfDefault("mapred.job.reduce.memory.mb", 1024); - - // Speculative execution doesn't make sense for Giraph - conf.setBoolean("mapred.map.tasks.speculative.execution", false); - - // Set the ping interval to 5 minutes instead of one minute - // (DEFAULT_PING_INTERVAL) - Client.setPingInterval(conf, 60000*5); - - if (getJar() == null) { - setJarByClass(GiraphJob.class); - } - // Should work in MAPREDUCE-1938 to let the user jars/classes - // get loaded first - conf.setBoolean("mapreduce.user.classpath.first", true); - - setMapperClass(GraphMapper.class); - setInputFormatClass(BspInputFormat.class); - setOutputFormatClass(BspOutputFormat.class); - return waitForCompletion(verbose); - } + static { + Configuration.addDefaultResource("giraph-site.xml"); + } + + /** Vertex class - required */ + public static final String VERTEX_CLASS = "giraph.vertexClass"; + /** VertexInputFormat class - required */ + public static final String VERTEX_INPUT_FORMAT_CLASS = + "giraph.vertexInputFormatClass"; + + /** VertexOutputFormat class - optional */ + public static final String VERTEX_OUTPUT_FORMAT_CLASS = + "giraph.vertexOutputFormatClass"; + /** Vertex combiner class - optional */ + public static final String VERTEX_COMBINER_CLASS = + "giraph.combinerClass"; + /** Vertex resolver class - optional */ + public static final String VERTEX_RESOLVER_CLASS = + "giraph.vertexResolverClass"; + /** Graph partitioner factory class - optional */ + public static final String GRAPH_PARTITIONER_FACTORY_CLASS = + "giraph.graphPartitionerFactoryClass"; + + /** Vertex index class */ + public static final String VERTEX_INDEX_CLASS = "giraph.vertexIndexClass"; + /** Vertex value class */ + public static final String VERTEX_VALUE_CLASS = "giraph.vertexValueClass"; + /** Edge value class */ + public static final String EDGE_VALUE_CLASS = "giraph.edgeValueClass"; + /** Message value 
class */ + public static final String MESSAGE_VALUE_CLASS = "giraph.messageValueClass"; + /** Worker context class */ + public static final String WORKER_CONTEXT_CLASS = + "giraph.workerContextClass"; + /** AggregatorWriter class - optional */ + public static final String AGGREGATOR_WRITER_CLASS = + "giraph.aggregatorWriterClass"; + + /** + * Minimum number of simultaneous workers before this job can run (int) + */ + public static final String MIN_WORKERS = "giraph.minWorkers"; + /** + * Maximum number of simultaneous worker tasks started by this job (int). + */ + public static final String MAX_WORKERS = "giraph.maxWorkers"; + + /** + * Separate the workers and the master tasks. This is required + * to support dynamic recovery. (boolean) + */ + public static final String SPLIT_MASTER_WORKER = + "giraph.SplitMasterWorker"; + /** + * Default on whether to separate the workers and the master tasks. + * Needs to be "true" to support dynamic recovery. + */ + public static final boolean SPLIT_MASTER_WORKER_DEFAULT = true; + + /** Indicates whether this job is run in an internal unit test */ + public static final String LOCAL_TEST_MODE = + "giraph.localTestMode"; + + /** not in local test mode per default */ + public static final boolean LOCAL_TEST_MODE_DEFAULT = false; + + /** + * Minimum percent of the maximum number of workers that have responded + * in order to continue progressing. 
(float) + */ + public static final String MIN_PERCENT_RESPONDED = + "giraph.minPercentResponded"; + /** Default 100% response rate for workers */ + public static final float MIN_PERCENT_RESPONDED_DEFAULT = 100.0f; + + /** Polling timeout to check on the number of responded tasks (int) */ + public static final String POLL_MSECS = "giraph.pollMsecs"; + /** Default poll msecs (30 seconds) */ + public static final int POLL_MSECS_DEFAULT = 30 * 1000; + + /** + * ZooKeeper comma-separated list (if not set, + * will start up ZooKeeper locally) + */ + public static final String ZOOKEEPER_LIST = "giraph.zkList"; + + /** ZooKeeper session millisecond timeout */ + public static final String ZOOKEEPER_SESSION_TIMEOUT = + "giraph.zkSessionMsecTimeout"; + /** Default Zookeeper session millisecond timeout */ + public static final int ZOOKEEPER_SESSION_TIMEOUT_DEFAULT = 60 * 1000; + + /** Polling interval to check for the final ZooKeeper server data */ + public static final String ZOOKEEPER_SERVERLIST_POLL_MSECS = + "giraph.zkServerlistPollMsecs"; + /** Default polling interval to check for the final ZooKeeper server data */ + public static final int ZOOKEEPER_SERVERLIST_POLL_MSECS_DEFAULT = + 3 * 1000; + + /** Number of nodes (not tasks) to run Zookeeper on */ + public static final String ZOOKEEPER_SERVER_COUNT = + "giraph.zkServerCount"; + /** Default number of nodes to run Zookeeper on */ + public static final int ZOOKEEPER_SERVER_COUNT_DEFAULT = 1; + + /** ZooKeeper port to use */ + public static final String ZOOKEEPER_SERVER_PORT = + "giraph.zkServerPort"; + /** Default ZooKeeper port to use */ + public static final int ZOOKEEPER_SERVER_PORT_DEFAULT = 22181; + + /** Location of the ZooKeeper jar - Used internally, not meant for users */ + public static final String ZOOKEEPER_JAR = "giraph.zkJar"; + + /** Local ZooKeeper directory to use */ + public static final String ZOOKEEPER_DIR = "giraph.zkDir"; + + /** Initial port to start using for the RPC communication */ + public 
static final String RPC_INITIAL_PORT = "giraph.rpcInitialPort"; + /** Default port to start using for the RPC communication */ + public static final int RPC_INITIAL_PORT_DEFAULT = 30000; + + /** Maximum bind attempts for different RPC ports */ + public static final String MAX_RPC_PORT_BIND_ATTEMPTS = + "giraph.maxRpcPortBindAttempts"; + /** Default maximum bind attempts for different RPC ports */ + public static final int MAX_RPC_PORT_BIND_ATTEMPTS_DEFAULT = 20; + + /** Maximum number of RPC handlers */ + public static final String RPC_NUM_HANDLERS = "giraph.rpcNumHandlers"; + /** Default maximum number of RPC handlers */ + public static final int RPC_NUM_HANDLERS_DEFAULT = 100; + + /** + * Maximum number of vertices per partition before sending. + * (input superstep only). + */ + public static final String MAX_VERTICES_PER_PARTITION = + "giraph.maxVerticesPerPartition"; + /** Default maximum number of vertices per partition before sending. */ + public static final int MAX_VERTICES_PER_PARTITION_DEFAULT = 100000; + + /** Maximum number of messages per peer before flush */ + public static final String MSG_SIZE = "giraph.msgSize"; + /** Default maximum number of messages per peer before flush */ + public static final int MSG_SIZE_DEFAULT = 1000; + + /** Maximum number of messages that can be bulk sent during a flush */ + public static final String MAX_MESSAGES_PER_FLUSH_PUT = + "giraph.maxMessagesPerFlushPut"; + /** Default number of messages that can be bulk sent during a flush */ + public static final int DEFAULT_MAX_MESSAGES_PER_FLUSH_PUT = 5000; + + /** Number of flush threads per peer */ + public static final String MSG_NUM_FLUSH_THREADS = + "giraph.msgNumFlushThreads"; + + /** Number of poll attempts prior to failing the job (int) */ + public static final String POLL_ATTEMPTS = "giraph.pollAttempts"; + /** Default poll attempts */ + public static final int POLL_ATTEMPTS_DEFAULT = 10; + + /** Number of minimum vertices in each vertex range */ + public static 
final String MIN_VERTICES_PER_RANGE = + "giraph.minVerticesPerRange"; + /** Default number of minimum vertices in each vertex range */ + public static final long MIN_VERTICES_PER_RANGE_DEFAULT = 3; + + /** Minimum stragglers of the superstep before printing them out */ + public static final String PARTITION_LONG_TAIL_MIN_PRINT = + "giraph.partitionLongTailMinPrint"; + /** Only print stragglers with one as a default */ + public static final int PARTITION_LONG_TAIL_MIN_PRINT_DEFAULT = 1; + + /** Use superstep counters? (boolean) */ + public static final String USE_SUPERSTEP_COUNTERS = + "giraph.useSuperstepCounters"; + /** Default is to use the superstep counters */ + public static final boolean USE_SUPERSTEP_COUNTERS_DEFAULT = true; + + /** + * Set the multiplicative factor of how many partitions to create from + * a single InputSplit based on the number of total InputSplits. For + * example, if there are 10 total InputSplits and this is set to 0.5, then + * you will get 0.5 * 10 = 5 partitions for every InputSplit (given that the + * minimum size is met). + */ + public static final String TOTAL_INPUT_SPLIT_MULTIPLIER = + "giraph.totalInputSplitMultiplier"; + /** Default total input split multiplier */ + public static final float TOTAL_INPUT_SPLIT_MULTIPLIER_DEFAULT = 0.5f; + + /** + * Input split sample percent - Used only for sampling and testing, rather + * than an actual job. The idea is that to test, you might only want a + * fraction of the actual input splits from your VertexInputFormat to + * load (values should be [0, 100]). + */ + public static final String INPUT_SPLIT_SAMPLE_PERCENT = + "giraph.inputSplitSamplePercent"; + /** Default is to use all the input splits */ + public static final float INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT = 100f; + + /** + * To limit outlier input splits from producing too many vertices or to + * help with testing, the number of vertices loaded from an input split can + * be limited. By default, everything is loaded. 
+   */
+  public static final String INPUT_SPLIT_MAX_VERTICES =
+    "giraph.InputSplitMaxVertices";
+  /**
+   * Default is that all the vertices are to be loaded from the input
+   * split
+   */
+  public static final long INPUT_SPLIT_MAX_VERTICES_DEFAULT = -1;
+
+  /** Java opts passed to ZooKeeper startup */
+  public static final String ZOOKEEPER_JAVA_OPTS =
+    "giraph.zkJavaOpts";
+  /** Default java opts passed to ZooKeeper startup */
+  public static final String ZOOKEEPER_JAVA_OPTS_DEFAULT =
+    "-Xmx512m -XX:ParallelGCThreads=4 -XX:+UseConcMarkSweepGC " +
+    "-XX:CMSInitiatingOccupancyFraction=70 -XX:MaxGCPauseMillis=100";
+
+  /**
+   * How often to checkpoint (e.g. 0 means no checkpoint,
+   * 1 means every superstep, 2 is every two supersteps, etc.).
+   */
+  public static final String CHECKPOINT_FREQUENCY =
+    "giraph.checkpointFrequency";
+
+  /** Default checkpointing frequency of every 2 supersteps. */
+  public static final int CHECKPOINT_FREQUENCY_DEFAULT = 2;
+
+  /**
+   * Delete checkpoints after a successful job run?
+   */
+  public static final String CLEANUP_CHECKPOINTS_AFTER_SUCCESS =
+    "giraph.cleanupCheckpointsAfterSuccess";
+  /** Default is to clean up the checkpoints after a successful job */
+  public static final boolean CLEANUP_CHECKPOINTS_AFTER_SUCCESS_DEFAULT =
+    true;
+
+  /**
+   * An application can be restarted manually by selecting a superstep.  The
+   * corresponding checkpoint must exist for this to work.  The user should
+   * set a long value.  Default is start from scratch.
+   */
+  public static final String RESTART_SUPERSTEP = "giraph.restartSuperstep";
+
+  /**
+   * Base ZNode for Giraph's state in the ZooKeeper cluster.  Must be a root
+   * znode on the cluster beginning with "/"
+   */
+  public static final String BASE_ZNODE_KEY = "giraph.zkBaseZNode";
+
+  /**
+   * If ZOOKEEPER_LIST is not set, then use this directory to manage
+   * ZooKeeper
+   */
+  public static final String ZOOKEEPER_MANAGER_DIRECTORY =
+    "giraph.zkManagerDirectory";
+  /**
+   * Default ZooKeeper manager directory (where determining the servers in
+   * HDFS files will go).  Final directory path will also have job number
+   * for uniqueness.
+   */
+  public static final String ZOOKEEPER_MANAGER_DIR_DEFAULT =
+    "_bsp/_defaultZkManagerDir";
+
+  /** This directory has/stores the available checkpoint files in HDFS. */
+  public static final String CHECKPOINT_DIRECTORY =
+    "giraph.checkpointDirectory";
+  /**
+   * Default checkpoint directory (where checkpoint files go in HDFS).  Final
+   * directory path will also have the job number for uniqueness
+   */
+  public static final String CHECKPOINT_DIRECTORY_DEFAULT =
+    "_bsp/_checkpoints/";
+
+  /** Keep the zookeeper output for debugging? Default is to remove it. */
+  public static final String KEEP_ZOOKEEPER_DATA =
+    "giraph.keepZooKeeperData";
+  /** Default is to remove ZooKeeper data. */
+  public static final Boolean KEEP_ZOOKEEPER_DATA_DEFAULT = false;
+
+  /** Default ZooKeeper tick time. */
+  public static final int DEFAULT_ZOOKEEPER_TICK_TIME = 6000;
+  /** Default ZooKeeper init limit (in ticks). */
+  public static final int DEFAULT_ZOOKEEPER_INIT_LIMIT = 10;
+  /** Default ZooKeeper sync limit (in ticks). */
+  public static final int DEFAULT_ZOOKEEPER_SYNC_LIMIT = 5;
+  /** Default ZooKeeper snap count. */
+  public static final int DEFAULT_ZOOKEEPER_SNAP_COUNT = 50000;
+  /** Default ZooKeeper maximum client connections. */
+  public static final int DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS = 10000;
+  /** Default ZooKeeper minimum session timeout of 5 minutes (in msecs). */
+  public static final int DEFAULT_ZOOKEEPER_MIN_SESSION_TIMEOUT = 300 * 1000;
+  /** Default ZooKeeper maximum session timeout of 10 minutes (in msecs). */
+  public static final int DEFAULT_ZOOKEEPER_MAX_SESSION_TIMEOUT = 600 * 1000;
+
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(GiraphJob.class);
+
+  /**
+   * Constructor that will instantiate the configuration
+   *
+   * @param jobName User-defined job name
+   * @throws IOException Propagated from the superclass constructor
+   */
+  public GiraphJob(String jobName) throws IOException {
+    super(new Configuration(), jobName);
+  }
+
+  /**
+   * Constructor.
+   *
+   * @param conf User-defined configuration
+   * @param jobName User-defined job name
+   * @throws IOException Propagated from the superclass constructor
+   */
+  public GiraphJob(Configuration conf, String jobName) throws IOException {
+    super(conf, jobName);
+  }
+
+  /**
+   * Make sure the configuration is set properly by the user prior to
+   * submitting the job.  Falls back to {@link VertexResolver} when no
+   * vertex resolver class has been configured.
+   *
+   * @throws IllegalArgumentException If MAX_WORKERS or MIN_WORKERS is
+   *         missing or negative, if MIN_PERCENT_RESPONDED is outside
+   *         (0, 100], or if the vertex class or the vertex input format
+   *         class is not set
+   */
+  private void checkConfiguration() {
+    if (conf.getInt(MAX_WORKERS, -1) < 0) {
+      // Same exception type as the sibling checks below
+      throw new IllegalArgumentException("No valid " + MAX_WORKERS);
+    }
+    float minPercentResponded =
+      conf.getFloat(MIN_PERCENT_RESPONDED, MIN_PERCENT_RESPONDED_DEFAULT);
+    // Written as a negated "in range" test so that NaN is rejected as well
+    if (!(minPercentResponded > 0.0f && minPercentResponded <= 100.0f)) {
+      throw new IllegalArgumentException(
+        "Invalid " + minPercentResponded + " for " +
+        MIN_PERCENT_RESPONDED);
+    }
+    if (conf.getInt(MIN_WORKERS, -1) < 0) {
+      throw new IllegalArgumentException("No valid " + MIN_WORKERS);
+    }
+    if (BspUtils.getVertexClass(getConfiguration()) == null) {
+      throw new IllegalArgumentException("GiraphJob: Null VERTEX_CLASS");
+    }
+    if (BspUtils.getVertexInputFormatClass(getConfiguration()) == null) {
+      throw new IllegalArgumentException(
+        "GiraphJob: Null VERTEX_INPUT_FORMAT_CLASS");
+    }
+    if (BspUtils.getVertexResolverClass(getConfiguration()) == null) {
+      setVertexResolverClass(VertexResolver.class);
+      if (LOG.isInfoEnabled()) {
+        LOG.info("GiraphJob: No class found for " +
+          VERTEX_RESOLVER_CLASS + ", defaulting to " +
+          VertexResolver.class.getCanonicalName());
+      }
+    }
+  }
+
+  /**
+   * Set the vertex class (required)
+   *
+   * @param vertexClass Runs vertex computation
+   */
+  public final void setVertexClass(Class<?> vertexClass) {
+    getConfiguration().setClass(VERTEX_CLASS, vertexClass, BasicVertex.class);
+  }
+
+  /**
+   * Set the vertex input format class (required)
+   *
+   * @param vertexInputFormatClass Determines how graph is input
+   */
+  public final void setVertexInputFormatClass(
+    Class<?> vertexInputFormatClass) {
+    getConfiguration().setClass(VERTEX_INPUT_FORMAT_CLASS,
+      vertexInputFormatClass,
+      VertexInputFormat.class);
+  }
+
+  /**
+   * Set the vertex output format class (optional)
+   *
+   * @param vertexOutputFormatClass Determines how graph is output
+   */
+  public final void setVertexOutputFormatClass(
+    Class<?> vertexOutputFormatClass) {
+    getConfiguration().setClass(VERTEX_OUTPUT_FORMAT_CLASS,
+      vertexOutputFormatClass,
+      VertexOutputFormat.class);
+  }
+
+  /**
+   * Set the vertex combiner class (optional)
+   *
+   * @param vertexCombinerClass Determines how vertex messages are combined
+   */
+  public final void setVertexCombinerClass(Class<?> vertexCombinerClass) {
+    getConfiguration().setClass(VERTEX_COMBINER_CLASS,
+      vertexCombinerClass,
+      VertexCombiner.class);
+  }
+
+  /**
+   * Set the graph partitioner class (optional)
+   *
+   * @param graphPartitionerFactoryClass Determines how the graph is partitioned
+   */
+  public final void setGraphPartitionerFactoryClass(
+    Class<?> graphPartitionerFactoryClass) {
+    getConfiguration().setClass(GRAPH_PARTITIONER_FACTORY_CLASS,
+      graphPartitionerFactoryClass,
+      GraphPartitionerFactory.class);
+  }
+
+  /**
+   * Set the vertex resolver class (optional)
+   *
+   * @param vertexResolverClass Determines how vertex mutations are resolved
+   */
+  public final void setVertexResolverClass(Class<?> vertexResolverClass) {
+    getConfiguration().setClass(VERTEX_RESOLVER_CLASS,
+      vertexResolverClass,
+      VertexResolver.class);
+  }
+
+  /**
+   * Set the worker context class (optional)
+   *
+   * @param workerContextClass Determines what code is executed on each
+   *        worker before and after each superstep and computation
+   */
+  public final void setWorkerContextClass(Class<?> workerContextClass) {
+    getConfiguration().setClass(WORKER_CONTEXT_CLASS,
+      workerContextClass,
+      WorkerContext.class);
+  }
+
+  /**
+   * Set the aggregator writer class (optional)
+   *
+   * @param aggregatorWriterClass Determines how the aggregators are
+   *        written to file at the end of the job
+   */
+  public final void setAggregatorWriterClass(
+    Class<?> aggregatorWriterClass) {
+    getConfiguration().setClass(AGGREGATOR_WRITER_CLASS,
+      aggregatorWriterClass,
+      AggregatorWriter.class);
+  }
+
+  /**
+   * Set worker configuration for determining what is required for
+   * a superstep.  The values are stored as given; they are only
+   * validated when the job is run.
+   *
+   * @param minWorkers Minimum workers to do a superstep
+   * @param maxWorkers Maximum workers to do a superstep
+   *        (max map tasks in job)
+   * @param minPercentResponded 0 - 100 % of the workers required to
+   *        have responded before continuing the superstep
+   */
+  public final void setWorkerConfiguration(int minWorkers,
+                                           int maxWorkers,
+                                           float minPercentResponded) {
+    conf.setInt(MIN_WORKERS, minWorkers);
+    conf.setInt(MAX_WORKERS, maxWorkers);
+    conf.setFloat(MIN_PERCENT_RESPONDED, minPercentResponded);
+  }
+
+  /**
+   * Utilize an existing ZooKeeper service.  If this is not set, ZooKeeper
+   * will be dynamically started by Giraph for this job.
+   *
+   * @param serverList Comma separated list of servers and ports
+   *        (e.g. zk1:2221,zk2:2221)
+   */
+  public final void setZooKeeperConfiguration(String serverList) {
+    conf.set(ZOOKEEPER_LIST, serverList);
+  }
+
+  /**
+   * Check if the configuration is local.  If it is local, do additional
+   * checks due to the restrictions of LocalJobRunner.
+   *
+   * @param conf Configuration
+   * @throws IllegalArgumentException If the job tracker is "local" and
+   *         MAX_WORKERS is not 1 or split master / worker mode is enabled
+   */
+  private static void checkLocalJobRunnerConfiguration(
+    Configuration conf) {
+    String jobTracker = conf.get("mapred.job.tracker", null);
+    // Constant-first comparison: an unset property yields null here, which
+    // must not blow up with a NullPointerException.  An unset tracker is
+    // treated as "not local", so the local-only restrictions are skipped.
+    if (!"local".equals(jobTracker)) {
+      // Nothing to check
+      return;
+    }
+
+    int maxWorkers = conf.getInt(MAX_WORKERS, -1);
+    if (maxWorkers != 1) {
+      throw new IllegalArgumentException(
+        "checkLocalJobRunnerConfiguration: When using " +
+        "LocalJobRunner, must have only one worker since " +
+        "only 1 task at a time!");
+    }
+    if (conf.getBoolean(SPLIT_MASTER_WORKER,
+      SPLIT_MASTER_WORKER_DEFAULT)) {
+      throw new IllegalArgumentException(
+        "checkLocalJobRunnerConfiguration: When using " +
+        "LocalJobRunner, you cannot run in split master / worker " +
+        "mode since there is only 1 task at a time!");
+    }
+  }
+
+  /**
+   * Check whether a specified int conf value is set and if not, set it.
+   * Integer.MIN_VALUE serves as the "not set" sentinel, so a parameter
+   * explicitly set to that value is overwritten with the default too.
+   *
+   * @param param Conf value to check
+   * @param defaultValue Assign to value if not set
+   */
+  private void setIntConfIfDefault(String param, int defaultValue) {
+    if (conf.getInt(param, Integer.MIN_VALUE) == Integer.MIN_VALUE) {
+      conf.setInt(param, defaultValue);
+    }
+  }
+
+  /**
+   * Runs the actual graph application through Hadoop Map-Reduce.  The
+   * configuration is validated first; the job is then set up with zero
+   * reduce tasks, no speculative map execution, and the Giraph mapper,
+   * input format and output format before it is submitted.
+   *
+   * @param verbose If true, provide verbose output, false otherwise
+   * @return True if success, false otherwise
+   * @throws ClassNotFoundException Propagated from job submission
+   * @throws InterruptedException Propagated from job submission
+   * @throws IOException Propagated from job submission
+   */
+  public final boolean run(boolean verbose)
+    throws IOException, InterruptedException, ClassNotFoundException {
+    checkConfiguration();
+    checkLocalJobRunnerConfiguration(conf);
+    setNumReduceTasks(0);
+    // Most users won't hit this hopefully and can set it higher if desired
+    setIntConfIfDefault("mapreduce.job.counters.limit", 512);
+
+    // Capacity scheduler-specific settings.  These should be enough for
+    // a reasonable Giraph job
+    setIntConfIfDefault("mapred.job.map.memory.mb", 1024);
+    setIntConfIfDefault("mapred.job.reduce.memory.mb", 1024);
+
+    // Speculative execution doesn't make sense for Giraph
+    conf.setBoolean("mapred.map.tasks.speculative.execution", false);
+
+    // Set the ping interval to 5 minutes instead of one minute
+    // (DEFAULT_PING_INTERVAL)
+    Client.setPingInterval(conf, 60000 * 5);
+
+    if (getJar() == null) {
+      setJarByClass(GiraphJob.class);
+    }
+    // Should work in MAPREDUCE-1938 to let the user jars/classes
+    // get loaded first
+    conf.setBoolean("mapreduce.user.classpath.first", true);
+
+    setMapperClass(GraphMapper.class);
+    setInputFormatClass(BspInputFormat.class);
+    setOutputFormatClass(BspOutputFormat.class);
+    return waitForCompletion(verbose);
+  }
 }
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GlobalStats.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GlobalStats.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GlobalStats.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GlobalStats.java Thu Feb 16 22:12:31 2012
@@ -29,57 +29,94 @@ import org.apache.hadoop.io.Writable;
  * Aggregated stats by the master.
  */
 public class GlobalStats implements Writable {
-    private long vertexCount = 0;
-    private long finishedVertexCount = 0;
-    private long edgeCount = 0;
-    private long messageCount = 0;
-
-    public void addPartitionStats(PartitionStats partitionStats) {
-        this.vertexCount += partitionStats.getVertexCount();
-        this.finishedVertexCount += partitionStats.getFinishedVertexCount();
-        this.edgeCount += partitionStats.getEdgeCount();
-    }
-
-    public long getVertexCount() {
-        return vertexCount;
-    }
-
-    public long getFinishedVertexCount() {
-        return finishedVertexCount;
-    }
-
-    public long getEdgeCount() {
-        return edgeCount;
-    }
-
-    public long getMessageCount() {
-        return messageCount;
-    }
-
-    public void addMessageCount(long messageCount) {
-        this.messageCount += messageCount;
-    }
-
-    @Override
-    public void readFields(DataInput input) throws IOException {
-        vertexCount = input.readLong();
-        finishedVertexCount = input.readLong();
-        edgeCount = input.readLong();
-        messageCount = input.readLong();
-    }
-
-    @Override
-    public void write(DataOutput output) throws IOException {
-        output.writeLong(vertexCount);
-        output.writeLong(finishedVertexCount);
-        output.writeLong(edgeCount);
-        output.writeLong(messageCount);
-    }
-
-    @Override
-    public String toString() {
-        return "(vtx=" + vertexCount + ",finVtx=" +
-            finishedVertexCount + ",edges=" + edgeCount + ",msgCount=" +
-            messageCount + ")";
-    }
+  /** All vertices in the application */
+  private long vertexCount = 0;
+  /** All finished vertices in the last superstep */
+  private long finishedVertexCount = 0;
+  /** All edges in the last superstep */
+  private long edgeCount = 0;
+  /** All messages sent in the last superstep */
+  private long messageCount = 0;
+
+  /**
+   * Accumulate one partition's vertex, finished vertex and edge counts.
+   * The message count is left untouched by this method.
+   *
+   * @param partitionStats Partition stats to be added.
+   */
+  public void addPartitionStats(PartitionStats partitionStats) {
+    vertexCount += partitionStats.getVertexCount();
+    finishedVertexCount += partitionStats.getFinishedVertexCount();
+    edgeCount += partitionStats.getEdgeCount();
+  }
+
+  /** @return Accumulated vertex count */
+  public long getVertexCount() {
+    return this.vertexCount;
+  }
+
+  /** @return Accumulated finished vertex count */
+  public long getFinishedVertexCount() {
+    return this.finishedVertexCount;
+  }
+
+  /** @return Accumulated edge count */
+  public long getEdgeCount() {
+    return this.edgeCount;
+  }
+
+  /** @return Accumulated message count */
+  public long getMessageCount() {
+    return this.messageCount;
+  }
+
+  /**
+   * Add messages to the global stats.
+   *
+   * @param messageCount Number of messages to be added.
+   */
+  public void addMessageCount(long messageCount) {
+    this.messageCount += messageCount;
+  }
+
+  /**
+   * Read the four counters back as longs, in the same order that
+   * {@link #write(DataOutput)} emits them.
+   *
+   * @param input Source of the serialized counters
+   * @throws IOException Propagated from the underlying input
+   */
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    this.vertexCount = input.readLong();
+    this.finishedVertexCount = input.readLong();
+    this.edgeCount = input.readLong();
+    this.messageCount = input.readLong();
+  }
+
+  /**
+   * Serialize the four counters as longs: vertex, finished vertex, edge
+   * and message count, in that order.
+   *
+   * @param output Sink for the serialized counters
+   * @throws IOException Propagated from the underlying output
+   */
+  @Override
+  public void write(DataOutput output) throws IOException {
+    output.writeLong(this.vertexCount);
+    output.writeLong(this.finishedVertexCount);
+    output.writeLong(this.edgeCount);
+    output.writeLong(this.messageCount);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder description = new StringBuilder("(vtx=");
+    description.append(vertexCount)
+      .append(",finVtx=").append(finishedVertexCount)
+      .append(",edges=").append(edgeCount)
+      .append(",msgCount=").append(messageCount)
+      .append(")");
+    return description.toString();
+  }
 }
