bzablocki commented on code in PR #26423:
URL: https://github.com/apache/beam/pull/26423#discussion_r1179485349


##########
sdks/python/apache_beam/yaml/yaml_transform.py:
##########
@@ -123,6 +123,16 @@ def compute_all(self):
     for transform_id in self._transforms_by_uuid.keys():
       self.compute_outputs(transform_id)
 
+  def get_input(self, inputs, transform_label, key):
+    if isinstance(inputs, str):
+      return self.get_pcollection(inputs)
+    else:
+      return tuple(
+          self.get_pcollection(x) for x in
+          inputs) | f'{transform_label}-FlattenInputs[{key}]' >> beam.Flatten(
+              pipeline=self.root if isinstance(self.root, beam.Pipeline
+                                               ) else self.root.pipeline)

Review Comment:
   It took me some time to understand what is happening here, probably because 
I'm not yet that familiar with the Python SDK; that should come with time ;) 
To get a better understanding of this piece of code, I rewrote it in this 
form: 
   ```suggestion
         transform_name = f'{transform_label}-FlattenInputs[{key}]'
         root = self.root if isinstance(self.root,
                                        beam.Pipeline) else self.root.pipeline
         inputs = (
             tuple(self.get_pcollection(x) for x in inputs)
             | transform_name >> beam.Flatten(pipeline=root))
         return inputs
   ```
   It is definitely longer, so I leave the choice to you.
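
For readers who were also slowed down by the one-liner: in Python, `>>` binds tighter than `|`, so `inputs | label >> transform` is parsed as `inputs | (label >> transform)`. The sketch below shows only that parsing rule. `FakeTransform` is a hypothetical stand-in written for this illustration, not a Beam class, and it only assumes that the real transform object handles both operators through its reflected methods.

```python
class FakeTransform:
    """Hypothetical stand-in for a transform object; not part of Beam."""

    def __init__(self, label=None):
        self.label = label

    def __rrshift__(self, label):
        # Handles `'some label' >> transform`: return a labeled copy.
        return FakeTransform(label)

    def __ror__(self, inputs):
        # Handles `inputs | transform`: record which label saw which inputs.
        return (self.label, tuple(inputs))


# One-liner form, as in the original diff.
one_liner = ("a", "b") | "Flatten[x]" >> FakeTransform()

# Step-by-step form, as in the suggestion: label first, then apply.
labeled = "Flatten[x]" >> FakeTransform()
step_by_step = ("a", "b") | labeled

print(one_liner == step_by_step)
```

Both forms go through the same two calls in the same order, which is why the suggested rewrite should not change behavior.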



##########
sdks/python/apache_beam/yaml/yaml_transform.py:
##########
@@ -250,29 +260,35 @@ def expand_transform(spec, scope):
 
 
 def expand_leaf_transform(spec, scope):
+  _LOGGER.info("Expanding %s ", identify_object(spec))
   spec = normalize_inputs_outputs(spec)
-  inputs_dict = {
-      key: scope.get_pcollection(value)
-      for (key, value) in spec['input'].items()
-  }
-  input_type = spec.get('input_type', 'default')
-  if input_type == 'list':
-    inputs = tuple(inputs_dict.values())
-  elif input_type == 'map':
-    inputs = inputs_dict
+  ptransform = scope.create_ptransform(spec)
+  transform_label = scope.unique_name(spec, ptransform)
+
+  if spec['type'] == 'Flatten':
+    # Avoid flattening before the flatten, just to make a nicer graph.
+    inputs = tuple(
+        scope.get_pcollection(input) for key,
+        value in spec['input'].items()
+        for input in ([value] if isinstance(value, str) else value))

Review Comment:
   Can we improve formatting for readability?
   ```suggestion
       inputs = tuple(
           scope.get_pcollection(input)
           for key, value in spec['input'].items()
           for input in ([value] if isinstance(value, str) else value))
   ```
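
To make the intent of the nested comprehension concrete, here is a minimal sketch in plain Python with no Beam dependency. The function name `flatten_input_names` and the sample spec are made up for illustration. It normalizes each value to a list (a bare string becomes a one-element list) and then flattens everything into one tuple. Since `key` is never used in the comprehension, the sketch iterates over `.values()`, and it avoids the name `input` so the builtin is not shadowed. Both are optional tweaks, not something the PR has to adopt.

```python
def flatten_input_names(input_spec):
    """Flatten a mapping of str-or-list values into one tuple of names."""
    return tuple(
        name
        for value in input_spec.values()
        # A bare string is treated as a single-element list.
        for name in ([value] if isinstance(value, str) else value))


# Hypothetical input spec: one single input and one list of inputs.
example = {'left': 'ReadA', 'right': ['ReadB', 'ReadC']}
names = flatten_input_names(example)
print(names)  # ('ReadA', 'ReadB', 'ReadC')
```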



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
