y1chi commented on a change in pull request #16841:
URL: https://github.com/apache/beam/pull/16841#discussion_r839061295
##########
File path:
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##########
@@ -602,6 +747,107 @@ def __init__(self,
for id in self.pipeline_components.windowing_strategies.keys()
}
+ self._stage_managers: Dict[str, BundleContextManager] = {}
+
+ def bundle_manager_for(
+ self,
+ stage: Stage,
+ num_workers: Optional[int] = None) -> 'BundleContextManager':
+ if stage.name not in self._stage_managers:
+ self._stage_managers[stage.name] = BundleContextManager(
+ self, stage, num_workers or self.num_workers)
+ return self._stage_managers[stage.name]
+
+ def _compute_pipeline_dictionaries(self) -> None:
+ for s in self.stages.values():
+ for t in s.transforms:
+ buffer_id = t.spec.payload
+ if t.spec.urn == bundle_processor.DATA_INPUT_URN:
+ self.input_transform_to_buffer_id[t.unique_name] = buffer_id
+ if t.spec.payload == translations.IMPULSE_BUFFER:
+ # Impulse data is not produced by any PTransform.
+ self.pcollection_to_producer_transform[
+ translations.IMPULSE_BUFFER] = None
+ else:
+ assert t.spec.payload != translations.IMPULSE_BUFFER
+ _, input_pcoll = split_buffer_id(buffer_id)
+ # Adding PCollections that may not have a producer.
+ # This is necessary, for example, for the case where we pass an
+ # empty list of PCollections into a Flatten transform.
+ if input_pcoll not in self.pcollection_to_producer_transform:
+ self.pcollection_to_producer_transform[input_pcoll] = None
+ if buffer_id not in self.buffer_id_to_consumer_pairs:
+ self.buffer_id_to_consumer_pairs[buffer_id] = set()
+ if (s.name, t.unique_name
+ ) not in self.buffer_id_to_consumer_pairs[buffer_id]:
+ self.buffer_id_to_consumer_pairs[buffer_id].add(
+ (s.name, t.unique_name))
+ elif t.spec.urn == bundle_processor.DATA_OUTPUT_URN:
+ _, output_pcoll = split_buffer_id(buffer_id)
+ self.pcollection_to_producer_transform[output_pcoll] = t.unique_name
+ elif t.spec.urn in translations.PAR_DO_URNS:
+ pass
+
+ def setup(self) -> None:
+ """This sets up the pipeline to begin running.
+
+ 1. This function enqueues all initial pipeline bundles to be executed.
+ 2. It also updates payload fields on DATA_INPUT and DATA_OUTPUT operations
+ to the Data API endpoints that are live.
+ """
+ for stage in self.stages.values():
+ self._enqueue_stage_initial_inputs(stage)
+
+ def _enqueue_stage_initial_inputs(self, stage: Stage) -> None:
+ """Sets up IMPULSE inputs for a stage, and the data GRPC API endpoint."""
+ data_input = {} # type: MutableMapping[str, PartitionableBuffer]
+ ready_to_schedule = True
+ for transform in stage.transforms:
+ if (transform.spec.urn in {bundle_processor.DATA_INPUT_URN,
+ bundle_processor.DATA_OUTPUT_URN}):
+ if transform.spec.urn == bundle_processor.DATA_INPUT_URN:
+ coder_id = self.data_channel_coders[only_element(
+ transform.outputs.values())]
+ coder = self.pipeline_context.coders[self.safe_coders.get(
+ coder_id, coder_id)]
+ if transform.spec.payload == translations.IMPULSE_BUFFER:
+ data_input[transform.unique_name] = ListBuffer(coder.get_impl())
+ data_input[transform.unique_name].append(ENCODED_IMPULSE_VALUE)
+ else:
+ # If this is not an IMPULSE input, then it is not part of the
+ # initial inputs of a pipeline, and we'll ignore it.
+ data_input = {}
+ else:
+ assert transform.spec.urn == bundle_processor.DATA_OUTPUT_URN
+ coder_id = self.data_channel_coders[only_element(
+ transform.inputs.values())]
+ # For every DATA_INPUT or DATA_OUTPUT operation, we need to replace the
+ # payload with the GRPC configuration for the Data channel.
+ bundle_manager = self.bundle_manager_for(stage)
+ data_spec = beam_fn_api_pb2.RemoteGrpcPort(coder_id=coder_id)
+ data_api_service_descriptor = (
+ bundle_manager.data_api_service_descriptor())
+ if data_api_service_descriptor:
+ data_spec.api_service_descriptor.url = (
+ data_api_service_descriptor.url)
+ transform.spec.payload = data_spec.SerializeToString()
+ elif transform.spec.urn in translations.PAR_DO_URNS:
+ payload = proto_utils.parse_Bytes(
+ transform.spec.payload, beam_runner_api_pb2.ParDoPayload)
+ if payload.side_inputs:
Review comment:
got it.
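
For context, the hunk above does three things: it memoizes one BundleContextManager per stage (bundle_manager_for), it precomputes producer/consumer dictionaries for data buffers, and it enqueues the initial IMPULSE inputs while rewriting DATA_INPUT/DATA_OUTPUT payloads to point at the live gRPC data endpoint. The quote is cut off at 'if payload.side_inputs:'; the sketch below assumes that branch marks the stage as not ready to schedule, which is suggested by the otherwise-unused ready_to_schedule flag initialized earlier in the hunk. Everything here (the Transform/Stage dataclasses, initial_inputs, the URN strings, the placeholder impulse encoding) is a simplified stand-in for illustration, not the real Beam API.

from dataclasses import dataclass, field
from typing import Dict, List, Optional

# Hypothetical stand-ins for the real Beam URNs and markers used in the hunk.
DATA_INPUT_URN = 'data_input'
PAR_DO_URN = 'par_do'
IMPULSE_BUFFER = b'impulse'
IMPULSE_ELEMENT = b'<encoded impulse>'  # placeholder, not the real encoding

@dataclass
class Transform:
  unique_name: str
  urn: str
  payload: bytes = b''
  has_side_inputs: bool = False

@dataclass
class Stage:
  name: str
  transforms: List[Transform] = field(default_factory=list)

def initial_inputs(stage: Stage) -> Optional[Dict[str, List[bytes]]]:
  """Returns the inputs to enqueue for `stage`, or None when the stage is
  not ready to schedule yet (it consumes side inputs that no bundle has
  produced so far)."""
  data_input: Dict[str, List[bytes]] = {}
  ready_to_schedule = True
  for t in stage.transforms:
    if t.urn == DATA_INPUT_URN:
      if t.payload == IMPULSE_BUFFER:
        # Impulse data is synthesized by the runner, so the stage can run
        # immediately with this one-element buffer.
        data_input[t.unique_name] = [IMPULSE_ELEMENT]
      else:
        # Fed by another stage's output; nothing to enqueue up front.
        data_input = {}
    elif t.urn == PAR_DO_URN and t.has_side_inputs:
      ready_to_schedule = False
  return data_input if ready_to_schedule else None

# A stage reading from IMPULSE is schedulable immediately; a stage with
# side inputs must wait for upstream stages.
print(initial_inputs(
    Stage('read', [Transform('impulse', DATA_INPUT_URN, IMPULSE_BUFFER)])))
print(initial_inputs(
    Stage('join', [Transform('pardo', PAR_DO_URN, has_side_inputs=True)])))
# -> {'impulse': [b'<encoded impulse>']}
# -> None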
##########
File path:
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##########
@@ -602,6 +747,107 @@ def __init__(self,
for id in self.pipeline_components.windowing_strategies.keys()
}
+ self._stage_managers: Dict[str, BundleContextManager] = {}
+
+ def bundle_manager_for(
+ self,
+ stage: Stage,
+ num_workers: Optional[int] = None) -> 'BundleContextManager':
+ if stage.name not in self._stage_managers:
+ self._stage_managers[stage.name] = BundleContextManager(
+ self, stage, num_workers or self.num_workers)
+ return self._stage_managers[stage.name]
+
+ def _compute_pipeline_dictionaries(self) -> None:
+ for s in self.stages.values():
+ for t in s.transforms:
+ buffer_id = t.spec.payload
+ if t.spec.urn == bundle_processor.DATA_INPUT_URN:
+ self.input_transform_to_buffer_id[t.unique_name] = buffer_id
+ if t.spec.payload == translations.IMPULSE_BUFFER:
+ # Impulse data is not produced by any PTransform.
+ self.pcollection_to_producer_transform[
+ translations.IMPULSE_BUFFER] = None
+ else:
+ assert t.spec.payload != translations.IMPULSE_BUFFER
Review comment:
It just seems to have no effect, since the 'else:' branch already
performs the same check.
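
Concretely, the assert sits in the 'else:' arm of the very comparison it re-tests, so it can never fail. A minimal sketch of the same control flow (IMPULSE_BUFFER is a stand-in constant, classify a hypothetical helper):

IMPULSE_BUFFER = b'impulse'  # stand-in for translations.IMPULSE_BUFFER

def classify(payload: bytes) -> str:
  if payload == IMPULSE_BUFFER:
    return 'impulse'
  else:
    # Restates the (negated) branch condition, so it always holds here;
    # it documents intent but guards nothing.
    assert payload != IMPULSE_BUFFER
    return 'regular buffer'

assert classify(IMPULSE_BUFFER) == 'impulse'
assert classify(b'group:pc_1') == 'regular buffer'

Dropping the assert (or keeping it purely as documentation) leaves behavior unchanged.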