[ https://issues.apache.org/jira/browse/KAFKA-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
John Roesler resolved KAFKA-9169.
---------------------------------
Resolution: Fixed
This was fixed in https://github.com/apache/kafka/pull/7681/ and merged to
trunk (currently 2.5.0-SNAPSHOT).
> Standby Tasks point ask for incorrect offsets on resuming post suspension
> -------------------------------------------------------------------------
>
> Key: KAFKA-9169
> URL: https://issues.apache.org/jira/browse/KAFKA-9169
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.3.0, 2.4.0, 2.3.1, 2.3.2
> Reporter: Navinder Brar
> Assignee: John Roesler
> Priority: Critical
> Fix For: 2.5.0
>
>
> In versions (check 2.0) where standby tasks are suspended on each rebalance,
> the checkpoint file is updated after the flush. The expected behaviour is
> that when the same standby task is assigned back to the same machine after
> the rebalance, it resumes reading the changelog from the offset where it
> left off.
>
> However, there appears to be a bug: after every rebalance, the standby task
> starts reading from the offset it started from when the task was first
> assigned to this machine. This has two repercussions:
> # After every rebalance, the standby tasks restore a huge amount of data that
> they have already restored (verified via a 300x increase in network IO on all
> Streams instances after a rebalance, even when the assignment did not change).
> # If the changelog has time-based retention, those offsets may no longer be
> available in the changelog. This leads to an OffsetOutOfRangeException, after
> which the stores are deleted and recreated.
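The second repercussion comes down to the fetchable range of a changelog partition. The sketch below is a simplified model, not Kafka code: `ChangelogRange` and its methods are hypothetical names, and it assumes a restore can start only at an offset between the partition's log start offset (which moves forward as retention deletes old segments) and its log end offset. It shows why a stale start offset fails while a recent checkpoint does not.

```java
// Hypothetical, simplified model of one changelog partition's retained offsets.
// Not a Kafka API; the fetchable-range rule is an assumption for illustration.
final class ChangelogRange {
    final long logStartOffset; // earliest offset still retained (advances with retention)
    final long logEndOffset;   // offset the next appended record would receive

    ChangelogRange(long logStartOffset, long logEndOffset) {
        if (logStartOffset > logEndOffset) {
            throw new IllegalArgumentException("start offset must not exceed end offset");
        }
        this.logStartOffset = logStartOffset;
        this.logEndOffset = logEndOffset;
    }

    // A restore may start at `offset` only if it lies within [logStartOffset, logEndOffset].
    boolean isFetchable(long offset) {
        return offset >= logStartOffset && offset <= logEndOffset;
    }

    // Offset span a restore starting at `offset` must cover to catch up.
    // Throws if the offset was already deleted by retention (or lies past the end).
    long offsetSpanToRestore(long offset) {
        if (!isFetchable(offset)) {
            throw new IllegalStateException("offset " + offset + " is out of range ["
                    + logStartOffset + ", " + logEndOffset + "]");
        }
        return logEndOffset - offset;
    }
}
```

With hypothetical numbers where retention has advanced the log start offset to 5000 and the log end offset is 10000, a task resuming from a stale offset of 100 falls outside the range, while one resuming from a recent checkpoint of 9900 only has to cover an offset span of 100. On a compacted changelog that span is an upper bound on the records re-read, since offsets can have gaps.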
>
> I have gone through the code and I think I have found the cause.
> In TaskManager#updateNewAndRestoringTasks(), assignStandbyPartitions() is
> called for all running standby tasks. It populates the checkpointedOffsets
> map from standbyTask.checkpointedOffsets(), which is only set when a
> StandbyTask is initialized (i.e. in its constructor).
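A minimal model of the behaviour described above, assuming a simplified state manager that records the latest checkpointed offset per changelog partition on each commit. `StateManagerModel` and `StandbyTaskModel` are hypothetical stand-ins, not the real Kafka Streams classes; the point is only that a map copied once in the constructor keeps returning the offset from the first assignment, however many commits happen afterwards.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a state manager (not the Kafka class).
// It records the latest checkpointed changelog offset per partition.
final class StateManagerModel {
    private final Map<String, Long> checkpointed = new HashMap<>();

    // Called on each commit, after the store has been flushed.
    void commit(String changelogPartition, long offset) {
        checkpointed.put(changelogPartition, offset);
    }

    // Returns a copy of the most recent checkpoint.
    Map<String, Long> checkpointed() {
        return new HashMap<>(checkpointed);
    }
}

// Hypothetical standby task reproducing the reported behaviour: offsets are
// copied once in the constructor and never refreshed afterwards.
final class StandbyTaskModel {
    private final Map<String, Long> checkpointedOffsets;

    StandbyTaskModel(StateManagerModel stateMgr) {
        this.checkpointedOffsets = stateMgr.checkpointed(); // snapshot taken once
    }

    // Stale as soon as a later commit advances the checkpoint.
    Map<String, Long> checkpointedOffsets() {
        return Collections.unmodifiableMap(checkpointedOffsets);
    }
}
```

In this model, if the same task object is suspended and resumed after a commit at offset 9900, asking it for its offsets still yields the value captured at construction time.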
>
> This has an easy fix.
> After resumption, we read standbyTask.checkpointedOffsets() to determine the
> offset from which the standby task should resume, rather than
> stateMgr.checkpointed(), which is updated on every commit to the checkpoint
> file. With the former, the task always restarts from the same offset,
> re-reading data it has already read, and when the changelog topic has a
> retention time, it can hit an OffsetOutOfRangeException. So
> standbyTask.checkpointedOffsets() is effectively useless, and we should use
> stateMgr.checkpointed() instead to return offsets to the task manager.
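The proposed fix can be sketched with a similarly simplified model. Here the hypothetical `FixedStandbyTask` keeps no snapshot and delegates to the state manager's current checkpoint, and the hypothetical `ResumeOffsets.collect` stands in for the task manager gathering resume offsets. This illustrates the reporter's suggestion only; it is not necessarily how the change merged in PR 7681 is structured.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified state manager (not the Kafka class): it records
// the latest checkpointed changelog offset per partition on each commit.
final class CheckpointingStateManager {
    private final Map<String, Long> checkpointed = new HashMap<>();

    void commit(String changelogPartition, long offset) {
        checkpointed.put(changelogPartition, offset);
    }

    Map<String, Long> checkpointed() {
        return new HashMap<>(checkpointed);
    }
}

// Hypothetical standby task following the proposed fix: no cached snapshot,
// every call reads the state manager's current checkpoint.
final class FixedStandbyTask {
    private final CheckpointingStateManager stateMgr;

    FixedStandbyTask(CheckpointingStateManager stateMgr) {
        this.stateMgr = stateMgr;
    }

    Map<String, Long> checkpointedOffsets() {
        return stateMgr.checkpointed();
    }
}

// Hypothetical helper mimicking a task manager on resume: collect the offset
// each standby changelog partition should restart from.
final class ResumeOffsets {
    static Map<String, Long> collect(Iterable<FixedStandbyTask> tasks) {
        Map<String, Long> offsets = new HashMap<>();
        for (FixedStandbyTask task : tasks) {
            offsets.putAll(task.checkpointedOffsets());
        }
        return offsets;
    }
}
```

Because the offsets are read through the state manager on every resume, they track the most recent commit, so after a rebalance the standby only needs to restore changelog records written since that commit.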
--
This message was sent by Atlassian Jira
(v8.3.4#803005)