Updated Branches: refs/heads/trunk 9325fa986 -> 4d520645f
GIRAPH-838: setup time & total time counter also include time spent waiting for machines (pavanka via majakabiljo) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/4d520645 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/4d520645 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/4d520645 Branch: refs/heads/trunk Commit: 4d520645ff53b9a354fef57074c7661f3bc46e39 Parents: 9325fa9 Author: Maja Kabiljo <[email protected]> Authored: Mon Feb 3 09:51:28 2014 -0800 Committer: Maja Kabiljo <[email protected]> Committed: Mon Feb 3 09:51:28 2014 -0800 ---------------------------------------------------------------------- CHANGELOG | 3 +++ .../giraph/bsp/CentralizedServiceMaster.java | 12 +++++++++++ .../apache/giraph/counters/GiraphTimers.java | 22 +++++++++++++++++--- .../apache/giraph/master/BspServiceMaster.java | 10 ++------- .../org/apache/giraph/master/MasterThread.java | 14 ++++++++++--- 5 files changed, 47 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/4d520645/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index cef754e..b707cc2 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,9 @@ Giraph Change Log Release 1.1.0 - unreleased + GIRAPH-838: setup time & total time counter also include time spent waiting for machines + (pavanka via majakabiljo) + GIRAPH-839: NettyWorkerAggregatorRequestProcessor tries to reuse request objects (pavanka via majakabiljo) http://git-wip-us.apache.org/repos/asf/giraph/blob/4d520645/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java 
b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java index 999888d..bda967d 100644 --- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java @@ -21,11 +21,13 @@ package org.apache.giraph.bsp; import org.apache.giraph.master.MasterAggregatorHandler; import org.apache.giraph.master.MasterCompute; import org.apache.giraph.master.MasterInfo; +import org.apache.giraph.worker.WorkerInfo; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.zookeeper.KeeperException; import java.io.IOException; +import java.util.List; /** * At most, there will be one active master at a time, but many threads can @@ -58,6 +60,16 @@ public interface CentralizedServiceMaster<I extends WritableComparable, MasterInfo getMasterInfo(); /** + * Check all the {@link org.apache.giraph.worker.WorkerInfo} objects to ensure + * that a minimum number of good workers exists out of the total that have + * reported. + * + * @return List of healthy workers such that the minimum has been + * met, otherwise null + */ + List<WorkerInfo> checkWorkers(); + + /** * Create the {@link BspInputSplit} objects from the index range based on the * user-defined VertexInputFormat. The {@link BspInputSplit} objects will * be processed by the workers later on during the INPUT_SUPERSTEP. 
http://git-wip-us.apache.org/repos/asf/giraph/blob/4d520645/giraph-core/src/main/java/org/apache/giraph/counters/GiraphTimers.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/counters/GiraphTimers.java b/giraph-core/src/main/java/org/apache/giraph/counters/GiraphTimers.java index cbf2470..56915b6 100644 --- a/giraph-core/src/main/java/org/apache/giraph/counters/GiraphTimers.java +++ b/giraph-core/src/main/java/org/apache/giraph/counters/GiraphTimers.java @@ -39,18 +39,22 @@ public class GiraphTimers extends HadoopCountersBase { public static final String TOTAL_MS_NAME = "Total (ms)"; /** Counter name for shutdown msec */ public static final String SHUTDOWN_MS_NAME = "Shutdown (ms)"; + /** Counter name for initialize msec */ + public static final String INITIALIZE_MS_NAME = "Initialize (ms)"; /** Singleton instance for everyone to use */ private static GiraphTimers INSTANCE; /** Setup time in msec */ private static final int SETUP_MS = 0; - /** Total time in msec */ + /** Total time in msec (doesn't include initialize time) */ private static final int TOTAL_MS = 1; /** Shutdown time in msec */ private static final int SHUTDOWN_MS = 2; + /** Total time it takes to get minimum machines */ + private static final int INITIALIZE_MS = 3; /** How many whole job counters we have */ - private static final int NUM_COUNTERS = 3; + private static final int NUM_COUNTERS = 4; /** superstep time in msec */ private final Map<Long, GiraphHadoopCounter> superstepMsec; @@ -69,6 +73,7 @@ public class GiraphTimers extends HadoopCountersBase { jobCounters[SETUP_MS] = getCounter(SETUP_MS_NAME); jobCounters[TOTAL_MS] = getCounter(TOTAL_MS_NAME); jobCounters[SHUTDOWN_MS] = getCounter(SHUTDOWN_MS_NAME); + jobCounters[INITIALIZE_MS] = getCounter(INITIALIZE_MS_NAME); superstepMsec = Maps.newHashMap(); } @@ -124,7 +129,8 @@ public class GiraphTimers extends HadoopCountersBase { } /** - * Get counter for total 
time in milliseconds. + * Get counter for total time in milliseconds (doesn't include initialize + * time). * * @return Counter for total time in milliseconds. */ @@ -142,6 +148,16 @@ public class GiraphTimers extends HadoopCountersBase { } /** + * Get counter for initializing the process, + * having to wait for a minimum number of processes to be available + * before setup step + * @return Counter for initializing in milliseconds + */ + public GiraphHadoopCounter getInitializeMs() { + return jobCounters[INITIALIZE_MS]; + } + + /** * Get map of superstep to msec counter. * * @return mapping of superstep to msec counter. http://git-wip-us.apache.org/repos/asf/giraph/blob/4d520645/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index cfee4c5..90dc9f3 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -459,14 +459,8 @@ public class BspServiceMaster<I extends WritableComparable, } } - /** - * Check all the {@link WorkerInfo} objects to ensure that a minimum - * number of good workers exists out of the total that have reported. 
- * - * @return List of of healthy workers such that the minimum has been - * met, otherwise null - */ - private List<WorkerInfo> checkWorkers() { + @Override + public List<WorkerInfo> checkWorkers() { boolean failJob = true; long failWorkerCheckMsecs = SystemTime.get().getMilliseconds() + maxSuperstepWaitMsecs; http://git-wip-us.apache.org/repos/asf/giraph/blob/4d520645/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java index ec1733c..15dbe07 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java @@ -93,15 +93,23 @@ public class MasterThread<I extends WritableComparable, V extends Writable, // 3. Run all supersteps until complete try { long startMillis = System.currentTimeMillis(); + long initializeMillis = 0; long endMillis = 0; bspServiceMaster.setup(); if (bspServiceMaster.becomeMaster()) { + // First call to checkWorkers waits for all pending resources. + // If these resources are still available at subsequent calls it just + // reads zookeeper for the list of healthy workers. + bspServiceMaster.checkWorkers(); + initializeMillis = System.currentTimeMillis(); + GiraphTimers.getInstance().getInitializeMs().increment( + initializeMillis - startMillis); // Attempt to create InputSplits if necessary. Bail out if that fails. 
if (bspServiceMaster.getRestartedSuperstep() != BspService.UNSET_SUPERSTEP || (bspServiceMaster.createVertexInputSplits() != -1 && bspServiceMaster.createEdgeInputSplits() != -1)) { - long setupMillis = System.currentTimeMillis() - startMillis; + long setupMillis = System.currentTimeMillis() - initializeMillis; GiraphTimers.getInstance().getSetupMs().increment(setupMillis); setupSecs = setupMillis / 1000.0d; SuperstepState superstepState = SuperstepState.INITIAL; @@ -169,11 +177,11 @@ public class MasterThread<I extends WritableComparable, V extends Writable, (System.currentTimeMillis() - endMillis) / 1000.0d + " seconds."); LOG.info("total: Took " + - ((System.currentTimeMillis() - startMillis) / + ((System.currentTimeMillis() - initializeMillis) / 1000.0d) + " seconds."); } GiraphTimers.getInstance().getTotalMs(). - increment(System.currentTimeMillis() - startMillis); + increment(System.currentTimeMillis() - initializeMillis); } bspServiceMaster.postApplication(); // CHECKSTYLE: stop IllegalCatchCheck
