GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/3746

    [FLINK-4022] [kafka] Partition and topic discovery for FlinkKafkaConsumer

    This PR adds the internals required for partition discovery and regex-pattern 
based topic discovery in the `FlinkKafkaConsumer`.
    
    It does not yet expose a new constructor that accepts a regex topic pattern. 
I propose to expose that as part of https://issues.apache.org/jira/browse/FLINK-5704 
(deprecating the original FlinkKafkaConsumer constructors in favor of new ones 
with new offset behaviours). For the same reason, I propose to update the 
Kafka documentation once the new constructors are added.
    
    ## Design
    
    Some description to ease review:
    
    - `AbstractPartitionDiscoverer`:
    An `AbstractPartitionDiscoverer` is a stateful utility that remembers which 
partitions have already been discovered. It also encapsulates the 
partition-to-subtask assignment logic. The main `run()` method of the consumer 
now contains a discovery loop that calls 
`AbstractPartitionDiscoverer#discoverPartitions()` at a fixed interval. This 
method returns only the newly discovered partitions that this subtask should 
subscribe to.
    The returned partitions are passed to 
`AbstractFetcher#addDiscoveredPartitions(...)` on the fetcher.
    On a fresh startup, `AbstractPartitionDiscoverer#discoverPartitions()` is 
also used in `open()` to fetch the initial seed partitions.
    
    - `AbstractFetcher#addDiscoveredPartitions(...)`
    The fetcher now has an `unassignedPartitionsQueue` that holds discovered 
partitions which have not yet been assigned to a Kafka consumer for reading. 
Whenever `addDiscoveredPartitions(...)` is called, the fetcher creates the 
state holders for the new partitions and adds them to the queue.
    Concrete fetcher implementations should continuously poll this queue in 
their fetch loop and assign any partitions found in it to the consumer for 
reading.
    
    - Concrete fetchers continuously poll the queue in `runFetchLoop()`
    For 0.8, this simply means that the original `unassignedPartitionsQueue` in 
`Kafka08Fetcher` is moved to the base abstract fetcher class; nothing else is 
changed.
    For 0.9+, queue polling and partition reassignment for the high-level 
consumer happen in `KafkaConsumerThread`.
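As a rough illustration of the discoverer-to-fetcher handoff described in the list above, here is a plain-Java model. It is a sketch under stated assumptions, not the code in this PR: `TopicPartition`, `PartitionDiscoverer`, `Fetcher`, the `metadata` supplier (standing in for a Kafka metadata request) and the hash-based assignment in `isAssignedToThisSubtask` are hypothetical simplifications of `KafkaTopicPartition`, `AbstractPartitionDiscoverer` and `AbstractFetcher`. No Kafka client is involved.

```java
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;
import java.util.regex.Pattern;

class PartitionDiscoverySketch {

    /** Hypothetical stand-in for Flink's KafkaTopicPartition. */
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }

        @Override public boolean equals(Object o) {
            return o instanceof TopicPartition
                    && ((TopicPartition) o).topic.equals(topic)
                    && ((TopicPartition) o).partition == partition;
        }
        @Override public int hashCode() { return Objects.hash(topic, partition); }
        @Override public String toString() { return topic + "-" + partition; }
    }

    /** Stateful discoverer: remembers every partition seen so far, returns only new ones owned by this subtask. */
    static final class PartitionDiscoverer {
        private final Pattern topicPattern;
        private final Supplier<Map<String, Integer>> metadata; // topic -> partition count; stands in for a Kafka metadata request
        private final int subtaskIndex;
        private final int numSubtasks;
        private final Set<TopicPartition> discovered = new HashSet<>();

        PartitionDiscoverer(Pattern topicPattern, Supplier<Map<String, Integer>> metadata,
                            int subtaskIndex, int numSubtasks) {
            this.topicPattern = topicPattern;
            this.metadata = metadata;
            this.subtaskIndex = subtaskIndex;
            this.numSubtasks = numSubtasks;
        }

        /** Hypothetical deterministic assignment: every subtask computes the same owner for a partition. */
        boolean isAssignedToThisSubtask(TopicPartition tp) {
            return Math.floorMod(tp.topic.hashCode() + tp.partition, numSubtasks) == subtaskIndex;
        }

        List<TopicPartition> discoverPartitions() {
            List<TopicPartition> newPartitions = new ArrayList<>();
            for (Map.Entry<String, Integer> topic : metadata.get().entrySet()) {
                if (!topicPattern.matcher(topic.getKey()).matches()) {
                    continue; // topic discovery: ignore topics that do not match the regex
                }
                for (int p = 0; p < topic.getValue(); p++) {
                    TopicPartition tp = new TopicPartition(topic.getKey(), p);
                    // add() returns false for partitions seen in an earlier round, so they are never returned twice
                    if (discovered.add(tp) && isAssignedToThisSubtask(tp)) {
                        newPartitions.add(tp);
                    }
                }
            }
            return newPartitions;
        }
    }

    /** Fetcher side: discovered partitions reach the fetch loop through a thread-safe queue. */
    static final class Fetcher {
        final Queue<TopicPartition> unassignedPartitionsQueue = new ConcurrentLinkedQueue<>();
        final List<TopicPartition> assigned = new ArrayList<>();

        void addDiscoveredPartitions(List<TopicPartition> partitions) {
            // a real fetcher would also create per-partition state holders here
            unassignedPartitionsQueue.addAll(partitions);
        }

        /** One fetch-loop iteration: drain the queue and "assign" every partition found in it. */
        void pollQueueOnce() {
            TopicPartition tp;
            while ((tp = unassignedPartitionsQueue.poll()) != null) {
                assigned.add(tp);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> topics = new TreeMap<>(); // mutable "cluster metadata"
        topics.put("orders-eu", 2);
        topics.put("payments", 1);
        PartitionDiscoverer discoverer =
                new PartitionDiscoverer(Pattern.compile("orders-.*"), () -> topics, 0, 1);
        Fetcher fetcher = new Fetcher();

        fetcher.addDiscoveredPartitions(discoverer.discoverPartitions()); // seed partitions, as in open()
        topics.put("orders-eu", 3); // a partition is added to an existing topic
        topics.put("orders-us", 1); // a new topic matching the pattern appears
        fetcher.addDiscoveredPartitions(discoverer.discoverPartitions()); // one round of the run() loop
        fetcher.pollQueueOnce();
        System.out.println(fetcher.assigned); // [orders-eu-0, orders-eu-1, orders-eu-2, orders-us-0]
    }
}
```

Because the discoverer records every partition it has seen, not only the ones it owns, a repeated call never returns the same partition twice; and because the assignment is deterministic, each partition is picked up by exactly one subtask.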
    
    ## Limitations
    
    For partition discovery to work properly after restores, the consumer's 
partition offset state had to be migrated to union list state 
(`OperatorStateStore#getUnionListState()`).
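To illustrate why union list state matters for discovery, the two redistribution modes can be modeled in plain Java. This is a simplified model, not Flink's code: with even-split redistribution each subtask receives only a slice of the old entries, whereas with union redistribution every subtask receives all entries. It can then keep the ones it owns under the same deterministic assignment the discoverer uses, while remembering the rest as already discovered. The contiguous chunking in `evenSplit` and the `owner` function are hypothetical simplifications.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class UnionRestoreSketch {

    /** Concatenation of all old subtasks' list-state entries (the logical "whole state"). */
    static List<String> concat(List<List<String>> snapshot) {
        List<String> all = new ArrayList<>();
        snapshot.forEach(all::addAll);
        return all;
    }

    /** Even-split model: the concatenated list is cut into contiguous, roughly equal chunks. */
    static List<List<String>> evenSplit(List<List<String>> snapshot, int newParallelism) {
        List<String> all = concat(snapshot);
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            int from = all.size() * i / newParallelism;
            int to = all.size() * (i + 1) / newParallelism;
            result.add(new ArrayList<>(all.subList(from, to)));
        }
        return result;
    }

    /** Union model: every new subtask receives the complete list. */
    static List<List<String>> union(List<List<String>> snapshot, int newParallelism) {
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(concat(snapshot));
        }
        return result;
    }

    /** Hypothetical deterministic partition-to-subtask assignment, shared with discovery. */
    static int owner(String partition, int parallelism) {
        return Math.floorMod(partition.hashCode(), parallelism);
    }

    /** Partitions a subtask reads after a union restore; the full list would seed its "already discovered" set. */
    static List<String> ownedAfterUnionRestore(List<String> restored, int subtask, int parallelism) {
        List<String> mine = new ArrayList<>();
        for (String p : restored) {
            if (owner(p, parallelism) == subtask) {
                mine.add(p);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        // Snapshot taken at parallelism 2; each entry stands in for a (partition, offset) pair.
        List<List<String>> snapshot = Arrays.asList(
                Arrays.asList("orders-0", "orders-2"), Arrays.asList("orders-1", "orders-3"));
        int newParallelism = 3;
        System.out.println("even-split: " + evenSplit(snapshot, newParallelism));
        List<List<String>> restored = union(snapshot, newParallelism);
        for (int i = 0; i < newParallelism; i++) {
            System.out.println("subtask " + i + " knows " + restored.get(i)
                    + ", reads " + ownedAfterUnionRestore(restored.get(i), i, newParallelism));
        }
    }
}
```

With even-split, a subtask cannot tell whether a partition it did not receive was already being read elsewhere; with union, every subtask sees the full picture and the deterministic assignment decides ownership.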
    
    One limitation of this migration is that if the current run was restored 
from old state (Flink 1.1 / 1.2), partition discovery is disabled. To use 
partition discovery, the user must manually take a new snapshot (e.g. a 
savepoint), which will be written with the new union state, and then restore 
from it.
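The resulting start-up paths (fresh start, restore from union state, restore from Flink 1.1 / 1.2 state) can be summarized in a small decision function. This is a hypothetical sketch: `StartMode`, `StartupPlan` and `plan(...)` are illustrative names, not the consumer's actual fields or methods; the fresh-start path mirrors the seeding in `open()` and the legacy path mirrors the limitation described here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

class StartupPlanSketch {

    /** Hypothetical classification of how the current run was started. */
    enum StartMode { FRESH, RESTORED_UNION_STATE, RESTORED_LEGACY_STATE }

    /** What the subtask reads initially, and whether periodic discovery runs afterwards. */
    static final class StartupPlan {
        final List<String> initialPartitions;
        final boolean discoveryEnabled;

        StartupPlan(List<String> initialPartitions, boolean discoveryEnabled) {
            this.initialPartitions = initialPartitions;
            this.discoveryEnabled = discoveryEnabled;
        }
    }

    static StartupPlan plan(StartMode mode, List<String> restoredPartitions,
                            Supplier<List<String>> discoverPartitions) {
        switch (mode) {
            case FRESH:
                // Seed partitions come from the discoverer itself.
                return new StartupPlan(discoverPartitions.get(), true);
            case RESTORED_UNION_STATE:
                // Restored partitions (already filtered to this subtask) are read; discovery continues.
                return new StartupPlan(new ArrayList<>(restoredPartitions), true);
            case RESTORED_LEGACY_STATE:
                // Old Flink 1.1 / 1.2 state: read what was restored, but never discover new partitions.
                return new StartupPlan(new ArrayList<>(restoredPartitions), false);
            default:
                throw new IllegalArgumentException("unknown mode: " + mode);
        }
    }

    public static void main(String[] args) {
        List<String> restored = Arrays.asList("orders-0", "orders-1");
        for (StartMode mode : StartMode.values()) {
            StartupPlan p = plan(mode, restored, () -> Arrays.asList("orders-0", "orders-1", "orders-2"));
            System.out.println(mode + ": read " + p.initialPartitions
                    + ", discovery " + (p.discoveryEnabled ? "on" : "off"));
        }
    }
}
```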

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-4022

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3746.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3746
    
----
commit 67c3b872cc0129b2f87b565e9fb92908926eac64
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date:   2017-04-20T09:17:43Z

    [FLINK-4022] [kafka] Partition / topic discovery for FlinkKafkaConsumer

commit bf2dd78e7706b9aae08b75ff917878e3f2ceb68b
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date:   2017-04-20T15:08:39Z

    [FLINK-4022] Migrate to union list state

----

