ASF GitHub Bot commented on FLINK-4280:

GitHub user tzulitai opened a pull request:


    [FLINK-4280][kafka-connector] Explicit start position configuration for 
Kafka Consumer

    This PR adds the following explicit setter methods to configure the 
starting position of the Kafka consumer connector:

        FlinkKafkaConsumer08 kafka = new FlinkKafkaConsumer08(...); // or 09
        // start from the earliest offset, ignoring any committed offsets
        kafka.setStartFromEarliest();
        // start from the latest offset, ignoring any committed offsets
        kafka.setStartFromLatest();
        // use the offsets committed in ZK / Kafka as starting points
        kafka.setStartFromGroupOffsets();

    The default is to start from group offsets, so existing user code will 
not break.
    Note that this PR also includes some refactoring that consolidates all 
start offset assignment logic for partitions within the fetcher. For example, 
in the 0.8 version, the `SimpleConsumerThread` no longer decides where a 
partition needs to start from; the fetcher has already assigned a starting 
offset to every partition, and the thread simply starts consuming it. This 
is preparation for transparent partition discovery for the Kafka consumers in 
    I suggest reviewing this PR after #2369, so that the 0.10 Kafka consumer 
can get in first with less effort. Tests for the new functionality will be 
added in follow-up commits.
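
As a rough illustration of the consolidated logic described above, the following self-contained Java sketch resolves a start offset for every partition before any consuming begins. It is not the code from this PR: `StartPositionSketch`, `StartupMode`, `PartitionInfo` and both method names are hypothetical, and the fallback used when a group has no committed offset is modelled on the `auto.offset.reset` behaviour described in the JIRA issue.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative model only; none of these names come from the Flink code base.
public class StartPositionSketch {

    /** The three start positions offered by the new setters. */
    enum StartupMode { EARLIEST, LATEST, GROUP_OFFSETS }

    /** Offsets known for one partition; committedOffset is null if none was committed. */
    static final class PartitionInfo {
        final long earliestOffset;
        final long latestOffset;
        final Long committedOffset;

        PartitionInfo(long earliestOffset, long latestOffset, Long committedOffset) {
            this.earliestOffset = earliestOffset;
            this.latestOffset = latestOffset;
            this.committedOffset = committedOffset;
        }
    }

    /**
     * Picks the start offset for a single partition.
     * resetFallback (EARLIEST or LATEST) is only consulted in GROUP_OFFSETS
     * mode when the group has no committed offset for the partition.
     */
    static long resolveStartOffset(StartupMode mode, PartitionInfo p, StartupMode resetFallback) {
        switch (mode) {
            case EARLIEST:
                return p.earliestOffset;
            case LATEST:
                return p.latestOffset;
            case GROUP_OFFSETS:
                if (p.committedOffset != null) {
                    return p.committedOffset;
                }
                return resetFallback == StartupMode.EARLIEST ? p.earliestOffset : p.latestOffset;
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }

    /** Assigns start offsets to all partitions up front, keyed by partition name. */
    static Map<String, Long> assignStartOffsets(
            Map<String, PartitionInfo> partitions, StartupMode mode, StartupMode resetFallback) {
        Map<String, Long> startOffsets = new HashMap<>();
        for (Map.Entry<String, PartitionInfo> entry : partitions.entrySet()) {
            startOffsets.put(entry.getKey(), resolveStartOffset(mode, entry.getValue(), resetFallback));
        }
        return startOffsets;
    }

    public static void main(String[] args) {
        Map<String, PartitionInfo> partitions = new HashMap<>();
        partitions.put("topic-0", new PartitionInfo(5L, 42L, 17L));
        partitions.put("topic-1", new PartitionInfo(5L, 42L, null));
        System.out.println(assignStartOffsets(partitions, StartupMode.GROUP_OFFSETS, StartupMode.LATEST));
    }
}
```

In this model a consuming thread would only ever receive an already resolved offset, which is the division of responsibility the refactoring aims for.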

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-4280

Alternatively you can review and apply these changes as the patch at:


To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2509

commit f1d24806d902a45f66fc9b42a19a303a031b81b1
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Date:   2016-09-17T13:41:50Z

    [FLINK-4280][kafka-connector] Explicit start position configuration for 
Kafka Consumer


> New Flink-specific option to set starting position of Kafka consumer without 
> respecting external offsets in ZK / Broker
> -----------------------------------------------------------------------------------------------------------------------
>                 Key: FLINK-4280
>                 URL: https://issues.apache.org/jira/browse/FLINK-4280
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kafka Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>             Fix For: 1.2.0
> Currently, to start reading from the "earliest" or "latest" position in 
> topics for the Flink Kafka consumer, users set the Kafka config 
> {{auto.offset.reset}} in the provided properties configuration.
> However, the way this config actually works can be misleading for users 
> looking for a way to "read topics from a starting position". 
> In the Flink Kafka consumer, {{auto.offset.reset}} follows Kafka's 
> original intent for the setting: first, existing external offsets 
> committed to ZK / the brokers are checked; only if none exist is 
> {{auto.offset.reset}} respected.
> I propose adding Flink-specific ways to define the starting position that 
> do not take the external offsets into account. The original behaviour 
> (reference external offsets first) can become a user option, so that it 
> is retained for Kafka users who need to cooperate with existing 
> non-Flink Kafka consumer applications.
> This is how users would interact with the Flink Kafka consumer once this is 
> added, using a newly introduced {{flink.starting-position}} config:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "earliest/latest");
> // auto.offset.reset will be ignored (a warning is logged)
> props.setProperty("auto.offset.reset", "...");
> // group.id no longer affects the starting position
> // (it may still be used for external offset committing)
> props.setProperty("group.id", "...");
> ...
> {code}
> Or, reference external offsets in ZK / broker:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "external-offsets");
> // auto.offset.reset defaults to latest
> props.setProperty("auto.offset.reset", "earliest/latest");
> // group.id is used to look up external offsets in ZK / broker on startup
> props.setProperty("group.id", "...");
> ...
> {code}
> One thing we would need to decide is what the default value of 
> {{flink.starting-position}} should be.
> Two merits I see in adding this:
> 1. It matches the way users generally interpret "read from a starting 
> position". As the Flink Kafka connector is essentially a "high-level" 
> Kafka consumer for Flink users, I think it is reasonable to add 
> Flink-specific functionality that users will find useful, even though it 
> was not supported in Kafka's original consumer designs.
> 2. It reinforces the idea that "the Kafka offset store (ZK / brokers) is 
> used only to expose progress to the outside world, and not used to manipulate 
> how Kafka topics are read in Flink (unless users opt to do so)". There was 
> some discussion of this aspect in this PR 
> (https://github.com/apache/flink/pull/1690, FLINK-3398). I think adding 
> this further decouples Flink's internal offset checkpointing from Kafka's 
> external offset store.
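
To make the proposal in the issue description more concrete, here is a minimal Java sketch of how the proposed `flink.starting-position` property could be interpreted. It only models the behaviour described in the issue text and is not Flink code: the class `StartingPositionConfigSketch`, the enum `StartingPosition` and both method names are hypothetical. Because the issue leaves the default value undecided, the caller has to pass it in explicitly.

```java
import java.util.Locale;
import java.util.Properties;

// Illustrative model of the proposal; none of these names exist in Flink.
public class StartingPositionConfigSketch {

    enum StartingPosition { EARLIEST, LATEST, EXTERNAL_OFFSETS }

    static final String STARTING_POSITION_KEY = "flink.starting-position";
    static final String AUTO_OFFSET_RESET_KEY = "auto.offset.reset";

    /** Reads the proposed property, falling back to a caller-chosen default when unset. */
    static StartingPosition parse(Properties props, StartingPosition defaultValue) {
        String raw = props.getProperty(STARTING_POSITION_KEY);
        if (raw == null) {
            return defaultValue;
        }
        switch (raw.trim().toLowerCase(Locale.ROOT)) {
            case "earliest":
                return StartingPosition.EARLIEST;
            case "latest":
                return StartingPosition.LATEST;
            case "external-offsets":
                return StartingPosition.EXTERNAL_OFFSETS;
            default:
                throw new IllegalArgumentException(
                        "Unsupported value for " + STARTING_POSITION_KEY + ": " + raw);
        }
    }

    /**
     * Returns true when auto.offset.reset is set but would be ignored, i.e. the
     * case in which the proposal suggests logging a warning.
     */
    static boolean ignoresAutoOffsetReset(Properties props, StartingPosition defaultValue) {
        boolean resetConfigured = props.getProperty(AUTO_OFFSET_RESET_KEY) != null;
        return resetConfigured && parse(props, defaultValue) != StartingPosition.EXTERNAL_OFFSETS;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(STARTING_POSITION_KEY, "earliest");
        props.setProperty(AUTO_OFFSET_RESET_KEY, "latest");
        if (ignoresAutoOffsetReset(props, StartingPosition.EXTERNAL_OFFSETS)) {
            System.err.println("WARN: " + AUTO_OFFSET_RESET_KEY + " is ignored for this starting position");
        }
        System.out.println(parse(props, StartingPosition.EXTERNAL_OFFSETS));
    }
}
```

Only the `external-offsets` value keeps `auto.offset.reset` and `group.id` relevant to the starting position, which mirrors the two configuration snippets in the issue.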

This message was sent by Atlassian JIRA
