[
https://issues.apache.org/jira/browse/FLINK-3123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15601611#comment-15601611
]
ASF GitHub Bot commented on FLINK-3123:
---------------------------------------
GitHub user tzulitai opened a pull request:
https://github.com/apache/flink/pull/2687
[FLINK-3123] [kafka] Allow custom specific start offsets for Kafka consumers
This PR is based on #2509, so only the last commit is relevant.
With this change, users can now specify specific start offsets for Kafka
consumers like this:
```
Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("topic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("topic", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("topic", 2), 43L);
FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(...);
consumer.setStartFromSpecificOffsets(specificStartOffsets);
...
```
If a subscribed partition is not defined a specific offset (does not have a
corresponding entry in the `specificStartOffsets` map), then the startup
behaviour for that particular partition fallbacks to the default group offset
behaviour (look for offset in ZK / Kafka for that partition, or use
"auto.offset.reset" if none can be found).
An IT test `runStartFromSpecificOffsets()` is added for this functionality,
however is currently only enabled on the Kafka 0.8 consumer, because 0.9 and
0.10 tests have the same Kafka config problem mentioned in #2509. So, for now,
for versions 0.9 and 0.10, I have only manually tested this new functionality,
and it works correctly as described above.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tzulitai/flink FLINK-3123
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/2687.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2687
----
commit eca9043ebd63ea201b14b129ce08a9f3ee78c49c
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date: 2016-09-17T13:41:50Z
[FLINK-4280][kafka] Explicit start position configuration for Kafka Consumer
commit 0703469e1880daa63bc5e92bae9920573659806d
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date: 2016-10-23T08:55:58Z
[FLINK-4280] Allow Kafka 0.10 to override 0.10-specific API calls
Methods calls to `seekToBeginning` and `seekToEnd` have breaking APIs
across 0.9 and 0.10, causing 0.10 IT tests to fail.
commit bdf6b76eb86c3d6a4a0bb84ab26beb13e84526b1
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date: 2016-10-24T05:24:10Z
[FLINK-4280] Add IT tests for explicit start position configuration
commit d8f5f976ef2e2e1d994a45468d7e9ef3b8bf0015
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date: 2016-10-24T08:57:05Z
[FLINK-4280] Add documentation for the new explicit start position methods
commit 098360fc797b78f15917aaf1b22d09c06a4d0a6c
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date: 2016-10-24T08:08:18Z
[FLINK-3123] [kafka] Allow custom specific start offsets for Kafka consumers
----
> Allow setting custom start-offsets for the Kafka consumer
> ---------------------------------------------------------
>
> Key: FLINK-3123
> URL: https://issues.apache.org/jira/browse/FLINK-3123
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Reporter: Robert Metzger
> Fix For: 1.0.0
>
>
> Currently, the Kafka consumer only allows to start reading from the earliest
> available offset or the current offset.
> Sometimes, users want to set a specific start offset themselves.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)