lindong28 commented on a change in pull request #15161:
URL: https://github.com/apache/flink/pull/15161#discussion_r600214562



##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/SpecifiedOffsetsInitializer.java
##########
@@ -60,6 +60,12 @@
             }
         }
         if (!toLookup.isEmpty()) {
+            // First check the committed offsets.

Review comment:
       If the committed offsets do not exist, regardless of whether the group ID 
is null, we will either use the specified offset (if one is specified) or fall 
back to the `offsetResetStrategy`. We will throw an exception only if we need to 
use the `offsetResetStrategy` and `offsetResetStrategy == NONE`.
   
   According to the code 
[here](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L646), 
the existing `FlinkKafkaConsumer` will use the committed offset if there 
is no user-specified offset.
   
   Because the existing `FlinkKafkaConsumer` goes through the steps [(1) 
specified (2) committed (3) fallback], I think it is desirable to stick to this 
behavior unless we have a good use case for the other behavior (e.g. [(1) 
specified (2) fallback]).
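
To make the proposed order concrete, here is a rough sketch of the [(1) specified (2) committed (3) fallback] resolution. Everything in it is hypothetical and only for illustration: `OffsetResolutionSketch`, `ResetStrategy`, and `resolve` are not part of the Flink or Kafka API, partitions are plain strings, and the earliest/latest offsets are passed in rather than looked up from the broker.

```java
import java.util.Map;

// Hypothetical illustration of the lookup order discussed in this comment.
// None of these names exist in Flink or Kafka.
final class OffsetResolutionSketch {

    // Stand-in for the offset reset strategy (earliest, latest, or none).
    enum ResetStrategy { EARLIEST, LATEST, NONE }

    static long resolve(
            String partition,
            Map<String, Long> specifiedOffsets,
            Map<String, Long> committedOffsets,
            ResetStrategy offsetResetStrategy,
            long earliestOffset,
            long latestOffset) {
        // (1) A user-specified offset always wins.
        Long specified = specifiedOffsets.get(partition);
        if (specified != null) {
            return specified;
        }
        // (2) Otherwise use the committed offset, if one exists.
        Long committed = committedOffsets.get(partition);
        if (committed != null) {
            return committed;
        }
        // (3) Otherwise fall back to the reset strategy; fail only for NONE.
        switch (offsetResetStrategy) {
            case EARLIEST:
                return earliestOffset;
            case LATEST:
                return latestOffset;
            default:
                throw new IllegalStateException(
                        "No specified or committed offset for " + partition
                                + " and offsetResetStrategy is NONE");
        }
    }
}
```

With this order, the exception path is reached only when a partition has neither a specified nor a committed offset and the strategy is `NONE`.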
   
   What do you think?





