[
https://issues.apache.org/jira/browse/BEAM-4678?focusedWorklogId=173596&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-173596
]
ASF GitHub Bot logged work on BEAM-4678:
----------------------------------------
Author: ASF GitHub Bot
Created on: 10/Dec/18 14:09
Start Date: 10/Dec/18 14:09
Worklog Time Spent: 10m
Work Description: robertwb commented on a change in pull request #7228:
[BEAM-4678] Support combiner lifting in portable Flink runner.
URL: https://github.com/apache/beam/pull/7228#discussion_r240222359
##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py
##########
@@ -142,6 +142,135 @@ def leaf_transform_stages(
yield stage
+def lift_combiners(stages, context):
+ """Expands CombinePerKey into pre- and post-grouping stages.
+
+ ... -> CombinePerKey -> ...
+
+ becomes
+
+ ... -> PreCombine -> GBK -> MergeAccumulators -> ExtractOutput -> ...
+ """
+ for stage in stages:
+ assert len(stage.transforms) == 1
+ transform = stage.transforms[0]
+ if transform.spec.urn == common_urns.composites.COMBINE_PER_KEY.urn:
+ combine_payload = proto_utils.parse_Bytes(
+ transform.spec.payload, beam_runner_api_pb2.CombinePayload)
+
+ input_pcoll = context.components.pcollections[only_element(
+ list(transform.inputs.values()))]
+ output_pcoll = context.components.pcollections[only_element(
+ list(transform.outputs.values()))]
+
+ element_coder_id = input_pcoll.coder_id
+ element_coder = context.components.coders[element_coder_id]
+ key_coder_id, _ = element_coder.component_coder_ids
+ accumulator_coder_id = combine_payload.accumulator_coder_id
+
+ key_accumulator_coder = beam_runner_api_pb2.Coder(
+ spec=beam_runner_api_pb2.SdkFunctionSpec(
+ spec=beam_runner_api_pb2.FunctionSpec(
+ urn=common_urns.coders.KV.urn)),
+ component_coder_ids=[key_coder_id, accumulator_coder_id])
+ key_accumulator_coder_id = context.add_or_get_coder_id(
+ key_accumulator_coder)
+
+ accumulator_iter_coder = beam_runner_api_pb2.Coder(
+ spec=beam_runner_api_pb2.SdkFunctionSpec(
+ spec=beam_runner_api_pb2.FunctionSpec(
+ urn=common_urns.coders.ITERABLE.urn)),
+ component_coder_ids=[accumulator_coder_id])
+ accumulator_iter_coder_id = context.add_or_get_coder_id(
+ accumulator_iter_coder)
+
+ key_accumulator_iter_coder = beam_runner_api_pb2.Coder(
+ spec=beam_runner_api_pb2.SdkFunctionSpec(
+ spec=beam_runner_api_pb2.FunctionSpec(
+ urn=common_urns.coders.KV.urn)),
+ component_coder_ids=[key_coder_id, accumulator_iter_coder_id])
+ key_accumulator_iter_coder_id = context.add_or_get_coder_id(
+ key_accumulator_iter_coder)
+
+ precombined_pcoll_id = unique_name(
+ context.components.pcollections, 'pcollection')
+ context.components.pcollections[precombined_pcoll_id].CopyFrom(
+ beam_runner_api_pb2.PCollection(
+ unique_name=transform.unique_name + '/Precombine.out',
+ coder_id=key_accumulator_coder_id,
+ windowing_strategy_id=input_pcoll.windowing_strategy_id,
+ is_bounded=input_pcoll.is_bounded))
+
+ grouped_pcoll_id = unique_name(
+ context.components.pcollections, 'pcollection')
+ context.components.pcollections[grouped_pcoll_id].CopyFrom(
+ beam_runner_api_pb2.PCollection(
+ unique_name=transform.unique_name + '/Group.out',
+ coder_id=key_accumulator_iter_coder_id,
+ windowing_strategy_id=output_pcoll.windowing_strategy_id,
+ is_bounded=output_pcoll.is_bounded))
+
+ merged_pcoll_id = unique_name(
+ context.components.pcollections, 'pcollection')
+ context.components.pcollections[merged_pcoll_id].CopyFrom(
+ beam_runner_api_pb2.PCollection(
+ unique_name=transform.unique_name + '/Merge.out',
+ coder_id=key_accumulator_coder_id,
+ windowing_strategy_id=output_pcoll.windowing_strategy_id,
+ is_bounded=output_pcoll.is_bounded))
+
+ def make_stage(base_stage, transform):
+ return Stage(
+ transform.unique_name,
+ [transform],
+ downstream_side_inputs=base_stage.downstream_side_inputs,
+ must_follow=base_stage.must_follow)
+
+ yield make_stage(
+ stage,
+ beam_runner_api_pb2.PTransform(
+ unique_name=transform.unique_name + '/Precombine',
+ spec=beam_runner_api_pb2.FunctionSpec(
+ urn=common_urns.combine_components.COMBINE_PGBKCV.urn,
Review comment:
Yeah, the duplication of combiner URNs was quite a surprise to me too (and
took a while to debug--I kept checking that we were pulling the Java and Python
constants from the (freshly generated) proto files, and couldn't figure out why
the Java registrations weren't getting picked up by the Python-generated
transforms until I actually printed the values out--so much for one source of
truth :-)...). I filed BEAM-6199 to resolve this; the comment at
https://github.com/apache/beam/pull/5128/commits/030749ffa4ee00db0a6fe39cc308b330df844147
seems to indicate there are still references lying around elsewhere.
Also, PGBKCV = PartialGroupByKeyCombiningValues.
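For readers unfamiliar with the lifting the patch above implements, here is a
minimal standalone sketch (a toy sum combiner in plain Python, not the Beam
API; function names are illustrative only) of the
Precombine -> GBK -> MergeAccumulators -> ExtractOutput flow:

```python
# Toy illustration of combiner lifting for a sum combiner.
# Each "bundle" is precombined into per-key accumulators before the
# (expensive) shuffle, so the GBK moves far fewer elements.
from collections import defaultdict

def precombine(kvs):
    # Fold values into per-key accumulators within one bundle (addInput).
    accums = defaultdict(int)  # the accumulator for sum is just an int
    for k, v in kvs:
        accums[k] += v
    return list(accums.items())

def group(bundles):
    # GBK over the (key, accumulator) pairs emitted by all bundles.
    grouped = defaultdict(list)
    for bundle in bundles:
        for k, a in bundle:
            grouped[k].append(a)
    return grouped

def merge_and_extract(grouped):
    # mergeAccumulators, then extractOutput (identity for sum).
    return {k: sum(accums) for k, accums in grouped.items()}

bundle1 = [('a', 1), ('b', 2), ('a', 3)]
bundle2 = [('a', 10), ('b', 20)]
result = merge_and_extract(group([precombine(bundle1), precombine(bundle2)]))
# result == {'a': 14, 'b': 22}
```

Note how the shuffle sees one (key, accumulator) pair per key per bundle
instead of one pair per input element, which is the point of lifting.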
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 173596)
Time Spent: 50m (was: 40m)
> Support portable combiner lifting in Java Flink Runner
> ------------------------------------------------------
>
> Key: BEAM-4678
> URL: https://issues.apache.org/jira/browse/BEAM-4678
> Project: Beam
> Issue Type: Sub-task
> Components: runner-flink
> Reporter: Daniel Oliveira
> Assignee: Daniel Oliveira
> Priority: Major
> Time Spent: 50m
> Remaining Estimate: 0h
>
> Adjust Flink Runner to support portable combiner lifting as described in the
> following doc:
> https://s.apache.org/beam-runner-api-combine-model
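The combine model referenced in the doc above can be sketched as a class with
the four accumulator-lifecycle methods (names mirror Beam's Python CombineFn;
this toy mean combiner is standalone and does not use the Beam API):

```python
# Hedged sketch of the combine-model lifecycle: create_accumulator,
# add_input, merge_accumulators, extract_output. A mean combiner is the
# classic example where the accumulator (sum, count) differs from the output.
class MeanCombineFn:
    def create_accumulator(self):
        return (0.0, 0)                    # (running sum, count)

    def add_input(self, accum, value):
        s, n = accum
        return (s + value, n + 1)          # fold one element in

    def merge_accumulators(self, accums):
        sums, counts = zip(*accums)        # combine partial results
        return (sum(sums), sum(counts))

    def extract_output(self, accum):
        s, n = accum
        return s / n if n else float('nan')

fn = MeanCombineFn()
a1 = fn.add_input(fn.add_input(fn.create_accumulator(), 1), 2)
a2 = fn.add_input(fn.create_accumulator(), 6)
out = fn.extract_output(fn.merge_accumulators([a1, a2]))
# out == 3.0
```

Lifting works precisely because merge_accumulators is associative: partial
accumulators produced before the shuffle can be merged after it in any order.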
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)