GitHub user tzulitai opened a pull request:
https://github.com/apache/flink/pull/2687
[FLINK-3123] [kafka] Allow custom specific start offsets for Kafka consumers
This PR is based on #2509, so only the last commit is relevant.
With this change, users can now define specific start offsets for individual
partitions of a Kafka consumer, like this:
```
Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("topic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("topic", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("topic", 2), 43L);
FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(...);
consumer.setStartFromSpecificOffsets(specificStartOffsets);
...
```
If a subscribed partition has no specific offset defined (i.e. it has no
corresponding entry in the `specificStartOffsets` map), the startup behaviour
for that partition falls back to the default group offset behaviour (look up
the committed offset in ZK / Kafka for that partition, or use
"auto.offset.reset" if none can be found).
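The fallback rule described above can be sketched in isolation. This is not the code from this PR: `TopicPartition`, `USE_GROUP_OFFSET` and `resolveStartOffsets` are hypothetical names chosen for illustration, and the sketch deliberately has no Flink or Kafka dependency. It only shows the per-partition decision: use the map entry if one exists, otherwise mark the partition as "use group offset behaviour".

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class StartOffsetFallbackSketch {

    // Hypothetical marker: "no specific offset given, use the group offset behaviour".
    static final long USE_GROUP_OFFSET = Long.MIN_VALUE;

    // Hypothetical stand-in for KafkaTopicPartition, so the sketch compiles on its own.
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TopicPartition)) {
                return false;
            }
            TopicPartition other = (TopicPartition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    // For each subscribed partition, take its entry from the map if present;
    // otherwise fall back to the group offset marker.
    static Map<TopicPartition, Long> resolveStartOffsets(
            List<TopicPartition> subscribed,
            Map<TopicPartition, Long> specificStartOffsets) {
        Map<TopicPartition, Long> resolved = new LinkedHashMap<>();
        for (TopicPartition tp : subscribed) {
            resolved.put(tp, specificStartOffsets.getOrDefault(tp, USE_GROUP_OFFSET));
        }
        return resolved;
    }

    public static void main(String[] args) {
        Map<TopicPartition, Long> specific = new HashMap<>();
        specific.put(new TopicPartition("topic", 0), 23L);
        specific.put(new TopicPartition("topic", 1), 31L);

        // Partition 2 is subscribed but has no entry, so it gets the fallback marker.
        List<TopicPartition> subscribed = Arrays.asList(
                new TopicPartition("topic", 0),
                new TopicPartition("topic", 1),
                new TopicPartition("topic", 2));

        resolveStartOffsets(subscribed, specific).forEach((tp, offset) ->
                System.out.println(tp + " -> "
                        + (offset == USE_GROUP_OFFSET ? "group offset behaviour" : offset)));
    }
}
```

In the actual consumer, a partition marked this way would then be resolved against ZK / Kafka or "auto.offset.reset" as described above; that step is omitted from the sketch.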
An IT test `runStartFromSpecificOffsets()` is added for this functionality,
but it is currently enabled only for the Kafka 0.8 consumer, because the 0.9
and 0.10 tests have the same Kafka config problem mentioned in #2509. So, for
now, I have only tested this new functionality manually on versions 0.9 and
0.10, where it works as described above.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tzulitai/flink FLINK-3123
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/2687.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2687
----
commit eca9043ebd63ea201b14b129ce08a9f3ee78c49c
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date: 2016-09-17T13:41:50Z
[FLINK-4280][kafka] Explicit start position configuration for Kafka Consumer
commit 0703469e1880daa63bc5e92bae9920573659806d
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date: 2016-10-23T08:55:58Z
[FLINK-4280] Allow Kafka 0.10 to override 0.10-specific API calls
Method calls to `seekToBeginning` and `seekToEnd` have breaking API changes
between 0.9 and 0.10, causing 0.10 IT tests to fail.
commit bdf6b76eb86c3d6a4a0bb84ab26beb13e84526b1
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date: 2016-10-24T05:24:10Z
[FLINK-4280] Add IT tests for explicit start position configuration
commit d8f5f976ef2e2e1d994a45468d7e9ef3b8bf0015
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date: 2016-10-24T08:57:05Z
[FLINK-4280] Add documentation for the new explicit start position methods
commit 098360fc797b78f15917aaf1b22d09c06a4d0a6c
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date: 2016-10-24T08:08:18Z
[FLINK-3123] [kafka] Allow custom specific start offsets for Kafka consumers
----