[
https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15854944#comment-15854944
]
Haohui Mai commented on FLINK-5583:
-----------------------------------
In general, (1) sounds good to me. Taking a closer look, it seems that it might
require a stateful API rather than the traditional {{Collector}}-style APIs.
We have a mission-critical use case that needs to write all corrupted messages
to a persistent store so that these messages can be inspected and backfilled
later. Ideally the {{DeserializationSchema}} could hold some state, which would
probably need to be synchronized when checkpoints happen.
It might be more natural to deserialize messages as a subsequent stage of the
consumer. Thoughts?
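To make the stateful point concrete, here is a minimal, Flink-agnostic sketch (hypothetical names, not an actual Flink API): a decoder that buffers the corrupted payloads it sees, with a snapshot hook standing in for the checkpoint synchronization mentioned above.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: a decoder that keeps the corrupted payloads it has
// seen in memory, so they can be flushed to a persistent store when a
// checkpoint completes.
class CapturingDecoder {
    private final List<byte[]> corrupted = new ArrayList<>();

    // Returns the decoded record, or null if the message is corrupted.
    // For illustration, any message starting with 0x00 counts as corrupted.
    String deserialize(byte[] message) {
        if (message.length == 0 || message[0] == 0x00) {
            corrupted.add(message); // buffered state: must survive until checkpointed
            return null;
        }
        return new String(message, StandardCharsets.UTF_8);
    }

    // Would be called on checkpoint: hand the buffered corrupted messages
    // to a persistent store and clear the buffer along with the snapshot.
    List<byte[]> snapshotCorrupted() {
        List<byte[]> out = new ArrayList<>(corrupted);
        corrupted.clear();
        return out;
    }
}

public class Example {
    public static void main(String[] args) {
        CapturingDecoder d = new CapturingDecoder();
        System.out.println(d.deserialize("good".getBytes(StandardCharsets.UTF_8))); // good
        System.out.println(d.deserialize(new byte[]{0x00, 0x01}));                  // null
        System.out.println(d.snapshotCorrupted().size());                           // 1
    }
}
```

The buffer is exactly the kind of state that the plain {{DeserializationSchema}} interface has no place for today, which is why a deserialization stage downstream of the consumer may be more natural.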
[~rmetzger] [~tzulitai]
> Support flexible error handling in the Kafka consumer
> -----------------------------------------------------
>
> Key: FLINK-5583
> URL: https://issues.apache.org/jira/browse/FLINK-5583
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Reporter: Haohui Mai
> Assignee: Haohui Mai
>
> We found it valuable to allow applications to handle errors and
> exceptions in the Kafka consumer in order to build robust applications in
> production.
> The context is the following:
> (1) We have schematized, Avro records flowing through Kafka.
> (2) The decoder implements the DeserializationSchema to decode the records.
> (3) Occasionally there are corrupted records (e.g., schema issues). Depending on
> the application, the streaming pipeline might want to bail out (the current
> behavior) or skip the corrupted records.
> Two options are available:
> (1) Have a variant of DeserializationSchema to return a FlatMap like
> structure as suggested in FLINK-3679.
> (2) Allow the applications to catch and handle the exception by exposing some
> APIs that are similar to the {{ExceptionProxy}}.
> Thoughts?
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)