This is an automated email from the ASF dual-hosted git repository.
riteshghorse pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b918dd4c169 [Python] Cleanup Python tests in pipeline options module
(#28158)
b918dd4c169 is described below
commit b918dd4c1691ad76b07a5d2de48e67438b48d0e1
Author: Tim Grein <[email protected]>
AuthorDate: Fri Aug 25 18:30:35 2023 +0200
[Python] Cleanup Python tests in pipeline options module (#28158)
* Use @parameterized decorator instead of a manual test case for loop in
pipeline_options_test.py.
* Use @parameterized decorator instead of a manual test case for loop in
pipeline_options_validator_test.py.
---
.../apache_beam/options/pipeline_options_test.py | 383 ++++++-------
.../options/pipeline_options_validator_test.py | 624 +++++++--------------
2 files changed, 379 insertions(+), 628 deletions(-)
diff --git a/sdks/python/apache_beam/options/pipeline_options_test.py
b/sdks/python/apache_beam/options/pipeline_options_test.py
index 5355e4a7d3b..2d85d055eaf 100644
--- a/sdks/python/apache_beam/options/pipeline_options_test.py
+++ b/sdks/python/apache_beam/options/pipeline_options_test.py
@@ -24,6 +24,7 @@ import logging
import unittest
import hamcrest as hc
+from parameterized import parameterized
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
@@ -72,150 +73,104 @@ class PipelineOptionsTest(unittest.TestCase):
RuntimeValueProvider.set_runtime_options(None)
TEST_CASES = [
- {
- 'flags': ['--num_workers', '5'],
- 'expected': {
- 'num_workers': 5,
- 'mock_flag': False,
- 'mock_option': None,
- 'mock_multi_option': None
- },
- 'display_data': [DisplayDataItemMatcher('num_workers', 5)]
- },
- {
- 'flags': ['--direct_num_workers', '5'],
- 'expected': {
- 'direct_num_workers': 5,
- 'mock_flag': False,
- 'mock_option': None,
- 'mock_multi_option': None
- },
- 'display_data': [DisplayDataItemMatcher('direct_num_workers', 5)]
- },
- {
- 'flags': ['--direct_running_mode', 'multi_threading'],
- 'expected': {
- 'direct_running_mode': 'multi_threading',
- 'mock_flag': False,
- 'mock_option': None,
- 'mock_multi_option': None
- },
- 'display_data': [
- DisplayDataItemMatcher('direct_running_mode', 'multi_threading')
- ]
- },
- {
- 'flags': ['--direct_running_mode', 'multi_processing'],
- 'expected': {
- 'direct_running_mode': 'multi_processing',
- 'mock_flag': False,
- 'mock_option': None,
- 'mock_multi_option': None
- },
- 'display_data': [
- DisplayDataItemMatcher('direct_running_mode', 'multi_processing')
- ]
- },
- {
- 'flags': [
- '--profile_cpu', '--profile_location', 'gs://bucket/', 'ignored'
- ],
- 'expected': {
- 'profile_cpu': True,
- 'profile_location': 'gs://bucket/',
- 'mock_flag': False,
- 'mock_option': None,
- 'mock_multi_option': None
- },
- 'display_data': [
- DisplayDataItemMatcher('profile_cpu', True),
- DisplayDataItemMatcher('profile_location', 'gs://bucket/')
- ]
- },
- {
- 'flags': ['--num_workers', '5', '--mock_flag'],
- 'expected': {
- 'num_workers': 5,
- 'mock_flag': True,
- 'mock_option': None,
- 'mock_multi_option': None
- },
- 'display_data': [
- DisplayDataItemMatcher('num_workers', 5),
- DisplayDataItemMatcher('mock_flag', True)
- ]
- },
- {
- 'flags': ['--mock_option', 'abc'],
- 'expected': {
- 'mock_flag': False,
- 'mock_option': 'abc',
- 'mock_multi_option': None
- },
- 'display_data': [DisplayDataItemMatcher('mock_option', 'abc')]
- },
- {
- 'flags': ['--mock_option', ' abc def '],
- 'expected': {
- 'mock_flag': False,
- 'mock_option': ' abc def ',
- 'mock_multi_option': None
- },
- 'display_data': [DisplayDataItemMatcher('mock_option', ' abc def ')]
- },
- {
- 'flags': ['--mock_option= abc xyz '],
- 'expected': {
- 'mock_flag': False,
- 'mock_option': ' abc xyz ',
- 'mock_multi_option': None
- },
- 'display_data': [DisplayDataItemMatcher('mock_option', ' abc xyz ')]
- },
- {
- 'flags': [
- '--mock_option=gs://my bucket/my folder/my file',
- '--mock_multi_option=op1',
- '--mock_multi_option=op2'
- ],
- 'expected': {
- 'mock_flag': False,
- 'mock_option': 'gs://my bucket/my folder/my file',
- 'mock_multi_option': ['op1', 'op2']
- },
- 'display_data': [
- DisplayDataItemMatcher(
- 'mock_option', 'gs://my bucket/my folder/my file'),
- DisplayDataItemMatcher('mock_multi_option', ['op1', 'op2'])
- ]
- },
- {
- 'flags': ['--mock_multi_option=op1', '--mock_multi_option=op2'],
- 'expected': {
- 'mock_flag': False,
- 'mock_option': None,
- 'mock_multi_option': ['op1', 'op2']
- },
- 'display_data': [
- DisplayDataItemMatcher('mock_multi_option', ['op1', 'op2'])
- ]
- },
- {
- 'flags': ['--mock_json_option={"11a": 0, "37a": 1}'],
- 'expected': {
- 'mock_flag': False,
- 'mock_option': None,
- 'mock_multi_option': None,
- 'mock_json_option': {
- '11a': 0, '37a': 1
- },
- },
- 'display_data': [
- DisplayDataItemMatcher('mock_json_option', {
- '11a': 0, '37a': 1
- })
- ]
- },
+ (['--num_workers', '5'],
+ {
+ 'num_workers': 5,
+ 'mock_flag': False,
+ 'mock_option': None,
+ 'mock_multi_option': None
+ }, [DisplayDataItemMatcher('num_workers', 5)]),
+ (['--direct_num_workers', '5'],
+ {
+ 'direct_num_workers': 5,
+ 'mock_flag': False,
+ 'mock_option': None,
+ 'mock_multi_option': None
+ }, [DisplayDataItemMatcher('direct_num_workers', 5)]),
+ (['--direct_running_mode', 'multi_threading'],
+ {
+ 'direct_running_mode': 'multi_threading',
+ 'mock_flag': False,
+ 'mock_option': None,
+ 'mock_multi_option': None
+ }, [DisplayDataItemMatcher('direct_running_mode', 'multi_threading')]),
+ (['--direct_running_mode', 'multi_processing'],
+ {
+ 'direct_running_mode': 'multi_processing',
+ 'mock_flag': False,
+ 'mock_option': None,
+ 'mock_multi_option': None
+ }, [DisplayDataItemMatcher('direct_running_mode', 'multi_processing')]),
+ (['--profile_cpu', '--profile_location', 'gs://bucket/', 'ignored'],
+ {
+ 'profile_cpu': True,
+ 'profile_location': 'gs://bucket/',
+ 'mock_flag': False,
+ 'mock_option': None,
+ 'mock_multi_option': None
+ },
+ [
+ DisplayDataItemMatcher('profile_cpu', True),
+ DisplayDataItemMatcher('profile_location', 'gs://bucket/')
+ ]),
+ (['--num_workers', '5', '--mock_flag'],
+ {
+ 'num_workers': 5,
+ 'mock_flag': True,
+ 'mock_option': None,
+ 'mock_multi_option': None
+ },
+ [
+ DisplayDataItemMatcher('num_workers', 5),
+ DisplayDataItemMatcher('mock_flag', True)
+ ]),
+ (['--mock_option', 'abc'], {
+ 'mock_flag': False, 'mock_option': 'abc', 'mock_multi_option': None
+ }, [DisplayDataItemMatcher('mock_option', 'abc')]),
+ (['--mock_option', ' abc def '],
+ {
+ 'mock_flag': False,
+ 'mock_option': ' abc def ',
+ 'mock_multi_option': None
+ }, [DisplayDataItemMatcher('mock_option', ' abc def ')]),
+ (['--mock_option= abc xyz '],
+ {
+ 'mock_flag': False,
+ 'mock_option': ' abc xyz ',
+ 'mock_multi_option': None
+ }, [DisplayDataItemMatcher('mock_option', ' abc xyz ')]),
+ ([
+ '--mock_option=gs://my bucket/my folder/my file',
+ '--mock_multi_option=op1',
+ '--mock_multi_option=op2'
+ ],
+ {
+ 'mock_flag': False,
+ 'mock_option': 'gs://my bucket/my folder/my file',
+ 'mock_multi_option': ['op1', 'op2']
+ },
+ [
+ DisplayDataItemMatcher(
+ 'mock_option', 'gs://my bucket/my folder/my file'),
+ DisplayDataItemMatcher('mock_multi_option', ['op1', 'op2'])
+ ]),
+ (['--mock_multi_option=op1', '--mock_multi_option=op2'],
+ {
+ 'mock_flag': False,
+ 'mock_option': None,
+ 'mock_multi_option': ['op1', 'op2']
+ }, [DisplayDataItemMatcher('mock_multi_option', ['op1', 'op2'])]),
+ (['--mock_json_option={"11a": 0, "37a": 1}'],
+ {
+ 'mock_flag': False,
+ 'mock_option': None,
+ 'mock_multi_option': None,
+ 'mock_json_option': {
+ '11a': 0, '37a': 1
+ },
+ }, [DisplayDataItemMatcher('mock_json_option', {
+ '11a': 0, '37a': 1
+ })]),
]
# Used for testing newly added flags.
@@ -238,59 +193,59 @@ class PipelineOptionsTest(unittest.TestCase):
parser.add_argument(
'--fake_multi_option', action='append', help='fake multi option')
- def test_display_data(self):
- for case in PipelineOptionsTest.TEST_CASES:
- options = PipelineOptions(flags=case['flags'])
- dd = DisplayData.create_from(options)
- hc.assert_that(dd.items, hc.contains_inanyorder(*case['display_data']))
-
- def test_get_all_options_subclass(self):
- for case in PipelineOptionsTest.TEST_CASES:
- options = PipelineOptionsTest.MockOptions(flags=case['flags'])
-      self.assertDictContainsSubset(case['expected'],
-                                    options.get_all_options())
- self.assertEqual(
- options.view_as(PipelineOptionsTest.MockOptions).mock_flag,
- case['expected']['mock_flag'])
- self.assertEqual(
- options.view_as(PipelineOptionsTest.MockOptions).mock_option,
- case['expected']['mock_option'])
- self.assertEqual(
- options.view_as(PipelineOptionsTest.MockOptions).mock_multi_option,
- case['expected']['mock_multi_option'])
-
- def test_get_all_options(self):
- for case in PipelineOptionsTest.TEST_CASES:
- options = PipelineOptions(flags=case['flags'])
-      self.assertDictContainsSubset(case['expected'],
-                                    options.get_all_options())
- self.assertEqual(
- options.view_as(PipelineOptionsTest.MockOptions).mock_flag,
- case['expected']['mock_flag'])
- self.assertEqual(
- options.view_as(PipelineOptionsTest.MockOptions).mock_option,
- case['expected']['mock_option'])
- self.assertEqual(
- options.view_as(PipelineOptionsTest.MockOptions).mock_multi_option,
- case['expected']['mock_multi_option'])
-
- def test_sublcalsses_of_pipeline_options_can_be_instantiated(self):
- for case in PipelineOptionsTest.TEST_CASES:
- mock_options = PipelineOptionsTest.MockOptions(flags=case['flags'])
- self.assertEqual(mock_options.mock_flag, case['expected']['mock_flag'])
- self.assertEqual(
- mock_options.mock_option, case['expected']['mock_option'])
- self.assertEqual(
-          mock_options.mock_multi_option,
-          case['expected']['mock_multi_option'])
-
- def test_views_can_be_constructed_from_pipeline_option_subclasses(self):
- for case in PipelineOptionsTest.TEST_CASES:
- fake_options = PipelineOptionsTest.FakeOptions(flags=case['flags'])
- mock_options = fake_options.view_as(PipelineOptionsTest.MockOptions)
-
- self.assertEqual(mock_options.mock_flag, case['expected']['mock_flag'])
- self.assertEqual(
- mock_options.mock_option, case['expected']['mock_option'])
- self.assertEqual(
-          mock_options.mock_multi_option,
-          case['expected']['mock_multi_option'])
+ @parameterized.expand(TEST_CASES)
+ def test_display_data(self, flags, _, display_data):
+ options = PipelineOptions(flags=flags)
+ dd = DisplayData.create_from(options)
+ hc.assert_that(dd.items, hc.contains_inanyorder(*display_data))
+
+ @parameterized.expand(TEST_CASES)
+ def test_get_all_options_subclass(self, flags, expected, _):
+ options = PipelineOptionsTest.MockOptions(flags=flags)
+ self.assertDictContainsSubset(expected, options.get_all_options())
+ self.assertEqual(
+ options.view_as(PipelineOptionsTest.MockOptions).mock_flag,
+ expected['mock_flag'])
+ self.assertEqual(
+ options.view_as(PipelineOptionsTest.MockOptions).mock_option,
+ expected['mock_option'])
+ self.assertEqual(
+ options.view_as(PipelineOptionsTest.MockOptions).mock_multi_option,
+ expected['mock_multi_option'])
+
+ @parameterized.expand(TEST_CASES)
+ def test_get_all_options(self, flags, expected, _):
+ options = PipelineOptions(flags=flags)
+ self.assertDictContainsSubset(expected, options.get_all_options())
+ self.assertEqual(
+ options.view_as(PipelineOptionsTest.MockOptions).mock_flag,
+ expected['mock_flag'])
+ self.assertEqual(
+ options.view_as(PipelineOptionsTest.MockOptions).mock_option,
+ expected['mock_option'])
+ self.assertEqual(
+ options.view_as(PipelineOptionsTest.MockOptions).mock_multi_option,
+ expected['mock_multi_option'])
+
+ @parameterized.expand(TEST_CASES)
+ def test_subclasses_of_pipeline_options_can_be_instantiated(
+ self, flags, expected, _):
+ mock_options = PipelineOptionsTest.MockOptions(flags=flags)
+ self.assertEqual(mock_options.mock_flag, expected['mock_flag'])
+ self.assertEqual(mock_options.mock_option, expected['mock_option'])
+ self.assertEqual(
+ mock_options.mock_multi_option, expected['mock_multi_option'])
+
+ @parameterized.expand(TEST_CASES)
+ def test_views_can_be_constructed_from_pipeline_option_subclasses(
+ self, flags, expected, _):
+ fake_options = PipelineOptionsTest.FakeOptions(flags=flags)
+ mock_options = fake_options.view_as(PipelineOptionsTest.MockOptions)
+
+ self.assertEqual(mock_options.mock_flag, expected['mock_flag'])
+ self.assertEqual(mock_options.mock_option, expected['mock_option'])
+ self.assertEqual(
+ mock_options.mock_multi_option, expected['mock_multi_option'])
def test_views_do_not_expose_options_defined_by_other_views(self):
flags = ['--mock_option=mock_value', '--fake_option=fake_value']
@@ -312,23 +267,23 @@ class PipelineOptionsTest(unittest.TestCase):
PipelineOptionsTest.FakeOptions).view_as(
PipelineOptionsTest.MockOptions).fake_option)
- def test_from_dictionary(self):
- for case in PipelineOptionsTest.TEST_CASES:
- options = PipelineOptions(flags=case['flags'])
- all_options_dict = options.get_all_options()
- options_from_dict = PipelineOptions.from_dictionary(all_options_dict)
- self.assertEqual(
- options_from_dict.view_as(PipelineOptionsTest.MockOptions).mock_flag,
- case['expected']['mock_flag'])
- self.assertEqual(
- options.view_as(PipelineOptionsTest.MockOptions).mock_option,
- case['expected']['mock_option'])
- self.assertEqual(
- options.view_as(PipelineOptionsTest.MockOptions).mock_multi_option,
- case['expected']['mock_multi_option'])
- self.assertEqual(
- options.view_as(PipelineOptionsTest.MockOptions).mock_json_option,
- case['expected'].get('mock_json_option', {}))
+ @parameterized.expand(TEST_CASES)
+ def test_from_dictionary(self, flags, expected, _):
+ options = PipelineOptions(flags=flags)
+ all_options_dict = options.get_all_options()
+ options_from_dict = PipelineOptions.from_dictionary(all_options_dict)
+ self.assertEqual(
+ options_from_dict.view_as(PipelineOptionsTest.MockOptions).mock_flag,
+ expected['mock_flag'])
+ self.assertEqual(
+ options.view_as(PipelineOptionsTest.MockOptions).mock_option,
+ expected['mock_option'])
+ self.assertEqual(
+ options.view_as(PipelineOptionsTest.MockOptions).mock_multi_option,
+ expected['mock_multi_option'])
+ self.assertEqual(
+ options.view_as(PipelineOptionsTest.MockOptions).mock_json_option,
+ expected.get('mock_json_option', {}))
def test_none_from_dictionary(self):
class NoneDefaultOptions(PipelineOptions):
diff --git a/sdks/python/apache_beam/options/pipeline_options_validator_test.py
b/sdks/python/apache_beam/options/pipeline_options_validator_test.py
index 1f4c8be226d..015e3ef787f 100644
--- a/sdks/python/apache_beam/options/pipeline_options_validator_test.py
+++ b/sdks/python/apache_beam/options/pipeline_options_validator_test.py
@@ -26,6 +26,7 @@ from hamcrest import assert_that
from hamcrest import contains_string
from hamcrest import only_contains
from hamcrest.core.base_matcher import BaseMatcher
+from parameterized import parameterized
from apache_beam.internal import pickler
from apache_beam.options.pipeline_options import DebugOptions
@@ -95,170 +96,85 @@ class SetupTest(unittest.TestCase):
errors, ['project', 'staging_location', 'temp_location',
'region']),
[])
- def test_gcs_path(self):
- def get_validator(temp_location, staging_location):
+  @parameterized.expand([
+      (None, 'gs://foo/bar', []),
+      (None, None, ['staging_location', 'temp_location']),
+      ('gs://foo/bar', None, []),
+      ('gs://foo/bar', 'gs://ABC/bar', []),
+      ('gcs:/foo/bar', 'gs://foo/bar', []),
+      ('gs:/foo/bar', 'gs://foo/bar', []),
+      ('gs://ABC/bar', 'gs://foo/bar', []),
+      ('gs://ABC/bar', 'gs://BCD/bar', ['temp_location', 'staging_location']),
+      ('gs://foo', 'gs://foo/bar', []),
+      ('gs://foo/', 'gs://foo/bar', []),
+      ('gs://foo/bar', 'gs://foo/bar', [])
+  ])
+  def test_gcs_path(self, temp_location, staging_location,
+                    expected_error_args):
+ def get_validator(_temp_location, _staging_location):
options = ['--project=example:example', '--job_name=job']
- if temp_location is not None:
- options.append('--temp_location=' + temp_location)
+ if _temp_location is not None:
+ options.append('--temp_location=' + _temp_location)
- if staging_location is not None:
- options.append('--staging_location=' + staging_location)
+ if _staging_location is not None:
+ options.append('--staging_location=' + _staging_location)
pipeline_options = PipelineOptions(options)
runner = MockRunners.DataflowRunner()
validator = PipelineOptionsValidator(pipeline_options, runner)
return validator
- test_cases = [{
- 'temp_location': None, 'staging_location': 'gs://foo/bar', 'errors': []
- },
- {
- 'temp_location': None,
- 'staging_location': None,
- 'errors': ['staging_location', 'temp_location']
- },
- {
- 'temp_location': 'gs://foo/bar',
- 'staging_location': None,
- 'errors': []
- },
- {
- 'temp_location': 'gs://foo/bar',
- 'staging_location': 'gs://ABC/bar',
- 'errors': []
- },
- {
- 'temp_location': 'gcs:/foo/bar',
- 'staging_location': 'gs://foo/bar',
- 'errors': []
- },
- {
- 'temp_location': 'gs:/foo/bar',
- 'staging_location': 'gs://foo/bar',
- 'errors': []
- },
- {
- 'temp_location': 'gs://ABC/bar',
- 'staging_location': 'gs://foo/bar',
- 'errors': []
- },
- {
- 'temp_location': 'gs://ABC/bar',
- 'staging_location': 'gs://BCD/bar',
- 'errors': ['temp_location', 'staging_location']
- },
- {
- 'temp_location': 'gs://foo',
- 'staging_location': 'gs://foo/bar',
- 'errors': []
- },
- {
- 'temp_location': 'gs://foo/',
- 'staging_location': 'gs://foo/bar',
- 'errors': []
- },
- {
- 'temp_location': 'gs://foo/bar',
- 'staging_location': 'gs://foo/bar',
- 'errors': []
- }]
-
- for case in test_cases:
- errors = get_validator(case['temp_location'],
- case['staging_location']).validate()
- self.assertEqual(
- self.check_errors_for_arguments(errors, case['errors']), [], case)
-
- def test_project(self):
- def get_validator(project):
+ errors = get_validator(temp_location, staging_location).validate()
+ self.assertEqual(
+ self.check_errors_for_arguments(errors, expected_error_args), [])
+
+ @parameterized.expand([(None, ['project']), ('12345', ['project']),
+ ('FOO', ['project']), ('foo:BAR', ['project']),
+ ('fo', ['project']), ('foo', []), ('foo:bar', [])])
+ def test_project(self, project, expected_error_args):
+ def get_validator(_project):
options = [
'--job_name=job',
'--staging_location=gs://foo/bar',
'--temp_location=gs://foo/bar'
]
- if project is not None:
- options.append('--project=' + project)
+ if _project is not None:
+ options.append('--project=' + _project)
pipeline_options = PipelineOptions(options)
runner = MockRunners.DataflowRunner()
validator = PipelineOptionsValidator(pipeline_options, runner)
return validator
- test_cases = [
- {
- 'project': None, 'errors': ['project']
- },
- {
- 'project': '12345', 'errors': ['project']
- },
- {
- 'project': 'FOO', 'errors': ['project']
- },
- {
- 'project': 'foo:BAR', 'errors': ['project']
- },
- {
- 'project': 'fo', 'errors': ['project']
- },
- {
- 'project': 'foo', 'errors': []
- },
- {
- 'project': 'foo:bar', 'errors': []
- },
- ]
-
- for case in test_cases:
- errors = get_validator(case['project']).validate()
- self.assertEqual(
- self.check_errors_for_arguments(errors, case['errors']), [])
+ errors = get_validator(project).validate()
+ self.assertEqual(
+ self.check_errors_for_arguments(errors, expected_error_args), [])
- def test_job_name(self):
- def get_validator(job_name):
+ @parameterized.expand([(None, []), ('12345', ['job_name']),
+ ('FOO', ['job_name']), ('foo:bar', ['job_name']),
+ ('fo', []), ('foo', [])])
+ def test_job_name(self, job_name, expected_error_args):
+ def get_validator(_job_name):
options = [
'--project=example:example',
'--staging_location=gs://foo/bar',
'--temp_location=gs://foo/bar'
]
- if job_name is not None:
- options.append('--job_name=' + job_name)
+ if _job_name is not None:
+ options.append('--job_name=' + _job_name)
pipeline_options = PipelineOptions(options)
runner = MockRunners.DataflowRunner()
validator = PipelineOptionsValidator(pipeline_options, runner)
return validator
- test_cases = [
- {
- 'job_name': None, 'errors': []
- },
- {
- 'job_name': '12345', 'errors': ['job_name']
- },
- {
- 'job_name': 'FOO', 'errors': ['job_name']
- },
- {
- 'job_name': 'foo:bar', 'errors': ['job_name']
- },
- {
- 'job_name': 'fo', 'errors': []
- },
- {
- 'job_name': 'foo', 'errors': []
- },
- ]
-
- for case in test_cases:
- errors = get_validator(case['job_name']).validate()
- self.assertEqual(
- self.check_errors_for_arguments(errors, case['errors']), [])
+ errors = get_validator(job_name).validate()
+ self.assertEqual(
+ self.check_errors_for_arguments(errors, expected_error_args), [])
- def test_num_workers(self):
- def get_validator(num_workers):
+ @parameterized.expand([(None, []), ('1', []), ('0', ['num_workers']),
+ ('-1', ['num_workers'])])
+ def test_num_workers(self, num_workers, expected_error_args):
+ def get_validator(_num_workers):
options = [
'--project=example:example',
'--job_name=job',
@@ -266,87 +182,66 @@ class SetupTest(unittest.TestCase):
'--temp_location=gs://foo/bar'
]
- if num_workers is not None:
- options.append('--num_workers=' + num_workers)
+ if _num_workers is not None:
+ options.append('--num_workers=' + _num_workers)
pipeline_options = PipelineOptions(options)
runner = MockRunners.DataflowRunner()
validator = PipelineOptionsValidator(pipeline_options, runner)
return validator
- test_cases = [
- {
- 'num_workers': None, 'errors': []
- },
- {
- 'num_workers': '1', 'errors': []
- },
- {
- 'num_workers': '0', 'errors': ['num_workers']
- },
- {
- 'num_workers': '-1', 'errors': ['num_workers']
- },
- ]
-
- for case in test_cases:
- errors = get_validator(case['num_workers']).validate()
- self.assertEqual(
- self.check_errors_for_arguments(errors, case['errors']), [])
-
- def test_is_service_runner(self):
- test_cases = [
- {
- 'runner': MockRunners.OtherRunner(),
- 'options': [],
- 'expected': False,
- },
- {
- 'runner': MockRunners.OtherRunner(),
- 'options': ['--dataflow_endpoint=https://dataflow.googleapis.com'],
- 'expected': False,
- },
- {
- 'runner': MockRunners.OtherRunner(),
-          'options':
-          ['--dataflow_endpoint=https://dataflow.googleapis.com/'],
- 'expected': False,
- },
- {
- 'runner': MockRunners.DataflowRunner(),
- 'options': [],
- 'expected': True,
- },
- {
- 'runner': MockRunners.DataflowRunner(),
- 'options': ['--dataflow_endpoint=https://another.service.com'],
- 'expected': True,
- },
- {
- 'runner': MockRunners.DataflowRunner(),
- 'options': ['--dataflow_endpoint=https://dataflow.googleapis.com'],
- 'expected': True,
- },
- {
- 'runner': MockRunners.DataflowRunner(),
- 'options': ['--dataflow_endpoint=http://localhost:1000'],
- 'expected': False,
- },
- {
- 'runner': MockRunners.DataflowRunner(),
-          'options': ['--dataflow_endpoint=foo: //dataflow. googleapis. com'],
- 'expected': True,
- },
- {
- 'runner': MockRunners.DataflowRunner(),
- 'options': [],
- 'expected': True,
- },
- ]
-
- for case in test_cases:
- validator = PipelineOptionsValidator(
- PipelineOptions(case['options']), case['runner'])
- self.assertEqual(validator.is_service_runner(), case['expected'], case)
+ errors = get_validator(num_workers).validate()
+ self.assertEqual(
+ self.check_errors_for_arguments(errors, expected_error_args), [])
+
+ @parameterized.expand([
+ (
+ MockRunners.OtherRunner(),
+ [],
+ False,
+ ),
+ (
+ MockRunners.OtherRunner(),
+ ['--dataflow_endpoint=https://dataflow.googleapis.com'],
+ False,
+ ),
+ (
+ MockRunners.OtherRunner(),
+ ['--dataflow_endpoint=https://dataflow.googleapis.com/'],
+ False,
+ ), (
+ MockRunners.DataflowRunner(),
+ [],
+ True,
+ ),
+ (
+ MockRunners.DataflowRunner(),
+ ['--dataflow_endpoint=https://another.service.com'],
+ True,
+ ),
+ (
+ MockRunners.DataflowRunner(),
+ ['--dataflow_endpoint=https://dataflow.googleapis.com'],
+ True,
+ ),
+ (
+ MockRunners.DataflowRunner(),
+ ['--dataflow_endpoint=http://localhost:1000'],
+ False,
+ ),
+ (
+ MockRunners.DataflowRunner(),
+ ['--dataflow_endpoint=foo: //dataflow. googleapis. com'],
+ True,
+ ), (
+ MockRunners.DataflowRunner(),
+ [],
+ True,
+ )
+ ])
+ def test_is_service_runner(self, runner, options, expected):
+ validator = PipelineOptionsValidator(PipelineOptions(options), runner)
+ self.assertEqual(validator.is_service_runner(), expected)
def test_dataflow_job_file_and_template_location_mutually_exclusive(self):
runner = MockRunners.OtherRunner()
@@ -652,7 +547,10 @@ class SetupTest(unittest.TestCase):
errors = validator.validate()
self.assertEqual(len(errors), 0)
- def test_test_matcher(self):
+ @parameterized.expand([(None, []), (pickler.dumps(AlwaysPassMatcher()), []),
+ (b'abc', ['on_success_matcher']),
+ (pickler.dumps(object), ['on_success_matcher'])])
+ def test_test_matcher(self, on_success_matcher, errors):
def get_validator(matcher):
options = [
'--project=example:example',
@@ -667,27 +565,8 @@ class SetupTest(unittest.TestCase):
runner = MockRunners.TestDataflowRunner()
return PipelineOptionsValidator(pipeline_options, runner)
- test_case = [
- {
- 'on_success_matcher': None, 'errors': []
- },
- {
- 'on_success_matcher': pickler.dumps(AlwaysPassMatcher()),
- 'errors': []
- },
- {
- 'on_success_matcher': b'abc', 'errors': ['on_success_matcher']
- },
- {
- 'on_success_matcher': pickler.dumps(object),
- 'errors': ['on_success_matcher']
- },
- ]
-
- for case in test_case:
- errors = get_validator(case['on_success_matcher']).validate()
- self.assertEqual(
- self.check_errors_for_arguments(errors, case['errors']), [])
+      validation_errors = get_validator(on_success_matcher).validate()
+      self.assertEqual(
+          self.check_errors_for_arguments(validation_errors, errors), [])
def test_transform_name_mapping_without_update(self):
options = [
@@ -747,195 +626,112 @@ class SetupTest(unittest.TestCase):
errors = validator.validate()
self.assertTrue(errors)
- def test_environment_options(self):
- test_cases = [
- {
- 'options': ['--environment_type=dOcKeR'], 'errors': []
- },
- {
- 'options': [
- '--environment_type=dOcKeR',
- '--environment_options=docker_container_image=foo'
- ],
- 'errors': []
- },
- {
- 'options': [
- '--environment_type=dOcKeR', '--environment_config=foo'
- ],
- 'errors': []
- },
- {
- 'options': [
- '--environment_type=dOcKeR',
- '--environment_options=docker_container_image=foo',
- '--environment_config=foo'
- ],
- 'errors': ['environment_config']
- },
- {
- 'options': [
- '--environment_type=dOcKeR',
- '--environment_options=process_command=foo',
- '--environment_options=process_variables=foo=bar',
- '--environment_options=external_service_address=foo'
- ],
- 'errors': [
- 'process_command',
- 'process_variables',
- 'external_service_address'
- ]
- },
- {
- 'options': ['--environment_type=pRoCeSs'],
- 'errors': ['process_command']
- },
- {
- 'options': [
- '--environment_type=pRoCeSs',
- '--environment_options=process_command=foo'
- ],
- 'errors': []
- },
- {
- 'options': [
- '--environment_type=pRoCeSs', '--environment_config=foo'
- ],
- 'errors': []
- },
- {
- 'options': [
- '--environment_type=pRoCeSs',
- '--environment_options=process_command=foo',
- '--environment_config=foo'
- ],
- 'errors': ['environment_config']
- },
- {
- 'options': [
- '--environment_type=pRoCeSs',
- '--environment_options=process_command=foo',
- '--environment_options=process_variables=foo=bar',
- '--environment_options=docker_container_image=foo',
- '--environment_options=external_service_address=foo'
- ],
- 'errors': ['docker_container_image', 'external_service_address']
- },
- {
- 'options': ['--environment_type=eXtErNaL'],
- 'errors': ['external_service_address']
- },
- {
- 'options': [
- '--environment_type=eXtErNaL',
- '--environment_options=external_service_address=foo'
- ],
- 'errors': []
- },
- {
- 'options': [
- '--environment_type=eXtErNaL', '--environment_config=foo'
- ],
- 'errors': []
- },
- {
- 'options': [
- '--environment_type=eXtErNaL',
- '--environment_options=external_service_address=foo',
- '--environment_config=foo'
- ],
- 'errors': ['environment_config']
- },
- {
- 'options': [
- '--environment_type=eXtErNaL',
- '--environment_options=external_service_address=foo',
- '--environment_options=process_command=foo',
- '--environment_options=process_variables=foo=bar',
- '--environment_options=docker_container_image=foo',
- ],
- 'errors': [
- 'process_command',
- 'process_variables',
- 'docker_container_image'
- ]
- },
- {
- 'options': ['--environment_type=lOoPbACk'], 'errors': []
- },
- {
- 'options': [
- '--environment_type=lOoPbACk', '--environment_config=foo'
- ],
- 'errors': ['environment_config']
- },
- {
- 'options': [
- '--environment_type=lOoPbACk',
- '--environment_options=docker_container_image=foo',
- '--environment_options=process_command=foo',
- '--environment_options=process_variables=foo=bar',
- '--environment_options=external_service_address=foo',
- ],
- 'errors': [
- 'docker_container_image',
- 'process_command',
- 'process_variables',
- 'external_service_address'
- ]
- },
- {
- 'options': ['--environment_type=beam:env:foo:v1'], 'errors': []
- },
- {
- 'options': [
- '--environment_type=beam:env:foo:v1',
- '--environment_config=foo'
- ],
- 'errors': []
- },
- {
- 'options': [
- '--environment_type=beam:env:foo:v1',
- '--environment_options=docker_container_image=foo',
- '--environment_options=process_command=foo',
- '--environment_options=process_variables=foo=bar',
- '--environment_options=external_service_address=foo',
- ],
- 'errors': [
- 'docker_container_image',
- 'process_command',
- 'process_variables',
- 'external_service_address'
- ]
- },
- {
- 'options': [
- '--environment_options=docker_container_image=foo',
- '--environment_options=process_command=foo',
- '--environment_options=process_variables=foo=bar',
- '--environment_options=external_service_address=foo',
- ],
- 'errors': [
- 'docker_container_image',
- 'process_command',
- 'process_variables',
- 'external_service_address'
- ]
- },
- ]
+ @parameterized.expand([
+ (['--environment_type=dOcKeR'], []),
+ ([
+ '--environment_type=dOcKeR',
+ '--environment_options=docker_container_image=foo'
+ ], []), (['--environment_type=dOcKeR', '--environment_config=foo'], []),
+ ([
+ '--environment_type=dOcKeR',
+ '--environment_options=docker_container_image=foo',
+ '--environment_config=foo'
+ ], ['environment_config']),
+ ([
+ '--environment_type=dOcKeR',
+ '--environment_options=process_command=foo',
+ '--environment_options=process_variables=foo=bar',
+ '--environment_options=external_service_address=foo'
+ ], ['process_command', 'process_variables', 'external_service_address']),
+ (['--environment_type=pRoCeSs'], ['process_command']),
+ ([
+ '--environment_type=pRoCeSs',
+ '--environment_options=process_command=foo'
+ ], []), (['--environment_type=pRoCeSs', '--environment_config=foo'], []),
+ ([
+ '--environment_type=pRoCeSs',
+ '--environment_options=process_command=foo',
+ '--environment_config=foo'
+ ], ['environment_config']),
+ ([
+ '--environment_type=pRoCeSs',
+ '--environment_options=process_command=foo',
+ '--environment_options=process_variables=foo=bar',
+ '--environment_options=docker_container_image=foo',
+ '--environment_options=external_service_address=foo'
+ ], ['docker_container_image', 'external_service_address']),
+ (['--environment_type=eXtErNaL'], ['external_service_address']),
+ ([
+ '--environment_type=eXtErNaL',
+ '--environment_options=external_service_address=foo'
+      ], []),
+      (['--environment_type=eXtErNaL', '--environment_config=foo'], []),
+ ([
+ '--environment_type=eXtErNaL',
+ '--environment_options=external_service_address=foo',
+ '--environment_config=foo'
+ ], ['environment_config']),
+ ([
+ '--environment_type=eXtErNaL',
+ '--environment_options=external_service_address=foo',
+ '--environment_options=process_command=foo',
+ '--environment_options=process_variables=foo=bar',
+ '--environment_options=docker_container_image=foo'
+      ], ['process_command', 'process_variables', 'docker_container_image']),
+      (['--environment_type=lOoPbACk'], []),
+ (['--environment_type=lOoPbACk',
+ '--environment_config=foo'], ['environment_config']),
+ ([
+ '--environment_type=lOoPbACk',
+ '--environment_options=docker_container_image=foo',
+ '--environment_options=process_command=foo',
+ '--environment_options=process_variables=foo=bar',
+ '--environment_options=external_service_address=foo'
+ ],
+ [
+ 'docker_container_image',
+ 'process_command',
+ 'process_variables',
+ 'external_service_address'
+ ]), (['--environment_type=beam:env:foo:v1'], []),
+ (['--environment_type=beam:env:foo:v1', '--environment_config=foo'], []),
+ ([
+ '--environment_type=beam:env:foo:v1',
+ '--environment_options=docker_container_image=foo',
+ '--environment_options=process_command=foo',
+ '--environment_options=process_variables=foo=bar',
+ '--environment_options=external_service_address=foo'
+ ],
+ [
+ 'docker_container_image',
+ 'process_command',
+ 'process_variables',
+ 'external_service_address'
+ ]),
+ ([
+ '--environment_options=docker_container_image=foo',
+ '--environment_options=process_command=foo',
+ '--environment_options=process_variables=foo=bar',
+ '--environment_options=external_service_address=foo'
+ ],
+ [
+ 'docker_container_image',
+ 'process_command',
+ 'process_variables',
+ 'external_service_address'
+ ])
+ ])
+ def test_environment_options(self, options, expected_error_args):
errors = []
- for case in test_cases:
- validator = PipelineOptionsValidator(
- PipelineOptions(case['options']), MockRunners.OtherRunner())
- validation_result = validator.validate()
- validation_errors = self.check_errors_for_arguments(
- validation_result, case['errors'])
- if validation_errors:
- errors.append(
- 'Options "%s" had unexpected validation results: "%s"' %
- (' '.join(case['options']), ' '.join(validation_errors)))
- self.assertEqual(errors, [], errors)
+ validator = PipelineOptionsValidator(
+ PipelineOptions(options), MockRunners.OtherRunner())
+ validation_result = validator.validate()
+ validation_errors = self.check_errors_for_arguments(
+ validation_result, expected_error_args)
+ if validation_errors:
+ errors.append(
+ 'Options "%s" had unexpected validation results: "%s"' %
+ (' '.join(options), ' '.join(validation_errors)))
+ self.assertEqual(errors, [], expected_error_args)
if __name__ == '__main__':