Hi,
Please let me know if this is not the correct forum for this question. I have a
question that I was hoping someone could clear up for me.
In TaskManager::updateNewAndRestoringTasks(), the function
assignStandbyPartitions() gets called for all the running standby tasks. It
populates the map checkpointedOffsets from
standbyTask.checkpointedOffsets(), which is only updated when a StandbyTask
is initialized (i.e. in its constructor). I have checked, and this goes back
to version 1.1, when the old rebalance protocol was in use and standby tasks
were suspended during a rebalance and then resumed on assignment.
I want to know why, after resumption, we were/are reading
standbyTask.checkpointedOffsets() to find the offset from which the standby
task should start running, and not stateMgr.checkpointed(), which gets
updated on every commit to the checkpoint file. In the former case the task
always restarts from the same offset, re-reading records it has already read
earlier, and when the changelog topic has a retention time, that offset may
no longer exist, which gives an OffsetOutOfRangeException.
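
To make the concern concrete, here is a minimal sketch of the behaviour I am
describing. It is a simplified model and not the actual Kafka Streams code:
the classes StateManagerModel and StandbyTaskModel, and the partition name
"changelog-0", are hypothetical stand-ins I made up for illustration. The
only assumption carried over from the real code is that the task copies the
offsets once in its constructor while the state manager keeps advancing them
on every commit.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model; NOT the real Kafka Streams classes.
public class StaleCheckpointSketch {

    // Stand-in for the state manager: its offsets advance on every commit.
    static class StateManagerModel {
        private final Map<String, Long> checkpointed = new HashMap<>();

        void commit(String partition, long offset) {
            checkpointed.put(partition, offset);
        }

        // Returns a copy of the offsets as of the latest commit.
        Map<String, Long> checkpointed() {
            return new HashMap<>(checkpointed);
        }
    }

    // Stand-in for a standby task: the offsets are copied once, at construction.
    static class StandbyTaskModel {
        private final Map<String, Long> checkpointedOffsets;

        StandbyTaskModel(StateManagerModel stateMgr) {
            // Snapshot taken here and never refreshed afterwards.
            this.checkpointedOffsets = stateMgr.checkpointed();
        }

        Map<String, Long> checkpointedOffsets() {
            return checkpointedOffsets;
        }
    }

    public static void main(String[] args) {
        StateManagerModel stateMgr = new StateManagerModel();
        stateMgr.commit("changelog-0", 100L);

        StandbyTaskModel task = new StandbyTaskModel(stateMgr);

        // The task keeps running and committing; only the state manager sees it.
        stateMgr.commit("changelog-0", 5000L);

        // After a suspend/resume, the snapshot still points at the old offset.
        System.out.println("snapshot offset: " + task.checkpointedOffsets().get("changelog-0"));
        System.out.println("live offset: " + stateMgr.checkpointed().get("changelog-0"));
    }
}
```

In this model the snapshot stays at 100 while the live checkpoint is at 5000.
If retention has already removed offsets below, say, 1000 from the changelog
topic, seeking to the snapshot offset is what I would expect to fail.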
Regards,
Navinder
