[ https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15400472#comment-15400472 ]

Tzu-Li (Gordon) Tai commented on FLINK-4280:
--------------------------------------------

[~StephanEwen] On second thought, I think I misunderstood what you meant in the 
first place.

What you're proposing is this (I think this is a clearer design than what I 
mentioned above):

{code}
Properties props = new Properties();
...

FlinkKafkaConsumer kafka = new FlinkKafkaConsumer("topic", schema, props);
kafka.setStartFromEarliest();
kafka.setStartFromLatest();
// if true, commits on checkpoint if checkpointing is enabled; otherwise, periodically:
kafka.setEnableCommitOffsets(boolean);
kafka.setForwardMetrics(boolean);
...

env.addSource(kafka) ...
{code}

Echoing your statement, if we go with this approach:

now that we are trying to separate Flink-specific configs from Kafka configs, I 
think we should clearly state (and change the implementation accordingly) that 
the {{Properties}} provided in the constructor are used only to configure the 
internal Kafka consumers the connector uses, by passing the {{Properties}} 
through as-is. So, the only configs in the {{Properties}} that take effect are 
the ones the Kafka API supports: in {{FlinkKafkaConsumer08}}, only the configs 
that the Kafka {{SimpleConsumer}} API supports take effect; in 
{{FlinkKafkaConsumer09}}, only the configs that the new consumer API 
{{KafkaConsumer}} supports take effect. Any additional functionality or 
Flink-specific behaviour on top of the internal Kafka consumers should go 
through setter methods.
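
To make the pass-through rule concrete, here is a minimal sketch (illustrative 
only, not actual connector code) of what the 0.9 connector would effectively do 
with the user-supplied {{Properties}}:

{code}
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
// a config the new KafkaConsumer API recognizes -> takes effect:
props.setProperty("max.partition.fetch.bytes", "1048576");
// a key the client doesn't recognize -> no effect (the client only logs a
// warning), and no Flink-side interpretation either:
props.setProperty("some.unknown.key", "...");

// the connector hands the Properties through unmodified
KafkaConsumer<byte[], byte[]> internalConsumer =
    new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
{code}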

The general problem with the current configuration is that we are trying to 
"mimic" high-level consumer functionality with the original config keys. Take 
{{FlinkKafkaConsumer08}} for example: the {{SimpleConsumer}} API doesn't 
actually use the {{group.id}} or {{auto.offset.reset}} configs. We re-implement 
the behaviour of these configs ourselves, and expose it through the original 
config keys in the {{Properties}}. When it comes to adding functionality on top 
of the internally used {{SimpleConsumer}}, we tend to stretch the original 
definition of these keys and try to make them work with our re-implementations 
of configs such as {{group.id}} and {{auto.offset.reset}}. An example of the 
confusion users can run into when we re-implement configs that the internal API 
doesn't actually use is in this user ML thread: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-kafka-group-question-td8185.html.
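
For reference, here is what the 0.8 low-level API looks like (a sketch with 
placeholder host / clientId values): the {{SimpleConsumer}} constructor has no 
notion of a consumer group or offset reset at all, which is exactly why we had 
to re-implement those semantics ourselves:

{code}
import kafka.javaapi.consumer.SimpleConsumer;

// Kafka 0.8 low-level API: host, port, timeouts, and a clientId are all it
// knows about -- no group.id, no auto.offset.reset
SimpleConsumer consumer =
    new SimpleConsumer("broker-host", 9092, 100000, 64 * 1024, "flink-client");
{code}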

This also supports the idea you mentioned in FLINK-3398 that we should drop 
Kafka's {{group.id}} and perhaps have Flink's own groupId. Since Kafka's 
{{group.id}} was never actually used by the internal {{SimpleConsumer}} of 
{{FlinkKafkaConsumer08}} in the first place, we should have setter methods for 
functions like "start with offset" or "offset committing", to which the user 
supplies a groupId. For {{FlinkKafkaConsumer09}}, we won't need a setter method 
for "periodic offset committing", because the internal {{KafkaConsumer}} 
supports that through {{group.id}} and {{enable.auto.commit}}; instead, we have 
a setter method to opt into "commit offsets on checkpoint".

To summarize in code:

{code}
// for FlinkKafkaConsumer08
Properties props = new Properties();
...
FlinkKafkaConsumer08 kafka = new FlinkKafkaConsumer08("topic", schema, props);
kafka.setStartFromEarliest();
kafka.setStartFromLatest();
kafka.setStartFromExternalOffsets("groupId")
kafka.setEnableCommitOffsets("groupId"); // periodic if checkpointing is not 
enabled, otherwise on notifyCheckpointComplete()
kafka.setForwardMetrics(boolean);
...
{code}

{code}
// for FlinkKafkaConsumer09
Properties props = new Properties();
...
FlinkKafkaConsumer09 kafka = new FlinkKafkaConsumer09("topic", schema, props);
kafka.setStartFromEarliest();
kafka.setStartFromLatest();
// doesn't take a "group.id", because in FlinkKafkaConsumer09, "group.id" is a
// config recognized by the new KafkaConsumer API:
kafka.setStartFromExternalOffsets();
// if true (checkpointing should be enabled), overrides periodic offset
// committing if "enable.auto.commit" is set in props:
kafka.setCommitOffsetsOnCheckpoint(boolean);
kafka.setForwardMetrics(boolean);
...
{code}


So, the general rule is:

- The supplied configuration is used only to configure the internally used 
client APIs of the external system.
- All Flink-specific configuration, or functionality that the internal API 
does not support, goes through connector-specific setter methods (see the 
sketch below).
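
As a rough illustration of the rule applied to an arbitrary connector (all 
names here are hypothetical, not a proposed API):

{code}
import java.util.Properties;

// Hypothetical skeleton of a connector following the rule: constructor
// Properties are forwarded verbatim to the internal client; anything
// Flink-specific is reachable only through explicit setters.
public class ExampleConnector {
    private final Properties clientProps;   // passed through, never interpreted by Flink
    private boolean forwardMetrics = false; // Flink-specific, setter-only

    public ExampleConnector(Properties clientProps) {
        this.clientProps = clientProps;
    }

    public void setForwardMetrics(boolean forwardMetrics) {
        this.forwardMetrics = forwardMetrics;
    }
}
{code}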

This might be a general rule we would like all Flink-supported connectors to 
follow in the long run? Users would have a clear understanding of, and full 
control over, the behaviour of the internal APIs the connectors use, and we'd 
also have a clear line on how new functionality should be added on top of them.

> New Flink-specific option to set starting position of Kafka consumer without 
> respecting external offsets in ZK / Broker
> -----------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-4280
>                 URL: https://issues.apache.org/jira/browse/FLINK-4280
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kafka Connector
>            Reporter: Tzu-Li (Gordon) Tai
>             Fix For: 1.2.0
>
>
> Currently, to start reading from the "earliest" and "latest" position in 
> topics for the Flink Kafka consumer, users set the Kafka config 
> {{auto.offset.reset}} in the provided properties configuration.
> However, the way this config actually works can be misleading for users 
> looking for a way to "read topics from a starting position". The 
> {{auto.offset.reset}} config in the Flink Kafka consumer follows Kafka's 
> original intent for the setting: existing offsets committed externally to ZK 
> / the brokers are checked first; only if none exist is {{auto.offset.reset}} 
> respected.
> I propose to add Flink-specific ways to define the starting position that do 
> not take the external offsets into account. The original behaviour 
> (reference external offsets first) can become a user option, so that it can 
> be retained for Kafka users who need to interoperate with existing non-Flink 
> Kafka consumer applications.
> How users will interact with the Flink Kafka consumer after this is added, 
> with a newly introduced {{flink.starting-position}} config:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "earliest/latest");
> props.setProperty("auto.offset.reset", "..."); // this will be ignored (log a 
> warning)
> props.setProperty("group.id", "...") // this won't have effect on the 
> starting position anymore (may still be used in external offset committing)
> ...
> {code}
> Or, reference external offsets in ZK / broker:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "external-offsets");
> props.setProperty("auto.offset.reset", "earliest/latest"); // default will be 
> latest
> props.setProperty("group.id", "..."); // will be used to lookup external 
> offsets in ZK / broker on startup
> ...
> {code}
> One thing we would need to decide is what the default value for 
> {{flink.starting-position}} should be.
> Two merits I see in adding this:
> 1. This matches the way users generally interpret "read from a starting 
> position". Since the Flink Kafka connector is essentially a "high-level" 
> Kafka consumer for Flink users, I think it is reasonable to add 
> Flink-specific functionality that users will find useful, even though it 
> wasn't supported in Kafka's original consumer design.
> 2. By adding this, the idea that "the Kafka offset store (ZK / brokers) is 
> used only to expose progress to the outside world, and not to manipulate how 
> Kafka topics are read in Flink (unless users opt to do so)" becomes even 
> more definite and solid. There was some discussion on this aspect in 
> https://github.com/apache/flink/pull/1690 (FLINK-3398). I think adding this 
> further decouples Flink's internal offset checkpointing from Kafka's 
> external offset store.


