Repository: beam
Updated Branches:
  refs/heads/master baa6ebc0c -> f9bc76364


Change the state sampler to use structured names

The new io_target argument allows states to track time spent in IO, such
as reading side inputs, shuffle, and state. Tests have passed, and the
latest commit only updates documentation.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/beacf9ff
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/beacf9ff
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/beacf9ff

Branch: refs/heads/master
Commit: beacf9ff38ae65dc5dc9fbf33563a5cb5b33a439
Parents: baa6ebc
Author: Pablo <pabl...@google.com>
Authored: Tue Aug 29 15:23:20 2017 -0700
Committer: bchambers <bchamb...@google.com>
Committed: Tue Oct 3 09:48:26 2017 -0700

----------------------------------------------------------------------
 .../portability/maptask_executor_runner.py      |  2 +-
 .../runners/worker/bundle_processor.py          |  2 +-
 .../apache_beam/runners/worker/operations.py    |  6 +--
 .../apache_beam/runners/worker/statesampler.pyx | 46 ++++++++++++++++----
 .../runners/worker/statesampler_fake.py         |  2 +-
 .../runners/worker/statesampler_test.py         |  2 +-
 6 files changed, 44 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/beacf9ff/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py b/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
index a20ceef..afb96fa 100644
--- a/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
+++ b/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
@@ -130,7 +130,7 @@ class MapTaskExecutorRunner(PipelineRunner):
       # Create the CounterFactory and StateSampler for this MapTask.
       # TODO(robertwb): Output counters produced here are currently ignored.
       counter_factory = CounterFactory()
-      state_sampler = statesampler.StateSampler('%s-' % ix, counter_factory)
+      state_sampler = statesampler.StateSampler('%s' % ix, counter_factory)
       map_executor = operations.SimpleMapTaskExecutor(
           operation_specs.MapTask(
               all_operations, 'S%02d' % ix,

http://git-wip-us.apache.org/repos/asf/beam/blob/beacf9ff/sdks/python/apache_beam/runners/worker/bundle_processor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 068aa0a..b69d002 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -193,7 +193,7 @@ class BundleProcessor(object):
     # from StateSampler.
     counter_factory = counters.CounterFactory()
     state_sampler = statesampler.StateSampler(
-        'fnapi-step%s-' % descriptor.id, counter_factory)
+        'fnapi-step%s' % descriptor.id, counter_factory)
 
     transform_factory = BeamTransformFactory(
         descriptor, self.data_channel_factory, counter_factory, state_sampler,

http://git-wip-us.apache.org/repos/asf/beam/blob/beacf9ff/sdks/python/apache_beam/runners/worker/operations.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index 1b61f8e..1136d99 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -122,11 +122,11 @@ class Operation(object):
 
     self.state_sampler = state_sampler
     self.scoped_start_state = self.state_sampler.scoped_state(
-        self.operation_name + '-start')
+        self.operation_name, 'start')
     self.scoped_process_state = self.state_sampler.scoped_state(
-        self.operation_name + '-process')
+        self.operation_name, 'process')
     self.scoped_finish_state = self.state_sampler.scoped_state(
-        self.operation_name + '-finish')
+        self.operation_name, 'finish')
     # TODO(ccy): the '-abort' state can be added when the abort is supported in
     # Operations.
     self.scoped_metrics_container = None

http://git-wip-us.apache.org/repos/asf/beam/blob/beacf9ff/sdks/python/apache_beam/runners/worker/statesampler.pyx
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/statesampler.pyx b/sdks/python/apache_beam/runners/worker/statesampler.pyx
index 3ff6c20..c562763 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler.pyx
+++ b/sdks/python/apache_beam/runners/worker/statesampler.pyx
@@ -40,12 +40,13 @@ import time
 
 
 from apache_beam.utils.counters import Counter
-
+from apache_beam.utils.counters import CounterName
 
 cimport cython
 from cpython cimport pythread
 from libc.stdint cimport int32_t, int64_t
 
+
 cdef extern from "Python.h":
   # This typically requires the GIL, but we synchronize the list modifications
   # we use this on via our own lock.
@@ -110,7 +111,10 @@ cdef class StateSampler(object):
   def __init__(self, prefix, counter_factory,
       sampling_period_ms=DEFAULT_SAMPLING_PERIOD_MS):
 
