Modified: 
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Thu Feb 16 22:12:31 2012
@@ -33,570 +33,571 @@ import java.io.IOException;
  * for our needs.  For instance, our job should not have any reduce tasks.
  */
 public class GiraphJob extends Job {
-    static {
-        Configuration.addDefaultResource("giraph-site.xml");
-    }
-
-    /** Vertex class - required */
-    public static final String VERTEX_CLASS = "giraph.vertexClass";
-    /** VertexInputFormat class - required */
-    public static final String VERTEX_INPUT_FORMAT_CLASS =
-        "giraph.vertexInputFormatClass";
-
-    /** VertexOutputFormat class - optional */
-    public static final String VERTEX_OUTPUT_FORMAT_CLASS =
-        "giraph.vertexOutputFormatClass";
-    /** Vertex combiner class - optional */
-    public static final String VERTEX_COMBINER_CLASS =
-        "giraph.combinerClass";
-    /** Vertex resolver class - optional */
-    public static final String VERTEX_RESOLVER_CLASS =
-        "giraph.vertexResolverClass";
-    /** Graph partitioner factory class - optional */
-    public static final String GRAPH_PARTITIONER_FACTORY_CLASS =
-        "giraph.graphPartitionerFactoryClass";
-
-    /** Vertex index class */
-    public static final String VERTEX_INDEX_CLASS = "giraph.vertexIndexClass";
-    /** Vertex value class */
-    public static final String VERTEX_VALUE_CLASS = "giraph.vertexValueClass";
-    /** Edge value class */
-    public static final String EDGE_VALUE_CLASS = "giraph.edgeValueClass";
-    /** Message value class */
-    public static final String MESSAGE_VALUE_CLASS = "giraph.messageValueClass";
-    /** Worker context class */
-    public static final String WORKER_CONTEXT_CLASS =
-       "giraph.workerContextClass";
-    /** AggregatorWriter class - optional */
-    public static final String AGGREGATOR_WRITER_CLASS =
-       "giraph.aggregatorWriterClass";
-
-    /**
-     * Minimum number of simultaneous workers before this job can run (int)
-     */
-    public static final String MIN_WORKERS = "giraph.minWorkers";
-    /**
-     * Maximum number of simultaneous worker tasks started by this job (int).
-     */
-    public static final String MAX_WORKERS = "giraph.maxWorkers";
-
-    /**
-     * Separate the workers and the master tasks.  This is required
-     * to support dynamic recovery. (boolean)
-     */
-    public static final String SPLIT_MASTER_WORKER =
-        "giraph.SplitMasterWorker";
-    /**
-     * Default on whether to separate the workers and the master tasks.
-     * Needs to be "true" to support dynamic recovery.
-     */
-    public static final boolean SPLIT_MASTER_WORKER_DEFAULT = true;
-
-    /** Indicates whether this job is run in an internal unit test */
-    public static final String LOCAL_TEST_MODE =
-        "giraph.localTestMode";
-
-    /** not in local test mode per default */
-    public static final boolean LOCAL_TEST_MODE_DEFAULT = false;
-
-    /**
-     * Minimum percent of the maximum number of workers that have responded
-     * in order to continue progressing. (float)
-     */
-    public static final String MIN_PERCENT_RESPONDED =
-        "giraph.minPercentResponded";
-    /** Default 100% response rate for workers */
-    public static final float MIN_PERCENT_RESPONDED_DEFAULT = 100.0f;
-
-    /** Polling timeout to check on the number of responded tasks (int) */
-    public static final String POLL_MSECS = "giraph.pollMsecs";
-    /** Default poll msecs (30 seconds) */
-    public static final int POLL_MSECS_DEFAULT = 30*1000;
-
-    /**
-     *  ZooKeeper comma-separated list (if not set,
-     *  will start up ZooKeeper locally)
-     */
-    public static final String ZOOKEEPER_LIST = "giraph.zkList";
-
-    /** ZooKeeper session millisecond timeout */
-    public static final String ZOOKEEPER_SESSION_TIMEOUT =
-        "giraph.zkSessionMsecTimeout";
-    /** Default Zookeeper session millisecond timeout */
-    public static final int ZOOKEEPER_SESSION_TIMEOUT_DEFAULT = 60*1000;
-
-    /** Polling interval to check for the final ZooKeeper server data */
-    public static final String ZOOKEEPER_SERVERLIST_POLL_MSECS =
-        "giraph.zkServerlistPollMsecs";
-    /** Default polling interval to check for the final ZooKeeper server data */
-    public static final int ZOOKEEPER_SERVERLIST_POLL_MSECS_DEFAULT =
-        3*1000;
-
-    /** Number of nodes (not tasks) to run Zookeeper on */
-    public static final String ZOOKEEPER_SERVER_COUNT =
-        "giraph.zkServerCount";
-    /** Default number of nodes to run Zookeeper on */
-    public static final int ZOOKEEPER_SERVER_COUNT_DEFAULT = 1;
-
-    /** ZooKeeper port to use */
-    public static final String ZOOKEEPER_SERVER_PORT =
-        "giraph.zkServerPort";
-    /** Default ZooKeeper port to use */
-    public static final int ZOOKEEPER_SERVER_PORT_DEFAULT = 22181;
-
-    /** Location of the ZooKeeper jar - Used internally, not meant for users */
-    public static final String ZOOKEEPER_JAR = "giraph.zkJar";
-
-    /** Local ZooKeeper directory to use */
-    public static final String ZOOKEEPER_DIR = "giraph.zkDir";
-
-    /** Initial port to start using for the RPC communication */
-    public static final String RPC_INITIAL_PORT = "giraph.rpcInitialPort";
-    /** Default port to start using for the RPC communication */
-    public static final int RPC_INITIAL_PORT_DEFAULT = 30000;
-
-    /** Maximum bind attempts for different RPC ports */
-    public static final String MAX_RPC_PORT_BIND_ATTEMPTS =
-        "giraph.maxRpcPortBindAttempts";
-    /** Default maximum bind attempts for different RPC ports */
-    public static final int MAX_RPC_PORT_BIND_ATTEMPTS_DEFAULT = 20;
-
-    /** Maximum number of RPC handlers */
-    public static final String RPC_NUM_HANDLERS = "giraph.rpcNumHandlers";
-    /** Default maximum number of RPC handlers */
-    public static final int RPC_NUM_HANDLERS_DEFAULT = 100;
-
-    /**
-     *  Maximum number of vertices per partition before sending.
-     *  (input superstep only).
-     */
-    public static final String MAX_VERTICES_PER_PARTITION =
-        "giraph.maxVerticesPerPartition";
-    /** Default maximum number of vertices per partition before sending. */
-    public static final int MAX_VERTICES_PER_PARTITION_DEFAULT = 100000;
-
-    /** Maximum number of messages per peer before flush */
-    public static final String MSG_SIZE = "giraph.msgSize";
-    /** Default maximum number of messages per peer before flush */
-    public static final int MSG_SIZE_DEFAULT = 1000;
-
-    /** Maximum number of messages that can be bulk sent during a flush */
-    public static final String MAX_MESSAGES_PER_FLUSH_PUT =
-        "giraph.maxMessagesPerFlushPut";
-    /** Default number of messages that can be bulk sent during a flush */
-    public static final int DEFAULT_MAX_MESSAGES_PER_FLUSH_PUT = 5000;
-
-    /** Number of flush threads per peer */
-    public static final String MSG_NUM_FLUSH_THREADS =
-        "giraph.msgNumFlushThreads";
-
-    /** Number of poll attempts prior to failing the job (int) */
-    public static final String POLL_ATTEMPTS = "giraph.pollAttempts";
-    /** Default poll attempts */
-    public static final int POLL_ATTEMPTS_DEFAULT = 10;
-
-    /** Number of minimum vertices in each vertex range */
-    public static final String MIN_VERTICES_PER_RANGE =
-        "giraph.minVerticesPerRange";
-    /** Default number of minimum vertices in each vertex range */
-    public static final long MIN_VERTICES_PER_RANGE_DEFAULT = 3;
-
-    /** Minimum stragglers of the superstep before printing them out */
-    public static final String PARTITION_LONG_TAIL_MIN_PRINT =
-        "giraph.partitionLongTailMinPrint";
-    /** Only print stragglers with one as a default */
-    public static final int PARTITION_LONG_TAIL_MIN_PRINT_DEFAULT = 1;
-
-    /** Use superstep counters? (boolean) */
-    public static final String USE_SUPERSTEP_COUNTERS =
-        "giraph.useSuperstepCounters";
-    /** Default is to use the superstep counters */
-    public static final boolean USE_SUPERSTEP_COUNTERS_DEFAULT = true;
-
-    /**
-     * Set the multiplicative factor of how many partitions to create from
-     * a single InputSplit based on the number of total InputSplits.  For
-     * example, if there are 10 total InputSplits and this is set to 0.5, then
-     * you will get 0.5 * 10 = 5 partitions for every InputSplit (given that the
-     * minimum size is met).
-     */
-    public static final String TOTAL_INPUT_SPLIT_MULTIPLIER =
-        "giraph.totalInputSplitMultiplier";
-    /** Default total input split multiplier */
-    public static final float TOTAL_INPUT_SPLIT_MULTIPLIER_DEFAULT = 0.5f;
-
-    /**
-     * Input split sample percent - Used only for sampling and testing, rather
-     * than an actual job.  The idea is that to test, you might only want a
-     * fraction of the actual input splits from your VertexInputFormat to
-     * load (values should be [0, 100]).
-     */
-    public static final String INPUT_SPLIT_SAMPLE_PERCENT =
-        "giraph.inputSplitSamplePercent";
-    /** Default is to use all the input splits */
-    public static final float INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT = 100f;
-
-    /**
-     * To limit outlier input splits from producing too many vertices or to
-     * help with testing, the number of vertices loaded from an input split can
-     * be limited.  By default, everything is loaded.
-     */
-    public static final String INPUT_SPLIT_MAX_VERTICES =
-        "giraph.InputSplitMaxVertices";
-    /**
-     * Default is that all the vertices are to be loaded from the input
-     * split
-     */
-    public static final long INPUT_SPLIT_MAX_VERTICES_DEFAULT = -1;
-
-    /** Java opts passed to ZooKeeper startup */
-    public static final String ZOOKEEPER_JAVA_OPTS =
-        "giraph.zkJavaOpts";
-    /** Default java opts passed to ZooKeeper startup */
-    public static final String ZOOKEEPER_JAVA_OPTS_DEFAULT =
-        "-Xmx512m -XX:ParallelGCThreads=4 -XX:+UseConcMarkSweepGC " +
-        "-XX:CMSInitiatingOccupancyFraction=70 -XX:MaxGCPauseMillis=100";
-
-    /**
-     *  How often to checkpoint (i.e. 0, means no checkpoint,
-     *  1 means every superstep, 2 is every two supersteps, etc.).
-     */
-    public static final String CHECKPOINT_FREQUENCY =
-        "giraph.checkpointFrequency";
-
-    /** Default checkpointing frequency of every 2 supersteps. */
-    public static final int CHECKPOINT_FREQUENCY_DEFAULT = 2;
-
-    /**
-     * Delete checkpoints after a successful job run?
-     */
-    public static final String CLEANUP_CHECKPOINTS_AFTER_SUCCESS =
-        "giraph.cleanupCheckpointsAfterSuccess";
-    /** Default is to clean up the checkponts after a successful job */
-    public static final boolean CLEANUP_CHECKPOINTS_AFTER_SUCCESS_DEFAULT =
-        true;
-
-    /**
-     * An application can be restarted manually by selecting a superstep.  The
-     * corresponding checkpoint must exist for this to work.  The user should
-     * set a long value.  Default is start from scratch.
-     */
-    public static final String RESTART_SUPERSTEP = "giraph.restartSuperstep";
-
-    /**
-     * Base ZNode for Giraph's state in the ZooKeeper cluster.  Must be a root
-     * znode on the cluster beginning with "/"
-     */
-    public static final String BASE_ZNODE_KEY = "giraph.zkBaseZNode";
-
-    /**
-     * If ZOOKEEPER_LIST is not set, then use this directory to manage
-     * ZooKeeper
-     */
-    public static final String ZOOKEEPER_MANAGER_DIRECTORY =
-        "giraph.zkManagerDirectory";
-    /**
-     * Default ZooKeeper manager directory (where determining the servers in
-     * HDFS files will go).  Final directory path will also have job number
-     * for uniqueness.
-     */
-    public static final String ZOOKEEPER_MANAGER_DIR_DEFAULT =
-        "_bsp/_defaultZkManagerDir";
-
-    /** This directory has/stores the available checkpoint files in HDFS. */
-    public static final String CHECKPOINT_DIRECTORY =
-        "giraph.checkpointDirectory";
-    /**
-     * Default checkpoint directory (where checkpoing files go in HDFS).  Final
-     * directory path will also have the job number for uniqueness
-     */
-    public static final String CHECKPOINT_DIRECTORY_DEFAULT =
-        "_bsp/_checkpoints/";
-
-    /** Keep the zookeeper output for debugging? Default is to remove it. */
-    public static final String KEEP_ZOOKEEPER_DATA =
-        "giraph.keepZooKeeperData";
-    /** Default is to remove ZooKeeper data. */
-    public static final Boolean KEEP_ZOOKEEPER_DATA_DEFAULT = false;
-
-    /** Default ZooKeeper tick time. */
-    public static final int DEFAULT_ZOOKEEPER_TICK_TIME = 6000;
-    /** Default ZooKeeper init limit (in ticks). */
-    public static final int DEFAULT_ZOOKEEPER_INIT_LIMIT = 10;
-    /** Default ZooKeeper sync limit (in ticks). */
-    public static final int DEFAULT_ZOOKEEPER_SYNC_LIMIT = 5;
-    /** Default ZooKeeper snap count. */
-    public static final int DEFAULT_ZOOKEEPER_SNAP_COUNT = 50000;
-    /** Default ZooKeeper maximum client connections. */
-    public static final int DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS = 10000;
-    /** Default ZooKeeper minimum session timeout of 5 minutes (in msecs). */
-    public static final int DEFAULT_ZOOKEEPER_MIN_SESSION_TIMEOUT = 300*1000;
-    /** Default ZooKeeper maximum session timeout of 10 minutes (in msecs). */
-    public static final int DEFAULT_ZOOKEEPER_MAX_SESSION_TIMEOUT = 600*1000;
-
-    /** Class logger */
-    private static final Logger LOG = Logger.getLogger(GiraphJob.class);
-
-    /**
-     * Constructor that will instantiate the configuration
-     *
-     * @param jobName User-defined job name
-     * @throws IOException
-     */
-    public GiraphJob(String jobName) throws IOException {
-        super(new Configuration(), jobName);
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param conf User-defined configuration
-     * @param jobName User-defined job name
-     * @throws IOException
-     */
-    public GiraphJob(Configuration conf, String jobName) throws IOException {
-        super(conf, jobName);
-    }
-
-    /**
-     * Make sure the configuration is set properly by the user prior to
-     * submitting the job.
-     */
-    private void checkConfiguration() {
-        if (conf.getInt(MAX_WORKERS, -1) < 0) {
-            throw new RuntimeException("No valid " + MAX_WORKERS);
-        }
-        if (conf.getFloat(MIN_PERCENT_RESPONDED,
-                          MIN_PERCENT_RESPONDED_DEFAULT) <= 0.0f ||
-                conf.getFloat(MIN_PERCENT_RESPONDED,
-                              MIN_PERCENT_RESPONDED_DEFAULT) > 100.0f) {
-            throw new IllegalArgumentException(
-                "Invalid " +
-                conf.getFloat(MIN_PERCENT_RESPONDED,
-                              MIN_PERCENT_RESPONDED_DEFAULT) + " for " +
-                MIN_PERCENT_RESPONDED);
-        }
-        if (conf.getInt(MIN_WORKERS, -1) < 0) {
-            throw new IllegalArgumentException("No valid " + MIN_WORKERS);
-        }
-        if (BspUtils.getVertexClass(getConfiguration()) == null) {
-            throw new IllegalArgumentException("GiraphJob: Null VERTEX_CLASS");
-        }
-        if (BspUtils.getVertexInputFormatClass(getConfiguration()) == null) {
-            throw new IllegalArgumentException(
-                "GiraphJob: Null VERTEX_INPUT_FORMAT_CLASS");
-        }
-        if (BspUtils.getVertexResolverClass(getConfiguration()) == null) {
-            setVertexResolverClass(VertexResolver.class);
-            if (LOG.isInfoEnabled()) {
-                LOG.info("GiraphJob: No class found for " +
-                         VERTEX_RESOLVER_CLASS + ", defaulting to " +
-                         VertexResolver.class.getCanonicalName());
-            }
-        }
-    }
-
-    /**
-     * Set the vertex class (required)
-     *
-     * @param vertexClass Runs vertex computation
-     */
-    final public void setVertexClass(Class<?> vertexClass) {
-        getConfiguration().setClass(VERTEX_CLASS, vertexClass, BasicVertex.class);
-    }
-
-    /**
-     * Set the vertex input format class (required)
-     *
-     * @param vertexInputFormatClass Determines how graph is input
-     */
-    final public void setVertexInputFormatClass(
-            Class<?> vertexInputFormatClass) {
-        getConfiguration().setClass(VERTEX_INPUT_FORMAT_CLASS,
-                                    vertexInputFormatClass,
-                                    VertexInputFormat.class);
-    }
-
-    /**
-     * Set the vertex output format class (optional)
-     *
-     * @param vertexOutputFormatClass Determines how graph is output
-     */
-    final public void setVertexOutputFormatClass(
-            Class<?> vertexOutputFormatClass) {
-        getConfiguration().setClass(VERTEX_OUTPUT_FORMAT_CLASS,
-                                    vertexOutputFormatClass,
-                                    VertexOutputFormat.class);
-    }
-
-    /**
-     * Set the vertex combiner class (optional)
-     *
-     * @param vertexCombinerClass Determines how vertex messages are combined
-     */
-    final public void setVertexCombinerClass(Class<?> vertexCombinerClass) {
-        getConfiguration().setClass(VERTEX_COMBINER_CLASS,
-                                    vertexCombinerClass,
-                                    VertexCombiner.class);
-    }
-
-    /**
-     * Set the graph partitioner class (optional)
-     *
-     * @param graphPartitionerFactoryClass Determines how the graph is partitioned
-     */
-    final public void setGraphPartitionerFactoryClass(
-            Class<?> graphPartitionerFactoryClass) {
-        getConfiguration().setClass(GRAPH_PARTITIONER_FACTORY_CLASS,
-                                    graphPartitionerFactoryClass,
-                                    GraphPartitionerFactory.class);
-    }
-
-    /**
-     * Set the vertex resolver class (optional)
-     *
-     * @param vertexResolverClass Determines how vertex mutations are resolved
-     */
-    final public void setVertexResolverClass(Class<?> vertexResolverClass) {
-        getConfiguration().setClass(VERTEX_RESOLVER_CLASS,
-                                    vertexResolverClass,
-                                    VertexResolver.class);
-    }
-
-   /**
-    * Set the worker context class (optional)
-    *
-    * @param workerContextClass Determines what code is executed on a each
-    *        worker before and after each superstep and computation
-    */
-    final public void setWorkerContextClass(Class<?> workerContextClass) {
-        getConfiguration().setClass(WORKER_CONTEXT_CLASS,
-                                    workerContextClass,
-                                    WorkerContext.class);
-    }
-
-    /**
-     * Set the aggregator writer class (optional)
-     *
-     * @param aggregatorWriterClass Determines how the aggregators are
-     *               written to file at the end of the job
-     */
-     final public void setAggregatorWriterClass(
-                Class<?> aggregatorWriterClass) {
-         getConfiguration().setClass(AGGREGATOR_WRITER_CLASS,
-                                     aggregatorWriterClass,
-                                     AggregatorWriter.class);
-     }
-
-    /**
-     * Set worker configuration for determining what is required for
-     * a superstep.
-     *
-     * @param minWorkers Minimum workers to do a superstep
-     * @param maxWorkers Maximum workers to do a superstep
-     *        (max map tasks in job)
-     * @param minPercentResponded 0 - 100 % of the workers required to
-     *        have responded before continuing the superstep
-     */
-    final public void setWorkerConfiguration(int minWorkers,
-                                             int maxWorkers,
-                                             float minPercentResponded) {
-        conf.setInt(MIN_WORKERS, minWorkers);
-        conf.setInt(MAX_WORKERS, maxWorkers);
-        conf.setFloat(MIN_PERCENT_RESPONDED, minPercentResponded);
-    }
-
-    /**
-     * Utilize an existing ZooKeeper service.  If this is not set, ZooKeeper
-     * will be dynamically started by Giraph for this job.
-     *
-     * @param serverList Comma separated list of servers and ports
-     *        (i.e. zk1:2221,zk2:2221)
-     */
-    final public void setZooKeeperConfiguration(String serverList) {
-        conf.set(ZOOKEEPER_LIST, serverList);
-    }
-
-    /**
-     * Check if the configuration is local.  If it is local, do additional
-     * checks due to the restrictions of LocalJobRunner.
-     *
-     * @param conf Configuration
-     */
-    private static void checkLocalJobRunnerConfiguration(
-            Configuration conf) {
-        String jobTracker = conf.get("mapred.job.tracker", null);
-        if (!jobTracker.equals("local")) {
-            // Nothing to check
-            return;
-        }
-
-        int maxWorkers = conf.getInt(MAX_WORKERS, -1);
-        if (maxWorkers != 1) {
-            throw new IllegalArgumentException(
-                "checkLocalJobRunnerConfiguration: When using " +
-                "LocalJobRunner, must have only one worker since " +
-                "only 1 task at a time!");
-        }
-        if (conf.getBoolean(SPLIT_MASTER_WORKER,
-                            SPLIT_MASTER_WORKER_DEFAULT)) {
-            throw new IllegalArgumentException(
-                "checkLocalJobRunnerConfiguration: When using " +
-                "LocalJobRunner, you cannot run in split master / worker " +
-                "mode since there is only 1 task at a time!");
-        }
-    }
-
-    /**
-     * Check whether a specified int conf value is set and if not, set it.
-     *
-     * @param param Conf value to check
-     * @param defaultValue Assign to value if not set
-     */
-    private void setIntConfIfDefault(String param, int defaultValue) {
-        if (conf.getInt(param, Integer.MIN_VALUE) == Integer.MIN_VALUE) {
-            conf.setInt(param, defaultValue);
-        }
-    }
-
-    /**
-     * Runs the actual graph application through Hadoop Map-Reduce.
-     *
-     * @param verbose If true, provide verbose output, false otherwise
-     * @throws ClassNotFoundException
-     * @throws InterruptedException
-     * @throws IOException
-     */
-    final public boolean run(boolean verbose)
-            throws IOException, InterruptedException, ClassNotFoundException {
-        checkConfiguration();
-        checkLocalJobRunnerConfiguration(conf);
-        setNumReduceTasks(0);
-        // Most users won't hit this hopefully and can set it higher if desired
-        setIntConfIfDefault("mapreduce.job.counters.limit", 512);
-
-        // Capacity scheduler-specific settings.  These should be enough for
-        // a reasonable Giraph job
-        setIntConfIfDefault("mapred.job.map.memory.mb", 1024);
-        setIntConfIfDefault("mapred.job.reduce.memory.mb", 1024);
-
-        // Speculative execution doesn't make sense for Giraph
-        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
-
-        // Set the ping interval to 5 minutes instead of one minute
-        // (DEFAULT_PING_INTERVAL)
-        Client.setPingInterval(conf, 60000*5);
-
-        if (getJar() == null) {
-            setJarByClass(GiraphJob.class);
-        }
-        // Should work in MAPREDUCE-1938 to let the user jars/classes
-        // get loaded first
-        conf.setBoolean("mapreduce.user.classpath.first", true);
-
-        setMapperClass(GraphMapper.class);
-        setInputFormatClass(BspInputFormat.class);
-        setOutputFormatClass(BspOutputFormat.class);
-        return waitForCompletion(verbose);
-    }
+  static {
+    Configuration.addDefaultResource("giraph-site.xml");
+  }
+
+  /** Vertex class - required */
+  public static final String VERTEX_CLASS = "giraph.vertexClass";
+  /** VertexInputFormat class - required */
+  public static final String VERTEX_INPUT_FORMAT_CLASS =
+      "giraph.vertexInputFormatClass";
+
+  /** VertexOutputFormat class - optional */
+  public static final String VERTEX_OUTPUT_FORMAT_CLASS =
+      "giraph.vertexOutputFormatClass";
+  /** Vertex combiner class - optional */
+  public static final String VERTEX_COMBINER_CLASS =
+      "giraph.combinerClass";
+  /** Vertex resolver class - optional */
+  public static final String VERTEX_RESOLVER_CLASS =
+      "giraph.vertexResolverClass";
+  /** Graph partitioner factory class - optional */
+  public static final String GRAPH_PARTITIONER_FACTORY_CLASS =
+      "giraph.graphPartitionerFactoryClass";
+
+  /** Vertex index class */
+  public static final String VERTEX_INDEX_CLASS = "giraph.vertexIndexClass";
+  /** Vertex value class */
+  public static final String VERTEX_VALUE_CLASS = "giraph.vertexValueClass";
+  /** Edge value class */
+  public static final String EDGE_VALUE_CLASS = "giraph.edgeValueClass";
+  /** Message value class */
+  public static final String MESSAGE_VALUE_CLASS = "giraph.messageValueClass";
+  /** Worker context class */
+  public static final String WORKER_CONTEXT_CLASS =
+      "giraph.workerContextClass";
+  /** AggregatorWriter class - optional */
+  public static final String AGGREGATOR_WRITER_CLASS =
+      "giraph.aggregatorWriterClass";
+
+  /**
+   * Minimum number of simultaneous workers before this job can run (int)
+   */
+  public static final String MIN_WORKERS = "giraph.minWorkers";
+  /**
+   * Maximum number of simultaneous worker tasks started by this job (int).
+   */
+  public static final String MAX_WORKERS = "giraph.maxWorkers";
+
+  /**
+   * Separate the workers and the master tasks.  This is required
+   * to support dynamic recovery. (boolean)
+   */
+  public static final String SPLIT_MASTER_WORKER =
+      "giraph.SplitMasterWorker";
+  /**
+   * Default on whether to separate the workers and the master tasks.
+   * Needs to be "true" to support dynamic recovery.
+   */
+  public static final boolean SPLIT_MASTER_WORKER_DEFAULT = true;
+
+  /** Indicates whether this job is run in an internal unit test */
+  public static final String LOCAL_TEST_MODE =
+      "giraph.localTestMode";
+
+  /** not in local test mode per default */
+  public static final boolean LOCAL_TEST_MODE_DEFAULT = false;
+
+  /**
+   * Minimum percent of the maximum number of workers that have responded
+   * in order to continue progressing. (float)
+   */
+  public static final String MIN_PERCENT_RESPONDED =
+      "giraph.minPercentResponded";
+  /** Default 100% response rate for workers */
+  public static final float MIN_PERCENT_RESPONDED_DEFAULT = 100.0f;
+
+  /** Polling timeout to check on the number of responded tasks (int) */
+  public static final String POLL_MSECS = "giraph.pollMsecs";
+  /** Default poll msecs (30 seconds) */
+  public static final int POLL_MSECS_DEFAULT = 30 * 1000;
+
+  /**
+   *  ZooKeeper comma-separated list (if not set,
+   *  will start up ZooKeeper locally)
+   */
+  public static final String ZOOKEEPER_LIST = "giraph.zkList";
+
+  /** ZooKeeper session millisecond timeout */
+  public static final String ZOOKEEPER_SESSION_TIMEOUT =
+      "giraph.zkSessionMsecTimeout";
+  /** Default Zookeeper session millisecond timeout */
+  public static final int ZOOKEEPER_SESSION_TIMEOUT_DEFAULT = 60 * 1000;
+
+  /** Polling interval to check for the final ZooKeeper server data */
+  public static final String ZOOKEEPER_SERVERLIST_POLL_MSECS =
+      "giraph.zkServerlistPollMsecs";
+  /** Default polling interval to check for the final ZooKeeper server data */
+  public static final int ZOOKEEPER_SERVERLIST_POLL_MSECS_DEFAULT =
+      3 * 1000;
+
+  /** Number of nodes (not tasks) to run Zookeeper on */
+  public static final String ZOOKEEPER_SERVER_COUNT =
+      "giraph.zkServerCount";
+  /** Default number of nodes to run Zookeeper on */
+  public static final int ZOOKEEPER_SERVER_COUNT_DEFAULT = 1;
+
+  /** ZooKeeper port to use */
+  public static final String ZOOKEEPER_SERVER_PORT =
+      "giraph.zkServerPort";
+  /** Default ZooKeeper port to use */
+  public static final int ZOOKEEPER_SERVER_PORT_DEFAULT = 22181;
+
+  /** Location of the ZooKeeper jar - Used internally, not meant for users */
+  public static final String ZOOKEEPER_JAR = "giraph.zkJar";
+
+  /** Local ZooKeeper directory to use */
+  public static final String ZOOKEEPER_DIR = "giraph.zkDir";
+
+  /** Initial port to start using for the RPC communication */
+  public static final String RPC_INITIAL_PORT = "giraph.rpcInitialPort";
+  /** Default port to start using for the RPC communication */
+  public static final int RPC_INITIAL_PORT_DEFAULT = 30000;
+
+  /** Maximum bind attempts for different RPC ports */
+  public static final String MAX_RPC_PORT_BIND_ATTEMPTS =
+      "giraph.maxRpcPortBindAttempts";
+  /** Default maximum bind attempts for different RPC ports */
+  public static final int MAX_RPC_PORT_BIND_ATTEMPTS_DEFAULT = 20;
+
+  /** Maximum number of RPC handlers */
+  public static final String RPC_NUM_HANDLERS = "giraph.rpcNumHandlers";
+  /** Default maximum number of RPC handlers */
+  public static final int RPC_NUM_HANDLERS_DEFAULT = 100;
+
+  /**
+   *  Maximum number of vertices per partition before sending.
+   *  (input superstep only).
+   */
+  public static final String MAX_VERTICES_PER_PARTITION =
+      "giraph.maxVerticesPerPartition";
+  /** Default maximum number of vertices per partition before sending. */
+  public static final int MAX_VERTICES_PER_PARTITION_DEFAULT = 100000;
+
+  /** Maximum number of messages per peer before flush */
+  public static final String MSG_SIZE = "giraph.msgSize";
+  /** Default maximum number of messages per peer before flush */
+  public static final int MSG_SIZE_DEFAULT = 1000;
+
+  /** Maximum number of messages that can be bulk sent during a flush */
+  public static final String MAX_MESSAGES_PER_FLUSH_PUT =
+      "giraph.maxMessagesPerFlushPut";
+  /** Default number of messages that can be bulk sent during a flush */
+  public static final int DEFAULT_MAX_MESSAGES_PER_FLUSH_PUT = 5000;
+
+  /** Number of flush threads per peer */
+  public static final String MSG_NUM_FLUSH_THREADS =
+      "giraph.msgNumFlushThreads";
+
+  /** Number of poll attempts prior to failing the job (int) */
+  public static final String POLL_ATTEMPTS = "giraph.pollAttempts";
+  /** Default poll attempts */
+  public static final int POLL_ATTEMPTS_DEFAULT = 10;
+
+  /** Number of minimum vertices in each vertex range */
+  public static final String MIN_VERTICES_PER_RANGE =
+      "giraph.minVerticesPerRange";
+  /** Default number of minimum vertices in each vertex range */
+  public static final long MIN_VERTICES_PER_RANGE_DEFAULT = 3;
+
+  /** Minimum stragglers of the superstep before printing them out */
+  public static final String PARTITION_LONG_TAIL_MIN_PRINT =
+      "giraph.partitionLongTailMinPrint";
+  /** Only print stragglers with one as a default */
+  public static final int PARTITION_LONG_TAIL_MIN_PRINT_DEFAULT = 1;
+
+  /** Use superstep counters? (boolean) */
+  public static final String USE_SUPERSTEP_COUNTERS =
+      "giraph.useSuperstepCounters";
+  /** Default is to use the superstep counters */
+  public static final boolean USE_SUPERSTEP_COUNTERS_DEFAULT = true;
+
+  /**
+   * Set the multiplicative factor of how many partitions to create from
+   * a single InputSplit based on the number of total InputSplits.  For
+   * example, if there are 10 total InputSplits and this is set to 0.5, then
+   * you will get 0.5 * 10 = 5 partitions for every InputSplit (given that the
+   * minimum size is met).
+   */
+  public static final String TOTAL_INPUT_SPLIT_MULTIPLIER =
+      "giraph.totalInputSplitMultiplier";
+  /** Default total input split multiplier */
+  public static final float TOTAL_INPUT_SPLIT_MULTIPLIER_DEFAULT = 0.5f;
+
+  /**
+   * Input split sample percent - Used only for sampling and testing, rather
+   * than an actual job.  The idea is that to test, you might only want a
+   * fraction of the actual input splits from your VertexInputFormat to
+   * load (values should be [0, 100]).
+   */
+  public static final String INPUT_SPLIT_SAMPLE_PERCENT =
+      "giraph.inputSplitSamplePercent";
+  /** Default is to use all the input splits */
+  public static final float INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT = 100f;
+
+  /**
+   * To limit outlier input splits from producing too many vertices or to
+   * help with testing, the number of vertices loaded from an input split can
+   * be limited.  By default, everything is loaded.
+   */
+  public static final String INPUT_SPLIT_MAX_VERTICES =
+      "giraph.InputSplitMaxVertices";
+  /**
+   * Default is that all the vertices are to be loaded from the input
+   * split
+   */
+  public static final long INPUT_SPLIT_MAX_VERTICES_DEFAULT = -1;
+
+  /** Java opts passed to ZooKeeper startup */
+  public static final String ZOOKEEPER_JAVA_OPTS =
+      "giraph.zkJavaOpts";
+  /** Default java opts passed to ZooKeeper startup */
+  public static final String ZOOKEEPER_JAVA_OPTS_DEFAULT =
+      "-Xmx512m -XX:ParallelGCThreads=4 -XX:+UseConcMarkSweepGC " +
+          "-XX:CMSInitiatingOccupancyFraction=70 -XX:MaxGCPauseMillis=100";
+
+  /**
+   *  How often to checkpoint (e.g. 0 means no checkpoint,
+   *  1 means every superstep, 2 is every two supersteps, etc.).
+   */
+  public static final String CHECKPOINT_FREQUENCY =
+      "giraph.checkpointFrequency";
+
+  /** Default checkpointing frequency of every 2 supersteps. */
+  public static final int CHECKPOINT_FREQUENCY_DEFAULT = 2;
+
+  /**
+   * Delete checkpoints after a successful job run?
+   */
+  public static final String CLEANUP_CHECKPOINTS_AFTER_SUCCESS =
+      "giraph.cleanupCheckpointsAfterSuccess";
+  /** Default is to clean up the checkpoints after a successful job */
+  public static final boolean CLEANUP_CHECKPOINTS_AFTER_SUCCESS_DEFAULT =
+      true;
+
+  /**
+   * An application can be restarted manually by selecting a superstep.  The
+   * corresponding checkpoint must exist for this to work.  The user should
+   * set a long value.  Default is start from scratch.
+   */
+  public static final String RESTART_SUPERSTEP = "giraph.restartSuperstep";
+
+  /**
+   * Base ZNode for Giraph's state in the ZooKeeper cluster.  Must be a root
+   * znode on the cluster beginning with "/"
+   */
+  public static final String BASE_ZNODE_KEY = "giraph.zkBaseZNode";
+
+  /**
+   * If ZOOKEEPER_LIST is not set, then use this directory to manage
+   * ZooKeeper
+   */
+  public static final String ZOOKEEPER_MANAGER_DIRECTORY =
+      "giraph.zkManagerDirectory";
+  /**
+   * Default ZooKeeper manager directory (where determining the servers in
+   * HDFS files will go).  Final directory path will also have job number
+   * for uniqueness.
+   */
+  public static final String ZOOKEEPER_MANAGER_DIR_DEFAULT =
+      "_bsp/_defaultZkManagerDir";
+
+  /** This directory has/stores the available checkpoint files in HDFS. */
+  public static final String CHECKPOINT_DIRECTORY =
+      "giraph.checkpointDirectory";
+  /**
+   * Default checkpoint directory (where checkpoint files go in HDFS).  Final
+   * directory path will also have the job number for uniqueness
+   */
+  public static final String CHECKPOINT_DIRECTORY_DEFAULT =
+      "_bsp/_checkpoints/";
+
+  /** Keep the zookeeper output for debugging? Default is to remove it. */
+  public static final String KEEP_ZOOKEEPER_DATA =
+      "giraph.keepZooKeeperData";
+  /** Default is to remove ZooKeeper data. */
+  public static final Boolean KEEP_ZOOKEEPER_DATA_DEFAULT = false;
+
+  /** Default ZooKeeper tick time. */
+  public static final int DEFAULT_ZOOKEEPER_TICK_TIME = 6000;
+  /** Default ZooKeeper init limit (in ticks). */
+  public static final int DEFAULT_ZOOKEEPER_INIT_LIMIT = 10;
+  /** Default ZooKeeper sync limit (in ticks). */
+  public static final int DEFAULT_ZOOKEEPER_SYNC_LIMIT = 5;
+  /** Default ZooKeeper snap count. */
+  public static final int DEFAULT_ZOOKEEPER_SNAP_COUNT = 50000;
+  /** Default ZooKeeper maximum client connections. */
+  public static final int DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS = 10000;
+  /** Default ZooKeeper minimum session timeout of 5 minutes (in msecs). */
+  public static final int DEFAULT_ZOOKEEPER_MIN_SESSION_TIMEOUT = 300 * 1000;
+  /** Default ZooKeeper maximum session timeout of 10 minutes (in msecs). */
+  public static final int DEFAULT_ZOOKEEPER_MAX_SESSION_TIMEOUT = 600 * 1000;
+
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(GiraphJob.class);
+
+  /**
+   * Constructor that will instantiate the configuration
+   *
+   * @param jobName User-defined job name
+   * @throws IOException If the parent Job constructor fails
+   */
+  public GiraphJob(String jobName) throws IOException {
+    super(new Configuration(), jobName);
+  }
+
+  /**
+   * Constructor that uses a caller-supplied configuration.
+   *
+   * @param conf User-defined configuration
+   * @param jobName User-defined job name
+   * @throws IOException If the parent Job constructor fails
+   */
+  public GiraphJob(Configuration conf, String jobName) throws IOException {
+    super(conf, jobName);
+  }
+
+  /**
+   * Make sure the configuration is set properly by the user prior to
+   * submitting the job.  When no vertex resolver class is configured, the
+   * default VertexResolver is installed.
+   *
+   * @throws IllegalArgumentException If the worker counts or the minimum
+   *         percent responded are missing or out of range, or if the vertex
+   *         class or vertex input format class is not set
+   */
+  private void checkConfiguration() {
+    if (conf.getInt(MAX_WORKERS, -1) < 0) {
+      // Same exception type as every other check in this method
+      throw new IllegalArgumentException("No valid " + MAX_WORKERS);
+    }
+    // Read once so the range check and the error message use the same value
+    float minPercentResponded = conf.getFloat(MIN_PERCENT_RESPONDED,
+        MIN_PERCENT_RESPONDED_DEFAULT);
+    // Written as a negated "in range" test so that NaN is rejected as well
+    if (!(minPercentResponded > 0.0f && minPercentResponded <= 100.0f)) {
+      throw new IllegalArgumentException(
+          "Invalid " + minPercentResponded + " for " +
+              MIN_PERCENT_RESPONDED);
+    }
+    if (conf.getInt(MIN_WORKERS, -1) < 0) {
+      throw new IllegalArgumentException("No valid " + MIN_WORKERS);
+    }
+    if (BspUtils.getVertexClass(getConfiguration()) == null) {
+      throw new IllegalArgumentException("GiraphJob: Null VERTEX_CLASS");
+    }
+    if (BspUtils.getVertexInputFormatClass(getConfiguration()) == null) {
+      throw new IllegalArgumentException(
+          "GiraphJob: Null VERTEX_INPUT_FORMAT_CLASS");
+    }
+    if (BspUtils.getVertexResolverClass(getConfiguration()) == null) {
+      // The resolver is optional, so fall back to the default implementation
+      setVertexResolverClass(VertexResolver.class);
+      if (LOG.isInfoEnabled()) {
+        LOG.info("GiraphJob: No class found for " +
+            VERTEX_RESOLVER_CLASS + ", defaulting to " +
+            VertexResolver.class.getCanonicalName());
+      }
+    }
+  }
+
+  /**
+   * Set the vertex class (required)
+   *
+   * @param vertexClass Runs vertex computation
+   */
+  public final void setVertexClass(Class<?> vertexClass) {
+    getConfiguration().setClass(VERTEX_CLASS, vertexClass, BasicVertex.class);
+  }
+
+  /**
+   * Register the vertex input format for this job (required).
+   *
+   * @param vertexInputFormatClass Determines how graph is input
+   */
+  public final void setVertexInputFormatClass(
+      Class<?> vertexInputFormatClass) {
+    Configuration jobConf = getConfiguration();
+    jobConf.setClass(
+        VERTEX_INPUT_FORMAT_CLASS,
+        vertexInputFormatClass,
+        VertexInputFormat.class);
+  }
+
+  /**
+   * Register the vertex output format for this job (optional).
+   *
+   * @param vertexOutputFormatClass Determines how graph is output
+   */
+  public final void setVertexOutputFormatClass(
+      Class<?> vertexOutputFormatClass) {
+    Configuration jobConf = getConfiguration();
+    jobConf.setClass(
+        VERTEX_OUTPUT_FORMAT_CLASS,
+        vertexOutputFormatClass,
+        VertexOutputFormat.class);
+  }
+
+  /**
+   * Set the vertex combiner class (optional)
+   *
+   * @param vertexCombinerClass Determines how vertex messages are combined
+   */
+  public final void setVertexCombinerClass(Class<?> vertexCombinerClass) {
+    getConfiguration().setClass(VERTEX_COMBINER_CLASS,
+        vertexCombinerClass,
+        VertexCombiner.class);
+  }
+
+  /**
+   * Register the graph partitioner factory for this job (optional).
+   *
+   * @param graphPartitionerFactoryClass Determines how the graph is
+   *        partitioned
+   */
+  public final void setGraphPartitionerFactoryClass(
+      Class<?> graphPartitionerFactoryClass) {
+    Configuration jobConf = getConfiguration();
+    jobConf.setClass(
+        GRAPH_PARTITIONER_FACTORY_CLASS,
+        graphPartitionerFactoryClass,
+        GraphPartitionerFactory.class);
+  }
+
+  /**
+   * Set the vertex resolver class (optional)
+   *
+   * @param vertexResolverClass Determines how vertex mutations are resolved
+   */
+  public final void setVertexResolverClass(Class<?> vertexResolverClass) {
+    getConfiguration().setClass(VERTEX_RESOLVER_CLASS,
+        vertexResolverClass,
+        VertexResolver.class);
+  }
+
+  /**
+   * Set the worker context class (optional)
+   *
+   * @param workerContextClass Determines what code is executed on a each
+   *        worker before and after each superstep and computation
+   */
+  public final void setWorkerContextClass(Class<?> workerContextClass) {
+    getConfiguration().setClass(WORKER_CONTEXT_CLASS,
+        workerContextClass,
+        WorkerContext.class);
+  }
+
+  /**
+   * Register the aggregator writer for this job (optional).
+   *
+   * @param aggregatorWriterClass Determines how the aggregators are
+   *        written to file at the end of the job
+   */
+  public final void setAggregatorWriterClass(
+      Class<?> aggregatorWriterClass) {
+    Configuration jobConf = getConfiguration();
+    jobConf.setClass(
+        AGGREGATOR_WRITER_CLASS,
+        aggregatorWriterClass,
+        AggregatorWriter.class);
+  }
+
+  /**
+   * Set worker configuration for determining what is required for
+   * a superstep.  The values are stored as given; they are only validated
+   * by checkConfiguration() when run(boolean) is called.
+   *
+   * @param minWorkers Minimum workers to do a superstep
+   * @param maxWorkers Maximum workers to do a superstep
+   *        (max map tasks in job)
+   * @param minPercentResponded 0 - 100 % of the workers required to
+   *        have responded before continuing the superstep
+   */
+  public final void setWorkerConfiguration(int minWorkers,
+      int maxWorkers,
+      float minPercentResponded) {
+    conf.setInt(MIN_WORKERS, minWorkers);
+    conf.setInt(MAX_WORKERS, maxWorkers);
+    conf.setFloat(MIN_PERCENT_RESPONDED, minPercentResponded);
+  }
+
+  /**
+   * Utilize an existing ZooKeeper service.  If this is not set, ZooKeeper
+   * will be dynamically started by Giraph for this job.  The list is stored
+   * unmodified under the ZOOKEEPER_LIST configuration key.
+   *
+   * @param serverList Comma separated list of servers and ports
+   *        (e.g. zk1:2221,zk2:2221)
+   */
+  public final void setZooKeeperConfiguration(String serverList) {
+    conf.set(ZOOKEEPER_LIST, serverList);
+  }
+
+  /**
+   * Check if the configuration is local.  If it is local, do additional
+   * checks due to the restrictions of LocalJobRunner: exactly one worker
+   * and no split master / worker mode.
+   *
+   * @param conf Configuration
+   * @throws IllegalArgumentException If a local run is configured with
+   *         anything other than one worker, or with split master / worker
+   *         mode enabled
+   */
+  private static void checkLocalJobRunnerConfiguration(
+      Configuration conf) {
+    // Fall back to "local" when the key is unset instead of dereferencing
+    // null.  NOTE(review): assumes Hadoop also runs locally when this key
+    // is absent -- confirm against the Hadoop version in use.
+    String jobTracker = conf.get("mapred.job.tracker", "local");
+    if (!"local".equals(jobTracker)) {
+      // Nothing to check
+      return;
+    }
+
+    int maxWorkers = conf.getInt(MAX_WORKERS, -1);
+    if (maxWorkers != 1) {
+      throw new IllegalArgumentException(
+          "checkLocalJobRunnerConfiguration: When using " +
+              "LocalJobRunner, must have only one worker since " +
+          "only 1 task at a time!");
+    }
+    if (conf.getBoolean(SPLIT_MASTER_WORKER,
+        SPLIT_MASTER_WORKER_DEFAULT)) {
+      throw new IllegalArgumentException(
+          "checkLocalJobRunnerConfiguration: When using " +
+              "LocalJobRunner, you cannot run in split master / worker " +
+          "mode since there is only 1 task at a time!");
+    }
+  }
+
+  /**
+   * Check whether a specified int conf value is set and if not, set it.
+   *
+   * @param param Conf value to check
+   * @param defaultValue Assign to value if not set
+   */
+  private void setIntConfIfDefault(String param, int defaultValue) {
+    if (conf.getInt(param, Integer.MIN_VALUE) == Integer.MIN_VALUE) {
+      conf.setInt(param, defaultValue);
+    }
+  }
+
+  /**
+   * Runs the actual graph application through Hadoop Map-Reduce.  The
+   * configuration is validated first, then the job is set up with zero
+   * reduce tasks, GraphMapper as the mapper and BspInputFormat /
+   * BspOutputFormat as the formats.  The return value is whatever
+   * waitForCompletion(verbose) reports.
+   *
+   * @param verbose If true, provide verbose output, false otherwise
+   * @return True if success, false otherwise
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  public final boolean run(boolean verbose)
+    throws IOException, InterruptedException, ClassNotFoundException {
+    // Both checks throw unchecked exceptions on an invalid configuration
+    checkConfiguration();
+    checkLocalJobRunnerConfiguration(conf);
+    setNumReduceTasks(0);
+    // Most users won't hit this hopefully and can set it higher if desired
+    setIntConfIfDefault("mapreduce.job.counters.limit", 512);
+
+    // Capacity scheduler-specific settings.  These should be enough for
+    // a reasonable Giraph job
+    setIntConfIfDefault("mapred.job.map.memory.mb", 1024);
+    setIntConfIfDefault("mapred.job.reduce.memory.mb", 1024);
+
+    // Speculative execution doesn't make sense for Giraph
+    conf.setBoolean("mapred.map.tasks.speculative.execution", false);
+
+    // Set the ping interval to 5 minutes instead of one minute
+    // (DEFAULT_PING_INTERVAL)
+    Client.setPingInterval(conf, 60000 * 5);
+
+    // Only pick a jar when the user has not already supplied one
+    if (getJar() == null) {
+      setJarByClass(GiraphJob.class);
+    }
+    // Should work in MAPREDUCE-1938 to let the user jars/classes
+    // get loaded first
+    conf.setBoolean("mapreduce.user.classpath.first", true);
+
+    setMapperClass(GraphMapper.class);
+    setInputFormatClass(BspInputFormat.class);
+    setOutputFormatClass(BspOutputFormat.class);
+    return waitForCompletion(verbose);
+  }
 }

Modified: 
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GlobalStats.java
URL: 
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GlobalStats.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- 
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GlobalStats.java 
(original)
+++ 
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GlobalStats.java 
Thu Feb 16 22:12:31 2012
@@ -29,57 +29,71 @@ import org.apache.hadoop.io.Writable;
  * Aggregated stats by the master.
  */
 public class GlobalStats implements Writable {
-    private long vertexCount = 0;
-    private long finishedVertexCount = 0;
-    private long edgeCount = 0;
-    private long messageCount = 0;
-
-    public void addPartitionStats(PartitionStats partitionStats) {
-        this.vertexCount += partitionStats.getVertexCount();
-        this.finishedVertexCount += partitionStats.getFinishedVertexCount();
-        this.edgeCount += partitionStats.getEdgeCount();
-    }
-
-    public long getVertexCount() {
-        return vertexCount;
-    }
-
-    public long getFinishedVertexCount() {
-        return finishedVertexCount;
-    }
-
-    public long getEdgeCount() {
-        return edgeCount;
-    }
-
-    public long getMessageCount() {
-        return messageCount;
-    }
-
-    public void addMessageCount(long messageCount) {
-        this.messageCount += messageCount;
-    }
-
-    @Override
-    public void readFields(DataInput input) throws IOException {
-        vertexCount = input.readLong();
-        finishedVertexCount = input.readLong();
-        edgeCount = input.readLong();
-        messageCount = input.readLong();
-    }
-
-    @Override
-    public void write(DataOutput output) throws IOException {
-        output.writeLong(vertexCount);
-        output.writeLong(finishedVertexCount);
-        output.writeLong(edgeCount);
-        output.writeLong(messageCount);
-    }
-
-    @Override
-    public String toString() {
-        return "(vtx=" + vertexCount + ",finVtx=" +
-               finishedVertexCount + ",edges=" + edgeCount + ",msgCount=" +
-               messageCount + ")";
-    }
+  /** All vertices in the application */
+  private long vertexCount = 0;
+  /** All finished vertices in the last superstep */
+  private long finishedVertexCount = 0;
+  /** All edges in the last superstep */
+  private long edgeCount = 0;
+  /** All messages sent in the last superstep */
+  private long messageCount = 0;
+
+  /**
+   * Add the stats of a partition to the global stats.  Only the vertex,
+   * finished vertex and edge counts are accumulated here; the message count
+   * is not touched by this method.
+   *
+   * @param partitionStats Partition stats to be added.
+   */
+  public void addPartitionStats(PartitionStats partitionStats) {
+    this.vertexCount += partitionStats.getVertexCount();
+    this.finishedVertexCount += partitionStats.getFinishedVertexCount();
+    this.edgeCount += partitionStats.getEdgeCount();
+  }
+
+  /**
+   * Get the accumulated vertex count.
+   *
+   * @return Sum of the vertex counts of all partition stats added so far
+   */
+  public long getVertexCount() {
+    return vertexCount;
+  }
+
+  /**
+   * Get the accumulated finished vertex count.
+   *
+   * @return Sum of the finished vertex counts of all partition stats added
+   *         so far
+   */
+  public long getFinishedVertexCount() {
+    return finishedVertexCount;
+  }
+
+  /**
+   * Get the accumulated edge count.
+   *
+   * @return Sum of the edge counts of all partition stats added so far
+   */
+  public long getEdgeCount() {
+    return edgeCount;
+  }
+
+  /**
+   * Get the accumulated message count.
+   *
+   * @return Sum of all counts passed to addMessageCount(long) so far
+   */
+  public long getMessageCount() {
+    return messageCount;
+  }
+
+  /**
+   * Add messages to the global stats.
+   *
+   * @param messageCount Number of messages to be added.
+   */
+  public void addMessageCount(long messageCount) {
+    this.messageCount += messageCount;
+  }
+
+  /**
+   * Read the four counters as longs, in the same order write(DataOutput)
+   * emits them: vertex count, finished vertex count, edge count and
+   * message count.
+   *
+   * @param input Input to read the counters from
+   * @throws IOException If reading from the input fails
+   */
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    vertexCount = input.readLong();
+    finishedVertexCount = input.readLong();
+    edgeCount = input.readLong();
+    messageCount = input.readLong();
+  }
+
+  /**
+   * Write the four counters as longs, in the order
+   * readFields(DataInput) expects them: vertex count, finished vertex
+   * count, edge count and message count.
+   *
+   * @param output Output to write the counters to
+   * @throws IOException If writing to the output fails
+   */
+  @Override
+  public void write(DataOutput output) throws IOException {
+    output.writeLong(vertexCount);
+    output.writeLong(finishedVertexCount);
+    output.writeLong(edgeCount);
+    output.writeLong(messageCount);
+  }
+
+  @Override
+  public String toString() {
+    return "(vtx=" + vertexCount + ",finVtx=" +
+        finishedVertexCount + ",edges=" + edgeCount + ",msgCount=" +
+        messageCount + ")";
+  }
 }


Reply via email to