GitHub user tzulitai opened a pull request:
https://github.com/apache/flink/pull/3746
[FLINK-4022] [kafka] Partition and topic discovery for FlinkKafkaConsumer
This PR adds the internals required for partition discovery and regex-based
topic pattern discovery in the `FlinkKafkaConsumer`.
It does not yet expose a new constructor that accepts regex topic patterns.
I propose exposing that as part of https://issues.apache.org/jira/browse/FLINK-5704
(deprecating the original FlinkKafkaConsumer constructors in favor of new ones
with new offset behaviours). For the same reason, I also propose updating the
Kafka documentation once the new constructors are added.
## Design
Some notes to ease review:
- `AbstractPartitionDiscoverer`:
An `AbstractPartitionDiscoverer` is a stateful utility that remembers which
partitions have already been discovered. It also encapsulates the
partition-to-subtask assignment logic. The main `run()` method now has a
discovery loop that calls `AbstractPartitionDiscoverer#discoverPartitions()` at
a fixed interval. This method returns only the new partitions that the subtask
should subscribe to.
The returned partitions are then passed to the fetcher via
`AbstractFetcher#addDiscoveredPartitions(...)`.
On a fresh startup, `AbstractPartitionDiscoverer#discoverPartitions()` is
also used in `open()` to fetch the initial seed partitions.
- `AbstractFetcher#addDiscoveredPartitions(...)`:
The fetcher now has an `unassignedPartitionsQueue` that holds discovered
partitions not yet assigned to a Kafka consumer for reading. Whenever
`addDiscoveredPartitions(...)` is called, the fetcher creates the state holders
for the partitions and adds the partitions to the queue.
Concrete fetcher implementations should continuously poll this queue in their
fetch loop; any partitions found in it should be assigned for consumption.
- Concrete fetchers continuously poll the queue in `runFetchLoop()`:
For 0.8, this simply means that the original `unassignedPartitionsQueue` in
`Kafka08Fetcher` is moved to the base abstract fetcher class. Nothing else is
touched.
For 0.9+, queue polling and partition reassignment for the high-level
consumer happen in `KafkaConsumerThread`.
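The discoverer / fetcher-queue / fetch-loop hand-off described above can be illustrated with a minimal, single-threaded plain-Java sketch. It rests on several assumptions: `TopicPartitionKey`, `SimplePartitionDiscoverer`, and `SimpleFetcher` are hypothetical stand-ins, not the classes in this PR. The discoverer receives the current partition list as an argument rather than querying Kafka, and partition-to-subtask assignment uses an invented hash-modulo rule. A list also stands in for handing partitions to a real Kafka consumer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical stand-in for a (topic, partition) pair. */
final class TopicPartitionKey {
    final String topic;
    final int partition;

    TopicPartitionKey(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof TopicPartitionKey
                && topic.equals(((TopicPartitionKey) o).topic)
                && partition == ((TopicPartitionKey) o).partition;
    }

    @Override
    public int hashCode() { return Objects.hash(topic, partition); }

    @Override
    public String toString() { return topic + "-" + partition; }
}

/** Hypothetical discoverer: remembers every partition it has seen so far. */
class SimplePartitionDiscoverer {
    private final int subtaskIndex;
    private final int numSubtasks;
    private final Set<TopicPartitionKey> discovered = new HashSet<>();

    SimplePartitionDiscoverer(int subtaskIndex, int numSubtasks) {
        this.subtaskIndex = subtaskIndex;
        this.numSubtasks = numSubtasks;
    }

    /** Returns partitions that are new since earlier calls AND owned by this subtask. */
    List<TopicPartitionKey> discoverPartitions(List<TopicPartitionKey> currentPartitions) {
        List<TopicPartitionKey> newlyOwned = new ArrayList<>();
        for (TopicPartitionKey tp : currentPartitions) {
            // Set.add returns false for partitions already seen in an earlier round.
            if (discovered.add(tp) && ownerOf(tp) == subtaskIndex) {
                newlyOwned.add(tp);
            }
        }
        return newlyOwned;
    }

    /** Assumed hash-modulo rule; every subtask computes the same owner for a partition. */
    int ownerOf(TopicPartitionKey tp) {
        return Math.floorMod(tp.topic.hashCode() + tp.partition, numSubtasks);
    }
}

/** Hypothetical fetcher reduced to the queue hand-off. */
class SimpleFetcher {
    final ConcurrentLinkedQueue<TopicPartitionKey> unassignedPartitionsQueue =
            new ConcurrentLinkedQueue<>();
    final List<TopicPartitionKey> assignedPartitions = new ArrayList<>();

    /** Called by the discovery loop (a real fetcher would also create state holders). */
    void addDiscoveredPartitions(List<TopicPartitionKey> partitions) {
        unassignedPartitionsQueue.addAll(partitions);
    }

    /** One pass of the fetch loop: drain the queue, then (not shown) fetch records. */
    void fetchLoopIteration() {
        TopicPartitionKey tp;
        while ((tp = unassignedPartitionsQueue.poll()) != null) {
            assignedPartitions.add(tp); // stands in for assigning tp to a Kafka consumer
        }
    }
}

public class DiscoverySketch {
    public static void main(String[] args) {
        SimplePartitionDiscoverer discoverer = new SimplePartitionDiscoverer(0, 2);
        SimpleFetcher fetcher = new SimpleFetcher();
        // Two discovery rounds; between them "orders" grows from 2 to 3 partitions.
        List<List<TopicPartitionKey>> rounds = Arrays.asList(
                Arrays.asList(new TopicPartitionKey("orders", 0), new TopicPartitionKey("orders", 1)),
                Arrays.asList(new TopicPartitionKey("orders", 0), new TopicPartitionKey("orders", 1),
                        new TopicPartitionKey("orders", 2)));
        for (List<TopicPartitionKey> partitions : rounds) {
            fetcher.addDiscoveredPartitions(discoverer.discoverPartitions(partitions));
            fetcher.fetchLoopIteration();
        }
        System.out.println("subtask 0 reads " + fetcher.assignedPartitions);
    }
}
```

Which `orders` partitions subtask 0 ends up reading depends on the assumed hash rule. In every case, a partition returned in the first round is never returned again in the second. This is the property the stateful discoverer is meant to provide. The `ConcurrentLinkedQueue` is chosen because, in a real consumer, discovery and fetching run on different threads.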
## Limitations
For partition discovery to work properly after restores, the previous state
had to be migrated to union list state
(`OperatorStateStore#getUnionListState()`).
One limitation of this migration is that if the current run was restored from
old state (Flink 1.1 / 1.2), partition discovery is disabled. To use partition
discovery, the user must take a manual snapshot (with the new union state) and
restore from it.
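The difference between the two redistribution modes can be illustrated with a small plain-Java simulation. With regular operator list state, restored entries are split across subtasks, so each subtask receives only a slice of the old partition states. With union list state, every subtask receives the complete list and can select its own partitions. One plausible reading of why discovery needs this is that each subtask then sees every previously known partition. Ownership after a restore can therefore follow the same deterministic rule used for newly discovered partitions. This sketch does not use the Flink API. Partition states are reduced to strings, the chunking in `evenSplit` only approximates Flink's redistribution, and the hash-based `filterOwned` rule is a hypothetical assignment, not necessarily the one in this PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class UnionStateSketch {

    /** Approximates even-split redistribution: contiguous chunks, one per subtask. */
    static List<List<String>> evenSplit(List<String> merged, int parallelism) {
        List<List<String>> result = new ArrayList<>();
        int base = merged.size() / parallelism;
        int remainder = merged.size() % parallelism;
        int start = 0;
        for (int i = 0; i < parallelism; i++) {
            int size = base + (i < remainder ? 1 : 0); // earlier subtasks absorb the remainder
            result.add(new ArrayList<>(merged.subList(start, start + size)));
            start += size;
        }
        return result;
    }

    /** Union redistribution: every subtask receives the complete merged list. */
    static List<List<String>> union(List<String> merged, int parallelism) {
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            result.add(new ArrayList<>(merged));
        }
        return result;
    }

    /** Hypothetical ownership rule applied by each subtask to the full restored list. */
    static List<String> filterOwned(List<String> restored, int subtask, int parallelism) {
        List<String> owned = new ArrayList<>();
        for (String partition : restored) {
            if (Math.floorMod(partition.hashCode(), parallelism) == subtask) {
                owned.add(partition);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        List<String> merged = Arrays.asList("orders-0", "orders-1", "orders-2", "payments-0");
        int parallelism = 2;
        // prints "even-split: [[orders-0, orders-1], [orders-2, payments-0]]"
        System.out.println("even-split: " + evenSplit(merged, parallelism));
        List<List<String>> restored = union(merged, parallelism);
        for (int i = 0; i < parallelism; i++) {
            System.out.println("union, subtask " + i + " owns "
                    + filterOwned(restored.get(i), i, parallelism));
        }
    }
}
```

With even-split, the slice a subtask receives depends only on list position, not on any assignment rule. With union, every subtask applies the same filter to the full list, so each partition still ends up owned by exactly one subtask.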
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tzulitai/flink FLINK-4022
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/3746.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3746
----
commit 67c3b872cc0129b2f87b565e9fb92908926eac64
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date: 2017-04-20T09:17:43Z
[FLINK-4022] [kafka] Partition / topic discovery for FlinkKafkaConsumer
commit bf2dd78e7706b9aae08b75ff917878e3f2ceb68b
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date: 2017-04-20T15:08:39Z
[FLINK-4022] Migrate to union list state
----