[
https://issues.apache.org/jira/browse/FLINK-18184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
mzz updated FLINK-18184:
------------------------
Priority: Major (was: Critical)
> Could not find a suitable table factory for
> 'org.apache.flink.table.factories.TableSourceFactory'
> -------------------------------------------------------------------------------------------------
>
> Key: FLINK-18184
> URL: https://issues.apache.org/jira/browse/FLINK-18184
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.9.1
> Environment: local:macos
> flink1.9
>
> Reporter: mzz
> Priority: Major
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.enableCheckpointing(5000) // checkpoint every 5000 msecs
> //kafak配置
> val properties = new Properties()
> properties.setProperty("bootstrap.servers", "172.16.30.207:9092")
> properties.setProperty("group.id", "km_aggs_group")
> val fsSettings =
> EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
> val kafkaConsumer = new FlinkKafkaConsumer[String](TOPIC, new
> SimpleStringSchema(), properties).setStartFromEarliest()
> // val source = env.addSource(kafkaConsumer)
> val streamTableEnvironment = StreamTableEnvironment.create(env,fsSettings)
> streamTableEnvironment.connect(new Kafka()
> .topic(TOPIC)
> .version(VERSION)
> .startFromEarliest()
> .property("bootstrap.servers", "172.16.30.207:9092")
> .property("zookeeper.connect", "172.16.30.207:2181")
> .property("group.id", "km_aggs_group_table")
> // .properties(properties)
> )
> .withFormat(
> new Json()
> .failOnMissingField(true)
> .deriveSchema()
> )
> .withSchema(new Schema()
> .field("advs", Types.STRING())
> .field("devs", Types.STRING())
> .field("environment", Types.STRING())
> .field("events", Types.STRING())
> .field("identity", Types.STRING())
> .field("ip", Types.STRING())
> .field("launchs", Types.STRING())
> .field("ts", Types.STRING())
> )
> .inAppendMode()
> .registerTableSource("aggs_test")
> val tableResult = streamTableEnvironment.sqlQuery("select * from
> aggs_test")
> tableResult.printSchema()
> // streamTableEnvironment.toAppendStream[Row](tableResult).print()
> //启动程序
> env.execute("test_kafka")
> --------------------------------------------------------
> Error message:
> Exception in thread "main" org.apache.flink.table.api.TableException:
> findAndCreateTableSource failed.
> at
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
> at
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
> at
> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
> at
> KM.COM.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:70)
> at
> KM.COM.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala)
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could
> not find a suitable table factory for
> 'org.apache.flink.table.factories.TableSourceFactory' in
> the classpath.
> Reason: No context matches.
> The following properties are requested:
> connector.properties.0.key=zookeeper.connect
> connector.properties.0.value=172.16.30.207:2181
> connector.properties.1.key=group.id
> connector.properties.1.value=km_aggs_group_table
> connector.properties.2.key=bootstrap.servers
> connector.properties.2.value=172.16.30.207:9092
> connector.property-version=1
> connector.startup-mode=earliest-offset
> connector.topic=aggs_topic
> connector.type=kafka
> connector.version=2.0
> format.derive-schema=true
> format.fail-on-missing-field=true
> format.property-version=1
> format.type=json
> schema.0.name=advs
> schema.0.type=VARCHAR
> schema.1.name=devs
> schema.1.type=VARCHAR
> schema.2.name=environment
> schema.2.type=VARCHAR
> schema.3.name=events
> schema.3.type=VARCHAR
> schema.4.name=identity
> schema.4.type=VARCHAR
> schema.5.name=ip
> schema.5.type=VARCHAR
> schema.6.name=launchs
> schema.6.type=VARCHAR
> schema.7.name=ts
> schema.7.type=VARCHAR
> update-mode=append
> The following factories have been considered:
> org.apache.flink.formats.json.JsonRowFormatFactory
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> org.apache.flink.table.planner.StreamPlannerFactory
> org.apache.flink.table.executor.StreamExecutorFactory
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
> org.apache.flink.table.sinks.CsvAppendTableSinkFactory
> org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
> at
> org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:283)
> at
> org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:191)
> at
> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
> at
> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:97)
> at
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:64)
> ... 4 more
> I've tried the approaches described here, but they didn't solve my
> problem: [https://stackoverflow.com/questions/52500048/flink-could-not-find-a-suitable-table-factory-for-org-apache-flink-table-facto|https://stackoverflow.com/questions/52500048/flink-could-not-find-a-suitable-table-factory-for-org-apache-flink-table-facto]
> Can anyone help me? Thanks!
--
This message was sent by Atlassian Jira
(v8.3.4#803005)