[ 
https://issues.apache.org/jira/browse/BEAM-2732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16331275#comment-16331275
 ] 

ASF GitHub Bot commented on BEAM-2732:
--------------------------------------

robertwb closed pull request #4375: [BEAM-2732] Starting refactor of state 
tracking in Python
URL: https://github.com/apache/beam/pull/4375
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py 
b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index 6304f71df5e..83bb83a83ff 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -23,13 +23,14 @@
 from apache_beam.runners.portability import fn_api_runner
 from apache_beam.runners.portability import maptask_executor_runner_test
 from apache_beam.runners.worker import sdk_worker
+from apache_beam.runners.worker import statesampler
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
 from apache_beam.transforms import window
 
-try:
-  from apache_beam.runners.worker.statesampler import 
DEFAULT_SAMPLING_PERIOD_MS
-except ImportError:
+if statesampler.FAST_SAMPLER:
+  DEFAULT_SAMPLING_PERIOD_MS = statesampler.DEFAULT_SAMPLING_PERIOD_MS
+else:
   DEFAULT_SAMPLING_PERIOD_MS = 0
 
 
diff --git 
a/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py 
b/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
index 913ccd59585..74c6b03f5a6 100644
--- a/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
+++ b/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
@@ -36,15 +36,11 @@
 from apache_beam.runners.runner import PipelineState
 from apache_beam.runners.worker import operation_specs
 from apache_beam.runners.worker import operations
+from apache_beam.runners.worker import statesampler
 from apache_beam.typehints import typehints
 from apache_beam.utils import profiler
 from apache_beam.utils.counters import CounterFactory
 
-try:
-  from apache_beam.runners.worker import statesampler
-except ImportError:
-  from apache_beam.runners.worker import statesampler_fake as statesampler
-
 # This module is experimental. No backwards-compatibility guarantees.
 
 
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py 
b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 1b270b90372..fa303e88891 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -38,6 +38,7 @@
 from apache_beam.runners import pipeline_context
 from apache_beam.runners.worker import operation_specs
 from apache_beam.runners.worker import operations
+from apache_beam.runners.worker import statesampler
 from apache_beam.transforms import sideinputs
 from apache_beam.utils import counters
 from apache_beam.utils import proto_utils
@@ -46,12 +47,6 @@
 # This module is experimental. No backwards-compatibility guarantees.
 
 
-try:
-  from apache_beam.runners.worker import statesampler
-except ImportError:
-  from apache_beam.runners.worker import statesampler_fake as statesampler
-
-
 DATA_INPUT_URN = 'urn:org.apache.beam:source:runner:0.1'
 DATA_OUTPUT_URN = 'urn:org.apache.beam:sink:runner:0.1'
 IDENTITY_DOFN_URN = 'urn:org.apache.beam:dofn:identity:0.1'
