Github user tzulitai commented on a diff in the pull request:
    --- Diff: 
    @@ -204,7 +204,22 @@ public void run() {
                        // seek the consumer to the initial offsets
                        for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
                                if (partition.isOffsetDefined()) {
    +                         "Partition {} has restored initial offsets {} from checkpoint / savepoint; seeking the consumer " +
    +                                           "to position {}", partition.getKafkaPartitionHandle(), partition.getOffset(), partition.getOffset() + 1);
                               , partition.getOffset() + 1);
    +                           } else {
    +                                   // for partitions that do not have offsets restored from a checkpoint/savepoint,
    +                                   // we need to define our internal offset state for them using the initial offsets retrieved from Kafka
    +                                   // by the KafkaConsumer, so that they are correctly checkpointed and committed on the next checkpoint
    +                                   long fetchedOffset = 
    --- End diff --
    I just confirmed this with a simple test.
    Yes, when no committed offset is available for the group id, the
    "auto.offset.reset" behavior is used.
    The "position" is the offset from which the KafkaConsumer will start
    reading. The consumer determines it automatically once it is assigned
    partitions, either from committed offsets or from "auto.offset.reset" if
    none are available.
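
To make the resolution order concrete, here is a minimal sketch in plain Java. It is a toy model, not the Kafka client API or the connector code: the class `InitialPositionSketch`, the method `resolveStartPosition`, and all of its parameters are hypothetical names introduced for illustration. It assumes the restored offset is the last record already processed (hence the `+ 1` seen in the diff), that a committed offset already denotes the next record to read, and that only the "earliest" and "latest" reset policies are in play.

```java
import java.util.OptionalLong;

// Toy model of how a partition's starting position is chosen.
// All names here are hypothetical; this does not call the Kafka client.
class InitialPositionSketch {

    /** The two "auto.offset.reset" values that pick an offset ("none" is omitted). */
    enum ResetPolicy { EARLIEST, LATEST }

    static long resolveStartPosition(
            OptionalLong restoredOffset,   // offset restored from a checkpoint/savepoint, if any
            OptionalLong committedOffset,  // offset committed for the group id, if any
            ResetPolicy resetPolicy,       // fallback when nothing is committed
            long beginningOffset,          // first offset available in the partition
            long endOffset) {              // offset just past the last record
        if (restoredOffset.isPresent()) {
            // The restored offset is the last processed record, so resume one past it.
            return restoredOffset.getAsLong() + 1;
        }
        if (committedOffset.isPresent()) {
            // A committed offset already points at the next record to read.
            return committedOffset.getAsLong();
        }
        // No committed offset: fall back to the reset policy.
        return resetPolicy == ResetPolicy.EARLIEST ? beginningOffset : endOffset;
    }

    public static void main(String[] args) {
        long position = resolveStartPosition(
                OptionalLong.empty(), OptionalLong.empty(), ResetPolicy.LATEST, 0L, 500L);
        System.out.println("start position: " + position);
    }
}
```

In the else branch of the diff, the position resolved by the consumer is what gets stored as the internal offset state for partitions that had nothing restored.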
