LakeShen created FLINK-16639: -------------------------------- Summary: Flink SQL Kafka source connector, add the no json format filter params when format.type is json Key: FLINK-16639 URL: https://issues.apache.org/jira/browse/FLINK-16639 Project: Flink Issue Type: Improvement Components: Connectors / Kafka Reporter: LakeShen Fix For: 1.10.2
In my thought, kafka source connector is the one of most frequently used connector in flink sql. Flink sql kafka source connector supports the json,csv or other data format. But there is a problem for json format in kafka source connector. For example, flink sql kafka source ddl l like this: CREATE TABLE team_olap_table ( a varchar, b varchar, ) with ( 'connector.type' = 'kafka', 'connector.version' = '0.10', 'connector.topic' = topics', 'connector.properties.0.key' = 'group.id', 'connector.properties.0.value' = 'hello_world', 'connector.properties.1.key' = 'bootstrap.servers', 'connector.properties.1.value' = 'xxx', 'connector.property-version' = '1', 'connector.startup-mode' = 'latest-offset', 'format.type' = 'json', 'format.property-version' = '1', 'format.derive-schema' = 'true', 'update-mode' = 'append' ); If the kafka topic messages are not json format ,just one or two records,the flink sql task will fail-over all the time . In order to solve this problem , if flink sql source connector use the json-format, I want to add the 'format.fail-on-not-json-record' param in flink-json module, if this param is true(default),when read the no-json records, the flink will fail, if this param is false, the flink sql task will filter no-json records,the flink task running normally. -- This message was sent by Atlassian Jira (v8.3.4#803005)