This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch 4.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push: new 487af011ca5 KAFKA-19444: Add back JoinGroup v0 & v1 (#20116) 487af011ca5 is described below commit 487af011ca5a186306298f7b861fbcce4f86e12b Author: Ismael Juma <ism...@juma.me.uk> AuthorDate: Mon Jul 7 08:44:24 2025 -0700 KAFKA-19444: Add back JoinGroup v0 & v1 (#20116) This fixes compatibility between librdkafka versions older than the recently released 2.11.0 and Apache Kafka 4.x when Kerberos authentication is used. Even though this is a bug in librdkafka, a key goal of KIP-896 is not to break the popular client libraries listed in it. Adding back JoinGroup v0 & v1 is a very small change and worth it from that perspective. Reviewers: Chia-Ping Tsai <chia7...@gmail.com> --- .../resources/common/message/JoinGroupRequest.json | 4 +--- .../resources/common/message/JoinGroupResponse.json | 4 +--- .../kafka/common/requests/JoinGroupRequestTest.java | 18 ++++++++++++++++++ .../kafka/common/requests/RequestResponseTest.java | 8 ++++++++ 4 files changed, 28 insertions(+), 6 deletions(-) diff --git a/clients/src/main/resources/common/message/JoinGroupRequest.json b/clients/src/main/resources/common/message/JoinGroupRequest.json index 41d7c1acbae..31afdb1a32a 100644 --- a/clients/src/main/resources/common/message/JoinGroupRequest.json +++ b/clients/src/main/resources/common/message/JoinGroupRequest.json @@ -18,8 +18,6 @@ "type": "request", "listeners": ["broker"], "name": "JoinGroupRequest", - // Versions 0-1 were removed in Apache Kafka 4.0, Version 2 is the new baseline. - // // Version 1 adds RebalanceTimeoutMs. Version 2 and 3 are the same as version 1. // // Starting from version 4, the client needs to issue a second request to join group @@ -34,7 +32,7 @@ // Version 8 adds the Reason field (KIP-800). // // Version 9 is the same as version 8. 
- "validVersions": "2-9", + "validVersions": "0-9", "flexibleVersions": "6+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", diff --git a/clients/src/main/resources/common/message/JoinGroupResponse.json b/clients/src/main/resources/common/message/JoinGroupResponse.json index 364309596eb..d2f016f62f6 100644 --- a/clients/src/main/resources/common/message/JoinGroupResponse.json +++ b/clients/src/main/resources/common/message/JoinGroupResponse.json @@ -17,8 +17,6 @@ "apiKey": 11, "type": "response", "name": "JoinGroupResponse", - // Versions 0-1 were removed in Apache Kafka 4.0, Version 2 is the new baseline. - // // Version 1 is the same as version 0. // // Version 2 adds throttle time. @@ -37,7 +35,7 @@ // Version 8 is the same as version 7. // // Version 9 adds the SkipAssignment field. - "validVersions": "2-9", + "validVersions": "0-9", "flexibleVersions": "6+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", "ignorable": true, diff --git a/clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java index 60d10a68939..9c411005583 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java @@ -19,12 +19,14 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.errors.InvalidConfigurationException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.JoinGroupRequestData; +import org.apache.kafka.common.protocol.MessageUtil; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.Test; import java.util.Arrays; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.fail; @@ 
-65,4 +67,20 @@ public class JoinGroupRequestTest { .setProtocolType("consumer") ).build((short) 4)); } + + @Test + public void testRebalanceTimeoutDefaultsToSessionTimeoutV0() { + int sessionTimeoutMs = 30000; + short version = 0; + + var buffer = MessageUtil.toByteBufferAccessor(new JoinGroupRequestData() + .setGroupId("groupId") + .setMemberId("consumerId") + .setProtocolType("consumer") + .setSessionTimeoutMs(sessionTimeoutMs), version); + + JoinGroupRequest request = JoinGroupRequest.parse(buffer, version); + assertEquals(sessionTimeoutMs, request.data().sessionTimeoutMs()); + assertEquals(sessionTimeoutMs, request.data().rebalanceTimeoutMs()); + } } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 6be4c35bd25..957544e2f73 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -732,6 +732,14 @@ public class RequestResponseTest { assertEquals(request.isolationLevel(), deserialized.isolationLevel()); } + @Test + public void testJoinGroupRequestV0RebalanceTimeout() { + final short version = 0; + JoinGroupRequest jgr = createJoinGroupRequest(version); + JoinGroupRequest jgr2 = JoinGroupRequest.parse(jgr.serialize(), version); + assertEquals(jgr2.data().rebalanceTimeoutMs(), jgr.data().rebalanceTimeoutMs()); + } + @Test public void testSerializeWithHeader() { CreatableTopicCollection topicsToCreate = new CreatableTopicCollection(1);