[ https://issues.apache.org/jira/browse/BEAM-8823?focusedWorklogId=660590&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-660590 ]
ASF GitHub Bot logged work on BEAM-8823:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Oct/21 21:16
            Start Date: 05/Oct/21 21:16
    Worklog Time Spent: 10m
      Work Description: pabloem commented on a change in pull request #15441:
URL: https://github.com/apache/beam/pull/15441#discussion_r722713049


##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##########
@@ -602,6 +747,106 @@ def __init__(self,
         for id in self.pipeline_components.windowing_strategies.keys()
     }
+    self._stage_managers: Dict[str, BundleContextManager] = {}
+
+  def bundle_manager_for(
+      self,
+      stage: Stage,
+      num_workers: Optional[int] = None) -> 'BundleContextManager':
+    if stage.name not in self._stage_managers:
+      self._stage_managers[stage.name] = BundleContextManager(
+          self, stage, num_workers or self.num_workers)
+    return self._stage_managers[stage.name]
+
+  def _compute_pipeline_dictionaries(self) -> None:
+    for s in self.stages.values():
+      for t in s.transforms:
+        buffer_id = t.spec.payload
+        if t.spec.urn == bundle_processor.DATA_INPUT_URN:
+          self.input_transform_to_buffer_id[t.unique_name] = buffer_id
+          if t.spec.payload != translations.IMPULSE_BUFFER:
+            _, input_pcoll = split_buffer_id(buffer_id)
+            # Adding PCollections that may not have a producer.
+            # This is necessary, for example, for the case where we pass an
+            # empty list of PCollections into a Flatten transform.
+            if input_pcoll not in self.pcollection_to_producer_transform:
+              self.pcollection_to_producer_transform[input_pcoll] = None
+            if buffer_id not in self.buffer_id_to_consumer_pairs:
+              self.buffer_id_to_consumer_pairs[buffer_id] = set()
+            if (s.name, t.unique_name
+                ) not in self.buffer_id_to_consumer_pairs[buffer_id]:
+              self.buffer_id_to_consumer_pairs[buffer_id].add(
+                  (s.name, t.unique_name))
+          elif t.spec.payload == translations.IMPULSE_BUFFER:
+            # Impulse data is not produced by any PTransform.
+            self.pcollection_to_producer_transform[
+                translations.IMPULSE_BUFFER] = None
+        elif t.spec.urn == bundle_processor.DATA_OUTPUT_URN:
+          _, output_pcoll = split_buffer_id(buffer_id)
+          self.pcollection_to_producer_transform[output_pcoll] = t.unique_name
+        elif t.spec.urn in translations.PAR_DO_URNS:
+          pass
+
+  def setup(self) -> None:
+    """This sets up the pipeline to begin running.
+
+    1. This function enqueues all initial pipeline bundles to be executed.
+    2. It also updates payload fields on DATA_INPUT and DATA_OUTPUT operations
+      to the Data API endpoints that are live.
+    """
+    for stage in self.stages.values():
+      self._enqueue_stage_initial_inputs(stage)
+
+  def _enqueue_stage_initial_inputs(self, stage: Stage) -> None:
+    """Sets up IMPULSE inputs for a stage, and the data GRPC API endpoint."""
+    data_input = {}  # type: MutableMapping[str, PartitionableBuffer]
+    ready_to_schedule = True
+    for transform in stage.transforms:
+      if (transform.spec.urn in {bundle_processor.DATA_INPUT_URN,
+                                 bundle_processor.DATA_OUTPUT_URN}):
+        if transform.spec.urn == bundle_processor.DATA_INPUT_URN:
+          coder_id = self.data_channel_coders[only_element(
+              transform.outputs.values())]
+          coder = self.pipeline_context.coders[self.safe_coders.get(
+              coder_id, coder_id)]
+          if transform.spec.payload == translations.IMPULSE_BUFFER:
+            data_input[transform.unique_name] = ListBuffer(coder.get_impl())
+            data_input[transform.unique_name].append(ENCODED_IMPULSE_VALUE)
+          else:
+            # If this is not an IMPULSE input, then it is not part of the
+            # initial inputs of a pipeline, and we'll ignore it.
+            data_input = {}
+        else:
+          assert transform.spec.urn == bundle_processor.DATA_OUTPUT_URN
+          coder_id = self.data_channel_coders[only_element(
+              transform.inputs.values())]
+        # For every DATA_INPUT or DATA_OUTPUT operation, we need to replace the
+        # payload with the GRPC configuration for the Data channel.
+        bundle_manager = self.bundle_manager_for(stage)
+        data_spec = beam_fn_api_pb2.RemoteGrpcPort(coder_id=coder_id)
+        data_api_service_descriptor = (
+            bundle_manager.data_api_service_descriptor())
+        if data_api_service_descriptor:
+          data_spec.api_service_descriptor.url = (
+              data_api_service_descriptor.url)
+        transform.spec.payload = data_spec.SerializeToString()
+      elif transform.spec.urn in translations.PAR_DO_URNS:
+        payload = proto_utils.parse_Bytes(
+            transform.spec.payload, beam_runner_api_pb2.ParDoPayload)
+        if payload.side_inputs:
+          # If the stage needs side inputs, then it's not ready to be
+          # executed.
+          ready_to_schedule = False
+    if data_input and ready_to_schedule:
+      # We push the data inputs, along with the name of the consuming stage.
+      _LOGGER.debug('Scheduling bundle in stage for execution: %s', stage.name)
+      self.queues.ready_inputs.enque((stage.name, DataInput(data_input, {})))
+    elif data_input and not ready_to_schedule:
+      _LOGGER.debug(
+          'Enqueuing stage pending watermark. Stage name: %s', stage.name)
+      self.queues.watermark_pending_inputs.enque(
+          ((stage.name, MAX_TIMESTAMP), DataInput(data_input, {})))

Review comment:
       that's right - for BATCH, we need the upstream stage to be fully processed before moving forward. As I work on streaming this will change to be attributed to a more appropriate timestamp.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 660590)
    Time Spent: 5h 50m  (was: 5h 40m)

> Make FnApiRunner work by executing ready elements instead of stages
> -------------------------------------------------------------------
>
>                 Key: BEAM-8823
>                 URL: https://issues.apache.org/jira/browse/BEAM-8823
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Pablo Estrada
>            Priority: P3
>          Time Spent: 5h 50m
>  Remaining Estimate: 0h
>


--
This message was sent by Atlassian Jira
(v8.3.4#803005)
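Background on the buffer-id bookkeeping in _compute_pipeline_dictionaries above: a buffer id names a materialized PCollection, and the runner records which (stage, transform) pairs consume it and which transform produces it. The following is a minimal, self-contained sketch of that mapping. The local make_buffer_id and split_buffer_id are hypothetical stand-ins that mimic the 'kind:pcollection' string encoding of the real helpers in apache_beam.runners.portability.fn_api_runner.translations, and all stage and transform names here are invented for illustration.

    from typing import Dict, Set, Tuple


    def make_buffer_id(kind: str, pcoll: str) -> bytes:
      # Stand-in mirroring the 'kind:pcollection_id' encoding (an assumption).
      return ('%s:%s' % (kind, pcoll)).encode('utf-8')


    def split_buffer_id(buffer_id: bytes) -> Tuple[str, str]:
      # Inverse of make_buffer_id: recover (kind, pcollection_id).
      kind, pcoll = buffer_id.decode('utf-8').split(':', 1)
      return kind, pcoll


    # Simplified forms of the two dictionaries the runner builds.
    buffer_id_to_consumer_pairs: Dict[bytes, Set[Tuple[str, str]]] = {}
    pcollection_to_producer_transform: Dict[str, str] = {}

    # Two downstream stages read the same materialized PCollection 'pc1'.
    reads = [('stage_2', 'read_pc1_a'), ('stage_3', 'read_pc1_b')]
    buffer_id = make_buffer_id('materialize', 'pc1')
    for stage_name, transform_name in reads:
      buffer_id_to_consumer_pairs.setdefault(buffer_id, set()).add(
          (stage_name, transform_name))

    # The transform that writes 'pc1' registers itself as the producer.
    _, output_pcoll = split_buffer_id(buffer_id)
    pcollection_to_producer_transform[output_pcoll] = 'stage_1/write_pc1'

    assert buffer_id_to_consumer_pairs[buffer_id] == set(reads)
    assert pcollection_to_producer_transform['pc1'] == 'stage_1/write_pc1'

Tracking consumers per buffer id rather than per stage is what lets a single materialized PCollection fan out to several downstream stages without duplicating the data.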
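The scheduling decision at the end of _enqueue_stage_initial_inputs, and the review comment about BATCH, amount to a two-queue pattern: bundles whose inputs are fully available go on a ready queue, while bundles gated on an upstream stage are parked under a (stage, watermark) key until that watermark is reached. Below is a minimal sketch of the pattern, assuming simplified integer watermarks and a hypothetical SimpleQueueManager in place of the runner's real queue manager; only the enque spelling is borrowed from the diff.

    from typing import Any, Dict, List, Tuple

    # Stand-in for apache_beam.utils.timestamp.MAX_TIMESTAMP.
    MAX_TIMESTAMP = 2 ** 63 - 1


    class SimpleQueueManager:
      """Holds bundles ready to run, plus bundles gated on a stage watermark."""

      def __init__(self) -> None:
        self.ready_inputs: List[Tuple[str, Any]] = []
        # Entries are ((stage_name, required_watermark), data_input).
        self.watermark_pending_inputs: List[Tuple[Tuple[str, int], Any]] = []

      def enque_ready(self, stage_name: str, data_input: Any) -> None:
        self.ready_inputs.append((stage_name, data_input))

      def enque_pending(
          self, stage_name: str, required_wm: int, data_input: Any) -> None:
        self.watermark_pending_inputs.append(
            ((stage_name, required_wm), data_input))

      def release_pending(self, stage_name: str, current_wm: int) -> None:
        """Moves bundles whose watermark requirement is met to ready_inputs."""
        still_pending = []
        for (name, required_wm), data_input in self.watermark_pending_inputs:
          if name == stage_name and current_wm >= required_wm:
            self.ready_inputs.append((name, data_input))
          else:
            still_pending.append(((name, required_wm), data_input))
        self.watermark_pending_inputs = still_pending


    queues = SimpleQueueManager()
    # A stage that consumes a side input is parked until upstream completes.
    queues.enque_pending(
        'stage_with_side_input', MAX_TIMESTAMP, {'read': ['bundle']})
    # In batch, a finished upstream stage advances its watermark to
    # MAX_TIMESTAMP, which is what the review comment above refers to.
    queues.release_pending('stage_with_side_input', MAX_TIMESTAMP)
    assert queues.ready_inputs  # the parked bundle is now schedulable

In batch mode a stage's watermark effectively jumps from the minimum to MAX_TIMESTAMP when the stage finishes, so parking a bundle on MAX_TIMESTAMP is equivalent to waiting for the upstream stage to be fully processed; per the review comment, streaming execution would park on finer-grained timestamps instead.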