This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3fd6293449a KAFKA-15755: LeaveGroupResponse v0 - v2 loses its member under certain error conditions (#14635)
3fd6293449a is described below

commit 3fd6293449a0e92b709d2f74f0c444a05f52a287
Author: Robert Wagner <[email protected]>
AuthorDate: Thu Nov 16 04:17:33 2023 -0500

    KAFKA-15755: LeaveGroupResponse v0 - v2 loses its member under certain error conditions (#14635)
    
    This patch fixes a bug in the LeaveGroupResponse construction. Basically, when a top level error is set, no members are expected but the current check always requires one for versions prior to version 3.
    
    Reviewers: David Jacot <[email protected]>
---
 .../kafka/common/requests/LeaveGroupResponse.java  |  6 +--
 .../common/requests/LeaveGroupResponseTest.java    | 57 ++++++++++++++++++++++
 2 files changed, 60 insertions(+), 3 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
index e1fd2913163..c32d6144263 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
@@ -61,15 +61,15 @@ public class LeaveGroupResponse extends AbstractResponse {
 
         if (version >= 3) {
             this.data = data;
+        } else if (data.errorCode() != Errors.NONE.code()) {
+            this.data = new 
LeaveGroupResponseData().setErrorCode(data.errorCode());
         } else {
             if (data.members().size() != 1) {
                 throw new UnsupportedVersionException("LeaveGroup response 
version " + version +
                     " can only contain one member, got " + 
data.members().size() + " members.");
             }
 
-            Errors topLevelError = Errors.forCode(data.errorCode());
-            short errorCode = getError(topLevelError, data.members()).code();
-            this.data = new LeaveGroupResponseData().setErrorCode(errorCode);
+            this.data = new 
LeaveGroupResponseData().setErrorCode(data.members().get(0).errorCode());
         }
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/LeaveGroupResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/LeaveGroupResponseTest.java
index d5132182cee..4bb5c722819 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/LeaveGroupResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/LeaveGroupResponseTest.java
@@ -16,13 +16,16 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
 import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.MessageUtil;
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -34,6 +37,7 @@ import java.util.Map;
 import static 
org.apache.kafka.common.requests.AbstractResponse.DEFAULT_THROTTLE_TIME;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class LeaveGroupResponseTest {
@@ -165,4 +169,57 @@ public class LeaveGroupResponseTest {
             assertEquals(primaryResponse.hashCode(), 
reversedResponse.hashCode());
         }
     }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.LEAVE_GROUP)
+    public void testNoErrorNoMembersResponses(short version) {
+        LeaveGroupResponseData data = new LeaveGroupResponseData()
+            .setErrorCode(Errors.NONE.code())
+            .setMembers(Collections.emptyList());
+
+        if (version < 3) {
+            assertThrows(UnsupportedVersionException.class,
+                () -> new LeaveGroupResponse(data, version));
+        } else {
+            LeaveGroupResponse response = new LeaveGroupResponse(data, 
version);
+            assertEquals(Errors.NONE, response.topLevelError());
+            assertEquals(Collections.emptyList(), response.memberResponses());
+        }
+    }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.LEAVE_GROUP)
+    public void testNoErrorMultipleMembersResponses(short version) {
+        LeaveGroupResponseData data = new LeaveGroupResponseData()
+            .setErrorCode(Errors.NONE.code())
+            .setMembers(memberResponses);
+
+        if (version < 3) {
+            assertThrows(UnsupportedVersionException.class,
+                () -> new LeaveGroupResponse(data, version));
+        } else {
+            LeaveGroupResponse response = new LeaveGroupResponse(data, 
version);
+            assertEquals(Errors.NONE, response.topLevelError());
+            assertEquals(memberResponses, response.memberResponses());
+        }
+    }
+    
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.LEAVE_GROUP)
+    public void testErrorResponses(short version) {
+        LeaveGroupResponseData dataNoMembers = new LeaveGroupResponseData()
+            .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
+            .setMembers(Collections.emptyList());
+
+        LeaveGroupResponse responseNoMembers = new 
LeaveGroupResponse(dataNoMembers, version);
+        assertEquals(Errors.GROUP_ID_NOT_FOUND, 
responseNoMembers.topLevelError());
+        
+        LeaveGroupResponseData dataMembers = new LeaveGroupResponseData()
+            .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
+            .setMembers(memberResponses);
+        
+        LeaveGroupResponse responseMembers = new 
LeaveGroupResponse(dataMembers, version);
+        assertEquals(Errors.GROUP_ID_NOT_FOUND, 
responseMembers.topLevelError());
+    }
+
 }

Reply via email to