This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
commit 7024d1f6a99ec5589ae589deb1b36ef91d37d398 Author: Zakelly <zakelly....@gmail.com> AuthorDate: Mon Nov 21 20:43:01 2022 +0800 document the parallelism setting part --- .../flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java index 58b09c9..d11f899 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java @@ -342,6 +342,8 @@ public class FlinkKafkaShuffle { "Missing partition number for Kafka Shuffle"); int numberOfPartitions = PropertiesUtil.getInt(kafkaProperties, PARTITION_NUMBER, Integer.MIN_VALUE); + // Set the parallelism / max parallelism of the keyed stream in consumer side as the number + // of kafka partitions DataStream<T> outputDataStream = env.addSource(kafkaConsumer) .setParallelism(numberOfPartitions)