Remove options_id concept from templated runs.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f7749581 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f7749581 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f7749581 Branch: refs/heads/master Commit: f77495819e82926ffcfa1d3c328e094023216e6b Parents: e99a394 Author: Ahmet Altay <[email protected]> Authored: Fri Apr 14 15:33:39 2017 -0700 Committer: Ahmet Altay <[email protected]> Committed: Tue Apr 18 15:30:06 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/error.py | 4 ++ sdks/python/apache_beam/io/filebasedsource.py | 3 +- .../apache_beam/io/filebasedsource_test.py | 3 +- sdks/python/apache_beam/io/fileio.py | 6 +-- .../runners/dataflow/dataflow_runner.py | 4 +- .../runners/dataflow/internal/apiclient.py | 2 +- .../apache_beam/runners/direct/direct_runner.py | 9 ++--- .../apache_beam/utils/pipeline_options.py | 22 +++-------- .../apache_beam/utils/pipeline_options_test.py | 6 +-- sdks/python/apache_beam/utils/value_provider.py | 39 ++++++-------------- .../apache_beam/utils/value_provider_test.py | 22 +---------- 11 files changed, 35 insertions(+), 85 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/f7749581/sdks/python/apache_beam/error.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/error.py b/sdks/python/apache_beam/error.py index 672469d..6ecb74f 100644 --- a/sdks/python/apache_beam/error.py +++ b/sdks/python/apache_beam/error.py @@ -34,6 +34,10 @@ class RunnerError(BeamError): """An error related to a Runner object (e.g. cannot find a runner to run).""" +class RuntimeValueProviderError(RuntimeError): + """An error related to a ValueProvider object raised during runtime.""" + + class SideInputError(BeamError): """An error related to a side input to a parallel Do operation.""" http://git-wip-us.apache.org/repos/asf/beam/blob/f7749581/sdks/python/apache_beam/io/filebasedsource.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py index 2e7043f..ef44b3e 100644 --- a/sdks/python/apache_beam/io/filebasedsource.py +++ b/sdks/python/apache_beam/io/filebasedsource.py @@ -78,8 +78,7 @@ class FileBasedSource(iobase.BoundedSource): IOError: when the file pattern specified yields an empty result. """ - if (not (isinstance(file_pattern, basestring) - or isinstance(file_pattern, ValueProvider))): + if not isinstance(file_pattern, (basestring, ValueProvider)): raise TypeError('%s: file_pattern must be of type string' ' or ValueProvider; got %r instead' % (self.__class__.__name__, file_pattern)) http://git-wip-us.apache.org/repos/asf/beam/blob/f7749581/sdks/python/apache_beam/io/filebasedsource_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py index 4083efd..e681f26 100644 --- a/sdks/python/apache_beam/io/filebasedsource_test.py +++ b/sdks/python/apache_beam/io/filebasedsource_test.py @@ -236,8 +236,7 @@ class TestFileBasedSource(unittest.TestCase): runtime_vp_file_pattern = RuntimeValueProvider( option_name='arg', value_type=str, - default_value=str_file_pattern, - options_id=1) + default_value=str_file_pattern) self.assertEqual(runtime_vp_file_pattern, FileBasedSource(runtime_vp_file_pattern)._pattern) http://git-wip-us.apache.org/repos/asf/beam/blob/f7749581/sdks/python/apache_beam/io/fileio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index 85f0718..8ee5198 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ b/sdks/python/apache_beam/io/fileio.py @@ -156,12 +156,10 @@ class FileSink(iobase.Sink): or if compression_type is not member of CompressionTypes. ValueError: if shard_name_template is not of expected format. """ - if not (isinstance(file_path_prefix, basestring) - or isinstance(file_path_prefix, ValueProvider)): + if not isinstance(file_path_prefix, (basestring, ValueProvider)): raise TypeError('file_path_prefix must be a string or ValueProvider;' 'got %r instead' % file_path_prefix) - if not (isinstance(file_name_suffix, basestring) - or isinstance(file_name_suffix, ValueProvider)): + if not isinstance(file_name_suffix, (basestring, ValueProvider)): raise TypeError('file_name_suffix must be a string or ValueProvider;' 'got %r instead' % file_name_suffix) http://git-wip-us.apache.org/repos/asf/beam/blob/f7749581/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 24c0d6b..779db8f 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -26,6 +26,7 @@ import threading import time import traceback +from apache_beam import error from apache_beam import coders from apache_beam import pvalue from apache_beam.internal import pickler @@ -43,7 +44,6 @@ from apache_beam.runners.runner import PipelineState from apache_beam.transforms.display import DisplayData from apache_beam.typehints import typehints from apache_beam.utils.pipeline_options import StandardOptions -from apache_beam.utils.value_provider import RuntimeValueProviderError class DataflowRunner(PipelineRunner): @@ -480,7 +480,7 @@ class DataflowRunner(PipelineRunner): 'estimated_size_bytes': json_value.get_typed_value_descriptor( transform.source.estimate_size()) } - except RuntimeValueProviderError: + except error.RuntimeValueProviderError: # Size estimation is best effort, and this error is by value provider. logging.info( 'Could not estimate size of source %r due to ' + \ http://git-wip-us.apache.org/repos/asf/beam/blob/f7749581/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index efcb37f..50f9ff4 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -233,7 +233,7 @@ class Environment(object): options_dict = {k: v for k, v in sdk_pipeline_options.iteritems() if v is not None} - options_dict['_options_id'] = options._options_id + options_dict['_options_id'] = 0 # TODO(BEAM-1999): Remove. self.proto.sdkPipelineOptions.additionalProperties.append( dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty( key='options', value=to_json_value(options_dict))) http://git-wip-us.apache.org/repos/asf/beam/blob/f7749581/sdks/python/apache_beam/runners/direct/direct_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py index 9b4e1ac..d776719 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner.py +++ b/sdks/python/apache_beam/runners/direct/direct_runner.py @@ -91,7 +91,9 @@ class DirectRunner(PipelineRunner): # execution in background threads and return. if pipeline.options: - RuntimeValueProvider.set_runtime_options(pipeline.options._options_id, {}) + # DirectRunner does not support RuntimeValueProviders. + RuntimeValueProvider.set_runtime_options(None, {}) + executor.start(self.consumer_tracking_visitor.root_transforms) result = DirectPipelineResult(executor, evaluation_context) @@ -101,11 +103,6 @@ class DirectRunner(PipelineRunner): result.wait_until_finish() self._cache.finalize() - # Unset runtime options after the pipeline finishes. - # TODO: Move this to a post finish hook and clean for all cases. - if pipeline.options: - RuntimeValueProvider.unset_runtime_options(pipeline.options._options_id) - return result @property http://git-wip-us.apache.org/repos/asf/beam/blob/f7749581/sdks/python/apache_beam/utils/pipeline_options.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/pipeline_options.py b/sdks/python/apache_beam/utils/pipeline_options.py index c6f928e..e8966d6 100644 --- a/sdks/python/apache_beam/utils/pipeline_options.py +++ b/sdks/python/apache_beam/utils/pipeline_options.py @@ -18,7 +18,6 @@ """Pipeline options obtained from command line parsing.""" import argparse -import itertools from apache_beam.transforms.display import HasDisplayData from apache_beam.utils.value_provider import StaticValueProvider @@ -57,10 +56,6 @@ class BeamArgumentParser(argparse.ArgumentParser): parser.add_argument('--non-vp-arg') """ - def __init__(self, options_id, *args, **kwargs): - self._options_id = options_id - super(BeamArgumentParser, self).__init__(*args, **kwargs) - def add_value_provider_argument(self, *args, **kwargs): """ValueProvider arguments can be either of type keyword or positional. At runtime, even positional arguments will need to be supplied in the @@ -87,8 +82,7 @@ class BeamArgumentParser(argparse.ArgumentParser): kwargs['default'] = RuntimeValueProvider( option_name=option_name, value_type=value_type, - default_value=default_value, - options_id=self._options_id + default_value=default_value ) # have add_argument do most of the work @@ -122,9 +116,7 @@ class PipelineOptions(HasDisplayData): By default the options classes will use command line arguments to initialize the options. """ - _options_id_generator = itertools.count(1) - - def __init__(self, flags=None, options_id=None, **kwargs): + def __init__(self, flags=None, **kwargs): """Initialize an options class. The initializer will traverse all subclasses, add all their argparse @@ -141,9 +133,7 @@ class PipelineOptions(HasDisplayData): """ self._flags = flags self._all_options = kwargs - self._options_id = ( - options_id or PipelineOptions._options_id_generator.next()) - parser = BeamArgumentParser(self._options_id) + parser = BeamArgumentParser() for cls in type(self).mro(): if cls == PipelineOptions: @@ -197,7 +187,7 @@ class PipelineOptions(HasDisplayData): # TODO(BEAM-1319): PipelineOption sub-classes in the main session might be # repeated. Pick last unique instance of each subclass to avoid conflicts. subset = {} - parser = BeamArgumentParser(self._options_id) + parser = BeamArgumentParser() for cls in PipelineOptions.__subclasses__(): subset[str(cls)] = cls for cls in subset.values(): @@ -220,7 +210,7 @@ class PipelineOptions(HasDisplayData): return self.get_all_options(True) def view_as(self, cls): - view = cls(self._flags, options_id=self._options_id) + view = cls(self._flags) view._all_options = self._all_options return view @@ -244,7 +234,7 @@ class PipelineOptions(HasDisplayData): (type(self).__name__, name)) def __setattr__(self, name, value): - if name in ('_flags', '_all_options', '_visible_options', '_options_id'): + if name in ('_flags', '_all_options', '_visible_options'): super(PipelineOptions, self).__setattr__(name, value) elif name in self._visible_option_list(): self._all_options[name] = value http://git-wip-us.apache.org/repos/asf/beam/blob/f7749581/sdks/python/apache_beam/utils/pipeline_options_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/pipeline_options_test.py b/sdks/python/apache_beam/utils/pipeline_options_test.py index 633d7da..df9b2e3 100644 --- a/sdks/python/apache_beam/utils/pipeline_options_test.py +++ b/sdks/python/apache_beam/utils/pipeline_options_test.py @@ -30,7 +30,8 @@ from apache_beam.utils.value_provider import RuntimeValueProvider class PipelineOptionsTest(unittest.TestCase): def setUp(self): - RuntimeValueProvider.runtime_options_map = {} + # Clean up the global variable used by RuntimeValueProvider + RuntimeValueProvider.runtime_options = None TEST_CASES = [ {'flags': ['--num_workers', '5'], @@ -223,8 +224,7 @@ class PipelineOptionsTest(unittest.TestCase): non_vp_arg=RuntimeValueProvider( option_name='foo', value_type=int, - default_value=10, - options_id=10)) + default_value=10)) self.assertEqual(options.vp_arg, 5) self.assertTrue(options.vp_arg2.is_accessible(), '%s is not accessible' % options.vp_arg2) http://git-wip-us.apache.org/repos/asf/beam/blob/f7749581/sdks/python/apache_beam/utils/value_provider.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/value_provider.py b/sdks/python/apache_beam/utils/value_provider.py index 271202d..235d257 100644 --- a/sdks/python/apache_beam/utils/value_provider.py +++ b/sdks/python/apache_beam/utils/value_provider.py @@ -21,14 +21,7 @@ and dynamically provided values. from functools import wraps - -class RuntimeValueProviderError(RuntimeError): - def __init__(self, msg): - """Class representing the errors thrown during runtime by the valueprovider - Args: - msg: Message string for the exception thrown - """ - super(RuntimeValueProviderError, self).__init__(msg) +from apache_beam import error class ValueProvider(object): @@ -59,42 +52,32 @@ class StaticValueProvider(ValueProvider): class RuntimeValueProvider(ValueProvider): - runtime_options_map = {} + runtime_options = None - def __init__(self, option_name, value_type, default_value, options_id): - assert options_id is not None + def __init__(self, option_name, value_type, default_value): self.option_name = option_name self.default_value = default_value self.value_type = value_type - self.options_id = options_id def is_accessible(self): - return RuntimeValueProvider.runtime_options_map.get( - self.options_id) is not None + return RuntimeValueProvider.runtime_options is not None def get(self): - runtime_options = ( - RuntimeValueProvider.runtime_options_map.get(self.options_id)) - if runtime_options is None: - raise RuntimeValueProviderError( + if RuntimeValueProvider.runtime_options is None: + raise error.RuntimeValueProviderError( '%s.get() not called from a runtime context' % self) - candidate = runtime_options.get(self.option_name) + candidate = RuntimeValueProvider.runtime_options.get(self.option_name) if candidate: value = self.value_type(candidate) else: value = self.default_value return value + # TODO(BEAM-1999): Remove _unused_options_id @classmethod - def set_runtime_options(cls, options_id, pipeline_options): - assert options_id not in RuntimeValueProvider.runtime_options_map - RuntimeValueProvider.runtime_options_map[options_id] = pipeline_options - - @classmethod - def unset_runtime_options(cls, options_id): - assert options_id in RuntimeValueProvider.runtime_options_map - del RuntimeValueProvider.runtime_options_map[options_id] + def set_runtime_options(cls, _unused_options_id, pipeline_options): + RuntimeValueProvider.runtime_options = pipeline_options def __str__(self): return '%s(option: %s, type: %s, default_value: %s)' % ( @@ -114,7 +97,7 @@ def check_accessible(value_provider_list): def _f(self, *args, **kwargs): for obj in [getattr(self, vp) for vp in value_provider_list]: if not obj.is_accessible(): - raise RuntimeValueProviderError('%s not accessible' % obj) + raise error.RuntimeValueProviderError('%s not accessible' % obj) return fnc(self, *args, **kwargs) return _f return _check_accessible http://git-wip-us.apache.org/repos/asf/beam/blob/f7749581/sdks/python/apache_beam/utils/value_provider_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/value_provider_test.py b/sdks/python/apache_beam/utils/value_provider_test.py index 83cb5e9..0411dcc 100644 --- a/sdks/python/apache_beam/utils/value_provider_test.py +++ b/sdks/python/apache_beam/utils/value_provider_test.py @@ -132,7 +132,7 @@ class ValueProviderTests(unittest.TestCase): # provide values at job-execution time # (options not provided here will use their default, if they have one) RuntimeValueProvider.set_runtime_options( - options._options_id, {'vp_arg': 'abc', 'vp_pos_arg':'3.2'}) + None, {'vp_arg': 'abc', 'vp_pos_arg':'3.2'}) self.assertTrue(options.vp_arg.is_accessible()) self.assertEqual(options.vp_arg.get(), 'abc') self.assertTrue(options.vp_arg2.is_accessible()) @@ -143,23 +143,3 @@ class ValueProviderTests(unittest.TestCase): self.assertIsNone(options.vp_arg4.get()) self.assertTrue(options.vp_pos_arg.is_accessible()) self.assertEqual(options.vp_pos_arg.get(), 1.2) - - def test_options_id(self): - class Opt1(PipelineOptions): - @classmethod - def _add_argparse_args(cls, parser): - parser.add_value_provider_argument('--arg1') - - class Opt2(PipelineOptions): - @classmethod - def _add_argparse_args(cls, parser): - parser.add_value_provider_argument('--arg2') - - opt1 = Opt1() - opt2 = Opt2() - self.assertFalse(opt1.arg1.is_accessible()) - self.assertFalse(opt2.arg2.is_accessible()) - RuntimeValueProvider.set_runtime_options( - opt1.arg1.options_id, {'arg1': 'val1'}) - self.assertTrue(opt1.arg1.is_accessible()) - self.assertFalse(opt2.arg2.is_accessible())
