HuangXingBo commented on code in PR #24177:
URL: https://github.com/apache/flink/pull/24177#discussion_r1465791701


##########
flink-python/pyflink/pyflink_gateway_server.py:
##########
@@ -43,8 +43,24 @@ def on_windows():
     return platform.system() == "Windows"
 
 
-def read_from_config(key, default_value, flink_conf_file):
+def read_from_config(key, default_value, flink_conf_directory):
+    import yaml

Review Comment:
   I feel the logic of this block of code can be refactored:
   ```
   if  flink-conf.yaml exist:
       yaml.parse
   elif conf.yaml exist:
       old parse logic
   else
       return default value
   ```



##########
flink-python/pyflink/table/table_environment.py:
##########
@@ -1542,21 +1542,55 @@ def _set_python_executable_for_local_executor(self):
     def _add_jars_to_j_env_config(self, config_key):
         jvm = get_gateway().jvm
         jar_urls = self.get_config().get(config_key, None)
+
+        isStandardYaml = jvm.org.apache.flink.configuration. \

Review Comment:
   `isStandardYaml->is_standard_yaml`



##########
flink-python/pyflink/table/table_environment.py:
##########
@@ -1542,21 +1542,55 @@ def _set_python_executable_for_local_executor(self):
     def _add_jars_to_j_env_config(self, config_key):
         jvm = get_gateway().jvm
         jar_urls = self.get_config().get(config_key, None)
+
+        isStandardYaml = jvm.org.apache.flink.configuration. \
+            GlobalConfiguration.isStandardYaml()
         if jar_urls is not None:
-            # normalize
             jar_urls_list = []
-            for url in jar_urls.split(";"):
-                url = url.strip()
-                if url != "":
-                    jar_urls_list.append(jvm.java.net.URL(url).toString())
-            j_configuration = get_j_env_configuration(self._get_j_env())
-            if j_configuration.containsKey(config_key):
-                for url in j_configuration.getString(config_key, 
"").split(";"):
-                    url = url.strip()
-                    if url != "" and url not in jar_urls_list:
-                        jar_urls_list.append(url)
+            # normalize
+            if not isStandardYaml:
+                self._parse_url_by_legacy_parser(jar_urls, jar_urls_list, jvm)
+                j_configuration = get_j_env_configuration(self._get_j_env())
+                self._parse_j_env_url_by_legacy_parser(config_key, 
j_configuration, jar_urls_list)
+            else:
+                import yaml
+                parsed_jar_urls = yaml.safe_load(jar_urls)
+                if isinstance(parsed_jar_urls, list):
+                    for url in parsed_jar_urls:
+                        url = url.strip()
+                        if url != "":
+                            
jar_urls_list.append(jvm.java.net.URL(url).toString())
+                else:
+                    self._parse_url_by_legacy_parser(jar_urls, jar_urls_list, 
jvm)
+
+                j_configuration = get_j_env_configuration(self._get_j_env())
+                if j_configuration.containsKey(config_key):
+                    jar_urls_from_j_env = yaml. \
+                        safe_load(j_configuration.getString(config_key, ""))
+                    if isinstance(jar_urls_from_j_env, list):
+                        for url in jar_urls_from_j_env:
+                            url = url.strip()
+                            if url != "" and url not in jar_urls_list:
+                                jar_urls_list.append(url)
+                    else:
+                        self._parse_j_env_url_by_legacy_parser(
+                            config_key, j_configuration, jar_urls_list)
+
             j_configuration.setString(config_key, ";".join(jar_urls_list))
 
+    def _parse_j_env_url_by_legacy_parser(self, config_key, j_configuration, 
jar_urls_list):
+        if j_configuration.containsKey(config_key):
+            for url in j_configuration.getString(config_key, "").split(";"):
+                url = url.strip()
+                if url != "" and url not in jar_urls_list:
+                    jar_urls_list.append(url)
+
+    def _parse_url_by_legacy_parser(self, jar_urls, jar_urls_list, jvm):
+        for url in jar_urls.split(";"):
+            url = url.strip()
+            if url != "":
+                jar_urls_list.append(jvm.java.net.URL(url).toString())
+

Review Comment:
   I feel the logic of this block of code can be refactored:
   ```
   def _add_jars_to_j_env_config(self, config_key):
        jar_urls = self.get_config().get(config_key, None)
   
        if jar_urls:
                jvm = get_gateway().jvm
                jar_urls_list = []
                _parse_urls([jvm.java.net.URL(url).toString() if item else "" 
for item in parse_jars_value(jar_urls, jvm)], jar_urls_list)
   
                     j_configuration = 
get_j_env_configuration(self._get_j_env())
                     
_parse_urls(parse_jars_value(j_configuration.getString(config_key, ""), jvm), 
jar_urls_list)
   
                    j_configuration.setString(config_key, 
";".join(jar_urls_list))
   
   def _parse_urls(self, jar_urls, jar_urls_list):
       for url in jar_urls.split(";"):
        url = url.strip()
        if url != "" and url not in jar_urls_list:
                jar_urls_list.append(url)
   ```



##########
flink-python/pyflink/table/table_config.py:
##########
@@ -106,8 +106,17 @@ def set(self, key: str, value: str) -> 'TableConfig':
         jars_key = 
jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
         classpaths_key = 
jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key()
         if key in [jars_key, classpaths_key]:
-            add_jars_to_context_class_loader(value.split(";"))
-
+            isStandardYaml = jvm.org.apache.flink.configuration. \

Review Comment:
   `isStandardYaml -> is_standard_yaml` is more pythonic



##########
flink-python/pyflink/table/table_config.py:
##########
@@ -106,8 +106,17 @@ def set(self, key: str, value: str) -> 'TableConfig':
         jars_key = 
jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
         classpaths_key = 
jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key()
         if key in [jars_key, classpaths_key]:
-            add_jars_to_context_class_loader(value.split(";"))
-
+            isStandardYaml = jvm.org.apache.flink.configuration. \
+                GlobalConfiguration.isStandardYaml()
+            if isStandardYaml:
+                import yaml
+                jar_urls_list = yaml.safe_load(value)
+                if isinstance(jar_urls_list, list):
+                    add_jars_to_context_class_loader(jar_urls_list)
+                else:
+                    add_jars_to_context_class_loader(value.split(";"))

Review Comment:
   I feel the logic of this block of code can be refactored.
   ```
   if key in [jars_key, classpaths_key]:
        jar_urls = parse_jars_value(value)
        add_jars_to_context_class_loader(jar_urls)
   
   def parse_jars_value(value: str):
        is_standard_yaml = 
jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml()
       if is_standard_yaml:
        import yaml
           jar_urls_list = yaml.safe_load(value)
           if isinstance(jar_urls_list, list):
                return jar_urls_list
   
       return value
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to