[
https://issues.apache.org/jira/browse/BEAM-12798?focusedWorklogId=656643&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-656643
]
ASF GitHub Bot logged work on BEAM-12798:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 28/Sep/21 20:06
Start Date: 28/Sep/21 20:06
Worklog Time Spent: 10m
Work Description: robertwb commented on a change in pull request #15391:
URL: https://github.com/apache/beam/pull/15391#discussion_r717035959
##########
File path:
sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -765,6 +767,27 @@ def _group_stages_by_key(stages, get_stage_key):
   return (grouped_stages, stages_with_none_key)
+def _group_stages_with_limit(stages, get_limit):
+  # type: (Iterable[Stage], Callable[[[str], int]]) -> Iterable[Iterable[Stage]]
+  stages_with_limit = [(stage, get_limit(stage.name)) for stage in stages]
+  group = []
+  group_limit = 0
+  for stage, limit in sorted(stages_with_limit, key=operator.itemgetter(1)):
+    if limit < 1:
+      raise Exception(
+          'expected get_limit to return an integer >= 1, '
+          'instead got: %d for stage: %s' % (limit, stage))
+    if not group:
+      group_limit = limit
+    assert len(group) < group_limit
+    group.append(stage)
Review comment:
My mistake, I was thinking the iteration was from largest to smallest.
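The review point depends on sort order. Because stages are sorted by ascending limit, the first stage in each group has the smallest limit, so using it as group_limit keeps every later member (whose limit is >= group_limit) within its own limit. Iterating from largest to smallest would instead let a stage with limit 1 land in a larger group. The diff excerpt stops at group.append(stage), so the following is only a minimal standalone sketch, not the PR's code. It assumes a group is emitted once it reaches group_limit and that a trailing partial group is emitted at the end. It also works on plain stage names rather than Beam Stage objects and raises ValueError instead of a bare Exception. The name group_with_limit is hypothetical.

```python
import operator
from typing import Callable, Iterable, List


def group_with_limit(
    names: Iterable[str], get_limit: Callable[[str], int]
) -> List[List[str]]:
  """Hypothetical standalone sketch of the grouping loop in the diff.

  Assumption: a group is closed as soon as it reaches group_limit, and a
  trailing partial group is kept (the diff excerpt ends before this logic).
  """
  with_limit = [(name, get_limit(name)) for name in names]
  groups: List[List[str]] = []
  group: List[str] = []
  group_limit = 0
  # Ascending order: the first member of each group has the smallest limit,
  # so every later member (limit >= group_limit) also stays within its limit.
  for name, limit in sorted(with_limit, key=operator.itemgetter(1)):
    if limit < 1:
      raise ValueError(
          'expected get_limit to return an integer >= 1, '
          'instead got: %d for: %s' % (limit, name))
    if not group:
      group_limit = limit  # A new group takes the limit of its first member.
    assert len(group) < group_limit
    group.append(name)
    if len(group) >= group_limit:
      groups.append(group)  # Group is full; start a new one.
      group = []
  if group:
    groups.append(group)  # Keep the last, partially filled group.
  return groups
```

For limits {'a': 2, 'b': 3, 'c': 3, 'd': 3, 'e': 1}, group_with_limit(limits, limits.__getitem__) returns [['e'], ['a', 'b'], ['c', 'd']]. No group is larger than the limit of any stage in it.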
--
This is an automated message from the Apache Git Service.
Issue Time Tracking
-------------------
Worklog Id: (was: 656643)
Time Spent: 3h (was: 2h 50m)
> Configurable combiner packing limit
> -----------------------------------
>
> Key: BEAM-12798
> URL: https://issues.apache.org/jira/browse/BEAM-12798
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-core
> Reporter: Yifan Mai
> Priority: P2
> Time Spent: 3h
> Remaining Estimate: 0h
>
> Currently, translations.pack_combiners has no upper limit, so it can
> potentially pack thousands of combiners together, which may cause OOM issues
> if the accumulators are large. We should set a default upper limit and also
> make the limit configurable per combiner.
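The issue asks for a default upper limit plus a per-combiner override. The diff's grouping function takes a get_limit callable applied to stage.name. One way such a callable could be built is a lookup that falls back to a default. This is a minimal sketch under stated assumptions: make_get_limit, default_limit and overrides are hypothetical names, and keying overrides by stage name is an assumption. How Beam actually exposes the configurable limit is not shown in this thread.

```python
from typing import Callable, Mapping, Optional


def make_get_limit(
    default_limit: int, overrides: Optional[Mapping[str, int]] = None
) -> Callable[[str], int]:
  """Hypothetical helper: returns get_limit(stage_name) -> int.

  Uses a per-name override when one exists, otherwise default_limit.
  Validates up front that every limit is >= 1.
  """
  if default_limit < 1:
    raise ValueError('default_limit must be >= 1, got %d' % default_limit)
  table = dict(overrides or {})
  for name, limit in table.items():
    if limit < 1:
      raise ValueError('limit for %s must be >= 1, got %d' % (name, limit))

  def get_limit(stage_name: str) -> int:
    # Fall back to the default for combiners without an explicit override.
    return table.get(stage_name, default_limit)

  return get_limit
```

Validating limits when the callable is built surfaces configuration errors early, before any stages are grouped.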
--
This message was sent by Atlassian Jira
(v8.3.4#803005)