becketqin commented on a change in pull request #15161:
URL: https://github.com/apache/flink/pull/15161#discussion_r600633809



##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
##########
@@ -442,10 +442,8 @@ private void parseAndSetRequiredProperties() {
                 true);
 
         // If the source is bounded, do not run periodic partition discovery.
-        if (maybeOverride(
-                KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(),
-                "-1",
-                boundedness == Boundedness.BOUNDED)) {
+        if (boundedness == Boundedness.BOUNDED &&

Review comment:
       I think it is fine either way, since there are customers who expect the 
partition set not to change. I turned partition discovery on by default 
because there are also customers who were surprised that newly added 
partitions were not picked up automatically.
   
   One thing that might be worth clarifying is that continuing to read from a 
static partition set cannot guarantee message order in case of a Kafka 
partition expansion.
   
   For example, assume there are 2 partitions in the beginning, and a key with 
hash `H` goes to partition `H % 2`. A key whose hash value is 3 may move from 
partition 1 to partition 0 after an expansion from 2 partitions to 3 
partitions (`3 % 2 = 1`, but `3 % 3 = 0`). So even if the Flink job keeps 
consuming from partitions 0 and 1 without picking up the new partition, the 
message order is broken.
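   The reassignment above can be sketched in a few lines. This is a minimal 
illustration, not Flink or Kafka client code, and it assumes a simplified 
producer that assigns partitions by key hash modulo the partition count 
(Kafka's default partitioner behaves this way for keyed records, ignoring 
the murmur2 hashing details):

```java
// Hypothetical sketch: shows how a hash-modulo partitioner reassigns a key
// when the partition count changes, which is what breaks per-key ordering.
public class PartitionExpansionDemo {

    // Simplified stand-in for a key-hash partitioner: hash % numPartitions.
    // Math.floorMod keeps the result non-negative for negative hashes.
    static int partitionFor(int keyHash, int numPartitions) {
        return Math.floorMod(keyHash, numPartitions);
    }

    public static void main(String[] args) {
        int keyHash = 3;
        // Before expansion: 2 partitions, the key lands on partition 1.
        System.out.println(partitionFor(keyHash, 2));
        // After expansion: 3 partitions, the same key lands on partition 0.
        System.out.println(partitionFor(keyHash, 3));
    }
}
```

   So records for that key written before the expansion sit in partition 1, 
while records written after it go to partition 0, and no consumer-side 
partition assignment can restore their relative order.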




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

