[ 
https://issues.apache.org/jira/browse/FLINK-18017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu Yang updated FLINK-18017:
----------------------------
    Summary: have Kafka connector report metrics on null records   (was: 
improve Kafka connector to handle record deserialization exception and report 
related metrics)

> have Kafka connector report metrics on null records 
> ----------------------------------------------------
>
>                 Key: FLINK-18017
>                 URL: https://issues.apache.org/jira/browse/FLINK-18017
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: 1.9.1
>            Reporter: Yu Yang
>            Priority: Major
>
> Corrupted messages can get into the message pipeline for various reasons. 
> When a Flink deserializer fails to deserialize a message and throws an 
> exception because the message is corrupted, the Flink application is blocked 
> until we update the deserializer to handle the exception. 
>  
> Currently, messages are deserialized as follows in 
> flink_pinterest/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaFetcher.java:
> {code:java}
> for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
>     final T value = deserializer.deserialize(record);
>
>     if (deserializer.isEndOfStream(value)) {
>         // end of stream signaled
>         running = false;
>         break;
>     }
>
>     // emit the actual record. this also updates offset state atomically
>     // and deals with timestamps and watermark generation
>     emitRecord(value, partition, record.offset(), record);
> }
> {code}
> The Flink Kafka connector needs to catch exceptions from deserialization and 
> expose related metrics. 
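>
> A sketch of one possible shape, applied to the loop above. The 
> {{failedRecords}} and {{nullRecords}} counters and their registration on the 
> consumer metric group are assumptions for illustration, not existing Flink 
> API; per the updated summary, records that deserialize to null are counted 
> as well:
> {code:java}
> // Sketch only: count and skip records that fail to deserialize (or that
> // deserialize to null) instead of failing the whole job. Assumes the
> // surrounding KafkaFetcher context (deserializer, partitionRecords,
> // partition, LOG, consumerMetricGroup); counter names are made up.
> final Counter failedRecords = consumerMetricGroup.counter("failedRecords");
> final Counter nullRecords = consumerMetricGroup.counter("nullRecords");
>
> for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
>     final T value;
>     try {
>         value = deserializer.deserialize(record);
>     } catch (Exception e) {
>         // corrupted record: report it and move on instead of blocking the job
>         failedRecords.inc();
>         LOG.warn("Failed to deserialize record at offset {} of partition {}",
>             record.offset(), partition, e);
>         continue;
>     }
>
>     if (value == null) {
>         // deserializer signaled an unusable record with null
>         nullRecords.inc();
>         continue;
>     }
>
>     if (deserializer.isEndOfStream(value)) {
>         // end of stream signaled
>         running = false;
>         break;
>     }
>
>     // emit the actual record. this also updates offset state atomically
>     // and deals with timestamps and watermark generation
>     emitRecord(value, partition, record.offset(), record);
> }
> {code}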
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
