yifanmai commented on a change in pull request #15391:
URL: https://github.com/apache/beam/pull/15391#discussion_r711196560



##########
File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -919,10 +945,17 @@ def _try_fuse_stages(a, b):
     else:
       raise ValueError
 
+  def _get_limit(stage_name):
+    result = can_pack(stage_name)
+    if result is True:
+      return _DEFAULT_PACK_COMBINERS_LIMIT
+    else:
+      return int(result)
+
   # Partition stages by whether they are eligible for CombinePerKey packing
   # and group eligible CombinePerKey stages by parent and environment.
   def get_stage_key(stage):
-    if (len(stage.transforms) == 1 and can_pack(stage.name) and
+    if (len(stage.transforms) == 1 and int(can_pack(stage.name)) > 0 and

Review comment:
       Fixed.

##########
File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -818,7 +841,7 @@ def _eliminate_common_key_with_none(stages, context, can_pack=lambda s: True):
   # elimination, and group eligible KeyWithNone stages by parent and
   # environment.
   def get_stage_key(stage):
-    if len(stage.transforms) == 1 and can_pack(stage.name):
+    if len(stage.transforms) == 1 and int(can_pack(stage.name)) > 0:

Review comment:
       Fixed.

##########
File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -765,6 +767,27 @@ def _group_stages_by_key(stages, get_stage_key):
   return (grouped_stages, stages_with_none_key)
 
 
+def _group_stages_with_limit(stages, get_limit):
+  # type: (Iterable[Stage], Callable[[[str], int]]) -> Iterable[Iterable[Stage]]
+  stages_with_limit = [(stage, get_limit(stage.name)) for stage in stages]
+  group = []
+  group_limit = 0
+  for stage, limit in sorted(stages_with_limit, key=operator.itemgetter(1)):
+    if limit < 1:
+      raise Exception(
+          'expected get_limit to return an integer >= 1, '
+          'instead got: %d for stage: %s' % (limit, stage))
+    if not group:
+      group_limit = limit
+    assert len(group) < group_limit
+    group.append(stage)

Review comment:
       I ran this manually and the output was `[['s1', 's2'], ['big']]`. I added a similar test case for testing a mix of limits.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to