robertwb commented on code in PR #26423:
URL: https://github.com/apache/beam/pull/26423#discussion_r1179607394
##########
sdks/python/apache_beam/yaml/yaml_transform.py:
##########
@@ -250,29 +260,35 @@ def expand_transform(spec, scope):
 def expand_leaf_transform(spec, scope):
+  _LOGGER.info("Expanding %s ", identify_object(spec))
   spec = normalize_inputs_outputs(spec)
-  inputs_dict = {
-      key: scope.get_pcollection(value)
-      for (key, value) in spec['input'].items()
-  }
-  input_type = spec.get('input_type', 'default')
-  if input_type == 'list':
-    inputs = tuple(inputs_dict.values())
-  elif input_type == 'map':
-    inputs = inputs_dict
+  ptransform = scope.create_ptransform(spec)
+  transform_label = scope.unique_name(spec, ptransform)
+
+  if spec['type'] == 'Flatten':
+    # Avoid flattening before the flatten, just to make a nicer graph.
+    inputs = tuple(
+        scope.get_pcollection(input) for key,
+        value in spec['input'].items()
+        for input in ([value] if isinstance(value, str) else value))
Review Comment:
Ah, yeah. Yapf has trouble with the commas in list comprehensions. Fixed by
adding parentheses.
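
The updated diff is not shown here, so the following is only a guess at what "adding parentheses" means: wrapping the loop target as `(key, value)`, which keeps the two names together when the comprehension is split across lines. It is a minimal sketch; `spec_input` and `get_pcollection` are hypothetical stand-ins for `spec['input']` and `scope.get_pcollection`, and `input` is renamed to `name` to avoid shadowing the builtin.

```python
# Hypothetical stand-in for spec['input']: each value is either a single
# name or a list of names.
spec_input = {"a": "pc1", "b": ["pc2", "pc3"]}


def get_pcollection(name):
    # Stand-in for scope.get_pcollection; it only tags the name.
    return f"PCollection[{name}]"


inputs = tuple(
    get_pcollection(name)
    # The parenthesized target keeps "key, value" on one line.
    for (key, value) in spec_input.items()
    # A bare string is treated as a one-element list.
    for name in ([value] if isinstance(value, str) else value))

print(inputs)
```

Each clause now sits on its own line, so the nesting order of the two `for` loops is easier to read.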
##########
sdks/python/apache_beam/yaml/yaml_transform.py:
##########
@@ -123,6 +123,16 @@ def compute_all(self):
     for transform_id in self._transforms_by_uuid.keys():
       self.compute_outputs(transform_id)
+  def get_input(self, inputs, transform_label, key):
+    if isinstance(inputs, str):
+      return self.get_pcollection(inputs)
+    else:
+      return tuple(
+          self.get_pcollection(x) for x in
+          inputs) | f'{transform_label}-FlattenInputs[{key}]' >> beam.Flatten(
+              pipeline=self.root if isinstance(self.root, beam.Pipeline
+                                               ) else self.root.pipeline)
Review Comment:
That pipeline expression is certainly unwieldy. I refactored it a bit;
hopefully it's clearer now.
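
The refactored code is not included in this message. One possible shape for such a refactor, shown purely as an illustration, is to give the inline conditional a name. In the sketch below, `Pipeline` and `Nested` are stand-ins for `beam.Pipeline` and for a root object that exposes `.pipeline`, and `pipeline_of` is a hypothetical helper, not a function from the PR.

```python
class Pipeline:
    """Stand-in for beam.Pipeline (an assumption, for illustration only)."""


class Nested:
    """Stand-in for a root that is not a Pipeline but exposes one."""
    def __init__(self, pipeline):
        self.pipeline = pipeline


def pipeline_of(root):
    # Hypothetical helper: the same conditional as in the diff, but named.
    if isinstance(root, Pipeline):
        return root
    return root.pipeline


p = Pipeline()
print(pipeline_of(p) is p)          # root is already the pipeline
print(pipeline_of(Nested(p)) is p)  # root wraps the pipeline
```

With a helper like this, the call site could shrink to something like `beam.Flatten(pipeline=pipeline_of(self.root))`, which fits on one line and leaves the formatter less to rearrange.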