Rafał Sumisławski created KAFKA-18861:
-----------------------------------------
Summary: kafka-streams stuck in a loop of "SyncGroup failed" with an unbalanced assignment
Key: KAFKA-18861
URL: https://issues.apache.org/jira/browse/KAFKA-18861
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 3.9.0
Reporter: Rafał Sumisławski

h2. Overview

{{kafka-streams}} applications sometimes end up in a loop of rebalances, with {{SyncGroup failed}} being logged. This seems to be an issue with the design of the consumer group protocol. On the one hand, performing a {{SyncGroup}} between rebalances is a necessary step for rebalances to work. On the other hand, there are "immediate rebalances", which lack any mechanism ensuring that consumers get a reasonable amount of time to {{SyncGroup}}.

To be clear, this issue is not about {{SyncGroup failed}} happening from time to time. It's about it happening in a loop, which prevents rebalances from making any progress and leads to lag.

h2. The loop of events

Background knowledge: moving a stateful task from consumer {{A}} to consumer {{B}} requires 2 rebalances, because we need to make sure that {{A}} stops processing (first rebalance) before we can let {{B}} start processing (second rebalance). For that reason, the first rebalance is supposed to be immediately followed by a second one.

We start with an unbalanced assignment, for example as a result of losing one consumer. As a result, consumer {{A}} is under 100% CPU load.

1. A rebalance is triggered (for whatever reason). The group enters the {{PreparingRebalance}} state.
2. All consumers send {{Join}} requests to the coordinator. The group enters the {{CompletingRebalance}} state.
3. The leader determines a new task assignment, which moves some tasks from the overloaded {{A}} to an underloaded {{B}}.
4. The coordinator sends the {{Join}} response to all consumers, indicating a change of assignment. The group enters the {{Stable}} state. Members are expected to follow up with a {{SyncGroup}}, but from this point on a {{SyncGroup}} is only accepted while the group remains {{Stable}}.
5. All consumers except {{A}} send {{SyncGroup}} and get their new assignments.
6. {{B}} received new {{revoking active}} tasks in this rebalance. It can't process them yet, because {{A}} first needs to revoke them. So {{B}} immediately triggers a new rebalance (the leader told it to do so, via the coordinator). The group transitions to {{PreparingRebalance}}. In practice this happens {{~70ms}} after it transitioned to {{Stable}}.
7. Because {{A}} is overloaded, it takes longer than those {{70ms}} to send {{SyncGroup}}, and the request is rejected because the group is no longer {{Stable}}.

Because {{A}} didn't manage to {{SyncGroup}}, it won't be able to give up the tasks, and the next rebalance will make no progress: it will again ask {{A}} to revoke the tasks, and {{B}} will again receive them in the {{revoking active}} state. We return to step 1 and loop forever (or at least as long as {{A}} is under heavy load, which, since we can't move that load, may be forever).

h2. Fixing the issue

To resolve the problem, we need to let {{A}} perform the {{SyncGroup}}. This can be achieved in two ways:

1. Accepting the {{SyncGroup}} while the group is in the {{PreparingRebalance}} state. This seems to be a clean solution, but I don't know the background behind disallowing {{SyncGroup}} during {{PreparingRebalance}}. Maybe there is a good reason why it can't be done?
2. Delaying the follow-up rebalance, to give {{A}} more than {{70ms}} to {{SyncGroup}}.

While I think 1 is the more promising solution, I tried 2, as it can be done on the application side, and it works in practice. You can see my changes here: https://github.com/coralogix/kafka/commit/69825558295d7adcf0d218c86c7b27746b67d501

I removed the {{need to revoke partitions}} source of rebalances completely, and I delayed the {{Requested to schedule immediate rebalance for new tasks to be safely revoked from current owner}} rebalances by a configurable amount of time.
With these changes rebalances still work, and the issue doesn't occur.
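To make the timing argument concrete, the two-rebalance handover described above can be reduced to a toy model. This is a minimal sketch under stated assumptions, not Kafka's group coordinator logic: {{GroupState}}, {{state_at}}, {{sync_accepted}} and {{rebalances_to_move_task}} are hypothetical names, the ~70ms follow-up delay comes from the observation above, and the 150ms {{SyncGroup}} latency for an overloaded {{A}} is an assumed value. The group state is reduced to {{Stable}} vs {{PreparingRebalance}}, and {{A}} only learns it must revoke if its {{SyncGroup}} arrives in a state that accepts it.

```python
from enum import Enum
from typing import Optional


class GroupState(Enum):
    # Hypothetical toy model: only the two states relevant to the race.
    STABLE = "Stable"
    PREPARING_REBALANCE = "PreparingRebalance"


def state_at(t_ms: int, followup_delay_ms: int) -> GroupState:
    """State of the group t_ms after it became Stable, assuming B requests
    the follow-up rebalance followup_delay_ms after Stable."""
    if t_ms < followup_delay_ms:
        return GroupState.STABLE
    return GroupState.PREPARING_REBALANCE


def sync_accepted(state: GroupState, accept_in_preparing: bool) -> bool:
    """Behaviour as described in the issue: SyncGroup succeeds while Stable.
    accept_in_preparing models fix option 1 (accept it in PreparingRebalance)."""
    if state is GroupState.STABLE:
        return True
    return accept_in_preparing


def rebalances_to_move_task(
    owner_sync_ms: int,
    followup_delay_ms: int,
    accept_in_preparing: bool = False,
    max_rebalances: int = 20,
) -> Optional[int]:
    """Rebalances needed to move a stateful task from A to B, or None if the
    loop makes no progress within max_rebalances."""
    revoked = False
    for n in range(1, max_rebalances + 1):
        if revoked:
            # Second rebalance: A has released the task, so B takes it as active.
            return n
        # First rebalance: A is asked to revoke, B gets the task as "revoking active".
        # A only learns this if its SyncGroup arrives in a state that accepts it.
        state = state_at(owner_sync_ms, followup_delay_ms)
        revoked = sync_accepted(state, accept_in_preparing)
    return None


if __name__ == "__main__":
    print(rebalances_to_move_task(150, 70))        # None: the loop never ends
    print(rebalances_to_move_task(150, 500))       # 2: delayed follow-up (option 2)
    print(rebalances_to_move_task(150, 70, True))  # 2: SyncGroup accepted in PreparingRebalance (option 1)
```

With the follow-up rebalance at ~70ms the handover never completes, while either fix lets it finish in the expected two rebalances. The model deliberately ignores everything else the real protocol does, such as session timeouts and generation IDs.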