[ 
https://issues.apache.org/jira/browse/BEAM-2732?focusedWorklogId=91517&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91517
 ]

ASF GitHub Bot logged work on BEAM-2732:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 16/Apr/18 21:59
            Start Date: 16/Apr/18 21:59
    Worklog Time Spent: 10m 
      Work Description: robertwb commented on a change in pull request #4387: 
[BEAM-2732] Metrics rely on statesampler state
URL: https://github.com/apache/beam/pull/4387#discussion_r181898590
 
 

 ##########
 File path: sdks/python/apache_beam/runners/direct/executor.py
 ##########
 @@ -290,70 +293,87 @@ def __init__(self, transform_evaluator_registry, 
evaluation_context,
     self._retry_count = 0
     self._max_retries_per_bundle = TransformExecutor._MAX_RETRY_PER_BUNDLE
 
-  def call(self):
+  def call(self, state_sampler):
     self._call_count += 1
     assert self._call_count <= (1 + len(self._applied_ptransform.side_inputs))
     metrics_container = MetricsContainer(self._applied_ptransform.full_label)
-    scoped_metrics_container = ScopedMetricsContainer(metrics_container)
-
-    for side_input in self._applied_ptransform.side_inputs:
-      # Find the projection of main's window onto the side input's window.
-      window_mapping_fn = side_input._view_options().get(
-          'window_mapping_fn', sideinputs._global_window_mapping_fn)
-      main_onto_side_window = window_mapping_fn(self._latest_main_input_window)
-      block_until = main_onto_side_window.end
-
-      if side_input not in self._side_input_values:
-        value = self._evaluation_context.get_value_or_block_until_ready(
-            side_input, self, block_until)
-        if not value:
-          # Monitor task will reschedule this executor once the side input is
-          # available.
-          return
-        self._side_input_values[side_input] = value
-    side_input_values = [self._side_input_values[side_input]
-                         for side_input in 
self._applied_ptransform.side_inputs]
-
-    while self._retry_count < self._max_retries_per_bundle:
-      try:
-        self.attempt_call(metrics_container,
-                          scoped_metrics_container,
-                          side_input_values)
-        break
-      except Exception as e:
-        self._retry_count += 1
-        logging.error(
-            'Exception at bundle %r, due to an exception.\n %s',
-            self._input_bundle, traceback.format_exc())
-        if self._retry_count == self._max_retries_per_bundle:
-          logging.error('Giving up after %s attempts.',
-                        self._max_retries_per_bundle)
-          self._completion_callback.handle_exception(self, e)
+    start_state = state_sampler.scoped_state(
+        self._applied_ptransform.full_label,
+        'start',
+        metrics_container=metrics_container)
+    process_state = state_sampler.scoped_state(
+        self._applied_ptransform.full_label,
+        'process',
+        metrics_container=metrics_container)
+    finish_state = state_sampler.scoped_state(
+        self._applied_ptransform.full_label,
+        'finish',
+        metrics_container=metrics_container)
+
+    with start_state:
+      for side_input in self._applied_ptransform.side_inputs:
+        # Find the projection of main's window onto the side input's window.
+        window_mapping_fn = side_input._view_options().get(
+            'window_mapping_fn', sideinputs._global_window_mapping_fn)
+        main_onto_side_window = window_mapping_fn(
+            self._latest_main_input_window)
+        block_until = main_onto_side_window.end
+
+        if side_input not in self._side_input_values:
+          value = self._evaluation_context.get_value_or_block_until_ready(
+              side_input, self, block_until)
+          if not value:
+            # Monitor task will reschedule this executor once the side input is
+            # available.
+            return
+          self._side_input_values[side_input] = value
+      side_input_values = [
+          self._side_input_values[side_input]
+          for side_input in self._applied_ptransform.side_inputs]
+
+      while self._retry_count < self._max_retries_per_bundle:
+        try:
+          self.attempt_call(metrics_container,
+                            side_input_values,
+                            process_state,
+                            finish_state)
+          break
+        except Exception as e:
+          self._retry_count += 1
+          logging.error(
+              'Exception at bundle %r, due to an exception.\n %s',
+              self._input_bundle, traceback.format_exc())
+          if self._retry_count == self._max_retries_per_bundle:
+            logging.error('Giving up after %s attempts.',
+                          self._max_retries_per_bundle)
+            self._completion_callback.handle_exception(self, e)
 
     self._evaluation_context.metrics().commit_physical(
         self._input_bundle,
         metrics_container.get_cumulative())
     self._transform_evaluation_state.complete(self)
 
   def attempt_call(self, metrics_container,
-                   scoped_metrics_container,
-                   side_input_values):
+                   side_input_values,
+                   process_state,
+                   finish_state):
+    """Attempts to run a bundle. Called within the 'start' ExecutionState."""
     evaluator = self._transform_evaluator_registry.get_evaluator(
         self._applied_ptransform, self._input_bundle,
-        side_input_values, scoped_metrics_container)
+        side_input_values)
 
-    with scoped_metrics_container:
-      evaluator.start_bundle()
+    evaluator.start_bundle()
 
 Review comment:
   Yes, I think it'd be cleaner to have all states explicitly entered within 
attempt_call than make this an implicit assumption about the caller. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 91517)
    Time Spent: 10h 40m  (was: 10.5h)

> State tracking in Python is inefficient and has duplicated code
> ---------------------------------------------------------------
>
>                 Key: BEAM-2732
>                 URL: https://issues.apache.org/jira/browse/BEAM-2732
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Pablo Estrada
>            Assignee: Pablo Estrada
>            Priority: Major
>          Time Spent: 10h 40m
>  Remaining Estimate: 0h
>
> e.g logging and metrics keep state separately. State tracking should be 
> unified.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to