[https://issues.apache.org/jira/browse/BEAM-11715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17275442#comment-17275442]

Yifan Mai commented on BEAM-11715:
----------------------------------

Work items:

# Add a ValidatesRunner that triggers translations.pack_combiners for runners 
that have it enabled. This will be done in 
https://github.com/apache/beam/pull/13851
# Ensure that this does not break DataflowRunner (with --use_runner_v2) in 
2.28. This should already be fixed by https://github.com/apache/beam/pull/13829 
which uses a PTransformOverride hack to re-expand the CombinePerKey into 
GroupByKey and CombineValues. We can check that the fix works using the 
ValidatesRunner in the previous step.
# Disable translations.pack_combiners for FlinkRunner and SparkRunner (i.e. 
runners in which CombinePerKey is not a primitive). This should already be 
handled by https://github.com/apache/beam/pull/13816
# In the long term, rewrite translations.pack_combiners so that the packed 
combine is expanded into GroupByKey and CombineValues.
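Items 2 and 4 both rely on CombinePerKey being equivalent to a GroupByKey 
followed by CombineValues. The sketch below models that equivalence in plain 
Python on in-memory (key, value) pairs. The function names are hypothetical 
and this is not the Beam API, which works on PCollections and CombineFns.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, Iterable, List, Tuple


def group_by_key(pairs: Iterable[Tuple[Any, Any]]) -> Dict[Any, List[Any]]:
    """Hypothetical model of GroupByKey: gather every value seen for a key."""
    grouped: Dict[Any, List[Any]] = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


def combine_values(grouped: Dict[Any, List[Any]],
                   combine_fn: Callable[[List[Any]], Any]) -> Dict[Any, Any]:
    """Hypothetical model of CombineValues: reduce each key's grouped values."""
    return {key: combine_fn(values) for key, values in grouped.items()}


def combine_per_key(pairs: Iterable[Tuple[Any, Any]],
                    combine_fn: Callable[[List[Any]], Any]) -> Dict[Any, Any]:
    """CombinePerKey modeled as its composite expansion:
    GroupByKey followed by CombineValues."""
    return combine_values(group_by_key(pairs), combine_fn)


if __name__ == "__main__":
    print(combine_per_key([("a", 1), ("b", 2), ("a", 3)], sum))  # {'a': 4, 'b': 2}
```

A runner that does not treat CombinePerKey as a primitive (item 3) has to 
execute this composite expansion, so the expansion needs to be present in the 
proto it receives.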

There are two ways we could implement the long-term fix:
# Rewrite the CombinePerKey back into GroupByKey and CombineValues in the 
proto if known_runner_urns does not contain CombinePerKey's URN. This is 
straightforward, but violates DRY because it duplicates the logic of 
CombinePerKey's expand method.
# Instead of packing at the CombinePerKey level, pack at the CombineValues 
level. This means we need to eliminate common GroupByKey stages, similarly to 
how translations.eliminate_common_key_with_void works.
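Option 2 can be illustrated with the same kind of plain-Python model: when 
several combines consume the same keyed input, one shared GroupByKey feeds a 
single packed CombineValues step that applies every combine function to each 
key's values and returns a tuple. `pack_combine_fns`, `packed_combine_values` 
and `unpack` are hypothetical names; a real implementation would work with 
CombineFns and their accumulators rather than plain functions over lists.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, Iterable, List, Tuple

CombineFn = Callable[[List[Any]], Any]


def group_by_key(pairs: Iterable[Tuple[Any, Any]]) -> Dict[Any, List[Any]]:
    """Model of the single shared GroupByKey stage."""
    grouped: Dict[Any, List[Any]] = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


def pack_combine_fns(*combine_fns: CombineFn) -> Callable[[List[Any]], Tuple[Any, ...]]:
    """Hypothetical helper: fuse several combine functions into one that
    returns a tuple with one result per original function."""
    def packed(values: List[Any]) -> Tuple[Any, ...]:
        return tuple(fn(values) for fn in combine_fns)
    return packed


def packed_combine_values(pairs: Iterable[Tuple[Any, Any]],
                          *combine_fns: CombineFn) -> Dict[Any, Tuple[Any, ...]]:
    """Group once, then run one packed CombineValues over each key."""
    packed = pack_combine_fns(*combine_fns)
    return {key: packed(values) for key, values in group_by_key(pairs).items()}


def unpack(packed_results: Dict[Any, Tuple[Any, ...]], index: int) -> Dict[Any, Any]:
    """Recover the output of the index-th original combine."""
    return {key: results[index] for key, results in packed_results.items()}


if __name__ == "__main__":
    pairs = [("a", 1), ("a", 3), ("b", 2)]
    packed = packed_combine_values(pairs, sum, max, min)
    print(packed)             # {'a': (4, 3, 1), 'b': (2, 2, 2)}
    print(unpack(packed, 0))  # {'a': 4, 'b': 2}
```

Because the packed step emits tuples, each original consumer needs an 
unpacking step like `unpack` to get back the output of its own combine.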

[~robertwb] would you have an opinion about this?

> Combiner packing creates an incorrect proto
> -------------------------------------------
>
>                 Key: BEAM-11715
>                 URL: https://issues.apache.org/jira/browse/BEAM-11715
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Chamikara Madhusanka Jayalath
>            Assignee: Yifan Mai
>            Priority: P0
>             Fix For: 2.28.0
>
>
> It seems that the optimization creates a CombinePerKey transform that does 
> not have any sub-transforms. We should fix this by preserving the old 
> structure for CombinePerKey (a GroupByKey + CombineValues composite).
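The structural problem in the issue description, and option 1 above, can be 
sketched against a heavily simplified, dict-based model of the pipeline proto: 
each transform carries only a URN and a list of subtransform ids, whereas real 
protos also carry inputs, outputs, spec payloads and environments. The function 
name is hypothetical, and the URN strings are assumed to match Beam's standard 
URNs for CombinePerKey, GroupByKey and CombineValues; check 
beam_runner_api.proto before relying on them.

```python
from typing import Dict, Iterable

# Assumed standard URNs (verify against beam_runner_api.proto).
COMBINE_PER_KEY_URN = "beam:transform:combine_per_key:v1"
GROUP_BY_KEY_URN = "beam:transform:group_by_key:v1"
COMBINE_GROUPED_VALUES_URN = "beam:transform:combine_grouped_values:v1"

# Simplified transform model: {transform_id: {"urn": str, "subtransforms": [ids]}}.
Transforms = Dict[str, dict]


def expand_leaf_combine_per_key(transforms: Transforms,
                                known_runner_urns: Iterable[str]) -> Transforms:
    """Hypothetical pass: give each CombinePerKey that has no subtransforms a
    GroupByKey + CombineValues expansion, unless the runner treats
    CombinePerKey as a primitive."""
    if COMBINE_PER_KEY_URN in set(known_runner_urns):
        return transforms  # Runner executes CombinePerKey directly.
    # Copy each transform (and its subtransform list) so the input is unchanged.
    result = {tid: dict(t, subtransforms=list(t["subtransforms"]))
              for tid, t in transforms.items()}
    for tid, t in transforms.items():
        if t["urn"] == COMBINE_PER_KEY_URN and not t["subtransforms"]:
            gbk_id = tid + "/GroupByKey"
            cv_id = tid + "/CombineValues"
            result[gbk_id] = {"urn": GROUP_BY_KEY_URN, "subtransforms": []}
            result[cv_id] = {"urn": COMBINE_GROUPED_VALUES_URN, "subtransforms": []}
            result[tid]["subtransforms"] = [gbk_id, cv_id]
    return result


if __name__ == "__main__":
    packed = {"Packed": {"urn": COMBINE_PER_KEY_URN, "subtransforms": []}}
    expanded = expand_leaf_combine_per_key(packed, known_runner_urns=[GROUP_BY_KEY_URN])
    print(expanded["Packed"]["subtransforms"])  # ['Packed/GroupByKey', 'Packed/CombineValues']
```

A real rewrite would also have to create the intermediate PCollection between 
the two subtransforms and wire up their inputs and outputs, which is part of 
why option 1 ends up duplicating CombinePerKey's expand method.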



--
This message was sent by Atlassian Jira
(v8.3.4#803005)