GitHub user tzulitai opened a pull request:
https://github.com/apache/flink/pull/3476
[FLINK-4022] [kafka] Partition and topic pattern discovery for FlinkKafkaConsumer
This PR adds partition and topic regex pattern discovery to the
`FlinkKafkaConsumer`.
It does not yet expose a new constructor that accepts regex topic patterns.
I propose exposing that as part of https://issues.apache.org/jira/browse/FLINK-5704
(deprecate the original `FlinkKafkaConsumer` constructors in favor of new ones
with new offset behaviours). For the same reason, I also propose updating the
Kafka documentation once the new constructors are added.
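Topic pattern discovery amounts to periodically listing the topics known to the brokers and keeping those whose names match the subscribed regex. A minimal sketch, assuming the topic list has already been fetched from Kafka metadata; `TopicPatternMatcher` and `matchingTopics` are hypothetical names, not part of this PR:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical helper: selects the topics that a regex-pattern subscription should cover. */
class TopicPatternMatcher {
    private final Pattern topicPattern;

    TopicPatternMatcher(Pattern topicPattern) {
        this.topicPattern = topicPattern;
    }

    /** Returns the topics whose entire name matches the pattern, preserving input order. */
    List<String> matchingTopics(List<String> allTopics) {
        List<String> matched = new ArrayList<>();
        for (String topic : allTopics) {
            // matches() requires the whole topic name to match, not just a substring
            if (topicPattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }
}
```

For example, with the pattern `test-topic-[0-9]+`, the topics `test-topic-1` and `test-topic-22` are selected while `other` and `test-topic-x` are not. Re-running the match on every discovery round is what lets topics created after startup be picked up.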
### Design
Some description to ease review:
- `AbstractPartitionDiscoverer`:
An `AbstractPartitionDiscoverer` is a stateful utility instance that
remembers which partitions have already been discovered. It also wraps the logic
for partition-to-subtask assignment. The consumer's main `run()` method now has a
discovery loop that calls `AbstractPartitionDiscoverer#discoverPartitions()` at a
fixed interval. This method returns only the new partitions that this subtask
should subscribe to.
The returned partitions are passed to
`AbstractFetcher#addDiscoveredPartitions(...)` on the fetcher.
On a fresh startup, `AbstractPartitionDiscoverer#discoverPartitions()` is
also used in `open()` to fetch the initial seed partitions.
- `AbstractFetcher#addDiscoveredPartitions(...)`:
The fetcher now has an `unassignedPartitionsQueue` that holds discovered
partitions not yet consumed by the concrete Kafka clients. Whenever
`addDiscoveredPartitions(...)` is called on the fetcher, the fetcher creates
the state holders for the new partitions and adds the partitions to the
queue.
Concrete implementations of the fetcher should continuously poll this queue
in the fetch loop. If partitions are found in the queue, they should be
assigned for consumption.
- Concrete fetchers continuously poll the queue in `runFetchLoop()`:
For 0.8, this simply means that the original `unassignedPartitionsQueue` in
`Kafka08Fetcher` is moved to the base abstract fetcher class. Nothing else is
touched.
For 0.9+, queue polling and partition reassignment for the high-level
consumer happen in `KafkaConsumerThread`.
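The interplay between the discovery loop, the discoverer's memory of already-seen partitions, and the fetcher's unassigned-partitions queue can be sketched in a few classes. This is a simplified, hypothetical model, not the code in this PR: `PartitionId`, `SketchPartitionDiscoverer` and `SketchFetcher` are illustrative names, the broker metadata request is replaced by a plain list argument, and the hash-modulo partition-to-subtask assignment is an assumption.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical stand-in for a (topic, partition) pair. */
final class PartitionId {
    final String topic;
    final int partition;

    PartitionId(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof PartitionId)) {
            return false;
        }
        PartitionId other = (PartitionId) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }

    @Override
    public String toString() {
        return topic + "-" + partition;
    }
}

/** Stateful discoverer: remembers seen partitions and returns only new ones for this subtask. */
class SketchPartitionDiscoverer {
    private final int subtaskIndex;
    private final int numSubtasks;
    private final Set<PartitionId> discovered = new HashSet<>();

    SketchPartitionDiscoverer(int subtaskIndex, int numSubtasks) {
        this.subtaskIndex = subtaskIndex;
        this.numSubtasks = numSubtasks;
    }

    /** fetchedPartitions stands in for the result of a metadata request to the brokers. */
    List<PartitionId> discoverPartitions(List<PartitionId> fetchedPartitions) {
        List<PartitionId> newForThisSubtask = new ArrayList<>();
        for (PartitionId p : fetchedPartitions) {
            // Set#add returns false for partitions seen in an earlier discovery round.
            if (discovered.add(p) && isAssignedToThisSubtask(p)) {
                newForThisSubtask.add(p);
            }
        }
        return newForThisSubtask;
    }

    private boolean isAssignedToThisSubtask(PartitionId p) {
        // Assumed assignment scheme; floorMod keeps the result non-negative for negative hashes.
        return Math.floorMod(p.hashCode(), numSubtasks) == subtaskIndex;
    }
}

/** Fetcher side: discovered partitions wait in a queue until the fetch loop picks them up. */
class SketchFetcher {
    private final ConcurrentLinkedQueue<PartitionId> unassignedPartitionsQueue =
            new ConcurrentLinkedQueue<>();
    private final List<PartitionId> consuming = new ArrayList<>();

    /** Called from the discovery loop; a real fetcher would also create per-partition state here. */
    void addDiscoveredPartitions(List<PartitionId> newPartitions) {
        unassignedPartitionsQueue.addAll(newPartitions);
    }

    /** One fetch-loop iteration: drain the queue and start consuming whatever was found. */
    void pollQueueOnce() {
        PartitionId p;
        while ((p = unassignedPartitionsQueue.poll()) != null) {
            consuming.add(p); // a real fetcher would hand these to its Kafka client here
        }
    }

    List<PartitionId> consumingPartitions() {
        return consuming;
    }
}
```

In this sketch the concurrent queue is the only hand-off point between the thread running discovery and the thread running the fetch loop, so discovery never has to wait for a fetch to finish.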
### TODOs
This PR serves as a preview of the new functionality. Below are the
pending TODOs:
- Currently, partition discovery does not work correctly after a restore. The
reason is explained in `TODO` comments within the
`FlinkKafkaConsumerBase#open()` method. To make this work, @rmetzger
and I are considering two options: 1) use broadcast state, or 2) assign
partitions using `maxParallelism` and `assignedKeyGroupIds` instead of the
subtask index / number of subtasks.
- The PR still lacks exactly-once integration tests with Kafka
repartitioning / dynamic topics.
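Option 2 of the first TODO can be illustrated with a hypothetical sketch. It assumes each partition is hashed into one of `maxParallelism` key groups and that each subtask owns a contiguous range of key groups, a layout similar to how Flink splits key groups across operator instances. The class and method names are illustrative only, and the hash function is an arbitrary choice:

```java
/** Hypothetical sketch of option 2: assign partitions via key groups instead of subtask index. */
final class KeyGroupPartitionAssigner {
    private KeyGroupPartitionAssigner() {}

    /** Stable mapping: depends only on the partition and maxParallelism, never on parallelism. */
    static int keyGroupFor(String topic, int partition, int maxParallelism) {
        // Arbitrary hash; floorMod keeps the key group in [0, maxParallelism) even on overflow.
        return Math.floorMod(31 * topic.hashCode() + partition, maxParallelism);
    }

    /** First key group owned by a subtask when [0, maxParallelism) is split into contiguous ranges. */
    static int rangeStart(int subtaskIndex, int parallelism, int maxParallelism) {
        return (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    /** Last key group (inclusive) owned by a subtask. */
    static int rangeEnd(int subtaskIndex, int parallelism, int maxParallelism) {
        return ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
    }

    /** True if the subtask's key-group range contains the partition's key group. */
    static boolean isAssigned(String topic, int partition, int subtaskIndex,
                              int parallelism, int maxParallelism) {
        int keyGroup = keyGroupFor(topic, partition, maxParallelism);
        return keyGroup >= rangeStart(subtaskIndex, parallelism, maxParallelism)
                && keyGroup <= rangeEnd(subtaskIndex, parallelism, maxParallelism);
    }
}
```

Because `keyGroupFor` never looks at the current parallelism, a partition keeps its key group across a rescaled restore; only the key-group ranges move between subtasks. This sketch assumes `parallelism <= maxParallelism`, otherwise some subtasks receive an empty range.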
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tzulitai/flink FLINK-4022
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/3476.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3476
----
commit 469a3c964e13ff559afd1d46933113bcff48eafc
Author: Tzu-Li (Gordon) Tai
Date: 2017-03-06T09:20:32Z
[FLINK-4022] [kafka] Partition and topic pattern discovery for FlinkKafkaConsumer
----