becketqin commented on a change in pull request #15161:
URL: https://github.com/apache/flink/pull/15161#discussion_r600633809
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
##########
@@ -442,10 +442,8 @@ private void parseAndSetRequiredProperties() {
true);
// If the source is bounded, do not run periodic partition discovery.
- if (maybeOverride(
- KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(),
- "-1",
- boundedness == Boundedness.BOUNDED)) {
+ if (boundedness == Boundedness.BOUNDED &&
Review comment:
I think it is fine either way if there are customers who expect the
partition set to not change. I turned partition discovery on by default
because there were also customers who were surprised that newly added
partitions were not picked up automatically.
One thing that might be worth clarifying is that continuing to read from a
static partition set cannot guarantee message order in case of a Kafka
partition expansion.
For example, assume there are 2 partitions in the beginning, and a key with
hash `H` goes to partition `H % 2`. A key whose hash value is 3 would move
from partition 1 to partition 0 after an expansion from 2 partitions to 3
partitions. So even if the Flink job keeps consuming only from partitions 0
and 1 without picking up the new partition, the message order for that key is
broken.
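To make the scenario concrete, here is a minimal standalone sketch (not Flink or Kafka client code) using the simple `H % numPartitions` scheme from the example above; the real Kafka default partitioner uses `murmur2(keyBytes) % numPartitions`, but the effect of a partition-count change is the same:

```java
public class PartitionExpansionDemo {

    // Simplified stand-in for Kafka's key-based partitioner:
    // maps a key hash to a partition by modulo arithmetic.
    static int partitionFor(int keyHash, int numPartitions) {
        return Math.floorMod(keyHash, numPartitions);
    }

    public static void main(String[] args) {
        int keyHash = 3;

        // With 2 partitions, records for this key land on partition 1.
        int before = partitionFor(keyHash, 2);

        // After expanding to 3 partitions, new records for the same
        // key land on partition 0.
        int after = partitionFor(keyHash, 3);

        System.out.println("before expansion: partition " + before);
        System.out.println("after expansion:  partition " + after);

        // A consumer pinned to the static set {0, 1} still sees both
        // partitions, but the key's old records are on partition 1 while
        // its new records arrive on partition 0, so per-key ordering
        // across the expansion is lost regardless of partition discovery.
    }
}
```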
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]