Rafał Sumisławski created KAFKA-18861:
-----------------------------------------
Summary: kafka-streams stuck in a loop of "SyncGroup failed" with
an unbalanced assignment
Key: KAFKA-18861
URL: https://issues.apache.org/jira/browse/KAFKA-18861
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 3.9.0
Reporter: Rafał Sumisławski
h2. Overview
{{kafka-streams}} applications sometimes end up in a loop of rebalances, with
{{SyncGroup failed}} being logged.
This seems to be an issue with the design of the consumer group protocol. On
the one hand, performing a {{SyncGroup}} between rebalances is a necessary step
for rebalances to work. On the other hand, there are "immediate rebalances",
which lack any mechanism to ensure that consumers have a reasonable amount of
time to {{SyncGroup}}.
To be clear, this issue is not about {{SyncGroup failed}} happening from time to
time. It is about it happening in a loop, which prevents rebalances from making
any progress and leads to lag.
h2. The loop of events
Background knowledge: Moving a stateful task from consumer {{A}} to consumer
{{B}} requires 2 rebalances because we need to make sure that {{A}} stops
processing (first rebalance) before we can let {{B}} start processing (second
rebalance). For that reason the first rebalance is supposed to be followed
immediately by a second rebalance.
We start with an unbalanced assignment, for example as a result of losing one
consumer. As a result, consumer {{A}} is under 100% CPU load.
1. A rebalance is triggered (for whatever reason). The group enters the
{{PreparingRebalance}} state.
2. All consumers send {{JoinGroup}} requests to the coordinator. The group
enters the {{CompletingRebalance}} state.
3. The leader determines a new task assignment, which moves some tasks from the
overloaded {{A}} to an underloaded {{B}}.
4. The coordinator sends a {{JoinGroup}} response to all consumers, indicating a
change of assignment. The group enters the {{Stable}} state. Members are
expected to follow up with a {{SyncGroup}}. But {{SyncGroup}} is only accepted
when the group is {{Stable}}.
5. All consumers except for {{A}} send {{SyncGroup}} and get their new
assignments.
6. {{B}} received new {{revoking active}} tasks in this rebalance. It can't
process them yet, because {{A}} first needs to revoke them. So {{B}} immediately
triggers a new rebalance (it was told to do so by the leader via the
coordinator). The group transitions to {{PreparingRebalance}}. In practice this
happens {{~70ms}} after it transitioned to {{Stable}}.
7. Because {{A}} is overloaded, it takes more than {{70ms}} to send
{{SyncGroup}}, and that request is rejected because the group is no longer
{{Stable}}. Because {{A}} didn't manage to {{SyncGroup}}, it won't be able to
give up the tasks, and the next rebalance will make no progress (it will again
ask {{A}} to revoke the tasks, and {{B}} will again receive them in
{{revoking active}} state). We go back to step 1 and loop forever (or at least
for as long as {{A}} is under heavy load, but since we can't move that load,
that may be forever).
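The race in steps 4 to 7 can be illustrated with a toy model. This is only a sketch and not Kafka code: {{Member}}, {{members_synced}}, {{rounds_until_revocation}} and the latency numbers (other than the observed {{~70ms}} window) are hypothetical, and each member's {{SyncGroup}} latency is assumed to stay constant across rounds.

```python
from dataclasses import dataclass
from typing import List, Optional, Set


@dataclass
class Member:
    name: str
    # Hypothetical: time between the group becoming Stable and this
    # member's SyncGroup reaching the coordinator.
    sync_latency_ms: int


def members_synced(members: List[Member], stable_window_ms: int) -> Set[str]:
    """Names of members whose SyncGroup arrives while the group is still Stable."""
    return {m.name for m in members if m.sync_latency_ms < stable_window_ms}


def rounds_until_revocation(
    members: List[Member],
    owner: str,
    stable_window_ms: int,
    max_rounds: int = 10,
) -> Optional[int]:
    """Round in which `owner` first manages to SyncGroup (and can therefore
    revoke its tasks), or None if it never does within `max_rounds`."""
    for round_no in range(1, max_rounds + 1):
        if owner in members_synced(members, stable_window_ms):
            return round_no
        # Owner missed the window: the follow-up rebalance starts and the
        # same assignment is attempted again (back to step 1).
    return None


# A is overloaded and slow to SyncGroup; B is idle and fast.
group = [Member("A", sync_latency_ms=200), Member("B", sync_latency_ms=10)]
print(rounds_until_revocation(group, owner="A", stable_window_ms=70))  # None
```

In this model the loop only ends once {{A}}'s latency drops below the window, which cannot happen while the load stays on {{A}}.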
h2. Fixing the issue
In order to resolve the problem we would need to let {{A}} perform the
{{SyncGroup}}. This can be achieved in two ways:
1. Accepting the {{SyncGroup}} while the group is in the {{PreparingRebalance}}
state. This seems to be a clean solution, but I don't know the background
behind disallowing {{SyncGroup}} while in {{PreparingRebalance}}. Maybe there is
some good reason why it can't be done?
2. Delaying the follow-up rebalance to give {{A}} more than {{70ms}} to
{{SyncGroup}}.
While I think 1 is the more promising solution, I tried 2, as it can be done on
the application side, and it works in practice. You can see my changes here:
https://github.com/coralogix/kafka/commit/69825558295d7adcf0d218c86c7b27746b67d501
I removed the {{need to revoke partitions}} source of rebalances completely,
and I delayed the {{Requested to schedule immediate rebalance for new tasks to
be safely revoked from current owner}} rebalances by a configurable amount of
time. With these changes rebalances still work, and the issue doesn't occur.
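As a rough illustration of why either option breaks the loop, here is a toy model of the coordinator's decision. It is a sketch under simplifying assumptions, not the code from the commit above and not Kafka's API: {{sync_group_accepted}} and its parameters are hypothetical names, and the latency values are made up apart from the observed {{~70ms}} window.

```python
def sync_group_accepted(
    sync_latency_ms: int,
    stable_window_ms: int,
    followup_delay_ms: int = 0,
    accept_while_preparing: bool = False,
) -> bool:
    """Whether a member's SyncGroup would be accepted in this toy model.

    sync_latency_ms: time the member needs to send SyncGroup.
    stable_window_ms: time the group stays Stable before the follow-up
        rebalance is triggered (about 70 ms in the report).
    followup_delay_ms: option 2, extra delay before the follow-up rebalance.
    accept_while_preparing: option 1, accept SyncGroup in PreparingRebalance.
    """
    if accept_while_preparing:
        # Option 1: the group state no longer matters for SyncGroup.
        return True
    # Option 2 (or no fix when the delay is 0): the request must arrive
    # before the group leaves the Stable state.
    return sync_latency_ms < stable_window_ms + followup_delay_ms


# Overloaded member A needs 200 ms to SyncGroup.
print(sync_group_accepted(200, stable_window_ms=70))                               # False
print(sync_group_accepted(200, stable_window_ms=70, followup_delay_ms=500))        # True
print(sync_group_accepted(200, stable_window_ms=70, accept_while_preparing=True))  # True
```

With option 2 the delay has to exceed the slowest member's {{SyncGroup}} latency, which is why a configurable value is useful. Option 1 would not depend on timing at all.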