This is an automated email from the ASF dual-hosted git repository.

fcsaky pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.20 by this push:
     new c9828e8d287 [FLINK-37505][python] Add pyflink YAML based config support
c9828e8d287 is described below

commit c9828e8d2879e8a91187a105967b61a410119724
Author: Gabor Somogyi <gabor_somog...@apple.com>
AuthorDate: Thu Mar 20 16:52:32 2025 +0100

    [FLINK-37505][python] Add pyflink YAML based config support
    
    Co-authored-by: Ferenc Csaky <fcs...@apache.org>
---
 flink-python/pyflink/common/configuration.py       | 18 +++++++-----
 .../pyflink/common/tests/test_configuration.py     | 20 +++++++++----
 .../datastream/stream_execution_environment.py     | 21 +++++++------
 .../tests/test_stream_execution_environment.py     | 34 ++++++++++++++++++++++
 flink-python/pyflink/table/table_config.py         |  5 +++-
 flink-python/pyflink/table/table_environment.py    |  9 ++++--
 6 files changed, 82 insertions(+), 25 deletions(-)

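For context: the core of this change replaces the JVM-dependent parse_jars_value helper with a pure-Python parse_list_value that accepts both standard YAML list notation and the legacy semicolon-separated pattern. A minimal sketch of the intended behavior, mirroring the new tests in this patch (it assumes ruamel.yaml is importable, since the helper uses its safe loader):

    from pyflink.common import Configuration

    Configuration.parse_list_value(None)                      # []
    Configuration.parse_list_value("[jar1, jar2, jar3]")      # ['jar1', 'jar2', 'jar3']
    Configuration.parse_list_value("- jar1\n- jar2\n- jar3")  # ['jar1', 'jar2', 'jar3']
    Configuration.parse_list_value("jar1;jar2;jar3")          # ['jar1', 'jar2', 'jar3']
    # With standard_yaml=False the value is always split on ';' (legacy format)
    Configuration.parse_list_value("jar1;jar2", standard_yaml=False)  # ['jar1', 'jar2']
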
diff --git a/flink-python/pyflink/common/configuration.py b/flink-python/pyflink/common/configuration.py
index 779568da151..8042fc9afb4 100644
--- a/flink-python/pyflink/common/configuration.py
+++ b/flink-python/pyflink/common/configuration.py
@@ -69,20 +69,24 @@ class Configuration:
         jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
         classpaths_key = jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key()
         if key in [jars_key, classpaths_key]:
-            jar_urls = Configuration.parse_jars_value(value, jvm)
+            jar_urls = Configuration.parse_list_value(
+                value,
+                jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml()
+            )
             add_jars_to_context_class_loader(jar_urls)
         self._j_configuration.setString(key, value)
         return self
 
     @staticmethod
-    def parse_jars_value(value: str, jvm):
-        is_standard_yaml = jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml()
-        if is_standard_yaml:
+    def parse_list_value(value: str, standard_yaml: bool = True):
+        if not value:
+            return []
+        if standard_yaml:
             from ruamel.yaml import YAML
             yaml = YAML(typ='safe')
-            jar_urls_list = yaml.load(value)
-            if isinstance(jar_urls_list, list):
-                return jar_urls_list
+            value_list = yaml.load(value)
+            if isinstance(value_list, list):
+                return value_list
         return value.split(";")
 
     def get_integer(self, key: str, default_value: int) -> int:
diff --git a/flink-python/pyflink/common/tests/test_configuration.py b/flink-python/pyflink/common/tests/test_configuration.py
index 978bba538a7..26ca1e92cf9 100644
--- a/flink-python/pyflink/common/tests/test_configuration.py
+++ b/flink-python/pyflink/common/tests/test_configuration.py
@@ -18,7 +18,6 @@
 from copy import deepcopy
 
 from pyflink.common import Configuration
-from pyflink.java_gateway import get_gateway
 from pyflink.testing.test_case_utils import PyFlinkTestCase
 
 
@@ -165,16 +164,27 @@ class ConfigurationTests(PyFlinkTestCase):
 
         self.assertEqual(str(conf), "{k1=v1, k2=1}")
 
-    def test_parse_jars_value(self):
-        jvm = get_gateway().jvm
+    def test_parse_list_value(self):
+        # test None
+        value = None
+        expected_result = []
+        result = Configuration.parse_list_value(value)
+        self.assertEqual(result, expected_result)
+
         # test parse YAML list
+        value = "[jar1, jar2, jar3]"
+        expected_result = ['jar1', 'jar2', 'jar3']
+        result = Configuration.parse_list_value(value)
+        self.assertEqual(result, expected_result)
+
+        # test parse multiline YAML list
         value = "- jar1\n- jar2\n- jar3"
         expected_result = ['jar1', 'jar2', 'jar3']
-        result = Configuration.parse_jars_value(value, jvm)
+        result = Configuration.parse_list_value(value)
         self.assertEqual(result, expected_result)
 
         # test parse legacy pattern
         value = "jar1;jar2;jar3"
         expected_result = ['jar1', 'jar2', 'jar3']
-        result = Configuration.parse_jars_value(value, jvm)
+        result = Configuration.parse_list_value(value)
         self.assertEqual(result, expected_result)
diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py b/flink-python/pyflink/datastream/stream_execution_environment.py
index ce84e94ec0e..9553d4f4bfe 100644
--- a/flink-python/pyflink/datastream/stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/stream_execution_environment.py
@@ -768,11 +768,12 @@ class StreamExecutionEnvironment(object):
         jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
         env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \
             .getEnvironmentConfig(self._j_stream_execution_environment)
-        old_jar_paths = env_config.getString(jars_key, None)
-        joined_jars_path = ';'.join(jars_path)
-        if old_jar_paths and old_jar_paths.strip():
-            joined_jars_path = ';'.join([old_jar_paths, joined_jars_path])
-        env_config.setString(jars_key, joined_jars_path)
+        old_jars_path = env_config.getString(jars_key, None)
+        old_jars_list = Configuration.parse_list_value(
+            old_jars_path,
+            jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml())
+        joined_jars_list = [*old_jars_list, *jars_path]
+        env_config.setString(jars_key, str(joined_jars_list))
 
     def add_classpaths(self, *classpaths: str):
         """
@@ -787,10 +788,12 @@ class StreamExecutionEnvironment(object):
         env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \
             .getEnvironmentConfig(self._j_stream_execution_environment)
         old_classpaths = env_config.getString(classpaths_key, None)
-        joined_classpaths = ';'.join(list(classpaths))
-        if old_classpaths and old_classpaths.strip():
-            joined_classpaths = ';'.join([old_classpaths, joined_classpaths])
-        env_config.setString(classpaths_key, joined_classpaths)
+        old_classpaths_list = Configuration.parse_list_value(
+            old_classpaths,
+            jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml()
+        )
+        joined_classpaths_list = [*old_classpaths_list, *classpaths]
+        env_config.setString(classpaths_key, str(joined_classpaths_list))
 
     def get_default_local_parallelism(self) -> int:
         """
diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
index 1ac1eb95ed2..8be06db1d03 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
@@ -554,6 +554,40 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase):
         expected.sort()
         self.assertEqual(expected, result)
 
+    def test_add_jars_basic(self):
+        jvm = get_gateway().jvm
+        jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
+        env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \
+            .getEnvironmentConfig(self.env._j_stream_execution_environment)
+
+        old_jars = env_config.getString(jars_key, None)
+        self.assertIsNone(old_jars)
+
+        self.env.add_jars('file://1.jar')
+        new_jars = env_config.getString(jars_key, None)
+        self.assertEqual(new_jars, '[\'file://1.jar\']')
+
+        self.env.add_jars('file://2.jar', 'file://3.jar')
+        new_jars = env_config.getString(jars_key, None)
+        self.assertEqual(new_jars, '[\'file://1.jar\', \'file://2.jar\', \'file://3.jar\']')
+
+    def test_add_classpaths_basic(self):
+        jvm = get_gateway().jvm
+        classpaths_key = jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key()
+        env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \
+            .getEnvironmentConfig(self.env._j_stream_execution_environment)
+
+        old_classpaths = env_config.getString(classpaths_key, None)
+        self.assertIsNone(old_classpaths)
+
+        self.env.add_classpaths('file://1.jar')
+        new_classpaths = env_config.getString(classpaths_key, None)
+        self.assertEqual(new_classpaths, '[\'file://1.jar\']')
+
+        self.env.add_classpaths('file://2.jar', 'file://3.jar')
+        new_classpaths = env_config.getString(classpaths_key, None)
+        self.assertEqual(new_classpaths, '[\'file://1.jar\', \'file://2.jar\', \'file://3.jar\']')
+
     def test_add_jars(self):
         # find kafka connector jars
         flink_source_root = _find_flink_source_root()
diff --git a/flink-python/pyflink/table/table_config.py b/flink-python/pyflink/table/table_config.py
index ba17767c776..d9869e03c2c 100644
--- a/flink-python/pyflink/table/table_config.py
+++ b/flink-python/pyflink/table/table_config.py
@@ -106,7 +106,10 @@ class TableConfig(object):
         jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
         classpaths_key = jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key()
         if key in [jars_key, classpaths_key]:
-            jar_urls = Configuration.parse_jars_value(value, jvm)
+            jar_urls = Configuration.parse_list_value(
+                value,
+                jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml()
+            )
             add_jars_to_context_class_loader(jar_urls)
         return self
 
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index ebfcc7cb095..b36021bd003 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -1545,7 +1545,10 @@ class TableEnvironment(object):
         if jar_urls:
             jvm = get_gateway().jvm
             jar_urls_list = []
-            parsed_jar_urls = Configuration.parse_jars_value(jar_urls, jvm)
+            parsed_jar_urls = Configuration.parse_list_value(
+                jar_urls,
+                jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml()
+            )
             url_strings = [
                 jvm.java.net.URL(url).toString() if url else ""
                 for url in parsed_jar_urls
@@ -1553,9 +1556,9 @@ class TableEnvironment(object):
             self._parse_urls(url_strings, jar_urls_list)
 
             j_configuration = get_j_env_configuration(self._get_j_env())
-            parsed_jar_urls = Configuration.parse_jars_value(
+            parsed_jar_urls = Configuration.parse_list_value(
                 j_configuration.getString(config_key, ""),
-                jvm
+                jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml()
             )
             self._parse_urls(parsed_jar_urls, jar_urls_list)
 
