This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 98faabf  [BEAM-6942] Make modifications to pipeline options visible to all views. (#8225)
98faabf is described below

commit 98faabff70b58935b7cbc7efeacc8ec1a850479d
Author: tvalentyn <tvalen...@users.noreply.github.com>
AuthorDate: Fri Apr 12 18:10:39 2019 -0700

    [BEAM-6942] Make modifications to pipeline options visible to all views. (#8225)
    
    * Fixes a bug in pipeline options that prevented changes to multi-valued
    options from being propagated across views. Also adds documentation and
    unit tests for the codepath affected by the bug and the fix.
    
    * Fix typos in flags defined in PortableOptions.
    
    * Check that views do not expose attributes defined by other views.
---
 .../python/apache_beam/options/pipeline_options.py |  95 +++++++++---
 .../apache_beam/options/pipeline_options_test.py   | 159 ++++++++++++++++++++-
 .../runners/dataflow/dataflow_runner.py            |  32 ++---
 .../runners/dataflow/dataflow_runner_test.py       |  27 ++++
 4 files changed, 273 insertions(+), 40 deletions(-)

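For context, the sketch below models the sharing semantics this commit
establishes. It is a minimal, self-contained illustration, not Beam's actual
code; every class and method name in it is made up for the example. The key
idea: all views of an options object share one backing dict, and each key is
initialized at most once, so the list behind a multi-valued option is the
same object in every view.

    class OptionsBase(object):

      def __init__(self):
        # Shared key-value store for this object and all of its views.
        object.__setattr__(self, '_all_options', {})
        for name, value in self._defaults().items():
          self._all_options.setdefault(name, value)

      def _defaults(self):
        # Subclasses return their option defaults here; this stands in for
        # the argparse parsing done by the real implementation.
        return {}

      def view_as(self, cls):
        view = cls()
        # Adopt only the keys this object has not seen yet, preserving any
        # existing values (including lists backing multi-valued options).
        for name, value in view._all_options.items():
          self._all_options.setdefault(name, value)
        # Every view shares the same backing dict.
        object.__setattr__(view, '_all_options', self._all_options)
        return view

      def __getattr__(self, name):
        # Called only when normal attribute lookup fails; serve option
        # values from the shared dict.
        try:
          return self.__dict__['_all_options'][name]
        except KeyError:
          raise AttributeError(name)

      def __setattr__(self, name, value):
        # Writes go into the shared dict, so they are visible to all views.
        self._all_options[name] = value


    class MockOptions(OptionsBase):

      def _defaults(self):
        return {'mock_flag': True, 'mock_multi_option': []}


    # Modifications made through one view are visible through all others.
    options = OptionsBase()
    view1 = options.view_as(MockOptions)
    view2 = options.view_as(MockOptions)
    view1.mock_multi_option.append('value')
    assert view2.mock_multi_option == ['value']
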
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index d9e8164..745dcad 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -70,9 +70,9 @@ class _BeamArgumentParser(argparse.ArgumentParser):
       @classmethod
 
       def _add_argparse_args(cls, parser):
-        parser.add_value_provider_argument('--vp-arg1', default='start')
-        parser.add_value_provider_argument('--vp-arg2')
-        parser.add_argument('--non-vp-arg')
+        parser.add_value_provider_argument('--vp_arg1', default='start')
+        parser.add_value_provider_argument('--vp_arg2')
+        parser.add_argument('--non_vp_arg')
 
   """
   def add_value_provider_argument(self, *args, **kwargs):
@@ -117,11 +117,13 @@ class _BeamArgumentParser(argparse.ArgumentParser):
 
 
 class PipelineOptions(HasDisplayData):
-  """Pipeline options class used as container for command line options.
+  """This class and subclasses are used as containers for command line options.
 
-  The class is essentially a wrapper over the standard argparse Python module
+  These classes are wrappers over the standard argparse Python module
   (see https://docs.python.org/3/library/argparse.html).  To define one option
-  or a group of options you subclass from PipelineOptions::
+  or a group of options, create a subclass from PipelineOptions.
+
+  Example Usage::
 
     class XyzOptions(PipelineOptions):
 
@@ -134,12 +136,20 @@ class PipelineOptions(HasDisplayData):
   described in the argparse public documentation.
 
   Pipeline objects require an options object during initialization.
-  This is obtained simply by initializing an options class as defined above::
+  This is obtained simply by initializing an options class as defined above.
+
+  Example Usage::
 
     p = Pipeline(options=XyzOptions())
     if p.options.xyz == 'end':
       raise ValueError('Option xyz has an invalid value.')
 
+  Instances of PipelineOptions and any of its subclasses have access to values
+  defined by other PipelineOptions subclasses (see get_all_options()), and
+  can be converted to an instance of another PipelineOptions subclass
+  (see view_as()). All views share the underlying data structure that stores
+  option key-value pairs.
+
   By default the options classes will use command line arguments to initialize
   the options.
   """
@@ -150,7 +160,7 @@ class PipelineOptions(HasDisplayData):
     arguments and then parse the command line specified by flags or by default
     the one obtained from sys.argv.
 
-    The subclasses are not expected to require a redefinition of __init__.
+    The subclasses of PipelineOptions do not need to redefine __init__.
 
     Args:
       flags: An iterable of command line arguments to be used. If not specified
@@ -158,20 +168,40 @@ class PipelineOptions(HasDisplayData):
 
       **kwargs: Add overrides for arguments passed in flags.
     """
+    # self._flags stores a list of not-yet-parsed arguments, typically
+    # command-line flags. This list is shared across different views.
+    # See: view_as().
     self._flags = flags
-    self._all_options = kwargs
-    parser = _BeamArgumentParser()
 
+    # Build a parser that will parse options recognized by the [sub]class of
+    # PipelineOptions whose object is being instantiated.
+    parser = _BeamArgumentParser()
     for cls in type(self).mro():
       if cls == PipelineOptions:
         break
       elif '_add_argparse_args' in cls.__dict__:
         cls._add_argparse_args(parser)
-    # The _visible_options attribute will contain only those options from the
-    # flags (i.e., command line) that can be recognized. The _all_options
-    # field contains additional overrides.
+
+    # The _visible_options attribute will contain options that were recognized
+    # by the parser.
     self._visible_options, _ = parser.parse_known_args(flags)
 
+    # self._all_options is initialized with overrides to flag values
+    # provided in kwargs, and will store key-value pairs for options recognized
+    # by the current PipelineOptions [sub]class and any views created from it.
+    # See: view_as().
+    # This dictionary is shared across different views, and is lazily updated
+    # as new views are created.
+    # Users access this dictionary via the __getattr__ / __setattr__ methods.
+    self._all_options = kwargs
+
+    # Initialize values of keys defined by this class.
+    for option_name in self._visible_option_list():
+      # Note that options specified in kwargs will not be overwritten.
+      if option_name not in self._all_options:
+        self._all_options[option_name] = getattr(self._visible_options,
+                                                 option_name)
+
   @classmethod
   def _add_argparse_args(cls, parser):
     # Override this in subclasses to provide options.
@@ -246,7 +276,40 @@ class PipelineOptions(HasDisplayData):
     return self.get_all_options(True)
 
   def view_as(self, cls):
+    """Returns a view of this object as the given PipelineOptions subclass.
+
+    Example Usage::
+
+      options = PipelineOptions(['--runner', 'Direct', '--streaming'])
+      standard_options = options.view_as(StandardOptions)
+      if standard_options.streaming:
+        # ... start a streaming job ...
+
+    Note that options objects may have multiple views, and modifications
+    of values in any view-object will apply to the current object and all
+    other view-objects.
+
+    Args:
+      cls: PipelineOptions class or any of its subclasses.
+
+    Returns:
+      An instance of cls that is initialized using options contained in the
+      current object.
+
+    """
     view = cls(self._flags)
+    for option_name in view._visible_option_list():
+      # Initialize values of keys defined by cls.
+      #
+      # Note that we do initialization only once per key to make sure that
+      # values in _all_options dict are not-recreated with each new view.
+      # This is important to make sure that values of multi-options keys are
+      # backed by the same list across multiple views, and that any overrides
+      # of pipeline options already stored in _all_options are preserved.
+      if option_name not in self._all_options:
+        self._all_options[option_name] = getattr(view._visible_options,
+                                                 option_name)
+    # Note that views will still store _all_options of the source object.
     view._all_options = self._all_options
     return view
 
@@ -264,7 +327,7 @@ class PipelineOptions(HasDisplayData):
     if name[:2] == name[-2:] == '__':
       return object.__getattribute__(self, name)
     elif name in self._visible_option_list():
-      return self._all_options.get(name, getattr(self._visible_options, name))
+      return self._all_options[name]
     else:
       raise AttributeError("'%s' object has no attribute '%s'" %
                            (type(self).__name__, name))
@@ -738,13 +801,13 @@ class PortableOptions(PipelineOptions):
               '"<ENV_VAL>"} }. All fields in the json are optional except '
               'command.'))
     parser.add_argument(
-        '--sdk-worker-parallelism', default=None,
+        '--sdk_worker_parallelism', default=None,
         help=('Sets the number of sdk worker processes that will run on each '
               'worker node. Default is 1. If 0, it will be automatically set '
               'by the runner by looking at different parameters (e.g. number '
               'of CPU cores on the worker machine).'))
     parser.add_argument(
-        '--environment-cache-millis', default=0,
+        '--environment_cache_millis', default=0,
         help=('Duration in milliseconds for environment cache within a job. '
               '0 means no caching.'))
 
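As a quick usage illustration of the behavior documented above (a sketch
assuming a standard Beam installation; it uses only classes that already
appear in this change):

    from apache_beam.options.pipeline_options import DebugOptions
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import StandardOptions

    options = PipelineOptions(['--runner', 'Direct', '--streaming'])

    # Views over the same options object share one backing store, so a
    # modification made through one view...
    options.view_as(StandardOptions).streaming = False
    options.view_as(DebugOptions).experiments = ['some_experiment']

    # ...is visible through every other view.
    assert options.view_as(StandardOptions).streaming is False
    assert options.view_as(DebugOptions).experiments == ['some_experiment']
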
diff --git a/sdks/python/apache_beam/options/pipeline_options_test.py b/sdks/python/apache_beam/options/pipeline_options_test.py
index 021097b..5c51725 100644
--- a/sdks/python/apache_beam/options/pipeline_options_test.py
+++ b/sdks/python/apache_beam/options/pipeline_options_test.py
@@ -24,6 +24,7 @@ import unittest
 
 import hamcrest as hc
 
+from apache_beam.options.pipeline_options import DebugOptions
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import ProfilingOptions
 from apache_beam.options.pipeline_options import TypeOptions
@@ -119,12 +120,36 @@ class PipelineOptionsTest(unittest.TestCase):
           '--mock_multi_option', action='append', help='mock multi option')
       parser.add_argument('--option with space', help='mock option with space')
 
+  # Use with MockOptions in test cases where multiple option classes are needed.
+  class FakeOptions(PipelineOptions):
+
+    @classmethod
+    def _add_argparse_args(cls, parser):
+      parser.add_argument('--fake_flag', action='store_true', help='fake flag')
+      parser.add_argument('--fake_option', help='fake option')
+      parser.add_argument(
+          '--fake_multi_option', action='append', help='fake multi option')
+
   def test_display_data(self):
     for case in PipelineOptionsTest.TEST_CASES:
       options = PipelineOptions(flags=case['flags'])
       dd = DisplayData.create_from(options)
       hc.assert_that(dd.items, hc.contains_inanyorder(*case['display_data']))
 
+  def test_get_all_options_subclass(self):
+    for case in PipelineOptionsTest.TEST_CASES:
+      options = PipelineOptionsTest.MockOptions(flags=case['flags'])
+      self.assertDictContainsSubset(case['expected'], options.get_all_options())
+      self.assertEqual(options.view_as(
+          PipelineOptionsTest.MockOptions).mock_flag,
+                       case['expected']['mock_flag'])
+      self.assertEqual(options.view_as(
+          PipelineOptionsTest.MockOptions).mock_option,
+                       case['expected']['mock_option'])
+      self.assertEqual(options.view_as(
+          PipelineOptionsTest.MockOptions).mock_multi_option,
+                       case['expected']['mock_multi_option'])
+
   def test_get_all_options(self):
     for case in PipelineOptionsTest.TEST_CASES:
       options = PipelineOptions(flags=case['flags'])
@@ -139,6 +164,49 @@ class PipelineOptionsTest(unittest.TestCase):
           PipelineOptionsTest.MockOptions).mock_multi_option,
                        case['expected']['mock_multi_option'])
 
+  def test_subclasses_of_pipeline_options_can_be_instantiated(self):
+    for case in PipelineOptionsTest.TEST_CASES:
+      mock_options = PipelineOptionsTest.MockOptions(flags=case['flags'])
+      self.assertEqual(mock_options.mock_flag,
+                       case['expected']['mock_flag'])
+      self.assertEqual(mock_options.mock_option,
+                       case['expected']['mock_option'])
+      self.assertEqual(mock_options.mock_multi_option,
+                       case['expected']['mock_multi_option'])
+
+  def test_views_can_be_constructed_from_pipeline_option_subclasses(self):
+    for case in PipelineOptionsTest.TEST_CASES:
+      fake_options = PipelineOptionsTest.FakeOptions(flags=case['flags'])
+      mock_options = fake_options.view_as(PipelineOptionsTest.MockOptions)
+
+      self.assertEqual(mock_options.mock_flag,
+                       case['expected']['mock_flag'])
+      self.assertEqual(mock_options.mock_option,
+                       case['expected']['mock_option'])
+      self.assertEqual(mock_options.mock_multi_option,
+                       case['expected']['mock_multi_option'])
+
+  def test_views_do_not_expose_options_defined_by_other_views(self):
+    flags = ['--mock_option=mock_value', '--fake_option=fake_value']
+
+    options = PipelineOptions(flags)
+    assert options.view_as(
+        PipelineOptionsTest.MockOptions).mock_option == 'mock_value'
+    assert options.view_as(
+        PipelineOptionsTest.FakeOptions).fake_option == 'fake_value'
+    assert options.view_as(
+        PipelineOptionsTest.MockOptions).view_as(
+            PipelineOptionsTest.FakeOptions).fake_option == 'fake_value'
+
+    self.assertRaises(
+        AttributeError,
+        lambda: options.view_as(PipelineOptionsTest.MockOptions).fake_option)
+    self.assertRaises(
+        AttributeError,
+        lambda: options.view_as(PipelineOptionsTest.MockOptions).view_as(
+            PipelineOptionsTest.FakeOptions).view_as(
+                PipelineOptionsTest.MockOptions).fake_option)
+
   def test_from_dictionary(self):
     for case in PipelineOptionsTest.TEST_CASES:
       options = PipelineOptions(flags=case['flags'])
@@ -184,6 +252,57 @@ class PipelineOptionsTest(unittest.TestCase):
     options = PipelineOptions(flags=[''])
     self.assertEqual(options.get_all_options()['experiments'], None)
 
+  def test_option_modifications_are_shared_between_views(self):
+    pipeline_options = PipelineOptions([
+        '--mock_option', 'value', '--mock_flag',
+        '--mock_multi_option', 'value1',
+        '--mock_multi_option', 'value2',
+    ])
+
+    mock_options = PipelineOptionsTest.MockOptions([
+        '--mock_option', 'value', '--mock_flag',
+        '--mock_multi_option', 'value1',
+        '--mock_multi_option', 'value2',
+    ])
+
+    for options in [pipeline_options, mock_options]:
+      view1 = options.view_as(PipelineOptionsTest.MockOptions)
+      view2 = options.view_as(PipelineOptionsTest.MockOptions)
+
+      view1.mock_option = 'new_value'
+      view1.mock_flag = False
+      view1.mock_multi_option.append('value3')
+
+      view3 = options.view_as(PipelineOptionsTest.MockOptions)
+      view4 = view1.view_as(PipelineOptionsTest.MockOptions)
+      view5 = options.view_as(TypeOptions).view_as(
+          PipelineOptionsTest.MockOptions)
+
+      for view in [view1, view2, view3, view4, view5]:
+        self.assertEqual('new_value', view.mock_option)
+        self.assertFalse(view.mock_flag)
+        self.assertEqual(['value1', 'value2', 'value3'], view.mock_multi_option)
+
+  def test_uninitialized_option_modifications_are_shared_between_views(self):
+    options = PipelineOptions([])
+
+    view1 = options.view_as(PipelineOptionsTest.MockOptions)
+    view2 = options.view_as(PipelineOptionsTest.MockOptions)
+
+    view1.mock_option = 'some_value'
+    view1.mock_flag = False
+    view1.mock_multi_option = ['value1', 'value2']
+
+    view3 = options.view_as(PipelineOptionsTest.MockOptions)
+    view4 = view1.view_as(PipelineOptionsTest.MockOptions)
+    view5 = options.view_as(TypeOptions).view_as(
+        PipelineOptionsTest.MockOptions)
+
+    for view in [view1, view2, view3, view4, view5]:
+      self.assertEqual('some_value', view.mock_option)
+      self.assertFalse(view.mock_flag)
+      self.assertEqual(['value1', 'value2'], view.mock_multi_option)
+
   def test_extra_package(self):
     options = PipelineOptions(['--extra_package', 'abc',
                                '--extra_packages', 'def',
@@ -211,13 +330,13 @@ class PipelineOptionsTest(unittest.TestCase):
 
   def test_redefine_options(self):
 
-    class TestRedefinedOptios(PipelineOptions):  # pylint: disable=unused-variable
+    class TestRedefinedOptions(PipelineOptions):  # pylint: disable=unused-variable
 
       @classmethod
       def _add_argparse_args(cls, parser):
         parser.add_argument('--redefined_flag', action='store_true')
 
-    class TestRedefinedOptios(PipelineOptions):
+    class TestRedefinedOptions(PipelineOptions):
 
       @classmethod
       def _add_argparse_args(cls, parser):
@@ -303,6 +422,42 @@ class PipelineOptionsTest(unittest.TestCase):
       options = PipelineOptions(['--type_check_strictness', 'blahblah'])
       options.view_as(TypeOptions)
 
+  def test_add_experiment(self):
+    options = PipelineOptions([])
+    options.view_as(DebugOptions).add_experiment('new_experiment')
+    self.assertEqual(
+        ['new_experiment'],
+        options.view_as(DebugOptions).experiments
+    )
+
+  def test_add_experiment_preserves_existing_experiments(self):
+    options = PipelineOptions(['--experiment=existing_experiment'])
+    options.view_as(DebugOptions).add_experiment('new_experiment')
+    self.assertEqual(
+        ['existing_experiment', 'new_experiment'],
+        options.view_as(DebugOptions).experiments
+    )
+
+  def test_lookup_experiments(self):
+    options = PipelineOptions([
+        '--experiment=existing_experiment',
+        '--experiment', 'key=value',
+        '--experiment', 'master_key=k1=v1,k2=v2',
+    ])
+    debug_options = options.view_as(DebugOptions)
+    self.assertEqual(
+        'default_value',
+        debug_options.lookup_experiment('nonexistent', 'default_value'))
+    self.assertEqual(
+        'value',
+        debug_options.lookup_experiment('key', 'default_value'))
+    self.assertEqual(
+        'k1=v1,k2=v2',
+        debug_options.lookup_experiment('master_key'))
+    self.assertEqual(
+        True,
+        debug_options.lookup_experiment('existing_experiment'))
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
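
For reference, here is a plausible sketch of the two DebugOptions helpers
exercised by the tests above, inferred from the expected test behavior rather
than copied from the actual implementation; the free functions below are
hypothetical stand-ins for the DebugOptions methods. Experiments are kept as
a list of strings, where each entry is either a bare name or 'name=value'.

    def add_experiment(debug_options, value):
      # Append the experiment if it is not already present, creating the
      # list on first use and preserving existing entries.
      if debug_options.experiments is None:
        debug_options.experiments = []
      if value not in debug_options.experiments:
        debug_options.experiments.append(value)

    def lookup_experiment(debug_options, key, default=None):
      # Return True for a bare experiment name, the value part of a
      # 'key=value' entry, or the default if the key is absent.
      if not debug_options.experiments:
        return default
      for experiment in debug_options.experiments:
        if experiment == key:
          return True
        if experiment.startswith(key + '='):
          # Split on the first '=' only, so 'master_key=k1=v1,k2=v2'
          # yields 'k1=v1,k2=v2'.
          return experiment.split(key + '=', 1)[1]
      return default
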
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index e3e7bd6..3f21011 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -358,34 +358,22 @@ class DataflowRunner(PipelineRunner):
     debug_options = options.view_as(DebugOptions)
     worker_options = options.view_as(WorkerOptions)
     if worker_options.min_cpu_platform:
-      experiments = ["min_cpu_platform=%s" % worker_options.min_cpu_platform]
-      if debug_options.experiments is not None:
-        experiments = list(set(experiments + debug_options.experiments))
-      debug_options.experiments = experiments
+      debug_options.add_experiment('min_cpu_platform=' +
+                                   worker_options.min_cpu_platform)
 
     # Elevate "enable_streaming_engine" to pipeline option, but using the
     # existing experiment.
     google_cloud_options = options.view_as(GoogleCloudOptions)
     if google_cloud_options.enable_streaming_engine:
-      if debug_options.experiments is None:
-        debug_options.experiments = []
-      if "enable_windmill_service" not in debug_options.experiments:
-        debug_options.experiments.append("enable_windmill_service")
-      if "enable_streaming_engine" not in debug_options.experiments:
-        debug_options.experiments.append("enable_streaming_engine")
+      debug_options.add_experiment("enable_windmill_service")
+      debug_options.add_experiment("enable_streaming_engine")
     else:
-      if debug_options.experiments is not None:
-        if ("enable_windmill_service" in debug_options.experiments
-            or "enable_streaming_engine" in debug_options.experiments):
-          raise ValueError("""Streaming engine both disabled and enabled:
-          enable_streaming_engine flag is not set, but enable_windmill_service
-          and/or enable_streaming_engine are present. It is recommended you
-          only set the enable_streaming_engine flag.""")
-
-    # TODO(BEAM-6664): Remove once Dataflow supports --dataflow_kms_key.
-    if google_cloud_options.dataflow_kms_key is not None:
-      debug_options.add_experiment('service_default_cmek_config=' +
-                                   google_cloud_options.dataflow_kms_key)
+      if (debug_options.lookup_experiment("enable_windmill_service") or
+          debug_options.lookup_experiment("enable_streaming_engine")):
+        raise ValueError("""Streaming engine both disabled and enabled:
+        enable_streaming_engine flag is not set, but enable_windmill_service
+        and/or enable_streaming_engine experiments are present.
+        It is recommended you only set the enable_streaming_engine flag.""")
 
     self.job = apiclient.Job(options, self.proto_pipeline)
 
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index 6c19c79..8c6ad05 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -29,6 +29,7 @@ import mock
 
 import apache_beam as beam
 import apache_beam.transforms as ptransform
+from apache_beam.options.pipeline_options import DebugOptions
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.pipeline import AppliedPTransform
 from apache_beam.pipeline import Pipeline
@@ -388,6 +389,32 @@ class DataflowRunnerTest(unittest.TestCase):
           common_urns.side_inputs.MULTIMAP.urn,
           side_input._side_input_data().access_pattern)
 
+  def test_min_cpu_platform_flag_is_propagated_to_experiments(self):
+    remote_runner = DataflowRunner()
+    self.default_properties.append('--min_cpu_platform=Intel Haswell')
+
+    p = Pipeline(remote_runner, PipelineOptions(self.default_properties))
+    p | ptransform.Create([1])  # pylint: disable=expression-not-assigned
+    p.run()
+    self.assertIn('min_cpu_platform=Intel Haswell',
+                  remote_runner.job.options.view_as(DebugOptions).experiments)
+
+  def test_streaming_engine_flag_adds_windmill_experiments(self):
+    remote_runner = DataflowRunner()
+    self.default_properties.append('--streaming')
+    self.default_properties.append('--enable_streaming_engine')
+    self.default_properties.append('--experiment=some_other_experiment')
+
+    p = Pipeline(remote_runner, PipelineOptions(self.default_properties))
+    p | ptransform.Create([1])  # pylint: disable=expression-not-assigned
+    p.run()
+
+    experiments_for_job = (
+        remote_runner.job.options.view_as(DebugOptions).experiments)
+    self.assertIn('enable_streaming_engine', experiments_for_job)
+    self.assertIn('enable_windmill_service', experiments_for_job)
+    self.assertIn('some_other_experiment', experiments_for_job)
+
 
 if __name__ == '__main__':
   unittest.main()
