This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 24e6bf8 [BEAM-2732] Starting refactor of state tracking in Python (#4375)
24e6bf8 is described below
commit 24e6bf81790b97024fe1871575186d9db325bf1d
Author: Pablo <[email protected]>
AuthorDate: Thu Jan 18 13:39:48 2018 -0800
[BEAM-2732] Starting refactor of state tracking in Python (#4375)
Also giving the Python-only state sampler full functionality.
---
.../runners/portability/fn_api_runner_test.py | 7 +-
.../runners/portability/maptask_executor_runner.py | 6 +-
.../apache_beam/runners/worker/bundle_processor.py | 7 +-
.../apache_beam/runners/worker/statesampler.py | 81 ++++++++++++
.../runners/worker/statesampler_fake.py | 51 --------
.../{statesampler.pyx => statesampler_fast.pyx} | 136 ++++++---------------
.../runners/worker/statesampler_slow.py | 76 ++++++++++++
.../runners/worker/statesampler_test.py | 48 +++++---
sdks/python/apache_beam/utils/counters.py | 3 -
9 files changed, 225 insertions(+), 190 deletions(-)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index 6304f71..83bb83a 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -23,13 +23,14 @@ import apache_beam as beam
from apache_beam.runners.portability import fn_api_runner
from apache_beam.runners.portability import maptask_executor_runner_test
from apache_beam.runners.worker import sdk_worker
+from apache_beam.runners.worker import statesampler
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms import window
-try:
- from apache_beam.runners.worker.statesampler import DEFAULT_SAMPLING_PERIOD_MS
-except ImportError:
+if statesampler.FAST_SAMPLER:
+ DEFAULT_SAMPLING_PERIOD_MS = statesampler.DEFAULT_SAMPLING_PERIOD_MS
+else:
DEFAULT_SAMPLING_PERIOD_MS = 0
diff --git a/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py b/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
index 913ccd5..74c6b03 100644
--- a/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
+++ b/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
@@ -36,15 +36,11 @@ from apache_beam.runners.runner import PipelineRunner
from apache_beam.runners.runner import PipelineState
from apache_beam.runners.worker import operation_specs
from apache_beam.runners.worker import operations
+from apache_beam.runners.worker import statesampler
from apache_beam.typehints import typehints
from apache_beam.utils import profiler
from apache_beam.utils.counters import CounterFactory
-try:
- from apache_beam.runners.worker import statesampler
-except ImportError:
- from apache_beam.runners.worker import statesampler_fake as statesampler
-
# This module is experimental. No backwards-compatibility guarantees.
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 97c318b..9bc9056 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -38,6 +38,7 @@ from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners import pipeline_context
from apache_beam.runners.worker import operation_specs
from apache_beam.runners.worker import operations
+from apache_beam.runners.worker import statesampler
from apache_beam.transforms import sideinputs
from apache_beam.utils import counters
from apache_beam.utils import proto_utils
@@ -46,12 +47,6 @@ from apache_beam.utils import urns
# This module is experimental. No backwards-compatibility guarantees.
-try:
- from apache_beam.runners.worker import statesampler
-except ImportError:
- from apache_beam.runners.worker import statesampler_fake as statesampler
-
-
DATA_INPUT_URN = 'urn:org.apache.beam:source:runner:0.1'
DATA_OUTPUT_URN = 'urn:org.apache.beam:sink:runner:0.1'
IDENTITY_DOFN_URN = 'urn:org.apache.beam:dofn:identity:0.1'
diff --git a/sdks/python/apache_beam/runners/worker/statesampler.py b/sdks/python/apache_beam/runners/worker/statesampler.py
new file mode 100644
index 0000000..03af644
--- /dev/null
+++ b/sdks/python/apache_beam/runners/worker/statesampler.py
@@ -0,0 +1,81 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This module is experimental. No backwards-compatibility guarantees.
+from collections import namedtuple
+
+from apache_beam.utils.counters import Counter
+from apache_beam.utils.counters import CounterName
+
+try:
+ from apache_beam.runners.worker import statesampler_fast as statesampler_impl
+ FAST_SAMPLER = True
+except ImportError:
+ from apache_beam.runners.worker import statesampler_slow as statesampler_impl
+ FAST_SAMPLER = False
+
+
+StateSamplerInfo = namedtuple(
+ 'StateSamplerInfo',
+ ['state_name', 'transition_count', 'time_since_transition'])
+
+
+# Default period for sampling current state of pipeline execution.
+DEFAULT_SAMPLING_PERIOD_MS = 200
+
+
+class StateSampler(statesampler_impl.StateSampler):
+
+ def __init__(self, prefix, counter_factory,
+ sampling_period_ms=DEFAULT_SAMPLING_PERIOD_MS):
+ self.states_by_name = {}
+ self._prefix = prefix
+ self._counter_factory = counter_factory
+ self._states_by_name = {}
+ self.sampling_period_ms = sampling_period_ms
+ super(StateSampler, self).__init__(sampling_period_ms)
+
+ def stop_if_still_running(self):
+ if self.started and not self.finished:
+ self.stop()
+
+ def get_info(self):
+ """Returns StateSamplerInfo with transition statistics."""
+ return StateSamplerInfo(
+ self.current_state().name,
+ self.state_transition_count,
+ self.time_since_transition)
+
+ def scoped_state(self, step_name, state_name, io_target=None):
+ counter_name = CounterName(state_name + '-msecs',
+ stage_name=self._prefix,
+ step_name=step_name,
+ io_target=io_target)
+ if counter_name in self._states_by_name:
+ return self._states_by_name[counter_name]
+ else:
+ output_counter = self._counter_factory.get_counter(counter_name,
+ Counter.SUM)
+ self._states_by_name[counter_name] = super(
+ StateSampler, self)._scoped_state(counter_name, output_counter)
+ return self._states_by_name[counter_name]
+
+ def commit_counters(self):
+ """Updates output counters with latest state statistics."""
+ for state in self._states_by_name.values():
+ state_msecs = int(1e-6 * state.nsecs)
+ state.counter.update(state_msecs - state.counter.value())
diff --git a/sdks/python/apache_beam/runners/worker/statesampler_fake.py b/sdks/python/apache_beam/runners/worker/statesampler_fake.py
deleted file mode 100644
index bc56021..0000000
--- a/sdks/python/apache_beam/runners/worker/statesampler_fake.py
+++ /dev/null
@@ -1,51 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# This module is experimental. No backwards-compatibility guarantees.
-
-
-class StateSampler(object):
-
- def __init__(self, *args, **kwargs):
- pass
-
- def scoped_state(self, step_name, state_name=None, io_target=None):
- return _FakeScopedState()
-
- def start(self):
- pass
-
- def stop(self):
- pass
-
- def stop_if_still_running(self):
- self.stop()
-
- def commit_counters(self):
- pass
-
-
-class _FakeScopedState(object):
-
- def __enter__(self):
- pass
-
- def __exit__(self, *unused_args):
- pass
-
- def sampled_seconds(self):
- return 0
diff --git a/sdks/python/apache_beam/runners/worker/statesampler.pyx b/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx
similarity index 64%
rename from sdks/python/apache_beam/runners/worker/statesampler.pyx
rename to sdks/python/apache_beam/runners/worker/statesampler_fast.pyx
index 1e37196..d0b1878 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler.pyx
+++ b/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx
@@ -34,13 +34,7 @@ thread queries the current state, the time spent since the previous sample is
attributed to that state and accumulated. Over time, this allows a granular
runtime profile to be produced.
"""
-
import threading
-import time
-
-
-from apache_beam.utils.counters import Counter
-from apache_beam.utils.counters import CounterName
cimport cython
from cpython cimport pythread
@@ -71,37 +65,14 @@ cdef inline int64_t get_nsec_time() nogil:
current_time.tv_nsec)
-class StateSamplerInfo(object):
- """Info for current state and transition statistics of StateSampler."""
-
- def __init__(self, state_name, transition_count, time_since_transition):
- self.state_name = state_name
- self.transition_count = transition_count
- self.time_since_transition = time_since_transition
-
- def __repr__(self):
- return ('<StateSamplerInfo state: %s time: %dns transitions: %d>'
- % (self.state_name,
- self.time_since_transition,
- self.transition_count))
-
-
-# Default period for sampling current state of pipeline execution.
-DEFAULT_SAMPLING_PERIOD_MS = 200
-
-
cdef class StateSampler(object):
"""Tracks time spent in states during pipeline execution."""
+ cdef int _sampling_period_ms
- cdef object prefix
- cdef object counter_factory
- cdef int sampling_period_ms
-
- cdef dict scoped_states_by_name
cdef list scoped_states_by_index
- cdef bint started
- cdef bint finished
+ cdef public bint started
+ cdef public bint finished
cdef object sampling_thread
# This lock guards members that are shared between threads, specificaly
@@ -109,22 +80,16 @@ cdef class StateSampler(object):
cdef pythread.PyThread_type_lock lock
cdef public int64_t state_transition_count
- cdef int64_t time_since_transition
+ cdef public int64_t time_since_transition
cdef int32_t current_state_index
- def __init__(self, prefix, counter_factory,
- sampling_period_ms=DEFAULT_SAMPLING_PERIOD_MS):
-
- # TODO(pabloem): Remove this once all dashed prefixes are removed from
- # the worker.
- # We stop using prefixes with included dash.
- self.prefix = prefix[:-1] if prefix[-1] == '-' else prefix
- self.counter_factory = counter_factory
- self.sampling_period_ms = sampling_period_ms
+ def __init__(self, sampling_period_ms, *args):
+ self._sampling_period_ms = sampling_period_ms
+ self.started = False
+ self.finished = False
self.lock = pythread.PyThread_allocate_lock()
- self.scoped_states_by_name = {}
self.current_state_index = 0
self.time_since_transition = 0
@@ -132,7 +97,6 @@ cdef class StateSampler(object):
unknown_state = ScopedState(self, 'unknown', self.current_state_index)
pythread.PyThread_acquire_lock(self.lock, pythread.WAIT_LOCK)
self.scoped_states_by_index = [unknown_state]
- self.finished = False
pythread.PyThread_release_lock(self.lock)
# Assert that the compiler correctly aligned the current_state field. This
@@ -152,7 +116,7 @@ cdef class StateSampler(object):
cdef int64_t latest_transition_count = self.state_transition_count
with nogil:
while True:
- usleep(self.sampling_period_ms * 1000)
+ usleep(self._sampling_period_ms * 1000)
pythread.PyThread_acquire_lock(self.lock, pythread.WAIT_LOCK)
try:
if self.finished:
@@ -161,7 +125,7 @@ cdef class StateSampler(object):
# Take an address as we can't create a reference to the scope
# without the GIL.
nsecs_ptr = &(<ScopedState>PyList_GET_ITEM(
- self.scoped_states_by_index, self.current_state_index)).nsecs
+ self.scoped_states_by_index, self.current_state_index))._nsecs
nsecs_ptr[0] += elapsed_nsecs
if latest_transition_count != self.state_transition_count:
self.time_since_transition = 0
@@ -186,64 +150,28 @@ cdef class StateSampler(object):
# pythread doesn't support conditions.
self.sampling_thread.join()
- def stop_if_still_running(self):
- if self.started and not self.finished:
- self.stop()
-
- def get_info(self):
- """Returns StateSamplerInfo with transition statistics."""
- return StateSamplerInfo(
- self.scoped_states_by_index[self.current_state_index].name,
- self.state_transition_count,
- self.time_since_transition)
+ def current_state(self):
+ return self.scoped_states_by_index[self.current_state_index]
- # TODO(pabloem): Make state_name required once all callers migrate,
- # and the legacy path is removed.
- def scoped_state(self, step_name, state_name=None, io_target=None):
+ cpdef _scoped_state(self, counter_name, output_counter):
"""Returns a context manager managing transitions for a given state.
Args:
- step_name: A string with the name of the running step.
- state_name: A string with the name of the state (e.g. 'process', 'start')
- io_target: An IOTargetName object describing the io_target (e.g. writing
- or reading to side inputs, shuffle or state). Will often be None.
+ TODO(pabloem)
Returns:
A ScopedState for the set of step-state-io_target.
"""
- cdef ScopedState scoped_state
- if state_name is None:
- # If state_name is None, the worker is still using old style
- # msec counters.
- counter_name = '%s-%s-msecs' % (self.prefix, step_name)
- scoped_state = self.scoped_states_by_name.get(counter_name, None)
- else:
- counter_name = CounterName(state_name + '-msecs',
- stage_name=self.prefix,
- step_name=step_name,
- io_target=io_target)
- scoped_state = self.scoped_states_by_name.get(counter_name, None)
-
- if scoped_state is None:
- output_counter = self.counter_factory.get_counter(counter_name,
- Counter.SUM)
- new_state_index = len(self.scoped_states_by_index)
- scoped_state = ScopedState(self, counter_name,
- new_state_index, output_counter)
- # Both scoped_states_by_index and scoped_state.nsecs are accessed
- # by the sampling thread; initialize them under the lock.
- pythread.PyThread_acquire_lock(self.lock, pythread.WAIT_LOCK)
- self.scoped_states_by_index.append(scoped_state)
- scoped_state.nsecs = 0
- pythread.PyThread_release_lock(self.lock)
- self.scoped_states_by_name[counter_name] = scoped_state
+ new_state_index = len(self.scoped_states_by_index)
+ scoped_state = ScopedState(self, counter_name,
+ new_state_index, output_counter)
+ # Both scoped_states_by_index and scoped_state.nsecs are accessed
+ # by the sampling thread; initialize them under the lock.
+ pythread.PyThread_acquire_lock(self.lock, pythread.WAIT_LOCK)
+ self.scoped_states_by_index.append(scoped_state)
+ scoped_state._nsecs = 0
+ pythread.PyThread_release_lock(self.lock)
return scoped_state
- def commit_counters(self):
- """Updates output counters with latest state statistics."""
- for state in self.scoped_states_by_name.values():
- state_msecs = int(1e-6 * state.nsecs)
- state.counter.update(state_msecs - state.counter.value())
-
cdef class ScopedState(object):
"""Context manager class managing transitions for a given sampler state."""
@@ -252,7 +180,7 @@ cdef class ScopedState(object):
cdef readonly int32_t state_index
cdef readonly object counter
cdef readonly object name
- cdef readonly int64_t nsecs
+ cdef readonly int64_t _nsecs
cdef int32_t old_state_index
def __init__(self, sampler, name, state_index, counter=None):
@@ -261,6 +189,16 @@ cdef class ScopedState(object):
self.state_index = state_index
self.counter = counter
+ @property
+ def nsecs(self):
+ return self._nsecs
+
+ def sampled_seconds(self):
+ return 1e-9 * self.nsecs
+
+ def __repr__(self):
+ return "ScopedState[%s, %s]" % (self.name, self.nsecs)
+
cpdef __enter__(self):
self.old_state_index = self.sampler.current_state_index
pythread.PyThread_acquire_lock(self.sampler.lock, pythread.WAIT_LOCK)
@@ -273,9 +211,3 @@ cdef class ScopedState(object):
self.sampler.current_state_index = self.old_state_index
pythread.PyThread_release_lock(self.sampler.lock)
self.sampler.state_transition_count += 1
-
- def __repr__(self):
- return "ScopedState[%s, %s, %s]" % (self.name, self.state_index, self.nsecs)
-
- def sampled_seconds(self):
- return 1e-9 * self.nsecs
diff --git a/sdks/python/apache_beam/runners/worker/statesampler_slow.py b/sdks/python/apache_beam/runners/worker/statesampler_slow.py
new file mode 100644
index 0000000..dafe3b4
--- /dev/null
+++ b/sdks/python/apache_beam/runners/worker/statesampler_slow.py
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This module is experimental. No backwards-compatibility guarantees.
+
+
+class StateSampler(object):
+
+ def __init__(self, sampling_period_ms):
+ self._state_stack = [ScopedState(None, self, None)]
+ self.state_transition_count = 0
+ self.time_since_transition = 0
+ self.started = False
+ self.finished = False
+
+ def current_state(self):
+ """Returns the current execution state."""
+ return self._state_stack[-1]
+
+ def _scoped_state(self, counter_name, output_counter):
+ return ScopedState(self, counter_name, output_counter)
+
+ def _enter_state(self, state):
+ self.state_transition_count += 1
+ self._state_stack.append(state)
+
+ def _exit_state(self):
+ self.state_transition_count += 1
+ self._state_stack.pop()
+
+ def start(self):
+ # Sampling not yet supported. Only state tracking at the moment.
+ self.started = True
+
+ def stop(self):
+ self.finished = True
+
+ def get_info(self):
+ """Returns StateSamplerInfo with transition statistics."""
+ return StateSamplerInfo(
+ self.current_state().name, self.transition_count, 0)
+
+
+class ScopedState(object):
+
+ def __init__(self, sampler, name, counter=None):
+ self.state_sampler = sampler
+ self.name = name
+ self.counter = counter
+ self.nsecs = 0
+
+ def sampled_seconds(self):
+ return 1e-9 * self.nsecs
+
+ def __repr__(self):
+ return "ScopedState[%s, %s]" % (self.name, self.nsecs)
+
+ def __enter__(self):
+ self.state_sampler._enter_state(self)
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.state_sampler._exit_state()
diff --git a/sdks/python/apache_beam/runners/worker/statesampler_test.py b/sdks/python/apache_beam/runners/worker/statesampler_test.py
index 2f2c8be..63dc6f8 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler_test.py
+++ b/sdks/python/apache_beam/runners/worker/statesampler_test.py
@@ -22,22 +22,13 @@ import logging
import time
import unittest
-from nose.plugins.skip import SkipTest
-
+from apache_beam.runners.worker import statesampler
from apache_beam.utils.counters import CounterFactory
+from apache_beam.utils.counters import CounterName
class StateSamplerTest(unittest.TestCase):
- def setUp(self):
- try:
- # pylint: disable=global-variable-not-assigned
- global statesampler
- from apache_beam.runners.worker import statesampler
- except ImportError:
- raise SkipTest('State sampler not compiled.')
- super(StateSamplerTest, self).setUp()
-
def test_basic_sampler(self):
# Set up state sampler.
counter_factory = CounterFactory()
@@ -46,21 +37,38 @@ class StateSamplerTest(unittest.TestCase):
# Run basic workload transitioning between 3 states.
sampler.start()
- with sampler.scoped_state('statea'):
+ with sampler.scoped_state('step1', 'statea'):
time.sleep(0.1)
- with sampler.scoped_state('stateb'):
+ self.assertEqual(
+ sampler.current_state().name,
+ CounterName(
+ 'statea-msecs', step_name='step1', stage_name='basic'))
+ with sampler.scoped_state('step1', 'stateb'):
time.sleep(0.2 / 2)
- with sampler.scoped_state('statec'):
+ self.assertEqual(
+ sampler.current_state().name,
+ CounterName(
+ 'stateb-msecs', step_name='step1', stage_name='basic'))
+ with sampler.scoped_state('step1', 'statec'):
time.sleep(0.3)
+ self.assertEqual(
+ sampler.current_state().name,
+ CounterName(
+ 'statec-msecs', step_name='step1', stage_name='basic'))
time.sleep(0.2 / 2)
+
sampler.stop()
sampler.commit_counters()
+ if not statesampler.FAST_SAMPLER:
+ # The slow sampler does not implement sampling, so we won't test it.
+ return
+
# Test that sampled state timings are close to their expected values.
expected_counter_values = {
- 'basic-statea-msecs': 100,
- 'basic-stateb-msecs': 200,
- 'basic-statec-msecs': 300,
+ CounterName('statea-msecs', step_name='step1', stage_name='basic'): 100,
+ CounterName('stateb-msecs', step_name='step1', stage_name='basic'): 200,
+ CounterName('statec-msecs', step_name='step1', stage_name='basic'): 300,
}
for counter in counter_factory.get_counters():
self.assertIn(counter.name, expected_counter_values)
@@ -76,9 +84,9 @@ class StateSamplerTest(unittest.TestCase):
sampling_period_ms=10)
# Run basic workload transitioning between 3 states.
- state_a = sampler.scoped_state('statea')
- state_b = sampler.scoped_state('stateb')
- state_c = sampler.scoped_state('statec')
+ state_a = sampler.scoped_state('step1', 'statea')
+ state_b = sampler.scoped_state('step1', 'stateb')
+ state_c = sampler.scoped_state('step1', 'statec')
start_time = time.time()
sampler.start()
for _ in range(100000):
diff --git a/sdks/python/apache_beam/utils/counters.py b/sdks/python/apache_beam/utils/counters.py
index e2e0a1a..95b2117 100644
--- a/sdks/python/apache_beam/utils/counters.py
+++ b/sdks/python/apache_beam/utils/counters.py
@@ -66,9 +66,6 @@ class CounterName(_CounterName):
system_name, namespace,
origin, output_index, io_target)
- def __str__(self):
- return '%s' % self._str_internal()
-
def __repr__(self):
return '<CounterName<%s> at %s>' % (self._str_internal(), hex(id(self)))
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].