[
https://issues.apache.org/jira/browse/FLINK-27974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553333#comment-17553333
]
Dong Jiang edited comment on FLINK-27974 at 6/12/22 8:36 PM:
-------------------------------------------------------------
This can be replicated by pyflink-shell from 1.15.0. Using 1.14.4 pyflink-shell
would not produce the error
{code}
./pyflink-shell.sh local
import tempfile
import os
import shutil
sink_path = tempfile.gettempdir() + '/streaming.csv'
if os.path.exists(sink_path):
if os.path.isfile(sink_path):
os.remove(sink_path)
else:
shutil.rmtree(sink_path)
s_env.set_parallelism(2)
t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b',
'c'])
st_env.execute_sql(f"create temporary table stream_sink (a bigint, b string, c
string) with ('connector' = 'filesystem', 'path' = '{sink_path}', 'format' =
'csv')")
t.select("a + 1, b, c").execute_insert("stream_sink").wait()
{code}
{code}
...
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File
"/Users/djiang/flink/flink-1.15.0/opt/python/pyflink.zip/pyflink/table/table.py",
line 1107, in execute_insert
File
"/Users/djiang/flink/flink-1.15.0/opt/python/py4j-0.10.9.3-src.zip/py4j/java_gateway.py",
line 1321, in __call__
File
"/Users/djiang/flink/flink-1.15.0/opt/python/pyflink.zip/pyflink/util/exceptions.py",
line 146, in deco
File
"/Users/djiang/flink/flink-1.15.0/opt/python/py4j-0.10.9.3-src.zip/py4j/protocol.py",
line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o74.executeInsert.
: org.apache.flink.table.api.ValidationException: Unable to create a sink for
writing table 'default_catalog.default_database.stream_sink'.
Table options are:
'connector'='filesystem'
'format'='csv'
'path'='/var/folders/47/d25xsmfd0_ldv7cx2lfywgvh0000gs/T/streaming.csv'
at
org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:262)
at
org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:421)
at
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:222)
at
org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:178)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:178)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1656)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:782)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:861)
at
org.apache.flink.table.api.internal.TablePipelineImpl.execute(TablePipelineImpl.java:56)
at org.apache.flink.table.api.Table.executeInsert(Table.java:1470)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoClassDefFoundError:
org/apache/flink/table/planner/delegation/ParserFactory
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
at
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at
java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at
org.apache.flink.connector.file.table.FileSystemTableFactory.formatFactoryExists(FileSystemTableFactory.java:205)
at
org.apache.flink.connector.file.table.FileSystemTableFactory.discoverDecodingFormat(FileSystemTableFactory.java:171)
at
org.apache.flink.connector.file.table.FileSystemTableFactory.createDynamicTableSink(FileSystemTableFactory.java:90)
at
org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:259)
... 30 more
Caused by: java.lang.ClassNotFoundException:
org.apache.flink.table.planner.delegation.ParserFactory
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 52 more
{code}
was (Author: djiangxu):
This can be replicated by pyflink-shell
{code}
./pyflink-shell.sh local
import tempfile
import os
import shutil
sink_path = tempfile.gettempdir() + '/streaming.csv'
if os.path.exists(sink_path):
if os.path.isfile(sink_path):
os.remove(sink_path)
else:
shutil.rmtree(sink_path)
s_env.set_parallelism(2)
t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b',
'c'])
st_env.execute_sql(f"create temporary table stream_sink (a bigint, b string, c
string) with ('connector' = 'filesystem', 'path' = '{sink_path}', 'format' =
'csv')")
t.select("a + 1, b, c").execute_insert("stream_sink").wait()
{code}
{code}
...
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File
"/Users/djiang/flink/flink-1.15.0/opt/python/pyflink.zip/pyflink/table/table.py",
line 1107, in execute_insert
File
"/Users/djiang/flink/flink-1.15.0/opt/python/py4j-0.10.9.3-src.zip/py4j/java_gateway.py",
line 1321, in __call__
File
"/Users/djiang/flink/flink-1.15.0/opt/python/pyflink.zip/pyflink/util/exceptions.py",
line 146, in deco
File
"/Users/djiang/flink/flink-1.15.0/opt/python/py4j-0.10.9.3-src.zip/py4j/protocol.py",
line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o74.executeInsert.
: org.apache.flink.table.api.ValidationException: Unable to create a sink for
writing table 'default_catalog.default_database.stream_sink'.
Table options are:
'connector'='filesystem'
'format'='csv'
'path'='/var/folders/47/d25xsmfd0_ldv7cx2lfywgvh0000gs/T/streaming.csv'
at
org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:262)
at
org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:421)
at
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:222)
at
org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:178)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:178)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1656)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:782)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:861)
at
org.apache.flink.table.api.internal.TablePipelineImpl.execute(TablePipelineImpl.java:56)
at org.apache.flink.table.api.Table.executeInsert(Table.java:1470)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoClassDefFoundError:
org/apache/flink/table/planner/delegation/ParserFactory
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
at
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at
java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at
org.apache.flink.connector.file.table.FileSystemTableFactory.formatFactoryExists(FileSystemTableFactory.java:205)
at
org.apache.flink.connector.file.table.FileSystemTableFactory.discoverDecodingFormat(FileSystemTableFactory.java:171)
at
org.apache.flink.connector.file.table.FileSystemTableFactory.createDynamicTableSink(FileSystemTableFactory.java:90)
at
org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:259)
... 30 more
Caused by: java.lang.ClassNotFoundException:
org.apache.flink.table.planner.delegation.ParserFactory
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 52 more
{code}
> Potentially wrong classloader being used to create dynamic table sources
> ------------------------------------------------------------------------
>
> Key: FLINK-27974
> URL: https://issues.apache.org/jira/browse/FLINK-27974
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.15.0
> Reporter: Chesnay Schepler
> Priority: Major
>
> A user reported an issue on slack where a job fails in the CLI because of
> {{ClassNotFoundException:
> org.apache.flink.table.planner.delegation.ParserFactory}} in
> {{FileSystemTableFactory#formatFactoryExists}} when trying to load the
> {{Factory}} service.
> While looking through the call stack I noticed that the classloader passed
> via the context is a thread's context classloader, set in
> {{CatalogSourceTable#createDynamicTableSource}}.
> This seems a bit fishy; since this runs in the context of the CLI this CL is
> likely the user CL, but the planner classes are loaded in a separate
> classloader (not in the parent). As a result the planner classes cannot be
> looked up via the service loader mechanism.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)