GitHub user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5282#discussion_r168377228
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -441,28 +481,57 @@ public void open(Configuration configuration) throws 
Exception {
                                getRuntimeContext().getIndexOfThisSubtask(), 
subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
                } else {
                        // use the partition discoverer to fetch the initial 
seed partitions,
    -                   // and set their initial offsets depending on the 
startup mode
    -                   for (KafkaTopicPartition seedPartition : allPartitions) 
{
    -                           if (startupMode != 
StartupMode.SPECIFIC_OFFSETS) {
    -                                   
subscribedPartitionsToStartOffsets.put(seedPartition, 
startupMode.getStateSentinel());
    -                           } else {
    +                   // and set their initial offsets depending on the 
startup mode.
    +                   // for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the 
specific offsets now;
    +                   // for other modes (EARLIEST, LATEST, and 
GROUP_OFFSETS), the offset is lazily determined
    +                   // when the partition is actually read.
    +                   switch (startupMode) {
    +                           case SPECIFIC_OFFSETS:
                                        if (specificStartupOffsets == null) {
                                                throw new 
IllegalArgumentException(
                                                        "Startup mode for the 
consumer set to " + StartupMode.SPECIFIC_OFFSETS +
    -                                                           ", but no 
specific offsets were specified");
    +                                                           ", but no 
specific offsets were specified.");
                                        }
     
    -                                   Long specificOffset = 
specificStartupOffsets.get(seedPartition);
    -                                   if (specificOffset != null) {
    -                                           // since the specified offsets 
represent the next record to read, we subtract
    -                                           // it by one so that the 
initial state of the consumer will be correct
    -                                           
subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1);
    -                                   } else {
    -                                           // default to group offset 
behaviour if the user-provided specific offsets
    -                                           // do not contain a value for 
this partition
    -                                           
subscribedPartitionsToStartOffsets.put(seedPartition, 
KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
    +                                   for (KafkaTopicPartition seedPartition 
: allPartitions) {
    +                                           Long specificOffset = 
specificStartupOffsets.get(seedPartition);
    +                                           if (specificOffset != null) {
    +                                                   // since the specified 
offsets represent the next record to read, we subtract
    +                                                   // it by one so that 
the initial state of the consumer will be correct
    +                                                   
subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1);
    +                                           } else {
    +                                                   // default to group 
offset behaviour if the user-provided specific offsets
    +                                                   // do not contain a 
value for this partition
    +                                                   
subscribedPartitionsToStartOffsets.put(seedPartition, 
KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
    +                                           }
    +                                   }
    +
    +                                   break;
    +                           case TIMESTAMP:
    +                                   if (startupOffsetsTimestamp == null) {
    +                                           throw new 
IllegalArgumentException(
    --- End diff --
    
    That makes sense; I will change this (including the usage in existing code).


---

Reply via email to