[
https://issues.apache.org/jira/browse/BEAM-12798?focusedWorklogId=648953&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-648953
]
ASF GitHub Bot logged work on BEAM-12798:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 10/Sep/21 00:29
Start Date: 10/Sep/21 00:29
Worklog Time Spent: 10m
Work Description: robertwb commented on a change in pull request #15391:
URL: https://github.com/apache/beam/pull/15391#discussion_r705809380
##########
File path:
sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -818,7 +841,7 @@ def _eliminate_common_key_with_none(stages, context,
can_pack=lambda s: True):
# elimination, and group eligible KeyWithNone stages by parent and
# environment.
def get_stage_key(stage):
- if len(stage.transforms) == 1 and can_pack(stage.name):
+ if len(stage.transforms) == 1 and int(can_pack(stage.name)) > 0:
Review comment:
`can_pack(stage.name)` is equivalent (assuming no negative values) and
more clear.
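
A minimal sketch of the equivalence the reviewer points out: when `can_pack` returns either a bool or a non-negative int, using its return value directly in a boolean context behaves the same as wrapping it in `int(...) > 0`. The `can_pack` below is purely illustrative, not Beam's.

```python
# Illustrative stand-in for can_pack: returns a packing limit (int)
# for eligible stages, False otherwise.
def can_pack(stage_name):
    return 10 if stage_name.startswith('packable') else False

# Assuming no negative return values, plain truthiness and the
# explicit int(...) > 0 check agree for every stage name.
for name in ['packable_stage', 'other_stage']:
    assert bool(can_pack(name)) == (int(can_pack(name)) > 0)
```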
##########
File path:
sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -765,6 +767,27 @@ def _group_stages_by_key(stages, get_stage_key):
return (grouped_stages, stages_with_none_key)
+def _group_stages_with_limit(stages, get_limit):
+  # type: (Iterable[Stage], Callable[[str], int]) -> Iterable[Iterable[Stage]]
+ stages_with_limit = [(stage, get_limit(stage.name)) for stage in stages]
+ group = []
+ group_limit = 0
+ for stage, limit in sorted(stages_with_limit, key=operator.itemgetter(1)):
+ if limit < 1:
+ raise Exception(
+ 'expected get_limit to return an integer >= 1, '
+ 'instead got: %d for stage: %s' % (limit, stage))
+ if not group:
+ group_limit = limit
+ assert len(group) < group_limit
+ group.append(stage)
Review comment:
This will put stages into groups that are too big, e.g. if we have
`[(big, 10), (s1, 2), (s2, 2)]`
they will all end up in the same group. Instead, it seems you should append
only if `len(group) < limit`, else start a new group.
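
A sketch of the grouping the reviewer suggests (not Beam's actual implementation; names are illustrative): append only while the current group is under its limit, otherwise close the group and start a new one. With `[(big, 10), (s1, 2), (s2, 2)]`, sorting by limit puts the limit-2 stages first, so they form a group of two and `big` starts its own group rather than all three landing together.

```python
import operator

def group_stages_with_limit(stages, get_limit):
    """Group stage names so no group exceeds the limit of its first member."""
    stages_with_limit = [(s, get_limit(s)) for s in stages]
    groups = []
    group = []
    group_limit = 0
    # Sort ascending by limit so the group's first (smallest-limit) member
    # bounds the size of the group for every stage added after it.
    for stage, limit in sorted(stages_with_limit, key=operator.itemgetter(1)):
        if limit < 1:
            raise ValueError(
                'expected get_limit to return an integer >= 1, '
                'instead got: %d for stage: %s' % (limit, stage))
        if not group:
            group_limit = limit
        if len(group) < group_limit:
            group.append(stage)
        else:
            # Group is full: close it out and start a new one.
            groups.append(group)
            group = [stage]
            group_limit = limit
    if group:
        groups.append(group)
    return groups

# The reviewer's example: s1 and s2 share a group, big gets its own.
limits = {'big': 10, 's1': 2, 's2': 2}
print(group_stages_with_limit(['big', 's1', 's2'], limits.get))
# → [['s1', 's2'], ['big']]
```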
##########
File path:
sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -919,10 +945,17 @@ def _try_fuse_stages(a, b):
else:
raise ValueError
+ def _get_limit(stage_name):
+ result = can_pack(stage_name)
+ if result is True:
+ return _DEFAULT_PACK_COMBINERS_LIMIT
+ else:
+ return int(result)
+
# Partition stages by whether they are eligible for CombinePerKey packing
# and group eligible CombinePerKey stages by parent and environment.
def get_stage_key(stage):
- if (len(stage.transforms) == 1 and can_pack(stage.name) and
+ if (len(stage.transforms) == 1 and int(can_pack(stage.name)) > 0 and
Review comment:
same
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 648953)
Time Spent: 1h 10m (was: 1h)
> Configurable combiner packing limit
> -----------------------------------
>
> Key: BEAM-12798
> URL: https://issues.apache.org/jira/browse/BEAM-12798
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Yifan Mai
> Priority: P2
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
> Currently translations.pack_combiners has no upper limit, so it can
> potentially pack thousands of combiners together, which may cause OOM issues
> if the accumulators are large. We should set a default upper limit and also
> make the upper limit configurable per-combiner.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)