[
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419756&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419756
]
ASF GitHub Bot logged work on BEAM-9562:
----------------------------------------
Author: ASF GitHub Bot
Created on: 09/Apr/20 21:01
Start Date: 09/Apr/20 21:01
Worklog Time Spent: 10m
Work Description: robertwb commented on pull request #11314: [BEAM-9562]
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406466215
##########
File path:
sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -1321,80 +1321,18 @@ def remove_data_plane_ops(stages, pipeline_context):
yield stage
-def inject_timer_pcollections(stages, pipeline_context):
+def setup_timer_mapping(stages, pipeline_context):
# type: (Iterable[Stage], TransformContext) -> Iterator[Stage]
- """Create PCollections for fired timers and to-be-set timers.
-
- At execution time, fired timers and timers-to-set are represented as
- PCollections that are managed by the runner. This phase adds the
- necessary collections, with their reads and writes, to any stages using
- timers.
+ """Set up a mapping of {transform_id: [timer_ids]} for each stage.
"""
for stage in stages:
- for transform in list(stage.transforms):
+ for transform in stage.transforms:
if transform.spec.urn in PAR_DO_URNS:
payload = proto_utils.parse_Bytes(
transform.spec.payload, beam_runner_api_pb2.ParDoPayload)
- for tag, spec in payload.timer_family_specs.items():
- if len(transform.inputs) > 1:
- raise NotImplementedError('Timers and side inputs.')
- input_pcoll = pipeline_context.components.pcollections[next(
- iter(transform.inputs.values()))]
- # Create the appropriate coder for the timer PCollection.
- key_coder_id = input_pcoll.coder_id
- if (pipeline_context.components.coders[key_coder_id].spec.urn ==
- common_urns.coders.KV.urn):
- key_coder_id = pipeline_context.components.coders[
- key_coder_id].component_coder_ids[0]
- key_timer_coder_id = pipeline_context.add_or_get_coder_id(
- beam_runner_api_pb2.Coder(
- spec=beam_runner_api_pb2.FunctionSpec(
- urn=common_urns.coders.KV.urn),
- component_coder_ids=[
- key_coder_id, spec.timer_family_coder_id
- ]))
- # Inject the read and write pcollections.
- timer_read_pcoll = unique_name(
- pipeline_context.components.pcollections,
- '%s_timers_to_read_%s' % (transform.unique_name, tag))
- timer_write_pcoll = unique_name(
- pipeline_context.components.pcollections,
- '%s_timers_to_write_%s' % (transform.unique_name, tag))
- pipeline_context.components.pcollections[timer_read_pcoll].CopyFrom(
- beam_runner_api_pb2.PCollection(
- unique_name=timer_read_pcoll,
- coder_id=key_timer_coder_id,
- windowing_strategy_id=input_pcoll.windowing_strategy_id,
- is_bounded=input_pcoll.is_bounded))
- pipeline_context.components.pcollections[timer_write_pcoll].CopyFrom(
- beam_runner_api_pb2.PCollection(
- unique_name=timer_write_pcoll,
- coder_id=key_timer_coder_id,
- windowing_strategy_id=input_pcoll.windowing_strategy_id,
- is_bounded=input_pcoll.is_bounded))
- stage.transforms.append(
- beam_runner_api_pb2.PTransform(
- unique_name=timer_read_pcoll + '/Read',
- outputs={'out': timer_read_pcoll},
- spec=beam_runner_api_pb2.FunctionSpec(
- urn=bundle_processor.DATA_INPUT_URN,
- payload=create_buffer_id(timer_read_pcoll,
- kind='timers'))))
- stage.transforms.append(
- beam_runner_api_pb2.PTransform(
- unique_name=timer_write_pcoll + '/Write',
- inputs={'in': timer_write_pcoll},
- spec=beam_runner_api_pb2.FunctionSpec(
- urn=bundle_processor.DATA_OUTPUT_URN,
- payload=create_buffer_id(
- timer_write_pcoll, kind='timers'))))
- assert tag not in transform.inputs
- transform.inputs[tag] = timer_read_pcoll
- assert tag not in transform.outputs
- transform.outputs[tag] = timer_write_pcoll
- stage.timer_pcollections.append(
- (timer_read_pcoll + '/Read', timer_write_pcoll))
+ for timer_family_id in payload.timer_family_specs.keys():
+ stage.timers.add((transform.unique_name, timer_family_id))
Review comment:
Nice simplification here :).
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 419756)
Time Spent: 19h 10m (was: 19h)
> Remove timer from PCollection and treat timers as Elements
> -----------------------------------------------------------
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-harness, sdk-py-harness
> Reporter: Boyuan Zhang
> Assignee: Boyuan Zhang
> Priority: Major
> Fix For: 2.21.0
>
> Time Spent: 19h 10m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)