[ https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15400472#comment-15400472 ]
Tzu-Li (Gordon) Tai commented on FLINK-4280:
--------------------------------------------

[~StephanEwen] On second thought, I think I misunderstood what you meant in the first place. What you're proposing is this (I think this is a clearer design than what I mentioned above):

{code}
Properties props = new Properties();
...
FlinkKafkaConsumer kafka = new FlinkKafkaConsumer("topic", schema, props);
kafka.setStartFromEarliest();
kafka.setStartFromLatest();
kafka.setEnableCommitOffsets(boolean); // if true, commits on checkpoint if checkpointing is enabled, otherwise periodically
kafka.setForwardMetrics(boolean);
...
env.addSource(kafka) ...
{code}

Echoing your statement, if we go with this approach: now that we are trying to separate Flink-specific configs from Kafka configs, I think we should clearly state (and change the implementation so) that the {{Properties}} provided in the constructor are used only to configure the internal Kafka consumers the connector uses, simply by passing the {{Properties}} through. So, the only configs in the {{Properties}} that take effect are the ones the underlying Kafka API supports: in {{FlinkKafkaConsumer08}}, only the configs that Kafka's {{SimpleConsumer}} API supports take effect; in {{FlinkKafkaConsumer09}}, only the configs that the new {{KafkaConsumer}} API supports take effect. Any additional functionality or Flink-specific behaviour on top of the internal Kafka consumers should go through setter methods.

The general problem with the current configuration is that we are trying to "mimic" high-level consumer functions with the original config keys. Take {{FlinkKafkaConsumer08}} for example: the {{SimpleConsumer}} API doesn't actually use the {{group.id}} or {{auto.offset.reset}} configs. We're re-implementing the behaviour of these configs ourselves, and exposing them through the original config keys in the {{Properties}}.
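To illustrate the pass-through rule in plain Java: a minimal sketch of splitting constructor-supplied {{Properties}} into keys the internal client actually recognizes versus keys it would silently ignore. {{ConnectorConfigSplitter}}, {{passThrough}}, and the key set are hypothetical names invented for this sketch, not the real connector's code.

```java
import java.util.Properties;
import java.util.Set;

// Hypothetical sketch: forward only the entries the internal Kafka client
// actually recognizes; warn about everything else so users learn to use
// connector-specific setter methods instead.
public class ConnectorConfigSplitter {

    // Illustrative subset of keys the underlying client API supports
    // (an assumption for this sketch, not the real SimpleConsumer key set).
    static final Set<String> CLIENT_KEYS =
            Set.of("bootstrap.servers", "fetch.max.bytes", "socket.timeout.ms");

    // Returns only the entries the internal client will honor.
    static Properties passThrough(Properties supplied) {
        Properties accepted = new Properties();
        for (String key : supplied.stringPropertyNames()) {
            if (CLIENT_KEYS.contains(key)) {
                accepted.setProperty(key, supplied.getProperty(key));
            } else {
                // In a real connector this would be a logged warning:
                System.out.println("WARN: '" + key + "' is not used by the internal client"
                        + " and will be ignored; use a setter method instead.");
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("auto.offset.reset", "earliest"); // not honored by SimpleConsumer
        Properties accepted = passThrough(props);
        System.out.println("accepted keys: " + accepted.stringPropertyNames());
    }
}
```

The point of the sketch is only the split itself: anything outside the client's own key set never reaches the internal consumer, so "mimicked" keys like {{auto.offset.reset}} can no longer take effect silently.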
When it comes to adding functionality on top of the internally used {{SimpleConsumer}}, we tend to stretch the original definition of these keys and try to make them work with our re-implementations of configs such as {{group.id}} and {{auto.offset.reset}}. An example of the confusion users can run into when we re-implement configs that the internal API doesn't actually use is in this user ML thread: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-kafka-group-question-td8185.html.

This also supports the idea you mentioned in FLINK-3398 that we should drop Kafka's {{group.id}} and perhaps have Flink's own groupId. Since Kafka's {{group.id}} was never actually used by the internal {{SimpleConsumer}} of {{FlinkKafkaConsumer08}} in the first place, we should have setter methods for functions like "start with offset" or "offset committing", to which the user supplies a groupId. For {{FlinkKafkaConsumer09}}, we won't need a setter method for "periodic offset committing" because the internal {{KafkaConsumer}} supports that through {{group.id}} and {{enable.auto.commit}}; instead, we have a setter method to opt in to "commit offsets on checkpoint".

To summarize in code:

{code}
// for FlinkKafkaConsumer08
Properties props = new Properties();
...
FlinkKafkaConsumer08 kafka = new FlinkKafkaConsumer08("topic", schema, props);
kafka.setStartFromEarliest();
kafka.setStartFromLatest();
kafka.setStartFromExternalOffsets("groupId");
kafka.setEnableCommitOffsets("groupId"); // periodic if checkpointing is not enabled, otherwise on notifyCheckpointComplete()
kafka.setForwardMetrics(boolean);
...
{code}

{code}
// for FlinkKafkaConsumer09
Properties props = new Properties();
...
FlinkKafkaConsumer09 kafka = new FlinkKafkaConsumer09("topic", schema, props);
kafka.setStartFromEarliest();
kafka.setStartFromLatest();
kafka.setStartFromExternalOffsets(); // doesn't take a "group.id", because in FlinkKafkaConsumer09, "group.id" is a recognized config of the new KafkaConsumer API
kafka.setCommitOffsetsOnCheckpoint(boolean); // if true (checkpointing should be enabled), overrides periodic committing if "enable.auto.commit" is set in props
kafka.setForwardMetrics(boolean);
...
{code}

So, the general rule is:
- Supplied configuration is used only to configure the internally used client APIs of the external system.
- All Flink-specific configuration, and functions that the internal API does not support, go through connector-specific setter methods.

This might be a general rule we would like all Flink-supported connectors to follow in the long run. Users would have a clear understanding of, and full control over, the behaviour of the internal API the connectors use, and we'd also have a clear line on how new functionality should be added on top of it.

> New Flink-specific option to set starting position of Kafka consumer without
> respecting external offsets in ZK / Broker
> -----------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-4280
>                 URL: https://issues.apache.org/jira/browse/FLINK-4280
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kafka Connector
>            Reporter: Tzu-Li (Gordon) Tai
>             Fix For: 1.2.0
>
>
> Currently, to start reading from the "earliest" and "latest" position in
> topics for the Flink Kafka consumer, users set the Kafka config
> {{auto.offset.reset}} in the provided properties configuration.
> However, the way this config actually works might be a bit misleading if
> users were trying to find a way to "read topics from a starting position".
> The way the {{auto.offset.reset}} config works in the Flink Kafka consumer
> resembles Kafka's original intent for the setting: first, existing external
> offsets committed to the ZK / brokers will be checked; only if none exist
> will {{auto.offset.reset}} be respected.
> I propose to add Flink-specific ways to define the starting position, without
> taking the external offsets into account. The original behaviour (reference
> external offsets first) can be changed into a user option, so that the
> behaviour can be retained for frequent Kafka users who may need some
> collaboration with existing non-Flink Kafka consumer applications.
> How users will interact with the Flink Kafka consumer after this is added,
> with a newly introduced {{flink.starting-position}} config:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "earliest/latest");
> props.setProperty("auto.offset.reset", "..."); // this will be ignored (log a warning)
> props.setProperty("group.id", "...") // this won't affect the starting position anymore (may still be used in external offset committing)
> ...
> {code}
> Or, reference external offsets in ZK / broker:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "external-offsets");
> props.setProperty("auto.offset.reset", "earliest/latest"); // default will be latest
> props.setProperty("group.id", "..."); // will be used to look up external offsets in ZK / broker on startup
> ...
> {code}
> A thing we would need to decide on is what the default value should be for
> {{flink.starting-position}}.
> Two merits I see in adding this:
> 1. This matches the way users generally interpret "read from a starting
> position".
> As the Flink Kafka connector is essentially a
> "high-level" Kafka consumer for Flink users, I think it is reasonable to add
> Flink-specific functionality that users will find useful, even though it wasn't
> supported in Kafka's original consumer designs.
> 2. By adding this, the idea that "the Kafka offset store (ZK / brokers) is
> used only to expose progress to the outside world, and not used to manipulate
> how Kafka topics are read in Flink (unless users opt to do so)" becomes even more
> definite and solid. There was some discussion in this PR
> (https://github.com/apache/flink/pull/1690, FLINK-3398) on this aspect. I
> think adding this further "decouples" Flink's internal offset checkpointing from
> the external Kafka offset store.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)