yifanmai commented on a change in pull request #12787: URL: https://github.com/apache/beam/pull/12787#discussion_r485943483
##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -704,6 +705,64 @@ def fix_side_input_pcoll_coders(stages, pipeline_context):
   return stages
+def eliminate_common_key_with_none(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterable[Stage]
+
+  """Runs common subexpression elimination for sibling KeyWithNone stages.
+
+  If multiple KeyWithNone stages share a common input, then all but one stages
+  will be eliminated along with their output PCollections. Transforms that
+  read input from the output of the eliminated KeyWithNone stages will be
+  remapped to read input from the output of the remaining KeyWithNone stage.
+  """
+  # Partition stages by whether they are eligible for common KeyWithNone
+  # elimination, and group eligible KeyWithNone stages by parent and
+  # environment.
+  grouped_eligible_stages = collections.defaultdict(list)
+  ineligible_stages = []
+  for stage in stages:
+    is_eligible = False

Review comment:
Refactored the code to separate the key function from the grouping function, and used the grouping function in combiner packing as well; PTAL.
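
For readers following the thread, here is a rough sketch of the split the comment describes: a generic grouping function that takes a key function, plus one key function for sibling KeyWithNone stages. This is not the code in the PR. Stages are modeled as plain dicts, and `group_by_key`, `sibling_key`, and `eliminate_common_siblings` are hypothetical names chosen for illustration only.

```python
import collections


def group_by_key(items, key_fn):
    """Group items by key_fn(item); a key of None marks an item ineligible."""
    grouped = collections.defaultdict(list)
    ineligible = []
    for item in items:
        key = key_fn(item)
        if key is None:
            ineligible.append(item)
        else:
            grouped[key].append(item)
    return grouped, ineligible


def sibling_key(stage):
    """Hypothetical key function: (input, environment) for KeyWithNone stages."""
    if stage["urn"] != "KeyWithNone":
        return None
    return (stage["input"], stage["environment"])


def eliminate_common_siblings(stages, key_fn):
    """Keep one stage per group and remap consumers of the dropped outputs."""
    stages = list(stages)
    grouped, _ = group_by_key(stages, key_fn)
    remap = {}       # eliminated output -> surviving output
    dropped = set()  # names of eliminated stages
    for siblings in grouped.values():
        keeper = siblings[0]
        for duplicate in siblings[1:]:
            remap[duplicate["output"]] = keeper["output"]
            dropped.add(duplicate["name"])
    result = []
    for stage in stages:
        if stage["name"] in dropped:
            continue
        # Copy the stage so the caller's dicts are not mutated.
        new_input = remap.get(stage["input"], stage["input"])
        result.append(dict(stage, input=new_input))
    return result
```

Because the grouping step only depends on `key_fn`, the same helper could in principle be reused with a different key function, which is the kind of reuse the comment mentions for combiner packing.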