This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new abce1ad34eb Merge pull request #29093 [YAML] Don't require redundant input for YamlTransform.
abce1ad34eb is described below

commit abce1ad34ebd34c58c34b505a5df89a54ec0dcb4
Author: Robert Bradshaw <[email protected]>
AuthorDate: Fri Oct 20 16:26:49 2023 -0700

    Merge pull request #29093 [YAML] Don't require redundant input for YamlTransform.
---
 sdks/python/apache_beam/yaml/yaml_io_test.py             |  6 ------
 sdks/python/apache_beam/yaml/yaml_mapping_test.py        |  4 ----
 sdks/python/apache_beam/yaml/yaml_transform.py           | 12 +++++++++++-
 sdks/python/apache_beam/yaml/yaml_transform_test.py      | 12 +++++++++++-
 sdks/python/apache_beam/yaml/yaml_transform_unit_test.py |  2 --
 sdks/python/apache_beam/yaml/yaml_udf_test.py            |  8 --------
 6 files changed, 22 insertions(+), 22 deletions(-)

diff --git a/sdks/python/apache_beam/yaml/yaml_io_test.py b/sdks/python/apache_beam/yaml/yaml_io_test.py
index 7071860a7bf..a0bd65c78ca 100644
--- a/sdks/python/apache_beam/yaml/yaml_io_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_io_test.py
@@ -319,7 +319,6 @@ class YamlPubSubTest(unittest.TestCase):
             | YamlTransform(
                 '''
             type: WriteToPubSub
-            input: input
             config:
               topic: my_topic
               format: raw
@@ -341,7 +340,6 @@ class YamlPubSubTest(unittest.TestCase):
             ]) | YamlTransform(
                 '''
             type: WriteToPubSub
-            input: input
             config:
               topic: my_topic
               format: raw
@@ -364,7 +362,6 @@ class YamlPubSubTest(unittest.TestCase):
             ]) | YamlTransform(
                 '''
             type: WriteToPubSub
-            input: input
             config:
               topic: my_topic
               format: raw
@@ -384,7 +381,6 @@ class YamlPubSubTest(unittest.TestCase):
             | YamlTransform(
                 '''
             type: WriteToPubSub
-            input: input
             config:
               topic: my_topic
               format: raw
@@ -408,7 +404,6 @@ class YamlPubSubTest(unittest.TestCase):
             | YamlTransform(
                 '''
             type: WriteToPubSub
-            input: input
             config:
               topic: my_topic
               format: avro
@@ -434,7 +429,6 @@ class YamlPubSubTest(unittest.TestCase):
             ]) | YamlTransform(
                 '''
             type: WriteToPubSub
-            input: input
             config:
               topic: my_topic
               format: json
diff --git a/sdks/python/apache_beam/yaml/yaml_mapping_test.py b/sdks/python/apache_beam/yaml/yaml_mapping_test.py
index 55032aeae52..a09214c26b6 100644
--- a/sdks/python/apache_beam/yaml/yaml_mapping_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_mapping_test.py
@@ -40,7 +40,6 @@ class YamlMappingTest(unittest.TestCase):
       result = elements | YamlTransform(
           '''
           type: MapToFields
-          input: input
           config:
               language: python
               fields:
@@ -62,7 +61,6 @@ class YamlMappingTest(unittest.TestCase):
       result = elements | YamlTransform(
           '''
           type: MapToFields
-          input: input
           config:
               fields: {}
               append: true
@@ -83,7 +81,6 @@ class YamlMappingTest(unittest.TestCase):
       result = elements | YamlTransform(
           '''
           type: Filter
-          input: input
           config:
               language: python
               keep: "rank > 0"
@@ -106,7 +103,6 @@ class YamlMappingTest(unittest.TestCase):
       result = elements | YamlTransform(
           '''
           type: chain
-          input: input
           transforms:
             - type: MapToFields
               config:
diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py
index fa30c183080..7ab8da33f1a 100644
--- a/sdks/python/apache_beam/yaml/yaml_transform.py
+++ b/sdks/python/apache_beam/yaml/yaml_transform.py
@@ -522,7 +522,7 @@ def chain_as_composite(spec):
     raise TypeError(
         f"Chain at {identify_object(spec)} missing transforms property.")
   has_explicit_outputs = 'output' in spec
-  composite_spec = normalize_inputs_outputs(spec)
+  composite_spec = normalize_inputs_outputs(tag_explicit_inputs(spec))
   new_transforms = []
   for ix, transform in enumerate(composite_spec['transforms']):
     if any(io in transform for io in ('input', 'output')):
@@ -539,6 +539,8 @@ def chain_as_composite(spec):
         pass
       elif is_explicitly_empty(composite_spec['input']):
         transform['input'] = composite_spec['input']
+      elif is_empty(composite_spec['input']):
+        del composite_spec['input']
       else:
         transform['input'] = {
             key: key
@@ -931,6 +933,7 @@ class YamlTransform(beam.PTransform):
     self._providers = yaml_provider.merge_providers(
         providers, yaml_provider.standard_providers())
     self._spec = preprocess(spec, known_transforms=self._providers.keys())
+    self._was_chain = spec['type'] == 'chain'
 
   def expand(self, pcolls):
     if isinstance(pcolls, beam.pvalue.PBegin):
@@ -939,8 +942,15 @@ class YamlTransform(beam.PTransform):
     elif isinstance(pcolls, beam.PCollection):
       root = pcolls.pipeline
       pcolls = {'input': pcolls}
+      if not self._spec['input']:
+        self._spec['input'] = {'input': 'input'}
+        if self._was_chain and self._spec['transforms']:
+          # This should have been copied as part of the composite-to-chain.
+          self._spec['transforms'][0]['input'] = self._spec['input']
     else:
       root = next(iter(pcolls.values())).pipeline
+      if not self._spec['input']:
+        self._spec['input'] = {name: name for name in pcolls.keys()}
     result = expand_transform(
         self._spec,
         Scope(
diff --git a/sdks/python/apache_beam/yaml/yaml_transform_test.py b/sdks/python/apache_beam/yaml/yaml_transform_test.py
index 63f2e0e7fac..05bbf419647 100644
--- a/sdks/python/apache_beam/yaml/yaml_transform_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_transform_test.py
@@ -43,6 +43,16 @@ class CreateTimestamped(beam.PTransform):
         | beam.Map(lambda x: beam.transforms.window.TimestampedValue(x, x)))
 
 
+class CreateInts(beam.PTransform):
+  _yaml_requires_inputs = False
+
+  def __init__(self, elements):
+    self._elements = elements
+
+  def expand(self, p):
+    return p | beam.Create(self._elements)
+
+
 class SumGlobally(beam.PTransform):
   def expand(self, pcoll):
     return pcoll | beam.CombineGlobally(sum).without_defaults()
@@ -65,7 +75,7 @@ class SizeLimiter(beam.PTransform):
 
 
 TEST_PROVIDERS = {
-    'CreateInts': lambda elements: beam.Create(elements),
+    'CreateInts': CreateInts,
     'CreateTimestamped': CreateTimestamped,
     'SumGlobally': SumGlobally,
     'SizeLimiter': SizeLimiter,
diff --git a/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py b/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py
index 5d5e5850fd7..d1886ba4dcf 100644
--- a/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py
@@ -244,12 +244,10 @@ class MainTest(unittest.TestCase):
     expected = f'''
       type: composite
       name: Chain
-      input: {{}}
       transforms:
       - type: Create
         config:
           elements: [0,1,2]
-        input: {{}}
       - type: PyMap
         config:
           fn: 'lambda x: x*x'
diff --git a/sdks/python/apache_beam/yaml/yaml_udf_test.py b/sdks/python/apache_beam/yaml/yaml_udf_test.py
index 5e9faa08253..42bdf6e0bd5 100644
--- a/sdks/python/apache_beam/yaml/yaml_udf_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_udf_test.py
@@ -55,7 +55,6 @@ class YamlUDFMappingTest(unittest.TestCase):
       result = elements | YamlTransform(
           '''
       type: MapToFields
-      input: input
       config:
         language: javascript
         fields:
@@ -79,7 +78,6 @@ class YamlUDFMappingTest(unittest.TestCase):
       result = elements | YamlTransform(
           '''
       type: MapToFields
-      input: input
       config:
         language: python
         fields:
@@ -103,7 +101,6 @@ class YamlUDFMappingTest(unittest.TestCase):
       result = elements | YamlTransform(
           '''
       type: Filter
-      input: input
       config:
         language: javascript
         keep:
@@ -123,7 +120,6 @@ class YamlUDFMappingTest(unittest.TestCase):
       result = elements | YamlTransform(
           '''
       type: Filter
-      input: input
       config:
         language: python
         keep:
@@ -143,7 +139,6 @@ class YamlUDFMappingTest(unittest.TestCase):
       result = elements | YamlTransform(
           '''
       type: Filter
-      input: input
       config:
         language: javascript
         keep:
@@ -162,7 +157,6 @@ class YamlUDFMappingTest(unittest.TestCase):
       result = elements | YamlTransform(
           '''
       type: Filter
-      input: input
       config:
         language: python
         keep:
@@ -194,7 +188,6 @@ class YamlUDFMappingTest(unittest.TestCase):
       result = elements | YamlTransform(
           f'''
         type: Filter
-        input: input
         config:
           language: javascript
           keep:
@@ -226,7 +219,6 @@ class YamlUDFMappingTest(unittest.TestCase):
       result = elements | YamlTransform(
           f'''
         type: Filter
-        input: input
         config:
           language: python
           keep:

Reply via email to