[
https://issues.apache.org/jira/browse/BEAM-2732?focusedWorklogId=91441&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91441
]
ASF GitHub Bot logged work on BEAM-2732:
----------------------------------------
Author: ASF GitHub Bot
Created on: 16/Apr/18 19:30
Start Date: 16/Apr/18 19:30
Worklog Time Spent: 10m
Work Description: robertwb commented on a change in pull request #4387:
[BEAM-2732] Metrics rely on statesampler state
URL: https://github.com/apache/beam/pull/4387#discussion_r181856996
##########
File path: sdks/python/apache_beam/runners/direct/executor.py
##########
@@ -290,70 +293,87 @@ def __init__(self, transform_evaluator_registry,
evaluation_context,
self._retry_count = 0
self._max_retries_per_bundle = TransformExecutor._MAX_RETRY_PER_BUNDLE
- def call(self):
+ def call(self, state_sampler):
self._call_count += 1
assert self._call_count <= (1 + len(self._applied_ptransform.side_inputs))
metrics_container = MetricsContainer(self._applied_ptransform.full_label)
- scoped_metrics_container = ScopedMetricsContainer(metrics_container)
-
- for side_input in self._applied_ptransform.side_inputs:
- # Find the projection of main's window onto the side input's window.
- window_mapping_fn = side_input._view_options().get(
- 'window_mapping_fn', sideinputs._global_window_mapping_fn)
- main_onto_side_window = window_mapping_fn(self._latest_main_input_window)
- block_until = main_onto_side_window.end
-
- if side_input not in self._side_input_values:
- value = self._evaluation_context.get_value_or_block_until_ready(
- side_input, self, block_until)
- if not value:
- # Monitor task will reschedule this executor once the side input is
- # available.
- return
- self._side_input_values[side_input] = value
- side_input_values = [self._side_input_values[side_input]
- for side_input in
self._applied_ptransform.side_inputs]
-
- while self._retry_count < self._max_retries_per_bundle:
- try:
- self.attempt_call(metrics_container,
- scoped_metrics_container,
- side_input_values)
- break
- except Exception as e:
- self._retry_count += 1
- logging.error(
- 'Exception at bundle %r, due to an exception.\n %s',
- self._input_bundle, traceback.format_exc())
- if self._retry_count == self._max_retries_per_bundle:
- logging.error('Giving up after %s attempts.',
- self._max_retries_per_bundle)
- self._completion_callback.handle_exception(self, e)
+ start_state = state_sampler.scoped_state(
+ self._applied_ptransform.full_label,
+ 'start',
+ metrics_container=metrics_container)
+ process_state = state_sampler.scoped_state(
+ self._applied_ptransform.full_label,
+ 'process',
+ metrics_container=metrics_container)
+ finish_state = state_sampler.scoped_state(
+ self._applied_ptransform.full_label,
+ 'finish',
+ metrics_container=metrics_container)
+
+ with start_state:
+ for side_input in self._applied_ptransform.side_inputs:
+ # Find the projection of main's window onto the side input's window.
+ window_mapping_fn = side_input._view_options().get(
+ 'window_mapping_fn', sideinputs._global_window_mapping_fn)
+ main_onto_side_window = window_mapping_fn(
+ self._latest_main_input_window)
+ block_until = main_onto_side_window.end
+
+ if side_input not in self._side_input_values:
+ value = self._evaluation_context.get_value_or_block_until_ready(
+ side_input, self, block_until)
+ if not value:
+ # Monitor task will reschedule this executor once the side input is
+ # available.
+ return
+ self._side_input_values[side_input] = value
+ side_input_values = [
+ self._side_input_values[side_input]
+ for side_input in self._applied_ptransform.side_inputs]
+
+ while self._retry_count < self._max_retries_per_bundle:
+ try:
+ self.attempt_call(metrics_container,
+ side_input_values,
+ process_state,
+ finish_state)
+ break
+ except Exception as e:
+ self._retry_count += 1
+ logging.error(
+ 'Exception at bundle %r, due to an exception.\n %s',
+ self._input_bundle, traceback.format_exc())
+ if self._retry_count == self._max_retries_per_bundle:
+ logging.error('Giving up after %s attempts.',
+ self._max_retries_per_bundle)
+ self._completion_callback.handle_exception(self, e)
self._evaluation_context.metrics().commit_physical(
self._input_bundle,
metrics_container.get_cumulative())
self._transform_evaluation_state.complete(self)
def attempt_call(self, metrics_container,
- scoped_metrics_container,
- side_input_values):
+ side_input_values,
+ process_state,
+ finish_state):
+ """Attempts to run a bundle. Called within the 'start' ExecutionState."""
evaluator = self._transform_evaluator_registry.get_evaluator(
self._applied_ptransform, self._input_bundle,
- side_input_values, scoped_metrics_container)
+ side_input_values)
- with scoped_metrics_container:
- evaluator.start_bundle()
+ evaluator.start_bundle()
Review comment:
Why is this not called from within start_state?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 91441)
Time Spent: 8h 20m (was: 8h 10m)
> State tracking in Python is inefficient and has duplicated code
> ---------------------------------------------------------------
>
> Key: BEAM-2732
> URL: https://issues.apache.org/jira/browse/BEAM-2732
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Pablo Estrada
> Assignee: Pablo Estrada
> Priority: Major
> Time Spent: 8h 20m
> Remaining Estimate: 0h
>
> e.g. logging and metrics keep state separately. State tracking should be
> unified.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)