robertwb commented on code in PR #34569: URL: https://github.com/apache/beam/pull/34569#discussion_r2034227921
########## sdks/python/apache_beam/yaml/main.py: ########## @@ -130,12 +155,112 @@ def run(argv=None): print('Running pipeline...') -def build_pipeline_components_from_argv(argv): +def run_tests(argv=None, exit=True): + known_args, pipeline_args, _, pipeline_yaml = _build_pipeline_yaml_from_argv( + argv) + pipeline_spec = yaml.load(pipeline_yaml, Loader=yaml_transform.SafeLineLoader) + options = _build_pipeline_options(pipeline_spec, pipeline_args) + + if known_args.create_test and not known_args.fix_tests: Review Comment: Sure. Done. ########## sdks/python/apache_beam/yaml/main.py: ########## @@ -90,6 +95,26 @@ def _parse_arguments(argv): type=json.loads, help='A json dict of variables used when invoking the jinja preprocessor ' 'on the provided yaml pipeline.') + parser.add_argument( + '--test', + action=argparse.BooleanOptionalAction, + help='Run the tests associated with the given pipeline, rather than the ' + 'pipeline itself.') + parser.add_argument( + '--fix_tests', + action=argparse.BooleanOptionalAction, + help='Update failing test expectations to match the actual ouput.') Review Comment: This only requires test-suite if the pipeline is templated. I'll update the doc. ########## sdks/python/apache_beam/yaml/main.py: ########## @@ -90,6 +95,26 @@ def _parse_arguments(argv): type=json.loads, help='A json dict of variables used when invoking the jinja preprocessor ' 'on the provided yaml pipeline.') + parser.add_argument( + '--test', + action=argparse.BooleanOptionalAction, + help='Run the tests associated with the given pipeline, rather than the ' + 'pipeline itself.') + parser.add_argument( + '--fix_tests', + action=argparse.BooleanOptionalAction, + help='Update failing test expectations to match the actual ouput.') Review Comment: Done. 
########## sdks/python/apache_beam/yaml/main.py: ########## @@ -130,12 +155,112 @@ def run(argv=None): print('Running pipeline...') -def build_pipeline_components_from_argv(argv): +def run_tests(argv=None, exit=True): + known_args, pipeline_args, _, pipeline_yaml = _build_pipeline_yaml_from_argv( + argv) + pipeline_spec = yaml.load(pipeline_yaml, Loader=yaml_transform.SafeLineLoader) + options = _build_pipeline_options(pipeline_spec, pipeline_args) + + if known_args.create_test and not known_args.fix_tests: + result = unittest.TestResult() + tests = [] + else: + if known_args.test_suite: + with open(known_args.test_suite) as fin: + test_suite = yaml.load(fin, Loader=yaml_transform.SafeLineLoader) or {} + if 'tests' not in test_suite or not isinstance(test_suite['tests'], list): + raise TypeError('tests attribute must be a list of test specifications') Review Comment: Updated and simplified. ########## sdks/python/apache_beam/yaml/main.py: ########## @@ -130,12 +155,112 @@ def run(argv=None): print('Running pipeline...') -def build_pipeline_components_from_argv(argv): +def run_tests(argv=None, exit=True): + known_args, pipeline_args, _, pipeline_yaml = _build_pipeline_yaml_from_argv( + argv) + pipeline_spec = yaml.load(pipeline_yaml, Loader=yaml_transform.SafeLineLoader) + options = _build_pipeline_options(pipeline_spec, pipeline_args) + + if known_args.create_test and not known_args.fix_tests: + result = unittest.TestResult() + tests = [] + else: + if known_args.test_suite: + with open(known_args.test_suite) as fin: + test_suite = yaml.load(fin, Loader=yaml_transform.SafeLineLoader) or {} + if 'tests' not in test_suite or not isinstance(test_suite['tests'], list): + raise TypeError('tests attribute must be a list of test specifications') + test_specs = test_suite['tests'] + else: + test_specs = pipeline_spec.get('tests', []) + if not isinstance(test_specs, list): + raise TypeError('tests attribute must be a list of test specifications') + if not test_specs: + raise 
RuntimeError('No tests found.') + + with _fix_xlang_instant_coding(): + tests = [ + _YamlTestCase( + pipeline_spec, test_spec, options, known_args.fix_tests) + for test_spec in test_specs + ] + suite = unittest.TestSuite(tests) + result = unittest.TextTestRunner().run(suite) + + if known_args.fix_tests or known_args.create_test: + update_tests(known_args, pipeline_spec, options, tests) + + if exit: + # emulates unittest.main() + sys.exit(0 if result.wasSuccessful() else 1) + else: + if not result.wasSuccessful(): + raise RuntimeError(result) + + +def update_tests(known_args, pipeline_spec, options, tests): + if known_args.test_suite: + path = known_args.test_suite + if not os.path.exists(path) and known_args.create_test: + with open(path, 'w') as fout: + fout.write('tests: []') + elif known_args.yaml_pipeline_file: + path = known_args.yaml_pipeline_file + else: + raise RuntimeError( + 'Test fixing only supported for file-backed tests. ' + 'Please use the --test_suite flag.') + with open(path) as fin: + original_yaml = fin.read() + if path == known_args.yaml_pipeline_file and pipeline_yaml.strip( + ) != original_yaml.strip(): + raise RuntimeError( + 'In-file test fixing not yet supported for templated pipelines. 
' + 'Please use the --test_suite flag.') + updated_spec = yaml.load(original_yaml, Loader=yaml.SafeLoader) or {} + + if known_args.fix_tests: + for ix, test in enumerate(tests): + if test.fixes: Review Comment: create_test uses this path as well ########## sdks/python/apache_beam/yaml/main.py: ########## @@ -90,6 +95,26 @@ def _parse_arguments(argv): type=json.loads, help='A json dict of variables used when invoking the jinja preprocessor ' 'on the provided yaml pipeline.') + parser.add_argument( + '--test', + action=argparse.BooleanOptionalAction, + help='Run the tests associated with the given pipeline, rather than the ' + 'pipeline itself.') + parser.add_argument( + '--fix_tests', + action=argparse.BooleanOptionalAction, + help='Update failing test expectations to match the actual ouput.') Review Comment: Done. ########## sdks/python/apache_beam/yaml/main.py: ########## @@ -130,12 +155,112 @@ def run(argv=None): print('Running pipeline...') -def build_pipeline_components_from_argv(argv): +def run_tests(argv=None, exit=True): + known_args, pipeline_args, _, pipeline_yaml = _build_pipeline_yaml_from_argv( + argv) + pipeline_spec = yaml.load(pipeline_yaml, Loader=yaml_transform.SafeLineLoader) + options = _build_pipeline_options(pipeline_spec, pipeline_args) + + if known_args.create_test and not known_args.fix_tests: + result = unittest.TestResult() + tests = [] + else: + if known_args.test_suite: + with open(known_args.test_suite) as fin: + test_suite = yaml.load(fin, Loader=yaml_transform.SafeLineLoader) or {} + if 'tests' not in test_suite or not isinstance(test_suite['tests'], list): + raise TypeError('tests attribute must be a list of test specifications') + test_specs = test_suite['tests'] + else: + test_specs = pipeline_spec.get('tests', []) + if not isinstance(test_specs, list): + raise TypeError('tests attribute must be a list of test specifications') + if not test_specs: + raise RuntimeError('No tests found.') + + with _fix_xlang_instant_coding(): + tests 
= [ + _YamlTestCase( + pipeline_spec, test_spec, options, known_args.fix_tests) + for test_spec in test_specs + ] + suite = unittest.TestSuite(tests) + result = unittest.TextTestRunner().run(suite) + + if known_args.fix_tests or known_args.create_test: + update_tests(known_args, pipeline_spec, options, tests) + + if exit: + # emulates unittest.main() + sys.exit(0 if result.wasSuccessful() else 1) + else: + if not result.wasSuccessful(): + raise RuntimeError(result) + + +def update_tests(known_args, pipeline_spec, options, tests): + if known_args.test_suite: + path = known_args.test_suite + if not os.path.exists(path) and known_args.create_test: + with open(path, 'w') as fout: + fout.write('tests: []') + elif known_args.yaml_pipeline_file: + path = known_args.yaml_pipeline_file + else: + raise RuntimeError( + 'Test fixing only supported for file-backed tests. ' + 'Please use the --test_suite flag.') + with open(path) as fin: + original_yaml = fin.read() + if path == known_args.yaml_pipeline_file and pipeline_yaml.strip( + ) != original_yaml.strip(): + raise RuntimeError( + 'In-file test fixing not yet supported for templated pipelines. ' + 'Please use the --test_suite flag.') + updated_spec = yaml.load(original_yaml, Loader=yaml.SafeLoader) or {} + + if known_args.fix_tests: + for ix, test in enumerate(tests): + if test.fixes: Review Comment: Good call. I've cleaned this up (and moved it all into yaml_testing). 
########## sdks/python/apache_beam/yaml/main.py: ########## @@ -90,6 +95,26 @@ def _parse_arguments(argv): type=json.loads, help='A json dict of variables used when invoking the jinja preprocessor ' 'on the provided yaml pipeline.') + parser.add_argument( + '--test', + action=argparse.BooleanOptionalAction, + help='Run the tests associated with the given pipeline, rather than the ' + 'pipeline itself.') + parser.add_argument( + '--fix_tests', + action=argparse.BooleanOptionalAction, + help='Update failing test expectations to match the actual ouput.') Review Comment: Same. ########## website/www/site/content/en/documentation/sdks/yaml-testing.md: ########## @@ -0,0 +1,292 @@ +--- +type: languages +title: "Apache Beam YAML Testing" +--- +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +# Beam YAML Tests + +A robust testing story is an important part of any production setup. +Though the various built-in (and externally provided) transform in a Beam YAML +pipeline can be expected to be well tested, it can be important to have tests +that ensure the pipeline as a whole behaves as expected. This is particularly +true for transforms that contain non-trivial UDF logic. 
+ +# Whole pipeline tests + +For example, consider the example word count pipeline. + +``` +pipeline: + transforms: + - type: ReadFromText + name: Read from GCS + config: + path: gs://dataflow-samples/shakespeare/kinglear.txt + - type: MapToFields + name: Split words + config: + language: python + fields: + word: + callable: | + import re + def all_words(row): + return re.findall(r'[a-z]+', row.line.lower()) + value: 1 + input: Read from GCS + - type: Explode + name: Explode word arrays + config: + fields: [word] + input: Split words + - type: Combine + name: Count words + config: + group_by: [word] + combine: + value: sum + input: Explode word arrays + - type: MapToFields + name: Format output + config: + language: python + fields: + output: "word + ': ' + str(value)" + input: Count words + - type: WriteToText + name: Write to GCS + config: + path: gs://bucket/counts.txt + input: Format output + +tests: [] +``` + +To write tests for this pipeline, one creates a `tests` section that enumerates +a number of tests, each of which provides example input and asserts the expected +output is produced. An example test might be as follows + +``` +tests: +- name: MyRegressionTest + mock_outputs: + - name: Read from GCS + elements: + - line: "Nothing can come of nothing" + expected_inputs: + - name: Write to GCS + elements: + - output: 'nothing: 2' + - output: 'can: 1' + - output: 'come: 1' + - output: 'of: 1' +``` + +The `mock_outputs` section designates that the transform named `Read from GCS` +should produce the single row `{line: "Nothing can come of nothing"}` for the +purposes of this test, and the `expected_inputs` section indicates that the +transform `Write to GCS` should expect to receive exactly the given elements. +Neither the actual Read transform nor Write transform from the original +pipelines are executed when running the test, but all intermediate transforms +are.
+ +This test can then be executed by running + +``` +python -m apache_beam.yaml.main \ + --yaml_pipeline_file=wordcount.yaml \ + --tests +``` + +Alternatively, a `tests:` block may be placed in a separate file and be +validated by running + +``` +python -m apache_beam.yaml.main \ + --yaml_pipeline_file=wordcount.yaml \ + --tests \ + --test_suite=test_file.yaml +``` + +Neither the actual Read transform nor Write transform from the original +pipelines are executed when running the test, but all intermediate transforms +are. For hermeticity, we require that all inputs (with the exception of +`Create`) that are needed to compute the expected outputs are explicitly mocked; +to explicitly allow sources to be executed as part of a test their names or +types can be enumerated in an `allowed_sources` attribute of the test +specification. + + +## Pipeline fragment tests + +One can also test a portion of a pipeline using the `mock_inputs` and +`expected_outputs` sections of a test, for example + +``` +tests: +- name: TestSplittingWithPunctuation + mock_inputs: + - name: Split words + elements: + - line: "lots-of-words" + - line: "...and more" + expected_outputs: + - name: Explode + elements: + - word: lots + value: 1 + - word: of + value: 1 + - word: words + value: 1 + - word: and + value: 1 + - word: more + value: 1 + +- name: TestCombineAndFormat + mock_inputs: + - name: Count words + elements: + - word: more + value: 1 + - word: and + value: 1 + - word: more + value: 1 + expected_outputs: + - name: Format output + elements: + - output: "more: 2" + - output: "and: 1" +``` + +As before, each test only executes the portion of the pipeline between the +mock inputs and expected outputs. Note that the named transform in a +`mock_inputs` specification *is* executed, while the named transform of a +`mock_outputs` specification is not.
+Similarly, the named transform of an `expected_inputs` specification is *not* +executed, while the named transform of an `expected_outputs` necessarily is. + + +## Automatically generating tests. + +In an effort to make tests as easy to write and maintain as possible, +Beam YAML provides utilities to compute the expected outputs for your tests. +Running + +``` +python -m apache_beam.yaml.main \ + --yaml_pipeline_file=wordcount.yaml \ + --tests \ + [--test_suite=...] \ + --fix_tests +``` + +will update any existing `expected_inputs` and `expected_outputs` blocks of your +pipeline to contain the actual values computed during the test. +(Of course, it is on any user of this flag to verify that the produced values +are as expected.) +This can be useful in authoring tests as well--one can simply specify a +nonsensical or empty elements block in the expectation and the `--fix_tests` +flag will populate it for you. + +Beam YAML also has a `--create_test` flag which can be used to create an Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org