[ https://issues.apache.org/jira/browse/FLINK-4723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15539942#comment-15539942 ]
ASF GitHub Bot commented on FLINK-4723:
---------------------------------------
Github user tzulitai commented on the pull request:
https://github.com/apache/flink/commit/53ed6adac8cbe6b5dcb692dc9b94970f3ec5887c#commitcomment-19258663
Found some bugs in `KafkaConsumerBaseTest#testSnapshotState()` here; the
code happened to work around the bugs and bypass the asserts :P
I'll fix this test as part of
[FLINK-4723](https://issues.apache.org/jira/browse/FLINK-4723) since I'll need
to change this test over there.
> Unify behaviour of committed offsets to Kafka / ZK for Kafka 0.8 and 0.9
> consumer
> ---------------------------------------------------------------------------------
>
> Key: FLINK-4723
> URL: https://issues.apache.org/jira/browse/FLINK-4723
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0, 1.1.3
>
>
> The proper "behaviour" of the offsets committed back to Kafka / ZK should be
> "the next offset that consumers should read (in Kafka terms, the 'position')".
> This is already fixed for the 0.9 consumer by FLINK-4618, which has the 0.9
> consumer commit offsets back to Kafka incremented by 1, so that the internal
> {{KafkaConsumer}} picks up the correct start position when committed offsets
> are present. This fix was required because the start position from committed
> offsets is determined implicitly through the Kafka 0.9 APIs.
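> For illustration only (this is not the actual FLINK-4618 patch), committing with
> "position" semantics through the 0.9 {{KafkaConsumer}} API looks roughly like the
> sketch below; the broker address, topic, partition, group, and offset are made up:
> {code:java}
> import java.util.Collections;
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.clients.consumer.OffsetAndMetadata;
> import org.apache.kafka.common.TopicPartition;
>
> // Illustrative sketch: committing "position" semantics via the 0.9 consumer API.
> public class PositionCommitSketch {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092");
>         props.put("group.id", "example-group");
>         props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>         props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>
>         try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
>             TopicPartition tp = new TopicPartition("my-topic", 0);
>             long lastProcessedOffset = 41L; // offset of the last record covered by a checkpoint
>             // Commit lastProcessedOffset + 1: the committed value is the next offset to read,
>             // so a restarting consumer resumes without re-reading the last processed record.
>             consumer.commitSync(
>                 Collections.singletonMap(tp, new OffsetAndMetadata(lastProcessedOffset + 1)));
>         }
>     }
> }
> {code}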
> However, since the 0.8 consumer handles offset committing and start positions
> using Flink's own {{ZookeeperOffsetHandler}} rather than Kafka's high-level APIs,
> it did not require such a fix.
> I propose to still unify the behaviour of committed offsets across 0.8 and
> 0.9 to the definition above.
> Otherwise, if a user first uses the 0.8 consumer to read data and has
> Flink-committed offsets in ZK, and then uses a high-level 0.8 Kafka consumer
> to read the same topic in a non-Flink application, the first record will be a
> duplicate (because, as described above, Kafka high-level consumers expect the
> committed offset to be "the next record to process" and not "the last
> processed record").
> This requires the ZK offsets committed by the 0.8 consumer to also be
> incremented by 1, and changing how Flink's internal offsets are initialized
> in accordance with the offsets acquired from ZK.
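> As a minimal sketch of this convention (the ZK store is simulated with a map,
> and all names are hypothetical rather than {{ZookeeperOffsetHandler}}'s actual API):
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
>
> // Illustrative sketch of the proposed ZK offset convention for the 0.8 consumer.
> public class ZkOffsetConventionSketch {
>
>     private static final Map<String, Long> zk = new HashMap<>();
>
>     // On checkpoint: commit the "position", i.e. last processed offset + 1.
>     static void commitToZk(String topicPartition, long lastProcessedOffset) {
>         zk.put(topicPartition, lastProcessedOffset + 1);
>     }
>
>     // On startup: ZK holds the next offset to read, while Flink's internal state
>     // tracks the last processed offset, so subtract 1 when initializing.
>     static long initializeInternalOffset(String topicPartition) {
>         return zk.get(topicPartition) - 1;
>     }
>
>     public static void main(String[] args) {
>         commitToZk("my-topic-0", 41L);            // last processed record was offset 41
>         System.out.println(zk.get("my-topic-0")); // 42: next offset an external consumer reads
>         System.out.println(initializeInternalOffset("my-topic-0")); // 41: Flink's internal offset
>     }
> }
> {code}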
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)