tashoyan commented on a change in pull request #17765:
URL: https://github.com/apache/flink/pull/17765#discussion_r759187903



##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
##########
@@ -438,9 +457,13 @@ private void parseAndSetRequiredProperties() {
             maybeOverride(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false", false);
         }
         maybeOverride(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", false);
+
         maybeOverride(
                 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
-                startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase(),
+                startingOffsetsInitializer
Review comment:
       Could this throw an NPE? You removed the initialization of `startingOffsetsInitializer` from the constructor.
   
   UPDATE: I see that `sanityCheck()` is called before `parseAndSetRequiredProperties()`, which is what saves us from a NullPointerException. However, this ordering dependency seems very fragile to me. I suggest setting `startingOffsetsInitializer` in `parseAndSetRequiredProperties()` itself.
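   
   A minimal sketch of what I mean (the `earliest()` fallback is my assumption, not necessarily the default this PR intends; the point is that the guard lives next to the dereference):
   
   ```java
   private void parseAndSetRequiredProperties() {
       // Hypothetical fallback replacing the removed constructor default, so this
       // method no longer relies on sanityCheck() having populated the field first.
       if (startingOffsetsInitializer == null) {
           startingOffsetsInitializer = OffsetsInitializer.earliest();
       }
       // ... existing maybeOverride(...) calls, including the one that
       // dereferences startingOffsetsInitializer ...
   }
   ```
   
   With a guard like this, `build()` works regardless of the order in which it invokes the two methods.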

##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
##########
@@ -355,6 +352,24 @@ public void run() {
                 };
         runner.start();
 
+        Thread.sleep(2 * 1000L);
+        Properties producerProperties =
+                FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
+        producerProperties.setProperty("retries", "0");
+        producerProperties.setProperty(
+                "key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        producerProperties.setProperty(
+                "value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        producerProperties.putAll(secureProps);
+        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(producerProperties);
+
+        for (int i = 0; i < parallelism; i++) {
+            for (int l = 0; l < recordsInEachPartition; l++) {
+                kafkaProducer.send(new ProducerRecord<>(topicName, i, "", String.valueOf(l)));
+            }
+        }
+        kafkaProducer.close();
+

Review comment:
       What is the purpose of this change? How is it related to the changes in `KafkaSourceBuilder` and `KafkaSinkBuilder`?

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
##########
@@ -497,6 +520,29 @@ private void sanityCheck() {
                         "Property %s is required when offset commit is enabled",
                         ConsumerConfig.GROUP_ID_CONFIG));
         // Check offsets initializers
+        if (startingOffsetsInitializer == null) {
+            if (props.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG) != null) {
+                startingOffsetsInitializer =
+                        OffsetsInitializer.committedOffsets(
Review comment:
       I believe the function `sanityCheck()` is meant to check the validity of the settings, not to change them. Shouldn't we update the settings in `parseAndSetRequiredProperties()` instead?
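   
   For illustration, a sketch of the split I have in mind (the `OffsetResetStrategy` parsing is inferred from the truncated diff above, and the check message is my own invention):
   
   ```java
   private void parseAndSetRequiredProperties() {
       // Moved out of sanityCheck(): derive the initializer from a user-supplied
       // auto.offset.reset value, if one is present.
       if (startingOffsetsInitializer == null
               && props.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG) != null) {
           startingOffsetsInitializer =
                   OffsetsInitializer.committedOffsets(
                           OffsetResetStrategy.valueOf(
                                   props.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
                                           .toUpperCase()));
       }
       // ... existing property handling ...
   }
   
   private void sanityCheck() {
       // Pure validation, no assignments.
       // checkState is from org.apache.flink.util.Preconditions.
       checkState(
               startingOffsetsInitializer != null,
               "No starting offsets initializer is configured");
       // ... other existing checks unchanged ...
   }
   ```
   
   With this split, reading `sanityCheck()` tells you exactly what must hold before `build()` succeeds, and all mutation stays in one place.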



