This is an automated email from the ASF dual-hosted git repository.
xqhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new def86646cd1 feat(options): add support for comma-separated options
(#35580)
def86646cd1 is described below
commit def86646cd16a2a9a6edd4600fbf846f8c181d5e
Author: liferoad <[email protected]>
AuthorDate: Mon Jul 14 22:32:31 2025 -0400
feat(options): add support for comma-separated options (#35580)
* feat(options): add support for comma-separated experiments and service
options
Implement _CommaSeparatedListAction to handle comma-separated values for
experiments and dataflow_service_options, matching Java SDK behavior. This
allows more flexible input formats. Repeated flags keep working as before,
but a single value that contains commas is now split into separate entries.
* fix lint
* lint
* yapf
* updated the test
* updated changes.md
* polished docstrings
---
CHANGES.md | 3 +
.../python/apache_beam/options/pipeline_options.py | 69 +++++++++++++++++++++-
.../apache_beam/options/pipeline_options_test.py | 52 ++++++++++++++++
.../apache_beam/options/value_provider_test.py | 4 +-
4 files changed, 124 insertions(+), 4 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 0c3c7817523..965f40c2204 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -76,6 +76,9 @@
* X feature added (Java/Python)
([#X](https://github.com/apache/beam/issues/X)).
* Add pip-based install support for JupyterLab Sidepanel extension
([#35397](https://github.com/apache/beam/issues/#35397)).
* [IcebergIO] Create tables with a specified table properties
([#35496](https://github.com/apache/beam/pull/35496))
+* Add support for comma-separated options in Python SDK (Python)
([#35580](https://github.com/apache/beam/pull/35580)).
+  Python SDK now supports comma-separated values for experiments and dataflow_service_options,
+  matching Java SDK behavior. Repeated flags work as before, but a single value containing
+  commas (e.g. `--experiments=b,c`) is now split into separate entries.
* Milvus enrichment handler added (Python)
([#35216](https://github.com/apache/beam/pull/35216)).
Beam now supports Milvus enrichment handler capabilities for vector, keyword,
and hybrid search operations.
diff --git a/sdks/python/apache_beam/options/pipeline_options.py
b/sdks/python/apache_beam/options/pipeline_options.py
index dad905fec79..a7db5bfb0e7 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -220,6 +220,71 @@ class _GcsCustomAuditEntriesAction(argparse.Action):
% _GcsCustomAuditEntriesAction.MAX_ENTRIES)
+class _CommaSeparatedListAction(argparse.Action):
+ """
+ Argparse Action that splits comma-separated values and appends them to
+ a list. This allows options like --experiments=abc,def to be treated
+ as separate experiments 'abc' and 'def', similar to how Java SDK handles
+ them.
+
+ If there are key=value experiments in a raw argument, the remaining part of
+ the argument are treated as values and won't split further. For example:
+ 'abc,def,master_key=k1=v1,k2=v2' becomes
+ ['abc', 'def', 'master_key=k1=v1,k2=v2'].
+ """
+ def __call__(self, parser, namespace, values, option_string=None):
+ if not hasattr(namespace, self.dest) or getattr(namespace,
+ self.dest) is None:
+ setattr(namespace, self.dest, [])
+
+ # Split comma-separated values and extend the list
+ if isinstance(values, str):
+ # Smart splitting: only split at commas that are not part of
+ # key=value pairs
+ split_values = self._smart_split(values)
+ getattr(namespace, self.dest).extend(split_values)
+ else:
+ # If values is not a string, just append it
+ getattr(namespace, self.dest).append(values)
+
+ def _smart_split(self, values):
+ """Split comma-separated values, but preserve commas within
+ key=value pairs."""
+ result = []
+ current = []
+ equals_depth = 0
+
+ i = 0
+ while i < len(values):
+ char = values[i]
+
+ if char == '=':
+ equals_depth += 1
+ current.append(char)
+ elif char == ',' and equals_depth <= 1:
+ # This comma is a top-level separator (not inside a complex value)
+ if current:
+ result.append(''.join(current).strip())
+ current = []
+ equals_depth = 0
+ elif char == ',' and equals_depth > 1:
+ # This comma is inside a complex value, keep it
+ current.append(char)
+ elif char == ' ' and not current:
+ # Skip leading spaces
+ pass
+ else:
+ current.append(char)
+
+ i += 1
+
+ # Add the last item
+ if current:
+ result.append(''.join(current).strip())
+
+ return [v for v in result if v] # Filter out empty values
+
+
class PipelineOptions(HasDisplayData):
"""This class and subclasses are used as containers for command line options.
@@ -977,7 +1042,7 @@ class GoogleCloudOptions(PipelineOptions):
'--dataflow_service_option',
'--dataflow_service_options',
dest='dataflow_service_options',
- action='append',
+ action=_CommaSeparatedListAction,
default=None,
help=(
'Options to configure the Dataflow service. These '
@@ -1412,7 +1477,7 @@ class DebugOptions(PipelineOptions):
'--experiment',
'--experiments',
dest='experiments',
- action='append',
+ action=_CommaSeparatedListAction,
default=None,
help=(
'Runners may provide a number of experimental features that can be
'
diff --git a/sdks/python/apache_beam/options/pipeline_options_test.py
b/sdks/python/apache_beam/options/pipeline_options_test.py
index 099c9e80e21..06270d4cd31 100644
--- a/sdks/python/apache_beam/options/pipeline_options_test.py
+++ b/sdks/python/apache_beam/options/pipeline_options_test.py
@@ -893,6 +893,58 @@ class PipelineOptionsTest(unittest.TestCase):
'staging_location.'
])
+ def test_comma_separated_experiments(self):
+ """Test that comma-separated experiments are parsed correctly."""
+ # Test single experiment
+ options = PipelineOptions(['--experiments=abc'])
+ self.assertEqual(['abc'], options.get_all_options()['experiments'])
+
+ # Test comma-separated experiments
+ options = PipelineOptions(['--experiments=abc,def,ghi'])
+ self.assertEqual(['abc', 'def', 'ghi'],
+ options.get_all_options()['experiments'])
+
+ # Test multiple flags with comma-separated values
+ options = PipelineOptions(
+ ['--experiments=abc,def', '--experiments=ghi,jkl'])
+ self.assertEqual(['abc', 'def', 'ghi', 'jkl'],
+ options.get_all_options()['experiments'])
+
+ # Test with spaces around commas
+ options = PipelineOptions(['--experiments=abc, def , ghi'])
+ self.assertEqual(['abc', 'def', 'ghi'],
+ options.get_all_options()['experiments'])
+
+ # Test empty values are filtered out
+ options = PipelineOptions(['--experiments=abc,,def,'])
+ self.assertEqual(['abc', 'def'], options.get_all_options()['experiments'])
+
+ def test_comma_separated_dataflow_service_options(self):
+ """Test that comma-separated dataflow service options are parsed
+ correctly."""
+ # Test single option
+ options = PipelineOptions(['--dataflow_service_options=option1=value1'])
+ self.assertEqual(['option1=value1'],
+ options.get_all_options()['dataflow_service_options'])
+
+ # Test comma-separated options
+ options = PipelineOptions([
+ '--dataflow_service_options=option1=value1,option2=value2,'
+ 'option3=value3'
+ ])
+ self.assertEqual(['option1=value1', 'option2=value2', 'option3=value3'],
+ options.get_all_options()['dataflow_service_options'])
+
+ # Test multiple flags with comma-separated values
+ options = PipelineOptions([
+ '--dataflow_service_options=option1=value1,option2=value2',
+ '--dataflow_service_options=option3=value3,option4=value4'
+ ])
+ self.assertEqual([
+ 'option1=value1', 'option2=value2', 'option3=value3', 'option4=value4'
+ ],
+ options.get_all_options()['dataflow_service_options'])
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
diff --git a/sdks/python/apache_beam/options/value_provider_test.py
b/sdks/python/apache_beam/options/value_provider_test.py
index 42afa8c0def..e88d2371464 100644
--- a/sdks/python/apache_beam/options/value_provider_test.py
+++ b/sdks/python/apache_beam/options/value_provider_test.py
@@ -216,8 +216,8 @@ class ValueProviderTests(unittest.TestCase):
options = PipelineOptions(['--experiments', 'a', '--experiments', 'b,c'])
options = options.view_as(DebugOptions)
self.assertIn('a', options.experiments)
- self.assertIn('b,c', options.experiments)
- self.assertNotIn('c', options.experiments)
+ self.assertIn('b', options.experiments)
+ self.assertIn('c', options.experiments)
def test_nested_value_provider_wrap_static(self):
vp = NestedValueProvider(StaticValueProvider(int, 1), lambda x: x + 1)