robertwb commented on a change in pull request #15391:
URL: https://github.com/apache/beam/pull/15391#discussion_r717035959
##########
File path:
sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -765,6 +767,27 @@ def _group_stages_by_key(stages, get_stage_key):
return (grouped_stages, stages_with_none_key)
+def _group_stages_with_limit(stages, get_limit):
+ # type: (Iterable[Stage], Callable[[[str], int]]) ->
Iterable[Iterable[Stage]]
+ stages_with_limit = [(stage, get_limit(stage.name)) for stage in stages]
+ group = []
+ group_limit = 0
+ for stage, limit in sorted(stages_with_limit, key=operator.itemgetter(1)):
+ if limit < 1:
+ raise Exception(
+ 'expected get_limit to return an integer >= 1, '
+ 'instead got: %d for stage: %s' % (limit, stage))
+ if not group:
+ group_limit = limit
+ assert len(group) < group_limit
+ group.append(stage)
Review comment:
My mistake, I was thinking the iteration was from largest to smallest.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]