This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 98faabf [BEAM-6942] Make modifications to pipeline options to be
visible to all views. (#8225)
98faabf is described below
commit 98faabff70b58935b7cbc7efeacc8ec1a850479d
Author: tvalentyn <[email protected]>
AuthorDate: Fri Apr 12 18:10:39 2019 -0700
[BEAM-6942] Make modifications to pipeline options to be visible to all
views. (#8225)
* Fixes a bug in pipeline options that prevents changes of multi-valued
options from being propagated across views. Also, adds documentation and unit
tests for the codepath affected by the bug and the fix.
* Fix typos in flags defined in PortableOptions.
* Check that views do not expose attributes defined by other views.
---
.../python/apache_beam/options/pipeline_options.py | 95 +++++++++---
.../apache_beam/options/pipeline_options_test.py | 159 ++++++++++++++++++++-
.../runners/dataflow/dataflow_runner.py | 32 ++---
.../runners/dataflow/dataflow_runner_test.py | 27 ++++
4 files changed, 273 insertions(+), 40 deletions(-)
diff --git a/sdks/python/apache_beam/options/pipeline_options.py
b/sdks/python/apache_beam/options/pipeline_options.py
index d9e8164..745dcad 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -70,9 +70,9 @@ class _BeamArgumentParser(argparse.ArgumentParser):
@classmethod
def _add_argparse_args(cls, parser):
- parser.add_value_provider_argument('--vp-arg1', default='start')
- parser.add_value_provider_argument('--vp-arg2')
- parser.add_argument('--non-vp-arg')
+ parser.add_value_provider_argument('--vp_arg1', default='start')
+ parser.add_value_provider_argument('--vp_arg2')
+ parser.add_argument('--non_vp_arg')
"""
def add_value_provider_argument(self, *args, **kwargs):
@@ -117,11 +117,13 @@ class _BeamArgumentParser(argparse.ArgumentParser):
class PipelineOptions(HasDisplayData):
- """Pipeline options class used as container for command line options.
+ """This class and subclasses are used as containers for command line options.
- The class is essentially a wrapper over the standard argparse Python module
+ These classes are wrappers over the standard argparse Python module
(see https://docs.python.org/3/library/argparse.html). To define one option
- or a group of options you subclass from PipelineOptions::
+ or a group of options, create a subclass from PipelineOptions.
+
+ Example Usage::
class XyzOptions(PipelineOptions):
@@ -134,12 +136,20 @@ class PipelineOptions(HasDisplayData):
described in the argparse public documentation.
Pipeline objects require an options object during initialization.
- This is obtained simply by initializing an options class as defined above::
+ This is obtained simply by initializing an options class as defined above.
+
+ Example Usage::
p = Pipeline(options=XyzOptions())
if p.options.xyz == 'end':
raise ValueError('Option xyz has an invalid value.')
+ Instances of PipelineOptions or any of its subclasses have access to values
+ defined by other PipelineOptions subclasses (see get_all_options()), and
+ can be converted to an instance of another PipelineOptions subclass
+ (see view_as()). All views share the underlying data structure that stores
+ option key-value pairs.
+
By default the options classes will use command line arguments to initialize
the options.
"""
@@ -150,7 +160,7 @@ class PipelineOptions(HasDisplayData):
arguments and then parse the command line specified by flags or by default
the one obtained from sys.argv.
- The subclasses are not expected to require a redefinition of __init__.
+ The subclasses of PipelineOptions do not need to redefine __init__.
Args:
flags: An iterable of command line arguments to be used. If not specified
@@ -158,20 +168,40 @@ class PipelineOptions(HasDisplayData):
**kwargs: Add overrides for arguments passed in flags.
"""
+ # self._flags stores a list of not yet parsed arguments, typically,
+ # command-line flags. This list is shared across different views.
+ # See: view_as().
self._flags = flags
- self._all_options = kwargs
- parser = _BeamArgumentParser()
+ # Build parser that will parse options recognized by the [sub]class of
+ # PipelineOptions whose object is being instantiated.
+ parser = _BeamArgumentParser()
for cls in type(self).mro():
if cls == PipelineOptions:
break
elif '_add_argparse_args' in cls.__dict__:
cls._add_argparse_args(parser)
- # The _visible_options attribute will contain only those options from the
- # flags (i.e., command line) that can be recognized. The _all_options
- # field contains additional overrides.
+
+ # The _visible_options attribute will contain options that were recognized
+ # by the parser.
self._visible_options, _ = parser.parse_known_args(flags)
+ # self._all_options is initialized with overrides to flag values,
+ # provided in kwargs, and will store key-value pairs for options recognized
+ # by the current PipelineOptions [sub]class and its views that may be
+ # created. See: view_as().
+ # This dictionary is shared across different views, and is lazily updated
+ # as new views are created.
+ # Users access this dictionary store via __getattr__ / __setattr__ methods.
+ self._all_options = kwargs
+
+ # Initialize values of keys defined by this class.
+ for option_name in self._visible_option_list():
+ # Note that options specified in kwargs will not be overwritten.
+ if option_name not in self._all_options:
+ self._all_options[option_name] = getattr(self._visible_options,
+ option_name)
+
@classmethod
def _add_argparse_args(cls, parser):
# Override this in subclasses to provide options.
@@ -246,7 +276,40 @@ class PipelineOptions(HasDisplayData):
return self.get_all_options(True)
def view_as(self, cls):
+ """Returns a view of the current object as the given PipelineOptions subclass.
+
+ Example Usage::
+
+ options = PipelineOptions(['--runner', 'Direct', '--streaming'])
+ standard_options = options.view_as(StandardOptions)
+ if standard_options.streaming:
+ # ... start a streaming job ...
+
+ Note that options objects may have multiple views, and modifications
+ of values in any view-object will apply to the current object and other
+ view-objects.
+
+ Args:
+ cls: PipelineOptions class or any of its subclasses.
+
+ Returns:
+ An instance of cls that is initialized using options contained in the
+ current object.
+
+ """
view = cls(self._flags)
+ for option_name in view._visible_option_list():
+ # Initialize values of keys defined by a cls.
+ #
+ # Note that we do initialization only once per key to make sure that
+ # values in _all_options dict are not recreated with each new view.
+ # This is important to make sure that values of multi-options keys are
+ # backed by the same list across multiple views, and that any overrides of
+ # pipeline options already stored in _all_options are preserved.
+ if option_name not in self._all_options:
+ self._all_options[option_name] = getattr(view._visible_options,
+ option_name)
+ # Note that views will still store _all_options of the source object.
view._all_options = self._all_options
return view
@@ -264,7 +327,7 @@ class PipelineOptions(HasDisplayData):
if name[:2] == name[-2:] == '__':
return object.__getattribute__(self, name)
elif name in self._visible_option_list():
- return self._all_options.get(name, getattr(self._visible_options, name))
+ return self._all_options[name]
else:
raise AttributeError("'%s' object has no attribute '%s'" %
(type(self).__name__, name))
@@ -738,13 +801,13 @@ class PortableOptions(PipelineOptions):
'"<ENV_VAL>"} }. All fields in the json are optional except '
'command.'))
parser.add_argument(
- '--sdk-worker-parallelism', default=None,
+ '--sdk_worker_parallelism', default=None,
help=('Sets the number of sdk worker processes that will run on each '
'worker node. Default is 1. If 0, it will be automatically set '
'by the runner by looking at different parameters (e.g. number '
'of CPU cores on the worker machine).'))
parser.add_argument(
- '--environment-cache-millis', default=0,
+ '--environment_cache_millis', default=0,
help=('Duration in milliseconds for environment cache within a job. '
'0 means no caching.'))
diff --git a/sdks/python/apache_beam/options/pipeline_options_test.py
b/sdks/python/apache_beam/options/pipeline_options_test.py
index 021097b..5c51725 100644
--- a/sdks/python/apache_beam/options/pipeline_options_test.py
+++ b/sdks/python/apache_beam/options/pipeline_options_test.py
@@ -24,6 +24,7 @@ import unittest
import hamcrest as hc
+from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import ProfilingOptions
from apache_beam.options.pipeline_options import TypeOptions
@@ -119,12 +120,36 @@ class PipelineOptionsTest(unittest.TestCase):
'--mock_multi_option', action='append', help='mock multi option')
parser.add_argument('--option with space', help='mock option with space')
+ # Use with MockOptions in test cases where multiple option classes are
+ # needed.
+ class FakeOptions(PipelineOptions):
+
+ @classmethod
+ def _add_argparse_args(cls, parser):
+ parser.add_argument('--fake_flag', action='store_true', help='fake flag')
+ parser.add_argument('--fake_option', help='fake option')
+ parser.add_argument(
+ '--fake_multi_option', action='append', help='fake multi option')
+
def test_display_data(self):
for case in PipelineOptionsTest.TEST_CASES:
options = PipelineOptions(flags=case['flags'])
dd = DisplayData.create_from(options)
hc.assert_that(dd.items, hc.contains_inanyorder(*case['display_data']))
+ def test_get_all_options_subclass(self):
+ for case in PipelineOptionsTest.TEST_CASES:
+ options = PipelineOptionsTest.MockOptions(flags=case['flags'])
+ self.assertDictContainsSubset(case['expected'],
options.get_all_options())
+ self.assertEqual(options.view_as(
+ PipelineOptionsTest.MockOptions).mock_flag,
+ case['expected']['mock_flag'])
+ self.assertEqual(options.view_as(
+ PipelineOptionsTest.MockOptions).mock_option,
+ case['expected']['mock_option'])
+ self.assertEqual(options.view_as(
+ PipelineOptionsTest.MockOptions).mock_multi_option,
+ case['expected']['mock_multi_option'])
+
def test_get_all_options(self):
for case in PipelineOptionsTest.TEST_CASES:
options = PipelineOptions(flags=case['flags'])
@@ -139,6 +164,49 @@ class PipelineOptionsTest(unittest.TestCase):
PipelineOptionsTest.MockOptions).mock_multi_option,
case['expected']['mock_multi_option'])
+ def test_sublcalsses_of_pipeline_options_can_be_instantiated(self):
+ for case in PipelineOptionsTest.TEST_CASES:
+ mock_options = PipelineOptionsTest.MockOptions(flags=case['flags'])
+ self.assertEqual(mock_options.mock_flag,
+ case['expected']['mock_flag'])
+ self.assertEqual(mock_options.mock_option,
+ case['expected']['mock_option'])
+ self.assertEqual(mock_options.mock_multi_option,
+ case['expected']['mock_multi_option'])
+
+ def test_views_can_be_constructed_from_pipeline_option_subclasses(self):
+ for case in PipelineOptionsTest.TEST_CASES:
+ fake_options = PipelineOptionsTest.FakeOptions(flags=case['flags'])
+ mock_options = fake_options.view_as(PipelineOptionsTest.MockOptions)
+
+ self.assertEqual(mock_options.mock_flag,
+ case['expected']['mock_flag'])
+ self.assertEqual(mock_options.mock_option,
+ case['expected']['mock_option'])
+ self.assertEqual(mock_options.mock_multi_option,
+ case['expected']['mock_multi_option'])
+
+ def test_views_do_not_expose_options_defined_by_other_views(self):
+ flags = ['--mock_option=mock_value', '--fake_option=fake_value']
+
+ options = PipelineOptions(flags)
+ assert options.view_as(
+ PipelineOptionsTest.MockOptions).mock_option == 'mock_value'
+ assert options.view_as(
+ PipelineOptionsTest.FakeOptions).fake_option == 'fake_value'
+ assert options.view_as(
+ PipelineOptionsTest.MockOptions).view_as(
+ PipelineOptionsTest.FakeOptions).fake_option == 'fake_value'
+
+ self.assertRaises(
+ AttributeError,
+ lambda: options.view_as(PipelineOptionsTest.MockOptions).fake_option)
+ self.assertRaises(
+ AttributeError,
+ lambda: options.view_as(PipelineOptionsTest.MockOptions).view_as(
+ PipelineOptionsTest.FakeOptions).view_as(
+ PipelineOptionsTest.MockOptions).fake_option)
+
def test_from_dictionary(self):
for case in PipelineOptionsTest.TEST_CASES:
options = PipelineOptions(flags=case['flags'])
@@ -184,6 +252,57 @@ class PipelineOptionsTest(unittest.TestCase):
options = PipelineOptions(flags=[''])
self.assertEqual(options.get_all_options()['experiments'], None)
+ def test_option_modifications_are_shared_between_views(self):
+ pipeline_options = PipelineOptions([
+ '--mock_option', 'value', '--mock_flag',
+ '--mock_multi_option', 'value1',
+ '--mock_multi_option', 'value2',
+ ])
+
+ mock_options = PipelineOptionsTest.MockOptions([
+ '--mock_option', 'value', '--mock_flag',
+ '--mock_multi_option', 'value1',
+ '--mock_multi_option', 'value2',
+ ])
+
+ for options in [pipeline_options, mock_options]:
+ view1 = options.view_as(PipelineOptionsTest.MockOptions)
+ view2 = options.view_as(PipelineOptionsTest.MockOptions)
+
+ view1.mock_option = 'new_value'
+ view1.mock_flag = False
+ view1.mock_multi_option.append('value3')
+
+ view3 = options.view_as(PipelineOptionsTest.MockOptions)
+ view4 = view1.view_as(PipelineOptionsTest.MockOptions)
+ view5 = options.view_as(TypeOptions).view_as(
+ PipelineOptionsTest.MockOptions)
+
+ for view in [view1, view2, view3, view4, view5]:
+ self.assertEqual('new_value', view.mock_option)
+ self.assertFalse(view.mock_flag)
+ self.assertEqual(['value1', 'value2', 'value3'],
view.mock_multi_option)
+
+ def test_uninitialized_option_modifications_are_shared_between_views(self):
+ options = PipelineOptions([])
+
+ view1 = options.view_as(PipelineOptionsTest.MockOptions)
+ view2 = options.view_as(PipelineOptionsTest.MockOptions)
+
+ view1.mock_option = 'some_value'
+ view1.mock_flag = False
+ view1.mock_multi_option = ['value1', 'value2']
+
+ view3 = options.view_as(PipelineOptionsTest.MockOptions)
+ view4 = view1.view_as(PipelineOptionsTest.MockOptions)
+ view5 = options.view_as(TypeOptions).view_as(
+ PipelineOptionsTest.MockOptions)
+
+ for view in [view1, view2, view3, view4, view5]:
+ self.assertEqual('some_value', view.mock_option)
+ self.assertFalse(view.mock_flag)
+ self.assertEqual(['value1', 'value2'], view.mock_multi_option)
+
def test_extra_package(self):
options = PipelineOptions(['--extra_package', 'abc',
'--extra_packages', 'def',
@@ -211,13 +330,13 @@ class PipelineOptionsTest(unittest.TestCase):
def test_redefine_options(self):
- class TestRedefinedOptios(PipelineOptions): # pylint:
disable=unused-variable
+ class TestRedefinedOptions(PipelineOptions): # pylint:
disable=unused-variable
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument('--redefined_flag', action='store_true')
- class TestRedefinedOptios(PipelineOptions):
+ class TestRedefinedOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
@@ -303,6 +422,42 @@ class PipelineOptionsTest(unittest.TestCase):
options = PipelineOptions(['--type_check_strictness', 'blahblah'])
options.view_as(TypeOptions)
+ def test_add_experiment(self):
+ options = PipelineOptions([])
+ options.view_as(DebugOptions).add_experiment('new_experiment')
+ self.assertEqual(
+ ['new_experiment'],
+ options.view_as(DebugOptions).experiments
+ )
+
+ def test_add_experiment_preserves_existing_experiments(self):
+ options = PipelineOptions(['--experiment=existing_experiment'])
+ options.view_as(DebugOptions).add_experiment('new_experiment')
+ self.assertEqual(
+ ['existing_experiment', 'new_experiment'],
+ options.view_as(DebugOptions).experiments
+ )
+
+ def test_lookup_experiments(self):
+ options = PipelineOptions([
+ '--experiment=existing_experiment',
+ '--experiment', 'key=value',
+ '--experiment', 'master_key=k1=v1,k2=v2',
+ ])
+ debug_options = options.view_as(DebugOptions)
+ self.assertEqual(
+ 'default_value',
+ debug_options.lookup_experiment('nonexistent', 'default_value'))
+ self.assertEqual(
+ 'value',
+ debug_options.lookup_experiment('key', 'default_value'))
+ self.assertEqual(
+ 'k1=v1,k2=v2',
+ debug_options.lookup_experiment('master_key'))
+ self.assertEqual(
+ True,
+ debug_options.lookup_experiment('existing_experiment'))
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index e3e7bd6..3f21011 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -358,34 +358,22 @@ class DataflowRunner(PipelineRunner):
debug_options = options.view_as(DebugOptions)
worker_options = options.view_as(WorkerOptions)
if worker_options.min_cpu_platform:
- experiments = ["min_cpu_platform=%s" % worker_options.min_cpu_platform]
- if debug_options.experiments is not None:
- experiments = list(set(experiments + debug_options.experiments))
- debug_options.experiments = experiments
+ debug_options.add_experiment('min_cpu_platform=' +
+ worker_options.min_cpu_platform)
# Elevate "enable_streaming_engine" to pipeline option, but using the
# existing experiment.
google_cloud_options = options.view_as(GoogleCloudOptions)
if google_cloud_options.enable_streaming_engine:
- if debug_options.experiments is None:
- debug_options.experiments = []
- if "enable_windmill_service" not in debug_options.experiments:
- debug_options.experiments.append("enable_windmill_service")
- if "enable_streaming_engine" not in debug_options.experiments:
- debug_options.experiments.append("enable_streaming_engine")
+ debug_options.add_experiment("enable_windmill_service")
+ debug_options.add_experiment("enable_streaming_engine")
else:
- if debug_options.experiments is not None:
- if ("enable_windmill_service" in debug_options.experiments
- or "enable_streaming_engine" in debug_options.experiments):
- raise ValueError("""Streaming engine both disabled and enabled:
- enable_streaming_engine flag is not set, but enable_windmill_service
- and/or enable_streaming_engine are present. It is recommended you
- only set the enable_streaming_engine flag.""")
-
- # TODO(BEAM-6664): Remove once Dataflow supports --dataflow_kms_key.
- if google_cloud_options.dataflow_kms_key is not None:
- debug_options.add_experiment('service_default_cmek_config=' +
- google_cloud_options.dataflow_kms_key)
+ if (debug_options.lookup_experiment("enable_windmill_service") or
+ debug_options.lookup_experiment("enable_streaming_engine")):
+ raise ValueError("""Streaming engine both disabled and enabled:
+ enable_streaming_engine flag is not set, but enable_windmill_service
+ and/or enable_streaming_engine experiments are present.
+ It is recommended you only set the enable_streaming_engine flag.""")
self.job = apiclient.Job(options, self.proto_pipeline)
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index 6c19c79..8c6ad05 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -29,6 +29,7 @@ import mock
import apache_beam as beam
import apache_beam.transforms as ptransform
+from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.pipeline import AppliedPTransform
from apache_beam.pipeline import Pipeline
@@ -388,6 +389,32 @@ class DataflowRunnerTest(unittest.TestCase):
common_urns.side_inputs.MULTIMAP.urn,
side_input._side_input_data().access_pattern)
+ def test_min_cpu_platform_flag_is_propagated_to_experiments(self):
+ remote_runner = DataflowRunner()
+ self.default_properties.append('--min_cpu_platform=Intel Haswell')
+
+ p = Pipeline(remote_runner, PipelineOptions(self.default_properties))
+ p | ptransform.Create([1]) # pylint: disable=expression-not-assigned
+ p.run()
+ self.assertIn('min_cpu_platform=Intel Haswell',
+ remote_runner.job.options.view_as(DebugOptions).experiments)
+
+ def test_streaming_engine_flag_adds_windmill_experiments(self):
+ remote_runner = DataflowRunner()
+ self.default_properties.append('--streaming')
+ self.default_properties.append('--enable_streaming_engine')
+ self.default_properties.append('--experiment=some_other_experiment')
+
+ p = Pipeline(remote_runner, PipelineOptions(self.default_properties))
+ p | ptransform.Create([1]) # pylint: disable=expression-not-assigned
+ p.run()
+
+ experiments_for_job = (
+ remote_runner.job.options.view_as(DebugOptions).experiments)
+ self.assertIn('enable_streaming_engine', experiments_for_job)
+ self.assertIn('enable_windmill_service', experiments_for_job)
+ self.assertIn('some_other_experiment', experiments_for_job)
+
if __name__ == '__main__':
unittest.main()