[
https://issues.apache.org/jira/browse/FLINK-31762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852543#comment-17852543
]
Kevin Lam commented on FLINK-31762:
-----------------------------------
Hi, I'm also experiencing this issue. I added some more details in
[https://lists.apache.org/thread/96qct8n92pqbrsc8h8xq146o5cmjjhd7] before
finding this ticket. Your write-up describes it well.
> Subscribe to multiple Kafka topics may cause partition assignment skew
> ----------------------------------------------------------------------
>
> Key: FLINK-31762
> URL: https://issues.apache.org/jira/browse/FLINK-31762
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka
> Affects Versions: 1.13.0, 1.18.0
> Reporter: Liam
> Priority: Major
> Attachments: image-2023-04-11-08-00-16-054.png,
> image-2023-04-11-08-12-24-115.png
>
>
> To simplify the demonstration, let us assume that there are two topics, and
> each topic has four partitions. We have set the parallelism to eight to
> consume these two topics. However, the current partition assignment method
> may lead to some subtasks being assigned two partitions while others are left
> with none.
> !image-2023-04-11-08-00-16-054.png|width=500,height=143!
> In my case, the situation is even worse as I have ten topics, each with 100
> partitions. If I set the parallelism to 1000, some slots may be assigned
> seven partitions while others remain unassigned.
> To address this issue, I propose a new partition assignment solution. In this
> approach, round-robin assignment takes place between all topics, not just one.
> For example, the ideal assignment for the case mentioned above is presented
> below:
>
> !https://imgr.whimsical.com/object/A4jSJwgQNrc5mgpGddhghq|width=513,height=134!
> This new solution can also handle cases where each topic has more partitions.
> !image-2023-04-11-08-12-24-115.png|width=444,height=127!
> Let us work together to reach a consensus on this proposal. Thank you!
>
> FYI: how partitions are currently assigned
> {code:java}
> public class KafkaTopicPartitionAssigner {
>     public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
>         return assign(partition.getTopic(), partition.getPartition(), numParallelSubtasks);
>     }
>
>     public static int assign(String topic, int partition, int numParallelSubtasks) {
>         int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
>
>         // here, the assumption is that the id of Kafka partitions are always ascending
>         // starting from 0, and therefore can be used directly as the offset clockwise from the
>         // start index
>         return (startIndex + partition) % numParallelSubtasks;
>     }
> }
> {code}
> For the Kafka Source, the same logic is implemented in KafkaSourceEnumerator, as below:
> {code:java}
> static int getSplitOwner(TopicPartition tp, int numReaders) {
>     int startIndex = ((tp.topic().hashCode() * 31) & 0x7FFFFFFF) % numReaders;
>
>     // here, the assumption is that the id of Kafka partitions are always ascending
>     // starting from 0, and therefore can be used directly as the offset clockwise from the
>     // start index
>     return (startIndex + tp.partition()) % numReaders;
> }
> {code}
>
>
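To make the skew described in the quoted issue concrete, here is a small self-contained sketch. It reuses the arithmetic of getSplitOwner quoted above and compares it with one possible reading of the proposed round-robin across all topics. The class name AssignmentSkewSketch, the Tp helper type, the topic names "a" and "b", and the sort-then-deal strategy are all assumptions made for illustration. They are not taken from the Flink code base, and an actual fix in Flink may look different.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class AssignmentSkewSketch {

    /** Hypothetical stand-in for Kafka's TopicPartition, to keep the sketch dependency-free. */
    static final class Tp {
        final String topic;
        final int partition;

        Tp(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    /** Same arithmetic as the getSplitOwner method quoted in the issue. */
    static int currentOwner(Tp tp, int numReaders) {
        int startIndex = ((tp.topic.hashCode() * 31) & 0x7FFFFFFF) % numReaders;
        return (startIndex + tp.partition) % numReaders;
    }

    /** Number of partitions each reader receives under the current per-topic scheme. */
    static int[] currentCounts(List<Tp> partitions, int numReaders) {
        int[] counts = new int[numReaders];
        for (Tp tp : partitions) {
            counts[currentOwner(tp, numReaders)]++;
        }
        return counts;
    }

    /**
     * Assumed variant of the proposal: sort all partitions by (topic, partition)
     * so the order is deterministic, then deal them out one reader at a time.
     */
    static int[] roundRobinCounts(List<Tp> partitions, int numReaders) {
        List<Tp> sorted = new ArrayList<>(partitions);
        sorted.sort(Comparator.comparing((Tp t) -> t.topic).thenComparingInt(t -> t.partition));
        int[] counts = new int[numReaders];
        for (int i = 0; i < sorted.size(); i++) {
            counts[i % numReaders]++; // the i-th partition overall goes to reader i mod numReaders
        }
        return counts;
    }

    /** Builds partitions 0..partitionsPerTopic-1 for each of the given topics. */
    static List<Tp> partitionsOf(int partitionsPerTopic, String... topics) {
        List<Tp> result = new ArrayList<>();
        for (String topic : topics) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                result.add(new Tp(topic, p));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Two topics with four partitions each, parallelism eight, as in the issue's example.
        List<Tp> partitions = partitionsOf(4, "a", "b");
        System.out.println(Arrays.toString(currentCounts(partitions, 8)));
        // prints [2, 2, 1, 0, 0, 0, 1, 2]
        System.out.println(Arrays.toString(roundRobinCounts(partitions, 8)));
        // prints [1, 1, 1, 1, 1, 1, 1, 1]
    }
}
```

With these two topic names the start indices are 7 and 6, so the two topics overlap on three readers and three readers stay idle. The round-robin variant spreads the eight partitions evenly, but it derives owners from the full sorted list and does not address how partitions discovered later would be handled.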