[
https://issues.apache.org/jira/browse/FLINK-10119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16575703#comment-16575703
]
陈梓立 commented on FLINK-10119:
-----------------------------
[~CrestOfWave] I think it is good to add a Chinese version so that you can
express yourself more clearly to Chinese developers. However, for general
discussion, please use English.
> If any record is not in JSON format, a job using KafkaJsonTableSource cannot be brought back up.
> ---------------------------------------------
>
> Key: FLINK-10119
> URL: https://issues.apache.org/jira/browse/FLINK-10119
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.5.1
> Environment: none
> Reporter: sean.miao
> Priority: Major
>
> Checkpointing and savepoints are enabled, and automatic job restart is turned on.
> Flink consumes data from Kafka using Kafka010JsonTableSource. We found that a single record that is not valid JSON brings the application down and prevents it from being restarted.
> Granted, this only enforces the processing semantics, but rendering the whole application unusable is hardly acceptable. Could this be changed to behave like Spark
> SQL, where records that do not match the expected format are placed in a dedicated column that stores unparsable data?
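> A rough sketch of that idea follows (hypothetical code, not an existing Flink API; the class name, the trailing corrupt-record column, and the field layout are made up for illustration only):
>
> import java.io.IOException;
> import java.nio.charset.StandardCharsets;
> import com.fasterxml.jackson.databind.JsonNode;
> import com.fasterxml.jackson.databind.ObjectMapper;
> import org.apache.flink.types.Row;
>
> public class CorruptRecordAwareDeserializer {
>     // assumed schema: two data columns plus one trailing "corrupt record" column
>     private static final int DATA_ARITY = 2;
>     private final ObjectMapper objectMapper = new ObjectMapper();
>
>     public Row deserialize(byte[] message) throws IOException {
>         final Row row = new Row(DATA_ARITY + 1);
>         try {
>             final JsonNode root = objectMapper.readTree(message);
>             row.setField(0, root.hasNonNull("id") ? root.get("id").asText() : null);
>             row.setField(1, root.hasNonNull("value") ? root.get("value").asText() : null);
>         } catch (Throwable t) {
>             // like Spark SQL's _corrupt_record: keep the raw payload instead of failing the job
>             row.setField(DATA_ARITY, new String(message, StandardCharsets.UTF_8));
>         }
>         return row;
>     }
> }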
>
> Our current workaround modifies JsonRowDeserializationSchema:
> @Override
> public Row deserialize(byte[] message) throws IOException {
>     try {
>         final JsonNode root = objectMapper.readTree(message);
>         return convertRow(root, (RowTypeInfo) typeInfo);
>     } catch (Throwable t) {
>         throw new IOException("Failed to deserialize JSON object.", t);
>     }
> }
> In the catch block, instead of throwing the exception, we feed in "{}", which causes every column to come back as null for records that cannot be parsed.
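> As a sketch, the patched method looks roughly like this (our local change, assuming failOnMissingField is left disabled so that missing fields map to null):
>
> @Override
> public Row deserialize(byte[] message) throws IOException {
>     try {
>         final JsonNode root = objectMapper.readTree(message);
>         return convertRow(root, (RowTypeInfo) typeInfo);
>     } catch (Throwable t) {
>         // Workaround: fall back to an empty JSON object instead of failing the job,
>         // so every column of the resulting row is null for the bad record.
>         final JsonNode empty = objectMapper.readTree("{}");
>         return convertRow(empty, (RowTypeInfo) typeInfo);
>     }
> }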
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)