robertwb commented on a change in pull request #13877:
URL: https://github.com/apache/beam/pull/13877#discussion_r572469235
##########
File path:
sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -1000,25 +1000,151 @@ def make_pack_name(names):
*[
core.CombineFn.from_runner_api(combine_payload.combine_fn,
context) # type: ignore[arg-type]
for combine_payload in combine_payloads
- ]).to_runner_api(context) # type: ignore[arg-type]
+ ]) # type: ignore[arg-type]
pack_transform = beam_runner_api_pb2.PTransform(
- unique_name=pack_transform_name + '/Pack',
+ unique_name=pack_transform_name + '/CombinePerKey',
spec=beam_runner_api_pb2.FunctionSpec(
urn=common_urns.composites.COMBINE_PER_KEY.urn,
payload=beam_runner_api_pb2.CombinePayload(
- combine_fn=pack_combine_fn,
+ combine_fn=pack_combine_fn.to_runner_api(context),
accumulator_coder_id=tuple_accumulator_coder_id).
SerializeToString()),
inputs={'in': input_pcoll_id},
# 'None' single output key follows convention for CombinePerKey.
outputs={'None': pack_pcoll_id},
environment_id=fused_stage.environment)
pack_stage = Stage(
- pack_stage_name + '/Pack', [pack_transform],
+ pack_stage_name + '/CombinePerKey', [pack_transform],
downstream_side_inputs=fused_stage.downstream_side_inputs,
must_follow=fused_stage.must_follow,
parent=fused_stage.parent,
environment=fused_stage.environment)
+
+ # Traverse the subtransform structure.
+ original_group_by_key_transforms = []
+ original_combine_grouped_values_transforms = []
+ original_combine_values_par_do_transforms = []
+ for transform in transforms:
+ # CombinePerKey may contain GroupByKey and Combine subtransforms.
+ if transform.subtransforms:
+ assert len(transform.subtransforms) == 2
+
+ group_by_key_transform = context.components.transforms[
+ transform.subtransforms[0]]
Review comment:
I don't think there's any requirement that they come in a particular
order.
What we can assume, or use as a precondition to grouping in
_group_stages_by_key, is that there will be two subtransforms of the expected
kind.
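For example, something like this (an illustrative sketch only; the helper name is invented) would make that precondition explicit without assuming any ordering:

```python
from apache_beam.portability import common_urns


def match_subtransforms_by_urn(transform, context):
  # Index the subtransforms by URN so that no particular ordering of
  # transform.subtransforms is assumed.
  by_urn = {}
  for subtransform_id in transform.subtransforms:
    subtransform = context.components.transforms[subtransform_id]
    by_urn[subtransform.spec.urn] = subtransform
  # Precondition for packing: exactly the two expected kinds are present.
  assert len(by_urn) == 2
  return (
      by_urn[common_urns.primitives.GROUP_BY_KEY.urn],
      by_urn[common_urns.combine_components.COMBINE_GROUPED_VALUES.urn])
```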
##########
File path:
sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -1000,25 +1000,151 @@ def make_pack_name(names):
*[
core.CombineFn.from_runner_api(combine_payload.combine_fn,
context) # type: ignore[arg-type]
Review comment:
There is a check for `combine_payload.combine_fn.urn == python_urns.PICKLED_COMBINE_FN` above.
It is unfortunate, however, to have this python-specific stuff embedded into
the SDK. Ideally the notion of a packed CombineFn could be in the model and one
could create such from a set of CombineFns.
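(For comparison, the Python SDK already exposes this kind of packing client-side via `TupleCombineFn`/`SingleInputTupleCombineFn`; a model-level equivalent would make it available to every SDK. A minimal runnable sketch of the existing client-side form:)

```python
import apache_beam as beam
from apache_beam.transforms import combiners

with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create([('a', 1), ('a', 3), ('b', 5)])
      # SingleInputTupleCombineFn feeds each value to every component
      # CombineFn and emits a tuple of the component results.
      | beam.CombinePerKey(
          combiners.SingleInputTupleCombineFn(
              combiners.CountCombineFn(), combiners.MeanCombineFn()))
      | beam.Map(print))  # e.g. ('a', (2, 2.0)), ('b', (1, 5.0))
```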
##########
File path:
sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -1000,25 +1000,151 @@ def make_pack_name(names):
*[
core.CombineFn.from_runner_api(combine_payload.combine_fn,
context) # type: ignore[arg-type]
for combine_payload in combine_payloads
- ]).to_runner_api(context) # type: ignore[arg-type]
+ ]) # type: ignore[arg-type]
pack_transform = beam_runner_api_pb2.PTransform(
- unique_name=pack_transform_name + '/Pack',
+ unique_name=pack_transform_name + '/CombinePerKey',
spec=beam_runner_api_pb2.FunctionSpec(
urn=common_urns.composites.COMBINE_PER_KEY.urn,
payload=beam_runner_api_pb2.CombinePayload(
- combine_fn=pack_combine_fn,
+ combine_fn=pack_combine_fn.to_runner_api(context),
accumulator_coder_id=tuple_accumulator_coder_id).
SerializeToString()),
inputs={'in': input_pcoll_id},
# 'None' single output key follows convention for CombinePerKey.
outputs={'None': pack_pcoll_id},
environment_id=fused_stage.environment)
pack_stage = Stage(
- pack_stage_name + '/Pack', [pack_transform],
+ pack_stage_name + '/CombinePerKey', [pack_transform],
downstream_side_inputs=fused_stage.downstream_side_inputs,
must_follow=fused_stage.must_follow,
parent=fused_stage.parent,
environment=fused_stage.environment)
+
+ # Traverse the subtransform structure.
+ original_group_by_key_transforms = []
+ original_combine_grouped_values_transforms = []
+ original_combine_values_par_do_transforms = []
+ for transform in transforms:
Review comment:
We would still have to insert the multiplexing DoFn as a follow-on.
I do agree this function is getting a bit unwieldy though.
Could we perhaps factor this out into a function
`create_combine_per_key_transform(combine_fn, input_pcoll, output_pcoll,
template)` that also creates the internal structure?
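Roughly this shape (a sketch only; the extra `context` parameter and the proto plumbing are my assumptions, and construction of the internal subtransform structure is elided):

```python
from apache_beam.portability import common_urns
from apache_beam.portability.api import beam_runner_api_pb2


def create_combine_per_key_transform(
    combine_fn, input_pcoll, output_pcoll, template, context):
  # Start from an arbitrary original transform as a template, then
  # rewrite its spec, inputs and outputs for the packed combine.
  transform = beam_runner_api_pb2.PTransform()
  transform.CopyFrom(template)
  transform.spec.urn = common_urns.composites.COMBINE_PER_KEY.urn
  transform.spec.payload = beam_runner_api_pb2.CombinePayload(
      combine_fn=combine_fn.to_runner_api(context)).SerializeToString()
  transform.inputs.clear()
  transform.inputs['in'] = input_pcoll
  transform.outputs.clear()
  # 'None' single output key follows the CombinePerKey convention.
  transform.outputs['None'] = output_pcoll
  # (Building the GroupByKey/Combine subtransforms would go here.)
  return transform
```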
##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -2171,6 +2171,7 @@ def __init__(
runtime_type_check, # type: bool
):
super(CombineValuesDoFn, self).__init__()
+ self.input_pcoll_type = input_pcoll_type
Review comment:
Whence this change?
##########
File path:
sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -1000,25 +1000,151 @@ def make_pack_name(names):
*[
core.CombineFn.from_runner_api(combine_payload.combine_fn,
context) # type: ignore[arg-type]
for combine_payload in combine_payloads
- ]).to_runner_api(context) # type: ignore[arg-type]
+ ]) # type: ignore[arg-type]
pack_transform = beam_runner_api_pb2.PTransform(
- unique_name=pack_transform_name + '/Pack',
+ unique_name=pack_transform_name + '/CombinePerKey',
spec=beam_runner_api_pb2.FunctionSpec(
urn=common_urns.composites.COMBINE_PER_KEY.urn,
payload=beam_runner_api_pb2.CombinePayload(
- combine_fn=pack_combine_fn,
+ combine_fn=pack_combine_fn.to_runner_api(context),
accumulator_coder_id=tuple_accumulator_coder_id).
SerializeToString()),
inputs={'in': input_pcoll_id},
# 'None' single output key follows convention for CombinePerKey.
outputs={'None': pack_pcoll_id},
environment_id=fused_stage.environment)
pack_stage = Stage(
- pack_stage_name + '/Pack', [pack_transform],
+ pack_stage_name + '/CombinePerKey', [pack_transform],
downstream_side_inputs=fused_stage.downstream_side_inputs,
must_follow=fused_stage.must_follow,
parent=fused_stage.parent,
environment=fused_stage.environment)
+
+ # Traverse the subtransform structure.
+ original_group_by_key_transforms = []
+ original_combine_grouped_values_transforms = []
+ original_combine_values_par_do_transforms = []
+ for transform in transforms:
+ # CombinePerKey may contain GroupByKey and Combine subtransforms.
+ if transform.subtransforms:
+ assert len(transform.subtransforms) == 2
+
+ group_by_key_transform = context.components.transforms[
+ transform.subtransforms[0]]
+      assert group_by_key_transform.spec.urn == common_urns.primitives.GROUP_BY_KEY.urn
+ original_group_by_key_transforms.append(group_by_key_transform)
+
+ combine_grouped_values_transform = context.components.transforms[
+ transform.subtransforms[1]]
+      assert combine_grouped_values_transform.spec.urn == common_urns.combine_components.COMBINE_GROUPED_VALUES.urn
+ original_combine_grouped_values_transforms.append(
+ combine_grouped_values_transform)
+
+ # Combine may contain a ParDo subtransform.
+ if combine_grouped_values_transform.subtransforms:
+ assert len(combine_grouped_values_transform.subtransforms) == 1
+
+ combine_values_par_do_transform = context.components.transforms[
+ combine_grouped_values_transform.subtransforms[0]]
+        assert combine_values_par_do_transform.spec.urn == common_urns.primitives.PAR_DO.urn
+ original_combine_values_par_do_transforms.append(
+ combine_values_par_do_transform)
+
+  # Pack the subtransforms if and only if the original transform had subtransforms.
+  if original_group_by_key_transforms or original_combine_grouped_values_transforms:
+    assert original_group_by_key_transforms and original_combine_grouped_values_transforms
+    # For each subtransform, reuse an arbitrary original subtransform as a template,
+ # and then rewrite it with the correct input, output and payload.
+ # Also reuse an arbitrary GroupByKey output PCollection.
+
+ grouped_pcoll_id = next(
+ iter(original_group_by_key_transforms[0].outputs.values()))
+
+ packed_group_by_key_transform_name = (
+ pack_transform_name + '/CombinePerKey/GroupByKey')
+ packed_group_by_key_transform_key = unique_name(
+ context.components.transforms, packed_group_by_key_transform_name)
+
+    context.components.transforms[packed_group_by_key_transform_key].CopyFrom(
+ original_group_by_key_transforms[0])
+ context.components.transforms[
+        packed_group_by_key_transform_key].unique_name = packed_group_by_key_transform_name
+ context.components.transforms[
+ packed_group_by_key_transform_key].outputs.clear()
+ context.components.transforms[packed_group_by_key_transform_key].outputs[
+ 'None'] = grouped_pcoll_id
+
+ packed_combine_grouped_values_transform_name = (
+ pack_transform_name + '/CombinePerKey/Combine')
+ packed_combine_grouped_values_transform_key = unique_name(
+ context.components.transforms,
+ packed_combine_grouped_values_transform_name)
+ context.components.transforms[
+ packed_combine_grouped_values_transform_key].CopyFrom(
+ original_combine_grouped_values_transforms[0])
+ context.components.transforms[
+        packed_combine_grouped_values_transform_key].unique_name = packed_combine_grouped_values_transform_name
+ context.components.transforms[
+ packed_combine_grouped_values_transform_key].inputs.clear()
+ context.components.transforms[
+ packed_combine_grouped_values_transform_key].inputs[
+ '0'] = grouped_pcoll_id
+ context.components.transforms[
+ packed_combine_grouped_values_transform_key].outputs.clear()
+ context.components.transforms[
+ packed_combine_grouped_values_transform_key].outputs[
+ '0'] = pack_pcoll_id
+ context.components.transforms[
+        packed_combine_grouped_values_transform_key].spec.payload = beam_runner_api_pb2.CombinePayload(
+ combine_fn=pack_combine_fn.to_runner_api(context),
+ accumulator_coder_id=tuple_accumulator_coder_id
+ ).SerializeToString()
+
+ if original_combine_values_par_do_transforms:
Review comment:
Why this if statement?
##########
File path:
sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py
##########
@@ -67,6 +67,7 @@ def expand(self, pcoll):
environment = environments.DockerEnvironment.from_options(
pipeline_options.PortableOptions(sdk_location='container'))
pipeline_proto = pipeline.to_runner_api(default_environment=environment)
+  # logging.error('[debug:yifanmai] unoptimized pipeline %s' % pipeline_proto)
Review comment:
Remove in final PR.