Repository: beam
Updated Branches:
  refs/heads/master 63c6bea3b -> 604c2f59e


http://git-wip-us.apache.org/repos/asf/beam/blob/50a4c56d/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options_validator_test.py b/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
deleted file mode 100644
index 4b03c1f..0000000
--- a/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
+++ /dev/null
@@ -1,343 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Unit tests for the pipeline options validator module."""
-
-import logging
-import unittest
-
-from apache_beam.internal import pickler
-from apache_beam.utils.pipeline_options import PipelineOptions
-from apache_beam.utils.pipeline_options_validator import PipelineOptionsValidator
-from hamcrest.core.base_matcher import BaseMatcher
-
-
-# Mock runners to use for validations.
-class MockRunners(object):
-
-  class DataflowRunner(object):
-    pass
-
-  class TestDataflowRunner(object):
-    pass
-
-  class OtherRunner(object):
-    pass
-
-
-# Matcher that always passes for testing on_success_matcher option
-class AlwaysPassMatcher(BaseMatcher):
-
-  def _matches(self, item):
-    return True
-
-
-class SetupTest(unittest.TestCase):
-
-  def check_errors_for_arguments(self, errors, args):
-    """Checks that there is exactly one error for each given argument."""
-    missing = []
-    remaining = list(errors)
-
-    for arg in args:
-      found = False
-      for error in remaining:
-        if arg in error:
-          remaining.remove(error)
-          found = True
-          break
-      if not found:
-        missing.append('Missing error for: ' + arg)
-
-    # Return missing and remaining (not matched) errors.
-    return missing + remaining
-
-  def test_local_runner(self):
-    runner = MockRunners.OtherRunner()
-    options = PipelineOptions([])
-    validator = PipelineOptionsValidator(options, runner)
-    errors = validator.validate()
-    self.assertEqual(len(errors), 0)
-
-  def test_missing_required_options(self):
-    options = PipelineOptions([''])
-    runner = MockRunners.DataflowRunner()
-    validator = PipelineOptionsValidator(options, runner)
-    errors = validator.validate()
-
-    self.assertEqual(
-        self.check_errors_for_arguments(
-            errors,
-            ['project', 'staging_location', 'temp_location']),
-        [])
-
-  def test_gcs_path(self):
-    def get_validator(temp_location, staging_location):
-      options = ['--project=example:example', '--job_name=job']
-
-      if temp_location is not None:
-        options.append('--temp_location=' + temp_location)
-
-      if staging_location is not None:
-        options.append('--staging_location=' + staging_location)
-
-      pipeline_options = PipelineOptions(options)
-      runner = MockRunners.DataflowRunner()
-      validator = PipelineOptionsValidator(pipeline_options, runner)
-      return validator
-
-    test_cases = [
-        {'temp_location': None,
-         'staging_location': 'gs://foo/bar',
-         'errors': ['temp_location']},
-        {'temp_location': None,
-         'staging_location': None,
-         'errors': ['staging_location', 'temp_location']},
-        {'temp_location': 'gs://foo/bar',
-         'staging_location': None,
-         'errors': []},
-        {'temp_location': 'gs://foo/bar',
-         'staging_location': 'gs://ABC/bar',
-         'errors': ['staging_location']},
-        {'temp_location': 'gcs:/foo/bar',
-         'staging_location': 'gs://foo/bar',
-         'errors': ['temp_location']},
-        {'temp_location': 'gs:/foo/bar',
-         'staging_location': 'gs://foo/bar',
-         'errors': ['temp_location']},
-        {'temp_location': 'gs://ABC/bar',
-         'staging_location': 'gs://foo/bar',
-         'errors': ['temp_location']},
-        {'temp_location': 'gs://ABC/bar',
-         'staging_location': 'gs://foo/bar',
-         'errors': ['temp_location']},
-        {'temp_location': 'gs://foo',
-         'staging_location': 'gs://foo/bar',
-         'errors': ['temp_location']},
-        {'temp_location': 'gs://foo/',
-         'staging_location': 'gs://foo/bar',
-         'errors': []},
-        {'temp_location': 'gs://foo/bar',
-         'staging_location': 'gs://foo/bar',
-         'errors': []},
-    ]
-
-    for case in test_cases:
-      errors = get_validator(case['temp_location'],
-                             case['staging_location']).validate()
-      self.assertEqual(
-          self.check_errors_for_arguments(errors, case['errors']), [])
-
-  def test_project(self):
-    def get_validator(project):
-      options = ['--job_name=job', '--staging_location=gs://foo/bar',
-                 '--temp_location=gs://foo/bar']
-
-      if project is not None:
-        options.append('--project=' + project)
-
-      pipeline_options = PipelineOptions(options)
-      runner = MockRunners.DataflowRunner()
-      validator = PipelineOptionsValidator(pipeline_options, runner)
-      return validator
-
-    test_cases = [
-        {'project': None, 'errors': ['project']},
-        {'project': '12345', 'errors': ['project']},
-        {'project': 'FOO', 'errors': ['project']},
-        {'project': 'foo:BAR', 'errors': ['project']},
-        {'project': 'fo', 'errors': ['project']},
-        {'project': 'foo', 'errors': []},
-        {'project': 'foo:bar', 'errors': []},
-    ]
-
-    for case in test_cases:
-      errors = get_validator(case['project']).validate()
-      self.assertEqual(
-          self.check_errors_for_arguments(errors, case['errors']), [])
-
-  def test_job_name(self):
-    def get_validator(job_name):
-      options = ['--project=example:example', '--staging_location=gs://foo/bar',
-                 '--temp_location=gs://foo/bar']
-
-      if job_name is not None:
-        options.append('--job_name=' + job_name)
-
-      pipeline_options = PipelineOptions(options)
-      runner = MockRunners.DataflowRunner()
-      validator = PipelineOptionsValidator(pipeline_options, runner)
-      return validator
-
-    test_cases = [
-        {'job_name': None, 'errors': []},
-        {'job_name': '12345', 'errors': ['job_name']},
-        {'job_name': 'FOO', 'errors': ['job_name']},
-        {'job_name': 'foo:bar', 'errors': ['job_name']},
-        {'job_name': 'fo', 'errors': []},
-        {'job_name': 'foo', 'errors': []},
-    ]
-
-    for case in test_cases:
-      errors = get_validator(case['job_name']).validate()
-      self.assertEqual(
-          self.check_errors_for_arguments(errors, case['errors']), [])
-
-  def test_num_workers(self):
-    def get_validator(num_workers):
-      options = ['--project=example:example', '--job_name=job',
-                 '--staging_location=gs://foo/bar',
-                 '--temp_location=gs://foo/bar']
-
-      if num_workers is not None:
-        options.append('--num_workers=' + num_workers)
-
-      pipeline_options = PipelineOptions(options)
-      runner = MockRunners.DataflowRunner()
-      validator = PipelineOptionsValidator(pipeline_options, runner)
-      return validator
-
-    test_cases = [
-        {'num_workers': None, 'errors': []},
-        {'num_workers': '1', 'errors': []},
-        {'num_workers': '0', 'errors': ['num_workers']},
-        {'num_workers': '-1', 'errors': ['num_workers']},
-    ]
-
-    for case in test_cases:
-      errors = get_validator(case['num_workers']).validate()
-      self.assertEqual(
-          self.check_errors_for_arguments(errors, case['errors']), [])
-
-  def test_is_service_runner(self):
-    test_cases = [
-        {
-            'runner': MockRunners.OtherRunner(),
-            'options': [],
-            'expected': False,
-        },
-        {
-            'runner': MockRunners.OtherRunner(),
-            'options': ['--dataflow_endpoint=https://dataflow.googleapis.com'],
-            'expected': False,
-        },
-        {
-            'runner': MockRunners.OtherRunner(),
-            'options': ['--dataflow_endpoint=https://dataflow.googleapis.com/'],
-            'expected': False,
-        },
-        {
-            'runner': MockRunners.DataflowRunner(),
-            'options': ['--dataflow_endpoint=https://another.service.com'],
-            'expected': False,
-        },
-        {
-            'runner': MockRunners.DataflowRunner(),
-            'options': ['--dataflow_endpoint=https://another.service.com/'],
-            'expected': False,
-        },
-        {
-            'runner': MockRunners.DataflowRunner(),
-            'options': ['--dataflow_endpoint=https://dataflow.googleapis.com'],
-            'expected': True,
-        },
-        {
-            'runner': MockRunners.DataflowRunner(),
-            'options': ['--dataflow_endpoint=https://dataflow.googleapis.com/'],
-            'expected': True,
-        },
-        {
-            'runner': MockRunners.DataflowRunner(),
-            'options': [],
-            'expected': True,
-        },
-    ]
-
-    for case in test_cases:
-      validator = PipelineOptionsValidator(
-          PipelineOptions(case['options']), case['runner'])
-      self.assertEqual(validator.is_service_runner(), case['expected'])
-
-  def test_dataflow_job_file_and_template_location_mutually_exclusive(self):
-    runner = MockRunners.OtherRunner()
-    options = PipelineOptions([
-        '--template_location', 'abc',
-        '--dataflow_job_file', 'def'
-    ])
-    validator = PipelineOptionsValidator(options, runner)
-    errors = validator.validate()
-    self.assertTrue(errors)
-
-  def test_validate_template_location(self):
-    runner = MockRunners.OtherRunner()
-    options = PipelineOptions([
-        '--template_location', 'abc',
-    ])
-    validator = PipelineOptionsValidator(options, runner)
-    errors = validator.validate()
-    self.assertFalse(errors)
-
-  def test_validate_dataflow_job_file(self):
-    runner = MockRunners.OtherRunner()
-    options = PipelineOptions([
-        '--dataflow_job_file', 'abc'
-    ])
-    validator = PipelineOptionsValidator(options, runner)
-    errors = validator.validate()
-    self.assertFalse(errors)
-
-  def test_streaming(self):
-    pipeline_options = PipelineOptions(['--streaming'])
-    runner = MockRunners.TestDataflowRunner()
-    validator = PipelineOptionsValidator(pipeline_options, runner)
-    errors = validator.validate()
-
-    self.assertIn('Streaming pipelines are not supported.', errors)
-
-  def test_test_matcher(self):
-    def get_validator(matcher):
-      options = ['--project=example:example',
-                 '--job_name=job',
-                 '--staging_location=gs://foo/bar',
-                 '--temp_location=gs://foo/bar',]
-      if matcher:
-        options.append('--on_success_matcher=' + matcher)
-
-      pipeline_options = PipelineOptions(options)
-      runner = MockRunners.TestDataflowRunner()
-      return PipelineOptionsValidator(pipeline_options, runner)
-
-    test_case = [
-        {'on_success_matcher': None,
-         'errors': []},
-        {'on_success_matcher': pickler.dumps(AlwaysPassMatcher()),
-         'errors': []},
-        {'on_success_matcher': 'abc',
-         'errors': ['on_success_matcher']},
-        {'on_success_matcher': pickler.dumps(object),
-         'errors': ['on_success_matcher']},
-    ]
-
-    for case in test_case:
-      errors = get_validator(case['on_success_matcher']).validate()
-      self.assertEqual(
-          self.check_errors_for_arguments(errors, case['errors']), [])
-
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.INFO)
-  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/50a4c56d/sdks/python/apache_beam/utils/value_provider.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/value_provider.py b/sdks/python/apache_beam/utils/value_provider.py
index c00d7bc..46f39ae 100644
--- a/sdks/python/apache_beam/utils/value_provider.py
+++ b/sdks/python/apache_beam/utils/value_provider.py
@@ -15,89 +15,7 @@
 # limitations under the License.
 #
 
-"""A ValueProvider class to implement templates with both statically
-and dynamically provided values.
-"""
+"""For backwards compatibility"""
 
-from functools import wraps
-
-from apache_beam import error
-
-
-class ValueProvider(object):
-  def is_accessible(self):
-    raise NotImplementedError(
-        'ValueProvider.is_accessible implemented in derived classes'
-    )
-
-  def get(self):
-    raise NotImplementedError(
-        'ValueProvider.get implemented in derived classes'
-    )
-
-
-class StaticValueProvider(ValueProvider):
-  def __init__(self, value_type, value):
-    self.value_type = value_type
-    self.value = value_type(value)
-
-  def is_accessible(self):
-    return True
-
-  def get(self):
-    return self.value
-
-  def __str__(self):
-    return str(self.value)
-
-
-class RuntimeValueProvider(ValueProvider):
-  runtime_options = None
-
-  def __init__(self, option_name, value_type, default_value):
-    self.option_name = option_name
-    self.default_value = default_value
-    self.value_type = value_type
-
-  def is_accessible(self):
-    return RuntimeValueProvider.runtime_options is not None
-
-  def get(self):
-    if RuntimeValueProvider.runtime_options is None:
-      raise error.RuntimeValueProviderError(
-          '%s.get() not called from a runtime context' % self)
-
-    candidate = RuntimeValueProvider.runtime_options.get(self.option_name)
-    if candidate:
-      value = self.value_type(candidate)
-    else:
-      value = self.default_value
-    return value
-
-  # TODO(BEAM-1999): Remove _unused_options_id
-  @classmethod
-  def set_runtime_options(cls, pipeline_options):
-    RuntimeValueProvider.runtime_options = pipeline_options
-
-  def __str__(self):
-    return '%s(option: %s, type: %s, default_value: %s)' % (
-        self.__class__.__name__,
-        self.option_name,
-        self.value_type.__name__,
-        repr(self.default_value)
-    )
-
-
-def check_accessible(value_provider_list):
-  """Check accessibility of a list of ValueProvider objects."""
-  assert isinstance(value_provider_list, list)
-
-  def _check_accessible(fnc):
-    @wraps(fnc)
-    def _f(self, *args, **kwargs):
-      for obj in [getattr(self, vp) for vp in value_provider_list]:
-        if not obj.is_accessible():
-          raise error.RuntimeValueProviderError('%s not accessible' % obj)
-      return fnc(self, *args, **kwargs)
-    return _f
-  return _check_accessible
+# pylint: disable=unused-import
+from apache_beam.options.value_provider import *

http://git-wip-us.apache.org/repos/asf/beam/blob/50a4c56d/sdks/python/apache_beam/utils/value_provider_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/value_provider_test.py b/sdks/python/apache_beam/utils/value_provider_test.py
deleted file mode 100644
index 1b66dd4..0000000
--- a/sdks/python/apache_beam/utils/value_provider_test.py
+++ /dev/null
@@ -1,145 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Unit tests for the ValueProvider class."""
-
-import unittest
-
-from apache_beam.utils.pipeline_options import PipelineOptions
-from apache_beam.utils.value_provider import RuntimeValueProvider
-from apache_beam.utils.value_provider import StaticValueProvider
-
-
-class ValueProviderTests(unittest.TestCase):
-  def test_static_value_provider_keyword_argument(self):
-    class UserDefinedOptions(PipelineOptions):
-      @classmethod
-      def _add_argparse_args(cls, parser):
-        parser.add_value_provider_argument(
-            '--vp_arg',
-            help='This keyword argument is a value provider',
-            default='some value')
-    options = UserDefinedOptions(['--vp_arg', 'abc'])
-    self.assertTrue(isinstance(options.vp_arg, StaticValueProvider))
-    self.assertTrue(options.vp_arg.is_accessible())
-    self.assertEqual(options.vp_arg.get(), 'abc')
-
-  def test_runtime_value_provider_keyword_argument(self):
-    class UserDefinedOptions(PipelineOptions):
-      @classmethod
-      def _add_argparse_args(cls, parser):
-        parser.add_value_provider_argument(
-            '--vp_arg',
-            help='This keyword argument is a value provider')
-    options = UserDefinedOptions()
-    self.assertTrue(isinstance(options.vp_arg, RuntimeValueProvider))
-    self.assertFalse(options.vp_arg.is_accessible())
-    with self.assertRaises(RuntimeError):
-      options.vp_arg.get()
-
-  def test_static_value_provider_positional_argument(self):
-    class UserDefinedOptions(PipelineOptions):
-      @classmethod
-      def _add_argparse_args(cls, parser):
-        parser.add_value_provider_argument(
-            'vp_pos_arg',
-            help='This positional argument is a value provider',
-            default='some value')
-    options = UserDefinedOptions(['abc'])
-    self.assertTrue(isinstance(options.vp_pos_arg, StaticValueProvider))
-    self.assertTrue(options.vp_pos_arg.is_accessible())
-    self.assertEqual(options.vp_pos_arg.get(), 'abc')
-
-  def test_runtime_value_provider_positional_argument(self):
-    class UserDefinedOptions(PipelineOptions):
-      @classmethod
-      def _add_argparse_args(cls, parser):
-        parser.add_value_provider_argument(
-            'vp_pos_arg',
-            help='This positional argument is a value provider')
-    options = UserDefinedOptions([])
-    self.assertTrue(isinstance(options.vp_pos_arg, RuntimeValueProvider))
-    self.assertFalse(options.vp_pos_arg.is_accessible())
-    with self.assertRaises(RuntimeError):
-      options.vp_pos_arg.get()
-
-  def test_static_value_provider_type_cast(self):
-    class UserDefinedOptions(PipelineOptions):
-      @classmethod
-      def _add_argparse_args(cls, parser):
-        parser.add_value_provider_argument(
-            '--vp_arg',
-            type=int,
-            help='This flag is a value provider')
-
-    options = UserDefinedOptions(['--vp_arg', '123'])
-    self.assertTrue(isinstance(options.vp_arg, StaticValueProvider))
-    self.assertTrue(options.vp_arg.is_accessible())
-    self.assertEqual(options.vp_arg.get(), 123)
-
-  def test_set_runtime_option(self):
-    # define ValueProvider ptions, with and without default values
-    class UserDefinedOptions1(PipelineOptions):
-      @classmethod
-      def _add_argparse_args(cls, parser):
-        parser.add_value_provider_argument(
-            '--vp_arg',
-            help='This keyword argument is a value provider')   # set at runtime
-
-        parser.add_value_provider_argument(         # not set, had default int
-            '-v', '--vp_arg2',                      # with short form
-            default=123,
-            type=int)
-
-        parser.add_value_provider_argument(         # not set, had default str
-            '--vp-arg3',                            # with dash in name
-            default='123',
-            type=str)
-
-        parser.add_value_provider_argument(         # not set and no default
-            '--vp_arg4',
-            type=float)
-
-        parser.add_value_provider_argument(         # positional argument set
-            'vp_pos_arg',                           # default & runtime ignored
-            help='This positional argument is a value provider',
-            type=float,
-            default=5.4)
-
-    # provide values at graph-construction time
-    # (options not provided here become of the type RuntimeValueProvider)
-    options = UserDefinedOptions1(['1.2'])
-    self.assertFalse(options.vp_arg.is_accessible())
-    self.assertFalse(options.vp_arg2.is_accessible())
-    self.assertFalse(options.vp_arg3.is_accessible())
-    self.assertFalse(options.vp_arg4.is_accessible())
-    self.assertTrue(options.vp_pos_arg.is_accessible())
-
-    # provide values at job-execution time
-    # (options not provided here will use their default, if they have one)
-    RuntimeValueProvider.set_runtime_options({'vp_arg': 'abc',
-                                              'vp_pos_arg':'3.2'})
-    self.assertTrue(options.vp_arg.is_accessible())
-    self.assertEqual(options.vp_arg.get(), 'abc')
-    self.assertTrue(options.vp_arg2.is_accessible())
-    self.assertEqual(options.vp_arg2.get(), 123)
-    self.assertTrue(options.vp_arg3.is_accessible())
-    self.assertEqual(options.vp_arg3.get(), '123')
-    self.assertTrue(options.vp_arg4.is_accessible())
-    self.assertIsNone(options.vp_arg4.get())
-    self.assertTrue(options.vp_pos_arg.is_accessible())
-    self.assertEqual(options.vp_pos_arg.get(), 1.2)

Reply via email to