hililiwei commented on a change in pull request #17765:
URL: https://github.com/apache/flink/pull/17765#discussion_r760224509
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
##########
@@ -355,6 +352,24 @@ public void run() {
};
runner.start();
+ Thread.sleep(2 * 1000L);
+ Properties producerProperties =
+
FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
+ producerProperties.setProperty("retries", "0");
+ producerProperties.setProperty(
+ "key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
+ producerProperties.setProperty(
+ "value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
+ producerProperties.putAll(secureProps);
+ KafkaProducer<String, String> kafkaProducer = new
KafkaProducer<>(producerProperties);
+
+ for (int i = 0; i < parallelism; i++) {
+ for (int l = 0; l < recordsInEachPartition; l++) {
+ kafkaProducer.send(new ProducerRecord<>(topicName, i, "",
String.valueOf(l)));
+ }
+ }
+ kafkaProducer.close();
+
Review comment:
https://github.com/apache/flink/blob/ca72f9e9bd95ad8a5368089d7bc0367256339b8b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java#L310-L312
After modifying the code, the test case failed to run. I checked its code
and found that the test case didn't throw any exceptions before modifying the
code, but it didn't seem to work as expected. It uses properties to set the
value of auto.offset.reset to latest, and this value will be overwritten as
early, which leads to the fact that the data obtained by the consumer has
actually arrived in advance. Therefore, it is inconsistent with the above
description.
In order to make this test case achieve the expected effect, I start sending
data to topic after the consumer starts, and test whether the consumer can
obtain the data in the expected way.
If I am wrong, please correct. Thank you.
https://github.com/apache/flink/blob/ca72f9e9bd95ad8a5368089d7bc0367256339b8b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java#L324-L337
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]