GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/3476

    [FLINK-4022] [kafka] Partition and topic pattern discovery for 
FlinkKafkaConsumer

    This PR adds partition and topic regex pattern discovery to the 
`FlinkKafkaConsumer`.
    
    It doesn't expose a new constructor that accepts regex topic patterns yet. 
I propose to expose that with https://issues.apache.org/jira/browse/FLINK-5704 
(deprecate the original `FlinkKafkaConsumer` constructors in favor of new ones 
with new offset behaviours). For this reason, I also propose to update the 
Kafka documentation when the new constructors are added.
    
    ### Design
    
    Some description to ease review:
    
    - `AbstractPartitionDiscoverer`:
    An `AbstractPartitionDiscoverer` is a stateful utility instance that 
remembers which partitions have already been discovered. It also wraps the 
logic for partition-to-subtask assignment. The main `run()` method now has a 
discovery loop that calls `AbstractPartitionDiscoverer#discoverPartitions()` 
at a fixed interval. This method returns only the new partitions that the 
subtask should subscribe to.
    The returned partitions are used to invoke 
`AbstractFetcher#addDiscoveredPartitions(...)` on the fetcher.
    
    On a fresh startup, `AbstractPartitionDiscoverer#discoverPartitions()` is 
also used to fetch the initial seed partitions in `open()`.
    
    - `AbstractFetcher#addDiscoveredPartitions(...)`
    
    The fetcher now has an `unassignedPartitionsQueue` that contains 
discovered partitions not yet consumed by concrete Kafka clients. Whenever 
`addDiscoveredPartitions(...)` is called on the fetcher, the fetcher creates 
the state holders for the partitions and adds the partitions to the queue.
    
    Concrete implementations of the fetcher should continuously poll this queue 
in the fetch loop. Any partitions found in the queue should be assigned for 
consumption.
    
    - Concrete fetchers continuously poll the queue in `runFetchLoop()`
    
    For 0.8, this simply means that the original `unassignedPartitionsQueue` in 
`Kafka08Fetcher` is moved to the base abstract fetcher class. Nothing else is 
touched.
    
    For 0.9+, queue polling and partition reassignment for the high-level 
consumer happen in `KafkaConsumerThread`.
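
To make the flow above easier to follow during review, here is a minimal, self-contained sketch of the discoverer-to-fetcher hand-off. It is not the code in this PR. The class names `PartitionDiscoverySketch`, `Partition`, `Discoverer` and `Fetcher`, the method `pollQueueOnce()`, the `partition % numSubtasks` assignment rule, and passing the current partition metadata in as an argument (instead of fetching it from Kafka) are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical, simplified stand-ins; NOT the classes added by this PR.
class PartitionDiscoverySketch {

    /** Minimal partition identifier (topic name plus partition number). */
    static final class Partition {
        final String topic;
        final int partition;
        Partition(String topic, int partition) { this.topic = topic; this.partition = partition; }
        @Override public boolean equals(Object o) {
            return o instanceof Partition
                && ((Partition) o).topic.equals(topic)
                && ((Partition) o).partition == partition;
        }
        @Override public int hashCode() { return Objects.hash(topic, partition); }
        @Override public String toString() { return topic + "-" + partition; }
    }

    /** Stateful: remembers every partition seen so far and owns the assignment rule. */
    static final class Discoverer {
        private final int subtaskIndex;
        private final int numSubtasks;
        private final Set<Partition> seen = new HashSet<>();

        Discoverer(int subtaskIndex, int numSubtasks) {
            this.subtaskIndex = subtaskIndex;
            this.numSubtasks = numSubtasks;
        }

        /** Returns only partitions that are new AND assigned to this subtask. */
        List<Partition> discoverPartitions(List<Partition> currentMetadata) {
            List<Partition> fresh = new ArrayList<>();
            for (Partition p : currentMetadata) {
                // Set#add returns false for partitions that were already discovered.
                // Assumed assignment rule for this sketch: partition number modulo parallelism.
                if (seen.add(p) && p.partition % numSubtasks == subtaskIndex) {
                    fresh.add(p);
                }
            }
            return fresh;
        }
    }

    /** Holds discovered partitions in a queue until the fetch loop picks them up. */
    static final class Fetcher {
        final Queue<Partition> unassignedPartitionsQueue = new ConcurrentLinkedQueue<>();
        final List<Partition> assigned = new ArrayList<>();

        void addDiscoveredPartitions(List<Partition> newPartitions) {
            // A real fetcher would also create per-partition state holders here.
            unassignedPartitionsQueue.addAll(newPartitions);
        }

        /** One iteration of a fetch loop: drain the queue and start consuming. */
        void pollQueueOnce() {
            Partition p;
            while ((p = unassignedPartitionsQueue.poll()) != null) {
                assigned.add(p);
            }
        }
    }

    public static void main(String[] args) {
        Discoverer discoverer = new Discoverer(0, 2);
        Fetcher fetcher = new Fetcher();
        // Seed partitions on startup, then a later discovery round after the topic grew.
        fetcher.addDiscoveredPartitions(discoverer.discoverPartitions(
            Arrays.asList(new Partition("t", 0), new Partition("t", 1))));
        fetcher.addDiscoveredPartitions(discoverer.discoverPartitions(
            Arrays.asList(new Partition("t", 0), new Partition("t", 1),
                          new Partition("t", 2), new Partition("t", 3))));
        fetcher.pollQueueOnce();
        System.out.println("Assigned to subtask 0: " + fetcher.assigned);
    }
}
```

The point of the sketch is the separation of concerns: the discoverer decides what is new and whose it is, while the fetcher only buffers partitions until its own loop is ready to take them, which is why the queue can live in the abstract base class.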
    
    ### TODOs
    
    This PR serves as a preview of the new functionality. The pending TODOs 
are:
    
    - Currently, partition discovery will not work correctly after restore. The 
reason for this is explained with `TODO` comments within the 
`FlinkKafkaConsumerBase#open()` method. For this to work correctly, @rmetzger 
and I are considering 2 options: 1) use broadcast state, or 2) assign 
partitions using `maxParallelism` and `assignedKeyGroupIds` instead of subtask 
index / number of subtasks.
    
    - The PR still lacks exactly-once integration tests with Kafka 
repartitioning / dynamic topics.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-4022

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3476.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3476
    
----
commit 469a3c964e13ff559afd1d46933113bcff48eafc
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date:   2017-03-06T09:20:32Z

    [FLINK-4022] [kafka] Partition and topic pattern discovery for 
FlinkKafkaConsumer

----

