[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15976900#comment-15976900 ]

ASF GitHub Bot commented on FLINK-4022:
---------------------------------------

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/3746

    [FLINK-4022] [kafka] Partition and topic discovery for FlinkKafkaConsumer

    This PR adds the required internals to allow partition and topic regex 
pattern discovery in the `FlinkKafkaConsumer`.
    
    It doesn't expose a new constructor that accepts regex topic patterns yet. 
I propose to expose that with https://issues.apache.org/jira/browse/FLINK-5704 
(deprecate the original FlinkKafkaConsumer constructors in favor of new ones 
with new offset behaviours). For this reason, I also propose to update the 
Kafka documentation when the new constructors are added.
    
    ## Design
    
    Some description to ease review:
    
    - `AbstractPartitionDiscoverer`:
    An `AbstractPartitionDiscoverer` is a stateful utility that remembers 
which partitions have already been discovered. It also wraps the logic for 
partition-to-subtask assignment. The main `run()` method now has a discovery 
loop that calls `AbstractPartitionDiscoverer#discoverPartitions()` at a fixed 
interval. This method returns only the new partitions that should be 
subscribed to by the subtask.
    The returned partitions are used to invoke 
`AbstractFetcher#addDiscoveredPartitions(...)` on the fetcher.
    On a fresh startup, `AbstractPartitionDiscoverer#discoverPartitions()` is 
also used in `open()` to fetch the initial seed startup partitions.
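As a rough illustration of the behaviour described above (the class name, `String` partition IDs, and the modulo assignment formula are all stand-ins for the actual Flink internals, not the real implementation), a stateful discoverer that returns only newly found partitions owned by the subtask might look like:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of a stateful partition discoverer; a partition is
// identified here by a plain String such as "topic-partitionId".
class PartitionDiscovererSketch {
    private final Set<String> discovered = new HashSet<>();
    private final int subtaskIndex;
    private final int numSubtasks;

    PartitionDiscovererSketch(int subtaskIndex, int numSubtasks) {
        this.subtaskIndex = subtaskIndex;
        this.numSubtasks = numSubtasks;
    }

    /** Returns only partitions that are new AND assigned to this subtask. */
    List<String> discoverPartitions(List<String> allCurrentPartitions) {
        List<String> newlyAssigned = new ArrayList<>();
        for (String partition : allCurrentPartitions) {
            // Simple deterministic modulo assignment; Flink's actual
            // assignment formula may differ.
            boolean mine =
                Math.abs(partition.hashCode() % numSubtasks) == subtaskIndex;
            if (mine && discovered.add(partition)) {
                newlyAssigned.add(partition);
            }
        }
        return newlyAssigned;
    }
}
```

Because the discoverer remembers everything it has already seen, repeated calls in the discovery loop yield each partition at most once.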
    
    - `AbstractFetcher#addDiscoveredPartitions(...)`
    The fetcher now has an `unassignedPartitionsQueue` that holds discovered 
partitions that have not yet been assigned to a Kafka consumer for reading. 
Whenever `addDiscoveredPartitions(...)` is called, the fetcher creates the 
state holders for the new partitions and adds them to the queue.
    Concrete fetcher implementations should continuously poll this queue in 
their fetch loop and assign any partitions found in it for consuming.
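The hand-off between the discovery loop and the fetch loop could be sketched roughly as follows (a plain `BlockingQueue`, `String` partition IDs, and a `Map` of offsets stand in for Flink's actual queue and state-holder types):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the fetcher-side hand-off: newly discovered
// partitions get a state holder and are enqueued; the fetch loop drains
// the queue and assigns whatever it finds.
class FetcherSketch {
    static final long NO_OFFSET = -1L;

    final BlockingQueue<String> unassignedPartitionsQueue =
        new LinkedBlockingQueue<>();
    final Map<String, Long> partitionOffsetStates = new HashMap<>();

    /** Called by the discovery loop with newly discovered partitions. */
    void addDiscoveredPartitions(List<String> newPartitions) {
        for (String p : newPartitions) {
            partitionOffsetStates.put(p, NO_OFFSET); // create state holder
            unassignedPartitionsQueue.add(p);        // hand off to fetch loop
        }
    }

    /** Called inside the fetch loop: drain and return partitions to assign. */
    List<String> pollNewPartitions() {
        List<String> toAssign = new ArrayList<>();
        unassignedPartitionsQueue.drainTo(toAssign);
        return toAssign;
    }
}
```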
    
    - Concrete fetchers continuously poll the queue in `runFetchLoop()`
    For 0.8, this simply means that the original `unassignedPartitionsQueue` 
in `Kafka08Fetcher` is moved to the base abstract fetcher class; nothing else 
is touched.
    For 0.9+, queue polling and partition reassignment for the high-level 
consumer happen in `KafkaConsumerThread`.
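For the 0.9+ case, a key subtlety is that `KafkaConsumer#assign(...)` replaces the whole assignment rather than adding to it, so newly polled partitions have to be unioned with the current assignment before the call. A minimal sketch of that step (a `Set` stands in for the actual `KafkaConsumer`; this is not the real `KafkaConsumerThread` code):

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the 0.9+ reassignment step in the consumer thread.
class ReassignmentSketch {
    final Set<String> currentAssignment = new HashSet<>();

    /** Called whenever the unassigned-partitions queue yields new entries. */
    void reassign(Set<String> newPartitions) {
        // assign() replaces the previous assignment entirely, so the new
        // partitions must be unioned with the current ones first.
        Set<String> fullAssignment = new HashSet<>(currentAssignment);
        fullAssignment.addAll(newPartitions);
        // In real code this would be roughly:
        //   consumer.wakeup(); consumer.assign(fullAssignment);
        currentAssignment.clear();
        currentAssignment.addAll(fullAssignment);
    }
}
```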
    
    ## Limitations
    
    For partition discovery to work properly after restores, the previous 
state had to be migrated to union list state 
(`OperatorStateStore#getUnionListState()`).
    
    One limitation that comes with this migration is that if the current run 
was restored from old state (Flink 1.1 / 1.2), partition discovery is 
disabled. To use partition discovery, the user must take a manual snapshot 
(with the new union state) and restore from it.
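To illustrate why union list state matters here: on restore, every subtask receives the full union of all snapshotted entries and keeps only its own share according to the assignment scheme, which plain (round-robin split) list state cannot support because no subtask sees the whole list. This is a conceptual sketch only; a plain list and a simple modulo rule stand in for `getUnionListState()` and Flink's real assignment logic:

```java
import java.util.ArrayList;
import java.util.List;

// Conceptual sketch of restore-time filtering over union list state.
class UnionRestoreSketch {
    /** Every subtask gets the full union and filters it down to its share. */
    static List<String> filterRestoredState(
            List<String> unionOfAllSnapshots, int subtaskIndex, int numSubtasks) {
        List<String> mine = new ArrayList<>();
        for (String partition : unionOfAllSnapshots) {
            // Same deterministic rule the discoverer would use, so restored
            // entries and newly discovered partitions agree on ownership.
            if (Math.abs(partition.hashCode() % numSubtasks) == subtaskIndex) {
                mine.add(partition);
            }
        }
        return mine;
    }
}
```

Because the filtering rule is deterministic, the restored entries are partitioned across subtasks without any cross-subtask coordination.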

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-4022

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3746.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3746
    
----
commit 67c3b872cc0129b2f87b565e9fb92908926eac64
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date:   2017-04-20T09:17:43Z

    [FLINK-4022] [kafka] Partition / topic discovery for FlinkKafkaConsumer

commit bf2dd78e7706b9aae08b75ff917878e3f2ceb68b
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date:   2017-04-20T15:08:39Z

    [FLINK-4022] Migrate to union list state

----


> Partition discovery / regex topic subscription for the Kafka consumer
> ---------------------------------------------------------------------
>
>                 Key: FLINK-4022
>                 URL: https://issues.apache.org/jira/browse/FLINK-4022
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kafka Connector, Streaming Connectors
>    Affects Versions: 1.0.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature is dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using the Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. The KafkaConsumer in each subtask will be 
> added to the same consumer group when instantiated, relying on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit the last offsets of the partitions it is currently 
> responsible for to an external store (or Kafka) before a rebalance; after 
> the rebalance, when the subtasks get the new partitions they'll be reading 
> from, they'll read the last offsets for those partitions from the external 
> store (partitions which don't have offset entries in the store are the new 
> partitions that caused the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same: subtasks snapshot 
> the offsets of the partitions they are currently holding. Restoring will be 
> a bit different in that a subtask might not be assigned partitions matching 
> the snapshot it is restored with (since we're letting Kafka dynamically 
> assign partitions). There will need to be a coordination process where, if 
> a restored state exists, all subtasks first commit the offsets they receive 
> (as a result of the restored state) to the external store, and then all 
> subtasks attempt to find the last offsets for the partitions they are 
> holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to the previous global state and no coordination is required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assignment depending on KafkaConsumer#subscribe
> - Remove partition list querying from the constructor
> - Remove static partition assignment to subtasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have merged restore states).
> 3. For the previous consumer functionality (consuming from a fixed list of 
> topics), KafkaConsumer#subscribe() has a corresponding overload that takes 
> a fixed list of topics. We can simply decide which subscribe() overload to 
> use depending on whether a regex Pattern or a list of topics is supplied.
> 4. If a subtask doesn't initially have any assigned partitions, it 
> shouldn't emit a MAX_VALUE watermark, since it may be assigned partitions 
> after a rebalance. Instead, unassigned subtasks should also run a fetcher 
> instance and take part in the process pool for the consumer group of the 
> subscribed topics.
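The rebalance-listener hand-off proposed in the quoted issue (note: this describes the original proposal, not what the PR above implements) could be sketched like this, with a `Map` standing in for both the external store and Kafka's actual `ConsumerRebalanceListener` API:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the commit-then-lookup offset hand-off: before a
// rebalance revokes partitions, commit their last offsets to an external
// store; after new partitions are assigned, look their offsets up again.
class RebalanceOffsetSketch {
    final Map<String, Long> externalStore = new HashMap<>();
    final Map<String, Long> currentOffsets = new HashMap<>();

    /** Mirrors onPartitionsRevoked: persist offsets before losing them. */
    void onPartitionsRevoked(List<String> revoked) {
        for (String p : revoked) {
            Long offset = currentOffsets.remove(p);
            if (offset != null) {
                externalStore.put(p, offset);
            }
        }
    }

    /**
     * Mirrors onPartitionsAssigned: resume from stored offsets, or from 0
     * for partitions never seen before (the ones causing the rebalance).
     */
    void onPartitionsAssigned(List<String> assigned) {
        for (String p : assigned) {
            currentOffsets.put(p, externalStore.getOrDefault(p, 0L));
        }
    }
}
```

A subtask that hands partition "t-0" to another subtask and picks up a brand-new "t-1" would thus resume "t-0"'s consumer from the committed offset while starting "t-1" from the beginning.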



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
