[
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897191#comment-15897191
]
ASF GitHub Bot commented on FLINK-4022:
---------------------------------------
Github user tzulitai commented on the issue:
https://github.com/apache/flink/pull/3476
Seems like some Kafka tests are failing; looking into it.
> Partition discovery / regex topic subscription for the Kafka consumer
> ---------------------------------------------------------------------
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
> Issue Type: New Feature
> Components: Kafka Connector, Streaming Connectors
> Affects Versions: 1.0.0
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
>
> Example: allow users to subscribe to "topic-n*", so that the consumer
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are
> added to Kafka.
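
A minimal sketch of the matching step, using only `java.util.regex`. One detail to note: read literally as a Java regex, "topic-n*" means "topic-" followed by zero or more "n" characters, so it would not fully match "topic-n1". The sketch therefore assumes a pattern such as `topic-n\d+`. The class and method names are hypothetical and not part of Flink or Kafka.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Flink or Kafka: filters topic names by a regex.
class TopicPatternSketch {

    // Returns the topics whose full name matches the pattern.
    static List<String> matchingTopics(Pattern pattern, List<String> topics) {
        return topics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Assumed pattern: "topic-n" followed by one or more digits.
        Pattern pattern = Pattern.compile("topic-n\\d+");
        List<String> known = Arrays.asList("topic-n1", "topic-n2", "topic-m1", "other");
        System.out.println(matchingTopics(pattern, known));
    }
}
```

Re-running `matchingTopics` against a refreshed topic list is one simple way to picture how newly created topics would be picked up over time.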
> I propose implementing this feature as follows:
> Since the overall list of partitions to read will change after job
> submission, the main change required for this feature is dynamic
> partition assignment to subtasks while the Kafka consumer is running. This
> will mainly be accomplished using the Kafka 0.9.x API
> `KafkaConsumer#subscribe(java.util.regex.Pattern,
> ConsumerRebalanceListener)`. The KafkaConsumer in each subtask will be
> added to the same consumer group when instantiated, and will rely on Kafka to
> dynamically reassign partitions whenever a rebalance happens. The
> registered `ConsumerRebalanceListener` is a callback that is called right
> before and after a rebalance. We'll use this callback to let each
> subtask commit the last offsets of the partitions it is currently responsible
> for to an external store (or Kafka) before a rebalance; after the rebalance,
> once the subtasks get the new partitions they'll be reading from, they'll read
> from the external store to get the last offsets for their new partitions
> (partitions which don't have offset entries in the store are the new
> partitions that caused the rebalance).
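
The commit-before / read-after flow described above could be sketched roughly as follows. This is a plain-Java simulation and does not depend on Kafka: the method names `onPartitionsRevoked` and `onPartitionsAssigned` mirror the two callbacks of `ConsumerRebalanceListener` (which in Kafka take a `Collection<TopicPartition>`), while `OffsetStoreSketch`, `SubtaskRebalanceSketch`, the string partition keys, and the `startOffsetForNew` parameter are assumptions made for illustration.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the external offset store shared by all subtasks.
class OffsetStoreSketch {
    private final Map<String, Long> committed = new ConcurrentHashMap<>();

    void commit(String partition, long offset) {
        committed.put(partition, offset);
    }

    // Returns null when the partition has no entry, i.e. it is newly discovered.
    Long lastOffset(String partition) {
        return committed.get(partition);
    }
}

// Hypothetical per-subtask logic mirroring the two rebalance callbacks.
class SubtaskRebalanceSketch {
    private final OffsetStoreSketch store;
    private final Map<String, Long> current = new HashMap<>();

    SubtaskRebalanceSketch(OffsetStoreSketch store) {
        this.store = store;
    }

    // Records how far this subtask has read in a partition it holds.
    void recordProgress(String partition, long offset) {
        current.put(partition, offset);
    }

    // Right before a rebalance: hand over offsets of the revoked partitions.
    void onPartitionsRevoked(Collection<String> partitions) {
        for (String partition : partitions) {
            Long offset = current.remove(partition);
            if (offset != null) {
                store.commit(partition, offset);
            }
        }
    }

    // Right after a rebalance: resume from stored offsets, or start fresh
    // for partitions that have no entry in the store.
    void onPartitionsAssigned(Collection<String> partitions, long startOffsetForNew) {
        for (String partition : partitions) {
            Long stored = store.lastOffset(partition);
            current.put(partition, stored != null ? stored : startOffsetForNew);
        }
    }

    // Defensive copy of the offsets this subtask currently tracks.
    Map<String, Long> currentOffsets() {
        return new HashMap<>(current);
    }
}
```

In this sketch a partition with no stored offset is treated as new and starts from the supplied default, which corresponds to the "no offset entry in the store" case above.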
> The tricky part will be restoring Flink checkpoints when the partition
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot
> the offsets of partitions they are currently holding. Restoring will be a
> bit different in that the partitions assigned to a subtask might not match
> those in the snapshot it is restored with (since we're letting Kafka
> dynamically assign partitions). There will need to be a coordination process
> where, if a restore state exists, all subtasks first commit the offsets they
> receive (as a result of the restore state) to the external store, and then
> each subtask attempts to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is
> available, then the restore will be simple again, as each subtask has full
> access to the previous global state, so coordination is not required.
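
To make the two-phase restore concrete, here is a rough sketch in plain Java. It assumes partitions are identified by strings and that the external store behaves like a shared map; `RestoreCoordinationSketch`, `publishAll`, `resolve`, and `defaultOffset` are hypothetical names. In a real deployment, phase 2 must not start until every subtask has finished phase 1, which is the coordination cost mentioned above; with a globally merged restore state, the merged map would simply be available to every subtask up front.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the restore coordination, not Flink code.
class RestoreCoordinationSketch {

    // Phase 1: every subtask publishes the offsets from its restored snapshot.
    // The returned map plays the role of the external store (or merged state).
    static Map<String, Long> publishAll(List<Map<String, Long>> restoredPerSubtask) {
        Map<String, Long> store = new HashMap<>();
        for (Map<String, Long> restored : restoredPerSubtask) {
            store.putAll(restored);
        }
        return store;
    }

    // Phase 2: a subtask resolves start offsets for the partitions it now holds.
    // Partitions without a stored offset fall back to defaultOffset.
    static Map<String, Long> resolve(Map<String, Long> store,
                                     Collection<String> assigned,
                                     long defaultOffset) {
        Map<String, Long> result = new HashMap<>();
        for (String partition : assigned) {
            result.put(partition, store.getOrDefault(partition, defaultOffset));
        }
        return result;
    }
}
```

For example, if one subtask restored an offset for a partition that Kafka later assigns to a different subtask, the second subtask still finds that offset through `resolve`.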
> I think changing to dynamic partition assignment is also good in the long run
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assignment based on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assignment to subtasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign
> static partitions, use subscribe() registered with the callback
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold
> before continuing (unless we are able to have merged restore states).
> 3. To keep the previous consumer functionality (consuming from a fixed list
> of topics), KafkaConsumer#subscribe() has a corresponding overload for a
> fixed list of topics. We can simply decide which subscribe() overload to use
> depending on whether a regex Pattern or list of topics is supplied.
> 4. If subtasks don't initially have any assigned partitions, we shouldn't
> emit a MAX_VALUE watermark, since they may hold partitions after a rebalance.
> Instead, unassigned subtasks should also run a fetcher instance and take
> part in the process pool for the consumer group of the subscribed topics.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)