Repository: beam
Updated Branches: refs/heads/master 41f16123b -> 49c392790
Adding lull tracking for python sampler Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/21cdc85c Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/21cdc85c Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/21cdc85c Branch: refs/heads/master Commit: 21cdc85cfa8a06208a7f0a6736cc7d5886d4c8de Parents: 41f1612 Author: Pablo <[email protected]> Authored: Thu Oct 19 12:50:46 2017 -0700 Committer: [email protected] <[email protected]> Committed: Thu Oct 19 16:02:03 2017 -0700 ---------------------------------------------------------------------- .../apache_beam/runners/worker/statesampler.pyx | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/21cdc85c/sdks/python/apache_beam/runners/worker/statesampler.pyx ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/worker/statesampler.pyx b/sdks/python/apache_beam/runners/worker/statesampler.pyx index f0527c6..1e37196 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler.pyx +++ b/sdks/python/apache_beam/runners/worker/statesampler.pyx @@ -74,12 +74,16 @@ cdef inline int64_t get_nsec_time() nogil: class StateSamplerInfo(object): """Info for current state and transition statistics of StateSampler.""" - def __init__(self, state_name, transition_count): + def __init__(self, state_name, transition_count, time_since_transition): self.state_name = state_name self.transition_count = transition_count + self.time_since_transition = time_since_transition def __repr__(self): - return '<StateSamplerInfo %s %d>' % (self.state_name, self.transition_count) + return ('<StateSamplerInfo state: %s time: %dns transitions: %d>' + % (self.state_name, + self.time_since_transition, + self.transition_count)) # Default period for sampling current state of pipeline 
execution. @@ -105,6 +109,7 @@ cdef class StateSampler(object): cdef pythread.PyThread_type_lock lock cdef public int64_t state_transition_count + cdef int64_t time_since_transition cdef int32_t current_state_index @@ -122,6 +127,8 @@ cdef class StateSampler(object): self.scoped_states_by_name = {} self.current_state_index = 0 + self.time_since_transition = 0 + self.state_transition_count = 0 unknown_state = ScopedState(self, 'unknown', self.current_state_index) pythread.PyThread_acquire_lock(self.lock, pythread.WAIT_LOCK) self.scoped_states_by_index = [unknown_state] @@ -142,6 +149,7 @@ cdef class StateSampler(object): def run(self): cdef int64_t last_nsecs = get_nsec_time() cdef int64_t elapsed_nsecs + cdef int64_t latest_transition_count = self.state_transition_count with nogil: while True: usleep(self.sampling_period_ms * 1000) @@ -155,6 +163,10 @@ cdef class StateSampler(object): nsecs_ptr = &(<ScopedState>PyList_GET_ITEM( self.scoped_states_by_index, self.current_state_index)).nsecs nsecs_ptr[0] += elapsed_nsecs + if latest_transition_count != self.state_transition_count: + self.time_since_transition = 0 + latest_transition_count = self.state_transition_count + self.time_since_transition += elapsed_nsecs last_nsecs += elapsed_nsecs finally: pythread.PyThread_release_lock(self.lock) @@ -182,7 +194,8 @@ cdef class StateSampler(object): """Returns StateSamplerInfo with transition statistics.""" return StateSamplerInfo( self.scoped_states_by_index[self.current_state_index].name, - self.state_transition_count) + self.state_transition_count, + self.time_since_transition) # TODO(pabloem): Make state_name required once all callers migrate, # and the legacy path is removed.
