This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.6 by this push:
new 2b2c242b810 KAFKA-15755: LeaveGroupResponse v0 - v2 loses its member
under certain error conditions (#14635)
2b2c242b810 is described below
commit 2b2c242b810ce919499aeba939f5f726f518d1a6
Author: Robert Wagner <[email protected]>
AuthorDate: Thu Nov 16 04:17:33 2023 -0500
KAFKA-15755: LeaveGroupResponse v0 - v2 loses its member under certain
error conditions (#14635)
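This patch fixes a bug in the LeaveGroupResponse construction. When a
top-level error is set, no members are expected, but the existing check always
required exactly one member for versions prior to version 3. With this change,
a non-NONE top-level error is propagated directly for those versions, and the
single-member check only applies when there is no top-level error.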
Reviewers: David Jacot <[email protected]>
(cherry picked from commit 3fd6293449a0e92b709d2f74f0c444a05f52a287)
---
.../kafka/common/requests/LeaveGroupResponse.java | 6 +--
.../common/requests/LeaveGroupResponseTest.java | 57 ++++++++++++++++++++++
2 files changed, 60 insertions(+), 3 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
index e1fd2913163..c32d6144263 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
@@ -61,15 +61,15 @@ public class LeaveGroupResponse extends AbstractResponse {
if (version >= 3) {
this.data = data;
+ } else if (data.errorCode() != Errors.NONE.code()) {
+ this.data = new
LeaveGroupResponseData().setErrorCode(data.errorCode());
} else {
if (data.members().size() != 1) {
throw new UnsupportedVersionException("LeaveGroup response
version " + version +
" can only contain one member, got " +
data.members().size() + " members.");
}
- Errors topLevelError = Errors.forCode(data.errorCode());
- short errorCode = getError(topLevelError, data.members()).code();
- this.data = new LeaveGroupResponseData().setErrorCode(errorCode);
+ this.data = new
LeaveGroupResponseData().setErrorCode(data.members().get(0).errorCode());
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/LeaveGroupResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/LeaveGroupResponseTest.java
index d5132182cee..4bb5c722819 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/LeaveGroupResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/LeaveGroupResponseTest.java
@@ -16,13 +16,16 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.MessageUtil;
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
import java.nio.ByteBuffer;
import java.util.Arrays;
@@ -34,6 +37,7 @@ import java.util.Map;
import static
org.apache.kafka.common.requests.AbstractResponse.DEFAULT_THROTTLE_TIME;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class LeaveGroupResponseTest {
@@ -165,4 +169,57 @@ public class LeaveGroupResponseTest {
assertEquals(primaryResponse.hashCode(),
reversedResponse.hashCode());
}
}
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.LEAVE_GROUP)
+ public void testNoErrorNoMembersResponses(short version) {
+ LeaveGroupResponseData data = new LeaveGroupResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setMembers(Collections.emptyList());
+
+ if (version < 3) {
+ assertThrows(UnsupportedVersionException.class,
+ () -> new LeaveGroupResponse(data, version));
+ } else {
+ LeaveGroupResponse response = new LeaveGroupResponse(data,
version);
+ assertEquals(Errors.NONE, response.topLevelError());
+ assertEquals(Collections.emptyList(), response.memberResponses());
+ }
+ }
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.LEAVE_GROUP)
+ public void testNoErrorMultipleMembersResponses(short version) {
+ LeaveGroupResponseData data = new LeaveGroupResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setMembers(memberResponses);
+
+ if (version < 3) {
+ assertThrows(UnsupportedVersionException.class,
+ () -> new LeaveGroupResponse(data, version));
+ } else {
+ LeaveGroupResponse response = new LeaveGroupResponse(data,
version);
+ assertEquals(Errors.NONE, response.topLevelError());
+ assertEquals(memberResponses, response.memberResponses());
+ }
+ }
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.LEAVE_GROUP)
+ public void testErrorResponses(short version) {
+ LeaveGroupResponseData dataNoMembers = new LeaveGroupResponseData()
+ .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
+ .setMembers(Collections.emptyList());
+
+ LeaveGroupResponse responseNoMembers = new
LeaveGroupResponse(dataNoMembers, version);
+ assertEquals(Errors.GROUP_ID_NOT_FOUND,
responseNoMembers.topLevelError());
+
+ LeaveGroupResponseData dataMembers = new LeaveGroupResponseData()
+ .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
+ .setMembers(memberResponses);
+
+ LeaveGroupResponse responseMembers = new
LeaveGroupResponse(dataMembers, version);
+ assertEquals(Errors.GROUP_ID_NOT_FOUND,
responseMembers.topLevelError());
+ }
+
}