GitHub user tzulitai opened a pull request:

    [FLINK-4280][kafka-connector] Explicit start position configuration for 
Kafka Consumer

    This PR adds the following new explicit setter methods to configure the
starting position for the Kafka Consumer connector:

        FlinkKafkaConsumer08 kafka = new FlinkKafkaConsumer08(...); // or 09
        // start from earliest without respecting any committed offsets
        kafka.setStartFromEarliest();
        // start from latest without respecting any committed offsets
        kafka.setStartFromLatest();
        // respect committed offsets in ZK / Kafka as starting points
        kafka.setStartFromGroupOffsets();

    The default is to start from group offsets, so we won't be breaking 
existing user code.
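
To make the default behaviour concrete, here is a minimal, self-contained sketch of how such setters could be modelled. `StartPosition`, `StartPositionConfig` and `StartPositionDemo` are hypothetical names chosen for illustration; they are not the classes touched by this PR, and the real consumer may store this setting differently.

```java
// Hypothetical stand-in for the consumer's start position setting.
// These names are illustrative and are not taken from the Flink code base.
enum StartPosition { EARLIEST, LATEST, GROUP_OFFSETS }

class StartPositionConfig {
    // Group offsets is the default, so code that never calls a setter
    // keeps its current behaviour.
    private StartPosition startPosition = StartPosition.GROUP_OFFSETS;

    void setStartFromEarliest() { startPosition = StartPosition.EARLIEST; }

    void setStartFromLatest() { startPosition = StartPosition.LATEST; }

    void setStartFromGroupOffsets() { startPosition = StartPosition.GROUP_OFFSETS; }

    StartPosition getStartPosition() { return startPosition; }
}

class StartPositionDemo {
    public static void main(String[] args) {
        StartPositionConfig config = new StartPositionConfig();
        System.out.println(config.getStartPosition()); // prints GROUP_OFFSETS

        config.setStartFromEarliest();
        System.out.println(config.getStartPosition()); // prints EARLIEST
    }
}
```

The last setter called wins in this sketch, which is one plausible reading of how the three methods interact.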
    One thing to note is that this PR also includes some refactoring to
consolidate all start offset assignment logic for partitions within the
fetcher. For example, in the 0.8 version, with this change the
`SimpleConsumerThread` no longer decides where a partition needs to start
from; all partitions should already be assigned starting offsets by the
fetcher, and the thread simply needs to start consuming the partition. This
is preparation for transparent partition discovery for the Kafka consumers in 
    I suggest reviewing this PR after #2369, so that the 0.10 Kafka consumer
can get in first with less effort. Tests for the new functionality will be
added in follow-up commits.
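
The fetcher-side refactoring described above can be pictured roughly as follows: every partition gets its starting offset resolved in one place, before any consuming thread runs. This is only a sketch under stated assumptions. `StartMode` and `StartOffsetResolver` are hypothetical names, the offsets are passed in as plain maps rather than fetched from ZK / Kafka, and the fallback to the latest offset for a partition without a committed offset is an assumption of this example, not something the PR specifies.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical names throughout; this is not the connector's actual fetcher code.
enum StartMode { EARLIEST, LATEST, GROUP_OFFSETS }

class StartOffsetResolver {
    /**
     * Resolves one starting offset per partition up front, so that the
     * consuming threads never have to decide where to start.
     *
     * All maps are keyed by partition id. `earliest` and `latest` are assumed
     * to contain every partition; `committed` may be missing some of them.
     */
    static Map<Integer, Long> resolve(
            StartMode mode,
            Map<Integer, Long> earliest,
            Map<Integer, Long> latest,
            Map<Integer, Long> committed) {
        Map<Integer, Long> start = new HashMap<>();
        for (Integer partition : earliest.keySet()) {
            switch (mode) {
                case EARLIEST:
                    // Ignore committed offsets entirely.
                    start.put(partition, earliest.get(partition));
                    break;
                case LATEST:
                    // Ignore committed offsets entirely.
                    start.put(partition, latest.get(partition));
                    break;
                case GROUP_OFFSETS:
                    // Assumption of this sketch: fall back to the latest
                    // offset when the group has nothing committed.
                    start.put(partition,
                            committed.getOrDefault(partition, latest.get(partition)));
                    break;
            }
        }
        return start;
    }
}
```

With the decision centralised like this, a newly discovered partition would only need to pass through the same resolution step before being handed to a consuming thread.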

You can merge this pull request into a Git repository by running:

    $ git pull FLINK-4280

Alternatively you can review and apply these changes as the patch at:

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2509

commit f1d24806d902a45f66fc9b42a19a303a031b81b1
Author: Tzu-Li (Gordon) Tai <>
Date:   2016-09-17T13:41:50Z

    [FLINK-4280][kafka-connector] Explicit start position configuration for 
Kafka Consumer


