Hi,
Please let me know if this is not the right forum for this question. I have a
doubt that I was hoping someone could clear up for me.
In TaskManager::updateNewAndRestoringTasks(), the function
assignStandbyPartitions() is called for all running standby tasks. It
populates the map checkpointedOffsets from
standbyTask.checkpointedOffsets(), which is only set when a StandbyTask is
initialized (i.e. in its constructor). I have checked, and this goes back to
version 1.1, when the old rebalance protocol was in use and standby tasks
were suspended during a rebalance and then resumed on assignment.
I want to know why, after resumption, we were (and still are) reading
standbyTask.checkpointedOffsets() to determine the offset from which the
standby task should resume, rather than stateMgr.checkpointed(), which is
updated on every commit to the checkpoint file. With the former, the task
always restarts from the same offset and re-reads records it has already
consumed. And when the changelog topic has a retention time, that old offset
may already have been deleted, which results in an OffsetOutOfRangeException.
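To make the difference concrete, here is a minimal sketch. The classes below
are a hypothetical, simplified model, not Kafka's actual StandbyTask or state
manager; only the method names checkpointedOffsets() and checkpointed() are
borrowed from the description above, and the partition name "changelog-0" is
made up. It assumes the task copies the checkpoint once in its constructor
while the state manager's map keeps advancing on every commit.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model; NOT Kafka's real classes.
public class StandbyOffsetSketch {

    // Stands in for the state manager: its checkpoint map advances on every commit.
    static class StateManager {
        private final Map<String, Long> checkpointed = new HashMap<>();

        void commit(String partition, long offset) {
            checkpointed.put(partition, offset);
        }

        Map<String, Long> checkpointed() {
            return new HashMap<>(checkpointed); // defensive copy of the current state
        }
    }

    // Stands in for a standby task that copies the checkpoint once, in its constructor.
    static class StandbyTask {
        final StateManager stateMgr;
        private final Map<String, Long> checkpointedOffsets;

        StandbyTask(StateManager stateMgr) {
            this.stateMgr = stateMgr;
            // Snapshot taken only at construction time and never refreshed.
            this.checkpointedOffsets = stateMgr.checkpointed();
        }

        Map<String, Long> checkpointedOffsets() {
            return checkpointedOffsets;
        }
    }

    public static void main(String[] args) {
        StateManager stateMgr = new StateManager();
        stateMgr.commit("changelog-0", 100L);
        StandbyTask task = new StandbyTask(stateMgr);

        // The standby keeps consuming and committing after it was created.
        stateMgr.commit("changelog-0", 5_000L);

        long fromSnapshot = task.checkpointedOffsets().get("changelog-0");
        long fromStateMgr = task.stateMgr.checkpointed().get("changelog-0");
        // prints "snapshot=100 live=5000"
        System.out.println("snapshot=" + fromSnapshot + " live=" + fromStateMgr);
    }
}
```

In this model, resuming from the snapshot would seek back to offset 100, even
though the task had already committed up to 5000; if retention had removed
the older records, that seek target would no longer exist on the broker.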
Regards,
Navinder