This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new abce1ad34eb Merge pull request #29093 [YAML] Don't require redundant
input for YamlTransform.
abce1ad34eb is described below
commit abce1ad34ebd34c58c34b505a5df89a54ec0dcb4
Author: Robert Bradshaw <[email protected]>
AuthorDate: Fri Oct 20 16:26:49 2023 -0700
Merge pull request #29093 [YAML] Don't require redundant input for
YamlTransform.
---
sdks/python/apache_beam/yaml/yaml_io_test.py | 6 ------
sdks/python/apache_beam/yaml/yaml_mapping_test.py | 4 ----
sdks/python/apache_beam/yaml/yaml_transform.py | 12 +++++++++++-
sdks/python/apache_beam/yaml/yaml_transform_test.py | 12 +++++++++++-
sdks/python/apache_beam/yaml/yaml_transform_unit_test.py | 2 --
sdks/python/apache_beam/yaml/yaml_udf_test.py | 8 --------
6 files changed, 22 insertions(+), 22 deletions(-)
diff --git a/sdks/python/apache_beam/yaml/yaml_io_test.py b/sdks/python/apache_beam/yaml/yaml_io_test.py
index 7071860a7bf..a0bd65c78ca 100644
--- a/sdks/python/apache_beam/yaml/yaml_io_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_io_test.py
@@ -319,7 +319,6 @@ class YamlPubSubTest(unittest.TestCase):
| YamlTransform(
'''
type: WriteToPubSub
- input: input
config:
topic: my_topic
format: raw
@@ -341,7 +340,6 @@ class YamlPubSubTest(unittest.TestCase):
]) | YamlTransform(
'''
type: WriteToPubSub
- input: input
config:
topic: my_topic
format: raw
@@ -364,7 +362,6 @@ class YamlPubSubTest(unittest.TestCase):
]) | YamlTransform(
'''
type: WriteToPubSub
- input: input
config:
topic: my_topic
format: raw
@@ -384,7 +381,6 @@ class YamlPubSubTest(unittest.TestCase):
| YamlTransform(
'''
type: WriteToPubSub
- input: input
config:
topic: my_topic
format: raw
@@ -408,7 +404,6 @@ class YamlPubSubTest(unittest.TestCase):
| YamlTransform(
'''
type: WriteToPubSub
- input: input
config:
topic: my_topic
format: avro
@@ -434,7 +429,6 @@ class YamlPubSubTest(unittest.TestCase):
]) | YamlTransform(
'''
type: WriteToPubSub
- input: input
config:
topic: my_topic
format: json
diff --git a/sdks/python/apache_beam/yaml/yaml_mapping_test.py b/sdks/python/apache_beam/yaml/yaml_mapping_test.py
index 55032aeae52..a09214c26b6 100644
--- a/sdks/python/apache_beam/yaml/yaml_mapping_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_mapping_test.py
@@ -40,7 +40,6 @@ class YamlMappingTest(unittest.TestCase):
result = elements | YamlTransform(
'''
type: MapToFields
- input: input
config:
language: python
fields:
@@ -62,7 +61,6 @@ class YamlMappingTest(unittest.TestCase):
result = elements | YamlTransform(
'''
type: MapToFields
- input: input
config:
fields: {}
append: true
@@ -83,7 +81,6 @@ class YamlMappingTest(unittest.TestCase):
result = elements | YamlTransform(
'''
type: Filter
- input: input
config:
language: python
keep: "rank > 0"
@@ -106,7 +103,6 @@ class YamlMappingTest(unittest.TestCase):
result = elements | YamlTransform(
'''
type: chain
- input: input
transforms:
- type: MapToFields
config:
diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py
index fa30c183080..7ab8da33f1a 100644
--- a/sdks/python/apache_beam/yaml/yaml_transform.py
+++ b/sdks/python/apache_beam/yaml/yaml_transform.py
@@ -522,7 +522,7 @@ def chain_as_composite(spec):
raise TypeError(
f"Chain at {identify_object(spec)} missing transforms property.")
has_explicit_outputs = 'output' in spec
- composite_spec = normalize_inputs_outputs(spec)
+ composite_spec = normalize_inputs_outputs(tag_explicit_inputs(spec))
new_transforms = []
for ix, transform in enumerate(composite_spec['transforms']):
if any(io in transform for io in ('input', 'output')):
@@ -539,6 +539,8 @@ def chain_as_composite(spec):
pass
elif is_explicitly_empty(composite_spec['input']):
transform['input'] = composite_spec['input']
+ elif is_empty(composite_spec['input']):
+ del composite_spec['input']
else:
transform['input'] = {
key: key
@@ -931,6 +933,7 @@ class YamlTransform(beam.PTransform):
self._providers = yaml_provider.merge_providers(
providers, yaml_provider.standard_providers())
self._spec = preprocess(spec, known_transforms=self._providers.keys())
+ self._was_chain = spec['type'] == 'chain'
def expand(self, pcolls):
if isinstance(pcolls, beam.pvalue.PBegin):
@@ -939,8 +942,15 @@ class YamlTransform(beam.PTransform):
elif isinstance(pcolls, beam.PCollection):
root = pcolls.pipeline
pcolls = {'input': pcolls}
+ if not self._spec['input']:
+ self._spec['input'] = {'input': 'input'}
+ if self._was_chain and self._spec['transforms']:
+ # This should have been copied as part of the composite-to-chain.
+ self._spec['transforms'][0]['input'] = self._spec['input']
else:
root = next(iter(pcolls.values())).pipeline
+ if not self._spec['input']:
+ self._spec['input'] = {name: name for name in pcolls.keys()}
result = expand_transform(
self._spec,
Scope(
diff --git a/sdks/python/apache_beam/yaml/yaml_transform_test.py b/sdks/python/apache_beam/yaml/yaml_transform_test.py
index 63f2e0e7fac..05bbf419647 100644
--- a/sdks/python/apache_beam/yaml/yaml_transform_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_transform_test.py
@@ -43,6 +43,16 @@ class CreateTimestamped(beam.PTransform):
| beam.Map(lambda x: beam.transforms.window.TimestampedValue(x, x)))
+class CreateInts(beam.PTransform):
+ _yaml_requires_inputs = False
+
+ def __init__(self, elements):
+ self._elements = elements
+
+ def expand(self, p):
+ return p | beam.Create(self._elements)
+
+
class SumGlobally(beam.PTransform):
def expand(self, pcoll):
return pcoll | beam.CombineGlobally(sum).without_defaults()
@@ -65,7 +75,7 @@ class SizeLimiter(beam.PTransform):
TEST_PROVIDERS = {
- 'CreateInts': lambda elements: beam.Create(elements),
+ 'CreateInts': CreateInts,
'CreateTimestamped': CreateTimestamped,
'SumGlobally': SumGlobally,
'SizeLimiter': SizeLimiter,
diff --git a/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py b/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py
index 5d5e5850fd7..d1886ba4dcf 100644
--- a/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py
@@ -244,12 +244,10 @@ class MainTest(unittest.TestCase):
expected = f'''
type: composite
name: Chain
- input: {{}}
transforms:
- type: Create
config:
elements: [0,1,2]
- input: {{}}
- type: PyMap
config:
fn: 'lambda x: x*x'
diff --git a/sdks/python/apache_beam/yaml/yaml_udf_test.py
b/sdks/python/apache_beam/yaml/yaml_udf_test.py
index 5e9faa08253..42bdf6e0bd5 100644
--- a/sdks/python/apache_beam/yaml/yaml_udf_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_udf_test.py
@@ -55,7 +55,6 @@ class YamlUDFMappingTest(unittest.TestCase):
result = elements | YamlTransform(
'''
type: MapToFields
- input: input
config:
language: javascript
fields:
@@ -79,7 +78,6 @@ class YamlUDFMappingTest(unittest.TestCase):
result = elements | YamlTransform(
'''
type: MapToFields
- input: input
config:
language: python
fields:
@@ -103,7 +101,6 @@ class YamlUDFMappingTest(unittest.TestCase):
result = elements | YamlTransform(
'''
type: Filter
- input: input
config:
language: javascript
keep:
@@ -123,7 +120,6 @@ class YamlUDFMappingTest(unittest.TestCase):
result = elements | YamlTransform(
'''
type: Filter
- input: input
config:
language: python
keep:
@@ -143,7 +139,6 @@ class YamlUDFMappingTest(unittest.TestCase):
result = elements | YamlTransform(
'''
type: Filter
- input: input
config:
language: javascript
keep:
@@ -162,7 +157,6 @@ class YamlUDFMappingTest(unittest.TestCase):
result = elements | YamlTransform(
'''
type: Filter
- input: input
config:
language: python
keep:
@@ -194,7 +188,6 @@ class YamlUDFMappingTest(unittest.TestCase):
result = elements | YamlTransform(
f'''
type: Filter
- input: input
config:
language: javascript
keep:
@@ -226,7 +219,6 @@ class YamlUDFMappingTest(unittest.TestCase):
result = elements | YamlTransform(
f'''
type: Filter
- input: input
config:
language: python
keep: