lindong28 commented on a change in pull request #15161:
URL: https://github.com/apache/flink/pull/15161#discussion_r600214562
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/SpecifiedOffsetsInitializer.java
##########
@@ -60,6 +60,12 @@
}
}
if (!toLookup.isEmpty()) {
+ // First check the committed offsets.
Review comment:
If the committed offsets do not exist, then regardless of whether the group ID
is null, we will either use the specified offset (if one is specified) or fall
back to the `offsetResetStrategy`. We will throw an exception only if we need
to fall back to the `offsetResetStrategy` and `offsetResetStrategy == NONE`.
According to the code
[here](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L646),
the existing `FlinkKafkaConsumer` will use the committed offset if there
is no user-specified offset.
Because the existing `FlinkKafkaConsumer` goes through the steps [(1)
specified (2) committed (3) fallback], I think it is desirable to keep this
behavior unless we have a good use case for the other behavior (e.g. [(1)
specified (2) fallback]).
What do you think?
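To make the proposed order concrete, here is a minimal sketch of the [(1) specified (2) committed (3) fallback] resolution. It is not the actual `SpecifiedOffsetsInitializer` code: the class name `OffsetResolutionSketch`, the `ResetStrategy` enum, the `resolve` method, and the use of plain string partition keys are all hypothetical stand-ins chosen so the example has no Kafka or Flink dependencies.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; names and signatures are assumptions,
// not part of the Flink or Kafka APIs.
class OffsetResolutionSketch {

    // Stand-in for the reset strategy discussed in the comment.
    enum ResetStrategy { EARLIEST, LATEST, NONE }

    // Resolves the starting offset for one partition in the order:
    // (1) user-specified offset, (2) committed offset, (3) reset strategy.
    static long resolve(
            String partition,
            Map<String, Long> specified,
            Map<String, Long> committed,
            ResetStrategy strategy,
            long earliest,
            long latest) {
        // (1) A user-specified offset always wins.
        Long specifiedOffset = specified.get(partition);
        if (specifiedOffset != null) {
            return specifiedOffset;
        }
        // (2) Otherwise use the committed offset, if one exists.
        Long committedOffset = committed.get(partition);
        if (committedOffset != null) {
            return committedOffset;
        }
        // (3) Otherwise fall back to the reset strategy; only NONE throws.
        switch (strategy) {
            case EARLIEST:
                return earliest;
            case LATEST:
                return latest;
            default:
                throw new IllegalStateException(
                        "No specified or committed offset for " + partition
                                + " and the reset strategy is NONE");
        }
    }

    public static void main(String[] args) {
        Map<String, Long> specified = new HashMap<>();
        specified.put("topic-0", 5L);
        Map<String, Long> committed = new HashMap<>();
        committed.put("topic-0", 42L);
        committed.put("topic-1", 7L);

        for (String partition : new String[] {"topic-0", "topic-1", "topic-2"}) {
            long offset = resolve(
                    partition, specified, committed, ResetStrategy.EARLIEST, 0L, 100L);
            System.out.println(partition + " -> " + offset);
        }
    }
}
```

With this ordering, the exception is reachable only when a partition has neither a specified nor a committed offset and the strategy is `NONE`, which matches the condition described above.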