becketqin commented on a change in pull request #9912: 
[FLINK-14370][kafka][test-stability] Fix the cascading failure in 
kaProducerTestBase.
URL: https://github.com/apache/flink/pull/9912#discussion_r339295768
 
 

 ##########
 File path: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
 ##########
 @@ -218,6 +218,8 @@ protected void assertAtLeastOnceForTopic(
                while (System.currentTimeMillis() < startMillis + 
timeoutMillis) {
                        properties.put("key.deserializer", 
"org.apache.kafka.common.serialization.IntegerDeserializer");
                        properties.put("value.deserializer", 
"org.apache.kafka.common.serialization.IntegerDeserializer");
+                       properties.put("session.timeout.ms", "2000");
+                       properties.put("heartbeat.interval.ms", "500");
 
 Review comment:
   These configurations are only used when we are using Kafka based group 
management. Given that Flink does not use that in the UT when consuming from 
Kafka. These configs just need to be set so the configurations are "valid" to 
KafkaConsumer. And even if the consumers do hit timeout in UT, in the worst 
case a consumer will just rejoin the group and consume again. So it probably 
won't really hurt the test stability.
   
   The reason we set these two configurations was that we reduced the request 
timeout to 3 seconds. And the `session.timeout.ms` and `heartbeat.interval.ms` 
cannot be longer than that in some old KafkaConsumer versions. I'll add a 
comment to explain that.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to