Switch to the start state when lazily initializing Previously, we would attribute time spent running the startBundle of a DoFn as time spent in -process state.
----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=115604508 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/31116460 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/31116460 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/31116460 Branch: refs/heads/master Commit: 31116460fb4f6c47d48e01c507389f4eb8f8b3cd Parents: fba9147 Author: bchambers <[email protected]> Authored: Thu Feb 25 14:26:03 2016 -0800 Committer: Davor Bonaci <[email protected]> Committed: Thu Feb 25 23:58:28 2016 -0800 ---------------------------------------------------------------------- .../sdk/util/common/worker/StateSampler.java | 47 +++++++++++++------- 1 file changed, 32 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/31116460/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/StateSampler.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/StateSampler.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/StateSampler.java index df916a0..00d3b3b 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/StateSampler.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/StateSampler.java @@ -67,7 +67,7 @@ public class StateSampler implements AutoCloseable { private volatile int currentState; /** Special value of {@code currentState} that means we do not sample. */ - private static final int DO_NOT_SAMPLE = -1; + public static final int DO_NOT_SAMPLE = -1; /** * A counter that increments with each state transition. May be used @@ -113,6 +113,30 @@ public class StateSampler implements AutoCloseable { this.prefix = prefix; this.counterSetMutator = counterSetMutator; currentState = DO_NOT_SAMPLE; + scheduleSampling(samplingPeriodMs); + } + + /** + * Constructs a new {@link StateSampler} that can be used to obtain + * an approximate breakdown of the time spent by an execution + * context in various states, as a fraction of the total time. + * + * @param prefix the prefix of the counter names for the states + * @param counterSetMutator the {@link CounterSet.AddCounterMutator} + * used to create a counter for each distinct state + */ + public StateSampler(String prefix, + CounterSet.AddCounterMutator counterSetMutator) { + this(prefix, counterSetMutator, DEFAULT_SAMPLING_PERIOD_MS); + } + + /** + * Called by the constructor to schedule sampling at the given period. + * + * <p>Should not be overridden by sub-classes unless they want to change + * or disable the automatic sampling of state. + */ + protected void scheduleSampling(final long samplingPeriodMs) { // Here "stratified sampling" is used, which makes sure that there's 1 uniformly chosen sampled // point in every bucket of samplingPeriodMs, to prevent pathological behavior in case some // states happen to occur at a similar period. @@ -148,20 +172,6 @@ public class StateSampler implements AutoCloseable { TimeUnit.MILLISECONDS); } - /** - * Constructs a new {@link StateSampler} that can be used to obtain - * an approximate breakdown of the time spent by an execution - * context in various states, as a fraction of the total time. - * - * @param prefix the prefix of the counter names for the states - * @param counterSetMutator the {@link CounterSet.AddCounterMutator} - * used to create a counter for each distinct state - */ - public StateSampler(String prefix, - CounterSet.AddCounterMutator counterSetMutator) { - this(prefix, counterSetMutator, DEFAULT_SAMPLING_PERIOD_MS); - } - public synchronized void run() { long startTimestampNs = System.nanoTime(); int state = currentState; @@ -255,6 +265,13 @@ public class StateSampler implements AutoCloseable { } /** + * Returns the current state of this state sampler. + */ + public int getCurrentState() { + return currentState; + } + + /** * Sets the current thread state. * * @param state the new state to transition to
