This is an automated email from the ASF dual-hosted git repository.

fcsaky pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.20 by this push:
     new c9828e8d287 [FLINK-37505][python] Add pyflink YAML based config support
c9828e8d287 is described below

commit c9828e8d2879e8a91187a105967b61a410119724
Author: Gabor Somogyi <gabor_somog...@apple.com>
AuthorDate: Thu Mar 20 16:52:32 2025 +0100

    [FLINK-37505][python] Add pyflink YAML based config support
    
    Co-authored-by: Ferenc Csaky <fcs...@apache.org>
---
 flink-python/pyflink/common/configuration.py       | 18 +++++++-----
 .../pyflink/common/tests/test_configuration.py     | 20 +++++++++----
 .../datastream/stream_execution_environment.py     | 21 +++++++------
 .../tests/test_stream_execution_environment.py     | 34 ++++++++++++++++++++++
 flink-python/pyflink/table/table_config.py         |  5 +++-
 flink-python/pyflink/table/table_environment.py    |  9 ++++--
 6 files changed, 82 insertions(+), 25 deletions(-)

diff --git a/flink-python/pyflink/common/configuration.py b/flink-python/pyflink/common/configuration.py
index 779568da151..8042fc9afb4 100644
--- a/flink-python/pyflink/common/configuration.py
+++ b/flink-python/pyflink/common/configuration.py
@@ -69,20 +69,24 @@ class Configuration:
         jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
         classpaths_key = jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key()
         if key in [jars_key, classpaths_key]:
-            jar_urls = Configuration.parse_jars_value(value, jvm)
+            jar_urls = Configuration.parse_list_value(
+                value,
+                jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml()
+            )
             add_jars_to_context_class_loader(jar_urls)
         self._j_configuration.setString(key, value)
         return self
 
     @staticmethod
-    def parse_jars_value(value: str, jvm):
-        is_standard_yaml = jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml()
-        if is_standard_yaml:
+    def parse_list_value(value: str, standard_yaml: bool = True):
+        if not value:
+            return []
+        if standard_yaml:
             from ruamel.yaml import YAML
             yaml = YAML(typ='safe')
-            jar_urls_list = yaml.load(value)
-            if isinstance(jar_urls_list, list):
-                return jar_urls_list
+            value_list = yaml.load(value)
+            if isinstance(value_list, list):
+                return value_list
         return value.split(";")
 
     def get_integer(self, key: str, default_value: int) -> int:
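For readers skimming the diff: the new parse_list_value helper accepts both YAML sequences and the legacy semicolon-joined format. A minimal standalone sketch of that behavior (it mirrors the committed helper but replaces the JVM-side isStandardYaml() call with a plain boolean so no gateway is needed; ruamel.yaml is the same library the committed code imports):

    from ruamel.yaml import YAML

    def parse_list_value(value, standard_yaml=True):
        # None or empty input yields an empty list.
        if not value:
            return []
        if standard_yaml:
            # Flow ("[a, b]") and block ("- a\n- b") YAML sequences both
            # safe-load into Python lists; plain scalars fall through.
            parsed = YAML(typ='safe').load(value)
            if isinstance(parsed, list):
                return parsed
        # Legacy (pre-YAML) configs join entries with semicolons.
        return value.split(";")

    assert parse_list_value(None) == []
    assert parse_list_value("[jar1, jar2, jar3]") == ['jar1', 'jar2', 'jar3']
    assert parse_list_value("- jar1\n- jar2\n- jar3") == ['jar1', 'jar2', 'jar3']
    # A semicolon-joined value is not a YAML list, so it falls back to split():
    assert parse_list_value("jar1;jar2;jar3") == ['jar1', 'jar2', 'jar3']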
diff --git a/flink-python/pyflink/common/tests/test_configuration.py b/flink-python/pyflink/common/tests/test_configuration.py
index 978bba538a7..26ca1e92cf9 100644
--- a/flink-python/pyflink/common/tests/test_configuration.py
+++ b/flink-python/pyflink/common/tests/test_configuration.py
@@ -18,7 +18,6 @@
 from copy import deepcopy
 
 from pyflink.common import Configuration
-from pyflink.java_gateway import get_gateway
 from pyflink.testing.test_case_utils import PyFlinkTestCase
 
 
@@ -165,16 +164,27 @@ class ConfigurationTests(PyFlinkTestCase):
 
         self.assertEqual(str(conf), "{k1=v1, k2=1}")
 
-    def test_parse_jars_value(self):
-        jvm = get_gateway().jvm
+    def test_parse_list_value(self):
+        # test None
+        value = None
+        expected_result = []
+        result = Configuration.parse_list_value(value)
+        self.assertEqual(result, expected_result)
 
+        # test parse YAML list
+        value = "[jar1, jar2, jar3]"
+        expected_result = ['jar1', 'jar2', 'jar3']
+        result = Configuration.parse_list_value(value)
+        self.assertEqual(result, expected_result)
+
+        # test parse multiline YAML list
         value = "- jar1\n- jar2\n- jar3"
         expected_result = ['jar1', 'jar2', 'jar3']
-        result = Configuration.parse_jars_value(value, jvm)
+        result = Configuration.parse_list_value(value)
         self.assertEqual(result, expected_result)
 
         # test parse legacy pattern
         value = "jar1;jar2;jar3"
         expected_result = ['jar1', 'jar2', 'jar3']
-        result = Configuration.parse_jars_value(value, jvm)
+        result = Configuration.parse_list_value(value)
         self.assertEqual(result, expected_result)
diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py b/flink-python/pyflink/datastream/stream_execution_environment.py
index ce84e94ec0e..9553d4f4bfe 100644
--- a/flink-python/pyflink/datastream/stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/stream_execution_environment.py
@@ -768,11 +768,12 @@ class StreamExecutionEnvironment(object):
         jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
         env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \
             .getEnvironmentConfig(self._j_stream_execution_environment)
-        old_jar_paths = env_config.getString(jars_key, None)
-        joined_jars_path = ';'.join(jars_path)
-        if old_jar_paths and old_jar_paths.strip():
-            joined_jars_path = ';'.join([old_jar_paths, joined_jars_path])
-        env_config.setString(jars_key, joined_jars_path)
+        old_jars_path = env_config.getString(jars_key, None)
+        old_jars_list = Configuration.parse_list_value(
+            old_jars_path,
+            jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml())
+        joined_jars_list = [*old_jars_list, *jars_path]
+        env_config.setString(jars_key, str(joined_jars_list))
 
     def add_classpaths(self, *classpaths: str):
         """
@@ -787,10 +788,12 @@
         env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \
             .getEnvironmentConfig(self._j_stream_execution_environment)
         old_classpaths = env_config.getString(classpaths_key, None)
-        joined_classpaths = ';'.join(list(classpaths))
-        if old_classpaths and old_classpaths.strip():
-            joined_classpaths = ';'.join([old_classpaths, joined_classpaths])
-        env_config.setString(classpaths_key, joined_classpaths)
+        old_classpaths_list = Configuration.parse_list_value(
+            old_classpaths,
+            jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml()
+        )
+        joined_classpaths_list = [*old_classpaths_list, *classpaths]
+        env_config.setString(classpaths_key, str(joined_classpaths_list))
 
     def get_default_local_parallelism(self) -> int:
         """
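The add_jars/add_classpaths change above replaces semicolon concatenation with a parse-append-serialize cycle. A rough pure-Python sketch of that cycle, with a plain dict standing in for the Java-side environment config (the dict and the free function below are illustrative stand-ins, not pyflink API):

    from ruamel.yaml import YAML

    def parse_list_value(value, standard_yaml=True):  # as sketched earlier
        if not value:
            return []
        if standard_yaml:
            parsed = YAML(typ='safe').load(value)
            if isinstance(parsed, list):
                return parsed
        return value.split(";")

    env_config = {}  # illustrative stand-in for the Java Configuration object

    def add_jars(*jars_path):
        # Parse whatever was stored previously (None on the first call),
        # append the new entries, then store the Python list's repr.
        old_list = parse_list_value(env_config.get('pipeline.jars'))
        env_config['pipeline.jars'] = str([*old_list, *jars_path])

    add_jars('file://1.jar')
    add_jars('file://2.jar', 'file://3.jar')
    assert env_config['pipeline.jars'] == \
        "['file://1.jar', 'file://2.jar', 'file://3.jar']"

This is the same string the new test_add_jars_basic test asserts on below.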
diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
index 1ac1eb95ed2..8be06db1d03 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
@@ -554,6 +554,40 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase):
         expected.sort()
         self.assertEqual(expected, result)
 
+    def test_add_jars_basic(self):
+        jvm = get_gateway().jvm
+        jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
+        env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \
+            .getEnvironmentConfig(self.env._j_stream_execution_environment)
+
+        old_jars = env_config.getString(jars_key, None)
+        self.assertIsNone(old_jars)
+
+        self.env.add_jars('file://1.jar')
+        new_jars = env_config.getString(jars_key, None)
+        self.assertEqual(new_jars, '[\'file://1.jar\']')
+
+        self.env.add_jars('file://2.jar', 'file://3.jar')
+        new_jars = env_config.getString(jars_key, None)
+        self.assertEqual(new_jars, '[\'file://1.jar\', \'file://2.jar\', \'file://3.jar\']')
+
+    def test_add_classpaths_basic(self):
+        jvm = get_gateway().jvm
+        classpaths_key = jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key()
+        env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \
+            .getEnvironmentConfig(self.env._j_stream_execution_environment)
+
+        old_classpaths = env_config.getString(classpaths_key, None)
+        self.assertIsNone(old_classpaths)
+
+        self.env.add_classpaths('file://1.jar')
+        new_classpaths = env_config.getString(classpaths_key, None)
+        self.assertEqual(new_classpaths, '[\'file://1.jar\']')
+
+        self.env.add_classpaths('file://2.jar', 'file://3.jar')
+        new_classpaths = env_config.getString(classpaths_key, None)
+        self.assertEqual(new_classpaths, '[\'file://1.jar\', \'file://2.jar\', \'file://3.jar\']')
+
     def test_add_jars(self):
         # find kafka connector jars
         flink_source_root = _find_flink_source_root()
diff --git a/flink-python/pyflink/table/table_config.py b/flink-python/pyflink/table/table_config.py
index ba17767c776..d9869e03c2c 100644
--- a/flink-python/pyflink/table/table_config.py
+++ b/flink-python/pyflink/table/table_config.py
@@ -106,7 +106,10 @@ class TableConfig(object):
         jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
         classpaths_key = jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key()
         if key in [jars_key, classpaths_key]:
-            jar_urls = Configuration.parse_jars_value(value, jvm)
+            jar_urls = Configuration.parse_list_value(
+                value,
+                jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml()
+            )
             add_jars_to_context_class_loader(jar_urls)
         return self
 
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index ebfcc7cb095..b36021bd003 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -1545,7 +1545,10 @@ class TableEnvironment(object):
         if jar_urls:
             jvm = get_gateway().jvm
             jar_urls_list = []
-            parsed_jar_urls = Configuration.parse_jars_value(jar_urls, jvm)
+            parsed_jar_urls = Configuration.parse_list_value(
+                jar_urls,
+                jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml()
+            )
             url_strings = [
                 jvm.java.net.URL(url).toString() if url else ""
                 for url in parsed_jar_urls
@@ -1553,9 +1556,9 @@
             self._parse_urls(url_strings, jar_urls_list)
 
         j_configuration = get_j_env_configuration(self._get_j_env())
-        parsed_jar_urls = Configuration.parse_jars_value(
+        parsed_jar_urls = Configuration.parse_list_value(
             j_configuration.getString(config_key, ""),
-            jvm
+            jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml()
         )
         self._parse_urls(parsed_jar_urls, jar_urls_list)
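A side note on why storing str(joined_jars_list) round-trips: Python's repr of a list of strings happens to be a valid YAML flow sequence of single-quoted scalars, so the safe loader used by parse_list_value recovers the original list on the next read. A quick check of that property:

    from ruamel.yaml import YAML

    stored = str(['file://1.jar', 'file://2.jar'])
    # repr yields "['file://1.jar', 'file://2.jar']", which parses as a
    # YAML flow sequence, so safe-loading recovers the original list.
    assert YAML(typ='safe').load(stored) == ['file://1.jar', 'file://2.jar']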