[
https://issues.apache.org/jira/browse/FLINK-18076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dawid Wysakowicz closed FLINK-18076.
------------------------------------
Resolution: Fixed
Fixed in:
master: 61e6f70dba3e724c479b8cd7753b314ebb1d5517
1.11: e7902bb4d1329833870ee53c782c6431cfc8cb80
> Sql client uses wrong class loader when parsing queries
> -------------------------------------------------------
>
> Key: FLINK-18076
> URL: https://issues.apache.org/jira/browse/FLINK-18076
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Client
> Affects Versions: 1.11.0, 1.12.0
> Reporter: Dawid Wysakowicz
> Assignee: Leonard Xu
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.11.0
>
>
> The SQL client does not use the user class loader from {{ExecutionContext}}
> when parsing queries. This makes it impossible to query any source whose
> dependencies are added with the {{-j}} flag.
> To reproduce it, try querying e.g. a KafkaDynamicSource table with
> {code}
> CREATE TABLE MyUserTable (
> f0 BIGINT
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'topic_name', -- required: topic name from which the table is read
> -- required: specify the Kafka server connection string
> 'properties.bootstrap.servers' = 'localhost:9092',
> -- required for Kafka source, optional for Kafka sink, specify consumer group
> 'properties.group.id' = 'testGroup',
> -- optional: valid modes are "earliest-offset", "latest-offset", "group-offsets", "specific-offsets" or "timestamp"
> 'scan.startup.mode' = 'earliest-offset',
> 'format' = 'avro'
> );
> SELECT * FROM MyUserTable;
> {code}
> It gives the following exception:
> {code}
> Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
> Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Invalidate SQL statement.
> at org.apache.flink.table.client.cli.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:95)
> at org.apache.flink.table.client.cli.SqlCommandParser.parse(SqlCommandParser.java:79)
> at org.apache.flink.table.client.cli.CliClient.parseCommand(CliClient.java:256)
> at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:212)
> at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142)
> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:114)
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
> Caused by: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.MyUserTable'.
> Table options are:
> 'connector'='kafka'
> 'format'='avro'
> 'properties.bootstrap.servers'='localhost:9092'
> 'properties.group.id'='testGroup'
> 'scan.startup.mode'='earliest-offset'
> 'topic'='topic_name'
> at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
> at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
> at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
> at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
> at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
> at org.apache.flink.table.client.cli.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:90)
> ... 6 more
> Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='kafka''.
> at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
> at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
> ... 23 more
> Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.
> Available factory identifiers are:
> datagen
> at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
> at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
> ... 24 more
> Shutting down the session...
> done.
> {code}
> The lookup fails because the connector factories are present only in the
> user classloader, which is not the one in effect while the query is parsed.
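
For readers unfamiliar with this failure mode, the general remedy can be sketched as follows: run the parsing step with the user class loader installed as the thread context class loader, then restore the previous one. This is a minimal illustration using only the JDK, not the actual patch. The class name `ClassLoaderSketch`, the helper `wrapClassLoader`, and the empty `URLClassLoader` standing in for the loader built from `-j` jars are all assumptions made for the example, and it presumes that factory discovery consults the thread context class loader.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Supplier;

public class ClassLoaderSketch {

    /**
     * Runs the action with userLoader installed as the thread context
     * class loader and restores the previous loader afterwards, even if
     * the action throws. (Hypothetical helper for illustration.)
     */
    static <T> T wrapClassLoader(ClassLoader userLoader, Supplier<T> action) {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(userLoader);
        try {
            return action.get();
        } finally {
            thread.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        // Stand-in for the loader that would contain the jars passed with -j;
        // here it has no URLs and only delegates to its parent.
        ClassLoader userLoader =
                new URLClassLoader(new URL[0], ClassLoaderSketch.class.getClassLoader());

        ClassLoader before = Thread.currentThread().getContextClassLoader();

        // Stand-in for the parsing step: report which loader is visible to it.
        ClassLoader seen = wrapClassLoader(
                userLoader, () -> Thread.currentThread().getContextClassLoader());

        System.out.println("user loader visible while parsing: " + (seen == userLoader));
        System.out.println("previous loader restored: "
                + (Thread.currentThread().getContextClassLoader() == before));
    }
}
```

The try/finally matters here: if parsing throws, as in the stack trace above, the thread must not be left with the user class loader installed.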