This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 44b639fdd7e [yaml] yaml_transform.py unit tests (#27338)
44b639fdd7e is described below
commit 44b639fdd7e8cdc7b29bef8bbe99545a0f3b3dc8
Author: bzablocki <[email protected]>
AuthorDate: Fri Aug 25 01:09:39 2023 +0200
[yaml] yaml_transform.py unit tests (#27338)
* SafeLineLoader unit tests
* LightweightScope unit tests
* Scope Unit Tests
* rename yaml_transform_ut_test.py to yaml_transform_unit_test.py
---
sdks/python/apache_beam/yaml/yaml_transform.py | 2 +-
.../apache_beam/yaml/yaml_transform_scope_test.py | 204 ++++++++++++++++++++
.../python/apache_beam/yaml/yaml_transform_test.py | 133 +------------
.../apache_beam/yaml/yaml_transform_unit_test.py | 210 +++++++++++++++++++++
4 files changed, 416 insertions(+), 133 deletions(-)
diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py
index f2e73029811..e2b85d8522e 100644
--- a/sdks/python/apache_beam/yaml/yaml_transform.py
+++ b/sdks/python/apache_beam/yaml/yaml_transform.py
@@ -346,7 +346,7 @@ def expand_leaf_transform(spec, scope):
# TODO: Handle (or at least reject) nested case.
return outputs
elif isinstance(outputs, (tuple, list)):
- return {'out{ix}': pcoll for (ix, pcoll) in enumerate(outputs)}
+ return {f'out{ix}': pcoll for (ix, pcoll) in enumerate(outputs)}
elif isinstance(outputs, beam.PCollection):
return {'out': outputs}
else:
diff --git a/sdks/python/apache_beam/yaml/yaml_transform_scope_test.py b/sdks/python/apache_beam/yaml/yaml_transform_scope_test.py
new file mode 100644
index 00000000000..ead5d5d66d2
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/yaml_transform_scope_test.py
@@ -0,0 +1,204 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+import unittest
+
+import yaml
+
+import apache_beam as beam
+from apache_beam.yaml import yaml_provider
+from apache_beam.yaml.yaml_transform import LightweightScope
+from apache_beam.yaml.yaml_transform import SafeLineLoader
+from apache_beam.yaml.yaml_transform import Scope
+
+
+class ScopeTest(unittest.TestCase):
+ def get_scope_by_spec(self, p, spec):
+ spec = yaml.load(spec, Loader=SafeLineLoader)
+
+ scope = Scope(
+ beam.pvalue.PBegin(p), {},
+ spec['transforms'],
+ yaml_provider.standard_providers(), {})
+ return scope, spec
+
+ def test_get_pcollection_input(self):
+ with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+ pickle_library='cloudpickle')) as p:
+ elements = p | beam.Create(range(3))
+ scope = Scope(
+ p, {'input': elements},
+ transforms=[],
+ providers=yaml_provider.standard_providers(),
+ input_providers={})
+
+ result = scope.get_pcollection('input')
+ self.assertEqual("PCollection[Create/Map(decode).None]", str(result))
+
+ def test_get_pcollection_output(self):
+ with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+ pickle_library='cloudpickle')) as p:
+ spec = '''
+ transforms:
+ - type: Create
+ config:
+ elements: [0, 1, 3, 4]
+ - type: PyMap
+ name: Square
+ input: Create
+ config:
+ fn: "lambda x: x*x"
+ '''
+
+ scope, spec = self.get_scope_by_spec(p, spec)
+
+ self.assertEqual(
+ "PCollection[Create/Map(decode).None]",
+ str(scope.get_pcollection("Create")))
+
+ self.assertEqual(
+ "PCollection[Square.None]", str(scope.get_pcollection("Square")))
+
+ self.assertEqual(
+ "PCollection[Square.None]", str(scope.get_pcollection("PyMap")))
+
+ self.assertTrue(
+ scope.get_pcollection("Square") == scope.get_pcollection("PyMap"))
+
+ def test_create_ptransform(self):
+ with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+ pickle_library='cloudpickle')) as p:
+ spec = '''
+ transforms:
+ - type: PyMap
+ config:
+ fn: "lambda x: x*x"
+ '''
+ scope, spec = self.get_scope_by_spec(p, spec)
+
+ result = scope.create_ptransform(spec['transforms'][0], [])
+ self.assertIsInstance(result, beam.transforms.ParDo)
+ self.assertEqual(result.label, 'Map(lambda x: x*x)')
+
+ result_annotations = {**result.annotations()}
+ target_annotations = {
+ 'yaml_type': 'PyMap',
+ 'yaml_args': '{"fn": "lambda x: x*x"}',
+ 'yaml_provider': '{"type": "InlineProvider"}'
+ }
+
+ # Check if target_annotations is a subset of result_annotations
+ self.assertDictEqual(
+ result_annotations, {
+ **result_annotations, **target_annotations
+ })
+
+ def test_create_ptransform_with_inputs(self):
+ with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+ pickle_library='cloudpickle')) as p:
+ spec = '''
+ transforms:
+ - type: PyMap
+ config:
+ fn: "lambda x: x*x"
+ '''
+ scope, spec = self.get_scope_by_spec(p, spec)
+
+ result = scope.create_ptransform(spec['transforms'][0], [])
+ self.assertIsInstance(result, beam.transforms.ParDo)
+ self.assertEqual(result.label, 'Map(lambda x: x*x)')
+
+ result_annotations = {**result.annotations()}
+ target_annotations = {
+ 'yaml_type': 'PyMap',
+ 'yaml_args': '{"fn": "lambda x: x*x"}',
+ 'yaml_provider': '{"type": "InlineProvider"}'
+ }
+ self.assertDictEqual(result_annotations, target_annotations)
+
+
+class LightweightScopeTest(unittest.TestCase):
+ @staticmethod
+ def get_spec():
+ pipeline_yaml = '''
+ - type: PyMap
+ name: Square
+ input: elements
+ fn: "lambda x: x * x"
+ - type: PyMap
+ name: PyMap
+ input: elements
+ fn: "lambda x: x * x * x"
+ - type: Filter
+ name: FilterOutBigNumbers
+ input: PyMap
+ keep: "lambda x: x<100"
+ '''
+ return yaml.load(pipeline_yaml, Loader=SafeLineLoader)
+
+ def test_init(self):
+ spec = self.get_spec()
+ scope = LightweightScope(spec)
+ self.assertEqual(len(scope._transforms_by_uuid), 3)
+ self.assertCountEqual(
+ list(scope._uuid_by_name.keys()),
+ ["PyMap", "Square", "Filter", "FilterOutBigNumbers"])
+
+ def test_get_transform_id_and_output_name(self):
+ spec = self.get_spec()
+ scope = LightweightScope(spec)
+ transform_id, output = scope.get_transform_id_and_output_name("Square")
+ self.assertEqual(transform_id, spec[0]['__uuid__'])
+ self.assertEqual(output, None)
+
+ def test_get_transform_id_and_output_name_with_dot(self):
+ spec = self.get_spec()
+ scope = LightweightScope(spec)
+ transform_id, output = \
+ scope.get_transform_id_and_output_name("Square.OutputName")
+ self.assertEqual(transform_id, spec[0]['__uuid__'])
+ self.assertEqual(output, "OutputName")
+
+ def test_get_transform_id_by_uuid(self):
+ spec = self.get_spec()
+ scope = LightweightScope(spec)
+ transform_id = scope.get_transform_id(spec[0]['__uuid__'])
+ self.assertEqual(spec[0]['__uuid__'], transform_id)
+
+ def test_get_transform_id_by_unique_name(self):
+ spec = self.get_spec()
+ scope = LightweightScope(spec)
+ transform_id = scope.get_transform_id("Square")
+ self.assertEqual(transform_id, spec[0]['__uuid__'])
+
+ def test_get_transform_id_by_ambiguous_name(self):
+ spec = self.get_spec()
+ scope = LightweightScope(spec)
+ with self.assertRaisesRegex(ValueError, r'Ambiguous.*PyMap'):
+ scope.get_transform_id(scope.get_transform_id(spec[1]['name']))
+
+ def test_get_transform_id_by_unknown_name(self):
+ spec = self.get_spec()
+ scope = LightweightScope(spec)
+ with self.assertRaisesRegex(ValueError, r'Unknown.*NotExistingTransform'):
+ scope.get_transform_id("NotExistingTransform")
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ unittest.main()
diff --git a/sdks/python/apache_beam/yaml/yaml_transform_test.py b/sdks/python/apache_beam/yaml/yaml_transform_test.py
index eec564b6a10..9a540e3551f 100644
--- a/sdks/python/apache_beam/yaml/yaml_transform_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_transform_test.py
@@ -22,144 +22,13 @@ import os
import tempfile
import unittest
-import yaml
-
import apache_beam as beam
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.yaml import yaml_provider
-from apache_beam.yaml import yaml_transform
-from apache_beam.yaml.yaml_transform import LightweightScope
-from apache_beam.yaml.yaml_transform import SafeLineLoader
from apache_beam.yaml.yaml_transform import YamlTransform
-class YamlTransformTest(unittest.TestCase):
- def test_only_element(self):
- self.assertEqual(yaml_transform.only_element((1, )), 1)
-
-
-class SafeLineLoaderTest(unittest.TestCase):
- def test_get_line(self):
- pipeline_yaml = '''
- type: composite
- input:
- elements: input
- transforms:
- - type: PyMap
- name: Square
- input: elements
- config:
- fn: "lambda x: x * x"
- - type: PyMap
- name: Cube
- input: elements
- config:
- fn: "lambda x: x * x * x"
- output:
- Flatten
- '''
- spec = yaml.load(pipeline_yaml, Loader=SafeLineLoader)
- self.assertEqual(SafeLineLoader.get_line(spec['type']), 2)
- self.assertEqual(SafeLineLoader.get_line(spec['input']), 4)
- self.assertEqual(SafeLineLoader.get_line(spec['transforms'][0]), 6)
- self.assertEqual(SafeLineLoader.get_line(spec['transforms'][0]['type']), 6)
- self.assertEqual(SafeLineLoader.get_line(spec['transforms'][0]['name']), 7)
- self.assertEqual(SafeLineLoader.get_line(spec['transforms'][1]), 11)
- self.assertEqual(SafeLineLoader.get_line(spec['output']), 17)
- self.assertEqual(SafeLineLoader.get_line(spec['transforms']), "unknown")
-
- def test_strip_metadata(self):
- spec_yaml = '''
- transforms:
- - type: PyMap
- name: Square
- '''
- spec = yaml.load(spec_yaml, Loader=SafeLineLoader)
- stripped = SafeLineLoader.strip_metadata(spec['transforms'])
-
- self.assertFalse(hasattr(stripped[0], '__line__'))
- self.assertFalse(hasattr(stripped[0], '__uuid__'))
-
- def test_strip_metadata_nothing_to_strip(self):
- spec_yaml = 'prop: 123'
- spec = yaml.load(spec_yaml, Loader=SafeLineLoader)
- stripped = SafeLineLoader.strip_metadata(spec['prop'])
-
- self.assertFalse(hasattr(stripped, '__line__'))
- self.assertFalse(hasattr(stripped, '__uuid__'))
-
-
-class LightweightScopeTest(unittest.TestCase):
- @staticmethod
- def get_spec():
- pipeline_yaml = '''
- - type: PyMap
- name: Square
- input: elements
- config:
- fn: "lambda x: x * x"
- - type: PyMap
- name: PyMap
- input: elements
- config:
- fn: "lambda x: x * x * x"
- - type: Filter
- name: FilterOutBigNumbers
- input: PyMap
- config:
- keep: "lambda x: x<100"
- '''
- return yaml.load(pipeline_yaml, Loader=SafeLineLoader)
-
- def test_init(self):
- spec = self.get_spec()
- scope = LightweightScope(spec)
- self.assertEqual(len(scope._transforms_by_uuid), 3)
- self.assertCountEqual(
- list(scope._uuid_by_name.keys()),
- ["PyMap", "Square", "Filter", "FilterOutBigNumbers"])
-
- def test_get_transform_id_and_output_name(self):
- spec = self.get_spec()
- scope = LightweightScope(spec)
- transform_id, output = scope.get_transform_id_and_output_name("Square")
- self.assertEqual(transform_id, spec[0]['__uuid__'])
- self.assertEqual(output, None)
-
- def test_get_transform_id_and_output_name_with_dot(self):
- spec = self.get_spec()
- scope = LightweightScope(spec)
- transform_id, output = \
- scope.get_transform_id_and_output_name("Square.OutputName")
- self.assertEqual(transform_id, spec[0]['__uuid__'])
- self.assertEqual(output, "OutputName")
-
- def test_get_transform_id_by_uuid(self):
- spec = self.get_spec()
- scope = LightweightScope(spec)
- transform_id = scope.get_transform_id(spec[0]['__uuid__'])
- self.assertEqual(transform_id, spec[0]['__uuid__'])
-
- def test_get_transform_id_by_unique_name(self):
- spec = self.get_spec()
- scope = LightweightScope(spec)
- transform_id = scope.get_transform_id("Square")
- self.assertEqual(transform_id, spec[0]['__uuid__'])
-
- def test_get_transform_id_by_ambiguous_name(self):
- spec = self.get_spec()
- scope = LightweightScope(spec)
- with self.assertRaisesRegex(ValueError, r'Ambiguous.*PyMap'):
- scope.get_transform_id(scope.get_transform_id(spec[1]['name']))
-
- def test_get_transform_id_by_unknown_name(self):
- spec = self.get_spec()
- scope = LightweightScope(spec)
- with self.assertRaisesRegex(ValueError, r'Unknown.*NotExistingTransform'):
- scope.get_transform_id("NotExistingTransform")
-
-
class YamlTransformE2ETest(unittest.TestCase):
def test_composite(self):
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
@@ -338,7 +207,7 @@ class YamlTransformE2ETest(unittest.TestCase):
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle')) as p:
# pylint: disable=expression-not-assigned
- with self.assertRaises(ValueError):
+ with self.assertRaisesRegex(ValueError, r'Ambiguous.*'):
p | YamlTransform(
'''
type: composite
diff --git a/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py b/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py
new file mode 100644
index 00000000000..d903d85cd21
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py
@@ -0,0 +1,210 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import logging
+import unittest
+
+import yaml
+
+import apache_beam as beam
+from apache_beam.yaml import yaml_provider
+from apache_beam.yaml import yaml_transform
+from apache_beam.yaml.yaml_transform import SafeLineLoader
+from apache_beam.yaml.yaml_transform import Scope
+from apache_beam.yaml.yaml_transform import expand_composite_transform
+from apache_beam.yaml.yaml_transform import pipeline_as_composite
+
+
+class YamlTransformTest(unittest.TestCase):
+ def test_only_element(self):
+ self.assertEqual(yaml_transform.only_element((1, )), 1)
+
+
+class SafeLineLoaderTest(unittest.TestCase):
+ def test_get_line(self):
+ pipeline_yaml = '''
+ type: composite
+ input:
+ elements: input
+ transforms:
+ - type: PyMap
+ name: Square
+ input: elements
+ fn: "lambda x: x * x"
+ - type: PyMap
+ name: Cube
+ input: elements
+ fn: "lambda x: x * x * x"
+ output:
+ Flatten
+ '''
+ spec = yaml.load(pipeline_yaml, Loader=SafeLineLoader)
+ self.assertEqual(SafeLineLoader.get_line(spec['type']), 2)
+ self.assertEqual(SafeLineLoader.get_line(spec['input']), 4)
+ self.assertEqual(SafeLineLoader.get_line(spec['transforms'][0]), 6)
+ self.assertEqual(SafeLineLoader.get_line(spec['transforms'][0]['type']), 6)
+ self.assertEqual(SafeLineLoader.get_line(spec['transforms'][0]['name']), 7)
+ self.assertEqual(SafeLineLoader.get_line(spec['transforms'][1]), 10)
+ self.assertEqual(SafeLineLoader.get_line(spec['output']), 15)
+ self.assertEqual(SafeLineLoader.get_line(spec['transforms']), "unknown")
+
+ def test_strip_metadata(self):
+ spec_yaml = '''
+ transforms:
+ - type: PyMap
+ name: Square
+ '''
+ spec = yaml.load(spec_yaml, Loader=SafeLineLoader)
+ stripped = SafeLineLoader.strip_metadata(spec['transforms'])
+
+ self.assertFalse(hasattr(stripped[0], '__line__'))
+ self.assertFalse(hasattr(stripped[0], '__uuid__'))
+
+ def test_strip_metadata_nothing_to_strip(self):
+ spec_yaml = 'prop: 123'
+ spec = yaml.load(spec_yaml, Loader=SafeLineLoader)
+ stripped = SafeLineLoader.strip_metadata(spec['prop'])
+
+ self.assertFalse(hasattr(stripped, '__line__'))
+ self.assertFalse(hasattr(stripped, '__uuid__'))
+
+
+def new_pipeline():
+ return beam.Pipeline(
+ options=beam.options.pipeline_options.PipelineOptions(
+ pickle_library='cloudpickle'))
+
+
+class MainTest(unittest.TestCase):
+ def get_scope_by_spec(self, p, spec, inputs=None):
+ if inputs is None:
+ inputs = {}
+ spec = yaml.load(spec, Loader=SafeLineLoader)
+
+ scope = Scope(
+ beam.pvalue.PBegin(p),
+ inputs,
+ spec['transforms'],
+ yaml_provider.standard_providers(), {})
+ return scope, spec
+
+ def test_pipeline_as_composite_with_type_transforms(self):
+ spec = '''
+ type: composite
+ transforms:
+ - type: Create
+ config:
+ elements: [0,1,2]
+ - type: PyMap
+ config:
+ fn: 'lambda x: x*x'
+ '''
+ spec = yaml.load(spec, Loader=SafeLineLoader)
+ result = pipeline_as_composite(spec)
+
+ self.assertEqual(result['type'], 'composite')
+ self.assertEqual(result['name'], None)
+
+ def test_pipeline_as_composite_with_transforms(self):
+ spec = '''
+ transforms:
+ - type: Create
+ config:
+ elements: [0,1,2]
+ - type: PyMap
+ config:
+ fn: 'lambda x: x*x'
+ '''
+ spec = yaml.load(spec, Loader=SafeLineLoader)
+ result = pipeline_as_composite(spec)
+
+ self.assertEqual(result['type'], 'composite')
+ self.assertEqual(result['name'], None)
+
+ def test_pipeline_as_composite_list(self):
+ spec = '''
+ - type: Create
+ config:
+ elements: [0,1,2]
+ - type: PyMap
+ config:
+ fn: 'lambda x: x*x'
+ '''
+ spec = yaml.load(spec, Loader=SafeLineLoader)
+ result = pipeline_as_composite(spec)
+
+ self.assertEqual(result['type'], 'composite')
+ self.assertEqual(result['name'], None)
+ self.assertEqual(result['transforms'], spec)
+ self.assertTrue('__line__' in result)
+ self.assertTrue('__uuid__' in result)
+
+ def test_expand_composite_transform_with_name(self):
+ with new_pipeline() as p:
+ spec = '''
+ type: composite
+ name: Custom
+ transforms:
+ - type: Create
+ config:
+ elements: [0,1,2]
+ output:
+ Create
+ '''
+ scope, spec = self.get_scope_by_spec(p, spec)
+ result = expand_composite_transform(spec, scope)
+ self.assertRegex(
+ str(result['output']), r"PCollection.*Custom/Create/Map.*")
+
+ def test_expand_composite_transform_with_name_input(self):
+ with new_pipeline() as p:
+ spec = '''
+ type: composite
+ input: elements
+ transforms:
+ - type: PyMap
+ input: input
+ config:
+ fn: 'lambda x: x*x'
+ output:
+ PyMap
+ '''
+ elements = p | beam.Create(range(3))
+ scope, spec = self.get_scope_by_spec(p, spec,
+ inputs={'elements': elements})
+ result = expand_composite_transform(spec, scope)
+
+ self.assertRegex(str(result['output']), r"PCollection.*Composite/Map.*")
+
+ def test_expand_composite_transform_root(self):
+ with new_pipeline() as p:
+ spec = '''
+ type: composite
+ transforms:
+ - type: Create
+ config:
+ elements: [0,1,2]
+ output:
+ Create
+ '''
+ scope, spec = self.get_scope_by_spec(p, spec)
+ result = expand_composite_transform(spec, scope)
+ self.assertRegex(str(result['output']), r"PCollection.*Create/Map.*")
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ unittest.main()