This is an automated email from the ASF dual-hosted git repository.

xqhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new def86646cd1 feat(options): add support for comma-separated options (#35580)
def86646cd1 is described below

commit def86646cd16a2a9a6edd4600fbf846f8c181d5e
Author: liferoad <[email protected]>
AuthorDate: Mon Jul 14 22:32:31 2025 -0400

    feat(options): add support for comma-separated options (#35580)
    
    * feat(options): add support for comma-separated experiments and service options
    
    Implement _CommaSeparatedListAction to handle comma-separated values for
    experiments and dataflow_service_options, matching Java SDK behavior. This
    allows more flexible input formats while maintaining backward compatibility.
    
    * fix lint
    
    * lint
    
    * yapf
    
    * updated the test
    
    * updated changes.md
    
    * polished docstrings
---
 CHANGES.md                                         |  3 +
 .../python/apache_beam/options/pipeline_options.py | 69 +++++++++++++++++++++-
 .../apache_beam/options/pipeline_options_test.py   | 52 ++++++++++++++++
 .../apache_beam/options/value_provider_test.py     |  4 +-
 4 files changed, 124 insertions(+), 4 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 0c3c7817523..965f40c2204 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -76,6 +76,9 @@
 * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
 * Add pip-based install support for JupyterLab Sidepanel extension ([#35397](https://github.com/apache/beam/issues/#35397)).
 * [IcebergIO] Create tables with a specified table properties ([#35496](https://github.com/apache/beam/pull/35496))
+* Add support for comma-separated options in Python SDK (Python) ([#35580](https://github.com/apache/beam/pull/35580)).
+  Python SDK now supports comma-separated values for experiments and dataflow_service_options,
+  matching Java SDK behavior while maintaining backward compatibility.
 * Milvus enrichment handler added (Python) ([#35216](https://github.com/apache/beam/pull/35216)).
   Beam now supports Milvus enrichment handler capabilities for vector, keyword,
   and hybrid search operations.
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index dad905fec79..a7db5bfb0e7 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -220,6 +220,71 @@ class _GcsCustomAuditEntriesAction(argparse.Action):
           % _GcsCustomAuditEntriesAction.MAX_ENTRIES)
 
 
+class _CommaSeparatedListAction(argparse.Action):
+  """
+  Argparse Action that splits comma-separated values and appends them to
+  a list. This allows options like --experiments=abc,def to be treated
+  as separate experiments 'abc' and 'def', similar to how Java SDK handles
+  them.
+  
+  If there are key=value experiments in a raw argument, the remaining part of
+  the argument is treated as a single value and is not split further. For
+  example, 'abc,def,master_key=k1=v1,k2=v2' becomes
+  ['abc', 'def', 'master_key=k1=v1,k2=v2'].
+  """
+  def __call__(self, parser, namespace, values, option_string=None):
+    if not hasattr(namespace, self.dest) or getattr(namespace,
+                                                    self.dest) is None:
+      setattr(namespace, self.dest, [])
+
+    # Split comma-separated values and extend the list
+    if isinstance(values, str):
+      # Smart splitting: only split at commas that are not part of
+      # key=value pairs
+      split_values = self._smart_split(values)
+      getattr(namespace, self.dest).extend(split_values)
+    else:
+      # If values is not a string, just append it
+      getattr(namespace, self.dest).append(values)
+
+  def _smart_split(self, values):
+    """Split comma-separated values, but preserve commas within
+    key=value pairs."""
+    result = []
+    current = []
+    equals_depth = 0
+
+    i = 0
+    while i < len(values):
+      char = values[i]
+
+      if char == '=':
+        equals_depth += 1
+        current.append(char)
+      elif char == ',' and equals_depth <= 1:
+        # This comma is a top-level separator (not inside a complex value)
+        if current:
+          result.append(''.join(current).strip())
+          current = []
+          equals_depth = 0
+      elif char == ',' and equals_depth > 1:
+        # This comma is inside a complex value, keep it
+        current.append(char)
+      elif char == ' ' and not current:
+        # Skip leading spaces
+        pass
+      else:
+        current.append(char)
+
+      i += 1
+
+    # Add the last item
+    if current:
+      result.append(''.join(current).strip())
+
+    return [v for v in result if v]  # Filter out empty values
+
+
 class PipelineOptions(HasDisplayData):
   """This class and subclasses are used as containers for command line options.
 
@@ -977,7 +1042,7 @@ class GoogleCloudOptions(PipelineOptions):
         '--dataflow_service_option',
         '--dataflow_service_options',
         dest='dataflow_service_options',
-        action='append',
+        action=_CommaSeparatedListAction,
         default=None,
         help=(
             'Options to configure the Dataflow service. These '
@@ -1412,7 +1477,7 @@ class DebugOptions(PipelineOptions):
         '--experiment',
         '--experiments',
         dest='experiments',
-        action='append',
+        action=_CommaSeparatedListAction,
         default=None,
         help=(
             'Runners may provide a number of experimental features that can be '
diff --git a/sdks/python/apache_beam/options/pipeline_options_test.py b/sdks/python/apache_beam/options/pipeline_options_test.py
index 099c9e80e21..06270d4cd31 100644
--- a/sdks/python/apache_beam/options/pipeline_options_test.py
+++ b/sdks/python/apache_beam/options/pipeline_options_test.py
@@ -893,6 +893,58 @@ class PipelineOptionsTest(unittest.TestCase):
             'staging_location.'
         ])
 
+  def test_comma_separated_experiments(self):
+    """Test that comma-separated experiments are parsed correctly."""
+    # Test single experiment
+    options = PipelineOptions(['--experiments=abc'])
+    self.assertEqual(['abc'], options.get_all_options()['experiments'])
+
+    # Test comma-separated experiments
+    options = PipelineOptions(['--experiments=abc,def,ghi'])
+    self.assertEqual(['abc', 'def', 'ghi'],
+                     options.get_all_options()['experiments'])
+
+    # Test multiple flags with comma-separated values
+    options = PipelineOptions(
+        ['--experiments=abc,def', '--experiments=ghi,jkl'])
+    self.assertEqual(['abc', 'def', 'ghi', 'jkl'],
+                     options.get_all_options()['experiments'])
+
+    # Test with spaces around commas
+    options = PipelineOptions(['--experiments=abc, def , ghi'])
+    self.assertEqual(['abc', 'def', 'ghi'],
+                     options.get_all_options()['experiments'])
+
+    # Test empty values are filtered out
+    options = PipelineOptions(['--experiments=abc,,def,'])
+    self.assertEqual(['abc', 'def'], options.get_all_options()['experiments'])
+
+  def test_comma_separated_dataflow_service_options(self):
+    """Test that comma-separated dataflow service options are parsed
+    correctly."""
+    # Test single option
+    options = PipelineOptions(['--dataflow_service_options=option1=value1'])
+    self.assertEqual(['option1=value1'],
+                     options.get_all_options()['dataflow_service_options'])
+
+    # Test comma-separated options
+    options = PipelineOptions([
+        '--dataflow_service_options=option1=value1,option2=value2,'
+        'option3=value3'
+    ])
+    self.assertEqual(['option1=value1', 'option2=value2', 'option3=value3'],
+                     options.get_all_options()['dataflow_service_options'])
+
+    # Test multiple flags with comma-separated values
+    options = PipelineOptions([
+        '--dataflow_service_options=option1=value1,option2=value2',
+        '--dataflow_service_options=option3=value3,option4=value4'
+    ])
+    self.assertEqual([
+        'option1=value1', 'option2=value2', 'option3=value3', 'option4=value4'
+    ],
+                     options.get_all_options()['dataflow_service_options'])
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
diff --git a/sdks/python/apache_beam/options/value_provider_test.py b/sdks/python/apache_beam/options/value_provider_test.py
index 42afa8c0def..e88d2371464 100644
--- a/sdks/python/apache_beam/options/value_provider_test.py
+++ b/sdks/python/apache_beam/options/value_provider_test.py
@@ -216,8 +216,8 @@ class ValueProviderTests(unittest.TestCase):
     options = PipelineOptions(['--experiments', 'a', '--experiments', 'b,c'])
     options = options.view_as(DebugOptions)
     self.assertIn('a', options.experiments)
-    self.assertIn('b,c', options.experiments)
-    self.assertNotIn('c', options.experiments)
+    self.assertIn('b', options.experiments)
+    self.assertIn('c', options.experiments)
 
   def test_nested_value_provider_wrap_static(self):
     vp = NestedValueProvider(StaticValueProvider(int, 1), lambda x: x + 1)

Reply via email to