[ 
https://issues.apache.org/jira/browse/BEAM-10409?focusedWorklogId=474951&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-474951
 ]

ASF GitHub Bot logged work on BEAM-10409:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Aug/20 18:35
            Start Date: 26/Aug/20 18:35
    Worklog Time Spent: 10m 
      Work Description: yifanmai commented on a change in pull request #12185:
URL: https://github.com/apache/beam/pull/12185#discussion_r477506036



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -818,7 +819,7 @@ def _try_fuse_stages(a, b):
         for output_kv_coder_id in output_kv_coder_ids
     ]
     pack_output_value_coder = beam_runner_api_pb2.Coder(
-        spec=beam_runner_api_pb2.FunctionSpec(urn=common_urns.coders.KV.urn),
+        spec=beam_runner_api_pb2.FunctionSpec(urn=python_urns.tuple.KV.urn),

Review comment:
       Thanks for catching that. Fixed.
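For readers following along: the fix above swaps the KV urn for a tuple urn because the packed output values form a tuple, not a key/value pair. Below is a minimal sketch (not the PR's code) of how such a composite Runner API coder proto is assembled; the coder ids are hypothetical placeholders, where translations.py would use ids from context.add_or_get_coder_id(...).

from apache_beam.portability import common_urns
from apache_beam.portability.api import beam_runner_api_pb2

# Hypothetical component coder ids standing in for registered coders.
key_coder_id = 'key_coder_id'
packed_values_coder_id = 'packed_values_coder_id'

# A composite coder proto: the FunctionSpec urn picks the coder kind, and
# component_coder_ids name its parts, here KV[key, packed values].
pack_output_kv_coder = beam_runner_api_pb2.Coder(
    spec=beam_runner_api_pb2.FunctionSpec(urn=common_urns.coders.KV.urn),
    component_coder_ids=[key_coder_id, packed_values_coder_id])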

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -690,6 +692,200 @@ def fix_side_input_pcoll_coders(stages, pipeline_context):
   return stages
 
 
+def pack_combiners(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterator[Stage]
+  """Packs sibling CombinePerKey stages into a single CombinePerKey.
+
+  If CombinePerKey stages have a common input, one input each, and one output
+  each, pack the stages into a single stage that runs all CombinePerKeys and
+  outputs resulting tuples to a new PCollection. A subsequent stage unpacks
+  tuples from this PCollection and sends them to the original output
+  PCollections.
+  """
+
+  class _UnpackFn(core.DoFn):
+    """A DoFn that unpacks a packed to multiple tagged outputs.
+
+    Example:
+      tags = (T1, T2, ...)
+      input = (K, (V1, V2, ...))
+      output = TaggedOutput(T1, (K, V1)), TaggedOutput(T2, (K, V2)), ...
+    """
+
+    def __init__(self, tags):
+      self._tags = tags
+
+    def process(self, element):
+      key, values = element
+      return [
+          core.pvalue.TaggedOutput(tag, (key, value))
+          for tag, value in zip(self._tags, values)
+      ]
+
+  def _get_fallback_coder_id():
+    return context.add_or_get_coder_id(
+        coders.registry.get_coder(object).to_runner_api(None))
+
+  def _get_component_coder_id_from_kv_coder(coder, index):
+    assert index < 2
+    if coder.spec.urn == common_urns.coders.KV.urn and len(
+        coder.component_coder_ids) == 2:
+      return coder.component_coder_ids[index]
+    return _get_fallback_coder_id()
+
+  def _get_key_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 0)
+
+  def _get_value_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 1)
+
+  def _try_fuse_stages(a, b):
+    if a.can_fuse(b, context):
+      return a.fuse(b)
+    else:
+      raise ValueError
+
+  # Group stages by parent and environment, yielding ineligible stages.
+  combine_stages_by_input_pcoll_id = collections.defaultdict(list)
+  for stage in stages:
+    is_packable_combine = False
+
+    if (len(stage.transforms) == 1 and
+        stage.environment is not None and
+        python_urns.PACKED_COMBINE_FN in
+        context.components.environments[stage.environment].capabilities):
+      transform = only_transform(stage.transforms)
+      if (transform.spec.urn == common_urns.composites.COMBINE_PER_KEY.urn and
+          len(transform.inputs) == 1 and
+          len(transform.outputs) == 1):
+        combine_payload = proto_utils.parse_Bytes(
+            transform.spec.payload, beam_runner_api_pb2.CombinePayload)
+        if combine_payload.combine_fn.urn == python_urns.PICKLED_COMBINE_FN:
+          is_packable_combine = True
+
+    if is_packable_combine:
+      input_pcoll_id = only_element(transform.inputs.values())
+      stage_key = (input_pcoll_id, stage.environment)
+      combine_stages_by_input_pcoll_id[stage_key].append(stage)
+    else:
+      yield stage
+
+  for stage_key, packable_stages in combine_stages_by_input_pcoll_id.items():
+    input_pcoll_id, _ = stage_key
+    try:
+      if not len(packable_stages) > 1:
+        raise ValueError(
+            'Only one stage in this group: Skipping stage packing')
+      # Fused stage is used as template and is not yielded.
+      fused_stage = functools.reduce(_try_fuse_stages, packable_stages)
+    except ValueError:
+      # Skip packing stages in this group.
+      # Yield the stages unmodified, and then continue to the next group.
+      for stage in packable_stages:
+        yield stage
+      continue
+
+    transforms = [
+        only_transform(stage.transforms) for stage in packable_stages
+    ]
+    combine_payloads = [
+        proto_utils.parse_Bytes(transform.spec.payload,
+                                beam_runner_api_pb2.CombinePayload)
+        for transform in transforms
+    ]
+    output_pcoll_ids = [
+        only_element(transform.outputs.values()) for transform in transforms
+    ]
+
+    # Build accumulator coder for (acc1, acc2, ...)
+    accumulator_coder_ids = [
+        combine_payload.accumulator_coder_id
+        for combine_payload in combine_payloads
+    ]
+    tuple_accumulator_coder_id = context.add_or_get_coder_id(
+        beam_runner_api_pb2.Coder(
+            spec=beam_runner_api_pb2.FunctionSpec(
+                urn=common_urns.coders.KV.urn),

Review comment:
       Fixed.
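To make the unpacking step concrete, here is a standalone sketch of the same idea as the quoted _UnpackFn (the tags 'mean' and 'count' are hypothetical, and UnpackFn mirrors the reviewed code rather than reusing the PR's private class):

import apache_beam as beam

class UnpackFn(beam.DoFn):
  """Fans a packed (key, (v1, v2, ...)) out to tagged (key, v) pairs."""

  def __init__(self, tags):
    self._tags = tags

  def process(self, element):
    key, values = element
    for tag, value in zip(self._tags, values):
      yield beam.pvalue.TaggedOutput(tag, (key, value))

with beam.Pipeline() as p:
  packed = p | beam.Create([('k', (2.5, 4))])
  unpacked = packed | beam.ParDo(
      UnpackFn(('mean', 'count'))).with_outputs('mean', 'count')
  # Each tagged output is routed to what would be the original output
  # PCollection of the corresponding CombinePerKey.
  unpacked.mean | 'PrintMean' >> beam.Map(print)    # ('k', 2.5)
  unpacked.count | 'PrintCount' >> beam.Map(print)  # ('k', 4)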




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 474951)
    Time Spent: 4h 50m  (was: 4h 40m)

> Add combiner packing to graph optimizer phases
> ----------------------------------------------
>
>                 Key: BEAM-10409
>                 URL: https://issues.apache.org/jira/browse/BEAM-10409
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-core
>            Reporter: Yifan Mai
>            Assignee: Yifan Mai
>            Priority: P2
>          Time Spent: 4h 50m
>  Remaining Estimate: 0h
>
> Some use cases of Beam (e.g. [TensorFlow 
> Transform|https://github.com/tensorflow/transform]) create thousands of 
> Combine stages with a common parent. The large number of stages can cause 
> performance issues on some runners. To alleviate this, a graph optimization phase 
> could be added to the translations module that packs compatible Combine 
> stages into a single stage.
> The graph optimization for CombinePerKey would work as follows: If 
> CombinePerKey stages have a common input, one input each, and one output 
> each, pack the stages into a single stage that runs all CombinePerKeys and 
> outputs resulting tuples to a new PCollection. A subsequent stage unpacks 
> tuples from this PCollection and sends them to the original output 
> PCollections.
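As a toy illustration of the eligible shape (my example, not from the issue): two sibling CombinePerKey transforms consuming the same input PCollection, each with one input and one output, which pack_combiners could fuse into a single packed CombinePerKey followed by an unpack stage.

import apache_beam as beam

with beam.Pipeline() as p:
  kvs = p | beam.Create([('a', 1), ('a', 3), ('b', 5)])
  # Two siblings with a common input, one input each, one output each.
  sums = kvs | 'Sum' >> beam.CombinePerKey(sum)
  maxes = kvs | 'Max' >> beam.CombinePerKey(max)
  # Packed, these would emit ('a', (4, 3)) and ('b', (5, 5)) on one
  # intermediate PCollection, which an unpack stage then splits apart.
  sums | 'PrintSums' >> beam.Map(print)
  maxes | 'PrintMaxes' >> beam.Map(print)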
> There is an additional issue with supporting this for CombineGlobally: 
> because of the intermediate KeyWithVoid stage between the CombinePerKey 
> stages and the input stage, the CombinePerKey stages do not have a common 
> input stage, and cannot be packed. To support CombineGlobally, a common 
> sibling elimination graph optimization phase can be used to combine the 
> KeyWithVoid stages. After this, the CombinePerKey stages would have a common 
> input and can be packed.
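A sketch of the CombineGlobally obstacle described above (again my illustration, not code from the issue): each CombineGlobally keys its input before combining, so the underlying CombinePerKey stages see distinct keying parents until those duplicate siblings are eliminated.

import apache_beam as beam

with beam.Pipeline() as p:
  nums = p | beam.Create([1, 2, 3])
  # Each CombineGlobally internally inserts a KeyWithVoid-style keying step
  # followed by a CombinePerKey, so the two per-key stages below lack a
  # common input and cannot be packed until the keying stages are merged.
  total = nums | 'Sum' >> beam.CombineGlobally(sum)
  biggest = nums | 'Max' >> beam.CombineGlobally(max)
  total | 'PrintSum' >> beam.Map(print)
  biggest | 'PrintMax' >> beam.Map(print)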



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
