[
https://issues.apache.org/jira/browse/FLINK-15552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17016853#comment-17016853
]
Jark Wu commented on FLINK-15552:
---------------------------------
Thanks for the investigation [~Leonard Xu]. I got it. The reason the e2e test
works is that it registers the table via YAML, which uses the correct
classloader in the SQL CLI. However, the classloader is not used correctly when
the table is registered via DDL, so the problem is that {{--jar}} and
{{--library}} don't work.
The fix might be simple: we should pass the current classloader, which contains
the user's jars, when calling
{{TableFactoryUtil#findAndCreateTableSource(table)}}.
cc [~twalthr]
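To illustrate the underlying mechanism, here is a minimal, self-contained Java sketch (not Flink code; the class name `ClassLoaderVisibility` is made up for the example) showing why a lookup performed against a loader that cannot see the user's jars fails, while the same lookup against the application classloader succeeds:

```java
import java.net.URL;
import java.net.URLClassLoader;

public class ClassLoaderVisibility {
    public static void main(String[] args) throws Exception {
        // The application classloader can see this class, analogous to the
        // SQL CLI classloader that contains jars added via --jar/--library.
        ClassLoader appLoader = ClassLoaderVisibility.class.getClassLoader();
        Class<?> found = Class.forName("ClassLoaderVisibility", false, appLoader);
        System.out.println("app loader found: " + found.getName());

        // A loader with no URLs and a null (bootstrap) parent cannot see it,
        // mirroring the NoMatchingTableFactoryException in the report: the
        // factory class exists, but the loader doing the lookup can't find it.
        ClassLoader isolated = new URLClassLoader(new URL[0], null);
        try {
            Class.forName("ClassLoaderVisibility", false, isolated);
            System.out.println("isolated loader found it (unexpected)");
        } catch (ClassNotFoundException e) {
            System.out.println("isolated loader: ClassNotFoundException");
        }
    }
}
```

The same principle applies to factory discovery: passing an explicit classloader that includes the user's jars to the lookup, instead of falling back to a default loader, makes the factory visible.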
> SQL Client can not correctly create kafka table using --library to indicate a
> kafka connector directory
> -------------------------------------------------------------------------------------------------------
>
> Key: FLINK-15552
> URL: https://issues.apache.org/jira/browse/FLINK-15552
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Client, Table SQL / Runtime
> Reporter: Terry Wang
> Assignee: Leonard Xu
> Priority: Critical
> Fix For: 1.10.0
>
>
> How to Reproduce:
> First, I start a SQL client and use `-l` to point to a kafka connector
> directory:
> `
> bin/sql-client.sh embedded -l /xx/connectors/kafka/
> `
> Then, I create a Kafka table as follows:
> `
> Flink SQL> CREATE TABLE MyUserTable (
> > content String
> > ) WITH (
> > 'connector.type' = 'kafka',
> > 'connector.version' = 'universal',
> > 'connector.topic' = 'test',
> > 'connector.properties.zookeeper.connect' = 'localhost:2181',
> > 'connector.properties.bootstrap.servers' = 'localhost:9092',
> > 'connector.properties.group.id' = 'testGroup',
> > 'connector.startup-mode' = 'earliest-offset',
> > 'format.type' = 'csv'
> > );
> [INFO] Table has been created.
> `
> Then I select from the just-created table and an exception is thrown:
> `
> Flink SQL> select * from MyUserTable;
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a
> suitable table factory for
> 'org.apache.flink.table.factories.TableSourceFactory' in
> the classpath.
> Reason: Required context properties mismatch.
> The matching candidates:
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> Mismatched properties:
> 'connector.type' expects 'filesystem', but is 'kafka'
> The following properties are requested:
> connector.properties.bootstrap.servers=localhost:9092
> connector.properties.group.id=testGroup
> connector.properties.zookeeper.connect=localhost:2181
> connector.startup-mode=earliest-offset
> connector.topic=test
> connector.type=kafka
> connector.version=universal
> format.type=csv
> schema.0.data-type=VARCHAR(2147483647)
> schema.0.name=content
> The following factories have been considered:
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> `
> Potential Reasons:
> We currently use `TableFactoryUtil#findAndCreateTableSource` to convert a
> CatalogTable to a TableSource, but when calling `TableFactoryService.find` we
> don't pass the current classloader to this method; the default loader will be
> the bootstrap classloader, which cannot find our factory.
> I verified on my machine that this behavior is indeed the cause.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)