This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new be032735b39 KAFKA-14422; Consumer rebalance stuck after new static
member joins a group with members not supporting static members (#12909)
be032735b39 is described below
commit be032735b39360df1a6de1a7feea8b4336e5bcc0
Author: David Jacot <[email protected]>
AuthorDate: Mon Nov 28 20:15:54 2022 +0100
KAFKA-14422; Consumer rebalance stuck after new static member joins a group
with members not supporting static members (#12909)
When a consumer group on a version prior to 2.3 is upgraded to a newer
version and static membership is enabled in the meantime, the consumer group
remains stuck, iff the leader is still on the old version.
The issue is that setting `GroupInstanceId` in the response to the leader
is only supported from JoinGroup version >= 5 and that `GroupInstanceId` is
neither ignorable nor handled anywhere else. Hence, if there is at least one
static member in the group, sending the JoinGroup response to the leader fails
with a serialization error.
```
org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to
write a non-default groupInstanceId at version 2
```
When this happens, the member stays around until the group coordinator is
bounced because a member with a non-null `awaitingJoinCallback` is never
expired.
This patch fixes the issue by making `GroupInstanceId` ignorable. A unit
test has been modified to cover this.
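The effect of the flag can be illustrated with a minimal, self-contained
sketch. It is not the serializer Kafka generates from the JSON schema: the
class `IgnorableFieldSketch`, the method `writeMember`, and the string-based
"wire format" are hypothetical, and the JDK's `UnsupportedOperationException`
stands in for Kafka's `UnsupportedVersionException`. It only assumes the
behaviour described above, namely that a non-default value in a field the
version does not support is rejected unless the field is ignorable.

```java
import java.util.ArrayList;
import java.util.List;

public class IgnorableFieldSketch {
    // First JoinGroup version that carries GroupInstanceId ("versions": "5+").
    static final int MIN_VERSION = 5;

    // Hypothetical writer: returns the fields that would be serialized.
    static List<String> writeMember(String memberId, String groupInstanceId,
                                    int version, boolean ignorable) {
        List<String> out = new ArrayList<>();
        out.add("memberId=" + memberId);
        if (version >= MIN_VERSION) {
            // The version supports the field, so it is always written.
            out.add("groupInstanceId=" + groupInstanceId);
        } else if (groupInstanceId != null && !ignorable) {
            // Non-default value in an unsupported, non-ignorable field.
            throw new UnsupportedOperationException(
                "Attempted to write a non-default groupInstanceId at version " + version);
        }
        // Otherwise the field is silently dropped for this version.
        return out;
    }

    public static void main(String[] args) {
        // Ignorable: the old leader gets a response without the field.
        System.out.println(writeMember("consumer0", "instance0", 2, true));
        // Not ignorable: serialization fails, as in the reported bug.
        try {
            writeMember("consumer0", "instance0", 2, false);
        } catch (UnsupportedOperationException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

In this sketch, a leader on version 2 receives the member list without
`groupInstanceId` when the field is ignorable, instead of the response
failing to serialize.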
Reviewers: Jason Gustafson <[email protected]>
---
clients/src/main/resources/common/message/JoinGroupResponse.json | 2 +-
.../java/org/apache/kafka/common/requests/RequestResponseTest.java | 6 ++----
2 files changed, 3 insertions(+), 5 deletions(-)
diff --git a/clients/src/main/resources/common/message/JoinGroupResponse.json
b/clients/src/main/resources/common/message/JoinGroupResponse.json
index 79d596df270..d01c2c1c028 100644
--- a/clients/src/main/resources/common/message/JoinGroupResponse.json
+++ b/clients/src/main/resources/common/message/JoinGroupResponse.json
@@ -58,7 +58,7 @@
{ "name": "Members", "type": "[]JoinGroupResponseMember", "versions":
"0+", "fields": [
{ "name": "MemberId", "type": "string", "versions": "0+",
"about": "The group member ID." },
- { "name": "GroupInstanceId", "type": "string", "versions": "5+",
+ { "name": "GroupInstanceId", "type": "string", "versions": "5+",
"ignorable": true,
"nullableVersions": "5+", "default": "null",
"about": "The unique identifier of the consumer instance provided by
end user." },
{ "name": "Metadata", "type": "bytes", "versions": "0+",
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index d4ea7df3b0a..2129322196a 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -1756,10 +1756,8 @@ public class RequestResponseTest {
for (int i = 0; i < 2; i++) {
JoinGroupResponseMember member = new
JoinGroupResponseData.JoinGroupResponseMember()
.setMemberId("consumer" + i)
- .setMetadata(new byte[0]);
-
- if (version >= 5)
- member.setGroupInstanceId("instance" + i);
+ .setMetadata(new byte[0])
+ .setGroupInstanceId("instance" + i);
members.add(member);
}