[
https://issues.apache.org/jira/browse/BEAM-7516?focusedWorklogId=376027&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-376027
]
ASF GitHub Bot logged work on BEAM-7516:
----------------------------------------
Author: ASF GitHub Bot
Created on: 23/Jan/20 01:28
Start Date: 23/Jan/20 01:28
Worklog Time Spent: 10m
Work Description: robertwb commented on pull request #10291:
[BEAM-7516][BEAM-8823] FnApiRunner works with work queues, and a primitive
watermark manager
URL: https://github.com/apache/beam/pull/10291#discussion_r369886202
##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py
##########
@@ -554,62 +527,277 @@ def run_stages(self,
"""
worker_handler_manager = WorkerHandlerManager(
stage_context.components.environments, self._provision_info)
- metrics_by_stage = {}
+ input_queue_manager = _ProcessingQueueManager()
+ pcoll_buffers = collections.defaultdict(_ListBuffer) # type: DefaultDict[bytes, _ListBuffer]
monitoring_infos_by_stage = {}
+ metrics_by_stage = {}
+ execution_context = PipelineExecutionContext(stage_context.components,
+ stages)
+ self._enqueue_all_initial_inputs(stages,
+ input_queue_manager,
+ execution_context)
try:
with self.maybe_profile():
- pcoll_buffers = collections.defaultdict(_ListBuffer) # type: DefaultDict[bytes, _ListBuffer]
- for stage in stages:
- stage_results = self._run_stage(
+ while len(input_queue_manager.ready_inputs) > 0:
+ stage_and_ready_elements = input_queue_manager.ready_inputs.deque()
+ consuming_stage_name = stage_and_ready_elements[0]
+ next_ready_elements = stage_and_ready_elements[1]
+
+ stage = execution_context.stages_per_name[consuming_stage_name].stage
+ data_input, data_output = execution_context.endpoints_per_stage[
+ stage.name]
+
+ for transform_name, _ in data_input.items():
+ if transform_name in next_ready_elements:
+ continue
Review comment:
Nit: I'd write this as
```
if transform_name not in next_ready_elements:
  next_ready_elements[transform_name] = _ListBuffer()
```
Also, please add a comment explaining why this is needed. (The input channel
blocks on all possible inputs, i.e. we need to send at least a done signal for
each of them. It might make more sense to push this logic down to where the
inputs are actually fed in, rather than here, where it is a bit out of place.)
----------------------------------------------------------------
Issue Time Tracking
-------------------
Worklog Id: (was: 376027)
Time Spent: 5h 50m (was: 5h 40m)
> Add a watermark manager for the fn_api_runner
> ---------------------------------------------
>
> Key: BEAM-7516
> URL: https://issues.apache.org/jira/browse/BEAM-7516
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-py-core
> Reporter: Pablo Estrada
> Assignee: Pablo Estrada
> Priority: Major
> Time Spent: 5h 50m
> Remaining Estimate: 0h
>
> To track watermarks for each stage
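The following sketch shows one common way to track watermarks per stage. It is not necessarily what PR #10291 implements. A stage's input watermark is taken as the minimum of the watermarks of the PCollections it consumes, and per-PCollection watermarks are only allowed to move forward. `SimpleWatermarkManager`, its methods, and the float timestamps are hypothetical, and treating a stage with no inputs as complete is an assumption of this sketch.

```python
from typing import Dict, List

MIN_TIMESTAMP = float('-inf')  # hypothetical stand-in for Beam's minimum timestamp
MAX_TIMESTAMP = float('inf')   # hypothetical stand-in for Beam's maximum timestamp


class SimpleWatermarkManager:
  """Hypothetical per-stage watermark tracker (illustration only)."""

  def __init__(self, stage_inputs: Dict[str, List[str]]):
    # Stage name -> names of the PCollections the stage consumes.
    self._stage_inputs = stage_inputs
    # PCollection name -> its current watermark.
    self._pcoll_watermarks: Dict[str, float] = {}

  def set_pcoll_watermark(self, pcoll: str, watermark: float) -> None:
    # Watermarks are monotonic: an update that would move one backwards is ignored.
    current = self._pcoll_watermarks.get(pcoll, MIN_TIMESTAMP)
    self._pcoll_watermarks[pcoll] = max(current, watermark)

  def input_watermark(self, stage: str) -> float:
    # A stage can only be as far along as its slowest input.
    inputs = self._stage_inputs[stage]
    if not inputs:
      # Assumption of this sketch: a stage with no inputs is treated as complete.
      return MAX_TIMESTAMP
    return min(self._pcoll_watermarks.get(p, MIN_TIMESTAMP) for p in inputs)


wm = SimpleWatermarkManager({'read': [], 'join': ['pc_a', 'pc_b']})
wm.set_pcoll_watermark('pc_a', 10)
wm.set_pcoll_watermark('pc_b', 5)
# wm.input_watermark('join') is now 5, the minimum of its two inputs.
```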
--
This message was sent by Atlassian Jira
(v8.3.4#803005)