[ 
https://issues.apache.org/jira/browse/FLINK-10119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sean.miao updated FLINK-10119:
------------------------------
    Description: 
enable checkpoint and  use RestartStrategies.fixedDelayRestart 。

Recently, we are using Kafka010JsonTableSource to process kafka's json 
messages.We turned on checkpoint and auto-restart strategy .

We found that as long as the format of a message is not json, it will cause the 
job to not be pulled up. Of course, this is to ensure that only one processing 
or at least one processing, but the resulting application is not available and 
has a greater impact on us.

the code is :

class : JsonRowDeserializationSchema

function :

@Override
 public Row deserialize(byte[] message) throws IOException {
 try

{ final JsonNode root = objectMapper.readTree(message); return convertRow(root, 
(RowTypeInfo) typeInfo); }

catch (Throwable t)

{ throw new IOException("Failed to deserialize JSON object.", t); }

}

now ,i change it to  :

public Row deserialize(byte[] message) throws IOException {
 try {
 JsonNode root = this.objectMapper.readTree(message);
 return this.convertRow(root, (RowTypeInfo)this.typeInfo);
 } catch (Throwable var4) {
 message = this.objectMapper.writeValueAsBytes("{}");
 JsonNode root = this.objectMapper.readTree(message);
 return this.convertRow(root, (RowTypeInfo)this.typeInfo);
 }
}

 

I think that data format errors are inevitable during network transmission, so 
can we add a new column to the table for the wrong data format? like spark sql 
does。

 

  was:
开启checkpoint和savepoint,同时开启了job的自动拉起。

flink从kafka消费数据,使用的是Kafka010JsonTableSource。发现只要有一条数据非json格式,就会导致应用挂掉无法拉起。

当前,这仅是满足了处理语义,但是导致应用不可以用就不太好了吧。能不能改成像spark sql一样,不满足格式的数据,增加到一个专门存储无法解析的数据的列里面。

 

我们目前的做法是

JsonRowDeserializationSchema

@Override
public Row deserialize(byte[] message) throws IOException {
 try {
 final JsonNode root = objectMapper.readTree(message);
 return convertRow(root, (RowTypeInfo) typeInfo);
 } catch (Throwable t) {
 throw new IOException("Failed to deserialize JSON object.", t);
 }
}

catch 里抛异常改成了传入一个 “{}”,会使得所有不能解析数据给所有列返回空值。

        Summary: JsonRowDeserializationSchema deserialize kafka message  (was: 
存在数据非json格式,使用KafkaJsonTableSource的话,job无法拉起。)

> JsonRowDeserializationSchema deserialize kafka message
> ------------------------------------------------------
>
>                 Key: FLINK-10119
>                 URL: https://issues.apache.org/jira/browse/FLINK-10119
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.5.1
>         Environment: 无
>            Reporter: sean.miao
>            Priority: Major
>
> enable checkpoint and  use RestartStrategies.fixedDelayRestart 。
> Recently, we are using Kafka010JsonTableSource to process kafka's json 
> messages.We turned on checkpoint and auto-restart strategy .
> We found that as long as the format of a message is not json, it will cause 
> the job to not be pulled up. Of course, this is to ensure that only one 
> processing or at least one processing, but the resulting application is not 
> available and has a greater impact on us.
> the code is :
> class : JsonRowDeserializationSchema
> function :
> @Override
>  public Row deserialize(byte[] message) throws IOException {
>  try
> { final JsonNode root = objectMapper.readTree(message); return 
> convertRow(root, (RowTypeInfo) typeInfo); }
> catch (Throwable t)
> { throw new IOException("Failed to deserialize JSON object.", t); }
> }
> now ,i change it to  :
> public Row deserialize(byte[] message) throws IOException {
>  try {
>  JsonNode root = this.objectMapper.readTree(message);
>  return this.convertRow(root, (RowTypeInfo)this.typeInfo);
>  } catch (Throwable var4) {
>  message = this.objectMapper.writeValueAsBytes("{}");
>  JsonNode root = this.objectMapper.readTree(message);
>  return this.convertRow(root, (RowTypeInfo)this.typeInfo);
>  }
> }
>  
> I think that data format errors are inevitable during network transmission, 
> so can we add a new column to the table for the wrong data format? like spark 
> sql does。
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to