-    self.prefix = prefix
+    # TODO(pabloem): Drop this once the worker no longer passes dashed
+    # prefixes; strip a legacy trailing dash if present. NOTE(review):
+    # prefix[-1] raises IndexError if prefix is empty.
+    self.prefix = prefix[:-1] if prefix[-1] == '-' else prefix
     self.counter_factory = counter_factory
     self.sampling_period_ms = sampling_period_ms
 
@@ -180,21 +184,45 @@ cdef class StateSampler(object):
         self.scoped_states_by_index[self.current_state_index].name,
         self.state_transition_count)
 
-  def scoped_state(self, name):
-    """Returns a context manager managing transitions for a given state."""
-    cdef ScopedState scoped_state = self.scoped_states_by_name.get(name, None)
+  # TODO(pabloem): Make state_name required once all callers migrate,
+  #   and the legacy path is removed.
+  def scoped_state(self, step_name, state_name=None, io_target=None):
+    """Returns a context manager managing transitions for a given state.
+
+    Args:
+      step_name: A string with the name of the running step.
+      state_name: A string with the state name (e.g. 'process', 'start').
+      io_target: Optional IOTargetName (e.g. side input, shuffle or state IO).
+
+    Returns:
+      A ScopedState for the given (step, state, io_target) combination.
+    """
+    cdef ScopedState scoped_state
+    if state_name is None:
+      # If state_name is None, the worker is still using old style
+      # msec counters.
+      counter_name = '%s-%s-msecs' % (self.prefix, step_name)
+      scoped_state = self.scoped_states_by_name.get(counter_name, None)
+    else:
+      counter_name = CounterName(state_name + '-msecs',
+                                 stage_name=self.prefix,
+                                 step_name=step_name,
+                                 io_target=io_target)
+      scoped_state = self.scoped_states_by_name.get(counter_name, None)
+
     if scoped_state is None:
-      output_counter = self.counter_factory.get_counter(
-          '%s%s-msecs' % (self.prefix,  name), Counter.SUM)
+      output_counter = self.counter_factory.get_counter(counter_name,
+                                                        Counter.SUM)
       new_state_index = len(self.scoped_states_by_index)
-      scoped_state = ScopedState(self, name, new_state_index, output_counter)
+      scoped_state = ScopedState(self, counter_name,
+                                 new_state_index, output_counter)
       # Both scoped_states_by_index and scoped_state.nsecs are accessed
       # by the sampling thread; initialize them under the lock.
       pythread.PyThread_acquire_lock(self.lock, pythread.WAIT_LOCK)
       self.scoped_states_by_index.append(scoped_state)
       scoped_state.nsecs = 0
       pythread.PyThread_release_lock(self.lock)
-      self.scoped_states_by_name[name] = scoped_state
+      self.scoped_states_by_name[counter_name] = scoped_state
     return scoped_state
 
   def commit_counters(self):

http://git-wip-us.apache.org/repos/asf/beam/blob/beacf9ff/sdks/python/apache_beam/runners/worker/statesampler_fake.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/statesampler_fake.py b/sdks/python/apache_beam/runners/worker/statesampler_fake.py
index 88ace8c..5cd0fd2 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler_fake.py
+++ b/sdks/python/apache_beam/runners/worker/statesampler_fake.py
@@ -23,7 +23,7 @@ class StateSampler(object):
   def __init__(self, *args, **kwargs):
     pass
 
-  def scoped_state(self, name):
+  def scoped_state(self, step_name, state_name=None, io_target=None):
     return _FakeScopedState()
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/beacf9ff/sdks/python/apache_beam/runners/worker/statesampler_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/statesampler_test.py b/sdks/python/apache_beam/runners/worker/statesampler_test.py
index 663cdec..2a85610 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler_test.py
+++ b/sdks/python/apache_beam/runners/worker/statesampler_test.py
@@ -40,7 +40,7 @@ class StateSamplerTest(unittest.TestCase):
   def test_basic_sampler(self):
     # Set up state sampler.
     counter_factory = CounterFactory()
-    sampler = statesampler.StateSampler('basic-', counter_factory,
+    sampler = statesampler.StateSampler('basic', counter_factory,
                                         sampling_period_ms=1)
 
     # Run basic workload transitioning between 3 states.

Reply via email to