HuangXingBo commented on code in PR #24177:
URL: https://github.com/apache/flink/pull/24177#discussion_r1465791701
##########
flink-python/pyflink/pyflink_gateway_server.py:
##########
@@ -43,8 +43,24 @@ def on_windows():
return platform.system() == "Windows"
-def read_from_config(key, default_value, flink_conf_file):
+def read_from_config(key, default_value, flink_conf_directory):
+ import yaml
Review Comment:
I think the logic of this block could be restructured as follows:
```
if flink-conf.yaml exists:
    yaml.parse
elif conf.yaml exists:
    old parse logic
else:
    return default value
```
##########
flink-python/pyflink/table/table_environment.py:
##########
@@ -1542,21 +1542,55 @@ def _set_python_executable_for_local_executor(self):
def _add_jars_to_j_env_config(self, config_key):
jvm = get_gateway().jvm
jar_urls = self.get_config().get(config_key, None)
+
+ isStandardYaml = jvm.org.apache.flink.configuration. \
Review Comment:
Please rename `isStandardYaml` to `is_standard_yaml` to follow Python naming conventions.
##########
flink-python/pyflink/table/table_environment.py:
##########
@@ -1542,21 +1542,55 @@ def _set_python_executable_for_local_executor(self):
def _add_jars_to_j_env_config(self, config_key):
jvm = get_gateway().jvm
jar_urls = self.get_config().get(config_key, None)
+
+ isStandardYaml = jvm.org.apache.flink.configuration. \
+ GlobalConfiguration.isStandardYaml()
if jar_urls is not None:
- # normalize
jar_urls_list = []
- for url in jar_urls.split(";"):
- url = url.strip()
- if url != "":
- jar_urls_list.append(jvm.java.net.URL(url).toString())
- j_configuration = get_j_env_configuration(self._get_j_env())
- if j_configuration.containsKey(config_key):
- for url in j_configuration.getString(config_key,
"").split(";"):
- url = url.strip()
- if url != "" and url not in jar_urls_list:
- jar_urls_list.append(url)
+ # normalize
+ if not isStandardYaml:
+ self._parse_url_by_legacy_parser(jar_urls, jar_urls_list, jvm)
+ j_configuration = get_j_env_configuration(self._get_j_env())
+ self._parse_j_env_url_by_legacy_parser(config_key,
j_configuration, jar_urls_list)
+ else:
+ import yaml
+ parsed_jar_urls = yaml.safe_load(jar_urls)
+ if isinstance(parsed_jar_urls, list):
+ for url in parsed_jar_urls:
+ url = url.strip()
+ if url != "":
+
jar_urls_list.append(jvm.java.net.URL(url).toString())
+ else:
+ self._parse_url_by_legacy_parser(jar_urls, jar_urls_list,
jvm)
+
+ j_configuration = get_j_env_configuration(self._get_j_env())
+ if j_configuration.containsKey(config_key):
+ jar_urls_from_j_env = yaml. \
+ safe_load(j_configuration.getString(config_key, ""))
+ if isinstance(jar_urls_from_j_env, list):
+ for url in jar_urls_from_j_env:
+ url = url.strip()
+ if url != "" and url not in jar_urls_list:
+ jar_urls_list.append(url)
+ else:
+ self._parse_j_env_url_by_legacy_parser(
+ config_key, j_configuration, jar_urls_list)
+
j_configuration.setString(config_key, ";".join(jar_urls_list))
+ def _parse_j_env_url_by_legacy_parser(self, config_key, j_configuration,
jar_urls_list):
+ if j_configuration.containsKey(config_key):
+ for url in j_configuration.getString(config_key, "").split(";"):
+ url = url.strip()
+ if url != "" and url not in jar_urls_list:
+ jar_urls_list.append(url)
+
+ def _parse_url_by_legacy_parser(self, jar_urls, jar_urls_list, jvm):
+ for url in jar_urls.split(";"):
+ url = url.strip()
+ if url != "":
+ jar_urls_list.append(jvm.java.net.URL(url).toString())
+
Review Comment:
I think the logic of this block could be refactored along these lines:
```
def _add_jars_to_j_env_config(self, config_key):
    jar_urls = self.get_config().get(config_key, None)
    if jar_urls:
        jvm = get_gateway().jvm
        jar_urls_list = []
        self._parse_urls(
            [jvm.java.net.URL(item).toString() if item else ""
             for item in parse_jars_value(jar_urls, jvm)],
            jar_urls_list)
        j_configuration = get_j_env_configuration(self._get_j_env())
        self._parse_urls(
            parse_jars_value(j_configuration.getString(config_key, ""), jvm),
            jar_urls_list)
        j_configuration.setString(config_key, ";".join(jar_urls_list))

def _parse_urls(self, jar_urls, jar_urls_list):
    for url in jar_urls:
        url = url.strip()
        if url != "" and url not in jar_urls_list:
            jar_urls_list.append(url)
```
##########
flink-python/pyflink/table/table_config.py:
##########
@@ -106,8 +106,17 @@ def set(self, key: str, value: str) -> 'TableConfig':
jars_key =
jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
classpaths_key =
jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key()
if key in [jars_key, classpaths_key]:
- add_jars_to_context_class_loader(value.split(";"))
-
+ isStandardYaml = jvm.org.apache.flink.configuration. \
Review Comment:
`isStandardYaml` -> `is_standard_yaml` would be more Pythonic.
##########
flink-python/pyflink/table/table_config.py:
##########
@@ -106,8 +106,17 @@ def set(self, key: str, value: str) -> 'TableConfig':
jars_key =
jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
classpaths_key =
jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key()
if key in [jars_key, classpaths_key]:
- add_jars_to_context_class_loader(value.split(";"))
-
+ isStandardYaml = jvm.org.apache.flink.configuration. \
+ GlobalConfiguration.isStandardYaml()
+ if isStandardYaml:
+ import yaml
+ jar_urls_list = yaml.safe_load(value)
+ if isinstance(jar_urls_list, list):
+ add_jars_to_context_class_loader(jar_urls_list)
+ else:
+ add_jars_to_context_class_loader(value.split(";"))
Review Comment:
I think the logic of this block could be refactored along these lines:
```
if key in [jars_key, classpaths_key]:
    jar_urls = parse_jars_value(value)
    add_jars_to_context_class_loader(jar_urls)

def parse_jars_value(value: str):
    is_standard_yaml = \
        jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml()
    if is_standard_yaml:
        import yaml
        jar_urls_list = yaml.safe_load(value)
        if isinstance(jar_urls_list, list):
            return jar_urls_list
    return value.split(";")
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]