diff --git a/sdks/python/apache_beam/runners/worker/statesampler.py 
b/sdks/python/apache_beam/runners/worker/statesampler.py
new file mode 100644
index 00000000000..03af644846d
--- /dev/null
+++ b/sdks/python/apache_beam/runners/worker/statesampler.py
@@ -0,0 +1,81 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This module is experimental. No backwards-compatibility guarantees.
+from collections import namedtuple
+
+from apache_beam.utils.counters import Counter
+from apache_beam.utils.counters import CounterName
+
+try:
+  from apache_beam.runners.worker import statesampler_fast as statesampler_impl
+  FAST_SAMPLER = True
+except ImportError:
+  from apache_beam.runners.worker import statesampler_slow as statesampler_impl
+  FAST_SAMPLER = False
+
+
+StateSamplerInfo = namedtuple(
+    'StateSamplerInfo',
+    ['state_name', 'transition_count', 'time_since_transition'])
+
+
+# Default period for sampling current state of pipeline execution.
+DEFAULT_SAMPLING_PERIOD_MS = 200
+
+
+class StateSampler(statesampler_impl.StateSampler):
+
+  def __init__(self, prefix, counter_factory,
+               sampling_period_ms=DEFAULT_SAMPLING_PERIOD_MS):
+    self.states_by_name = {}
+    self._prefix = prefix
+    self._counter_factory = counter_factory
+    self._states_by_name = {}
+    self.sampling_period_ms = sampling_period_ms
+    super(StateSampler, self).__init__(sampling_period_ms)
+
+  def stop_if_still_running(self):
+    if self.started and not self.finished:
+      self.stop()
+
+  def get_info(self):
+    """Returns StateSamplerInfo with transition statistics."""
+    return StateSamplerInfo(
+        self.current_state().name,
+        self.state_transition_count,
+        self.time_since_transition)
+
+  def scoped_state(self, step_name, state_name, io_target=None):
+    counter_name = CounterName(state_name + '-msecs',
+                               stage_name=self._prefix,
+                               step_name=step_name,
+                               io_target=io_target)
+    if counter_name in self._states_by_name:
+      return self._states_by_name[counter_name]
+    else:
+      output_counter = self._counter_factory.get_counter(counter_name,
+                                                         Counter.SUM)
+      self._states_by_name[counter_name] = super(
+          StateSampler, self)._scoped_state(counter_name, output_counter)
+      return self._states_by_name[counter_name]
+
+  def commit_counters(self):
+    """Updates output counters with latest state statistics."""
+    for state in self._states_by_name.values():
+      state_msecs = int(1e-6 * state.nsecs)
+      state.counter.update(state_msecs - state.counter.value())
diff --git a/sdks/python/apache_beam/runners/worker/statesampler_fake.py 
b/sdks/python/apache_beam/runners/worker/statesampler_fake.py
deleted file mode 100644
index bc56021520a..00000000000
--- a/sdks/python/apache_beam/runners/worker/statesampler_fake.py
+++ /dev/null
@@ -1,51 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# This module is experimental. No backwards-compatibility guarantees.
-
-
-class StateSampler(object):
-
-  def __init__(self, *args, **kwargs):
-    pass
-
-  def scoped_state(self, step_name, state_name=None, io_target=None):
-    return _FakeScopedState()
-
-  def start(self):
-    pass
-
-  def stop(self):
-    pass
-
-  def stop_if_still_running(self):
-    self.stop()
-
-  def commit_counters(self):
-    pass
-
-
-class _FakeScopedState(object):
-
-  def __enter__(self):
-    pass
-
-  def __exit__(self, *unused_args):
-    pass
-
-  def sampled_seconds(self):
-    return 0
diff --git a/sdks/python/apache_beam/runners/worker/statesampler.pyx 
b/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx
similarity index 64%
rename from sdks/python/apache_beam/runners/worker/statesampler.pyx
rename to sdks/python/apache_beam/runners/worker/statesampler_fast.pyx
index 1e371968a4f..d0b187818c1 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler.pyx
+++ b/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx
@@ -34,13 +34,7 @@ thread queries the current state, the time spent since the 
previous sample is
 attributed to that state and accumulated.  Over time, this allows a granular
 runtime profile to be produced.
 """
-
 import threading
-import time
-
-
-from apache_beam.utils.counters import Counter
-from apache_beam.utils.counters import CounterName
 
 cimport cython
 from cpython cimport pythread
@@ -71,37 +65,14 @@ cdef inline int64_t get_nsec_time() nogil:
       current_time.tv_nsec)
 
 
-class StateSamplerInfo(object):
-  """Info for current state and transition statistics of StateSampler."""
-
-  def __init__(self, state_name, transition_count, time_since_transition):
-    self.state_name = state_name
-    self.transition_count = transition_count
-    self.time_since_transition = time_since_transition
-
-  def __repr__(self):
-    return ('<StateSamplerInfo state: %s time: %dns transitions: %d>'
-            % (self.state_name,
-               self.time_since_transition,
-               self.transition_count))
-
-
-# Default period for sampling current state of pipeline execution.
-DEFAULT_SAMPLING_PERIOD_MS = 200
-
-
 cdef class StateSampler(object):
   """Tracks time spent in states during pipeline execution."""
+  cdef int _sampling_period_ms
 
-  cdef object prefix
-  cdef object counter_factory
-  cdef int sampling_period_ms
-
-  cdef dict scoped_states_by_name
   cdef list scoped_states_by_index
 
-  cdef bint started
-  cdef bint finished
+  cdef public bint started
+  cdef public bint finished
   cdef object sampling_thread
 
   # This lock guards members that are shared between threads, specificaly
@@ -109,22 +80,16 @@ cdef class StateSampler(object):
   cdef pythread.PyThread_type_lock lock
 
   cdef public int64_t state_transition_count
-  cdef int64_t time_since_transition
+  cdef public int64_t time_since_transition
 
   cdef int32_t current_state_index
 
-  def __init__(self, prefix, counter_factory,
-      sampling_period_ms=DEFAULT_SAMPLING_PERIOD_MS):
-
-    # TODO(pabloem): Remove this once all dashed prefixes are removed from
-    # the worker.
-    # We stop using prefixes with included dash.
-    self.prefix = prefix[:-1] if prefix[-1] == '-' else prefix
-    self.counter_factory = counter_factory
-    self.sampling_period_ms = sampling_period_ms
+  def __init__(self, sampling_period_ms, *args):
+    self._sampling_period_ms = sampling_period_ms
+    self.started = False
+    self.finished = False
 
     self.lock = pythread.PyThread_allocate_lock()
-    self.scoped_states_by_name = {}
 
     self.current_state_index = 0
     self.time_since_transition = 0
@@ -132,7 +97,6 @@ cdef class StateSampler(object):
     unknown_state = ScopedState(self, 'unknown', self.current_state_index)
     pythread.PyThread_acquire_lock(self.lock, pythread.WAIT_LOCK)
     self.scoped_states_by_index = [unknown_state]
-    self.finished = False
     pythread.PyThread_release_lock(self.lock)
 
     # Assert that the compiler correctly aligned the current_state field.  This
@@ -152,7 +116,7 @@ cdef class StateSampler(object):
     cdef int64_t latest_transition_count = self.state_transition_count
     with nogil:
       while True:
-        usleep(self.sampling_period_ms * 1000)
+        usleep(self._sampling_period_ms * 1000)
         pythread.PyThread_acquire_lock(self.lock, pythread.WAIT_LOCK)
         try:
           if self.finished:
@@ -161,7 +125,7 @@ cdef class StateSampler(object):
           # Take an address as we can't create a reference to the scope
           # without the GIL.
           nsecs_ptr = &(<ScopedState>PyList_GET_ITEM(
-              self.scoped_states_by_index, self.current_state_index)).nsecs
+              self.scoped_states_by_index, self.current_state_index))._nsecs
           nsecs_ptr[0] += elapsed_nsecs
           if latest_transition_count != self.state_transition_count:
             self.time_since_transition = 0
@@ -186,64 +150,28 @@ cdef class StateSampler(object):
     # pythread doesn't support conditions.
     self.sampling_thread.join()
 
-  def stop_if_still_running(self):
-    if self.started and not self.finished:
-      self.stop()
-
-  def get_info(self):
-    """Returns StateSamplerInfo with transition statistics."""
-    return StateSamplerInfo(
-        self.scoped_states_by_index[self.current_state_index].name,
-        self.state_transition_count,
-        self.time_since_transition)
+  def current_state(self):
+    return self.scoped_states_by_index[self.current_state_index]
 
-  # TODO(pabloem): Make state_name required once all callers migrate,
-  #   and the legacy path is removed.
-  def scoped_state(self, step_name, state_name=None, io_target=None):
+  cpdef _scoped_state(self, counter_name, output_counter):
     """Returns a context manager managing transitions for a given state.
     Args:
