Xin Gao created FLINK-39012:
-------------------------------

             Summary: Support global enumerator/dispatcher for dynamic Kafka source
                 Key: FLINK-39012
                 URL: https://issues.apache.org/jira/browse/FLINK-39012
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
            Reporter: Xin Gao


The current dynamic Kafka source works well for runtime partition and cluster
changes in stream ingestion. However, split scheduling still falls back to the
per-cluster enumerator and *cannot* handle:
 * Global load balancing of split-to-slot assignment across multiple clusters.

 * Further increases in parallelism across clusters.

Below is a detailed walkthrough:
 # The same topic can exist on multiple Kafka clusters, and each cluster may
have a {*}different number of partitions{*}.

 # Even with the dynamic Kafka source, each cluster still has its *own cluster
enumerator*.

 # Each cluster enumerator {*}assigns splits independently{*}, resulting in a
round-robin assignment per cluster.

 # Therefore, some slots still end up with more partitions while the rest get
fewer.
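
To make the walkthrough concrete, here is a minimal Java sketch that models each cluster enumerator as an independent round-robin loop. It assumes, for illustration, that every enumerator of the same topic starts from the same reader and shares no state with the others; `PerClusterRoundRobin` and `assign` are hypothetical names, not Flink APIs.

```java
import java.util.Arrays;

/**
 * Hypothetical model of the current behavior: each cluster enumerator assigns
 * only its own partitions, round-robin, starting from the same reader.
 */
final class PerClusterRoundRobin {

    /** Returns the number of splits each reader receives. */
    static int[] assign(int[] partitionsPerCluster, int numReaders, int startReader) {
        int[] load = new int[numReaders];
        for (int partitions : partitionsPerCluster) {
            // One enumerator per cluster: nothing is shared with the others,
            // so every cluster begins again at startReader.
            for (int p = 0; p < partitions; p++) {
                load[(startReader + p) % numReaders]++;
            }
        }
        return load;
    }

    public static void main(String[] args) {
        // Same topic on two clusters with 3 and 2 partitions, 4 readers.
        int[] load = assign(new int[] {3, 2}, 4, 0);
        System.out.println(Arrays.toString(load)); // [2, 2, 1, 0]
    }
}
```

With 5 splits on 4 readers, readers 0 and 1 each get two splits while reader 3 stays idle, whereas an assignment that saw both clusters at once could give every reader at least one split.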

For example, suppose one topic exists in 11 different clusters: 3 with 64
partitions and 8 with 8 partitions.
 * The maximum number of slots we can utilize is 64. We cannot use
64 * 3 + 8 * 8 = 256 slots to achieve full parallelism, because the current
split assignment is determined only by the hash of the topic name (offset by the
partition index); the cluster plays no part. {*}So the same topic from
different clusters ends up on the same slots{*}.

 * Even with 64 slots, the {*}load may not be balanced{*}. Each cluster with 64
partitions can assign 1 split per slot, but the clusters with 8 partitions may
not achieve global balance (e.g. their 8 * 8 = 64 partitions are not guaranteed
to land 1 split per slot).
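
The numbers in this example can be checked with a small simulation. The owner formula below is modeled on `KafkaSourceEnumerator#getSplitOwner` in the Flink Kafka connector (topic-name hash plus partition index, with no cluster component); treat it as an assumption if your connector version differs. `TopicHashAssignment` and the topic name `orders` are hypothetical.

```java
import java.util.stream.IntStream;

final class TopicHashAssignment {

    // Modeled on KafkaSourceEnumerator#getSplitOwner: topic hash + partition
    // index. The cluster does not appear anywhere in the formula.
    static int owner(String topic, int partition, int numReaders) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numReaders;
        return (startIndex + partition) % numReaders;
    }

    /** Split count per reader when every cluster applies the formula on its own. */
    static int[] loads(String topic, int[] partitionsPerCluster, int numReaders) {
        int[] load = new int[numReaders];
        for (int partitions : partitionsPerCluster) {
            for (int p = 0; p < partitions; p++) {
                load[owner(topic, p, numReaders)]++;
            }
        }
        return load;
    }

    /** 3 clusters with 64 partitions and 8 clusters with 8 partitions. */
    static int[] exampleClusters() {
        int[] clusters = new int[11];
        for (int i = 0; i < clusters.length; i++) {
            clusters[i] = i < 3 ? 64 : 8;
        }
        return clusters;
    }

    public static void main(String[] args) {
        String topic = "orders"; // hypothetical topic name
        int[] clusters = exampleClusters();

        int[] at256 = loads(topic, clusters, 256);
        long busy = IntStream.of(at256).filter(n -> n > 0).count();
        System.out.println("readers with work at parallelism 256: " + busy); // 64

        int[] at64 = loads(topic, clusters, 64);
        System.out.println("max/min splits per reader at parallelism 64: "
                + IntStream.of(at64).max().getAsInt() + "/"
                + IntStream.of(at64).min().getAsInt()); // 11/3
    }
}
```

Whatever the topic name, the 64-partition clusters cover the same 64 consecutive readers and the 8-partition clusters pile onto the first 8 of them. Raising parallelism to 256 therefore leaves 192 readers idle, and at parallelism 64 eight readers carry 11 splits while the other 56 carry 3, versus 4 each in a balanced assignment.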

As a result, a global enumerator or split dispatcher would help achieve global
parallelism and load balancing. Today, the dynamic Kafka source already does
this well for multiple topics within one cluster; we need to extend it to
multiple clusters as well.
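
One way such a global dispatcher could work, sketched under stated assumptions: a single component receives the splits of every cluster, identifies each split by cluster as well as topic and partition, and hands the next split to the least-loaded reader. The least-loaded-by-split-count policy is chosen here for illustration and is not specified by this issue; `GlobalSplitDispatcher` and the `cluster-N/topic-P` id format are hypothetical.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

/** Hypothetical global dispatcher that sees the splits of all clusters. */
final class GlobalSplitDispatcher {

    private final int[] load;
    private final PriorityQueue<Integer> readers;
    private final Map<String, Integer> owners = new HashMap<>();

    GlobalSplitDispatcher(int numReaders) {
        int[] counts = new int[numReaders];
        this.load = counts;
        // Order readers by current split count, breaking ties by reader id.
        this.readers = new PriorityQueue<>(
                Comparator.comparingInt((Integer r) -> counts[r]).thenComparingInt(r -> r));
        for (int r = 0; r < numReaders; r++) {
            readers.add(r);
        }
    }

    /** Assigns a split (whose id includes its cluster) to the least-loaded reader. */
    int assign(String splitId) {
        Integer existing = owners.get(splitId);
        if (existing != null) {
            return existing; // already assigned: keep the owner stable
        }
        int reader = readers.poll();
        load[reader]++;
        readers.add(reader); // re-insert so the heap sees the new count
        owners.put(splitId, reader);
        return reader;
    }

    int[] loads() {
        return load.clone();
    }

    public static void main(String[] args) {
        GlobalSplitDispatcher dispatcher = new GlobalSplitDispatcher(64);
        // 3 clusters with 64 partitions and 8 clusters with 8 partitions.
        for (int c = 0; c < 11; c++) {
            int partitions = c < 3 ? 64 : 8;
            for (int p = 0; p < partitions; p++) {
                dispatcher.assign("cluster-" + c + "/orders-" + p); // hypothetical ids
            }
        }
        int[] loads = dispatcher.loads();
        System.out.println(Arrays.stream(loads).min().getAsInt() + "/"
                + Arrays.stream(loads).max().getAsInt()); // 4/4
    }
}
```

On the 11-cluster example every one of the 64 readers ends up with 4 splits, and with 256 readers each gets exactly one. Counting splits ignores per-partition throughput, so a real dispatcher might weight splits instead; the sketch also omits reassignment when readers fail or partitions are added.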



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
