Xin Yu created FLINK-24335:
------------------------------
Summary: Get java.net.MalformedURLException when submitting a job from Python via the PyFlink Table API.
Key: FLINK-24335
URL: https://issues.apache.org/jira/browse/FLINK-24335
Project: Flink
Issue Type: Bug
Components: API / Python, Client / Job Submission, Table SQL / API
Affects Versions: 1.11.2
Environment: Linux, Flink 1.11.2
Reporter: Xin Yu
When I ran the Flink client to submit a Python-based workflow, I got the following MalformedURLException:
{noformat}
Traceback (most recent call last):
  File "/opt/python-occlum/lib/python3.7/site-packages/ai_flow_plugins/job_plugins/flink/flink_run_main.py", line 96, in run_project
    flink_execute_func(run_graph=run_graph, job_execution_info=job_execution_info, flink_env=flink_env)
  File "/opt/python-occlum/lib/python3.7/site-packages/ai_flow_plugins/job_plugins/flink/flink_run_main.py", line 75, in flink_execute_func
    job_client = statement_set.execute().get_job_client()
  File "/opt/flink-1.11.2/opt/python/pyflink.zip/pyflink/table/statement_set.py", line 104, in execute
    return TableResult(self._j_statement_set.execute())
  File "/opt/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/flink-1.11.2/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 154, in deco
    raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
pyflink.util.exceptions.TableException: 'Failed to execute sql'
Traceback (most recent call last):
  File "/opt/flink-1.11.2/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
  File "/opt/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o24.execute.
: org.apache.flink.table.api.TableException: Failed to execute sql
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:721)
    at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:97)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.MalformedURLException: no protocol: 
    at java.base/java.net.URL.<init>(URL.java:645)
    at java.base/java.net.URL.<init>(URL.java:541)
    at java.base/java.net.URL.<init>(URL.java:488)
    at org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:127)
    at org.apache.flink.client.cli.ExecutionConfigAccessor.getClasspaths(ExecutionConfigAccessor.java:79)
    at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:62)
    at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:57)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1810)
    at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
    at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:705)
    ... 12 more
{noformat}
After some debugging, I found that the problem is related to {{TableEnvironment.execute_sql}}. The root cause is {{TableEnvironment._add_jars_to_j_env_config}} in {{pyflink/table/table_environment.py}}:
{code:python}
if j_configuration.containsKey(config_key):
    for url in j_configuration.getString(config_key, "").split(";"):
        jar_urls_set.add(url)
{code}
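In our case, {{pipeline.classpaths}} was set to an empty list value from FromProgramOption, so {{j_configuration.getString(config_key, "")}} returns an empty string. Because {{"".split(";")}} returns {{[""]}}, the code above adds an empty string to {{jar_urls_set}}, and the joined value ends up with an empty entry, for example {{a.jar;b.jar;;c.jar}}. When the job is submitted, {{ExecutionConfigAccessor.getClasspaths}} decodes this list via {{ConfigUtils.decodeListFromConfig}} and tries to construct a {{java.net.URL}} from the empty entry, which throws the {{MalformedURLException: no protocol:}} shown above.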
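The effect of the merge step quoted above can be reproduced in plain Python without a JVM. This is a simplified sketch, not the PyFlink source: {{merge_like_pyflink_1_11}} is a hypothetical name, the JVM calls are left out, {{existing}} stands in for the value of {{j_configuration.getString(config_key, "")}}, and Python's {{str.split}} is only an approximation of how the Java side decodes the list.

```python
def merge_like_pyflink_1_11(new_urls, existing):
    """Simplified (hypothetical) model of the set-based merge in _add_jars_to_j_env_config."""
    jar_urls_set = set(new_urls)
    # `existing` is "" when pipeline.classpaths holds an empty list;
    # "".split(";") returns [""], so an empty string is added to the set.
    for url in existing.split(";"):
        jar_urls_set.add(url)
    return ";".join(jar_urls_set)


merged = merge_like_pyflink_1_11(["file:///tmp/a.jar", "file:///tmp/b.jar"], "")
entries = merged.split(";")
print("" in entries)    # True: one entry is empty
print(sorted(entries))  # ['', 'file:///tmp/a.jar', 'file:///tmp/b.jar']
```

The empty entry is what the Java side later hands to {{java.net.URL}}, matching the empty string after {{no protocol:}} in the trace.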
Another problem: the iteration order of a Python set is not deterministic, so {{";".join(jar_urls_set)}} does NOT preserve the order of the classpaths. A list is more suitable in this case.
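A minimal sketch of an order-preserving merge that also skips empty entries. {{merge_jar_urls}} is a hypothetical helper, not PyFlink API, and putting the newly configured jars before the existing ones is an assumption about the desired ordering. It relies on {{dict.fromkeys}}, which drops duplicates while keeping first-insertion order (guaranteed since Python 3.7, the version in the trace above).

```python
from typing import Iterable, List


def merge_jar_urls(new_urls: Iterable[str], existing: str) -> str:
    """Hypothetical helper: merge jar URLs into a ';'-separated string.

    Keeps first-seen order (new_urls first, then entries of `existing`),
    removes duplicates, and drops empty entries such as the [""] produced
    by "".split(";").
    """
    candidates: List[str] = list(new_urls) + existing.split(";")
    non_empty = [url for url in candidates if url]
    # dict.fromkeys keeps the first occurrence of each URL, in order.
    return ";".join(dict.fromkeys(non_empty))


print(merge_jar_urls(["file:///tmp/a.jar", "file:///tmp/b.jar"], ""))
# file:///tmp/a.jar;file:///tmp/b.jar
print(merge_jar_urls(["file:///tmp/b.jar"], "file:///tmp/a.jar;file:///tmp/b.jar;;file:///tmp/c.jar"))
# file:///tmp/b.jar;file:///tmp/a.jar;file:///tmp/c.jar
```

Unlike a set of strings, whose iteration order can change between interpreter runs because string hashing is randomized, this produces the same classpath string on every run.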
--
This message was sent by Atlassian Jira
(v8.3.4#803005)