GitHub user tzulitai opened a pull request:
https://github.com/apache/flink/pull/3378
[FLINK-5849] [kafka] Move FlinkKafkaConsumer start offset determination to
open()
This PR fixes a regression introduced by the recently merged #2509
(FLINK-4280). The new start position feature added in #2509 assumed that all
offsets are defined on restore. This does not hold if the restored checkpoint
was taken before the fetcher was ever initialized or run.
This PR addresses the problem with the following changes:
1. Move the start position determination logic to `open()`. This ensures
that whenever `snapshotState()` is called, every subscribed partition already
has a defined offset.
2. Introduce special "magic offset values" to represent that a partition is
to be started from `EARLIEST`, `LATEST`, or `GROUP_OFFSETS`. These values are
set as placeholders in `open()`. The consumer evaluates them lazily: the
placeholders are replaced with actual offsets only when the fetcher starts
running.
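A minimal sketch of the two changes above, under stated assumptions: this is not the code from the PR, and the class `StartOffsetSketch`, the `StartupMode` enum, the `OffsetLookup` interface and the sentinel constants are hypothetical names invented for illustration (the sentinel values are not Flink's actual constants). `open()` gives every subscribed partition a defined value immediately, and `resolveSentinels()` stands in for the fetcher start-up step that lazily swaps placeholders for real offsets.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StartOffsetSketch {

    // Hypothetical startup modes mirroring EARLIEST / LATEST / GROUP_OFFSETS.
    enum StartupMode { EARLIEST, LATEST, GROUP_OFFSETS }

    // Hypothetical "magic" offsets: values that can never be real Kafka
    // offsets. These are NOT the constants used by Flink.
    static final long EARLIEST_SENTINEL = Long.MIN_VALUE;
    static final long LATEST_SENTINEL = Long.MIN_VALUE + 1;
    static final long GROUP_OFFSET_SENTINEL = Long.MIN_VALUE + 2;

    // Hypothetical stand-in for whatever the fetcher uses to query Kafka.
    interface OffsetLookup {
        long earliest(String partition);
        long latest(String partition);
        long committed(String partition);
    }

    // Partition -> next offset to read, or a sentinel placeholder.
    final Map<String, Long> offsets = new LinkedHashMap<>();

    static long sentinelFor(StartupMode mode) {
        switch (mode) {
            case EARLIEST:
                return EARLIEST_SENTINEL;
            case LATEST:
                return LATEST_SENTINEL;
            default:
                return GROUP_OFFSET_SENTINEL;
        }
    }

    // Called from open(): afterwards, every partition has a defined value.
    void open(List<String> partitions, StartupMode mode, Map<String, Long> restored) {
        if (restored != null) {
            // Restored state may itself hold sentinels if the checkpoint was
            // taken before the fetcher ever ran.
            offsets.putAll(restored);
            return;
        }
        for (String partition : partitions) {
            offsets.put(partition, sentinelFor(mode));
        }
    }

    // Called when the fetcher starts running: lazily replace placeholders.
    void resolveSentinels(OffsetLookup lookup) {
        for (Map.Entry<String, Long> entry : offsets.entrySet()) {
            long value = entry.getValue();
            String partition = entry.getKey();
            if (value == EARLIEST_SENTINEL) {
                entry.setValue(lookup.earliest(partition));
            } else if (value == LATEST_SENTINEL) {
                entry.setValue(lookup.latest(partition));
            } else if (value == GROUP_OFFSET_SENTINEL) {
                entry.setValue(lookup.committed(partition));
            }
            // Any other value is a real offset and is kept as-is.
        }
    }

    public static void main(String[] args) {
        StartOffsetSketch consumer = new StartOffsetSketch();
        consumer.open(Arrays.asList("topic-0", "topic-1"), StartupMode.LATEST, null);
        System.out.println(consumer.offsets.get("topic-0") == LATEST_SENTINEL); // prints "true"

        consumer.resolveSentinels(new OffsetLookup() {
            public long earliest(String p) { return 0L; }
            public long latest(String p) { return 42L; }
            public long committed(String p) { return 7L; }
        });
        System.out.println(consumer.offsets); // prints "{topic-0=42, topic-1=42}"
    }
}
```

In this sketch, choosing placeholder values that can never be valid Kafka offsets keeps them unambiguous inside checkpointed state, so no separate per-partition flag is needed.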
As a result, if a checkpoint is taken before the fetcher has started
consuming all of its subscribed partitions, the state will contain the
"magic offset value" for those partitions instead of an undefined offset as
before.
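The checkpoint-before-start scenario can be illustrated with another hypothetical sketch; `SnapshotSketch`, its methods and the sentinel value are assumptions for illustration, not Flink APIs. A snapshot taken before the fetcher starts carries the placeholder, and a consumer restored from it resolves the placeholder to the committed group offset once its own fetcher starts.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SnapshotSketch {

    // Hypothetical placeholder for "start from the committed group offset";
    // not Flink's actual constant.
    static final long GROUP_OFFSET_SENTINEL = Long.MIN_VALUE + 2;

    // Partition -> next offset to read, or the placeholder.
    final Map<String, Long> offsets = new HashMap<>();

    // open() without restored state: every partition gets the placeholder.
    void open(List<String> partitions) {
        for (String partition : partitions) {
            offsets.put(partition, GROUP_OFFSET_SENTINEL);
        }
    }

    // open() with restored state: checkpointed values are taken verbatim.
    void restore(Map<String, Long> checkpointed) {
        offsets.putAll(checkpointed);
    }

    // snapshotState(): a plain copy. No partition is missing or undefined,
    // even if the fetcher has not started yet.
    Map<String, Long> snapshotState() {
        return new HashMap<>(offsets);
    }

    // Fetcher start-up: only now is the placeholder turned into a real offset.
    void startFetcher(Map<String, Long> committedOffsets) {
        for (Map.Entry<String, Long> entry : offsets.entrySet()) {
            if (entry.getValue() == GROUP_OFFSET_SENTINEL) {
                entry.setValue(committedOffsets.get(entry.getKey()));
            }
        }
    }

    public static void main(String[] args) {
        SnapshotSketch first = new SnapshotSketch();
        first.open(Arrays.asList("orders-0", "orders-1"));

        // Checkpoint taken before the fetcher ever ran: it holds placeholders.
        Map<String, Long> checkpoint = first.snapshotState();

        SnapshotSketch recovered = new SnapshotSketch();
        recovered.restore(checkpoint);

        Map<String, Long> committed = new HashMap<>();
        committed.put("orders-0", 100L);
        committed.put("orders-1", 250L);
        recovered.startFetcher(committed);

        // prints "100 250"
        System.out.println(recovered.offsets.get("orders-0") + " " + recovered.offsets.get("orders-1"));
    }
}
```

Because the snapshot here is a copy, resolving placeholders later does not alter a checkpoint that was already taken.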
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tzulitai/flink FLINK-5849
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/3378.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3378
----
commit 7e7bf1d106d4dc0d24fa6746e94ccdadbc06088e
Author: Tzu-Li (Gordon) Tai
Date: 2017-02-21T15:05:32Z
[FLINK-5849] [kafka] Move FlinkKafkaConsumer start offset determination to
open()
----