William Que created FLINK-36766:
-----------------------------------
Summary: Use pyflink to create remote env
Key: FLINK-36766
URL: https://issues.apache.org/jira/browse/FLINK-36766
Project: Flink
Issue Type: Bug
Components: API / Python
Affects Versions: 1.19.1, 1.20.0
Environment: Ubuntu 24 LTS
Flink : 1.19.1 or 1.20.0
Reporter: William Que
I use the following code to connect to a remote Flink cluster and create a
remote Flink environment. After adding JAR files to the StreamExecutionEnvironment,
every time I execute Flink SQL an error is reported, something like a YAML
parsing error.
{code:python}
import os

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.java_gateway import get_gateway
from pyflink.table import StreamTableEnvironment

gateway = get_gateway()

# Empty String[] passed as the jarFiles varargs of createRemoteEnvironment
string_class = gateway.jvm.String
string_array = gateway.new_array(string_class, 0)

# Create the Java remote environment and wrap it in the PyFlink API
stream_env = gateway.jvm.org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
j_stream_execution_environment = stream_env.createRemoteEnvironment("master", 8081, string_array)
env = StreamExecutionEnvironment(j_stream_execution_environment)

# Collect every JAR in a local directory as a file:/// URL
jars_path = "F:/jars/flink-1.19.1/"
jar_files = ["file:///" + jars_path + f for f in os.listdir(jars_path) if f.endswith('.jar')]

env.add_jars(*jar_files)  # Causes the error
t_env = StreamTableEnvironment.create(env)
{code}
I then traced the error and found that it is caused by a static method,
parse_jars_value, in configuration.py of the pyflink package. After
env.add_jars(*jar_files), the value parameter looks like this (note the leading
"[];"), which causes the above error:
value =
'{color:#FF0000}[];{color}file:///F:/software/jars-flink3/flink-clients-1.20.0.jar;file:///F:/software/jars-flink3/flink-connector-jdbc-3.2.0-1.19.jar;......'
{code:python}
@staticmethod
def parse_jars_value(value: str, jvm):
    is_standard_yaml = jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml()
    if is_standard_yaml:
        from ruamel.yaml import YAML
        yaml = YAML(typ='safe')
        jar_urls_list = yaml.load(value)  # ERROR
        if isinstance(jar_urls_list, list):
            return jar_urls_list
    return value.split(";")
{code}
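To illustrate the failure mode outside Flink, the following pure-Python sketch mirrors the non-YAML fallback branch (split on ";") and a tolerant variant. The names naive_split, split_jars_value and EMPTY_LIST_MARKER are hypothetical, not PyFlink API. The sketch assumes the stray "[]" is an empty jar list rendered in YAML flow style and joined to the new entries with ";"; that matches the value shown above but has not been confirmed against the PyFlink source.

{code:python}
from typing import List

# Hypothetical marker: an empty jar list rendered in YAML flow style (assumption).
EMPTY_LIST_MARKER = "[]"


def naive_split(value: str) -> List[str]:
    """Mirror of the fallback branch of parse_jars_value: split on ';'."""
    return value.split(";")


def split_jars_value(value: str) -> List[str]:
    """Hypothetical tolerant parser: split on ';', then drop empty entries
    and the stray '[]' placeholder so only real jar URLs remain."""
    parts = (part.strip() for part in value.split(";"))
    return [part for part in parts if part and part != EMPTY_LIST_MARKER]


if __name__ == "__main__":
    value = ("[];file:///F:/software/jars-flink3/flink-clients-1.20.0.jar;"
             "file:///F:/software/jars-flink3/flink-json-1.20.0.jar")
    print(naive_split(value))       # first element is the bogus '[]'
    print(split_jars_value(value))  # only the two jar URLs
{code}

Even the plain ";" split keeps "[]" as a pseudo-URL, so stripping the marker (as the reporter tried below) is what makes the list parseable at all.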
I tried to fix it by removing the "[];" prefix from the value parameter. That
solved the parsing error, but another problem appeared: it seems the JAR files
are added to the classpath twice somewhere.
{code:java}
Caused by: java.lang.IllegalStateException: The library registration references
a different set of library BLOBs than previous registrations for this job:
old:[file:/F:/software/jars-flink3/flink-connector-jdbc-3.2.0-1.19.jar,
file:/F:/software/jars-flink3/flink-clients-1.20.0.jar,
file:/F:/software/jars-flink3/mysql-connector-java-8.0.28.jar,
file:/F:/software/jars-flink3/flink-json-1.20.0.jar,
file:/F:/software/jars-flink3/flink-connector-kafka-3.3.0-1.20.jar,
file:/F:/software/jars-flink3/kafka-clients-3.6.2.jar]
new:[file:/F:/software/jars-flink3/flink-clients-1.20.0.jar,
file:/F:/software/jars-flink3/flink-connector-jdbc-3.2.0-1.19.jar,
file:/F:/software/jars-flink3/flink-connector-kafka-3.3.0-1.20.jar,
file:/F:/software/jars-flink3/flink-json-1.20.0.jar,
file:/F:/software/jars-flink3/kafka-clients-3.6.2.jar,
file:/F:/software/jars-flink3/mysql-connector-java-8.0.28.jar,
file:/F:/software/jars-flink3/flink-clients-1.20.0.jar,
file:/F:/software/jars-flink3/flink-connector-jdbc-3.2.0-1.19.jar,
file:/F:/software/jars-flink3/flink-connector-kafka-3.3.0-1.20.jar,
file:/F:/software/jars-flink3/flink-json-1.20.0.jar,
file:/F:/software/jars-flink3/kafka-clients-3.6.2.jar,
file:/F:/software/jars-flink3/mysql-connector-java-8.0.28.jar]{code}
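The "new:" list above contains each of the six JARs twice, while the "old:" list contains each once. A quick way to confirm this kind of duplication in a jar list, and to build an order-preserving de-duplicated copy, is sketched here. find_duplicates and dedupe_preserving_order are hypothetical helpers; whether de-duplicating on the client side avoids the IllegalStateException is an assumption, since the second registration may happen inside Flink itself.

{code:python}
from collections import Counter
from typing import Iterable, List


def find_duplicates(urls: Iterable[str]) -> List[str]:
    """Return the URLs that occur more than once, in first-seen order."""
    urls = list(urls)
    counts = Counter(urls)
    seen = set()
    dups = []
    for url in urls:
        if counts[url] > 1 and url not in seen:
            dups.append(url)
            seen.add(url)
    return dups


def dedupe_preserving_order(urls: Iterable[str]) -> List[str]:
    """Drop repeated URLs, keeping the first occurrence of each
    (dict preserves insertion order in Python 3.7+)."""
    return list(dict.fromkeys(urls))


if __name__ == "__main__":
    jars = [
        "file:/F:/software/jars-flink3/flink-clients-1.20.0.jar",
        "file:/F:/software/jars-flink3/flink-json-1.20.0.jar",
    ]
    registered = jars + jars  # same list appended twice, like the "new:" list
    print(find_duplicates(registered))
    print(dedupe_preserving_order(registered) == jars)  # True
{code}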
Please check it carefully.