Xin Gao created FLINK-39012:
-------------------------------
Summary: Support global enumerator/dispatcher for dynamic Kafka source
Key: FLINK-39012
URL: https://issues.apache.org/jira/browse/FLINK-39012
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Reporter: Xin Gao
The current dynamic Kafka source handles runtime partition and cluster changes
well for streaming ingestion. However, split scheduling still falls back to the
per-cluster enumerators and *cannot* handle:
* Global load balancing of split-to-slot assignment across multiple clusters.
* Further increases in parallelism across clusters.
Below is a detailed walkthrough:
# The same topic can exist on multiple Kafka clusters, and each cluster may
have a {*}different number of partitions{*} for it.
# Even with the dynamic Kafka source, each cluster still has its *own cluster
enumerator*.
# Each cluster enumerator {*}assigns splits independently{*}, resulting in a
separate round-robin assignment per cluster.
# Therefore, some slots still end up with more partitions while the rest get
fewer.
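To make steps 2 to 4 concrete, here is a minimal Java sketch. It is a hypothetical simulation, not connector code: every cluster enumerator runs its own round-robin cursor over the same readers, and, as an illustrative assumption, every cursor starts at the same reader. The class and method names are made up.

```java
import java.util.Arrays;

// Hypothetical simulation: one independent round-robin cursor per cluster.
public class PerClusterRoundRobin {

    // Counts the splits each reader receives when every cluster assigns on its own.
    public static int[] independentRoundRobin(int[] partitionsPerCluster, int numReaders, int startReader) {
        int[] load = new int[numReaders];
        for (int partitions : partitionsPerCluster) {
            // Fresh cursor for each cluster: it cannot see the other clusters' assignments.
            for (int p = 0; p < partitions; p++) {
                load[(startReader + p) % numReaders]++;
            }
        }
        return load;
    }

    public static void main(String[] args) {
        // Three clusters with 2 partitions each, 4 readers, every cursor starting at reader 0.
        int[] load = independentRoundRobin(new int[] {2, 2, 2}, 4, 0);
        System.out.println(Arrays.toString(load)); // [3, 3, 0, 0]
    }
}
```

Readers 0 and 1 receive 3 splits each while readers 2 and 3 stay idle; a single cursor shared by all clusters would instead spread the same 6 splits as [2, 2, 1, 1].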
For example, suppose one topic exists in 11 different clusters: 3 with 64
partitions and 8 with 8 partitions, i.e. 3 * 64 + 8 * 8 = 256 partitions in
total.
* The current split assignment depends only on the hash of the topic name
(plus the partition index), not on the cluster, {*}so the same topic from
different clusters ends up on the same slots{*}. As a result, the maximum
number of slots we can utilize is 64, the largest per-cluster partition count;
we cannot use 64 * 3 + 8 * 8 = 256 slots to achieve full parallelism.
* Even with 64 slots, the {*}load may not be balanced{*}. Each cluster with 64
partitions can assign 1 split per slot, but the clusters with 8 partitions may
not achieve global balance: their 8 * 8 = 64 partitions may not be spread as
1 split per slot (e.g. if they all start from the same topic-derived slot, all
64 of them land on the same 8 slots).
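The arithmetic of this example can be checked with a small Java simulation. The `splitOwner` function below is modeled on the owner computation in Flink's `KafkaSourceEnumerator` (a start index derived from the topic-name hash, then offset by the partition id), but treat it as an approximation for illustration; the topic name `orders` and the class name are hypothetical.

```java
import java.util.Arrays;

// Simulation of the 11-cluster example; splitOwner approximates the owner
// computation in Flink's KafkaSourceEnumerator and is for illustration only.
public class ElevenClusterExample {

    // Start reader comes from the topic-name hash only; the partition id is added on top.
    public static int splitOwner(String topic, int partition, int numReaders) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numReaders;
        return (startIndex + partition) % numReaders;
    }

    // Each cluster enumerator applies splitOwner independently; the cluster is never an input.
    public static int[] perReaderLoad(String topic, int[] partitionsPerCluster, int numReaders) {
        int[] load = new int[numReaders];
        for (int partitions : partitionsPerCluster) {
            for (int p = 0; p < partitions; p++) {
                load[splitOwner(topic, p, numReaders)]++;
            }
        }
        return load;
    }

    public static void main(String[] args) {
        int[] clusters = {64, 64, 64, 8, 8, 8, 8, 8, 8, 8, 8}; // 3 x 64 + 8 x 8 = 256 partitions
        int[] load = perReaderLoad("orders", clusters, 64);   // "orders" is a hypothetical topic
        int max = Arrays.stream(load).max().getAsInt();
        int min = Arrays.stream(load).min().getAsInt();
        long busiest = Arrays.stream(load).filter(l -> l == max).count();
        System.out.printf("max=%d min=%d busiest readers=%d%n", max, min, busiest);
        // prints: max=11 min=3 busiest readers=8
    }
}
```

Whatever the topic's hash is, 8 slots end up with 3 + 8 = 11 splits and the remaining 56 with 3, whereas an even spread would give every slot 256 / 64 = 4.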
A global enumerator or split dispatcher would therefore help achieve global
parallelism and load balancing. Today, the dynamic Kafka source handles this
well for multiple topics within one cluster; we need to extend it to multiple
clusters as well.
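One possible shape for such a dispatcher is sketched below in Java, under stated assumptions: `GlobalSplitDispatcher` is a hypothetical class, not an existing Flink API, and splits are identified by made-up string ids of the form `cluster/topic-partition`. It keeps a single load count per reader across all clusters and gives each newly discovered split to the currently least-loaded reader, ties going to the lowest reader id.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical global dispatcher: one place that sees splits from every cluster.
public class GlobalSplitDispatcher {

    private final int[] load;                       // splits currently owned by each reader
    private final PriorityQueue<Integer> readers;   // reader ids, least-loaded first
    private final Map<String, Integer> owners = new HashMap<>();

    public GlobalSplitDispatcher(int numReaders) {
        load = new int[numReaders];
        // Order by current load; ties go to the lower reader id.
        readers = new PriorityQueue<>((a, b) -> load[a] != load[b]
                ? Integer.compare(load[a], load[b])
                : Integer.compare(a, b));
        for (int r = 0; r < numReaders; r++) {
            readers.add(r);
        }
    }

    // Assigns a split such as "cluster-3/orders-5" to the least-loaded reader,
    // no matter which cluster it came from. Repeated calls return the same owner.
    public int assign(String splitId) {
        Integer existing = owners.get(splitId);
        if (existing != null) {
            return existing;
        }
        int reader = readers.poll();   // remove first so the heap stays consistent
        load[reader]++;
        readers.add(reader);           // re-insert with the updated load
        owners.put(splitId, reader);
        return reader;
    }

    public int[] loads() {
        return load.clone();
    }

    public static void main(String[] args) {
        int[] clusters = {64, 64, 64, 8, 8, 8, 8, 8, 8, 8, 8};
        GlobalSplitDispatcher dispatcher = new GlobalSplitDispatcher(64);
        for (int c = 0; c < clusters.length; c++) {
            for (int p = 0; p < clusters[c]; p++) {
                dispatcher.assign("cluster-" + c + "/orders-" + p);
            }
        }
        // Every reader ends up with 256 / 64 = 4 splits.
        System.out.println(Arrays.toString(dispatcher.loads()));
    }
}
```

Unlike a hash-based owner, this greedy strategy is stateful: the owner map would have to be checkpointed and restored, and the sketch ignores reader registration, split removal, and rebalancing after a parallelism change.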
--
This message was sent by Atlassian Jira
(v8.20.10#820010)