[ https://issues.apache.org/jira/browse/FLINK-18076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dawid Wysakowicz updated FLINK-18076: ------------------------------------- Description: Sql-client when parsing queries does not use the user class loader from {{ExecutionContext}}. This makes it impossible to query any sources if the dependencies are added with {{-j}} flag. In order to reproduce it try querying e.g. KafkaDynamicSource with {code} CREATE TABLE MyUserTable ( f0 BIGINT ) WITH ( 'connector' = 'kafka', 'topic' = 'topic_name', -- required: topic name from which the table is read -- required: specify the Kafka server connection string 'properties.bootstrap.servers' = 'localhost:9092', -- required for Kafka source, optional for Kafka sink, specify consumer group 'properties.group.id' = 'testGroup', -- optional: valid modes are "earliest-offset", "latest-offset", "group-offsets", "specific-offsets" or "timestamp" 'scan.startup.mode' = 'earliest-offset', 'format' = 'avro' ); SELECT * FROM MyUserTable; {code} It give exception: {code} Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue. at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213) Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Invalidate SQL statement. at org.apache.flink.table.client.cli.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:95) at org.apache.flink.table.client.cli.SqlCommandParser.parse(SqlCommandParser.java:79) at org.apache.flink.table.client.cli.CliClient.parseCommand(CliClient.java:256) at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:212) at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142) at org.apache.flink.table.client.SqlClient.start(SqlClient.java:114) at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201) Caused by: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.MyUserTable'. Table options are: 'connector'='kafka' 'format'='avro' 'properties.bootstrap.servers'='localhost:9092' 'properties.group.id'='testGroup' 'scan.startup.mode'='earliest-offset' 'topic'='topic_name' at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125) at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135) at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78) at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492) at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151) at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) at org.apache.flink.table.client.cli.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:90) ... 6 more Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='kafka''. at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329) at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118) ... 23 more Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath. Available factory identifiers are: datagen at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240) at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326) ... 24 more Shutting down the session... done. {code} Because the factories are present only in the user classloader. was: Sql-client when parsing queries does not use the user class loader from {{ExecutionContext}}. This makes it impossible to query any sources if the dependencies are added with {{-j}} flag. In order to reproduce it try querying e.g. KafkaDynamicSource with {code} CREATE TABLE MyUserTable ( f0 BIGINT ) WITH ( 'connector' = 'kafka', 'topic' = 'topic_name', -- required: topic name from which the table is read -- required: specify the Kafka server connection string 'properties.bootstrap.servers' = 'localhost:9092', -- required for Kafka source, optional for Kafka sink, specify consumer group 'properties.group.id' = 'testGroup', -- optional: valid modes are "earliest-offset", "latest-offset", "group-offsets", "specific-offsets" or "timestamp" 'scan.startup.mode' = 'earliest-offset', 'format' = 'avro' ); SELECT * FROM MyUserTable; {code} > Sql client uses wrong class loader when parsing queries > ------------------------------------------------------- > > Key: FLINK-18076 > URL: https://issues.apache.org/jira/browse/FLINK-18076 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client > Affects Versions: 1.11.0, 1.12.0 > Reporter: Dawid Wysakowicz > Priority: Blocker > Fix For: 1.11.0 > > > Sql-client when parsing queries does not use the user class loader from > {{ExecutionContext}}. This makes it impossible to query any sources if the > dependencies are added with {{-j}} flag. > In order to reproduce it try querying e.g. KafkaDynamicSource with > {code} > CREATE TABLE MyUserTable ( > f0 BIGINT > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'topic_name', -- required: topic name from which the table is read > -- required: specify the Kafka server connection string > 'properties.bootstrap.servers' = 'localhost:9092', > -- required for Kafka source, optional for Kafka sink, specify consumer > group > 'properties.group.id' = 'testGroup', > -- optional: valid modes are "earliest-offset", "latest-offset", > "group-offsets", "specific-offsets" or "timestamp" > 'scan.startup.mode' = 'earliest-offset', > 'format' = 'avro' > ); > SELECT * FROM MyUserTable; > {code} > It give exception: > {code} > Exception in thread "main" org.apache.flink.table.client.SqlClientException: > Unexpected exception. This is a bug. Please consider filing an issue. > at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213) > Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: > Invalidate SQL statement. > at > org.apache.flink.table.client.cli.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:95) > at > org.apache.flink.table.client.cli.SqlCommandParser.parse(SqlCommandParser.java:79) > at > org.apache.flink.table.client.cli.CliClient.parseCommand(CliClient.java:256) > at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:212) > at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142) > at org.apache.flink.table.client.SqlClient.start(SqlClient.java:114) > at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201) > Caused by: org.apache.flink.table.api.ValidationException: Unable to create a > source for reading table 'default_catalog.default_database.MyUserTable'. > Table options are: > 'connector'='kafka' > 'format'='avro' > 'properties.bootstrap.servers'='localhost:9092' > 'properties.group.id'='testGroup' > 'scan.startup.mode'='earliest-offset' > 'topic'='topic_name' > at > org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125) > at > org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135) > at > org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78) > at > org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) > at > org.apache.flink.table.client.cli.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:90) > ... 6 more > Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a > connector using option ''connector'='kafka''. > at > org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329) > at > org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118) > ... 23 more > Caused by: org.apache.flink.table.api.ValidationException: Could not find any > factory for identifier 'kafka' that implements > 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath. > Available factory identifiers are: > datagen > at > org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240) > at > org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326) > ... 24 more > Shutting down the session... > done. > {code} > Because the factories are present only in the user classloader. -- This message was sent by Atlassian Jira (v8.3.4#803005)