Repository: beam Updated Branches: refs/heads/master 63c6bea3b -> 604c2f59e
http://git-wip-us.apache.org/repos/asf/beam/blob/50a4c56d/sdks/python/apache_beam/utils/pipeline_options_validator_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/pipeline_options_validator_test.py b/sdks/python/apache_beam/utils/pipeline_options_validator_test.py deleted file mode 100644 index 4b03c1f..0000000 --- a/sdks/python/apache_beam/utils/pipeline_options_validator_test.py +++ /dev/null @@ -1,343 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""Unit tests for the pipeline options validator module.""" - -import logging -import unittest - -from apache_beam.internal import pickler -from apache_beam.utils.pipeline_options import PipelineOptions -from apache_beam.utils.pipeline_options_validator import PipelineOptionsValidator -from hamcrest.core.base_matcher import BaseMatcher - - -# Mock runners to use for validations. -class MockRunners(object): - - class DataflowRunner(object): - pass - - class TestDataflowRunner(object): - pass - - class OtherRunner(object): - pass - - -# Matcher that always passes for testing on_success_matcher option -class AlwaysPassMatcher(BaseMatcher): - - def _matches(self, item): - return True - - -class SetupTest(unittest.TestCase): - - def check_errors_for_arguments(self, errors, args): - """Checks that there is exactly one error for each given argument.""" - missing = [] - remaining = list(errors) - - for arg in args: - found = False - for error in remaining: - if arg in error: - remaining.remove(error) - found = True - break - if not found: - missing.append('Missing error for: ' + arg) - - # Return missing and remaining (not matched) errors. - return missing + remaining - - def test_local_runner(self): - runner = MockRunners.OtherRunner() - options = PipelineOptions([]) - validator = PipelineOptionsValidator(options, runner) - errors = validator.validate() - self.assertEqual(len(errors), 0) - - def test_missing_required_options(self): - options = PipelineOptions(['']) - runner = MockRunners.DataflowRunner() - validator = PipelineOptionsValidator(options, runner) - errors = validator.validate() - - self.assertEqual( - self.check_errors_for_arguments( - errors, - ['project', 'staging_location', 'temp_location']), - []) - - def test_gcs_path(self): - def get_validator(temp_location, staging_location): - options = ['--project=example:example', '--job_name=job'] - - if temp_location is not None: - options.append('--temp_location=' + temp_location) - - if staging_location is not None: - options.append('--staging_location=' + staging_location) - - pipeline_options = PipelineOptions(options) - runner = MockRunners.DataflowRunner() - validator = PipelineOptionsValidator(pipeline_options, runner) - return validator - - test_cases = [ - {'temp_location': None, - 'staging_location': 'gs://foo/bar', - 'errors': ['temp_location']}, - {'temp_location': None, - 'staging_location': None, - 'errors': ['staging_location', 'temp_location']}, - {'temp_location': 'gs://foo/bar', - 'staging_location': None, - 'errors': []}, - {'temp_location': 'gs://foo/bar', - 'staging_location': 'gs://ABC/bar', - 'errors': ['staging_location']}, - {'temp_location': 'gcs:/foo/bar', - 'staging_location': 'gs://foo/bar', - 'errors': ['temp_location']}, - {'temp_location': 'gs:/foo/bar', - 'staging_location': 'gs://foo/bar', - 'errors': ['temp_location']}, - {'temp_location': 'gs://ABC/bar', - 'staging_location': 'gs://foo/bar', - 'errors': ['temp_location']}, - {'temp_location': 'gs://ABC/bar', - 'staging_location': 'gs://foo/bar', - 'errors': ['temp_location']}, - {'temp_location': 'gs://foo', - 'staging_location': 'gs://foo/bar', - 'errors': ['temp_location']}, - {'temp_location': 'gs://foo/', - 'staging_location': 'gs://foo/bar', - 'errors': []}, - {'temp_location': 'gs://foo/bar', - 'staging_location': 'gs://foo/bar', - 'errors': []}, - ] - - for case in test_cases: - errors = get_validator(case['temp_location'], - case['staging_location']).validate() - self.assertEqual( - self.check_errors_for_arguments(errors, case['errors']), []) - - def test_project(self): - def get_validator(project): - options = ['--job_name=job', '--staging_location=gs://foo/bar', - '--temp_location=gs://foo/bar'] - - if project is not None: - options.append('--project=' + project) - - pipeline_options = PipelineOptions(options) - runner = MockRunners.DataflowRunner() - validator = PipelineOptionsValidator(pipeline_options, runner) - return validator - - test_cases = [ - {'project': None, 'errors': ['project']}, - {'project': '12345', 'errors': ['project']}, - {'project': 'FOO', 'errors': ['project']}, - {'project': 'foo:BAR', 'errors': ['project']}, - {'project': 'fo', 'errors': ['project']}, - {'project': 'foo', 'errors': []}, - {'project': 'foo:bar', 'errors': []}, - ] - - for case in test_cases: - errors = get_validator(case['project']).validate() - self.assertEqual( - self.check_errors_for_arguments(errors, case['errors']), []) - - def test_job_name(self): - def get_validator(job_name): - options = ['--project=example:example', '--staging_location=gs://foo/bar', - '--temp_location=gs://foo/bar'] - - if job_name is not None: - options.append('--job_name=' + job_name) - - pipeline_options = PipelineOptions(options) - runner = MockRunners.DataflowRunner() - validator = PipelineOptionsValidator(pipeline_options, runner) - return validator - - test_cases = [ - {'job_name': None, 'errors': []}, - {'job_name': '12345', 'errors': ['job_name']}, - {'job_name': 'FOO', 'errors': ['job_name']}, - {'job_name': 'foo:bar', 'errors': ['job_name']}, - {'job_name': 'fo', 'errors': []}, - {'job_name': 'foo', 'errors': []}, - ] - - for case in test_cases: - errors = get_validator(case['job_name']).validate() - self.assertEqual( - self.check_errors_for_arguments(errors, case['errors']), []) - - def test_num_workers(self): - def get_validator(num_workers): - options = ['--project=example:example', '--job_name=job', - '--staging_location=gs://foo/bar', - '--temp_location=gs://foo/bar'] - - if num_workers is not None: - options.append('--num_workers=' + num_workers) - - pipeline_options = PipelineOptions(options) - runner = MockRunners.DataflowRunner() - validator = PipelineOptionsValidator(pipeline_options, runner) - return validator - - test_cases = [ - {'num_workers': None, 'errors': []}, - {'num_workers': '1', 'errors': []}, - {'num_workers': '0', 'errors': ['num_workers']}, - {'num_workers': '-1', 'errors': ['num_workers']}, - ] - - for case in test_cases: - errors = get_validator(case['num_workers']).validate() - self.assertEqual( - self.check_errors_for_arguments(errors, case['errors']), []) - - def test_is_service_runner(self): - test_cases = [ - { - 'runner': MockRunners.OtherRunner(), - 'options': [], - 'expected': False, - }, - { - 'runner': MockRunners.OtherRunner(), - 'options': ['--dataflow_endpoint=https://dataflow.googleapis.com'], - 'expected': False, - }, - { - 'runner': MockRunners.OtherRunner(), - 'options': ['--dataflow_endpoint=https://dataflow.googleapis.com/'], - 'expected': False, - }, - { - 'runner': MockRunners.DataflowRunner(), - 'options': ['--dataflow_endpoint=https://another.service.com'], - 'expected': False, - }, - { - 'runner': MockRunners.DataflowRunner(), - 'options': ['--dataflow_endpoint=https://another.service.com/'], - 'expected': False, - }, - { - 'runner': MockRunners.DataflowRunner(), - 'options': ['--dataflow_endpoint=https://dataflow.googleapis.com'], - 'expected': True, - }, - { - 'runner': MockRunners.DataflowRunner(), - 'options': ['--dataflow_endpoint=https://dataflow.googleapis.com/'], - 'expected': True, - }, - { - 'runner': MockRunners.DataflowRunner(), - 'options': [], - 'expected': True, - }, - ] - - for case in test_cases: - validator = PipelineOptionsValidator( - PipelineOptions(case['options']), case['runner']) - self.assertEqual(validator.is_service_runner(), case['expected']) - - def test_dataflow_job_file_and_template_location_mutually_exclusive(self): - runner = MockRunners.OtherRunner() - options = PipelineOptions([ - '--template_location', 'abc', - '--dataflow_job_file', 'def' - ]) - validator = PipelineOptionsValidator(options, runner) - errors = validator.validate() - self.assertTrue(errors) - - def test_validate_template_location(self): - runner = MockRunners.OtherRunner() - options = PipelineOptions([ - '--template_location', 'abc', - ]) - validator = PipelineOptionsValidator(options, runner) - errors = validator.validate() - self.assertFalse(errors) - - def test_validate_dataflow_job_file(self): - runner = MockRunners.OtherRunner() - options = PipelineOptions([ - '--dataflow_job_file', 'abc' - ]) - validator = PipelineOptionsValidator(options, runner) - errors = validator.validate() - self.assertFalse(errors) - - def test_streaming(self): - pipeline_options = PipelineOptions(['--streaming']) - runner = MockRunners.TestDataflowRunner() - validator = PipelineOptionsValidator(pipeline_options, runner) - errors = validator.validate() - - self.assertIn('Streaming pipelines are not supported.', errors) - - def test_test_matcher(self): - def get_validator(matcher): - options = ['--project=example:example', - '--job_name=job', - '--staging_location=gs://foo/bar', - '--temp_location=gs://foo/bar',] - if matcher: - options.append('--on_success_matcher=' + matcher) - - pipeline_options = PipelineOptions(options) - runner = MockRunners.TestDataflowRunner() - return PipelineOptionsValidator(pipeline_options, runner) - - test_case = [ - {'on_success_matcher': None, - 'errors': []}, - {'on_success_matcher': pickler.dumps(AlwaysPassMatcher()), - 'errors': []}, - {'on_success_matcher': 'abc', - 'errors': ['on_success_matcher']}, - {'on_success_matcher': pickler.dumps(object), - 'errors': ['on_success_matcher']}, - ] - - for case in test_case: - errors = get_validator(case['on_success_matcher']).validate() - self.assertEqual( - self.check_errors_for_arguments(errors, case['errors']), []) - - -if __name__ == '__main__': - logging.getLogger().setLevel(logging.INFO) - unittest.main() http://git-wip-us.apache.org/repos/asf/beam/blob/50a4c56d/sdks/python/apache_beam/utils/value_provider.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/value_provider.py b/sdks/python/apache_beam/utils/value_provider.py index c00d7bc..46f39ae 100644 --- a/sdks/python/apache_beam/utils/value_provider.py +++ b/sdks/python/apache_beam/utils/value_provider.py @@ -15,89 +15,7 @@ # limitations under the License. # -"""A ValueProvider class to implement templates with both statically -and dynamically provided values. -""" +"""For backwards compatibility""" -from functools import wraps - -from apache_beam import error - - -class ValueProvider(object): - def is_accessible(self): - raise NotImplementedError( - 'ValueProvider.is_accessible implemented in derived classes' - ) - - def get(self): - raise NotImplementedError( - 'ValueProvider.get implemented in derived classes' - ) - - -class StaticValueProvider(ValueProvider): - def __init__(self, value_type, value): - self.value_type = value_type - self.value = value_type(value) - - def is_accessible(self): - return True - - def get(self): - return self.value - - def __str__(self): - return str(self.value) - - -class RuntimeValueProvider(ValueProvider): - runtime_options = None - - def __init__(self, option_name, value_type, default_value): - self.option_name = option_name - self.default_value = default_value - self.value_type = value_type - - def is_accessible(self): - return RuntimeValueProvider.runtime_options is not None - - def get(self): - if RuntimeValueProvider.runtime_options is None: - raise error.RuntimeValueProviderError( - '%s.get() not called from a runtime context' % self) - - candidate = RuntimeValueProvider.runtime_options.get(self.option_name) - if candidate: - value = self.value_type(candidate) - else: - value = self.default_value - return value - - # TODO(BEAM-1999): Remove _unused_options_id - @classmethod - def set_runtime_options(cls, pipeline_options): - RuntimeValueProvider.runtime_options = pipeline_options - - def __str__(self): - return '%s(option: %s, type: %s, default_value: %s)' % ( - self.__class__.__name__, - self.option_name, - self.value_type.__name__, - repr(self.default_value) - ) - - -def check_accessible(value_provider_list): - """Check accessibility of a list of ValueProvider objects.""" - assert isinstance(value_provider_list, list) - - def _check_accessible(fnc): - @wraps(fnc) - def _f(self, *args, **kwargs): - for obj in [getattr(self, vp) for vp in value_provider_list]: - if not obj.is_accessible(): - raise error.RuntimeValueProviderError('%s not accessible' % obj) - return fnc(self, *args, **kwargs) - return _f - return _check_accessible +# pylint: disable=unused-import +from apache_beam.options.value_provider import * http://git-wip-us.apache.org/repos/asf/beam/blob/50a4c56d/sdks/python/apache_beam/utils/value_provider_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/value_provider_test.py b/sdks/python/apache_beam/utils/value_provider_test.py deleted file mode 100644 index 1b66dd4..0000000 --- a/sdks/python/apache_beam/utils/value_provider_test.py +++ /dev/null @@ -1,145 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""Unit tests for the ValueProvider class.""" - -import unittest - -from apache_beam.utils.pipeline_options import PipelineOptions -from apache_beam.utils.value_provider import RuntimeValueProvider -from apache_beam.utils.value_provider import StaticValueProvider - - -class ValueProviderTests(unittest.TestCase): - def test_static_value_provider_keyword_argument(self): - class UserDefinedOptions(PipelineOptions): - @classmethod - def _add_argparse_args(cls, parser): - parser.add_value_provider_argument( - '--vp_arg', - help='This keyword argument is a value provider', - default='some value') - options = UserDefinedOptions(['--vp_arg', 'abc']) - self.assertTrue(isinstance(options.vp_arg, StaticValueProvider)) - self.assertTrue(options.vp_arg.is_accessible()) - self.assertEqual(options.vp_arg.get(), 'abc') - - def test_runtime_value_provider_keyword_argument(self): - class UserDefinedOptions(PipelineOptions): - @classmethod - def _add_argparse_args(cls, parser): - parser.add_value_provider_argument( - '--vp_arg', - help='This keyword argument is a value provider') - options = UserDefinedOptions() - self.assertTrue(isinstance(options.vp_arg, RuntimeValueProvider)) - self.assertFalse(options.vp_arg.is_accessible()) - with self.assertRaises(RuntimeError): - options.vp_arg.get() - - def test_static_value_provider_positional_argument(self): - class UserDefinedOptions(PipelineOptions): - @classmethod - def _add_argparse_args(cls, parser): - parser.add_value_provider_argument( - 'vp_pos_arg', - help='This positional argument is a value provider', - default='some value') - options = UserDefinedOptions(['abc']) - self.assertTrue(isinstance(options.vp_pos_arg, StaticValueProvider)) - self.assertTrue(options.vp_pos_arg.is_accessible()) - self.assertEqual(options.vp_pos_arg.get(), 'abc') - - def test_runtime_value_provider_positional_argument(self): - class UserDefinedOptions(PipelineOptions): - @classmethod - def _add_argparse_args(cls, parser): - parser.add_value_provider_argument( - 'vp_pos_arg', - help='This positional argument is a value provider') - options = UserDefinedOptions([]) - self.assertTrue(isinstance(options.vp_pos_arg, RuntimeValueProvider)) - self.assertFalse(options.vp_pos_arg.is_accessible()) - with self.assertRaises(RuntimeError): - options.vp_pos_arg.get() - - def test_static_value_provider_type_cast(self): - class UserDefinedOptions(PipelineOptions): - @classmethod - def _add_argparse_args(cls, parser): - parser.add_value_provider_argument( - '--vp_arg', - type=int, - help='This flag is a value provider') - - options = UserDefinedOptions(['--vp_arg', '123']) - self.assertTrue(isinstance(options.vp_arg, StaticValueProvider)) - self.assertTrue(options.vp_arg.is_accessible()) - self.assertEqual(options.vp_arg.get(), 123) - - def test_set_runtime_option(self): - # define ValueProvider ptions, with and without default values - class UserDefinedOptions1(PipelineOptions): - @classmethod - def _add_argparse_args(cls, parser): - parser.add_value_provider_argument( - '--vp_arg', - help='This keyword argument is a value provider') # set at runtime - - parser.add_value_provider_argument( # not set, had default int - '-v', '--vp_arg2', # with short form - default=123, - type=int) - - parser.add_value_provider_argument( # not set, had default str - '--vp-arg3', # with dash in name - default='123', - type=str) - - parser.add_value_provider_argument( # not set and no default - '--vp_arg4', - type=float) - - parser.add_value_provider_argument( # positional argument set - 'vp_pos_arg', # default & runtime ignored - help='This positional argument is a value provider', - type=float, - default=5.4) - - # provide values at graph-construction time - # (options not provided here become of the type RuntimeValueProvider) - options = UserDefinedOptions1(['1.2']) - self.assertFalse(options.vp_arg.is_accessible()) - self.assertFalse(options.vp_arg2.is_accessible()) - self.assertFalse(options.vp_arg3.is_accessible()) - self.assertFalse(options.vp_arg4.is_accessible()) - self.assertTrue(options.vp_pos_arg.is_accessible()) - - # provide values at job-execution time - # (options not provided here will use their default, if they have one) - RuntimeValueProvider.set_runtime_options({'vp_arg': 'abc', - 'vp_pos_arg':'3.2'}) - self.assertTrue(options.vp_arg.is_accessible()) - self.assertEqual(options.vp_arg.get(), 'abc') - self.assertTrue(options.vp_arg2.is_accessible()) - self.assertEqual(options.vp_arg2.get(), 123) - self.assertTrue(options.vp_arg3.is_accessible()) - self.assertEqual(options.vp_arg3.get(), '123') - self.assertTrue(options.vp_arg4.is_accessible()) - self.assertIsNone(options.vp_arg4.get()) - self.assertTrue(options.vp_pos_arg.is_accessible()) - self.assertEqual(options.vp_pos_arg.get(), 1.2)
