Zakelly Lan created FLINK-29437:
-----------------------------------
Summary: The partition of data before and after the Kafka Shuffle
are not aligned
Key: FLINK-29437
URL: https://issues.apache.org/jira/browse/FLINK-29437
Project: Flink
Issue Type: Bug
Components: API / DataStream
Affects Versions: 1.15.2
Reporter: Zakelly Lan
Fix For: 1.16.0
Attachments: image-2022-09-28-14-32-28-116.png,
image-2022-09-28-14-35-47-954.png
I noticed that the key group range on the consumer side of Kafka Shuffle is not
aligned with the producer side. There are two problems:
# The sink (producer) partitions data exactly as a keyed stream would if its
maximum parallelism equaled the number of Kafka partitions, but on the consumer
side the number of partitions and the number of key groups are not the same.
# Kafka partitions are distributed across consumer subtasks by a dedicated
assignment (see KafkaTopicPartitionAssigner#assign), but the producer of Kafka
Shuffle simply assumes that the partition index equals the subtask index, e.g.
!image-2022-09-28-14-32-28-116.png|width=1133,height=274!
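To make the second problem concrete, here is a minimal, self-contained sketch. It restates what I believe is the round-robin formula behind KafkaTopicPartitionAssigner#assign so that it runs without Flink on the classpath; treat that formula as an assumption and verify it against the Flink source. The class name, the helper countMisaligned, and the topic name are hypothetical.

```java
public class PartitionAssignmentSketch {

    // Assumption: restated round-robin assignment, believed to mirror
    // KafkaTopicPartitionAssigner#assign(topic, partition, numParallelSubtasks).
    static int assign(String topic, int partition, int numSubtasks) {
        // The start index depends on the topic name, so it is usually not 0.
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numSubtasks;
        return (startIndex + partition) % numSubtasks;
    }

    // Hypothetical helper: counts partitions whose consuming subtask differs
    // from the partition index, i.e. where "partition == subtask" breaks.
    static int countMisaligned(String topic, int numPartitions, int numSubtasks) {
        int misaligned = 0;
        for (int p = 0; p < numPartitions; p++) {
            if (assign(topic, p, numSubtasks) != p) {
                misaligned++;
            }
        }
        return misaligned;
    }

    public static void main(String[] args) {
        String topic = "shuffle-topic"; // hypothetical topic name
        int parallelism = 4;
        for (int p = 0; p < parallelism; p++) {
            System.out.println("partition " + p + " -> consumer subtask "
                    + assign(topic, p, parallelism));
        }
        System.out.println("misaligned partitions: "
                + countMisaligned(topic, parallelism, parallelism));
    }
}
```

Whenever the topic-dependent start index is non-zero, every partition is read by a subtask other than the one the producer assumed.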
My proposed change:
# Set the max parallelism of the keyed stream on the consumer side to the number
of Kafka partitions.
# Use the same method that assigns Kafka partitions to consumer subtasks to
maintain a map from subtasks to Kafka partitions, which the producer then uses
to write the data for a subtask into the right partition, i.e.
!image-2022-09-28-14-35-47-954.png|width=1030,height=283!
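The two proposed changes can be sketched as follows. This is an illustration under stated assumptions, not the actual patch: assign restates the assumed formula of KafkaTopicPartitionAssigner#assign, subtaskForKeyGroup restates the formula I believe KeyGroupRangeAssignment#computeOperatorIndexForKeyGroup uses, and the way key groups are spread over the partitions owned by one subtask (a simple modulo) is chosen here only for illustration. All class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ShuffleAlignmentSketch {

    // Assumption: same round-robin formula the consumer uses to pick partitions.
    static int assign(String topic, int partition, int numSubtasks) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numSubtasks;
        return (startIndex + partition) % numSubtasks;
    }

    // Change 2: derive the subtask -> partitions map from the consumer's own assignment.
    static Map<Integer, List<Integer>> subtaskToPartitions(
            String topic, int numPartitions, int numSubtasks) {
        Map<Integer, List<Integer>> result = new HashMap<>();
        for (int s = 0; s < numSubtasks; s++) {
            result.put(s, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            result.get(assign(topic, p, numSubtasks)).add(p);
        }
        return result;
    }

    // Assumption: operator index for a key group, as in keyed-stream routing.
    static int subtaskForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Change 1 + 2: max parallelism equals the number of partitions, and the
    // producer writes to a partition that the target subtask actually reads.
    static int partitionForKeyGroup(
            String topic, int keyGroup, int numPartitions, int numSubtasks) {
        if (numSubtasks > numPartitions) {
            throw new IllegalArgumentException("parallelism must not exceed max parallelism");
        }
        int subtask = subtaskForKeyGroup(numPartitions, numSubtasks, keyGroup);
        List<Integer> owned =
                subtaskToPartitions(topic, numPartitions, numSubtasks).get(subtask);
        // Illustrative choice only: spread key groups over the owned partitions.
        return owned.get(keyGroup % owned.size());
    }

    public static void main(String[] args) {
        String topic = "shuffle-topic"; // hypothetical topic name
        int numPartitions = 8;
        int numSubtasks = 4;
        for (int kg = 0; kg < numPartitions; kg++) {
            int partition = partitionForKeyGroup(topic, kg, numPartitions, numSubtasks);
            System.out.println("key group " + kg + " -> partition " + partition
                    + " -> consumer subtask " + assign(topic, partition, numSubtasks));
        }
    }
}
```

By construction, the subtask that consumes the chosen partition is the subtask that owns the key group, which is the alignment the proposal asks for.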