[ 
https://issues.apache.org/jira/browse/FLINK-18017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu Yang updated FLINK-18017:
----------------------------
    Description: 
Corrupted messages can get into the message pipeline for various reasons.  When 
a Flink deserializer fails to deserialize a message and throws an exception 
because of the corruption, the Flink application is blocked until the 
deserializer is updated to handle the exception.  
AbstractFetcher.emitRecordsWithTimestamps skips null records.  We need to add a 
metric on the number of null records so that users can measure how many null 
records the Kafka connector encounters, and set up monitoring & alerting based 
on it. 

[https://github.com/apache/flink/blob/1cd696d92c3e088a5bd8e5e11b54aacf46e92ae8/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java#L350]
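
As a rough sketch of the idea (not existing connector code): a wrapper around 
KafkaDeserializationSchema can turn deserialization failures into null records, 
which the fetcher already skips, and count them.  The class name and the 
lazily-created SimpleCounter below are illustrative assumptions; the ticket asks 
for an equivalent counter exposed on the connector's metric group. 
{code:java}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * Hypothetical wrapper: converts deserialization failures into null records
 * (which AbstractFetcher.emitRecordsWithTimestamps already skips) and counts
 * how many were skipped.
 */
public class NullCountingDeserializationSchema<T> implements KafkaDeserializationSchema<T> {

    private final KafkaDeserializationSchema<T> inner;

    // transient because the schema is serialized and shipped to the task;
    // in the real fix this counter would be registered on the connector's MetricGroup
    private transient Counter nullRecords;

    public NullCountingDeserializationSchema(KafkaDeserializationSchema<T> inner) {
        this.inner = inner;
    }

    @Override
    public T deserialize(ConsumerRecord<byte[], byte[]> record) {
        if (nullRecords == null) {
            nullRecords = new SimpleCounter();
        }
        try {
            return inner.deserialize(record);
        } catch (Exception e) {
            // corrupted message: return null so the fetcher skips it, and count it
            nullRecords.inc();
            return null;
        }
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return inner.isEndOfStream(nextElement);
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return inner.getProducedType();
    }
}
{code}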

  was:
Corrupted messages can get into the message pipeline for various reasons.  When 
a Flink deserializer fails to deserialize a message and throws an exception 
because of the corruption, the Flink application is blocked until the 
deserializer is updated to handle the exception. 

Currently, messages are deserialized as below in 
flink_pinterest/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaFetcher.java:
{code:java}
for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
    final T value = deserializer.deserialize(record);

    if (deserializer.isEndOfStream(value)) {
        // end of stream signaled
        running = false;
        break;
    }

    // emit the actual record. this also updates offset state atomically
    // and deals with timestamps and watermark generation
    emitRecord(value, partition, record.offset(), record);
}
{code}
The Flink Kafka connector needs to catch exceptions from deserialization and 
expose related metrics. 
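
A minimal sketch of that change to the loop above (skippedRecords stands for an 
org.apache.flink.metrics.Counter registered on the consumer's metric group; the 
field name is an assumption, not existing code):
{code:java}
// Sketch only: 'skippedRecords' stands for an org.apache.flink.metrics.Counter
// registered on the consumer's metric group; it is not an existing field.
for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
    final T value;
    try {
        value = deserializer.deserialize(record);
    } catch (Exception e) {
        // corrupted message: count it and skip it instead of failing the job
        skippedRecords.inc();
        continue;
    }

    if (deserializer.isEndOfStream(value)) {
        // end of stream signaled
        running = false;
        break;
    }

    // emit the actual record. this also updates offset state atomically
    // and deals with timestamps and watermark generation
    emitRecord(value, partition, record.offset(), record);
}
{code}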



> have Kafka connector report metrics on null records 
> ----------------------------------------------------
>
>                 Key: FLINK-18017
>                 URL: https://issues.apache.org/jira/browse/FLINK-18017
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: 1.9.1
>            Reporter: Yu Yang
>            Priority: Major
>
> Corrupted messages can get into the message pipeline for various reasons.  
> When a Flink deserializer fails to deserialize a message and throws an 
> exception because of the corruption, the Flink application is blocked until 
> the deserializer is updated to handle the exception.  
> AbstractFetcher.emitRecordsWithTimestamps skips null records.  We need to add 
> a metric on the number of null records so that users can measure how many 
> null records the Kafka connector encounters, and set up monitoring & alerting 
> based on it. 
> [https://github.com/apache/flink/blob/1cd696d92c3e088a5bd8e5e11b54aacf46e92ae8/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java#L350]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
