This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8e70cba6487 Pass options in DaskOptions inheritance hierarchy only for
Dask runner (#37101)
8e70cba6487 is described below
commit 8e70cba64875ce8a45ba6e6c6c9d95eaedfc4fee
Author: Yi Hu <[email protected]>
AuthorDate: Mon Dec 15 15:10:29 2025 -0500
Pass options in DaskOptions inheritance hierarchy only for Dask runner
(#37101)
* Pass options in DaskOptions inheritance hierarchy only for Dask runner
* address comments
---
sdks/python/apache_beam/options/pipeline_options.py | 21 ++++++++++++++-------
.../apache_beam/options/pipeline_options_test.py | 13 +++++++++++++
sdks/python/apache_beam/runners/dask/dask_runner.py | 2 +-
3 files changed, 28 insertions(+), 8 deletions(-)
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 38b36c3a2c4..0e1012b2de6 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -486,11 +486,12 @@ class PipelineOptions(HasDisplayData):
drop_default=False,
add_extra_args_fn: Optional[Callable[[_BeamArgumentParser], None]] =
None,
retain_unknown_options=False,
- display_warnings=False) -> Dict[str, Any]:
+ display_warnings=False,
+ current_only=False,
+ ) -> Dict[str, Any]:
"""Returns a dictionary of all defined arguments.
- Returns a dictionary of all defined arguments (arguments that are defined
in
- any subclass of PipelineOptions) into a dictionary.
+ Returns a dictionary of all defined arguments into a dictionary.
Args:
drop_default: If set to true, options that are equal to their default
@@ -500,6 +501,9 @@ class PipelineOptions(HasDisplayData):
retain_unknown_options: If set to true, options not recognized by any
known pipeline options class will still be included in the result. If
set to false, they will be discarded.
+ current_only: If set to true, only returns options defined in this class.
+ Otherwise, arguments that are defined in any subclass of PipelineOptions
+ are returned (default).
Returns:
Dictionary of all args and values.
@@ -510,8 +514,11 @@ class PipelineOptions(HasDisplayData):
# instance of each subclass to avoid conflicts.
subset = {}
parser = _BeamArgumentParser(allow_abbrev=False)
- for cls in PipelineOptions.__subclasses__():
- subset.setdefault(str(cls), cls)
+ if current_only:
+ subset.setdefault(str(type(self)), type(self))
+ else:
+ for cls in PipelineOptions.__subclasses__():
+ subset.setdefault(str(cls), cls)
for cls in subset.values():
cls._add_argparse_args(parser) # pylint: disable=protected-access
if add_extra_args_fn:
@@ -562,7 +569,7 @@ class PipelineOptions(HasDisplayData):
continue
parsed_args, _ = parser.parse_known_args(self._flags)
else:
- if unknown_args:
+ if unknown_args and not current_only:
_LOGGER.warning("Discarding unparseable args: %s", unknown_args)
parsed_args = known_args
result = vars(parsed_args)
@@ -580,7 +587,7 @@ class PipelineOptions(HasDisplayData):
if overrides:
if retain_unknown_options:
result.update(overrides)
- else:
+ elif not current_only:
_LOGGER.warning("Discarding invalid overrides: %s", overrides)
return result
diff --git a/sdks/python/apache_beam/options/pipeline_options_test.py b/sdks/python/apache_beam/options/pipeline_options_test.py
index 705e8e1e2c0..c683c962527 100644
--- a/sdks/python/apache_beam/options/pipeline_options_test.py
+++ b/sdks/python/apache_beam/options/pipeline_options_test.py
@@ -238,6 +238,19 @@ class PipelineOptionsTest(unittest.TestCase):
options.view_as(PipelineOptionsTest.MockOptions).mock_multi_option,
expected['mock_multi_option'])
+ def test_get_superclass_options(self):
+ flags = ["--mock_option", "mock", "--fake_option", "fake"]
+ options = PipelineOptions(flags=flags).view_as(
+ PipelineOptionsTest.FakeOptions)
+ items = options.get_all_options(current_only=True).items()
+ print(items)
+ self.assertTrue(('fake_option', 'fake') in items)
+ self.assertFalse(('mock_option', 'mock') in items)
+ items = options.view_as(PipelineOptionsTest.MockOptions).get_all_options(
+ current_only=True).items()
+ self.assertFalse(('fake_option', 'fake') in items)
+ self.assertTrue(('mock_option', 'mock') in items)
+
@parameterized.expand(TEST_CASES)
def test_subclasses_of_pipeline_options_can_be_instantiated(
self, flags, expected, _):
diff --git a/sdks/python/apache_beam/runners/dask/dask_runner.py b/sdks/python/apache_beam/runners/dask/dask_runner.py
index 8975fcf1e13..bc915d30085 100644
--- a/sdks/python/apache_beam/runners/dask/dask_runner.py
+++ b/sdks/python/apache_beam/runners/dask/dask_runner.py
@@ -236,7 +236,7 @@ class DaskRunner(BundleBasedDirectRunner):
'DaskRunner is not available. Please install apache_beam[dask].')
dask_options = options.view_as(DaskOptions).get_all_options(
- drop_default=True)
+ drop_default=True, current_only=True)
bag_kwargs = DaskOptions._extract_bag_kwargs(dask_options)
client = ddist.Client(**dask_options)