[ https://issues.apache.org/jira/browse/FLINK-18076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dawid Wysakowicz updated FLINK-18076:
-------------------------------------
    Description: 
When parsing queries, the SQL client does not use the user class loader from 
{{ExecutionContext}}. This makes it impossible to query any sources whose 
dependencies are added with the {{-j}} flag.
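
For context, a minimal sketch of the fix direction (not the actual SQL client code; the class and method names below are illustrative): run the parse call with the user class loader from {{ExecutionContext}} installed as the thread context class loader, so that {{ServiceLoader}}-based factory discovery can see the jars added with {{-j}}.

{code}
import java.util.function.Supplier;

// Hedged sketch, not the actual SqlCommandParser/ExecutionContext code:
// temporarily install the user class loader as the thread context class
// loader while the parse call runs, then restore the previous loader.
public final class UserClassLoaderWrapper {

    public static <R> R wrapWithUserClassLoader(ClassLoader userClassLoader, Supplier<R> parseCall) {
        ClassLoader previous = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(userClassLoader);
        try {
            // e.g. () -> parser.parse(statement) inside the SQL client
            return parseCall.get();
        } finally {
            Thread.currentThread().setContextClassLoader(previous);
        }
    }
}
{code}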

To reproduce, try querying e.g. a {{KafkaDynamicSource}} with:
{code}
CREATE TABLE MyUserTable (
  f0 BIGINT
) WITH (
  'connector' = 'kafka',

  -- required: topic name from which the table is read
  'topic' = 'topic_name',

  -- required: specify the Kafka server connection string
  'properties.bootstrap.servers' = 'localhost:9092',

  -- required for Kafka source, optional for Kafka sink, specify consumer group
  'properties.group.id' = 'testGroup',

  -- optional: valid modes are "earliest-offset", "latest-offset",
  -- "group-offsets", "specific-offsets" or "timestamp"
  'scan.startup.mode' = 'earliest-offset',

  'format' = 'avro'
);

SELECT * FROM MyUserTable;
{code}

It gives the following exception:

{code}
Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
Unexpected exception. This is a bug. Please consider filing an issue.
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: 
Invalidate SQL statement.
        at 
org.apache.flink.table.client.cli.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:95)
        at 
org.apache.flink.table.client.cli.SqlCommandParser.parse(SqlCommandParser.java:79)
        at 
org.apache.flink.table.client.cli.CliClient.parseCommand(CliClient.java:256)
        at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:212)
        at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142)
        at org.apache.flink.table.client.SqlClient.start(SqlClient.java:114)
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
Caused by: org.apache.flink.table.api.ValidationException: Unable to create a 
source for reading table 'default_catalog.default_database.MyUserTable'.

Table options are:

'connector'='kafka'
'format'='avro'
'properties.bootstrap.servers'='localhost:9092'
'properties.group.id'='testGroup'
'scan.startup.mode'='earliest-offset'
'topic'='topic_name'
        at 
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
        at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
        at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
        at 
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
        at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
        at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
        at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
        at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
        at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
        at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
        at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
        at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
        at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
        at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
        at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
        at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
        at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
        at 
org.apache.flink.table.client.cli.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:90)
        ... 6 more
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a 
connector using option ''connector'='kafka''.
        at 
org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
        at 
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
        ... 23 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any 
factory for identifier 'kafka' that implements 
'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.

Available factory identifiers are:

datagen
        at 
org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
        at 
org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
        ... 24 more

Shutting down the session...
done.
{code}

This happens because the factories are present only in the user class loader.
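
A minimal, illustrative reproduction of that discovery failure (the class name {{FactoryDiscoveryRepro}} is hypothetical; it assumes the Kafka connector jar is only on the user class loader built from the {{-j}} jars and not on the SQL client's own classpath):

{code}
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

public class FactoryDiscoveryRepro {
    public static void main(String[] args) {
        // With the SQL client's own class loader only the built-in factories
        // (e.g. 'datagen') are visible, so this throws the ValidationException
        // "Could not find any factory for identifier 'kafka' ..." shown above.
        ClassLoader clientClassLoader = FactoryDiscoveryRepro.class.getClassLoader();
        FactoryUtil.discoverFactory(
                clientClassLoader, DynamicTableSourceFactory.class, "kafka");
    }
}
{code}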

  was:
When parsing queries, the SQL client does not use the user class loader from 
{{ExecutionContext}}. This makes it impossible to query any sources whose 
dependencies are added with the {{-j}} flag.

To reproduce, try querying e.g. a {{KafkaDynamicSource}} with:
{code}
CREATE TABLE MyUserTable (
  f0 BIGINT
) WITH (
  'connector' = 'kafka',

  -- required: topic name from which the table is read
  'topic' = 'topic_name',

  -- required: specify the Kafka server connection string
  'properties.bootstrap.servers' = 'localhost:9092',

  -- required for Kafka source, optional for Kafka sink, specify consumer group
  'properties.group.id' = 'testGroup',

  -- optional: valid modes are "earliest-offset", "latest-offset",
  -- "group-offsets", "specific-offsets" or "timestamp"
  'scan.startup.mode' = 'earliest-offset',

  'format' = 'avro'
);

SELECT * FROM MyUserTable;
{code}


> Sql client uses wrong class loader when parsing queries
> -------------------------------------------------------
>
>                 Key: FLINK-18076
>                 URL: https://issues.apache.org/jira/browse/FLINK-18076
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Client
>    Affects Versions: 1.11.0, 1.12.0
>            Reporter: Dawid Wysakowicz
>            Priority: Blocker
>             Fix For: 1.11.0
>
>
> When parsing queries, the SQL client does not use the user class loader from 
> {{ExecutionContext}}. This makes it impossible to query any sources whose 
> dependencies are added with the {{-j}} flag.
> To reproduce, try querying e.g. a {{KafkaDynamicSource}} with:
> {code}
> CREATE TABLE MyUserTable (
>   f0 BIGINT
> ) WITH (
>   'connector' = 'kafka',
>   -- required: topic name from which the table is read
>   'topic' = 'topic_name',
>   -- required: specify the Kafka server connection string
>   'properties.bootstrap.servers' = 'localhost:9092',
>   -- required for Kafka source, optional for Kafka sink, specify consumer group
>   'properties.group.id' = 'testGroup',
>   -- optional: valid modes are "earliest-offset", "latest-offset",
>   -- "group-offsets", "specific-offsets" or "timestamp"
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'avro'
> );
> SELECT * FROM MyUserTable;
> {code}
> It gives the following exception:
> {code}
> Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
> Unexpected exception. This is a bug. Please consider filing an issue.
>       at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
> Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: 
> Invalidate SQL statement.
>       at 
> org.apache.flink.table.client.cli.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:95)
>       at 
> org.apache.flink.table.client.cli.SqlCommandParser.parse(SqlCommandParser.java:79)
>       at 
> org.apache.flink.table.client.cli.CliClient.parseCommand(CliClient.java:256)
>       at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:212)
>       at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142)
>       at org.apache.flink.table.client.SqlClient.start(SqlClient.java:114)
>       at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
> Caused by: org.apache.flink.table.api.ValidationException: Unable to create a 
> source for reading table 'default_catalog.default_database.MyUserTable'.
> Table options are:
> 'connector'='kafka'
> 'format'='avro'
> 'properties.bootstrap.servers'='localhost:9092'
> 'properties.group.id'='testGroup'
> 'scan.startup.mode'='earliest-offset'
> 'topic'='topic_name'
>       at 
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
>       at 
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
>       at 
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
>       at 
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
>       at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
>       at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
>       at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>       at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
>       at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
>       at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
>       at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
>       at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
>       at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
>       at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
>       at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
>       at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
>       at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>       at 
> org.apache.flink.table.client.cli.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:90)
>       ... 6 more
> Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a 
> connector using option ''connector'='kafka''.
>       at 
> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
>       at 
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
>       ... 23 more
> Caused by: org.apache.flink.table.api.ValidationException: Could not find any 
> factory for identifier 'kafka' that implements 
> 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.
> Available factory identifiers are:
> datagen
>       at 
> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
>       at 
> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
>       ... 24 more
> Shutting down the session...
> done.
> {code}
> This happens because the factories are present only in the user class loader.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
