[ 
https://issues.apache.org/jira/browse/FLINK-3123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15601611#comment-15601611
 ] 

ASF GitHub Bot commented on FLINK-3123:
---------------------------------------

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/2687

    [FLINK-3123] [kafka] Allow custom specific start offsets for Kafka consumers

    This PR is based on #2509, so only the last commit is relevant.
    
    With this change, users can now specify specific start offsets for Kafka 
consumers like this:
    ```
    Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
    specificStartOffsets.put(new KafkaTopicPartition("topic", 0), 23L);
    specificStartOffsets.put(new KafkaTopicPartition("topic", 1), 31L);
    specificStartOffsets.put(new KafkaTopicPartition("topic", 2), 43L);
    
    FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(...);
    consumer.setStartFromSpecificOffsets(specificStartOffsets);
    ...
    ```
    
    If a subscribed partition is not defined a specific offset (does not have a 
corresponding entry in the `specificStartOffsets` map), then the startup 
behaviour for that particular partition fallbacks to the default group offset 
behaviour (look for offset in ZK / Kafka for that partition, or use 
"auto.offset.reset" if none can be found).
    
    An IT test `runStartFromSpecificOffsets()` is added for this functionality, 
however is currently only enabled on the Kafka 0.8 consumer, because 0.9 and 
0.10 tests have the same Kafka config problem mentioned in #2509. So, for now, 
for versions 0.9 and 0.10, I have only manually tested this new functionality, 
and it works correctly as described above.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-3123

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2687.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2687
    
----
commit eca9043ebd63ea201b14b129ce08a9f3ee78c49c
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date:   2016-09-17T13:41:50Z

    [FLINK-4280][kafka] Explicit start position configuration for Kafka Consumer

commit 0703469e1880daa63bc5e92bae9920573659806d
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date:   2016-10-23T08:55:58Z

    [FLINK-4280] Allow Kafka 0.10 to override 0.10-specific API calls
    
    Methods calls to `seekToBeginning` and `seekToEnd` have breaking APIs
    across 0.9 and 0.10, causing 0.10 IT tests to fail.

commit bdf6b76eb86c3d6a4a0bb84ab26beb13e84526b1
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date:   2016-10-24T05:24:10Z

    [FLINK-4280] Add IT tests for explicit start position configuration

commit d8f5f976ef2e2e1d994a45468d7e9ef3b8bf0015
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date:   2016-10-24T08:57:05Z

    [FLINK-4280] Add documentation for the new explicit start position methods

commit 098360fc797b78f15917aaf1b22d09c06a4d0a6c
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date:   2016-10-24T08:08:18Z

    [FLINK-3123] [kafka] Allow custom specific start offsets for Kafka consumers

----


> Allow setting custom start-offsets for the Kafka consumer
> ---------------------------------------------------------
>
>                 Key: FLINK-3123
>                 URL: https://issues.apache.org/jira/browse/FLINK-3123
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Robert Metzger
>             Fix For: 1.0.0
>
>
> Currently, the Kafka consumer only allows to start reading from the earliest 
> available offset or the current offset.
> Sometimes, users want to set a specific start offset themselves.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to