robertwb commented on a change in pull request #13848:
URL: https://github.com/apache/beam/pull/13848#discussion_r567136406
##########
File path:
sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -598,9 +606,33 @@ def copy_subtransforms(transform):
   del new_proto.root_transform_ids[:]
   new_proto.root_transform_ids.extend(roots)
+  known_primitives = set.union(set(BEAM_PRIMITIVES), known_runner_urns)
+  for transform in pipeline_proto.components.transforms.values():
+    if transform.spec.urn not in known_primitives and not is_composite(
+        transform):
+      known_primitives.add(transform.spec.urn)
+  if not partial:
+    known_primitives.add('beam:runner:executable_stage:v1')
+  validate_pipeline(new_proto, known_primitives)
+
   return new_proto
+def is_composite(transform):
+  return bool(
+      transform.subtransforms or not set.difference(
+          set(transform.outputs.values()), transform.inputs.values()))
Review comment:
Consider a "switch" transform, parameterized by a boolean, that takes two
PCollections and returns exactly one of them. Or consider a PTransform that
sometimes returns its input unchanged, possibly as a degenerate case; such
things exist in TFX. In both cases the transform has no subtransforms, and
its outputs are all among its inputs.
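
To make the two cases concrete, here is a minimal sketch of how the `is_composite` predicate from the diff evaluates on them. It does not use real Beam protos: `fake_transform` and the PCollection ids (`pc_a`, `pc_b`, `pc_x`, `pc_y`) are hypothetical stand-ins that only mimic the three fields the predicate reads.

```python
from types import SimpleNamespace


def is_composite(transform):
  # Same predicate as in the diff: a transform counts as composite if it has
  # subtransforms, or if every output PCollection is also one of its inputs.
  return bool(
      transform.subtransforms or not set.difference(
          set(transform.outputs.values()), transform.inputs.values()))


def fake_transform(inputs, outputs, subtransforms=()):
  # Hypothetical stand-in for a PTransform proto (an assumption for this
  # sketch): it only carries inputs, outputs, and subtransforms.
  return SimpleNamespace(
      inputs=dict(inputs),
      outputs=dict(outputs),
      subtransforms=list(subtransforms))


# "Switch": takes two PCollections and returns exactly one of them.
switch = fake_transform(
    inputs={'a': 'pc_a', 'b': 'pc_b'}, outputs={'out': 'pc_a'})

# Degenerate case: returns its input unchanged.
identity = fake_transform(inputs={'in': 'pc_x'}, outputs={'out': 'pc_x'})

# Ordinary leaf: produces a new PCollection and has no subtransforms.
leaf = fake_transform(inputs={'in': 'pc_x'}, outputs={'out': 'pc_y'})

print(is_composite(switch))    # True
print(is_composite(identity))  # True
print(is_composite(leaf))      # False
```

With these stand-ins, the switch and pass-through transforms are classified as composite even though they have no subtransforms, so their URNs would not be added to `known_primitives` by the loop in the diff; the ordinary leaf is not.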