[ https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15855466#comment-15855466 ]

Tzu-Li (Gordon) Tai commented on FLINK-5583:
--------------------------------------------

Ah, I see what you mean. For your use case it makes sense, but I don't think 
this is necessary for general use cases (especially the 
{{writeToExternalSources}} method).

First of all, I would still like to keep the interface to the minimal 
flatMap-like version proposed in FLINK-3679:

```
public interface NewDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    // user uses collector to buffer outputs
    void deserialize(byte[] message, OutputCollector<T> collector);
}
```

Something like the above (ignore the dummy name, we can think of a better one 
:-D).
The way it would work is:
1. The consumer starts processing the record at, say, offset 32.
2. The consumer keeps an internal buffer that collects the zero or more records 
produced by calling {{deserialization.deserialize(recordBytes, collector)}}.
3. All the records in the buffer must be flushed, and offset 32 updated into 
the consumer state, as a single atomic operation synchronized on the checkpoint 
lock (see the sketch after this list).
4. Since checkpointing is synchronized on the same lock, a checkpoint barrier 
can only come either before or after all the records produced from offset 32, 
never in between.
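
To make steps 2-4 concrete, here is a minimal sketch of what the consumer's 
fetch loop could look like. This is purely illustrative: {{FetchLoopSketch}}, 
{{processRecord}}, and {{emit}} are made-up names rather than actual consumer 
internals, and {{Serializable}}/{{ResultTypeQueryable}} are omitted for 
brevity; only the buffering and the synchronized block reflect the proposal.

```
import java.util.ArrayList;
import java.util.List;

// Minimal stand-ins for the proposed interfaces (Serializable and
// ResultTypeQueryable omitted for brevity).
interface OutputCollector<T> {
    void collect(T record);
}

interface NewDeserializationSchema<T> {
    void deserialize(byte[] message, OutputCollector<T> collector);
}

// Illustrative only: this class and its method names are made up for the
// sketch and are not actual Flink consumer internals.
public class FetchLoopSketch<T> {

    private final Object checkpointLock = new Object(); // same lock checkpointing takes
    private final NewDeserializationSchema<T> schema;
    private long currentOffset; // part of the consumer's checkpointed state

    public FetchLoopSketch(NewDeserializationSchema<T> schema) {
        this.schema = schema;
    }

    public void processRecord(byte[] recordBytes, long offset) {
        // step 2: buffer the zero or more records produced for this offset
        List<T> buffer = new ArrayList<>();
        schema.deserialize(recordBytes, buffer::add);

        // steps 3 and 4: flush the buffer and update the offset as a single
        // atomic operation; because checkpointing synchronizes on the same
        // lock, a barrier can only be injected before or after this block,
        // never between the records produced from one offset
        synchronized (checkpointLock) {
            for (T record : buffer) {
                emit(record);
            }
            currentOffset = offset;
        }
    }

    private void emit(T record) {
        // hand the record to the downstream operators (omitted)
    }
}
```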

With the synchronization explained above in place, we do not need to expose an 
additional {{flush}} method to the user.

For your use case, in which you want to write corrupt bytes to external 
storage, you would do that with a try-catch block in your implementation of 
{{deserialization.deserialize(bytes, collector)}}, as sketched below. The only 
limitation is that the write must be a blocking call; whether that is 
acceptable depends on how frequently corrupt messages occur. What do you think 
[~wheat9]?
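
A rough sketch of such an implementation, reusing the stand-in interfaces from 
the sketch above; {{MyRecord}}, {{CorruptRecordStore}}, and {{decodeAvro}} are 
hypothetical placeholders for the user's record type, external storage client, 
and Avro decoding logic:

```
// Stand-ins for the user's decoded record type and the external storage
// client (e.g. HDFS or S3); both are placeholders, not real APIs.
class MyRecord { }

interface CorruptRecordStore {
    void write(byte[] rawBytes);
}

// Hypothetical user implementation of the proposed interface; only the
// try-catch pattern is the point here.
public class AvroWithErrorStoreSchema implements NewDeserializationSchema<MyRecord> {

    private final CorruptRecordStore corruptStore;

    public AvroWithErrorStoreSchema(CorruptRecordStore corruptStore) {
        this.corruptStore = corruptStore;
    }

    @Override
    public void deserialize(byte[] message, OutputCollector<MyRecord> collector) {
        try {
            // normal path: decode and emit the record
            collector.collect(decodeAvro(message));
        } catch (Exception e) {
            // corrupt record: skip it, but persist the raw bytes for later
            // inspection; this write blocks the fetch loop until it returns
            corruptStore.write(message);
        }
    }

    private MyRecord decodeAvro(byte[] message) {
        // the user's actual Avro decoding logic would go here
        throw new UnsupportedOperationException("omitted in this sketch");
    }
}
```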

> Support flexible error handling in the Kafka consumer
> -----------------------------------------------------
>
>                 Key: FLINK-5583
>                 URL: https://issues.apache.org/jira/browse/FLINK-5583
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Haohui Mai
>            Assignee: Haohui Mai
>
> We found that it is valuable to allow applications to handle errors and 
> exceptions in the Kafka consumer in order to build robust applications in 
> production.
> The context is the following:
> (1) We have schematized Avro records flowing through Kafka.
> (2) The decoder implements the DeserializationSchema to decode the records.
> (3) Occasionally there are corrupted records (e.g., schema issues). The 
> streaming pipeline might want to bail out (which is the current behavior) or 
> to skip the corrupted records, depending on the application.
> Two options are available:
> (1) Have a variant of DeserializationSchema that returns a FlatMap-like 
> structure, as suggested in FLINK-3679.
> (2) Allow the applications to catch and handle the exception by exposing some 
> APIs that are similar to the {{ExceptionProxy}}.
> Thoughts?


