[ 
https://issues.apache.org/jira/browse/BEAM-2732?focusedWorklogId=100048&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100048
 ]

ASF GitHub Bot logged work on BEAM-2732:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/May/18 15:39
            Start Date: 09/May/18 15:39
    Worklog Time Spent: 10m 
      Work Description: pabloem closed pull request #5299: [BEAM-2732] StateSampler knows the execution thread it tracks.
URL: https://github.com/apache/beam/pull/5299
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/sdks/python/apache_beam/runners/worker/statesampler.py b/sdks/python/apache_beam/runners/worker/statesampler.py
index d3980928ac7..8a00079c984 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler.py
+++ b/sdks/python/apache_beam/runners/worker/statesampler.py
@@ -47,7 +47,10 @@ def get_current_tracker():
 
 StateSamplerInfo = namedtuple(
     'StateSamplerInfo',
-    ['state_name', 'transition_count', 'time_since_transition'])
+    ['state_name',
+     'transition_count',
+     'time_since_transition',
+     'tracked_thread'])
 
 
 # Default period for sampling current state of pipeline execution.
@@ -63,6 +66,7 @@ def __init__(self, prefix, counter_factory,
     self._counter_factory = counter_factory
     self._states_by_name = {}
     self.sampling_period_ms = sampling_period_ms
+    self.tracked_thread = None
     super(StateSampler, self).__init__(sampling_period_ms)
 
   def stop_if_still_running(self):
@@ -70,6 +74,7 @@ def stop_if_still_running(self):
       self.stop()
 
   def start(self):
+    self.tracked_thread = threading.current_thread()
     set_current_tracker(self)
     execution.metrics_startup()
     super(StateSampler, self).start()
@@ -80,7 +85,8 @@ def get_info(self):
     return StateSamplerInfo(
         self.current_state().name,
         self.state_transition_count,
-        self.time_since_transition)
+        self.time_since_transition,
+        self.tracked_thread)
 
   def scoped_state(self,
                    step_name,
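
The effect of the patch can be illustrated with a minimal, self-contained sketch. It does not import Beam: `SketchSampler` and `run_in_worker` are hypothetical names introduced only for illustration, and the state name, transition count, and time values are placeholders. Only the `StateSamplerInfo` field list and the idea of recording `threading.current_thread()` in `start()` come from the diff above.

```python
import threading
from collections import namedtuple

# Same field list as the patched StateSamplerInfo in the diff above.
StateSamplerInfo = namedtuple(
    'StateSamplerInfo',
    ['state_name',
     'transition_count',
     'time_since_transition',
     'tracked_thread'])


class SketchSampler(object):
  """Hypothetical stand-in for StateSampler; not the Beam class."""

  def __init__(self):
    # No thread is known until start() is called, as in the patch.
    self.tracked_thread = None

  def start(self):
    # Record the thread that calls start(), mirroring the patched method.
    self.tracked_thread = threading.current_thread()

  def get_info(self):
    # State name, count, and time are placeholder values in this sketch.
    return StateSamplerInfo('unknown', 0, 0, self.tracked_thread)


def run_in_worker():
  """Starts a sampler on a separate thread; returns (thread, info)."""
  sampler = SketchSampler()
  worker = threading.Thread(target=sampler.start, name='sketch-worker')
  worker.start()
  worker.join()
  return worker, sampler.get_info()
```

In this sketch, the `tracked_thread` field of the returned info is the thread that called `start()`, not the thread that later calls `get_info()`, so a caller on another thread can tell which execution thread a given sampler belongs to.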


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 100048)
    Time Spent: 14h 10m  (was: 14h)

> State tracking in Python is inefficient and has duplicated code
> ---------------------------------------------------------------
>
>                 Key: BEAM-2732
>                 URL: https://issues.apache.org/jira/browse/BEAM-2732
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Pablo Estrada
>            Assignee: Pablo Estrada
>            Priority: Major
>          Time Spent: 14h 10m
>  Remaining Estimate: 0h
>
> E.g., logging and metrics keep state separately. State tracking should be
> unified.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
