This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new ceb2c69f77c KAFKA-14422; Consumer rebalance stuck after new static 
member joins a group with members not supporting static members (#12909)
ceb2c69f77c is described below

commit ceb2c69f77c2c627e6364cd50ff39b54c5b5744f
Author: David Jacot <[email protected]>
AuthorDate: Mon Nov 28 20:15:54 2022 +0100

    KAFKA-14422; Consumer rebalance stuck after new static member joins a group 
with members not supporting static members (#12909)
    
    When a consumer group on a version prior to 2.3 is upgraded to a newer 
version and static membership is enabled in the meantime, the consumer group 
remains stuck, iff the leader is still on the old version.
    
    The issue is that setting `GroupInstanceId` in the response to the leader 
is only supported from JoinGroup version >= 5 and that `GroupInstanceId` is 
neither ignorable nor handled anywhere else. Hence, if there is at least one 
static member in the group, sending the JoinGroup response to the leader fails 
with a serialization error.
    
    ```
    org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to 
write a non-default groupInstanceId at version 2
    ```
    
    When this happens, the member stays around until the group coordinator is 
bounced because a member with a non-null `awaitingJoinCallback` is never 
expired.
    
    This patch fixes the issue by making `GroupInstanceId` ignorable. A unit 
test has been modified to cover this.
    
    Reviewers: Jason Gustafson <[email protected]>
---
 clients/src/main/resources/common/message/JoinGroupResponse.json    | 2 +-
 .../java/org/apache/kafka/common/requests/RequestResponseTest.java  | 6 ++----
 2 files changed, 3 insertions(+), 5 deletions(-)
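
Note: the failure mode described in the commit message can be illustrated with
a minimal, self-contained sketch. It is not the Kafka-generated serialization
code. The class `IgnorableFieldSketch`, the method `serializeGroupInstanceId`,
and the use of `UnsupportedOperationException` (as a stand-in for Kafka's
`UnsupportedVersionException`) are hypothetical names chosen for illustration.
The sketch assumes only what the commit message states: a non-default value for
a field that is not ignorable cannot be written at a version below the field's
first supported version, whereas an ignorable field is silently dropped.

```java
import java.util.Optional;

public class IgnorableFieldSketch {
    // GroupInstanceId is declared with "versions": "5+" in JoinGroupResponse.json.
    static final short FIRST_SUPPORTED_VERSION = 5;

    /**
     * Hypothetical stand-in for the write logic of a single versioned field.
     * Returns the value that would go on the wire, or empty if nothing is written.
     */
    public static Optional<String> serializeGroupInstanceId(
            short version, String groupInstanceId, boolean ignorable) {
        if (version >= FIRST_SUPPORTED_VERSION) {
            // Supported version: the (nullable) value is written as-is.
            return Optional.ofNullable(groupInstanceId);
        }
        if (groupInstanceId != null && !ignorable) {
            // Old version, non-default value, field not ignorable: fail.
            throw new UnsupportedOperationException(
                "Attempted to write a non-default groupInstanceId at version " + version);
        }
        // Old version and either a default (null) value or an ignorable field: skip it.
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Leader on an old JoinGroup version, group contains a static member.
        try {
            serializeGroupInstanceId((short) 2, "instance0", false);
        } catch (UnsupportedOperationException e) {
            System.out.println("not ignorable: " + e.getMessage());
        }
        // With the field marked ignorable, the value is dropped instead.
        System.out.println("ignorable: written="
            + serializeGroupInstanceId((short) 2, "instance0", true).isPresent());
    }
}
```

In this sketch, flipping the `ignorable` flag is the only difference between
the failing and the succeeding call, which mirrors the one-attribute change in
the JSON schema below.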

diff --git a/clients/src/main/resources/common/message/JoinGroupResponse.json b/clients/src/main/resources/common/message/JoinGroupResponse.json
index 79d596df270..d01c2c1c028 100644
--- a/clients/src/main/resources/common/message/JoinGroupResponse.json
+++ b/clients/src/main/resources/common/message/JoinGroupResponse.json
@@ -58,7 +58,7 @@
     { "name": "Members", "type": "[]JoinGroupResponseMember", "versions": "0+", "fields": [
       { "name": "MemberId", "type": "string", "versions": "0+",
         "about": "The group member ID." },
-      { "name": "GroupInstanceId", "type": "string", "versions": "5+",
+      { "name": "GroupInstanceId", "type": "string", "versions": "5+", "ignorable": true,
         "nullableVersions": "5+", "default": "null",
         "about": "The unique identifier of the consumer instance provided by end user." },
       { "name": "Metadata", "type": "bytes", "versions": "0+",
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 390cacf4337..3fa063fba18 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -1756,10 +1756,8 @@ public class RequestResponseTest {
         for (int i = 0; i < 2; i++) {
             JoinGroupResponseMember member = new JoinGroupResponseData.JoinGroupResponseMember()
                 .setMemberId("consumer" + i)
-                .setMetadata(new byte[0]);
-
-            if (version >= 5)
-                member.setGroupInstanceId("instance" + i);
+                .setMetadata(new byte[0])
+                .setGroupInstanceId("instance" + i);
 
             members.add(member);
         }
