This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.1 by this push:
     new 487af011ca5 KAFKA-19444: Add back JoinGroup v0 & v1 (#20116)
487af011ca5 is described below

commit 487af011ca5a186306298f7b861fbcce4f86e12b
Author: Ismael Juma <ism...@juma.me.uk>
AuthorDate: Mon Jul 7 08:44:24 2025 -0700

    KAFKA-19444: Add back JoinGroup v0 & v1 (#20116)
    
    This fixes librdkafka older than the recently released 2.11.0 with
    Kerberos authentication and Apache Kafka 4.x.
    
    Even though this is a bug in librdkafka, a key goal of KIP-896 is not to
    break the popular client libraries listed in it. Adding back JoinGroup
    v0 & v1 is a very small change and worth it from that perspective.
    
    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>
---
 .../resources/common/message/JoinGroupRequest.json     |  4 +---
 .../resources/common/message/JoinGroupResponse.json    |  4 +---
 .../kafka/common/requests/JoinGroupRequestTest.java    | 18 ++++++++++++++++++
 .../kafka/common/requests/RequestResponseTest.java     |  8 ++++++++
 4 files changed, 28 insertions(+), 6 deletions(-)

diff --git a/clients/src/main/resources/common/message/JoinGroupRequest.json 
b/clients/src/main/resources/common/message/JoinGroupRequest.json
index 41d7c1acbae..31afdb1a32a 100644
--- a/clients/src/main/resources/common/message/JoinGroupRequest.json
+++ b/clients/src/main/resources/common/message/JoinGroupRequest.json
@@ -18,8 +18,6 @@
   "type": "request",
   "listeners": ["broker"],
   "name": "JoinGroupRequest",
-  // Versions 0-1 were removed in Apache Kafka 4.0, Version 2 is the new 
baseline.
-  //
   // Version 1 adds RebalanceTimeoutMs. Version 2 and 3 are the same as 
version 1.
   //
   // Starting from version 4, the client needs to issue a second request to 
join group
@@ -34,7 +32,7 @@
   // Version 8 adds the Reason field (KIP-800).
   //
   // Version 9 is the same as version 8.
-  "validVersions": "2-9",
+  "validVersions": "0-9",
   "flexibleVersions": "6+",
   "fields": [
     { "name": "GroupId", "type": "string", "versions": "0+", "entityType": 
"groupId",
diff --git a/clients/src/main/resources/common/message/JoinGroupResponse.json 
b/clients/src/main/resources/common/message/JoinGroupResponse.json
index 364309596eb..d2f016f62f6 100644
--- a/clients/src/main/resources/common/message/JoinGroupResponse.json
+++ b/clients/src/main/resources/common/message/JoinGroupResponse.json
@@ -17,8 +17,6 @@
   "apiKey": 11,
   "type": "response",
   "name": "JoinGroupResponse",
-  // Versions 0-1 were removed in Apache Kafka 4.0, Version 2 is the new 
baseline.
-  //
   // Version 1 is the same as version 0.
   //
   // Version 2 adds throttle time.
@@ -37,7 +35,7 @@
   // Version 8 is the same as version 7.
   //
   // Version 9 adds the SkipAssignment field.
-  "validVersions": "2-9",
+  "validVersions": "0-9",
   "flexibleVersions": "6+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", 
"ignorable": true,
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java
index 60d10a68939..9c411005583 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java
@@ -19,12 +19,14 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.errors.InvalidConfigurationException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.protocol.MessageUtil;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.fail;
 
@@ -65,4 +67,20 @@ public class JoinGroupRequestTest {
                 .setProtocolType("consumer")
         ).build((short) 4));
     }
+
+    @Test
+    public void testRebalanceTimeoutDefaultsToSessionTimeoutV0() {
+        int sessionTimeoutMs = 30000;
+        short version = 0;
+
+        var buffer = MessageUtil.toByteBufferAccessor(new 
JoinGroupRequestData()
+                .setGroupId("groupId")
+                .setMemberId("consumerId")
+                .setProtocolType("consumer")
+                .setSessionTimeoutMs(sessionTimeoutMs), version);
+
+        JoinGroupRequest request = JoinGroupRequest.parse(buffer, version);
+        assertEquals(sessionTimeoutMs, request.data().sessionTimeoutMs());
+        assertEquals(sessionTimeoutMs, request.data().rebalanceTimeoutMs());
+    }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 6be4c35bd25..957544e2f73 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -732,6 +732,14 @@ public class RequestResponseTest {
         assertEquals(request.isolationLevel(), deserialized.isolationLevel());
     }
 
+    @Test
+    public void testJoinGroupRequestV0RebalanceTimeout() {
+        final short version = 0;
+        JoinGroupRequest jgr = createJoinGroupRequest(version);
+        JoinGroupRequest jgr2 = JoinGroupRequest.parse(jgr.serialize(), 
version);
+        assertEquals(jgr2.data().rebalanceTimeoutMs(), 
jgr.data().rebalanceTimeoutMs());
+    }
+
     @Test
     public void testSerializeWithHeader() {
         CreatableTopicCollection topicsToCreate = new 
CreatableTopicCollection(1);

Reply via email to