[ 
https://issues.apache.org/jira/browse/FLINK-27974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553333#comment-17553333
 ] 

Dong Jiang edited comment on FLINK-27974 at 6/12/22 8:32 PM:
-------------------------------------------------------------

This can be reproduced with pyflink-shell:
{code}
# Start the shell first: ./pyflink-shell.sh local
# The shell predefines s_env and st_env.
import tempfile
import os
import shutil

# Remove any sink output left over from a previous run.
sink_path = tempfile.gettempdir() + '/streaming.csv'
if os.path.exists(sink_path):
    if os.path.isfile(sink_path):
        os.remove(sink_path)
    else:
        shutil.rmtree(sink_path)

s_env.set_parallelism(2)
t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
st_env.execute_sql(f"create temporary table stream_sink (a bigint, b string, c string) with ('connector' = 'filesystem', 'path' = '{sink_path}', 'format' = 'csv')")
# This call fails while the filesystem sink is being created.
t.select("a + 1, b, c").execute_insert("stream_sink").wait()
{code}
{code}
...
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/djiang/flink/flink-1.15.0/opt/python/pyflink.zip/pyflink/table/table.py", line 1107, in execute_insert
  File "/Users/djiang/flink/flink-1.15.0/opt/python/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1321, in __call__
  File "/Users/djiang/flink/flink-1.15.0/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
  File "/Users/djiang/flink/flink-1.15.0/opt/python/py4j-0.10.9.3-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o74.executeInsert.
: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.stream_sink'.

Table options are:

'connector'='filesystem'
'format'='csv'
'path'='/var/folders/47/d25xsmfd0_ldv7cx2lfywgvh0000gs/T/streaming.csv'
        at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:262)
        at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:421)
        at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:222)
        at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:178)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
        at scala.collection.Iterator.foreach(Iterator.scala:937)
        at scala.collection.Iterator.foreach$(Iterator.scala:937)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
        at scala.collection.IterableLike.foreach(IterableLike.scala:70)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike.map(TraversableLike.scala:233)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:178)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1656)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:782)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:861)
        at org.apache.flink.table.api.internal.TablePipelineImpl.execute(TablePipelineImpl.java:56)
        at org.apache.flink.table.api.Table.executeInsert(Table.java:1470)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/table/planner/delegation/ParserFactory
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
        at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
        at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
        at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
        at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
        at java.util.Iterator.forEachRemaining(Iterator.java:116)
        at org.apache.flink.connector.file.table.FileSystemTableFactory.formatFactoryExists(FileSystemTableFactory.java:205)
        at org.apache.flink.connector.file.table.FileSystemTableFactory.discoverDecodingFormat(FileSystemTableFactory.java:171)
        at org.apache.flink.connector.file.table.FileSystemTableFactory.createDynamicTableSink(FileSystemTableFactory.java:90)
        at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:259)
        ... 30 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.planner.delegation.ParserFactory
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        ... 52 more
{code}
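
When chasing a {{ClassNotFoundException}} like the one above, it can help to check which jars of the local distribution actually contain the missing class. The following is only a rough sketch, not part of the original report: the helper name {{find_class_in_jars}} is made up for illustration, and the install path is an assumption taken from the paths in the traceback. It only inspects top-level jar entries, so a class packaged in a jar nested inside another jar will not be found.

```python
import zipfile
from pathlib import Path


def find_class_in_jars(class_name, jar_dirs):
    """Return the paths of all jars directly under jar_dirs that contain class_name.

    Hypothetical helper for illustration only; it is not part of Flink or PyFlink.
    """
    # A class a.b.C is stored in a jar as the entry a/b/C.class.
    entry = class_name.replace(".", "/") + ".class"
    hits = []
    for jar_dir in jar_dirs:
        jar_dir = Path(jar_dir)
        if not jar_dir.is_dir():
            # Silently skip directories that do not exist on this machine.
            continue
        for jar in sorted(jar_dir.glob("*.jar")):
            try:
                with zipfile.ZipFile(jar) as zf:
                    if entry in zf.namelist():
                        hits.append(str(jar))
            except zipfile.BadZipFile:
                # Skip files that are not valid zip archives.
                continue
    return hits


if __name__ == "__main__":
    # Assumed install location, copied from the traceback; adjust as needed.
    flink_home = Path("/Users/djiang/flink/flink-1.15.0")
    for path in find_class_in_jars(
        "org.apache.flink.table.planner.delegation.ParserFactory",
        [flink_home / "lib", flink_home / "opt"],
    ):
        print(path)
```

If the class shows up in no jar, or only in a jar that is not on the classpath of the loader that performs the lookup, that would be consistent with the error above, although it does not by itself prove which classloader is at fault.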


was (Author: djiangxu):
The same reproduction and stack trace as above, with the script and the traceback in a single code block.

> Potentially wrong classloader being used to create dynamic table sources
> ------------------------------------------------------------------------
>
>                 Key: FLINK-27974
>                 URL: https://issues.apache.org/jira/browse/FLINK-27974
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.15.0
>            Reporter: Chesnay Schepler
>            Priority: Major
>
> A user reported an issue on Slack where a job fails in the CLI because of 
> {{ClassNotFoundException: 
> org.apache.flink.table.planner.delegation.ParserFactory}} in 
> {{FileSystemTableFactory#formatFactoryExists}} when trying to load the 
> {{Factory}} service.
> While looking through the call stack I noticed that the classloader passed 
> via the context is a thread's context classloader, set in 
> {{CatalogSourceTable#createDynamicTableSource}}.
> This seems a bit fishy: since this runs in the context of the CLI, this 
> classloader is likely the user classloader, but the planner classes are 
> loaded in a separate classloader (not in the parent). As a result, the 
> planner classes cannot be looked up via the service loader mechanism.
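
The situation described in the issue can be illustrated with a toy model of parent-first delegation. This is a simplified sketch in Python, not JVM or Flink code: {{ToyClassLoader}}, the loader names and {{com.example.UserJob}} are all made up, and which classes live in which loader is an assumption based on the description above.

```python
class ToyClassLoader:
    """Toy stand-in for a parent-first class loader (not a real JVM or Flink API)."""

    def __init__(self, name, classes, parent=None):
        self.name = name
        self.classes = set(classes)
        self.parent = parent

    def load_class(self, class_name):
        # Parent-first delegation: ask the parent chain before looking locally.
        if self.parent is not None:
            try:
                return self.parent.load_class(class_name)
            except LookupError:
                pass
        if class_name in self.classes:
            return f"{class_name} (from {self.name})"
        raise LookupError(f"ClassNotFoundException: {class_name}")


PARSER_FACTORY = "org.apache.flink.table.planner.delegation.ParserFactory"

# Assumed layout: a shared parent plus two sibling loaders.
app = ToyClassLoader("app", ["org.apache.flink.table.factories.Factory"])
user = ToyClassLoader("user", ["com.example.UserJob"], parent=app)
planner = ToyClassLoader("planner", [PARSER_FACTORY], parent=app)

# The planner loader can resolve its own class.
print(planner.load_class(PARSER_FACTORY))

# The user loader only sees itself and its parent chain, never its sibling.
try:
    user.load_class(PARSER_FACTORY)
except LookupError as err:
    print(err)
```

In this model, a lookup that starts from the user loader can never reach classes held only by the planner loader, because delegation goes up to the parent and not sideways. A service lookup driven by the wrong loader would fail in the same way.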



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
