[ https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15855466#comment-15855466 ]
Tzu-Li (Gordon) Tai commented on FLINK-5583:
--------------------------------------------
Ah, I see what you mean. For your use case it makes sense, but I don't think
this is necessary for general use cases (especially the
{{writeToExternalSources}} method).
First of all, I would still like to keep the interface to the minimal
flatMap-like version proposed in FLINK-3679:
```
import java.io.Serializable;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;

public interface NewDeserializationSchema<T> extends Serializable,
        ResultTypeQueryable<T> {

    // The user uses the collector to buffer zero or more outputs per message.
    void deserialize(byte[] message, OutputCollector<T> collector);
}
```
Something like the above (ignore the dummy name, we can think of a better one
:-D).
The way it would work is:
1. The consumer starts processing a record, say the one at offset 32.
2. An internal buffer in the consumer collects the zero or more records
produced by calling {{deserialization.deserialize(recordBytes, collector)}}.
3. All the records in the buffer are flushed, and offset 32 is updated in the
consumer state, as a single atomic operation synchronized on the checkpoint
lock.
4. Since checkpointing synchronizes on the same lock, a checkpoint barrier can
only come either before or after all the records produced from offset 32,
never in between (see the sketch after this list).
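To make the atomicity concrete, here is a minimal sketch of steps 2-4 inside
the consumer's fetch loop. Everything in it (the {{OutputCollector}} shape,
{{emitRecord}}, {{updateOffsetState}}) is an assumption for illustration, not
actual Flink API; it also assumes {{NewDeserializationSchema}} refers to the
{{OutputCollector}} defined here:
```
import java.util.ArrayList;
import java.util.List;

// Illustrative fetch-loop fragment showing steps 2-4 of the proposal.
public class FetchLoopSketch<T> {

    // Assumed shape of the collector handed to the schema; a single
    // abstract method lets buffer::add satisfy it below.
    @FunctionalInterface
    public interface OutputCollector<T> {
        void collect(T record);
    }

    // Same lock that the checkpointing mechanism synchronizes on.
    private final Object checkpointLock = new Object();

    private final NewDeserializationSchema<T> deserialization;

    FetchLoopSketch(NewDeserializationSchema<T> deserialization) {
        this.deserialization = deserialization;
    }

    // Called for each fetched Kafka record.
    void processRecord(byte[] recordBytes, long offset) throws Exception {
        // Step 2: buffer the zero or more records the schema produces.
        List<T> buffer = new ArrayList<>();
        deserialization.deserialize(recordBytes, buffer::add);

        // Step 3: flush the buffer and update the consumer's offset state
        // as one atomic operation under the checkpoint lock.
        synchronized (checkpointLock) {
            for (T record : buffer) {
                emitRecord(record);    // forward to downstream operators
            }
            updateOffsetState(offset); // e.g. offset 32 in the example
        }
        // Step 4: checkpoints take the same lock, so a barrier can only be
        // injected before or after this block, never between the emitted
        // records and the offset update.
    }

    private void emitRecord(T record) { /* placeholder */ }

    private void updateOffsetState(long offset) { /* placeholder */ }
}
```
The point is simply that the emitted records and the offset update sit inside
one synchronized block, so a checkpoint can never observe a half-flushed
buffer.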
Given the synchronization explained above, we do not need to expose a separate
{{flush}} method to the user.
For your use case, where you want to write corrupt bytes to external storage,
you would do that with a try-catch block in your implementation of
{{deserialization.deserialize(bytes, collector)}}. The only limitation here is
that the write must be a blocking call, which might be acceptable depending on
the frequency of corrupt messages (see the sketch below). What do you think
[~wheat9]?
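For illustration, a rough sketch of such an implementation; {{MyEvent}},
{{CorruptRecordStore}}, and {{decodeAvro}} are hypothetical placeholders for
your record type, storage client, and Avro decoding logic:
```
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Hypothetical user schema: decode Avro records, and on failure write the
// corrupt bytes to external storage (a blocking call) and skip the record.
public class SkipCorruptAvroSchema implements NewDeserializationSchema<MyEvent> {

    // Storage clients are rarely serializable, so initialize lazily.
    private transient CorruptRecordStore store;

    @Override
    public void deserialize(byte[] message, OutputCollector<MyEvent> collector) {
        try {
            collector.collect(decodeAvro(message)); // emit the decoded record
        } catch (Exception e) {
            // Blocking write of the raw bytes; acceptable if corruption is rare.
            corruptRecordStore().write(message);
            // Nothing is collected, so the corrupt record is simply skipped.
        }
    }

    @Override
    public TypeInformation<MyEvent> getProducedType() {
        return TypeInformation.of(MyEvent.class);
    }

    private MyEvent decodeAvro(byte[] bytes) throws Exception {
        // Placeholder for the actual Avro decoding logic.
        throw new UnsupportedOperationException("decoding goes here");
    }

    private CorruptRecordStore corruptRecordStore() {
        if (store == null) {
            store = new CorruptRecordStore(); // placeholder construction
        }
        return store;
    }
}
```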
> Support flexible error handling in the Kafka consumer
> -----------------------------------------------------
>
> Key: FLINK-5583
> URL: https://issues.apache.org/jira/browse/FLINK-5583
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Reporter: Haohui Mai
> Assignee: Haohui Mai
>
> We found that it is valuable to allow applications to handle errors and
> exceptions in the Kafka consumer in order to build robust applications in
> production.
> The context is the following:
> (1) We have schematized Avro records flowing through Kafka.
> (2) The decoder implements the DeserializationSchema to decode the records.
> (3) Occasionally there are corrupted records (e.g., schema issues). The
> streaming pipeline might want to bail out (which is the current behavior) or
> to skip the corrupted records, depending on the application.
> Two options are available:
> (1) Have a variant of DeserializationSchema that returns a FlatMap-like
> structure, as suggested in FLINK-3679.
> (2) Allow applications to catch and handle exceptions by exposing APIs
> similar to the {{ExceptionProxy}}.
> Thoughts?