This is an automated email from the ASF dual-hosted git repository.

gaborgsomogyi pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.20 by this push:
     new 7f0a676ec12 [FLINK-38143][python] Fix pyflink flat YAML based config support
7f0a676ec12 is described below

commit 7f0a676ec1287d83a88bf60987b621836fcb359e
Author: Gabor Somogyi <gabor_somog...@apple.com>
AuthorDate: Sat Jul 26 10:26:47 2025 +0200

    [FLINK-38143][python] Fix pyflink flat YAML based config support
---
 .../pyflink/common/tests/test_configuration.py     | 21 ++++++++---
 .../datastream/stream_execution_environment.py     | 21 ++++++-----
 .../tests/test_stream_execution_environment.py     | 41 ++++++++++++++++++----
 3 files changed, 64 insertions(+), 19 deletions(-)

diff --git a/flink-python/pyflink/common/tests/test_configuration.py b/flink-python/pyflink/common/tests/test_configuration.py
index 26ca1e92cf9..02c12420be2 100644
--- a/flink-python/pyflink/common/tests/test_configuration.py
+++ b/flink-python/pyflink/common/tests/test_configuration.py
@@ -164,7 +164,20 @@ class ConfigurationTests(PyFlinkTestCase):
 
         self.assertEqual(str(conf), "{k1=v1, k2=1}")
 
-    def test_parse_list_value(self):
+    def test_parse_list_value_non_standard_yaml(self):
+        # test None
+        value = None
+        expected_result = []
+        result = Configuration.parse_list_value(value, False)
+        self.assertEqual(result, expected_result)
+
+        # test parse legacy pattern
+        value = "jar1;jar2;jar3"
+        expected_result = ['jar1', 'jar2', 'jar3']
+        result = Configuration.parse_list_value(value, False)
+        self.assertEqual(result, expected_result)
+
+    def test_parse_list_value_standard_yaml(self):
         # test None
         value = None
         expected_result = []
@@ -174,17 +187,17 @@ class ConfigurationTests(PyFlinkTestCase):
         # test parse YAML list
         value = "[jar1, jar2, jar3]"
         expected_result = ['jar1', 'jar2', 'jar3']
-        result = Configuration.parse_list_value(value)
+        result = Configuration.parse_list_value(value, True)
         self.assertEqual(result, expected_result)
 
         # test parse multiline YAML list
         value = "- jar1\n- jar2\n- jar3"
         expected_result = ['jar1', 'jar2', 'jar3']
-        result = Configuration.parse_list_value(value)
+        result = Configuration.parse_list_value(value, True)
         self.assertEqual(result, expected_result)
 
         # test parse legacy pattern
         value = "jar1;jar2;jar3"
         expected_result = ['jar1', 'jar2', 'jar3']
-        result = Configuration.parse_list_value(value)
+        result = Configuration.parse_list_value(value, True)
         self.assertEqual(result, expected_result)
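[Editor's note between file diffs: the tests above pin down the two parsing modes of Configuration.parse_list_value. Below is a minimal stand-alone sketch of that contract, inferred only from the assertions; the name parse_list_value_sketch is hypothetical, and the real PyFlink method may handle quoting differently, e.g. via a full YAML parser.]

from typing import List, Optional

def parse_list_value_sketch(value: Optional[str], standard_yaml: bool) -> List[str]:
    # None means "no previous value": both modes yield an empty list.
    if value is None:
        return []
    if standard_yaml:
        stripped = value.strip()
        # Flow-style YAML list: "[jar1, jar2, jar3]"
        if stripped.startswith('[') and stripped.endswith(']'):
            return [item.strip() for item in stripped[1:-1].split(',')]
        # Block-style YAML list: "- jar1\n- jar2\n- jar3"
        if stripped.startswith('- '):
            return [line[2:].strip() for line in stripped.splitlines()]
    # Legacy flat config (and the standard-YAML fallback exercised by the
    # tests): semicolon-separated values.
    return [item for item in value.split(';') if item]

# These mirror the test cases in test_configuration.py above.
assert parse_list_value_sketch(None, False) == []
assert parse_list_value_sketch("jar1;jar2;jar3", False) == ['jar1', 'jar2', 'jar3']
assert parse_list_value_sketch("[jar1, jar2, jar3]", True) == ['jar1', 'jar2', 'jar3']
assert parse_list_value_sketch("- jar1\n- jar2\n- jar3", True) == ['jar1', 'jar2', 'jar3']
assert parse_list_value_sketch("jar1;jar2;jar3", True) == ['jar1', 'jar2', 'jar3']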
diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py b/flink-python/pyflink/datastream/stream_execution_environment.py
index 9553d4f4bfe..26872880b1c 100644
--- a/flink-python/pyflink/datastream/stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/stream_execution_environment.py
@@ -769,11 +769,13 @@ class StreamExecutionEnvironment(object):
         env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \
             .getEnvironmentConfig(self._j_stream_execution_environment)
         old_jars_path = env_config.getString(jars_key, None)
-        old_jars_list = Configuration.parse_list_value(
-            old_jars_path,
-            jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml())
+        standard_yaml = jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml()
+        old_jars_list = Configuration.parse_list_value(old_jars_path, standard_yaml)
         joined_jars_list = [*old_jars_list, *jars_path]
-        env_config.setString(jars_key, str(joined_jars_list))
+        if standard_yaml:
+            env_config.setString(jars_key, str(joined_jars_list))
+        else:
+            env_config.setString(jars_key, ';'.join(joined_jars_list))
 
     def add_classpaths(self, *classpaths: str):
         """
@@ -788,12 +790,13 @@ class StreamExecutionEnvironment(object):
         env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \
             .getEnvironmentConfig(self._j_stream_execution_environment)
         old_classpaths = env_config.getString(classpaths_key, None)
-        old_classpaths_list = Configuration.parse_list_value(
-            old_classpaths,
-            jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml()
-        )
+        standard_yaml = jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml()
+        old_classpaths_list = Configuration.parse_list_value(old_classpaths, standard_yaml)
         joined_classpaths_list = [*old_classpaths_list, *classpaths]
-        env_config.setString(classpaths_key, str(joined_classpaths_list))
+        if standard_yaml:
+            env_config.setString(classpaths_key, str(joined_classpaths_list))
+        else:
+            env_config.setString(classpaths_key, ';'.join(joined_classpaths_list))
 
     def get_default_local_parallelism(self) -> int:
         """
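[Editor's note: the write side of the fix mirrors the parser. Before this commit both modes stored str(list), which the legacy flat-config path then re-parsed as a single semicolon-separated entry; that asymmetry is what FLINK-38143 fixes. A rough stand-in for what add_jars/add_classpaths now store under PipelineOptions.JARS / CLASSPATHS; join_list_value_sketch is an illustrative name, not a PyFlink API.]

from typing import List

def join_list_value_sketch(values: List[str], standard_yaml: bool) -> str:
    if standard_yaml:
        # Standard YAML config: store the Python list repr, e.g.
        # "['file://1.jar', 'file://2.jar']", matching
        # str(joined_jars_list) in the diff above.
        return str(values)
    # Legacy flat config: semicolon-separated, matching ';'.join(...) above.
    return ';'.join(values)

assert join_list_value_sketch(['file://1.jar'], True) == "['file://1.jar']"
assert join_list_value_sketch(['file://1.jar', 'file://2.jar'], False) == 'file://1.jar;file://2.jar'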
diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
index 8be06db1d03..92b5e2384e9 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
@@ -554,39 +554,68 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase):
         expected.sort()
         self.assertEqual(expected, result)
 
-    def test_add_jars_basic(self):
+    def test_add_jars_basic_non_standard_yaml(self):
+        self._test_add_jars_basic(False)
+
+    def test_add_jars_basic_standard_yaml(self):
+        self._test_add_jars_basic(True)
+
+    def _test_add_jars_basic(self, standard_yaml):
         jvm = get_gateway().jvm
         jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
         env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \
             .getEnvironmentConfig(self.env._j_stream_execution_environment)
 
+        jvm.org.apache.flink.configuration.GlobalConfiguration.setStandardYaml(standard_yaml)
+
         old_jars = env_config.getString(jars_key, None)
         self.assertIsNone(old_jars)
 
         self.env.add_jars('file://1.jar')
         new_jars = env_config.getString(jars_key, None)
-        self.assertEqual(new_jars, '[\'file://1.jar\']')
+        if standard_yaml:
+            self.assertEqual(new_jars, '[\'file://1.jar\']')
+        else:
+            self.assertEqual(new_jars, 'file://1.jar')
 
         self.env.add_jars('file://2.jar', 'file://3.jar')
         new_jars = env_config.getString(jars_key, None)
-        self.assertEqual(new_jars, '[\'file://1.jar\', \'file://2.jar\', \'file://3.jar\']')
+        if standard_yaml:
+            self.assertEqual(new_jars, '[\'file://1.jar\', \'file://2.jar\', \'file://3.jar\']')
+        else:
+            self.assertEqual(new_jars, 'file://1.jar;file://2.jar;file://3.jar')
 
-    def test_add_classpaths_basic(self):
+    def test_add_classpaths_basic_non_standard_yaml(self):
+        self._test_add_classpaths_basic(False)
+
+    def test_add_classpaths_basic_standard_yaml(self):
+        self._test_add_classpaths_basic(True)
+
+    def _test_add_classpaths_basic(self, standard_yaml):
         jvm = get_gateway().jvm
         classpaths_key = jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key()
        env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \
             .getEnvironmentConfig(self.env._j_stream_execution_environment)
 
+        jvm.org.apache.flink.configuration.GlobalConfiguration.setStandardYaml(standard_yaml)
+
         old_classpaths = env_config.getString(classpaths_key, None)
         self.assertIsNone(old_classpaths)
 
         self.env.add_classpaths('file://1.jar')
         new_classpaths = env_config.getString(classpaths_key, None)
-        self.assertEqual(new_classpaths, '[\'file://1.jar\']')
+        if standard_yaml:
+            self.assertEqual(new_classpaths, '[\'file://1.jar\']')
+        else:
+            self.assertEqual(new_classpaths, 'file://1.jar')
 
         self.env.add_classpaths('file://2.jar', 'file://3.jar')
         new_classpaths = env_config.getString(classpaths_key, None)
-        self.assertEqual(new_classpaths, '[\'file://1.jar\', \'file://2.jar\', \'file://3.jar\']')
+        if standard_yaml:
+            self.assertEqual(
+                new_classpaths, '[\'file://1.jar\', \'file://2.jar\', \'file://3.jar\']')
+        else:
+            self.assertEqual(new_classpaths, 'file://1.jar;file://2.jar;file://3.jar')
 
     def test_add_jars(self):
         # find kafka connector jars
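[Editor's note: to close, a minimal usage sketch of the fixed behavior, assuming a PyFlink build that contains this commit. Everything referenced here (get_gateway, PythonConfigUtil.getEnvironmentConfig, PipelineOptions.JARS, env._j_stream_execution_environment) appears in the diff above; which printed form you see depends on whether the cluster uses the standard config.yaml or the legacy flink-conf.yaml, i.e. on GlobalConfiguration.isStandardYaml().]

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.java_gateway import get_gateway

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars('file://1.jar', 'file://2.jar')

jvm = get_gateway().jvm
jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \
    .getEnvironmentConfig(env._j_stream_execution_environment)

# Standard YAML config (config.yaml):    ['file://1.jar', 'file://2.jar']
# Legacy flat config (flink-conf.yaml):  file://1.jar;file://2.jar
print(env_config.getString(jars_key, None))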