This is an automated email from the ASF dual-hosted git repository. altay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 98faabf [BEAM-6942] Make modifications to pipeline options to be visible to all views. (#8225) 98faabf is described below commit 98faabff70b58935b7cbc7efeacc8ec1a850479d Author: tvalentyn <tvalen...@users.noreply.github.com> AuthorDate: Fri Apr 12 18:10:39 2019 -0700 [BEAM-6942] Make modifications to pipeline options to be visible to all views. (#8225) * Fixes a bug in pipeline option that prevents changes of multi-valued options to be propagated across views. Also, adds documentation and unit tests for codepath affected by the bug and the fix. * Fix typos in flags defined in PortableOptions. * Check that views do not expose attributes defined by other views. --- .../python/apache_beam/options/pipeline_options.py | 95 +++++++++--- .../apache_beam/options/pipeline_options_test.py | 159 ++++++++++++++++++++- .../runners/dataflow/dataflow_runner.py | 32 ++--- .../runners/dataflow/dataflow_runner_test.py | 27 ++++ 4 files changed, 273 insertions(+), 40 deletions(-) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index d9e8164..745dcad 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -70,9 +70,9 @@ class _BeamArgumentParser(argparse.ArgumentParser): @classmethod def _add_argparse_args(cls, parser): - parser.add_value_provider_argument('--vp-arg1', default='start') - parser.add_value_provider_argument('--vp-arg2') - parser.add_argument('--non-vp-arg') + parser.add_value_provider_argument('--vp_arg1', default='start') + parser.add_value_provider_argument('--vp_arg2') + parser.add_argument('--non_vp_arg') """ def add_value_provider_argument(self, *args, **kwargs): @@ -117,11 +117,13 @@ class _BeamArgumentParser(argparse.ArgumentParser): class PipelineOptions(HasDisplayData): - """Pipeline options class used as container for command line options. 
+ """This class and subclasses are used as containers for command line options. - The class is essentially a wrapper over the standard argparse Python module + These classes are wrappers over the standard argparse Python module (see https://docs.python.org/3/library/argparse.html). To define one option - or a group of options you subclass from PipelineOptions:: + or a group of options, create a subclass from PipelineOptions. + + Example Usage:: class XyzOptions(PipelineOptions): @@ -134,12 +136,20 @@ class PipelineOptions(HasDisplayData): described in the argparse public documentation. Pipeline objects require an options object during initialization. - This is obtained simply by initializing an options class as defined above:: + This is obtained simply by initializing an options class as defined above. + + Example Usage:: p = Pipeline(options=XyzOptions()) if p.options.xyz == 'end': raise ValueError('Option xyz has an invalid value.') + Instances of PipelineOptions or any of its subclass have access to values + defined by other PipelineOption subclasses (see get_all_options()), and + can be converted to an instance of another PipelineOptions subclass + (see view_as()). All views share the underlying data structure that stores + option key-value pairs. + By default the options classes will use command line arguments to initialize the options. """ @@ -150,7 +160,7 @@ class PipelineOptions(HasDisplayData): arguments and then parse the command line specified by flags or by default the one obtained from sys.argv. - The subclasses are not expected to require a redefinition of __init__. + The subclasses of PipelineOptions do not need to redefine __init__. Args: flags: An iterable of command line arguments to be used. If not specified @@ -158,20 +168,40 @@ class PipelineOptions(HasDisplayData): **kwargs: Add overrides for arguments passed in flags. """ + # self._flags stores a list of not yet parsed arguments, typically, + # command-line flags. 
This list is shared across different views. + # See: view_as(). self._flags = flags - self._all_options = kwargs - parser = _BeamArgumentParser() + # Build parser that will parse options recognized by the [sub]class of + # PipelineOptions whose object is being instantiated. + parser = _BeamArgumentParser() for cls in type(self).mro(): if cls == PipelineOptions: break elif '_add_argparse_args' in cls.__dict__: cls._add_argparse_args(parser) - # The _visible_options attribute will contain only those options from the - # flags (i.e., command line) that can be recognized. The _all_options - # field contains additional overrides. + + # The _visible_options attribute will contain options that were recognized + # by the parser. self._visible_options, _ = parser.parse_known_args(flags) + # self._all_options is initialized with overrides to flag values, + # provided in kwargs, and will store key-value pairs for options recognized + # by current PipelineOptions [sub]class and its views that may be created. + # See: view_as(). + # This dictionary is shared across different views, and is lazily updated + # as each new view is created. + # Users access this dictionary store via __getattr__ / __setattr__ methods. + self._all_options = kwargs + + # Initialize values of keys defined by this class. + for option_name in self._visible_option_list(): + # Note that options specified in kwargs will not be overwritten. + if option_name not in self._all_options: + self._all_options[option_name] = getattr(self._visible_options, + option_name) + @classmethod def _add_argparse_args(cls, parser): # Override this in subclasses to provide options. @@ -246,7 +276,40 @@ class PipelineOptions(HasDisplayData): return self.get_all_options(True) def view_as(self, cls): + """Returns a view of current object as provided PipelineOptions subclass. 
+ + Example Usage:: + + options = PipelineOptions(['--runner', 'Direct', '--streaming']) + standard_options = options.view_as(StandardOptions) + if standard_options.streaming: + # ... start a streaming job ... + + Note that options objects may have multiple views, and modifications + of values in any view-object will apply to current object and other + view-objects. + + Args: + cls: PipelineOptions class or any of its subclasses. + + Returns: + An instance of cls that is initialized using options contained in current + object. + + """ view = cls(self._flags) + for option_name in view._visible_option_list(): + # Initialize values of keys defined by a cls. + # + # Note that we do initialization only once per key to make sure that + # values in _all_options dict are not recreated with each new view. + # This is important to make sure that values of multi-options keys are + # backed by the same list across multiple views, and that any overrides of + # pipeline options already stored in _all_options are preserved. + if option_name not in self._all_options: + self._all_options[option_name] = getattr(view._visible_options, + option_name) + # Note that views will still store _all_options of the source object. view._all_options = self._all_options return view @@ -264,7 +327,7 @@ class PipelineOptions(HasDisplayData): if name[:2] == name[-2:] == '__': return object.__getattribute__(self, name) elif name in self._visible_option_list(): - return self._all_options.get(name, getattr(self._visible_options, name)) + return self._all_options[name] else: raise AttributeError("'%s' object has no attribute '%s'" % (type(self).__name__, name)) @@ -738,13 +801,13 @@ class PortableOptions(PipelineOptions): '"<ENV_VAL>"} }. All fields in the json are optional except ' 'command.')) parser.add_argument( - '--sdk-worker-parallelism', default=None, + '--sdk_worker_parallelism', default=None, help=('Sets the number of sdk worker processes that will run on each ' 'worker node. Default is 1. 
If 0, it will be automatically set ' 'by the runner by looking at different parameters (e.g. number ' 'of CPU cores on the worker machine).')) parser.add_argument( - '--environment-cache-millis', default=0, + '--environment_cache_millis', default=0, help=('Duration in milliseconds for environment cache within a job. ' '0 means no caching.')) diff --git a/sdks/python/apache_beam/options/pipeline_options_test.py b/sdks/python/apache_beam/options/pipeline_options_test.py index 021097b..5c51725 100644 --- a/sdks/python/apache_beam/options/pipeline_options_test.py +++ b/sdks/python/apache_beam/options/pipeline_options_test.py @@ -24,6 +24,7 @@ import unittest import hamcrest as hc +from apache_beam.options.pipeline_options import DebugOptions from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import ProfilingOptions from apache_beam.options.pipeline_options import TypeOptions @@ -119,12 +120,36 @@ class PipelineOptionsTest(unittest.TestCase): '--mock_multi_option', action='append', help='mock multi option') parser.add_argument('--option with space', help='mock option with space') + # Use with MockOptions in test cases where multiple option classes are needed. 
+ class FakeOptions(PipelineOptions): + + @classmethod + def _add_argparse_args(cls, parser): + parser.add_argument('--fake_flag', action='store_true', help='fake flag') + parser.add_argument('--fake_option', help='fake option') + parser.add_argument( + '--fake_multi_option', action='append', help='fake multi option') + def test_display_data(self): for case in PipelineOptionsTest.TEST_CASES: options = PipelineOptions(flags=case['flags']) dd = DisplayData.create_from(options) hc.assert_that(dd.items, hc.contains_inanyorder(*case['display_data'])) + def test_get_all_options_subclass(self): + for case in PipelineOptionsTest.TEST_CASES: + options = PipelineOptionsTest.MockOptions(flags=case['flags']) + self.assertDictContainsSubset(case['expected'], options.get_all_options()) + self.assertEqual(options.view_as( + PipelineOptionsTest.MockOptions).mock_flag, + case['expected']['mock_flag']) + self.assertEqual(options.view_as( + PipelineOptionsTest.MockOptions).mock_option, + case['expected']['mock_option']) + self.assertEqual(options.view_as( + PipelineOptionsTest.MockOptions).mock_multi_option, + case['expected']['mock_multi_option']) + def test_get_all_options(self): for case in PipelineOptionsTest.TEST_CASES: options = PipelineOptions(flags=case['flags']) @@ -139,6 +164,49 @@ class PipelineOptionsTest(unittest.TestCase): PipelineOptionsTest.MockOptions).mock_multi_option, case['expected']['mock_multi_option']) + def test_sublcalsses_of_pipeline_options_can_be_instantiated(self): + for case in PipelineOptionsTest.TEST_CASES: + mock_options = PipelineOptionsTest.MockOptions(flags=case['flags']) + self.assertEqual(mock_options.mock_flag, + case['expected']['mock_flag']) + self.assertEqual(mock_options.mock_option, + case['expected']['mock_option']) + self.assertEqual(mock_options.mock_multi_option, + case['expected']['mock_multi_option']) + + def test_views_can_be_constructed_from_pipeline_option_subclasses(self): + for case in PipelineOptionsTest.TEST_CASES: + 
fake_options = PipelineOptionsTest.FakeOptions(flags=case['flags']) + mock_options = fake_options.view_as(PipelineOptionsTest.MockOptions) + + self.assertEqual(mock_options.mock_flag, + case['expected']['mock_flag']) + self.assertEqual(mock_options.mock_option, + case['expected']['mock_option']) + self.assertEqual(mock_options.mock_multi_option, + case['expected']['mock_multi_option']) + + def test_views_do_not_expose_options_defined_by_other_views(self): + flags = ['--mock_option=mock_value', '--fake_option=fake_value'] + + options = PipelineOptions(flags) + assert options.view_as( + PipelineOptionsTest.MockOptions).mock_option == 'mock_value' + assert options.view_as( + PipelineOptionsTest.FakeOptions).fake_option == 'fake_value' + assert options.view_as( + PipelineOptionsTest.MockOptions).view_as( + PipelineOptionsTest.FakeOptions).fake_option == 'fake_value' + + self.assertRaises( + AttributeError, + lambda: options.view_as(PipelineOptionsTest.MockOptions).fake_option) + self.assertRaises( + AttributeError, + lambda: options.view_as(PipelineOptionsTest.MockOptions).view_as( + PipelineOptionsTest.FakeOptions).view_as( + PipelineOptionsTest.MockOptions).fake_option) + def test_from_dictionary(self): for case in PipelineOptionsTest.TEST_CASES: options = PipelineOptions(flags=case['flags']) @@ -184,6 +252,57 @@ class PipelineOptionsTest(unittest.TestCase): options = PipelineOptions(flags=['']) self.assertEqual(options.get_all_options()['experiments'], None) + def test_option_modifications_are_shared_between_views(self): + pipeline_options = PipelineOptions([ + '--mock_option', 'value', '--mock_flag', + '--mock_multi_option', 'value1', + '--mock_multi_option', 'value2', + ]) + + mock_options = PipelineOptionsTest.MockOptions([ + '--mock_option', 'value', '--mock_flag', + '--mock_multi_option', 'value1', + '--mock_multi_option', 'value2', + ]) + + for options in [pipeline_options, mock_options]: + view1 = options.view_as(PipelineOptionsTest.MockOptions) + view2 = 
options.view_as(PipelineOptionsTest.MockOptions) + + view1.mock_option = 'new_value' + view1.mock_flag = False + view1.mock_multi_option.append('value3') + + view3 = options.view_as(PipelineOptionsTest.MockOptions) + view4 = view1.view_as(PipelineOptionsTest.MockOptions) + view5 = options.view_as(TypeOptions).view_as( + PipelineOptionsTest.MockOptions) + + for view in [view1, view2, view3, view4, view5]: + self.assertEqual('new_value', view.mock_option) + self.assertFalse(view.mock_flag) + self.assertEqual(['value1', 'value2', 'value3'], view.mock_multi_option) + + def test_uninitialized_option_modifications_are_shared_between_views(self): + options = PipelineOptions([]) + + view1 = options.view_as(PipelineOptionsTest.MockOptions) + view2 = options.view_as(PipelineOptionsTest.MockOptions) + + view1.mock_option = 'some_value' + view1.mock_flag = False + view1.mock_multi_option = ['value1', 'value2'] + + view3 = options.view_as(PipelineOptionsTest.MockOptions) + view4 = view1.view_as(PipelineOptionsTest.MockOptions) + view5 = options.view_as(TypeOptions).view_as( + PipelineOptionsTest.MockOptions) + + for view in [view1, view2, view3, view4, view5]: + self.assertEqual('some_value', view.mock_option) + self.assertFalse(view.mock_flag) + self.assertEqual(['value1', 'value2'], view.mock_multi_option) + def test_extra_package(self): options = PipelineOptions(['--extra_package', 'abc', '--extra_packages', 'def', @@ -211,13 +330,13 @@ class PipelineOptionsTest(unittest.TestCase): def test_redefine_options(self): - class TestRedefinedOptios(PipelineOptions): # pylint: disable=unused-variable + class TestRedefinedOptions(PipelineOptions): # pylint: disable=unused-variable @classmethod def _add_argparse_args(cls, parser): parser.add_argument('--redefined_flag', action='store_true') - class TestRedefinedOptios(PipelineOptions): + class TestRedefinedOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): @@ -303,6 +422,42 @@ class 
PipelineOptionsTest(unittest.TestCase): options = PipelineOptions(['--type_check_strictness', 'blahblah']) options.view_as(TypeOptions) + def test_add_experiment(self): + options = PipelineOptions([]) + options.view_as(DebugOptions).add_experiment('new_experiment') + self.assertEqual( + ['new_experiment'], + options.view_as(DebugOptions).experiments + ) + + def test_add_experiment_preserves_existing_experiments(self): + options = PipelineOptions(['--experiment=existing_experiment']) + options.view_as(DebugOptions).add_experiment('new_experiment') + self.assertEqual( + ['existing_experiment', 'new_experiment'], + options.view_as(DebugOptions).experiments + ) + + def test_lookup_experiments(self): + options = PipelineOptions([ + '--experiment=existing_experiment', + '--experiment', 'key=value', + '--experiment', 'master_key=k1=v1,k2=v2', + ]) + debug_options = options.view_as(DebugOptions) + self.assertEqual( + 'default_value', + debug_options.lookup_experiment('nonexistent', 'default_value')) + self.assertEqual( + 'value', + debug_options.lookup_experiment('key', 'default_value')) + self.assertEqual( + 'k1=v1,k2=v2', + debug_options.lookup_experiment('master_key')) + self.assertEqual( + True, + debug_options.lookup_experiment('existing_experiment')) + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index e3e7bd6..3f21011 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -358,34 +358,22 @@ class DataflowRunner(PipelineRunner): debug_options = options.view_as(DebugOptions) worker_options = options.view_as(WorkerOptions) if worker_options.min_cpu_platform: - experiments = ["min_cpu_platform=%s" % worker_options.min_cpu_platform] - if debug_options.experiments is not None: - experiments = list(set(experiments + 
debug_options.experiments)) - debug_options.experiments = experiments + debug_options.add_experiment('min_cpu_platform=' + + worker_options.min_cpu_platform) # Elevate "enable_streaming_engine" to pipeline option, but using the # existing experiment. google_cloud_options = options.view_as(GoogleCloudOptions) if google_cloud_options.enable_streaming_engine: - if debug_options.experiments is None: - debug_options.experiments = [] - if "enable_windmill_service" not in debug_options.experiments: - debug_options.experiments.append("enable_windmill_service") - if "enable_streaming_engine" not in debug_options.experiments: - debug_options.experiments.append("enable_streaming_engine") + debug_options.add_experiment("enable_windmill_service") + debug_options.add_experiment("enable_streaming_engine") else: - if debug_options.experiments is not None: - if ("enable_windmill_service" in debug_options.experiments - or "enable_streaming_engine" in debug_options.experiments): - raise ValueError("""Streaming engine both disabled and enabled: - enable_streaming_engine flag is not set, but enable_windmill_service - and/or enable_streaming_engine are present. It is recommended you - only set the enable_streaming_engine flag.""") - - # TODO(BEAM-6664): Remove once Dataflow supports --dataflow_kms_key. - if google_cloud_options.dataflow_kms_key is not None: - debug_options.add_experiment('service_default_cmek_config=' + - google_cloud_options.dataflow_kms_key) + if (debug_options.lookup_experiment("enable_windmill_service") or + debug_options.lookup_experiment("enable_streaming_engine")): + raise ValueError("""Streaming engine both disabled and enabled: + enable_streaming_engine flag is not set, but enable_windmill_service + and/or enable_streaming_engine experiments are present. 
+ It is recommended you only set the enable_streaming_engine flag.""") self.job = apiclient.Job(options, self.proto_pipeline) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index 6c19c79..8c6ad05 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -29,6 +29,7 @@ import mock import apache_beam as beam import apache_beam.transforms as ptransform +from apache_beam.options.pipeline_options import DebugOptions from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.pipeline import AppliedPTransform from apache_beam.pipeline import Pipeline @@ -388,6 +389,32 @@ class DataflowRunnerTest(unittest.TestCase): common_urns.side_inputs.MULTIMAP.urn, side_input._side_input_data().access_pattern) + def test_min_cpu_platform_flag_is_propagated_to_experiments(self): + remote_runner = DataflowRunner() + self.default_properties.append('--min_cpu_platform=Intel Haswell') + + p = Pipeline(remote_runner, PipelineOptions(self.default_properties)) + p | ptransform.Create([1]) # pylint: disable=expression-not-assigned + p.run() + self.assertIn('min_cpu_platform=Intel Haswell', + remote_runner.job.options.view_as(DebugOptions).experiments) + + def test_streaming_engine_flag_adds_windmill_experiments(self): + remote_runner = DataflowRunner() + self.default_properties.append('--streaming') + self.default_properties.append('--enable_streaming_engine') + self.default_properties.append('--experiment=some_other_experiment') + + p = Pipeline(remote_runner, PipelineOptions(self.default_properties)) + p | ptransform.Create([1]) # pylint: disable=expression-not-assigned + p.run() + + experiments_for_job = ( + remote_runner.job.options.view_as(DebugOptions).experiments) + self.assertIn('enable_streaming_engine', experiments_for_job) + self.assertIn('enable_windmill_service', 
experiments_for_job) + self.assertIn('some_other_experiment', experiments_for_job) + if __name__ == '__main__': unittest.main()