This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d799365dbaf [Yaml] Unit Tests for SafeLineLoader and LightweightScope
(yaml/yaml_transform.py) (#27086)
d799365dbaf is described below
commit d799365dbafeddc5a87722f41960c5e683ae9922
Author: bzablocki <[email protected]>
AuthorDate: Wed Jul 12 00:40:20 2023 +0200
[Yaml] Unit Tests for SafeLineLoader and LightweightScope
(yaml/yaml_transform.py) (#27086)
---
.../python/apache_beam/yaml/yaml_transform_test.py | 126 +++++++++++++++++++++
1 file changed, 126 insertions(+)
diff --git a/sdks/python/apache_beam/yaml/yaml_transform_test.py b/sdks/python/apache_beam/yaml/yaml_transform_test.py
index 4ccc50da757..ec5b122741a 100644
--- a/sdks/python/apache_beam/yaml/yaml_transform_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_transform_test.py
@@ -22,14 +22,140 @@ import os
import tempfile
import unittest
+import yaml
+
import apache_beam as beam
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.yaml import yaml_provider
+from apache_beam.yaml import yaml_transform
+from apache_beam.yaml.yaml_transform import LightweightScope
+from apache_beam.yaml.yaml_transform import SafeLineLoader
from apache_beam.yaml.yaml_transform import YamlTransform
class YamlTransformTest(unittest.TestCase):
+  def test_only_element(self):
+    """only_element should unwrap a one-element sequence."""
+    self.assertEqual(yaml_transform.only_element((1, )), 1)
+
+
+class SafeLineLoaderTest(unittest.TestCase):
+  """Tests for the line/uuid metadata that SafeLineLoader attaches."""
+  def test_get_line(self):
+    # Line numbers are 1-based and include the empty first line that follows
+    # the opening quotes, so 'type: composite' is reported as line 2.
+    pipeline_yaml = '''
+        type: composite
+        input:
+          elements: input
+        transforms:
+          - type: PyMap
+            name: Square
+            input: elements
+            fn: "lambda x: x * x"
+          - type: PyMap
+            name: Cube
+            input: elements
+            fn: "lambda x: x * x * x"
+        output:
+          Flatten
+        '''
+    spec = yaml.load(pipeline_yaml, Loader=SafeLineLoader)
+    self.assertEqual(SafeLineLoader.get_line(spec['type']), 2)
+    self.assertEqual(SafeLineLoader.get_line(spec['input']), 4)
+    self.assertEqual(SafeLineLoader.get_line(spec['transforms'][0]), 6)
+    self.assertEqual(SafeLineLoader.get_line(spec['transforms'][0]['type']), 6)
+    self.assertEqual(SafeLineLoader.get_line(spec['transforms'][0]['name']), 7)
+    self.assertEqual(SafeLineLoader.get_line(spec['transforms'][1]), 10)
+    self.assertEqual(SafeLineLoader.get_line(spec['output']), 15)
+    # A list carries no line metadata, so get_line falls back to "unknown".
+    self.assertEqual(SafeLineLoader.get_line(spec['transforms']), "unknown")
+
+  def test_strip_metadata(self):
+    spec_yaml = '''
+        transforms:
+          - type: PyMap
+            name: Square
+        '''
+    spec = yaml.load(spec_yaml, Loader=SafeLineLoader)
+    # Metadata is stored as dict keys rather than attributes, so check key
+    # membership; hasattr() on a dict would pass even without stripping.
+    # Confirm the keys are present first so the checks below are meaningful.
+    self.assertIn('__line__', spec['transforms'][0])
+    self.assertIn('__uuid__', spec['transforms'][0])
+    stripped = SafeLineLoader.strip_metadata(spec['transforms'])
+
+    self.assertNotIn('__line__', stripped[0])
+    self.assertNotIn('__uuid__', stripped[0])
+    self.assertEqual(stripped[0]['name'], 'Square')
+
+  def test_strip_metadata_nothing_to_strip(self):
+    spec_yaml = 'prop: 123'
+    spec = yaml.load(spec_yaml, Loader=SafeLineLoader)
+    stripped = SafeLineLoader.strip_metadata(spec['prop'])
+
+    # A plain scalar has no metadata to remove; it should come back unchanged.
+    self.assertEqual(stripped, 123)
+
+
+class LightweightScopeTest(unittest.TestCase):
+  """Tests for name/uuid resolution in LightweightScope."""
+  @staticmethod
+  def get_spec():
+    # Two transforms share the type PyMap (and one is also named PyMap), so
+    # "PyMap" is an ambiguous identifier; Square and FilterOutBigNumbers are
+    # unique.
+    pipeline_yaml = '''
+        - type: PyMap
+          name: Square
+          input: elements
+          fn: "lambda x: x * x"
+        - type: PyMap
+          name: PyMap
+          input: elements
+          fn: "lambda x: x * x * x"
+        - type: Filter
+          name: FilterOutBigNumbers
+          input: PyMap
+          keep: "lambda x: x<100"
+        '''
+    return yaml.load(pipeline_yaml, Loader=SafeLineLoader)
+
+  def test_init(self):
+    spec = self.get_spec()
+    scope = LightweightScope(spec)
+    self.assertEqual(len(scope._transforms_by_uuid), 3)
+    # Both transform names and transform types are indexed as lookup keys.
+    self.assertCountEqual(
+        list(scope._uuid_by_name.keys()),
+        ["PyMap", "Square", "Filter", "FilterOutBigNumbers"])
+
+  def test_get_transform_id_and_output_name(self):
+    spec = self.get_spec()
+    scope = LightweightScope(spec)
+    transform_id, output = scope.get_transform_id_and_output_name("Square")
+    self.assertEqual(transform_id, spec[0]['__uuid__'])
+    self.assertIsNone(output)
+
+  def test_get_transform_id_and_output_name_with_dot(self):
+    spec = self.get_spec()
+    scope = LightweightScope(spec)
+    transform_id, output = (
+        scope.get_transform_id_and_output_name("Square.OutputName"))
+    self.assertEqual(transform_id, spec[0]['__uuid__'])
+    self.assertEqual(output, "OutputName")
+
+  def test_get_transform_id_by_uuid(self):
+    spec = self.get_spec()
+    scope = LightweightScope(spec)
+    transform_id = scope.get_transform_id(spec[0]['__uuid__'])
+    self.assertEqual(transform_id, spec[0]['__uuid__'])
+
+  def test_get_transform_id_by_unique_name(self):
+    spec = self.get_spec()
+    scope = LightweightScope(spec)
+    transform_id = scope.get_transform_id("Square")
+    self.assertEqual(transform_id, spec[0]['__uuid__'])
+
+  def test_get_transform_id_by_ambiguous_name(self):
+    spec = self.get_spec()
+    scope = LightweightScope(spec)
+    # A single lookup of the ambiguous identifier must raise.
+    with self.assertRaisesRegex(ValueError, r'Ambiguous.*PyMap'):
+      scope.get_transform_id(spec[1]['name'])
+
+  def test_get_transform_id_by_unknown_name(self):
+    spec = self.get_spec()
+    scope = LightweightScope(spec)
+    with self.assertRaisesRegex(ValueError, r'Unknown.*NotExistingTransform'):
+      scope.get_transform_id("NotExistingTransform")
+
+
+class YamlTransformE2ETest(unittest.TestCase):
def test_composite(self):
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle')) as p: