robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers
over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406466215
##########
File path:
sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -1321,80 +1321,18 @@ def remove_data_plane_ops(stages, pipeline_context):
yield stage
-def inject_timer_pcollections(stages, pipeline_context):
+def setup_timer_mapping(stages, pipeline_context):
# type: (Iterable[Stage], TransformContext) -> Iterator[Stage]
- """Create PCollections for fired timers and to-be-set timers.
-
- At execution time, fired timers and timers-to-set are represented as
- PCollections that are managed by the runner. This phase adds the
- necessary collections, with their reads and writes, to any stages using
- timers.
+ """Set up a mapping of {transform_id: [timer_ids]} for each stage.
"""
for stage in stages:
- for transform in list(stage.transforms):
+ for transform in stage.transforms:
if transform.spec.urn in PAR_DO_URNS:
payload = proto_utils.parse_Bytes(
transform.spec.payload, beam_runner_api_pb2.ParDoPayload)
- for tag, spec in payload.timer_family_specs.items():
- if len(transform.inputs) > 1:
- raise NotImplementedError('Timers and side inputs.')
- input_pcoll = pipeline_context.components.pcollections[next(
- iter(transform.inputs.values()))]
- # Create the appropriate coder for the timer PCollection.
- key_coder_id = input_pcoll.coder_id
- if (pipeline_context.components.coders[key_coder_id].spec.urn ==
- common_urns.coders.KV.urn):
- key_coder_id = pipeline_context.components.coders[
- key_coder_id].component_coder_ids[0]
- key_timer_coder_id = pipeline_context.add_or_get_coder_id(
- beam_runner_api_pb2.Coder(
- spec=beam_runner_api_pb2.FunctionSpec(
- urn=common_urns.coders.KV.urn),
- component_coder_ids=[
- key_coder_id, spec.timer_family_coder_id
- ]))
- # Inject the read and write pcollections.
- timer_read_pcoll = unique_name(
- pipeline_context.components.pcollections,
- '%s_timers_to_read_%s' % (transform.unique_name, tag))
- timer_write_pcoll = unique_name(
- pipeline_context.components.pcollections,
- '%s_timers_to_write_%s' % (transform.unique_name, tag))
- pipeline_context.components.pcollections[timer_read_pcoll].CopyFrom(
- beam_runner_api_pb2.PCollection(
- unique_name=timer_read_pcoll,
- coder_id=key_timer_coder_id,
- windowing_strategy_id=input_pcoll.windowing_strategy_id,
- is_bounded=input_pcoll.is_bounded))
- pipeline_context.components.pcollections[timer_write_pcoll].CopyFrom(
- beam_runner_api_pb2.PCollection(
- unique_name=timer_write_pcoll,
- coder_id=key_timer_coder_id,
- windowing_strategy_id=input_pcoll.windowing_strategy_id,
- is_bounded=input_pcoll.is_bounded))
- stage.transforms.append(
- beam_runner_api_pb2.PTransform(
- unique_name=timer_read_pcoll + '/Read',
- outputs={'out': timer_read_pcoll},
- spec=beam_runner_api_pb2.FunctionSpec(
- urn=bundle_processor.DATA_INPUT_URN,
- payload=create_buffer_id(timer_read_pcoll,
- kind='timers'))))
- stage.transforms.append(
- beam_runner_api_pb2.PTransform(
- unique_name=timer_write_pcoll + '/Write',
- inputs={'in': timer_write_pcoll},
- spec=beam_runner_api_pb2.FunctionSpec(
- urn=bundle_processor.DATA_OUTPUT_URN,
- payload=create_buffer_id(
- timer_write_pcoll, kind='timers'))))
- assert tag not in transform.inputs
- transform.inputs[tag] = timer_read_pcoll
- assert tag not in transform.outputs
- transform.outputs[tag] = timer_write_pcoll
- stage.timer_pcollections.append(
- (timer_read_pcoll + '/Read', timer_write_pcoll))
+ for timer_family_id in payload.timer_family_specs.keys():
+ stage.timers.add((transform.unique_name, timer_family_id))
Review comment:
Nice simplification here :).
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services