This is an automated email from the ASF dual-hosted git repository.

gaborgsomogyi pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.20 by this push:
     new 7f0a676ec12 [FLINK-38143][python] Fix pyflink flat YAML based config support
7f0a676ec12 is described below

commit 7f0a676ec1287d83a88bf60987b621836fcb359e
Author: Gabor Somogyi <gabor_somog...@apple.com>
AuthorDate: Sat Jul 26 10:26:47 2025 +0200

    [FLINK-38143][python] Fix pyflink flat YAML based config support
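
    In legacy flat (non-standard-YAML) config mode, add_jars() and
    add_classpaths() wrote the merged list back as str(list), which the
    flat parser cannot read back; the list must be joined with ';'. A
    minimal sketch of the two round-trip formats, with simplified parsing
    (parse_list_value stands in for Configuration.parse_list_value from
    the diff below; serialize_list_value is a hypothetical name for the
    inline setString logic):

        def parse_list_value(value, standard_yaml):
            # Simplified stand-in for Configuration.parse_list_value.
            if value is None:
                return []
            if standard_yaml and value.startswith('['):
                # YAML flow list: "[jar1, jar2, jar3]"
                return [item.strip() for item in value[1:-1].split(',')]
            if standard_yaml and value.startswith('- '):
                # YAML block list: "- jar1\n- jar2\n- jar3"
                return [line[2:] for line in value.splitlines()]
            # Legacy flat pattern: "jar1;jar2;jar3"
            return value.split(';')

        def serialize_list_value(values, standard_yaml):
            # The actual fix: legacy mode must write 'a;b;c', not str(values).
            return str(values) if standard_yaml else ';'.join(values)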
---
 .../pyflink/common/tests/test_configuration.py     | 21 ++++++++---
 .../datastream/stream_execution_environment.py     | 21 ++++++-----
 .../tests/test_stream_execution_environment.py     | 41 ++++++++++++++++++----
 3 files changed, 64 insertions(+), 19 deletions(-)

diff --git a/flink-python/pyflink/common/tests/test_configuration.py b/flink-python/pyflink/common/tests/test_configuration.py
index 26ca1e92cf9..02c12420be2 100644
--- a/flink-python/pyflink/common/tests/test_configuration.py
+++ b/flink-python/pyflink/common/tests/test_configuration.py
@@ -164,7 +164,20 @@ class ConfigurationTests(PyFlinkTestCase):
 
         self.assertEqual(str(conf), "{k1=v1, k2=1}")
 
-    def test_parse_list_value(self):
+    def test_parse_list_value_non_standard_yaml(self):
+        # test None
+        value = None
+        expected_result = []
+        result = Configuration.parse_list_value(value, False)
+        self.assertEqual(result, expected_result)
+
+        # test parse legacy pattern
+        value = "jar1;jar2;jar3"
+        expected_result = ['jar1', 'jar2', 'jar3']
+        result = Configuration.parse_list_value(value, False)
+        self.assertEqual(result, expected_result)
+
+    def test_parse_list_value_standard_yaml(self):
         # test None
         value = None
         expected_result = []
@@ -174,17 +187,17 @@ class ConfigurationTests(PyFlinkTestCase):
         # test parse YAML list
         value = "[jar1, jar2, jar3]"
         expected_result = ['jar1', 'jar2', 'jar3']
-        result = Configuration.parse_list_value(value)
+        result = Configuration.parse_list_value(value, True)
         self.assertEqual(result, expected_result)
 
         # test parse multiline YAML list
         value = "- jar1\n- jar2\n- jar3"
         expected_result = ['jar1', 'jar2', 'jar3']
-        result = Configuration.parse_list_value(value)
+        result = Configuration.parse_list_value(value, True)
         self.assertEqual(result, expected_result)
 
         # test parse legacy pattern
         value = "jar1;jar2;jar3"
         expected_result = ['jar1', 'jar2', 'jar3']
-        result = Configuration.parse_list_value(value)
+        result = Configuration.parse_list_value(value, True)
         self.assertEqual(result, expected_result)
diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py b/flink-python/pyflink/datastream/stream_execution_environment.py
index 9553d4f4bfe..26872880b1c 100644
--- a/flink-python/pyflink/datastream/stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/stream_execution_environment.py
@@ -769,11 +769,13 @@ class StreamExecutionEnvironment(object):
         env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \
             .getEnvironmentConfig(self._j_stream_execution_environment)
         old_jars_path = env_config.getString(jars_key, None)
-        old_jars_list = Configuration.parse_list_value(
-            old_jars_path,
-            jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml())
+        standard_yaml = jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml()
+        old_jars_list = Configuration.parse_list_value(old_jars_path, standard_yaml)
         joined_jars_list = [*old_jars_list, *jars_path]
-        env_config.setString(jars_key, str(joined_jars_list))
+        if standard_yaml:
+            env_config.setString(jars_key, str(joined_jars_list))
+        else:
+            env_config.setString(jars_key, ';'.join(joined_jars_list))
 
     def add_classpaths(self, *classpaths: str):
         """
@@ -788,12 +790,13 @@ class StreamExecutionEnvironment(object):
         env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \
             .getEnvironmentConfig(self._j_stream_execution_environment)
         old_classpaths = env_config.getString(classpaths_key, None)
-        old_classpaths_list = Configuration.parse_list_value(
-            old_classpaths,
-            jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml()
-        )
+        standard_yaml = jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml()
+        old_classpaths_list = Configuration.parse_list_value(old_classpaths, standard_yaml)
         joined_classpaths_list = [*old_classpaths_list, *classpaths]
-        env_config.setString(classpaths_key, str(joined_classpaths_list))
+        if standard_yaml:
+            env_config.setString(classpaths_key, str(joined_classpaths_list))
+        else:
+            env_config.setString(classpaths_key, ';'.join(joined_classpaths_list))
 
     def get_default_local_parallelism(self) -> int:
         """
diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
index 8be06db1d03..92b5e2384e9 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
@@ -554,39 +554,68 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase):
         expected.sort()
         self.assertEqual(expected, result)
 
-    def test_add_jars_basic(self):
+    def test_add_jars_basic_non_standard_yaml(self):
+        self._test_add_jars_basic(False)
+
+    def test_add_jars_basic_standard_yaml(self):
+        self._test_add_jars_basic(True)
+
+    def _test_add_jars_basic(self, standard_yaml):
         jvm = get_gateway().jvm
         jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
         env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \
             .getEnvironmentConfig(self.env._j_stream_execution_environment)
 
+        jvm.org.apache.flink.configuration.GlobalConfiguration.setStandardYaml(standard_yaml)
+
         old_jars = env_config.getString(jars_key, None)
         self.assertIsNone(old_jars)
 
         self.env.add_jars('file://1.jar')
         new_jars = env_config.getString(jars_key, None)
-        self.assertEqual(new_jars, '[\'file://1.jar\']')
+        if standard_yaml:
+            self.assertEqual(new_jars, '[\'file://1.jar\']')
+        else:
+            self.assertEqual(new_jars, 'file://1.jar')
 
         self.env.add_jars('file://2.jar', 'file://3.jar')
         new_jars = env_config.getString(jars_key, None)
-        self.assertEqual(new_jars, '[\'file://1.jar\', \'file://2.jar\', \'file://3.jar\']')
+        if standard_yaml:
+            self.assertEqual(new_jars, '[\'file://1.jar\', \'file://2.jar\', \'file://3.jar\']')
+        else:
+            self.assertEqual(new_jars, 'file://1.jar;file://2.jar;file://3.jar')
 
-    def test_add_classpaths_basic(self):
+    def test_add_classpaths_basic_non_standard_yaml(self):
+        self._test_add_classpaths_basic(False)
+
+    def test_add_classpaths_basic_standard_yaml(self):
+        self._test_add_classpaths_basic(True)
+
+    def _test_add_classpaths_basic(self, standard_yaml):
         jvm = get_gateway().jvm
         classpaths_key = jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key()
         env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \
             .getEnvironmentConfig(self.env._j_stream_execution_environment)
 
+        jvm.org.apache.flink.configuration.GlobalConfiguration.setStandardYaml(standard_yaml)
+
         old_classpaths = env_config.getString(classpaths_key, None)
         self.assertIsNone(old_classpaths)
 
         self.env.add_classpaths('file://1.jar')
         new_classpaths = env_config.getString(classpaths_key, None)
-        self.assertEqual(new_classpaths, '[\'file://1.jar\']')
+        if standard_yaml:
+            self.assertEqual(new_classpaths, '[\'file://1.jar\']')
+        else:
+            self.assertEqual(new_classpaths, 'file://1.jar')
 
         self.env.add_classpaths('file://2.jar', 'file://3.jar')
         new_classpaths = env_config.getString(classpaths_key, None)
-        self.assertEqual(new_classpaths, '[\'file://1.jar\', \'file://2.jar\', \'file://3.jar\']')
+        if standard_yaml:
+            self.assertEqual(
+                new_classpaths, '[\'file://1.jar\', \'file://2.jar\', \'file://3.jar\']')
+        else:
+            self.assertEqual(new_classpaths, 'file://1.jar;file://2.jar;file://3.jar')
 
     def test_add_jars(self):
         # find kafka connector jars

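For reference, the intended round-trip after this fix, mirroring the new
_test_add_jars_basic above (a sketch assuming a local test setup;
setStandardYaml is the test-only toggle the new tests use):

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.java_gateway import get_gateway

    env = StreamExecutionEnvironment.get_execution_environment()
    jvm = get_gateway().jvm
    jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
    env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \
        .getEnvironmentConfig(env._j_stream_execution_environment)

    # Force the legacy flat config mode, as the new tests do.
    jvm.org.apache.flink.configuration.GlobalConfiguration.setStandardYaml(False)

    env.add_jars('file://1.jar')
    env.add_jars('file://2.jar', 'file://3.jar')

    # With the fix, legacy mode stores a ';'-joined string instead of str(list).
    assert env_config.getString(jars_key, None) == 'file://1.jar;file://2.jar;file://3.jar'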