[ 
https://issues.apache.org/jira/browse/BEAM-7516?focusedWorklogId=376031&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-376031
 ]

ASF GitHub Bot logged work on BEAM-7516:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/Jan/20 01:28
            Start Date: 23/Jan/20 01:28
    Worklog Time Spent: 10m 
      Work Description: robertwb commented on pull request #10291: 
[BEAM-7516][BEAM-8823] FnApiRunner works with work queues, and a primitive 
watermark manager
URL: https://github.com/apache/beam/pull/10291#discussion_r369890841
 
 

 ##########
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py
 ##########
 @@ -554,62 +527,277 @@ def run_stages(self,
     """
     worker_handler_manager = WorkerHandlerManager(
         stage_context.components.environments, self._provision_info)
-    metrics_by_stage = {}
+    input_queue_manager = _ProcessingQueueManager()
+    pcoll_buffers = collections.defaultdict(_ListBuffer)  # type: 
DefaultDict[bytes, _ListBuffer]
     monitoring_infos_by_stage = {}
+    metrics_by_stage = {}
 
+    execution_context = PipelineExecutionContext(stage_context.components,
+                                                 stages)
+    self._enqueue_all_initial_inputs(stages,
+                                     input_queue_manager,
+                                     execution_context)
     try:
       with self.maybe_profile():
-        pcoll_buffers = collections.defaultdict(_ListBuffer)  # type: 
DefaultDict[bytes, _ListBuffer]
-        for stage in stages:
-          stage_results = self._run_stage(
+        while len(input_queue_manager.ready_inputs) > 0:
+          stage_and_ready_elements = input_queue_manager.ready_inputs.deque()
+          consuming_stage_name = stage_and_ready_elements[0]
+          next_ready_elements = stage_and_ready_elements[1]
+
+          stage = execution_context.stages_per_name[consuming_stage_name].stage
+          data_input, data_output = execution_context.endpoints_per_stage[
+              stage.name]
+
+          for transform_name, _ in data_input.items():
+            if transform_name in next_ready_elements:
+              continue
+            # Empty-filling the data input.
+            next_ready_elements[transform_name] = _ListBuffer()
+          endpoints_for_execution = (next_ready_elements, data_output)
+
+          logging.info('Executing bundle for stage %s', stage.name)
+          logging.debug('Endpoints for execution: %s', endpoints_for_execution)
+          stage_results, output_bundles, deferred_inputs = 
self._execute_bundle(
+              execution_context,
               worker_handler_manager.get_worker_handlers,
               stage_context.components,
               stage,
               pcoll_buffers,
-              stage_context.safe_coders)
-          metrics_by_stage[stage.name] = stage_results.process_bundle.metrics
-          monitoring_infos_by_stage[stage.name] = (
-              stage_results.process_bundle.monitoring_infos)
+              stage_context.safe_coders,
+              endpoints_for_execution)
+          self._merge_metrics_from_stage_execution(monitoring_infos_by_stage,
+                                                   metrics_by_stage,
+                                                   stage_results, stage.name)
+
+          output_bundles_with_timestamps = self._update_watermarks(
+              execution_context, stage, data_input, output_bundles,
+              deferred_inputs)
+
+          self._process_output_bundles(deferred_inputs, stage,
+                                       execution_context,
+                                       output_bundles_with_timestamps,
+                                       input_queue_manager)
+          self._schedule_newly_ready_bundles(execution_context,
+                                             input_queue_manager)
+
     finally:
       worker_handler_manager.close_all()
+
+    if len(input_queue_manager.watermark_pending_inputs) > 0:
+      raise RuntimeError('There are bundles pending processing that can not '
+                         'be scheduled because the watermark is stuck. '
+                         'This represents an error in Apache Beam.'
+                         '\n\t%s', 
input_queue_manager.watermark_pending_inputs)
     return RunnerResult(
         runner.PipelineState.DONE, monitoring_infos_by_stage, metrics_by_stage)
 
-  def _store_side_inputs_in_state(self,
-                                  worker_handler,  # type: WorkerHandler
-                                  context,  # type: 
pipeline_context.PipelineContext
-                                  pipeline_components,  # type: 
beam_runner_api_pb2.Components
-                                  data_side_input,  # type: DataSideInput
-                                  pcoll_buffers,  # type: Mapping[bytes, 
_ListBuffer]
-                                  safe_coders):
-    for (transform_id, tag), (buffer_id, si) in data_side_input.items():
-      _, pcoll_id = split_buffer_id(buffer_id)
-      value_coder = context.coders[safe_coders[
-          pipeline_components.pcollections[pcoll_id].coder_id]]
-      elements_by_window = _WindowGroupingBuffer(si, value_coder)
-      for element_data in pcoll_buffers[buffer_id]:
-        elements_by_window.append(element_data)
-
-      if si.urn == common_urns.side_inputs.ITERABLE.urn:
-        for _, window, elements_data in elements_by_window.encoded_items():
-          state_key = beam_fn_api_pb2.StateKey(
-              iterable_side_input=beam_fn_api_pb2.StateKey.IterableSideInput(
-                  transform_id=transform_id,
-                  side_input_id=tag,
-                  window=window))
-          worker_handler.state.append_raw(state_key, elements_data)
-      elif si.urn == common_urns.side_inputs.MULTIMAP.urn:
-        for key, window, elements_data in elements_by_window.encoded_items():
-          state_key = beam_fn_api_pb2.StateKey(
-              multimap_side_input=beam_fn_api_pb2.StateKey.MultimapSideInput(
-                  transform_id=transform_id,
-                  side_input_id=tag,
-                  window=window,
-                  key=key))
-          worker_handler.state.append_raw(state_key, elements_data)
+  @staticmethod
+  def _merge_metrics_from_stage_execution(
+      monitoring_infos_by_stage,  # type: Dict[str, List[Any]]
+      metrics_by_stage,  # type: Dict[str, List[beam_fn_api_pb2.Metrics]]
+      stage_results,  # type: beam_fn_api_pb2.InstructionResponse
+      stage_name  # type: str
+  ):
+    # type: (...) -> None
+    stage_minfos = stage_results.process_bundle.monitoring_infos
+    if stage_name in monitoring_infos_by_stage:
+      stage_minfos = monitoring_infos.consolidate(
+          itertools.chain(stage_minfos, monitoring_infos_by_stage[stage_name]))
+
+    monitoring_infos_by_stage[stage_name] = stage_minfos
+
+    # TODO(pabloem): Note that these are not merged.
+    metrics_by_stage[stage_name] = stage_results.process_bundle.metrics
+
+  @staticmethod
+  def _schedule_newly_ready_bundles(execution_context, input_queue_manager):
+    # type: (PipelineExecutionContext, _ProcessingQueueManager) -> None
+    """Inspect watermark pending bundles, and schedule ready ones.
+
+    This function should be called after the watermarks have been recalculated.
+    """
+    requed_inputs = []
+    while len(input_queue_manager.watermark_pending_inputs) > 0:
 
 Review comment:
   Would this be better expressed as a for loop over all pending inputs? 
(Alternatively, would it make sense for the data structure to be 
`Dict[StageName, Heap[Tuple[Watermark, Inputs]]]`, so that one could then pop all 
(ordered) ready inputs on each watermark advance?) Inspecting every pending 
queue at every stage execution seems like it'd be quadratic time. 
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 376031)
    Time Spent: 6h 10m  (was: 6h)

> Add a watermark manager for the fn_api_runner
> ---------------------------------------------
>
>                 Key: BEAM-7516
>                 URL: https://issues.apache.org/jira/browse/BEAM-7516
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Pablo Estrada
>            Assignee: Pablo Estrada
>            Priority: Major
>          Time Spent: 6h 10m
>  Remaining Estimate: 0h
>
> To track watermarks for each stage



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to