hililiwei commented on a change in pull request #17765:
URL: https://github.com/apache/flink/pull/17765#discussion_r760224509



##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
##########
@@ -355,6 +352,24 @@ public void run() {
                 };
         runner.start();
 
+        Thread.sleep(2 * 1000L);
+        Properties producerProperties =
+                FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
+        producerProperties.setProperty("retries", "0");
+        producerProperties.setProperty(
+                "key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        producerProperties.setProperty(
+                "value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        producerProperties.putAll(secureProps);
+        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(producerProperties);
+
+        for (int i = 0; i < parallelism; i++) {
+            for (int l = 0; l < recordsInEachPartition; l++) {
+                kafkaProducer.send(new ProducerRecord<>(topicName, i, "", String.valueOf(l)));
+            }
+        }
+        kafkaProducer.close();
+

Review comment:
       
https://github.com/apache/flink/blob/ca72f9e9bd95ad8a5368089d7bc0367256339b8b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java#L310-L312
   
   After I modified the code, this test case failed. Looking at the test, I 
found that it threw no exceptions before my change, but it also did not seem 
to work as intended. It uses properties to set auto.offset.reset to latest, 
but that value is later overwritten with earliest, so the data the consumer 
reads had in fact already arrived before the consumer started. That is 
inconsistent with the description above.
   
   To make this test case do what it is meant to do, I now start sending data 
to the topic only after the consumer has started, and then check that the 
consumer receives that data as expected.
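
   To illustrate the difference between the two reset policies, here is a minimal in-memory sketch. It is not Kafka client code: `OffsetResetSketch`, `send`, `startOffset`, and `poll` are hypothetical names made up for this example, and a plain list stands in for a single partition. It assumes a consumer with no committed offset, which is the case where auto.offset.reset applies.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory stand-in for one topic partition; illustrative only, not Kafka client code. */
class OffsetResetSketch {
    private final List<String> log = new ArrayList<>();

    /** Appends a record at the end of the partition log. */
    void send(String value) {
        log.add(value);
    }

    /** Start offset for a consumer without a committed offset, per reset policy. */
    int startOffset(String autoOffsetReset) {
        switch (autoOffsetReset) {
            case "earliest":
                return 0; // begin at the oldest record still in the log
            case "latest":
                return log.size(); // begin after everything written so far
            default:
                throw new IllegalArgumentException("unsupported policy: " + autoOffsetReset);
        }
    }

    /** Returns every record from the given offset to the current end of the log. */
    List<String> poll(int fromOffset) {
        return new ArrayList<>(log.subList(fromOffset, log.size()));
    }

    public static void main(String[] args) {
        OffsetResetSketch partition = new OffsetResetSketch();
        partition.send("old-0");
        partition.send("old-1");

        // Both consumers start here, after the old records were written.
        int latest = partition.startOffset("latest");
        int earliest = partition.startOffset("earliest");

        // Records sent after the consumers have started.
        partition.send("new-0");

        System.out.println(partition.poll(latest));   // [new-0]
        System.out.println(partition.poll(earliest)); // [old-0, old-1, new-0]
    }
}
```

   With `earliest` the consumer still receives the records written before it started, so a test that only produces data up front cannot tell the two policies apart. Producing after the consumer has started is what makes the `latest` behaviour observable.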
   
   If I am wrong, please correct me. Thank you.
   
   
https://github.com/apache/flink/blob/ca72f9e9bd95ad8a5368089d7bc0367256339b8b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java#L324-L337



