[ 
https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15855424#comment-15855424
 ] 

Haohui Mai commented on FLINK-5583:
-----------------------------------

The pseudo-code looks like the following:

{code}
void invoke() {
  try {
    Tuple2<Key, Value> kv = deserialization.deserialize(bytes);
  } catch (Throwable v) {
    // Write the corrupted message to external sources (e.g., Kafka, HDFS)
    deserialization.writeToExternalSources(bytes);
  }
}

void checkpoint() {
  ...
  // Flush any buffered corrupted messages as part of the checkpoint
  deserialization.flush();
}
{code}

Strictly speaking, the {{DeserializationSchema}} does not need to have persistent 
state (which IMO is the right thing to do). However, it does require 
proper synchronization when the consumer checkpoints. Does this make sense to 
you, [~tzulitai]?
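
To make the synchronization point concrete, here is a minimal, self-contained sketch of the idea in plain Java. It is not Flink code: the class {{DeadLetterDeserializer}}, its {{lock}} field (standing in for whatever lock the consumer holds while checkpointing), and the {{deadLetterSink}} callback (standing in for Kafka or HDFS) are all hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical wrapper for illustration only; not part of Flink's API. */
final class DeadLetterDeserializer<T> {
    private final Function<byte[], T> decoder;            // may throw on corrupted input
    private final Consumer<List<byte[]>> deadLetterSink;  // stand-in for Kafka/HDFS
    private final Object lock = new Object();             // stand-in for the checkpoint lock
    private final List<byte[]> pending = new ArrayList<>();

    DeadLetterDeserializer(Function<byte[], T> decoder,
                           Consumer<List<byte[]>> deadLetterSink) {
        this.decoder = decoder;
        this.deadLetterSink = deadLetterSink;
    }

    /** Per-record path: decode, or buffer the raw bytes if decoding fails. */
    Optional<T> invoke(byte[] bytes) {
        try {
            return Optional.ofNullable(decoder.apply(bytes));
        } catch (RuntimeException e) {
            synchronized (lock) {
                pending.add(bytes);
            }
            return Optional.empty();
        }
    }

    /** Checkpoint path: hand all buffered corrupted records to the sink. */
    void flush() {
        synchronized (lock) {
            if (!pending.isEmpty()) {
                // Pass a copy so the sink never sees the buffer being cleared.
                deadLetterSink.accept(new ArrayList<>(pending));
                pending.clear();
            }
        }
    }
}
```

Because {{flush()}} empties the buffer under the same lock that {{invoke()}} uses, nothing needs to be stored in the checkpoint itself, provided the checkpoint only completes after {{flush()}} returns. That last condition is an assumption of the sketch, not something the code enforces.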

> Support flexible error handling in the Kafka consumer
> -----------------------------------------------------
>
>                 Key: FLINK-5583
>                 URL: https://issues.apache.org/jira/browse/FLINK-5583
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Haohui Mai
>            Assignee: Haohui Mai
>
> We found that it is valuable to allow the applications to handle errors and 
> exceptions in the Kafka consumer in order to build a robust application in 
> production.
> The context is the following:
> (1) We have schematized Avro records flowing through Kafka.
> (2) The decoder implements the DeserializationSchema to decode the records.
> (3) Occasionally there are corrupted records (e.g., schema issues). The 
> streaming pipeline might want to bail out (which is the current behavior) or 
> to skip the corrupted records depending on the applications.
> Two options are available:
> (1) Have a variant of DeserializationSchema to return a FlatMap-like 
> structure as suggested in FLINK-3679.
> (2) Allow the applications to catch and handle the exception by exposing some 
> APIs that are similar to the {{ExceptionProxy}}.
> Thoughts?
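
For comparison, the two options in the description could look roughly like the sketch below. Both interfaces ({{FlatMapDeserializer}} and {{ErrorHandler}}) are hypothetical shapes invented here for illustration; they are neither the API proposed in FLINK-3679 nor the existing {{ExceptionProxy}}. Records are decoded as UTF-8 integers purely to keep the example self-contained.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch contrasting the two options; not Flink code. */
final class ErrorHandlingOptions {

    /** Option (1): the schema emits zero or more records per message. */
    interface FlatMapDeserializer<T> {
        void deserialize(byte[] message, Consumer<T> out);
    }

    /** Option (2): the application decides what to do with a failure. */
    interface ErrorHandler {
        /** Return true to skip the record, false to let the failure propagate. */
        boolean skip(byte[] message, RuntimeException error);
    }

    private static int decode(byte[] message) {
        return Integer.parseInt(new String(message, StandardCharsets.UTF_8));
    }

    static List<Integer> runOption1(List<byte[]> messages) {
        FlatMapDeserializer<Integer> schema = (msg, out) -> {
            try {
                out.accept(decode(msg));
            } catch (NumberFormatException e) {
                // Corrupted record: emit nothing, so it is silently skipped.
            }
        };
        List<Integer> result = new ArrayList<>();
        for (byte[] m : messages) {
            schema.deserialize(m, result::add);
        }
        return result;
    }

    static List<Integer> runOption2(List<byte[]> messages, ErrorHandler handler) {
        List<Integer> result = new ArrayList<>();
        for (byte[] m : messages) {
            try {
                result.add(decode(m));
            } catch (NumberFormatException e) {
                if (!handler.skip(m, e)) {
                    throw e; // bail out, which matches the current behavior
                }
            }
        }
        return result;
    }
}
```

In option (1) the skip policy lives inside the schema; in option (2) the schema stays unchanged and the policy is supplied by the application.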



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
