[ https://issues.apache.org/jira/browse/FLINK-3188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15066567#comment-15066567 ]
Robert Metzger commented on FLINK-3188:
---------------------------------------

Thank you for providing a patch for the issue.
Do you know how Kafka's own high-level consumer (and the new consumer API) handle deleted messages? Do they also pass null values for those messages? I would like to make Flink's behavior consistent with Kafka's.
I would also like to add a test case for this to ensure it's always working ;)
How urgently do you need this fixed?

> Deletes in Kafka source should be passed on to KeyedDeserializationSchema
> -------------------------------------------------------------------------
>
>                 Key: FLINK-3188
>                 URL: https://issues.apache.org/jira/browse/FLINK-3188
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.0.0
>            Reporter: Sebastian Klemke
>         Attachments: kafka-deletions.patch
>
>
> When keys are deleted in the Kafka queue, they show up as keys with a null
> payload. Currently, in Flink 1.0-SNAPSHOT, these deletions are silently
> skipped without advancing the current offset.
> This leads to two problems:
> 1. When a fetch window contains only deletions, LegacyFetcher gets stuck.
> 2. For KeyedDeserializationSchemas, it would make sense to pass deletions to
> the deserializer, so that it can decide to wrap deleted keys as a deletion
> command. This is also more consistent with the semantics of keys in Kafka
> queues: when compaction is activated, Kafka only needs to keep the latest
> message for each key.
> We propose the attached patch as a workaround for both issues.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
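The two problems in the issue can be illustrated with a minimal Java sketch. All names below (`KeyedSchema`, `Change`, `ChangeSchema`, `FetchedMessage`, `FetchLoop`) are hypothetical stand-ins, not Flink's API; the real `KeyedDeserializationSchema` has a different signature that is not reproduced here. The sketch assumes a null payload marks a deletion, passes it to the schema so it can be wrapped as a deletion command (problem 2), and advances the next offset for every message, deletions included, so a fetch window containing only deletions cannot stall (problem 1).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a keyed deserialization schema (NOT Flink's signature).
interface KeyedSchema<T> {
    T deserialize(byte[] key, byte[] value);
}

// A change-log element: an upsert of (key, value) or a deletion of key.
final class Change {
    final String key;
    final String value;     // null when deleted
    final boolean deleted;

    Change(String key, String value, boolean deleted) {
        this.key = key;
        this.value = value;
        this.deleted = deleted;
    }

    @Override
    public String toString() {
        return deleted ? "DELETE(" + key + ")" : "UPSERT(" + key + "=" + value + ")";
    }
}

// Wraps a null payload as an explicit deletion command instead of dropping it.
final class ChangeSchema implements KeyedSchema<Change> {
    @Override
    public Change deserialize(byte[] key, byte[] value) {
        String k = key == null ? null : new String(key, StandardCharsets.UTF_8);
        if (value == null) {
            return new Change(k, null, true);
        }
        return new Change(k, new String(value, StandardCharsets.UTF_8), false);
    }
}

// Hypothetical fetched message: offset, key bytes, payload bytes (null = deletion).
final class FetchedMessage {
    final long offset;
    final byte[] key;
    final byte[] value;

    FetchedMessage(long offset, byte[] key, byte[] value) {
        this.offset = offset;
        this.key = key;
        this.value = value;
    }
}

final class FetchLoop {
    // Passes every message (deletions included) to the schema and returns the
    // next offset to request; the offset advances past deletions as well.
    static <T> long process(List<FetchedMessage> batch, KeyedSchema<T> schema,
                            List<T> out, long nextOffset) {
        for (FetchedMessage m : batch) {
            out.add(schema.deserialize(m.key, m.value));
            nextOffset = m.offset + 1;
        }
        return nextOffset;
    }

    public static void main(String[] args) {
        // A fetch window that contains only deletions.
        List<FetchedMessage> batch = Arrays.asList(
                new FetchedMessage(5, "a".getBytes(StandardCharsets.UTF_8), null),
                new FetchedMessage(6, "b".getBytes(StandardCharsets.UTF_8), null));
        List<Change> out = new ArrayList<>();
        long next = process(batch, new ChangeSchema(), out, 5);
        System.out.println(out + " next=" + next);
    }
}
```

In this sketch the schema, not the fetcher, decides what a deletion means; a schema that has no use for deletions could map them to a no-op element of its own type.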