tashoyan commented on a change in pull request #17765:
URL: https://github.com/apache/flink/pull/17765#discussion_r759187903
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
##########
@@ -438,9 +457,13 @@ private void parseAndSetRequiredProperties() {
maybeOverride(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false",
false);
}
maybeOverride(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false",
false);
+
maybeOverride(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
-
startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase(),
+ startingOffsetsInitializer
Review comment:
Could be NPE? You removed the initialization for
`startingOffsetsInitializer` from the constructor.
UPDATE: I see, that `sanityCheck()` is called before
`parseAndSetRequiredProperties()` - and this allows us to escape from
NullPointerException. However, this approach seems very fragile to me. I
suggest to set `startingOffsetsInitializer` in
`parseAndSetRequiredProperties()`.
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
##########
@@ -355,6 +352,24 @@ public void run() {
};
runner.start();
+ Thread.sleep(2 * 1000L);
+ Properties producerProperties =
+
FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
+ producerProperties.setProperty("retries", "0");
+ producerProperties.setProperty(
+ "key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
+ producerProperties.setProperty(
+ "value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
+ producerProperties.putAll(secureProps);
+ KafkaProducer<String, String> kafkaProducer = new
KafkaProducer<>(producerProperties);
+
+ for (int i = 0; i < parallelism; i++) {
+ for (int l = 0; l < recordsInEachPartition; l++) {
+ kafkaProducer.send(new ProducerRecord<>(topicName, i, "",
String.valueOf(l)));
+ }
+ }
+ kafkaProducer.close();
+
Review comment:
What is the purpose of this change? How is it related to the changes in
KafkaSourceBuilder and KafkaSinkBuilder?
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
##########
@@ -497,6 +520,29 @@ private void sanityCheck() {
"Property %s is required when offset commit is
enabled",
ConsumerConfig.GROUP_ID_CONFIG));
// Check offsets initializers
+ if (startingOffsetsInitializer == null) {
+ if (props.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG) != null) {
+ startingOffsetsInitializer =
+ OffsetsInitializer.committedOffsets(
Review comment:
I believe, the function `sanityCheck()` is for checking the validity of
the settings, but not to change them. Shouldn't we instead update the settings
in `parseAndSetRequiredProperties()`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]