A. Sophie Blee-Goldman created KAFKA-10651:
----------------------------------------------
Summary: Assignor reports offsets from uninitialized task
Key: KAFKA-10651
URL: https://issues.apache.org/jira/browse/KAFKA-10651
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 2.6.0
Reporter: A. Sophie Blee-Goldman
Fix For: 2.7.0, 2.6.1
In KIP-441, the new HA assignor makes an informed decision about stateful task
placement based on the offset sums reported by each instance. Offset sums are
computed in one of two ways: for assigned tasks (i.e., those in the TaskManager's
"tasks" map), it will just sum up the tasks' changelog offsets directly. For
tasks that are not assigned but whose directory remains on disk, it reads the
changelog offsets from the checkpoint file. These offset sums are encoded in
the subscription userdata sent during the JoinGroup phase of a rebalance.
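A minimal Python sketch of the two ways an offset sum is computed, assuming a
simplified model in which each task maps its changelog partitions to offsets.
The function name compute_offset_sums and the partition names are hypothetical;
this is not the TaskManager's actual code.

```python
from typing import Dict

def compute_offset_sums(assigned: Dict[str, Dict[str, int]],
                        checkpoints: Dict[str, Dict[str, int]]) -> Dict[str, int]:
    """Hypothetical model: task id -> summed changelog offsets for the subscription."""
    sums: Dict[str, int] = {}
    # Assigned tasks: sum the task's current changelog offsets directly.
    for task_id, changelog_offsets in assigned.items():
        sums[task_id] = sum(changelog_offsets.values())
    # Unassigned tasks whose state directory is still on disk: use the checkpoint file.
    for task_id, checkpointed in checkpoints.items():
        if task_id not in assigned:
            sums[task_id] = sum(checkpointed.values())
    return sums

assigned = {"0_0": {"store-changelog-0": 100, "store-changelog-1": 50}}
on_disk = {"0_0": {"store-changelog-0": 90, "store-changelog-1": 40},
           "1_0": {"store-changelog-0": 200}}
print(compute_offset_sums(assigned, on_disk))  # {'0_0': 150, '1_0': 200}
```

For the assigned task 0_0 the live offsets win over the older checkpoint; the
unassigned task 1_0 is reported purely from disk.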
The problem here is that it's possible for the instance to rejoin the group
after having been assigned a new task, but before that task is initialized. In
this case it would not compute the offset sum from the checkpoint file but
instead from the uninitialized task, causing it to skip reporting any offsets
for that task whatsoever.
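To make the failure mode concrete, here is a hypothetical sketch contrasting
the behavior described above with one possible fallback: reading the
checkpoint file for any task that is assigned but not yet initialized. The
AssignedTask class and both functions are illustrative assumptions, and the
fallback is not necessarily the fix that was applied for this issue.

```python
from dataclasses import dataclass, field
from typing import Dict

@dataclass
class AssignedTask:
    # Hypothetical stand-in: changelog offsets are only known once initialized.
    initialized: bool = False
    changelog_offsets: Dict[str, int] = field(default_factory=dict)

def offset_sums_as_described(assigned: Dict[str, AssignedTask],
                             checkpoints: Dict[str, Dict[str, int]]) -> Dict[str, int]:
    sums: Dict[str, int] = {}
    for task_id, task in assigned.items():
        if task.initialized:
            sums[task_id] = sum(task.changelog_offsets.values())
        # Not initialized: no changelog offsets yet, and the checkpoint is never
        # consulted because the task counts as assigned, so nothing is reported.
    for task_id, checkpointed in checkpoints.items():
        if task_id not in assigned:
            sums[task_id] = sum(checkpointed.values())
    return sums

def offset_sums_with_fallback(assigned: Dict[str, AssignedTask],
                              checkpoints: Dict[str, Dict[str, int]]) -> Dict[str, int]:
    sums: Dict[str, int] = {}
    # Use the checkpoint for unassigned tasks AND for assigned-but-uninitialized ones.
    for task_id, checkpointed in checkpoints.items():
        task = assigned.get(task_id)
        if task is None or not task.initialized:
            sums[task_id] = sum(checkpointed.values())
    # Initialized tasks report their live changelog offsets.
    for task_id, task in assigned.items():
        if task.initialized:
            sums[task_id] = sum(task.changelog_offsets.values())
    return sums

assigned = {"1_0": AssignedTask()}          # newly assigned, not yet initialized
on_disk = {"1_0": {"store-changelog-0": 200}}
print(offset_sums_as_described(assigned, on_disk))   # {}
print(offset_sums_with_fallback(assigned, on_disk))  # {'1_0': 200}
```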
This results in a particularly nefarious interaction between HA and cooperative
rebalancing. An instance may read from the checkpoint file of a caught-up (but
unassigned) task and report this in its subscription, leading the assignor to
compute a small lag and place this task on the instance. After placing all
stateful tasks in this way, it will distribute the stateless tasks across the
group to balance the overall workload. It does this without considering the
previous owner of the stateless tasks, so odds are good that moving the
stateful task to this instance will result in a different assortment of
stateless tasks in this rebalance.
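The interaction can be illustrated with a toy two-instance assignor. It is a
heavy simplification, not the real HA assignor: lag is taken as end offset
minus reported offset sum (a missing report counts as offset 0), stateful
tasks go to the lowest-lag instance, and stateless tasks fill the
least-loaded instance with no regard to previous ownership. The instance
names, offsets, and toy_assign are all hypothetical.

```python
from typing import Dict, List

def toy_assign(instances: List[str], stateful_end_offsets: Dict[str, int],
               reported: Dict[str, Dict[str, int]],
               stateless: List[str]) -> Dict[str, List[str]]:
    assignment: Dict[str, List[str]] = {inst: [] for inst in instances}
    # Stateful tasks go to the instance with the smallest lag; an instance that
    # reports nothing for a task is treated as restoring from offset 0.
    # min() returns the first minimal element, so ties go to the earlier instance.
    for task, end in sorted(stateful_end_offsets.items()):
        best = min(instances, key=lambda inst: end - reported[inst].get(task, 0))
        assignment[best].append(task)
    # Stateless tasks fill the least-loaded instance, ignoring who owned them before.
    for task in sorted(stateless):
        target = min(instances, key=lambda inst: len(assignment[inst]))
        assignment[target].append(task)
    return assignment

stateless = ["1_0", "1_1", "1_2"]
# A reports a caught-up checkpoint for 0_0 ...
print(toy_assign(["A", "B"], {"0_0": 1000}, {"A": {"0_0": 990}, "B": {"0_0": 500}}, stateless))
# {'A': ['0_0', '1_1'], 'B': ['1_0', '1_2']}
# ... versus reporting nothing for it.
print(toy_assign(["A", "B"], {"0_0": 1000}, {"A": {}, "B": {"0_0": 500}}, stateless))
# {'A': ['1_0', '1_1'], 'B': ['0_0', '1_2']}
```

Flipping only A's report for the stateful task 0_0 moves it to B and also
moves the stateless task 1_0 from B to A, so both instances have something
revoked.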
Any time owned tasks are moved around, the current owner will have to revoke
them and trigger a follow-up cooperative rebalance. Within the Consumer client,
this actually happens immediately: that is, within an invocation of poll() it
will loop inside joinGroupIfNeeded() as long as a rejoin is needed. And at the
end of the last rebalance, if any partitions are revoked then a rejoin will
indeed be needed. So the Consumer will send out its next JoinGroup (including
the userdata with the computed task offset sums) without first returning from
the current poll(). Streams never gets the chance to initialize its new tasks, and
ends up excluding them from the offset sums it reports in the following
rebalance.
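A rough model of why the new tasks never get initialized: everything inside
one poll() is a loop that rejoins as long as the previous rebalance revoked
something, and task initialization only happens after that loop exits. The
StreamsInstance class, poll_once(), toy_rebalance(), and the max_joins cap
(standing in for the poll timeout) are hypothetical, not the Consumer or
Streams APIs.

```python
from typing import Callable, Dict, List, Set, Tuple

class StreamsInstance:
    """Hypothetical model of one Streams instance's task bookkeeping."""

    def __init__(self, checkpoints: Dict[str, int]):
        self.checkpoints = checkpoints          # task id -> offset sum from checkpoint file
        self.assigned: Set[str] = set()
        self.initialized: Set[str] = set()

    def subscription(self) -> Dict[str, int]:
        sums: Dict[str, int] = {}
        for task, offset_sum in self.checkpoints.items():
            if task not in self.assigned or task in self.initialized:
                # Unassigned: read from checkpoint. Initialized: the sketch reuses the
                # same value as a stand-in for the live changelog offsets.
                sums[task] = offset_sum
            # Assigned but not yet initialized: omitted (the behavior in this issue).
        return sums

Rebalance = Callable[[Dict[str, int]], Tuple[Set[str], bool]]

def poll_once(instance: StreamsInstance, rebalance: Rebalance,
              max_joins: int) -> List[Dict[str, int]]:
    """One poll(): rejoin immediately while the last rebalance revoked anything."""
    sent: List[Dict[str, int]] = []
    for _ in range(max_joins):                  # max_joins stands in for the poll timeout
        sub = instance.subscription()
        sent.append(sub)
        instance.assigned, revoked_any = rebalance(sub)
        if not revoked_any:
            break
    # Only after the loop exits does Streams get to initialize newly assigned tasks.
    instance.initialized = set(instance.assigned)
    return sent

def toy_rebalance(sub: Dict[str, int]) -> Tuple[Set[str], bool]:
    # Hypothetical: task 0_0 lands here only if we report offsets for it, and
    # either move shuffles stateless tasks, so something is always revoked.
    return ({"0_0"} if "0_0" in sub else set()), True

streams = StreamsInstance({"0_0": 990})
print(poll_once(streams, toy_rebalance, max_joins=2))   # [{'0_0': 990}, {}]
```

The second JoinGroup omits task 0_0 entirely, even though the checkpoint on
disk is just as caught up as it was for the first one.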
And since it doesn't report any offsets for this task, the assignor now
believes the instance does _not_ have any caught-up state for this task, and
assigns the task elsewhere. This causes a shuffling of stateless tasks once
more, which in turn results in another cooperative rebalance. This time the
task is no longer assigned, so the instance reports offsets based on the
checkpoint file again, and we're back at the beginning.
Because the assignment is deterministic, once a group is caught in this cycle
it cannot escape without manual intervention.
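Because each rebalance is a deterministic function of the reported offsets,
the loop can be modeled as iterating a function over the current owner of the
task until a state repeats. In this hypothetical sketch, instance A holds a
nearly caught-up checkpoint, a second instance B reports an older offset sum
for the same task, and report_uninitialized toggles a hypothetical fallback in
which A still reports its checkpoint while the task is uninitialized. All
names and offsets are illustrative assumptions.

```python
from typing import Dict, List, Optional

END_OFFSET = 1000      # hypothetical changelog end offset for task 0_0
CHECKPOINT_A = 990     # A's on-disk checkpoint: nearly caught up
REPORT_B = 500         # B's (hypothetical) offset sum for the same task

def next_owner(owner: str, report_uninitialized: bool) -> str:
    """One rebalance: decide who gets task 0_0 given who owns it right now."""
    # While A owns the task it is still uninitialized (poll() never returned).
    if owner == "A" and not report_uninitialized:
        report_a: Optional[int] = None          # task omitted from A's subscription
    else:
        report_a = CHECKPOINT_A                 # read from the checkpoint file
    lag_a = END_OFFSET - (report_a if report_a is not None else 0)
    lag_b = END_OFFSET - REPORT_B
    return "A" if lag_a <= lag_b else "B"

def find_cycle(start: str, report_uninitialized: bool) -> List[str]:
    """Iterate the deterministic assignment until a state repeats; return the cycle."""
    first_seen: Dict[str, int] = {}
    history: List[str] = []
    owner = start
    while owner not in first_seen:
        first_seen[owner] = len(history)
        history.append(owner)
        owner = next_owner(owner, report_uninitialized)
    return history[first_seen[owner]:]

print(find_cycle("B", report_uninitialized=False))  # ['B', 'A']: oscillates forever
print(find_cycle("B", report_uninitialized=True))   # ['A']: settles on A
```

A cycle of length two means the group alternates forever; a cycle of length
one is a stable assignment.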
--
This message was sent by Atlassian Jira
(v8.3.4#803005)