-      step_name: A string with the name of the running step.
-      state_name: A string with the name of the state (e.g. 'process', 'start')
-      io_target: An IOTargetName object describing the io_target (e.g. writing
-        or reading to side inputs, shuffle or state). Will often be None.
+     TODO(pabloem)
 
     Returns:
       A ScopedState for the set of step-state-io_target.
     """
-    cdef ScopedState scoped_state
-    if state_name is None:
-      # If state_name is None, the worker is still using old style
-      # msec counters.
-      counter_name = '%s-%s-msecs' % (self.prefix, step_name)
-      scoped_state = self.scoped_states_by_name.get(counter_name, None)
-    else:
-      counter_name = CounterName(state_name + '-msecs',
-                                 stage_name=self.prefix,
-                                 step_name=step_name,
-                                 io_target=io_target)
-      scoped_state = self.scoped_states_by_name.get(counter_name, None)
-
-    if scoped_state is None:
-      output_counter = self.counter_factory.get_counter(counter_name,
-                                                        Counter.SUM)
-      new_state_index = len(self.scoped_states_by_index)
-      scoped_state = ScopedState(self, counter_name,
-                                 new_state_index, output_counter)
-      # Both scoped_states_by_index and scoped_state.nsecs are accessed
-      # by the sampling thread; initialize them under the lock.
-      pythread.PyThread_acquire_lock(self.lock, pythread.WAIT_LOCK)
-      self.scoped_states_by_index.append(scoped_state)
-      scoped_state.nsecs = 0
-      pythread.PyThread_release_lock(self.lock)
-      self.scoped_states_by_name[counter_name] = scoped_state
+    new_state_index = len(self.scoped_states_by_index)
+    scoped_state = ScopedState(self, counter_name,
+                               new_state_index, output_counter)
+    # Both scoped_states_by_index and scoped_state.nsecs are accessed
+    # by the sampling thread; initialize them under the lock.
+    pythread.PyThread_acquire_lock(self.lock, pythread.WAIT_LOCK)
+    self.scoped_states_by_index.append(scoped_state)
+    scoped_state._nsecs = 0
+    pythread.PyThread_release_lock(self.lock)
     return scoped_state
 
-  def commit_counters(self):
-    """Updates output counters with latest state statistics."""
-    for state in self.scoped_states_by_name.values():
-      state_msecs = int(1e-6 * state.nsecs)
-      state.counter.update(state_msecs - state.counter.value())
-
 
 cdef class ScopedState(object):
   """Context manager class managing transitions for a given sampler state."""
@@ -252,7 +180,7 @@ cdef class ScopedState(object):
   cdef readonly int32_t state_index
   cdef readonly object counter
   cdef readonly object name
-  cdef readonly int64_t nsecs
+  cdef readonly int64_t _nsecs
   cdef int32_t old_state_index
 
   def __init__(self, sampler, name, state_index, counter=None):
@@ -261,6 +189,16 @@ cdef class ScopedState(object):
     self.state_index = state_index
     self.counter = counter
 
+  @property
+  def nsecs(self):
+    return self._nsecs
+
+  def sampled_seconds(self):
+    return 1e-9 * self.nsecs
+
+  def __repr__(self):
+    return "ScopedState[%s, %s]" % (self.name, self.nsecs)
+
   cpdef __enter__(self):
     self.old_state_index = self.sampler.current_state_index
     pythread.PyThread_acquire_lock(self.sampler.lock, pythread.WAIT_LOCK)
@@ -273,9 +211,3 @@ cdef class ScopedState(object):
     self.sampler.current_state_index = self.old_state_index
     pythread.PyThread_release_lock(self.sampler.lock)
     self.sampler.state_transition_count += 1
-
-  def __repr__(self):
-    return "ScopedState[%s, %s, %s]" % (self.name, self.state_index, 
self.nsecs)
-
-  def sampled_seconds(self):
-    return 1e-9 * self.nsecs
diff --git a/sdks/python/apache_beam/runners/worker/statesampler_slow.py 
b/sdks/python/apache_beam/runners/worker/statesampler_slow.py
new file mode 100644
index 00000000000..dafe3b46887
--- /dev/null
+++ b/sdks/python/apache_beam/runners/worker/statesampler_slow.py
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This module is experimental. No backwards-compatibility guarantees.
+
+
+class StateSampler(object):
+
+  def __init__(self, sampling_period_ms):
+    self._state_stack = [ScopedState(None, self, None)]
+    self.state_transition_count = 0
+    self.time_since_transition = 0
+    self.started = False
+    self.finished = False
+
+  def current_state(self):
+    """Returns the current execution state."""
+    return self._state_stack[-1]
+
+  def _scoped_state(self, counter_name, output_counter):
+    return ScopedState(self, counter_name, output_counter)
+
+  def _enter_state(self, state):
+    self.state_transition_count += 1
+    self._state_stack.append(state)
+
+  def _exit_state(self):
+    self.state_transition_count += 1
+    self._state_stack.pop()
+
+  def start(self):
+    # Sampling not yet supported. Only state tracking at the moment.
+    self.started = True
+
+  def stop(self):
+    self.finished = True
+
+  def get_info(self):
+    """Returns StateSamplerInfo with transition statistics."""
+    return StateSamplerInfo(
+        self.current_state().name, self.transition_count, 0)
+
+
+class ScopedState(object):
+
+  def __init__(self, sampler, name, counter=None):
+    self.state_sampler = sampler
+    self.name = name
+    self.counter = counter
+    self.nsecs = 0
+
+  def sampled_seconds(self):
+    return 1e-9 * self.nsecs
+
+  def __repr__(self):
+    return "ScopedState[%s, %s]" % (self.name, self.nsecs)
+
+  def __enter__(self):
+    self.state_sampler._enter_state(self)
+
+  def __exit__(self, exc_type, exc_value, traceback):
+    self.state_sampler._exit_state()
diff --git a/sdks/python/apache_beam/runners/worker/statesampler_test.py 
b/sdks/python/apache_beam/runners/worker/statesampler_test.py
index 2f2c8bea4f7..63dc6f899bf 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler_test.py
+++ b/sdks/python/apache_beam/runners/worker/statesampler_test.py
@@ -22,22 +22,13 @@
 import time
 import unittest
 
-from nose.plugins.skip import SkipTest
-
+from apache_beam.runners.worker import statesampler
 from apache_beam.utils.counters import CounterFactory
+from apache_beam.utils.counters import CounterName
 
 
 class StateSamplerTest(unittest.TestCase):
 
-  def setUp(self):
-    try:
-      # pylint: disable=global-variable-not-assigned
-      global statesampler
-      from apache_beam.runners.worker import statesampler
-    except ImportError:
-      raise SkipTest('State sampler not compiled.')
-    super(StateSamplerTest, self).setUp()
-
   def test_basic_sampler(self):
     # Set up state sampler.
     counter_factory = CounterFactory()
@@ -46,21 +37,38 @@ def test_basic_sampler(self):
 
     # Run basic workload transitioning between 3 states.
     sampler.start()
-    with sampler.scoped_state('statea'):
+    with sampler.scoped_state('step1', 'statea'):
       time.sleep(0.1)
-      with sampler.scoped_state('stateb'):
+      self.assertEqual(
+          sampler.current_state().name,
+          CounterName(
+              'statea-msecs', step_name='step1', stage_name='basic'))
+      with sampler.scoped_state('step1', 'stateb'):
         time.sleep(0.2 / 2)
-        with sampler.scoped_state('statec'):
+        self.assertEqual(
+            sampler.current_state().name,
+            CounterName(
+                'stateb-msecs', step_name='step1', stage_name='basic'))
+        with sampler.scoped_state('step1', 'statec'):
           time.sleep(0.3)
+          self.assertEqual(
+              sampler.current_state().name,
+              CounterName(
+                  'statec-msecs', step_name='step1', stage_name='basic'))
         time.sleep(0.2 / 2)
+
     sampler.stop()
     sampler.commit_counters()
 
+    if not statesampler.FAST_SAMPLER:
+      # The slow sampler does not implement sampling, so we won't test it.
+      return
+
     # Test that sampled state timings are close to their expected values.
     expected_counter_values = {
-        'basic-statea-msecs': 100,
-        'basic-stateb-msecs': 200,
-        'basic-statec-msecs': 300,
+        CounterName('statea-msecs', step_name='step1', stage_name='basic'): 
100,
+        CounterName('stateb-msecs', step_name='step1', stage_name='basic'): 
200,
+        CounterName('statec-msecs', step_name='step1', stage_name='basic'): 
300,
     }
     for counter in counter_factory.get_counters():
       self.assertIn(counter.name, expected_counter_values)
@@ -76,9 +84,9 @@ def test_sampler_transition_overhead(self):
                                         sampling_period_ms=10)
 
     # Run basic workload transitioning between 3 states.
-    state_a = sampler.scoped_state('statea')
-    state_b = sampler.scoped_state('stateb')
-    state_c = sampler.scoped_state('statec')
+    state_a = sampler.scoped_state('step1', 'statea')
+    state_b = sampler.scoped_state('step1', 'stateb')
+    state_c = sampler.scoped_state('step1', 'statec')
     start_time = time.time()
     sampler.start()
     for _ in range(100000):
diff --git a/sdks/python/apache_beam/utils/counters.py 
b/sdks/python/apache_beam/utils/counters.py
index e2e0a1a730b..95b2117cf38 100644
--- a/sdks/python/apache_beam/utils/counters.py
+++ b/sdks/python/apache_beam/utils/counters.py
@@ -66,9 +66,6 @@ def __new__(cls, name, stage_name=None, step_name=None,
                                            system_name, namespace,
                                            origin, output_index, io_target)
 
-  def __str__(self):
-    return '%s' % self._str_internal()
-
   def __repr__(self):
     return '<CounterName<%s> at %s>' % (self._str_internal(), hex(id(self)))
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> State tracking in Python is inefficient and has duplicated code
> ---------------------------------------------------------------
>
>                 Key: BEAM-2732
>                 URL: https://issues.apache.org/jira/browse/BEAM-2732
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Pablo Estrada
>            Assignee: Pablo Estrada
>            Priority: Major
>
> e.g logging and metrics keep state separately. State tracking should be 
> unified.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to