[ https://issues.apache.org/jira/browse/BEAM-8823?focusedWorklogId=660590&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-660590 ]

ASF GitHub Bot logged work on BEAM-8823:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Oct/21 21:16
            Start Date: 05/Oct/21 21:16
    Worklog Time Spent: 10m 
      Work Description: pabloem commented on a change in pull request #15441:
URL: https://github.com/apache/beam/pull/15441#discussion_r722713049



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##########
@@ -602,6 +747,106 @@ def __init__(self,
         for id in self.pipeline_components.windowing_strategies.keys()
     }
 
+    self._stage_managers: Dict[str, BundleContextManager] = {}
+
+  def bundle_manager_for(
+      self,
+      stage: Stage,
+      num_workers: Optional[int] = None) -> 'BundleContextManager':
+    if stage.name not in self._stage_managers:
+      self._stage_managers[stage.name] = BundleContextManager(
+          self, stage, num_workers or self.num_workers)
+    return self._stage_managers[stage.name]
+
+  def _compute_pipeline_dictionaries(self) -> None:
+    for s in self.stages.values():
+      for t in s.transforms:
+        buffer_id = t.spec.payload
+        if t.spec.urn == bundle_processor.DATA_INPUT_URN:
+          self.input_transform_to_buffer_id[t.unique_name] = buffer_id
+          if t.spec.payload != translations.IMPULSE_BUFFER:
+            _, input_pcoll = split_buffer_id(buffer_id)
+            # Add PCollections that may not have a producer.
+            # This is necessary, for example, when an empty list of
+            # PCollections is passed into a Flatten transform.
+            if input_pcoll not in self.pcollection_to_producer_transform:
+              self.pcollection_to_producer_transform[input_pcoll] = None
+            if buffer_id not in self.buffer_id_to_consumer_pairs:
+              self.buffer_id_to_consumer_pairs[buffer_id] = set()
+            if (s.name, t.unique_name
+                ) not in self.buffer_id_to_consumer_pairs[buffer_id]:
+              self.buffer_id_to_consumer_pairs[buffer_id].add(
+                  (s.name, t.unique_name))
+          else:
+            # Impulse data is not produced by any PTransform.
+            self.pcollection_to_producer_transform[
+                translations.IMPULSE_BUFFER] = None
+        elif t.spec.urn == bundle_processor.DATA_OUTPUT_URN:
+          _, output_pcoll = split_buffer_id(buffer_id)
+          self.pcollection_to_producer_transform[output_pcoll] = t.unique_name
+        elif t.spec.urn in translations.PAR_DO_URNS:
+          pass
+
+  def setup(self) -> None:
+    """This sets up the pipeline to begin running.
+
+    1. This function enqueues all initial pipeline bundles to be executed.
+    2. It also updates payload fields on DATA_INPUT and DATA_OUTPUT operations
+      to the Data API endpoints that are live.
+    """
+    for stage in self.stages.values():
+      self._enqueue_stage_initial_inputs(stage)
+
+  def _enqueue_stage_initial_inputs(self, stage: Stage) -> None:
+    """Sets up IMPULSE inputs for a stage, and the data GRPC API endpoint."""
+    data_input = {}  # type: MutableMapping[str, PartitionableBuffer]
+    ready_to_schedule = True
+    for transform in stage.transforms:
+      if (transform.spec.urn in {bundle_processor.DATA_INPUT_URN,
+                                 bundle_processor.DATA_OUTPUT_URN}):
+        if transform.spec.urn == bundle_processor.DATA_INPUT_URN:
+          coder_id = self.data_channel_coders[only_element(
+              transform.outputs.values())]
+          coder = self.pipeline_context.coders[self.safe_coders.get(
+              coder_id, coder_id)]
+          if transform.spec.payload == translations.IMPULSE_BUFFER:
+            data_input[transform.unique_name] = ListBuffer(coder.get_impl())
+            data_input[transform.unique_name].append(ENCODED_IMPULSE_VALUE)
+          else:
+            # If this is not an IMPULSE input, then it is not part of the
+            # initial inputs of a pipeline, and we'll ignore it.
+            data_input = {}
+        else:
+          assert transform.spec.urn == bundle_processor.DATA_OUTPUT_URN
+          coder_id = self.data_channel_coders[only_element(
+              transform.inputs.values())]
+        # For every DATA_INPUT or DATA_OUTPUT operation, we need to replace
+        # the payload with the gRPC configuration for the Data channel.
+        bundle_manager = self.bundle_manager_for(stage)
+        data_spec = beam_fn_api_pb2.RemoteGrpcPort(coder_id=coder_id)
+        data_api_service_descriptor = (
+            bundle_manager.data_api_service_descriptor())
+        if data_api_service_descriptor:
+          data_spec.api_service_descriptor.url = (
+              data_api_service_descriptor.url)
+        transform.spec.payload = data_spec.SerializeToString()
+      elif transform.spec.urn in translations.PAR_DO_URNS:
+        payload = proto_utils.parse_Bytes(
+            transform.spec.payload, beam_runner_api_pb2.ParDoPayload)
+        if payload.side_inputs:
+          # If the stage needs side inputs, then it's not ready to be
+          # executed.
+          ready_to_schedule = False
+    if data_input and ready_to_schedule:
+      # We push the data inputs, along with the name of the consuming stage.
+      _LOGGER.debug('Scheduling bundle in stage for execution: %s', stage.name)
+      self.queues.ready_inputs.enque((stage.name, DataInput(data_input, {})))
+    elif data_input and not ready_to_schedule:
+      _LOGGER.debug(
+          'Enqueuing stage pending watermark. Stage name: %s', stage.name)
+      self.queues.watermark_pending_inputs.enque(
+          ((stage.name, MAX_TIMESTAMP), DataInput(data_input, {})))
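
For illustration, here is a minimal, self-contained sketch of the routing
decision at the end of this hunk: a stage whose only data inputs are IMPULSE
and which has no side inputs is scheduled immediately, while a stage with side
inputs is parked on the watermark queue keyed at MAX_TIMESTAMP. The Queues and
DataInput classes below are simplified stand-ins (plain deques rather than the
runner's actual queue manager and its enque API), so only the routing logic
mirrors the diff.

    # Simplified stand-ins; not the runner's actual queue classes.
    import collections
    from typing import Any, Dict, NamedTuple

    MAX_TIMESTAMP = 2 ** 63 - 1  # stand-in for timestamp.MAX_TIMESTAMP


    class DataInput(NamedTuple):
      data: Dict[str, Any]
      timers: Dict[str, Any]


    class Queues:
      def __init__(self):
        self.ready_inputs = collections.deque()
        self.watermark_pending_inputs = collections.deque()

      def enqueue_initial_inputs(
          self, stage_name, data_input, ready_to_schedule):
        if data_input and ready_to_schedule:
          # IMPULSE-only inputs and no side inputs: schedule immediately.
          self.ready_inputs.append((stage_name, DataInput(data_input, {})))
        elif data_input:
          # Side inputs present: hold until the watermark reaches
          # MAX_TIMESTAMP, i.e. until all upstream stages are fully
          # processed (batch semantics).
          self.watermark_pending_inputs.append(
              ((stage_name, MAX_TIMESTAMP), DataInput(data_input, {})))


    queues = Queues()
    queues.enqueue_initial_inputs(
        'stage_1', {'read/Impulse': [b'impulse']}, True)
    queues.enqueue_initial_inputs('stage_2', {'map/Input': [b'...']}, False)
    assert len(queues.ready_inputs) == 1
    assert len(queues.watermark_pending_inputs) == 1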

Review comment:
       That's right - for BATCH, we need the upstream stage to be fully
processed before moving forward. As I work on streaming, this will change so
that pending inputs are attributed to a more appropriate timestamp.
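
To make the batch semantics in this comment concrete, here is a hypothetical
sketch: an input parked at (stage, MAX_TIMESTAMP) is released only once the
stage's input watermark has advanced to MAX_TIMESTAMP, which in batch happens
exactly when every upstream stage has been fully processed. The names
StageWatermarks and release_ready_inputs are illustrative, not Beam APIs.

    MAX_TIMESTAMP = 2 ** 63 - 1  # stand-in for timestamp.MAX_TIMESTAMP


    class StageWatermarks:
      """Tracks a stage's input watermark as upstream stages finish."""

      def __init__(self, upstream_stages):
        self._pending_upstream = set(upstream_stages)

      def mark_upstream_done(self, stage_name):
        self._pending_upstream.discard(stage_name)

      def input_watermark(self):
        # In batch, the watermark is either "nothing yet" or "all done".
        return MAX_TIMESTAMP if not self._pending_upstream else 0


    def release_ready_inputs(pending, watermarks, ready):
      """Moves pending inputs whose watermark target has been reached."""
      still_pending = []
      for (stage_name, target_ts), data_input in pending:
        if watermarks[stage_name].input_watermark() >= target_ts:
          ready.append((stage_name, data_input))
        else:
          still_pending.append(((stage_name, target_ts), data_input))
      pending[:] = still_pending


    watermarks = {'consumer': StageWatermarks({'producer'})}
    pending = [(('consumer', MAX_TIMESTAMP), {'side_input': [b'...']})]
    ready = []
    release_ready_inputs(pending, watermarks, ready)
    assert not ready                      # producer not finished yet
    watermarks['consumer'].mark_upstream_done('producer')
    release_ready_inputs(pending, watermarks, ready)
    assert ready and not pending          # released at MAX_TIMESTAMP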





Issue Time Tracking
-------------------

    Worklog Id:     (was: 660590)
    Time Spent: 5h 50m  (was: 5h 40m)

> Make FnApiRunner work by executing ready elements instead of stages
> -------------------------------------------------------------------
>
>                 Key: BEAM-8823
>                 URL: https://issues.apache.org/jira/browse/BEAM-8823
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Pablo Estrada
>            Priority: P3
>          Time Spent: 5h 50m
>  Remaining Estimate: 0h
>



